fix: improve u115 multipart upload handling

This commit is contained in:
jxxghp
2026-05-27 15:02:03 +08:00
parent d9a06f4433
commit 2b2e088784
2 changed files with 206 additions and 87 deletions

View File

@@ -26,6 +26,20 @@ from app.utils.limit import QpsRateLimiter, RateStats
lock = Lock()
MIN_U115_UPLOAD_PART_SIZE = 1 * 1024 * 1024
U115_UPLOAD_PART_COUNT_TARGET = 96
U115_UPLOAD_PART_SIZE_STEPS = (
10 * 1024 * 1024,
16 * 1024 * 1024,
32 * 1024 * 1024,
64 * 1024 * 1024,
128 * 1024 * 1024,
256 * 1024 * 1024,
512 * 1024 * 1024,
1024 * 1024 * 1024,
)
class NoCheckInException(Exception):
pass
@@ -410,6 +424,24 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
sha1.update(chunk)
return sha1.finalize().hex()
@staticmethod
def __get_upload_part_size(file_size: int) -> int:
"""
根据文件大小获取 115 OSS 上传分片大小。
"""
if file_size <= 0:
return U115_UPLOAD_PART_SIZE_STEPS[0]
target_part_size = max(
MIN_U115_UPLOAD_PART_SIZE,
(file_size + U115_UPLOAD_PART_COUNT_TARGET - 1)
// U115_UPLOAD_PART_COUNT_TARGET,
)
for part_size in U115_UPLOAD_PART_SIZE_STEPS:
if target_part_size <= part_size:
return part_size
return U115_UPLOAD_PART_SIZE_STEPS[-1]
def init_storage(self):
pass
@@ -675,8 +707,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
security_token=SecurityToken,
)
bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa
# determine_part_size方法用于确定分片大小,设置分片大小为 10M
part_size = determine_part_size(file_size, preferred_size=10 * 1024 * 1024)
part_size = determine_part_size(
file_size, preferred_size=self.__get_upload_part_size(file_size)
)
# 初始化进度条
logger.info(
@@ -729,14 +762,19 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
result = bucket.complete_multipart_upload(
object_name, upload_id, parts, headers=headers
)
if result.status == 200:
logger.debug(
f"【115】上传 Step 6 回调结果:{result.resp.response.json()}"
)
logger.info(f"【115】{target_name} 上传成功")
else:
if result.status != 200:
logger.warn(f"【115】{target_name} 上传失败,错误码: {result.status}")
return None
try:
callback_result = result.resp.response.json()
except Exception as e:
logger.error(f"【115】{target_name} 上传完成回调解析失败: {str(e)}")
return None
logger.debug(f"【115】上传 Step 6 回调结果:{callback_result}")
if not callback_result or not callback_result.get("state"):
logger.warn(f"【115】{target_name} 上传完成回调失败: {callback_result}")
return None
logger.info(f"【115】{target_name} 上传成功")
except oss2.exceptions.OssError as e:
if e.code == "FileAlreadyExists":
logger.warn(f"【115】{target_name} 已存在")

View File

@@ -5,6 +5,115 @@ from app.modules.filemanager.storages.u115 import U115Pan
from app.schemas import FileItem
def _target_dir() -> FileItem:
"""
构造 115 上传目标目录。
"""
return FileItem(
storage="u115",
path="/library/Test Show (2026)/Season 1",
type="dir",
name="Season 1",
fileid="100",
)
def _fake_sha1(*_args, **_kwargs) -> str:
"""
返回固定 SHA1避免单测重复计算文件哈希。
"""
return "sha1"
def _fake_request_api(_method, endpoint, *_args, **_kwargs):
"""
模拟 115 初始化、凭证和断点续传接口。
"""
if endpoint == "/open/upload/init":
return {
"state": True,
"data": {
"bucket": "bucket",
"object": "object",
"callback": {"callback": "callback", "callback_var": "var"},
"pick_code": "pickcode",
"status": 1,
},
}
if endpoint == "/open/upload/get_token":
return {
"endpoint": "endpoint",
"AccessKeyId": "access_key_id",
"AccessKeySecret": "access_key_secret",
"SecurityToken": "security_token",
}
if endpoint == "/open/upload/resume":
return None
return None
class _FakeBucket:
"""
模拟 OSS 分片上传客户端。
"""
complete_payload = {"state": True}
def __init__(self, *_args, **_kwargs):
"""
初始化伪造 Bucket。
"""
pass
def init_multipart_upload(self, *_args, **_kwargs):
"""
返回固定 upload_id。
"""
return SimpleNamespace(upload_id="upload_id")
def upload_part(self, *_args, **_kwargs):
"""
返回固定分片 etag。
"""
return SimpleNamespace(etag="etag")
def complete_multipart_upload(self, *_args, **_kwargs):
"""
返回可配置的 115 回调结果。
"""
response = SimpleNamespace(json=lambda: self.complete_payload)
return SimpleNamespace(status=200, resp=SimpleNamespace(response=response))
def _build_storage() -> U115Pan:
"""
构造跳过初始化流程的 115 存储实例。
"""
storage = object.__new__(U115Pan)
storage._calc_sha1 = _fake_sha1
storage.get_item = lambda _path: None
storage._request_api = _fake_request_api
return storage
def _upload_with_fakes(storage: U115Pan, target_dir: FileItem, local_file):
"""
使用伪造 OSS 和进度回调执行上传。
"""
with patch(
"app.modules.filemanager.storages.u115.oss2.StsAuth",
return_value=object(),
), patch(
"app.modules.filemanager.storages.u115.oss2.Bucket",
_FakeBucket,
), patch(
"app.modules.filemanager.storages.u115.transfer_process",
return_value=lambda _progress: None,
):
return storage.upload(target_dir, local_file)
def test_upload_returns_target_fileitem_when_uploaded_metadata_is_delayed(tmp_path):
"""
115 上传完成后目录索引暂不可见时,应返回可落库的目标文件项。
@@ -12,88 +121,60 @@ def test_upload_returns_target_fileitem_when_uploaded_metadata_is_delayed(tmp_pa
local_file = tmp_path / "Test.Show.S01E01.mkv"
local_file.write_bytes(b"movie")
target_dir = FileItem(
storage="u115",
path="/library/Test Show (2026)/Season 1",
type="dir",
name="Season 1",
fileid="100",
)
storage = object.__new__(U115Pan)
storage._calc_sha1 = lambda *_args, **_kwargs: "sha1"
storage.get_item = lambda _path: None
def fake_request_api(_method, endpoint, *_args, **_kwargs):
"""
模拟 115 初始化、凭证和断点续传接口。
"""
if endpoint == "/open/upload/init":
return {
"state": True,
"data": {
"bucket": "bucket",
"object": "object",
"callback": {"callback": "callback", "callback_var": "var"},
"pick_code": "pickcode",
"status": 1,
},
}
if endpoint == "/open/upload/get_token":
return {
"endpoint": "endpoint",
"AccessKeyId": "access_key_id",
"AccessKeySecret": "access_key_secret",
"SecurityToken": "security_token",
}
if endpoint == "/open/upload/resume":
return None
return None
class FakeBucket:
"""
模拟 OSS 分片上传客户端。
"""
def __init__(self, *_args, **_kwargs):
pass
def init_multipart_upload(self, *_args, **_kwargs):
"""
返回固定 upload_id。
"""
return SimpleNamespace(upload_id="upload_id")
def upload_part(self, *_args, **_kwargs):
"""
返回固定分片 etag。
"""
return SimpleNamespace(etag="etag")
def complete_multipart_upload(self, *_args, **_kwargs):
"""
模拟 OSS 完成分片上传成功。
"""
response = SimpleNamespace(json=lambda: {"state": True})
return SimpleNamespace(
status=200, resp=SimpleNamespace(response=response)
)
storage._request_api = fake_request_api
with patch(
"app.modules.filemanager.storages.u115.oss2.StsAuth",
return_value=object(),
), patch(
"app.modules.filemanager.storages.u115.oss2.Bucket",
FakeBucket,
), patch(
"app.modules.filemanager.storages.u115.transfer_process",
return_value=lambda _progress: None,
):
uploaded_item = storage.upload(target_dir, local_file)
storage = _build_storage()
uploaded_item = _upload_with_fakes(storage, _target_dir(), local_file)
assert uploaded_item is not None
assert uploaded_item.storage == "u115"
assert uploaded_item.path == "/library/Test Show (2026)/Season 1/Test.Show.S01E01.mkv"
assert uploaded_item.type == "file"
assert uploaded_item.size == local_file.stat().st_size
def test_upload_returns_none_when_complete_callback_reports_failure(tmp_path):
"""
115 完成回调失败时,不应把文件视为上传成功。
"""
local_file = tmp_path / "Test.Show.S01E02.mkv"
local_file.write_bytes(b"movie")
storage = _build_storage()
with patch.object(
_FakeBucket,
"complete_payload",
{"state": False, "message": "callback failed"},
):
uploaded_item = _upload_with_fakes(storage, _target_dir(), local_file)
assert uploaded_item is None
def test_upload_uses_dynamic_part_size(tmp_path):
"""
115 上传应根据文件大小动态控制 OSS 分片大小。
"""
local_file = tmp_path / "Test.Show.S01E03.mkv"
local_file.write_bytes(b"movie")
storage = _build_storage()
with patch(
"app.modules.filemanager.storages.u115.determine_part_size",
return_value=local_file.stat().st_size,
) as determine_part_size_mock:
uploaded_item = _upload_with_fakes(storage, _target_dir(), local_file)
assert uploaded_item is not None
determine_part_size_mock.assert_called_once_with(
local_file.stat().st_size, preferred_size=10 * 1024 * 1024
)
def test_upload_part_size_grows_for_large_files():
"""
大文件上传应自动放大分片大小,避免产生过多分片。
"""
file_size = int(5.6 * 1024 * 1024 * 1024)
part_size = U115Pan._U115Pan__get_upload_part_size(file_size)
assert part_size == 64 * 1024 * 1024