update(LexiAnnot): 避免spaCy模型常驻内存

This commit is contained in:
wumode
2025-08-29 15:34:29 +08:00
parent 6019cf92ac
commit 21aec36ea5
3 changed files with 133 additions and 60 deletions

View File

@@ -513,11 +513,12 @@
"name": "美剧生词标注",
"description": "根据CEFR等级为英语影视剧标注高级词汇。",
"labels": "英语",
"version": "1.1.1",
"version": "1.1.2",
"icon": "LexiAnnot.png",
"author": "wumode",
"level": 1,
"history": {
"v1.1.2": "使用子进程避免 spaCy 模型常驻内存",
"v1.1.1": "添加任务页面; 改进 spaCy 模型加载逻辑",
"v1.1.0": "支持考试词汇标注; 优化分词处理; 修复错误",
"v1.0.1": "合并连字符词; 避免ARM平台依赖问题",

View File

@@ -16,11 +16,9 @@ from pathlib import Path
from typing import Any, Dict, List, Tuple, Optional, Union, Type, TypeVar
import pysubs2
from pysubs2 import SSAFile, SSAEvent
import pymediainfo
from langdetect import detect
from spacy.tokenizer import Tokenizer
import spacy
from pysubs2 import SSAFile, SSAEvent
from app.core.config import settings
from app.helper.directory import DirectoryHelper
@@ -36,7 +34,7 @@ from app.schemas import TransferInfo
from app.schemas.types import EventType
from app.core.context import MediaInfo
from app.plugins.lexiannot.query_gemini import DialogueTranslationTask, VocabularyTranslationTask, Vocabulary, Context
from app.plugins.lexiannot.spacyworker import SpacyWorker
T = TypeVar('T', VocabularyTranslationTask, DialogueTranslationTask)
@@ -87,7 +85,7 @@ class LexiAnnot(_PluginBase):
# 插件图标
plugin_icon = "LexiAnnot.png"
# 插件版本
plugin_version = "1.1.1"
plugin_version = "1.1.2"
# 插件作者
plugin_author = "wumode"
# 作者主页
@@ -141,7 +139,6 @@ class LexiAnnot(_PluginBase):
_config_updating_lock: threading.Lock = threading.Lock()
_tasks_lock: threading.RLock = threading.RLock()
_tasks: Dict[str, Task] = {}
import spacy
def init_plugin(self, config=None):
self.stop_service()
@@ -191,6 +188,16 @@ class LexiAnnot(_PluginBase):
with self._tasks_lock:
self._tasks = tasks
if self._enabled:
# 清空任务队列,避免残留对象
while not self._task_queue.empty():
self._task_queue.get()
self._task_queue.task_done()
# 从字典中恢复队列
with self._tasks_lock:
for task_id, task in self._tasks.items():
if task.status == TaskStatus.PENDING:
self._task_queue.put(task)
self._query_gemini_script = str(settings.ROOT_PATH / "app" / "plugins" / "lexiannot" / "query_gemini.py")
self._shutdown_event = threading.Event()
@@ -1135,7 +1142,7 @@ class LexiAnnot(_PluginBase):
"""
后台线程:处理任务队列
"""
logger.debug("👷 Worker thread started.")
logger.debug(f"👷 Worker thread {threading.get_ident():#x} started.")
self.__load_data()
if not self._loaded:
@@ -1152,10 +1159,7 @@ class LexiAnnot(_PluginBase):
if not self._gemini_apikey:
logger.warn(f"未提供GEMINI APIKEY")
self._gemini_available = False
with self._tasks_lock:
for task_id, task in self._tasks.items():
if task.status == TaskStatus.PENDING:
self._task_queue.put(task)
while not self._shutdown_event.is_set():
try:
task = self._task_queue.get(timeout=1)
@@ -1164,7 +1168,8 @@ class LexiAnnot(_PluginBase):
tokens = self._total_token_count
try:
task.status = TaskStatus.RUNNING
task.status = self.__process_file(task.video_path)
with SpacyWorker(self._spacy_model) as worker:
task.status = self.__process_file(task.video_path, worker)
except Exception as e:
task.status = TaskStatus.FAILED
logger.error(f"处理 {task} 出错: {e}")
@@ -1175,9 +1180,9 @@ class LexiAnnot(_PluginBase):
self.save_tasks()
except queue.Empty:
continue
logger.debug("🛑 Worker received shutdown signal, exiting...")
logger.debug(f"🛑 Worker thread {threading.get_ident():#x} received shutdown signal, exiting...")
def __process_file(self, path: str) -> TaskStatus:
def __process_file(self, path: str, spacy_worker: SpacyWorker) -> TaskStatus:
"""
处理视频文件
"""
@@ -1187,23 +1192,7 @@ class LexiAnnot(_PluginBase):
if not lexicon:
logger.error(f"字典加载失败")
return TaskStatus.FAILED
try:
# 为减少内存占用,只在处理时加载 spaCy 模型
nlp = LexiAnnot.__load_nlp(self._spacy_model)
infixes = list(nlp.Defaults.infixes)
infixes = [i for i in infixes if '-' not in i]
# 使用修改后的正则表达式重新创建 tokenizer
infix_re = spacy.util.compile_infix_regex(infixes)
nlp.tokenizer = Tokenizer(
nlp.vocab,
prefix_search=nlp.tokenizer.prefix_search,
suffix_search=nlp.tokenizer.suffix_search,
infix_finditer=infix_re.finditer,
token_match=nlp.tokenizer.token_match
)
except Exception as e:
logger.error(f"spaCy 模型 {self._spacy_model} 加载失败: {e}")
return TaskStatus.FAILED
video = Path(path)
if video.suffix.lower() not in settings.RMT_MEDIAEXT:
return TaskStatus.CANCELED
@@ -1234,7 +1223,8 @@ class LexiAnnot(_PluginBase):
ass_subtitle = LexiAnnot.set_srt_style(ass_subtitle)
ass_subtitle = self.__set_style(ass_subtitle)
ass_subtitle = self.process_subtitles(ass_subtitle, lexicon.get('cefr'), lexicon.get('coca20k'),
lexicon.get('examinations'),lexicon.get('swear_words'), nlp)
lexicon.get('examinations'), lexicon.get('swear_words'),
spacy_worker)
if self._shutdown_event.is_set():
return TaskStatus.CANCELED
if ass_subtitle:
@@ -1284,11 +1274,6 @@ class LexiAnnot(_PluginBase):
return None
return lexicon
@staticmethod
@cached(maxsize=1, ttl=3600*6)
def __load_nlp(model: str) -> spacy.Language:
return spacy.load(model)
def __retrieve_lexicon_online(self, version: str) -> Optional[Dict[str, Any]]:
logger.info('开始下载词典文件...')
lexicon_files = ['cefr', 'coca20k', 'swear_words', 'examinations']
@@ -1315,11 +1300,13 @@ class LexiAnnot(_PluginBase):
"""
测试插件数据加载
"""
logger.info(f"加载 spaCy 模型 {self._spacy_model}...")
try:
logger.info(f"加载 spaCy 模型 {self._spacy_model}...")
nlp = LexiAnnot.__load_nlp(self._spacy_model)
except OSError:
nlp = LexiAnnot.__load_spacy_model(self._spacy_model)
with SpacyWorker(self._spacy_model):
nlp = True
except RuntimeError:
nlp = LexiAnnot.__download_spacy_model(self._spacy_model)
lexicon = self.__load_lexicon_from_local()
latest = self.__load_lexicon_version() or '0.0.0'
if not lexicon or StringUtils.compare_version(lexicon.get('version'), '<', latest):
@@ -1333,27 +1320,28 @@ class LexiAnnot(_PluginBase):
logger.info(f"当前词典文件版本: {lexicon.get('version')}")
@staticmethod
def __load_spacy_model(model_name: str):
def __download_spacy_model(model_name: str) -> bool:
logger.info(f"下载 spaCy 模型 {model_name}...")
try:
logger.info(f"下载 spaCy 模型 {model_name}...")
subprocess.run(
[sys.executable, "-m", "spacy", "download", model_name],
capture_output=True,
text=True,
check=True
)
nlp = LexiAnnot.__load_nlp(model_name)
logger.info(f"spaCy 模型 '{model_name}' 加载成功!")
return nlp
with SpacyWorker(model_name):
nlp = True
except subprocess.CalledProcessError as e:
logger.error(f"下载 spaCy 模型 '{model_name}' 失败。")
logger.error(f"命令返回非零退出码:{e.returncode}")
logger.error(f"Stdout:\n{e.stdout}")
logger.error(f"Stderr:\n{e.stderr}")
return None
return False
except Exception as e:
logger.error(f"下载或加载 spaCy 模型时发生意外错误:{e}")
return None
return False
logger.info(f"spaCy 模型 '{model_name}' 加载成功!")
return nlp
@eventmanager.register(EventType.TransferComplete)
def check_media(self, event: Event):
@@ -1817,7 +1805,7 @@ class LexiAnnot(_PluginBase):
coca20k_lexicon: Dict[str, Any],
exams_lexicon: Dict[str, Any],
swear_words: List[str],
nlp: spacy.Language):
spacy_worker: SpacyWorker):
def __replace_with_spaces(_text):
"""
@@ -1842,20 +1830,20 @@ class LexiAnnot(_PluginBase):
text = text_raw.replace('\n', ' ')
text = __replace_with_spaces(text)
new_vocab = []
doc = nlp(text)
doc = spacy_worker.submit(text)
last_end_pos = 0
lemma_to_query = []
for token in doc:
if len(token.text) == 1:
if len(token.get('text')) == 1:
continue
if token.lemma_ in swear_words:
if token.get('lemma_') in swear_words:
continue
if token.pos_ not in ('NOUN', 'AUX', 'VERB', 'ADJ', 'ADV', 'ADP', 'CCONJ', 'SCONJ'):
if token.get('pos_') not in ('NOUN', 'AUX', 'VERB', 'ADJ', 'ADV', 'ADP', 'CCONJ', 'SCONJ'):
continue
striped = token.lemma_.strip('-[')
striped = token.get('lemma_').strip('-[')
if any(p.match(striped) for p in compiled_patterns):
continue
cefr = LexiAnnot.get_cefr_by_spacy(striped, token.pos_, cefr_lexicon)
cefr = LexiAnnot.get_cefr_by_spacy(striped, token.get('pos_'), cefr_lexicon)
if cefr and cefr in simple_vocabulary:
continue
res_of_coco = LexiAnnot.query_coca20k(striped, coca20k_lexicon)
@@ -1869,7 +1857,7 @@ class LexiAnnot(_PluginBase):
continue
else:
lemma_to_query.append(striped)
striped_text = token.text.strip('-*[')
striped_text = token.get('text').strip('-*[')
start_pos = text.find(striped_text, last_end_pos)
end_pos = start_pos + len(striped_text)
phonetics = ''
@@ -1889,7 +1877,7 @@ class LexiAnnot(_PluginBase):
pos_defs = res_of_coco.get('pos_defs') or []
last_end_pos = end_pos
new_vocab.append({'start': start_pos, 'end': end_pos, 'text': striped_text, 'lemma': striped,
'pos': token.pos_, 'cefr': cefr, 'Chinese': '', 'phonetics': phonetics,
'pos': token.get('pos_'), 'cefr': cefr, 'Chinese': '', 'phonetics': phonetics,
'pos_defs': pos_defs, 'exam_tags': exam_tags})
line_data['new_vocab'] = new_vocab
# 查询词汇翻译
@@ -1993,7 +1981,7 @@ class LexiAnnot(_PluginBase):
coca20k_lexicon: Dict[str, Any],
exams_lexicon: Dict[str, Any],
swear_words: List[str],
nlp: spacy.Language) -> Optional[SSAFile]:
spacy_worker: SpacyWorker) -> Optional[SSAFile]:
"""
处理字幕内容,标记词汇并添加翻译。
"""
@@ -2029,7 +2017,7 @@ class LexiAnnot(_PluginBase):
main_dialogue[index] = dialogue
index += 1
lines_to_process = self.__process_by_ai(lines_to_process, cefr_lexicon, coca20k_lexicon, exams_lexicon,
swear_words, nlp)
swear_words, spacy_worker)
# 在原字幕添加标注
main_style_fs = ass_file.styles[main_style].fontsize

View File

@@ -0,0 +1,84 @@
from multiprocessing import Process, Queue
from typing import Dict, List
import spacy
from spacy.tokenizer import Tokenizer
from app.core.cache import cached
from app.log import logger
class SpacyWorker:
    """Run a spaCy pipeline inside a child process.

    Loading a spaCy model keeps a large amount of memory resident; by
    confining the model to a subprocess, all of that memory is returned to
    the OS when the worker is closed. Communication is via three queues:
    ``task_q`` (texts in), ``result_q`` (token dicts out) and ``status_q``
    (one-shot model-load status).

    Intended to be used as a context manager::

        with SpacyWorker("en_core_web_sm") as worker:
            tokens = worker.submit("Some text")
    """

    # Sentinel tag used to ship a per-task error back to the parent.
    _ERROR_TAG = '__spacy_worker_error__'

    def __init__(self, model: str = 'en_core_web_sm'):
        """Start the subprocess and block until the model load status is known.

        :param model: spaCy model name to load in the child process.
        :raises RuntimeError: if the child fails to load the model.
        """
        self.task_q: Queue = Queue()
        self.result_q: Queue = Queue()
        self.status_q: Queue = Queue()
        self.model = model
        # 启动子进程
        logger.info("正在启动 SpacyWorker 子进程...")
        self.proc = Process(target=self.run, args=(self.model,))
        self.proc.start()
        # 等待子进程返回模型加载状态 (blocks until the child reports)
        status, info = self.status_q.get()
        if status == 'error':
            self.proc.join()
            raise RuntimeError(f"spaCy 模型加载失败: {info}")
        logger.info(f"spaCy 模型 `{self.model}` 加载成功")

    def __enter__(self) -> "SpacyWorker":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Always shut the subprocess down, even if the body raised.
        self.close()

    def run(self, model: str) -> None:
        """Child-process entry point: load the model, then serve tasks.

        Reports ('ok', None) or ('error', msg) on ``status_q`` exactly once,
        then loops on ``task_q`` until a ``None`` shutdown sentinel arrives.
        """
        try:
            nlp = SpacyWorker.load_nlp(model)
            # Drop infix rules containing '-' so hyphenated words stay merged
            # as single tokens.
            infixes = [i for i in nlp.Defaults.infixes if '-' not in i]
            infix_re = spacy.util.compile_infix_regex(infixes)
            nlp.tokenizer = Tokenizer(
                nlp.vocab,
                prefix_search=nlp.tokenizer.prefix_search,
                suffix_search=nlp.tokenizer.suffix_search,
                infix_finditer=infix_re.finditer,
                token_match=nlp.tokenizer.token_match
            )
        except Exception as e:
            self.status_q.put(('error', str(e)))
            return
        # 告诉主进程加载成功
        self.status_q.put(('ok', None))
        while True:
            text = self.task_q.get()
            if text is None:
                break
            try:
                doc = nlp(text)
                self.result_q.put(
                    [{'text': token.text, 'pos_': token.pos_, 'lemma_': token.lemma_} for token in doc]
                )
            except Exception as e:
                # Must answer every task: if this loop died silently the
                # parent would block forever in submit() on result_q.get().
                self.result_q.put((SpacyWorker._ERROR_TAG, str(e)))

    @staticmethod
    @cached(maxsize=1, ttl=3600 * 6)
    def load_nlp(model: str) -> spacy.Language:
        """Load (and cache) the spaCy model. Runs in the child process."""
        return spacy.load(model)

    def submit(self, text: str) -> List[Dict[str, str]]:
        """
        提交任务并等待结果

        :param text: text to tokenize in the worker process.
        :return: one dict per token with ``text``, ``pos_`` and ``lemma_``.
        :raises RuntimeError: if processing failed in the worker.
        """
        self.task_q.put(text)
        result = self.result_q.get()
        # Normal results are lists; a tuple is the error sentinel from run().
        if isinstance(result, tuple) and result and result[0] == SpacyWorker._ERROR_TAG:
            raise RuntimeError(f"spaCy 处理失败: {result[1]}")
        return result

    def close(self) -> None:
        """
        关闭子进程

        Sends the shutdown sentinel and joins the child; safe to call more
        than once (no-op when the process is already dead).
        """
        if self.proc.is_alive():
            self.task_q.put(None)
            self.proc.join()
            logger.info("SpacyWorker 子进程退出")