feat: add support for syncing matching subtitle and audio files with main media during transfer

This commit is contained in:
jxxghp
2026-05-14 21:12:02 +08:00
parent 23784f614b
commit f50773711e
2 changed files with 493 additions and 31 deletions

View File

@@ -2003,6 +2003,143 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
return None
@staticmethod
def __optional_attr_equal(
    source: MetaBase,
    target: MetaBase,
    attr: str,
    normalizer: Callable = None,
) -> bool:
    """
    Compare one optional recognized attribute of two metadata objects.

    The attribute is ignored when neither side has it. As soon as either
    side has a value, both sides must have one and the values must be
    equal, so that extra files with the same title but a different year or
    season/episode are not attached to the current main video.

    :param source: first metadata object
    :param target: second metadata object
    :param attr: attribute name read from both objects (missing -> None)
    :param normalizer: optional callable applied to both values before
        they are compared
    :return: True when the attribute does not contradict a match
    """
    left = getattr(source, attr, None)
    right = getattr(target, attr, None)
    left_missing = left is None
    right_missing = right is None
    if left_missing or right_missing:
        # Only "missing on both sides" counts as equal.
        return left_missing and right_missing
    if not normalizer:
        return left == right
    left = normalizer(left)
    right = normalizer(right)
    return left == right
def __is_same_media_meta(
    self, source_meta: MetaBase, target_meta: MetaBase
) -> bool:
    """
    Decide whether two files were recognized as the same media item.

    Both metadata objects must exist, share the same type and the same
    name (compared through ``StringUtils.clear_upper``), and agree on
    year, begin/end season and begin/end episode wherever either side has
    a value.
    """
    if not source_meta or not target_meta:
        return False
    if source_meta.type != target_meta.type:
        return False
    source_name = StringUtils.clear_upper(source_meta.name)
    target_name = StringUtils.clear_upper(target_meta.name)
    if source_name != target_name:
        return False
    # Attribute name paired with the normalizer used for its comparison;
    # checked in this order and stopped at the first mismatch.
    optional_checks = (
        ("year", str),
        ("begin_season", int),
        ("end_season", int),
        ("begin_episode", int),
        ("end_episode", int),
    )
    return all(
        self.__optional_attr_equal(source_meta, target_meta, attr_name, cast)
        for attr_name, cast in optional_checks
    )
def __get_sync_extra_fileitems(
    self,
    main_fileitem: FileItem,
    main_meta: MetaBase,
    meta_factory: Callable[[Path], Optional[MetaBase]],
    predicate: Optional[Callable[[FileItem, bool], bool]] = None,
    extra_cache: Optional[Dict[Tuple[str, str], List[FileItem]]] = None,
) -> List[Tuple[FileItem, bool]]:
    """
    Collect the subtitle/audio files in the main video's directory whose
    recognized metadata matches the main video.

    :param main_fileitem: main video file; anything that is not a media
        file of type "file" yields an empty result
    :param main_meta: recognized metadata of the main video
    :param meta_factory: builds the metadata of a candidate from its path
    :param predicate: optional filter applied to freshly listed candidates
        (called as ``predicate(item, False)``); candidates taken from
        ``extra_cache`` are used as stored
    :param extra_cache: optional per-directory candidate cache keyed by
        ``(storage, normalized directory path)``; read and updated in place
    :return: list of ``(file item, False)`` tuples for matching extra files
    """
    if (
        not main_fileitem
        or main_fileitem.type != "file"
        or not self.__is_media_file(main_fileitem)
        or not main_meta
    ):
        return []

    lookup_key = self.__get_file_parent_key(main_fileitem)
    if extra_cache is not None and lookup_key in extra_cache:
        extra_candidates = extra_cache[lookup_key]
    else:
        storagechain = StorageChain()
        parent_item = storagechain.get_parent_item(main_fileitem)
        if not parent_item:
            logger.debug(f"{main_fileitem.path} 未找到父目录,跳过同步整理附加文件")
            return []
        extra_candidates: List[FileItem] = []
        for item in storagechain.list_files(parent_item, recursion=False) or []:
            if (
                not item
                or item.type != "file"
                or not (
                    self.__is_subtitle_file(item)
                    or self.__is_audio_file(item)
                )
            ):
                continue
            if predicate and not predicate(item, False):
                continue
            extra_candidates.append(item)
        if extra_cache is not None:
            # Store under the key derived from the listed directory item
            # and under the key used for the lookup above. If the two ever
            # differ, the next main video in this directory still hits the
            # cache instead of listing the directory again.
            extra_cache[self.__get_dir_key(parent_item)] = extra_candidates
            extra_cache[lookup_key] = extra_candidates

    extra_fileitems: List[Tuple[FileItem, bool]] = []
    for item in extra_candidates:
        if item.path == main_fileitem.path:
            continue
        extra_meta = meta_factory(Path(item.path))
        # Ownership must not be decided from the raw file name; it is
        # based on the parsed media identity and season/episode info.
        if self.__is_same_media_meta(main_meta, extra_meta):
            extra_fileitems.append((item, False))
    if extra_fileitems:
        logger.info(
            f"{main_fileitem.path} 同步匹配到 {len(extra_fileitems)} 个附加文件"
        )
    return extra_fileitems
