Compare commits

...

9 Commits

11 changed files with 3326 additions and 443 deletions

View File

@@ -195,11 +195,13 @@
"name": "媒体库刮削",
"description": "定时对媒体库进行刮削,补齐缺失元数据和图片。",
"labels": "刮削",
"version": "2.1.1",
"version": "2.1.3",
"icon": "scraper.png",
"author": "jxxghp",
"level": 1,
"history": {
"v2.1.3": "修复分类路径下按文件标记识别及强制类型跨库扫描问题",
"v2.1.2": "修复分类目录被误识别导致下级媒体未刮削的问题",
"v2.1.1": "调整目录计算方法,以支持更多重命名格式",
"v2.1": "优化执行周期输入需要MoviePilot v2.2.1+",
"v2.0": "兼容MoviePilot V2 版本",
@@ -586,11 +588,12 @@
"name": "美剧生词标注",
"description": "根据CEFR等级为英语影视剧标注高级词汇。",
"labels": "英语",
"version": "1.2.5",
"version": "1.2.6",
"icon": "LexiAnnot.png",
"author": "wumode",
"level": 1,
"history": {
"v1.2.6": "适配 MoviePilot 新版 LLM 助手",
"v1.2.5": "langchain 1.x 兼容 (主程序版本需高于 2.9.17)",
"v1.2.4": "增强数据校验",
"v1.2.3": "优化提示词",
@@ -663,12 +666,13 @@
"name": "动态企微可信IP",
"description": "修改企微应用可信IP支持Server酱等第三方通知。验证码以结尾发送到企业微信应用",
"labels": "消息通知",
"version": "2.1.1",
"version": "2.1.2",
"icon": "Wecom_A.png",
"author": "RamenRa",
"level": 2,
"system_version": ">=2.12.0",
"history": {
"v2.1.2": "修复本地扫码获取不到验证码的问题" ,
"v2.1.1": "优化MP/Nas关闭期间IP变动检测不到的现象。支持IYUU通知移除AnPush v2支持在微信通知失效时用第三方发送通知 支持||Q修改IP时不发送通知 使用全局AI助手需使用/wxcode 510010的格式发送验证码",
"v2.0.1": "修复企业微信后台页面语言未稳定切换为中文导致无法匹配配置按钮的问题。",
"v2.0.0": "V2 专用大版本改用 CloakBrowser 启动企业微信浏览器流程,默认插件不再声明 V2 兼容。",
@@ -1099,5 +1103,38 @@
"v1.0.2": "修复UI界面显示不全及前端路由报错问题",
"v1.0.1": "新增 Agent Tokens 配额管理、供应商优先级切换和用量展示"
}
},
"TraktCleaner": {
"name": "Trakt 观看清理",
"description": "根据 Trakt 播放记录,自动清理下载器中已观看的种子。",
"labels": "Trakt,清理",
"version": "1.0",
"icon": "https://cdn.jsdelivr.net/gh/homarr-labs/dashboard-icons/png/trakt.png",
"author": "Guoyin-Wen",
"level": 1,
"history": {
"v1.0": "初始版本:根据 Trakt 播放记录自动清理下载器中已观看的种子"
}
},
"UpdateWeChatIp": {
"name": "动态企微可信IP",
"description": "修改企微应用可信IP,可本地扫码刷新Cookie,直接调用接口更稳定",
"labels": "消息通知",
"version": "1.0.8",
"icon": "Wecom_A.png",
"author": "书小白",
"level": 2,
"v2": true,
"history": {
"1.0.8": "完善日志输出",
"1.0.7": "插件初始化时调用一下check确定登录状态",
"1.0.6": "修复未登录时_party_cache_data为空导致UI崩溃的BUG\n图片地址优先使用MP_DOMAIN获取,如果未配置使用127.0.0.1地址\n回调解析qrcode_key时判断是否存在,不存在发送错误\n优化请求企微接口的参数",
"1.0.5": "根据Code Review结果优化代码",
"1.0.4": "增加IP更新记录查询",
"1.0.3": "cookie保活输出返回值",
"1.0.2": "支持多个应用ID",
"1.0.1": "IP更新时发送通知,增加API接口,指定更新的IP",
"1.0.0": "初始化"
}
}
}

View File

@@ -31,7 +31,7 @@ class DynamicWeChat(_PluginBase):
# 插件图标
plugin_icon = "Wecom_A.png"
# 插件版本
plugin_version = "2.1.1"
plugin_version = "2.1.2"
# 插件作者
plugin_author = "RamenRa"
# 作者主页
@@ -320,6 +320,7 @@ class DynamicWeChat(_PluginBase):
if not event_data or event_data.get("action") != "dynamicwechat":
return
context = None
self._qr_running = True
try:
context = self._launch_browser_context(headless=True)
page = context.new_page()
@@ -349,6 +350,7 @@ class DynamicWeChat(_PluginBase):
except Exception as e:
logger.error(f"本地扫码任务: 本地扫码失败: {e}")
finally:
self._qr_running = False
if context:
context.close()

View File

@@ -1,55 +1,39 @@
import asyncio
import copy
import os
import json
import os
import queue
import re
import subprocess
import sys
import threading
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Tuple, Optional, Literal
from typing import Any, Dict, List, Literal, Optional, Tuple
import pymediainfo
from langdetect import detect
from langchain_community.callbacks import get_openai_callback
from pysubs2 import SSAFile, SSAEvent, SSAStyle, Color, Alignment
from pysubs2 import Alignment, Color, SSAEvent, SSAStyle, SSAFile
from app.core.config import settings
from app.agent.llm.helper import LLMHelper
from app.chain.media import MediaChain
from app.core.cache import cached
from app.core.config import global_vars, settings
from app.core.context import MediaInfo
from app.core.event import Event, eventmanager
from app.helper.directory import DirectoryHelper
from app.log import logger
from app.plugins import _PluginBase
from app.core.cache import cached
from app.core.event import eventmanager, Event
from app.schemas import Response
from app.schemas.types import NotificationType, MediaType
from app.schemas import Context, Response, TransferInfo
from app.schemas.types import EventType, MediaType, NotificationType
from app.utils.http import RequestUtils
from app.utils.string import StringUtils
from app.schemas import TransferInfo, Context
from app.schemas.types import EventType
from app.core.context import MediaInfo
from app.chain.media import MediaChain
from .agenttool import QueryAnnotationTasksTool, VocabularyAnnotatingTool
from .lexicon import Lexicon
from .schemas import (
IDGenerator,
TaskStatus,
Task,
TasksApiParams,
ProcessResult,
SegmentList,
TaskParams, SegmentStatistics,
)
from .pipeline import UNIVERSAL_POS_MAP, extract_advanced_words, llm_process_chain
from .schemas import IDGenerator, ProcessResult, SegmentList, SegmentStatistics, Task, TaskParams, TasksApiParams, \
TaskStatus, LLMConfig
from .spacyworker import SpacyWorker
from .subtitle import SubtitleProcessor, style_text
from .pipeline import (
extract_advanced_words,
llm_process_chain,
initialize_llm,
UNIVERSAL_POS_MAP,
)
from .subtitle import SubtitleHelper, SubtitleProcessor, style_text
class LexiAnnot(_PluginBase):
@@ -60,7 +44,7 @@ class LexiAnnot(_PluginBase):
# 插件图标
plugin_icon = "LexiAnnot.png"
# 插件版本
plugin_version = "1.2.5"
plugin_version = "1.2.6"
# 插件作者
plugin_author = "wumode"
# 作者主页
@@ -91,7 +75,6 @@ class LexiAnnot(_PluginBase):
_ffmpeg_path: str = "ffmpeg"
_english_only = False
_when_file_trans = False
_model_temperature = ""
_custom_files = ""
_accent_color = ""
_font_scaling = ""
@@ -102,6 +85,8 @@ class LexiAnnot(_PluginBase):
_libraries: List[str] = []
_use_mp_agent: bool = False
_use_proxy: bool = False
_test_llm: bool = False
_thinking_level: str = None
# protected variables
_lexicon_repo = "https://raw.githubusercontent.com/wumode/LexiAnnot/"
@@ -137,7 +122,6 @@ class LexiAnnot(_PluginBase):
self._ffmpeg_path = config.get("ffmpeg_path") or "ffmpeg"
self._english_only = config.get("english_only")
self._when_file_trans = config.get("when_file_trans")
self._model_temperature = config.get("model_temperature") or "0.3"
self._show_phonetics = config.get("show_phonetics")
self._custom_files = config.get("custom_files") or ""
self._accent_color = config.get("accent_color")
@@ -151,6 +135,8 @@ class LexiAnnot(_PluginBase):
self._llm_provider = config.get("llm_provider") or "google"
self._use_mp_agent = config.get("use_mp_agent") or False
self._use_proxy = config.get("use_proxy") or False
self._test_llm = config.get("test_llm") or False
self._thinking_level = config.get("thinking_level") or "off"
libraries = [
library.name for library in DirectoryHelper().get_library_dirs()
@@ -158,7 +144,7 @@ class LexiAnnot(_PluginBase):
self._libraries = [
library for library in self._libraries if library in libraries
]
self._accent_color_rgb = LexiAnnot.hex_to_rgb(self._accent_color) or (255, 255, 0,)
self._accent_color_rgb = SubtitleHelper.hex_to_rgb(self._accent_color) or (255, 255, 0,)
self._color_alpha = int(self._opacity) if self._opacity and len(self._opacity) else 0
if self._delete_data:
# 删除不再保存在数据库的数据
@@ -193,6 +179,9 @@ class LexiAnnot(_PluginBase):
continue
self.add_media_file(file_path)
self._onlyonce = False
if self._test_llm:
asyncio.run_coroutine_threadsafe(self.test_llm(), global_vars.loop)
self._test_llm = False
self.__update_config()
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
@@ -679,14 +668,17 @@ class LexiAnnot(_PluginBase):
"model": "gemini_model",
"disabled": "use_mp_agent",
"label": "模型名称",
"hint": "支持手动输入",
"persistent-hint": True,
"items": [
"gemini-2.5-flash",
"gemini-2.5-flash-lite",
"gemini-3.5-flash",
"gemini-3.1-flash-lite",
"gemini-2.5-pro",
"gemini-2.0-flash",
"gemini-2.0-flash-lite",
"deepseek-ai/DeepSeek-V3.2",
"deepseek-ai/DeepSeek-R1"
"gemini-2.5-flash-lite",
"deepseek-ai/DeepSeek-V4-Pro",
"deepseek-ai/DeepSeek-V4-Flash",
"deepseek-v4-flash",
"deepseek-v4-pro"
],
},
}
@@ -735,28 +727,6 @@ class LexiAnnot(_PluginBase):
}
],
},
{
"component": "VCol",
"props": {"cols": 12, "md": 4},
"content": [
{
"component": "VSelect",
"props": {
"model": "model_temperature",
"label": "模型温度",
"items": [
{"title": "0", "value": "0"},
{"title": "0.1", "value": "0.1"},
{"title": "0.2", "value": "0.2"},
{"title": "0.3", "value": "0.3"},
{"title": "0.4", "value": "0.4"},
{"title": "0.5", "value": "0.5"},
{"title": "1.0", "value": "1.0"},
],
},
}
],
},
{
"component": "VCol",
"props": {
@@ -777,8 +747,55 @@ class LexiAnnot(_PluginBase):
}
],
},
{
"component": "VCol",
"props": {"cols": 12, "md": 4},
"content": [
{
"component": "VSelect",
"props": {
"model": "thinking_level",
"label": "思考模式",
"disabled": "use_mp_agent",
"items": [
{"title": "关闭 (off)", "value": "off"},
{"title": "自动 (auto)", "value": "auto"},
{"title": "最小 (minimal)", "value": "minimal"},
{"title": "低 (low)", "value": "low"},
{"title": "中 (medium)", "value": "medium"},
{"title": "高 (high)", "value": "high"},
{"title": "极高 (max)", "value": "max"},
{"title": "超高 (xhigh)", "value": "xhigh"},
],
},
}
],
},
],
},
{
"component": "VRow",
"content": [
{
"component": "VCol",
"props": {
"cols": 12,
"md": 12,
},
"content": [
{
"component": "VSwitch",
"props": {
"model": "test_llm",
"label": "测试调用",
"hint": "启用后,请在插件日志查看测试结果",
"persistent-hint": True
},
}
],
},
]
}
],
},
],
@@ -883,7 +900,6 @@ class LexiAnnot(_PluginBase):
"ffmpeg_path": "",
"english_only": True,
"when_file_trans": True,
"model_temperature": "0.3",
"custom_files": "",
"accent_color": "",
"font_scaling": "1",
@@ -896,6 +912,8 @@ class LexiAnnot(_PluginBase):
"llm_base_url": "",
"use_mp_agent": False,
"use_proxy": False,
"test_llm": False,
"thinking_level": "off"
}
def get_api(self) -> List[Dict[str, Any]]:
@@ -1046,6 +1064,25 @@ class LexiAnnot(_PluginBase):
else:
logger.debug(" No running worker thread to stop.")
async def test_llm(self):
    """
    Send one test request with the currently selected LLM settings and log the outcome.

    Nothing is returned; the result (or the failure) is only written to the plugin log.
    ``init_plugin`` schedules this coroutine with ``asyncio.run_coroutine_threadsafe``
    and does not inspect the returned future, so every failure has to be logged here.
    """
    try:
        # Resolving the configuration is part of what is being tested: keep it inside
        # the try block so that a failure there is logged instead of being lost in the
        # never-inspected future.
        model_config = self.get_model_config()
        logger.info("测试 LLM 调用...")
        result = await LLMHelper.test_current_settings(
            provider=model_config.provider,
            model=model_config.model_name,
            thinking_level=model_config.thinking_level,
            use_proxy=model_config.use_proxy,
            base_url=model_config.base_url,
            api_key=model_config.apikey
        )
        if not result.get("reply_preview"):
            logger.warning("LLM 响应为空")
        else:
            logger.info(f"LLM 返回: {result['reply_preview']}")
    except Exception as err:
        logger.error(f"LLM 调用出错: {err}")
