diff --git a/app/modules/filemanager/storages/u115.py b/app/modules/filemanager/storages/u115.py index 2c683e22..56ba8ae4 100644 --- a/app/modules/filemanager/storages/u115.py +++ b/app/modules/filemanager/storages/u115.py @@ -26,6 +26,20 @@ from app.utils.limit import QpsRateLimiter, RateStats lock = Lock() +MIN_U115_UPLOAD_PART_SIZE = 1 * 1024 * 1024 +U115_UPLOAD_PART_COUNT_TARGET = 96 +U115_UPLOAD_PART_SIZE_STEPS = ( + 10 * 1024 * 1024, + 16 * 1024 * 1024, + 32 * 1024 * 1024, + 64 * 1024 * 1024, + 128 * 1024 * 1024, + 256 * 1024 * 1024, + 512 * 1024 * 1024, + 1024 * 1024 * 1024, +) + + class NoCheckInException(Exception): pass @@ -410,6 +424,24 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): sha1.update(chunk) return sha1.finalize().hex() + @staticmethod + def __get_upload_part_size(file_size: int) -> int: + """ + 根据文件大小获取 115 OSS 上传分片大小。 + """ + if file_size <= 0: + return U115_UPLOAD_PART_SIZE_STEPS[0] + + target_part_size = max( + MIN_U115_UPLOAD_PART_SIZE, + (file_size + U115_UPLOAD_PART_COUNT_TARGET - 1) + // U115_UPLOAD_PART_COUNT_TARGET, + ) + for part_size in U115_UPLOAD_PART_SIZE_STEPS: + if target_part_size <= part_size: + return part_size + return U115_UPLOAD_PART_SIZE_STEPS[-1] + def init_storage(self): pass @@ -675,8 +707,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): security_token=SecurityToken, ) bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa - # determine_part_size方法用于确定分片大小,设置分片大小为 10M - part_size = determine_part_size(file_size, preferred_size=10 * 1024 * 1024) + part_size = determine_part_size( + file_size, preferred_size=self.__get_upload_part_size(file_size) + ) # 初始化进度条 logger.info( @@ -729,14 +762,19 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): result = bucket.complete_multipart_upload( object_name, upload_id, parts, headers=headers ) - if result.status == 200: - logger.debug( - f"【115】上传 Step 6 回调结果:{result.resp.response.json()}" - ) - logger.info(f"【115】{target_name} 上传成功") - else: + if result.status != 200: logger.warn(f"【115】{target_name} 上传失败,错误码: {result.status}") return None + try: + callback_result = result.resp.response.json() + except Exception as e: + logger.error(f"【115】{target_name} 上传完成回调解析失败: {str(e)}") + return None + logger.debug(f"【115】上传 Step 6 回调结果:{callback_result}") + if not callback_result or not callback_result.get("state"): + logger.warn(f"【115】{target_name} 上传完成回调失败: {callback_result}") + return None + logger.info(f"【115】{target_name} 上传成功") except oss2.exceptions.OssError as e: if e.code == "FileAlreadyExists": logger.warn(f"【115】{target_name} 已存在") diff --git a/tests/test_u115_storage.py b/tests/test_u115_storage.py index 96aca1c8..92a08757 100644 --- a/tests/test_u115_storage.py +++ b/tests/test_u115_storage.py @@ -5,6 +5,115 @@ from app.modules.filemanager.storages.u115 import U115Pan from app.schemas import FileItem +def _target_dir() -> FileItem: + """ + 构造 115 上传目标目录。 + """ + return FileItem( + storage="u115", + path="/library/Test Show (2026)/Season 1", + type="dir", + name="Season 1", + fileid="100", + ) + + +def _fake_sha1(*_args, **_kwargs) -> str: + """ + 返回固定 SHA1,避免单测重复计算文件哈希。 + """ + return "sha1" + + +def _fake_request_api(_method, endpoint, *_args, **_kwargs): + """ + 模拟 115 初始化、凭证和断点续传接口。 + """ + if endpoint == "/open/upload/init": + return { + "state": True, + "data": { + "bucket": "bucket", + "object": "object", + "callback": {"callback": "callback", "callback_var": "var"}, + "pick_code": "pickcode", + "status": 1, + }, + } + if endpoint == "/open/upload/get_token": + return { + "endpoint": "endpoint", + "AccessKeyId": "access_key_id", + "AccessKeySecret": "access_key_secret", + "SecurityToken": "security_token", + } + if endpoint == "/open/upload/resume": + return None + return None + + +class _FakeBucket: + """ + 模拟 OSS 分片上传客户端。 + """ + + complete_payload = {"state": True} + + def __init__(self, *_args, **_kwargs): + """ + 初始化伪造 Bucket。 + """ + pass + + def init_multipart_upload(self, *_args, **_kwargs): + """ + 返回固定 upload_id。 + """ + return SimpleNamespace(upload_id="upload_id") + + def upload_part(self, *_args, **_kwargs): + """ + 返回固定分片 etag。 + """ + return SimpleNamespace(etag="etag") + + def complete_multipart_upload(self, *_args, **_kwargs): + """ + 返回可配置的 115 回调结果。 + """ + response = SimpleNamespace(json=lambda: self.complete_payload) + return SimpleNamespace(status=200, resp=SimpleNamespace(response=response)) + + +def _build_storage() -> U115Pan: + """ + 构造跳过初始化流程的 115 存储实例。 + """ + storage = object.__new__(U115Pan) + storage._calc_sha1 = _fake_sha1 + storage.get_item = lambda _path: None + storage._request_api = _fake_request_api + return storage + + +def _upload_with_fakes(storage: U115Pan, target_dir: FileItem, local_file): + """ + 使用伪造 OSS 和进度回调执行上传。 + """ + + with patch( + "app.modules.filemanager.storages.u115.oss2.StsAuth", + return_value=object(), + ), patch( + "app.modules.filemanager.storages.u115.oss2.Bucket", + _FakeBucket, + ), patch( + "app.modules.filemanager.storages.u115.transfer_process", + return_value=lambda _progress: None, + ): + return storage.upload(target_dir, local_file) + + def test_upload_returns_target_fileitem_when_uploaded_metadata_is_delayed(tmp_path): """ 115 上传完成后目录索引暂不可见时,应返回可落库的目标文件项。 @@ -12,88 +121,60 @@ def test_upload_returns_target_fileitem_when_uploaded_metadata_is_delayed(tmp_pa local_file = tmp_path / "Test.Show.S01E01.mkv" local_file.write_bytes(b"movie") - target_dir = FileItem( - storage="u115", - path="/library/Test Show (2026)/Season 1", - type="dir", - name="Season 1", - fileid="100", - ) - storage = object.__new__(U115Pan) - storage._calc_sha1 = lambda *_args, **_kwargs: "sha1" - storage.get_item = lambda _path: None - - def fake_request_api(_method, endpoint, *_args, **_kwargs): - """ - 模拟 115 初始化、凭证和断点续传接口。 - """ - if endpoint == "/open/upload/init": - return { - "state": True, - "data": { - "bucket": "bucket", - "object": "object", - "callback": {"callback": "callback", "callback_var": "var"}, - "pick_code": "pickcode", - "status": 1, - }, - } - if endpoint == "/open/upload/get_token": - return { - "endpoint": "endpoint", - "AccessKeyId": "access_key_id", - "AccessKeySecret": "access_key_secret", - "SecurityToken": "security_token", - } - if endpoint == "/open/upload/resume": - return None - return None - - class FakeBucket: - """ - 模拟 OSS 分片上传客户端。 - """ - - def __init__(self, *_args, **_kwargs): - pass - - def init_multipart_upload(self, *_args, **_kwargs): - """ - 返回固定 upload_id。 - """ - return SimpleNamespace(upload_id="upload_id") - - def upload_part(self, *_args, **_kwargs): - """ - 返回固定分片 etag。 - """ - return SimpleNamespace(etag="etag") - - def complete_multipart_upload(self, *_args, **_kwargs): - """ - 模拟 OSS 完成分片上传成功。 - """ - response = SimpleNamespace(json=lambda: {"state": True}) - return SimpleNamespace( - status=200, resp=SimpleNamespace(response=response) - ) - - storage._request_api = fake_request_api - - with patch( - "app.modules.filemanager.storages.u115.oss2.StsAuth", - return_value=object(), - ), patch( - "app.modules.filemanager.storages.u115.oss2.Bucket", - FakeBucket, - ), patch( - "app.modules.filemanager.storages.u115.transfer_process", - return_value=lambda _progress: None, - ): - uploaded_item = storage.upload(target_dir, local_file) + storage = _build_storage() + uploaded_item = _upload_with_fakes(storage, _target_dir(), local_file) assert uploaded_item is not None assert uploaded_item.storage == "u115" assert uploaded_item.path == "/library/Test Show (2026)/Season 1/Test.Show.S01E01.mkv" assert uploaded_item.type == "file" assert uploaded_item.size == local_file.stat().st_size + + +def test_upload_returns_none_when_complete_callback_reports_failure(tmp_path): + """ + 115 完成回调失败时,不应把文件视为上传成功。 + """ + local_file = tmp_path / "Test.Show.S01E02.mkv" + local_file.write_bytes(b"movie") + + storage = _build_storage() + with patch.object( + _FakeBucket, + "complete_payload", + {"state": False, "message": "callback failed"}, + ): + uploaded_item = _upload_with_fakes(storage, _target_dir(), local_file) + + assert uploaded_item is None + + +def test_upload_uses_dynamic_part_size(tmp_path): + """ + 115 上传应根据文件大小动态控制 OSS 分片大小。 + """ + local_file = tmp_path / "Test.Show.S01E03.mkv" + local_file.write_bytes(b"movie") + + storage = _build_storage() + with patch( + "app.modules.filemanager.storages.u115.determine_part_size", + return_value=local_file.stat().st_size, + ) as determine_part_size_mock: + uploaded_item = _upload_with_fakes(storage, _target_dir(), local_file) + + assert uploaded_item is not None + determine_part_size_mock.assert_called_once_with( + local_file.stat().st_size, preferred_size=10 * 1024 * 1024 + ) + + +def test_upload_part_size_grows_for_large_files(): + """ + 大文件上传应自动放大分片大小,避免产生过多分片。 + """ + file_size = int(5.6 * 1024 * 1024 * 1024) + + part_size = U115Pan._U115Pan__get_upload_part_size(file_size) + + assert part_size == 64 * 1024 * 1024