@staticmethod
def __normalize_dir_path(dir_path: Union[str, Path]) -> str:
    """
    Normalize a directory path for use in the per-directory candidate
    cache key: POSIX separators, no trailing slash, and "/" when nothing
    is left after trimming.
    """
    posix_path = Path(dir_path).as_posix()
    trimmed = posix_path.rstrip("/")
    if trimmed:
        return trimmed
    return "/"
def __get_dir_key(self, dir_item: FileItem) -> Tuple[str, str]:
    """
    Build the cache key of a directory item: its storage plus its
    normalized path.
    """
    storage = dir_item.storage
    return storage, self.__normalize_dir_path(dir_item.path)
def __get_file_parent_key(self, current_item: FileItem) -> Tuple[str, str]:
    """
    Build the cache key of the directory containing a file item: the
    file's storage plus the normalized path of its parent directory.
    """
    storage = current_item.storage
    parent_dir = Path(current_item.path).parent
    return storage, self.__normalize_dir_path(parent_dir)
def do_transfer(
self,
fileitem: FileItem,
@@ -2024,6 +2161,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
background: Optional[bool] = True,
manual: Optional[bool] = False,
preview: Optional[bool] = False,
sync_extra_files: Optional[bool] = False,
continue_callback: Callable = None,
) -> Tuple[bool, Union[str, dict]]:
"""
@@ -2047,6 +2185,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
:param background: 是否后台运行
:param manual: 是否手动整理
:param preview: 是否仅预览
:param sync_extra_files: 是否在整理主视频文件时同步整理同媒体附加文件
:param continue_callback: 继续处理回调
返回:成功标识,错误信息
"""
@@ -2076,6 +2215,77 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
# 汇总错误信息
err_msgs: List[str] = []
def _get_subscribe_custom_words(
    history_record: Optional[DownloadHistory],
) -> Optional[List[str]]:
    """
    Look up the subscription's custom recognition words for a download
    history record.

    Returns None when there is no record, its note is not a dict, no
    subscription is found for the note's "source", or the subscription has
    no custom words; otherwise the custom words split on newlines.
    """
    if not history_record or not isinstance(history_record.note, dict):
        return None
    # The subscription is resolved dynamically from the note's source.
    source = history_record.note.get("source")
    subscribe = SubscribeChain().get_subscribe_by_source(source)
    if not subscribe or not subscribe.custom_words:
        return None
    return subscribe.custom_words.split("\n")
def _build_file_meta(
    source_path: Path,
    custom_word_list: Optional[List[str]] = None,
) -> Optional[MetaBase]:
    """
    Build the file metadata used by a transfer task, with the manual
    season and custom-format overrides applied exactly once.

    :param source_path: path of the file being transferred
    :param custom_word_list: custom recognition words used when the
        metadata has to be recognized from the path
    :return: the metadata, or None when nothing usable was recognized
    """
    if meta:
        # Caller-supplied metadata: work on a deep copy so the shared
        # object is not mutated, then apply the overrides to the copy.
        return _apply_meta_overrides(deepcopy(meta), source_path)
    # _build_path_meta already applies the overrides itself. Applying them
    # again here would hand the already-overridden meta back to
    # split_episode, which is redundant and may not be idempotent.
    path_meta = _build_path_meta(source_path, custom_word_list=custom_word_list)
    return path_meta if path_meta else None
def _build_path_meta(
    source_path: Path,
    custom_word_list: Optional[List[str]] = None,
) -> Optional[MetaBase]:
    """
    Recognize media information from a file path and apply the manual
    overrides; used to decide whether an extra file belongs to the current
    main video. Returns None when recognition yields nothing.
    """
    recognized = MetaInfoPath(source_path, custom_words=custom_word_list)
    if recognized:
        return _apply_meta_overrides(recognized, source_path)
    return None
