diff --git a/icons/LexiAnnot.png b/icons/LexiAnnot.png new file mode 100644 index 0000000..651e097 Binary files /dev/null and b/icons/LexiAnnot.png differ diff --git a/package.v2.json b/package.v2.json index 1cd9dc4..b4e2d50 100644 --- a/package.v2.json +++ b/package.v2.json @@ -450,5 +450,17 @@ "history": { "v0.1.0": "新增ClashRuleProvider" } + }, + "LexiAnnot": { + "name": "美剧生词标注", + "description": "根据CEFR等级,为英语影视剧标注高级词汇。", + "labels": "英语", + "version": "1.0", + "icon": "LexiAnnot.png", + "author": "wumode", + "level": 1, + "history": { + "v1.0": "新增LexiAnnot" + } } } diff --git a/plugins.v2/lexiannot/README.md b/plugins.v2/lexiannot/README.md new file mode 100644 index 0000000..4b08489 --- /dev/null +++ b/plugins.v2/lexiannot/README.md @@ -0,0 +1,54 @@ +# 美剧生词标注 + +根据CEFR等级,为英语影视剧标注高级词汇。 + +在影视剧入库后,LexiAnnot会读取媒体文件的MediaInfo和文件列表,如果视频的原始语言为英语并且包含英文文本字幕,LexiAnnot将为其生成包含词汇注释的.ass字幕文件。 + +![](https://images2.imgbox.com/d6/b6/kZu6EH2a_o.png) +![](https://images2.imgbox.com/c8/3a/rEJBWu5v_o.png) +![](https://images2.imgbox.com/97/b7/d6RXFtwD_o.png) + +# Gemini + +- **[获取APIKEY](https://aistudio.google.com/app/apikey)** +- **[速率限制](https://ai.google.dev/gemini-api/docs/rate-limits)** + +**确保可以正常访问下面的域名** + +- googleapis.com +- google.dev +- aistudio.google.com + +# CEFR + +CEFR全称是Common European Framework of Reference for Languages。 + +它是一个国际标准,用于描述语言学习者的语言能力水平。CEFR 将语言能力分为六个级别,并进一步归类为三大使用者类型: + +- **A - 基础使用者 (Basic User)** + - **A1** (初学者/Beginner):能够理解并使用日常熟悉的表达和非常基本的短语。 + - **A2** (初级/Elementary):能够理解基本的表达方式,并以简单的方式进行交流。 +- **B - 独立使用者 (Independent User)** + - **B1** (中级/Intermediate):能够理解熟悉主题的主要观点,可以处理旅行中可能遇到的多数情况,并能就熟悉的话题发表意见和描述。 + - **B2** (中高级/Upper-Intermediate):能够理解复杂文本的主要思想,并能与母语者进行一定程度的流利、自然的互动,可以就广泛的主题进行清晰、详细的阐述。 +- **C - 熟练使用者 (Proficient User)** + - **C1** (高级/Advanced):能够理解各种较长、要求较高的文本,并能识别隐含意义,表达流利、自然,能灵活有效地使用语言来应对各种目的。 + - **C2** (精通/Proficient):能够轻松理解几乎所有听到的或读到的内容,能够非常流利、准确、精细地表达自己,即使在复杂的情况下也能区分细微的含义。 + +# 计划 + +- 双语字幕支持 +- 考试词汇标注 + +# FAQ + +- **为什么需要用到Gemini** + - LexiAnnot使用的词典仅包含约18000个单词,无法覆盖影视剧中的海量的俚语、习语、流行语等更广泛的表达形式 +- **只能处理已有字幕的视频吗?** + - 是的,视频需要包含**英文文本字幕** +- **为什么无法处理一些包含字幕视频** + - 目前无法识别基于图片的字幕(通常是特效字幕) + +# 感谢 + +- [coca-vocabulary-20000](https://github.com/llt22/coca-vocabulary-20000) \ No newline at end of file diff --git a/plugins.v2/lexiannot/__init__.py b/plugins.v2/lexiannot/__init__.py new file mode 100644 index 0000000..c143024 --- /dev/null +++ b/plugins.v2/lexiannot/__init__.py @@ -0,0 +1,1561 @@ +import os +import re +import sys +import json +import subprocess +import time +import threading +import queue +import shutil +from typing import Any, List, Dict, Tuple, Optional, Union, Type, TypeVar +import venv +from pathlib import Path +from collections import Counter + +from apscheduler.schedulers.background import BackgroundScheduler +import pysubs2 +from pysubs2 import SSAFile, SSAEvent +import spacy +from spacy.tokens import Token +import pymediainfo +from langdetect import detect + +from app.core.config import settings +from app.log import logger +from app.plugins import _PluginBase +from app.core.cache import cached +from app.core.event import eventmanager, Event +from app.utils.system import SystemUtils +from app.schemas.types import NotificationType +from app.utils.http import RequestUtils +from app.utils.string import StringUtils +from app.schemas import TransferInfo +from app.schemas.types import EventType +from app.core.context import MediaInfo +from app.plugins.lexiannot.query_gemini import DialogueTranslationTask, VocabularyTranslationTask, Vocabulary, Context + +T = TypeVar('T', VocabularyTranslationTask, DialogueTranslationTask) + +class LexiAnnot(_PluginBase): + # 插件名称 + plugin_name = "美剧生词标注" + # 插件描述 + plugin_desc = "根据CEFR等级,为英语影视剧标注高级词汇。" + # 插件图标 + plugin_icon = "https://raw.githubusercontent.com/wumode/LexiAnnot/refs/heads/master/LexiAnnot.png" + # 插件版本 + plugin_version = "1.0" + # 插件作者 + plugin_author = "wumode" + # 作者主页 + author_url = "https://github.com/wumode" + # 插件配置项ID前缀 + plugin_config_prefix = "lexiannot_" + # 加载顺序 + plugin_order = 50 + # 可使用的用户级别 + auth_level = 1 + + _enabled: bool = False + _annot_level = '' + _send_notify = False + _onlyonce = False + _show_vocabulary_detail = False + _show_phonetics = False + _sentence_translation = False + _in_place = False + _enable_gemini = False + _gemini_model = False + _gemini_apikey = '' + _context_window = 0 + _max_retries = 0 + _request_interval = 0 + _ffmpeg_path = '' + _english_only = False + _when_file_trans = False + _model_temperature = '' + _custom_files = '' + _accent_color = '' + _font_scaling = '' + _opacity = '' + + # 插件数据 + _lexicon_version = '' + _swear_words = None + _cefr_lexicon = None + _coca2k_lexicon = None + + # protected variables + _lexicon_repo = 'https://raw.githubusercontent.com/wumode/LexiAnnot/' + _spacy_model_name = "en_core_web_sm" + _scheduler: Optional[BackgroundScheduler] = None + _nlp = None + _worker_thread = None + _task_queue = None + _shutdown_event = None + _client = None + _total_token_count = 0 + _venv_python = None + _query_gemini_script = '' + _gemini_available = False + _accent_color_rgb = None + _color_alpha = 0 + + def init_plugin(self, config=None): + self._task_queue = queue.Queue() + self.stop_service() + if config: + self._enabled = config.get("enabled") + self._annot_level = config.get("annot_level") + self._send_notify = config.get("send_notify") + self._onlyonce = config.get("onlyonce") + self._show_vocabulary_detail = config.get("show_vocabulary_detail") + self._sentence_translation = config.get("sentence_translation") + self._in_place = config.get("in_place") + self._enable_gemini = config.get("enable_gemini") + self._gemini_model = config.get("gemini_model") + self._gemini_apikey = config.get("gemini_apikey") + self._context_window = config.get("context_window") + self._max_retries = config.get("max_retries") + self._request_interval = config.get("request_interval") + self._ffmpeg_path = config.get("ffmpeg_path") + self._english_only = config.get("english_only") + self._when_file_trans = config.get("when_file_trans") + self._model_temperature = config.get("model_temperature") + self._show_phonetics = config.get("show_phonetics") + self._custom_files = config.get("custom_files") + self._accent_color = config.get("accent_color") + self._font_scaling = config.get("font_scaling") + self._opacity = config.get("opacity") + + self._accent_color_rgb = LexiAnnot.hex_to_rgb(self._accent_color) or (255, 255, 0) + self._color_alpha = int(self._opacity) if self._opacity and len(self._opacity) else 0 + if self._enabled: + self._query_gemini_script = f"{settings.ROOT_PATH}/app/plugins/lexiannot/query_gemini.py" + self._cefr_lexicon = self.get_data("cefr_lexicon") + self._coca2k_lexicon = self.get_data("coca2k_lexicon") + self._swear_words = self.get_data("swear_words") + self._lexicon_version = self.get_data("lexicon_version") + latest = self.__load_lexicon_version() + if not self._lexicon_version or StringUtils.compare_version(self._lexicon_version, '<', latest): + self.__load_lexicon() + try: + if self._nlp is None: + self._nlp = spacy.load(self._spacy_model_name) + except OSError: + self._nlp = LexiAnnot.__load_spacy_model(self._spacy_model_name) + if not (self._nlp and self._cefr_lexicon and self._coca2k_lexicon and self._swear_words): + _loaded = False + else: + _loaded = True + if not _loaded: + logger.warn(f"插件数据未加载,初始化失败") + self._enabled = False + self.__update_config() + return + if self._enable_gemini: + self._gemini_available = True + res = self.init_venv() + if not res: + self._gemini_available = False + if not self._gemini_apikey: + logger.warn(f"未提供GEMINI APIKEY") + self._gemini_available = False + self._shutdown_event = threading.Event() + self._worker_thread = threading.Thread(target=self.__process_tasks, daemon=True) + self._worker_thread.start() + if self._onlyonce: + for file_path in self._custom_files.split("\n"): + if not file_path: + continue + self.add_media_file(file_path) + self._onlyonce = False + self.__update_config() + + + def get_form(self) -> Tuple[List[dict], Dict[str, Any]]: + """ + 拼装插件配置页面,需要返回两块数据:1、页面配置;2、数据结构 + """ + return [ + { + 'component': 'VForm', + 'content': [ + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'enabled', + 'label': '启用插件', + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'when_file_trans', + 'label': '监控入库', + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'send_notify', + 'label': '发送通知', + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'onlyonce', + 'label': '立即运行一次', + } + } + ] + } + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSelect', + 'props': { + 'model': 'annot_level', + 'label': '标注词汇的最低CEFR等级', + 'items': [ + {'title': 'B1', 'value': 'B1'}, + {'title': 'B2', 'value': 'B2'}, + {'title': 'C1', 'value': 'C1'}, + {'title': 'C2', 'value': 'C2'}, + {'title': 'C2+', 'value': 'C2+'} + ] + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSelect', + 'props': { + 'model': 'font_scaling', + 'label': '字体缩放', + 'items': [ + {'title': '50%', 'value': '0.5'}, + {'title': '75%', 'value': '0.75'}, + {'title': '100%', 'value': '1'}, + {'title': '125%', 'value': '1.25'}, + {'title': '150%', 'value': '1.5'}, + {'title': '200%', 'value': '2'} + ] + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3, + }, + 'content': [ + { + 'component': 'VTextField', + 'props': { + 'model': 'accent_color', + 'label': '强调色', + 'placeholder': '#FFFF00' + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSelect', + 'props': { + 'model': 'opacity', + 'label': '不透明度', + 'items': [ + {'title': '0', 'value': '0'}, + {'title': '25%', 'value': '63'}, + {'title': '50%', 'value': '127'}, + {'title': '75%', 'value': '191'}, + {'title': '100%', 'value': '255'}, + ] + } + } + ] + } + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'show_phonetics', + 'label': '标注音标', + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'in_place', + 'label': '在原字幕插入注释', + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'english_only', + 'label': '仅英语影视剧', + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'show_vocabulary_detail', + 'label': '显示完整释义', + } + } + ] + }, + + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 6, + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'enable_gemini', + 'label': '启用Gemini翻译', + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 6 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'sentence_translation', + 'label': '整句翻译', + } + } + ] + } + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 6, + }, + 'content': [ + { + 'component': 'VSelect', + 'props': { + 'model': 'gemini_model', + 'label': '模型', + 'items': [ + {'title': 'gemini-2.5-flash-preview-05-20', + 'value': 'gemini-2.5-flash-preview-05-20'}, + {'title': 'gemini-2.5-pro-preview-05-06', + 'value': 'gemini-2.5-pro-preview-05-06'}, + {'title': 'gemini-2.0-flash', 'value': 'gemini-2.0-flash'}, + {'title': 'gemini-2.0-flash-lite', 'value': 'gemini-2.0-flash-lite'}, + {'title': 'gemini-1.5-flash', 'value': 'gemini-1.5-flash'}, + {'title': 'gemini-1.5-pro', 'value': 'gemini-1.5-pro'} + ] + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 6, + }, + 'content': [ + { + 'component': 'VTextField', + 'props': { + 'model': 'gemini_apikey', + 'label': 'Gemini APIKEY', + 'placeholder': '' + } + } + ] + }, + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3, + }, + 'content': [ + { + 'component': 'VTextField', + 'props': { + 'model': 'context_window', + 'label': '上下文窗口大小', + 'placeholder': '10', + 'type': 'number', + 'max': 20, + 'min': 1, + 'hint': '向Gemini发送的上下文长度' + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSelect', + 'props': { + 'model': 'model_temperature', + 'label': '模型温度', + 'items': [ + {'title': '0', 'value': '0'}, + {'title': '0.1', 'value': '0.1'}, + {'title': '0.2', 'value': '0.2'}, + {'title': '0.3', 'value': '0.3'}, + ] + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3, + }, + 'content': [ + { + 'component': 'VTextField', + 'props': { + 'model': 'max_retries', + 'label': '请求重试次数', + 'placeholder': '3', + 'type': 'number', + 'min': 1, + 'hint': '请求失败重试次数' + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3, + }, + 'content': [ + { + 'component': 'VTextField', + 'props': { + 'model': 'request_interval', + 'label': '请求间隔', + 'type': 'number', + 'placeholder': 5 , + 'min': 1, + 'suffix': '秒', + 'hint': '请求间隔时间,建议不少于3秒' + } + } + ] + }, + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + }, + 'content': [ + { + 'component': 'VTextField', + 'props': { + 'model': 'ffmpeg_path', + 'label': 'FFmpeg 路径', + 'placeholder': 'ffmpeg' + } + } + ] + } + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + }, + 'content': [ + { + 'component': 'VTextarea', + 'props': { + 'model': 'custom_files', + 'label': '视频路径', + 'rows': 3, + 'placeholder': '每行一个文件' + } + } + ] + }, + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + }, + 'content': [ + { + 'component': 'VAlert', + 'props': { + 'type': 'success', + 'variant': 'tonal' + }, + 'content': [ + { + 'component': 'span', + 'text': '配置说明:' + }, + { + 'component': 'a', + 'props': { + 'href': 'https://github.com/jxxghp/MoviePilot-Plugins/tree/main/plugins.v2/lexiannot/README.md', + 'target': '_blank' + }, + 'content': [ + { + 'component': 'u', + 'text': 'README' + } + ] + }] + } + + ] + } + ] + } + ] + } + ], { + "enabled": False, + "annot_level": 'C1', + "send_notify": False, + "onlyonce": False, + "show_vocabulary_detail": False, + "show_phonetics": False, + "sentence_translation": False, + "in_place": False, + "enable_gemini": False, + "gemini_model": 'gemini-2.0-flash', + "gemini_apikey": '', + "context_window": 10, + "max_retries": 3, + 'request_interval': 3, + "ffmpeg_path": "", + "english_only": True, + "when_file_trans": True, + "model_temperature": '0.3', + "custom_files": '', + "accent_color": '', + "font_scaling": '1', + "opacity": '0', + } + + def get_api(self) -> List[Dict[str, Any]]: + pass + + def get_page(self) -> List[dict]: + pass + + @staticmethod + def get_command() -> List[Dict[str, Any]]: + pass + + def get_state(self) -> bool: + """ + 获取插件状态,如果插件正在运行, 则返回True + """ + return self._enabled + + def stop_service(self): + """ + 退出插件 + """ + self.shutdown() + + def shutdown(self): + """ + 关闭插件 + """ + if self._worker_thread and self._worker_thread.is_alive(): + logger.debug("🔻 Stopping existing worker thread...") + self._shutdown_event.set() + self._worker_thread.join() + logger.debug("✅ Existing worker thread stopped.") + else: + logger.debug("ℹ️ No running worker thread to stop.") + + def add_media_file(self, path: str): + """ + 添加新任务 + """ + if not self._shutdown_event.is_set(): + self._task_queue.put(path) + else: + raise RuntimeError("Plugin is shutting down. Cannot add new tasks.") + + def __update_config(self): + self.update_config({'enabled': self._enabled, + 'annot_level': self._annot_level, + 'send_notify': self._send_notify, + 'onlyonce': self._onlyonce, + 'show_vocabulary_detail': self._show_vocabulary_detail, + 'sentence_translation': self._sentence_translation, + 'in_place': self._in_place, + 'enable_gemini': self._enable_gemini, + 'gemini_model': self._gemini_model, + 'gemini_apikey': self._gemini_apikey, + 'context_window': self._context_window, + 'max_retries': self._max_retries, + 'request_interval': self._request_interval, + 'ffmpeg_path': self._ffmpeg_path, + 'english_only': self._english_only, + 'when_file_trans': self._when_file_trans, + 'model_temperature': self._model_temperature, + 'show_phonetics': self._show_phonetics, + 'custom_files': self._custom_files, + 'accent_color': self._accent_color, + 'font_scaling': self._font_scaling, + 'opacity': self._opacity + }) + + def __process_tasks(self): + """ + 后台线程:处理任务队列 + """ + logger.debug("👷 Worker thread started.") + while not self._shutdown_event.is_set(): + try: + task = self._task_queue.get(timeout=1) # 最多等待1秒 + if task is None: + continue + self.__process_file(task) + except queue.Empty: + continue + logger.debug("🛑 Worker received shutdown signal, exiting...") + + def __process_file(self, path: str): + """ + 处理视频文件 + """ + video = Path(path) + if video.suffix.lower() not in settings.RMT_MEDIAEXT: + return + if not video.exists() or not video.is_file(): + logger.warn(f"文件 {str(video)} 不存在, 跳过") + return + subtitle = video.with_suffix(".ass") + if subtitle.exists(): + logger.warn(f"字幕文件 ({subtitle}) 已存在, 跳过") + return + logger.info(f"📂 Processing file: {path}") + if self._send_notify: + message = f"正在处理文件: {path}" + self.post_message(title=f"【{self.plugin_name}】", + mtype=NotificationType.Plugin, + text=f"{message}") + ffmpeg_path = self._ffmpeg_path if self._ffmpeg_path else 'ffmpeg' + embedded_subtitles = LexiAnnot.__extract_subtitles_by_lang(path, 'en', ffmpeg_path) + ret_message = '' + if embedded_subtitles: + logger.info(f'提取到 {len(embedded_subtitles)} 条英语文本字幕') + for embedded_subtitle in embedded_subtitles: + if self._shutdown_event.is_set(): + return + ass_subtitle = pysubs2.SSAFile.from_string(embedded_subtitle['subtitle'], format_='ass') + if embedded_subtitle.get('codec_id') == 'S_TEXT/UTF8': + ass_subtitle = LexiAnnot.set_srt_style(ass_subtitle) + ass_subtitle = self.__set_style(ass_subtitle) + ass_subtitle = self.process_subtitles(ass_subtitle) + if self._shutdown_event.is_set(): + return + if ass_subtitle: + try: + ass_subtitle.save(str(subtitle)) + ret_message = f"字幕已保存:{str(subtitle)}" + except Exception as e: + logger.error(f"字幕文件 {subtitle} 保存失败, {e}") + break + else: + logger.info(f"处理字幕{embedded_subtitle['codec_id']}-{embedded_subtitle['stream_id']}失败") + else: + logger.warn(f"未能在{path}中找到可提取的英文字幕") + if not ret_message: + ret_message= f"未能在{path}中找到可提取的英文字幕" + logger.info(f"✅ Finished: {path}") + if self._send_notify: + self.post_message(title=f"【{self.plugin_name}】", + mtype=NotificationType.Plugin, + text=f"{ret_message}") + + @cached(maxsize=1000, ttl=1800) + def __load_lexicon_version(self) -> Optional[str]: + logger.info(f"正在检查远程词库版本...") + url = f'{self._lexicon_repo}master/version' + version = RequestUtils().get(url, headers=settings.REPO_GITHUB_HEADERS()) + if version is None: + return None + return version + + def __load_lexicon(self): + url = f'{self._lexicon_repo}master/cefr.json' + res = RequestUtils().get_res(url, headers=settings.REPO_GITHUB_HEADERS()) + if res: + self._cefr_lexicon = res.json() + url = f'{self._lexicon_repo}master/coca20k.json' + res = RequestUtils().get_res(url, headers=settings.REPO_GITHUB_HEADERS()) + if res: + self._coca2k_lexicon = res.json() + url = f'{self._lexicon_repo}master/swear_words.json' + res = RequestUtils().get_res(url, headers=settings.REPO_GITHUB_HEADERS()) + if res: + self._swear_words = res.json() + self._lexicon_version = self.__load_lexicon_version() + self.save_data("cefr_lexicon", self._cefr_lexicon) + self.save_data("coca2k_lexicon", self._coca2k_lexicon) + self.save_data("swear_words", self._swear_words) + self.save_data("lexicon_version", self._lexicon_version) + + @staticmethod + def __load_spacy_model(model_name: str): + try: + result = subprocess.run( + [sys.executable, "-m", "spacy", "download", model_name], + capture_output=True, + text=True, + check=True + ) + nlp = spacy.load(model_name) + logger.info(f"spaCy 模型 '{model_name}' 加载成功!") + return nlp + except subprocess.CalledProcessError as e: + logger.error(f"下载 spaCy 模型 '{model_name}' 失败。") + logger.error(f"命令返回非零退出码:{e.returncode}") + logger.error(f"Stdout:\n{e.stdout}") + logger.error(f"Stderr:\n{e.stderr}") + return None + except Exception as e: + logger.error(f"下载或加载 spaCy 模型时发生意外错误:{e}") + return None + + + @eventmanager.register(EventType.TransferComplete) + def check_media(self, event: Event): + if not self._enabled or not self._when_file_trans: + return + event_info: dict = event.event_data + if not event_info: + return + + # 入库数据 + transfer_info: TransferInfo = event_info.get("transferinfo") + if not transfer_info or not transfer_info.target_diritem or not transfer_info.target_diritem.path: + return + mediainfo: MediaInfo = event_info.get("mediainfo") + if self._english_only: + if mediainfo.original_language != 'en': + logger.info(f"原始语言 ({mediainfo.original_language}) 不为英语, 跳过 {mediainfo.title}: ") + return + for new_path in transfer_info.file_list_new: + self.add_media_file(new_path) + + @staticmethod + def query_cefr(word, cefr_lexicon): + word = word.lower().strip("-*'") + if word in cefr_lexicon: + return cefr_lexicon[word] + else: + return None + + @staticmethod + def query_coca20k(word: str, lexicon: Dict[str, Any]): + word = word.lower().strip("-*'") + return lexicon.get(word) + + @staticmethod + def convert_pos_to_spacy(pos: str): + """ + 将给定的词性列表转换为spaCy库中使用的词性标签。 + + Args: + pos: 一个字符串形式词性。 + + Returns: + 一个包含对应spaCy词性标签的列表。对于无法直接映射的词性, + 将返回None。 + """ + spacy_pos_map = { + 'noun': 'NOUN', + 'adjective': 'ADJ', + 'adverb': 'ADV', + 'verb': 'VERB', + 'preposition': 'ADP', + 'conjunction': 'CCONJ', + 'determiner': 'DET', + 'pronoun': 'PRON', + 'interjection': 'INTJ', + 'number': 'NUM' + } + + pos_lower = pos.lower() + if pos_lower in spacy_pos_map: + spacy_pos = spacy_pos_map[pos_lower] + elif pos_lower == 'be-verb': + spacy_pos = 'AUX' # Auxiliary verb (e.g., be, do, have) + elif pos_lower == 'vern': + spacy_pos = 'VERB' # Assuming 'vern' is a typo for 'verb' + elif pos_lower == 'modal auxiliary': + spacy_pos = 'AUX' # Modal verbs are also auxiliaries + elif pos_lower == 'do-verb': + spacy_pos = 'AUX' + elif pos_lower == 'have-verb': + spacy_pos = 'AUX' + elif pos_lower == 'infinitive-to': + spacy_pos = 'PART' # Particle (e.g., to in "to go") + elif not pos_lower: # Handle empty strings + spacy_pos = None + else: + spacy_pos = None # For unmapped POS tags + return spacy_pos + + @staticmethod + def get_cefr_by_spacy(token: Token, cefr_lexicon: Dict[str, Any]) -> Optional[str]: + result = LexiAnnot.query_cefr(token.lemma_, cefr_lexicon) + if result: + all_cefr = [] + if len(result) > 0: + for entry in result: + if token.pos_ == LexiAnnot.convert_pos_to_spacy(entry['pos']): + return entry['cefr'] + all_cefr.append(entry['cefr']) + return min(all_cefr) + return None + + @staticmethod + def format_duration(ms): + total_seconds, milliseconds = divmod(ms, 1000) + hours, remainder = divmod(total_seconds, 3600) + minutes, seconds = divmod(remainder, 60) + hundredths = milliseconds // 10 + return f"{hours}:{minutes:02}:{seconds:02}.{hundredths:02}" + + @staticmethod + def replace_by_plaintext_positions(line: SSAEvent, replacements: List[dict]): + """ + 替换 line.text 中的内容,使用 replacements 中的 plaintext 位置信息。 + replacement + {'start': int, 'end': int, 'old_text': str, 'new_text': str} + """ + text = line.text + tag_pattern = re.compile(r"{.*?}") # 匹配 {xxx} 格式控制符 + special_pattern = re.compile(r"\\[Nh]") + # 构建 plaintext 位置到 text 索引的映射 + mapping = {} # plaintext_index -> text_index + p_index = 0 # 当前 plaintext 索引 + t_index = 0 # 当前 text 索引 + + while t_index < len(text): + if text[t_index] == "{": + # 跳过格式标签 + match = tag_pattern.match(text, t_index) + if match: + t_index = match.end() + continue + elif text[t_index] == "\\": + match = special_pattern.match(text, t_index) + if match: + t_index = match.end() - 1 + continue + # 非格式字符 + mapping[p_index] = t_index + p_index += 1 + t_index += 1 + + # 按照 mapping 执行替换(倒序替换防止位置错位) + new_text = text + for r in sorted(replacements, key=lambda x: x["start"], reverse=True): + start = mapping.get(r["start"]) + end = mapping.get(r["end"] - 1) + if start is None or end is None: + continue + end += 1 # 因为 Python 切片不包含结束索引 + new_text = new_text[:start] + r["new_text"] + new_text[end:] + + line.text = new_text + + @staticmethod + def analyze_ass_language(ass_file: SSAFile): + styles = {} + for style in ass_file.styles: + styles[style] = {'text': [], 'duration': 0, 'text_size': 0, 'times': 0} + for dialogue in ass_file: + style = dialogue.style + text = dialogue.plaintext + sub_text = text.split('\n') + if style not in styles or not text: continue + styles[style]['text'].extend(sub_text) + styles[style]['duration'] += dialogue.duration + styles[style]['text_size'] += len(text) + styles[style]['times'] += 1 + style_language_analysis = {} + for style_name, data in styles.items(): + all_text = ' '.join(data['text']) + if not all_text.strip(): + style_language_analysis[style_name] = None + continue + + languages = [] + # 对每个文本片段进行语言检测 + for text_fragment in data['text']: + try: + lang = detect(text_fragment) + languages.append(lang) + except: + pass # 无法检测的文本 + + if languages: + language_counts = Counter(languages) + most_common_language = language_counts.most_common(1)[0] + style_language_analysis[style_name] = {"main_language": most_common_language[0], + "proportion": most_common_language[1] / len(languages), + "duration": data['duration'], + "text_size": data['text_size'], + "times": data['times']} + else: + style_language_analysis[style_name] = None + + return style_language_analysis + + @staticmethod + def select_main_style_weighted(language_analysis: Dict[str, Any], known_language: str, + weights=None): + """ + 根据语言分析结果和已知的字幕语言,使用加权评分选择主要样式。 + + Args: + language_analysis (dict): `analyze_ass_language` 函数的输出结果。 + known_language (str): 已知的字幕语言代码. + weights (dict): 各个维度的权重,权重之和应为 1. + + Returns: + str or None: 主要字幕的样式名称,如果没有匹配的样式则返回 None。 + """ + if weights is None: + weights = {'times': 0.5, 'text_size': 0.4, 'duration': 0.1} + matching_styles = [] + max_times = max([analysis.get('times', 0) for _, analysis in language_analysis.items() if analysis]) or 1 + max_text_size = max( + [analysis.get('text_size', 0) for _, analysis in language_analysis.items() if analysis]) or 1 + max_duration = max([analysis.get('duration', 0) for _, analysis in language_analysis.items() if analysis]) or 1 + for style, analysis in language_analysis.items(): + if not analysis: + continue + if analysis.get('main_language') == known_language: + # 跳过多语言 + if analysis.get('proportion', 0) < 0.5: + continue + score = 0 + score += analysis.get('times', 0) * weights.get('times', 0) / max_times + score += analysis.get('text_size', 0) * weights.get('text_size', 0) / max_text_size + score += analysis.get('duration', 0) * weights.get('duration', 0) / max_duration + matching_styles.append((style, score)) + + if not matching_styles: + return None + + sorted_styles = sorted(matching_styles, key=lambda item: item[1], reverse=True) + return sorted_styles[0][0] + + @staticmethod + def set_srt_style(ass: SSAFile) -> SSAFile: + ass.info['ScaledBorderAndShadow'] = 'no' + play_res_y = int(ass.info.get('PlayResY')) + play_res_x = int(ass.info.get('PlayResX')) + if 'Default' in ass.styles: + ass.styles['Default'].marginv = play_res_y // 16 + ass.styles['Default'].fontname = 'Microsoft YaHei' + ass.styles['Default'].fontsize = play_res_y // 16 + return ass + + def __set_style(self, ass: SSAFile) -> SSAFile: + font_scaling = float(self._font_scaling) if self._font_scaling and len(self._font_scaling) else 1 + play_res_y = int(ass.info.get('PlayResY')) + play_res_x = int(ass.info.get('PlayResX')) + # 创建一个新样式 + fs = play_res_y // 16*font_scaling + new_style = pysubs2.SSAStyle() + new_style.name = 'Annotation EN' + new_style.fontname = 'Times New Roman' + new_style.fontsize = fs + new_style.primarycolor = pysubs2.Color(self._accent_color_rgb[0], + self._accent_color_rgb[1], + self._accent_color_rgb[2], + self._color_alpha) # 黄色 (BGR, alpha) + new_style.bold = True + new_style.italic = False + new_style.outline = 1 + new_style.shadow = 0 + new_style.alignment = pysubs2.Alignment.TOP_LEFT + new_style.marginl = play_res_x // 20 + new_style.marginr = play_res_x // 20 + new_style.marginv = fs + ass.styles['Annotation EN'] = new_style + zh_style = new_style.copy() + zh_style.name = 'Annotation ZH' + zh_style.fontname = 'Microsoft YaHei' + zh_style.primarycolor = pysubs2.Color(255, 255, 255, self._color_alpha) + ass.styles['Annotation ZH'] = zh_style + + pos_style = zh_style.copy() + pos_style.name = 'Annotation POS' + pos_style.fontname = 'Times New Roman' + pos_style.fontsize = fs * 0.75 + pos_style.italic = True + ass.styles['Annotation POS'] = pos_style + + phone_style = pos_style.copy() + phone_style.name = 'Annotation PHONE' + phone_style.fontname = 'Arial' + phone_style.fontsize = fs * 0.75 + phone_style.bold = False + phone_style.italic = False + ass.styles['Annotation PHONE'] = phone_style + + pos_def_cn_style = zh_style.copy() + pos_def_cn_style.name = 'DETAIL CN' + pos_def_cn_style.fontsize = fs * 0.7 + ass.styles['DETAIL CN'] = pos_def_cn_style + + pos_def_pos_style = pos_style.copy() + pos_def_pos_style.name = 'DETAIL POS' + pos_def_pos_style.fontsize = fs * 0.6 + ass.styles['DETAIL POS'] = pos_def_pos_style + + cefr_style = pos_style.copy() + cefr_style.name = "Annotation CEFR" + cefr_style.fontname = "Times New Roman" + cefr_style.fontsize = fs * 0.5 + cefr_style.bold = True + cefr_style.italic = False + cefr_style.primarycolor = pysubs2.Color(self._accent_color_rgb[0], + self._accent_color_rgb[1], + self._accent_color_rgb[2], + self._color_alpha) + cefr_style.outline = 1 + cefr_style.shadow = 0 + ass.styles['Annotation CEFR'] = cefr_style + return ass + + @staticmethod + def hex_to_rgb(hex_color) -> Optional[Tuple]: + if not hex_color: + return None + pattern = r'^#[0-9a-fA-F]{6}$' + if re.match(pattern, hex_color) is None: + return None + hex_color = hex_color.lstrip('#') # 去掉前面的 # + return tuple(int(hex_color[i:i + 2], 16) for i in (0, 2, 4)) + + @staticmethod + def __extract_subtitle(video_path: str, + subtitle_stream_index: str, + ffmpeg_path: str='ffmpeg', + sub_format='ass') -> Optional[str]: + if sub_format not in ['srt', 'ass']: + raise ValueError('Invalid subtitle format') + try: + map_parameter = f"0:s:{subtitle_stream_index}" + command = [ + ffmpeg_path, + '-i', video_path, + '-map', map_parameter, + '-f', sub_format, + '-' + ] + result = subprocess.run(command, capture_output=True, text=True, encoding='utf-8', check=True) + return result.stdout + except FileNotFoundError: + logger.warn(f"错误:找不到视频文件 '{video_path}'") + return None + except subprocess.CalledProcessError as e: + logger.warn(f"错误:提取字幕失败。\n错误信息:{e}") + logger.warn(f"FFmpeg 输出 (stderr):\n{e.stderr.decode('utf-8', errors='ignore')}") + return None + + @staticmethod + def __extract_subtitles_by_lang(video_path: str, lang: str = 'en', ffmpeg: str = 'ffmpeg') -> Optional[List[Dict]]: + """提取视频文件中的内嵌英文字幕,使用 MediaInfo 查找字幕流。""" + supported_codec = ['S_TEXT/UTF8', 'S_TEXT/ASS'] + subtitles = [] + try: + media_info: pymediainfo.MediaInfo = pymediainfo.MediaInfo.parse(video_path) + for track in media_info.tracks: + if track.track_type == 'Text' and track.language == lang and track.codec_id in supported_codec: + if track.title and 'SDH' in track.title: + continue + subtitle_stream_index = track.stream_identifier # MediaInfo 的 stream_id 从 1 开始,ffmpeg 从 0 开始 + subtitle = LexiAnnot.__extract_subtitle(video_path, subtitle_stream_index, ffmpeg) + if subtitle: + subtitles.append({'title': track.title, 'subtitle': subtitle, 'codec_id': track.codec_id, + 'stream_id': subtitle_stream_index}) + if subtitles: + return subtitles + else: + logger.warn('未找到标记为英语的文本字幕流') + return None + + except FileNotFoundError: + logger.error(f"找不到视频文件 '{video_path}'") + return None + except subprocess.CalledProcessError as e: + logger.error(f"错误:提取字幕失败。\n错误信息:{e}") + logger.error(f"FFmpeg 输出 (stderr):\n{e.stderr.decode('utf-8', errors='ignore')}") + return None + except Exception as e: + logger.error(f"使用 MediaInfo 提取字幕时发生错误:{e}") + return None + + def init_venv(self) -> bool: + venv_dir = os.path.join(self.get_data_path(), "venv_genai") + python_path = os.path.join(venv_dir, "bin", "python") if os.name != "nt" else os.path.join(venv_dir, "Scripts", + "python.exe") + # 创建虚拟环境 + try: + if not os.path.exists(venv_dir): + logger.info(f"为 google-genai 初始化虚拟环境: {venv_dir}") + venv.create(venv_dir, with_pip=True, symlinks=True, clear=True) + logger.info(f"虚拟环境创建成功: {venv_dir}") + SystemUtils.execute_with_subprocess([python_path, "-m", "pip", "install", 'google-genai']) + except subprocess.CalledProcessError: + logger.warn(f"虚拟环境创建失败") + shutil.rmtree(venv_dir) + return False + self._venv_python = python_path + + return True + + def __query_gemini( + self, + tasks: List[T], + task_type: Type[T], + api_key: str, + system_instruction: str, + model: str, + temperature: float + ) -> List[T]: + input_dict = { + 'tasks': [task.dict() for task in tasks], # 保证是可序列化格式 + 'params': { + 'api_key': api_key, + 'system_instruction': system_instruction, + 'schema': task_type.__name__, + 'model': model, + 'temperature': temperature, + 'max_retries': self._max_retries + } + } + + try: + result = subprocess.run( + [self._venv_python, self._query_gemini_script], + input=json.dumps(input_dict), + capture_output=True, + text=True, + check=True + ) + except subprocess.CalledProcessError as e: + logger.warning(f"Subprocess failed: {str(e)}") + return tasks + + try: + response = json.loads(result.stdout) + except json.JSONDecodeError: + logger.warning(f"Invalid JSON from subprocess:\n{result.stdout}") + return tasks + + if not response.get("success"): + logger.warning(f"Error in subprocess response: {response.get('message')}") + return tasks + + try: + return [task_type(**task_data) for task_data in response["data"]["tasks"]] + except Exception as e: + logger.warning(f"Failed to reconstruct tasks: {str(e)}") + return tasks + + def __process_by_ai(self, lines_to_process: List[Dict[str, Any]], cefr_lexicon, swear_words, coca20k_lexicon): + simple_vocabulary = list(filter(lambda x:x= self._context_window or (len(task_bulk) and i == len(lines_to_process)): + logger.info(f"processing dialogues: " + f"{LexiAnnot.format_duration(lines_to_process[task_bulk[0].index]['time_code'][0])} -> " + f"{LexiAnnot.format_duration(lines_to_process[i - 1]['time_code'][1])}") + answer: Optional[List[VocabularyTranslationTask]] = self.__query_gemini(task_bulk, + VocabularyTranslationTask, + self._gemini_apikey, + vocabulary_trans_instruction, + self._gemini_model, + model_temperature) + if not answer: + continue + time.sleep(self._request_interval) + for answer_line in answer: + answer_lemma = tuple(v.lemma for v in answer_line.vocabulary) + filtered_raw = [x for x in lines_to_process if x.get('index') == answer_line.index] + if not len(filtered_raw): + logger.warn(f'Unknown answer: {answer_line.index}: {answer_line.context.original_text}') + available_answer = False + for item in filtered_raw: + lemma = tuple(v['lemma'] for v in item['new_vocab']) + if lemma == answer_lemma: + available_answer = True + for i_, v in enumerate(item['new_vocab']): + v['Chinese'] = answer_line.vocabulary[i_].Chinese + break + if not available_answer: + logger.warn(f'Unknown answer: {answer_line.index}: {answer_line.context.original_text}') + task_bulk = [] + if not self._sentence_translation: + return lines_to_process + if self._gemini_available: + logger.info(f"查询整句翻译...") + # 查询整句翻译 + translation_tasks: List[DialogueTranslationTask] = [] + for line_data in lines_to_process: + translation_tasks.append(DialogueTranslationTask(index=line_data['index'], + original_text=line_data['raw_subtitle'].replace('\n', ' '), + Chinese='')) + i = 0 + dialog_trans_instruction = '''You are an expert translator. You will be given a list of dialogue translation tasks in JSON format. For each entry, provide the most appropriate translation in Simplified Chinese based on the context. + Only complete the `Chinese` field. Do not include pinyin, explanations, or any additional information.''' + while i < len(translation_tasks): + if self._shutdown_event.is_set(): + return lines_to_process + if not self._gemini_available: + break + start_index = max(0, i - 1) + end_index = min(len(translation_tasks), i + self._context_window + 1) + task_bulk: List[DialogueTranslationTask] = translation_tasks[start_index:end_index] + logger.info(f"processing dialogues: " + f"{LexiAnnot.format_duration(lines_to_process[i]['time_code'][0])} -> " + f"{LexiAnnot.format_duration(lines_to_process[min(len(translation_tasks), i + self._context_window)-1]['time_code'][1])}") + answer: List[DialogueTranslationTask] = self.__query_gemini(task_bulk, + DialogueTranslationTask, + self._gemini_apikey, + dialog_trans_instruction, + self._gemini_model, + model_temperature) + time.sleep(self._request_interval) + for answer_line in answer: + if answer_line.index not in range(i, i+self._context_window): + continue + filtered_raw = [x for x in lines_to_process if x.get('index') == answer_line.index] + if not len(filtered_raw): + logger.warn(f'Unknown answer: {answer_line.index}: {answer_line.original_text}') + available_answer = False + for item in filtered_raw: + if item['raw_subtitle'].replace('\n', ' ') == answer_line.original_text: + available_answer = True + item['Chinese'] = answer_line.Chinese + break + if not available_answer: + logger.warn(f'Unknown answer: {answer_line.index}: {answer_line.original_text}') + i += self._context_window + return lines_to_process + + def process_subtitles(self, ass_file: SSAFile) -> Optional[SSAFile]: + """ + 处理字幕内容,标记词汇并添加翻译。 + """ + lang = 'en' + cefr_lexicon = self._cefr_lexicon + swear_words = self._swear_words + coca20k_lexicon = self._coca2k_lexicon + abgr_str = (f'&H{self._color_alpha:02x}{self._accent_color_rgb[2]:02x}' + f'{self._accent_color_rgb[1]:02x}{self._accent_color_rgb[0]:02x}&') #&H00FFFFFF& + pos_map = { + 'NOUN': 'n.', + 'AUX': 'aux.', + 'VERB': 'v.', + 'ADJ': 'adj.', + 'ADV': 'adv.', + 'ADP': 'prep.', + 'CCONJ': 'conj.', + 'SCONJ': 'conj.' + } + statistical_res = LexiAnnot.analyze_ass_language(ass_file) + main_style = LexiAnnot.select_main_style_weighted(statistical_res, lang) + if not main_style: + logger.error(f'无法确定主要字幕样式') + return None + index = 0 + lines_to_process = [] + main_dialogue: Dict[int, SSAEvent] = {} + for dialogue in ass_file: + if dialogue.style != main_style: + continue + time_code = (dialogue.start, dialogue.end) + text_raw = dialogue.plaintext + line_data = {'index': index, 'time_code': time_code, 'raw_subtitle': text_raw, 'new_vocab': [], + 'Chinese': ''} + lines_to_process.append(line_data) + main_dialogue[index] = dialogue + index += 1 + lines_to_process = self.__process_by_ai(lines_to_process, cefr_lexicon, swear_words, coca20k_lexicon) + + # 在原字幕添加标注 + main_style_fs = ass_file.styles[main_style].fontsize + for line_data in lines_to_process: + if self._shutdown_event.is_set(): + return None + if line_data['new_vocab']: + replacements = line_data['new_vocab'] + for replacement in replacements: + part_of_speech = f"{{\\fnTimes New Roman\\fs{int(main_style_fs * 0.75)}\\i1}}{pos_map[replacement['pos']]}{{\\r}}" + new_text = f"{{\\c{abgr_str}}}{replacement['text']}{{\\r}}" + if self._in_place: + new_text = new_text + f" ({replacement['Chinese']} {part_of_speech})" if replacement[ + 'Chinese'] else "" + else: + dialogue = pysubs2.SSAEvent() + dialogue.start = main_dialogue[line_data['index']].start + dialogue.end = main_dialogue[line_data['index']].end + dialogue.style = 'Annotation EN' + cefr_text = f" {{\\rAnnotation CEFR}}{replacement['cefr']}{{\\r}}" if replacement[ + 'cefr'] else "" + __N = r'\N' + phone_text = f"{__N}{{\\rAnnotation PHONE}}/{replacement['phonetics']}/{{\\r}}" if replacement[ + 'phonetics'] and self._show_phonetics else "" + annot_text = f"{replacement['lemma']} {{\\rAnnotation POS}}{pos_map[replacement['pos']]}{{\\r}} {{\\rAnnotation ZH}}{replacement['Chinese']}{{\\r}}{cefr_text}{phone_text}" + dialogue.text = annot_text + ass_file.append(dialogue) + if self._show_vocabulary_detail and replacement['pos_defs']: + dialogue = pysubs2.SSAEvent() + dialogue.start = main_dialogue[line_data['index']].start + dialogue.end = main_dialogue[line_data['index']].end + dialogue.style = 'DETAIL CN' + detail_text = [] + for pos_def in replacement['pos_defs']: + meaning_str = ', '.join(pos_def['meanings']) + pos_text = f"{{\\rDETAIL POS}}{pos_def['pos']}{{\\r}} {meaning_str}" + detail_text.append(pos_text) + dialogue.text = '\\N'.join(detail_text) + ass_file.append(dialogue) + replacement['new_text'] = new_text + LexiAnnot.replace_by_plaintext_positions(main_dialogue[line_data['index']], replacements) + if self._sentence_translation: + chinese = line_data['Chinese'] + if chinese and chinese[-1] in ['。', ',']: + chinese = chinese[:-1] + main_dialogue[line_data['index']].text = main_dialogue[line_data['index']].text + f"\\N{chinese}" + return ass_file + diff --git a/plugins.v2/lexiannot/query_gemini.py b/plugins.v2/lexiannot/query_gemini.py new file mode 100644 index 0000000..243e0c4 --- /dev/null +++ b/plugins.v2/lexiannot/query_gemini.py @@ -0,0 +1,220 @@ +import sys +import json +import time +from typing import List, Dict, Any, Type, Union + +from pydantic import BaseModel, ValidationError + + +class Context(BaseModel): + original_text: str + + +class Vocabulary(BaseModel): + lemma: str + Chinese: str + + +class VocabularyTranslationTask(BaseModel): + index: int + vocabulary: List[Vocabulary] + context: Context + + +class DialogueTranslationTask(BaseModel): + index: int + original_text: str + Chinese: str + + +class GeminiResponse(BaseModel): + tasks: List[Union[VocabularyTranslationTask, DialogueTranslationTask]] + total_token_count: int + success: bool + message: str = "" + + +def validate_input_data(request_data: Dict[str, Any]) -> None: + """Validate the input data structure""" + if not isinstance(request_data, dict): + raise ValueError("Input data must be a dictionary") + if "tasks" not in request_data: + raise ValueError("Missing 'tasks' in input data") + if "params" not in request_data: + raise ValueError("Missing 'params' in input data") + + params = request_data["params"] + required_params = ["api_key", "system_instruction", "schema"] + for param in required_params: + if param not in params: + raise ValueError(f"Missing required parameter: {param}") + + +def get_task_schema(schema_name: str) -> Type[BaseModel]: + """Get the appropriate schema class based on the schema name""" + schema_map = { + 'DialogueTranslationTask': DialogueTranslationTask, + 'VocabularyTranslationTask': VocabularyTranslationTask + } + if schema_name not in schema_map: + raise ValueError(f"Unknown schema name: {schema_name}") + return schema_map[schema_name] + + +def query_gemini( + api_key: str, + translation_tasks: List[Dict[str, Any]], + task_schema: Type[Union[VocabularyTranslationTask, DialogueTranslationTask]], + system_instruction: str, + gemini_model: str = "gemini-2.0-flash", + temperature: float = 0.3, + max_retries: int = 3, + retry_delay: int = 10 +) -> GeminiResponse: + """ + Query the Gemini API for translation tasks with retry logic. + + Args: + api_key: Gemini API key + translation_tasks: List of translation tasks + task_schema: Pydantic model for the task type + system_instruction: System instruction for the model + gemini_model: Model name to use + temperature: Generation temperature + max_retries: Number of retry attempts + retry_delay: Delay between retries in seconds + + Returns: + GeminiResponse containing the results + """ + from google import genai + from google.genai import types + from google.genai.types import SchemaUnion + client = genai.Client(api_key=api_key) + messages = [] + translation_res = [] + total_token_count = 0 + + # Validate input tasks before sending to API + try: + translation_res = [task_schema(**task) for task in translation_tasks] + except ValidationError as e: + return GeminiResponse( + tasks=[], + total_token_count=0, + success=False, + message=f"Input validation failed: {str(e)}" + ) + + for attempt in range(1, max_retries + 1): + try: + response = client.models.generate_content( + model=gemini_model, + contents=json.dumps(translation_tasks, ensure_ascii=False), + config=types.GenerateContentConfig( + system_instruction=system_instruction, + response_mime_type="application/json", + response_schema=list[task_schema], + temperature=temperature + ), + ) + + if not response.parsed: + raise ValueError("Empty response from Gemini API") + + translation_res = response.parsed + total_token_count = response.usage_metadata.total_token_count + return GeminiResponse( + tasks=translation_res, + total_token_count=total_token_count, + success=True + ) + + except Exception as e: + messages.append(f"Attempt {attempt} failed: {str(e)}") + if attempt < max_retries: + time.sleep(retry_delay) + + return GeminiResponse( + tasks=[], + total_token_count=0, + success=False, + message="All retry attempts failed. " + "\n".join(messages) + ) + + +def main(): + try: + # Read and parse input + '''{ + "tasks": [{ + "index": 0, + "original_text": "That was eight years ago.", + "Chinese": "" + }, { + "index": 1, + "original_text": "Much has changed.", + "Chinese": "" + }], + "params": { + "api_key": "", + "system_instruction": "You are an expert translator. You will be given a list of dialogue translation tasks in JSON format. For each entry, provide the most appropriate translation in Simplified Chinese based on the context. \\nOnly complete the `Chinese` field. Do not include pinyin, explanations, or any additional information.", + "schema": "DialogueTranslationTask" + } + }''' + input_text = sys.stdin.read() + if not input_text: + raise ValueError("No input provided") + + request_data = json.loads(input_text) + validate_input_data(request_data) + + # Extract parameters + tasks = request_data["tasks"] + params = request_data["params"] + + # Get schema and make API call + schema = get_task_schema(params["schema"]) + response = query_gemini( + api_key=params["api_key"], + translation_tasks=tasks, + task_schema=schema, + system_instruction=params["system_instruction"], + gemini_model=params.get("model", "gemini-2.0-flash"), + temperature=float(params.get("temperature", 0.3)), + max_retries=int(params.get("max_retries", 3)) + ) + + # Prepare output + if response.success: + result = { + "success": True, + "data": { + "tasks": [task.model_dump() for task in response.tasks], + "total_token_count": response.total_token_count + } + } + else: + result = { + "success": False, + "message": response.message + } + + print(json.dumps(result, ensure_ascii=False)) + + except json.JSONDecodeError as e: + error = { + "success": False, + "message": f"Invalid JSON input: {str(e)}" + } + print(json.dumps(error)) + except Exception as e: + error = { + "success": False, + "message": f"Unexpected error: {str(e)}" + } + print(json.dumps(error)) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/plugins.v2/lexiannot/requirements.txt b/plugins.v2/lexiannot/requirements.txt new file mode 100644 index 0000000..f1dd2bf --- /dev/null +++ b/plugins.v2/lexiannot/requirements.txt @@ -0,0 +1,5 @@ +pysubs2~=1.8.0 +thinc==8.3.4 +spacy==3.8.7 +langdetect~=1.0.9 +pymediainfo~=7.0.1 \ No newline at end of file