mirror of
https://github.com/jxxghp/MoviePilot-Plugins.git
synced 2026-06-13 07:26:50 +00:00
Compare commits
9 Commits
InvitesSig
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8dd0783b4a | ||
|
|
afc28446f5 | ||
|
|
e89047a95f | ||
|
|
ad8137a65a | ||
|
|
1ececcb410 | ||
|
|
8490668f5d | ||
|
|
1fb6b4845e | ||
|
|
b63e74b680 | ||
|
|
2b575d9e8d |
@@ -195,11 +195,13 @@
|
||||
"name": "媒体库刮削",
|
||||
"description": "定时对媒体库进行刮削,补齐缺失元数据和图片。",
|
||||
"labels": "刮削",
|
||||
"version": "2.1.1",
|
||||
"version": "2.1.3",
|
||||
"icon": "scraper.png",
|
||||
"author": "jxxghp",
|
||||
"level": 1,
|
||||
"history": {
|
||||
"v2.1.3": "修复分类路径下按文件标记识别及强制类型跨库扫描问题",
|
||||
"v2.1.2": "修复分类目录被误识别导致下级媒体未刮削的问题",
|
||||
"v2.1.1": "调整目录计算方法,以支持更多重命名格式",
|
||||
"v2.1": "优化执行周期输入,需要MoviePilot v2.2.1+",
|
||||
"v2.0": "兼容MoviePilot V2 版本",
|
||||
@@ -586,11 +588,12 @@
|
||||
"name": "美剧生词标注",
|
||||
"description": "根据CEFR等级,为英语影视剧标注高级词汇。",
|
||||
"labels": "英语",
|
||||
"version": "1.2.5",
|
||||
"version": "1.2.6",
|
||||
"icon": "LexiAnnot.png",
|
||||
"author": "wumode",
|
||||
"level": 1,
|
||||
"history": {
|
||||
"v1.2.6": "适配 MoviePilot 新版 LLM 助手",
|
||||
"v1.2.5": "langchain 1.x 兼容 (主程序版本需高于 2.9.17)",
|
||||
"v1.2.4": "增强数据校验",
|
||||
"v1.2.3": "优化提示词",
|
||||
@@ -663,12 +666,13 @@
|
||||
"name": "动态企微可信IP",
|
||||
"description": "修改企微应用可信IP,支持Srever酱等第三方通知。验证码以?结尾发送到企业微信应用",
|
||||
"labels": "消息通知",
|
||||
"version": "2.1.1",
|
||||
"version": "2.1.2",
|
||||
"icon": "Wecom_A.png",
|
||||
"author": "RamenRa",
|
||||
"level": 2,
|
||||
"system_version": ">=2.12.0",
|
||||
"history": {
|
||||
"v2.1.2": "修复本地扫码获取不到验证码的问题" ,
|
||||
"v2.1.1": "优化MP/Nas关闭期间IP变动检测不到的现象。支持IYUU通知移除AnPush v2支持在微信通知失效时用第三方发送通知 支持||Q修改IP时不发送通知 使用全局AI助手需使用/wxcode 510010的格式发送验证码",
|
||||
"v2.0.1": "修复企业微信后台页面语言未稳定切换为中文导致无法匹配配置按钮的问题。",
|
||||
"v2.0.0": "V2 专用大版本改用 CloakBrowser 启动企业微信浏览器流程,默认插件不再声明 V2 兼容。",
|
||||
@@ -1099,5 +1103,38 @@
|
||||
"v1.0.2": "修复UI界面显示不全及前端路由报错问题",
|
||||
"v1.0.1": "新增 Agent Tokens 配额管理、供应商优先级切换和用量展示"
|
||||
}
|
||||
},
|
||||
"TraktCleaner": {
|
||||
"name": "Trakt 观看清理",
|
||||
"description": "根据 Trakt 播放记录,自动清理下载器中已观看的种子。",
|
||||
"labels": "Trakt,清理",
|
||||
"version": "1.0",
|
||||
"icon": "https://cdn.jsdelivr.net/gh/homarr-labs/dashboard-icons/png/trakt.png",
|
||||
"author": "Guoyin-Wen",
|
||||
"level": 1,
|
||||
"history": {
|
||||
"v1.0": "初始版本:根据 Trakt 播放记录自动清理下载器中已观看的种子"
|
||||
}
|
||||
},
|
||||
"UpdateWeChatIp": {
|
||||
"name": "动态企微可信IP",
|
||||
"description": "修改企微应用可信IP,可本地扫码刷新Cookie,直接调用接口更稳定",
|
||||
"labels": "消息通知",
|
||||
"version": "1.0.8",
|
||||
"icon": "Wecom_A.png",
|
||||
"author": "书小白",
|
||||
"level": 2,
|
||||
"v2": true,
|
||||
"history": {
|
||||
"1.0.8": "完善日志输出",
|
||||
"1.0.7": "插件初始化时调用一下check确定登录状态",
|
||||
"1.0.6": "修复未登录时_party_cache_data为空导致UI崩溃的BUG\n图片地址优先使用MP_DOMAIN获取,如果未配置使用127.0.0.1地址\n回调解析qrcode_key时判断是否存在,不存在发送错误\n优化请求企微接口的参数",
|
||||
"1.0.5": "根据Code Review结果优化代码",
|
||||
"1.0.4": "增加IP更新记录查询",
|
||||
"1.0.3": "cookie保活输出返回值",
|
||||
"1.0.2": "支持多个应用ID",
|
||||
"1.0.1": "IP更新时发送通知,增加API接口,指定更新的IP",
|
||||
"1.0.0": "初始化"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ class DynamicWeChat(_PluginBase):
|
||||
# 插件图标
|
||||
plugin_icon = "Wecom_A.png"
|
||||
# 插件版本
|
||||
plugin_version = "2.1.1"
|
||||
plugin_version = "2.1.2"
|
||||
# 插件作者
|
||||
plugin_author = "RamenRa"
|
||||
# 作者主页
|
||||
@@ -320,6 +320,7 @@ class DynamicWeChat(_PluginBase):
|
||||
if not event_data or event_data.get("action") != "dynamicwechat":
|
||||
return
|
||||
context = None
|
||||
self._qr_running = True
|
||||
try:
|
||||
context = self._launch_browser_context(headless=True)
|
||||
page = context.new_page()
|
||||
@@ -349,6 +350,7 @@ class DynamicWeChat(_PluginBase):
|
||||
except Exception as e:
|
||||
logger.error(f"本地扫码任务: 本地扫码失败: {e}")
|
||||
finally:
|
||||
self._qr_running = False
|
||||
if context:
|
||||
context.close()
|
||||
|
||||
|
||||
@@ -1,55 +1,39 @@
|
||||
import asyncio
|
||||
import copy
|
||||
import os
|
||||
import json
|
||||
import os
|
||||
import queue
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
from collections import Counter
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Tuple, Optional, Literal
|
||||
from typing import Any, Dict, List, Literal, Optional, Tuple
|
||||
|
||||
import pymediainfo
|
||||
from langdetect import detect
|
||||
from langchain_community.callbacks import get_openai_callback
|
||||
from pysubs2 import SSAFile, SSAEvent, SSAStyle, Color, Alignment
|
||||
from pysubs2 import Alignment, Color, SSAEvent, SSAStyle, SSAFile
|
||||
|
||||
from app.core.config import settings
|
||||
from app.agent.llm.helper import LLMHelper
|
||||
from app.chain.media import MediaChain
|
||||
from app.core.cache import cached
|
||||
from app.core.config import global_vars, settings
|
||||
from app.core.context import MediaInfo
|
||||
from app.core.event import Event, eventmanager
|
||||
from app.helper.directory import DirectoryHelper
|
||||
from app.log import logger
|
||||
from app.plugins import _PluginBase
|
||||
from app.core.cache import cached
|
||||
from app.core.event import eventmanager, Event
|
||||
from app.schemas import Response
|
||||
from app.schemas.types import NotificationType, MediaType
|
||||
from app.schemas import Context, Response, TransferInfo
|
||||
from app.schemas.types import EventType, MediaType, NotificationType
|
||||
from app.utils.http import RequestUtils
|
||||
from app.utils.string import StringUtils
|
||||
from app.schemas import TransferInfo, Context
|
||||
from app.schemas.types import EventType
|
||||
from app.core.context import MediaInfo
|
||||
from app.chain.media import MediaChain
|
||||
|
||||
from .agenttool import QueryAnnotationTasksTool, VocabularyAnnotatingTool
|
||||
from .lexicon import Lexicon
|
||||
from .schemas import (
|
||||
IDGenerator,
|
||||
TaskStatus,
|
||||
Task,
|
||||
TasksApiParams,
|
||||
ProcessResult,
|
||||
SegmentList,
|
||||
TaskParams, SegmentStatistics,
|
||||
)
|
||||
from .pipeline import UNIVERSAL_POS_MAP, extract_advanced_words, llm_process_chain
|
||||
from .schemas import IDGenerator, ProcessResult, SegmentList, SegmentStatistics, Task, TaskParams, TasksApiParams, \
|
||||
TaskStatus, LLMConfig
|
||||
from .spacyworker import SpacyWorker
|
||||
from .subtitle import SubtitleProcessor, style_text
|
||||
from .pipeline import (
|
||||
extract_advanced_words,
|
||||
llm_process_chain,
|
||||
initialize_llm,
|
||||
UNIVERSAL_POS_MAP,
|
||||
)
|
||||
from .subtitle import SubtitleHelper, SubtitleProcessor, style_text
|
||||
|
||||
|
||||
class LexiAnnot(_PluginBase):
|
||||
@@ -60,7 +44,7 @@ class LexiAnnot(_PluginBase):
|
||||
# 插件图标
|
||||
plugin_icon = "LexiAnnot.png"
|
||||
# 插件版本
|
||||
plugin_version = "1.2.5"
|
||||
plugin_version = "1.2.6"
|
||||
# 插件作者
|
||||
plugin_author = "wumode"
|
||||
# 作者主页
|
||||
@@ -91,7 +75,6 @@ class LexiAnnot(_PluginBase):
|
||||
_ffmpeg_path: str = "ffmpeg"
|
||||
_english_only = False
|
||||
_when_file_trans = False
|
||||
_model_temperature = ""
|
||||
_custom_files = ""
|
||||
_accent_color = ""
|
||||
_font_scaling = ""
|
||||
@@ -102,6 +85,8 @@ class LexiAnnot(_PluginBase):
|
||||
_libraries: List[str] = []
|
||||
_use_mp_agent: bool = False
|
||||
_use_proxy: bool = False
|
||||
_test_llm: bool = False
|
||||
_thinking_level: str = None
|
||||
|
||||
# protected variables
|
||||
_lexicon_repo = "https://raw.githubusercontent.com/wumode/LexiAnnot/"
|
||||
@@ -137,7 +122,6 @@ class LexiAnnot(_PluginBase):
|
||||
self._ffmpeg_path = config.get("ffmpeg_path") or "ffmpeg"
|
||||
self._english_only = config.get("english_only")
|
||||
self._when_file_trans = config.get("when_file_trans")
|
||||
self._model_temperature = config.get("model_temperature") or "0.3"
|
||||
self._show_phonetics = config.get("show_phonetics")
|
||||
self._custom_files = config.get("custom_files") or ""
|
||||
self._accent_color = config.get("accent_color")
|
||||
@@ -151,6 +135,8 @@ class LexiAnnot(_PluginBase):
|
||||
self._llm_provider = config.get("llm_provider") or "google"
|
||||
self._use_mp_agent = config.get("use_mp_agent") or False
|
||||
self._use_proxy = config.get("use_proxy") or False
|
||||
self._test_llm = config.get("test_llm") or False
|
||||
self._thinking_level = config.get("thinking_level") or "off"
|
||||
|
||||
libraries = [
|
||||
library.name for library in DirectoryHelper().get_library_dirs()
|
||||
@@ -158,7 +144,7 @@ class LexiAnnot(_PluginBase):
|
||||
self._libraries = [
|
||||
library for library in self._libraries if library in libraries
|
||||
]
|
||||
self._accent_color_rgb = LexiAnnot.hex_to_rgb(self._accent_color) or (255, 255, 0,)
|
||||
self._accent_color_rgb = SubtitleHelper.hex_to_rgb(self._accent_color) or (255, 255, 0,)
|
||||
self._color_alpha = int(self._opacity) if self._opacity and len(self._opacity) else 0
|
||||
if self._delete_data:
|
||||
# 删除不再保存在数据库的数据
|
||||
@@ -193,6 +179,9 @@ class LexiAnnot(_PluginBase):
|
||||
continue
|
||||
self.add_media_file(file_path)
|
||||
self._onlyonce = False
|
||||
if self._test_llm:
|
||||
asyncio.run_coroutine_threadsafe(self.test_llm(), global_vars.loop)
|
||||
self._test_llm = False
|
||||
self.__update_config()
|
||||
|
||||
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
|
||||
@@ -679,14 +668,17 @@ class LexiAnnot(_PluginBase):
|
||||
"model": "gemini_model",
|
||||
"disabled": "use_mp_agent",
|
||||
"label": "模型名称",
|
||||
"hint": "支持手动输入",
|
||||
"persistent-hint": True,
|
||||
"items": [
|
||||
"gemini-2.5-flash",
|
||||
"gemini-2.5-flash-lite",
|
||||
"gemini-3.5-flash",
|
||||
"gemini-3.1-flash-lite",
|
||||
"gemini-2.5-pro",
|
||||
"gemini-2.0-flash",
|
||||
"gemini-2.0-flash-lite",
|
||||
"deepseek-ai/DeepSeek-V3.2",
|
||||
"deepseek-ai/DeepSeek-R1"
|
||||
"gemini-2.5-flash-lite",
|
||||
"deepseek-ai/DeepSeek-V4-Pro",
|
||||
"deepseek-ai/DeepSeek-V4-Flash",
|
||||
"deepseek-v4-flash",
|
||||
"deepseek-v4-pro"
|
||||
],
|
||||
},
|
||||
}
|
||||
@@ -735,28 +727,6 @@ class LexiAnnot(_PluginBase):
|
||||
}
|
||||
],
|
||||
},
|
||||
{
|
||||
"component": "VCol",
|
||||
"props": {"cols": 12, "md": 4},
|
||||
"content": [
|
||||
{
|
||||
"component": "VSelect",
|
||||
"props": {
|
||||
"model": "model_temperature",
|
||||
"label": "模型温度",
|
||||
"items": [
|
||||
{"title": "0", "value": "0"},
|
||||
{"title": "0.1", "value": "0.1"},
|
||||
{"title": "0.2", "value": "0.2"},
|
||||
{"title": "0.3", "value": "0.3"},
|
||||
{"title": "0.4", "value": "0.4"},
|
||||
{"title": "0.5", "value": "0.5"},
|
||||
{"title": "1.0", "value": "1.0"},
|
||||
],
|
||||
},
|
||||
}
|
||||
],
|
||||
},
|
||||
{
|
||||
"component": "VCol",
|
||||
"props": {
|
||||
@@ -777,8 +747,55 @@ class LexiAnnot(_PluginBase):
|
||||
}
|
||||
],
|
||||
},
|
||||
{
|
||||
"component": "VCol",
|
||||
"props": {"cols": 12, "md": 4},
|
||||
"content": [
|
||||
{
|
||||
"component": "VSelect",
|
||||
"props": {
|
||||
"model": "thinking_level",
|
||||
"label": "思考模式",
|
||||
"disabled": "use_mp_agent",
|
||||
"items": [
|
||||
{"title": "关闭 (off)", "value": "off"},
|
||||
{"title": "自动 (auto)", "value": "auto"},
|
||||
{"title": "最小 (minimal)", "value": "minimal"},
|
||||
{"title": "低 (low)", "value": "low"},
|
||||
{"title": "中 (medium)", "value": "medium"},
|
||||
{"title": "高 (high)", "value": "high"},
|
||||
{"title": "极高 (max)", "value": "max"},
|
||||
{"title": "超高 (xhigh)", "value": "xhigh"},
|
||||
],
|
||||
},
|
||||
}
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
"component": "VRow",
|
||||
"content": [
|
||||
{
|
||||
"component": "VCol",
|
||||
"props": {
|
||||
"cols": 12,
|
||||
"md": 12,
|
||||
},
|
||||
"content": [
|
||||
{
|
||||
"component": "VSwitch",
|
||||
"props": {
|
||||
"model": "test_llm",
|
||||
"label": "测试调用",
|
||||
"hint": "启用后,请在插件日志查看测试结果",
|
||||
"persistent-hint": True
|
||||
},
|
||||
}
|
||||
],
|
||||
},
|
||||
]
|
||||
}
|
||||
],
|
||||
},
|
||||
],
|
||||
@@ -883,7 +900,6 @@ class LexiAnnot(_PluginBase):
|
||||
"ffmpeg_path": "",
|
||||
"english_only": True,
|
||||
"when_file_trans": True,
|
||||
"model_temperature": "0.3",
|
||||
"custom_files": "",
|
||||
"accent_color": "",
|
||||
"font_scaling": "1",
|
||||
@@ -896,6 +912,8 @@ class LexiAnnot(_PluginBase):
|
||||
"llm_base_url": "",
|
||||
"use_mp_agent": False,
|
||||
"use_proxy": False,
|
||||
"test_llm": False,
|
||||
"thinking_level": "off"
|
||||
}
|
||||
|
||||
def get_api(self) -> List[Dict[str, Any]]:
|
||||
@@ -1046,6 +1064,25 @@ class LexiAnnot(_PluginBase):
|
||||
else:
|
||||
logger.debug("ℹ️ No running worker thread to stop.")
|
||||
|
||||
async def test_llm(self):
|
||||
model_config = self.get_model_config()
|
||||
try:
|
||||
logger.info("测试 LLM 调用...")
|
||||
result = await LLMHelper.test_current_settings(
|
||||
provider=model_config.provider,
|
||||
model=model_config.model_name,
|
||||
thinking_level=model_config.thinking_level,
|
||||
use_proxy=model_config.use_proxy,
|
||||
base_url=model_config.base_url,
|
||||
api_key=model_config.apikey
|
||||
)
|
||||
if not result.get("reply_preview"):
|
||||
logger.warning("LLM 响应为空")
|
||||
else:
|
||||
logger.info(f"LLM 返回: {result['reply_preview']}")
|
||||
except Exception as err:
|
||||
logger.error(f"LLM 调用出错: {str(err)}")
|
||||
|
||||
def delete_data(self):
|
||||
# 删除词典
|
||||
data_path = self.get_data_path()
|
||||
@@ -1156,7 +1193,6 @@ class LexiAnnot(_PluginBase):
|
||||
"ffmpeg_path": self._ffmpeg_path,
|
||||
"english_only": self._english_only,
|
||||
"when_file_trans": self._when_file_trans,
|
||||
"model_temperature": self._model_temperature,
|
||||
"show_phonetics": self._show_phonetics,
|
||||
"custom_files": self._custom_files,
|
||||
"accent_color": self._accent_color,
|
||||
@@ -1170,6 +1206,8 @@ class LexiAnnot(_PluginBase):
|
||||
"llm_base_url": self._llm_base_url,
|
||||
"use_mp_agent": self._use_mp_agent,
|
||||
"use_proxy": self._use_proxy,
|
||||
"test_llm": self._test_llm,
|
||||
"thinking_level": self._thinking_level
|
||||
}
|
||||
)
|
||||
|
||||
@@ -1310,7 +1348,7 @@ class LexiAnnot(_PluginBase):
|
||||
|
||||
ffmpeg_path = self._ffmpeg_path if self._ffmpeg_path else "ffmpeg"
|
||||
eng_mark = ["en", "en-US", "eng", "en-GB", "english", "en-AU"]
|
||||
embedded_subtitles = LexiAnnot._extract_subtitles_by_lang(path, eng_mark, ffmpeg_path)
|
||||
embedded_subtitles = SubtitleHelper.extract_subtitles_by_lang(path, eng_mark, ffmpeg_path)
|
||||
if not embedded_subtitles:
|
||||
return ProcessResult(
|
||||
status=TaskStatus.CANCELED, message="未找到嵌入式英文文本字幕"
|
||||
@@ -1332,7 +1370,7 @@ class LexiAnnot(_PluginBase):
|
||||
return ProcessResult(status=TaskStatus.CANCELED, message="任务已取消")
|
||||
ass_subtitle = SSAFile.from_string(embedded_subtitle["subtitle"], format_="ass")
|
||||
if embedded_subtitle.get("codec_id") == "S_TEXT/UTF8":
|
||||
ass_subtitle = LexiAnnot.set_srt_style(ass_subtitle)
|
||||
ass_subtitle = SubtitleHelper.set_srt_style(ass_subtitle)
|
||||
ass_subtitle = self.__set_style(ass_subtitle)
|
||||
ass_subtitle, stat = self.process_subtitles(ass_subtitle, lexi, spacy_worker, mediainfo)
|
||||
if self._shutdown_event.is_set():
|
||||
@@ -1498,170 +1536,6 @@ class LexiAnnot(_PluginBase):
|
||||
for new_path in transfer_info.file_list_new or []:
|
||||
self.add_media_file(new_path)
|
||||
|
||||
@staticmethod
|
||||
def format_duration(ms):
|
||||
total_seconds, milliseconds = divmod(ms, 1000)
|
||||
hours, remainder = divmod(total_seconds, 3600)
|
||||
minutes, seconds = divmod(remainder, 60)
|
||||
hundredths = milliseconds // 10
|
||||
return f"{hours}:{minutes:02}:{seconds:02}.{hundredths:02}"
|
||||
|
||||
@staticmethod
|
||||
def _remove_substring(replacements: list[dict]):
|
||||
new_list = []
|
||||
replacements.sort(key=lambda x: x["end"] - x["start"], reverse=True)
|
||||
for r in replacements:
|
||||
if any((r["start"] >= new["start"] and r["end"] <= new["end"]) for new in new_list):
|
||||
continue
|
||||
new_list.append(r)
|
||||
return new_list
|
||||
|
||||
@staticmethod
|
||||
def replace_by_plaintext_positions(line: SSAEvent, replacements: List[dict]):
|
||||
"""
|
||||
使用 replacements 中的 plaintext 位置信息, 替换 line.text 中的内容。
|
||||
:param line: SSAEvent line
|
||||
:param replacements: [{'start': int, 'end': int, 'old_text': str, 'new_text': str}, ...]
|
||||
"""
|
||||
text = line.text
|
||||
tag_pattern = re.compile(r"{.*?}") # 匹配 {xxx} 格式控制符
|
||||
special_pattern = re.compile(r"\\[Nh]")
|
||||
# 构建 plaintext 位置到 text 索引的映射
|
||||
mapping = {} # plaintext_index -> text_index
|
||||
p_index = 0 # 当前 plaintext 索引
|
||||
t_index = 0 # 当前 text 索引
|
||||
|
||||
while t_index < len(text):
|
||||
if text[t_index] == "{":
|
||||
# 跳过格式标签
|
||||
match = tag_pattern.match(text, t_index)
|
||||
if match:
|
||||
t_index = match.end()
|
||||
continue
|
||||
elif text[t_index] == "\\":
|
||||
match = special_pattern.match(text, t_index)
|
||||
if match:
|
||||
t_index = match.end() - 1
|
||||
continue
|
||||
# 非格式字符
|
||||
mapping[p_index] = t_index
|
||||
p_index += 1
|
||||
t_index += 1
|
||||
replacements = LexiAnnot._remove_substring(replacements)
|
||||
# 按照 mapping 执行替换(倒序替换防止位置错位)
|
||||
new_text = text
|
||||
for r in sorted(replacements, key=lambda x: x["start"], reverse=True):
|
||||
start = mapping.get(r["start"])
|
||||
end = mapping.get(r["end"] - 1)
|
||||
if start is None or end is None:
|
||||
continue
|
||||
end += 1
|
||||
new_text = new_text[:start] + r["new_text"] + new_text[end:]
|
||||
|
||||
line.text = new_text
|
||||
|
||||
@staticmethod
|
||||
def analyze_ass_language(ass_file: SSAFile):
|
||||
|
||||
def _replace_with_spaces(_text):
|
||||
"""
|
||||
使用等长的空格替换文本中的 (xxx) 模式。
|
||||
例如:"(Hi)" 会被替换成 " " (4个空格)
|
||||
"""
|
||||
pattern = r"(\([^()]*\)|\[[^\[\]]*\])"
|
||||
return re.sub(pattern, lambda match: " " * len(match.group(1)), _text)
|
||||
|
||||
styles = {}
|
||||
for style in ass_file.styles:
|
||||
styles[style] = {"text": [], "duration": 0, "text_size": 0, "times": 0}
|
||||
for dialogue in ass_file:
|
||||
style = dialogue.style
|
||||
text = _replace_with_spaces(dialogue.plaintext)
|
||||
sub_text = text.split("\n")
|
||||
if style not in styles or not text:
|
||||
continue
|
||||
styles[style]["text"].extend(sub_text)
|
||||
styles[style]["duration"] += dialogue.duration
|
||||
styles[style]["text_size"] += len(text)
|
||||
styles[style]["times"] += 1
|
||||
style_language_analysis = {}
|
||||
for style_name, data in styles.items():
|
||||
all_text = " ".join(data["text"])
|
||||
if not all_text.strip():
|
||||
style_language_analysis[style_name] = None
|
||||
continue
|
||||
|
||||
languages = []
|
||||
# 对每个文本片段进行语言检测
|
||||
for text_fragment in data["text"]:
|
||||
try:
|
||||
lang = detect(text_fragment)
|
||||
languages.append(lang)
|
||||
except Exception as e:
|
||||
# 无法检测的文本
|
||||
logger.debug(e)
|
||||
pass
|
||||
|
||||
if languages:
|
||||
language_counts = Counter(languages)
|
||||
most_common_language = language_counts.most_common(1)[0]
|
||||
style_language_analysis[style_name] = {
|
||||
"main_language": most_common_language[0],
|
||||
"proportion": most_common_language[1] / len(languages),
|
||||
"duration": data["duration"],
|
||||
"text_size": data["text_size"],
|
||||
"times": data["times"],
|
||||
}
|
||||
else:
|
||||
style_language_analysis[style_name] = None
|
||||
|
||||
return style_language_analysis
|
||||
|
||||
@staticmethod
|
||||
def select_main_style_weighted(analysis: Dict[str, Any], known_language: str, weights = None):
|
||||
"""
|
||||
根据语言分析结果和已知的字幕语言,使用加权评分选择主要样式
|
||||
|
||||
:params analysis: `analyze_ass_language` 函数的输出结果
|
||||
:params known_language: 已知的字幕语言代码
|
||||
:params weights: 各个维度的权重,权重之和应为 1
|
||||
:returns: 主要字幕的样式名称,如果没有匹配的样式则返回 None
|
||||
"""
|
||||
if weights is None:
|
||||
weights = {"times": 0.5, "text_size": 0.4, "duration": 0.1}
|
||||
matching_styles = []
|
||||
max_times = max([analysis.get("times", 0) for _, analysis in analysis.items() if analysis] or [0]) or 1
|
||||
max_text_size = max([analysis.get("text_size", 0) for _, analysis in analysis.items() if analysis] or [0]) or 1
|
||||
max_duration = max([analysis.get("duration", 0) for _, analysis in analysis.items() if analysis] or [0]) or 1
|
||||
for style, analysis in analysis.items():
|
||||
if not analysis:
|
||||
continue
|
||||
if analysis.get("main_language") == known_language:
|
||||
# 跳过多语言
|
||||
if analysis.get("proportion", 0) < 0.5:
|
||||
continue
|
||||
score = 0
|
||||
score += analysis.get("times", 0) * weights.get("times", 0) / max_times
|
||||
score += analysis.get("text_size", 0) * weights.get("text_size", 0) / max_text_size
|
||||
score += analysis.get("duration", 0) * weights.get("duration", 0) / max_duration
|
||||
matching_styles.append((style, score))
|
||||
|
||||
if not matching_styles:
|
||||
return None
|
||||
|
||||
sorted_styles = sorted(matching_styles, key=lambda item: item[1], reverse=True)
|
||||
return sorted_styles[0][0]
|
||||
|
||||
@staticmethod
|
||||
def set_srt_style(ass: SSAFile) -> SSAFile:
|
||||
ass.info["ScaledBorderAndShadow"] = "no"
|
||||
play_res_y = int(ass.info["PlayResY"])
|
||||
if "Default" in ass.styles:
|
||||
ass.styles["Default"].marginv = play_res_y // 16
|
||||
ass.styles["Default"].fontname = "Microsoft YaHei"
|
||||
ass.styles["Default"].fontsize = play_res_y // 16
|
||||
return ass
|
||||
|
||||
def __set_style(self, ass: SSAFile) -> SSAFile:
|
||||
font_scaling = (
|
||||
float(self._font_scaling)
|
||||
@@ -1747,107 +1621,25 @@ class LexiAnnot(_PluginBase):
|
||||
ass.styles["Annotation EXAM"] = cefr_style
|
||||
return ass
|
||||
|
||||
@staticmethod
|
||||
def hex_to_rgb(hex_color: str | None) -> tuple[int, ...] | None:
|
||||
if not hex_color:
|
||||
return None
|
||||
pattern = r"^#[0-9a-fA-F]{6}$"
|
||||
if re.match(pattern, hex_color) is None:
|
||||
return None
|
||||
hex_color = hex_color.lstrip("#") # 去掉前面的 #
|
||||
return tuple(int(hex_color[i: i + 2], 16) for i in (0, 2, 4))
|
||||
|
||||
@staticmethod
|
||||
def __extract_subtitle(
|
||||
video_path: str,
|
||||
subtitle_stream_index: str,
|
||||
ffmpeg_path: str = "ffmpeg",
|
||||
sub_format="ass",
|
||||
) -> Optional[str]:
|
||||
if sub_format not in ["srt", "ass"]:
|
||||
raise ValueError("Invalid subtitle format")
|
||||
try:
|
||||
map_parameter = f"0:s:{subtitle_stream_index}"
|
||||
command = [ffmpeg_path, "-i", video_path, "-map", map_parameter, "-f", sub_format, "-"]
|
||||
result = subprocess.run(
|
||||
command, capture_output=True, text=True, encoding="utf-8", check=True
|
||||
def get_model_config(self) -> LLMConfig:
|
||||
if self._use_mp_agent:
|
||||
return LLMConfig(
|
||||
apikey=settings.LLM_API_KEY,
|
||||
base_url=settings.LLM_BASE_URL,
|
||||
model_name=settings.LLM_MODEL,
|
||||
thinking_level=settings.LLM_THINKING_LEVEL,
|
||||
provider=settings.LLM_PROVIDER.lower(),
|
||||
use_proxy=settings.LLM_USE_PROXY
|
||||
)
|
||||
return result.stdout
|
||||
except FileNotFoundError:
|
||||
logger.warn(f"错误:找不到视频文件 '{video_path}'")
|
||||
return None
|
||||
except subprocess.CalledProcessError as e:
|
||||
logger.warn(f"错误:提取字幕失败。\n错误信息:{e}")
|
||||
logger.warn(
|
||||
f"FFmpeg 输出 (stderr):\n{e.stderr.decode('utf-8', errors='ignore')}"
|
||||
else:
|
||||
return LLMConfig(
|
||||
apikey=self._gemini_apikey,
|
||||
base_url=self._llm_base_url,
|
||||
model_name=self._gemini_model,
|
||||
thinking_level=self._thinking_level,
|
||||
provider=self._llm_provider.lower(),
|
||||
use_proxy=self._use_proxy
|
||||
)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _extract_subtitles_by_lang(
|
||||
video_path: str, lang: str | list = "en", ffmpeg: str = "ffmpeg"
|
||||
) -> list[dict]:
|
||||
"""
|
||||
提取视频文件中的内嵌英文字幕,使用 MediaInfo 查找字幕流。
|
||||
"""
|
||||
|
||||
def check_lang(track_lang: str) -> bool:
|
||||
if isinstance(lang, list):
|
||||
return track_lang in lang
|
||||
return track_lang == lang
|
||||
|
||||
supported_codec = ["S_TEXT/UTF8", "S_TEXT/ASS", "tx3g"]
|
||||
subtitles = []
|
||||
try:
|
||||
media_info: pymediainfo.MediaInfo = pymediainfo.MediaInfo.parse(video_path)
|
||||
for track in media_info.tracks:
|
||||
if (
|
||||
track.track_type == "Text"
|
||||
and check_lang(track_lang=track.language)
|
||||
and track.codec_id in supported_codec
|
||||
):
|
||||
subtitle_stream_index = (
|
||||
track.stream_identifier
|
||||
) # MediaInfo 的 stream_id 从 1 开始,ffmpeg 从 0 开始
|
||||
extracted_subtitle = LexiAnnot.__extract_subtitle(
|
||||
video_path, subtitle_stream_index, ffmpeg
|
||||
)
|
||||
duration = 0
|
||||
if hasattr(track, "duration"):
|
||||
try:
|
||||
duration = int(float(track.duration))
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
if extracted_subtitle:
|
||||
subtitles.append(
|
||||
{
|
||||
"title": track.title or "",
|
||||
"subtitle": extracted_subtitle,
|
||||
"codec_id": track.codec_id,
|
||||
"stream_id": subtitle_stream_index,
|
||||
"duration": duration,
|
||||
}
|
||||
)
|
||||
if subtitles:
|
||||
# remove outliers with abnormally short duration
|
||||
if len(subtitles) > 1:
|
||||
durations = [sub["duration"] for sub in subtitles if sub["duration"] > 0]
|
||||
if durations:
|
||||
avg_duration = sum(durations) / len(durations)
|
||||
subtitles = [
|
||||
sub for sub in subtitles if sub["duration"] >= avg_duration * 0.2
|
||||
]
|
||||
if not subtitles:
|
||||
logger.warn("未找到标记为英语的文本字幕流")
|
||||
|
||||
except FileNotFoundError:
|
||||
logger.error(f"找不到视频文件 '{video_path}'")
|
||||
except subprocess.CalledProcessError as e:
|
||||
logger.error(f"错误:提取字幕失败。\n错误信息:{e}")
|
||||
logger.error(f"FFmpeg 输出 (stderr):\n{e.stderr}")
|
||||
except Exception as e:
|
||||
logger.error(f"使用 MediaInfo 提取字幕时发生错误:{e}")
|
||||
return subtitles
|
||||
|
||||
def _process_chain(
|
||||
self,
|
||||
@@ -1867,7 +1659,6 @@ class LexiAnnot(_PluginBase):
|
||||
CEFR_LEVELS = ["A1", "A2", "B1", "B2", "C1", "C2"]
|
||||
simple_vocabulary = set(filter(lambda x: x < self._annot_level, CEFR_LEVELS))
|
||||
learner_level = max(simple_vocabulary)
|
||||
model_temperature = float(self._model_temperature) if self._model_temperature else 0.3
|
||||
logger.info("通过 spaCy 分词...")
|
||||
for seg in segments:
|
||||
if self._shutdown_event.is_set():
|
||||
@@ -1879,25 +1670,19 @@ class LexiAnnot(_PluginBase):
|
||||
simple_level=simple_vocabulary
|
||||
)
|
||||
if self._gemini_available:
|
||||
if self._use_mp_agent:
|
||||
llm_apikey = settings.LLM_API_KEY
|
||||
llm_base_url = settings.LLM_BASE_URL
|
||||
llm_model_name = settings.LLM_MODEL
|
||||
llm_provider = settings.LLM_PROVIDER.lower()
|
||||
else:
|
||||
llm_apikey = self._gemini_apikey
|
||||
llm_base_url = self._llm_base_url
|
||||
llm_model_name = self._gemini_model
|
||||
llm_provider = self._llm_provider.lower()
|
||||
llm = initialize_llm(
|
||||
provider=llm_provider,
|
||||
model_name=llm_model_name,
|
||||
base_url=llm_base_url,
|
||||
api_key=llm_apikey or '',
|
||||
temperature=model_temperature,
|
||||
max_retries=self._max_retries,
|
||||
proxy=self._use_proxy,
|
||||
)
|
||||
llm_config = self.get_model_config()
|
||||
llm = asyncio.run_coroutine_threadsafe(
|
||||
LLMHelper.get_llm(
|
||||
provider=llm_config.provider,
|
||||
model=llm_config.model_name,
|
||||
thinking_level=llm_config.thinking_level,
|
||||
api_key=llm_config.apikey,
|
||||
base_url=llm_config.base_url,
|
||||
use_proxy=llm_config.use_proxy
|
||||
),
|
||||
global_vars.loop
|
||||
).result()
|
||||
|
||||
segments = llm_process_chain(
|
||||
lexi=lexi,
|
||||
llm=llm,
|
||||
@@ -1926,8 +1711,8 @@ class LexiAnnot(_PluginBase):
|
||||
f"{self._accent_color_rgb[1]:02x}{self._accent_color_rgb[0]:02x}&"
|
||||
) # &H00FFFFFF&
|
||||
|
||||
statistical_res = LexiAnnot.analyze_ass_language(ass_file)
|
||||
main_style: str | None = LexiAnnot.select_main_style_weighted(statistical_res, lang)
|
||||
statistical_res = SubtitleHelper.analyze_ass_language(ass_file)
|
||||
main_style: str | None = SubtitleHelper.select_main_style_weighted(statistical_res, lang)
|
||||
if not main_style:
|
||||
logger.error("无法确定主要字幕样式")
|
||||
return None, None
|
||||
@@ -2004,7 +1789,7 @@ class LexiAnnot(_PluginBase):
|
||||
"new_text": new_text,
|
||||
}
|
||||
replacements.append(replacement)
|
||||
LexiAnnot.replace_by_plaintext_positions(
|
||||
SubtitleHelper.replace_by_plaintext_positions(
|
||||
main_processor[seg.index], replacements
|
||||
)
|
||||
if self._sentence_translation:
|
||||
|
||||
@@ -4,9 +4,7 @@ import threading
|
||||
from langchain_core.language_models.chat_models import BaseChatModel
|
||||
from langchain_core.prompts import ChatPromptTemplate
|
||||
from langchain_core.output_parsers import PydanticOutputParser
|
||||
from pydantic import SecretStr
|
||||
|
||||
from app.core.config import settings
|
||||
from app.schemas import Context
|
||||
from app.schemas.types import MediaType
|
||||
from app.log import logger
|
||||
@@ -60,59 +58,6 @@ UNIVERSAL_POS_MAP: dict[UniversalPos, str | None] = {
|
||||
}
|
||||
|
||||
|
||||
def initialize_llm(
|
||||
provider: str,
|
||||
api_key: str,
|
||||
model_name: str,
|
||||
base_url: str | None,
|
||||
temperature: float = 0.1,
|
||||
max_retries: int = 3,
|
||||
proxy: bool = False,
|
||||
) -> BaseChatModel:
|
||||
"""初始化 LLM"""
|
||||
|
||||
if provider == "google":
|
||||
if proxy:
|
||||
from langchain_openai import ChatOpenAI
|
||||
|
||||
return ChatOpenAI(
|
||||
model=settings.LLM_MODEL,
|
||||
api_key=SecretStr(api_key),
|
||||
max_retries=3,
|
||||
base_url="https://generativelanguage.googleapis.com/v1beta/openai",
|
||||
temperature=settings.LLM_TEMPERATURE,
|
||||
openai_proxy=settings.PROXY_HOST,
|
||||
)
|
||||
from langchain_google_genai import ChatGoogleGenerativeAI
|
||||
|
||||
return ChatGoogleGenerativeAI(
|
||||
model=model_name,
|
||||
google_api_key=api_key, # noqa
|
||||
max_retries=max_retries,
|
||||
temperature=temperature,
|
||||
)
|
||||
elif provider == "deepseek":
|
||||
from langchain_deepseek import ChatDeepSeek
|
||||
|
||||
return ChatDeepSeek(
|
||||
model=model_name,
|
||||
api_key=SecretStr(api_key),
|
||||
max_retries=max_retries,
|
||||
temperature=temperature,
|
||||
)
|
||||
else:
|
||||
from langchain_openai import ChatOpenAI
|
||||
|
||||
return ChatOpenAI(
|
||||
model=model_name,
|
||||
api_key=SecretStr(api_key),
|
||||
max_retries=max_retries,
|
||||
base_url=base_url,
|
||||
temperature=temperature,
|
||||
openai_proxy=settings.PROXY_HOST if proxy else None,
|
||||
)
|
||||
|
||||
|
||||
def convert_pos_to_spacy(pos: str):
|
||||
"""
|
||||
将给定的词性列表转换为 spaCy 库中使用的词性标签
|
||||
@@ -727,5 +672,4 @@ def llm_process_chain(
|
||||
lexi, llm, context, start, end, learner_level, media_name, translate_sentences
|
||||
)
|
||||
)
|
||||
|
||||
return SegmentList(root=segments_list)
|
||||
|
||||
@@ -365,3 +365,12 @@ class VocabularyAnnotatingToolInput(BaseModel):
|
||||
class QueryAnnotationTasksToolInput(BaseModel):
|
||||
count: int = Field(default=5, description="The maximum number of returned annotation tasks")
|
||||
explanation: str = Field(..., description="This is a tool for querying the latest annotation tasks in AnnotLexi")
|
||||
|
||||
|
||||
class LLMConfig(BaseModel):
|
||||
apikey: str
|
||||
provider: str
|
||||
model_name: str
|
||||
thinking_level: str | None = Field(default=None)
|
||||
base_url: str | None = Field(default=None)
|
||||
use_proxy: bool = Field(default=False)
|
||||
|
||||
@@ -1,10 +1,277 @@
|
||||
import re
|
||||
import subprocess
|
||||
from collections import Counter
|
||||
from typing import Generator, Any, overload
|
||||
|
||||
from pysubs2 import SSAEvent
|
||||
import pymediainfo
|
||||
from langdetect import detect
|
||||
from pysubs2 import SSAEvent, SSAFile
|
||||
|
||||
from app.log import logger
|
||||
|
||||
from .schemas import SubtitleSegment
|
||||
|
||||
|
||||
class SubtitleHelper:
|
||||
|
||||
@staticmethod
|
||||
def remove_substring(replacements: list[dict]):
|
||||
new_list = []
|
||||
replacements.sort(key=lambda x: x["end"] - x["start"], reverse=True)
|
||||
for r in replacements:
|
||||
if any((r["start"] >= new["start"] and r["end"] <= new["end"]) for new in new_list):
|
||||
continue
|
||||
new_list.append(r)
|
||||
return new_list
|
||||
|
||||
@staticmethod
|
||||
def analyze_ass_language(ass_file: SSAFile):
|
||||
|
||||
def _replace_with_spaces(_text):
|
||||
"""
|
||||
使用等长的空格替换文本中的 (xxx) 模式。
|
||||
例如:"(Hi)" 会被替换成 " " (4个空格)
|
||||
"""
|
||||
pattern = r"(\([^()]*\)|\[[^\[\]]*\])"
|
||||
return re.sub(pattern, lambda match: " " * len(match.group(1)), _text)
|
||||
|
||||
styles = {}
|
||||
for style in ass_file.styles:
|
||||
styles[style] = {"text": [], "duration": 0, "text_size": 0, "times": 0}
|
||||
for dialogue in ass_file:
|
||||
style = dialogue.style
|
||||
text = _replace_with_spaces(dialogue.plaintext)
|
||||
sub_text = text.split("\n")
|
||||
if style not in styles or not text:
|
||||
continue
|
||||
styles[style]["text"].extend(sub_text)
|
||||
styles[style]["duration"] += dialogue.duration
|
||||
styles[style]["text_size"] += len(text)
|
||||
styles[style]["times"] += 1
|
||||
style_language_analysis = {}
|
||||
for style_name, data in styles.items():
|
||||
all_text = " ".join(data["text"])
|
||||
if not all_text.strip():
|
||||
style_language_analysis[style_name] = None
|
||||
continue
|
||||
|
||||
languages = []
|
||||
# 对每个文本片段进行语言检测
|
||||
for text_fragment in data["text"]:
|
||||
try:
|
||||
lang = detect(text_fragment)
|
||||
languages.append(lang)
|
||||
except Exception as e:
|
||||
# 无法检测的文本
|
||||
logger.debug(e)
|
||||
|
||||
if languages:
|
||||
language_counts = Counter(languages)
|
||||
most_common_language = language_counts.most_common(1)[0]
|
||||
style_language_analysis[style_name] = {
|
||||
"main_language": most_common_language[0],
|
||||
"proportion": most_common_language[1] / len(languages),
|
||||
"duration": data["duration"],
|
||||
"text_size": data["text_size"],
|
||||
"times": data["times"],
|
||||
}
|
||||
else:
|
||||
style_language_analysis[style_name] = None
|
||||
|
||||
return style_language_analysis
|
||||
|
||||
@staticmethod
|
||||
def select_main_style_weighted(analysis: dict[str, Any], known_language: str, weights = None):
|
||||
"""
|
||||
根据语言分析结果和已知的字幕语言,使用加权评分选择主要样式
|
||||
|
||||
:params analysis: `analyze_ass_language` 函数的输出结果
|
||||
:params known_language: 已知的字幕语言代码
|
||||
:params weights: 各个维度的权重,权重之和应为 1
|
||||
:returns: 主要字幕的样式名称,如果没有匹配的样式则返回 None
|
||||
"""
|
||||
if weights is None:
|
||||
weights = {"times": 0.5, "text_size": 0.4, "duration": 0.1}
|
||||
matching_styles = []
|
||||
max_times = max([analysis.get("times", 0) for _, analysis in analysis.items() if analysis] or [0]) or 1
|
||||
max_text_size = max([analysis.get("text_size", 0) for _, analysis in analysis.items() if analysis] or [0]) or 1
|
||||
max_duration = max([analysis.get("duration", 0) for _, analysis in analysis.items() if analysis] or [0]) or 1
|
||||
for style, info in analysis.items():
|
||||
if not info:
|
||||
continue
|
||||
if info.get("main_language") == known_language:
|
||||
# 跳过多语言
|
||||
if info.get("proportion", 0) < 0.5:
|
||||
continue
|
||||
score = 0
|
||||
score += info.get("times", 0) * weights.get("times", 0) / max_times
|
||||
score += info.get("text_size", 0) * weights.get("text_size", 0) / max_text_size
|
||||
score += info.get("duration", 0) * weights.get("duration", 0) / max_duration
|
||||
matching_styles.append((style, score))
|
||||
|
||||
if not matching_styles:
|
||||
return None
|
||||
|
||||
sorted_styles = sorted(matching_styles, key=lambda item: item[1], reverse=True)
|
||||
return sorted_styles[0][0]
|
||||
|
||||
@staticmethod
|
||||
def set_srt_style(ass: SSAFile) -> SSAFile:
|
||||
ass.info["ScaledBorderAndShadow"] = "no"
|
||||
play_res_y = int(ass.info["PlayResY"])
|
||||
if "Default" in ass.styles:
|
||||
ass.styles["Default"].marginv = play_res_y // 16
|
||||
ass.styles["Default"].fontname = "Microsoft YaHei"
|
||||
ass.styles["Default"].fontsize = play_res_y // 16
|
||||
return ass
|
||||
|
||||
@staticmethod
|
||||
def __extract_subtitle(
|
||||
video_path: str,
|
||||
subtitle_stream_index: str,
|
||||
ffmpeg_path: str = "ffmpeg",
|
||||
sub_format="ass",
|
||||
) -> str | None:
|
||||
if sub_format not in ["srt", "ass"]:
|
||||
raise ValueError("Invalid subtitle format")
|
||||
try:
|
||||
map_parameter = f"0:s:{subtitle_stream_index}"
|
||||
command = [ffmpeg_path, "-i", video_path, "-map", map_parameter, "-f", sub_format, "-"]
|
||||
result = subprocess.run(
|
||||
command, capture_output=True, text=True, encoding="utf-8", check=True
|
||||
)
|
||||
return result.stdout
|
||||
except FileNotFoundError:
|
||||
logger.warn(f"错误:找不到视频文件 '{video_path}'")
|
||||
return None
|
||||
except subprocess.CalledProcessError as e:
|
||||
logger.warn(f"错误:提取字幕失败。\n错误信息:{e}")
|
||||
logger.warn(
|
||||
f"FFmpeg 输出 (stderr):\n{e.stderr.decode('utf-8', errors='ignore')}"
|
||||
)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def extract_subtitles_by_lang(
|
||||
video_path: str, lang: str | list = "en", ffmpeg: str = "ffmpeg"
|
||||
) -> list[dict]:
|
||||
"""
|
||||
提取视频文件中的内嵌英文字幕,使用 MediaInfo 查找字幕流。
|
||||
"""
|
||||
|
||||
def check_lang(track_lang: str) -> bool:
|
||||
if isinstance(lang, list):
|
||||
return track_lang in lang
|
||||
return track_lang == lang
|
||||
|
||||
supported_codec = ["S_TEXT/UTF8", "S_TEXT/ASS", "tx3g"]
|
||||
subtitles = []
|
||||
try:
|
||||
media_info: pymediainfo.MediaInfo = pymediainfo.MediaInfo.parse(video_path)
|
||||
for track in media_info.tracks:
|
||||
if (
|
||||
track.track_type == "Text"
|
||||
and check_lang(track_lang=track.language)
|
||||
and track.codec_id in supported_codec
|
||||
):
|
||||
subtitle_stream_index = (
|
||||
track.stream_identifier
|
||||
) # MediaInfo 的 stream_id 从 1 开始,ffmpeg 从 0 开始
|
||||
extracted_subtitle = SubtitleHelper.__extract_subtitle(
|
||||
video_path, subtitle_stream_index, ffmpeg
|
||||
)
|
||||
duration = 0
|
||||
if hasattr(track, "duration"):
|
||||
try:
|
||||
duration = int(float(track.duration))
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
if extracted_subtitle:
|
||||
subtitles.append(
|
||||
{
|
||||
"title": track.title or "",
|
||||
"subtitle": extracted_subtitle,
|
||||
"codec_id": track.codec_id,
|
||||
"stream_id": subtitle_stream_index,
|
||||
"duration": duration,
|
||||
}
|
||||
)
|
||||
if subtitles:
|
||||
# remove outliers with abnormally short duration
|
||||
if len(subtitles) > 1:
|
||||
durations = [sub["duration"] for sub in subtitles if sub["duration"] > 0]
|
||||
if durations:
|
||||
avg_duration = sum(durations) / len(durations)
|
||||
subtitles = [
|
||||
sub for sub in subtitles if sub["duration"] >= avg_duration * 0.2
|
||||
]
|
||||
if not subtitles:
|
||||
logger.warn("未找到标记为英语的文本字幕流")
|
||||
|
||||
except FileNotFoundError:
|
||||
logger.error(f"找不到视频文件 '{video_path}'")
|
||||
except subprocess.CalledProcessError as e:
|
||||
logger.error(f"错误:提取字幕失败。\n错误信息:{e}")
|
||||
logger.error(f"FFmpeg 输出 (stderr):\n{e.stderr}")
|
||||
except Exception as e:
|
||||
logger.error(f"使用 MediaInfo 提取字幕时发生错误:{e}")
|
||||
return subtitles
|
||||
|
||||
@staticmethod
|
||||
def replace_by_plaintext_positions(line: SSAEvent, replacements: list[dict]):
|
||||
"""
|
||||
使用 replacements 中的 plaintext 位置信息, 替换 line.text 中的内容。
|
||||
:param line: SSAEvent line
|
||||
:param replacements: [{'start': int, 'end': int, 'old_text': str, 'new_text': str}, ...]
|
||||
"""
|
||||
text = line.text
|
||||
tag_pattern = re.compile(r"{.*?}") # 匹配 {xxx} 格式控制符
|
||||
special_pattern = re.compile(r"\\[Nh]")
|
||||
# 构建 plaintext 位置到 text 索引的映射
|
||||
mapping = {} # plaintext_index -> text_index
|
||||
p_index = 0 # 当前 plaintext 索引
|
||||
t_index = 0 # 当前 text 索引
|
||||
|
||||
while t_index < len(text):
|
||||
if text[t_index] == "{":
|
||||
# 跳过格式标签
|
||||
match = tag_pattern.match(text, t_index)
|
||||
if match:
|
||||
t_index = match.end()
|
||||
continue
|
||||
elif text[t_index] == "\\":
|
||||
match = special_pattern.match(text, t_index)
|
||||
if match:
|
||||
t_index = match.end() - 1
|
||||
continue
|
||||
# 非格式字符
|
||||
mapping[p_index] = t_index
|
||||
p_index += 1
|
||||
t_index += 1
|
||||
replacements = SubtitleHelper.remove_substring(replacements)
|
||||
# 按照 mapping 执行替换(倒序替换防止位置错位)
|
||||
new_text = text
|
||||
for r in sorted(replacements, key=lambda x: x["start"], reverse=True):
|
||||
start = mapping.get(r["start"])
|
||||
end = mapping.get(r["end"] - 1)
|
||||
if start is None or end is None:
|
||||
continue
|
||||
end += 1
|
||||
new_text = new_text[:start] + r["new_text"] + new_text[end:]
|
||||
|
||||
line.text = new_text
|
||||
|
||||
@staticmethod
|
||||
def hex_to_rgb(hex_color: str | None) -> tuple[int, ...] | None:
|
||||
if not hex_color:
|
||||
return None
|
||||
pattern = r"^#[0-9a-fA-F]{6}$"
|
||||
if re.match(pattern, hex_color) is None:
|
||||
return None
|
||||
hex_color = hex_color.lstrip("#") # 去掉前面的 #
|
||||
return tuple(int(hex_color[i: i + 2], 16) for i in (0, 2, 4))
|
||||
|
||||
|
||||
class SubtitleProcessor:
|
||||
def __init__(self):
|
||||
self._events: list[SSAEvent] = []
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from threading import Event
|
||||
from typing import List, Tuple, Dict, Any
|
||||
from typing import List, Tuple, Dict, Any, Optional
|
||||
|
||||
import pytz
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
@@ -27,7 +27,7 @@ class LibraryScraper(_PluginBase):
|
||||
# 插件图标
|
||||
plugin_icon = "scraper.png"
|
||||
# 插件版本
|
||||
plugin_version = "2.1.1"
|
||||
plugin_version = "2.1.3"
|
||||
# 插件作者
|
||||
plugin_author = "jxxghp"
|
||||
# 作者主页
|
||||
@@ -51,6 +51,9 @@ class LibraryScraper(_PluginBase):
|
||||
_exclude_paths = ""
|
||||
# 退出事件
|
||||
_event = Event()
|
||||
# 刮削目标类型
|
||||
_target_dir = "dir"
|
||||
_target_file = "file"
|
||||
|
||||
def init_plugin(self, config: dict = None):
|
||||
|
||||
@@ -302,7 +305,7 @@ class LibraryScraper(_PluginBase):
|
||||
exclude_paths = self._exclude_paths.split("\n")
|
||||
# 已选择的目录
|
||||
paths = self._scraper_paths.split("\n")
|
||||
# 需要适削的媒体文件夹
|
||||
# 需要刮削的媒体目录或文件
|
||||
scraper_paths = []
|
||||
for path in paths:
|
||||
if not path:
|
||||
@@ -339,38 +342,116 @@ class LibraryScraper(_PluginBase):
|
||||
if exclude_flag:
|
||||
logger.debug(f"{file_path} 在排除目录中,跳过 ...")
|
||||
continue
|
||||
# 识别是电影还是电视剧
|
||||
if not mtype:
|
||||
file_meta = MetaInfoPath(file_path)
|
||||
mtype = file_meta.type
|
||||
# 重命名格式
|
||||
rename_format = settings.TV_RENAME_FORMAT \
|
||||
if mtype == MediaType.TV else settings.MOVIE_RENAME_FORMAT
|
||||
# 计算重命名中的文件夹层数
|
||||
rename_format_level = len(rename_format.split("/")) - 1
|
||||
if rename_format_level < 1:
|
||||
if mtype and not self.__match_forced_type_path(
|
||||
file_path=file_path,
|
||||
scraper_path=scraper_path,
|
||||
mtype=mtype
|
||||
):
|
||||
logger.debug(f"{file_path} 不属于强制指定的{mtype.value}目录,跳过 ...")
|
||||
continue
|
||||
# 取相对路径的第1层目录
|
||||
media_path = file_path.parents[rename_format_level - 1]
|
||||
dir_item = (media_path, mtype)
|
||||
if dir_item not in scraper_paths:
|
||||
logger.info(f"发现目录:{dir_item}")
|
||||
scraper_paths.append(dir_item)
|
||||
# 识别是电影还是电视剧,强制类型只作为默认值,不污染后续文件识别结果
|
||||
file_meta = MetaInfoPath(file_path)
|
||||
file_mtype = mtype
|
||||
if not file_mtype:
|
||||
file_mtype = file_meta.type
|
||||
if file_mtype == MediaType.UNKNOWN:
|
||||
file_mtype = self.__infer_type_from_path(file_path=file_path, scraper_path=scraper_path)
|
||||
scraper_item = self.__get_scrape_item(
|
||||
file_path=file_path,
|
||||
scraper_path=scraper_path,
|
||||
mtype=file_mtype,
|
||||
tmdbid=file_meta.tmdbid
|
||||
)
|
||||
if scraper_item and not self.__contains_scrape_item(scraper_paths, scraper_item):
|
||||
logger.info(f"发现刮削目标:{scraper_item}")
|
||||
scraper_paths.append(scraper_item)
|
||||
# 开始刮削
|
||||
if scraper_paths:
|
||||
for item in scraper_paths:
|
||||
logger.info(f"开始刮削目录:{item[0]} ...")
|
||||
self.__scrape_dir(path=item[0], mtype=item[1])
|
||||
logger.info(f"开始刮削目标:{item[0]} ...")
|
||||
self.__scrape_path(path=item[0], mtype=item[1], target_type=item[2], tmdbid=item[3])
|
||||
else:
|
||||
logger.info(f"未发现需要刮削的目录")
|
||||
|
||||
def __scrape_dir(self, path: Path, mtype: MediaType):
|
||||
@staticmethod
|
||||
def __get_scrape_item(
|
||||
file_path: Path,
|
||||
scraper_path: Path,
|
||||
mtype: MediaType,
|
||||
tmdbid: Optional[int] = None
|
||||
) -> Optional[Tuple[Path, MediaType, str, Optional[int]]]:
|
||||
"""
|
||||
削刮一个目录,该目录必须是媒体文件目录
|
||||
根据扫描根目录和重命名格式,计算真正需要刮削的媒体目录。
|
||||
分类目录通常位于扫描根目录下方,必须用相对路径计算,否则会被误当成媒体目录。
|
||||
"""
|
||||
# 优先读取本地nfo文件
|
||||
tmdbid = None
|
||||
if mtype == MediaType.MOVIE:
|
||||
if not file_path or not scraper_path or not mtype:
|
||||
return None
|
||||
|
||||
rename_format = settings.TV_RENAME_FORMAT if mtype == MediaType.TV else settings.MOVIE_RENAME_FORMAT
|
||||
rename_format_level = len(rename_format.strip("/").split("/")) - 1
|
||||
try:
|
||||
relative_path = file_path.relative_to(scraper_path)
|
||||
except ValueError:
|
||||
relative_path = Path(file_path.name)
|
||||
|
||||
if rename_format_level >= 1:
|
||||
relative_parts = Path(relative_path).parts
|
||||
# 重命名格式中包含几层目录,就从文件往上取几层目录;前缀分类目录不会参与计算。
|
||||
if len(relative_parts) > rename_format_level:
|
||||
media_path = scraper_path.joinpath(*relative_parts[:-rename_format_level])
|
||||
return media_path, mtype, LibraryScraper._target_dir, tmdbid
|
||||
|
||||
# 扁平目录或自定义重命名格式无目录层级时,退回到单文件刮削,避免分类目录识别失败。
|
||||
return file_path, mtype, LibraryScraper._target_file, tmdbid
|
||||
|
||||
@staticmethod
|
||||
def __contains_scrape_item(scraper_paths: List[Tuple[Path, MediaType, str, Optional[int]]],
|
||||
scraper_item: Tuple[Path, MediaType, str, Optional[int]]) -> bool:
|
||||
"""
|
||||
判断刮削目标是否已存在;同一目标只刮削一次,tmdbid 仅作为识别辅助信息。
|
||||
"""
|
||||
return any(item[:3] == scraper_item[:3] for item in scraper_paths)
|
||||
|
||||
@staticmethod
|
||||
def __match_forced_type_path(file_path: Path, scraper_path: Path, mtype: MediaType) -> bool:
|
||||
"""
|
||||
强制指定媒体类型时,如果扫描根目录下同时存在“电影/电视剧”分类,则只处理匹配类型的目录。
|
||||
"""
|
||||
if mtype not in (MediaType.MOVIE, MediaType.TV):
|
||||
return True
|
||||
try:
|
||||
relative_parts = file_path.relative_to(scraper_path).parts
|
||||
except ValueError:
|
||||
return True
|
||||
media_type_parts = {MediaType.MOVIE.value, MediaType.TV.value}.intersection(relative_parts)
|
||||
return not media_type_parts or mtype.value in media_type_parts
|
||||
|
||||
@staticmethod
|
||||
def __infer_type_from_path(file_path: Path, scraper_path: Path) -> MediaType:
|
||||
"""
|
||||
文件名无法识别类型时,从扫描根目录下的“电影/电视剧”分类层推断媒体类型。
|
||||
"""
|
||||
try:
|
||||
relative_parts = file_path.relative_to(scraper_path).parts
|
||||
except ValueError:
|
||||
relative_parts = file_path.parts
|
||||
if MediaType.TV.value in relative_parts:
|
||||
return MediaType.TV
|
||||
if MediaType.MOVIE.value in relative_parts:
|
||||
return MediaType.MOVIE
|
||||
return MediaType.UNKNOWN
|
||||
|
||||
def __scrape_path(self, path: Path, mtype: MediaType, target_type: str = _target_dir,
|
||||
tmdbid: Optional[int] = None):
|
||||
"""
|
||||
刮削一个媒体目录或媒体文件
|
||||
"""
|
||||
# 优先读取本地nfo文件;文件路径中解析出的 tmdbid 作为兜底识别信息保留。
|
||||
if target_type == self._target_file:
|
||||
nfo_path = path.with_suffix(".nfo")
|
||||
if nfo_path.exists():
|
||||
tmdbid = self.__get_tmdbid_from_nfo(nfo_path)
|
||||
elif mtype == MediaType.MOVIE:
|
||||
# 电影
|
||||
movie_nfo = path / "movie.nfo"
|
||||
if movie_nfo.exists():
|
||||
@@ -393,6 +474,10 @@ class LibraryScraper(_PluginBase):
|
||||
meta.type = mtype
|
||||
mediainfo = self.chain.recognize_media(meta=meta)
|
||||
if not mediainfo:
|
||||
if target_type == self._target_dir:
|
||||
# 目录名无法识别时,通常是分类目录,继续尝试其中的具体媒体文件。
|
||||
self.__scrape_child_files(path=path, mtype=mtype)
|
||||
return
|
||||
logger.warn(f"未识别到媒体信息:{path}")
|
||||
return
|
||||
|
||||
@@ -405,13 +490,17 @@ class LibraryScraper(_PluginBase):
|
||||
# 获取图片
|
||||
self.chain.obtain_images(mediainfo)
|
||||
# 刮削
|
||||
item_path = str(path).replace("\\", "/")
|
||||
if target_type == self._target_dir:
|
||||
item_path = f"{item_path}/"
|
||||
MediaChain().scrape_metadata(
|
||||
fileitem=schemas.FileItem(
|
||||
storage="local",
|
||||
type="dir",
|
||||
path=str(path).replace("\\", "/") + "/",
|
||||
type=target_type,
|
||||
path=item_path,
|
||||
name=path.name,
|
||||
basename=path.stem,
|
||||
extension=path.suffix[1:] if target_type == self._target_file else None,
|
||||
modify_time=path.stat().st_mtime,
|
||||
),
|
||||
mediainfo=mediainfo,
|
||||
@@ -419,6 +508,26 @@ class LibraryScraper(_PluginBase):
|
||||
)
|
||||
logger.info(f"{path} 刮削完成")
|
||||
|
||||
def __scrape_child_files(self, path: Path, mtype: MediaType):
|
||||
"""
|
||||
分类目录无法作为单个媒体识别时,继续按目录内的媒体文件逐个刮削。
|
||||
"""
|
||||
child_files = SystemUtils.list_files(path, settings.RMT_MEDIAEXT)
|
||||
if not child_files:
|
||||
logger.warn(f"未识别到媒体信息:{path}")
|
||||
return
|
||||
logger.info(f"{path} 可能是分类目录,开始刮削目录内媒体文件 ...")
|
||||
for child_file in child_files:
|
||||
if self._event.is_set():
|
||||
logger.info(f"媒体库刮削服务停止")
|
||||
return
|
||||
child_mtype = mtype
|
||||
child_meta = MetaInfoPath(child_file)
|
||||
if not child_mtype:
|
||||
child_mtype = child_meta.type
|
||||
self.__scrape_path(path=child_file, mtype=child_mtype, target_type=self._target_file,
|
||||
tmdbid=child_meta.tmdbid)
|
||||
|
||||
@staticmethod
|
||||
def __get_tmdbid_from_nfo(file_path: Path):
|
||||
"""
|
||||
|
||||
1723
plugins.v2/traktcleaner/__init__.py
Normal file
1723
plugins.v2/traktcleaner/__init__.py
Normal file
File diff suppressed because it is too large
Load Diff
821
plugins.v2/updatewechatip/__init__.py
Normal file
821
plugins.v2/updatewechatip/__init__.py
Normal file
@@ -0,0 +1,821 @@
|
||||
import random
|
||||
import re
|
||||
import time
|
||||
import uuid
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Tuple
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
|
||||
import requests
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
from fastapi.responses import FileResponse
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.event import eventmanager, Event
|
||||
from app.log import logger
|
||||
from app.plugins import _PluginBase
|
||||
from app.schemas.types import EventType
|
||||
|
||||
|
||||
class UpdateWeChatIp(_PluginBase):
|
||||
# 插件在界面中的展示名称
|
||||
plugin_name = "动态企微可信IP"
|
||||
# 插件描述
|
||||
plugin_desc = "修改企微应用可信IP,可本地扫码刷新Cookie"
|
||||
# 插件图标
|
||||
plugin_icon = "Wecom_A.png"
|
||||
# 插件版本,必须和 package.v2.json 中保持一致
|
||||
plugin_version = "1.0.8"
|
||||
# 作者信息
|
||||
plugin_author = "书小白"
|
||||
author_url = "https://github.com/thshu/MoviePilot-Plugins"
|
||||
# 配置项前缀,建议保持唯一,避免与其他插件冲突
|
||||
plugin_config_prefix = "UpdateWeChatIp_"
|
||||
# 插件加载顺序,数值越小越早
|
||||
plugin_order = 50
|
||||
# 插件可见权限级别
|
||||
auth_level = 1
|
||||
|
||||
# 运行时状态字段
|
||||
_enabled = False
|
||||
_se = None
|
||||
_qrcode_key = None
|
||||
_tl_key = None
|
||||
_captcha = {}
|
||||
_wwrtx_sid = None
|
||||
_party_cache_data = None
|
||||
_app_id = ""
|
||||
_ip = None
|
||||
_is_login = False
|
||||
onlyonce = False
|
||||
_cron = ""
|
||||
|
||||
_UpdateLogKey = 'UpdateLog'
|
||||
|
||||
_ip_urls = ["https://myip.ipip.net", "https://ddns.oray.com/checkip", "https://ip.3322.net", "https://4.ipw.cn",
|
||||
'http://v4.666666.host:66/ip', 'https://ipv4.ddnspod.com', 'https://v4.66666.host:66/ip',
|
||||
'https://4.ipw.cn', 'https://ip.3322.net', 'https://6.66666.host:66/ip']
|
||||
_ip_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
|
||||
|
||||
_headers = {
|
||||
'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/148.0.0.0 Safari/537.36",
|
||||
'Accept-Encoding': "gzip, deflate, br, zstd",
|
||||
'pragma': "no-cache",
|
||||
'cache-control': "no-cache",
|
||||
'sec-ch-ua-platform': "\"Windows\"",
|
||||
'x-requested-with': "XMLHttpRequest",
|
||||
'sec-ch-ua': "\"Chromium\";v=\"148\", \"Google Chrome\";v=\"148\", \"Not/A)Brand\";v=\"99\"",
|
||||
'sec-ch-ua-mobile': "?0",
|
||||
'sec-fetch-site': "same-origin",
|
||||
'sec-fetch-mode': "cors",
|
||||
'sec-fetch-dest': "empty",
|
||||
'referer': "https://work.weixin.qq.com/wework_admin/wwqrlogin/mng/login_qrcode",
|
||||
'accept-language': "zh-CN,zh;q=0.9,ja;q=0.8,en;q=0.7",
|
||||
'priority': "u=1, i",
|
||||
}
|
||||
|
||||
def init_plugin(self, config: dict = None):
|
||||
"""根据当前配置初始化插件。"""
|
||||
config = config or {}
|
||||
self._enabled = bool(config.get("_enabled"))
|
||||
self._wwrtx_sid = config.get("_wwrtx_sid")
|
||||
self._app_id = config.get("_app_id")
|
||||
self._cron = config.get("_cron")
|
||||
self._party_cache_data = config.get("_party_cache_data")
|
||||
|
||||
self._se = requests.Session()
|
||||
self._se.cookies.set('wwrtx.sid', self._wwrtx_sid)
|
||||
|
||||
def _save_current_config(self):
|
||||
self._login_success()
|
||||
|
||||
def get_state(self) -> bool:
|
||||
"""返回插件当前是否启用。"""
|
||||
return self._enabled
|
||||
|
||||
def get_service(self) -> List[Dict[str, Any]]:
|
||||
if self._enabled and self._cron:
|
||||
return [
|
||||
{
|
||||
"id": self.__class__.__name__,
|
||||
"name": f"{self.__class__.__name__}_{self.plugin_name}服务",
|
||||
"trigger": CronTrigger.from_crontab(self._cron),
|
||||
"func": self.check,
|
||||
"kwargs": {}
|
||||
},
|
||||
]
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def get_command() -> List[Dict[str, Any]]:
|
||||
"""
|
||||
注册插件远程命令
|
||||
"""
|
||||
return [{
|
||||
"cmd": "/update_wechat_ip",
|
||||
"event": EventType.PluginAction,
|
||||
"desc": "获取企业微信二维码",
|
||||
"category": "获取企业微信二维码",
|
||||
"data": {
|
||||
"action": "update_wechat_ip"
|
||||
}
|
||||
}
|
||||
]
|
||||
|
||||
@eventmanager.register(EventType.PluginAction)
|
||||
def command_action(self, event: Event):
|
||||
"""
|
||||
远程命令响应
|
||||
"""
|
||||
event_data = event.event_data
|
||||
if not event_data or event_data.get("action") not in [i['data']['action'] for i in self.get_command()]:
|
||||
return
|
||||
|
||||
# 获取用户信息
|
||||
channel = event_data.get("channel")
|
||||
arg_str = event_data.get("arg_str")
|
||||
source = event_data.get("source")
|
||||
user = event_data.get("user")
|
||||
if arg_str is not None:
|
||||
if arg_str == '扫码完成':
|
||||
self._login(channel, user)
|
||||
elif len(re.findall('[0-9]', arg_str)) == 6:
|
||||
self._captcha[self._qrcode_key] = arg_str
|
||||
self._confirm_captcha(self._tl_key, self._captcha.get(self._qrcode_key))
|
||||
self._wwrtx_sid = self._se.cookies.get_dict().get('wwrtx.sid')
|
||||
if self._party_cache():
|
||||
self._login_success()
|
||||
self.post_message(
|
||||
channel=channel,
|
||||
title="登录成功",
|
||||
userid=user,
|
||||
text=f"成功登录企业:{self._party_cache_data.get('party_list', {}).get('list', [{}])[0].get('name')}",
|
||||
)
|
||||
else:
|
||||
self.post_message(
|
||||
channel=channel,
|
||||
title="登录失败",
|
||||
userid=user,
|
||||
text=f"登录失败,返回值:{self._party_cache_data}",
|
||||
)
|
||||
else:
|
||||
self.post_message(
|
||||
channel=channel,
|
||||
title="无效的输入",
|
||||
userid=user,
|
||||
content="无效的输入",
|
||||
)
|
||||
else:
|
||||
# 初始化变量
|
||||
self._qrcode_key = None
|
||||
self._tl_key = None
|
||||
self._captcha = {}
|
||||
|
||||
self._qrcode_key = self._get_key()
|
||||
image_url = self._qrcode(self._qrcode_key)
|
||||
|
||||
self.post_message(
|
||||
channel=channel,
|
||||
title="登录二维码",
|
||||
text='\n'.join(
|
||||
[
|
||||
"请选择要执行的操作:",
|
||||
f"如果按钮不可用,可回复:\n```\n/update_wechat_ip 扫码完成\n```"
|
||||
]
|
||||
),
|
||||
userid=user,
|
||||
buttons=[[{"text": f'扫码完成',
|
||||
"callback_data": f"[PLUGIN]{self.__class__.__name__}|扫码完成|{self._qrcode_key}"}]],
|
||||
image=image_url
|
||||
)
|
||||
|
||||
@eventmanager.register(EventType.MessageAction)
|
||||
def message_action(self, event: Event):
|
||||
"""
|
||||
处理消息按钮回调
|
||||
"""
|
||||
event_data = event.event_data
|
||||
if not event_data:
|
||||
return
|
||||
|
||||
# 检查是否为本插件的回调
|
||||
plugin_id = event_data.get("plugin_id")
|
||||
if plugin_id != self.__class__.__name__:
|
||||
return
|
||||
|
||||
# 获取回调数据
|
||||
channel = event_data.get("channel")
|
||||
source = event_data.get("source")
|
||||
userid = event_data.get("userid")
|
||||
# 获取原始消息ID和聊天ID(用于直接更新原消息)
|
||||
original_message_id = event_data.get("original_message_id")
|
||||
original_chat_id = event_data.get("original_chat_id")
|
||||
|
||||
callback_text = event_data.get("text", "")
|
||||
if "|" not in callback_text:
|
||||
self.post_message(
|
||||
channel=channel,
|
||||
title="登录失败",
|
||||
userid=userid,
|
||||
text=f"未获取到本地登录对应的qrcode_key",
|
||||
)
|
||||
return
|
||||
text, qrcode_key = callback_text.split("|", 1)
|
||||
|
||||
if text == "扫码完成":
|
||||
self._qrcode_key = qrcode_key
|
||||
self._login(channel, userid)
|
||||
if text == "输入完毕":
|
||||
self._confirm_captcha(self._tl_key, self._captcha.get(self._qrcode_key))
|
||||
if self._party_cache():
|
||||
self._login_success()
|
||||
self.post_message(
|
||||
channel=channel,
|
||||
title="登录成功",
|
||||
userid=userid,
|
||||
text=f"成功登录企业:{self._party_cache_data.get('party_list', {}).get('list', [{}])[0].get('name')}",
|
||||
)
|
||||
else:
|
||||
self.post_message(
|
||||
channel=channel,
|
||||
title="登录失败",
|
||||
userid=userid,
|
||||
text=f"登录失败,返回值:{self._party_cache_data}",
|
||||
)
|
||||
elif len(re.findall('[0-9]', text)) != 0:
|
||||
if qrcode_key not in self._captcha.keys():
|
||||
self._captcha[qrcode_key] = ""
|
||||
self._captcha[qrcode_key] += text
|
||||
self.post_message(
|
||||
channel=channel,
|
||||
title="短信验证码",
|
||||
userid=userid,
|
||||
buttons=self._get_buttons(),
|
||||
text='\n'.join(
|
||||
[
|
||||
"触发验证码:",
|
||||
f"验证码内容:{self._captcha[qrcode_key]}\n"
|
||||
f"如果按钮不可用,可回复:\n```\n/update_wechat_ip 验证码内容\n```"
|
||||
]
|
||||
),
|
||||
original_message_id=original_message_id,
|
||||
original_chat_id=original_chat_id
|
||||
)
|
||||
else:
|
||||
self.post_message(
|
||||
channel=channel,
|
||||
title="无效的输入",
|
||||
userid=userid,
|
||||
content="无效的输入",
|
||||
)
|
||||
|
||||
def get_api(self) -> List[Dict[str, Any]]:
|
||||
"""没有插件 API 时直接返回空列表。"""
|
||||
return [
|
||||
{
|
||||
"path": "/img/{uuid}",
|
||||
"endpoint": self.get_img,
|
||||
"methods": ["GET"],
|
||||
# 前端插件页面通过 api 模块调用时,通常使用 bear
|
||||
"auth": "apikey",
|
||||
"summary": "获取图片",
|
||||
"description": "获取图片",
|
||||
},
|
||||
{
|
||||
"path": "/UpdateIP",
|
||||
"endpoint": self.UpdateIp,
|
||||
"methods": ["GET"],
|
||||
# 前端插件页面通过 api 模块调用时,通常使用 bear
|
||||
"auth": "apikey",
|
||||
"summary": "更新企业微信IP白名单",
|
||||
"description": "更新企业微信IP白名单,需要传递查询参数,参数名为:ip",
|
||||
},
|
||||
]
|
||||
|
||||
def UpdateIp(self, ip):
|
||||
self._ip = ip
|
||||
self._save_ip_config()
|
||||
|
||||
def get_img(self, uuid):
|
||||
save_path: Path = self.get_data_path() / f"WeChatQr.jpg"
|
||||
return FileResponse(
|
||||
save_path,
|
||||
media_type="image/jpeg"
|
||||
)
|
||||
|
||||
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
|
||||
"""返回配置页 JSON 和默认配置模型。"""
|
||||
return [
|
||||
{
|
||||
'component': 'VForm',
|
||||
'content': [
|
||||
{
|
||||
'component': 'VRow',
|
||||
'content': [
|
||||
{
|
||||
'component': 'VCol',
|
||||
'props': {
|
||||
'cols': 12,
|
||||
'md': 4
|
||||
},
|
||||
'content': [
|
||||
{
|
||||
'component': 'VSwitch',
|
||||
'props': {
|
||||
'model': '_enabled',
|
||||
'label': '启用插件',
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
'component': 'VCol',
|
||||
'props': {
|
||||
'cols': 12,
|
||||
'md': 4
|
||||
},
|
||||
'content': [
|
||||
{
|
||||
'component': 'VSwitch',
|
||||
'props': {
|
||||
'model': 'onlyonce',
|
||||
'label': '立即检测一次',
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
'component': 'VRow',
|
||||
'content': [
|
||||
{
|
||||
'component': 'VCol',
|
||||
'props': {
|
||||
'cols': 12,
|
||||
'md': 6
|
||||
},
|
||||
'content': [
|
||||
{
|
||||
'component': 'VTextField',
|
||||
'props': {
|
||||
'model': '_cron',
|
||||
'label': '[必填]检测周期',
|
||||
'placeholder': '*/10 * * * *'
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
'component': 'VCol',
|
||||
'props': {
|
||||
'cols': 12,
|
||||
'md': 6
|
||||
},
|
||||
'content': [
|
||||
{
|
||||
'component': 'VTextarea',
|
||||
'props': {
|
||||
'model': '_app_id',
|
||||
'label': '[必填]应用ID',
|
||||
'rows': 1,
|
||||
'placeholder': '输入应用ID,多个使用(,)英文逗号隔开,在企业微信应用页面URL末尾获取'
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
], {
|
||||
"_enabled": False,
|
||||
"_wwrtx_sid": "",
|
||||
"_app_id": "",
|
||||
"_party_cache_data": {},
|
||||
"_cron": '*/10 * * * *'
|
||||
}
|
||||
|
||||
def get_page(self) -> List[dict]:
|
||||
"""返回详情页 JSON。"""
|
||||
# ---------- 获取并排序更新日志 ----------
|
||||
raw_data = self.get_data(self._UpdateLogKey) or []
|
||||
update_log: List[UpdateLogDto] = [UpdateLogDto.from_dict(i) for i in raw_data]
|
||||
data_list = sorted(update_log, key=lambda x: x.UpdateTime, reverse=True)
|
||||
|
||||
update_log_trs = [
|
||||
{
|
||||
"component": "tr",
|
||||
"props": {"class": "text-sm"},
|
||||
"content": [
|
||||
{
|
||||
"component": "td",
|
||||
"props": {
|
||||
"style": {"color": "red"} if not data.status else {}
|
||||
},
|
||||
"text": "成功" if data.status else "失败",
|
||||
},
|
||||
{"component": "td", "text": data.app_id},
|
||||
{"component": "td", "text": data.ip},
|
||||
{"component": "td", "text": data.result},
|
||||
{"component": "td",
|
||||
"text": data.UpdateTime.strftime('%Y-%m-%d %H:%M:%S') if data.UpdateTime else ""},
|
||||
],
|
||||
}
|
||||
for data in data_list
|
||||
]
|
||||
|
||||
# ---------- 安全获取 party 名称 ----------
|
||||
party_cache = self._party_cache_data or {}
|
||||
party_list = party_cache.get("party_list", {}).get("list") or [{}]
|
||||
party_name = party_list[0].get("name", "未知")
|
||||
|
||||
# ---------- 构建页面结构 ----------
|
||||
return [
|
||||
{
|
||||
"component": "VRow",
|
||||
"content": [
|
||||
{
|
||||
"component": "VCol",
|
||||
"props": {"cols": 12},
|
||||
"content": [
|
||||
# 顶部状态标题
|
||||
{
|
||||
"component": "div",
|
||||
"props": {
|
||||
"style": {
|
||||
"display": "flex",
|
||||
"justifyContent": "center",
|
||||
"alignItems": "center",
|
||||
"flexDirection": "column",
|
||||
"gap": "10px",
|
||||
"marginBottom": "20px", # 增加与表格的间距
|
||||
}
|
||||
},
|
||||
"content": [
|
||||
{
|
||||
"component": "div",
|
||||
"text": f"{party_name}已登录" if self._is_login else "登录失效",
|
||||
"props": {
|
||||
"style": {
|
||||
"fontSize": "22px",
|
||||
"fontWeight": "bold",
|
||||
"color": "#ffffff",
|
||||
"backgroundColor": "#9B50FF",
|
||||
"padding": "8px 16px",
|
||||
"borderRadius": "5px",
|
||||
"textAlign": "center",
|
||||
"display": "inline-block",
|
||||
}
|
||||
},
|
||||
}
|
||||
],
|
||||
},
|
||||
# 日志表格
|
||||
{
|
||||
"component": "VTable",
|
||||
"props": {"hover": True},
|
||||
"content": [
|
||||
{
|
||||
"component": "thead",
|
||||
"props": {"class": "text-no-wrap"},
|
||||
"content": [
|
||||
{
|
||||
"component": "th",
|
||||
"props": {"class": "text-start ps-4"},
|
||||
"text": "状态",
|
||||
},
|
||||
{
|
||||
"component": "th",
|
||||
"props": {"class": "text-start ps-4"},
|
||||
"text": "appId",
|
||||
},
|
||||
{
|
||||
"component": "th",
|
||||
"props": {"class": "text-start ps-4"},
|
||||
"text": "更新IP",
|
||||
},
|
||||
{
|
||||
"component": "th",
|
||||
"props": {"class": "text-start ps-4"},
|
||||
"text": "返回值",
|
||||
},
|
||||
{
|
||||
"component": "th",
|
||||
"props": {"class": "text-start ps-4"},
|
||||
"text": "更新时间",
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
"component": "tbody",
|
||||
"content": update_log_trs,
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
}
|
||||
],
|
||||
}
|
||||
]
|
||||
|
||||
def stop_service(self):
|
||||
"""没有后台任务时可以留空。"""
|
||||
pass
|
||||
|
||||
def _get_key(self):
|
||||
logger.info("开始获取登录二维码key")
|
||||
url = "https://work.weixin.qq.com/wework_admin/wwqrlogin/mng/get_key"
|
||||
current_ts = int(time.time() * 1000)
|
||||
params = {
|
||||
'r': str(random.random()),
|
||||
'login_type': "login_admin",
|
||||
'callback': f"wwqrloginCallback_{current_ts}",
|
||||
'redirect_uri': "https://work.weixin.qq.com/wework_admin/loginpage_wx?_r=234&redirect_uri=https%3A%2F%2Fwork.weixin.qq.com%2Fwework_admin%2Fframe&url_hash=%23%2Fapps#/apps",
|
||||
'crossorigin': "1"
|
||||
}
|
||||
response = self._se.get(url, params=params, headers=self._headers)
|
||||
logger.info(f"获取登录二维码key成功,返回值:{response.text}")
|
||||
|
||||
return response.json().get('data', {}).get('qrcode_key')
|
||||
|
||||
def _qrcode(self, key) -> str:
|
||||
logger.info("开始获取登录二维码图片")
|
||||
url = "https://work.weixin.qq.com/wework_admin/wwqrlogin/mng/qrcode"
|
||||
params = {
|
||||
'qrcode_key': key,
|
||||
'login_type': "login_admin"
|
||||
}
|
||||
response = self._se.get(url, params=params, headers=self._headers)
|
||||
logger.info("登录二维码图片获取成功")
|
||||
img_path: Path = self.get_data_path() / f"WeChatQr.jpg"
|
||||
img_path.write_bytes(response.content)
|
||||
logger.info(f"登录二维码已写入文件,路径:{img_path}")
|
||||
uri = f"/api/v1/plugin/{self.__class__.__name__}/img/{uuid.uuid4().__str__().replace('-', '')}?apikey={settings.API_TOKEN}"
|
||||
img_url = settings.MP_DOMAIN(uri) or f"http://127.0.0.1:{settings.PORT}{uri}"
|
||||
logger.info(f"构建二维码地址为:{img_url}")
|
||||
return img_url
|
||||
|
||||
def _check(self, key) -> Dict:
|
||||
logger.info(f"开始获取扫码结果")
|
||||
for _ in range(2):
|
||||
url = "https://work.weixin.qq.com/wework_admin/wwqrlogin/mng/check"
|
||||
params = {
|
||||
'qrcode_key': key,
|
||||
'status': "QRCODE_SCAN_ING"
|
||||
}
|
||||
response = self._se.get(url, params=params, headers=self._headers)
|
||||
data = response.json().get('data', {})
|
||||
logger.info(f"扫码结果获取完成:{response.text}")
|
||||
if data.get("status") == "QRCODE_SCAN_SUCC":
|
||||
return data
|
||||
time.sleep(1)
|
||||
logger.info(f"获取扫码结果超时")
|
||||
return None
|
||||
|
||||
def _loginpage_wx(self, key, code) -> requests.Response:
|
||||
logger.info(f"开始登录")
|
||||
url = "https://work.weixin.qq.com/wework_admin/loginpage_wx"
|
||||
params = {
|
||||
'_r': "234",
|
||||
'redirect_uri': "https://work.weixin.qq.com/wework_admin/frame",
|
||||
'url_hash': "#/apps",
|
||||
'code': code,
|
||||
'auth_redirect_time': "1780446137000",
|
||||
'getauth_time': "1780446137000",
|
||||
'wwqrlogin': "1",
|
||||
'qrcode_key': key,
|
||||
'auth_source': "SOURCE_FROM_WEWORK",
|
||||
'confirm_type': "0"
|
||||
}
|
||||
response = self._se.get(url, params=params, headers=self._headers)
|
||||
logger.info(f"登录完成,返回值:{response.text}")
|
||||
return response
|
||||
|
||||
def _confirm_captcha(self, tl_key, captcha):
|
||||
logger.info(f"开始提交验证码")
|
||||
_url = "https://work.weixin.qq.com/wework_admin/mobile_confirm/confirm_captcha?ajax=1&f=json&d2st="
|
||||
_data = {
|
||||
"captcha": captcha,
|
||||
"tl_key": tl_key
|
||||
}
|
||||
res = self._se.post(_url, json=_data, headers=self._headers)
|
||||
logger.info(f"提交验证码返回值:{res.text}")
|
||||
res = self._se.get(f"https://work.weixin.qq.com/wework_admin/login/choose_corp?tl_key={tl_key}")
|
||||
logger.info(f"choose_corp接口返回值:{res.text}")
|
||||
|
||||
def _party_cache(self):
|
||||
logger.info(f"开始获取企业信息,判断是否登录成功")
|
||||
if not self._wwrtx_sid:
|
||||
return False
|
||||
url = "https://work.weixin.qq.com/wework_admin/contacts/party/cache"
|
||||
params = {
|
||||
'lang': "zh_CN",
|
||||
'f': "json",
|
||||
'ajax': "1",
|
||||
'timeZoneInfo[zone_offset]': "-8",
|
||||
}
|
||||
self._se.cookies.set('wwrtx.sid', self._wwrtx_sid)
|
||||
try:
|
||||
res = self._se.post(url, params=params, headers=self._headers, timeout=10)
|
||||
if res.status_code == 200:
|
||||
data = res.json()
|
||||
if 'errCode' not in res.text:
|
||||
self._party_cache_data = data.get('data')
|
||||
self._is_login = True
|
||||
return True
|
||||
else:
|
||||
self._party_cache_data = data
|
||||
else:
|
||||
logger.error(f"获取企业微信部门缓存失败,HTTP状态码:{res.status_code}")
|
||||
except Exception as e:
|
||||
logger.error(f"获取企业微信部门缓存异常: {e}")
|
||||
self._is_login = False
|
||||
return False
|
||||
|
||||
def _login(self, channel, userid):
|
||||
logger.info(f"触发登录回调,开始执行登录步骤")
|
||||
check_data = self._check(self._qrcode_key)
|
||||
if check_data:
|
||||
code = check_data.get('auth_code')
|
||||
res = self._loginpage_wx(self._qrcode_key, code)
|
||||
if 'tl_key' in res.url:
|
||||
logger.info(f"返回值中获取到tl_key,触发短信验证码")
|
||||
self.post_message(
|
||||
channel=channel,
|
||||
title="短信验证码",
|
||||
userid=userid,
|
||||
buttons=self._get_buttons(),
|
||||
text='\n'.join(
|
||||
[
|
||||
"触发验证码:",
|
||||
f"如果按钮不可用,可回复:\n```\n/update_wechat_ip 验证码内容\n```"
|
||||
]
|
||||
),
|
||||
)
|
||||
parsed = urlparse(res.url)
|
||||
query_params = parse_qs(parsed.query)
|
||||
# 获取 tl_key 的值(parse_qs 返回字典,每个键对应一个列表)
|
||||
self._tl_key = query_params.get('tl_key', [None])[0]
|
||||
else:
|
||||
self._wwrtx_sid = self._se.cookies.get_dict().get('wwrtx.sid')
|
||||
if self._party_cache():
|
||||
logger.info(f"登录成功")
|
||||
self._login_success()
|
||||
self.post_message(
|
||||
channel=channel,
|
||||
title="登录成功",
|
||||
userid=userid,
|
||||
text=f"成功登录企业:{self._party_cache_data.get('party_list', {}).get('list', [{}])[0].get('name')}",
|
||||
)
|
||||
else:
|
||||
logger.error(f"登录失败,返回值:{self._party_cache_data}")
|
||||
self.post_message(
|
||||
channel=channel,
|
||||
title="登录失败",
|
||||
userid=userid,
|
||||
text=f"登录失败,返回值:{self._party_cache_data}",
|
||||
)
|
||||
|
||||
def _save_ip_config(self):
|
||||
logger.info(f"更新IP为:{self._ip}")
|
||||
_update_log = []
|
||||
url = 'https://work.weixin.qq.com/wework_admin/apps/saveIpConfig?lang=zh_CN&f=json&ajax=1'
|
||||
for appId in self._app_id.split(','):
|
||||
appId = appId.strip()
|
||||
if not appId:
|
||||
continue
|
||||
data = {
|
||||
'app_id': appId,
|
||||
'ipList[]': self._ip
|
||||
}
|
||||
res = self._se.post(url, data=data, headers=self._headers)
|
||||
if 'err' in res.text:
|
||||
logger.error(f"{appId}更新IP白名单失败,返回值:{res.text}")
|
||||
else:
|
||||
logger.info(f'{appId}更新白名单成功,更新IP为:{self._ip},接口返回值:{res.text}')
|
||||
|
||||
_update_log.append(UpdateLogDto(
|
||||
status='err' not in res.text,
|
||||
ip=self._ip,
|
||||
app_id=appId,
|
||||
result=res.text
|
||||
))
|
||||
|
||||
update_log: List[UpdateLogDto] = [UpdateLogDto.from_dict(i) for i in self.get_data(self._UpdateLogKey) or []]
|
||||
self.save_data(self._UpdateLogKey, [i.to_dict() for i in update_log + _update_log])
|
||||
|
||||
def _login_success(self):
|
||||
logger.info("保存配置文件")
|
||||
self.update_config({
|
||||
'_enabled': self._enabled,
|
||||
'_wwrtx_sid': self._wwrtx_sid,
|
||||
'_app_id': self._app_id,
|
||||
'_party_cache_data': self._party_cache_data,
|
||||
'_cron': self._cron,
|
||||
})
|
||||
|
||||
def _get_buttons(self):
|
||||
buttons = [
|
||||
[
|
||||
{
|
||||
"text": str(j),
|
||||
"callback_data": f"[PLUGIN]{self.__class__.__name__}|{j}|{self._qrcode_key}"
|
||||
}
|
||||
for j in range(i * 5, (i + 1) * 5)
|
||||
]
|
||||
for i in range(2)
|
||||
]
|
||||
buttons.append(
|
||||
[{"text": f'输入完毕',
|
||||
"callback_data": f"[PLUGIN]{self.__class__.__name__}|输入完毕|{self._qrcode_key}"}]
|
||||
)
|
||||
return buttons
|
||||
|
||||
def get_ip_from_url(self):
|
||||
urls = self._ip_urls
|
||||
for url in urls:
|
||||
try:
|
||||
response = requests.get(url, timeout=3)
|
||||
if response.status_code == 200:
|
||||
ip_address = re.search(self._ip_pattern, response.text)
|
||||
if ip_address:
|
||||
return ip_address.group()
|
||||
except Exception as e:
|
||||
if "104" not in str(e) and 'Read timed out' not in str(e): # 忽略网络波动,都失败会返回None, "获取IP失败"
|
||||
logger.warning(f"{url} 获取IP失败, Error: {e}")
|
||||
return "获取IP失败"
|
||||
|
||||
def _get_corp_app_v2(self):
|
||||
logger.info(f"开始获取企业应用配置")
|
||||
if not self._app_id:
|
||||
logger.error("未配置应用ID")
|
||||
return {}
|
||||
app_id = self._app_id.split(",")[0].strip()
|
||||
url = f'https://work.weixin.qq.com/wework_admin/apps/getCorpAppV2?lang=zh_CN&f=json&ajax=1&app_id={app_id}'
|
||||
try:
|
||||
res = self._se.get(url, timeout=10)
|
||||
if res.status_code == 200:
|
||||
return res.json().get('data', {})
|
||||
else:
|
||||
logger.error(f"获取企业应用配置失败,HTTP状态码:{res.status_code}")
|
||||
except Exception as e:
|
||||
logger.error(f"获取企业应用配置异常: {e}")
|
||||
return {}
|
||||
|
||||
def check(self):
|
||||
if not self._enabled:
|
||||
logger.error("插件未开启")
|
||||
return
|
||||
self._party_cache()
|
||||
if not self._is_login:
|
||||
logger.error("未登录")
|
||||
self.post_message(
|
||||
title="企业微信登录状态失效",
|
||||
text='企业微信登录状态失效,请重新操作登录'
|
||||
)
|
||||
return
|
||||
self._ip = self.get_ip_from_url()
|
||||
if not self._ip or self._ip == "获取IP失败":
|
||||
logger.error("获取当前公网IP失败,跳过本次检测")
|
||||
return
|
||||
app_config = self._get_corp_app_v2()
|
||||
app_config_ips = app_config.get('app', {}).get('white_ip_list', {}).get('ip', [])
|
||||
if self._ip not in app_config_ips:
|
||||
self._save_ip_config()
|
||||
self.post_message(
|
||||
title='企业微信IP更新',
|
||||
text="出发IP更新,最新IP为:" + self._ip
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class UpdateLogDto:
|
||||
status: bool
|
||||
ip: str
|
||||
app_id: str
|
||||
result: str
|
||||
UpdateTime: datetime = None
|
||||
|
||||
def __post_init__(self):
|
||||
if self.UpdateTime is None:
|
||||
self.UpdateTime = datetime.now()
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
"status": self.status,
|
||||
"ip": self.ip,
|
||||
"app_id": self.app_id,
|
||||
"result": self.result,
|
||||
"UpdateTime": self.UpdateTime.isoformat()
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict):
|
||||
# 深拷贝一份,避免修改原字典
|
||||
kwargs = dict(data)
|
||||
# 将 'UpdateTime' 字符串转为 datetime,注意参数名对应 __init__ 的 update_time
|
||||
kwargs['UpdateTime'] = datetime.fromisoformat(kwargs.pop('UpdateTime'))
|
||||
return cls(**kwargs)
|
||||
1
plugins.v2/updatewechatip/requirements.txt
Normal file
1
plugins.v2/updatewechatip/requirements.txt
Normal file
@@ -0,0 +1 @@
|
||||
requests>=2.34.2
|
||||
185
tests/v2/libraryscraper/test_plugin.py
Normal file
185
tests/v2/libraryscraper/test_plugin.py
Normal file
@@ -0,0 +1,185 @@
|
||||
from pathlib import Path
|
||||
|
||||
import libraryscraper
|
||||
from app.core.config import settings
|
||||
from app.schemas import MediaType
|
||||
from libraryscraper import LibraryScraper
|
||||
|
||||
|
||||
def test_get_scrape_item_uses_media_folder_below_category(monkeypatch):
|
||||
"""带分类目录的电影结构,应定位到影片目录,而不是动画电影这类分类目录。"""
|
||||
monkeypatch.setattr(
|
||||
settings,
|
||||
"MOVIE_RENAME_FORMAT",
|
||||
"{{title}}{% if year %} ({{year}}){% endif %}/{{title}}{{fileExt}}",
|
||||
)
|
||||
scraper_path = Path("/media/strm-new/电影")
|
||||
file_path = scraper_path / "动画电影" / "哪吒之魔童闹海 (2025)" / "哪吒之魔童闹海 (2025).strm"
|
||||
|
||||
item = LibraryScraper._LibraryScraper__get_scrape_item(file_path, scraper_path, MediaType.MOVIE)
|
||||
|
||||
assert item == (scraper_path / "动画电影" / "哪吒之魔童闹海 (2025)", MediaType.MOVIE, "dir", None)
|
||||
|
||||
|
||||
def test_get_scrape_item_uses_tv_root_relative_path(monkeypatch):
|
||||
"""扫描根目录可直接配置到分类层,剧集目录计算仍应基于该根目录的相对路径。"""
|
||||
monkeypatch.setattr(
|
||||
settings,
|
||||
"TV_RENAME_FORMAT",
|
||||
"{{title}}{% if year %} ({{year}}){% endif %}/Season {{season}}/{{title}} - {{season_episode}}{{fileExt}}",
|
||||
)
|
||||
scraper_path = Path("/media/strm-new/电视剧/国产剧")
|
||||
file_path = scraper_path / "狂飙 (2023)" / "Season 1" / "狂飙 - S01E01.strm"
|
||||
|
||||
item = LibraryScraper._LibraryScraper__get_scrape_item(file_path, scraper_path, MediaType.TV)
|
||||
|
||||
assert item == (scraper_path / "狂飙 (2023)", MediaType.TV, "dir", None)
|
||||
|
||||
|
||||
def test_get_scrape_item_falls_back_to_file_when_rename_format_is_flat(monkeypatch):
|
||||
"""重命名格式没有目录层级时,不能跳过文件,应退回到单文件刮削。"""
|
||||
monkeypatch.setattr(settings, "TV_RENAME_FORMAT", "{{title}} - {{season_episode}}{{fileExt}}")
|
||||
scraper_path = Path("/media/strm-new/电视剧/国产剧")
|
||||
file_path = scraper_path / "狂飙 - S01E01.strm"
|
||||
|
||||
item = LibraryScraper._LibraryScraper__get_scrape_item(file_path, scraper_path, MediaType.TV)
|
||||
|
||||
assert item == (file_path, MediaType.TV, "file", None)
|
||||
|
||||
|
||||
def test_get_scrape_item_uses_real_issue_paths_from_library_root(monkeypatch):
|
||||
"""从媒体库根目录扫描时,应跳过电影/电视剧下的分类层,定位到真实媒体目录。"""
|
||||
monkeypatch.setattr(
|
||||
settings,
|
||||
"MOVIE_RENAME_FORMAT",
|
||||
"{{title}}{% if year %} ({{year}}){% endif %}/{{title}}{{fileExt}}",
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
settings,
|
||||
"TV_RENAME_FORMAT",
|
||||
"{{title}}{% if year %} ({{year}}){% endif %}/Season {{season}}/{{title}} - {{season_episode}}{{fileExt}}",
|
||||
)
|
||||
scraper_path = Path("/media/strm-new")
|
||||
movie_file = scraper_path / "电影" / "动画电影" / "影片名" / "xxx.strm"
|
||||
tv_file = scraper_path / "电视剧" / "国产剧" / "剧名" / "Season 1" / "xxx.strm"
|
||||
|
||||
movie_item = LibraryScraper._LibraryScraper__get_scrape_item(movie_file, scraper_path, MediaType.MOVIE)
|
||||
tv_item = LibraryScraper._LibraryScraper__get_scrape_item(tv_file, scraper_path, MediaType.TV)
|
||||
|
||||
assert movie_item == (scraper_path / "电影" / "动画电影" / "影片名", MediaType.MOVIE, "dir", None)
|
||||
assert tv_item == (scraper_path / "电视剧" / "国产剧" / "剧名", MediaType.TV, "dir", None)
|
||||
|
||||
|
||||
def test_libraryscraper_scans_real_issue_paths(tmp_path, monkeypatch):
|
||||
"""直接跑扫描入口,确认用户提到的两类路径不会停在分类目录。"""
|
||||
root = tmp_path / "strm-new"
|
||||
movie_dir = root / "电影" / "动画电影" / "影片名"
|
||||
tv_dir = root / "电视剧" / "国产剧" / "剧名" / "Season 1"
|
||||
movie_dir.mkdir(parents=True)
|
||||
tv_dir.mkdir(parents=True)
|
||||
(movie_dir / "xxx.strm").write_text("", encoding="utf-8")
|
||||
(tv_dir / "xxx.strm").write_text("", encoding="utf-8")
|
||||
|
||||
class FakeMediaInfo:
|
||||
tmdb_id = 129
|
||||
|
||||
def __init__(self, mtype):
|
||||
self.type = mtype
|
||||
|
||||
class FakeChain:
|
||||
def __init__(self):
|
||||
self.recognized_types = []
|
||||
|
||||
def recognize_media(self, meta=None, mtype=None, tmdbid=None, **kwargs):
|
||||
recognized_type = mtype or meta.type
|
||||
self.recognized_types.append(recognized_type)
|
||||
return FakeMediaInfo(recognized_type)
|
||||
|
||||
def obtain_images(self, mediainfo):
|
||||
return None
|
||||
|
||||
class FakeMediaChain:
|
||||
def __init__(self):
|
||||
self.scraped_items = []
|
||||
|
||||
def scrape_metadata(self, fileitem, **kwargs):
|
||||
self.scraped_items.append(fileitem)
|
||||
|
||||
fake_media_chain = FakeMediaChain()
|
||||
monkeypatch.setattr(settings, "SCRAP_FOLLOW_TMDB", True)
|
||||
monkeypatch.setattr(
|
||||
settings,
|
||||
"MOVIE_RENAME_FORMAT",
|
||||
"{{title}}{% if year %} ({{year}}){% endif %}/{{title}}{{fileExt}}",
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
settings,
|
||||
"TV_RENAME_FORMAT",
|
||||
"{{title}}{% if year %} ({{year}}){% endif %}/Season {{season}}/{{title}} - {{season_episode}}{{fileExt}}",
|
||||
)
|
||||
monkeypatch.setattr(libraryscraper, "MediaChain", lambda: fake_media_chain)
|
||||
|
||||
plugin = LibraryScraper()
|
||||
fake_chain = FakeChain()
|
||||
plugin.chain = fake_chain
|
||||
plugin.init_plugin({"scraper_paths": str(root), "exclude_paths": ""})
|
||||
plugin._LibraryScraper__libraryscraper()
|
||||
|
||||
scraped_paths = {Path(fileitem.path.rstrip("/")) for fileitem in fake_media_chain.scraped_items}
|
||||
assert scraped_paths == {movie_dir, tv_dir.parent}
|
||||
assert fake_chain.recognized_types == [MediaType.MOVIE, MediaType.TV]
|
||||
|
||||
|
||||
def test_forced_tv_root_skips_movie_category_paths():
|
||||
"""配置媒体库根目录加 #电视剧 时,应跳过同级电影分类路径。"""
|
||||
scraper_path = Path("/media/strm-new")
|
||||
movie_file = scraper_path / "电影" / "动画电影" / "影片名" / "xxx.strm"
|
||||
tv_file = scraper_path / "电视剧" / "国产剧" / "剧名" / "Season 1" / "xxx.strm"
|
||||
|
||||
assert not LibraryScraper._LibraryScraper__match_forced_type_path(movie_file, scraper_path, MediaType.TV)
|
||||
assert LibraryScraper._LibraryScraper__match_forced_type_path(tv_file, scraper_path, MediaType.TV)
|
||||
|
||||
|
||||
def test_scrape_dir_falls_back_to_child_files(tmp_path, monkeypatch):
|
||||
"""分类目录识别失败后,应继续刮削目录内的具体媒体文件。"""
|
||||
category_path = tmp_path / "电影" / "动画电影"
|
||||
category_path.mkdir(parents=True)
|
||||
media_file = category_path / "哪吒之魔童闹海 (2025).strm"
|
||||
media_file.write_text("", encoding="utf-8")
|
||||
|
||||
class FakeMediaInfo:
|
||||
tmdb_id = 129
|
||||
type = MediaType.MOVIE
|
||||
|
||||
class FakeChain:
|
||||
def __init__(self):
|
||||
self.recognize_calls = 0
|
||||
|
||||
def recognize_media(self, **kwargs):
|
||||
self.recognize_calls += 1
|
||||
return None if self.recognize_calls == 1 else FakeMediaInfo()
|
||||
|
||||
def obtain_images(self, mediainfo):
|
||||
return None
|
||||
|
||||
class FakeMediaChain:
|
||||
def __init__(self):
|
||||
self.scraped_items = []
|
||||
|
||||
def scrape_metadata(self, fileitem, **kwargs):
|
||||
self.scraped_items.append(fileitem)
|
||||
|
||||
fake_chain = FakeChain()
|
||||
fake_media_chain = FakeMediaChain()
|
||||
monkeypatch.setattr(settings, "SCRAP_FOLLOW_TMDB", True)
|
||||
monkeypatch.setattr(libraryscraper, "MediaChain", lambda: fake_media_chain)
|
||||
|
||||
plugin = LibraryScraper()
|
||||
plugin.chain = fake_chain
|
||||
|
||||
plugin._LibraryScraper__scrape_path(category_path, MediaType.MOVIE, target_type="dir")
|
||||
|
||||
assert len(fake_media_chain.scraped_items) == 1
|
||||
fileitem = fake_media_chain.scraped_items[0]
|
||||
assert fileitem.type == "file"
|
||||
assert fileitem.path == media_file.as_posix()
|
||||
Reference in New Issue
Block a user