# Private helper methods of ``TransferChain`` introduced by this patch.
# NOTE: they are laid out at column 0 because the patch text has lost its
# indentation; in app/chain/transfer.py they sit at method indentation inside
# the class, which is what lets the ``self.__name`` references resolve.


@staticmethod
def __optional_attr_equal(
    source: MetaBase,
    target: MetaBase,
    attr: str,
    normalizer: Optional[Callable] = None,
) -> bool:
    """
    Compare one optional recognised attribute of two metadata objects.

    When neither side has the attribute (both ``None``) it does not take part
    in the decision and the pair counts as equal. As soon as either side has a
    value, both sides must have one and the values must match, so that extra
    files with the same title but a different year or season/episode are not
    attached to the current main video.

    :param source: metadata of the first file
    :param target: metadata of the second file
    :param attr: attribute name, read from both objects with ``getattr``
    :param normalizer: optional callable applied to both values before comparing
    :return: True when the attribute is missing on both sides or equal on both
    """
    source_value = getattr(source, attr, None)
    target_value = getattr(target, attr, None)
    if source_value is None and target_value is None:
        return True
    if source_value is None or target_value is None:
        return False
    if normalizer:
        source_value = normalizer(source_value)
        target_value = normalizer(target_value)
    return source_value == target_value


def __is_same_media_meta(
    self, source_meta: MetaBase, target_meta: MetaBase
) -> bool:
    """
    Decide whether two files were recognised as the same media item.

    The type and the cleaned, upper-cased name must match; year and the
    season/episode range are compared as optional attributes.

    :param source_meta: metadata of the first file
    :param target_meta: metadata of the second file
    :return: True when every compared field agrees, False otherwise
             (including when either metadata object is missing)
    """
    if not source_meta or not target_meta:
        return False
    if source_meta.type != target_meta.type:
        return False
    if StringUtils.clear_upper(source_meta.name) != StringUtils.clear_upper(
        target_meta.name
    ):
        return False
    # Year is compared as text, season/episode bounds as integers.
    optional_checks = (
        ("year", str),
        ("begin_season", int),
        ("end_season", int),
        ("begin_episode", int),
        ("end_episode", int),
    )
    return all(
        self.__optional_attr_equal(source_meta, target_meta, attr, normalizer)
        for attr, normalizer in optional_checks
    )


def __get_sync_extra_fileitems(
    self,
    main_fileitem: FileItem,
    main_meta: MetaBase,
    meta_factory: Callable[[Path], Optional[MetaBase]],
    predicate: Optional[Callable[[FileItem, bool], bool]] = None,
    extra_cache: Optional[Dict[Tuple[str, str], List[FileItem]]] = None,
) -> List[Tuple[FileItem, bool]]:
    """
    Collect the extra files (subtitles / audio tracks) that sit in the same
    directory as the main video and whose recognised metadata matches it.

    :param main_fileitem: the main video; anything that is not a media file
                          of type "file" yields an empty result
    :param main_meta: metadata recognised for the main video
    :param meta_factory: callable building the metadata of an extra file path
    :param predicate: optional filter, called as ``predicate(item, False)``
                      for each entry when the directory has to be listed
    :param extra_cache: optional per-directory cache of candidate extra files
                        keyed by ``(storage, normalised directory path)``;
                        it is read and filled in place
    :return: ``(file item, False)`` tuples for every matching extra file
    """
    if (
        not main_fileitem
        or main_fileitem.type != "file"
        or not self.__is_media_file(main_fileitem)
        or not main_meta
    ):
        return []

    lookup_key = self.__get_file_parent_key(main_fileitem)
    if extra_cache is not None and lookup_key in extra_cache:
        extra_candidates = extra_cache[lookup_key]
    else:
        storagechain = StorageChain()
        parent_item = storagechain.get_parent_item(main_fileitem)
        if not parent_item:
            logger.debug(f"{main_fileitem.path} 未找到父目录,跳过同步整理附加文件")
            return []

        extra_candidates: List[FileItem] = [
            item
            for item in storagechain.list_files(parent_item, recursion=False) or []
            if item
            and item.type == "file"
            and (self.__is_subtitle_file(item) or self.__is_audio_file(item))
            and (not predicate or predicate(item, False))
        ]

        if extra_cache is not None:
            # Lookups always use the key derived from the file path, so the
            # result must be stored under that key as well. Storing it only
            # under the key derived from parent_item would never hit again
            # whenever the two keys differ, re-listing the directory for
            # every main video in it.
            extra_cache[lookup_key] = extra_candidates
            extra_cache[self.__get_dir_key(parent_item)] = extra_candidates

    extra_fileitems: List[Tuple[FileItem, bool]] = []
    for item in extra_candidates:
        if item.path == main_fileitem.path:
            continue
        extra_meta = meta_factory(Path(item.path))
        # Ownership is decided on the parsed media identity and
        # season/episode information, never on the raw file name.
        if self.__is_same_media_meta(main_meta, extra_meta):
            extra_fileitems.append((item, False))

    if extra_fileitems:
        logger.info(
            f"{main_fileitem.path} 同步匹配到 {len(extra_fileitems)} 个附加文件"
        )
    return extra_fileitems


