fix: prevent storage operations in preview mode and add tests for transfer preview logic

This commit is contained in:
jxxghp
2026-05-13 13:39:57 +08:00
parent f8d096f476
commit f0bc1bd681
6 changed files with 237 additions and 57 deletions

View File

@@ -1974,6 +1974,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
return False, f"{fileitem.name} 没有找到可整理的媒体文件"
planned_file_count = len(file_items)
if preview:
logger.info(f"正在预览 {planned_file_count} 个文件的整理路径...")
else:
logger.info(f"正在计划整理 {planned_file_count} 个文件...")
# 整理所有文件

View File

@@ -860,6 +860,17 @@ class PluginHelper(metaclass=WeakSingleton):
strategies.append(("直连", base_cmd))
return strategies
@staticmethod
def __build_runtime_pip_command(*args: str) -> List[str]:
"""
优先使用当前解释器同目录的 pip 入口,以便 uv-pip-compat 能接管兼容命令。
"""
pip_name = "pip.exe" if sys.platform == "win32" else "pip"
pip_bin = Path(sys.executable).with_name(pip_name)
if pip_bin.exists():
return [str(pip_bin), *args]
return [sys.executable, "-m", "pip", *args]
@staticmethod
def __format_pkg_name_for_pip(name: str) -> str:
"""
@@ -1217,7 +1228,7 @@ class PluginHelper(metaclass=WeakSingleton):
安装完成后立即执行运行环境自检,尽量在插件加载前发现依赖图已被污染。
"""
checks = [
("pip check", [sys.executable, "-m", "pip", "check"]),
("pip check", cls.__build_runtime_pip_command("check")),
("核心依赖导入检查", [sys.executable, "-c", cls._runtime_import_probe]),
]
for check_name, command in checks:

View File

@@ -63,6 +63,26 @@ class TransHandler:
current_value = value
setattr(result, key, current_value)
@staticmethod
def __build_preview_item(
storage: str,
path: Path,
item_type: str,
size: Optional[int] = None,
) -> FileItem:
"""
构造预览结果中的文件项,不访问真实存储。
"""
return FileItem(
storage=storage,
path=path.as_posix(),
name=path.name,
basename=path.stem,
type=item_type,
extension=path.suffix.lstrip(".") if item_type == "file" else None,
size=size if item_type == "file" else None,
)
def transfer_media(
self,
fileitem: FileItem,
@@ -161,12 +181,10 @@ class TransHandler:
else:
new_path = target_path / fileitem.name
if preview:
preview_diritem = FileItem(
preview_diritem = self.__build_preview_item(
storage=target_storage,
path=new_path.as_posix(),
name=new_path.name,
basename=new_path.stem,
type="dir",
path=new_path,
item_type="dir",
)
self.__update_result(
result=result,
@@ -291,14 +309,34 @@ class TransHandler:
# 目标目录
if preview:
target_diritem = FileItem(
# 预览只做路径推算,不检查目录或同名文件冲突,避免目标存储探测触发真实整理。
target_diritem = self.__build_preview_item(
storage=target_storage,
path=folder_path.as_posix(),
name=folder_path.name,
basename=folder_path.stem,
type="dir",
path=folder_path,
item_type="dir",
)
else:
target_item = self.__build_preview_item(
storage=target_storage,
path=new_file,
item_type="file",
size=fileitem.size,
)
self.__update_result(
result=result,
success=True,
fileitem=fileitem,
target_item=target_item,
target_diritem=target_diritem,
file_list=[fileitem.path],
file_list_new=[new_file.as_posix()],
file_count=1,
total_size=fileitem.size or 0,
need_scrape=need_scrape,
transfer_type=transfer_type,
need_notify=False,
)
return result
target_diritem = target_oper.get_folder(folder_path)
if not target_diritem:
logger.error(f"目标目录 {folder_path} 获取失败")
@@ -439,40 +477,11 @@ class TransHandler:
logger.info(
f"当前整理覆盖模式设置为 {overwrite_mode},仅保留最新版本,正在删除已有版本文件 ..."
)
if not preview:
self.__delete_version_files(target_oper, new_file)
else:
# 附加文件 总是需要覆盖
overflag = True
if preview:
target_item = target_oper.get_item(new_file)
if not target_item:
target_item = FileItem(
storage=target_storage,
path=new_file.as_posix(),
name=new_file.name,
basename=new_file.stem,
type="file",
extension=new_file.suffix.lstrip("."),
size=fileitem.size,
)
self.__update_result(
result=result,
success=True,
fileitem=fileitem,
target_item=target_item,
target_diritem=target_diritem,
file_list=[fileitem.path],
file_list_new=[new_file.as_posix()],
file_count=1,
total_size=fileitem.size or 0,
need_scrape=need_scrape,
transfer_type=transfer_type,
need_notify=False,
)
return result
# 整理文件
new_item, err_msg = self.__transfer_file(
fileitem=fileitem,

View File

@@ -24,6 +24,21 @@ else
exit 127
fi
has_environment_option() {
while [ "$#" -gt 0 ]; do
case "$1" in
-p|--python|--python=*|-p*|--system)
return 0
;;
--)
return 1
;;
esac
shift
done
return 1
}
case "${COMMAND_NAME}" in
pip|pip3|pip3.*)
if [ "$#" -eq 0 ]; then
@@ -38,6 +53,14 @@ case "${COMMAND_NAME}" in
shift
exec "${UV_BIN}" help pip "$@"
;;
check)
if [ -x "${SCRIPT_DIR}/python" ] && ! has_environment_option "$@"; then
# uv 不会仅凭 pip 软链接位置锁定 venv显式绑定当前运行态解释器。
shift
exec "${UV_BIN}" pip check --python "${SCRIPT_DIR}/python" "$@"
fi
exec "${UV_BIN}" pip "$@"
;;
*)
exec "${UV_BIN}" pip "$@"
;;

