perf: optimize media recognition internals

This commit is contained in:
jxxghp
2026-05-15 13:37:36 +08:00
parent b2a18f9ae4
commit 2831eecbeb
9 changed files with 396 additions and 254 deletions

View File

@@ -14,6 +14,7 @@ class CustomizationMatcher(metaclass=Singleton):
self.systemconfig = SystemConfigOper()
self.customization = None
self.custom_separator = None
self._customization_re_cache = {}
@staticmethod
def _normalize_customization(customization):
@@ -42,10 +43,14 @@ class CustomizationMatcher(metaclass=Singleton):
return ""
self.customization = "|".join([f"({item})" for item in customization])
customization_re = re.compile(r"%s" % self.customization)
customization_re = self._customization_re_cache.get(self.customization)
if not customization_re:
# 配置每次读取、编译结果按规则缓存,兼顾实时生效和高频识别性能。
customization_re = re.compile(r"%s" % self.customization)
self._customization_re_cache[self.customization] = customization_re
# 处理重复多次的情况,保留先后顺序(按添加自定义占位符的顺序)
unique_customization = {}
for item in re.findall(customization_re, title):
for item in customization_re.findall(title):
if not isinstance(item, tuple):
item = (item,)
for i in range(len(item)):

View File

@@ -105,6 +105,7 @@ class MetaVideo(MetaBase):
tokens = Tokens(title)
# 实例化StreamingPlatforms对象
streaming_platforms = StreamingPlatforms()
media_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIOEXT
# 解析名称、年份、季、集、资源类型、分辨率等
token = tokens.get_next()
while token:
@@ -113,7 +114,7 @@ class MetaVideo(MetaBase):
self.__init_part(token, tokens)
# 标题
if self._continue_flag:
self.__init_name(token)
self.__init_name(token, media_exts)
# 年份
if self._continue_flag:
self.__init_year(token)
@@ -226,7 +227,7 @@ class MetaVideo(MetaBase):
name = None
return name
def __init_name(self, token: Optional[str]):
def __init_name(self, token: Optional[str], media_exts: list):
"""
识别名称
"""
@@ -313,7 +314,6 @@ class MetaVideo(MetaBase):
return
else:
# 后缀名不要
media_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIOEXT
if ".%s".lower() % token in media_exts:
return
# 英文或者英文+数字,拼装起来

View File

