fix: aggregate metadata scrape events

This commit is contained in:
jxxghp
2026-05-14 07:38:06 +08:00
parent e5b67438d9
commit f4423e121e
4 changed files with 363 additions and 35 deletions

View File

@@ -750,7 +750,7 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
# 媒体根目录
fileitem: FileItem = event_data.get("fileitem")
# 媒体文件列表
file_list: List[str] = event_data.get("file_list", [])
file_list: List[str] = list(dict.fromkeys(event_data.get("file_list") or []))
# 媒体元数据
meta: MetaBase = event_data.get("meta")
# 媒体信息
@@ -793,7 +793,7 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
)
else:
# 1. 收集fileitem和file_list中每个文件之间所有子目录
all_dirs = set()
all_dirs: set[Path] = set()
root_path = Path(fileitem.path)
logger.debug(f"开始收集目录,根目录:{root_path}")
@@ -815,7 +815,10 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
logger.debug(f"共收集到 {len(all_dirs)} 个目录")
# 2. 初始化一遍子目录,但不处理文件
for sub_dir in all_dirs:
for sub_dir in sorted(
all_dirs,
key=lambda item: (len(item.parts), item.as_posix()),
):
sub_dir_item = self.storagechain.get_file_item(
storage=fileitem.storage, path=sub_dir
)
@@ -834,7 +837,7 @@ class MediaChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
# 3. 刮削每个文件
logger.info(f"开始刮削 {len(file_list)} 个文件")
for sub_file_path in file_list:
for sub_file_path in sorted(file_list):
sub_file_item = self.storagechain.get_file_item(
storage=fileitem.storage, path=Path(sub_file_path)
)

View File