@staticmethod
def __normalize_dir_path(dir_path: Union[str, Path]) -> str:
    """
    Normalise a directory path for use in the per-directory candidate cache.

    :param dir_path: directory path as a string or ``Path``
    :return: the POSIX form without trailing "/", or "/" when nothing is left
    """
    normalized = Path(dir_path).as_posix().rstrip("/")
    return normalized or "/"


def __get_dir_key(self, dir_item: FileItem) -> Tuple[str, str]:
    """
    Build the cache key of a directory item.

    :param dir_item: the directory
    :return: ``(storage, normalised directory path)``
    """
    return dir_item.storage, self.__normalize_dir_path(dir_item.path)


def __get_file_parent_key(self, current_item: FileItem) -> Tuple[str, str]:
    """
    Build the cache key of the directory that contains a file.

    :param current_item: the file
    :return: ``(storage, normalised path of the file's lexical parent)``
    """
    return (
        current_item.storage,
        self.__normalize_dir_path(Path(current_item.path).parent),
    )
# Fragment of the body of ``TransferChain.do_transfer``: the nested helpers and
# the "sync extra files" step added by this patch. Everything here reads names
# from the enclosing do_transfer scope (meta, season, formaterHandler,
# download_hash, file_items, sync_extra_files, _filter, self); it is laid out
# at column 0 only because the patch text has lost its indentation.


def _get_subscribe_custom_words(
    history_record: Optional[DownloadHistory],
) -> Optional[List[str]]:
    """
    Look up the subscription's custom recognition words for a download record.

    :param history_record: download history entry, may be None
    :return: the subscription's ``custom_words`` split on newlines, or None
             when there is no record, its ``note`` is not a dict, or no
             subscription / custom words are found
    """
    if not history_record or not isinstance(history_record.note, dict):
        return None
    # The subscription is resolved dynamically from the record's "source".
    subscribe = SubscribeChain().get_subscribe_by_source(
        history_record.note.get("source")
    )
    if subscribe and subscribe.custom_words:
        return subscribe.custom_words.split("\n")
    return None


def _apply_meta_overrides(
    current_meta: MetaBase, source_path: Path
) -> Optional[MetaBase]:
    """
    Apply the manually supplied season and the custom episode format.

    :param current_meta: metadata object to update in place
    :param source_path: path of the file; only its name is passed to the
                        format handler
    :return: the same ``current_meta`` object
    """
    # Merge the season passed to do_transfer.
    if season is not None:
        current_meta.begin_season = season

    # Custom recognition: begin episode, end episode and PART.
    if formaterHandler:
        begin_ep, end_ep, part = formaterHandler.split_episode(
            file_name=source_path.name, file_meta=current_meta
        )
        if begin_ep is not None:
            current_meta.begin_episode = begin_ep
        if part is not None:
            current_meta.part = part
        if end_ep is not None:
            current_meta.end_episode = end_ep

    return current_meta


