diff --git a/.pylintrc b/.pylintrc index f4b2c0f7..ee6860c7 100644 --- a/.pylintrc +++ b/.pylintrc @@ -5,38 +5,30 @@ init-hook='import sys; sys.path.append(".")' # 忽略的文件和目录 ignore=.git,__pycache__,.venv,build,dist,tests,docs +# 通过 `pylint app/` 检查主程序时不扫描内置插件目录, +# 插件依赖和动态模型较多,容易产生与主程序无关的误报。 +ignore-paths=^app/plugins(/|$) + # 并行作业数量 jobs=0 [MESSAGES CONTROL] -# 只关注错误级别的问题,禁用警告、约定和重构建议 -# E = Error (错误) - 会导致构建失败 -# W = Warning (警告) - 仅显示,不会失败 -# R = Refactor (重构建议) - 仅显示,不会失败 -# C = Convention (约定) - 仅显示,不会失败 -# I = Information (信息) - 仅显示,不会失败 - -# 禁用大部分警告、约定和重构建议,只保留错误和重要警告 +# 只启用确定性较强的严重问题检查,避免 SQLAlchemy、FastAPI 依赖注入、 +# 第三方 SDK 等动态对象被 Pylint 推断成误报。 disable=all -enable=E, - syntax-error, +enable=syntax-error, undefined-variable, used-before-assignment, + possibly-used-before-assignment, unreachable, return-outside-function, yield-outside-function, continue-in-finally, nonlocal-without-binding, undefined-loop-variable, - redefined-builtin, - not-callable, - assignment-from-no-return, - no-value-for-parameter, - too-many-function-args, - unexpected-keyword-arg, - redundant-keyword-arg, import-error, - relative-beyond-top-level + relative-beyond-top-level, + no-name-in-module [REPORTS] # 设置报告格式 diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 4d4602b9..b54c0594 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -2245,6 +2245,10 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): ) if not built_meta: return None + if not meta: + # _build_path_meta 已经应用过手动季集/自定义格式覆盖; + # 这里避免再次偏移集数,导致手动整理的集数偏移翻倍。 + return built_meta return _apply_meta_overrides(built_meta, source_path) def _build_path_meta( diff --git a/tests/test_transfer_job_manager.py b/tests/test_transfer_job_manager.py index de6610d0..c833d0c0 100644 --- a/tests/test_transfer_job_manager.py +++ b/tests/test_transfer_job_manager.py @@ -1,10 +1,11 @@ import unittest +from pathlib import Path from types import SimpleNamespace from unittest.mock import patch, MagicMock from app.core.config import settings from app.chain.transfer import JobManager, TransferChain -from app.schemas import FileItem, TransferInfo, TransferTask +from app.schemas import EpisodeFormat, FileItem, TransferInfo, TransferTask from app.schemas.types import EventType, MediaType @@ -126,6 +127,49 @@ def migrate_to_media_job(jobview: JobManager, task: TransferTask): class TransferJobManagerTest(unittest.TestCase): + def test_manual_episode_offset_applies_once(self): + chain = make_transfer_chain() + source_fileitem = make_fileitem("/downloads/Test.Show.2026.S01E14.mkv") + planned_episodes = [] + + chain._TransferChain__get_trans_fileitems = lambda fileitem, predicate: [ + (source_fileitem, False) + ] + chain._TransferChain__put_to_jobview = lambda task: True + chain._TransferChain__register_scrape_batch_task = lambda task: None + chain._TransferChain__close_scrape_batch = lambda batch_id: None + + def fake_handle_transfer(task, callback=None): + planned_episodes.append(task.meta.begin_episode) + return True, "" + + chain._TransferChain__handle_transfer = fake_handle_transfer + + transfer_history_oper = SimpleNamespace(get_by_src=lambda src, storage=None: None) + download_history_oper = SimpleNamespace( + get_by_hash=lambda download_hash: None, + get_file_by_fullpath=lambda fullpath: None, + get_files_by_savepath=lambda savepath: [], + get_by_path=lambda path: None, + ) + system_config_oper = SimpleNamespace(get=lambda key: None) + + with patch("app.chain.transfer.TransferHistoryOper", return_value=transfer_history_oper), \ + patch("app.chain.transfer.DownloadHistoryOper", return_value=download_history_oper), \ + patch("app.chain.transfer.SystemConfigOper", return_value=system_config_oper), \ + patch("app.chain.transfer.MetaInfoPath", lambda *args, **kwargs: FakeMeta(14)): + state, errmsg = chain.do_transfer( + fileitem=source_fileitem, + mediainfo=FakeMedia(), + target_path=Path("/library"), + epformat=EpisodeFormat(offset="-1"), + background=False, + ) + + self.assertTrue(state, errmsg) + # 手动集数偏移只能应用一次,避免 E14 + (-1) 被二次处理成 E12。 + self.assertEqual([13], planned_episodes) + def test_completed_media_job_is_removed_after_last_meta_task_fails(self): jobview = JobManager() tasks = [make_task(episode) for episode in range(1, 4)]