@@ -3,9 +3,10 @@ import queue
import re
import threading
import traceback
import uuid
from copy import deepcopy
from pathlib import Path
from typing import List, Optional, Tuple, Union, Dict, Callable
from typing import List, Optional, Tuple, Union, Dict, Callable, Any
from app import schemas
from app.agent import ReplyMode, prompt_manager, agent_manager
@@ -730,6 +731,8 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
self.retry_scheduler = FailedRetryScheduler()
# 转移成功的文件清单
self._success_target_files: Dict[str, List[str]] = {}
# 批次级刮削缓冲,避免同一批多文件入库重复触发目录刮削
self._scrape_batches: Dict[str, Dict[str, Any]] = {}
# 整理进度进度
self._progress = ProgressHelper(ProgressKey.FileTransfer)
# 队列相关状态
@@ -796,7 +799,10 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
return StorageChain().is_bluray_folder(fileitem)
if not fileitem.extension:
return False
return True if f".{fileitem.extension.lower()}" in self._media_exts else False
media_exts = (
self._media_exts if hasattr(self, "_media_exts") else settings.RMT_MEDIAEXT
)
return True if f".{fileitem.extension.lower()}" in media_exts else False
def __is_allowed_file(self, fileitem: FileItem) -> bool:
"""
@@ -831,7 +837,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
def __notify():
"""
完成时发送消息、刮削事件、移除任务等
完成时发送消息、移除任务等
"""
# 更新文件数量
transferinfo.file_count = (
@@ -842,12 +848,6 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
self.jobview.size(task.mediainfo, task.meta.begin_season)
or task.fileitem.size
)
# 更新文件清单
with job_lock:
transferinfo.file_list_new = self._success_target_files.pop(
transferinfo.target_diritem.path, []
)
# 发送通知,实时手动整理时不发
if transferinfo.need_notify and (task.background or not task.manual):
se_str = None
@@ -868,19 +868,6 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
username=task.username,
)
# 刮削事件
if transferinfo.need_scrape and self.__is_media_file(task.fileitem):
self.eventmanager.send_event(
EventType.MetadataScrape,
{
"meta": task.meta,
"mediainfo": task.mediainfo,
"fileitem": transferinfo.target_diritem,
"file_list": transferinfo.file_list_new,
"overwrite": False,
},
)
transferhis = TransferHistoryOper()
# 转移失败
@@ -1070,9 +1057,19 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
# 设置任务成功
self.jobview.finish_task(task)
# 登记批次级刮削目标
self.__record_scrape_target(task, transferinfo)
# 全部整理完成且有成功的任务时,发送消息和事件
if self.jobview.is_finished(task):
# 更新文件清单
with job_lock:
transferinfo.file_list_new = self._success_target_files.pop(
transferinfo.target_diritem.path, []
)
__notify()
if not task.transfer_batch_id:
self.__send_metadata_scrape_event(task, transferinfo)
# 只要该种子的所有任务都已整理完成,则设置种子状态为已整理
self.__mark_torrent_completed_if_done(task.download_hash, task.downloader)
@@ -1122,6 +1119,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
# 维护整理任务视图,如果任务已存在则不添加到队列
if not self.__put_to_jobview(task):
return False
self.__register_scrape_batch_task(task)
# 添加到队列
self._queue.put(TransferQueue(task=task, callback=self.__default_callback))
return True
@@ -1149,6 +1147,168 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
):
self.transfer_completed(hashs=download_hash, downloader=downloader)
def __send_metadata_scrape_event(
    self, task: TransferTask, transferinfo: TransferInfo
):
    """
    Emit a MetadataScrape event for a single finished transfer.

    The payload keys (meta, mediainfo, fileitem, file_list, overwrite) are
    kept unchanged so existing listeners of the event stay compatible.

    :param task: the transfer task that just completed
    :param transferinfo: the transfer result; supplies the target directory
        and the list of newly written files
    """
    # Guard clauses, checked in order: missing arguments, scraping not
    # requested, no target directory, source is not a media file.
    if not task or not transferinfo:
        return
    if not transferinfo.need_scrape or not transferinfo.target_diritem:
        return
    if not self.__is_media_file(task.fileitem):
        return

    payload = {
        "meta": task.meta,
        "mediainfo": task.mediainfo,
        "fileitem": transferinfo.target_diritem,
        "file_list": transferinfo.file_list_new,
        "overwrite": False,
    }
    self.eventmanager.send_event(EventType.MetadataScrape, payload)
def __register_scrape_batch_task(self, task: TransferTask):
    """
    Register a task as pending inside its transfer batch.

    Tasks without a ``transfer_batch_id`` are ignored. The batch record is
    created on first use with an empty pending set, no targets and
    ``closed`` set to False.

    :param task: the transfer task being queued
    """
    batch_id = task.transfer_batch_id if task else None
    if not batch_id:
        return

    # The attribute may be missing when the instance was built without
    # running __init__, so create it lazily.
    if not hasattr(self, "_scrape_batches"):
        self._scrape_batches = {}

    with job_lock:
        empty_batch = {
            "pending": set(),
            "targets": {},
            "closed": False,
        }
        record = self._scrape_batches.setdefault(batch_id, empty_batch)
        # Pending tasks are tracked by the source file path.
        record["pending"].add(task.fileitem.path)
def __close_scrape_batch(self, batch_id: Optional[str]):
    """
    Mark a batch as closed (no more tasks will be registered) and try to
    send its aggregated scrape events.

    A batch record is created if none exists yet for ``batch_id``.

    :param batch_id: identifier of the transfer batch; falsy values are ignored
    """
    if not batch_id:
        return

    # The attribute may be missing when the instance was built without
    # running __init__, so create it lazily.
    if not hasattr(self, "_scrape_batches"):
        self._scrape_batches = {}

    with job_lock:
        empty_batch = {
            "pending": set(),
            "targets": {},
            "closed": False,
        }
        self._scrape_batches.setdefault(batch_id, empty_batch)["closed"] = True

    # Called after job_lock is released: the flush helper takes job_lock itself.
    self.__flush_scrape_batch_if_ready(batch_id)
def __record_scrape_target(self, task: TransferTask, transferinfo: TransferInfo):
    """
    Remember the files of a batched task that need scraping, grouped by
    target directory.

    Targets are keyed by ``(storage, path)`` of ``transferinfo.target_diritem``.
    File paths are appended in arrival order and never duplicated.

    :param task: the transfer task that produced the files
    :param transferinfo: the transfer result for that task
    """
    # Guard clauses, checked in order: no task or not batched, scraping
    # not requested or no target directory, source is not a media file.
    if not task or not task.transfer_batch_id:
        return
    if (
        not transferinfo
        or not transferinfo.need_scrape
        or not transferinfo.target_diritem
    ):
        return
    if not self.__is_media_file(task.fileitem):
        return

    # The attribute may be missing when the instance was built without
    # running __init__, so create it lazily.
    if not hasattr(self, "_scrape_batches"):
        self._scrape_batches = {}

    diritem = transferinfo.target_diritem
    incoming_files = transferinfo.file_list_new or []
    key = (diritem.storage, diritem.path)

    with job_lock:
        record = self._scrape_batches.setdefault(
            task.transfer_batch_id,
            {
                "pending": set(),
                "targets": {},
                "closed": False,
            },
        )
        entry = record["targets"].setdefault(
            key,
            {
                "fileitem": diritem,
                "meta": task.meta,
                "mediainfo": task.mediainfo,
                "files": [],
                "overwrite": False,
            },
        )
        # Back-fill meta and mediainfo if an earlier task left them empty.
        for field, value in (("meta", task.meta), ("mediainfo", task.mediainfo)):
            if not entry.get(field):
                entry[field] = value
        # Drop falsy paths and paths already collected.
        collected = entry["files"]
        for path in filter(None, incoming_files):
            if path not in collected:
                collected.append(path)
def __finish_scrape_batch_task(self, task: TransferTask):
    """
    Mark one task of a batch as done and try to send the aggregated
    scrape events.

    Does nothing for tasks without a ``transfer_batch_id`` or for batches
    that are not (or no longer) tracked.

    :param task: the transfer task that has ended
    """
    if not task or not task.transfer_batch_id:
        return

    # The attribute may be missing when the instance was built without
    # running __init__, so create it lazily.
    if not hasattr(self, "_scrape_batches"):
        self._scrape_batches = {}

    with job_lock:
        record = self._scrape_batches.get(task.transfer_batch_id)
        tracked = bool(record)
        if tracked:
            record["pending"].discard(task.fileitem.path)

    # Called after job_lock is released: the flush helper takes job_lock itself.
    if tracked:
        self.__flush_scrape_batch_if_ready(task.transfer_batch_id)
def __flush_scrape_batch_if_ready(self, batch_id: Optional[str]):
    """
    Send one MetadataScrape event per recorded target once a batch is
    closed and has no pending tasks, then forget the batch.

    :param batch_id: identifier of the transfer batch; falsy values are ignored
    """
    if not batch_id:
        return

    # The attribute may be missing when the instance was built without
    # running __init__, so create it lazily.
    if not hasattr(self, "_scrape_batches"):
        self._scrape_batches = {}

    with job_lock:
        record = self._scrape_batches.get(batch_id)
        ready = (
            bool(record)
            and record.get("closed")
            and not record.get("pending")
        )
        if not ready:
            return
        # Snapshot the targets and drop the batch while holding the lock.
        entries = list(record.get("targets", {}).values())
        self._scrape_batches.pop(batch_id, None)

    # Events are sent after the lock has been released.
    for entry in entries:
        diritem = entry.get("fileitem")
        if diritem:
            self.eventmanager.send_event(
                EventType.MetadataScrape,
                {
                    "meta": entry.get("meta"),
                    "mediainfo": entry.get("mediainfo"),
                    "fileitem": diritem,
                    # Order-preserving de-duplication of the collected paths.
                    "file_list": list(dict.fromkeys(entry.get("files") or [])),
                    "overwrite": entry.get("overwrite", False),
                },
            )
def remove_from_queue(self, fileitem: FileItem):
"""
从待整理队列移除
@@ -1163,6 +1323,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
"""
self.jobview.fail_unfinished_task(task)
self.jobview.try_remove_job(task)
self.__finish_scrape_batch_task(task)
def __start_transfer(self):
"""
@@ -1487,6 +1648,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
finally:
# 移除已完成的任务
self.jobview.try_remove_job(task)
self.__finish_scrape_batch_task(task)
def get_queue_tasks(self) -> List[TransferJob]:
"""
@@ -1903,6 +2065,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
"""
# 是否全部成功
all_success = True
transfer_batch_id = str(uuid.uuid4())
if preview:
# 预览模式始终同步执行,避免进入异步队列
background = False
@@ -2086,6 +2249,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
downloader=_downloader,
download_hash=_download_hash,
download_history=download_history,
transfer_batch_id=transfer_batch_id,
manual=manual,
background=background,
preview=preview,
@@ -2098,6 +2262,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
else:
# 加入列表
if self.__put_to_jobview(transfer_task):
self.__register_scrape_batch_task(transfer_task)
transfer_tasks.append(transfer_task)
else:
logger.debug(f"{file_path.name} 已在整理列表中,跳过")
@@ -2106,6 +2271,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
finally:
file_items.clear()
del file_items
self.__close_scrape_batch(transfer_batch_id)
# 实时整理
preview_items: List[dict] = []