def _build_path_meta(
    source_path: Path,
    custom_word_list: Optional[List[str]] = None,
) -> Optional[MetaBase]:
    """
    Recognise media information from a file path, with overrides applied.

    Used to decide whether an extra file belongs to the current main video.

    :param source_path: path to recognise
    :param custom_word_list: optional custom recognition words
    :return: the recognised metadata, or None when recognition yields nothing
    """
    path_meta = MetaInfoPath(source_path, custom_words=custom_word_list)
    if not path_meta:
        return None
    return _apply_meta_overrides(path_meta, source_path)


def _build_file_meta(
    source_path: Path,
    custom_word_list: Optional[List[str]] = None,
) -> Optional[MetaBase]:
    """
    Build the metadata used by the transfer task for one file.

    A copy of the caller-supplied ``meta`` is used when there is one;
    otherwise the metadata is recognised from the path. In both cases the
    manual season / custom format overrides are applied exactly once.

    :param source_path: path of the file being transferred
    :param custom_word_list: optional custom recognition words, only used
                             when the metadata is recognised from the path
    :return: the metadata, or None when nothing could be recognised
    """
    if not meta:
        # _build_path_meta already applies the overrides. Applying them a
        # second time would call formaterHandler.split_episode again on
        # metadata whose episodes were already overridden -- presumably
        # harmful for any handler that derives its result from file_meta
        # (e.g. an episode offset); the pre-patch code applied it once.
        return _build_path_meta(source_path, custom_word_list=custom_word_list)
    # Work on a copy so the caller's meta object is not mutated per file.
    return _apply_meta_overrides(deepcopy(meta), source_path)


if sync_extra_files:
    # Single-file and directory transfers are both completed as
    # "main video -> extra files of the same media"; in the directory case
    # every video is processed in turn.
    extra_file_cache: Dict[Tuple[str, str], List[FileItem]] = {}
    main_file_items: List[Tuple[FileItem, bool]] = []
    for candidate_item, candidate_bluray_dir in file_items:
        if not candidate_item:
            continue
        if candidate_bluray_dir or self.__is_media_file(candidate_item):
            main_file_items.append((candidate_item, candidate_bluray_dir))
            continue
        if candidate_item.type == "file" and (
            self.__is_subtitle_file(candidate_item)
            or self.__is_audio_file(candidate_item)
        ):
            # Extra files already found while collecting file_items go
            # straight into the parent-directory cache, so the directory
            # does not have to be listed again later.
            extra_file_cache.setdefault(
                self.__get_file_parent_key(candidate_item), []
            ).append(candidate_item)

    if main_file_items:
        # From here on file_items holds the main videos plus the extra files
        # that match one of them; unmatched extras are left out.
        file_items = list(main_file_items)
        seen_file_keys = {
            (item.storage, item.path)
            for item, _ in file_items
            if item and item.path
        }
        downloadhis = DownloadHistoryOper()
        extra_meta_cache: Dict[
            Tuple[str, Tuple[str, ...]], Optional[MetaBase]
        ] = {}

        def _get_cached_extra_meta(
            extra_path: Path, custom_words_key: Tuple[str, ...]
        ) -> Optional[MetaBase]:
            """
            Return the metadata of an extra file, parsing it at most once per
            (path, custom words) pair; several main videos then only do an
            in-memory comparison.
            """
            cache_key = (extra_path.as_posix(), custom_words_key)
            if cache_key not in extra_meta_cache:
                extra_meta_cache[cache_key] = _build_path_meta(
                    extra_path,
                    custom_word_list=list(custom_words_key) or None,
                )
            return extra_meta_cache[cache_key]

        def _build_extra_meta_factory(
            custom_word_list: Optional[List[str]],
        ) -> Callable[[Path], Optional[MetaBase]]:
            """
            Build a path -> metadata factory bound to one set of custom words.

            The mutable word list is frozen into a tuple so it can be part of
            the cache key and the closure holds no mutable object.
            """
            custom_words_key = tuple(custom_word_list or [])

            def _extra_meta_factory(extra_path: Path) -> Optional[MetaBase]:
                return _get_cached_extra_meta(extra_path, custom_words_key)

            return _extra_meta_factory

        # main_file_items is not modified inside the loop (matches are
        # appended to file_items), so it can be iterated directly.
        for main_item, main_bluray_dir in main_file_items:
            if main_bluray_dir or not self.__is_media_file(main_item):
                continue

            main_path = Path(main_item.path)
            main_download_history = self._resolve_download_history(
                downloadhis=downloadhis,
                file_path=main_path,
                bluray_dir=main_bluray_dir,
                download_hash=download_hash,
            )
            subscribe_custom_words = _get_subscribe_custom_words(
                main_download_history
            )
            main_meta = _build_file_meta(
                main_path, custom_word_list=subscribe_custom_words
            )
            extra_items = self.__get_sync_extra_fileitems(
                main_fileitem=main_item,
                main_meta=main_meta,
                meta_factory=_build_extra_meta_factory(subscribe_custom_words),
                predicate=_filter,
                extra_cache=extra_file_cache,
            )
            for extra_item, extra_bluray_dir in extra_items:
                extra_key = (extra_item.storage, extra_item.path)
                if extra_key in seen_file_keys:
                    continue
                file_items.append((extra_item, extra_bluray_dir))
                seen_file_keys.add(extra_key)
