From 2831eecbeb87c66f5346448d2dc12937ee82052b Mon Sep 17 00:00:00 2001 From: jxxghp Date: Fri, 15 May 2026 13:37:36 +0800 Subject: [PATCH] perf: optimize media recognition internals --- app/core/meta/customization.py | 9 +- app/core/meta/metavideo.py | 6 +- app/core/meta/releasegroup.py | 18 ++- app/core/meta/words.py | 77 ++++++----- app/core/metainfo.py | 176 ++++++++++++++------------ app/modules/douban/__init__.py | 90 ++++++------- app/modules/themoviedb/__init__.py | 149 +++++++++------------- tests/test_media_recognize_modules.py | 107 ++++++++++++++++ tests/test_metainfo.py | 18 ++- 9 files changed, 396 insertions(+), 254 deletions(-) create mode 100644 tests/test_media_recognize_modules.py diff --git a/app/core/meta/customization.py b/app/core/meta/customization.py index 9cdacc8d..f26f8b02 100644 --- a/app/core/meta/customization.py +++ b/app/core/meta/customization.py @@ -14,6 +14,7 @@ class CustomizationMatcher(metaclass=Singleton): self.systemconfig = SystemConfigOper() self.customization = None self.custom_separator = None + self._customization_re_cache = {} @staticmethod def _normalize_customization(customization): @@ -42,10 +43,14 @@ class CustomizationMatcher(metaclass=Singleton): return "" self.customization = "|".join([f"({item})" for item in customization]) - customization_re = re.compile(r"%s" % self.customization) + customization_re = self._customization_re_cache.get(self.customization) + if not customization_re: + # 配置每次读取、编译结果按规则缓存,兼顾实时生效和高频识别性能。 + customization_re = re.compile(r"%s" % self.customization) + self._customization_re_cache[self.customization] = customization_re # 处理重复多次的情况,保留先后顺序(按添加自定义占位符的顺序) unique_customization = {} - for item in re.findall(customization_re, title): + for item in customization_re.findall(title): if not isinstance(item, tuple): item = (item,) for i in range(len(item)): diff --git a/app/core/meta/metavideo.py b/app/core/meta/metavideo.py index 063c1749..c9da5ad8 100644 --- a/app/core/meta/metavideo.py +++ b/app/core/meta/metavideo.py @@ -105,6 +105,7 @@ class MetaVideo(MetaBase): tokens = Tokens(title) # 实例化StreamingPlatforms对象 streaming_platforms = StreamingPlatforms() + media_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIOEXT # 解析名称、年份、季、集、资源类型、分辨率等 token = tokens.get_next() while token: @@ -113,7 +114,7 @@ class MetaVideo(MetaBase): self.__init_part(token, tokens) # 标题 if self._continue_flag: - self.__init_name(token) + self.__init_name(token, media_exts) # 年份 if self._continue_flag: self.__init_year(token) @@ -226,7 +227,7 @@ class MetaVideo(MetaBase): name = None return name - def __init_name(self, token: Optional[str]): + def __init_name(self, token: Optional[str], media_exts: list): """ 识别名称 """ @@ -313,7 +314,6 @@ class MetaVideo(MetaBase): return else: # 后缀名不要 - media_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIOEXT if ".%s".lower() % token in media_exts: return # 英文或者英文+数字,拼装起来 diff --git a/app/core/meta/releasegroup.py b/app/core/meta/releasegroup.py index 82d42af6..0f11dd54 100644 --- a/app/core/meta/releasegroup.py +++ b/app/core/meta/releasegroup.py @@ -86,6 +86,18 @@ class ReleaseGroupsMatcher(metaclass=Singleton): for release_group in site_groups: release_groups.append(release_group) self.__release_groups = '|'.join(release_groups) + self.systemconfig = SystemConfigOper() + self.__groups_re_cache = {} + + def __get_groups_re(self, groups: str): + """ + 发布组规则通常很长,按规则文本缓存编译结果,避免每个标题都重复编译。 + """ + groups_re = self.__groups_re_cache.get(groups) + if not groups_re: + groups_re = re.compile(r"(?<=[-@\[£【&])(?:(?:%s))(?=$|[@.\s\]\[】&])" % groups, re.I) + self.__groups_re_cache[groups] = groups_re + return groups_re def match(self, title: str = None, groups: str = None): """ @@ -97,7 +109,7 @@ class ReleaseGroupsMatcher(metaclass=Singleton): return "" if not groups: # 自定义组 - custom_release_groups = SystemConfigOper().get(SystemConfigKey.CustomReleaseGroups) + custom_release_groups = self.systemconfig.get(SystemConfigKey.CustomReleaseGroups) if isinstance(custom_release_groups, list): custom_release_groups = list(filter(None, custom_release_groups)) if custom_release_groups: @@ -106,9 +118,9 @@ class ReleaseGroupsMatcher(metaclass=Singleton): else: groups = self.__release_groups title = f"{title} " - groups_re = re.compile(r"(?<=[-@\[£【&])(?:(?:%s))(?=$|[@.\s\]\[】&])" % groups, re.I) + groups_re = self.__get_groups_re(groups) unique_groups = [] - for item in re.findall(groups_re, title): + for item in groups_re.findall(title): item_str = item[0] if isinstance(item, tuple) else item if item_str not in unique_groups: unique_groups.append(item_str) diff --git a/app/core/meta/words.py b/app/core/meta/words.py index eebbf8ce..fb8df04a 100644 --- a/app/core/meta/words.py +++ b/app/core/meta/words.py @@ -1,4 +1,4 @@ -from typing import List, Tuple +from typing import List, Optional, Tuple import cn2an import regex as re @@ -9,6 +9,10 @@ from app.schemas.types import SystemConfigKey from app.utils.singleton import Singleton +_COMBINED_WORD_RE = re.compile(r'^\s*(.*?)\s*=>\s*(.*?)\s*&&\s*(.*?)\s*<>\s*(.*?)\s*>>\s*(.*?)\s*$') +_LEADING_ZERO_RE = re.compile(r"^0+") + + class WordsMatcher(metaclass=Singleton): def __init__(self): @@ -28,37 +32,23 @@ class WordsMatcher(metaclass=Singleton): if not word or word.startswith("#"): continue try: - if word.count(" => ") and word.count(" && ") and word.count(" >> ") and word.count(" <> "): - # 替换词 - thc = str(re.findall(r'(.*?)\s*=>', word)[0]).strip() - # 被替换词 - bthc = str(re.findall(r'=>\s*(.*?)\s*&&', word)[0]).strip() - # 集偏移前字段 - pyq = str(re.findall(r'&&\s*(.*?)\s*<>', word)[0]).strip() - # 集偏移后字段 - pyh = str(re.findall(r'<>(.*?)\s*>>', word)[0]).strip() - # 集偏移 - offsets = str(re.findall(r'>>\s*(.*?)$', word)[0]).strip() + word_info = self.__parse_word(word) + if not word_info: + continue + word_type, params = word_info + if word_type == "replace_and_offset": + thc, bthc, pyq, pyh, offsets = params # 替换词 title, message, state = self.__replace_regex(title, thc, bthc) if state: # 替换词成功再进行集偏移 title, message, state = self.__episode_offset(title, pyq, pyh, offsets) - elif word.count(" => "): - # 替换词 - strings = word.split(" => ") - title, message, state = self.__replace_regex(title, strings[0], strings[1]) - elif word.count(" >> ") and word.count(" <> "): - # 集偏移 - strings = word.split(" <> ") - offsets = strings[1].split(" >> ") - strings[1] = offsets[0] - title, message, state = self.__episode_offset(title, strings[0], strings[1], offsets[1]) - else: - # 屏蔽词 - if not word.strip(): - continue - title, message, state = self.__replace_regex(title, word, "") + elif word_type == "replace": + title, message, state = self.__replace_regex(title, params[0], params[1]) + elif word_type == "offset": + title, message, state = self.__episode_offset(title, params[0], params[1], params[2]) + else: # block + title, message, state = self.__replace_regex(title, params[0], "") if state: appley_words.append(word) @@ -68,16 +58,37 @@ class WordsMatcher(metaclass=Singleton): return title, appley_words + @staticmethod + def __parse_word(word: str) -> Optional[Tuple[str, Tuple[str, ...]]]: + """ + 解析识别词格式。复杂识别词保留原来的字段含义,只把多次正则提取合并为一次。 + """ + if word.count(" => ") and word.count(" && ") and word.count(" >> ") and word.count(" <> "): + word_match = _COMBINED_WORD_RE.match(word) + if not word_match: + raise ValueError("复杂识别词格式不正确") + return "replace_and_offset", tuple(item.strip() for item in word_match.groups()) + if word.count(" => "): + strings = word.split(" => ") + return "replace", (strings[0], strings[1]) + if word.count(" >> ") and word.count(" <> "): + strings = word.split(" <> ") + offsets = strings[1].split(" >> ") + strings[1] = offsets[0] + return "offset", (strings[0], strings[1], offsets[1]) + if not word.strip(): + return None + return "block", (word,) + @staticmethod def __replace_regex(title: str, replaced: str, replace: str) -> Tuple[str, str, bool]: """ 正则替换 """ try: - if not re.findall(r'%s' % replaced, title): - return title, "", False - else: - return re.sub(r'%s' % replaced, r'%s' % replace, title), "", True + replaced_re = re.compile(r'%s' % replaced) + title, count = replaced_re.subn(r'%s' % replace, title) + return title, "", count > 0 except Exception as err: logger.warn(f"自定义识别词正则替换失败:{str(err)} - 标题:{title},被替换词:{replaced},替换词:{replace}") return title, str(err), False @@ -112,9 +123,9 @@ class WordsMatcher(metaclass=Singleton): if not episode_num_str.isdigit(): episode_num_offset_str = cn2an.an2cn(episode_num_offset_int, "low") else: - count_0 = re.findall(r"^0+", episode_num_str) + count_0 = _LEADING_ZERO_RE.search(episode_num_str) if count_0: - episode_num_offset_str = f"{count_0[0]}{episode_num_offset_int}" + episode_num_offset_str = f"{count_0.group(0)}{episode_num_offset_int}" else: episode_num_offset_str = str(episode_num_offset_int) episode_nums_offset_str.append(episode_num_offset_str) diff --git a/app/core/metainfo.py b/app/core/metainfo.py index 03da4236..10201648 100644 --- a/app/core/metainfo.py +++ b/app/core/metainfo.py @@ -14,6 +14,60 @@ from app.log import logger from app.schemas.types import MediaType +_ANIME_BRACKET_RE = re.compile(r'【[+0-9XVPI-]+】\s*【', re.IGNORECASE) +_ANIME_DASH_EPISODE_RE = re.compile(r'\s+-\s+[\dv]{1,4}\s+', re.IGNORECASE) +_VIDEO_SEASON_EPISODE_RE = re.compile( + r"S\d{2}\s*-\s*S\d{2}|S\d{2}|\s+S\d{1,2}|" + r"EP?\d{2,4}\s*-\s*EP?\d{2,4}|EP?\d{2,4}|\s+EP?\d{1,4}", + re.IGNORECASE, +) +_ANIME_SQUARE_BRACKET_RE = re.compile(r'\[[+0-9XVPI-]+]\s*\[', re.IGNORECASE) + +_BRACED_METAINFO_RE = re.compile(r'(?<={\[)[\W\w]+(?=]})') +_BRACED_TMDBID_RE = re.compile(r'(?<=tmdbid=)\d+') +_BRACED_DOUBANID_RE = re.compile(r'(?<=doubanid=)\d+') +_BRACED_TYPE_RE = re.compile(r'(?<=type=)\w+') +_BRACED_BEGIN_SEASON_RE = re.compile(r'(?<=s=)\d+') +_BRACED_END_SEASON_RE = re.compile(r'(?<=s=\d+-)\d+') +_BRACED_BEGIN_EPISODE_RE = re.compile(r'(?<=e=)\d+') +_BRACED_END_EPISODE_RE = re.compile(r'(?<=e=\d+-)\d+') +_EMBY_TMDB_RE_LIST = ( + re.compile(r'\[tmdbid[=\-](\d+)\]'), + re.compile(r'\[tmdb[=\-](\d+)\]'), + re.compile(r'\{tmdbid[=\-](\d+)\}'), + re.compile(r'\{tmdb[=\-](\d+)\}'), +) + + +def _empty_metainfo() -> dict: + """ + 返回媒体标签的默认结构,避免不同识别请求之间共享可变状态。 + """ + return { + 'tmdbid': None, + 'doubanid': None, + 'type': None, + 'begin_season': None, + 'end_season': None, + 'total_season': None, + 'begin_episode': None, + 'end_episode': None, + 'total_episode': None, + } + + +def _apply_range_total(metainfo: dict, begin_key: str, end_key: str, total_key: str) -> None: + """ + 计算季/集范围总数;保留原有倒序输入自动交换的兼容行为。 + """ + if metainfo.get(begin_key) and metainfo.get(end_key): + if metainfo[begin_key] > metainfo[end_key]: + metainfo[begin_key], metainfo[end_key] = metainfo[end_key], metainfo[begin_key] + metainfo[total_key] = metainfo[end_key] - metainfo[begin_key] + 1 + elif metainfo.get(begin_key) and not metainfo.get(end_key): + metainfo[total_key] = 1 + + def _build_meta_info( title: str, subtitle: Optional[str] = None, @@ -30,10 +84,11 @@ def _build_meta_info( title, metainfo = find_metainfo(title) # 判断是否处理文件 media_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIOEXT - if title and Path(title).suffix.lower() in media_exts: + title_path = Path(title) if title else None + if title_path and title_path.suffix.lower() in media_exts: isfile = True # 去掉后缀 - title = Path(title).stem + title = title_path.stem else: isfile = False # 识别 @@ -115,15 +170,13 @@ def is_anime(name: str) -> bool: """ if not name: return False - if re.search(r'【[+0-9XVPI-]+】\s*【', name, re.IGNORECASE): + if _ANIME_BRACKET_RE.search(name): return True - if re.search(r'\s+-\s+[\dv]{1,4}\s+', name, re.IGNORECASE): + if _ANIME_DASH_EPISODE_RE.search(name): return True - if re.search(r"S\d{2}\s*-\s*S\d{2}|S\d{2}|\s+S\d{1,2}|EP?\d{2,4}\s*-\s*EP?\d{2,4}|EP?\d{2,4}|\s+EP?\d{1,4}", - name, - re.IGNORECASE): + if _VIDEO_SEASON_EPISODE_RE.search(name): return False - if re.search(r'\[[+0-9XVPI-]+]\s*\[', name, re.IGNORECASE): + if _ANIME_SQUARE_BRACKET_RE.search(name): return True return False @@ -132,95 +185,62 @@ def find_metainfo(title: str) -> Tuple[str, dict]: """ 从标题中提取媒体信息 """ - metainfo = { - 'tmdbid': None, - 'doubanid': None, - 'type': None, - 'begin_season': None, - 'end_season': None, - 'total_season': None, - 'begin_episode': None, - 'end_episode': None, - 'total_episode': None, - } + metainfo = _empty_metainfo() if not title: return title, metainfo # 从标题中提取媒体信息 格式为{[tmdbid=xxx;type=xxx;s=xxx;e=xxx]} - results = re.findall(r'(?<={\[)[\W\w]+(?=]})', title) + results = _BRACED_METAINFO_RE.findall(title) if results: for result in results: # 查找tmdbid信息 - tmdbid = re.findall(r'(?<=tmdbid=)\d+', result) - if tmdbid and tmdbid[0].isdigit(): - metainfo['tmdbid'] = tmdbid[0] + tmdbid = _BRACED_TMDBID_RE.search(result) + if tmdbid and tmdbid.group(0).isdigit(): + metainfo['tmdbid'] = tmdbid.group(0) # 查找豆瓣id信息 - doubanid = re.findall(r'(?<=doubanid=)\d+', result) - if doubanid and doubanid[0].isdigit(): - metainfo['doubanid'] = doubanid[0] + doubanid = _BRACED_DOUBANID_RE.search(result) + if doubanid and doubanid.group(0).isdigit(): + metainfo['doubanid'] = doubanid.group(0) # 查找媒体类型 - mtype = re.findall(r'(?<=type=)\w+', result) + mtype = _BRACED_TYPE_RE.search(result) if mtype: - if mtype[0] == "movies": + media_type = mtype.group(0) + if media_type == "movies": metainfo['type'] = MediaType.MOVIE - elif mtype[0] == "tv": + elif media_type == "tv": metainfo['type'] = MediaType.TV # 查找季信息 - begin_season = re.findall(r'(?<=s=)\d+', result) - if begin_season and begin_season[0].isdigit(): - metainfo['begin_season'] = int(begin_season[0]) - end_season = re.findall(r'(?<=s=\d+-)\d+', result) - if end_season and end_season[0].isdigit(): - metainfo['end_season'] = int(end_season[0]) + begin_season = _BRACED_BEGIN_SEASON_RE.search(result) + if begin_season and begin_season.group(0).isdigit(): + metainfo['begin_season'] = int(begin_season.group(0)) + end_season = _BRACED_END_SEASON_RE.search(result) + if end_season and end_season.group(0).isdigit(): + metainfo['end_season'] = int(end_season.group(0)) # 查找集信息 - begin_episode = re.findall(r'(?<=e=)\d+', result) - if begin_episode and begin_episode[0].isdigit(): - metainfo['begin_episode'] = int(begin_episode[0]) - end_episode = re.findall(r'(?<=e=\d+-)\d+', result) - if end_episode and end_episode[0].isdigit(): - metainfo['end_episode'] = int(end_episode[0]) + begin_episode = _BRACED_BEGIN_EPISODE_RE.search(result) + if begin_episode and begin_episode.group(0).isdigit(): + metainfo['begin_episode'] = int(begin_episode.group(0)) + end_episode = _BRACED_END_EPISODE_RE.search(result) + if end_episode and end_episode.group(0).isdigit(): + metainfo['end_episode'] = int(end_episode.group(0)) # 去除title中该部分 if tmdbid or mtype or begin_season or end_season or begin_episode or end_episode: title = title.replace(f"{{[{result}]}}", '') - # 支持Emby格式的ID标签 - # 1. [tmdbid=xxxx] 或 [tmdbid-xxxx] 格式 - tmdb_match = re.search(r'\[tmdbid[=\-](\d+)\]', title) + # 支持Emby格式的ID标签;第一个 [tmdbid] 历史上始终优先处理,用于覆盖前面 {[...]} 中的旧标签。 + tmdb_match = _EMBY_TMDB_RE_LIST[0].search(title) if tmdb_match: metainfo['tmdbid'] = tmdb_match.group(1) - title = re.sub(r'\[tmdbid[=\-](\d+)\]', '', title).strip() - - # 2. [tmdb=xxxx] 或 [tmdb-xxxx] 格式 - if not metainfo['tmdbid']: - tmdb_match = re.search(r'\[tmdb[=\-](\d+)\]', title) - if tmdb_match: - metainfo['tmdbid'] = tmdb_match.group(1) - title = re.sub(r'\[tmdb[=\-](\d+)\]', '', title).strip() - - # 3. {tmdbid=xxxx} 或 {tmdbid-xxxx} 格式 - if not metainfo['tmdbid']: - tmdb_match = re.search(r'\{tmdbid[=\-](\d+)\}', title) - if tmdb_match: - metainfo['tmdbid'] = tmdb_match.group(1) - title = re.sub(r'\{tmdbid[=\-](\d+)\}', '', title).strip() - - # 4. {tmdb=xxxx} 或 {tmdb-xxxx} 格式 - if not metainfo['tmdbid']: - tmdb_match = re.search(r'\{tmdb[=\-](\d+)\}', title) - if tmdb_match: - metainfo['tmdbid'] = tmdb_match.group(1) - title = re.sub(r'\{tmdb[=\-](\d+)\}', '', title).strip() + title = _EMBY_TMDB_RE_LIST[0].sub('', title).strip() + elif not metainfo['tmdbid']: + # 保持原有优先级:[tmdbid] > [tmdb] > {tmdbid} > {tmdb} + for tmdb_re in _EMBY_TMDB_RE_LIST[1:]: + tmdb_match = tmdb_re.search(title) + if tmdb_match: + metainfo['tmdbid'] = tmdb_match.group(1) + title = tmdb_re.sub('', title).strip() + break # 计算季集总数 - if metainfo.get('begin_season') and metainfo.get('end_season'): - if metainfo['begin_season'] > metainfo['end_season']: - metainfo['begin_season'], metainfo['end_season'] = metainfo['end_season'], metainfo['begin_season'] - metainfo['total_season'] = metainfo['end_season'] - metainfo['begin_season'] + 1 - elif metainfo.get('begin_season') and not metainfo.get('end_season'): - metainfo['total_season'] = 1 - if metainfo.get('begin_episode') and metainfo.get('end_episode'): - if metainfo['begin_episode'] > metainfo['end_episode']: - metainfo['begin_episode'], metainfo['end_episode'] = metainfo['end_episode'], metainfo['begin_episode'] - metainfo['total_episode'] = metainfo['end_episode'] - metainfo['begin_episode'] + 1 - elif metainfo.get('begin_episode') and not metainfo.get('end_episode'): - metainfo['total_episode'] = 1 + _apply_range_total(metainfo, 'begin_season', 'end_season', 'total_season') + _apply_range_total(metainfo, 'begin_episode', 'end_episode', 'total_episode') return title, metainfo diff --git a/app/modules/douban/__init__.py b/app/modules/douban/__init__.py index b67eca27..986521fd 100644 --- a/app/modules/douban/__init__.py +++ b/app/modules/douban/__init__.py @@ -71,6 +71,42 @@ class DoubanModule(_ModuleBase): """ return 2 + @staticmethod + def _prepare_search_names(meta: MetaBase) -> List[str]: + """ + 准备搜索名称列表,保留中英文名称分别识别且按顺序去重的历史行为。 + """ + # 简体名称 + zh_name = zhconv.convert(meta.cn_name, "zh-hans") if meta.cn_name else None + # 使用中英文名分别识别,去重去空,但要保持顺序 + return list(dict.fromkeys([k for k in [meta.cn_name, zh_name, meta.en_name] if k])) + + @staticmethod + def _build_search_medias_result(meta: MetaBase, items: Optional[List[dict]]) -> List[MediaInfo]: + """ + 构建豆瓣搜索结果,并沿用原有的类型、标题包含和季信息处理规则。 + """ + if not items: + return [] + ret_medias = [] + for item_obj in items: + if meta.type and meta.type != MediaType.UNKNOWN and meta.type.value != item_obj.get("type_name"): + continue + if item_obj.get("type_name") not in (MediaType.TV.value, MediaType.MOVIE.value): + continue + if meta.name not in item_obj.get("target", {}).get("title"): + continue + ret_medias.append(MediaInfo(douban_info=item_obj.get("target"))) + # 将搜索词中的季写入标题中 + if ret_medias and meta.begin_season: + # 小写数据转大写 + season_str = cn2an.an2cn(meta.begin_season, "low") + for media in ret_medias: + if media.type == MediaType.TV: + media.title = f"{media.title} 第{season_str}季" + media.season = meta.begin_season + return ret_medias + def _recognize_media_core(self, meta: MetaBase = None, mtype: MediaType = None, doubanid: Optional[str] = None, @@ -107,7 +143,7 @@ class DoubanModule(_ModuleBase): meta.type = mtype if doubanid: meta.doubanid = doubanid - cache_info = self.cache.get(meta) + cache_info = self.cache.get(meta) if cache else {} cache_hit = False # 识别豆瓣信息 @@ -118,11 +154,7 @@ class DoubanModule(_ModuleBase): info = douban_info_func(doubanid=doubanid, mtype=mtype or meta.type) elif meta: info = {} - # 简体名称 - zh_name = zhconv.convert(meta.cn_name, "zh-hans") if meta.cn_name else None - # 使用中英文名分别识别,去重去空,但要保持顺序 - names = list(dict.fromkeys([k for k in [meta.cn_name, zh_name, meta.en_name] if k])) - for name in names: + for name in self._prepare_search_names(meta): if meta.begin_season: logger.info(f"正在识别 {name} 第{meta.begin_season}季 ...") else: @@ -211,7 +243,7 @@ class DoubanModule(_ModuleBase): meta.type = mtype if doubanid: meta.doubanid = doubanid - cache_info = self.cache.get(meta) + cache_info = self.cache.get(meta) if cache else {} cache_hit = False # 识别豆瓣信息 @@ -222,11 +254,7 @@ class DoubanModule(_ModuleBase): info = await async_douban_info_func(doubanid=doubanid, mtype=mtype or meta.type) elif meta: info = {} - # 简体名称 - zh_name = zhconv.convert(meta.cn_name, "zh-hans") if meta.cn_name else None - # 使用中英文名分别识别,去重去空,但要保持顺序 - names = list(dict.fromkeys([k for k in [meta.cn_name, zh_name, meta.en_name] if k])) - for name in names: + for name in self._prepare_search_names(meta): if meta.begin_season: logger.info(f"正在识别 {name} 第{meta.begin_season}季 ...") else: @@ -913,24 +941,7 @@ class DoubanModule(_ModuleBase): if not result or not result.get("items"): return [] # 返回数据 - ret_medias = [] - for item_obj in result.get("items"): - if meta.type and meta.type != MediaType.UNKNOWN and meta.type.value != item_obj.get("type_name"): - continue - if item_obj.get("type_name") not in (MediaType.TV.value, MediaType.MOVIE.value): - continue - if meta.name not in item_obj.get("target", {}).get("title"): - continue - ret_medias.append(MediaInfo(douban_info=item_obj.get("target"))) - # 将搜索词中的季写入标题中 - if ret_medias and meta.begin_season: - # 小写数据转大写 - season_str = cn2an.an2cn(meta.begin_season, "low") - for media in ret_medias: - if media.type == MediaType.TV: - media.title = f"{media.title} 第{season_str}季" - media.season = meta.begin_season - return ret_medias + return self._build_search_medias_result(meta, result.get("items")) async def async_search_medias(self, meta: MetaBase) -> Optional[List[MediaInfo]]: """ @@ -946,24 +957,7 @@ class DoubanModule(_ModuleBase): if not result or not result.get("items"): return [] # 返回数据 - ret_medias = [] - for item_obj in result.get("items"): - if meta.type and meta.type != MediaType.UNKNOWN and meta.type.value != item_obj.get("type_name"): - continue - if item_obj.get("type_name") not in (MediaType.TV.value, MediaType.MOVIE.value): - continue - if meta.name not in item_obj.get("target", {}).get("title"): - continue - ret_medias.append(MediaInfo(douban_info=item_obj.get("target"))) - # 将搜索词中的季写入标题中 - if ret_medias and meta.begin_season: - # 小写数据转大写 - season_str = cn2an.an2cn(meta.begin_season, "low") - for media in ret_medias: - if media.type == MediaType.TV: - media.title = f"{media.title} 第{season_str}季" - media.season = meta.begin_season - return ret_medias + return self._build_search_medias_result(meta, result.get("items")) def search_persons(self, name: str) -> Optional[List[MediaPerson]]: """ diff --git a/app/modules/themoviedb/__init__.py b/app/modules/themoviedb/__init__.py index f93d91b3..c099bf40 100644 --- a/app/modules/themoviedb/__init__.py +++ b/app/modules/themoviedb/__init__.py @@ -19,6 +19,8 @@ from app.schemas.types import MediaType, MediaImageType, ModuleType, MediaRecogn from app.utils.http import RequestUtils +_DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$") + class TheMovieDbModule(_ModuleBase): """ @@ -118,6 +120,59 @@ class TheMovieDbModule(_ModuleBase): # 使用中英文名分别识别,去重去空,但要保持顺序 return list(dict.fromkeys([k for k in [meta.cn_name, zh_name, meta.en_name] if k])) + @staticmethod + def _fill_group_season_info(mediainfo: MediaInfo, episode_group: Optional[str], + group_seasons: List[dict]) -> None: + """ + 将指定剧集组的季、集、年份信息写入 MediaInfo。 + """ + seasons = {} + season_info = [] + season_years = {} + for group_season in group_seasons: + # 季 + season = group_season.get("order") + # 集列表 + episodes = group_season.get("episodes") + if not episodes: + continue + seasons[season] = [ep.get("episode_number") for ep in episodes] + season_info.append(group_season) + # 当前季第一集时间 + first_date = episodes[0].get("air_date") + if first_date and _DATE_RE.match(first_date): + season_years[season] = str(first_date).split("-")[0] + # 每季集清单 + if seasons: + mediainfo.seasons = seasons + mediainfo.number_of_seasons = len(seasons) + # 每季集详情 + if season_info: + mediainfo.season_info = season_info + # 每季年份 + if season_years: + mediainfo.season_years = season_years + # 所有剧集组 + mediainfo.episode_group = episode_group + mediainfo.episode_groups = group_seasons + + @staticmethod + def _build_search_medias_result(meta: MetaBase, results: Optional[List[dict]]) -> List[MediaInfo]: + """ + 构建搜索结果,并沿用原有逻辑把搜索词中的季写入电视剧标题中。 + """ + if not results: + return [] + medias = [MediaInfo(tmdb_info=info) for info in results] + if meta.begin_season: + # 小写数据转大写 + season_str = cn2an.an2cn(meta.begin_season, "low") + for media in medias: + if media.type == MediaType.TV: + media.title = f"{media.title} 第{season_str}季" + media.season = meta.begin_season + return medias + def _get_info_by_tmdbid(self, tmdbid: int, mtype: Optional[MediaType], meta: Optional[MetaBase]) -> Optional[dict]: """ @@ -289,36 +344,7 @@ class TheMovieDbModule(_ModuleBase): """ if mediainfo.type == MediaType.TV and mediainfo.episode_groups: if group_seasons: - # 指定剧集组时 - seasons = {} - season_info = [] - season_years = {} - for group_season in group_seasons: - # 季 - season = group_season.get("order") - # 集列表 - episodes = group_season.get("episodes") - if not episodes: - continue - seasons[season] = [ep.get("episode_number") for ep in episodes] - season_info.append(group_season) - # 当前季第一季时间 - first_date = episodes[0].get("air_date") - if re.match(r"^\d{4}-\d{2}-\d{2}$", first_date): - season_years[season] = str(first_date).split("-")[0] - # 每季集清单 - if seasons: - mediainfo.seasons = seasons - mediainfo.number_of_seasons = len(seasons) - # 每季集详情 - if season_info: - mediainfo.season_info = season_info - # 每季年份 - if season_years: - mediainfo.season_years = season_years - # 所有剧集组 - mediainfo.episode_group = episode_group - mediainfo.episode_groups = group_seasons + self._fill_group_season_info(mediainfo, episode_group, group_seasons) else: # 每季年份 season_years = {} @@ -337,7 +363,7 @@ class TheMovieDbModule(_ModuleBase): # 当前季第一季时间 first_date = episodes[0].get("air_date") # 判断是不是日期格式 - if first_date and re.match(r"^\d{4}-\d{2}-\d{2}$", first_date): + if first_date and _DATE_RE.match(first_date): season_years[season] = str(first_date).split("-")[0] if season_years: mediainfo.season_years = season_years @@ -350,36 +376,7 @@ class TheMovieDbModule(_ModuleBase): """ if mediainfo.type == MediaType.TV and mediainfo.episode_groups: if group_seasons: - # 指定剧集组时 - seasons = {} - season_info = [] - season_years = {} - for group_season in group_seasons: - # 季 - season = group_season.get("order") - # 集列表 - episodes = group_season.get("episodes") - if not episodes: - continue - seasons[season] = [ep.get("episode_number") for ep in episodes] - season_info.append(group_season) - # 当前季第一季时间 - first_date = episodes[0].get("air_date") - if re.match(r"^\d{4}-\d{2}-\d{2}$", first_date): - season_years[season] = str(first_date).split("-")[0] - # 每季集清单 - if seasons: - mediainfo.seasons = seasons - mediainfo.number_of_seasons = len(seasons) - # 每季集详情 - if season_info: - mediainfo.season_info = season_info - # 每季年份 - if season_years: - mediainfo.season_years = season_years - # 所有剧集组 - mediainfo.episode_group = episode_group - mediainfo.episode_groups = group_seasons + self._fill_group_season_info(mediainfo, episode_group, group_seasons) else: # 每季年份 season_years = {} @@ -398,7 +395,7 @@ class TheMovieDbModule(_ModuleBase): # 当前季第一季时间 first_date = episodes[0].get("air_date") # 判断是不是日期格式 - if first_date and re.match(r"^\d{4}-\d{2}-\d{2}$", first_date): + if first_date and _DATE_RE.match(first_date): season_years[season] = str(first_date).split("-")[0] if season_years: mediainfo.season_years = season_years @@ -484,7 +481,7 @@ class TheMovieDbModule(_ModuleBase): meta.type = mtype if tmdbid: meta.tmdbid = tmdbid - cache_info = self.cache.get(meta) + cache_info = self.cache.get(meta) if cache else {} # 查询剧集组 group_seasons = [] @@ -573,7 +570,7 @@ class TheMovieDbModule(_ModuleBase): meta.type = mtype if tmdbid: meta.tmdbid = tmdbid - cache_info = self.cache.get(meta) + cache_info = self.cache.get(meta) if cache else {} # 查询剧集组 group_seasons = [] @@ -764,17 +761,7 @@ class TheMovieDbModule(_ModuleBase): else: results = self.tmdb.search_tvs(meta.name, meta.year) # 将搜索词中的季写入标题中 - if results: - medias = [MediaInfo(tmdb_info=info) for info in results] - if meta.begin_season: - # 小写数据转大写 - season_str = cn2an.an2cn(meta.begin_season, "low") - for media in medias: - if media.type == MediaType.TV: - media.title = f"{media.title} 第{season_str}季" - media.season = meta.begin_season - return medias - return [] + return self._build_search_medias_result(meta, results) def search_persons(self, name: str) -> Optional[List[schemas.MediaPerson]]: """ @@ -1206,17 +1193,7 @@ class TheMovieDbModule(_ModuleBase): else: results = await self.tmdb.async_search_tvs(meta.name, meta.year) # 将搜索词中的季写入标题中 - if results: - medias = [MediaInfo(tmdb_info=info) for info in results] - if meta.begin_season: - # 小写数据转大写 - season_str = cn2an.an2cn(meta.begin_season, "low") - for media in medias: - if media.type == MediaType.TV: - media.title = f"{media.title} 第{season_str}季" - media.season = meta.begin_season - return medias - return [] + return self._build_search_medias_result(meta, results) async def async_tmdb_discover(self, mtype: MediaType, sort_by: str, with_genres: str, diff --git a/tests/test_media_recognize_modules.py b/tests/test_media_recognize_modules.py new file mode 100644 index 00000000..74dad713 --- /dev/null +++ b/tests/test_media_recognize_modules.py @@ -0,0 +1,107 @@ +import asyncio +from unittest import TestCase +from unittest.mock import Mock + +from app.core.context import MediaInfo +from app.core.meta import MetaBase +from app.modules.douban import DoubanModule +from app.modules.themoviedb import TheMovieDbModule +from app.schemas.types import MediaType + + +class MediaRecognizeModulesTest(TestCase): + def test_tmdb_cache_false_skips_cache_lookup(self): + """cache=False 时应跳过缓存读取,但仍按正常流程查询 TMDB。""" + module = TheMovieDbModule() + meta = MetaBase("测试电影") + meta.name = "测试电影" + meta.type = MediaType.MOVIE + module.cache = Mock() + module.tmdb = Mock() + module.tmdb.get_info.return_value = { + "id": 100, + "media_type": MediaType.MOVIE, + "title": "测试电影", + "genres": [], + } + module.category = Mock() + module.category.get_movie_category.return_value = None + + result = module.recognize_media(meta=meta, tmdbid=100, cache=False) + + self.assertIsInstance(result, MediaInfo) + self.assertEqual(result.tmdb_id, 100) + module.cache.get.assert_not_called() + module.cache.update.assert_called_once() + + def test_async_tmdb_cache_false_skips_cache_lookup(self): + """异步 cache=False 时也应跳过缓存读取。""" + module = TheMovieDbModule() + meta = MetaBase("测试电影") + meta.name = "测试电影" + meta.type = MediaType.MOVIE + module.cache = Mock() + module.tmdb = Mock() + + async def _async_get_info(**kwargs): + return { + "id": 101, + "media_type": MediaType.MOVIE, + "title": "测试电影", + "genres": [], + } + + module.tmdb.async_get_info = _async_get_info + module.category = Mock() + module.category.get_movie_category.return_value = None + + result = asyncio.run(module.async_recognize_media(meta=meta, tmdbid=101, cache=False)) + + self.assertIsInstance(result, MediaInfo) + self.assertEqual(result.tmdb_id, 101) + module.cache.get.assert_not_called() + module.cache.update.assert_called_once() + + def test_douban_prepare_search_names_deduplicates_simplified_name(self): + """豆瓣候选名称应保留顺序,并去掉繁简转换后的重复项。""" + meta = MetaBase("流浪地球") + meta.cn_name = "流浪地球" + meta.en_name = "The Wandering Earth" + + self.assertEqual( + DoubanModule._prepare_search_names(meta), + ["流浪地球", "The Wandering Earth"], + ) + + def test_douban_search_result_helper_preserves_season_title_rule(self): + """豆瓣搜索结果 helper 应保留电视剧标题追加季号的旧逻辑。""" + meta = MetaBase("测试剧") + meta.name = "测试剧" + meta.type = MediaType.TV + meta.begin_season = 2 + items = [ + { + "type_name": MediaType.TV.value, + "target": { + "id": "200", + "title": "测试剧", + "type": "tv", + "year": "2024", + }, + }, + { + "type_name": MediaType.MOVIE.value, + "target": { + "id": "201", + "title": "测试剧 电影版", + "type": "movie", + "year": "2024", + }, + }, + ] + + result = DoubanModule._build_search_medias_result(meta, items) + + self.assertEqual(len(result), 1) + self.assertEqual(result[0].title, "测试剧 第二季") + self.assertEqual(result[0].season, 2) diff --git a/tests/test_metainfo.py b/tests/test_metainfo.py index 08e4854d..c77ffe4f 100644 --- a/tests/test_metainfo.py +++ b/tests/test_metainfo.py @@ -2,7 +2,7 @@ from pathlib import Path from unittest import TestCase -from app.core.metainfo import MetaInfo, MetaInfoPath +from app.core.metainfo import MetaInfo, MetaInfoPath, find_metainfo from tests.cases.meta import meta_cases @@ -124,6 +124,22 @@ class MetaInfoTest(TestCase): self.assertEqual(meta.name, "电影名称") self.assertEqual(meta.original_name, "电影测试替换名称") + def test_custom_words_replace_then_episode_offset(self): + """测试复杂识别词仍按先替换、后集数偏移的顺序处理""" + custom_words = ["旧名 => 新名 && 第 <> 集 >> EP+1"] + meta = MetaInfo(title="旧名 第03集", custom_words=custom_words) + self.assertEqual(meta.name, "新名") + self.assertEqual(meta.episode, "E04") + self.assertEqual(meta.apply_words, custom_words) + + def test_emby_tmdbid_overrides_braced_metainfo_tmdbid(self): + """ + 同时存在内嵌元信息和 Emby [tmdbid] 标签时,保持历史上的 [tmdbid] 优先级。 + """ + title, metainfo = find_metainfo("Movie {[tmdbid=111;type=movies]} [tmdbid=222]") + self.assertEqual(metainfo["tmdbid"], "222") + self.assertNotIn("[tmdbid=222]", title) + def test_metainfopath_auxiliary_chinese_stem_uses_parent_title(self): """ 文件名为简英双语/特效等压制标签、父目录为拉丁片名时,应合并父目录标题与年份。