update(LexiAnnot): 添加任务页面

This commit is contained in:
wumode
2025-08-22 17:03:07 +08:00
parent 8e1d336250
commit cbf541992f
2 changed files with 268 additions and 32 deletions

View File

@@ -510,11 +510,12 @@
"name": "美剧生词标注",
"description": "根据CEFR等级为英语影视剧标注高级词汇。",
"labels": "英语",
"version": "1.1.0",
"version": "1.1.1",
"icon": "LexiAnnot.png",
"author": "wumode",
"level": 1,
"history": {
"v1.1.1": "添加任务页面; 改进 spaCy 模型加载逻辑",
"v1.1.0": "支持考试词汇标注; 优化分词处理; 修复错误",
"v1.0.1": "合并连字符词; 避免ARM平台依赖问题",
"v1.0": "新增LexiAnnot"

View File

@@ -1,14 +1,17 @@
from collections import Counter
from datetime import datetime
from enum import Enum
import os
import re
import sys
import json
import queue
import re
import shutil
import subprocess
import sys
import time
import threading
import queue
import shutil
from typing import Any, List, Dict, Tuple, Optional, Union, Type, TypeVar
from typing import Any, Dict, List, Tuple, Optional, Union, Type, TypeVar
import uuid
import venv
from pathlib import Path
@@ -20,6 +23,7 @@ import spacy
from spacy.tokenizer import Tokenizer
from app.core.config import settings
from app.helper.directory import DirectoryHelper
from app.log import logger
from app.plugins import _PluginBase
from app.core.cache import cached
@@ -33,9 +37,48 @@ from app.schemas.types import EventType
from app.core.context import MediaInfo
from app.plugins.lexiannot.query_gemini import DialogueTranslationTask, VocabularyTranslationTask, Vocabulary, Context
T = TypeVar('T', VocabularyTranslationTask, DialogueTranslationTask)
class TaskStatus(Enum):
    """
    Lifecycle state of a subtitle-annotation task.

    The member values are the lowercase strings written to the persisted task
    records (see ``Task.to_dict``) and read back with ``TaskStatus(value)``.
    Members are plain ``Enum`` values, so they never compare equal to a raw
    string such as ``"pending"``.
    """

    # Queued, not yet picked up by the worker
    PENDING = "pending"
    # Currently being processed
    RUNNING = "running"
    # Processing finished successfully
    COMPLETED = "completed"
    # Processing ended with an error
    FAILED = "failed"
    # Processing was abandoned before it finished
    CANCELED = "canceled"
    # Nothing to do for this file
    IGNORED = "ignored"
class Task:
    """
    One annotation job for a single video file.

    Attributes:
        task_id: unique identifier; a random UUID4 string when none is given.
        video_path: path of the video file the task refers to.
        status: current ``TaskStatus`` of the task.
        add_time: when the task was created, or ``None`` if unknown.
        complete_time: when processing ended, or ``None`` if it has not.
        tokens_used: number of tokens attributed to this task.
    """

    def __init__(self, video_path: str,
                 task_id: Optional[str] = None,
                 status: TaskStatus = TaskStatus.PENDING,
                 add_time: Optional[datetime] = None,
                 complete_time: Optional[datetime] = None,
                 tokens_used: int = 0):
        # A missing or empty id is replaced by a freshly generated UUID
        self.task_id = task_id if task_id else str(uuid.uuid4())
        self.video_path = video_path
        self.status: TaskStatus = status
        self.add_time: Optional[datetime] = add_time
        self.complete_time: Optional[datetime] = complete_time
        self.tokens_used: int = tokens_used

    def __repr__(self):
        # Only the first 8 characters of the id are shown to keep logs short
        short_id = self.task_id[:8]
        return f"<Task {short_id} status={self.status} video={self.video_path}>"

    def to_dict(self):
        """
        Return a plain-dict form of the task: the status becomes its string
        value and timestamps become ISO-8601 strings (``None`` when unset).
        """

        def _iso(moment: Optional[datetime]) -> Optional[str]:
            return moment.isoformat() if moment else None

        return {
            "task_id": self.task_id,
            "video_path": self.video_path,
            "status": self.status.value,
            "add_time": _iso(self.add_time),
            "complete_time": _iso(self.complete_time),
            "tokens_used": self.tokens_used
        }
class LexiAnnot(_PluginBase):
# 插件名称
plugin_name = "美剧生词标注"
@@ -44,7 +87,7 @@ class LexiAnnot(_PluginBase):
# 插件图标
plugin_icon = "LexiAnnot.png"
# 插件版本
plugin_version = "1.1.0"
plugin_version = "1.1.1"
# 插件作者
plugin_author = "wumode"
# 作者主页
@@ -81,13 +124,13 @@ class LexiAnnot(_PluginBase):
_exam_tags: List[str] = []
_spacy_model: str = ''
_delete_data: bool = False
_libraries: List[str] = []
# protected variables
_lexicon_repo = 'https://raw.githubusercontent.com/wumode/LexiAnnot/'
_worker_thread = None
_task_queue = None
_task_queue: queue.Queue[Task] = queue.Queue()
_shutdown_event = None
_client = None
_total_token_count = 0
_venv_python = None
_query_gemini_script = ''
@@ -95,11 +138,11 @@ class LexiAnnot(_PluginBase):
_accent_color_rgb = None
_color_alpha = 0
_loaded = False
_config_updating_lock: Optional[threading.Lock] = None
_config_updating_lock: threading.Lock = threading.Lock()
_tasks_lock: threading.RLock = threading.RLock()
_tasks: Dict[str, Task] = {}
def init_plugin(self, config=None):
self._task_queue = queue.Queue()
self._config_updating_lock = threading.Lock()
self.stop_service()
if config:
self._enabled = config.get("enabled")
@@ -127,7 +170,10 @@ class LexiAnnot(_PluginBase):
self._spacy_model = config.get("spacy_model") or 'en_core_web_sm'
self._exam_tags = config.get("exam_tags") or []
self._delete_data = config.get("delete_data") or False
self._libraries = config.get("libraries") or []
libraries = [library.name for library in DirectoryHelper().get_library_dirs()]
self._libraries = [library for library in self._libraries if library in libraries]
self._accent_color_rgb = LexiAnnot.hex_to_rgb(self._accent_color) or (255, 255, 0)
self._color_alpha = int(self._opacity) if self._opacity and len(self._opacity) else 0
if self._delete_data:
@@ -139,6 +185,10 @@ class LexiAnnot(_PluginBase):
self.delete_data()
self._delete_data = False
self._loaded = False
tasks = self.load_tasks()
with self._tasks_lock:
self._tasks = tasks
if self._enabled:
self._query_gemini_script = str(settings.ROOT_PATH / "app" / "plugins" / "lexiannot" / "query_gemini.py")
@@ -158,6 +208,8 @@ class LexiAnnot(_PluginBase):
"""
拼装插件配置页面需要返回两块数据1、页面配置2、数据结构
"""
library_options = [{'title': library.name,'value': library.name}
for library in DirectoryHelper().get_library_dirs()]
return [
{
'component': 'VForm',
@@ -742,6 +794,34 @@ class LexiAnnot(_PluginBase):
}
]
},
{
'component': 'VRow',
'props': {
'style': {
'margin-top': '0px'
}
},
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VSelect',
'props': {
'chips': True,
'multiple': True,
'model': 'libraries',
'label': '监控入库',
'items': library_options
}
}
]
}
]
},
{
'component': 'VRow',
'props': {
@@ -835,14 +915,90 @@ class LexiAnnot(_PluginBase):
"opacity": '0',
"spacy_model": 'en_core_web_sm',
"exam_tags": [],
"delete_data": False
"delete_data": False,
"libraries": []
}
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_page(self) -> List[dict]:
    """
    Build the plugin detail page: a virtual data table listing every known task.

    Tasks are shown newest first by ``add_time``; tasks without an ``add_time``
    are listed last.

    :return: component tree (list of dicts) describing the page
    """
    headers = [
        {'title': '添加时间', 'key': 'add_time', 'sortable': True},
        {'title': '视频文件', 'key': 'video_path', 'sortable': True},
        {'title': '消耗 Tokens', 'key': 'tokens_used', 'sortable': True},
        {'title': '完成时间', 'key': 'complete_time', 'sortable': True},
        {'title': '任务状态', 'key': 'status', 'sortable': True},
    ]
    status_map = {
        TaskStatus.PENDING: "等待中",
        TaskStatus.RUNNING: "处理中",
        TaskStatus.COMPLETED: "已完成",
        TaskStatus.IGNORED: "已忽略",
        TaskStatus.FAILED: "失败",
        TaskStatus.CANCELED: "已取消"
    }

    # Take the snapshot under the lock; rendering below works on the copy.
    with self._tasks_lock:
        sorted_tasks = sorted(
            self._tasks.items(),
            # add_time is Optional: ordering None against datetime raises
            # TypeError, so rank "has a timestamp" first and only compare the
            # timestamps of tasks that actually have one.
            key=lambda entry: (entry[1].add_time is not None, entry[1].add_time),
            reverse=True
        )

    items = []
    for task_id, task in sorted_tasks:
        # Fall back to the plain string value so the payload never carries a raw Enum
        status_text = status_map.get(task.status, getattr(task.status, 'value', task.status))
        items.append({
            'task_id': task_id,
            'status': status_text,
            'video_path': task.video_path,
            'add_time': task.add_time.strftime("%Y-%m-%d %H:%M:%S") if task.add_time else '-',
            'tokens_used': task.tokens_used,
            'complete_time': task.complete_time.strftime("%Y-%m-%d %H:%M:%S") if task.complete_time else '-',
        })

    return [
        {
            'component': 'VRow',
            'props': {
                'style': {
                    'overflow': 'hidden',
                }
            },
            'content': [
                {
                    'component': 'VRow',
                    'props': {
                        'class': 'd-none d-sm-block',
                    },
                    'content': [
                        {
                            'component': 'VCol',
                            'props': {
                                'cols': 12,
                            },
                            'content': [
                                {
                                    'component': 'VDataTableVirtual',
                                    'props': {
                                        'class': 'text-sm',
                                        'headers': headers,
                                        'items': items,
                                        'height': '30rem',
                                        'density': 'compact',
                                        'fixed-header': True,
                                        'hide-no-data': True,
                                        'hover': True
                                    }
                                }
                            ]
                        }
                    ]
                }
            ]
        }
    ]
@staticmethod
def get_command() -> List[Dict[str, Any]]:
@@ -877,6 +1033,7 @@ class LexiAnnot(_PluginBase):
logger.debug(" No running worker thread to stop.")
def delete_data(self):
# 删除词典
data_path = self.get_data_path()
lexicon_path = data_path / 'lexicon.json'
try:
@@ -886,7 +1043,9 @@ class LexiAnnot(_PluginBase):
pass
except Exception as e:
logger.error(f"词典 {lexicon_path} 删除失败: {e}")
self.__load_lexicon_from_local.cache_clear()
# 删除虚拟环境
venv_dir = data_path / "venv_genai"
if os.path.exists(venv_dir):
try:
@@ -895,12 +1054,49 @@ class LexiAnnot(_PluginBase):
except Exception as e:
logger.error(f"虚拟环境 {venv_dir} 删除失败: {e}")
# 删除任务记录
with self._tasks_lock:
self._tasks = {}
self.save_tasks()
def load_tasks(self) -> Dict[str, Task]:
    """
    Rebuild the task registry from the plugin data stored under the 'tasks' key.

    A record that cannot be turned into a ``Task`` (missing key, unknown status,
    malformed timestamp, ...) is logged and skipped; the remaining records are
    still loaded.

    :return: mapping of stored key -> restored ``Task`` (empty when nothing is stored)
    """

    def _parse_time(stamp):
        # Empty / missing timestamps are kept as None
        return datetime.fromisoformat(stamp) if stamp else None

    restored = {}
    stored = self.get_data('tasks') or {}
    for key, record in stored.items():
        try:
            video_path = record['video_path']
            stored_id = record['task_id']
            status = TaskStatus(record['status'])
            added = _parse_time(record['add_time'])
            completed = _parse_time(record['complete_time'])
            tokens = record['tokens_used']
            restored[key] = Task(
                video_path=video_path,
                task_id=stored_id,
                status=status,
                add_time=added,
                complete_time=completed,
                tokens_used=tokens,
            )
        except Exception as e:
            logger.error(f"加载任务失败:{e}")
    return restored
def save_tasks(self):
    """
    Persist every known task to the plugin data store under the "tasks" key.

    Each task is serialised with its ``to_dict()`` method. The snapshot and the
    write form one critical section, so two threads saving at the same time
    cannot store an older snapshot after a newer one.
    """
    with self._tasks_lock:
        tasks_dict = {task_id: task.to_dict() for task_id, task in self._tasks.items()}
        # Written while the lock is still held; _tasks_lock is re-entrant, so a
        # caller that already owns it can call this method safely.
        self.save_data("tasks", tasks_dict)
def add_task(self, video_file: str) -> None:
    """
    Create a task for a video file, register it, queue it and persist the registry.

    The new task starts with the default ``Task`` status and a random task id.

    :param video_file: path of the video file to process
    """
    # add_time is taken from the local clock (naive datetime)
    task = Task(video_path=video_file, add_time=datetime.now())
    # NOTE(review): lock scope assumed to cover only the registry update — confirm
    with self._tasks_lock:
        self._tasks[task.task_id] = task
    # Hand the task to the worker queue, then write the updated registry out
    self._task_queue.put(task)
    self.save_tasks()
    logger.info(f"加入任务队列: {video_file}")
def add_media_file(self, path: str):
    """
    Submit a media file for processing.

    The file is registered through ``add_task``, which is the only place that
    puts work on the queue. The raw path must not be enqueued as well, because
    the worker expects ``Task`` objects and reads ``task.video_path``.

    :param path: path of the media file
    :raises RuntimeError: if the plugin is shutting down
    """
    # Guard clause: refuse new work once shutdown has been signalled
    if self._shutdown_event.is_set():
        raise RuntimeError("Plugin is shutting down. Cannot add new tasks.")
    self.add_task(path)
@@ -930,6 +1126,8 @@ class LexiAnnot(_PluginBase):
'opacity': self._opacity,
'spacy_model': self._spacy_model,
'exam_tags': self._exam_tags,
'delete_data': self._delete_data,
'libraries': self._libraries
})
def __process_tasks(self):
@@ -953,29 +1151,44 @@ class LexiAnnot(_PluginBase):
if not self._gemini_apikey:
logger.warn(f"未提供GEMINI APIKEY")
self._gemini_available = False
with self._tasks_lock:
for task_id, task in self._tasks.items():
if task.status == 'pending':
self._task_queue.put(task)
while not self._shutdown_event.is_set():
try:
task = self._task_queue.get(timeout=1) # 最多等待1秒
task = self._task_queue.get(timeout=1)
if task is None:
continue
self.__process_file(task)
tokens = self._total_token_count
try:
task.status = TaskStatus.RUNNING
task.status = self.__process_file(task.video_path)
except Exception as e:
task.status = TaskStatus.FAILED
logger.error(f"处理 {task} 出错: {e}")
finally:
self._task_queue.task_done()
task.complete_time = datetime.now()
task.tokens_used = self._total_token_count - tokens
self.save_tasks()
except queue.Empty:
continue
logger.debug("🛑 Worker received shutdown signal, exiting...")
def __process_file(self, path: str):
def __process_file(self, path: str) -> TaskStatus:
"""
处理视频文件
"""
if not self._loaded:
return
return TaskStatus.FAILED
lexicon = self.__load_lexicon_from_local()
if not lexicon:
logger.error(f"字典加载失败")
return
return TaskStatus.FAILED
try:
# 为减少内存占用,只在处理时加载 spaCy 模型
nlp = spacy.load(self._spacy_model)
nlp = LexiAnnot.__load_nlp(self._spacy_model)
infixes = list(nlp.Defaults.infixes)
infixes = [i for i in infixes if '-' not in i]
# 使用修改后的正则表达式重新创建 tokenizer
@@ -989,17 +1202,17 @@ class LexiAnnot(_PluginBase):
)
except Exception as e:
logger.error(f"spaCy 模型 {self._spacy_model} 加载失败: {e}")
return
return TaskStatus.FAILED
video = Path(path)
if video.suffix.lower() not in settings.RMT_MEDIAEXT:
return
return TaskStatus.CANCELED
if not video.exists() or not video.is_file():
logger.warn(f"文件 {str(video)} 不存在, 跳过")
return
return TaskStatus.FAILED
subtitle = video.with_suffix(".en.ass")
if subtitle.exists():
logger.warn(f"字幕文件 ({subtitle}) 已存在, 跳过")
return
return TaskStatus.IGNORED
logger.info(f"📂 Processing file: {path}")
if self._send_notify:
message = f"正在处理文件: {path}"
@@ -1014,7 +1227,7 @@ class LexiAnnot(_PluginBase):
logger.info(f'提取到 {len(embedded_subtitles)} 条英语文本字幕')
for embedded_subtitle in embedded_subtitles:
if self._shutdown_event.is_set():
return
return TaskStatus.CANCELED
ass_subtitle = pysubs2.SSAFile.from_string(embedded_subtitle['subtitle'], format_='ass')
if embedded_subtitle.get('codec_id') == 'S_TEXT/UTF8':
ass_subtitle = LexiAnnot.set_srt_style(ass_subtitle)
@@ -1022,7 +1235,7 @@ class LexiAnnot(_PluginBase):
ass_subtitle = self.process_subtitles(ass_subtitle, lexicon.get('cefr'), lexicon.get('coca20k'),
lexicon.get('examinations'),lexicon.get('swear_words'), nlp)
if self._shutdown_event.is_set():
return
return TaskStatus.CANCELED
if ass_subtitle:
try:
ass_subtitle.save(str(subtitle))
@@ -1044,6 +1257,8 @@ class LexiAnnot(_PluginBase):
mtype=NotificationType.Plugin,
text=f"{ret_message}")
return TaskStatus.COMPLETED
@cached(maxsize=1000, ttl=1800)
def __load_lexicon_version(self) -> Optional[str]:
logger.info(f"正在检查远程词典文件版本...")
@@ -1053,6 +1268,7 @@ class LexiAnnot(_PluginBase):
return None
return version.strip()
@cached(maxsize=1, ttl=3600*6)
def __load_lexicon_from_local(self) -> Optional[Dict[str, Any]]:
data_path = self.get_data_path()
lexicon = {}
@@ -1067,6 +1283,11 @@ class LexiAnnot(_PluginBase):
return None
return lexicon
@staticmethod
@cached(maxsize=1, ttl=6 * 3600)
def __load_nlp(model: str) -> spacy.Language:
    """
    Load a spaCy pipeline by name.

    The call is wrapped by ``cached(maxsize=1, ttl=...)`` with a TTL of
    21600 seconds; presumably this keeps a single loaded pipeline around for
    six hours — confirm against app.core.cache.

    :param model: spaCy model name passed straight to ``spacy.load``
    :return: the loaded pipeline
    """
    # Exceptions from spacy.load propagate to the caller unchanged
    pipeline = spacy.load(model)
    return pipeline