View File

@@ -267,6 +267,7 @@ class PluginHelperTest(TestCase):
repair_commands = []
healthcheck_failed = False
pip_check_cmd = PluginHelper._PluginHelper__build_runtime_pip_command("check")
def fake_execute(cmd):
nonlocal healthcheck_failed
@@ -275,7 +276,7 @@ class PluginHelperTest(TestCase):
repair_commands.append(cmd)
return True, "repaired"
return True, "installed"
if cmd[:4] == [sys.executable, "-m", "pip", "check"]:
if cmd == pip_check_cmd:
if not healthcheck_failed:
healthcheck_failed = True
return False, "broken"

View File

@@ -0,0 +1,133 @@
from pathlib import Path
from app.core.context import MediaInfo
from app.core.metainfo import MetaInfoPath
from app.modules.filemanager import FileManagerModule
from app.schemas import FileItem, TransferDirectoryConf
from app.schemas.types import MediaType
class GuardedStorage:
"""
用于验证预览模式不会访问可能有副作用的存储整理接口。
"""
def get_folder(self, path: Path): # pragma: no cover - 被调用即失败
raise AssertionError(f"预览不应创建或获取目标目录:{path}")
def get_item(self, path: Path): # pragma: no cover - 被调用即失败
raise AssertionError(f"预览不应探测目标文件:{path}")
def copy(self, *args, **kwargs): # pragma: no cover - 被调用即失败
raise AssertionError("预览不应复制文件")
def move(self, *args, **kwargs): # pragma: no cover - 被调用即失败
raise AssertionError("预览不应移动文件")
def rename(self, *args, **kwargs): # pragma: no cover - 被调用即失败
raise AssertionError("预览不应重命名文件")
def delete(self, *args, **kwargs): # pragma: no cover - 被调用即失败
raise AssertionError("预览不应删除文件")
def test_cloud_storage_preview_only_calculates_target_path():
fileitem = FileItem(
storage="alist",
path="/downloads/Test.Show.S01E01.mkv",
type="file",
name="Test.Show.S01E01.mkv",
basename="Test.Show.S01E01",
extension="mkv",
size=1024,
)
meta = MetaInfoPath(Path(fileitem.path))
mediainfo = MediaInfo(
type=MediaType.TV,
title="Test Show",
year="2026",
tmdb_id=12345,
)
target_directory = TransferDirectoryConf(
name="cloud-library",
transfer_type="copy",
overwrite_mode="latest",
library_path="/library",
library_storage="alist",
renaming=True,
scraping=True,
notify=True,
)
guarded_storage = GuardedStorage()
transferinfo = FileManagerModule().transfer(
fileitem=fileitem,
meta=meta,
mediainfo=mediainfo,
target_directory=target_directory,
source_oper=guarded_storage,
target_oper=guarded_storage,
preview=True,
)
assert transferinfo.success is True
assert transferinfo.need_notify is False
assert transferinfo.need_scrape is True
assert transferinfo.target_item.storage == "alist"
assert transferinfo.target_item.path.endswith(".mkv")
assert transferinfo.target_diritem.path.startswith("/library/")
assert transferinfo.file_list == [fileitem.path]
assert transferinfo.file_list_new == [transferinfo.target_item.path]
def test_local_storage_preview_skips_target_conflict_checks(tmp_path):
source_file = tmp_path / "downloads" / "Test.Show.S01E02.mkv"
source_file.parent.mkdir(parents=True)
source_file.write_bytes(b"test video")
library_path = tmp_path / "library"
fileitem = FileItem(
storage="local",
path=source_file.as_posix(),
type="file",
name=source_file.name,
basename=source_file.stem,
extension="mkv",
size=source_file.stat().st_size,
)
meta = MetaInfoPath(source_file)
mediainfo = MediaInfo(
type=MediaType.TV,
title="Test Show",
year="2026",
tmdb_id=12345,
)
target_directory = TransferDirectoryConf(
name="local-library",
transfer_type="copy",
overwrite_mode="latest",
library_path=library_path.as_posix(),
library_storage="local",
renaming=True,
scraping=True,
notify=True,
)
guarded_storage = GuardedStorage()
transferinfo = FileManagerModule().transfer(
fileitem=fileitem,
meta=meta,
mediainfo=mediainfo,
target_directory=target_directory,
source_oper=guarded_storage,
target_oper=guarded_storage,
preview=True,
)
assert transferinfo.success is True
assert transferinfo.need_notify is False
assert transferinfo.need_scrape is True
assert transferinfo.target_item.storage == "local"
assert transferinfo.target_item.path.startswith(library_path.as_posix())
assert transferinfo.target_item.path.endswith(".mkv")
assert transferinfo.file_list == [fileitem.path]
assert transferinfo.file_list_new == [transferinfo.target_item.path]