def delete_data(self):
# 删除词典
data_path = self.get_data_path()
@@ -1156,7 +1193,6 @@ class LexiAnnot(_PluginBase):
"ffmpeg_path": self._ffmpeg_path,
"english_only": self._english_only,
"when_file_trans": self._when_file_trans,
"model_temperature": self._model_temperature,
"show_phonetics": self._show_phonetics,
"custom_files": self._custom_files,
"accent_color": self._accent_color,
@@ -1170,6 +1206,8 @@ class LexiAnnot(_PluginBase):
"llm_base_url": self._llm_base_url,
"use_mp_agent": self._use_mp_agent,
"use_proxy": self._use_proxy,
"test_llm": self._test_llm,
"thinking_level": self._thinking_level
}
)
@@ -1310,7 +1348,7 @@ class LexiAnnot(_PluginBase):
ffmpeg_path = self._ffmpeg_path if self._ffmpeg_path else "ffmpeg"
eng_mark = ["en", "en-US", "eng", "en-GB", "english", "en-AU"]
embedded_subtitles = LexiAnnot._extract_subtitles_by_lang(path, eng_mark, ffmpeg_path)
embedded_subtitles = SubtitleHelper.extract_subtitles_by_lang(path, eng_mark, ffmpeg_path)
if not embedded_subtitles:
return ProcessResult(
status=TaskStatus.CANCELED, message="未找到嵌入式英文文本字幕"
@@ -1332,7 +1370,7 @@ class LexiAnnot(_PluginBase):
return ProcessResult(status=TaskStatus.CANCELED, message="任务已取消")
ass_subtitle = SSAFile.from_string(embedded_subtitle["subtitle"], format_="ass")
if embedded_subtitle.get("codec_id") == "S_TEXT/UTF8":
ass_subtitle = LexiAnnot.set_srt_style(ass_subtitle)
ass_subtitle = SubtitleHelper.set_srt_style(ass_subtitle)
ass_subtitle = self.__set_style(ass_subtitle)
ass_subtitle, stat = self.process_subtitles(ass_subtitle, lexi, spacy_worker, mediainfo)
if self._shutdown_event.is_set():
@@ -1498,170 +1536,6 @@ class LexiAnnot(_PluginBase):
for new_path in transfer_info.file_list_new or []:
self.add_media_file(new_path)
@staticmethod
def format_duration(ms):
total_seconds, milliseconds = divmod(ms, 1000)
hours, remainder = divmod(total_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
hundredths = milliseconds // 10
return f"{hours}:{minutes:02}:{seconds:02}.{hundredths:02}"
@staticmethod
def _remove_substring(replacements: list[dict]):
new_list = []
replacements.sort(key=lambda x: x["end"] - x["start"], reverse=True)
for r in replacements:
if any((r["start"] >= new["start"] and r["end"] <= new["end"]) for new in new_list):
continue
new_list.append(r)
return new_list
@staticmethod
def replace_by_plaintext_positions(line: SSAEvent, replacements: List[dict]):
    """
    Apply replacements expressed in plaintext offsets to the tagged ``line.text``.

    :param line: SSAEvent line; its ``text`` attribute is rewritten
    :param replacements: [{'start': int, 'end': int, 'old_text': str, 'new_text': str}, ...]
    """
    raw = line.text
    override_tag = re.compile(r"{.*?}")  # {xxx} override blocks
    escape_seq = re.compile(r"\\[Nh]")

    # plaintext offset -> offset in the raw text
    plain_to_raw = {}
    raw_pos = 0
    raw_len = len(raw)
    while raw_pos < raw_len:
        current = raw[raw_pos]
        if current == "{":
            hit = override_tag.match(raw, raw_pos)
            if hit:
                # an override block contributes nothing to the plaintext
                raw_pos = hit.end()
                continue
        elif current == "\\":
            hit = escape_seq.match(raw, raw_pos)
            if hit:
                # step onto the letter after the backslash; it is recorded on the
                # next pass as the single plaintext character for this escape
                raw_pos = hit.end() - 1
                continue
        plain_to_raw[len(plain_to_raw)] = raw_pos
        raw_pos += 1

    survivors = LexiAnnot._remove_substring(replacements)
    # work from the rightmost span so earlier offsets stay valid
    rewritten = raw
    for item in sorted(survivors, key=lambda entry: entry["start"], reverse=True):
        first = plain_to_raw.get(item["start"])
        last = plain_to_raw.get(item["end"] - 1)
        if first is not None and last is not None:
            rewritten = rewritten[:first] + item["new_text"] + rewritten[last + 1:]
    line.text = rewritten
@staticmethod
def analyze_ass_language(ass_file: SSAFile):
    """
    Detect the dominant language of every style used in an ASS file.

    :param ass_file: parsed subtitle file
    :return: dict keyed by style name; each value is ``None`` (no usable text or no
             detectable language) or a dict with ``main_language``, ``proportion``,
             ``duration``, ``text_size`` and ``times``
    """
    # (xxx) and [xxx] groups are blanked with the same number of spaces
    bracketed = re.compile(r"(\([^()]*\)|\[[^\[\]]*\])")

    def _blank_out(match):
        return " " * len(match.group(1))

    per_style = {
        name: {"text": [], "duration": 0, "text_size": 0, "times": 0}
        for name in ass_file.styles
    }

    for event in ass_file:
        style_name = event.style
        cleaned = bracketed.sub(_blank_out, event.plaintext)
        fragments = cleaned.split("\n")
        if style_name not in per_style or not cleaned:
            continue
        bucket = per_style[style_name]
        bucket["text"].extend(fragments)
        bucket["duration"] += event.duration
        bucket["text_size"] += len(cleaned)
        bucket["times"] += 1

    report = {}
    for style_name, stats in per_style.items():
        if not " ".join(stats["text"]).strip():
            report[style_name] = None
            continue
        detected = []
        # language detection runs per text fragment
        for fragment in stats["text"]:
            try:
                detected.append(detect(fragment))
            except Exception as e:
                # fragment could not be classified
                logger.debug(e)
        if not detected:
            report[style_name] = None
            continue
        winner, hits = Counter(detected).most_common(1)[0]
        report[style_name] = {
            "main_language": winner,
            "proportion": hits / len(detected),
            "duration": stats["duration"],
            "text_size": stats["text_size"],
            "times": stats["times"],
        }
    return report
@staticmethod
def select_main_style_weighted(analysis: Dict[str, Any], known_language: str, weights = None):
"""
根据语言分析结果和已知的字幕语言,使用加权评分选择主要样式
:params analysis: `analyze_ass_language` 函数的输出结果
:params known_language: 已知的字幕语言代码
:params weights: 各个维度的权重,权重之和应为 1
:returns: 主要字幕的样式名称,如果没有匹配的样式则返回 None
"""
if weights is None:
weights = {"times": 0.5, "text_size": 0.4, "duration": 0.1}
matching_styles = []
max_times = max([analysis.get("times", 0) for _, analysis in analysis.items() if analysis] or [0]) or 1
max_text_size = max([analysis.get("text_size", 0) for _, analysis in analysis.items() if analysis] or [0]) or 1
max_duration = max([analysis.get("duration", 0) for _, analysis in analysis.items() if analysis] or [0]) or 1
for style, analysis in analysis.items():
if not analysis:
continue
if analysis.get("main_language") == known_language:
# 跳过多语言
if analysis.get("proportion", 0) < 0.5:
continue
score = 0
score += analysis.get("times", 0) * weights.get("times", 0) / max_times
score += analysis.get("text_size", 0) * weights.get("text_size", 0) / max_text_size
score += analysis.get("duration", 0) * weights.get("duration", 0) / max_duration
matching_styles.append((style, score))
if not matching_styles:
return None
sorted_styles = sorted(matching_styles, key=lambda item: item[1], reverse=True)
return sorted_styles[0][0]
@staticmethod
def set_srt_style(ass: SSAFile) -> SSAFile:
ass.info["ScaledBorderAndShadow"] = "no"
play_res_y = int(ass.info["PlayResY"])
if "Default" in ass.styles:
ass.styles["Default"].marginv = play_res_y // 16
ass.styles["Default"].fontname = "Microsoft YaHei"
ass.styles["Default"].fontsize = play_res_y // 16
return ass
def __set_style(self, ass: SSAFile) -> SSAFile:
font_scaling = (
float(self._font_scaling)
@@ -1747,107 +1621,25 @@ class LexiAnnot(_PluginBase):
ass.styles["Annotation EXAM"] = cefr_style
return ass
@staticmethod
def hex_to_rgb(hex_color: str | None) -> tuple[int, ...] | None:
if not hex_color:
return None
pattern = r"^#[0-9a-fA-F]{6}$"
if re.match(pattern, hex_color) is None:
return None
hex_color = hex_color.lstrip("#") # 去掉前面的 #
return tuple(int(hex_color[i: i + 2], 16) for i in (0, 2, 4))
@staticmethod
def __extract_subtitle(
video_path: str,
subtitle_stream_index: str,
ffmpeg_path: str = "ffmpeg",
sub_format="ass",
) -> Optional[str]:
if sub_format not in ["srt", "ass"]:
raise ValueError("Invalid subtitle format")
try:
map_parameter = f"0:s:{subtitle_stream_index}"
command = [ffmpeg_path, "-i", video_path, "-map", map_parameter, "-f", sub_format, "-"]
result = subprocess.run(
command, capture_output=True, text=True, encoding="utf-8", check=True
def get_model_config(self) -> LLMConfig:
if self._use_mp_agent:
return LLMConfig(
apikey=settings.LLM_API_KEY,
base_url=settings.LLM_BASE_URL,
model_name=settings.LLM_MODEL,
thinking_level=settings.LLM_THINKING_LEVEL,
provider=settings.LLM_PROVIDER.lower(),
use_proxy=settings.LLM_USE_PROXY
)
return result.stdout
except FileNotFoundError:
logger.warn(f"错误:找不到视频文件 '{video_path}'")
return None
except subprocess.CalledProcessError as e:
logger.warn(f"错误:提取字幕失败。\n错误信息:{e}")
logger.warn(
f"FFmpeg 输出 (stderr):\n{e.stderr.decode('utf-8', errors='ignore')}"
else:
return LLMConfig(
apikey=self._gemini_apikey,
base_url=self._llm_base_url,
model_name=self._gemini_model,
thinking_level=self._thinking_level,
provider=self._llm_provider.lower(),
use_proxy=self._use_proxy
)
return None
@staticmethod
def _extract_subtitles_by_lang(
    video_path: str, lang: str | list = "en", ffmpeg: str = "ffmpeg"
) -> list[dict]:
    """
    Extract embedded text subtitles of the requested language(s) from a video.

    Subtitle tracks are located with MediaInfo and extracted through
    ``__extract_subtitle``. Errors are logged, never raised.

    :param video_path: path of the video file
    :param lang: one language tag, or a list of accepted tags
    :param ffmpeg: ffmpeg executable handed to the extraction helper
    :return: list of dicts with ``title``, ``subtitle``, ``codec_id``, ``stream_id``
             and ``duration``; possibly empty
    """

    def _lang_wanted(track_lang: str) -> bool:
        return track_lang in lang if isinstance(lang, list) else track_lang == lang

    text_codecs = ["S_TEXT/UTF8", "S_TEXT/ASS", "tx3g"]
    subtitles = []
    try:
        media_info: pymediainfo.MediaInfo = pymediainfo.MediaInfo.parse(video_path)
        for track in media_info.tracks:
            if track.track_type != "Text":
                continue
            if not _lang_wanted(track.language):
                continue
            if track.codec_id not in text_codecs:
                continue
            # NOTE(review): the original comment says MediaInfo numbers streams from 1
            # and ffmpeg from 0, yet the identifier is passed on unchanged - confirm
            stream_index = track.stream_identifier
            content = LexiAnnot.__extract_subtitle(video_path, stream_index, ffmpeg)
            duration = 0
            if hasattr(track, "duration"):
                try:
                    duration = int(float(track.duration))
                except (ValueError, TypeError):
                    pass
            if not content:
                continue
            subtitles.append(
                {
                    "title": track.title or "",
                    "subtitle": content,
                    "codec_id": track.codec_id,
                    "stream_id": stream_index,
                    "duration": duration,
                }
            )
        # with several candidates, drop those far shorter than the average
        if len(subtitles) > 1:
            known = [entry["duration"] for entry in subtitles if entry["duration"] > 0]
            if known:
                threshold = sum(known) / len(known) * 0.2
                subtitles = [entry for entry in subtitles if entry["duration"] >= threshold]
        if not subtitles:
            logger.warn("未找到标记为英语的文本字幕流")
    except FileNotFoundError:
        logger.error(f"找不到视频文件 '{video_path}'")
    except subprocess.CalledProcessError as e:
        logger.error(f"错误:提取字幕失败。\n错误信息:{e}")
        logger.error(f"FFmpeg 输出 (stderr):\n{e.stderr}")
    except Exception as e:
        logger.error(f"使用 MediaInfo 提取字幕时发生错误:{e}")
    return subtitles
def _process_chain(
self,
@@ -1867,7 +1659,6 @@ class LexiAnnot(_PluginBase):
CEFR_LEVELS = ["A1", "A2", "B1", "B2", "C1", "C2"]
simple_vocabulary = set(filter(lambda x: x < self._annot_level, CEFR_LEVELS))
learner_level = max(simple_vocabulary)
model_temperature = float(self._model_temperature) if self._model_temperature else 0.3
logger.info("通过 spaCy 分词...")
for seg in segments:
if self._shutdown_event.is_set():
@@ -1879,25 +1670,19 @@ class LexiAnnot(_PluginBase):
simple_level=simple_vocabulary
)
if self._gemini_available:
if self._use_mp_agent:
llm_apikey = settings.LLM_API_KEY
llm_base_url = settings.LLM_BASE_URL
llm_model_name = settings.LLM_MODEL
llm_provider = settings.LLM_PROVIDER.lower()
else:
llm_apikey = self._gemini_apikey
llm_base_url = self._llm_base_url
llm_model_name = self._gemini_model
llm_provider = self._llm_provider.lower()
llm = initialize_llm(
provider=llm_provider,
model_name=llm_model_name,
base_url=llm_base_url,
api_key=llm_apikey or '',
temperature=model_temperature,
max_retries=self._max_retries,
proxy=self._use_proxy,
)
llm_config = self.get_model_config()
llm = asyncio.run_coroutine_threadsafe(
LLMHelper.get_llm(
provider=llm_config.provider,
model=llm_config.model_name,
thinking_level=llm_config.thinking_level,
api_key=llm_config.apikey,
base_url=llm_config.base_url,
use_proxy=llm_config.use_proxy
),
global_vars.loop
).result()
segments = llm_process_chain(
lexi=lexi,
llm=llm,
@@ -1926,8 +1711,8 @@ class LexiAnnot(_PluginBase):
f"{self._accent_color_rgb[1]:02x}{self._accent_color_rgb[0]:02x}&"
) # &H00FFFFFF&
statistical_res = LexiAnnot.analyze_ass_language(ass_file)
main_style: str | None = LexiAnnot.select_main_style_weighted(statistical_res, lang)
statistical_res = SubtitleHelper.analyze_ass_language(ass_file)
main_style: str | None = SubtitleHelper.select_main_style_weighted(statistical_res, lang)
if not main_style:
logger.error("无法确定主要字幕样式")
return None, None
@@ -2004,7 +1789,7 @@ class LexiAnnot(_PluginBase):
"new_text": new_text,
}
replacements.append(replacement)
LexiAnnot.replace_by_plaintext_positions(
SubtitleHelper.replace_by_plaintext_positions(
main_processor[seg.index], replacements
)
if self._sentence_translation:

View File

@@ -4,9 +4,7 @@ import threading
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import SecretStr
from app.core.config import settings
from app.schemas import Context
from app.schemas.types import MediaType
from app.log import logger
@@ -60,59 +58,6 @@ UNIVERSAL_POS_MAP: dict[UniversalPos, str | None] = {
}
def initialize_llm(
    provider: str,
    api_key: str,
    model_name: str,
    base_url: str | None,
    temperature: float = 0.1,
    max_retries: int = 3,
    proxy: bool = False,
) -> BaseChatModel:
    """
    Build the LangChain chat model for the requested provider.

    :param provider: ``"google"`` or ``"deepseek"``; any other value selects the
                     OpenAI-compatible client
    :param api_key: API key handed to the client
    :param model_name: model identifier
    :param base_url: endpoint of the OpenAI-compatible client; only used by the
                     fallback branch
    :param temperature: sampling temperature
    :param max_retries: retry count handed to the client
    :param proxy: when True, ``settings.PROXY_HOST`` is passed as ``openai_proxy``
                  (google and fallback branches only)
    :return: the configured chat model
    """
    if provider == "google":
        if proxy:
            from langchain_openai import ChatOpenAI

            # Gemini is reached through its OpenAI-compatible endpoint here because
            # this is the client that is handed the proxy. The caller's model,
            # temperature and retry count must be honoured exactly as in the direct
            # branch below (previously the global settings values were used instead).
            return ChatOpenAI(
                model=model_name,
                api_key=SecretStr(api_key),
                max_retries=max_retries,
                base_url="https://generativelanguage.googleapis.com/v1beta/openai",
                temperature=temperature,
                openai_proxy=settings.PROXY_HOST,
            )
        from langchain_google_genai import ChatGoogleGenerativeAI

        return ChatGoogleGenerativeAI(
            model=model_name,
            google_api_key=api_key,  # noqa
            max_retries=max_retries,
            temperature=temperature,
        )
    elif provider == "deepseek":
        from langchain_deepseek import ChatDeepSeek

        return ChatDeepSeek(
            model=model_name,
            api_key=SecretStr(api_key),
            max_retries=max_retries,
            temperature=temperature,
        )
    else:
        from langchain_openai import ChatOpenAI

        return ChatOpenAI(
            model=model_name,
            api_key=SecretStr(api_key),
            max_retries=max_retries,
            base_url=base_url,
            temperature=temperature,
            openai_proxy=settings.PROXY_HOST if proxy else None,
        )
def convert_pos_to_spacy(pos: str):
"""
将给定的词性列表转换为 spaCy 库中使用的词性标签
@@ -727,5 +672,4 @@ def llm_process_chain(
lexi, llm, context, start, end, learner_level, media_name, translate_sentences
)
)
return SegmentList(root=segments_list)

View File

@@ -365,3 +365,12 @@ class VocabularyAnnotatingToolInput(BaseModel):
class QueryAnnotationTasksToolInput(BaseModel):
count: int = Field(default=5, description="The maximum number of returned annotation tasks")
explanation: str = Field(..., description="This is a tool for querying the latest annotation tasks in AnnotLexi")
class LLMConfig(BaseModel):
    """Settings needed to reach one LLM: credentials, provider, model and options."""

    # required
    apikey: str
    provider: str
    model_name: str
    # optional
    thinking_level: str | None = None
    base_url: str | None = None
    use_proxy: bool = False

View File

@@ -1,10 +1,277 @@
import re
import subprocess
from collections import Counter
from typing import Generator, Any, overload
from pysubs2 import SSAEvent
import pymediainfo
from langdetect import detect
from pysubs2 import SSAEvent, SSAFile
from app.log import logger
from .schemas import SubtitleSegment
class SubtitleHelper:
    """Stateless helpers for extracting, analysing and editing ASS/SRT subtitles."""

    @staticmethod
    def remove_substring(replacements: list[dict]):
        """
        Discard replacements whose span lies entirely inside an already kept span.

        :param replacements: dicts carrying integer ``start`` and ``end`` offsets
        :return: a new list with the surviving replacements, longest span first;
                 the list passed in is not reordered
        """
        kept = []
        # sorted() rather than list.sort(): the caller's list must not be reordered
        # as a side effect of a lookup helper
        by_length = sorted(replacements, key=lambda x: x["end"] - x["start"], reverse=True)
        for candidate in by_length:
            if any(
                candidate["start"] >= other["start"] and candidate["end"] <= other["end"]
                for other in kept
            ):
                continue
            kept.append(candidate)
        return kept

    @staticmethod
    def analyze_ass_language(ass_file: SSAFile):
        """
        Detect the dominant language of every style used in an ASS file.

        :param ass_file: parsed subtitle file
        :return: dict keyed by style name; each value is ``None`` (no usable text or
                 no detectable language) or a dict with ``main_language``,
                 ``proportion``, ``duration``, ``text_size`` and ``times``
        """

        def _replace_with_spaces(_text):
            """
            Blank out (xxx) and [xxx] groups with the same number of spaces,
            e.g. "(Hi)" becomes four spaces.
            """
            pattern = r"(\([^()]*\)|\[[^\[\]]*\])"
            return re.sub(pattern, lambda match: " " * len(match.group(1)), _text)

        styles = {
            style: {"text": [], "duration": 0, "text_size": 0, "times": 0}
            for style in ass_file.styles
        }
        for dialogue in ass_file:
            style = dialogue.style
            text = _replace_with_spaces(dialogue.plaintext)
            if style not in styles or not text:
                continue
            styles[style]["text"].extend(text.split("\n"))
            styles[style]["duration"] += dialogue.duration
            styles[style]["text_size"] += len(text)
            styles[style]["times"] += 1

        style_language_analysis = {}
        for style_name, data in styles.items():
            if not " ".join(data["text"]).strip():
                style_language_analysis[style_name] = None
                continue
            languages = []
            # language detection runs per text fragment
            for text_fragment in data["text"]:
                try:
                    languages.append(detect(text_fragment))
                except Exception as e:
                    # fragment could not be classified
                    logger.debug(e)
            if not languages:
                style_language_analysis[style_name] = None
                continue
            main_language, hits = Counter(languages).most_common(1)[0]
            style_language_analysis[style_name] = {
                "main_language": main_language,
                "proportion": hits / len(languages),
                "duration": data["duration"],
                "text_size": data["text_size"],
                "times": data["times"],
            }
        return style_language_analysis

    @staticmethod
    def select_main_style_weighted(analysis: dict[str, Any], known_language: str, weights = None):
        """
        Pick the main dialogue style by a weighted score over the language analysis.

        :params analysis: output of `analyze_ass_language`
        :params known_language: language code the subtitle is known to be in
        :params weights: weight per dimension (``times``, ``text_size``, ``duration``)
        :returns: name of the best scoring style, or None when no style matches
        """
        if weights is None:
            weights = {"times": 0.5, "text_size": 0.4, "duration": 0.1}
        analysed = [info for info in analysis.values() if info]
        # normalisation ceilings; "or 1" keeps the divisions below well-defined
        max_times = max([info.get("times", 0) for info in analysed] or [0]) or 1
        max_text_size = max([info.get("text_size", 0) for info in analysed] or [0]) or 1
        max_duration = max([info.get("duration", 0) for info in analysed] or [0]) or 1

        matching_styles = []
        for style, info in analysis.items():
            if not info:
                continue
            if info.get("main_language") != known_language:
                continue
            # styles mixing several languages are skipped
            if info.get("proportion", 0) < 0.5:
                continue
            score = 0
            score += info.get("times", 0) * weights.get("times", 0) / max_times
            score += info.get("text_size", 0) * weights.get("text_size", 0) / max_text_size
            score += info.get("duration", 0) * weights.get("duration", 0) / max_duration
            matching_styles.append((style, score))
        if not matching_styles:
            return None
        sorted_styles = sorted(matching_styles, key=lambda item: item[1], reverse=True)
        return sorted_styles[0][0]

    @staticmethod
    def set_srt_style(ass: SSAFile) -> SSAFile:
        """
        Adjust the ``Default`` style of a subtitle relative to its vertical resolution.

        Sets ``ScaledBorderAndShadow`` to ``"no"`` and, when a ``Default`` style
        exists, sets its vertical margin and font size to ``PlayResY // 16`` and its
        font to "Microsoft YaHei".

        :param ass: subtitle file, modified in place
        :return: the same object
        """
        ass.info["ScaledBorderAndShadow"] = "no"
        play_res_y = int(ass.info["PlayResY"])
        if "Default" in ass.styles:
            default_style = ass.styles["Default"]
            default_style.marginv = play_res_y // 16
            default_style.fontname = "Microsoft YaHei"
            default_style.fontsize = play_res_y // 16
        return ass

    @staticmethod
    def __extract_subtitle(
        video_path: str,
        subtitle_stream_index: str,
        ffmpeg_path: str = "ffmpeg",
        sub_format="ass",
    ) -> str | None:
        """
        Dump one subtitle stream of a video to text by running ffmpeg.

        :param video_path: path of the video file
        :param subtitle_stream_index: index used in the ``-map 0:s:<index>`` selector
        :param ffmpeg_path: ffmpeg executable
        :param sub_format: output format, ``"srt"`` or ``"ass"``
        :return: the subtitle text, or None when ffmpeg could not be started or failed
        :raises ValueError: for an unsupported ``sub_format``
        """
        if sub_format not in ["srt", "ass"]:
            raise ValueError("Invalid subtitle format")
        command = [
            ffmpeg_path, "-i", video_path,
            "-map", f"0:s:{subtitle_stream_index}",
            "-f", sub_format, "-",
        ]
        try:
            result = subprocess.run(
                command, capture_output=True, text=True, encoding="utf-8", check=True
            )
        except FileNotFoundError:
            # subprocess.run raises this when the program itself cannot be found;
            # an unreadable input file makes ffmpeg exit non-zero instead
            logger.warn(f"错误:找不到 FFmpeg 可执行文件 '{ffmpeg_path}'")
            return None
        except subprocess.CalledProcessError as e:
            logger.warn(f"错误:提取字幕失败。\n错误信息:{e}")
            # text=True means stderr is already str; calling .decode() on it raised
            # AttributeError and hid the real ffmpeg error
            logger.warn(f"FFmpeg 输出 (stderr):\n{e.stderr}")
            return None
        return result.stdout

    @staticmethod
    def extract_subtitles_by_lang(
        video_path: str, lang: str | list = "en", ffmpeg: str = "ffmpeg"
    ) -> list[dict]:
        """
        Extract embedded text subtitles of the requested language(s) from a video.

        Subtitle tracks are located with MediaInfo and extracted with ffmpeg.
        Errors are logged, never raised.

        :param video_path: path of the video file
        :param lang: one language tag, or a list of accepted tags
        :param ffmpeg: ffmpeg executable
        :return: list of dicts with ``title``, ``subtitle``, ``codec_id``,
                 ``stream_id`` and ``duration``; possibly empty
        """

        def check_lang(track_lang: str) -> bool:
            if isinstance(lang, list):
                return track_lang in lang
            return track_lang == lang

        supported_codec = ["S_TEXT/UTF8", "S_TEXT/ASS", "tx3g"]
        subtitles = []
        try:
            media_info: pymediainfo.MediaInfo = pymediainfo.MediaInfo.parse(video_path)
            for track in media_info.tracks:
                if not (
                    track.track_type == "Text"
                    and check_lang(track_lang=track.language)
                    and track.codec_id in supported_codec
                ):
                    continue
                # NOTE(review): the original comment says MediaInfo numbers streams
                # from 1 and ffmpeg from 0, yet the identifier is passed on
                # unchanged - confirm
                subtitle_stream_index = track.stream_identifier
                extracted_subtitle = SubtitleHelper.__extract_subtitle(
                    video_path, subtitle_stream_index, ffmpeg
                )
                duration = 0
                if hasattr(track, "duration"):
                    try:
                        duration = int(float(track.duration))
                    except (ValueError, TypeError):
                        pass
                if extracted_subtitle:
                    subtitles.append(
                        {
                            "title": track.title or "",
                            "subtitle": extracted_subtitle,
                            "codec_id": track.codec_id,
                            "stream_id": subtitle_stream_index,
                            "duration": duration,
                        }
                    )
            # With several candidates, drop those shorter than 20% of the average
            # known duration. A track whose duration could not be read counts as 0
            # and is dropped as well once any other duration is known.
            if len(subtitles) > 1:
                durations = [sub["duration"] for sub in subtitles if sub["duration"] > 0]
                if durations:
                    avg_duration = sum(durations) / len(durations)
                    subtitles = [
                        sub for sub in subtitles if sub["duration"] >= avg_duration * 0.2
                    ]
            if not subtitles:
                logger.warn("未找到标记为英语的文本字幕流")
        except FileNotFoundError:
            logger.error(f"找不到视频文件 '{video_path}'")
        except subprocess.CalledProcessError as e:
            logger.error(f"错误:提取字幕失败。\n错误信息:{e}")
            logger.error(f"FFmpeg 输出 (stderr):\n{e.stderr}")
        except Exception as e:
            logger.error(f"使用 MediaInfo 提取字幕时发生错误:{e}")
        return subtitles

    @staticmethod
    def replace_by_plaintext_positions(line: SSAEvent, replacements: list[dict]):
        """
        Apply replacements expressed in plaintext offsets to the tagged ``line.text``.

        :param line: SSAEvent line; its ``text`` attribute is rewritten
        :param replacements: [{'start': int, 'end': int, 'old_text': str, 'new_text': str}, ...]
        """
        text = line.text
        tag_pattern = re.compile(r"{.*?}")  # {xxx} override blocks
        special_pattern = re.compile(r"\\[Nh]")

        # plaintext offset -> offset in the raw text
        mapping = {}
        p_index = 0  # position in the plaintext
        t_index = 0  # position in the raw text
        while t_index < len(text):
            if text[t_index] == "{":
                # an override block contributes nothing to the plaintext
                match = tag_pattern.match(text, t_index)
                if match:
                    t_index = match.end()
                    continue
            elif text[t_index] == "\\":
                match = special_pattern.match(text, t_index)
                if match:
                    # step onto the letter after the backslash; it is recorded on
                    # the next pass as the single plaintext character for this escape
                    t_index = match.end() - 1
                    continue
            # ordinary character
            mapping[p_index] = t_index
            p_index += 1
            t_index += 1

        replacements = SubtitleHelper.remove_substring(replacements)
        # work from the rightmost span so earlier offsets stay valid
        new_text = text
        for r in sorted(replacements, key=lambda x: x["start"], reverse=True):
            start = mapping.get(r["start"])
            end = mapping.get(r["end"] - 1)
            if start is None or end is None:
                continue
            end += 1
            new_text = new_text[:start] + r["new_text"] + new_text[end:]
        line.text = new_text

    @staticmethod
    def hex_to_rgb(hex_color: str | None) -> tuple[int, ...] | None:
        """
        Convert a ``#RRGGBB`` colour string into an ``(r, g, b)`` tuple.

        :param hex_color: colour string; empty values and anything not matching
                          ``#`` followed by six hex digits yield None
        :return: three integers in 0..255, or None
        """
        if not hex_color:
            return None
        pattern = r"^#[0-9a-fA-F]{6}$"
        if re.match(pattern, hex_color) is None:
            return None
        hex_color = hex_color.lstrip("#")  # drop the leading '#'
        return tuple(int(hex_color[i: i + 2], 16) for i in (0, 2, 4))
class SubtitleProcessor:
def __init__(self):
self._events: list[SSAEvent] = []

View File

@@ -1,7 +1,7 @@
from datetime import datetime, timedelta
from pathlib import Path
from threading import Event
from typing import List, Tuple, Dict, Any
from typing import List, Tuple, Dict, Any, Optional
import pytz
from apscheduler.schedulers.background import BackgroundScheduler
@@ -27,7 +27,7 @@ class LibraryScraper(_PluginBase):
# 插件图标
plugin_icon = "scraper.png"
# 插件版本
plugin_version = "2.1.1"
plugin_version = "2.1.3"
# 插件作者
plugin_author = "jxxghp"
# 作者主页
@@ -51,6 +51,9 @@ class LibraryScraper(_PluginBase):
_exclude_paths = ""
# 退出事件
_event = Event()
# 刮削目标类型
_target_dir = "dir"
_target_file = "file"
def init_plugin(self, config: dict = None):
@@ -302,7 +305,7 @@ class LibraryScraper(_PluginBase):
exclude_paths = self._exclude_paths.split("\n")
# 已选择的目录
paths = self._scraper_paths.split("\n")
# 需要削的媒体文件
# 需要削的媒体目录或文件
scraper_paths = []
for path in paths:
if not path:
@@ -339,38 +342,116 @@ class LibraryScraper(_PluginBase):
if exclude_flag:
logger.debug(f"{file_path} 在排除目录中,跳过 ...")
continue
# 识别是电影还是电视剧
if not mtype:
file_meta = MetaInfoPath(file_path)
mtype = file_meta.type
# 重命名格式
rename_format = settings.TV_RENAME_FORMAT \
if mtype == MediaType.TV else settings.MOVIE_RENAME_FORMAT
# 计算重命名中的文件夹层数
rename_format_level = len(rename_format.split("/")) - 1
if rename_format_level < 1:
if mtype and not self.__match_forced_type_path(
file_path=file_path,
scraper_path=scraper_path,
mtype=mtype
):
logger.debug(f"{file_path} 不属于强制指定的{mtype.value}目录,跳过 ...")
continue
# 取相对路径的第1层目录
media_path = file_path.parents[rename_format_level - 1]
dir_item = (media_path, mtype)
if dir_item not in scraper_paths:
logger.info(f"发现目录:{dir_item}")
scraper_paths.append(dir_item)
# 识别是电影还是电视剧,强制类型只作为默认值,不污染后续文件识别结果
file_meta = MetaInfoPath(file_path)
file_mtype = mtype
if not file_mtype:
file_mtype = file_meta.type
if file_mtype == MediaType.UNKNOWN:
file_mtype = self.__infer_type_from_path(file_path=file_path, scraper_path=scraper_path)
scraper_item = self.__get_scrape_item(
file_path=file_path,
scraper_path=scraper_path,
mtype=file_mtype,
tmdbid=file_meta.tmdbid
)
if scraper_item and not self.__contains_scrape_item(scraper_paths, scraper_item):
logger.info(f"发现刮削目标:{scraper_item}")
scraper_paths.append(scraper_item)
# 开始刮削
if scraper_paths:
for item in scraper_paths:
logger.info(f"开始刮削目{item[0]} ...")
self.__scrape_dir(path=item[0], mtype=item[1])
logger.info(f"开始刮削目{item[0]} ...")
self.__scrape_path(path=item[0], mtype=item[1], target_type=item[2], tmdbid=item[3])
else:
logger.info(f"未发现需要刮削的目录")
def __scrape_dir(self, path: Path, mtype: MediaType):
@staticmethod
def __get_scrape_item(
file_path: Path,
scraper_path: Path,
mtype: MediaType,
tmdbid: Optional[int] = None
) -> Optional[Tuple[Path, MediaType, str, Optional[int]]]:
"""
削刮一个目录,该目录必须是媒体文件目录
根据扫描根目录和重命名格式,计算真正需要刮削的媒体目录
分类目录通常位于扫描根目录下方,必须用相对路径计算,否则会被误当成媒体目录。
"""
# 优先读取本地nfo文件
tmdbid = None
if mtype == MediaType.MOVIE:
if not file_path or not scraper_path or not mtype:
return None
rename_format = settings.TV_RENAME_FORMAT if mtype == MediaType.TV else settings.MOVIE_RENAME_FORMAT
rename_format_level = len(rename_format.strip("/").split("/")) - 1
try:
relative_path = file_path.relative_to(scraper_path)
except ValueError:
relative_path = Path(file_path.name)
if rename_format_level >= 1:
relative_parts = Path(relative_path).parts
# 重命名格式中包含几层目录,就从文件往上取几层目录;前缀分类目录不会参与计算。
if len(relative_parts) > rename_format_level:
media_path = scraper_path.joinpath(*relative_parts[:-rename_format_level])
return media_path, mtype, LibraryScraper._target_dir, tmdbid
# 扁平目录或自定义重命名格式无目录层级时,退回到单文件刮削,避免分类目录识别失败。
return file_path, mtype, LibraryScraper._target_file, tmdbid
@staticmethod
def __contains_scrape_item(scraper_paths: List[Tuple[Path, MediaType, str, Optional[int]]],
                           scraper_item: Tuple[Path, MediaType, str, Optional[int]]) -> bool:
    """
    Tell whether a scrape target has already been collected.

    Two targets are the same when their first three fields (path, media type,
    target kind) are equal; the tmdbid in the fourth field is ignored.
    """
    wanted = scraper_item[:3]
    for known in scraper_paths:
        if known[:3] == wanted:
            return True
    return False
@staticmethod
def __match_forced_type_path(file_path: Path, scraper_path: Path, mtype: MediaType) -> bool:
    """
    Decide whether a file may be processed under a forced media type.

    Returns True when ``mtype`` is neither movie nor TV, when ``file_path`` is not
    located under ``scraper_path``, or when no path component below the root equals
    a media-type label. Otherwise returns True only if the forced type's label is
    one of the components found.
    """
    if mtype != MediaType.MOVIE and mtype != MediaType.TV:
        return True
    try:
        parts_below_root = file_path.relative_to(scraper_path).parts
    except ValueError:
        return True
    type_labels = (MediaType.MOVIE.value, MediaType.TV.value)
    found_labels = {part for part in parts_below_root if part in type_labels}
    if not found_labels:
        return True
    return mtype.value in found_labels
@staticmethod
def __infer_type_from_path(file_path: Path, scraper_path: Path) -> MediaType:
    """
    Guess the media type from a media-type label appearing as a path component.

    Components are taken relative to ``scraper_path`` when possible, otherwise from
    the whole ``file_path``. The TV label is checked before the movie label; when
    neither is present the result is ``MediaType.UNKNOWN``.
    """
    try:
        parts = file_path.relative_to(scraper_path).parts
    except ValueError:
        parts = file_path.parts
    for candidate in (MediaType.TV, MediaType.MOVIE):
        if candidate.value in parts:
            return candidate
    return MediaType.UNKNOWN
def __scrape_path(self, path: Path, mtype: MediaType, target_type: str = _target_dir,
tmdbid: Optional[int] = None):
"""
刮削一个媒体目录或媒体文件
"""
# 优先读取本地nfo文件文件路径中解析出的 tmdbid 作为兜底识别信息保留。
if target_type == self._target_file:
nfo_path = path.with_suffix(".nfo")
if nfo_path.exists():
tmdbid = self.__get_tmdbid_from_nfo(nfo_path)
elif mtype == MediaType.MOVIE:
# 电影
movie_nfo = path / "movie.nfo"
if movie_nfo.exists():
@@ -393,6 +474,10 @@ class LibraryScraper(_PluginBase):
meta.type = mtype
mediainfo = self.chain.recognize_media(meta=meta)
if not mediainfo:
if target_type == self._target_dir:
# 目录名无法识别时,通常是分类目录,继续尝试其中的具体媒体文件。
self.__scrape_child_files(path=path, mtype=mtype)
return
logger.warn(f"未识别到媒体信息:{path}")
return
@@ -405,13 +490,17 @@ class LibraryScraper(_PluginBase):
# 获取图片
self.chain.obtain_images(mediainfo)
# 刮削
item_path = str(path).replace("\\", "/")
if target_type == self._target_dir:
item_path = f"{item_path}/"
MediaChain().scrape_metadata(
fileitem=schemas.FileItem(
storage="local",
type="dir",
path=str(path).replace("\\", "/") + "/",
type=target_type,
path=item_path,
name=path.name,
basename=path.stem,
extension=path.suffix[1:] if target_type == self._target_file else None,
modify_time=path.stat().st_mtime,
),
mediainfo=mediainfo,
@@ -419,6 +508,26 @@ class LibraryScraper(_PluginBase):
)
logger.info(f"{path} 刮削完成")
def __scrape_child_files(self, path: Path, mtype: MediaType):
    """
    Scrape the media files inside a directory one by one.

    Used when the directory itself could not be recognised as a single media item.
    Stops early once the plugin's exit event is set.

    :param path: directory whose media files are listed
    :param mtype: media type to use for every file; when falsy, the type parsed
        from each file's own path is used instead
    """
    media_files = SystemUtils.list_files(path, settings.RMT_MEDIAEXT)
    if not media_files:
        logger.warn(f"未识别到媒体信息:{path}")
        return
    logger.info(f"{path} 可能是分类目录,开始刮削目录内媒体文件 ...")
    for media_file in media_files:
        if self._event.is_set():
            logger.info("媒体库刮削服务停止")
            return
        file_meta = MetaInfoPath(media_file)
        effective_type = mtype if mtype else file_meta.type
        self.__scrape_path(
            path=media_file,
            mtype=effective_type,
            target_type=self._target_file,
            tmdbid=file_meta.tmdbid,
        )
@staticmethod
def __get_tmdbid_from_nfo(file_path: Path):
"""

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,821 @@
import random
import re
import time
import uuid
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Tuple
from urllib.parse import urlparse, parse_qs
import requests
from apscheduler.triggers.cron import CronTrigger
from fastapi.responses import FileResponse
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.plugins import _PluginBase
from app.schemas.types import EventType
class UpdateWeChatIp(_PluginBase):
# 插件在界面中的展示名称
plugin_name = "动态企微可信IP"
# 插件描述
plugin_desc = "修改企微应用可信IP可本地扫码刷新Cookie"
# 插件图标
plugin_icon = "Wecom_A.png"
# 插件版本,必须和 package.v2.json 中保持一致
plugin_version = "1.0.8"
# 作者信息
plugin_author = "书小白"
author_url = "https://github.com/thshu/MoviePilot-Plugins"
# 配置项前缀,建议保持唯一,避免与其他插件冲突
plugin_config_prefix = "UpdateWeChatIp_"
# 插件加载顺序,数值越小越早
plugin_order = 50
# 插件可见权限级别
auth_level = 1
# 运行时状态字段
_enabled = False
_se = None
_qrcode_key = None
_tl_key = None
_captcha = {}
_wwrtx_sid = None
_party_cache_data = None
_app_id = ""
_ip = None
_is_login = False
onlyonce = False
_cron = ""
_UpdateLogKey = 'UpdateLog'
_ip_urls = ["https://myip.ipip.net", "https://ddns.oray.com/checkip", "https://ip.3322.net", "https://4.ipw.cn",
'http://v4.666666.host:66/ip', 'https://ipv4.ddnspod.com', 'https://v4.66666.host:66/ip',
'https://4.ipw.cn', 'https://ip.3322.net', 'https://6.66666.host:66/ip']
_ip_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
_headers = {
'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/148.0.0.0 Safari/537.36",
'Accept-Encoding': "gzip, deflate, br, zstd",
'pragma': "no-cache",
'cache-control': "no-cache",
'sec-ch-ua-platform': "\"Windows\"",
'x-requested-with': "XMLHttpRequest",
'sec-ch-ua': "\"Chromium\";v=\"148\", \"Google Chrome\";v=\"148\", \"Not/A)Brand\";v=\"99\"",
'sec-ch-ua-mobile': "?0",
'sec-fetch-site': "same-origin",
'sec-fetch-mode': "cors",
'sec-fetch-dest': "empty",
'referer': "https://work.weixin.qq.com/wework_admin/wwqrlogin/mng/login_qrcode",
'accept-language': "zh-CN,zh;q=0.9,ja;q=0.8,en;q=0.7",
'priority': "u=1, i",
}
def init_plugin(self, config: dict = None):
    """
    Load the stored settings and create the HTTP session.

    :param config: saved plugin configuration; None is treated as empty
    """
    stored = config if config else {}
    self._enabled = bool(stored.get("_enabled"))
    self._wwrtx_sid = stored.get("_wwrtx_sid")
    self._app_id = stored.get("_app_id")
    self._cron = stored.get("_cron")
    self._party_cache_data = stored.get("_party_cache_data")
    # A fresh session seeded with the saved login cookie
    session = requests.Session()
    session.cookies.set('wwrtx.sid', self._wwrtx_sid)
    self._se = session
def _save_current_config(self):
    """Persist the current settings by delegating to ``_login_success``."""
    self._login_success()
    return None
def get_state(self) -> bool:
    """Report whether the plugin is currently enabled."""
    enabled = self._enabled
    return enabled
def get_service(self) -> List[Dict[str, Any]]:
    """
    Describe the scheduled job of this plugin.

    :return: a single job that runs ``check`` on the configured crontab expression,
        or an empty list when the plugin is disabled or no expression is set
    """
    if not (self._enabled and self._cron):
        return []
    class_name = self.__class__.__name__
    job = {
        "id": class_name,
        "name": f"{class_name}_{self.plugin_name}服务",
        "trigger": CronTrigger.from_crontab(self._cron),
        "func": self.check,
        "kwargs": {},
    }
    return [job]
@staticmethod
def get_command() -> List[Dict[str, Any]]:
    """
    Describe the remote command registered by this plugin.

    :return: one entry for ``/update_wechat_ip`` carrying the ``update_wechat_ip`` action
    """
    label = "获取企业微信二维码"
    command = {
        "cmd": "/update_wechat_ip",
        "event": EventType.PluginAction,
        "desc": label,
        "category": label,
        "data": {"action": "update_wechat_ip"},
    }
    return [command]
@eventmanager.register(EventType.PluginAction)
def command_action(self, event: Event):
    """
    Handle the ``/update_wechat_ip`` remote command.

    * no argument: reset the login state and send a fresh login QR code;
    * argument ``扫码完成``: check the scan result and continue the login;
    * argument containing exactly six digits: submit those digits as the SMS
      captcha and report whether the login succeeded;
    * anything else: reply with an "invalid input" message.

    :param event: plugin action event; ``event_data`` supplies ``action``,
        ``channel``, ``user`` and ``arg_str``
    """
    event_data = event.event_data
    known_actions = [cmd['data']['action'] for cmd in self.get_command()]
    if not event_data or event_data.get("action") not in known_actions:
        return
    channel = event_data.get("channel")
    user = event_data.get("user")
    # An empty / whitespace-only argument means "no argument"; previously an empty
    # string ended up in the invalid-input branch and no QR code was ever sent.
    arg_str = (event_data.get("arg_str") or "").strip()

    if not arg_str:
        # Start a new login attempt from a clean state
        self._tl_key = None
        self._captcha = {}
        self._qrcode_key = self._get_key()
        if not self._qrcode_key:
            self.post_message(
                channel=channel,
                title="登录失败",
                userid=user,
                text="获取登录二维码失败,请稍后重试",
            )
            return
        image_url = self._qrcode(self._qrcode_key)
        self.post_message(
            channel=channel,
            title="登录二维码",
            text='\n'.join(
                [
                    "请选择要执行的操作:",
                    f"如果按钮不可用,可回复:\n```\n/update_wechat_ip 扫码完成\n```"
                ]
            ),
            userid=user,
            buttons=[[{"text": '扫码完成',
                       "callback_data": f"[PLUGIN]{self.__class__.__name__}|扫码完成|{self._qrcode_key}"}]],
            image=image_url
        )
        return

    if arg_str == '扫码完成':
        self._login(channel, user)
        return

    # Submit only the digits: the argument may carry separators or extra text
    digits = "".join(re.findall('[0-9]', arg_str))
    if len(digits) != 6:
        self.post_message(
            channel=channel,
            title="无效的输入",
            userid=user,
            text="无效的输入",
        )
        return

    self._captcha[self._qrcode_key] = digits
    self._confirm_captcha(self._tl_key, digits)
    # Pick up the session cookie issued after the captcha was accepted
    self._wwrtx_sid = self._se.cookies.get_dict().get('wwrtx.sid')
    if self._party_cache():
        self._login_success()
        # The cached payload may be missing or hold an empty list
        party_list = ((self._party_cache_data or {}).get('party_list') or {}).get('list') or [{}]
        self.post_message(
            channel=channel,
            title="登录成功",
            userid=user,
            text=f"成功登录企业:{party_list[0].get('name')}",
        )
    else:
        self.post_message(
            channel=channel,
            title="登录失败",
            userid=user,
            text=f"登录失败,返回值:{self._party_cache_data}",
        )
@eventmanager.register(EventType.MessageAction)
def message_action(self, event: Event):
    """
    Handle button callbacks of this plugin's messages.

    The callback text has the form ``<action>|<qrcode_key>`` where ``<action>`` is
    ``扫码完成`` (scan finished), ``输入完毕`` (captcha complete) or a keypad digit
    that is appended to the captcha collected for that ``qrcode_key``.

    :param event: message action event; ``event_data`` supplies ``plugin_id``,
        ``channel``, ``userid``, ``text`` and the ids of the original message
    """
    event_data = event.event_data
    if not event_data:
        return
    # Only react to callbacks addressed to this plugin
    if event_data.get("plugin_id") != self.__class__.__name__:
        return
    channel = event_data.get("channel")
    userid = event_data.get("userid")
    # Ids of the message that carried the buttons, used to update it in place
    original_message_id = event_data.get("original_message_id")
    original_chat_id = event_data.get("original_chat_id")
    callback_text = event_data.get("text") or ""
    if "|" not in callback_text:
        self.post_message(
            channel=channel,
            title="登录失败",
            userid=userid,
            text="未获取到本地登录对应的qrcode_key",
        )
        return
    text, qrcode_key = callback_text.split("|", 1)

    # One exclusive chain: previously "扫码完成" also fell through to the
    # invalid-input reply because the next test was a separate `if`.
    if text == "扫码完成":
        self._qrcode_key = qrcode_key
        self._login(channel, userid)
    elif text == "输入完毕":
        # Digits were collected under the callback's own qrcode_key
        self._confirm_captcha(self._tl_key, self._captcha.get(qrcode_key))
        # Adopt the session cookie issued after the captcha, as the command path
        # does; _party_cache() refuses to run without it and would otherwise
        # re-send the previous value.
        self._wwrtx_sid = self._se.cookies.get_dict().get('wwrtx.sid')
        if self._party_cache():
            self._login_success()
            party_list = ((self._party_cache_data or {}).get('party_list') or {}).get('list') or [{}]
            self.post_message(
                channel=channel,
                title="登录成功",
                userid=userid,
                text=f"成功登录企业:{party_list[0].get('name')}",
            )
        else:
            self.post_message(
                channel=channel,
                title="登录失败",
                userid=userid,
                text=f"登录失败,返回值:{self._party_cache_data}",
            )
    elif re.search('[0-9]', text):
        # Keypad digit: append it and refresh the keypad message
        self._captcha[qrcode_key] = self._captcha.get(qrcode_key, "") + text
        self.post_message(
            channel=channel,
            title="短信验证码",
            userid=userid,
            buttons=self._get_buttons(),
            text='\n'.join(
                [
                    "触发验证码:",
                    f"验证码内容:{self._captcha[qrcode_key]}\n"
                    f"如果按钮不可用,可回复:\n```\n/update_wechat_ip 验证码内容\n```"
                ]
            ),
            original_message_id=original_message_id,
            original_chat_id=original_chat_id
        )
    else:
        self.post_message(
            channel=channel,
            title="无效的输入",
            userid=userid,
            text="无效的输入",
        )
def get_api(self) -> List[Dict[str, Any]]:
    """
    Describe the HTTP endpoints exposed by this plugin.

    :return: two GET endpoints authenticated with ``apikey``: ``/img/{uuid}`` served
        by ``get_img`` and ``/UpdateIP`` served by ``UpdateIp``
    """
    image_endpoint = {
        "path": "/img/{uuid}",
        "endpoint": self.get_img,
        "methods": ["GET"],
        "auth": "apikey",
        "summary": "获取图片",
        "description": "获取图片",
    }
    update_endpoint = {
        "path": "/UpdateIP",
        "endpoint": self.UpdateIp,
        "methods": ["GET"],
        "auth": "apikey",
        "summary": "更新企业微信IP白名单",
        "description": "更新企业微信IP白名单,需要传递查询参数,参数名为:ip",
    }
    return [image_endpoint, update_endpoint]
def UpdateIp(self, ip):
    """
    API endpoint: push the given IP to the trusted-IP list of every configured app.

    :param ip: value of the ``ip`` query parameter
    """
    candidate = ip.strip() if isinstance(ip, str) else ip
    if not candidate:
        # Never push an empty value to the trusted-IP configuration
        logger.error("未提供有效的IP,忽略本次更新请求")
        return
    self._ip = candidate
    self._save_ip_config()
def get_img(self, uuid):
    """
    API endpoint: return the stored login QR image as a JPEG response.

    :param uuid: path segment of the request; not used when locating the file
    """
    qr_file: Path = self.get_data_path() / "WeChatQr.jpg"
    return FileResponse(qr_file, media_type="image/jpeg")
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
    """
    Build the configuration form and its default model.

    :return: ``(form, defaults)`` — the form is a single ``VForm`` with two rows
        (two switches; the cron field and the app id field), the defaults hold the
        initial values of the persisted settings
    """

    def column(width: int, component: str, props: dict) -> dict:
        # One full-width column (``md`` = width) wrapping a single input component
        return {
            'component': 'VCol',
            'props': {'cols': 12, 'md': width},
            'content': [{'component': component, 'props': props}],
        }

    switch_row = {
        'component': 'VRow',
        'content': [
            column(4, 'VSwitch', {'model': '_enabled', 'label': '启用插件'}),
            column(4, 'VSwitch', {'model': 'onlyonce', 'label': '立即检测一次'}),
        ],
    }
    settings_row = {
        'component': 'VRow',
        'content': [
            column(6, 'VTextField', {
                'model': '_cron',
                'label': '[必填]检测周期',
                'placeholder': '*/10 * * * *',
            }),
            column(6, 'VTextarea', {
                'model': '_app_id',
                'label': '[必填]应用ID',
                'rows': 1,
                'placeholder': '输入应用ID,多个使用(,)英文逗号隔开,在企业微信应用页面URL末尾获取',
            }),
        ],
    }
    form = [{'component': 'VForm', 'content': [switch_row, settings_row]}]
    defaults = {
        "_enabled": False,
        "_wwrtx_sid": "",
        "_app_id": "",
        "_party_cache_data": {},
        "_cron": '*/10 * * * *',
    }
    return form, defaults
def get_page(self) -> List[dict]:
    """
    Build the detail page: a login-status banner above the update-log table.

    :return: page description with one row per stored log entry, newest first
    """
    # ---------- update log, newest first ----------
    stored = self.get_data(self._UpdateLogKey) or []
    entries: List[UpdateLogDto] = [UpdateLogDto.from_dict(item) for item in stored]
    entries.sort(key=lambda entry: entry.UpdateTime, reverse=True)

    def plain_cell(value) -> dict:
        return {"component": "td", "text": value}

    def header_cell(title: str) -> dict:
        return {
            "component": "th",
            "props": {"class": "text-start ps-4"},
            "text": title,
        }

    table_rows = []
    for entry in entries:
        # Failed updates are highlighted in red
        status_cell = {
            "component": "td",
            "props": {
                "style": {"color": "red"} if not entry.status else {}
            },
            "text": "成功" if entry.status else "失败",
        }
        updated_at = entry.UpdateTime.strftime('%Y-%m-%d %H:%M:%S') if entry.UpdateTime else ""
        table_rows.append({
            "component": "tr",
            "props": {"class": "text-sm"},
            "content": [
                status_cell,
                plain_cell(entry.app_id),
                plain_cell(entry.ip),
                plain_cell(entry.result),
                plain_cell(updated_at),
            ],
        })

    # ---------- company name, tolerant of missing data ----------
    cached = self._party_cache_data or {}
    companies = cached.get("party_list", {}).get("list") or [{}]
    party_name = companies[0].get("name", "未知")

    # ---------- page structure ----------
    banner = {
        "component": "div",
        "props": {
            "style": {
                "display": "flex",
                "justifyContent": "center",
                "alignItems": "center",
                "flexDirection": "column",
                "gap": "10px",
                "marginBottom": "20px",  # spacing between banner and table
            }
        },
        "content": [
            {
                "component": "div",
                "text": f"{party_name}已登录" if self._is_login else "登录失效",
                "props": {
                    "style": {
                        "fontSize": "22px",
                        "fontWeight": "bold",
                        "color": "#ffffff",
                        "backgroundColor": "#9B50FF",
                        "padding": "8px 16px",
                        "borderRadius": "5px",
                        "textAlign": "center",
                        "display": "inline-block",
                    }
                },
            }
        ],
    }
    log_table = {
        "component": "VTable",
        "props": {"hover": True},
        "content": [
            {
                "component": "thead",
                "props": {"class": "text-no-wrap"},
                "content": [
                    header_cell(title)
                    for title in ("状态", "appId", "更新IP", "返回值", "更新时间")
                ],
            },
            {
                "component": "tbody",
                "content": table_rows,
            },
        ],
    }
    return [
        {
            "component": "VRow",
            "content": [
                {
                    "component": "VCol",
                    "props": {"cols": 12},
                    "content": [banner, log_table],
                }
            ],
        }
    ]
def stop_service(self):
    """Stop hook; this plugin performs no work here."""
    return None
def _get_key(self):
    """
    Request a new login QR-code key.

    :return: the ``qrcode_key`` from the response, or None when the request fails,
        the body is not JSON, or the key is absent
    """
    logger.info("开始获取登录二维码key")
    url = "https://work.weixin.qq.com/wework_admin/wwqrlogin/mng/get_key"
    current_ts = int(time.time() * 1000)
    params = {
        'r': str(random.random()),
        'login_type': "login_admin",
        'callback': f"wwqrloginCallback_{current_ts}",
        'redirect_uri': "https://work.weixin.qq.com/wework_admin/loginpage_wx?_r=234&redirect_uri=https%3A%2F%2Fwork.weixin.qq.com%2Fwework_admin%2Fframe&url_hash=%23%2Fapps#/apps",
        'crossorigin': "1"
    }
    try:
        # Bounded wait, like the other admin calls that already pass a timeout
        response = self._se.get(url, params=params, headers=self._headers, timeout=10)
        payload = response.json()
    except (requests.RequestException, ValueError) as e:
        logger.error(f"获取登录二维码key失败: {e}")
        return None
    # "data" may be missing or null in an error response
    data = payload.get('data') if isinstance(payload, dict) else None
    qrcode_key = (data or {}).get('qrcode_key')
    if qrcode_key:
        logger.info(f"获取登录二维码key成功,返回值:{response.text}")
    else:
        logger.error(f"获取登录二维码key失败,返回值:{response.text}")
    return qrcode_key
def _qrcode(self, key) -> str:
    """
    Download the login QR image for ``key`` and return a URL that serves it.

    The image is written to ``WeChatQr.jpg`` in the plugin data directory and is
    served by this plugin's ``/img/{uuid}`` endpoint.

    :param key: QR-code key obtained from ``_get_key``
    :return: URL of the image, including the API token query parameter
    """
    logger.info("开始获取登录二维码图片")
    url = "https://work.weixin.qq.com/wework_admin/wwqrlogin/mng/qrcode"
    params = {
        'qrcode_key': key,
        'login_type': "login_admin"
    }
    response = self._se.get(url, params=params, headers=self._headers, timeout=10)
    logger.info("登录二维码图片获取成功")
    img_path: Path = self.get_data_path() / "WeChatQr.jpg"
    img_path.write_bytes(response.content)
    logger.info(f"登录二维码已写入文件,路径:{img_path}")
    # A random path segment gives every QR code a distinct URL
    uri = f"/api/v1/plugin/{self.__class__.__name__}/img/{uuid.uuid4().hex}?apikey={settings.API_TOKEN}"
    img_url = settings.MP_DOMAIN(uri) or f"http://127.0.0.1:{settings.PORT}{uri}"
    # Log the address without its query string so the API token stays out of the log
    logger.info(f"构建二维码地址为:{img_url.split('?', 1)[0]}")
    return img_url
def _check(self, key) -> Dict:
    """
    Ask the server whether the QR code identified by ``key`` has been scanned.

    Makes up to two attempts, pausing one second after each unsuccessful one.

    :param key: QR-code key being polled
    :return: the response's ``data`` object once its status is ``QRCODE_SCAN_SUCC``,
        otherwise None
    """
    logger.info("开始获取扫码结果")
    endpoint = "https://work.weixin.qq.com/wework_admin/wwqrlogin/mng/check"
    attempts_left = 2
    while attempts_left > 0:
        attempts_left -= 1
        query = {
            'qrcode_key': key,
            'status': "QRCODE_SCAN_ING"
        }
        response = self._se.get(endpoint, params=query, headers=self._headers)
        data = response.json().get('data', {})
        logger.info(f"扫码结果获取完成:{response.text}")
        if data.get("status") == "QRCODE_SCAN_SUCC":
            return data
        time.sleep(1)
    logger.info("获取扫码结果超时")
    return None
def _loginpage_wx(self, key, code) -> requests.Response:
    """
    Complete the QR login by requesting the login page with the auth code.

    :param key: QR-code key of the current login attempt
    :param code: ``auth_code`` returned by the scan check
    :return: the final response; callers inspect its ``url`` for a ``tl_key``
    """
    logger.info("开始登录")
    login_url = "https://work.weixin.qq.com/wework_admin/loginpage_wx"
    query = dict(
        _r="234",
        redirect_uri="https://work.weixin.qq.com/wework_admin/frame",
        url_hash="#/apps",
        code=code,
        auth_redirect_time="1780446137000",
        getauth_time="1780446137000",
        wwqrlogin="1",
        qrcode_key=key,
        auth_source="SOURCE_FROM_WEWORK",
        confirm_type="0",
    )
    result = self._se.get(login_url, params=query, headers=self._headers)
    logger.info(f"登录完成,返回值:{result.text}")
    return result
def _confirm_captcha(self, tl_key, captcha):
    """
    Submit the SMS captcha, then request the choose-corp page for the same ``tl_key``.

    :param tl_key: key taken from the login redirect URL
    :param captcha: captcha text to submit
    """
    logger.info("开始提交验证码")
    confirm_url = "https://work.weixin.qq.com/wework_admin/mobile_confirm/confirm_captcha?ajax=1&f=json&d2st="
    payload = {"captcha": captcha, "tl_key": tl_key}
    confirm_res = self._se.post(confirm_url, json=payload, headers=self._headers)
    logger.info(f"提交验证码返回值:{confirm_res.text}")
    choose_url = f"https://work.weixin.qq.com/wework_admin/login/choose_corp?tl_key={tl_key}"
    choose_res = self._se.get(choose_url)
    logger.info(f"choose_corp接口返回值:{choose_res.text}")
def _party_cache(self):
    """
    Probe the login state by requesting the party cache.

    On success the response's ``data`` is stored in ``_party_cache_data`` and
    ``_is_login`` becomes True. On any failure ``_is_login`` becomes False; when the
    body contains ``errCode`` the whole parsed body is stored instead.

    :return: True when logged in, False otherwise (also when no session id is
        known, in which case no request is made and ``_is_login`` is left as is)
    """
    logger.info("开始获取企业信息,判断是否登录成功")
    if not self._wwrtx_sid:
        return False
    endpoint = "https://work.weixin.qq.com/wework_admin/contacts/party/cache"
    query = {
        'lang': "zh_CN",
        'f': "json",
        'ajax': "1",
        'timeZoneInfo[zone_offset]': "-8",
    }
    self._se.cookies.set('wwrtx.sid', self._wwrtx_sid)
    logged_in = False
    try:
        res = self._se.post(endpoint, params=query, headers=self._headers, timeout=10)
        if res.status_code != 200:
            logger.error(f"获取企业微信部门缓存失败HTTP状态码{res.status_code}")
        else:
            body = res.json()
            if 'errCode' in res.text:
                self._party_cache_data = body
            else:
                self._party_cache_data = body.get('data')
                logged_in = True
    except Exception as e:
        logger.error(f"获取企业微信部门缓存异常: {e}")
    self._is_login = logged_in
    return logged_in
def _login(self, channel, userid):
    """
    Continue the login after the user reported that the QR code was scanned.

    When the login redirect carries a ``tl_key`` an SMS captcha is required: the key
    is stored and a digit keypad is sent. Otherwise the session cookie is adopted and
    the result of the login probe is reported to the user.

    :param channel: message channel used for replies
    :param userid: recipient of the replies
    """
    logger.info("触发登录回调,开始执行登录步骤")
    check_data = self._check(self._qrcode_key)
    if not check_data:
        # Tell the user instead of ending silently
        logger.error("未获取到扫码成功状态,登录中止")
        self.post_message(
            channel=channel,
            title="登录失败",
            userid=userid,
            text="未检测到扫码成功,请确认已扫码并在手机上确认后重试",
        )
        return
    code = check_data.get('auth_code')
    res = self._loginpage_wx(self._qrcode_key, code)
    if 'tl_key' in res.url:
        logger.info("返回值中获取到tl_key,触发短信验证码")
        # Store tl_key BEFORE prompting, so a fast reply already finds it.
        # parse_qs maps each name to a list of values.
        query_params = parse_qs(urlparse(res.url).query)
        self._tl_key = query_params.get('tl_key', [None])[0]
        self.post_message(
            channel=channel,
            title="短信验证码",
            userid=userid,
            buttons=self._get_buttons(),
            text='\n'.join(
                [
                    "触发验证码:",
                    f"如果按钮不可用,可回复:\n```\n/update_wechat_ip 验证码内容\n```"
                ]
            ),
        )
        return
    self._wwrtx_sid = self._se.cookies.get_dict().get('wwrtx.sid')
    if self._party_cache():
        logger.info("登录成功")
        self._login_success()
        # The cached payload may be missing or hold an empty list
        party_list = ((self._party_cache_data or {}).get('party_list') or {}).get('list') or [{}]
        self.post_message(
            channel=channel,
            title="登录成功",
            userid=userid,
            text=f"成功登录企业:{party_list[0].get('name')}",
        )
    else:
        logger.error(f"登录失败,返回值:{self._party_cache_data}")
        self.post_message(
            channel=channel,
            title="登录失败",
            userid=userid,
            text=f"登录失败,返回值:{self._party_cache_data}",
        )
def _save_ip_config(self):
    """
    Write ``self._ip`` to the trusted-IP configuration of every configured app.

    ``_app_id`` is a comma-separated list; blank items are skipped. One log entry is
    recorded per app — also when the request itself fails — and the entries are
    appended to the stored update log.
    """
    logger.info(f"更新IP为:{self._ip}")
    _update_log = []
    url = 'https://work.weixin.qq.com/wework_admin/apps/saveIpConfig?lang=zh_CN&f=json&ajax=1'
    # _app_id is None until the plugin has been configured
    for appId in (self._app_id or "").split(','):
        appId = appId.strip()
        if not appId:
            continue
        data = {
            'app_id': appId,
            'ipList[]': self._ip
        }
        try:
            res = self._se.post(url, data=data, headers=self._headers, timeout=10)
            result_text = res.text
            succeeded = 'err' not in result_text
        except requests.RequestException as e:
            # Keep going with the remaining apps and still record this failure
            result_text = str(e)
            succeeded = False
        if succeeded:
            logger.info(f'{appId}更新白名单成功更新IP为{self._ip},接口返回值:{result_text}')
        else:
            logger.error(f"{appId}更新IP白名单失败返回值{result_text}")
        _update_log.append(UpdateLogDto(
            status=succeeded,
            ip=self._ip,
            app_id=appId,
            result=result_text
        ))
    update_log: List[UpdateLogDto] = [UpdateLogDto.from_dict(i) for i in self.get_data(self._UpdateLogKey) or []]
    self.save_data(self._UpdateLogKey, [i.to_dict() for i in update_log + _update_log])
def _login_success(self):
    """Persist the enabled flag, session id, app ids, party cache and cron expression."""
    logger.info("保存配置文件")
    snapshot = {
        '_enabled': self._enabled,
        '_wwrtx_sid': self._wwrtx_sid,
        '_app_id': self._app_id,
        '_party_cache_data': self._party_cache_data,
        '_cron': self._cron,
    }
    self.update_config(snapshot)
def _get_buttons(self):
buttons = [
[
{
"text": str(j),
"callback_data": f"[PLUGIN]{self.__class__.__name__}|{j}|{self._qrcode_key}"
}
for j in range(i * 5, (i + 1) * 5)
]
for i in range(2)
]
buttons.append(
[{"text": f'输入完毕',
"callback_data": f"[PLUGIN]{self.__class__.__name__}|输入完毕|{self._qrcode_key}"}]
)
return buttons
def get_ip_from_url(self):
    """
    Determine the current public IPv4 address from the configured lookup services.

    Services are tried in order (each distinct URL once) with a 3 second timeout;
    the first well-formed IPv4 address found in a 200 response is returned.

    :return: the address as a string, or the literal ``"获取IP失败"`` when no
        service produced one
    """
    import ipaddress

    # dict.fromkeys drops repeated URLs while keeping their order
    for url in dict.fromkeys(self._ip_urls):
        try:
            response = requests.get(url, timeout=3)
            if response.status_code != 200:
                continue
            for candidate in re.findall(self._ip_pattern, response.text):
                # The pattern also matches things like 999.1.1.1 — keep real addresses only
                try:
                    ipaddress.IPv4Address(candidate)
                except ValueError:
                    continue
                return candidate
        except Exception as e:
            # Connection resets and read timeouts are expected noise; the next
            # service is simply tried
            if "104" not in str(e) and 'Read timed out' not in str(e):
                logger.warning(f"{url} 获取IP失败, Error: {e}")
    return "获取IP失败"
def _get_corp_app_v2(self):
    """
    Fetch the configuration of the first configured app.

    :return: the ``data`` object of the response, or an empty dict when no app id is
        configured, the status is not 200, or the request raises
    """
    logger.info("开始获取企业应用配置")
    if not self._app_id:
        logger.error("未配置应用ID")
        return {}
    first_app_id = self._app_id.split(",")[0].strip()
    endpoint = f'https://work.weixin.qq.com/wework_admin/apps/getCorpAppV2?lang=zh_CN&f=json&ajax=1&app_id={first_app_id}'
    try:
        res = self._se.get(endpoint, timeout=10)
        if res.status_code != 200:
            logger.error(f"获取企业应用配置失败HTTP状态码{res.status_code}")
            return {}
        return res.json().get('data', {})
    except Exception as e:
        logger.error(f"获取企业应用配置异常: {e}")
    return {}
def check(self):
    """
    Scheduled job: keep the app's trusted-IP list in sync with the public IP.

    Steps: verify the plugin is enabled and the login is still valid (notifying the
    user when it is not), look up the public IP, and when that IP is not in the
    app's trusted list, save it and send a notification.
    """
    if not self._enabled:
        logger.error("插件未开启")
        return
    self._party_cache()
    if not self._is_login:
        logger.error("未登录")
        self.post_message(
            title="企业微信登录状态失效",
            text='企业微信登录状态失效,请重新操作登录'
        )
        return
    self._ip = self.get_ip_from_url()
    if not self._ip or self._ip == "获取IP失败":
        logger.error("获取当前公网IP失败跳过本次检测")
        return
    # Any level of the app configuration may be missing or null
    app_config = self._get_corp_app_v2() or {}
    white_ip_list = (app_config.get('app') or {}).get('white_ip_list') or {}
    app_config_ips = white_ip_list.get('ip') or []
    if self._ip in app_config_ips:
        return
    self._save_ip_config()
    self.post_message(
        title='企业微信IP更新',
        text="触发IP更新,最新IP为:" + self._ip
    )
@dataclass
class UpdateLogDto:
    """One trusted-IP update attempt, as stored in the plugin's update log."""
    # whether the update was accepted
    status: bool
    # IP that was written
    ip: str
    # application the IP was written to
    app_id: str
    # raw result text of the update call
    result: str
    # time of the attempt; filled with the current time when omitted
    UpdateTime: datetime = None

    def __post_init__(self):
        if self.UpdateTime is None:
            self.UpdateTime = datetime.now()

    def to_dict(self):
        """Return a plain dict with ``UpdateTime`` rendered as an ISO-8601 string."""
        return {
            "status": self.status,
            "ip": self.ip,
            "app_id": self.app_id,
            "result": self.result,
            "UpdateTime": self.UpdateTime.isoformat()
        }

    @classmethod
    def from_dict(cls, data: dict):
        """
        Rebuild an entry from the dict produced by ``to_dict``.

        A missing or None ``UpdateTime`` falls back to the default (current time);
        an ISO-8601 string is parsed; a ``datetime`` is used as is.

        :raises ValueError: when ``UpdateTime`` is a string that is not ISO-8601
        """
        # Shallow copy so the caller's dict is left untouched
        kwargs = dict(data)
        update_time = kwargs.pop('UpdateTime', None)
        if isinstance(update_time, str):
            update_time = datetime.fromisoformat(update_time)
        kwargs['UpdateTime'] = update_time
        return cls(**kwargs)

View File

@@ -0,0 +1 @@
requests>=2.34.2

View File

@@ -0,0 +1,185 @@
from pathlib import Path
import libraryscraper
from app.core.config import settings
from app.schemas import MediaType
from libraryscraper import LibraryScraper
def test_get_scrape_item_uses_media_folder_below_category(monkeypatch):
    """A movie below a category folder resolves to the movie folder, not the category."""
    movie_format = "{{title}}{% if year %} ({{year}}){% endif %}/{{title}}{{fileExt}}"
    monkeypatch.setattr(settings, "MOVIE_RENAME_FORMAT", movie_format)
    scan_root = Path("/media/strm-new/电影")
    movie_dir = scan_root / "动画电影" / "哪吒之魔童闹海 (2025)"
    resolved = LibraryScraper._LibraryScraper__get_scrape_item(
        movie_dir / "哪吒之魔童闹海 (2025).strm", scan_root, MediaType.MOVIE
    )
    assert resolved == (movie_dir, MediaType.MOVIE, "dir", None)
def test_get_scrape_item_uses_tv_root_relative_path(monkeypatch):
    """With the scan root set at a category folder, the show folder is still derived relative to that root."""
    tv_format = (
        "{{title}}{% if year %} ({{year}}){% endif %}"
        "/Season {{season}}/{{title}} - {{season_episode}}{{fileExt}}"
    )
    monkeypatch.setattr(settings, "TV_RENAME_FORMAT", tv_format)
    scan_root = Path("/media/strm-new/电视剧/国产剧")
    show_dir = scan_root / "狂飙 (2023)"
    episode = show_dir / "Season 1" / "狂飙 - S01E01.strm"
    resolved = LibraryScraper._LibraryScraper__get_scrape_item(episode, scan_root, MediaType.TV)
    assert resolved == (show_dir, MediaType.TV, "dir", None)
def test_get_scrape_item_falls_back_to_file_when_rename_format_is_flat(monkeypatch):
    """A rename format without folder levels yields a single-file target instead of skipping the file."""
    flat_format = "{{title}} - {{season_episode}}{{fileExt}}"
    monkeypatch.setattr(settings, "TV_RENAME_FORMAT", flat_format)
    scan_root = Path("/media/strm-new/电视剧/国产剧")
    episode = scan_root / "狂飙 - S01E01.strm"
    resolved = LibraryScraper._LibraryScraper__get_scrape_item(episode, scan_root, MediaType.TV)
    assert resolved == (episode, MediaType.TV, "file", None)
def test_get_scrape_item_uses_real_issue_paths_from_library_root(monkeypatch):
    """Scanning from the library root skips the category levels and lands on the real media folders."""
    formats = {
        "MOVIE_RENAME_FORMAT": "{{title}}{% if year %} ({{year}}){% endif %}/{{title}}{{fileExt}}",
        "TV_RENAME_FORMAT": "{{title}}{% if year %} ({{year}}){% endif %}/Season {{season}}/{{title}} - {{season_episode}}{{fileExt}}",
    }
    for setting_name, rename_format in formats.items():
        monkeypatch.setattr(settings, setting_name, rename_format)
    library_root = Path("/media/strm-new")
    movie_dir = library_root / "电影" / "动画电影" / "影片名"
    show_dir = library_root / "电视剧" / "国产剧" / "剧名"
    resolve = LibraryScraper._LibraryScraper__get_scrape_item
    movie_item = resolve(movie_dir / "xxx.strm", library_root, MediaType.MOVIE)
    tv_item = resolve(show_dir / "Season 1" / "xxx.strm", library_root, MediaType.TV)
    assert movie_item == (movie_dir, MediaType.MOVIE, "dir", None)
    assert tv_item == (show_dir, MediaType.TV, "dir", None)
def test_libraryscraper_scans_real_issue_paths(tmp_path, monkeypatch):
    """Run the scan entry point and check that both reported layouts reach the media folders, not the category folders."""
    root = tmp_path / "strm-new"
    movie_dir = root / "电影" / "动画电影" / "影片名"
    tv_dir = root / "电视剧" / "国产剧" / "剧名" / "Season 1"
    movie_dir.mkdir(parents=True)
    tv_dir.mkdir(parents=True)
    (movie_dir / "xxx.strm").write_text("", encoding="utf-8")
    (tv_dir / "xxx.strm").write_text("", encoding="utf-8")

    class FakeMediaInfo:
        tmdb_id = 129

        def __init__(self, mtype):
            self.type = mtype

    class FakeChain:
        """Recognises everything and records the media type it was asked for."""

        def __init__(self):
            self.recognized_types = []

        def recognize_media(self, meta=None, mtype=None, tmdbid=None, **kwargs):
            recognized_type = mtype or meta.type
            self.recognized_types.append(recognized_type)
            return FakeMediaInfo(recognized_type)

        def obtain_images(self, mediainfo):
            return None

    class FakeMediaChain:
        """Collects the file items handed over for scraping."""

        def __init__(self):
            self.scraped_items = []

        def scrape_metadata(self, fileitem, **kwargs):
            self.scraped_items.append(fileitem)

    fake_media_chain = FakeMediaChain()
    monkeypatch.setattr(settings, "SCRAP_FOLLOW_TMDB", True)
    monkeypatch.setattr(
        settings,
        "MOVIE_RENAME_FORMAT",
        "{{title}}{% if year %} ({{year}}){% endif %}/{{title}}{{fileExt}}",
    )
    monkeypatch.setattr(
        settings,
        "TV_RENAME_FORMAT",
        "{{title}}{% if year %} ({{year}}){% endif %}/Season {{season}}/{{title}} - {{season_episode}}{{fileExt}}",
    )
    monkeypatch.setattr(libraryscraper, "MediaChain", lambda: fake_media_chain)
    plugin = LibraryScraper()
    fake_chain = FakeChain()
    plugin.chain = fake_chain
    plugin.init_plugin({"scraper_paths": str(root), "exclude_paths": ""})
    plugin._LibraryScraper__libraryscraper()
    scraped_paths = {Path(fileitem.path.rstrip("/")) for fileitem in fake_media_chain.scraped_items}
    assert scraped_paths == {movie_dir, tv_dir.parent}
    # The order in which the two folders are discovered follows the directory
    # listing, so compare the recognised types without relying on their order.
    assert len(fake_chain.recognized_types) == 2
    assert set(fake_chain.recognized_types) == {MediaType.MOVIE, MediaType.TV}
def test_forced_tv_root_skips_movie_category_paths():
    """With the library root forced to TV, files below the sibling movie category are rejected."""
    library_root = Path("/media/strm-new")
    matches_forced_type = LibraryScraper._LibraryScraper__match_forced_type_path
    movie_file = library_root.joinpath("电影", "动画电影", "影片名", "xxx.strm")
    tv_file = library_root.joinpath("电视剧", "国产剧", "剧名", "Season 1", "xxx.strm")
    assert not matches_forced_type(movie_file, library_root, MediaType.TV)
    assert matches_forced_type(tv_file, library_root, MediaType.TV)
def test_scrape_dir_falls_back_to_child_files(tmp_path, monkeypatch):
    """When a category folder cannot be recognised, the media files inside it are scraped instead."""
    category_path = tmp_path / "电影" / "动画电影"
    category_path.mkdir(parents=True)
    media_file = category_path / "哪吒之魔童闹海 (2025).strm"
    media_file.write_text("", encoding="utf-8")

    class FakeMediaInfo:
        tmdb_id = 129
        type = MediaType.MOVIE

    class FakeChain:
        """Fails the first recognition (the folder) and succeeds afterwards."""

        def __init__(self):
            self.recognize_calls = 0

        def recognize_media(self, **kwargs):
            self.recognize_calls += 1
            if self.recognize_calls == 1:
                return None
            return FakeMediaInfo()

        def obtain_images(self, mediainfo):
            return None

    class FakeMediaChain:
        """Collects the file items handed over for scraping."""

        def __init__(self):
            self.scraped_items = []

        def scrape_metadata(self, fileitem, **kwargs):
            self.scraped_items.append(fileitem)

    chain_stub = FakeChain()
    media_chain_stub = FakeMediaChain()
    monkeypatch.setattr(settings, "SCRAP_FOLLOW_TMDB", True)
    monkeypatch.setattr(libraryscraper, "MediaChain", lambda: media_chain_stub)
    plugin = LibraryScraper()
    plugin.chain = chain_stub
    plugin._LibraryScraper__scrape_path(category_path, MediaType.MOVIE, target_type="dir")
    assert len(media_chain_stub.scraped_items) == 1
    scraped = media_chain_stub.scraped_items[0]
    assert scraped.type == "file"
    assert scraped.path == media_file.as_posix()