def __retrieve_lexicon_online(self, version: str) -> Optional[Dict[str, Any]]:
logger.info('开始下载词典文件...')
lexicon_files = ['cefr', 'coca20k', 'swear_words', 'examinations']
@@ -1094,7 +1315,8 @@ class LexiAnnot(_PluginBase):
测试插件数据加载
"""
try:
nlp = spacy.load(self._spacy_model)
logger.info(f"加载 spaCy 模型 {self._spacy_model}...")
nlp = LexiAnnot.__load_nlp(self._spacy_model)
except OSError:
nlp = LexiAnnot.__load_spacy_model(self._spacy_model)
lexicon = self.__load_lexicon_from_local()
@@ -1112,13 +1334,14 @@ class LexiAnnot(_PluginBase):
@staticmethod
def __load_spacy_model(model_name: str):
try:
logger.info(f"下载 spaCy 模型 {model_name}...")
subprocess.run(
[sys.executable, "-m", "spacy", "download", model_name],
capture_output=True,
text=True,
check=True
)
nlp = spacy.load(model_name)
nlp = LexiAnnot.__load_nlp(model_name)
logger.info(f"spaCy 模型 '{model_name}' 加载成功!")
return nlp
except subprocess.CalledProcessError as e:
@@ -1143,9 +1366,20 @@ class LexiAnnot(_PluginBase):
transfer_info: TransferInfo = event_info.get("transferinfo")
if not transfer_info or not transfer_info.target_diritem or not transfer_info.target_diritem.path:
return
# 检查是否为选择的媒体库
in_libraries = False
libraries = {library.name: library.library_path for library in DirectoryHelper().get_library_dirs()}
for library_name in self._libraries:
if Path(transfer_info.target_diritem.path).is_relative_to(Path(libraries[library_name])):
in_libraries = True
break
if not in_libraries:
return
mediainfo: MediaInfo = event_info.get("mediainfo")
if self._english_only:
if mediainfo.original_language != 'en':
if mediainfo.original_language and mediainfo.original_language != 'en':
logger.info(f"原始语言 ({mediainfo.original_language}) 不为英语, 跳过 {mediainfo.title} ")
return
for new_path in transfer_info.file_list_new:
@@ -1488,7 +1722,7 @@ class LexiAnnot(_PluginBase):
subtitle_stream_index = track.stream_identifier # MediaInfo 的 stream_id 从 1 开始ffmpeg 从 0 开始
subtitle = LexiAnnot.__extract_subtitle(video_path, subtitle_stream_index, ffmpeg)
if subtitle:
subtitles.append({'title': track.title, 'subtitle': subtitle, 'codec_id': track.codec_id,
subtitles.append({'title': track.title or '', 'subtitle': subtitle, 'codec_id': track.codec_id,
'stream_id': subtitle_stream_index})
if subtitles:
return subtitles
@@ -1536,7 +1770,7 @@ class LexiAnnot(_PluginBase):
temperature: float
) -> List[T]:
input_dict = {
'tasks': [task.dict() for task in tasks], # 保证是可序列化格式
'tasks': [task.dict() for task in tasks],
'params': {
'api_key': api_key,
'system_instruction': system_instruction,
@@ -1570,6 +1804,7 @@ class LexiAnnot(_PluginBase):
return tasks
try:
self._total_token_count += response['data']['total_token_count'] or 0
return [task_type(**task_data) for task_data in response["data"]["tasks"]]
except Exception as e:
logger.warning(f"Failed to reconstruct tasks: {str(e)}")