@@ -86,6 +86,18 @@ class ReleaseGroupsMatcher(metaclass=Singleton):
for release_group in site_groups:
release_groups.append(release_group)
self.__release_groups = '|'.join(release_groups)
self.systemconfig = SystemConfigOper()
self.__groups_re_cache = {}
def __get_groups_re(self, groups: str):
"""
发布组规则通常很长,按规则文本缓存编译结果,避免每个标题都重复编译。
"""
groups_re = self.__groups_re_cache.get(groups)
if not groups_re:
groups_re = re.compile(r"(?<=[-@\[£【&])(?:(?:%s))(?=$|[@.\s\]\[】&])" % groups, re.I)
self.__groups_re_cache[groups] = groups_re
return groups_re
def match(self, title: str = None, groups: str = None):
"""
@@ -97,7 +109,7 @@ class ReleaseGroupsMatcher(metaclass=Singleton):
return ""
if not groups:
# 自定义组
custom_release_groups = SystemConfigOper().get(SystemConfigKey.CustomReleaseGroups)
custom_release_groups = self.systemconfig.get(SystemConfigKey.CustomReleaseGroups)
if isinstance(custom_release_groups, list):
custom_release_groups = list(filter(None, custom_release_groups))
if custom_release_groups:
@@ -106,9 +118,9 @@ class ReleaseGroupsMatcher(metaclass=Singleton):
else:
groups = self.__release_groups
title = f"{title} "
groups_re = re.compile(r"(?<=[-@\[£【&])(?:(?:%s))(?=$|[@.\s\]\[】&])" % groups, re.I)
groups_re = self.__get_groups_re(groups)
unique_groups = []
for item in re.findall(groups_re, title):
for item in groups_re.findall(title):
item_str = item[0] if isinstance(item, tuple) else item
if item_str not in unique_groups:
unique_groups.append(item_str)

View File

@@ -1,4 +1,4 @@
from typing import List, Tuple
from typing import List, Optional, Tuple
import cn2an
import regex as re
@@ -9,6 +9,10 @@ from app.schemas.types import SystemConfigKey
from app.utils.singleton import Singleton
_COMBINED_WORD_RE = re.compile(r'^\s*(.*?)\s*=>\s*(.*?)\s*&&\s*(.*?)\s*<>\s*(.*?)\s*>>\s*(.*?)\s*$')
_LEADING_ZERO_RE = re.compile(r"^0+")
class WordsMatcher(metaclass=Singleton):
def __init__(self):
@@ -28,37 +32,23 @@ class WordsMatcher(metaclass=Singleton):
if not word or word.startswith("#"):
continue
try:
if word.count(" => ") and word.count(" && ") and word.count(" >> ") and word.count(" <> "):
# 替换词
thc = str(re.findall(r'(.*?)\s*=>', word)[0]).strip()
# 被替换词
bthc = str(re.findall(r'=>\s*(.*?)\s*&&', word)[0]).strip()
# 集偏移前字段
pyq = str(re.findall(r'&&\s*(.*?)\s*<>', word)[0]).strip()
# 集偏移后字段
pyh = str(re.findall(r'<>(.*?)\s*>>', word)[0]).strip()
# 集偏移
offsets = str(re.findall(r'>>\s*(.*?)$', word)[0]).strip()
word_info = self.__parse_word(word)
if not word_info:
continue
word_type, params = word_info
if word_type == "replace_and_offset":
thc, bthc, pyq, pyh, offsets = params
# 替换词
title, message, state = self.__replace_regex(title, thc, bthc)
if state:
# 替换词成功再进行集偏移
title, message, state = self.__episode_offset(title, pyq, pyh, offsets)
elif word.count(" => "):
# 替换词
strings = word.split(" => ")
title, message, state = self.__replace_regex(title, strings[0], strings[1])
elif word.count(" >> ") and word.count(" <> "):
# 集偏移
strings = word.split(" <> ")
offsets = strings[1].split(" >> ")
strings[1] = offsets[0]
title, message, state = self.__episode_offset(title, strings[0], strings[1], offsets[1])
else:
# 屏蔽词
if not word.strip():
continue
title, message, state = self.__replace_regex(title, word, "")
elif word_type == "replace":
title, message, state = self.__replace_regex(title, params[0], params[1])
elif word_type == "offset":
title, message, state = self.__episode_offset(title, params[0], params[1], params[2])
else: # block
title, message, state = self.__replace_regex(title, params[0], "")
if state:
appley_words.append(word)
@@ -68,16 +58,37 @@ class WordsMatcher(metaclass=Singleton):
return title, appley_words
@staticmethod
def __parse_word(word: str) -> Optional[Tuple[str, Tuple[str, ...]]]:
"""
解析识别词格式。复杂识别词保留原来的字段含义,只把多次正则提取合并为一次。
"""
if word.count(" => ") and word.count(" && ") and word.count(" >> ") and word.count(" <> "):
word_match = _COMBINED_WORD_RE.match(word)
if not word_match:
raise ValueError("复杂识别词格式不正确")
return "replace_and_offset", tuple(item.strip() for item in word_match.groups())
if word.count(" => "):
strings = word.split(" => ")
return "replace", (strings[0], strings[1])
if word.count(" >> ") and word.count(" <> "):
strings = word.split(" <> ")
offsets = strings[1].split(" >> ")
strings[1] = offsets[0]
return "offset", (strings[0], strings[1], offsets[1])
if not word.strip():
return None
return "block", (word,)
@staticmethod
def __replace_regex(title: str, replaced: str, replace: str) -> Tuple[str, str, bool]:
"""
正则替换
"""
try:
if not re.findall(r'%s' % replaced, title):
return title, "", False
else:
return re.sub(r'%s' % replaced, r'%s' % replace, title), "", True
replaced_re = re.compile(r'%s' % replaced)
title, count = replaced_re.subn(r'%s' % replace, title)
return title, "", count > 0
except Exception as err:
logger.warn(f"自定义识别词正则替换失败:{str(err)} - 标题:{title},被替换词:{replaced},替换词:{replace}")
return title, str(err), False
@@ -112,9 +123,9 @@ class WordsMatcher(metaclass=Singleton):
if not episode_num_str.isdigit():
episode_num_offset_str = cn2an.an2cn(episode_num_offset_int, "low")
else:
count_0 = re.findall(r"^0+", episode_num_str)
count_0 = _LEADING_ZERO_RE.search(episode_num_str)
if count_0:
episode_num_offset_str = f"{count_0[0]}{episode_num_offset_int}"
episode_num_offset_str = f"{count_0.group(0)}{episode_num_offset_int}"
else:
episode_num_offset_str = str(episode_num_offset_int)
episode_nums_offset_str.append(episode_num_offset_str)

View File

@@ -14,6 +14,60 @@ from app.log import logger
from app.schemas.types import MediaType
_ANIME_BRACKET_RE = re.compile(r'【[+0-9XVPI-]+】\s*【', re.IGNORECASE)
_ANIME_DASH_EPISODE_RE = re.compile(r'\s+-\s+[\dv]{1,4}\s+', re.IGNORECASE)
_VIDEO_SEASON_EPISODE_RE = re.compile(
r"S\d{2}\s*-\s*S\d{2}|S\d{2}|\s+S\d{1,2}|"
r"EP?\d{2,4}\s*-\s*EP?\d{2,4}|EP?\d{2,4}|\s+EP?\d{1,4}",
re.IGNORECASE,
)
_ANIME_SQUARE_BRACKET_RE = re.compile(r'\[[+0-9XVPI-]+]\s*\[', re.IGNORECASE)
_BRACED_METAINFO_RE = re.compile(r'(?<={\[)[\W\w]+(?=]})')
_BRACED_TMDBID_RE = re.compile(r'(?<=tmdbid=)\d+')
_BRACED_DOUBANID_RE = re.compile(r'(?<=doubanid=)\d+')
_BRACED_TYPE_RE = re.compile(r'(?<=type=)\w+')
_BRACED_BEGIN_SEASON_RE = re.compile(r'(?<=s=)\d+')
_BRACED_END_SEASON_RE = re.compile(r'(?<=s=\d+-)\d+')
_BRACED_BEGIN_EPISODE_RE = re.compile(r'(?<=e=)\d+')
_BRACED_END_EPISODE_RE = re.compile(r'(?<=e=\d+-)\d+')
_EMBY_TMDB_RE_LIST = (
re.compile(r'\[tmdbid[=\-](\d+)\]'),
re.compile(r'\[tmdb[=\-](\d+)\]'),
re.compile(r'\{tmdbid[=\-](\d+)\}'),
re.compile(r'\{tmdb[=\-](\d+)\}'),
)
def _empty_metainfo() -> dict:
"""
返回媒体标签的默认结构,避免不同识别请求之间共享可变状态。
"""
return {
'tmdbid': None,
'doubanid': None,
'type': None,
'begin_season': None,
'end_season': None,
'total_season': None,
'begin_episode': None,
'end_episode': None,
'total_episode': None,
}
def _apply_range_total(metainfo: dict, begin_key: str, end_key: str, total_key: str) -> None:
"""
计算季/集范围总数;保留原有倒序输入自动交换的兼容行为。
"""
if metainfo.get(begin_key) and metainfo.get(end_key):
if metainfo[begin_key] > metainfo[end_key]:
metainfo[begin_key], metainfo[end_key] = metainfo[end_key], metainfo[begin_key]
metainfo[total_key] = metainfo[end_key] - metainfo[begin_key] + 1
elif metainfo.get(begin_key) and not metainfo.get(end_key):
metainfo[total_key] = 1
def _build_meta_info(
title: str,
subtitle: Optional[str] = None,
@@ -30,10 +84,11 @@ def _build_meta_info(
title, metainfo = find_metainfo(title)
# 判断是否处理文件
media_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIOEXT
if title and Path(title).suffix.lower() in media_exts:
title_path = Path(title) if title else None
if title_path and title_path.suffix.lower() in media_exts:
isfile = True
# 去掉后缀
title = Path(title).stem
title = title_path.stem
else:
isfile = False
# 识别
@@ -115,15 +170,13 @@ def is_anime(name: str) -> bool:
"""
if not name:
return False
if re.search(r'【[+0-9XVPI-]+】\s*【', name, re.IGNORECASE):
if _ANIME_BRACKET_RE.search(name):
return True
if re.search(r'\s+-\s+[\dv]{1,4}\s+', name, re.IGNORECASE):
if _ANIME_DASH_EPISODE_RE.search(name):
return True
if re.search(r"S\d{2}\s*-\s*S\d{2}|S\d{2}|\s+S\d{1,2}|EP?\d{2,4}\s*-\s*EP?\d{2,4}|EP?\d{2,4}|\s+EP?\d{1,4}",
name,
re.IGNORECASE):
if _VIDEO_SEASON_EPISODE_RE.search(name):
return False
if re.search(r'\[[+0-9XVPI-]+]\s*\[', name, re.IGNORECASE):
if _ANIME_SQUARE_BRACKET_RE.search(name):
return True
return False
@@ -132,95 +185,62 @@ def find_metainfo(title: str) -> Tuple[str, dict]:
"""
从标题中提取媒体信息
"""
metainfo = {
'tmdbid': None,
'doubanid': None,
'type': None,
'begin_season': None,
'end_season': None,
'total_season': None,
'begin_episode': None,
'end_episode': None,
'total_episode': None,
}
metainfo = _empty_metainfo()
if not title:
return title, metainfo
# 从标题中提取媒体信息 格式为{[tmdbid=xxx;type=xxx;s=xxx;e=xxx]}
results = re.findall(r'(?<={\[)[\W\w]+(?=]})', title)
results = _BRACED_METAINFO_RE.findall(title)
if results:
for result in results:
# 查找tmdbid信息
tmdbid = re.findall(r'(?<=tmdbid=)\d+', result)
if tmdbid and tmdbid[0].isdigit():
metainfo['tmdbid'] = tmdbid[0]
tmdbid = _BRACED_TMDBID_RE.search(result)
if tmdbid and tmdbid.group(0).isdigit():
metainfo['tmdbid'] = tmdbid.group(0)
# 查找豆瓣id信息
doubanid = re.findall(r'(?<=doubanid=)\d+', result)
if doubanid and doubanid[0].isdigit():
metainfo['doubanid'] = doubanid[0]
doubanid = _BRACED_DOUBANID_RE.search(result)
if doubanid and doubanid.group(0).isdigit():
metainfo['doubanid'] = doubanid.group(0)
# 查找媒体类型
mtype = re.findall(r'(?<=type=)\w+', result)
mtype = _BRACED_TYPE_RE.search(result)
if mtype:
if mtype[0] == "movies":
media_type = mtype.group(0)
if media_type == "movies":
metainfo['type'] = MediaType.MOVIE
elif mtype[0] == "tv":
elif media_type == "tv":
metainfo['type'] = MediaType.TV
# 查找季信息
begin_season = re.findall(r'(?<=s=)\d+', result)
if begin_season and begin_season[0].isdigit():
metainfo['begin_season'] = int(begin_season[0])
end_season = re.findall(r'(?<=s=\d+-)\d+', result)
if end_season and end_season[0].isdigit():
metainfo['end_season'] = int(end_season[0])
begin_season = _BRACED_BEGIN_SEASON_RE.search(result)
if begin_season and begin_season.group(0).isdigit():
metainfo['begin_season'] = int(begin_season.group(0))
end_season = _BRACED_END_SEASON_RE.search(result)
if end_season and end_season.group(0).isdigit():
metainfo['end_season'] = int(end_season.group(0))
# 查找集信息
begin_episode = re.findall(r'(?<=e=)\d+', result)
if begin_episode and begin_episode[0].isdigit():
metainfo['begin_episode'] = int(begin_episode[0])
end_episode = re.findall(r'(?<=e=\d+-)\d+', result)
if end_episode and end_episode[0].isdigit():
metainfo['end_episode'] = int(end_episode[0])
begin_episode = _BRACED_BEGIN_EPISODE_RE.search(result)
if begin_episode and begin_episode.group(0).isdigit():
metainfo['begin_episode'] = int(begin_episode.group(0))
end_episode = _BRACED_END_EPISODE_RE.search(result)
if end_episode and end_episode.group(0).isdigit():
metainfo['end_episode'] = int(end_episode.group(0))
# 去除title中该部分
if tmdbid or mtype or begin_season or end_season or begin_episode or end_episode:
title = title.replace(f"{{[{result}]}}", '')
# 支持Emby格式的ID标签
# 1. [tmdbid=xxxx] 或 [tmdbid-xxxx] 格式
tmdb_match = re.search(r'\[tmdbid[=\-](\d+)\]', title)
# 支持Emby格式的ID标签;第一个 [tmdbid] 历史上始终优先处理,用于覆盖前面 {[...]} 中的旧标签。
tmdb_match = _EMBY_TMDB_RE_LIST[0].search(title)
if tmdb_match:
metainfo['tmdbid'] = tmdb_match.group(1)
title = re.sub(r'\[tmdbid[=\-](\d+)\]', '', title).strip()
# 2. [tmdb=xxxx] [tmdb-xxxx] 格式
if not metainfo['tmdbid']:
tmdb_match = re.search(r'\[tmdb[=\-](\d+)\]', title)
if tmdb_match:
metainfo['tmdbid'] = tmdb_match.group(1)
title = re.sub(r'\[tmdb[=\-](\d+)\]', '', title).strip()
# 3. {tmdbid=xxxx} 或 {tmdbid-xxxx} 格式
if not metainfo['tmdbid']:
tmdb_match = re.search(r'\{tmdbid[=\-](\d+)\}', title)
if tmdb_match:
metainfo['tmdbid'] = tmdb_match.group(1)
title = re.sub(r'\{tmdbid[=\-](\d+)\}', '', title).strip()
# 4. {tmdb=xxxx} 或 {tmdb-xxxx} 格式
if not metainfo['tmdbid']:
tmdb_match = re.search(r'\{tmdb[=\-](\d+)\}', title)
if tmdb_match:
metainfo['tmdbid'] = tmdb_match.group(1)
title = re.sub(r'\{tmdb[=\-](\d+)\}', '', title).strip()
title = _EMBY_TMDB_RE_LIST[0].sub('', title).strip()
elif not metainfo['tmdbid']:
# 保持原有优先级:[tmdbid] > [tmdb] > {tmdbid} > {tmdb}
for tmdb_re in _EMBY_TMDB_RE_LIST[1:]:
tmdb_match = tmdb_re.search(title)
if tmdb_match:
metainfo['tmdbid'] = tmdb_match.group(1)
title = tmdb_re.sub('', title).strip()
break
# 计算季集总数
if metainfo.get('begin_season') and metainfo.get('end_season'):
if metainfo['begin_season'] > metainfo['end_season']:
metainfo['begin_season'], metainfo['end_season'] = metainfo['end_season'], metainfo['begin_season']
metainfo['total_season'] = metainfo['end_season'] - metainfo['begin_season'] + 1
elif metainfo.get('begin_season') and not metainfo.get('end_season'):
metainfo['total_season'] = 1
if metainfo.get('begin_episode') and metainfo.get('end_episode'):
if metainfo['begin_episode'] > metainfo['end_episode']:
metainfo['begin_episode'], metainfo['end_episode'] = metainfo['end_episode'], metainfo['begin_episode']
metainfo['total_episode'] = metainfo['end_episode'] - metainfo['begin_episode'] + 1
elif metainfo.get('begin_episode') and not metainfo.get('end_episode'):
metainfo['total_episode'] = 1
_apply_range_total(metainfo, 'begin_season', 'end_season', 'total_season')
_apply_range_total(metainfo, 'begin_episode', 'end_episode', 'total_episode')
return title, metainfo