# Test helper and test cases added to tests/test_transfer_job_manager.py by this
# patch. Only the two methods added to ``TransferJobManagerTest`` are shown.


def make_fileitem(path: str, size: int = 1024) -> FileItem:
    """
    Build a ``FileItem`` of type "file" on the "local" storage from a
    "/"-separated path string.

    :param path: full path; its last "/"-separated segment becomes the name
    :param size: value stored in the item's ``size`` field
    :return: the populated ``FileItem``
    """
    filename = path.rsplit("/", 1)[-1]
    stem, dot, ext = filename.rpartition(".")
    if not (dot and ext):
        # No dot, or nothing after the last dot: the whole name is the basename.
        stem, ext = filename, ""
    return FileItem(
        storage="local",
        path=path,
        type="file",
        name=filename,
        basename=stem,
        extension=ext,
        size=size,
    )


class TransferJobManagerTest(unittest.TestCase):
    def test_do_transfer_does_not_sync_extra_files_by_default(self):
        """Without ``sync_extra_files`` only the main video is handled."""
        chain = make_transfer_chain()
        handled_paths = []
        video_item = make_fileitem(
            "/downloads/Test Show (2026)/Test.Show.S01E01.2026.mkv"
        )
        subtitle_item = make_fileitem(
            "/downloads/Test Show (2026)/Test.Show.S01E01.2026.zh-cn.srt"
        )

        def record_transfer(task, callback=None):
            handled_paths.append(task.fileitem.path)
            return True, ""

        # Replace the chain's private collaborators with inert stand-ins.
        chain._TransferChain__get_trans_fileitems = lambda fileitem, predicate: [
            (video_item, False)
        ]
        chain._TransferChain__put_to_jobview = lambda task: True
        chain._TransferChain__register_scrape_batch_task = lambda task: None
        chain._TransferChain__close_scrape_batch = lambda batch_id: None
        chain._TransferChain__handle_transfer = record_transfer

        fake_transfer_history = SimpleNamespace(
            get_by_src=lambda src, storage=None: None
        )
        fake_download_history = SimpleNamespace(
            get_by_hash=lambda download_hash: None,
            get_file_by_fullpath=lambda fullpath: None,
            get_files_by_savepath=lambda savepath: [],
            get_by_path=lambda path: None,
        )
        fake_system_config = SimpleNamespace(get=lambda key: None)
        # The storage would offer the subtitle, but it must not be picked up.
        fake_storage = SimpleNamespace(
            get_parent_item=lambda fileitem: FileItem(
                storage="local",
                path="/downloads/Test Show (2026)/",
                type="dir",
                name="Test Show (2026)",
            ),
            list_files=lambda fileitem, recursion=False: [
                video_item,
                subtitle_item,
            ],
        )

        with patch(
            "app.chain.transfer.TransferHistoryOper",
            return_value=fake_transfer_history,
        ), patch(
            "app.chain.transfer.DownloadHistoryOper",
            return_value=fake_download_history,
        ), patch(
            "app.chain.transfer.SystemConfigOper",
            return_value=fake_system_config,
        ), patch(
            "app.chain.transfer.StorageChain",
            return_value=fake_storage,
        ):
            state, errmsg = TransferChain.do_transfer(
                chain,
                fileitem=video_item,
                background=False,
            )

        self.assertTrue(state)
        self.assertEqual("", errmsg)
        self.assertEqual([video_item.path], handled_paths)

    def test_do_transfer_syncs_matching_extra_files_for_each_main_video(self):
        """
        With ``sync_extra_files`` each main video pulls in the extra files
        recognised as the same episode, a file with another title is left
        out, and the parent directory is never listed again.
        """
        chain = make_transfer_chain()
        handled = []
        ep1_video = make_fileitem(
            "/downloads/Test Show (2026)/Test.Show.S01E01.2026.mkv"
        )
        ep2_video = make_fileitem(
            "/downloads/Test Show (2026)/Test.Show.S01E02.2026.mkv"
        )
        ep1_subtitle = make_fileitem(
            "/downloads/Test Show (2026)/Test.Show.S01E01.2026.zh-cn.srt"
        )
        ep1_audio = make_fileitem(
            "/downloads/Test Show (2026)/Test.Show.S01E01.2026.commentary.mka"
        )
        ep2_subtitle = make_fileitem(
            "/downloads/Test Show (2026)/Test.Show.S01E02.2026.zh-cn.srt"
        )
        other_title_subtitle = make_fileitem(
            "/downloads/Test Show (2026)/Other.Show.S01E01.2026.zh-cn.srt"
        )
        show_dir = FileItem(
            storage="local",
            path="/downloads/Test Show (2026)/",
            type="dir",
            name="Test Show (2026)",
        )
        directory_entries = [
            ep1_video,
            ep2_video,
            ep1_subtitle,
            ep1_audio,
            ep2_subtitle,
            other_title_subtitle,
        ]

        def record_transfer(task, callback=None):
            handled.append((task.fileitem.path, task.meta.begin_episode))
            return True, ""

        chain._TransferChain__get_trans_fileitems = lambda fileitem, predicate: [
            (entry, False) for entry in directory_entries
        ]
        chain._TransferChain__put_to_jobview = lambda task: True
        chain._TransferChain__register_scrape_batch_task = lambda task: None
        chain._TransferChain__close_scrape_batch = lambda batch_id: None
        chain._TransferChain__handle_transfer = record_transfer

        fake_transfer_history = SimpleNamespace(
            get_by_src=lambda src, storage=None: None
        )
        fake_download_history = SimpleNamespace(
            get_by_hash=lambda download_hash: None,
            get_file_by_fullpath=lambda fullpath: None,
            get_files_by_savepath=lambda savepath: [],
            get_by_path=lambda path: None,
        )
        fake_system_config = SimpleNamespace(get=lambda key: None)
        list_files_calls = []

        def fake_list_files(fileitem, recursion=False):
            # Every listing is recorded so the test can prove none happened.
            list_files_calls.append((fileitem.path, recursion))
            return list(directory_entries)

        fake_storage = SimpleNamespace(
            get_parent_item=lambda fileitem: show_dir,
            list_files=fake_list_files,
        )

        with patch(
            "app.chain.transfer.TransferHistoryOper",
            return_value=fake_transfer_history,
        ), patch(
            "app.chain.transfer.DownloadHistoryOper",
            return_value=fake_download_history,
        ), patch(
            "app.chain.transfer.SystemConfigOper",
            return_value=fake_system_config,
        ), patch(
            "app.chain.transfer.StorageChain",
            return_value=fake_storage,
        ):
            state, errmsg = TransferChain.do_transfer(
                chain,
                fileitem=show_dir,
                background=False,
                sync_extra_files=True,
            )

        self.assertTrue(state)
        self.assertEqual("", errmsg)
        # Main videos come first, then the extras matched to each of them.
        self.assertEqual(
            [
                (ep1_video.path, 1),
                (ep2_video.path, 2),
                (ep1_subtitle.path, 1),
                (ep1_audio.path, 1),
                (ep2_subtitle.path, 2),
            ],
            handled,
        )
        self.assertEqual([], list_files_calls)