Files
archived-MoviePilot/tests/test_subscribe_chain.py

1615 lines
65 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import importlib.util
import sys
import types
from pathlib import Path
from types import SimpleNamespace
from unittest import TestCase
from unittest.mock import patch
from app.schemas.types import MediaType
def _load_subscribe_chain_class():
    """Load ``app/chain/subscribe.py`` in isolation and return it with ``SubscribeChain``.

    Stub modules are temporarily placed in ``sys.modules`` (see
    ``_install_subscribe_chain_stubs``) so the chain module can be executed
    without the real implementations of those dependencies.  The loaded module
    is cached in ``sys.modules`` under ``_test_subscribe_chain`` and reused by
    later calls.

    Every ``sys.modules`` entry that was replaced is put back (or removed when
    it did not exist before) even if loading fails, and a module whose
    execution failed is not left behind in the cache.

    Returns:
        tuple: ``(module, module.SubscribeChain)``.

    Raises:
        ImportError: if no import spec/loader can be created for the file.
    """
    module_name = "_test_subscribe_chain"
    if module_name in sys.modules:
        cached = sys.modules[module_name]
        return cached, cached.SubscribeChain

    original_modules = {}

    def ensure_module(name: str, module: types.ModuleType):
        """Temporarily replace a module dependency, recording the original for restoration."""
        if name not in original_modules:
            original_modules[name] = sys.modules.get(name)
        sys.modules[name] = module
        return module

    try:
        _install_subscribe_chain_stubs(ensure_module)

        subscribe_path = Path(__file__).resolve().parents[1] / "app" / "chain" / "subscribe.py"
        spec = importlib.util.spec_from_file_location(module_name, subscribe_path)
        if spec is None or spec.loader is None:
            raise ImportError(f"cannot create an import spec for {subscribe_path}")
        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        try:
            spec.loader.exec_module(module)
        except BaseException:
            # Do not keep a half-initialised module in the cache: the early
            # return above would otherwise hand it out on the next call.
            sys.modules.pop(module_name, None)
            raise
        # Snapshot the stubs while they are still installed.
        module._injected_modules = {
            name: sys.modules.get(name)
            for name in original_modules
        }
    finally:
        # Always undo the sys.modules substitutions, on success and on failure.
        for injected_name, original_module in original_modules.items():
            if original_module is None:
                sys.modules.pop(injected_name, None)
            else:
                sys.modules[injected_name] = original_module
    return module, module.SubscribeChain


def _install_subscribe_chain_stubs(ensure_module):
    """Register minimal stand-ins for the modules replaced while loading the chain.

    Args:
        ensure_module: callable ``(name, module) -> module`` that installs the
            module into ``sys.modules`` and records the entry it replaces.
    """
    chain_module = ensure_module("app.chain", types.ModuleType("app.chain"))

    class _ChainBase:
        """Base-class stub whose messaging hooks do nothing."""

        def __init__(self):
            self.messagehelper = SimpleNamespace(put=lambda *args, **kwargs: None)

        def post_message(self, *args, **kwargs):
            return None

        async def async_post_message(self, *args, **kwargs):
            return None

    chain_module.ChainBase = _ChainBase

    interaction_module = ensure_module("app.helper.interaction", types.ModuleType("app.helper.interaction"))

    class _SlashInteractionManager:
        """Interaction manager stub; only ``create_or_replace`` returns an object."""

        def create_or_replace(self, *args, **kwargs):
            return SimpleNamespace(request_id="request-id")

        def get_by_id(self, *args, **kwargs):
            return None

        def get_by_user(self, *args, **kwargs):
            return None

        def remove(self, *args, **kwargs):
            return None

    interaction_module.SlashInteractionManager = _SlashInteractionManager
    interaction_module.build_navigation_buttons = lambda *args, **kwargs: []
    interaction_module.format_markdown_table = lambda *args, **kwargs: ""
    interaction_module.page_items = lambda *args, **kwargs: []
    interaction_module.supports_interaction_buttons = lambda *args, **kwargs: False
    interaction_module.supports_markdown = lambda *args, **kwargs: False
    interaction_module.update_or_post_message = lambda *args, **kwargs: None

    config_module = ensure_module("app.core.config", types.ModuleType("app.core.config"))
    config_module.global_vars = SimpleNamespace(is_system_stopped=False)
    config_module.settings = SimpleNamespace(
        RECOGNIZE_SOURCE="themoviedb",
        MP_DOMAIN=lambda path: path,
    )

    context_module = ensure_module("app.core.context", types.ModuleType("app.core.context"))
    context_module.TorrentInfo = SimpleNamespace
    context_module.Context = SimpleNamespace
    context_module.MediaInfo = SimpleNamespace

    class _EpisodeLocation(SimpleNamespace):
        """Episode-location stub that maps a contiguous same-season run onto the targets."""

        CONFIDENCE_HIGH = "high"
        CONFIDENCE_LOW = "low"
        MODE_ABSOLUTE_TO_SEASON = "absolute_to_season"

        @classmethod
        def locate(cls, meta, target_season, target_episodes, episodes=None):
            """Return a high-confidence location, or ``None`` when any check below fails."""
            raw_episodes = episodes if episodes is not None else meta.episode_list
            source_episodes = sorted({int(episode) for episode in raw_episodes})
            # Source numbers that already overlap the targets need no mapping.
            if set(source_episodes).intersection(set(target_episodes)):
                return None
            season_list = meta.season_list or []
            if target_season is None or len(season_list) != 1 or season_list[0] != target_season:
                return None
            if len(source_episodes) != len(target_episodes):
                return None
            # The source episodes must form one gap-free run.
            if source_episodes != list(range(source_episodes[0], source_episodes[-1] + 1)):
                return None
            return cls(
                source_episodes=source_episodes,
                target_episodes=target_episodes,
                mode=cls.MODE_ABSOLUTE_TO_SEASON,
                confidence=cls.CONFIDENCE_HIGH,
            )

    context_module.EpisodeLocation = _EpisodeLocation

    event_module = ensure_module("app.core.event", types.ModuleType("app.core.event"))

    class _EventManager:
        """Event manager stub: sending is a no-op and ``register`` returns the function unchanged."""

        @staticmethod
        def send_event(*args, **kwargs):
            return None

        @staticmethod
        async def async_send_event(*args, **kwargs):
            return None

        @staticmethod
        def register(*args, **kwargs):
            def decorator(func):
                return func

            return decorator

        @staticmethod
        def add_event_listener(*args, **kwargs):
            """Accept config-change listener registration performed at module import time."""
            return None

    event_module.eventmanager = _EventManager()
    event_module.Event = SimpleNamespace

    meta_module = ensure_module("app.core.meta", types.ModuleType("app.core.meta"))
    meta_module.MetaBase = SimpleNamespace

    metainfo_module = ensure_module("app.core.metainfo", types.ModuleType("app.core.metainfo"))
    metainfo_module.MetaInfo = lambda *args, **kwargs: SimpleNamespace(episode_list=[])

    words_module = ensure_module("app.core.meta.words", types.ModuleType("app.core.meta.words"))

    class _WordsMatcher:
        """Words matcher stub that returns the title unchanged with no applied words."""

        def prepare(self, title, custom_words=None):
            return title, []

    words_module.WordsMatcher = _WordsMatcher

    schemas_module = ensure_module("app.schemas", types.ModuleType("app.schemas"))

    class _Notification:
        def __init__(self, *args, **kwargs):
            self.args = args
            self.kwargs = kwargs

    class _SubscribeSchema:
        def __init__(self, **kwargs):
            for key, value in kwargs.items():
                setattr(self, key, value)

    class _NotExistMediaInfo:
        def __init__(self, season=None, episodes=None, total_episode=None, start_episode=None):
            self.season = season
            self.episodes = episodes or []
            self.total_episode = total_episode
            self.start_episode = start_episode

    class _SubscribeEpisodeInfo:
        def __init__(self):
            self.downloading = []
            self.downloaded = []
            self.library = []

    class _SubscrbieInfo:
        def __init__(self):
            self.subscribe = None
            self.episodes = {}

    class _SubscribeDownloadFileInfo:
        def __init__(self, **kwargs):
            self.__dict__.update(kwargs)

    class _SubscribeLibraryFileInfo:
        def __init__(self, **kwargs):
            self.__dict__.update(kwargs)

    class _MediaRecognizeConvertEventData:
        def __init__(self, **kwargs):
            self.mediaid = kwargs.get("mediaid")
            self.convert_type = kwargs.get("convert_type")
            self.media_dict = kwargs.get("media_dict")

    class _SubscribeEpisodesRefreshEventData:
        def __init__(self, **kwargs):
            self.__dict__.update(kwargs)

    class _SubscribeCompletionCheckEventData:
        def __init__(self, **kwargs):
            self.__dict__.update(kwargs)

    schemas_module.Notification = _Notification
    schemas_module.Subscribe = _SubscribeSchema
    schemas_module.NotExistMediaInfo = _NotExistMediaInfo
    schemas_module.SubscribeEpisodeInfo = _SubscribeEpisodeInfo
    schemas_module.SubscrbieInfo = _SubscrbieInfo
    schemas_module.SubscribeDownloadFileInfo = _SubscribeDownloadFileInfo
    schemas_module.SubscribeLibraryFileInfo = _SubscribeLibraryFileInfo
    schemas_module.MediaRecognizeConvertEventData = _MediaRecognizeConvertEventData
    schemas_module.SubscribeEpisodesRefreshEventData = _SubscribeEpisodesRefreshEventData
    schemas_module.SubscribeCompletionCheckEventData = _SubscribeCompletionCheckEventData

    logger_module = ensure_module("app.log", types.ModuleType("app.log"))

    class _Logger:
        """Logger stub that discards every message."""

        def info(self, *args, **kwargs):
            return None

        def debug(self, *args, **kwargs):
            return None

        def warning(self, *args, **kwargs):
            return None

        def warn(self, *args, **kwargs):
            return None

        def error(self, *args, **kwargs):
            return None

    logger_module.logger = _Logger()

    helper_server_module = ensure_module("app.helper.server", types.ModuleType("app.helper.server"))

    class _MoviePilotServerHelper:
        @staticmethod
        def sub_done_async(*args, **kwargs):
            """Ignore the subscription-completed statistics report."""
            return None

        @staticmethod
        def sub_reg_async(*args, **kwargs):
            """Ignore the new-subscription statistics report."""
            return None

        @staticmethod
        async def async_sub_reg(*args, **kwargs):
            """Ignore the asynchronous new-subscription statistics report."""
            return None

        @staticmethod
        def get_subscribe_shares():
            """Return empty subscription-share data."""
            return []

    helper_server_module.MoviePilotServerHelper = _MoviePilotServerHelper

    helper_torrent_module = ensure_module("app.helper.torrent", types.ModuleType("app.helper.torrent"))
    helper_torrent_module.TorrentHelper = type("TorrentHelper", (), {})

    db_model_module = ensure_module("app.db.models.subscribe", types.ModuleType("app.db.models.subscribe"))

    class _SubscribeModel:
        def __init__(self, **kwargs):
            for key, value in kwargs.items():
                setattr(self, key, value)

        def to_dict(self):
            return dict(self.__dict__)

    db_model_module.Subscribe = _SubscribeModel

    subscribe_oper_module = ensure_module("app.db.subscribe_oper", types.ModuleType("app.db.subscribe_oper"))

    class _SubscribeOper:
        """Subscription data-access stub with empty results."""

        def update(self, *args, **kwargs):
            return None

        def get(self, *args, **kwargs):
            return None

        def list(self, *args, **kwargs):
            return []

        def delete(self, *args, **kwargs):
            return None

        def add_history(self, *args, **kwargs):
            return None

    subscribe_oper_module.SubscribeOper = _SubscribeOper

    simple_oper_modules = {
        "app.db.downloadhistory_oper": "DownloadHistoryOper",
        "app.db.site_oper": "SiteOper",
        "app.db.systemconfig_oper": "SystemConfigOper",
    }
    for module_name_key, class_name in simple_oper_modules.items():
        module = ensure_module(module_name_key, types.ModuleType(module_name_key))
        if class_name == "SystemConfigOper":

            class _SystemConfigOper:
                def get(self, *args, **kwargs):
                    return None

                def set(self, *args, **kwargs):
                    return None

            setattr(module, class_name, _SystemConfigOper)
        else:
            # The remaining data-access classes only need to exist as empty types.
            setattr(module, class_name, type(class_name, (), {}))

    chain_dependencies = {
        "app.chain.download": "DownloadChain",
        "app.chain.media": "MediaChain",
        "app.chain.search": "SearchChain",
        "app.chain.tmdb": "TmdbChain",
        "app.chain.torrents": "TorrentsChain",
    }
    for module_name_key, class_name in chain_dependencies.items():
        module = ensure_module(module_name_key, types.ModuleType(module_name_key))
        setattr(module, class_name, type(class_name, (), {}))
# Loaded once at import time; the tests below use both the module and the class.
SUBSCRIBE_CHAIN_MODULE, SubscribeChain = _load_subscribe_chain_class()
def _build_context(meta_info, selected_episodes=None, allowed_episodes=None):
    """Build a minimal candidate context that is able to locate episodes.

    Args:
        meta_info: metadata object handed to ``EpisodeLocation.locate``.
        selected_episodes: stored on the context as given.
        allowed_episodes: stored on the context as given.

    Returns:
        A namespace carrying the three arguments plus ``located_episodes=None``
        and a ``locate_episode`` method.
    """

    class _ContextStub(SimpleNamespace):
        """Namespace that forwards episode location to the loaded chain module."""

        def locate_episode(self, target_season, target_episodes):
            locate = SUBSCRIBE_CHAIN_MODULE.EpisodeLocation.locate
            return locate(
                meta=self.meta_info,
                target_season=target_season,
                target_episodes=target_episodes,
            )

    fields = {
        "meta_info": meta_info,
        "selected_episodes": selected_episodes,
        "allowed_episodes": allowed_episodes,
        "located_episodes": None,
    }
    return _ContextStub(**fields)
class SubscribeChainTest(TestCase):
def _build_subscribe(self, **overrides):
    """Return a ``SimpleNamespace`` subscription built from defaults plus ``overrides``.

    Keys passed in ``overrides`` replace the defaults below; unknown keys are
    added as extra attributes.
    """
    defaults = dict(
        id=1,
        name="Test Show",
        season=1,
        best_version=1,
        best_version_full=0,
        type=MediaType.TV.value,
        start_episode=1,
        total_episode=3,
        current_priority=None,
        episode_priority=None,
        lack_episode=3,
        state="R",
        note=[],
        manual_total_episode=0,
        tmdbid=1,
        doubanid=None,
        year="2026",
        imdbid=None,
        tvdbid=None,
        episode_group=None,
        poster=None,
        backdrop=None,
        description=None,
        last_update=None,
        username=None,
        to_dict=lambda: {},
    )
    return SimpleNamespace(**{**defaults, **overrides})
@staticmethod
def _build_download(priority, selected_episodes=None, meta_episodes=None):
    """Return a download stub whose torrent carries ``priority`` as ``pri_order``.

    ``meta_episodes`` becomes ``meta_info.episode_list`` (an empty list when
    falsy); the season list is fixed to ``[1]``.
    """
    torrent = SimpleNamespace(pri_order=priority)
    meta = SimpleNamespace(season_list=[1], episode_list=meta_episodes or [])
    return SimpleNamespace(
        torrent_info=torrent,
        selected_episodes=selected_episodes,
        located_episodes=None,
        meta_info=meta,
    )
def test_match_title_fallback_calls_torrent_match_from_class(self):
    """Title-fallback matching must not depend on a bound ``TorrentHelper`` instance."""

    class _ReachedTitleMatch(Exception):
        """Marks that the test entered the title-matching function body."""

    class _PlainTorrentHelper:
        """Mimics the legacy ``TorrentHelper`` shape that does not declare ``staticmethod``."""

        def match_torrent(mediainfo, torrent_meta, torrent):
            """Signal that the class-level call reached the matching logic."""
            raise _ReachedTitleMatch

        def filter_torrent(self, *args, **kwargs):
            """Let the filtering step after subscription matching keep going."""
            return True

    target = self._build_subscribe(
        best_version=0,
        custom_words=None,
        doubanid=None,
        episode_group=None,
        sites=[],
        tmdbid=1,
    )
    recognized = SimpleNamespace(
        clear=lambda: None,
        douban_id=None,
        title_year="Test Show (2026)",
        tmdb_id=1,
        type=MediaType.TV,
    )
    torrent_meta = SimpleNamespace(
        begin_season=1,
        episode_list=[],
        org_string="Test Show",
        season_list=[1],
    )
    torrent = SimpleNamespace(
        description="",
        site=1,
        site_name="TestSite",
        title="Test Show S01",
    )
    candidate = SimpleNamespace(
        media_info=None,
        media_recognize_fail_count=3,
        meta_info=torrent_meta,
        torrent_info=torrent,
    )

    class _SubscribeOper:
        """Serves a single subscription so no real database is needed."""

        def list(self, *args, **kwargs):
            """Return the subscription built by this test."""
            return [target]

    chain = SubscribeChain()
    chain.recognize_media = lambda **kwargs: recognized
    chain.check_and_handle_existing_media = lambda **kwargs: (False, {})

    oper_patch = patch.object(SUBSCRIBE_CHAIN_MODULE, "SubscribeOper", _SubscribeOper)
    helper_patch = patch.object(SUBSCRIBE_CHAIN_MODULE, "TorrentHelper", _PlainTorrentHelper)
    with oper_patch, helper_patch, self.assertRaises(_ReachedTitleMatch):
        chain.match({"test.example": [candidate]})
def test_get_episode_priority_falls_back_to_current_priority(self):
    """With no per-episode data, each of the three episodes should report ``current_priority``."""
    legacy = self._build_subscribe(current_priority=80, episode_priority=None)
    expected = {"1": 80, "2": 80, "3": 80}
    self.assertEqual(SubscribeChain.get_episode_priority(legacy), expected)
def test_get_pending_best_version_episodes_uses_per_episode_status(self):
    """Episodes 1 and 4 are recorded at priority 100; 2, 3 and 5 are expected back as pending."""
    recorded = {"1": 100, "2": 80, "4": 100}
    target = self._build_subscribe(total_episode=5, episode_priority=recorded)
    pending = SubscribeChain._get_pending_best_version_episodes(target)
    self.assertEqual(pending, [2, 3, 5])
def test_best_version_progress_helpers_return_remaining_priority(self):
    """With episodes still below 100, expect a current priority of 90 and an incomplete state."""
    per_episode = {"1": 100, "2": 80, "3": 90, "4": 100, "5": 70}
    target = self._build_subscribe(
        total_episode=5,
        episode_priority=per_episode,
        current_priority=100,
    )
    current = SubscribeChain.get_best_version_current_priority(target)
    self.assertEqual(current, 90)
    self.assertFalse(SubscribeChain.is_best_version_complete(target))
def test_best_version_progress_helpers_mark_complete_when_all_target_episodes_done(self):
    """With every episode at 100, expect a current priority of 100 and a complete state."""
    per_episode = {"1": 100, "2": 100, "3": 100}
    target = self._build_subscribe(
        total_episode=3,
        episode_priority=per_episode,
        current_priority=90,
    )
    current = SubscribeChain.get_best_version_current_priority(target)
    self.assertEqual(current, 100)
    self.assertTrue(SubscribeChain.is_best_version_complete(target))
def test_is_episode_range_covered_matches_pending_episodes(self):
    """Check ``_is_episode_range_covered`` for in-season and absolute-numbered episode lists."""
    is_covered = SubscribeChain._is_episode_range_covered

    # Episodes 1-4 and 8-12 are recorded at priority 100; 5-7 have no entry.
    finished = [*range(1, 5), *range(8, 13)]
    subscribe = self._build_subscribe(
        total_episode=12,
        episode_priority={str(ep): 100 for ep in finished},
    )
    self.assertTrue(is_covered(meta=SimpleNamespace(episode_list=[5, 6, 7]), subscribe=subscribe))
    self.assertFalse(is_covered(meta=SimpleNamespace(episode_list=[1, 2, 3, 4]), subscribe=subscribe))
    self.assertTrue(is_covered(meta=SimpleNamespace(episode_list=[]), subscribe=subscribe))

    # Season 5 subscription checked against packs numbered 57 and up.
    absolute_subscribe = self._build_subscribe(
        season=5,
        total_episode=26,
        episode_priority={},
    )
    whole_season = SimpleNamespace(
        season_list=[5],
        episode_list=list(range(57, 83)),
        total_episode=26,
    )
    self.assertTrue(is_covered(meta=whole_season, subscribe=absolute_subscribe))
    four_episodes = SimpleNamespace(
        season_list=[5],
        episode_list=list(range(57, 61)),
        total_episode=4,
    )
    self.assertFalse(is_covered(meta=four_episodes, subscribe=absolute_subscribe))
def test_full_best_version_rejects_episode_resource(self):
    """A single-episode resource should not count as a full-season best-version resource."""
    target = self._build_subscribe(best_version_full=1, total_episode=3)
    single_episode = SimpleNamespace(season_list=[1], episode_list=[1])
    accepted = SubscribeChain._SubscribeChain__is_full_season_best_version_resource(
        meta=single_episode,
        subscribe=target,
    )
    self.assertFalse(accepted)
def test_full_best_version_accepts_full_pack_resource(self):
    """Both an empty episode list and the complete list 1-3 should be accepted as a full pack."""
    is_full_pack = SubscribeChain._SubscribeChain__is_full_season_best_version_resource
    target = self._build_subscribe(best_version_full=1, total_episode=3)

    no_episode_numbers = SimpleNamespace(season_list=[1], episode_list=[])
    self.assertTrue(is_full_pack(meta=no_episode_numbers, subscribe=target))

    every_episode = SimpleNamespace(season_list=[1], episode_list=[1, 2, 3])
    self.assertTrue(is_full_pack(meta=every_episode, subscribe=target))
def test_full_best_version_accepts_absolute_numbered_full_pack(self):
    """A same-season full pack with cumulative episode numbers should be located by in-season numbers."""
    target = self._build_subscribe(
        best_version_full=1,
        season=5,
        total_episode=26,
        episode_priority={},
    )
    pack_meta = SimpleNamespace(
        season_list=[5],
        episode_list=list(range(57, 83)),
        total_episode=26,
    )
    candidate = _build_context(meta_info=pack_meta)

    is_full_pack = SubscribeChain._SubscribeChain__is_full_season_best_version_resource(
        meta=pack_meta,
        subscribe=target,
    )
    self.assertTrue(is_full_pack)

    range_covered = SubscribeChain._is_episode_range_covered(
        meta=pack_meta,
        subscribe=target,
    )
    self.assertTrue(range_covered)

    interested = SubscribeChain._SubscribeChain__get_best_version_interested_episodes(
        subscribe=target,
        context=candidate,
        priority=100,
    )
    self.assertEqual(interested, list(range(1, 27)))
def test_low_confidence_episode_location_does_not_drive_best_version_filter(self):
    """A low-confidence location must not drive best-version filtering; it is only a weak hint for later."""
    location_cls = SUBSCRIBE_CHAIN_MODULE.EpisodeLocation
    target = self._build_subscribe(
        best_version_full=1,
        season=5,
        total_episode=26,
        episode_priority={},
    )
    pack_meta = SimpleNamespace(
        season_list=[1],
        episode_list=list(range(57, 83)),
        total_episode=26,
    )
    candidate = _build_context(meta_info=pack_meta)
    weak_location = location_cls(
        source_episodes=list(range(57, 83)),
        target_episodes=list(range(1, 27)),
        mode=location_cls.MODE_ABSOLUTE_TO_SEASON,
        confidence=location_cls.CONFIDENCE_LOW,
    )

    range_covered = SubscribeChain._is_episode_range_covered(
        meta=pack_meta,
        subscribe=target,
        episode_location=weak_location,
    )
    self.assertFalse(range_covered)

    interested = SubscribeChain._SubscribeChain__get_best_version_interested_episodes(
        subscribe=target,
        context=candidate,
        priority=100,
        episode_location=weak_location,
    )
    self.assertEqual(interested, [])
def test_full_best_version_rejects_partial_absolute_numbered_pack(self):
    """Cumulative numbering counts as a full pack only when it covers the whole subscription target length."""
    target = self._build_subscribe(
        best_version_full=1,
        season=5,
        total_episode=26,
        episode_priority={},
    )
    # Only four of the 26 target episodes are present.
    partial_meta = SimpleNamespace(
        season_list=[5],
        episode_list=list(range(57, 61)),
        total_episode=4,
    )

    is_full_pack = SubscribeChain._SubscribeChain__is_full_season_best_version_resource(
        meta=partial_meta,
        subscribe=target,
    )
    self.assertFalse(is_full_pack)

    range_covered = SubscribeChain._is_episode_range_covered(
        meta=partial_meta,
        subscribe=target,
    )
    self.assertFalse(range_covered)
def test_full_best_version_rejects_absolute_numbered_pack_without_season_match(self):
    """Without a same-season signal, cumulative numbering is not mapped, avoiding cross-season false matches."""
    target = self._build_subscribe(
        best_version_full=1,
        season=5,
        total_episode=26,
        episode_priority={},
    )
    # The pack reports season 1 while the subscription is for season 5.
    other_season_meta = SimpleNamespace(
        season_list=[1],
        episode_list=list(range(57, 83)),
        total_episode=26,
    )
    is_full_pack = SubscribeChain._SubscribeChain__is_full_season_best_version_resource(
        meta=other_season_meta,
        subscribe=target,
    )
    self.assertFalse(is_full_pack)
def test_full_season_resource_matches_legacy_metainfo_cases(self):
    """Keep the real-title pack-recognition samples covered by #5648 to guard against MetaInfo regressions.

    Each case is parsed with the real ``MetaInfo`` and checked against
    ``__is_full_season_resource``.  Parsing and subscription building happen
    inside the per-case ``subTest`` so that an exception in one case is
    reported for that case instead of aborting the remaining ones.
    """
    import app.core.metainfo as metainfo_module
    from app.core.metainfo import MetaInfo

    class _SystemConfigOper:
        """Provide an empty system config so real MetaInfo parsing does not depend on the test database."""

        def get(self, *args, **kwargs):
            return None

    cases = [
        {
            "title": "Cherry Season S01 2014 2160p 60fps WEB-DL H265 AAC-XXX",
            "subtitle": "",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 51},
            "expected": True,
        },
        {
            "title": "【爪爪字幕组】★7月新番[欢迎来到实力至上主义的教室 第二季/Youkoso Jitsuryoku Shijou Shugi no Kyoushitsu e S2][11][1080p][HEVC][GB][MP4][招募翻译校对]",
            "subtitle": "",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 13},
            "expected": False,
        },
        {
            "title": "[秋叶原冥途战争][Akiba Maid Sensou][2022][WEB-DL][1080][TV Series][第01话][LeagueWEB]",
            "subtitle": "",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 12},
            "expected": False,
        },
        {
            "title": "Qi Refining for 3000 Years S01E06 2022 1080p B-Blobal WEB-DL X264 AAC-AnimeS@AdWeb",
            "subtitle": "",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 16},
            "expected": False,
        },
        {
            "title": "The Heart of Genius S01 13-14 2022 1080p WEB-DL H264 AAC",
            "subtitle": "",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 34},
            "expected": False,
        },
        {
            "title": "[xyx98]传颂之物/Utawarerumono/うたわれるもの[BDrip][1920x1080][TV 01-26 Fin][hevc-yuv420p10 flac_ac3][ENG PGS]",
            "subtitle": "",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 26},
            "expected": True,
        },
        {
            "title": "I Woke Up a Vampire S02 2023 2160p NF WEB-DL DDP5.1 Atmos H 265-HHWEB",
            "subtitle": "醒来变成吸血鬼 第二季 | 全8集 | 4K | 类型: 喜剧/家庭/奇幻 | 导演: TommyLynch | 主演: NikoCeci/ZebastinBorjeau/安娜·阿劳约/KaileenAngelicChang/KrisSiddiqi",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 8},
            "expected": True,
        },
        {
            "title": "Shadows of the Void S01 2024 1080p WEB-DL H264 AAC-HHWEB",
            "subtitle": "虚无边境 | 第01-02集 | 1080p | 类型: 动画 | 导演: 巴西 | 主演: 山新/周一菡/皇贞季/Kenz/李佳怡 [内嵌中字]",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 13},
            "expected": False,
        },
        {
            "title": "Mai Xiang S01 2019 2160p WEB-DL H.265 DDP2.0-HHWEB",
            "subtitle": "麦香 | 全36集 | 4K | 类型:剧情/爱情/家庭 | 主演:傅晶/章呈赫/王伟/沙景昌/何音",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 36},
            "expected": True,
        },
        {
            "title": "Jigokuraku S01E14-E25 2023 1080p CR WEB-DL x264 AAC-Nest@ADWeb",
            "subtitle": "地狱乐 / 地獄楽 / Hells Paradise [14-25Fin] [中日双语字幕]",
            "subscribe": {"season": None, "start_episode": 14, "total_episode": 25},
            "expected": True,
        },
        {
            "title": "Jigokuraku S01 2023 1080p BluRay Remux AVC FLAC 2.0-AnimeF@ADE",
            "subtitle": "地狱乐/Hell's Paradise: Jigokuraku [01-13Fin] [中日双语字幕]",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 13},
            "expected": True,
        },
        {
            "title": "Jigokuraku S02E12 2026 1080p NF WEB-DL x264 AAC-ADWeb",
            "subtitle": "地狱乐 第二季 地獄楽 第二期 第12集 | 类型: 动画",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 12},
            "expected": False,
        },
        {
            "title": "Jigokuraku S02E05-E07 2026 1080p NF WEB-DL x264 AAC-ADWeb",
            "subtitle": "地狱乐 第二季 地獄楽 第二期 第05-07集 | 类型: 动画",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 12},
            "expected": False,
        },
        {
            "title": "Bungo Stray Dogs S01 2016 1080p KKTV WEB-DL x264 AAC-ADWeb",
            "subtitle": "文豪野犬 文豪ストレイドッグス 又名: 文豪Stray Dogs 第一季 全12集 | 类型: 剧情 / 动作 / 动画 主演: 上村祐翔 / 宫野真守 / 细谷佳正 *内嵌繁体字幕*",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 12},
            "expected": True,
        },
        {
            "title": "Bungou Stray Dogs S1+S2+S3+OAD 1080p BDRip HEVC FLAC-Snow-Raws",
            "subtitle": "文豪野犬 第1-3季",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 36},
            "expected": True,
        },
        {
            "title": "Bungou Stray Dogs S1+S2+S3+OAD 1080p BDRip HEVC FLAC-Snow-Raws",
            "subtitle": "文豪野犬 第1-3季",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 60},
            "expected": True,
        },
        {
            "title": "Fu Gui S01 2005 2160p WEB-DL H265 AAC-HHWEB",
            "subtitle": "福贵 | 全33集 | 4K | 类型: 剧情/家庭 | 导演: 朱正/袁进 | 主演: 陈创/刘敏涛/李丁/张鹰/温玉娟",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 33},
            "expected": True,
        },
        {
            "title": "The Story of Ming Lan S01 2018 2160p WEB-DL CHDWEB",
            "subtitle": "知否知否应是绿肥红瘦 全78集 | 2160p | 国语/中字 | 60帧高码TV版 | 类型:剧情/爱情/古装 | 主演:赵丽颖/冯绍峰/朱一龙/施诗/张佳宁",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 78},
            "expected": True,
        },
        {
            "title": "Love Beyond the Grave S01 2026 2160p WEB-DL H265 AAC-HHWEB",
            "subtitle": "白日提灯 / 慕胥辞 | 第18集 | 4K | 类型: 剧情 | 导演: 秦榛 | 主演: 迪丽热巴/陈飞宇/魏哲鸣/张俪/高鹤元",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 40},
            "expected": False,
        },
        {
            "title": "The Long Ballad S01 2021 2160p WEB-DL H265 AAC-HHWEB",
            "subtitle": "长歌行 | 全49集 | 4K | 类型: 剧情/爱情/古装 | 主演: 迪丽热巴/吴磊/刘宇宁/赵露思/方逸伦",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 49},
            "expected": True,
        },
        {
            "title": "The Long Ballad S01E01-E04 2021 2160p WEB-DL H265 AAC-HHWEB",
            "subtitle": "长歌行 | 第01-04集 | 4K | 类型: 剧情/爱情/古装 | 主演: 迪丽热巴/吴磊/刘宇宁/赵露思/方逸伦",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 49},
            "expected": False,
        },
        {
            "title": "Spy x Family S02 2023 1080p Baha WEB-DL x264 AAC-ADWeb",
            "subtitle": "间谍过家家 第二季 / SPY×FAMILY Season 2 [01-12Fin] [简繁内封字幕]",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 12},
            "expected": True,
        },
        {
            "title": "Spy x Family S02E03-E07 2023 1080p Baha WEB-DL x264 AAC-ADWeb",
            "subtitle": "间谍过家家 第二季 / SPY×FAMILY Season 2 第03-07集 [简繁内封字幕]",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 12},
            "expected": False,
        },
        {
            "title": "Naruto Shippuden S01-S21 Complete 1080p BluRay x264 AAC-ADWeb",
            "subtitle": "火影忍者 疾风传 全500集 [1080p][简中字幕]",
            "subscribe": {"season": None, "start_episode": 1, "total_episode": 500},
            "expected": True,
        },
        {
            "title": "Naruto Shippuden S01-S21 Complete 1080p BluRay x264 AAC-ADWeb",
            "subtitle": "火影忍者 疾风传 第01-500集 [1080p][简中字幕]",
            "subscribe": {"season": None, "start_episode": 201, "total_episode": 500},
            "expected": True,
        },
        {
            "title": "Immortality S05 2025 2160p WEB-DL HDR H265 AAC-ADWeb",
            "subtitle": "永生 第五季 全26集 总第57-82集 / 永生之太元仙府 / 永生 新篇章 / Immortality: Taiyuan Immortal Mansion [Bilibili大陆] | 类型:动作 动画 奇幻",
            "subscribe": {"season": 5, "start_episode": 1, "total_episode": 26},
            "expected": True,
        },
        {
            "title": "Immortality S05 2025 2160p WEB-DL HDR H265 AAC-ADWeb",
            "subtitle": "永生 第五季 第57-60集 / 永生之太元仙府",
            "subscribe": {"season": 5, "start_episode": 1, "total_episode": 26},
            "expected": False,
        },
    ]

    metainfo_module._rust_default_parse_options.cache_clear()
    metainfo_module._rust_custom_parse_options.cache_clear()
    with patch("app.db.systemconfig_oper.SystemConfigOper", _SystemConfigOper), patch(
        "app.core.meta.releasegroup.SystemConfigOper", _SystemConfigOper
    ), patch(
        "app.core.meta.words.SystemConfigOper", _SystemConfigOper
    ), patch(
        "app.core.meta.customization.SystemConfigOper", _SystemConfigOper
    ):
        for case in cases:
            with self.subTest(title=case["title"], subtitle=case["subtitle"]):
                # Built inside the subTest so a parsing error is attributed
                # to this case and the loop continues with the next one.
                meta = MetaInfo(
                    title=case["title"], subtitle=case["subtitle"], custom_words=["#"]
                )
                subscribe = self._build_subscribe(
                    best_version_full=1,
                    episode_priority={},
                    current_priority=0,
                    **case["subscribe"],
                )
                self.assertEqual(
                    SubscribeChain._SubscribeChain__is_full_season_resource(
                        meta=meta,
                        subscribe=subscribe,
                    ),
                    case["expected"],
                )
def test_episode_best_version_downloads_full_pack_before_episode_fallback(self):
    """Per-episode best-version should hand only the full pack to the first batch download."""

    def tv_candidate(episodes):
        """Build a priority-90 TV candidate for season 1 with the given episode list."""
        return SimpleNamespace(
            torrent_info=SimpleNamespace(pri_order=90),
            media_info=SimpleNamespace(type=MediaType.TV),
            meta_info=SimpleNamespace(season_list=[1], episode_list=episodes),
        )

    target = self._build_subscribe(best_version_full=0, total_episode=3)
    pack = tv_candidate([])
    single = tv_candidate([2])
    missing = {
        "media-key": {
            1: SimpleNamespace(season=1, episodes=[2], total_episode=3, start_episode=1)
        }
    }
    recorded = []

    class _FakeDownloadChain:
        """Record batch-download calls to verify that the full pack is tried first."""

        def batch_download(self, **kwargs):
            recorded.append(kwargs)
            return [pack], {}

    with patch.object(SUBSCRIBE_CHAIN_MODULE, "DownloadChain", _FakeDownloadChain):
        result = SubscribeChain()._SubscribeChain__download_best_version_with_full_pack_first(
            contexts=[single, pack],
            no_exists=missing,
            subscribe=target,
            mediakey="media-key",
            username="user",
            save_path="/downloads",
            downloader="qb",
            source="subscribe",
        )
    downloads, lefts = result

    self.assertEqual(downloads, [pack])
    self.assertEqual(lefts, {})
    self.assertEqual(len(recorded), 1)
    first_call = recorded[0]
    self.assertEqual(first_call["contexts"], [pack])
    self.assertEqual(first_call["no_exists"]["media-key"][1].episodes, [])
def test_episode_best_version_falls_back_when_full_pack_not_downloaded(self):
    """When the pack attempt downloads nothing, a second call should use the original ``no_exists``."""

    def tv_candidate(episodes):
        """Build a priority-90 TV candidate for season 1 with the given episode list."""
        return SimpleNamespace(
            torrent_info=SimpleNamespace(pri_order=90),
            media_info=SimpleNamespace(type=MediaType.TV),
            meta_info=SimpleNamespace(season_list=[1], episode_list=episodes),
        )

    target = self._build_subscribe(best_version_full=0, total_episode=3)
    pack = tv_candidate([])
    single = tv_candidate([2])
    missing = {
        "media-key": {
            1: SimpleNamespace(season=1, episodes=[2], total_episode=3, start_episode=1)
        }
    }
    recorded = []

    class _FakeDownloadChain:
        """Simulate a failed full-pack download to verify the fallback to per-episode download."""

        def batch_download(self, **kwargs):
            recorded.append(kwargs)
            # Only the first call (the pack attempt) comes back empty.
            if len(recorded) == 1:
                return [], kwargs["no_exists"]
            return [single], {}

    with patch.object(SUBSCRIBE_CHAIN_MODULE, "DownloadChain", _FakeDownloadChain):
        result = SubscribeChain()._SubscribeChain__download_best_version_with_full_pack_first(
            contexts=[single, pack],
            no_exists=missing,
            subscribe=target,
            mediakey="media-key",
        )
    downloads, lefts = result

    self.assertEqual(downloads, [single])
    self.assertEqual(lefts, {})
    self.assertEqual(len(recorded), 2)
    pack_call, fallback_call = recorded
    self.assertEqual(pack_call["contexts"], [pack])
    self.assertIs(fallback_call["no_exists"], missing)
def test_episode_best_version_skips_full_pack_first_when_pack_priority_equals_existing_episode(self):
    """Verify fallback to episode download when the pack priority equals the target episodes' priority."""

    def tv_candidate(priority, episodes):
        """Build a season-1 TV candidate with the given priority and episode list."""
        return SimpleNamespace(
            torrent_info=SimpleNamespace(pri_order=priority),
            media_info=SimpleNamespace(type=MediaType.TV),
            meta_info=SimpleNamespace(season_list=[1], episode_list=episodes),
        )

    target = self._build_subscribe(
        best_version_full=0,
        total_episode=3,
        episode_priority={"1": 80, "2": 80, "3": 80},
        current_priority=80,
    )
    pack = tv_candidate(80, [])
    single = tv_candidate(90, [2])
    missing = {
        "media-key": {
            1: SimpleNamespace(season=1, episodes=[2], total_episode=3, start_episode=1)
        }
    }
    recorded = []

    class _FakeDownloadChain:
        """Record the fallback call, ensuring the pack candidate still takes part in matching."""

        def batch_download(self, **kwargs):
            recorded.append(kwargs)
            return [single], {}

    with patch.object(SUBSCRIBE_CHAIN_MODULE, "DownloadChain", _FakeDownloadChain):
        result = SubscribeChain()._SubscribeChain__download_best_version_with_full_pack_first(
            contexts=[single, pack],
            no_exists=missing,
            subscribe=target,
            mediakey="media-key",
        )
    downloads, lefts = result

    self.assertEqual(downloads, [single])
    self.assertEqual(lefts, {})
    self.assertEqual(len(recorded), 1)
    only_call = recorded[0]
    self.assertEqual(only_call["contexts"], [single, pack])
    self.assertIs(only_call["no_exists"], missing)
def test_episode_best_version_skips_full_pack_first_when_pack_priority_below_one_episode(self):
    """Verify the pack is not tried first when its priority is below any target episode's priority."""
    # Episode 1 is already at 90, above the pack's 85.
    target = self._build_subscribe(
        best_version_full=0,
        total_episode=3,
        episode_priority={"1": 90, "2": 80, "3": 80},
        current_priority=80,
    )
    pack = SimpleNamespace(
        torrent_info=SimpleNamespace(pri_order=85),
        media_info=SimpleNamespace(type=MediaType.TV),
        meta_info=SimpleNamespace(season_list=[1], episode_list=[]),
    )
    missing = {
        "media-key": {
            1: SimpleNamespace(season=1, episodes=[2], total_episode=3, start_episode=1)
        }
    }
    recorded = []

    class _FakeDownloadChain:
        """Record the fallback call to verify a low-priority pack skips the pack-first branch."""

        def batch_download(self, **kwargs):
            recorded.append(kwargs)
            return [], kwargs["no_exists"]

    with patch.object(SUBSCRIBE_CHAIN_MODULE, "DownloadChain", _FakeDownloadChain):
        result = SubscribeChain()._SubscribeChain__download_best_version_with_full_pack_first(
            contexts=[pack],
            no_exists=missing,
            subscribe=target,
            mediakey="media-key",
        )
    downloads, lefts = result

    self.assertEqual(downloads, [])
    self.assertIs(lefts, missing)
    self.assertEqual(len(recorded), 1)
    only_call = recorded[0]
    self.assertEqual(only_call["contexts"], [pack])
    self.assertIs(only_call["no_exists"], missing)
def test_full_pack_priority_check_uses_current_priority_fallback(self):
    """Verify that legacy subscriptions without per-episode state fall back to ``current_priority``."""
    is_higher = SubscribeChain._SubscribeChain__is_full_season_priority_higher_than_all_targets
    legacy = self._build_subscribe(total_episode=3, current_priority=80, episode_priority=None)

    # Equal to current_priority is rejected; one above it is accepted.
    self.assertFalse(is_higher(subscribe=legacy, priority=80))
    self.assertTrue(is_higher(subscribe=legacy, priority=81))
def test_update_subscribe_priority_uses_selected_episodes(self):
    """Only the episodes listed in selected_episodes get their priority raised."""
    subscribe = self._build_subscribe(
        total_episode=4,
        episode_priority={"1": 100, "2": 80, "3": 70, "4": 60},
        current_priority=80,
        lack_episode=3,
    )
    # meta_episodes lists 2-4, but selected_episodes is only [3].
    download = self._build_download(priority=90, selected_episodes=[3], meta_episodes=[2, 3, 4])
    chain = SubscribeChain()
    mediainfo = SimpleNamespace(title_year="Test Show (2026)")
    expected_priorities = {"1": 100, "2": 80, "3": 90, "4": 60}

    oper_patch = patch.object(SUBSCRIBE_CHAIN_MODULE, "SubscribeOper")
    finish_patch = patch.object(SubscribeChain, "_SubscribeChain__finish_subscribe")
    with oper_patch as oper_cls, finish_patch as finish_mock:
        oper = oper_cls.return_value
        oper.update.return_value = None
        chain.update_subscribe_priority(
            subscribe=subscribe,
            meta=SimpleNamespace(),
            mediainfo=mediainfo,
            downloads=[download],
        )

    oper.update.assert_called_once()
    payload = oper.update.call_args.args[1]
    self.assertEqual(payload["episode_priority"], expected_priorities)
    self.assertEqual(payload["current_priority"], 90)
    # update_subscribe_priority no longer writes lack_episode back; lack is maintained by
    # __update_lack_episodes at the end of the download chain.
    self.assertNotIn("lack_episode", payload)
    self.assertEqual(subscribe.episode_priority, expected_priorities)
    self.assertEqual(subscribe.current_priority, 90)
    self.assertEqual(subscribe.lack_episode, 3)
    finish_mock.assert_not_called()
def test_update_subscribe_priority_marks_complete_when_all_target_episodes_done(self):
    """The subscription is finished once every target episode reaches priority 100."""
    subscribe = self._build_subscribe(
        total_episode=3,
        episode_priority={"1": 100, "2": 90, "3": 80},
        current_priority=90,
        lack_episode=2,
    )
    # One download per remaining episode (2, then 3), each at priority 100.
    downloads = [self._build_download(priority=100, selected_episodes=[ep]) for ep in (2, 3)]
    chain = SubscribeChain()
    meta = SimpleNamespace()
    mediainfo = SimpleNamespace(title_year="Test Show (2026)")

    oper_patch = patch.object(SUBSCRIBE_CHAIN_MODULE, "SubscribeOper")
    finish_patch = patch.object(SubscribeChain, "_SubscribeChain__finish_subscribe")
    with oper_patch as oper_cls, finish_patch as finish_mock:
        oper = oper_cls.return_value
        oper.update.return_value = None
        chain.update_subscribe_priority(
            subscribe=subscribe,
            meta=meta,
            mediainfo=mediainfo,
            downloads=downloads,
        )

    payload = oper.update.call_args.args[1]
    self.assertEqual(payload["episode_priority"], {"1": 100, "2": 100, "3": 100})
    self.assertEqual(payload["current_priority"], 100)
    # Completion is still decided by __is_best_version_complete from the episode_priority
    # dict; lack_episode plays no part.
    self.assertNotIn("lack_episode", payload)
    finish_mock.assert_called_once_with(subscribe=subscribe, meta=meta, mediainfo=mediainfo)
def test_update_subscribe_priority_normalizes_absolute_numbered_full_pack(self):
    """A full pack numbered with cumulative episode numbers updates best-version state by in-season episode."""
    subscribe = self._build_subscribe(
        best_version_full=1,
        season=5,
        total_episode=26,
        episode_priority={},
        current_priority=0,
        lack_episode=26,
    )
    # The pack metadata lists episodes 57..82 (26 entries) for season 5.
    pack_meta = SimpleNamespace(
        season_list=[5],
        episode_list=list(range(57, 83)),
        total_episode=26,
    )
    download = SimpleNamespace(
        torrent_info=SimpleNamespace(pri_order=100),
        selected_episodes=None,
        located_episodes=None,
        meta_info=pack_meta,
    )
    chain = SubscribeChain()
    meta = SimpleNamespace()
    mediainfo = SimpleNamespace(title_year="Immortality (2025)")
    # Expected state is keyed by in-season numbers 1..26, all at priority 100.
    expected_priorities = dict.fromkeys((str(ep) for ep in range(1, 27)), 100)

    oper_patch = patch.object(SUBSCRIBE_CHAIN_MODULE, "SubscribeOper")
    finish_patch = patch.object(SubscribeChain, "_SubscribeChain__finish_subscribe")
    with oper_patch as oper_cls, finish_patch as finish_mock:
        oper = oper_cls.return_value
        oper.update.return_value = None
        chain.update_subscribe_priority(
            subscribe=subscribe,
            meta=meta,
            mediainfo=mediainfo,
            downloads=[download],
        )

    payload = oper.update.call_args.args[1]
    self.assertEqual(payload["episode_priority"], expected_priorities)
    self.assertEqual(payload["current_priority"], 100)
    finish_mock.assert_called_once_with(subscribe=subscribe, meta=meta, mediainfo=mediainfo)
def test_full_best_version_updates_all_episodes_when_pack_has_no_episode_metadata(self):
    """With best_version_full=1, a download without episode lists upgrades every episode."""
    subscribe = self._build_subscribe(
        best_version_full=1,
        total_episode=3,
        episode_priority={"1": 80, "2": 80, "3": 80},
        current_priority=80,
        lack_episode=3,
    )
    # Both selected_episodes and meta_episodes are empty for this download.
    download = self._build_download(priority=100, selected_episodes=[], meta_episodes=[])
    chain = SubscribeChain()
    meta = SimpleNamespace()
    mediainfo = SimpleNamespace(title_year="Test Show (2026)")

    oper_patch = patch.object(SUBSCRIBE_CHAIN_MODULE, "SubscribeOper")
    finish_patch = patch.object(SubscribeChain, "_SubscribeChain__finish_subscribe")
    with oper_patch as oper_cls, finish_patch as finish_mock:
        oper = oper_cls.return_value
        oper.update.return_value = None
        chain.update_subscribe_priority(
            subscribe=subscribe,
            meta=meta,
            mediainfo=mediainfo,
            downloads=[download],
        )

    payload = oper.update.call_args.args[1]
    self.assertEqual(payload["episode_priority"], {"1": 100, "2": 100, "3": 100})
    self.assertEqual(payload["current_priority"], 100)
    self.assertNotIn("lack_episode", payload)
    finish_mock.assert_called_once_with(subscribe=subscribe, meta=meta, mediainfo=mediainfo)
def test_episode_best_version_updates_all_episodes_when_full_pack_has_no_episode_metadata(self):
    """With best_version_full=0, a download without episode lists still upgrades every episode."""
    subscribe = self._build_subscribe(
        best_version_full=0,
        total_episode=3,
        episode_priority={"1": 80, "2": 80, "3": 80},
        current_priority=80,
        lack_episode=3,
    )
    # Both selected_episodes and meta_episodes are empty for this download.
    download = self._build_download(priority=100, selected_episodes=[], meta_episodes=[])
    chain = SubscribeChain()
    meta = SimpleNamespace()
    mediainfo = SimpleNamespace(title_year="Test Show (2026)")

    oper_patch = patch.object(SUBSCRIBE_CHAIN_MODULE, "SubscribeOper")
    finish_patch = patch.object(SubscribeChain, "_SubscribeChain__finish_subscribe")
    with oper_patch as oper_cls, finish_patch as finish_mock:
        oper = oper_cls.return_value
        oper.update.return_value = None
        chain.update_subscribe_priority(
            subscribe=subscribe,
            meta=meta,
            mediainfo=mediainfo,
            downloads=[download],
        )

    payload = oper.update.call_args.args[1]
    self.assertEqual(payload["episode_priority"], {"1": 100, "2": 100, "3": 100})
    self.assertEqual(payload["current_priority"], 100)
    self.assertNotIn("lack_episode", payload)
    finish_mock.assert_called_once_with(subscribe=subscribe, meta=meta, mediainfo=mediainfo)
def test_check_resets_current_priority_when_new_episodes_expand_target_range(self):
    """check() resets current_priority when the recognised season has more episodes than tracked."""
    subscribe = self._build_subscribe(
        total_episode=3,
        episode_priority={"1": 100, "2": 100, "3": 100},
        current_priority=100,
        lack_episode=0,
    )
    chain = SubscribeChain()

    def _recognize_media(**kwargs):
        # Season 1 now reports five episodes, two more than the subscription tracks.
        return SimpleNamespace(
            seasons={1: [1, 2, 3, 4, 5]},
            title="Test Show",
            year="2026",
            vote_average=9.5,
            overview="overview",
            imdb_id="tt1234567",
            tvdb_id=99,
            get_poster_image=lambda: "poster",
            get_backdrop_image=lambda: "backdrop",
        )

    chain.recognize_media = _recognize_media

    with patch.object(SUBSCRIBE_CHAIN_MODULE, "SubscribeOper") as oper_cls:
        oper = oper_cls.return_value
        oper.list.return_value = [subscribe]
        oper.update.return_value = None
        chain.check()

    payload = oper.update.call_args.args[1]
    expected_scalars = (("total_episode", 5), ("lack_episode", 2), ("current_priority", 0))
    for field, expected in expected_scalars:
        self.assertEqual(payload[field], expected)
    # The two new episodes are added with priority 0; existing ones keep 100.
    self.assertEqual(payload["episode_priority"], {"1": 100, "2": 100, "3": 100, "4": 0, "5": 0})
    # The in-memory subscribe object must reflect the same values.
    for field, expected in expected_scalars:
        self.assertEqual(getattr(subscribe, field), expected)
def test_best_version_interested_episodes_excludes_same_priority(self):
    """A candidate with the same pri_order must not list already-satisfied episodes as upgradable.

    Regression scenario: E2 is recorded in episode_priority at 99, and the candidate
    covers E2/E3 with pri_order=99. E2 must stay out of the interested list, while E3
    (no recorded priority) must be included. This is the source decision behind
    duplicate best-version downloads, so the 'strictly greater than' semantics must hold.
    """
    subscribe = self._build_subscribe(
        total_episode=3,
        episode_priority={"1": 100, "2": 99},
        current_priority=100,
    )
    candidate_meta = SimpleNamespace(season_list=[1], episode_list=[2, 3])
    context = _build_context(meta_info=candidate_meta)
    get_interested = SubscribeChain._SubscribeChain__get_best_version_interested_episodes

    interested = get_interested(subscribe=subscribe, context=context, priority=99)

    self.assertEqual(interested, [3])
def test_best_version_interested_episodes_uses_title_episode_list_for_full_pack(self):
    """For a full-pack candidate, only episodes that can still be upgraded are interested.

    Regression scenario: the title advertises episodes 53-104 while the real target
    range is 1..92. episode_priority already holds 1..82 at 100 and E83 at 99. When the
    same resource with pri_order=99 shows up again, the interested list must be exactly
    84..92 and never contain E83; otherwise the download layer would fetch it again at
    the same priority.
    """
    tracked_priorities = {str(ep): 100 for ep in range(1, 83)}
    tracked_priorities["83"] = 99
    subscribe = self._build_subscribe(
        total_episode=92,
        episode_priority=tracked_priorities,
        current_priority=99,
    )
    title_meta = SimpleNamespace(season_list=[1], episode_list=list(range(53, 105)))
    context = _build_context(meta_info=title_meta)
    get_interested = SubscribeChain._SubscribeChain__get_best_version_interested_episodes

    interested = get_interested(subscribe=subscribe, context=context, priority=99)

    self.assertEqual(interested, list(range(84, 93)))
class SubscribeFilterAllowedEpisodesTest(TestCase):
    """Verify the best-version filter loop stores the interested list on context.allowed_episodes.

    These cases cover the regression point directly: when
    __get_best_version_interested_episodes returns a non-empty list, the candidate must
    carry that allowed set into the download layer, so that the downstream batch_download
    can choose correctly when title metadata and the actual torrent files disagree.
    """

    def _build_subscribe(self, **overrides):
        """Build a subscribe fixture by delegating to SubscribeChainTest._build_subscribe."""
        return SubscribeChainTest()._build_subscribe(**overrides)

    def test_filter_writes_allowed_episodes_to_context(self):
        """Search path: a non-empty interested list is stored on the context as a set."""
        subscribe = self._build_subscribe(
            total_episode=92,
            episode_priority={
                **{str(ep): 100 for ep in range(1, 83)},
                "83": 99,
            },
            current_priority=99,
        )
        context = _build_context(meta_info=SimpleNamespace(season_list=[1], episode_list=list(range(53, 105))))
        interested = SubscribeChain._SubscribeChain__get_best_version_interested_episodes(
            subscribe=subscribe,
            context=context,
            priority=99,
        )
        # Replicate the assignment from the subscribe.py filter loop and confirm the
        # result is passed on as the allowed set.
        context.allowed_episodes = set(interested) if interested else None
        self.assertIsNotNone(context.allowed_episodes)
        self.assertEqual(context.allowed_episodes, set(range(84, 93)))
        # Key regression point: E83 already reached 99, so it is outside the allowed set
        # and will not be downloaded again once downstream intersects with it.
        self.assertNotIn(83, context.allowed_episodes)

    def test_filter_leaves_allowed_episodes_none_when_no_upgrade(self):
        """No upgradable episode means no interested list and no allowed_episodes.

        When the candidate has the same pri_order and every target episode already
        reached that priority, the candidate should not be let through, and
        correspondingly no allowed_episodes is written.
        """
        subscribe = self._build_subscribe(
            total_episode=3,
            episode_priority={"1": 100, "2": 99, "3": 99},
            current_priority=99,
        )
        context = _build_context(meta_info=SimpleNamespace(season_list=[1], episode_list=[2, 3]))
        interested = SubscribeChain._SubscribeChain__get_best_version_interested_episodes(
            subscribe=subscribe,
            context=context,
            priority=99,
        )
        self.assertEqual(interested, [])
        # Mirror the filter-loop assignment used by the search-path test so the
        # "stays None" half of this test's name is actually checked.
        context.allowed_episodes = set(interested) if interested else None
        self.assertIsNone(context.allowed_episodes)

    def test_filter_writes_allowed_episodes_in_match_path(self):
        """The RSS/refresh branch match() must write allowed_episodes symmetrically with search().

        In match() the candidate is `_context = copy.copy(context)` before the
        best_version decision. This case replicates that filter sequence and checks that
        writing allowed_episodes on the shallow copy does not pollute the original
        context, and that the written value matches the search() path. If match() forgot
        to write allowed_episodes, batch_download would not see the allowed-set
        constraint and would regress to the same-priority duplicate downloads seen
        before 2c458317.
        """
        import copy

        subscribe = self._build_subscribe(
            total_episode=92,
            episode_priority={
                **{str(ep): 100 for ep in range(1, 83)},
                "83": 99,
            },
            current_priority=99,
        )
        original_context = _build_context(
            meta_info=SimpleNamespace(season_list=[1], episode_list=list(range(53, 105)))
        )
        _context = copy.copy(original_context)
        interested = SubscribeChain._SubscribeChain__get_best_version_interested_episodes(
            subscribe=subscribe,
            context=_context,
            priority=99,
        )
        # Replicate the assignment in match(); search() and match() must keep the same
        # shape so that neither branch is missed by a later change.
        if interested:
            _context.allowed_episodes = set(interested)
        self.assertEqual(_context.allowed_episodes, set(range(84, 93)))
        # Shallow copy plus a new attribute must not leak back into the source context
        # (the contexts cache in match() may be reused across several matches).
        self.assertIsNone(original_context.allowed_episodes)
class SubscribeNoteTrackingTest(TestCase):
"""覆盖洗版与非洗版下 subscribe.note 的下载历史追踪。
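Regression targets: finish_subscribe_or_not must append the episodes downloaded in the
current round to subscribe.note in every subscription mode. In the best-version branch,
__get_downloaded must return only the episodes whose episode_priority == 100 and must not
merge note into the result; the regular branch reads note, so episodes downloaded while
best-version was active are still recognised as downloaded after switching back.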
"""
def _build_subscribe(self, **overrides):
    """Build a subscribe fixture by delegating to SubscribeChainTest._build_subscribe."""
    build = SubscribeChainTest()._build_subscribe
    return build(**overrides)
@staticmethod
def _build_download_context(episodes):
    """Build a minimal download context.

    It carries only the fields read along the finish_subscribe_or_not /
    __update_subscribe_note path.
    """
    # list(episodes) is evaluated separately for each field, so the two attributes
    # never share one list object.
    meta_info = SimpleNamespace(season_list=[1], episode_list=list(episodes))
    media_info = SimpleNamespace(type=MediaType.TV, tmdb_id=1, douban_id=None)
    torrent_info = SimpleNamespace(pri_order=99, title="fake-torrent")
    return SimpleNamespace(
        meta_info=meta_info,
        media_info=media_info,
        torrent_info=torrent_info,
        selected_episodes=list(episodes),
    )
def test_finish_subscribe_writes_note_for_best_version_downloads(self):
    """Best-version downloads must still be appended to subscribe.note.

    The old logic only called __update_subscribe_note in the regular branch, so with
    best_version=1 the download history lived only in episode_priority, and the record
    of which episodes were downloaded was missing when switching back to a regular
    subscription or when troubleshooting. This case checks that the best-version
    branch now writes note as well.
    """
    subscribe = self._build_subscribe(
        best_version=1,
        total_episode=92,
        episode_priority={"1": 100},
        note=[1],
    )
    chain = SubscribeChain()
    downloads = [self._build_download_context([83])]
    captured_updates = []

    class _SubscribeOper:
        """Stand-in for SubscribeOper that records every update payload."""

        def update(self, subscribe_id, payload):
            captured_updates.append((subscribe_id, payload))

        def get(self, *args, **kwargs):
            return subscribe

    meta = SimpleNamespace(type=MediaType.TV)
    mediainfo = SimpleNamespace(
        title_year="Test Show (2026)",
        type=MediaType.TV,
        tmdb_id=1,
        douban_id=None,
    )
    oper_patch = patch.object(SUBSCRIBE_CHAIN_MODULE, "SubscribeOper", _SubscribeOper)
    priority_patch = patch.object(SubscribeChain, "update_subscribe_priority")
    finish_patch = patch.object(SubscribeChain, "_SubscribeChain__finish_subscribe")
    with oper_patch, priority_patch, finish_patch:
        chain.finish_subscribe_or_not(
            subscribe=subscribe,
            meta=meta,
            mediainfo=mediainfo,
            downloads=downloads,
            lefts=None,
        )

    # Note updates go through SubscribeOper.update; collect every payload with a "note" key.
    note_writes = [payload["note"] for _, payload in captured_updates if "note" in payload]
    self.assertTrue(note_writes, "best_version downloads should still trigger note update")
    latest_note = note_writes[-1]
    self.assertIn(83, latest_note)
    self.assertIn(1, latest_note)  # the existing note entry is kept
def test_finish_subscribe_skips_note_when_no_downloads(self):
    """Without downloads, note must not be touched, avoiding empty writes or accidental clearing."""
    subscribe = self._build_subscribe(best_version=1, total_episode=92, note=[1, 2])
    chain = SubscribeChain()
    captured_updates = []

    class _SubscribeOper:
        """Stand-in for SubscribeOper that records every update payload."""

        def update(self, subscribe_id, payload):
            captured_updates.append((subscribe_id, payload))

        def get(self, *args, **kwargs):
            return subscribe

    meta = SimpleNamespace(type=MediaType.TV)
    mediainfo = SimpleNamespace(
        title_year="Test Show (2026)",
        type=MediaType.TV,
        tmdb_id=1,
        douban_id=None,
    )
    oper_patch = patch.object(SUBSCRIBE_CHAIN_MODULE, "SubscribeOper", _SubscribeOper)
    # Force the best-version completion check to report "not complete".
    complete_patch = patch.object(
        SubscribeChain,
        "_SubscribeChain__is_best_version_complete",
        return_value=False,
    )
    finish_patch = patch.object(SubscribeChain, "_SubscribeChain__finish_subscribe")
    with oper_patch, complete_patch, finish_patch:
        chain.finish_subscribe_or_not(
            subscribe=subscribe,
            meta=meta,
            mediainfo=mediainfo,
            downloads=None,
            lefts=None,
        )

    # No payload may contain a "note" key when nothing was downloaded.
    note_payloads = [payload for _, payload in captured_updates if "note" in payload]
    self.assertFalse(
        note_payloads,
        "note must not be touched when downloads is empty",
    )
def test_get_downloaded_best_version_returns_only_completed_episodes(self):
    """Key regression: the best-version branch must not merge note into __get_downloaded's result.

    Otherwise check_and_handle_existing_media -> __get_subscribe_no_exits would remove
    episodes downloaded at priority < 100 from the pending no_exists. Combined with
    finish_subscribe_or_not running with force=True while __is_best_version_complete is
    False, the subscription would skip searching every round yet never complete. In
    best-version mode __get_downloaded means 'episodes that need no further
    processing', and only priority == 100 satisfies that.
    """
    subscribe = self._build_subscribe(
        best_version=1,
        total_episode=3,
        episode_priority={"1": 100, "2": 100, "3": 99},
        note=[1, 2, 3],
    )
    get_downloaded = SubscribeChain._SubscribeChain__get_downloaded

    downloaded = get_downloaded(subscribe)

    # E3 is still pending at priority 99 and must never be merged into downloaded.
    self.assertEqual(downloaded, [1, 2])
    self.assertNotIn(3, downloaded)
def test_get_downloaded_non_best_version_reads_note_after_wash_migration(self):
    """Migration scenario: episodes noted during best-version stay 'downloaded' in regular mode.

    While best-version is on, finish_subscribe_or_not writes the downloaded episodes to
    note. When the user later turns best_version off, __get_downloaded reads note in the
    regular branch, so the earlier episodes are still recognised as downloaded and are
    not matched again.
    """
    # episode_priority is left over from best-version mode; the regular branch ignores it.
    leftover_priorities = {"1": 100, "2": 99}
    subscribe = self._build_subscribe(
        best_version=0,
        total_episode=5,
        episode_priority=leftover_priorities,
        note=[1, 2, 3],
    )
    get_downloaded = SubscribeChain._SubscribeChain__get_downloaded

    downloaded = get_downloaded(subscribe)

    self.assertEqual(downloaded, [1, 2, 3])