View File

@@ -71,6 +71,42 @@ class DoubanModule(_ModuleBase):
"""
return 2
@staticmethod
def _prepare_search_names(meta: MetaBase) -> List[str]:
"""
准备搜索名称列表,保留中英文名称分别识别且按顺序去重的历史行为。
"""
# 简体名称
zh_name = zhconv.convert(meta.cn_name, "zh-hans") if meta.cn_name else None
# 使用中英文名分别识别,去重去空,但要保持顺序
return list(dict.fromkeys([k for k in [meta.cn_name, zh_name, meta.en_name] if k]))
@staticmethod
def _build_search_medias_result(meta: MetaBase, items: Optional[List[dict]]) -> List[MediaInfo]:
"""
构建豆瓣搜索结果,并沿用原有的类型、标题包含和季信息处理规则。
"""
if not items:
return []
ret_medias = []
for item_obj in items:
if meta.type and meta.type != MediaType.UNKNOWN and meta.type.value != item_obj.get("type_name"):
continue
if item_obj.get("type_name") not in (MediaType.TV.value, MediaType.MOVIE.value):
continue
if meta.name not in item_obj.get("target", {}).get("title"):
continue
ret_medias.append(MediaInfo(douban_info=item_obj.get("target")))
# 将搜索词中的季写入标题中
if ret_medias and meta.begin_season:
# 小写数据转大写
season_str = cn2an.an2cn(meta.begin_season, "low")
for media in ret_medias:
if media.type == MediaType.TV:
media.title = f"{media.title}{season_str}"
media.season = meta.begin_season
return ret_medias
def _recognize_media_core(self, meta: MetaBase = None,
mtype: MediaType = None,
doubanid: Optional[str] = None,
@@ -107,7 +143,7 @@ class DoubanModule(_ModuleBase):
meta.type = mtype
if doubanid:
meta.doubanid = doubanid
cache_info = self.cache.get(meta)
cache_info = self.cache.get(meta) if cache else {}
cache_hit = False
# 识别豆瓣信息
@@ -118,11 +154,7 @@ class DoubanModule(_ModuleBase):
info = douban_info_func(doubanid=doubanid, mtype=mtype or meta.type)
elif meta:
info = {}
# 简体名称
zh_name = zhconv.convert(meta.cn_name, "zh-hans") if meta.cn_name else None
# 使用中英文名分别识别,去重去空,但要保持顺序
names = list(dict.fromkeys([k for k in [meta.cn_name, zh_name, meta.en_name] if k]))
for name in names:
for name in self._prepare_search_names(meta):
if meta.begin_season:
logger.info(f"正在识别 {name}{meta.begin_season}季 ...")
else:
@@ -211,7 +243,7 @@ class DoubanModule(_ModuleBase):
meta.type = mtype
if doubanid:
meta.doubanid = doubanid
cache_info = self.cache.get(meta)
cache_info = self.cache.get(meta) if cache else {}
cache_hit = False
# 识别豆瓣信息
@@ -222,11 +254,7 @@ class DoubanModule(_ModuleBase):
info = await async_douban_info_func(doubanid=doubanid, mtype=mtype or meta.type)
elif meta:
info = {}
# 简体名称
zh_name = zhconv.convert(meta.cn_name, "zh-hans") if meta.cn_name else None
# 使用中英文名分别识别,去重去空,但要保持顺序
names = list(dict.fromkeys([k for k in [meta.cn_name, zh_name, meta.en_name] if k]))
for name in names:
for name in self._prepare_search_names(meta):
if meta.begin_season:
logger.info(f"正在识别 {name}{meta.begin_season}季 ...")
else:
@@ -913,24 +941,7 @@ class DoubanModule(_ModuleBase):
if not result or not result.get("items"):
return []
# 返回数据
ret_medias = []
for item_obj in result.get("items"):
if meta.type and meta.type != MediaType.UNKNOWN and meta.type.value != item_obj.get("type_name"):
continue
if item_obj.get("type_name") not in (MediaType.TV.value, MediaType.MOVIE.value):
continue
if meta.name not in item_obj.get("target", {}).get("title"):
continue
ret_medias.append(MediaInfo(douban_info=item_obj.get("target")))
# 将搜索词中的季写入标题中
if ret_medias and meta.begin_season:
# 小写数据转大写
season_str = cn2an.an2cn(meta.begin_season, "low")
for media in ret_medias:
if media.type == MediaType.TV:
media.title = f"{media.title}{season_str}"
media.season = meta.begin_season
return ret_medias
return self._build_search_medias_result(meta, result.get("items"))
async def async_search_medias(self, meta: MetaBase) -> Optional[List[MediaInfo]]:
"""
@@ -946,24 +957,7 @@ class DoubanModule(_ModuleBase):
if not result or not result.get("items"):
return []
# 返回数据
ret_medias = []
for item_obj in result.get("items"):
if meta.type and meta.type != MediaType.UNKNOWN and meta.type.value != item_obj.get("type_name"):
continue
if item_obj.get("type_name") not in (MediaType.TV.value, MediaType.MOVIE.value):
continue
if meta.name not in item_obj.get("target", {}).get("title"):
continue
ret_medias.append(MediaInfo(douban_info=item_obj.get("target")))
# 将搜索词中的季写入标题中
if ret_medias and meta.begin_season:
# 小写数据转大写
season_str = cn2an.an2cn(meta.begin_season, "low")
for media in ret_medias:
if media.type == MediaType.TV:
media.title = f"{media.title}{season_str}"
media.season = meta.begin_season
return ret_medias
return self._build_search_medias_result(meta, result.get("items"))
def search_persons(self, name: str) -> Optional[List[MediaPerson]]:
"""

View File

@@ -19,6 +19,8 @@ from app.schemas.types import MediaType, MediaImageType, ModuleType, MediaRecogn
from app.utils.http import RequestUtils
_DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")
class TheMovieDbModule(_ModuleBase):
"""
@@ -118,6 +120,59 @@ class TheMovieDbModule(_ModuleBase):
# 使用中英文名分别识别,去重去空,但要保持顺序
return list(dict.fromkeys([k for k in [meta.cn_name, zh_name, meta.en_name] if k]))
@staticmethod
def _fill_group_season_info(mediainfo: MediaInfo, episode_group: Optional[str],
group_seasons: List[dict]) -> None:
"""
将指定剧集组的季、集、年份信息写入 MediaInfo。
"""
seasons = {}
season_info = []
season_years = {}
for group_season in group_seasons:
# 季
season = group_season.get("order")
# 集列表
episodes = group_season.get("episodes")
if not episodes:
continue
seasons[season] = [ep.get("episode_number") for ep in episodes]
season_info.append(group_season)
# 当前季第一集时间
first_date = episodes[0].get("air_date")
if first_date and _DATE_RE.match(first_date):
season_years[season] = str(first_date).split("-")[0]
# 每季集清单
if seasons:
mediainfo.seasons = seasons
mediainfo.number_of_seasons = len(seasons)
# 每季集详情
if season_info:
mediainfo.season_info = season_info
# 每季年份
if season_years:
mediainfo.season_years = season_years
# 所有剧集组
mediainfo.episode_group = episode_group
mediainfo.episode_groups = group_seasons
@staticmethod
def _build_search_medias_result(meta: MetaBase, results: Optional[List[dict]]) -> List[MediaInfo]:
"""
构建搜索结果,并沿用原有逻辑把搜索词中的季写入电视剧标题中。
"""
if not results:
return []
medias = [MediaInfo(tmdb_info=info) for info in results]
if meta.begin_season:
# 小写数据转大写
season_str = cn2an.an2cn(meta.begin_season, "low")
for media in medias:
if media.type == MediaType.TV:
media.title = f"{media.title}{season_str}"
media.season = meta.begin_season
return medias
def _get_info_by_tmdbid(self, tmdbid: int, mtype: Optional[MediaType],
meta: Optional[MetaBase]) -> Optional[dict]:
"""
@@ -289,36 +344,7 @@ class TheMovieDbModule(_ModuleBase):
"""
if mediainfo.type == MediaType.TV and mediainfo.episode_groups:
if group_seasons:
# 指定剧集组时
seasons = {}
season_info = []
season_years = {}
for group_season in group_seasons:
# 季
season = group_season.get("order")
# 集列表
episodes = group_season.get("episodes")
if not episodes:
continue
seasons[season] = [ep.get("episode_number") for ep in episodes]
season_info.append(group_season)
# 当前季第一季时间
first_date = episodes[0].get("air_date")
if re.match(r"^\d{4}-\d{2}-\d{2}$", first_date):
season_years[season] = str(first_date).split("-")[0]
# 每季集清单
if seasons:
mediainfo.seasons = seasons
mediainfo.number_of_seasons = len(seasons)
# 每季集详情
if season_info:
mediainfo.season_info = season_info
# 每季年份
if season_years:
mediainfo.season_years = season_years
# 所有剧集组
mediainfo.episode_group = episode_group
mediainfo.episode_groups = group_seasons
self._fill_group_season_info(mediainfo, episode_group, group_seasons)
else:
# 每季年份
season_years = {}
@@ -337,7 +363,7 @@ class TheMovieDbModule(_ModuleBase):
# 当前季第一季时间
first_date = episodes[0].get("air_date")
# 判断是不是日期格式
if first_date and re.match(r"^\d{4}-\d{2}-\d{2}$", first_date):
if first_date and _DATE_RE.match(first_date):
season_years[season] = str(first_date).split("-")[0]
if season_years:
mediainfo.season_years = season_years
@@ -350,36 +376,7 @@ class TheMovieDbModule(_ModuleBase):
"""
if mediainfo.type == MediaType.TV and mediainfo.episode_groups:
if group_seasons:
# 指定剧集组时
seasons = {}
season_info = []
season_years = {}
for group_season in group_seasons:
# 季
season = group_season.get("order")
# 集列表
episodes = group_season.get("episodes")
if not episodes:
continue
seasons[season] = [ep.get("episode_number") for ep in episodes]
season_info.append(group_season)
# 当前季第一季时间
first_date = episodes[0].get("air_date")
if re.match(r"^\d{4}-\d{2}-\d{2}$", first_date):
season_years[season] = str(first_date).split("-")[0]
# 每季集清单
if seasons:
mediainfo.seasons = seasons
mediainfo.number_of_seasons = len(seasons)
# 每季集详情
if season_info:
mediainfo.season_info = season_info
# 每季年份
if season_years:
mediainfo.season_years = season_years
# 所有剧集组
mediainfo.episode_group = episode_group
mediainfo.episode_groups = group_seasons
self._fill_group_season_info(mediainfo, episode_group, group_seasons)
else:
# 每季年份
season_years = {}
@@ -398,7 +395,7 @@ class TheMovieDbModule(_ModuleBase):
# 当前季第一季时间
first_date = episodes[0].get("air_date")
# 判断是不是日期格式
if first_date and re.match(r"^\d{4}-\d{2}-\d{2}$", first_date):
if first_date and _DATE_RE.match(first_date):
season_years[season] = str(first_date).split("-")[0]
if season_years:
mediainfo.season_years = season_years
@@ -484,7 +481,7 @@ class TheMovieDbModule(_ModuleBase):
meta.type = mtype
if tmdbid:
meta.tmdbid = tmdbid
cache_info = self.cache.get(meta)
cache_info = self.cache.get(meta) if cache else {}
# 查询剧集组
group_seasons = []
@@ -573,7 +570,7 @@ class TheMovieDbModule(_ModuleBase):
meta.type = mtype
if tmdbid:
meta.tmdbid = tmdbid
cache_info = self.cache.get(meta)
cache_info = self.cache.get(meta) if cache else {}
# 查询剧集组
group_seasons = []
@@ -764,17 +761,7 @@ class TheMovieDbModule(_ModuleBase):
else:
results = self.tmdb.search_tvs(meta.name, meta.year)
# 将搜索词中的季写入标题中
if results:
medias = [MediaInfo(tmdb_info=info) for info in results]
if meta.begin_season:
# 小写数据转大写
season_str = cn2an.an2cn(meta.begin_season, "low")
for media in medias:
if media.type == MediaType.TV:
media.title = f"{media.title}{season_str}"
media.season = meta.begin_season
return medias
return []
return self._build_search_medias_result(meta, results)
def search_persons(self, name: str) -> Optional[List[schemas.MediaPerson]]:
"""
@@ -1206,17 +1193,7 @@ class TheMovieDbModule(_ModuleBase):
else:
results = await self.tmdb.async_search_tvs(meta.name, meta.year)
# 将搜索词中的季写入标题中
if results:
medias = [MediaInfo(tmdb_info=info) for info in results]
if meta.begin_season:
# 小写数据转大写
season_str = cn2an.an2cn(meta.begin_season, "low")
for media in medias:
if media.type == MediaType.TV:
media.title = f"{media.title}{season_str}"
media.season = meta.begin_season
return medias
return []
return self._build_search_medias_result(meta, results)
async def async_tmdb_discover(self, mtype: MediaType, sort_by: str,
with_genres: str,

View File

@@ -0,0 +1,107 @@
import asyncio
from unittest import TestCase
from unittest.mock import Mock
from app.core.context import MediaInfo
from app.core.meta import MetaBase
from app.modules.douban import DoubanModule
from app.modules.themoviedb import TheMovieDbModule
from app.schemas.types import MediaType
class MediaRecognizeModulesTest(TestCase):
def test_tmdb_cache_false_skips_cache_lookup(self):
"""cache=False 时应跳过缓存读取,但仍按正常流程查询 TMDB。"""
module = TheMovieDbModule()
meta = MetaBase("测试电影")
meta.name = "测试电影"
meta.type = MediaType.MOVIE
module.cache = Mock()
module.tmdb = Mock()
module.tmdb.get_info.return_value = {
"id": 100,
"media_type": MediaType.MOVIE,
"title": "测试电影",
"genres": [],
}
module.category = Mock()
module.category.get_movie_category.return_value = None
result = module.recognize_media(meta=meta, tmdbid=100, cache=False)
self.assertIsInstance(result, MediaInfo)
self.assertEqual(result.tmdb_id, 100)
module.cache.get.assert_not_called()
module.cache.update.assert_called_once()
def test_async_tmdb_cache_false_skips_cache_lookup(self):
"""异步 cache=False 时也应跳过缓存读取。"""
module = TheMovieDbModule()
meta = MetaBase("测试电影")
meta.name = "测试电影"
meta.type = MediaType.MOVIE
module.cache = Mock()
module.tmdb = Mock()
async def _async_get_info(**kwargs):
return {
"id": 101,
"media_type": MediaType.MOVIE,
"title": "测试电影",
"genres": [],
}
module.tmdb.async_get_info = _async_get_info
module.category = Mock()
module.category.get_movie_category.return_value = None
result = asyncio.run(module.async_recognize_media(meta=meta, tmdbid=101, cache=False))
self.assertIsInstance(result, MediaInfo)
self.assertEqual(result.tmdb_id, 101)
module.cache.get.assert_not_called()
module.cache.update.assert_called_once()
def test_douban_prepare_search_names_deduplicates_simplified_name(self):
"""豆瓣候选名称应保留顺序,并去掉繁简转换后的重复项。"""
meta = MetaBase("流浪地球")
meta.cn_name = "流浪地球"
meta.en_name = "The Wandering Earth"
self.assertEqual(
DoubanModule._prepare_search_names(meta),
["流浪地球", "The Wandering Earth"],
)
def test_douban_search_result_helper_preserves_season_title_rule(self):
"""豆瓣搜索结果 helper 应保留电视剧标题追加季号的旧逻辑。"""
meta = MetaBase("测试剧")
meta.name = "测试剧"
meta.type = MediaType.TV
meta.begin_season = 2
items = [
{
"type_name": MediaType.TV.value,
"target": {
"id": "200",
"title": "测试剧",
"type": "tv",
"year": "2024",
},
},
{
"type_name": MediaType.MOVIE.value,
"target": {
"id": "201",
"title": "测试剧 电影版",
"type": "movie",
"year": "2024",
},
},
]
result = DoubanModule._build_search_medias_result(meta, items)
self.assertEqual(len(result), 1)
self.assertEqual(result[0].title, "测试剧 第二季")
self.assertEqual(result[0].season, 2)

View File

@@ -2,7 +2,7 @@
from pathlib import Path
from unittest import TestCase
from app.core.metainfo import MetaInfo, MetaInfoPath
from app.core.metainfo import MetaInfo, MetaInfoPath, find_metainfo
from tests.cases.meta import meta_cases
@@ -124,6 +124,22 @@ class MetaInfoTest(TestCase):
self.assertEqual(meta.name, "电影名称")
self.assertEqual(meta.original_name, "电影测试替换名称")
def test_custom_words_replace_then_episode_offset(self):
"""测试复杂识别词仍按先替换、后集数偏移的顺序处理"""
custom_words = ["旧名 => 新名 && 第 <> 集 >> EP+1"]
meta = MetaInfo(title="旧名 第03集", custom_words=custom_words)
self.assertEqual(meta.name, "新名")
self.assertEqual(meta.episode, "E04")
self.assertEqual(meta.apply_words, custom_words)
def test_emby_tmdbid_overrides_braced_metainfo_tmdbid(self):
"""
同时存在内嵌元信息和 Emby [tmdbid] 标签时,保持历史上的 [tmdbid] 优先级。
"""
title, metainfo = find_metainfo("Movie {[tmdbid=111;type=movies]} [tmdbid=222]")
self.assertEqual(metainfo["tmdbid"], "222")
self.assertNotIn("[tmdbid=222]", title)
def test_metainfopath_auxiliary_chinese_stem_uses_parent_title(self):
"""
文件名为简英双语/特效等压制标签、父目录为拉丁片名时,应合并父目录标题与年份。