diff --git a/package.v2.json b/package.v2.json index 29edbc2..6dd6a66 100644 --- a/package.v2.json +++ b/package.v2.json @@ -513,11 +513,12 @@ "name": "美剧生词标注", "description": "根据CEFR等级,为英语影视剧标注高级词汇。", "labels": "英语", - "version": "1.1.1", + "version": "1.1.2", "icon": "LexiAnnot.png", "author": "wumode", "level": 1, "history": { + "v1.1.2": "使用子进程避免 spaCy 模型常驻内存", "v1.1.1": "添加任务页面; 改进 spaCy 模型加载逻辑", "v1.1.0": "支持考试词汇标注; 优化分词处理; 修复错误", "v1.0.1": "合并连字符词; 避免ARM平台依赖问题", diff --git a/plugins.v2/lexiannot/__init__.py b/plugins.v2/lexiannot/__init__.py index 2b5d51b..ab09356 100644 --- a/plugins.v2/lexiannot/__init__.py +++ b/plugins.v2/lexiannot/__init__.py @@ -16,11 +16,9 @@ from pathlib import Path from typing import Any, Dict, List, Tuple, Optional, Union, Type, TypeVar import pysubs2 -from pysubs2 import SSAFile, SSAEvent import pymediainfo from langdetect import detect -from spacy.tokenizer import Tokenizer -import spacy +from pysubs2 import SSAFile, SSAEvent from app.core.config import settings from app.helper.directory import DirectoryHelper @@ -36,7 +34,7 @@ from app.schemas import TransferInfo from app.schemas.types import EventType from app.core.context import MediaInfo from app.plugins.lexiannot.query_gemini import DialogueTranslationTask, VocabularyTranslationTask, Vocabulary, Context - +from app.plugins.lexiannot.spacyworker import SpacyWorker T = TypeVar('T', VocabularyTranslationTask, DialogueTranslationTask) @@ -87,7 +85,7 @@ class LexiAnnot(_PluginBase): # 插件图标 plugin_icon = "LexiAnnot.png" # 插件版本 - plugin_version = "1.1.1" + plugin_version = "1.1.2" # 插件作者 plugin_author = "wumode" # 作者主页 @@ -141,7 +139,6 @@ class LexiAnnot(_PluginBase): _config_updating_lock: threading.Lock = threading.Lock() _tasks_lock: threading.RLock = threading.RLock() _tasks: Dict[str, Task] = {} - import spacy def init_plugin(self, config=None): self.stop_service() @@ -191,6 +188,16 @@ class LexiAnnot(_PluginBase): with self._tasks_lock: self._tasks = tasks if self._enabled: + 
# 清空任务队列,避免残留对象 + while not self._task_queue.empty(): + self._task_queue.get() + self._task_queue.task_done() + # 从字典中恢复队列 + with self._tasks_lock: + for task_id, task in self._tasks.items(): + if task.status == TaskStatus.PENDING: + self._task_queue.put(task) + self._query_gemini_script = str(settings.ROOT_PATH / "app" / "plugins" / "lexiannot" / "query_gemini.py") self._shutdown_event = threading.Event() @@ -1135,7 +1142,7 @@ class LexiAnnot(_PluginBase): """ 后台线程:处理任务队列 """ - logger.debug("👷 Worker thread started.") + logger.debug(f"👷 Worker thread {threading.get_ident():#x} started.") self.__load_data() if not self._loaded: @@ -1152,10 +1159,7 @@ class LexiAnnot(_PluginBase): if not self._gemini_apikey: logger.warn(f"未提供GEMINI APIKEY") self._gemini_available = False - with self._tasks_lock: - for task_id, task in self._tasks.items(): - if task.status == TaskStatus.PENDING: - self._task_queue.put(task) + while not self._shutdown_event.is_set(): try: task = self._task_queue.get(timeout=1) @@ -1164,7 +1168,8 @@ class LexiAnnot(_PluginBase): tokens = self._total_token_count try: task.status = TaskStatus.RUNNING - task.status = self.__process_file(task.video_path) + with SpacyWorker(self._spacy_model) as worker: + task.status = self.__process_file(task.video_path, worker) except Exception as e: task.status = TaskStatus.FAILED logger.error(f"处理 {task} 出错: {e}") @@ -1175,9 +1180,9 @@ class LexiAnnot(_PluginBase): self.save_tasks() except queue.Empty: continue - logger.debug("🛑 Worker received shutdown signal, exiting...") + logger.debug(f"🛑 Worker thread {threading.get_ident():#x} received shutdown signal, exiting...") - def __process_file(self, path: str) -> TaskStatus: + def __process_file(self, path: str, spacy_worker: SpacyWorker) -> TaskStatus: """ 处理视频文件 """ @@ -1187,23 +1192,7 @@ class LexiAnnot(_PluginBase): if not lexicon: logger.error(f"字典加载失败") return TaskStatus.FAILED - try: - # 为减少内存占用,只在处理时加载 spaCy 模型 - nlp = LexiAnnot.__load_nlp(self._spacy_model) - 
infixes = list(nlp.Defaults.infixes) - infixes = [i for i in infixes if '-' not in i] - # 使用修改后的正则表达式重新创建 tokenizer - infix_re = spacy.util.compile_infix_regex(infixes) - nlp.tokenizer = Tokenizer( - nlp.vocab, - prefix_search=nlp.tokenizer.prefix_search, - suffix_search=nlp.tokenizer.suffix_search, - infix_finditer=infix_re.finditer, - token_match=nlp.tokenizer.token_match - ) - except Exception as e: - logger.error(f"spaCy 模型 {self._spacy_model} 加载失败: {e}") - return TaskStatus.FAILED + video = Path(path) if video.suffix.lower() not in settings.RMT_MEDIAEXT: return TaskStatus.CANCELED @@ -1234,7 +1223,8 @@ class LexiAnnot(_PluginBase): ass_subtitle = LexiAnnot.set_srt_style(ass_subtitle) ass_subtitle = self.__set_style(ass_subtitle) ass_subtitle = self.process_subtitles(ass_subtitle, lexicon.get('cefr'), lexicon.get('coca20k'), - lexicon.get('examinations'),lexicon.get('swear_words'), nlp) + lexicon.get('examinations'), lexicon.get('swear_words'), + spacy_worker) if self._shutdown_event.is_set(): return TaskStatus.CANCELED if ass_subtitle: @@ -1284,11 +1274,6 @@ class LexiAnnot(_PluginBase): return None return lexicon - @staticmethod - @cached(maxsize=1, ttl=3600*6) - def __load_nlp(model: str) -> spacy.Language: - return spacy.load(model) - def __retrieve_lexicon_online(self, version: str) -> Optional[Dict[str, Any]]: logger.info('开始下载词典文件...') lexicon_files = ['cefr', 'coca20k', 'swear_words', 'examinations'] @@ -1315,11 +1300,13 @@ class LexiAnnot(_PluginBase): """ 测试插件数据加载 """ + logger.info(f"加载 spaCy 模型 {self._spacy_model}...") try: - logger.info(f"加载 spaCy 模型 {self._spacy_model}...") - nlp = LexiAnnot.__load_nlp(self._spacy_model) - except OSError: - nlp = LexiAnnot.__load_spacy_model(self._spacy_model) + with SpacyWorker(self._spacy_model): + nlp = True + except RuntimeError: + nlp = LexiAnnot.__download_spacy_model(self._spacy_model) + lexicon = self.__load_lexicon_from_local() latest = self.__load_lexicon_version() or '0.0.0' if not lexicon or 
StringUtils.compare_version(lexicon.get('version'), '<', latest): @@ -1333,27 +1320,28 @@ class LexiAnnot(_PluginBase): logger.info(f"当前词典文件版本: {lexicon.get('version')}") @staticmethod - def __load_spacy_model(model_name: str): + def __download_spacy_model(model_name: str) -> bool: + logger.info(f"下载 spaCy 模型 {model_name}...") try: - logger.info(f"下载 spaCy 模型 {model_name}...") subprocess.run( [sys.executable, "-m", "spacy", "download", model_name], capture_output=True, text=True, check=True ) - nlp = LexiAnnot.__load_nlp(model_name) - logger.info(f"spaCy 模型 '{model_name}' 加载成功!") - return nlp + with SpacyWorker(model_name): + nlp = True except subprocess.CalledProcessError as e: logger.error(f"下载 spaCy 模型 '{model_name}' 失败。") logger.error(f"命令返回非零退出码:{e.returncode}") logger.error(f"Stdout:\n{e.stdout}") logger.error(f"Stderr:\n{e.stderr}") - return None + return False except Exception as e: logger.error(f"下载或加载 spaCy 模型时发生意外错误:{e}") - return None + return False + logger.info(f"spaCy 模型 '{model_name}' 加载成功!") + return nlp @eventmanager.register(EventType.TransferComplete) def check_media(self, event: Event): @@ -1817,7 +1805,7 @@ class LexiAnnot(_PluginBase): coca20k_lexicon: Dict[str, Any], exams_lexicon: Dict[str, Any], swear_words: List[str], - nlp: spacy.Language): + spacy_worker: SpacyWorker): def __replace_with_spaces(_text): """ @@ -1842,20 +1830,20 @@ class LexiAnnot(_PluginBase): text = text_raw.replace('\n', ' ') text = __replace_with_spaces(text) new_vocab = [] - doc = nlp(text) + doc = spacy_worker.submit(text) last_end_pos = 0 lemma_to_query = [] for token in doc: - if len(token.text) == 1: + if len(token.get('text')) == 1: continue - if token.lemma_ in swear_words: + if token.get('lemma_') in swear_words: continue - if token.pos_ not in ('NOUN', 'AUX', 'VERB', 'ADJ', 'ADV', 'ADP', 'CCONJ', 'SCONJ'): + if token.get('pos_') not in ('NOUN', 'AUX', 'VERB', 'ADJ', 'ADV', 'ADP', 'CCONJ', 'SCONJ'): continue - striped = token.lemma_.strip('-[') + striped = 
token.get('lemma_').strip('-[') if any(p.match(striped) for p in compiled_patterns): continue - cefr = LexiAnnot.get_cefr_by_spacy(striped, token.pos_, cefr_lexicon) + cefr = LexiAnnot.get_cefr_by_spacy(striped, token.get('pos_'), cefr_lexicon) if cefr and cefr in simple_vocabulary: continue res_of_coco = LexiAnnot.query_coca20k(striped, coca20k_lexicon) @@ -1869,7 +1857,7 @@ class LexiAnnot(_PluginBase): continue else: lemma_to_query.append(striped) - striped_text = token.text.strip('-*[') + striped_text = token.get('text').strip('-*[') start_pos = text.find(striped_text, last_end_pos) end_pos = start_pos + len(striped_text) phonetics = '' @@ -1889,7 +1877,7 @@ class LexiAnnot(_PluginBase): pos_defs = res_of_coco.get('pos_defs') or [] last_end_pos = end_pos new_vocab.append({'start': start_pos, 'end': end_pos, 'text': striped_text, 'lemma': striped, - 'pos': token.pos_, 'cefr': cefr, 'Chinese': '', 'phonetics': phonetics, + 'pos': token.get('pos_'), 'cefr': cefr, 'Chinese': '', 'phonetics': phonetics, 'pos_defs': pos_defs, 'exam_tags': exam_tags}) line_data['new_vocab'] = new_vocab # 查询词汇翻译 @@ -1993,7 +1981,7 @@ class LexiAnnot(_PluginBase): coca20k_lexicon: Dict[str, Any], exams_lexicon: Dict[str, Any], swear_words: List[str], - nlp: spacy.Language) -> Optional[SSAFile]: + spacy_worker: SpacyWorker) -> Optional[SSAFile]: """ 处理字幕内容,标记词汇并添加翻译。 """ @@ -2029,7 +2017,7 @@ class LexiAnnot(_PluginBase): main_dialogue[index] = dialogue index += 1 lines_to_process = self.__process_by_ai(lines_to_process, cefr_lexicon, coca20k_lexicon, exams_lexicon, - swear_words, nlp) + swear_words, spacy_worker) # 在原字幕添加标注 main_style_fs = ass_file.styles[main_style].fontsize diff --git a/plugins.v2/lexiannot/spacyworker.py b/plugins.v2/lexiannot/spacyworker.py new file mode 100644 index 0000000..df6e437 --- /dev/null +++ b/plugins.v2/lexiannot/spacyworker.py @@ -0,0 +1,84 @@ +from multiprocessing import Process, Queue +from typing import Dict, List + +import spacy +from spacy.tokenizer 
import queue

from spacy.tokenizer import Tokenizer

from app.core.cache import cached
from app.log import logger


class SpacyWorker:
    """
    Run a spaCy pipeline inside a child process.

    The parent sends raw text through ``task_q`` and receives plain
    ``dict`` tokens through ``result_q``; ``status_q`` carries the one-off
    "model loaded / failed" handshake.  The model is loaded only in the
    child, so it lives no longer than the worker itself.

    Usable as a context manager; leaving the ``with`` block calls
    :meth:`close`.
    """

    # How long a single blocking queue read may wait before the parent
    # re-checks that the child is still alive.
    _POLL_SECONDS = 1.0
    # Grace period given to the child to exit after the shutdown sentinel.
    _JOIN_TIMEOUT = 10.0

    def __init__(self, model='en_core_web_sm'):
        """
        Start the child process and wait until it reports the model status.

        :param model: name of the spaCy model the child should load
        :raises RuntimeError: if the model cannot be loaded, or the child
            exits before reporting its status
        """
        self.task_q = Queue()
        self.result_q = Queue()
        self.status_q = Queue()
        self.model = model

        # Start the child process
        logger.info("正在启动 SpacyWorker 子进程...")
        self.proc = Process(target=self.run, args=(self.model,))
        self.proc.start()

        # Wait for the load status.  Loading may legitimately take long, so
        # there is no overall deadline -- only a liveness check, so a child
        # that crashed cannot block the caller forever.
        try:
            status, info = self._receive(self.status_q)
        except RuntimeError as e:
            self.proc.join()
            raise RuntimeError(f"spaCy 模型加载失败: {e}") from e
        if status == 'error':
            self.proc.join()
            raise RuntimeError(f"spaCy 模型加载失败: {info}")
        logger.info(f"spaCy 模型 `{self.model}` 加载成功")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def _receive(self, source):
        """
        Read one item from ``source`` without blocking past the child's death.

        :param source: one of the queues the child writes to
        :return: the next item the child put on ``source``
        :raises RuntimeError: if the child has exited and left nothing to read
        """
        while self.proc.is_alive():
            try:
                return source.get(timeout=self._POLL_SECONDS)
            except queue.Empty:
                continue
        # The child has exited; pick up anything it flushed just before that.
        try:
            return source.get(timeout=self._POLL_SECONDS)
        except queue.Empty:
            raise RuntimeError(
                f"SpacyWorker 子进程意外退出 (exitcode={self.proc.exitcode})"
            ) from None

    def run(self, model: str):
        """
        Child-process entry point: load the model, then serve requests.

        Puts ``('ok', None)`` or ``('error', message)`` on ``status_q`` once,
        then answers every text read from ``task_q`` with ``('ok', tokens)``
        or ``('error', message)`` on ``result_q`` until it reads ``None``.

        :param model: name of the spaCy model to load
        """
        try:
            nlp = SpacyWorker.load_nlp(model)
            # Drop every infix pattern that mentions '-' and rebuild the
            # tokenizer, so hyphenated words are not split at the hyphen.
            infixes = [pattern for pattern in nlp.Defaults.infixes if '-' not in pattern]
            infix_re = spacy.util.compile_infix_regex(infixes)
            nlp.tokenizer = Tokenizer(
                nlp.vocab,
                prefix_search=nlp.tokenizer.prefix_search,
                suffix_search=nlp.tokenizer.suffix_search,
                infix_finditer=infix_re.finditer,
                token_match=nlp.tokenizer.token_match
            )
        except Exception as e:
            self.status_q.put(('error', str(e)))
            return

        # Tell the parent process that loading succeeded
        self.status_q.put(('ok', None))

        while True:
            text = self.task_q.get()
            if text is None:
                break
            # A failure on one text must not kill the loop: the parent is
            # waiting for exactly one reply per request.
            try:
                tokens = [
                    {'text': token.text, 'pos_': token.pos_, 'lemma_': token.lemma_}
                    for token in nlp(text)
                ]
            except Exception as e:
                self.result_q.put(('error', str(e)))
            else:
                self.result_q.put(('ok', tokens))

    # NOTE(review): every worker starts a new child process, so this cache
    # presumably never gets a hit -- confirm whether it is still wanted.
    @staticmethod
    @cached(maxsize=1, ttl=3600 * 6)
    def load_nlp(model: str) -> spacy.Language:
        """Load and return the spaCy pipeline named ``model``."""
        return spacy.load(model)

    def submit(self, text: str) -> List[Dict[str, str]]:
        """
        Send ``text`` to the child and wait for its tokens.

        :param text: text to analyse
        :return: one dict per token with the keys ``text``, ``pos_`` and
            ``lemma_``
        :raises RuntimeError: if the child is not running, exits while the
            request is pending, or fails to process ``text``
        """
        if not self.proc.is_alive():
            raise RuntimeError("SpacyWorker 子进程未运行")
        self.task_q.put(text)
        status, payload = self._receive(self.result_q)
        if status == 'error':
            raise RuntimeError(f"spaCy 处理文本失败: {payload}")
        return payload

    def close(self):
        """
        Stop the child process; safe to call more than once.

        Sends the ``None`` sentinel and waits ``_JOIN_TIMEOUT`` seconds.  A
        child that is still alive afterwards is terminated, because an
        unbounded ``join()`` can deadlock while the child still has
        unflushed queue data.
        """
        if self.proc.is_alive():
            self.task_q.put(None)
            self.proc.join(self._JOIN_TIMEOUT)
            if self.proc.is_alive():
                logger.warn("SpacyWorker 子进程未能正常退出, 强制终止")
                self.proc.terminate()
                self.proc.join()
        logger.info("SpacyWorker 子进程退出")