View File

@@ -66,6 +66,7 @@ class TransferTask(BaseModel):
downloader: Optional[str] = None
download_hash: Optional[str] = None
download_history: Optional[DownloadHistory] = None
transfer_batch_id: Optional[str] = None
manual: Optional[bool] = False
background: Optional[bool] = True
preview: Optional[bool] = False

View File

@@ -1,19 +1,19 @@
import unittest
from types import SimpleNamespace
from unittest.mock import patch
from unittest.mock import patch, MagicMock
from app.chain.transfer import JobManager, TransferChain
from app.schemas import FileItem, TransferTask
from app.schemas.types import MediaType
from app.schemas import FileItem, TransferInfo, TransferTask
from app.schemas.types import EventType, MediaType
class FakeMeta:
def __init__(self, episode: int):
def __init__(self, episode: int, season: int = 1):
self.name = "Test Show"
self.title = f"Test Show S01E{episode:02d}"
self.title = f"Test Show S{season:02d}E{episode:02d}"
self.year = "2026"
self.type = MediaType.TV
self.begin_season = 1
self.begin_season = season
self.end_season = None
self.total_season = 1
self.begin_episode = episode
@@ -25,7 +25,7 @@ class FakeMeta:
@property
def season(self):
return "S01"
return f"S{self.begin_season:02d}"
@property
def episode(self):
@@ -53,6 +53,8 @@ class FakeMedia:
def __init__(self, tmdb_id: int = 12345):
self.tmdb_id = tmdb_id
self.douban_id = None
self.type = MediaType.TV
self.title_year = "Test Show (2026)"
def clear(self):
pass
@@ -68,8 +70,8 @@ class FakeMedia:
}
def make_task(episode: int) -> TransferTask:
name = f"Test.Show.S01E{episode:02d}.mkv"
def make_task(episode: int, season: int = 1) -> TransferTask:
name = f"Test.Show.S{season:02d}E{episode:02d}.mkv"
return TransferTask(
fileitem=FileItem(
storage="local",
@@ -299,6 +301,7 @@ class TransferJobManagerTest(unittest.TestCase):
chain = object.__new__(TransferChain)
chain.jobview = JobManager()
chain.post_message = lambda *_args, **_kwargs: None
chain._scrape_batches = {}
completed = []
def fake_transfer_completed(hashs, downloader):
@@ -332,6 +335,161 @@ class TransferJobManagerTest(unittest.TestCase):
self.assertEqual([("abc123", "qbittorrent")], completed)
self.assertEqual([], chain.jobview.list_jobs())
def test_scrape_event_is_aggregated_by_transfer_batch_across_seasons(self):
    """
    Two tasks of the same batch that land in different season folders of
    one show must produce a single MetadataScrape event listing both files.
    """
    batch_id = "batch-tv-multi-season"

    chain = object.__new__(TransferChain)
    chain.jobview = JobManager()
    chain._success_target_files = {}
    chain._scrape_batches = {}
    chain.eventmanager = MagicMock()
    chain.transfer_completed = lambda *args, **kwargs: None

    tasks = [make_task(1, season=1), make_task(1, season=2)]
    target_diritem = FileItem(
        storage="local",
        path="/library/Test Show (2026)",
        type="dir",
        name="Test Show (2026)",
    )

    # Register both tasks in the batch, then close it before any callback runs.
    for task in tasks:
        task.mediainfo = FakeMedia()
        task.transfer_batch_id = batch_id
        task.background = False
        task.manual = True
        self.assertTrue(chain._TransferChain__put_to_jobview(task))
        chain._TransferChain__register_scrape_batch_task(task)
    chain._TransferChain__close_scrape_batch(batch_id)

    def build_info(task, target_path, target_name):
        # One successful transfer result pointing at the shared show folder.
        return TransferInfo(
            success=True,
            fileitem=task.fileitem,
            target_diritem=target_diritem,
            target_item=FileItem(
                storage="local",
                path=target_path,
                type="file",
                name=target_name,
                extension="mkv",
            ),
            file_list_new=[target_path],
            transfer_type="copy",
            need_scrape=True,
            need_notify=False,
        )

    transferinfos = [
        build_info(
            tasks[0],
            "/library/Test Show (2026)/Season 1/Test.Show.S01E01.mkv",
            "Test.Show.S01E01.mkv",
        ),
        build_info(
            tasks[1],
            "/library/Test Show (2026)/Season 2/Test.Show.S02E01.mkv",
            "Test.Show.S02E01.mkv",
        ),
    ]

    history_oper = SimpleNamespace(
        add_success=lambda **kwargs: SimpleNamespace(id=1)
    )
    with patch(
        "app.chain.transfer.TransferHistoryOper", return_value=history_oper
    ):
        with patch("app.chain.transfer.StorageChain") as storage_chain_cls:
            storage_chain_cls.return_value.is_bluray_folder.return_value = False
            for task, transferinfo in zip(tasks, transferinfos):
                chain._TransferChain__default_callback(task, transferinfo)
                chain._TransferChain__finish_scrape_batch_task(task)

    metadata_calls = []
    for call in chain.eventmanager.send_event.call_args_list:
        if call.args[0] == EventType.MetadataScrape:
            metadata_calls.append(call)

    self.assertEqual(1, len(metadata_calls))
    event_data = metadata_calls[0].args[1]
    self.assertEqual(target_diritem, event_data["fileitem"])
    self.assertEqual(
        [
            "/library/Test Show (2026)/Season 1/Test.Show.S01E01.mkv",
            "/library/Test Show (2026)/Season 2/Test.Show.S02E01.mkv",
        ],
        event_data["file_list"],
    )
    # The batch record must be removed once its events were sent.
    self.assertEqual({}, chain._scrape_batches)
def test_scrape_event_keeps_immediate_behavior_without_transfer_batch(self):
    """
    A task without a transfer batch id must still trigger its
    MetadataScrape event directly from the completion callback.
    """
    target_file = "/library/Test Show (2026)/Season 1/Test.Show.S01E01.mkv"

    chain = object.__new__(TransferChain)
    chain.jobview = JobManager()
    chain._success_target_files = {}
    chain._scrape_batches = {}
    chain.eventmanager = MagicMock()
    chain.transfer_completed = lambda *args, **kwargs: None

    task = make_task(1)
    task.mediainfo = FakeMedia()
    task.background = False
    task.manual = True
    self.assertTrue(chain._TransferChain__put_to_jobview(task))

    target_diritem = FileItem(
        storage="local",
        path="/library/Test Show (2026)",
        type="dir",
        name="Test Show (2026)",
    )
    target_item = FileItem(
        storage="local",
        path=target_file,
        type="file",
        name="Test.Show.S01E01.mkv",
        extension="mkv",
    )
    transferinfo = TransferInfo(
        success=True,
        fileitem=task.fileitem,
        target_diritem=target_diritem,
        target_item=target_item,
        file_list_new=[target_file],
        transfer_type="copy",
        need_scrape=True,
        need_notify=False,
    )

    history_oper = SimpleNamespace(
        add_success=lambda **kwargs: SimpleNamespace(id=1)
    )
    with patch(
        "app.chain.transfer.TransferHistoryOper", return_value=history_oper
    ):
        with patch("app.chain.transfer.StorageChain") as storage_chain_cls:
            storage_chain_cls.return_value.is_bluray_folder.return_value = False
            chain._TransferChain__default_callback(task, transferinfo)

    metadata_calls = []
    for call in chain.eventmanager.send_event.call_args_list:
        if call.args[0] == EventType.MetadataScrape:
            metadata_calls.append(call)

    self.assertEqual(1, len(metadata_calls))
    event_data = metadata_calls[0].args[1]
    self.assertEqual(target_diritem, event_data["fileitem"])
    self.assertEqual(
        ["/library/Test Show (2026)/Season 1/Test.Show.S01E01.mkv"],
        event_data["file_list"],
    )
if __name__ == "__main__":
unittest.main()