From f4423e121efdbfc80f48d8cdb64afb850ea15b69 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 14 May 2026 07:38:06 +0800 Subject: [PATCH] fix: aggregate metadata scrape events --- app/chain/media.py | 11 +- app/chain/transfer.py | 210 ++++++++++++++++++++++++++--- app/schemas/transfer.py | 1 + tests/test_transfer_job_manager.py | 176 ++++++++++++++++++++++-- 4 files changed, 363 insertions(+), 35 deletions(-) diff --git a/app/chain/media.py b/app/chain/media.py index b04fd848..2818e12d 100644 --- a/app/chain/media.py +++ b/app/chain/media.py @@ -750,7 +750,7 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 媒体根目录 fileitem: FileItem = event_data.get("fileitem") # 媒体文件列表 - file_list: List[str] = event_data.get("file_list", []) + file_list: List[str] = list(dict.fromkeys(event_data.get("file_list") or [])) # 媒体元数据 meta: MetaBase = event_data.get("meta") # 媒体信息 @@ -793,7 +793,7 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): ) else: # 1. 收集fileitem和file_list中每个文件之间所有子目录 - all_dirs = set() + all_dirs: set[Path] = set() root_path = Path(fileitem.path) logger.debug(f"开始收集目录,根目录:{root_path}") @@ -815,7 +815,10 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): logger.debug(f"共收集到 {len(all_dirs)} 个目录") # 2. 初始化一遍子目录,但不处理文件 - for sub_dir in all_dirs: + for sub_dir in sorted( + all_dirs, + key=lambda item: (len(item.parts), item.as_posix()), + ): sub_dir_item = self.storagechain.get_file_item( storage=fileitem.storage, path=sub_dir ) @@ -834,7 +837,7 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 3. 刮削每个文件 logger.info(f"开始刮削 {len(file_list)} 个文件") - for sub_file_path in file_list: + for sub_file_path in sorted(file_list): sub_file_item = self.storagechain.get_file_item( storage=fileitem.storage, path=Path(sub_file_path) ) diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 849f77a8..f3123695 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -3,9 +3,10 @@ import queue import re import threading import traceback +import uuid from copy import deepcopy from pathlib import Path -from typing import List, Optional, Tuple, Union, Dict, Callable +from typing import List, Optional, Tuple, Union, Dict, Callable, Any from app import schemas from app.agent import ReplyMode, prompt_manager, agent_manager @@ -730,6 +731,8 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): self.retry_scheduler = FailedRetryScheduler() # 转移成功的文件清单 self._success_target_files: Dict[str, List[str]] = {} + # 批次级刮削缓冲,避免同一批多文件入库重复触发目录刮削 + self._scrape_batches: Dict[str, Dict[str, Any]] = {} # 整理进度进度 self._progress = ProgressHelper(ProgressKey.FileTransfer) # 队列相关状态 @@ -796,7 +799,10 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): return StorageChain().is_bluray_folder(fileitem) if not fileitem.extension: return False - return True if f".{fileitem.extension.lower()}" in self._media_exts else False + media_exts = ( + self._media_exts if hasattr(self, "_media_exts") else settings.RMT_MEDIAEXT + ) + return True if f".{fileitem.extension.lower()}" in media_exts else False def __is_allowed_file(self, fileitem: FileItem) -> bool: """ @@ -831,7 +837,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): def __notify(): """ - 完成时发送消息、刮削事件、移除任务等 + 完成时发送消息、移除任务等 """ # 更新文件数量 transferinfo.file_count = ( @@ -842,12 +848,6 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): self.jobview.size(task.mediainfo, task.meta.begin_season) or task.fileitem.size ) - # 更新文件清单 - with job_lock: - transferinfo.file_list_new = self._success_target_files.pop( - transferinfo.target_diritem.path, [] - ) - # 发送通知,实时手动整理时不发 if transferinfo.need_notify and (task.background or not task.manual): se_str = None @@ -868,19 +868,6 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): username=task.username, ) - # 刮削事件 - if transferinfo.need_scrape and self.__is_media_file(task.fileitem): - self.eventmanager.send_event( - EventType.MetadataScrape, - { - "meta": task.meta, - "mediainfo": task.mediainfo, - "fileitem": transferinfo.target_diritem, - "file_list": transferinfo.file_list_new, - "overwrite": False, - }, - ) - transferhis = TransferHistoryOper() # 转移失败 @@ -1070,9 +1057,19 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 设置任务成功 self.jobview.finish_task(task) + # 登记批次级刮削目标 + self.__record_scrape_target(task, transferinfo) + # 全部整理完成且有成功的任务时,发送消息和事件 if self.jobview.is_finished(task): + # 更新文件清单 + with job_lock: + transferinfo.file_list_new = self._success_target_files.pop( + transferinfo.target_diritem.path, [] + ) __notify() + if not task.transfer_batch_id: + self.__send_metadata_scrape_event(task, transferinfo) # 只要该种子的所有任务都已整理完成,则设置种子状态为已整理 self.__mark_torrent_completed_if_done(task.download_hash, task.downloader) @@ -1122,6 +1119,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 维护整理任务视图,如果任务已存在则不添加到队列 if not self.__put_to_jobview(task): return False + self.__register_scrape_batch_task(task) # 添加到队列 self._queue.put(TransferQueue(task=task, callback=self.__default_callback)) return True @@ -1149,6 +1147,168 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): ): self.transfer_completed(hashs=download_hash, downloader=downloader) + def __send_metadata_scrape_event( + self, task: TransferTask, transferinfo: TransferInfo + ): + """ + 发送元数据刮削事件,保持对外事件载荷兼容。 + """ + if ( + not task + or not transferinfo + or not transferinfo.need_scrape + or not transferinfo.target_diritem + or not self.__is_media_file(task.fileitem) + ): + return + + self.eventmanager.send_event( + EventType.MetadataScrape, + { + "meta": task.meta, + "mediainfo": task.mediainfo, + "fileitem": transferinfo.target_diritem, + "file_list": transferinfo.file_list_new, + "overwrite": False, + }, + ) + + def __register_scrape_batch_task(self, task: TransferTask): + """ + 登记批次任务。刮削事件只在批次关闭且任务全部完成后统一发送。 + """ + if not task or not task.transfer_batch_id: + return + if not hasattr(self, "_scrape_batches"): + self._scrape_batches = {} + with job_lock: + batch = self._scrape_batches.setdefault( + task.transfer_batch_id, + { + "pending": set(), + "targets": {}, + "closed": False, + }, + ) + batch["pending"].add(task.fileitem.path) + + def __close_scrape_batch(self, batch_id: Optional[str]): + """ + 标记批次不再接收新任务,并尝试发送已聚合的刮削事件。 + """ + if not batch_id: + return + if not hasattr(self, "_scrape_batches"): + self._scrape_batches = {} + with job_lock: + batch = self._scrape_batches.setdefault( + batch_id, + { + "pending": set(), + "targets": {}, + "closed": False, + }, + ) + batch["closed"] = True + self.__flush_scrape_batch_if_ready(batch_id) + + def __record_scrape_target(self, task: TransferTask, transferinfo: TransferInfo): + """ + 记录批次内需要刮削的目标文件,按目标媒体根目录聚合。 + """ + if ( + not task + or not task.transfer_batch_id + or not transferinfo + or not transferinfo.need_scrape + or not transferinfo.target_diritem + or not self.__is_media_file(task.fileitem) + ): + return + + if not hasattr(self, "_scrape_batches"): + self._scrape_batches = {} + target_diritem = transferinfo.target_diritem + target_files = transferinfo.file_list_new or [] + target_key = (target_diritem.storage, target_diritem.path) + with job_lock: + batch = self._scrape_batches.setdefault( + task.transfer_batch_id, + { + "pending": set(), + "targets": {}, + "closed": False, + }, + ) + target = batch["targets"].setdefault( + target_key, + { + "fileitem": target_diritem, + "meta": task.meta, + "mediainfo": task.mediainfo, + "files": [], + "overwrite": False, + }, + ) + if not target.get("meta"): + target["meta"] = task.meta + if not target.get("mediainfo"): + target["mediainfo"] = task.mediainfo + for target_file in target_files: + if target_file and target_file not in target["files"]: + target["files"].append(target_file) + + def __finish_scrape_batch_task(self, task: TransferTask): + """ + 标记批次内单个任务已结束。 + """ + if not task or not task.transfer_batch_id: + return + if not hasattr(self, "_scrape_batches"): + self._scrape_batches = {} + with job_lock: + batch = self._scrape_batches.get(task.transfer_batch_id) + if not batch: + return + batch["pending"].discard(task.fileitem.path) + self.__flush_scrape_batch_if_ready(task.transfer_batch_id) + + def __flush_scrape_batch_if_ready(self, batch_id: Optional[str]): + """ + 批次任务全部结束后发送聚合后的刮削事件。 + """ + if not batch_id: + return + if not hasattr(self, "_scrape_batches"): + self._scrape_batches = {} + + with job_lock: + batch = self._scrape_batches.get(batch_id) + if ( + not batch + or not batch.get("closed") + or batch.get("pending") + ): + return + targets = list(batch.get("targets", {}).values()) + self._scrape_batches.pop(batch_id, None) + + for target in targets: + fileitem = target.get("fileitem") + if not fileitem: + continue + file_list = list(dict.fromkeys(target.get("files") or [])) + self.eventmanager.send_event( + EventType.MetadataScrape, + { + "meta": target.get("meta"), + "mediainfo": target.get("mediainfo"), + "fileitem": fileitem, + "file_list": file_list, + "overwrite": target.get("overwrite", False), + }, + ) + def remove_from_queue(self, fileitem: FileItem): """ 从待整理队列移除 @@ -1163,6 +1323,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): """ self.jobview.fail_unfinished_task(task) self.jobview.try_remove_job(task) + self.__finish_scrape_batch_task(task) def __start_transfer(self): """ @@ -1487,6 +1648,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): finally: # 移除已完成的任务 self.jobview.try_remove_job(task) + self.__finish_scrape_batch_task(task) def get_queue_tasks(self) -> List[TransferJob]: """ @@ -1903,6 +2065,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): """ # 是否全部成功 all_success = True + transfer_batch_id = str(uuid.uuid4()) if preview: # 预览模式始终同步执行,避免进入异步队列 background = False @@ -2086,6 +2249,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): downloader=_downloader, download_hash=_download_hash, download_history=download_history, + transfer_batch_id=transfer_batch_id, manual=manual, background=background, preview=preview, @@ -2098,6 +2262,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): else: # 加入列表 if self.__put_to_jobview(transfer_task): + self.__register_scrape_batch_task(transfer_task) transfer_tasks.append(transfer_task) else: logger.debug(f"{file_path.name} 已在整理列表中,跳过") @@ -2106,6 +2271,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): finally: file_items.clear() del file_items + self.__close_scrape_batch(transfer_batch_id) # 实时整理 preview_items: List[dict] = [] diff --git a/app/schemas/transfer.py b/app/schemas/transfer.py index e949733e..f4c60b56 100644 --- a/app/schemas/transfer.py +++ b/app/schemas/transfer.py @@ -66,6 +66,7 @@ class TransferTask(BaseModel): downloader: Optional[str] = None download_hash: Optional[str] = None download_history: Optional[DownloadHistory] = None + transfer_batch_id: Optional[str] = None manual: Optional[bool] = False background: Optional[bool] = True preview: Optional[bool] = False diff --git a/tests/test_transfer_job_manager.py b/tests/test_transfer_job_manager.py index 8b8553d7..e032936e 100644 --- a/tests/test_transfer_job_manager.py +++ b/tests/test_transfer_job_manager.py @@ -1,19 +1,19 @@ import unittest from types import SimpleNamespace -from unittest.mock import patch +from unittest.mock import patch, MagicMock from app.chain.transfer import JobManager, TransferChain -from app.schemas import FileItem, TransferTask -from app.schemas.types import MediaType +from app.schemas import FileItem, TransferInfo, TransferTask +from app.schemas.types import EventType, MediaType class FakeMeta: - def __init__(self, episode: int): + def __init__(self, episode: int, season: int = 1): self.name = "Test Show" - self.title = f"Test Show S01E{episode:02d}" + self.title = f"Test Show S{season:02d}E{episode:02d}" self.year = "2026" self.type = MediaType.TV - self.begin_season = 1 + self.begin_season = season self.end_season = None self.total_season = 1 self.begin_episode = episode @@ -25,7 +25,7 @@ class FakeMeta: @property def season(self): - return "S01" + return f"S{self.begin_season:02d}" @property def episode(self): @@ -53,6 +53,8 @@ class FakeMedia: def __init__(self, tmdb_id: int = 12345): self.tmdb_id = tmdb_id self.douban_id = None + self.type = MediaType.TV + self.title_year = "Test Show (2026)" def clear(self): pass @@ -68,8 +70,8 @@ class FakeMedia: } -def make_task(episode: int) -> TransferTask: - name = f"Test.Show.S01E{episode:02d}.mkv" +def make_task(episode: int, season: int = 1) -> TransferTask: + name = f"Test.Show.S{season:02d}E{episode:02d}.mkv" return TransferTask( fileitem=FileItem( storage="local", @@ -299,6 +301,7 @@ class TransferJobManagerTest(unittest.TestCase): chain = object.__new__(TransferChain) chain.jobview = JobManager() chain.post_message = lambda *_args, **_kwargs: None + chain._scrape_batches = {} completed = [] def fake_transfer_completed(hashs, downloader): @@ -332,6 +335,161 @@ class TransferJobManagerTest(unittest.TestCase): self.assertEqual([("abc123", "qbittorrent")], completed) self.assertEqual([], chain.jobview.list_jobs()) + def test_scrape_event_is_aggregated_by_transfer_batch_across_seasons(self): + chain = object.__new__(TransferChain) + chain.jobview = JobManager() + chain._success_target_files = {} + chain._scrape_batches = {} + chain.eventmanager = MagicMock() + chain.transfer_completed = lambda *args, **kwargs: None + + tasks = [make_task(1, season=1), make_task(1, season=2)] + target_diritem = FileItem( + storage="local", + path="/library/Test Show (2026)", + type="dir", + name="Test Show (2026)", + ) + batch_id = "batch-tv-multi-season" + + for task in tasks: + task.mediainfo = FakeMedia() + task.transfer_batch_id = batch_id + task.background = False + task.manual = True + self.assertTrue(chain._TransferChain__put_to_jobview(task)) + chain._TransferChain__register_scrape_batch_task(task) + + chain._TransferChain__close_scrape_batch(batch_id) + + transferinfos = [ + TransferInfo( + success=True, + fileitem=tasks[0].fileitem, + target_diritem=target_diritem, + target_item=FileItem( + storage="local", + path="/library/Test Show (2026)/Season 1/Test.Show.S01E01.mkv", + type="file", + name="Test.Show.S01E01.mkv", + extension="mkv", + ), + file_list_new=[ + "/library/Test Show (2026)/Season 1/Test.Show.S01E01.mkv" + ], + transfer_type="copy", + need_scrape=True, + need_notify=False, + ), + TransferInfo( + success=True, + fileitem=tasks[1].fileitem, + target_diritem=target_diritem, + target_item=FileItem( + storage="local", + path="/library/Test Show (2026)/Season 2/Test.Show.S02E01.mkv", + type="file", + name="Test.Show.S02E01.mkv", + extension="mkv", + ), + file_list_new=[ + "/library/Test Show (2026)/Season 2/Test.Show.S02E01.mkv" + ], + transfer_type="copy", + need_scrape=True, + need_notify=False, + ), + ] + + with patch( + "app.chain.transfer.TransferHistoryOper", + return_value=SimpleNamespace(add_success=lambda **kwargs: SimpleNamespace(id=1)), + ), patch( + "app.chain.transfer.StorageChain" + ) as storage_chain_cls: + storage_chain_cls.return_value.is_bluray_folder.return_value = False + for task, transferinfo in zip(tasks, transferinfos): + chain._TransferChain__default_callback(task, transferinfo) + chain._TransferChain__finish_scrape_batch_task(task) + + metadata_calls = [ + call + for call in chain.eventmanager.send_event.call_args_list + if call.args[0] == EventType.MetadataScrape + ] + self.assertEqual(1, len(metadata_calls)) + event_data = metadata_calls[0].args[1] + self.assertEqual(target_diritem, event_data["fileitem"]) + self.assertEqual( + [ + "/library/Test Show (2026)/Season 1/Test.Show.S01E01.mkv", + "/library/Test Show (2026)/Season 2/Test.Show.S02E01.mkv", + ], + event_data["file_list"], + ) + self.assertEqual({}, chain._scrape_batches) + + def test_scrape_event_keeps_immediate_behavior_without_transfer_batch(self): + chain = object.__new__(TransferChain) + chain.jobview = JobManager() + chain._success_target_files = {} + chain._scrape_batches = {} + chain.eventmanager = MagicMock() + chain.transfer_completed = lambda *args, **kwargs: None + + task = make_task(1) + task.mediainfo = FakeMedia() + task.background = False + task.manual = True + self.assertTrue(chain._TransferChain__put_to_jobview(task)) + + target_diritem = FileItem( + storage="local", + path="/library/Test Show (2026)", + type="dir", + name="Test Show (2026)", + ) + transferinfo = TransferInfo( + success=True, + fileitem=task.fileitem, + target_diritem=target_diritem, + target_item=FileItem( + storage="local", + path="/library/Test Show (2026)/Season 1/Test.Show.S01E01.mkv", + type="file", + name="Test.Show.S01E01.mkv", + extension="mkv", + ), + file_list_new=[ + "/library/Test Show (2026)/Season 1/Test.Show.S01E01.mkv" + ], + transfer_type="copy", + need_scrape=True, + need_notify=False, + ) + + with patch( + "app.chain.transfer.TransferHistoryOper", + return_value=SimpleNamespace(add_success=lambda **kwargs: SimpleNamespace(id=1)), + ), patch( + "app.chain.transfer.StorageChain" + ) as storage_chain_cls: + storage_chain_cls.return_value.is_bluray_folder.return_value = False + chain._TransferChain__default_callback(task, transferinfo) + + metadata_calls = [ + call + for call in chain.eventmanager.send_event.call_args_list + if call.args[0] == EventType.MetadataScrape + ] + self.assertEqual(1, len(metadata_calls)) + event_data = metadata_calls[0].args[1] + self.assertEqual(target_diritem, event_data["fileitem"]) + self.assertEqual( + ["/library/Test Show (2026)/Season 1/Test.Show.S01E01.mkv"], + event_data["file_list"], + ) + if __name__ == "__main__": unittest.main()