From 3f17c50c2739e6d8da687d966720083fcbc6918c Mon Sep 17 00:00:00 2001 From: TimoYoung Date: Wed, 2 Apr 2025 13:43:07 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9EAI=E5=AD=97=E5=B9=95=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E7=94=9F=E6=88=90=E6=8F=92=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package.json | 12 + plugins/autosubv2/README.md | 88 ++ plugins/autosubv2/__init__.py | 1336 +++++++++++++++++++++++ plugins/autosubv2/ffmpeg/__init__.py | 63 ++ plugins/autosubv2/requirements.txt | 4 + plugins/autosubv2/translate/__init__.py | 0 plugins/autosubv2/translate/openai.py | 133 +++ 7 files changed, 1636 insertions(+) create mode 100644 plugins/autosubv2/README.md create mode 100644 plugins/autosubv2/__init__.py create mode 100644 plugins/autosubv2/ffmpeg/__init__.py create mode 100644 plugins/autosubv2/requirements.txt create mode 100644 plugins/autosubv2/translate/__init__.py create mode 100644 plugins/autosubv2/translate/openai.py diff --git a/package.json b/package.json index 85be014..c7cac70 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,18 @@ "v1.9": "支持馒头新架构自动签到" } }, + "AutoSubv2": { + "name": "AI字幕自动生成(v2)", + "description": "使用whisper自动生成视频文件字幕,使用大模型翻译字幕成中文。", + "version": "1.0", + "icon": "autosubtitles.jpeg", + "author": "TimoYoung", + "level": 1, + "v2": true, + "history": { + "v1.0": "first stable version" + } + }, "CustomSites": { "name": "自定义站点", "description": "增加自定义站点为签到和统计使用。", diff --git a/plugins/autosubv2/README.md b/plugins/autosubv2/README.md new file mode 100644 index 0000000..603d212 --- /dev/null +++ b/plugins/autosubv2/README.md @@ -0,0 +1,88 @@ +# AI字幕自动生成(v2) + +自动生成视频字幕,并使用大模型将字幕翻译成中文。 + +基于 [autosub](https://github.com/lightolly/MoviePilot-Plugins) 修改并适配v2,感谢原作者 + +## 功能特点 + +- 支持从视频音轨中提取字幕(使用faster-whisper) +- 支持从视频内嵌字幕中提取字幕 +- 支持从外挂字幕文件中提取字幕 +- 支持使用大模型(OpenAI)将字幕翻译成中文 +- 支持批量翻译以提高效率 +- 支持使用滑动窗口配置上下文提高翻译连贯性 +- 支持多种字幕提取语言偏好设置 + +## 配置说明 + +### 基础配置 + +| 配置项 | 说明 | 默认值 | +|--------|------|--------| +| 立即运行一次 | 保存配置后是否立即执行一次任务 | 否 | +| 本地字幕提取策略 | 设置字幕提取的优先级策略 | 优先原音字幕 | +| 翻译为中文 | 是否在需要时使用大模型将字幕翻译成中文 | 是 | +| 发送通知 | 是否发送任务执行通知 | 否 | + +### ASR配置 + +| 配置项 | 说明 | 默认值 | +|--------|------|--------| +| 允许从音轨提取字幕 | 是否允许从视频音轨中提取字幕 | 是 | +| ASR引擎 | 语音识别引擎 | faster-whisper | +| 模型 | 使用的模型大小 | base | +| 使用代理下载模型 | 是否使用代理下载模型 | 是 | + +### 翻译配置 + +| 配置项 | 说明 | 默认值 | +|--------|------|--------| +| 启用批量翻译 | 是否启用批量翻译以提高效率 | 是 | +| 每批翻译行数 | 每批处理的字幕行数 | 20 | +| 上下文窗口大小 | 翻译时考虑的上下文行数 | 5 | +| llm请求重试次数 | 翻译失败时的重试次数 | 3 | + +### 其他配置 + +| 配置项 | 说明 | 默认值 | +|--------|------|--------| +| 媒体路径 | 要处理的媒体文件或文件夹绝对路径,每行一个 | 空 | +| 文件大小(MB) | 最小处理文件大小 | 10 | + + +## 字幕提取策略说明 +字幕提取优先级:外挂字幕 > 内嵌字幕 > 音轨识别 + +字幕提取策略的选择主要取决于视频源语言和大模型的翻译能力。对于包含多语言字幕的非英语视频,建议根据以下原则选择策略: + +1. 仅英文字幕 + - 仅使用英文字幕作为翻译源 + - 当视频无英文字幕时,使用ASR提取 + - 适用于大模型仅支持中英互译的场景 + +2. 优先英文字幕 + - 优先使用英文字幕作为翻译源 + - 无英文字幕时,使用其他语言字幕 + - 当所有字幕都不存在时,使用ASR提取 + - 适用于大模型在英译中任务上表现更好的场景 + +3. 优先原音字幕 + - 优先使用视频原始语言的字幕 + - 无原音字幕时,使用英文字幕 + - 当所有字幕都不存在时,使用ASR提取 + - 适用于大模型支持多语言翻译且翻译质量较好的场景 + +## 注意事项 + +1. 翻译功能依赖OpenAI插件配置,使用前请确保已正确配置 +2. 首次使用音轨识别功能时,会自动从HuggingFace下载模型。开启"使用代理下载模型"选项会使用MP配置的代理。 +3. 媒体路径支持单个文件或文件夹的绝对路径。选择文件夹时会递归处理其中的所有视频文件,外挂字幕将从媒体文件同级目录中查找 +4. 批量翻译通过一次处理多行字幕来减少API调用次数,提高效率。如果翻译结果与原文行数不匹配,系统会自动降级为逐行翻译 +5. 上下文窗口大小和批量翻译行数需要根据大模型的推理能力来调整。当模型能力不足时,过大的批量或上下文窗口可能会影响翻译质量 +6. 翻译后的中文字幕会打上“机翻”标签。 + +## todo +- 监听媒体入库事件自动调用字幕生成 +- 任务完成后调用媒体库刷新 +- 历史任务管理与展示 \ No newline at end of file diff --git a/plugins/autosubv2/__init__.py b/plugins/autosubv2/__init__.py new file mode 100644 index 0000000..88244aa --- /dev/null +++ b/plugins/autosubv2/__init__.py @@ -0,0 +1,1336 @@ +import copy +import os +import re +import subprocess +import tempfile +import time +import traceback +from datetime import timedelta, datetime +from pathlib import Path +from typing import Tuple, Dict, Any, List +from threading import Event +import iso639 +import psutil +import pytz +import srt +from apscheduler.schedulers.background import BackgroundScheduler +from lxml import etree + +from app.core.config import settings +from app.log import logger +from app.plugins import _PluginBase +from app.utils.system import SystemUtils +from plugins.autosubv2.ffmpeg import Ffmpeg +from plugins.autosubv2.translate.openai import OpenAi +from app.schemas.types import NotificationType + + +# todo +# 监听入库事件,自动调用翻译 + +class AutoSubv2(_PluginBase): + # 插件名称 + plugin_name = "AI字幕自动生成(v2)" + # 插件描述 + plugin_desc = "使用whisper自动生成视频文件字幕,使用大模型翻译字幕成中文。" + # 插件图标 + plugin_icon = "autosubtitles.jpeg" + # 主题色 + plugin_color = "#2C4F7E" + # 插件版本 + plugin_version = "1.0" + # 插件作者 + plugin_author = "TimoYoung" + # 作者主页 + author_url = "https://github.com/TimoYoung" + # 插件配置项ID前缀 + plugin_config_prefix = "autosubv2" + # 加载顺序 + plugin_order = 14 + # 可使用的用户级别 + auth_level = 2 + + # 退出事件 + _event = Event() + # 私有属性 + _running = False + # 语句结束符 + _end_token = ['.', '!', '?', '。', '!', '?', '。"', '!"', '?"', '."', '!"', '?"'] + _noisy_token = [('(', ')'), ('[', ']'), ('{', '}'), ('【', '】'), ('♪', '♪'), ('♫', '♫'), ('♪♪', '♪♪')] + + def __init__(self): + super().__init__() + # ChatGPT + self.openai = None + self._openai_key = None + self._openai_url = None + self._openai_proxy = None + self._openai_model = None + self._scheduler = None + self.process_count = None + self.fail_count = None + self.success_count = None + self.skip_count = None + self.faster_whisper_model_path = None + self.faster_whisper_model = None + self.asr_engine = None + self.send_notify = None + self.additional_args = None + self.enable_asr = None + self.translate_zh = None + self.whisper_model = None + self.whisper_main = None + self.file_size = None + self.enable_batch = None + self.batch_size = None + self.context_window = None + self.max_retries = None + self._proxy = None + self._translate_preference = None + + def init_plugin(self, config=None): + + self.process_count = 0 + self.skip_count = 0 + self.fail_count = 0 + self.success_count = 0 + + # 如果没有配置信息, 则不处理 + if not config: + return + + self.translate_zh = config.get('translate_zh', False) + if self.translate_zh: + chatgpt = self.get_config("ChatGPT") + if not chatgpt: + logger.error(f"翻译依赖于ChatGPT,请先维护ChatGPT插件") + return + self._openai_key = chatgpt and chatgpt.get("openai_key") + self._openai_url = chatgpt and chatgpt.get("openai_url") + self._openai_proxy = chatgpt and chatgpt.get("proxy") + self._openai_model = chatgpt and chatgpt.get("model") + if not self._openai_key: + logger.error(f"翻译依赖于ChatGPT,请先维护openai_key") + return + self.openai = OpenAi(api_key=self._openai_key, api_url=self._openai_url, + proxy=settings.PROXY if self._openai_proxy else None, + model=self._openai_model) + + path_list = list(set(config.get('path_list').split('\n'))) + self.file_size = int(config.get('file_size')) if config.get('file_size') else 10 + self.whisper_main = config.get('whisper_main') + self.whisper_model = config.get('whisper_model') + self.enable_asr = config.get('enable_asr', True) + self.enable_batch = config.get('enable_batch', True) + self.batch_size = int(config.get('batch_size')) if config.get('batch_size') else 20 + self.context_window = int(config.get('context_window')) if config.get('context_window') else 5 + self.max_retries = int(config.get('max_retries')) if config.get('max_retries') else 3 + self.additional_args = config.get('additional_args', '-t 4 -p 1') + self.send_notify = config.get('send_notify', False) + self.asr_engine = config.get('asr_engine', 'faster_whisper') + self.faster_whisper_model = config.get('faster_whisper_model', 'base') + self.faster_whisper_model_path = config.get('faster_whisper_model_path', + self.get_data_path() / "faster-whisper-models") + self._proxy = config.get('proxy', False) + self._translate_preference = config.get('translate_preference', 'origin_first') + run_now = config.get('run_now') + self.stop_service() + + if not run_now: + return + + config['run_now'] = False + self.update_config(config) + # 如果没有配置信息, 则不处理 + if not path_list or not self.file_size: + logger.warn(f"配置信息不完整,不进行处理") + return + + # asr 配置检查 + if self.enable_asr and not self.__check_asr(): + return + + if self._running: + logger.warn(f"上一次任务还未完成,不进行处理") + return + + if run_now: + self._scheduler = BackgroundScheduler(timezone=settings.TZ) + logger.info("AI字幕自动生成任务,立即运行一次") + self._scheduler.add_job(func=self._do_autosub, kwargs={'path_list': path_list}, trigger='date', + run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3), + name="AI字幕自动生成") + + # 启动任务 + if self._scheduler.get_jobs(): + self._scheduler.print_jobs() + self._scheduler.start() + + def _do_autosub(self, path_list: str): + # 依次处理每个目录 + try: + self._running = True + self.success_count = self.skip_count = self.fail_count = self.process_count = 0 + for path in path_list: + if self._event.is_set(): + logger.info(f"字幕生成服务停止") + return + logger.info(f"开始处理目录/文件:{path} ...") + # 如果目录不存在, 则不处理 + if not os.path.exists(path): + logger.warn(f"目录/文件不存在,不进行处理") + continue + + # 如果目录不是绝对路径, 则不处理 + if not os.path.isabs(path): + logger.warn(f"目录/文件不是绝对路径,不进行处理") + continue + + if os.path.isdir(path): + # 处理目录 + self.__process_folder_subtitle(path) + elif os.path.splitext(path)[-1].lower() in settings.RMT_MEDIAEXT: + # 处理单个视频文件 + self.__process_file_subtitle(path) + # 如果目录不是文件夹, 则不处理 + else: + logger.warn(f"目录不是文件夹或视频文件,不进行处理") + continue + + except Exception as e: + logger.error(f"处理异常: {e}") + finally: + logger.info(f"处理完成: " + f"成功{self.success_count} / 跳过{self.skip_count} / 失败{self.fail_count} / 共{self.process_count}") + self._running = False + + def __check_asr(self): + if self.asr_engine == 'whisper.cpp': + if not self.whisper_main or not self.whisper_model: + logger.warn(f"配置信息不完整,不进行处理") + return + if not os.path.exists(self.whisper_main): + logger.warn(f"whisper.cpp主程序不存在,不进行处理") + return False + if not os.path.exists(self.whisper_model): + logger.warn(f"whisper.cpp模型文件不存在,不进行处理") + return False + # 校验扩展参数是否包含异常字符 + if self.additional_args and re.search(r'[;|&]', self.additional_args): + logger.warn(f"扩展参数包含异常字符,不进行处理") + return False + elif self.asr_engine == 'faster-whisper': + if not self.faster_whisper_model_path or not self.faster_whisper_model: + logger.warn(f"配置信息不完整,不进行处理") + return + if not os.path.exists(self.faster_whisper_model_path): + logger.info(f"创建faster-whisper模型目录:{self.faster_whisper_model_path}") + os.mkdir(self.faster_whisper_model_path) + try: + from faster_whisper import WhisperModel, download_model + except ImportError: + logger.warn(f"faster-whisper 未安装,不进行处理") + return False + return True + else: + logger.warn(f"未配置asr引擎,不进行处理") + return False + return True + + def __process_file_subtitle(self, video_file): + if not video_file: + return + # 如果文件大小小于指定大小, 则不处理 + if os.path.getsize(video_file) < self.file_size: + return + + self.process_count += 1 + start_time = time.time() + file_path, file_ext = os.path.splitext(video_file) + file_name = os.path.basename(video_file) + + try: + logger.info(f"开始处理文件:{video_file} ...") + # 判断目的字幕(和内嵌)是否已存在 + if self.__target_subtitle_exists(video_file): + logger.warn(f"字幕文件已经存在,不进行处理") + self.skip_count += 1 + return + # 生成字幕 + ret, lang, gen_sub_path = self.__generate_subtitle(video_file, file_path, self.enable_asr) + if not ret: + message = f" 媒体: {file_name}\n " + if not self.enable_asr: + message += "内嵌&外挂字幕不存在,不进行翻译" + self.skip_count += 1 + else: + message += "生成字幕失败,跳过后续处理" + self.fail_count += 1 + + if self.send_notify: + self.post_message(mtype=NotificationType.Plugin, title="【自动字幕生成】", text=message) + return + + if self.translate_zh: + # 翻译字幕 + logger.info(f"开始翻译字幕为中文 ...") + # self.__translate_zh_subtitle(lang, f"{file_path}.{lang}.srt", f"{file_path}.zh.机翻.srt") + self.__translate_zh_subtitle(lang, gen_sub_path, f"{file_path}.zh.机翻.srt") + logger.info(f"翻译字幕完成:{file_name}.zh.机翻.srt") + + end_time = time.time() + message = f" 媒体: {file_name}\n 处理完成\n 字幕原始语言: {lang}\n " + if self.translate_zh: + message += f"字幕翻译语言: zh\n " + message += f"耗时:{round(end_time - start_time, 2)}秒" + logger.info(f"自动字幕生成 处理完成:{message}") + if self.send_notify: + self.post_message(mtype=NotificationType.Plugin, title="【自动字幕生成】", text=message) + self.success_count += 1 + except Exception as e: + logger.error(f"自动字幕生成 处理异常:{e}") + end_time = time.time() + message = f" 媒体: {file_name}\n 处理失败\n 耗时:{round(end_time - start_time, 2)}秒" + if self.send_notify: + self.post_message(mtype=NotificationType.Plugin, title="【自动字幕生成】", text=message) + # 打印调用栈 + logger.debug(traceback.format_exc()) + self.fail_count += 1 + + def __process_folder_subtitle(self, path): + """ + 处理目录字幕 + :param path: + :return: + """ + # 获取目录媒体文件列表 + for video_file in self.__get_library_files(path): + if self._event.is_set(): + logger.info(f"{video_file}处理中止") + return + self.__process_file_subtitle(video_file) + + def __do_speech_recognition(self, audio_lang, audio_file): + """ + 语音识别, 生成字幕 + :param audio_lang: + :param audio_file: + :return: + """ + lang = audio_lang + if self.asr_engine == 'whisper.cpp': + command = [self.whisper_main] + self.additional_args.split() + command += ['-l', lang, '-m', self.whisper_model, '-osrt', '-of', audio_file, audio_file] + ret = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + if ret.returncode == 0: + if lang == 'auto': + # 从output中获取语言 "whisper_full_with_state: auto-detected language: en (p = 0.973642)" + output = ret.stdout.decode('utf-8') if ret.stdout else "" + lang = re.search(r"auto-detected language: (\w+)", output) + if lang and lang.group(1): + lang = lang.group(1) + else: + lang = "en" + return True, lang + elif self.asr_engine == 'faster-whisper': + try: + from faster_whisper import WhisperModel, download_model + # 设置缓存目录, 防止缓存同目录出现 cross-device 错误 + cache_dir = os.path.join(self.faster_whisper_model_path, "cache") + if not os.path.exists(cache_dir): + os.mkdir(cache_dir) + os.environ["HF_HUB_CACHE"] = cache_dir + if self._proxy: + os.environ["HTTP_PROXY"] = settings.PROXY['http'] + os.environ["HTTPS_PROXY"] = settings.PROXY['https'] + model = WhisperModel( + download_model(self.faster_whisper_model, local_files_only=False, cache_dir=cache_dir), + device="cpu", compute_type="int8", cpu_threads=psutil.cpu_count(logical=False)) + segments, info = model.transcribe(audio_file, + language=lang if lang != 'auto' else None, + word_timestamps=True, + vad_filter=True, + temperature=0, + beam_size=5) + logger.info("Detected language '%s' with probability %f" % (info.language, info.language_probability)) + + if lang == 'auto': + lang = info.language + + subs = [] + if lang in ['en', 'eng']: + # 英文先生成单词级别字幕,再合并 + idx = 0 + for segment in segments: + if self._event.is_set(): + logger.info(f"whisper音轨转录服务停止") + raise Exception(f"用户中断当前任务") + for word in segment.words: + idx += 1 + subs.append(srt.Subtitle(index=idx, + start=timedelta(seconds=word.start), + end=timedelta(seconds=word.end), + content=word.word)) + subs = self.__merge_srt(subs) + else: + for i, segment in enumerate(segments): + if self._event.is_set(): + logger.info(f"whisper音轨转录服务停止") + raise Exception(f"用户中断当前任务") + subs.append(srt.Subtitle(index=i, + start=timedelta(seconds=segment.start), + end=timedelta(seconds=segment.end), + content=segment.text)) + self.__save_srt(f"{audio_file}.srt", subs) + logger.info(f"音轨转字幕完成") + return True, lang + except ImportError: + logger.warn(f"faster-whisper 未安装,不进行处理") + return False, None + except Exception as e: + traceback.print_exc() + logger.error(f"faster-whisper 处理异常:{e}") + return False, None + return False, None + + def __generate_subtitle(self, video_file, subtitle_file, enable_asr=True): + """ + 生成字幕 + :param video_file: 视频文件 + :param subtitle_file: 字幕文件, 不包含后缀 + :return: 生成成功返回True,字幕语言,字幕路径,否则返回False, None, None + """ + # 获取文件元数据 + video_meta = Ffmpeg().get_video_metadata(video_file) + if not video_meta: + logger.error(f"获取视频文件元数据失败,跳过后续处理") + return False, None, None + # 获取字幕语言偏好 + if self._translate_preference == "english_only": + prefer_subtitle_langs = ['en', 'eng'] + strict = True + elif self._translate_preference == "english_first": + prefer_subtitle_langs = ['en', 'eng'] + strict = False + else: # self.translate_preference == "origin_first" + prefer_subtitle_langs = None + strict = False + + # 从视频文件音轨获取语言信息 + ret, audio_index, audio_lang = self.__get_video_prefer_audio(video_meta, prefer_lang=prefer_subtitle_langs) + if not ret: + logger.info(f"字幕源偏好:{self._translate_preference} 获取音轨元数据失败") + return False, None, None + if not iso639.find(audio_lang) or not iso639.to_iso639_1(audio_lang): + logger.info(f"字幕源偏好:{self._translate_preference} 未从音轨元数据中获取到语言信息") + audio_lang = 'auto' + # 当字幕源偏好为origin_first时,优先使用音轨语言 + if self._translate_preference == "origin_first": + prefer_subtitle_langs = ['en', 'eng'] if audio_lang == 'auto' else [audio_lang, + iso639.to_iso639_1(audio_lang)] + # 获取外挂字幕 + logger.info(f"使用 {prefer_subtitle_langs} 匹配已有外挂字幕文件 ...") + external_sub_exist, external_sub_lang, exist_sub_name = self.__external_subtitle_exists(video_file, + prefer_subtitle_langs, + only_srt=True, + strict=strict) + # 获取内嵌字幕 + logger.info(f"使用 {prefer_subtitle_langs} 匹配内嵌字幕文件 ...") + inner_sub_exist, subtitle_index, inner_sub_lang, = self.__get_video_prefer_subtitle(video_meta, + prefer_subtitle_langs, + strict=strict) + + # 优先返回符合语言要求的外部字幕 + def get_sub_path(): + video_dir, _ = os.path.split(video_file) + return os.path.join(video_dir, exist_sub_name) + + extract_subtitle = False + if self._translate_preference == "english_only": + if external_sub_exist: + logger.info(f"字幕源偏好:{self._translate_preference} 外挂字幕存在,字幕语言 {external_sub_lang}") + return True, iso639.to_iso639_1(external_sub_lang), get_sub_path() + elif inner_sub_exist: + logger.info(f"字幕源偏好:{self._translate_preference} 内嵌字幕存在,字幕语言 {inner_sub_lang}") + extract_subtitle = True + else: + logger.info(f"字幕源偏好:{self._translate_preference} 未匹配到外挂或内嵌字幕,需要使用asr提取") + else: # english_first/origin_first + if external_sub_exist and external_sub_lang in prefer_subtitle_langs: + logger.info(f"字幕源偏好:{self._translate_preference} 外挂字幕存在,字幕语言 {external_sub_lang}") + return True, iso639.to_iso639_1(external_sub_lang), get_sub_path() + elif inner_sub_exist and inner_sub_lang in prefer_subtitle_langs: + logger.info(f"字幕源偏好:{self._translate_preference} 内嵌字幕存在,字幕语言 {inner_sub_lang}") + extract_subtitle = True + elif external_sub_exist: + logger.info(f"字幕源偏好:{self._translate_preference} 外挂字幕存在,字幕语言 {external_sub_lang}") + return True, iso639.to_iso639_1(external_sub_lang), get_sub_path() + elif inner_sub_exist: + logger.info(f"字幕源偏好:{self._translate_preference} 内嵌字幕存在,字幕语言 {inner_sub_lang}") + extract_subtitle = True + else: + logger.info(f"字幕源偏好:{self._translate_preference} 未匹配到外挂或内嵌字幕,需要使用asr提取") + # 提取内嵌字幕 + if extract_subtitle: + inner_sub_lang = iso639.to_iso639_1(inner_sub_lang) \ + if (inner_sub_lang and iso639.find(inner_sub_lang) and iso639.to_iso639_1(inner_sub_lang)) else 'und' + extracted_sub_path = f"{subtitle_file}.{inner_sub_lang}.srt" + Ffmpeg().extract_subtitle_from_video(video_file, extracted_sub_path, subtitle_index) + logger.info(f"提取字幕完成:{extracted_sub_path}") + return True, inner_sub_lang, extracted_sub_path + # 使用asr音轨识别字幕 + if audio_lang != 'auto': + audio_lang = iso639.to_iso639_1(audio_lang) + + if not enable_asr: + logger.info(f"未开启语音识别,且无已有字幕文件,跳过后续处理") + return False, None, None + + # 清理异常退出的临时文件 + tempdir = tempfile.gettempdir() + for file in os.listdir(tempdir): + if file.startswith('autosub-'): + os.remove(os.path.join(tempdir, file)) + + with tempfile.NamedTemporaryFile(prefix='autosub-', suffix='.wav', delete=True) as audio_file: + # 提取音频 + logger.info(f"正在提取音频:{audio_file.name} ...") + Ffmpeg().extract_wav_from_video(video_file, audio_file.name, audio_index) + logger.info(f"提取音频完成:{audio_file.name}") + + # 生成字幕 + logger.info(f"开始生成字幕, 语言 {audio_lang} ...") + ret, lang = self.__do_speech_recognition(audio_lang, audio_file.name) + if ret: + logger.info(f"生成字幕成功,原始语言:{lang}") + # 复制字幕文件 + SystemUtils.copy(Path(f"{audio_file.name}.srt"), Path(f"{subtitle_file}.{lang}.srt")) + logger.info(f"复制字幕文件:{subtitle_file}.{lang}.srt") + # 删除临时文件 + os.remove(f"{audio_file.name}.srt") + return ret, lang, Path(f"{subtitle_file}.{lang}.srt") + else: + logger.error(f"生成字幕失败") + return False, None, None + + @staticmethod + def __get_library_files(in_path, exclude_path=None): + """ + 获取目录媒体文件列表 + """ + if not os.path.isdir(in_path): + yield in_path + return + + for root, dirs, files in os.walk(in_path): + if exclude_path and any(os.path.abspath(root).startswith(os.path.abspath(path)) + for path in exclude_path.split(",")): + continue + + for file in files: + cur_path = os.path.join(root, file) + # 检查后缀 + if os.path.splitext(file)[-1].lower() in settings.RMT_MEDIAEXT: + yield cur_path + + @staticmethod + def __load_srt(file_path): + """ + 加载字幕文件 + :param file_path: 字幕文件路径 + :return: + """ + with open(file_path, 'r', encoding="utf8") as f: + srt_text = f.read() + return list(srt.parse(srt_text)) + + @staticmethod + def __save_srt(file_path, srt_data): + """ + 保存字幕文件 + :param file_path: 字幕文件路径 + :param srt_data: 字幕数据 + :return: + """ + with open(file_path, 'w', encoding="utf8") as f: + f.write(srt.compose(srt_data)) + + def __merge_srt(self, subtitle_data): + """ + 合并整句字幕 + :param subtitle_data: + :return: + """ + subtitle_data = copy.deepcopy(subtitle_data) + # 合并字幕 + merged_subtitle = [] + sentence_end = True + + for index, item in enumerate(subtitle_data): + # 当前字幕先将多行合并为一行,再去除首尾空格 + content = item.content.replace('\n', ' ').strip() + # 去除html标签 + parse = etree.HTML(content) + if parse is not None: + content = parse.xpath('string(.)') + if content == '': + continue + item.content = content + + # 背景音等字幕,跳过 + if self.__is_noisy_subtitle(content): + merged_subtitle.append(item) + sentence_end = True + continue + + if not merged_subtitle or sentence_end: + merged_subtitle.append(item) + elif not sentence_end: + merged_subtitle[-1].content = f"{merged_subtitle[-1].content} {content}" + merged_subtitle[-1].end = item.end + + # 如果当前字幕内容以标志符结尾,则设置语句已经终结 + if content.endswith(tuple(self._end_token)): + sentence_end = True + # 如果上句字幕超过一定长度,则设置语句已经终结 + elif len(merged_subtitle[-1].content) > 80: + sentence_end = True + else: + sentence_end = False + + return merged_subtitle + + def __get_video_prefer_audio(self, video_meta, prefer_lang=None): + """ + 获取视频的首选音轨,如果有多音轨, 优先指定语言音轨,否则获取默认音轨 + :param video_meta + :return: + """ + if type(prefer_lang) == str and prefer_lang: + prefer_lang = [prefer_lang] + + # 获取首选音轨 + audio_lang = None + audio_index = None + audio_stream = filter(lambda x: x.get('codec_type') == 'audio', video_meta.get('streams', [])) + for index, stream in enumerate(audio_stream): + if not audio_index: + audio_index = index + audio_lang = stream.get('tags', {}).get('language', 'und') + # 获取默认音轨 + if stream.get('disposition', {}).get('default'): + audio_index = index + audio_lang = stream.get('tags', {}).get('language', 'und') + # 获取指定语言音轨 + if prefer_lang and stream.get('tags', {}).get('language') in prefer_lang: + audio_index = index + audio_lang = stream.get('tags', {}).get('language', 'und') + break + + # 如果没有音轨, 则不处理 + if audio_index is None: + logger.warn(f"没有音轨,不进行处理") + return False, None, None + + logger.info(f"选中音轨信息:{audio_index}, {audio_lang}") + return True, audio_index, audio_lang + + def __get_video_prefer_subtitle(self, video_meta, prefer_lang=None, strict=False, srt=True): + """ + 获取视频的首选字幕。优先级:1.字幕为偏好语言 2.默认字幕 3.第一个字幕 + :param video_meta: 视频元数据 + :param prefer_lang: 字幕偏好语言 + :param strict: 是否严格模式。如果指定了偏好语言,严格模式下必须返回偏好语言的字幕。 + :return: (是否命中字幕,字幕index,字幕语言) + """ + # from https://wiki.videolan.org/Subtitles_codecs/ + """ + https://trac.ffmpeg.org/wiki/ExtractSubtitles + ffmpeg -codecs | grep subtitle + DES... ass ASS (Advanced SSA) subtitle (decoders: ssa ass ) (encoders: ssa ass ) + DES... dvb_subtitle DVB subtitles (decoders: dvbsub ) (encoders: dvbsub ) + DES... dvd_subtitle DVD subtitles (decoders: dvdsub ) (encoders: dvdsub ) + D.S... hdmv_pgs_subtitle HDMV Presentation Graphic Stream subtitles (decoders: pgssub ) + ..S... hdmv_text_subtitle HDMV Text subtitle + D.S... jacosub JACOsub subtitle + D.S... microdvd MicroDVD subtitle + D.S... mpl2 MPL2 subtitle + D.S... pjs PJS (Phoenix Japanimation Society) subtitle + D.S... realtext RealText subtitle + D.S... sami SAMI subtitle + ..S... srt SubRip subtitle with embedded timing + ..S... ssa SSA (SubStation Alpha) subtitle + D.S... stl Spruce subtitle format + DES... subrip SubRip subtitle (decoders: srt subrip ) (encoders: srt subrip ) + D.S... subviewer SubViewer subtitle + D.S... subviewer1 SubViewer v1 subtitle + D.S... vplayer VPlayer subtitle + DES... webvtt WebVTT subtitle + """ + image_based_subtitle_codecs = ( + 'dvd_subtitle', + 'dvb_subtitle', + 'hdmv_pgs_subtitle', + ) + + if prefer_lang is str and prefer_lang: + prefer_lang = [prefer_lang] + + # 获取首选字幕 + subtitle_lang = None + subtitle_index = None + subtitle_score = 0 + subtitle_stream = filter(lambda x: x.get('codec_type') == 'subtitle', video_meta.get('streams', [])) + for index, stream in enumerate(subtitle_stream): + # 如果是强制字幕,则跳过 + if stream.get('disposition', {}).get('forced'): + continue + # image-based 字幕,跳过 + if srt and ( + 'width' in stream + or stream.get('codec_name') in image_based_subtitle_codecs + ): + continue + cur_is_default = stream.get('disposition', {}).get('default') + cur_lang = stream.get('tags', {}).get('language') + # 计算当前字幕得分:1.字幕为偏好语言*4 2.默认字幕*2 3.第一个字幕*1 + cur_score = 0 + if prefer_lang and cur_lang in prefer_lang: + cur_score += 4 + if cur_is_default: + cur_score += 2 + if subtitle_index is None: + cur_score += 1 + # 第一个字幕初始化为默认字幕 + subtitle_lang, subtitle_index, subtitle_score = cur_lang, index, cur_score + if cur_score > subtitle_score: + subtitle_lang, subtitle_index, subtitle_score = cur_lang, index, cur_score + + # 未找到字幕 + if subtitle_index is None: + logger.debug(f"没有内嵌字幕") + return False, None, None + if strict and prefer_lang and subtitle_lang not in prefer_lang: + logger.warn(f"严格模式,没有偏好语言的字幕") + return False, None, None + logger.debug(f"命中内嵌字幕信息:{subtitle_index}, {subtitle_lang}, score:{subtitle_score}") + return True, subtitle_index, subtitle_lang + + def __is_noisy_subtitle(self, content): + """ + 判断是否为背景音等字幕 + :param content: + :return: + """ + return any(content.startswith(t[0]) and content.endswith(t[1]) for t in self._noisy_token) + + def __get_context(self, all_subs: list, target_indices: List[int], is_batch: bool) -> str: + """通用上下文获取方法""" + min_idx = max(0, min(target_indices) - self.context_window) + max_idx = min(len(all_subs) - 1, max(target_indices) + self.context_window) if is_batch else min(target_indices) + + context = [] + for idx in range(min_idx, max_idx + 1): + status = "[待译]" if idx in target_indices else "" + content = all_subs[idx].content.replace('\n', ' ').strip() + context.append(f"{status}{content}") + + return "\n".join(context) + + def __process_items(self, all_subs: list, items: list) -> list: + """统一处理入口(支持批量和单条)""" + if self.enable_batch and len(items) > 1: + return self.__process_batch(all_subs, items) + return [self.__process_single(all_subs, item) for item in items] + + def __process_batch(self, all_subs: list, batch: list) -> list: + """批量处理逻辑""" + indices = [all_subs.index(item) for item in batch] + context = self.__get_context(all_subs, indices, is_batch=True) if self.context_window > 0 else None + batch_text = '\n'.join([item.content for item in batch]) + + openai = OpenAi(self._openai_key, self._openai_url, self._openai_proxy, self._openai_model) + try: + ret, result = openai.translate_to_zh(batch_text, context) + if not ret: + raise Exception(result) + + translated = [line.strip() for line in result.split('\n') if line.strip()] + if len(translated) != len(batch): + raise Exception(f"批次行数不匹配 {len(translated)}/{len(batch)}") + + for item, trans in zip(batch, translated): + item.content = f"{trans}\n{item.content}" + self._stats['batch_success'] += len(batch) + return batch + except Exception as e: + logger.warning(f"批次翻译失败({str(e)}),降级到单行匹配...") + self._stats['batch_fail'] += 1 + return [self.__process_single(all_subs, item) for item in batch] + + def __process_single(self, all_subs: List[srt.Subtitle], item: srt.Subtitle) -> srt.Subtitle: + """单条处理逻辑""" + openai = OpenAi(self._openai_key, self._openai_url, self._openai_proxy, self._openai_model) + for _ in range(self.max_retries): + idx = all_subs.index(item) + context = self.__get_context(all_subs, [idx], is_batch=False) if self.context_window > 0 else None + success, trans = openai.translate_to_zh(item.content, context) + + if success: + item.content = f"{trans}\n{item.content}" + self._stats['line_fallback'] += 1 + return item + + time.sleep(1) + + item.content = f"[翻译失败]\n{item.content}" + return item + + def __translate_zh_subtitle(self, source_lang: str, source_subtitle: str, dest_subtitle: str): + self._stats = {'total': 0, 'batch_success': 0, 'batch_fail': 0, 'line_fallback': 0} + subs = self.__load_srt(source_subtitle) + valid_subs = self.__merge_srt(subs) + logger.info(f"合并前字幕数: {len(subs)},合并后字幕数: {len(valid_subs)}") + self._stats['total'] = len(valid_subs) + processed = [] + current_batch = [] + + for item in valid_subs: + if self._event.is_set(): + logger.info(f"字幕{source_subtitle}翻译停止") + raise Exception(f"用户中断当前任务") + current_batch.append(item) + + if len(current_batch) >= self.batch_size: + processed += self.__process_items(valid_subs, current_batch) + current_batch = [] + logger.info(f"进度: {len(processed)}/{len(valid_subs)}") + + if current_batch: + processed += self.__process_items(valid_subs, current_batch) + + self.__save_srt(dest_subtitle, processed) + logger.info(f""" + 翻译完成! + 总处理条目: {self._stats['total']} + 批次成功: {self._stats['batch_success']} ({(self._stats['batch_success'] / self._stats['total']) * 100:.1f}%) + 批次失败: {self._stats['batch_fail']} + 行补偿翻译: {self._stats['line_fallback']} + """) + + @staticmethod + def __external_subtitle_exists(video_file, prefer_langs=None, only_srt=False, strict=True): + """ + 外部字幕文件是否存在,支持多种格式及扩展需求。 + :param video_file: 视频文件路径 + :param prefer_langs: 偏好语言列表,支持单个语言字符串或列表 + :param only_srt: 是否只匹配srt格式的字幕 + :param strict: 是否严格匹配偏好语言.当不存在偏好语言字幕但存在其他语言字幕时,是否返回其他字幕 + :return: 元组 (是否存在, 检测到的语言, 文件名) + """ + video_dir, video_name = os.path.split(video_file) + video_name, video_ext = os.path.splitext(video_name) + + if prefer_langs and type(prefer_langs) == str: + prefer_langs = [prefer_langs] + + metadata_flags = ["default", "forced", "foreign", "sdh", "cc", "hi", "机翻"] + if only_srt: + subtitle_extensions = [".srt"] + else: + subtitle_extensions = [".srt", ".sub", ".ass", ".ssa", ".vtt"] + + def parse_props(props): + """ + 解析字幕属性信息,提取语言和元数据标记。 + :param props: 属性字符串 + :return: (语言, 元数据列表) + """ + parts = props.split(".") + if len(parts) < 1: + return None, [] + + cur_subtitle_lang = None + cur_metadata = [] + # 倒序遍历文件名中的标记 + for i in range(len(parts) - 1, -1, -1): + part = parts[i] + if part in metadata_flags: + cur_metadata.append(part) + elif cur_subtitle_lang is None: + try: + iso639.to_iso639_1(part) + except iso639.NonExistentLanguageError: + continue + else: + cur_subtitle_lang = iso639.to_iso639_1(part) # 记录最后一个语言标记 + + return cur_subtitle_lang, cur_metadata + + # 备选的字幕语言.当strict=False时生效, 用于在未找到偏好语言时返回其他语言 + second_lang = None + second_file = None + # 检查字幕文件 + for file in os.listdir(video_dir): + if not file.startswith(video_name): + continue + + # 检查扩展名是否在支持范围内 + _, ext = os.path.splitext(file) + if ext.lower() not in subtitle_extensions: + continue + + # 提取文件名中的语言和元数据信息 + props_str = file[len(video_name) + 1: -len(ext)] if file.startswith(video_name + ".") else "" + subtitle_lang, metadata = parse_props(props_str) + + # 如果没有语言标记,跳过 + if not subtitle_lang: + continue + + # 如果指定了偏好语言 + if prefer_langs: + if subtitle_lang in prefer_langs: + return True, subtitle_lang, file + else: + second_lang = subtitle_lang + second_file = file + else: + # 未指定偏好语言,找到的第一个字幕即返回 + return True, subtitle_lang, file + if not strict and second_lang: + return True, second_lang, second_file + return False, None, None + + def __target_subtitle_exists(self, video_file): + """ + 目标字幕文件是否存在 + :param video_file: + :return: + """ + if self.translate_zh: + prefer_langs = ['zh', 'chi', 'zh-CN', 'chs', 'zhs', 'zh-Hans', 'zhong', 'simp', 'cn'] + strict = True + else: + if self._translate_preference == "english_first": + prefer_langs = ['en', 'eng'] + strict = False + elif self._translate_preference == "english_only": + prefer_langs = ['en', 'eng'] + strict = True + else: + prefer_langs = None + strict = False + + exist, lang, _ = self.__external_subtitle_exists(video_file, prefer_langs, strict=strict) + if exist: + return True + + video_meta = Ffmpeg().get_video_metadata(video_file) + if not video_meta: + return False + ret, subtitle_index, subtitle_lang = self.__get_video_prefer_subtitle(video_meta, prefer_lang=prefer_langs, + srt=False) + if ret and subtitle_lang in prefer_langs: + return True + + return False + + def get_form(self) -> Tuple[List[dict], Dict[str, Any]]: + """ + 拼装插件配置页面,需要返回两块数据:1、页面配置;2、数据结构 + """ + return [ + { + 'component': 'VForm', + 'content': [ + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'run_now', + 'label': '立即运行一次', + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSelect', + 'props': { + 'model': 'translate_preference', + 'label': '本地字幕提取策略', + 'items': [ + {'title': '仅英文字幕', 'value': 'english_only'}, + {'title': '优先英文字幕', 'value': 'english_first'}, + {'title': '优先原音字幕', 'value': 'origin_first'} + ] + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'translate_zh', + 'label': '翻译为中文', + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'send_notify', + 'label': '发送通知', + } + } + ] + } + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'enable_asr', + 'label': '允许从音轨提取字幕', + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3, + 'v-show': 'enable_asr' + }, + 'content': [ + { + 'component': 'VSelect', + 'props': { + 'model': 'asr_engine', + 'label': 'ASR引擎', + 'items': [ + {'title': 'faster-whisper', 'value': 'faster-whisper'} + ] + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3, + 'v-show': 'enable_asr' + }, + 'content': [ + { + 'component': 'VSelect', + 'props': { + 'model': 'faster_whisper_model', + 'label': '模型', + 'items': [ + {'title': 'tiny', 'value': 'tiny'}, + {'title': 'tiny.en', 'value': 'tiny.en'}, + {'title': 'base', 'value': 'base'}, + {'title': 'base.en', 'value': 'base.en'}, + {'title': 'small', 'value': 'small'}, + {'title': 'small.en', 'value': 'small.en'}, + {'title': 'medium', 'value': 'medium'}, + {'title': 'large-v1', 'value': 'large-v1'}, + {'title': 'large-v2', 'value': 'large-v2'}, + {'title': 'large-v3', 'value': 'large-v3'}, + {'title': 'distil-small.en', 'value': 'distil-small.en'}, + {'title': 'distil-medium.en', 'value': 'distil-medium.en'}, + {'title': 'distil-large-v2.en', 'value': 'distil-large-v2'}, + {'title': 'distil-large-v3.en', 'value': 'distil-large-v3'}, + ] + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3, + 'v-show': 'enable_asr' + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'proxy', + 'label': '使用代理下载模型', + } + } + ] + }, + ] + }, + { + 'component': 'VRow', + 'content': [ + + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3, + 'v-show': 'translate_zh' + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'enable_batch', + 'label': '启用批量翻译', + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3, + 'v-show': 'translate_zh' + }, + 'content': [ + { + 'component': 'VTextField', + 'props': { + 'model': 'batch_size', + 'label': '每批翻译行数', + 'placeholder': '20' + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3, + 'v-show': 'translate_zh' + }, + 'content': [ + { + 'component': 'VTextField', + 'props': { + 'model': 'context_window', + 'label': '上下文窗口大小', + 'placeholder': '5' + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3, + 'v-show': 'translate_zh' + }, + 'content': [ + { + 'component': 'VTextField', + 'props': { + 'model': 'max_retries', + 'label': 'llm请求重试次数', + 'placeholder': '3' + } + } + ] + }, + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12 + }, + 'content': [ + { + 'component': 'VTextarea', + 'props': { + 'model': 'path_list', + 'label': '媒体路径', + 'rows': 5, + 'placeholder': '媒体文件或文件夹绝对路径(如为文件夹会遍历其中所有媒体文件),每行一个路径,请确保路径正确' + } + } + ] + } + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + }, + 'content': [ + { + 'component': 'VTextField', + 'props': { + 'model': 'file_size', + 'label': '文件大小(MB)', + 'placeholder': '单位 MB, 大于该大小的文件才会进行字幕生成' + } + } + ] + } + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + }, + 'content': [ + { + 'component': 'VAlert', + 'props': { + 'type': 'info', + 'variant': 'tonal', + 'text': '翻译依赖 OpenAi 插件配置' + } + } + ] + } + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + }, + 'content': [ + { + 'component': 'VAlert', + 'props': { + 'type': 'success', + 'variant': 'tonal' + }, + 'content': [ + { + 'component': 'span', + 'text': '详细说明参考:' + }, + { + 'component': 'a', + 'props': { + 'href': 'https://github.com/TimoYoung/MoviePilot-Plugins/blob/main/plugins/autosubv2/README.md', + 'target': '_blank' + }, + 'content': [ + { + 'component': 'u', + 'text': 'README' + } + ] + }] + } + + ] + } + ] + } + ] + } + ], { + "run_now": False, + "send_notify": False, + "translate_zh": True, + "enable_asr": True, + "asr_engine": "faster-whisper", + "faster_whisper_model": "base", + "proxy": True, + "translate_preference": "origin_first", + "enable_batch": True, + "batch_size": 20, + "context_window": 5, + "max_retries": 3, + "path_list": "", + "file_size": "10", + } + + def get_api(self) -> List[Dict[str, Any]]: + pass + + def get_page(self) -> List[dict]: + pass + + @staticmethod + def get_command() -> List[Dict[str, Any]]: + pass + + def get_state(self) -> bool: + """ + 获取插件状态,如果插件正在运行, 则返回True + """ + return self._running + + def stop_service(self): + """ + 退出插件 + """ + if self._running: + self._event.set() + self._running = False + self._scheduler.shutdown() + self._event.clear() + logger.info(f"停止自动字幕生成服务") diff --git a/plugins/autosubv2/ffmpeg/__init__.py b/plugins/autosubv2/ffmpeg/__init__.py new file mode 100644 index 0000000..a949c70 --- /dev/null +++ b/plugins/autosubv2/ffmpeg/__init__.py @@ -0,0 +1,63 @@ +import json +import subprocess + + +class Ffmpeg: + + @staticmethod + def extract_wav_from_video(video_path, audio_path, audio_index=None): + """ + 使用ffmpeg从视频文件中提取16000hz, 16-bit的wav格式音频 + """ + if not video_path or not audio_path: + return False + + # 提取指定音频流 + if audio_index: + command = ['ffmpeg', "-hide_banner", "-loglevel", "warning", '-y', '-i', video_path, + '-map', f'0:a:{audio_index}', + '-acodec', 'pcm_s16le', '-ac', '1', '-ar', '16000', audio_path] + else: + command = ['ffmpeg', "-hide_banner", "-loglevel", "warning", '-y', '-i', video_path, + '-acodec', 'pcm_s16le', '-ac', '1', '-ar', '16000', audio_path] + + ret = subprocess.run(command).returncode + if ret == 0: + return True + return False + + @staticmethod + def get_video_metadata(video_path): + """ + 获取视频元数据 + """ + if not video_path: + return False + + try: + command = ['ffprobe', '-v', 'quiet', '-print_format', 'json', '-show_format', '-show_streams', video_path] + result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if result.returncode == 0: + return json.loads(result.stdout.decode("utf-8")) + except Exception as e: + print(e) + return None + + @staticmethod + def extract_subtitle_from_video(video_path, subtitle_path, subtitle_index=None): + """ + 从视频中提取字幕 + """ + if not video_path or not subtitle_path: + return False + + if subtitle_index: + command = ['ffmpeg', "-hide_banner", "-loglevel", "warning", '-y', '-i', video_path, + '-map', f'0:s:{subtitle_index}', + subtitle_path] + else: + command = ['ffmpeg', "-hide_banner", "-loglevel", "warning", '-y', '-i', video_path, subtitle_path] + ret = subprocess.run(command).returncode + if ret == 0: + return True + return False diff --git a/plugins/autosubv2/requirements.txt b/plugins/autosubv2/requirements.txt new file mode 100644 index 0000000..fb5a734 --- /dev/null +++ b/plugins/autosubv2/requirements.txt @@ -0,0 +1,4 @@ +iso639~=0.1.4 +srt~=3.5.3 +python-dotenv~=1.0.1 +faster-whisper~=1.0.1 diff --git a/plugins/autosubv2/translate/__init__.py b/plugins/autosubv2/translate/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/plugins/autosubv2/translate/openai.py b/plugins/autosubv2/translate/openai.py new file mode 100644 index 0000000..9654a94 --- /dev/null +++ b/plugins/autosubv2/translate/openai.py @@ -0,0 +1,133 @@ +import time +from typing import List, Union + +import openai +from cacheout import Cache + +OpenAISessionCache = Cache(maxsize=100, ttl=3600, timer=time.time, default=None) + + +class OpenAi: + _api_key: str = None + _api_url: str = None + _model: str = "gpt-3.5-turbo" + + def __init__(self, api_key: str = None, api_url: str = None, proxy: dict = None, model: str = None): + self._api_key = api_key + self._api_url = api_url + openai.api_base = self._api_url + "/v1" + openai.api_key = self._api_key + if proxy and proxy.get("https"): + openai.proxy = proxy.get("https") + if model: + self._model = model + + @staticmethod + def __save_session(session_id: str, message: str): + """ + 保存会话 + :param session_id: 会话ID + :param message: 消息 + :return: + """ + seasion = OpenAISessionCache.get(session_id) + if seasion: + seasion.append({ + "role": "assistant", + "content": message + }) + OpenAISessionCache.set(session_id, seasion) + + @staticmethod + def __get_session(session_id: str, message: str) -> List[dict]: + """ + 获取会话 + :param session_id: 会话ID + :return: 会话上下文 + """ + seasion = OpenAISessionCache.get(session_id) + if seasion: + seasion.append({ + "role": "user", + "content": message + }) + else: + seasion = [ + { + "role": "system", + "content": "请在接下来的对话中请使用中文回复,并且内容尽可能详细。" + }, + { + "role": "user", + "content": message + }] + OpenAISessionCache.set(session_id, seasion) + return seasion + + def __get_model(self, message: Union[str, List[dict]], + prompt: str = None, + user: str = "MoviePilot", + **kwargs): + """ + 获取模型 + """ + if not isinstance(message, list): + if prompt: + message = [ + { + "role": "system", + "content": prompt + }, + { + "role": "user", + "content": message + } + ] + else: + message = [ + { + "role": "user", + "content": message + } + ] + return openai.ChatCompletion.create( + model=self._model, + user=user, + messages=message, + **kwargs + ) + + @staticmethod + def __clear_session(session_id: str): + """ + 清除会话 + :param session_id: 会话ID + :return: + """ + if OpenAISessionCache.get(session_id): + OpenAISessionCache.delete(session_id) + + def translate_to_zh(self, text: str, context: str = None): + """ + 翻译为中文 + :param text: 输入文本 + :param context: 翻译上下文 + """ + system_prompt = """您是一位专业字幕翻译专家,请严格遵循以下规则: +1. 将原文精准翻译为简体中文,保持原文本意 +2. 使用自然的口语化表达,符合中文观影习惯 +3. 结合上下文语境,人物称谓、专业术语、情感语气在上下文中保持连贯 +4. 按行翻译待译内容。翻译结果不要包括上下文。 +5. 输出内容必须仅包括译文。不要输出任何开场白,解释说明或总结""" + user_prompt = f"翻译上下文:\n{context}\n\n需要翻译的内容:\n{text}" if context else f"请翻译:\n{text}" + result = "" + try: + completion = self.__get_model(prompt=system_prompt, + message=user_prompt, + temperature=0.2, + top_p=0.9) + result = completion.choices[0].message.content.strip() + return True, result + except Exception as e: + print(f"{str(e)}:{result}") + return False, f"{str(e)}:{result}"