def _apply_meta_overrides(
    current_meta: MetaBase, source_path: Path
) -> Optional[MetaBase]:
    """
    Apply the manually supplied season and the custom recognition format
    to ``current_meta`` in place and return it.
    """
    # Manual season override.
    if season is not None:
        current_meta.begin_season = season
    if not formaterHandler:
        return current_meta
    # Custom recognition: begin episode, end episode and PART.
    episode_start, episode_end, episode_part = formaterHandler.split_episode(
        file_name=source_path.name, file_meta=current_meta
    )
    if episode_start is not None:
        current_meta.begin_episode = episode_start
    if episode_part is not None:
        current_meta.part = episode_part
    if episode_end is not None:
        current_meta.end_episode = episode_end
    return current_meta
def _filter(item: FileItem, is_bluray_dir: bool) -> bool:
"""
过滤文件项
@@ -2123,6 +2333,98 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")
return False, f"{fileitem.name} 没有找到可整理的媒体文件"
if sync_extra_files:
# 单文件和目录整理都按“主视频 -> 同媒体附加文件”补齐;目录场景会逐个视频处理。
extra_file_cache: Dict[Tuple[str, str], List[FileItem]] = {}
main_file_items: List[Tuple[FileItem, bool]] = []
for candidate_item, candidate_bluray_dir in file_items:
if not candidate_item:
continue
if candidate_bluray_dir or self.__is_media_file(candidate_item):
main_file_items.append((candidate_item, candidate_bluray_dir))
continue
if (
candidate_item.type == "file"
and (
self.__is_subtitle_file(candidate_item)
or self.__is_audio_file(candidate_item)
)
):
# 目录递归阶段已拿到附加文件时,直接填入父目录缓存,避免后续重复列目录。
extra_file_cache.setdefault(
self.__get_file_parent_key(candidate_item), []
).append(candidate_item)
if main_file_items:
file_items = list(main_file_items)
seen_file_keys = {
(item.storage, item.path)
for item, _ in file_items
if item and item.path
}
downloadhis = DownloadHistoryOper()
extra_meta_cache: Dict[
Tuple[str, Tuple[str, ...]], Optional[MetaBase]
] = {}
def _get_cached_extra_meta(
    extra_path: Path, custom_words_key: Tuple[str, ...]
) -> Optional[MetaBase]:
    """
    Return the metadata of an extra file, parsing it at most once per
    (path, custom words) pair so that several main videos in the same
    directory only do in-memory matching.
    """
    cache_key = (extra_path.as_posix(), custom_words_key)
    if cache_key in extra_meta_cache:
        return extra_meta_cache[cache_key]
    # An empty key means "no custom words", passed on as None.
    words = list(custom_words_key) or None
    parsed_meta = _build_path_meta(extra_path, custom_word_list=words)
    extra_meta_cache[cache_key] = parsed_meta
    return parsed_meta
def _build_extra_meta_factory(
    custom_word_list: Optional[List[str]],
) -> Callable[[Path], Optional[MetaBase]]:
    """
    Create a metadata factory bound to one set of custom words.

    The mutable word list is frozen into a tuple so it can be used as a
    cache key and the closure does not hold on to a mutable object.
    """
    frozen_words = tuple(custom_word_list) if custom_word_list else ()

    def _cached_factory(extra_path: Path) -> Optional[MetaBase]:
        return _get_cached_extra_meta(extra_path, frozen_words)

    return _cached_factory
for main_item, main_bluray_dir in list(main_file_items):
if main_bluray_dir or not self.__is_media_file(main_item):
continue
main_path = Path(main_item.path)
main_download_history = self._resolve_download_history(
downloadhis=downloadhis,
file_path=main_path,
bluray_dir=main_bluray_dir,
download_hash=download_hash,
)
subscribe_custom_words = _get_subscribe_custom_words(
main_download_history
)
main_meta = _build_file_meta(
main_path, custom_word_list=subscribe_custom_words
)
extra_items = self.__get_sync_extra_fileitems(
main_fileitem=main_item,
main_meta=main_meta,
meta_factory=_build_extra_meta_factory(subscribe_custom_words),
predicate=_filter,
extra_cache=extra_file_cache,
)
for extra_item, extra_bluray_dir in extra_items:
extra_key = (extra_item.storage, extra_item.path)
if extra_key in seen_file_keys:
continue
file_items.append((extra_item, extra_bluray_dir))
seen_file_keys.add(extra_key)
planned_file_count = len(file_items)
if preview:
logger.info(f"正在预览 {planned_file_count} 个文件的整理路径...")
@@ -2172,27 +2474,13 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
)
if not meta:
subscribe_custom_words = None
if download_history and isinstance(download_history.note, dict):
# 使用source动态获取订阅
subscribe = SubscribeChain().get_subscribe_by_source(
download_history.note.get("source")
)
subscribe_custom_words = (
subscribe.custom_words.split("\n")
if subscribe and subscribe.custom_words
else None
)
# 文件元数据(优先使用订阅识别词)
file_meta = MetaInfoPath(
file_path, custom_words=subscribe_custom_words
file_meta = _build_file_meta(
file_path,
custom_word_list=_get_subscribe_custom_words(download_history),
)
else:
file_meta = meta
# 合并季
if season is not None:
file_meta.begin_season = season
file_meta = _build_file_meta(file_path)
if not file_meta:
all_success = False
@@ -2200,19 +2488,6 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
err_msgs.append(f"{file_path.name} 无法识别有效信息")
continue
# 自定义识别
if formaterHandler:
# 开始集、结束集、PART
begin_ep, end_ep, part = formaterHandler.split_episode(
file_name=file_path.name, file_meta=file_meta
)
if begin_ep is not None:
file_meta.begin_episode = begin_ep
if part is not None:
file_meta.part = part
if end_ep is not None:
file_meta.end_episode = end_ep
# 获取下载Hash
if download_history and (not downloader or not download_hash):
_downloader = download_history.downloader

View File

@@ -101,6 +101,22 @@ def make_transfer_chain() -> TransferChain:
return chain
def make_fileitem(path: str, size: int = 1024) -> FileItem:
    """
    Build a local "file" FileItem for tests from a "/"-separated path.

    The name is the last path segment; the extension is the text after the
    name's last dot (empty when there is no dot), and the basename is the
    name without that extension and its dot.
    """
    _, _, name = path.rpartition("/")
    if "." in name:
        basename, _, suffix = name.rpartition(".")
    else:
        basename, suffix = name, ""
    if not suffix:
        # A trailing dot yields an empty extension; the basename then
        # stays the full name.
        basename = name
    return FileItem(
        storage="local",
        path=path,
        type="file",
        name=name,
        basename=basename,
        extension=suffix,
        size=size,
    )
def migrate_to_media_job(jobview: JobManager, task: TransferTask):
task.mediainfo = FakeMedia()
jobview.migrate_task(task)
@@ -345,6 +361,177 @@ class TransferJobManagerTest(unittest.TestCase):
self.assertEqual([("abc123", "qbittorrent")], completed)
self.assertEqual([], chain.jobview.list_jobs())
def test_do_transfer_does_not_sync_extra_files_by_default(self):
    """
    Without ``sync_extra_files`` only the main video reaches the transfer
    handler, even though a matching subtitle is in the same directory.
    """
    chain = make_transfer_chain()
    handled_paths = []
    video_item = make_fileitem(
        "/downloads/Test Show (2026)/Test.Show.S01E01.2026.mkv"
    )
    sidecar_item = make_fileitem(
        "/downloads/Test Show (2026)/Test.Show.S01E01.2026.zh-cn.srt"
    )

    def record_transfer(task, callback=None):
        handled_paths.append(task.fileitem.path)
        return True, ""

    def stub_parent_item(fileitem):
        return FileItem(
            storage="local",
            path="/downloads/Test Show (2026)/",
            type="dir",
            name="Test Show (2026)",
        )

    def stub_list_files(fileitem, recursion=False):
        return [video_item, sidecar_item]

    # Stub the chain internals so only the file planning logic runs.
    chain._TransferChain__get_trans_fileitems = lambda fileitem, predicate: [
        (video_item, False)
    ]
    chain._TransferChain__put_to_jobview = lambda task: True
    chain._TransferChain__register_scrape_batch_task = lambda task: None
    chain._TransferChain__close_scrape_batch = lambda batch_id: None
    chain._TransferChain__handle_transfer = record_transfer

    transfer_history_stub = SimpleNamespace(
        get_by_src=lambda src, storage=None: None
    )
    download_history_stub = SimpleNamespace(
        get_by_hash=lambda download_hash: None,
        get_file_by_fullpath=lambda fullpath: None,
        get_files_by_savepath=lambda savepath: [],
        get_by_path=lambda path: None,
    )
    system_config_stub = SimpleNamespace(get=lambda key: None)
    storage_stub = SimpleNamespace(
        get_parent_item=stub_parent_item,
        list_files=stub_list_files,
    )

    with patch(
        "app.chain.transfer.TransferHistoryOper",
        return_value=transfer_history_stub,
    ), patch(
        "app.chain.transfer.DownloadHistoryOper",
        return_value=download_history_stub,
    ), patch(
        "app.chain.transfer.SystemConfigOper",
        return_value=system_config_stub,
    ), patch(
        "app.chain.transfer.StorageChain",
        return_value=storage_stub,
    ):
        state, errmsg = TransferChain.do_transfer(
            chain,
            fileitem=video_item,
            background=False,
        )

    self.assertTrue(state)
    self.assertEqual("", errmsg)
    self.assertEqual([video_item.path], handled_paths)
def test_do_transfer_syncs_matching_extra_files_for_each_main_video(self):
    """
    With ``sync_extra_files=True`` each main video is followed by the
    subtitle/audio files recognized as the same episode, a subtitle of a
    different title is left out, and the directory is not listed again
    because the recursive scan already supplied the extra files.
    """
    chain = make_transfer_chain()
    handled = []
    listing_calls = []

    video_ep1 = make_fileitem(
        "/downloads/Test Show (2026)/Test.Show.S01E01.2026.mkv"
    )
    video_ep2 = make_fileitem(
        "/downloads/Test Show (2026)/Test.Show.S01E02.2026.mkv"
    )
    subtitle_ep1 = make_fileitem(
        "/downloads/Test Show (2026)/Test.Show.S01E01.2026.zh-cn.srt"
    )
    audio_ep1 = make_fileitem(
        "/downloads/Test Show (2026)/Test.Show.S01E01.2026.commentary.mka"
    )
    subtitle_ep2 = make_fileitem(
        "/downloads/Test Show (2026)/Test.Show.S01E02.2026.zh-cn.srt"
    )
    unrelated_subtitle = make_fileitem(
        "/downloads/Test Show (2026)/Other.Show.S01E01.2026.zh-cn.srt"
    )
    show_dir = FileItem(
        storage="local",
        path="/downloads/Test Show (2026)/",
        type="dir",
        name="Test Show (2026)",
    )
    # Directory content in the order both the recursive scan and a plain
    # listing report it.
    directory_entries = (
        video_ep1,
        video_ep2,
        subtitle_ep1,
        audio_ep1,
        subtitle_ep2,
        unrelated_subtitle,
    )

    def record_transfer(task, callback=None):
        handled.append((task.fileitem.path, task.meta.begin_episode))
        return True, ""

    def fake_list_files(fileitem, recursion=False):
        listing_calls.append((fileitem.path, recursion))
        return list(directory_entries)

    chain._TransferChain__get_trans_fileitems = lambda fileitem, predicate: [
        (entry, False) for entry in directory_entries
    ]
    chain._TransferChain__put_to_jobview = lambda task: True
    chain._TransferChain__register_scrape_batch_task = lambda task: None
    chain._TransferChain__close_scrape_batch = lambda batch_id: None
    chain._TransferChain__handle_transfer = record_transfer

    transfer_history_stub = SimpleNamespace(
        get_by_src=lambda src, storage=None: None
    )
    download_history_stub = SimpleNamespace(
        get_by_hash=lambda download_hash: None,
        get_file_by_fullpath=lambda fullpath: None,
        get_files_by_savepath=lambda savepath: [],
        get_by_path=lambda path: None,
    )
    system_config_stub = SimpleNamespace(get=lambda key: None)
    storage_stub = SimpleNamespace(
        get_parent_item=lambda fileitem: show_dir,
        list_files=fake_list_files,
    )

    with patch(
        "app.chain.transfer.TransferHistoryOper",
        return_value=transfer_history_stub,
    ), patch(
        "app.chain.transfer.DownloadHistoryOper",
        return_value=download_history_stub,
    ), patch(
        "app.chain.transfer.SystemConfigOper",
        return_value=system_config_stub,
    ), patch(
        "app.chain.transfer.StorageChain",
        return_value=storage_stub,
    ):
        state, errmsg = TransferChain.do_transfer(
            chain,
            fileitem=show_dir,
            background=False,
            sync_extra_files=True,
        )

    self.assertTrue(state)
    self.assertEqual("", errmsg)
    expected_plan = [
        (video_ep1.path, 1),
        (video_ep2.path, 2),
        (subtitle_ep1.path, 1),
        (audio_ep1.path, 1),
        (subtitle_ep2.path, 2),
    ]
    self.assertEqual(expected_plan, handled)
    self.assertEqual([], listing_calls)
def test_scrape_event_is_aggregated_by_transfer_batch_across_seasons(self):
chain = make_transfer_chain()
chain.eventmanager = MagicMock()