Compare commits

...

44 Commits

Author SHA1 Message Date
jxxghp
d45a7fb262 更新 version.py 2025-08-24 19:59:31 +08:00
jxxghp
918d192c0f OpenList自动延迟重试获取文件项 2025-08-24 19:47:00 +08:00
jxxghp
f7cd6eac50 feat:整理手动中止功能 2025-08-24 19:17:41 +08:00
jxxghp
88f4428ff0 fix bug 2025-08-24 17:07:45 +08:00
jxxghp
069ea22ba2 fix bug 2025-08-24 16:55:37 +08:00
jxxghp
8fac8c5307 fix progress step 2025-08-24 16:33:44 +08:00
jxxghp
2285befebb fix cache set 2025-08-24 16:10:48 +08:00
jxxghp
1cd0648e4e fix cache set 2025-08-24 15:36:56 +08:00
jxxghp
0b7ba285c6 fix:优雅停止超时处理 2025-08-24 13:07:52 +08:00
jxxghp
30446c4526 fix cache is_redis 2025-08-24 12:27:14 +08:00
jxxghp
9b843c9ed2 fix:整理记录登记 2025-08-24 12:19:12 +08:00
jxxghp
2ce1c3bef8 feat:整理进度登记 2025-08-24 12:04:05 +08:00
jxxghp
e463094dc7 feat:整理进度 2025-08-24 09:21:55 +08:00
jxxghp
71a9fe10f4 refactor ProgressHelper 2025-08-24 09:02:55 +08:00
jxxghp
ba146e13ef fix 优化cache模块声明 2025-08-24 08:36:37 +08:00
jxxghp
c060d7e3e0 更新 postgresql-setup.md 2025-08-23 22:26:34 +08:00
jxxghp
ba96678822 v2.7.5 2025-08-23 20:46:36 +08:00
jxxghp
4f6354f383 Merge pull request #4820 from DDS-Derek/dev 2025-08-23 18:46:52 +08:00
DDSRem
2766e80346 fix(database): use logger as log output
Co-Authored-By: Aqr-K <95741669+Aqr-K@users.noreply.github.com>
2025-08-23 18:36:11 +08:00
jxxghp
7cc3777a60 fix async cache 2025-08-23 18:34:47 +08:00
DDSRem
cb1dd9f17d fix(database): upgrade error in pg database
Co-Authored-By: Aqr-K <95741669+Aqr-K@users.noreply.github.com>
2025-08-23 18:12:13 +08:00
jxxghp
31f342fe4f fix torrent 2025-08-23 18:10:33 +08:00
jxxghp
e90359eb08 fix douban 2025-08-23 15:56:30 +08:00
jxxghp
58b0768a30 fix redis key 2025-08-23 15:53:03 +08:00
jxxghp
3b04506893 fix redis key 2025-08-23 15:40:38 +08:00
jxxghp
354165aa0a fix cache 2025-08-23 14:21:50 +08:00
jxxghp
343109836f fix cache 2025-08-23 14:06:44 +08:00
jxxghp
fcadac2adb Merge pull request #4817 from jxxghp/cursor/add-dict-operations-to-cachebackend-3877 2025-08-23 12:42:04 +08:00
Cursor Agent
5e7dcdfe97 Modify cache region key generation to use consistent prefix format
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-23 04:13:25 +00:00
Cursor Agent
2ec9a57391 Remove implementation and migration documentation files
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-23 04:07:04 +00:00
Cursor Agent
973c545723 Checkpoint before follow-up message
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-23 04:06:16 +00:00
Cursor Agent
fd62eecfef Simplify TTLCache, remove dict-like methods, enhance Cache interface
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-23 04:01:17 +00:00
Cursor Agent
b5ca7058c2 Add helper methods for cache backend in sync and async versions
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-23 03:58:04 +00:00
Cursor Agent
57a48f099f Add dict-like operations to CacheBackend with sync and async support
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-23 03:50:52 +00:00
jxxghp
4699f511bf Handle magnet links in torrent parsing and downloader modules (#4815)
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-23 10:51:32 +08:00
jxxghp
cd8f7e72e0 同步错误修复 2025-08-22 17:33:24 +08:00
jxxghp
78803fa284 fix search_imdbid type 2025-08-22 16:37:30 +08:00
jxxghp
2e8d75df16 fix monitor cache 2025-08-22 15:30:49 +08:00
jxxghp
7e3bbfd960 Merge pull request #4807 from carolcoral/v2 2025-08-22 15:23:04 +08:00
jxxghp
1734d53b3c Replace file-based snapshot caching with FileCache implementation (#4809)
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-22 13:59:30 +08:00
jxxghp
f37540f4e5 fix get_rss timeout 2025-08-22 11:44:16 +08:00
jxxghp
addb9d836a remove cache singleton 2025-08-22 11:33:53 +08:00
Carol
4184d8c7ac 补充迁移数据库异常的注意事项
add: sqlite迁移到postgresql的注意事项
2025-08-22 10:55:26 +08:00
jxxghp
724c15a68c add 插件内存统计API 2025-08-22 09:46:11 +08:00
45 changed files with 2101 additions and 781 deletions

View File

@@ -13,7 +13,7 @@ from app import schemas
from app.command import Command
from app.core.config import settings
from app.core.plugin import PluginManager
from app.core.security import verify_apikey, verify_token
from app.core.security import verify_apikey, verify_token, verify_apitoken
from app.db.models import User
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_superuser, get_current_active_superuser_async
@@ -21,6 +21,7 @@ from app.factory import app
from app.helper.plugin import PluginHelper
from app.log import logger
from app.scheduler import Scheduler
from app.schemas.plugin import PluginMemoryInfo
from app.schemas.types import SystemConfigKey
PROTECTED_ROUTES = {"/api/v1/openapi.json", "/docs", "/docs/oauth2-redirect", "/redoc"}
@@ -463,6 +464,87 @@ async def update_folder_plugins(folder_name: str, plugin_ids: List[str],
return schemas.Response(success=True, message=f"文件夹 '{folder_name}' 中的插件已更新")
@router.post("/clone/{plugin_id}", summary="创建插件分身", response_model=schemas.Response)
def clone_plugin(plugin_id: str,
clone_data: dict,
_: User = Depends(get_current_active_superuser)) -> Any:
"""
创建插件分身
"""
try:
success, message = PluginManager().clone_plugin(
plugin_id=plugin_id,
suffix=clone_data.get("suffix", ""),
name=clone_data.get("name", ""),
description=clone_data.get("description", ""),
version=clone_data.get("version", ""),
icon=clone_data.get("icon", "")
)
if success:
# 注册插件服务
reload_plugin(message)
# 将分身插件添加到原插件所在的文件夹中
_add_clone_to_plugin_folder(plugin_id, message)
return schemas.Response(success=True, message="插件分身创建成功")
else:
return schemas.Response(success=False, message=message)
except Exception as e:
logger.error(f"创建插件分身失败:{str(e)}")
return schemas.Response(success=False, message=f"创建插件分身失败:{str(e)}")
@router.get("/memory", summary="插件内存使用统计", response_model=List[PluginMemoryInfo])
def plugin_memory_stats(_: Annotated[str, Depends(verify_apitoken)]) -> Any:
"""
获取所有插件的内存使用统计信息
"""
try:
plugin_manager = PluginManager()
memory_stats = plugin_manager.get_plugin_memory_stats()
return memory_stats
except Exception as e:
logger.error(f"获取插件内存统计失败:{str(e)}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取插件内存统计失败:{str(e)}")
@router.get("/memory/{plugin_id}", summary="单个插件内存使用统计", response_model=PluginMemoryInfo)
def plugin_memory_stat(plugin_id: str, _: Annotated[str, Depends(verify_apitoken)]) -> Any:
"""
获取指定插件的内存使用统计信息
"""
try:
plugin_manager = PluginManager()
memory_stats = plugin_manager.get_plugin_memory_stats(plugin_id)
if not memory_stats:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=f"插件 {plugin_id} 不存在或未运行")
return memory_stats[0]
except HTTPException:
raise
except Exception as e:
logger.error(f"获取插件 {plugin_id} 内存统计失败:{str(e)}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取插件内存统计失败:{str(e)}")
@router.delete("/memory/cache", summary="清除插件内存统计缓存")
def clear_plugin_memory_cache(_: Annotated[str, Depends(verify_apitoken)],
plugin_id: Optional[str] = None) -> Any:
"""
清除插件内存统计缓存
"""
try:
plugin_manager = PluginManager()
plugin_manager.clear_plugin_memory_cache(plugin_id)
message = f"已清除插件 {plugin_id} 的内存统计缓存" if plugin_id else "已清除所有插件的内存统计缓存"
return schemas.Response(success=True, message=message)
except Exception as e:
logger.error(f"清除插件内存统计缓存失败:{str(e)}")
return schemas.Response(success=False, message=f"清除缓存失败:{str(e)}")
@router.get("/{plugin_id}", summary="获取插件配置")
async def plugin_config(plugin_id: str,
_: User = Depends(get_current_active_superuser_async)) -> dict:
@@ -528,36 +610,6 @@ def uninstall_plugin(plugin_id: str,
return schemas.Response(success=True)
@router.post("/clone/{plugin_id}", summary="创建插件分身", response_model=schemas.Response)
def clone_plugin(plugin_id: str,
clone_data: dict,
_: User = Depends(get_current_active_superuser)) -> Any:
"""
创建插件分身
"""
try:
success, message = PluginManager().clone_plugin(
plugin_id=plugin_id,
suffix=clone_data.get("suffix", ""),
name=clone_data.get("name", ""),
description=clone_data.get("description", ""),
version=clone_data.get("version", ""),
icon=clone_data.get("icon", "")
)
if success:
# 注册插件服务
reload_plugin(message)
# 将分身插件添加到原插件所在的文件夹中
_add_clone_to_plugin_folder(plugin_id, message)
return schemas.Response(success=True, message="插件分身创建成功")
else:
return schemas.Response(success=False, message=message)
except Exception as e:
logger.error(f"创建插件分身失败:{str(e)}")
return schemas.Response(success=False, message=f"创建插件分身失败:{str(e)}")
def _add_clone_to_plugin_folder(original_plugin_id: str, clone_plugin_id: str):
"""
将分身插件添加到原插件所在的文件夹中

View File

@@ -171,15 +171,14 @@ def rename(fileitem: schemas.FileItem,
sub_files: List[schemas.FileItem] = StorageChain().list_files(fileitem)
if sub_files:
# 开始进度
progress = ProgressHelper()
progress.start(ProgressKey.BatchRename)
progress = ProgressHelper(ProgressKey.BatchRename)
progress.start()
total = len(sub_files)
handled = 0
for sub_file in sub_files:
handled += 1
progress.update(value=handled / total * 100,
text=f"正在处理 {sub_file.name} ...",
key=ProgressKey.BatchRename)
text=f"正在处理 {sub_file.name} ...")
if sub_file.type == "dir":
continue
if not sub_file.extension:
@@ -190,19 +189,19 @@ def rename(fileitem: schemas.FileItem,
meta = MetaInfoPath(sub_path)
mediainfo = transferchain.recognize_media(meta)
if not mediainfo:
progress.end(ProgressKey.BatchRename)
progress.end()
return schemas.Response(success=False, message=f"{sub_path.name} 未识别到媒体信息")
new_path = transferchain.recommend_name(meta=meta, mediainfo=mediainfo)
if not new_path:
progress.end(ProgressKey.BatchRename)
progress.end()
return schemas.Response(success=False, message=f"{sub_path.name} 未识别到新名称")
ret: schemas.Response = rename(fileitem=sub_file,
new_name=Path(new_path).name,
recursive=False)
if not ret.success:
progress.end(ProgressKey.BatchRename)
progress.end()
return schemas.Response(success=False, message=f"{sub_path.name} 重命名失败!")
progress.end(ProgressKey.BatchRename)
progress.end()
# 重命名自己
result = StorageChain().rename_file(fileitem, new_name)
if result:

View File

@@ -254,14 +254,14 @@ async def get_progress(request: Request, process_type: str, _: schemas.TokenPayl
"""
实时获取处理进度返回格式为SSE
"""
progress = ProgressHelper()
progress = ProgressHelper(process_type)
async def event_generator():
try:
while not global_vars.is_system_stopped:
if await request.is_disconnected():
break
detail = progress.get(process_type)
detail = progress.get()
yield f"data: {json.dumps(detail)}\n\n"
await asyncio.sleep(0.5)
except asyncio.CancelledError:

View File

@@ -8,7 +8,7 @@ from app import schemas
from app.chain.media import MediaChain
from app.chain.storage import StorageChain
from app.chain.transfer import TransferChain
from app.core.config import settings
from app.core.config import settings, global_vars
from app.core.metainfo import MetaInfoPath
from app.core.security import verify_token, verify_apitoken
from app.db import get_db
@@ -75,6 +75,8 @@ async def remove_queue(fileitem: schemas.FileItem, _: schemas.TokenPayload = Dep
:param _: Token校验
"""
TransferChain().remove_from_queue(fileitem)
# 取消整理
global_vars.stop_transfer(fileitem.path)
return schemas.Response(success=True)

View File

@@ -215,12 +215,11 @@ class SearchChain(ChainBase):
return []
# 开始新进度
progress = ProgressHelper()
progress.start(ProgressKey.Search)
progress = ProgressHelper(ProgressKey.Search)
progress.start()
# 开始过滤
progress.update(value=0, text=f'开始过滤,总 {len(torrents)} 个资源,请稍候...',
key=ProgressKey.Search)
progress.update(value=0, text=f'开始过滤,总 {len(torrents)} 个资源,请稍候...')
# 匹配订阅附加参数
if filter_params:
logger.info(f'开始附加参数过滤,附加参数:{filter_params} ...')
@@ -238,7 +237,7 @@ class SearchChain(ChainBase):
logger.info(f"过滤规则/剧集过滤完成,剩余 {len(torrents)} 个资源")
# 过滤完成
progress.update(value=50, text=f'过滤完成,剩余 {len(torrents)} 个资源', key=ProgressKey.Search)
progress.update(value=50, text=f'过滤完成,剩余 {len(torrents)} 个资源')
# 总数
_total = len(torrents)
@@ -251,14 +250,13 @@ class SearchChain(ChainBase):
try:
# 英文标题应该在别名/原标题中,不需要再匹配
logger.info(f"开始匹配结果 标题:{mediainfo.title},原标题:{mediainfo.original_title},别名:{mediainfo.names}")
progress.update(value=51, text=f'开始匹配,总 {_total} 个资源 ...', key=ProgressKey.Search)
progress.update(value=51, text=f'开始匹配,总 {_total} 个资源 ...')
for torrent in torrents:
if global_vars.is_system_stopped:
break
_count += 1
progress.update(value=(_count / _total) * 96,
text=f'正在匹配 {torrent.site_name},已完成 {_count} / {_total} ...',
key=ProgressKey.Search)
text=f'正在匹配 {torrent.site_name},已完成 {_count} / {_total} ...')
if not torrent.title:
continue
@@ -291,8 +289,7 @@ class SearchChain(ChainBase):
# 匹配完成
logger.info(f"匹配完成,共匹配到 {len(_match_torrents)} 个资源")
progress.update(value=97,
text=f'匹配完成,共匹配到 {len(_match_torrents)} 个资源',
key=ProgressKey.Search)
text=f'匹配完成,共匹配到 {len(_match_torrents)} 个资源')
# 去掉mediainfo中多余的数据
mediainfo.clear()
@@ -308,16 +305,14 @@ class SearchChain(ChainBase):
# 排序
progress.update(value=99,
text=f'正在对 {len(contexts)} 个资源进行排序,请稍候...',
key=ProgressKey.Search)
text=f'正在对 {len(contexts)} 个资源进行排序,请稍候...')
contexts = torrenthelper.sort_torrents(contexts)
# 结束进度
logger.info(f'搜索完成,共 {len(contexts)} 个资源')
progress.update(value=100,
text=f'搜索完成,共 {len(contexts)} 个资源',
key=ProgressKey.Search)
progress.end(ProgressKey.Search)
text=f'搜索完成,共 {len(contexts)} 个资源')
progress.end()
# 去重后返回
return self.__remove_duplicate(contexts)
@@ -521,8 +516,8 @@ class SearchChain(ChainBase):
return []
# 开始进度
progress = ProgressHelper()
progress.start(ProgressKey.Search)
progress = ProgressHelper(ProgressKey.Search)
progress.start()
# 开始计时
start_time = datetime.now()
# 总数
@@ -531,8 +526,7 @@ class SearchChain(ChainBase):
finish_count = 0
# 更新进度
progress.update(value=0,
text=f"开始搜索,共 {total_num} 个站点 ...",
key=ProgressKey.Search)
text=f"开始搜索,共 {total_num} 个站点 ...")
# 结果集
results = []
# 多线程
@@ -561,17 +555,15 @@ class SearchChain(ChainBase):
results.extend(result)
logger.info(f"站点搜索进度:{finish_count} / {total_num}")
progress.update(value=finish_count / total_num * 100,
text=f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个站点 ...",
key=ProgressKey.Search)
text=f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个站点 ...")
# 计算耗时
end_time = datetime.now()
# 更新进度
progress.update(value=100,
text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}",
key=ProgressKey.Search)
text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}")
logger.info(f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}")
# 结束进度
progress.end(ProgressKey.Search)
progress.end()
# 返回
return results
@@ -606,8 +598,8 @@ class SearchChain(ChainBase):
return []
# 开始进度
progress = ProgressHelper()
progress.start(ProgressKey.Search)
progress = ProgressHelper(ProgressKey.Search)
progress.start()
# 开始计时
start_time = datetime.now()
# 总数
@@ -616,8 +608,7 @@ class SearchChain(ChainBase):
finish_count = 0
# 更新进度
progress.update(value=0,
text=f"开始搜索,共 {total_num} 个站点 ...",
key=ProgressKey.Search)
text=f"开始搜索,共 {total_num} 个站点 ...")
# 结果集
results = []
@@ -648,18 +639,16 @@ class SearchChain(ChainBase):
results.extend(result)
logger.info(f"站点搜索进度:{finish_count} / {total_num}")
progress.update(value=finish_count / total_num * 100,
text=f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个站点 ...",
key=ProgressKey.Search)
text=f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个站点 ...")
# 计算耗时
end_time = datetime.now()
# 更新进度
progress.update(value=100,
text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}",
key=ProgressKey.Search)
text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}")
logger.info(f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}")
# 结束进度
progress.end(ProgressKey.Search)
progress.end()
# 返回
return results

View File

@@ -313,6 +313,11 @@ class SiteChain(ChainBase):
siteoper = SiteOper()
rsshelper = RssHelper()
for domain, cookie in cookies.items():
# 检查系统是否停止
if global_vars.is_system_stopped:
logger.info("系统正在停止中断CookieCloud同步")
return False, "系统正在停止,同步被中断"
# 索引器信息
indexer = siteshelper.get_indexer(domain)
# 数据库的站点信息
@@ -331,7 +336,7 @@ class SiteChain(ChainBase):
cookie=cookie,
ua=site_info.ua or settings.USER_AGENT,
proxy=True if site_info.proxy else False,
timeout=site_info.timeout
timeout=site_info.timeout or 15
)
if rss_url:
logger.info(f"更新站点 {domain} RSS地址 ...")

View File

@@ -555,8 +555,10 @@ class TransferChain(ChainBase, metaclass=Singleton):
processed_num = 0
# 失败数量
fail_num = 0
# 已完成文件
finished_files = []
progress = ProgressHelper()
progress = ProgressHelper(ProgressKey.FileTransfer)
while not global_vars.is_system_stopped:
try:
@@ -571,7 +573,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
if __queue_start:
logger.info("开始整理队列处理...")
# 启动进度
progress.start(ProgressKey.FileTransfer)
progress.start()
# 重置计数
processed_num = 0
fail_num = 0
@@ -579,8 +581,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
__process_msg = f"开始整理队列处理,当前共 {total_num} 个文件 ..."
logger.info(__process_msg)
progress.update(value=0,
text=__process_msg,
key=ProgressKey.FileTransfer)
text=__process_msg)
# 队列已开始
__queue_start = False
# 更新进度
@@ -588,7 +589,10 @@ class TransferChain(ChainBase, metaclass=Singleton):
logger.info(__process_msg)
progress.update(value=processed_num / total_num * 100,
text=__process_msg,
key=ProgressKey.FileTransfer)
data={
"current": Path(fileitem.path).as_posix(),
"finished":finished_files
})
# 整理
state, err_msg = self.__handle_transfer(task=task, callback=item.callback)
if not state:
@@ -596,20 +600,20 @@ class TransferChain(ChainBase, metaclass=Singleton):
fail_num += 1
# 更新进度
processed_num += 1
finished_files.append(Path(fileitem.path).as_posix())
__process_msg = f"{fileitem.name} 整理完成"
logger.info(__process_msg)
progress.update(value=processed_num / total_num * 100,
progress.update(value=(processed_num / total_num) * 100,
text=__process_msg,
key=ProgressKey.FileTransfer)
data={})
except queue.Empty:
if not __queue_start:
# 结束进度
__end_msg = f"整理队列处理完成,共整理 {processed_num} 个文件,失败 {fail_num}"
logger.info(__end_msg)
progress.update(value=100,
text=__end_msg,
key=ProgressKey.FileTransfer)
progress.end(ProgressKey.FileTransfer)
text=__end_msg)
progress.end()
# 重置计数
processed_num = 0
fail_num = 0
@@ -1165,15 +1169,16 @@ class TransferChain(ChainBase, metaclass=Singleton):
processed_num = 0
# 失败数量
fail_num = 0
# 已完成文件
finished_files = []
# 启动进度
progress = ProgressHelper()
progress.start(ProgressKey.FileTransfer)
progress = ProgressHelper(ProgressKey.FileTransfer)
progress.start()
__process_msg = f"开始整理,共 {total_num} 个文件 ..."
logger.info(__process_msg)
progress.update(value=0,
text=__process_msg,
key=ProgressKey.FileTransfer)
text=__process_msg)
try:
for transfer_task in transfer_tasks:
if global_vars.is_system_stopped:
@@ -1185,7 +1190,10 @@ class TransferChain(ChainBase, metaclass=Singleton):
logger.info(__process_msg)
progress.update(value=(processed_num + fail_num) / total_num * 100,
text=__process_msg,
key=ProgressKey.FileTransfer)
data={
"current": Path(transfer_task.fileitem.path).as_posix(),
"finished": finished_files,
})
state, err_msg = self.__handle_transfer(
task=transfer_task,
callback=self.__default_callback
@@ -1197,6 +1205,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
fail_num += 1
else:
processed_num += 1
# 记录已完成
finished_files.append(Path(transfer_task.fileitem.path).as_posix())
finally:
transfer_tasks.clear()
del transfer_tasks
@@ -1206,8 +1216,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
logger.info(__end_msg)
progress.update(value=100,
text=__end_msg,
key=ProgressKey.FileTransfer)
progress.end(ProgressKey.FileTransfer)
data={})
progress.end()
error_msg = "".join(err_msgs[:2]) + (f",等{len(err_msgs)}个文件错误!" if len(err_msgs) > 2 else "")
return all_success, error_msg
@@ -1352,12 +1362,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
else:
# 更新媒体图片
self.obtain_images(mediainfo=mediainfo)
# 开始进度
progress = ProgressHelper()
progress.start(ProgressKey.FileTransfer)
progress.update(value=0,
text=f"开始整理 {fileitem.path} ...",
key=ProgressKey.FileTransfer)
# 开始整理
state, errmsg = self.do_transfer(
fileitem=fileitem,
@@ -1378,7 +1383,6 @@ class TransferChain(ChainBase, metaclass=Singleton):
if not state:
return False, errmsg
progress.end(ProgressKey.FileTransfer)
logger.info(f"{fileitem.path} 整理完成")
return True, ""
else:

File diff suppressed because it is too large Load Diff

View File

@@ -798,6 +798,8 @@ class GlobalVar(object):
SUBSCRIPTIONS: List[dict] = []
# 需应急停止的工作流
EMERGENCY_STOP_WORKFLOWS: List[int] = []
# 需应急停止文件整理
EMERGENCY_STOP_TRANSFER: List[str] = []
def stop_system(self):
"""
@@ -838,12 +840,30 @@ class GlobalVar(object):
if workflow_id in self.EMERGENCY_STOP_WORKFLOWS:
self.EMERGENCY_STOP_WORKFLOWS.remove(workflow_id)
def is_workflow_stopped(self, workflow_id: int):
def is_workflow_stopped(self, workflow_id: int) -> bool:
"""
是否停止工作流
"""
return self.is_system_stopped or workflow_id in self.EMERGENCY_STOP_WORKFLOWS
def stop_transfer(self, path: str):
"""
停止文件整理
"""
if path not in self.EMERGENCY_STOP_TRANSFER:
self.EMERGENCY_STOP_TRANSFER.append(path)
def is_transfer_stopped(self, path: str) -> bool:
"""
是否停止文件整理
"""
if self.is_system_stopped:
return True
if path in self.EMERGENCY_STOP_TRANSFER:
self.EMERGENCY_STOP_TRANSFER.remove(path)
return True
return False
# 全局标识
global_vars = GlobalVar()

View File

@@ -21,7 +21,7 @@ from app.core.config import settings
from app.core.event import eventmanager, Event
from app.db.plugindata_oper import PluginDataOper
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.plugin import PluginHelper
from app.helper.plugin import PluginHelper, PluginMemoryMonitor
from app.helper.sites import SitesHelper # noqa
from app.log import logger
from app.schemas.types import EventType, SystemConfigKey
@@ -98,6 +98,8 @@ class PluginManager(metaclass=Singleton):
self._config_key: str = "plugin.%s"
# 监听器
self._observer: Observer = None
# 内存监控器
self._memory_monitor = PluginMemoryMonitor()
# 开发者模式监测插件修改
if settings.DEV or settings.PLUGIN_AUTO_RELOAD:
self.__start_monitor()
@@ -863,6 +865,28 @@ class PluginManager(metaclass=Singleton):
"""
return list(self._running_plugins.keys())
def get_plugin_memory_stats(self, pid: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取插件内存统计信息
:param pid: 插件ID为空则获取所有插件
:return: 内存统计信息列表
"""
if pid:
plugin_instance = self._running_plugins.get(pid)
if plugin_instance:
return [self._memory_monitor.get_plugin_memory_usage(pid, plugin_instance)]
else:
return []
else:
return self._memory_monitor.get_all_plugins_memory_usage(self._running_plugins)
def clear_plugin_memory_cache(self, pid: Optional[str] = None):
"""
清除插件内存统计缓存
:param pid: 插件ID为空则清除所有缓存
"""
self._memory_monitor.clear_cache(pid)
def get_online_plugins(self, force: bool = False) -> List[schemas.Plugin]:
"""
获取所有在线插件信息

View File

@@ -252,19 +252,19 @@ def __verify_key(key: str, expected_key: str, key_type: str) -> str:
def verify_apitoken(token: Annotated[str, Security(__get_api_token)]) -> str:
"""
使用 API Token 进行身份认证
:param token: API Token从 URL 查询参数中获取
:param token: API Token从 URL 查询参数中获取 token=xxx
:return: 返回校验通过的 API Token
"""
return __verify_key(token, settings.API_TOKEN, "API_TOKEN")
return __verify_key(token, settings.API_TOKEN, "token")
def verify_apikey(apikey: Annotated[str, Security(__get_api_key)]) -> str:
"""
使用 API Key 进行身份认证
:param apikey: API Key从 URL 查询参数或请求头中获取
:param apikey: API Key从 URL 查询参数中获取 apikey=xxx
:return: 返回校验通过的 API Key
"""
return __verify_key(apikey, settings.API_TOKEN, "API_KEY")
return __verify_key(apikey, settings.API_TOKEN, "apikey")
def verify_password(plain_password: str, hashed_password: str) -> bool:

View File

@@ -34,6 +34,7 @@ class SubscribeOper(DbOper):
"backdrop": mediainfo.get_backdrop_image(),
"vote": mediainfo.vote_average,
"description": mediainfo.overview,
"search_imdbid": 1 if kwargs.get('search_imdbid') else 0,
"date": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
})
if not subscribe:

View File

@@ -4,10 +4,11 @@ import json
import shutil
import site
import sys
import time
import traceback
import zipfile
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Set, Callable, Awaitable
from typing import Dict, List, Optional, Tuple, Set, Callable, Awaitable, Any
import aiofiles
import aioshutil
@@ -24,6 +25,7 @@ from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils, AsyncRequestUtils
from app.utils.memory import MemoryCalculator
from app.utils.singleton import WeakSingleton
from app.utils.system import SystemUtils
from app.utils.url import UrlUtils
@@ -1569,3 +1571,87 @@ class PluginHelper(metaclass=WeakSingleton):
except Exception as e:
logger.error(f"解压 Release 压缩包失败:{e}")
return False, f"解压 Release 压缩包失败:{e}"
class PluginMemoryMonitor:
"""
插件内存监控器
"""
def __init__(self):
self._calculator = MemoryCalculator()
self._cache = {}
self._cache_ttl = 300 # 缓存5分钟
def get_plugin_memory_usage(self, plugin_id: str, plugin_instance: Any) -> Dict[str, Any]:
"""
获取插件内存使用情况
:param plugin_id: 插件ID
:param plugin_instance: 插件实例
:return: 内存使用信息
"""
# 检查缓存
if self._is_cache_valid(plugin_id):
return self._cache[plugin_id]
# 计算内存使用
memory_info = self._calculator.calculate_object_memory(plugin_instance)
# 添加插件信息
result = {
'plugin_id': plugin_id,
'plugin_name': getattr(plugin_instance, 'plugin_name', 'Unknown'),
'plugin_version': getattr(plugin_instance, 'plugin_version', 'Unknown'),
'timestamp': time.time(),
**memory_info
}
# 更新缓存
self._cache[plugin_id] = result
return result
def get_all_plugins_memory_usage(self, plugins: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
获取所有插件的内存使用情况
:param plugins: 插件实例字典
:return: 内存使用信息列表
"""
results = []
for plugin_id, plugin_instance in plugins.items():
if plugin_instance:
try:
memory_info = self.get_plugin_memory_usage(plugin_id, plugin_instance)
results.append(memory_info)
except Exception as e:
logger.error(f"获取插件 {plugin_id} 内存使用情况失败:{str(e)}")
results.append({
'plugin_id': plugin_id,
'plugin_name': getattr(plugin_instance, 'plugin_name', 'Unknown'),
'error': str(e),
'total_memory_bytes': 0,
'total_memory_mb': 0,
'object_count': 0,
'calculation_time_ms': 0
})
# 按内存使用量排序
results.sort(key=lambda x: x.get('total_memory_bytes', 0), reverse=True)
return results
def _is_cache_valid(self, plugin_id: str) -> bool:
"""
检查缓存是否有效
"""
if plugin_id not in self._cache:
return False
return time.time() - self._cache[plugin_id]['timestamp'] < self._cache_ttl
def clear_cache(self, plugin_id: Optional[str] = None):
"""
清除缓存
:param plugin_id: 插件ID为空则清除所有缓存
"""
if plugin_id:
self._cache.pop(plugin_id, None)
else:
self._cache.clear()

View File

@@ -1,55 +1,76 @@
from enum import Enum
from typing import Union, Optional
from app.core.cache import TTLCache
from app.schemas.types import ProgressKey
from app.utils.singleton import WeakSingleton
class ProgressHelper(metaclass=WeakSingleton):
"""
处理进度辅助类
"""
def __init__(self):
self._process_detail = {}
def init_config(self):
pass
def __reset(self, key: Union[ProgressKey, str]):
def __init__(self, key: Union[ProgressKey, str]):
if isinstance(key, Enum):
key = key.value
self._process_detail[key] = {
self._key = key
self._progress = TTLCache(region="progress", maxsize=1024, ttl=24 * 60 * 60)
def __reset(self):
"""
重置进度
"""
self._progress[self._key] = {
"enable": False,
"value": 0,
"text": "请稍候..."
"text": "请稍候...",
"data": {}
}
def start(self, key: Union[ProgressKey, str]):
self.__reset(key)
if isinstance(key, Enum):
key = key.value
self._process_detail[key]['enable'] = True
def end(self, key: Union[ProgressKey, str]):
if isinstance(key, Enum):
key = key.value
if not self._process_detail.get(key):
def start(self):
"""
开始进度
"""
self.__reset()
current = self._progress.get(self._key)
if not current:
return
self._process_detail[key] = {
"enable": False,
"value": 100,
"text": "正在处理..."
}
current['enable'] = True
self._progress[self._key] = current
def update(self, key: Union[ProgressKey, str], value: Union[float, int] = None, text: Optional[str] = None):
if isinstance(key, Enum):
key = key.value
if not self._process_detail.get(key, {}).get('enable'):
def end(self):
"""
结束进度
"""
current = self._progress.get(self._key)
if not current:
return
current.update(
{
"enable": False,
"value": 100,
"text": ""
}
)
self._progress[self._key] = current
def update(self, value: Union[float, int] = None, text: Optional[str] = None, data: dict = None):
"""
更新进度
"""
current = self._progress.get(self._key)
if not current or not current.get('enable'):
return
if value:
self._process_detail[key]['value'] = value
current['value'] = value
if text:
self._process_detail[key]['text'] = text
current['text'] = text
if data:
if not current.get('data'):
current['data'] = {}
current['data'].update(data)
self._progress[self._key] = current
def get(self, key: Union[ProgressKey, str]) -> dict:
if isinstance(key, Enum):
key = key.value
return self._process_detail.get(key)
def get(self) -> dict:
return self._progress.get(self._key)

View File

@@ -1,6 +1,6 @@
import json
import pickle
from typing import Any, Optional, Generator, Tuple, AsyncGenerator
from typing import Any, Optional, Generator, Tuple, AsyncGenerator, Union
from urllib.parse import quote
import redis
@@ -140,20 +140,34 @@ class RedisHelper(metaclass=Singleton):
logger.error(f"Failed to set Redis maxmemory or policy: {e}")
@staticmethod
def get_region(region: Optional[str] = "DEFAULT"):
def __get_region(region: Optional[str] = None):
"""
获取缓存的区
"""
return f"region:{region}" if region else "region:default"
return f"region:{quote(region)}" if region else "region:DEFAULT"
def get_redis_key(self, region: str, key: str) -> str:
def __make_redis_key(self, region: str, key: str) -> str:
"""
获取缓存Key
"""
# 使用region作为缓存键的一部分
region = self.get_region(quote(region))
region = self.__get_region(region)
return f"{region}:key:{quote(key)}"
@staticmethod
def __get_original_key(redis_key: Union[str, bytes]) -> str:
"""
从Redis键中提取原始key
"""
try:
if isinstance(redis_key, bytes):
redis_key = redis_key.decode("utf-8")
parts = redis_key.split(":key:")
return parts[-1]
except Exception as e:
logger.warn(f"Failed to parse redis key: {redis_key}, error: {e}")
return redis_key
def set(self, key: str, value: Any, ttl: Optional[int] = None,
region: Optional[str] = "DEFAULT", **kwargs) -> None:
"""
@@ -167,7 +181,7 @@ class RedisHelper(metaclass=Singleton):
"""
try:
self._connect()
redis_key = self.get_redis_key(region, key)
redis_key = self.__make_redis_key(region, key)
# 对值进行序列化
serialized_value = serialize(value)
kwargs.pop("maxsize", None)
@@ -185,7 +199,7 @@ class RedisHelper(metaclass=Singleton):
"""
try:
self._connect()
redis_key = self.get_redis_key(region, key)
redis_key = self.__make_redis_key(region, key)
return self.client.exists(redis_key) == 1
except Exception as e:
logger.error(f"Failed to exists key: {key} region: {region}, error: {e}")
@@ -201,7 +215,7 @@ class RedisHelper(metaclass=Singleton):
"""
try:
self._connect()
redis_key = self.get_redis_key(region, key)
redis_key = self.__make_redis_key(region, key)
value = self.client.get(redis_key)
if value is not None:
return deserialize(value)
@@ -219,7 +233,7 @@ class RedisHelper(metaclass=Singleton):
"""
try:
self._connect()
redis_key = self.get_redis_key(region, key)
redis_key = self.__make_redis_key(region, key)
self.client.delete(redis_key)
except Exception as e:
logger.error(f"Failed to delete key: {key} in region: {region}, error: {e}")
@@ -233,7 +247,7 @@ class RedisHelper(metaclass=Singleton):
try:
self._connect()
if region:
cache_region = self.get_region(quote(region))
cache_region = self.__get_region(region)
redis_key = f"{cache_region}:key:*"
with self.client.pipeline() as pipe:
for key in self.client.scan_iter(redis_key):
@@ -256,17 +270,17 @@ class RedisHelper(metaclass=Singleton):
try:
self._connect()
if region:
cache_region = self.get_region(quote(region))
cache_region = self.__get_region(region)
redis_key = f"{cache_region}:key:*"
for key in self.client.scan_iter(redis_key):
value = self.client.get(key)
if value is not None:
yield key, deserialize(value)
yield self.__get_original_key(key), deserialize(value)
else:
for key in self.client.scan_iter("*"):
value = self.client.get(key)
if value is not None:
yield key, deserialize(value)
yield self.__get_original_key(key), deserialize(value)
except Exception as e:
logger.error(f"Failed to get items from Redis, region: {region}, error: {e}")
@@ -367,20 +381,34 @@ class AsyncRedisHelper(metaclass=Singleton):
logger.error(f"Failed to set Redis maxmemory or policy (async): {e}")
@staticmethod
def get_region(region: Optional[str] = "DEFAULT"):
def __get_region(region: Optional[str] = "DEFAULT"):
"""
获取缓存的区
"""
return f"region:{region}" if region else "region:default"
def get_redis_key(self, region: str, key: str) -> str:
def __make_redis_key(self, region: str, key: str) -> str:
"""
获取缓存Key
"""
# 使用region作为缓存键的一部分
region = self.get_region(quote(region))
region = self.__get_region(region)
return f"{region}:key:{quote(key)}"
@staticmethod
def __get_original_key(redis_key: Union[str, bytes]) -> str:
"""
从Redis键中提取原始key
"""
try:
if isinstance(redis_key, bytes):
redis_key = redis_key.decode("utf-8")
parts = redis_key.split(":key:")
return parts[-1]
except Exception as e:
logger.warn(f"Failed to parse redis key: {redis_key}, error: {e}")
return redis_key
async def set(self, key: str, value: Any, ttl: Optional[int] = None,
region: Optional[str] = "DEFAULT", **kwargs) -> None:
"""
@@ -394,7 +422,7 @@ class AsyncRedisHelper(metaclass=Singleton):
"""
try:
await self._connect()
redis_key = self.get_redis_key(region, key)
redis_key = self.__make_redis_key(region, key)
# 对值进行序列化
serialized_value = serialize(value)
kwargs.pop("maxsize", None)
@@ -412,7 +440,7 @@ class AsyncRedisHelper(metaclass=Singleton):
"""
try:
await self._connect()
redis_key = self.get_redis_key(region, key)
redis_key = self.__make_redis_key(region, key)
result = await self.client.exists(redis_key)
return result == 1
except Exception as e:
@@ -429,7 +457,7 @@ class AsyncRedisHelper(metaclass=Singleton):
"""
try:
await self._connect()
redis_key = self.get_redis_key(region, key)
redis_key = self.__make_redis_key(region, key)
value = await self.client.get(redis_key)
if value is not None:
return deserialize(value)
@@ -447,7 +475,7 @@ class AsyncRedisHelper(metaclass=Singleton):
"""
try:
await self._connect()
redis_key = self.get_redis_key(region, key)
redis_key = self.__make_redis_key(region, key)
await self.client.delete(redis_key)
except Exception as e:
logger.error(f"Failed to delete key (async): {key} in region: {region}, error: {e}")
@@ -461,7 +489,7 @@ class AsyncRedisHelper(metaclass=Singleton):
try:
await self._connect()
if region:
cache_region = self.get_region(quote(region))
cache_region = self.__get_region(region)
redis_key = f"{cache_region}:key:*"
async with self.client.pipeline() as pipe:
async for key in self.client.scan_iter(redis_key):
@@ -484,17 +512,17 @@ class AsyncRedisHelper(metaclass=Singleton):
try:
await self._connect()
if region:
cache_region = self.get_region(quote(region))
cache_region = self.__get_region(region)
redis_key = f"{cache_region}:key:*"
async for key in self.client.scan_iter(redis_key):
value = await self.client.get(key)
if value is not None:
yield key, deserialize(value)
yield self.__get_original_key(key), deserialize(value)
else:
async for key in self.client.scan_iter("*"):
value = await self.client.get(key)
if value is not None:
yield key, deserialize(value)
yield self.__get_original_key(key), deserialize(value)
except Exception as e:
logger.error(f"Failed to get items from Redis, region: {region}, error: {e}")

View File

@@ -1,5 +1,7 @@
import os
import signal
import threading
import time
from pathlib import Path
from typing import Tuple
@@ -41,8 +43,8 @@ class SystemHelper:
判断是否可以内部重启
"""
return (
Path("/var/run/docker.sock").exists()
or settings.DOCKER_CLIENT_API != "tcp://127.0.0.1:38379"
Path("/var/run/docker.sock").exists()
or settings.DOCKER_CLIENT_API != "tcp://127.0.0.1:38379"
)
@staticmethod
@@ -64,7 +66,7 @@ class SystemHelper:
if index_resolv_conf != -1:
index_second_slash = data.rfind(" ", 0, index_resolv_conf)
index_first_slash = (
data.rfind("/", 0, index_second_slash) + 1
data.rfind("/", 0, index_second_slash) + 1
)
container_id = data[index_first_slash:index_second_slash]
except Exception as e:
@@ -113,6 +115,8 @@ class SystemHelper:
if has_restart_policy:
# 有重启策略,使用优雅退出方式
logger.info("检测到容器配置了自动重启策略,使用优雅重启方式...")
# 启动优雅退出超时监控
SystemHelper._start_graceful_shutdown_monitor()
# 发送SIGTERM信号给当前进程触发优雅停止
os.kill(os.getpid(), signal.SIGTERM)
return True, ""
@@ -126,6 +130,25 @@ class SystemHelper:
logger.warning("降级为Docker API重启...")
return SystemHelper._docker_api_restart()
@staticmethod
def _start_graceful_shutdown_monitor():
"""
启动优雅退出超时监控
如果30秒内进程没有退出则使用Docker API强制重启
"""
def monitor_thread():
time.sleep(30) # 等待30秒
logger.warning("优雅退出超时30秒使用Docker API强制重启...")
try:
SystemHelper._docker_api_restart()
except Exception as e:
logger.error(f"强制重启失败: {str(e)}")
# 在后台线程中启动监控
thread = threading.Thread(target=monitor_thread, daemon=True)
thread.start()
@staticmethod
def _docker_api_restart() -> Tuple[bool, str]:
"""

View File

@@ -7,6 +7,7 @@ from urllib.parse import unquote
from torrentool.api import Torrent
from app.core.cache import FileCache
from app.core.cache import TTLCache
from app.core.config import settings
from app.core.context import Context, TorrentInfo, MediaInfo
from app.core.meta import MetaBase
@@ -16,17 +17,16 @@ from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas.types import MediaType, SystemConfigKey
from app.utils.http import RequestUtils
from app.utils.singleton import WeakSingleton
from app.utils.string import StringUtils
class TorrentHelper(metaclass=WeakSingleton):
class TorrentHelper:
"""
种子帮助类
"""
def __init__(self):
self._invalid_torrents = []
self._invalid_torrents = TTLCache(maxsize=128, ttl=3600 * 24)
def download_torrent(self, url: str,
cookie: Optional[str] = None,
@@ -199,8 +199,14 @@ class TorrentHelper(metaclass=WeakSingleton):
:param torrent_content: 种子内容
:return: 文件夹名、文件清单,单文件种子返回空文件夹名
"""
if not torrent_content:
return "", []
# 检查是否为磁力链接
if StringUtils.is_magnet_link(torrent_content):
return "", []
try:
# 解析种子内容
torrentinfo = Torrent.from_string(torrent_content)
@@ -346,7 +352,7 @@ class TorrentHelper(metaclass=WeakSingleton):
添加无效种子
"""
if url not in self._invalid_torrents:
self._invalid_torrents.append(url)
self._invalid_torrents[url] = True
@staticmethod
def match_torrent(mediainfo: MediaInfo, torrent_meta: MetaBase, torrent: TorrentInfo) -> bool:

View File

@@ -938,6 +938,8 @@ class DoubanModule(_ModuleBase):
"""
搜索人物信息
"""
if settings.SEARCH_SOURCE and "douban" not in settings.SEARCH_SOURCE:
return None
if not name:
return []
result = self.doubanapi.person_search(keyword=name)
@@ -956,6 +958,8 @@ class DoubanModule(_ModuleBase):
"""
搜索人物信息(异步版本)
"""
if settings.SEARCH_SOURCE and "douban" not in settings.SEARCH_SOURCE:
return None
if not name:
return []
result = await self.doubanapi.async_person_search(keyword=name)

View File

@@ -1,10 +1,39 @@
from abc import ABCMeta, abstractmethod
from pathlib import Path
from typing import Optional, List, Dict, Tuple
from typing import Optional, List, Dict, Tuple, Callable, Union
from tqdm import tqdm
from app import schemas
from app.helper.progress import ProgressHelper
from app.helper.storage import StorageHelper
from app.log import logger
from app.utils.crypto import HashUtils
def transfer_process(path: str) -> Callable[[int | float], None]:
"""
传输进度回调
"""
pbar = tqdm(total=100, desc="整理进度", unit="%")
progress = ProgressHelper(HashUtils.md5(path))
progress.start()
def update_progress(percent: Union[int, float]) -> None:
"""
更新进度百分比
"""
percent_value = int(percent)
pbar.n = percent_value
# 更新进度
pbar.refresh()
progress.update(value=percent_value, text=f"{path} 进度:{percent_value}%")
# 完成时结束
if percent_value >= 100:
progress.end()
pbar.close()
return update_progress
class StorageBase(metaclass=ABCMeta):

View File

@@ -1,6 +1,5 @@
import base64
import hashlib
import io
import secrets
import threading
import time
@@ -8,12 +7,12 @@ from pathlib import Path
from typing import List, Optional, Tuple, Union
import requests
from tqdm import tqdm
from app import schemas
from app.core.config import settings
from app.core.config import settings, global_vars
from app.log import logger
from app.modules.filemanager import StorageBase
from app.modules.filemanager.storages import transfer_process
from app.schemas.types import StorageSchema
from app.utils.singleton import WeakSingleton
from app.utils.string import StringUtils
@@ -46,6 +45,9 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
# 基础url
base_url = "https://openapi.alipan.com"
# 文件块大小默认10MB
chunk_size = 10 * 1024 * 1024
def __init__(self):
super().__init__()
self._auth_state = {}
@@ -580,29 +582,6 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
raise Exception(resp.get("message"))
return resp
@staticmethod
def _log_progress(desc: str, total: int) -> tqdm:
"""
创建一个可以输出到日志的进度条
"""
class TqdmToLogger(io.StringIO):
def write(s, buf): # noqa
buf = buf.strip('\r\n\t ')
if buf:
logger.info(buf)
return tqdm(
total=total,
unit='B',
unit_scale=True,
desc=desc,
file=TqdmToLogger(),
mininterval=1.0,
maxinterval=5.0,
miniters=1
)
def upload(self, target_dir: schemas.FileItem, local_path: Path,
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
"""
@@ -643,21 +622,26 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
# 4. 初始化进度条
logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path},分片数:{len(part_info_list)}")
progress_bar = self._log_progress(f"【阿里云盘】{target_name} 上传进度", file_size)
progress_callback = transfer_process(local_path.as_posix())
# 5. 分片上传循环
uploaded_size = 0
with open(local_path, 'rb') as f:
for part_info in part_info_list:
part_num = part_info['part_number']
if global_vars.is_transfer_stopped(local_path.as_posix()):
logger.info(f"【阿里云盘】{target_name} 上传已取消!")
return None
# 计算分片参数
part_num = part_info['part_number']
start = (part_num - 1) * chunk_size
end = min(start + chunk_size, file_size)
current_chunk_size = end - start
# 更新进度条(已存在的分片)
if part_num in uploaded_parts:
progress_bar.update(current_chunk_size)
uploaded_size += current_chunk_size
progress_callback((uploaded_size * 100) / file_size)
continue
# 准备分片数据
@@ -675,7 +659,6 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
upload_url = new_urls[0]['upload_url']
else:
upload_url = part_info['upload_url']
# 执行上传
logger.info(
f"【阿里云盘】开始 第{attempt + 1}次 上传 {target_name} 分片 {part_num} ...")
@@ -694,13 +677,13 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
# 处理上传结果
if success:
uploaded_parts.add(part_num)
progress_bar.update(current_chunk_size)
uploaded_size += current_chunk_size
progress_callback((uploaded_size * 100) / file_size)
else:
raise Exception(f"【阿里云盘】{target_name} 分片 {part_num} 上传失败!")
# 6. 关闭进度条
if progress_bar:
progress_bar.close()
progress_callback(100)
# 7. 完成上传
result = self._complete_upload(drive_id=target_dir.drive_id, file_id=file_id, upload_id=upload_id)
@@ -712,7 +695,7 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
"""
限速处理的下载
实时进度显示的下载
"""
download_info = self._request_api(
"POST",
@@ -723,14 +706,57 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
}
)
if not download_info:
logger.error(f"【阿里云盘】获取下载链接失败: {fileitem.name}")
return None
download_url = download_info.get("url")
if not download_url:
logger.error(f"【阿里云盘】下载链接为空: {fileitem.name}")
return None
local_path = path or settings.TEMP_PATH / fileitem.name
with requests.get(download_url, stream=True) as r:
r.raise_for_status()
with open(local_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
# 获取文件大小
file_size = fileitem.size
# 初始化进度条
logger.info(f"【阿里云盘】开始下载: {fileitem.name} -> {local_path}")
progress_callback = transfer_process(Path(fileitem.path).as_posix())
try:
with requests.get(download_url, stream=True) as r:
r.raise_for_status()
downloaded_size = 0
with open(local_path, "wb") as f:
for chunk in r.iter_content(chunk_size=self.chunk_size):
if global_vars.is_transfer_stopped(fileitem.path):
logger.info(f"【阿里云盘】{fileitem.path} 下载已取消!")
return None
if chunk:
f.write(chunk)
# 更新进度
downloaded_size += len(chunk)
if file_size:
progress = (downloaded_size * 100) / file_size
progress_callback(progress)
# 完成下载
progress_callback(100)
logger.info(f"【阿里云盘】下载完成: {fileitem.name}")
except requests.exceptions.RequestException as e:
logger.error(f"【阿里云盘】下载网络错误: {fileitem.name} - {str(e)}")
# 删除可能部分下载的文件
if local_path.exists():
local_path.unlink()
return None
except Exception as e:
logger.error(f"【阿里云盘】下载失败: {fileitem.name} - {str(e)}")
# 删除可能部分下载的文件
if local_path.exists():
local_path.unlink()
return None
return local_path
def check(self) -> bool:

View File

@@ -1,4 +1,5 @@
import json
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, List
@@ -7,9 +8,9 @@ import requests
from app import schemas
from app.core.cache import cached
from app.core.config import settings
from app.core.config import settings, global_vars
from app.log import logger
from app.modules.filemanager.storages import StorageBase
from app.modules.filemanager.storages import StorageBase, transfer_process
from app.schemas.types import StorageSchema
from app.utils.http import RequestUtils
from app.utils.singleton import WeakSingleton
@@ -31,6 +32,7 @@ class Alist(StorageBase, metaclass=WeakSingleton):
"move": "移动",
}
# 快照检查目录修改时间
snapshot_check_folder_modtime = settings.OPENLIST_SNAPSHOT_CHECK_FOLDER_MODTIME
def __init__(self):
@@ -42,6 +44,17 @@ class Alist(StorageBase, metaclass=WeakSingleton):
"""
self.__generate_token.cache_clear() # noqa
def _delay_get_item(self, path: Path) -> Optional[schemas.FileItem]:
"""
自动延迟重试 get_item 模块
"""
for _ in range(2):
time.sleep(2)
fileitem = self.get_item(path)
if fileitem:
return fileitem
return None
@property
def __get_base_url(self) -> str:
"""
@@ -269,7 +282,7 @@ class Alist(StorageBase, metaclass=WeakSingleton):
logger.warn(f'【OpenList】创建目录 {path} 失败,错误信息:{result["message"]}')
return None
return self.get_item(path)
return self._delay_get_item(path)
def get_folder(self, path: Path) -> Optional[schemas.FileItem]:
"""
@@ -560,6 +573,9 @@ class Alist(StorageBase, metaclass=WeakSingleton):
r.raise_for_status()
with open(local_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
if global_vars.is_transfer_stopped(fileitem.path):
logger.info(f"【OpenList】{fileitem.path} 下载已取消!")
return None
f.write(chunk)
if local_path.exists():
@@ -570,36 +586,81 @@ class Alist(StorageBase, metaclass=WeakSingleton):
self, fileitem: schemas.FileItem, path: Path, new_name: Optional[str] = None, task: bool = False
) -> Optional[schemas.FileItem]:
"""
上传文件
上传文件(带进度)
:param fileitem: 上传目录项
:param path: 本地文件路径
:param new_name: 上传后文件名
:param task: 是否为任务默认为False避免未完成上传时对文件进行操作
"""
encoded_path = UrlUtils.quote((Path(fileitem.path) / path.name).as_posix())
headers = self.__get_header_with_token()
headers.setdefault("Content-Type", "application/octet-stream")
headers.setdefault("As-Task", str(task).lower())
headers.setdefault("File-Path", encoded_path)
with open(path, "rb") as f:
resp = RequestUtils(headers=headers).put_res(
self.__get_api_url("/api/fs/put"),
data=f,
)
try:
# 获取文件大小
target_name = new_name or path.name
target_path = Path(fileitem.path) / target_name
if resp is None:
logger.warn(f"【OpenList】请求上传文件 {path} 失败")
# 初始化进度回调
progress_callback = transfer_process(path.as_posix())
# 准备上传请求
encoded_path = UrlUtils.quote(target_path.as_posix())
headers = self.__get_header_with_token()
headers.setdefault("Content-Type", "application/octet-stream")
headers.setdefault("As-Task", str(task).lower())
headers.setdefault("File-Path", encoded_path)
# 创建自定义的文件流,支持进度回调
class ProgressFileReader:
def __init__(self, file_path: Path, callback):
self.file = open(file_path, 'rb')
self.callback = callback
self.uploaded_size = 0
self.file_size = file_path.stat().st_size
def read(self, size=-1):
if global_vars.is_transfer_stopped(path.as_posix()):
logger.info(f"【OpenList】{path} 上传已取消!")
return None
chunk = self.file.read(size)
if chunk:
self.uploaded_size += len(chunk)
if self.callback:
percent = (self.uploaded_size * 100) / self.file_size
self.callback(percent)
return chunk
def close(self):
self.file.close()
# 使用自定义文件流上传
progress_reader = ProgressFileReader(path, progress_callback)
try:
resp = RequestUtils(headers=headers).put_res(
self.__get_api_url("/api/fs/put"),
data=progress_reader,
)
finally:
progress_reader.close()
if resp is None:
logger.warn(f"【OpenList】请求上传文件 {path} 失败")
return None
if resp.status_code != 200:
logger.warn(f"【OpenList】请求上传文件 {path} 失败,状态码:{resp.status_code}")
return None
# 完成上传
progress_callback(100)
# 获取上传后的文件项
new_item = self._delay_get_item(target_path)
if new_item and new_name and new_name != path.name:
if self.rename(new_item, new_name):
return self._delay_get_item(Path(new_item.path).with_name(new_name))
return new_item
except Exception as e:
logger.error(f"【OpenList】上传文件 {path} 失败:{e}")
return None
if resp.status_code != 200:
logger.warn(f"【OpenList】请求上传文件 {path} 失败,状态码:{resp.status_code}")
return None
new_item = self.get_item(Path(fileitem.path) / path.name)
if new_item and new_name and new_name != path.name:
if self.rename(new_item, new_name):
return self.get_item(Path(new_item.path).with_name(new_name))
return new_item
def detail(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]:
"""
@@ -658,9 +719,9 @@ class Alist(StorageBase, metaclass=WeakSingleton):
return False
# 重命名
if fileitem.name != new_name:
self.rename(
self.get_item(path / fileitem.name), new_name
)
new_item = self._delay_get_item(path / fileitem.name)
if new_item:
self.rename(new_item, new_name)
return True
def move(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:

View File

@@ -3,9 +3,10 @@ from pathlib import Path
from typing import Optional, List
from app import schemas
from app.core.config import global_vars
from app.helper.directory import DirectoryHelper
from app.log import logger
from app.modules.filemanager.storages import StorageBase
from app.modules.filemanager.storages import StorageBase, transfer_process
from app.schemas.types import StorageSchema
from app.utils.system import SystemUtils
@@ -25,6 +26,9 @@ class LocalStorage(StorageBase):
"softlink": "软链接"
}
# 文件块大小默认100MB
chunk_size = 100 * 1024 * 1024
def init_storage(self):
"""
初始化
@@ -95,7 +99,7 @@ class LocalStorage(StorageBase):
# 遍历目录
path_obj = Path(path)
if not path_obj.exists():
logger.warn(f"local】目录不存在:{path}")
logger.warn(f"本地】目录不存在:{path}")
return []
# 如果是文件
@@ -167,7 +171,7 @@ class LocalStorage(StorageBase):
else:
shutil.rmtree(path_obj, ignore_errors=True)
except Exception as e:
logger.error(f"local】删除文件失败:{e}")
logger.error(f"本地】删除文件失败:{e}")
return False
return True
@@ -181,7 +185,7 @@ class LocalStorage(StorageBase):
try:
path_obj.rename(path_obj.parent / name)
except Exception as e:
logger.error(f"local】重命名文件失败:{e}")
logger.error(f"本地】重命名文件失败:{e}")
return False
return True
@@ -191,21 +195,94 @@ class LocalStorage(StorageBase):
"""
return Path(fileitem.path)
def upload(self, fileitem: schemas.FileItem, path: Path,
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
def _copy_with_progress(self, src: Path, dest: Path):
"""
上传文件
:param fileitem: 上传目录项
:param path: 本地文件路径
:param new_name: 上传后文件名
分块复制文件并回调进度
"""
dir_path = Path(fileitem.path)
target_path = dir_path / (new_name or path.name)
code, message = SystemUtils.move(path, target_path)
if code != 0:
logger.error(f"【local】移动文件失败{message}")
return None
return self.get_item(target_path)
total_size = src.stat().st_size
copied_size = 0
progress_callback = transfer_process(src.as_posix())
try:
with open(src, "rb") as fsrc, open(dest, "wb") as fdst:
while True:
if global_vars.is_transfer_stopped(src.as_posix()):
logger.info(f"【本地】{src} 复制已取消!")
return False
buf = fsrc.read(self.chunk_size)
if not buf:
break
fdst.write(buf)
copied_size += len(buf)
# 更新进度
if progress_callback:
percent = copied_size / total_size * 100
progress_callback(percent)
# 保留文件时间戳、权限等信息
shutil.copystat(src, dest)
return True
except Exception as e:
logger.error(f"【本地】复制文件 {src} 失败:{e}")
return False
finally:
progress_callback(100)
def upload(
self,
fileitem: schemas.FileItem,
path: Path,
new_name: Optional[str] = None
) -> Optional[schemas.FileItem]:
"""
上传文件(带进度)
"""
try:
dir_path = Path(fileitem.path)
target_path = dir_path / (new_name or path.name)
if self._copy_with_progress(path, target_path):
# 上传删除源文件
path.unlink()
return self.get_item(target_path)
except Exception as err:
logger.error(f"【本地】移动文件失败:{err}")
return None
def copy(
self,
fileitem: schemas.FileItem,
path: Path,
new_name: str
) -> bool:
"""
复制文件(带进度)
"""
try:
src = Path(fileitem.path)
dest = path / new_name
if self._copy_with_progress(src, dest):
return True
except Exception as err:
logger.error(f"【本地】复制文件失败:{err}")
return False
def move(
self,
fileitem: schemas.FileItem,
path: Path,
new_name: str
) -> bool:
"""
移动文件(带进度)
"""
try:
src = Path(fileitem.path)
dest = path / new_name
if self._copy_with_progress(src, dest):
# 复制成功删除源文件
src.unlink()
return True
except Exception as err:
logger.error(f"【本地】移动文件失败:{err}")
return False
def link(self, fileitem: schemas.FileItem, target_file: Path) -> bool:
"""
@@ -214,7 +291,7 @@ class LocalStorage(StorageBase):
file_path = Path(fileitem.path)
code, message = SystemUtils.link(file_path, target_file)
if code != 0:
logger.error(f"local】硬链接文件失败:{message}")
logger.error(f"本地】硬链接文件失败:{message}")
return False
return True
@@ -225,35 +302,7 @@ class LocalStorage(StorageBase):
file_path = Path(fileitem.path)
code, message = SystemUtils.softlink(file_path, target_file)
if code != 0:
logger.error(f"local】软链接文件失败:{message}")
return False
return True
def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
"""
复制文件
:param fileitem: 文件项
:param path: 目标目录
:param new_name: 新文件名
"""
file_path = Path(fileitem.path)
code, message = SystemUtils.copy(file_path, path / new_name)
if code != 0:
logger.error(f"【local】复制文件失败{message}")
return False
return True
def move(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
"""
移动文件
:param fileitem: 文件项
:param path: 目标目录
:param new_name: 新文件名
"""
file_path = Path(fileitem.path)
code, message = SystemUtils.move(file_path, path / new_name)
if code != 0:
logger.error(f"【local】移动文件失败{message}")
logger.error(f"本地】软链接文件失败:{message}")
return False
return True

View File

@@ -6,7 +6,7 @@ from typing import Optional, List
from app import schemas
from app.core.config import settings
from app.log import logger
from app.modules.filemanager.storages import StorageBase
from app.modules.filemanager.storages import StorageBase, transfer_process
from app.schemas.types import StorageSchema
from app.utils.string import StringUtils
from app.utils.system import SystemUtils
@@ -58,6 +58,41 @@ class Rclone(StorageBase):
else:
return None
@staticmethod
def __parse_rclone_progress(line: str) -> Optional[float]:
"""
解析rclone进度输出
"""
if not line:
return None
line = line.strip()
# 检查是否包含百分比
if '%' not in line:
return None
try:
# 尝试多种进度输出格式
if 'ETA' in line:
# 格式: "Transferred: 1.234M / 5.678M, 22%, 1.234MB/s, ETA 2m3s"
percent_str = line.split('%')[0].split()[-1]
return float(percent_str)
elif 'Transferred:' in line and '100%' in line:
# 传输完成
return 100.0
else:
# 其他包含百分比的格式
parts = line.split()
for part in parts:
if '%' in part:
percent_str = part.replace('%', '')
return float(percent_str)
except (ValueError, IndexError):
pass
return None
def __get_rcloneitem(self, item: dict, parent: Optional[str] = "/") -> schemas.FileItem:
"""
获取rclone文件项
@@ -238,47 +273,115 @@ class Rclone(StorageBase):
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
"""
下载文件
带实时进度显示的下载
"""
path = (path or settings.TEMP_PATH) / fileitem.name
local_path = (path or settings.TEMP_PATH) / fileitem.name
# 初始化进度条
logger.info(f"【rclone】开始下载: {fileitem.name} -> {local_path}")
progress_callback = transfer_process(Path(fileitem.path).as_posix())
try:
retcode = subprocess.run(
# 使用rclone的进度显示功能
process = subprocess.Popen(
[
'rclone', 'copyto',
'--progress', # 启用进度显示
'--stats', '1s', # 每秒更新一次统计信息
f'MP:{fileitem.path}',
f'{path}'
f'{local_path}'
],
startupinfo=self.__get_hidden_shell()
).returncode
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
startupinfo=self.__get_hidden_shell(),
universal_newlines=True,
bufsize=1
)
# 监控进度输出
last_progress = 0
for line in process.stdout:
if line:
# 解析rclone的进度输出
progress = self.__parse_rclone_progress(line)
if progress is not None and progress > last_progress:
progress_callback(progress)
last_progress = progress
if progress >= 100:
break
# 等待进程完成
retcode = process.wait()
if retcode == 0:
return path
logger.info(f"【rclone】下载完成: {fileitem.name}")
return local_path
else:
logger.error(f"【rclone】下载失败: {fileitem.name}")
return None
except Exception as err:
logger.error(f"【rclone】复制文件失败:{err}")
return None
logger.error(f"【rclone】下载失败: {fileitem.name} - {err}")
# 删除可能部分下载的文件
if local_path.exists():
local_path.unlink()
return None
def upload(self, fileitem: schemas.FileItem, path: Path,
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
"""
上传文件
带实时进度显示的上传
:param fileitem: 上传目录项
:param path: 本地文件路径
:param new_name: 上传后文件名
"""
target_name = new_name or path.name
new_path = Path(fileitem.path) / target_name
# 初始化进度条
logger.info(f"【rclone】开始上传: {path} -> {new_path}")
progress_callback = transfer_process(path.as_posix())
try:
new_path = Path(fileitem.path) / (new_name or path.name)
retcode = subprocess.run(
# 使用rclone的进度显示功能
process = subprocess.Popen(
[
'rclone', 'copyto',
'--progress', # 启用进度显示
'--stats', '1s', # 每秒更新一次统计信息
path.as_posix(),
f'MP:{new_path}'
],
startupinfo=self.__get_hidden_shell()
).returncode
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
startupinfo=self.__get_hidden_shell(),
universal_newlines=True,
bufsize=1
)
# 监控进度输出
last_progress = 0
for line in process.stdout:
if line:
# 解析rclone的进度输出
progress = self.__parse_rclone_progress(line)
if progress is not None and progress > last_progress:
progress_callback(progress)
last_progress = progress
if progress >= 100:
break
# 等待进程完成
retcode = process.wait()
if retcode == 0:
logger.info(f"【rclone】上传完成: {target_name}")
return self.get_item(new_path)
else:
logger.error(f"【rclone】上传失败: {target_name}")
return None
except Exception as err:
logger.error(f"【rclone】上传文件失败:{err}")
return None
logger.error(f"【rclone】上传失败: {target_name} - {err}")
return None
def detail(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]:
"""
@@ -307,20 +410,53 @@ class Rclone(StorageBase):
:param path: 目标目录
:param new_name: 新文件名
"""
target_path = path / new_name
# 初始化进度条
logger.info(f"【rclone】开始移动: {fileitem.path} -> {target_path}")
progress_callback = transfer_process(Path(fileitem.path).as_posix())
try:
retcode = subprocess.run(
# 使用rclone的进度显示功能
process = subprocess.Popen(
[
'rclone', 'moveto',
'--progress', # 启用进度显示
'--stats', '1s', # 每秒更新一次统计信息
f'MP:{fileitem.path}',
f'MP:{path / new_name}'
f'MP:{target_path}'
],
startupinfo=self.__get_hidden_shell()
).returncode
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
startupinfo=self.__get_hidden_shell(),
universal_newlines=True,
bufsize=1
)
# 监控进度输出
last_progress = 0
for line in process.stdout:
if line:
# 解析rclone的进度输出
progress = self.__parse_rclone_progress(line)
if progress is not None and progress > last_progress:
progress_callback(progress)
last_progress = progress
if progress >= 100:
break
# 等待进程完成
retcode = process.wait()
if retcode == 0:
logger.info(f"【rclone】移动完成: {fileitem.name}")
return True
else:
logger.error(f"【rclone】移动失败: {fileitem.name}")
return False
except Exception as err:
logger.error(f"【rclone】移动文件失败:{err}")
return False
logger.error(f"【rclone】移动失败: {fileitem.name} - {err}")
return False
def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
"""
@@ -329,20 +465,53 @@ class Rclone(StorageBase):
:param path: 目标目录
:param new_name: 新文件名
"""
target_path = path / new_name
# 初始化进度条
logger.info(f"【rclone】开始复制: {fileitem.path} -> {target_path}")
progress_callback = transfer_process(Path(fileitem.path).as_posix())
try:
retcode = subprocess.run(
# 使用rclone的进度显示功能
process = subprocess.Popen(
[
'rclone', 'copyto',
'--progress', # 启用进度显示
'--stats', '1s', # 每秒更新一次统计信息
f'MP:{fileitem.path}',
f'MP:{path / new_name}'
f'MP:{target_path}'
],
startupinfo=self.__get_hidden_shell()
).returncode
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
startupinfo=self.__get_hidden_shell(),
universal_newlines=True,
bufsize=1
)
# 监控进度输出
last_progress = 0
for line in process.stdout:
if line:
# 解析rclone的进度输出
progress = self.__parse_rclone_progress(line)
if progress is not None and progress > last_progress:
progress_callback(progress)
last_progress = progress
if progress >= 100:
break
# 等待进程完成
retcode = process.wait()
if retcode == 0:
logger.info(f"【rclone】复制完成: {fileitem.name}")
return True
else:
logger.error(f"【rclone】复制失败: {fileitem.name}")
return False
except Exception as err:
logger.error(f"【rclone】复制文件失败:{err}")
return False
logger.error(f"【rclone】复制失败: {fileitem.name} - {err}")
return False
def link(self, fileitem: schemas.FileItem, target_file: Path) -> bool:
pass

View File

@@ -8,9 +8,10 @@ from smbclient import ClientConfig, register_session, reset_connection_cache
from smbprotocol.exceptions import SMBException, SMBResponseException, SMBAuthenticationError
from app import schemas
from app.core.config import settings
from app.core.config import settings, global_vars
from app.log import logger
from app.modules.filemanager import StorageBase
from app.modules.filemanager.storages import transfer_process
from app.schemas.types import StorageSchema
from app.utils.singleton import WeakSingleton
@@ -38,6 +39,9 @@ class SMB(StorageBase, metaclass=WeakSingleton):
"copy": "复制",
}
# 文件块大小默认100MB
chunk_size = 100 * 1024 * 1024
def __init__(self):
super().__init__()
self._connected = False
@@ -412,63 +416,99 @@ class SMB(StorageBase, metaclass=WeakSingleton):
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
"""
下载文件
带实时进度显示的下载
"""
local_path = path or settings.TEMP_PATH / fileitem.name
smb_path = self._normalize_path(fileitem.path)
try:
self._check_connection()
smb_path = self._normalize_path(fileitem.path)
local_path = path or settings.TEMP_PATH / fileitem.name
# 确保本地目录存在
local_path.parent.mkdir(parents=True, exist_ok=True)
# 获取文件大小
file_size = fileitem.size
# 初始化进度条
logger.info(f"【SMB】开始下载: {fileitem.name} -> {local_path}")
progress_callback = transfer_process(Path(fileitem.path).as_posix())
# 使用更高效的文件传输方式
with smbclient.open_file(smb_path, mode="rb") as src_file:
with open(local_path, "wb") as dst_file:
# 使用更大的缓冲区提高性能
buffer_size = 1024 * 1024 # 1MB
downloaded_size = 0
while True:
chunk = src_file.read(buffer_size)
if global_vars.is_transfer_stopped(fileitem.path):
logger.info(f"【SMB】{fileitem.path} 下载已取消!")
return None
chunk = src_file.read(self.chunk_size)
if not chunk:
break
dst_file.write(chunk)
downloaded_size += len(chunk)
# 更新进度
if file_size:
progress = (downloaded_size * 100) / file_size
progress_callback(progress)
logger.info(f"【SMB】下载成功: {fileitem.path} -> {local_path}")
# 完成下载
progress_callback(100)
logger.info(f"【SMB】下载完成: {fileitem.name}")
return local_path
except Exception as e:
logger.error(f"【SMB】下载失败: {e}")
logger.error(f"【SMB】下载失败: {fileitem.name} - {e}")
# 删除可能部分下载的文件
if local_path.exists():
local_path.unlink()
return None
def upload(self, fileitem: schemas.FileItem, path: Path,
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
"""
上传文件
带实时进度显示的上传
"""
target_name = new_name or path.name
target_path = Path(fileitem.path) / target_name
smb_path = self._normalize_path(str(target_path))
try:
self._check_connection()
target_name = new_name or path.name
target_path = Path(fileitem.path) / target_name
smb_path = self._normalize_path(str(target_path))
# 获取文件大小
file_size = path.stat().st_size
# 初始化进度条
logger.info(f"【SMB】开始上传: {path} -> {target_path}")
progress_callback = transfer_process(path.as_posix())
# 使用更高效的文件传输方式
with open(path, "rb") as src_file:
with smbclient.open_file(smb_path, mode="wb") as dst_file:
# 使用更大的缓冲区提高性能
buffer_size = 1024 * 1024 # 1MB
uploaded_size = 0
while True:
chunk = src_file.read(buffer_size)
if global_vars.is_transfer_stopped(path.as_posix()):
logger.info(f"【SMB】{path} 上传已取消!")
return None
chunk = src_file.read(self.chunk_size)
if not chunk:
break
dst_file.write(chunk)
uploaded_size += len(chunk)
# 更新进度
if file_size:
progress = (uploaded_size * 100) / file_size
progress_callback(progress)
logger.info(f"【SMB】上传成功: {path} -> {target_path}")
# 完成上传
progress_callback(100)
logger.info(f"【SMB】上传完成: {target_name}")
# 返回上传后的文件信息
return self.get_item(target_path)
except Exception as e:
logger.error(f"【SMB】上传失败: {e}")
logger.error(f"【SMB】上传失败: {target_name} - {e}")
return None
def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:

View File

@@ -1,6 +1,5 @@
import base64
import hashlib
import io
import secrets
import threading
import time
@@ -11,12 +10,12 @@ import oss2
import requests
from oss2 import SizedFileAdapter, determine_part_size
from oss2.models import PartInfo
from tqdm import tqdm
from app import schemas
from app.core.config import settings
from app.core.config import settings, global_vars
from app.log import logger
from app.modules.filemanager import StorageBase
from app.modules.filemanager.storages import transfer_process
from app.schemas.types import StorageSchema
from app.utils.singleton import WeakSingleton
from app.utils.string import StringUtils
@@ -44,6 +43,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 基础url
base_url = "https://proapi.115.com"
# 文件块大小默认10MB
chunk_size = 10 * 1024 * 1024
def __init__(self):
super().__init__()
self._auth_state = {}
@@ -352,29 +354,6 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
modify_time=int(time.time())
)
@staticmethod
def _log_progress(desc: str, total: int) -> tqdm:
"""
创建一个可以输出到日志的进度条
"""
class TqdmToLogger(io.StringIO):
def write(s, buf): # noqa
buf = buf.strip('\r\n\t ')
if buf:
logger.info(buf)
return tqdm(
total=total,
unit='B',
unit_scale=True,
desc=desc,
file=TqdmToLogger(),
mininterval=1.0,
maxinterval=5.0,
miniters=1
)
def upload(self, target_dir: schemas.FileItem, local_path: Path,
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
"""
@@ -539,13 +518,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 初始化进度条
logger.info(f"【115】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}")
progress_bar = tqdm(
total=file_size,
unit='B',
unit_scale=True,
desc="上传进度",
ascii=True
)
progress_callback = transfer_process(local_path.as_posix())
# 初始化分片
upload_id = bucket.init_multipart_upload(object_name,
@@ -559,6 +532,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
part_number = 1
offset = 0
while offset < file_size:
if global_vars.is_transfer_stopped(local_path.as_posix()):
logger.info(f"【115】{local_path} 上传已取消!")
return None
num_to_upload = min(part_size, file_size - offset)
# 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。
logger.info(f"【115】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}")
@@ -569,11 +545,11 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
offset += num_to_upload
part_number += 1
# 更新进度
progress_bar.update(num_to_upload)
progress = (offset * 100) / file_size
progress_callback(progress)
# 关闭进度条
if progress_bar:
progress_bar.close()
# 完成上传
progress_callback(100)
# 请求头
headers = {
@@ -601,11 +577,13 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
"""
限速处理的下载
实时进度显示的下载
"""
detail = self.get_item(Path(fileitem.path))
if not detail:
logger.error(f"【115】获取文件详情失败: {fileitem.name}")
return None
download_info = self._request_api(
"POST",
"/open/ufile/downurl",
@@ -615,14 +593,58 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
}
)
if not download_info:
logger.error(f"【115】获取下载链接失败: {fileitem.name}")
return None
download_url = list(download_info.values())[0].get("url", {}).get("url")
if not download_url:
logger.error(f"【115】下载链接为空: {fileitem.name}")
return None
local_path = path or settings.TEMP_PATH / fileitem.name
with self.session.get(download_url, stream=True) as r:
r.raise_for_status()
with open(local_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
# 获取文件大小
file_size = detail.size
# 初始化进度条
logger.info(f"【115】开始下载: {fileitem.name} -> {local_path}")
progress_callback = transfer_process(Path(fileitem.path).as_posix())
try:
with self.session.get(download_url, stream=True) as r:
r.raise_for_status()
downloaded_size = 0
with open(local_path, "wb") as f:
for chunk in r.iter_content(chunk_size=self.chunk_size):
if global_vars.is_transfer_stopped(fileitem.path):
logger.info(f"【115】{fileitem.path} 下载已取消!")
return None
if chunk:
f.write(chunk)
downloaded_size += len(chunk)
# 更新进度
if file_size:
progress = (downloaded_size * 100) / file_size
progress_callback(progress)
# 完成下载
progress_callback(100)
logger.info(f"【115】下载完成: {fileitem.name}")
except requests.exceptions.RequestException as e:
logger.error(f"【115】下载网络错误: {fileitem.name} - {str(e)}")
# 删除可能部分下载的文件
if local_path.exists():
local_path.unlink()
return None
except Exception as e:
logger.error(f"【115】下载失败: {fileitem.name} - {str(e)}")
# 删除可能部分下载的文件
if local_path.exists():
local_path.unlink()
return None
return local_path
def check(self) -> bool:

View File

@@ -118,15 +118,17 @@ class QbittorrentModule(_ModuleBase, _DownloaderBase[Qbittorrent]):
if content.exists():
torrent_content = content.read_bytes()
else:
# 缓存处理器
cache_backend = FileCache()
# 读取缓存的种子文件
torrent_content = cache_backend.get(content.as_posix(), region="torrents")
torrent_content = FileCache().get(content.as_posix(), region="torrents")
else:
torrent_content = content
if torrent_content:
torrent_info = Torrent.from_string(torrent_content)
# 检查是否为磁力链接
if StringUtils.is_magnet_link(torrent_content):
return None, torrent_content
else:
torrent_info = Torrent.from_string(torrent_content)
return torrent_info, torrent_content
except Exception as e:
@@ -138,7 +140,11 @@ class QbittorrentModule(_ModuleBase, _DownloaderBase[Qbittorrent]):
# 读取种子的名称
torrent, content = __get_torrent_info()
if not torrent:
# 检查是否为磁力链接
is_magnet = isinstance(content, str) and content.startswith("magnet:") or isinstance(content,
bytes) and content.startswith(
b"magnet:")
if not torrent and not is_magnet:
return None, None, None, f"添加种子任务失败:无法读取种子文件"
# 获取下载器

View File

@@ -639,6 +639,8 @@ class TheMovieDbModule(_ModuleBase):
"""
搜索人物信息
"""
if settings.SEARCH_SOURCE and "themoviedb" not in settings.SEARCH_SOURCE:
return None
if not name:
return []
results = self.tmdb.search_persons(name)
@@ -646,6 +648,19 @@ class TheMovieDbModule(_ModuleBase):
return [MediaPerson(source='themoviedb', **person) for person in results]
return []
async def async_search_persons(self, name: str) -> Optional[List[MediaPerson]]:
"""
异步搜索人物信息
"""
if settings.SEARCH_SOURCE and "themoviedb" not in settings.SEARCH_SOURCE:
return None
if not name:
return []
results = await self.tmdb.async_search_persons(name)
if results:
return [MediaPerson(source='themoviedb', **person) for person in results]
return []
def search_collections(self, name: str) -> Optional[List[MediaInfo]]:
"""
搜索集合信息

View File

@@ -119,15 +119,17 @@ class TransmissionModule(_ModuleBase, _DownloaderBase[Transmission]):
if content.exists():
torrent_content = content.read_bytes()
else:
# 缓存处理器
cache_backend = FileCache()
# 读取缓存的种子文件
torrent_content = cache_backend.get(content.as_posix(), region="torrents")
torrent_content = FileCache().get(content.as_posix(), region="torrents")
else:
torrent_content = content
if torrent_content:
torrent_info = Torrent.from_string(torrent_content)
# 检查是否为磁力链接
if StringUtils.is_magnet_link(torrent_content):
return None, torrent_content
else:
torrent_info = Torrent.from_string(torrent_content)
return torrent_info, torrent_content
except Exception as e:
@@ -139,7 +141,11 @@ class TransmissionModule(_ModuleBase, _DownloaderBase[Transmission]):
# 读取种子的名称
torrent, content = __get_torrent_info()
if not torrent:
# 检查是否为磁力链接
is_magnet = isinstance(content, str) and content.startswith("magnet:") or isinstance(content,
bytes) and content.startswith(
b"magnet:")
if not torrent and not is_magnet:
return None, None, None, f"添加种子任务失败:无法读取种子文件"
# 获取下载器

View File

@@ -10,7 +10,7 @@ from threading import Lock
from typing import Any, Optional, Dict, List
from apscheduler.schedulers.background import BackgroundScheduler
from app.core.cache import TTLCache
from app.core.cache import TTLCache, FileCache
from watchdog.events import FileSystemEventHandler, FileSystemMovedEvent, FileSystemEvent
from watchdog.observers.polling import PollingObserver
@@ -67,17 +67,14 @@ class Monitor(metaclass=Singleton):
self._observers = []
# 定时服务
self._scheduler = None
# 存储快照缓存目录
self._snapshot_cache_dir = None
# 存储过照间隔(分钟)
self._snapshot_interval = 5
# TTL缓存10秒钟有效
self._cache = TTLCache(region="monitor", maxsize=1024, ttl=10)
# 快照文件缓存
self._snapshot_cache = FileCache(base=settings.CACHE_PATH / "snapshots")
# 监控的文件扩展名
self.all_exts = settings.RMT_MEDIAEXT
# 初始化快照缓存目录
self._snapshot_cache_dir = settings.TEMP_PATH / "snapshots"
self._snapshot_cache_dir.mkdir(exist_ok=True)
# 启动目录监控和文件整理
self.init()
@@ -98,14 +95,13 @@ class Monitor(metaclass=Singleton):
def save_snapshot(self, storage: str, snapshot: Dict, file_count: int = 0,
last_snapshot_time: Optional[float] = None):
"""
保存快照到文件
保存快照到文件缓存
:param storage: 存储名称
:param snapshot: 快照数据
:param last_snapshot_time: 上次快照时间戳
:param file_count: 文件数量,用于调整监控间隔
"""
try:
cache_file = self._snapshot_cache_dir / f"{storage}_snapshot.json"
snapshot_time = max((item.get('modify_time', 0) for item in snapshot.values()), default=None)
if snapshot_time is None:
snapshot_time = last_snapshot_time or time.time()
@@ -114,9 +110,11 @@ class Monitor(metaclass=Singleton):
'file_count': file_count,
'snapshot': snapshot
}
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump(snapshot_data, f, ensure_ascii=False, indent=2) # noqa
logger.debug(f"快照已保存到 {cache_file}")
# 使用FileCache保存快照数据
cache_key = f"{storage}_snapshot"
snapshot_json = json.dumps(snapshot_data, ensure_ascii=False, indent=2)
self._snapshot_cache.set(cache_key, snapshot_json.encode('utf-8'), region="snapshots")
logger.debug(f"快照已保存到缓存: {storage}")
except Exception as e:
logger.error(f"保存快照失败: {e}")
@@ -127,9 +125,9 @@ class Monitor(metaclass=Singleton):
:return: 是否成功
"""
try:
cache_file = self._snapshot_cache_dir / f"{storage}_snapshot.json"
if cache_file.exists():
cache_file.unlink()
cache_key = f"{storage}_snapshot"
if self._snapshot_cache.exists(cache_key, region="snapshots"):
self._snapshot_cache.delete(cache_key, region="snapshots")
logger.info(f"快照已重置: {storage}")
return True
logger.debug(f"快照文件不存在,无需重置: {storage}")
@@ -187,18 +185,18 @@ class Monitor(metaclass=Singleton):
def load_snapshot(self, storage: str) -> Optional[Dict]:
"""
从文件加载快照
从文件缓存加载快照
:param storage: 存储名称
:return: 快照数据或None
"""
try:
cache_file = self._snapshot_cache_dir / f"{storage}_snapshot.json"
if cache_file.exists():
with open(cache_file, 'r', encoding='utf-8') as f:
data = json.load(f)
logger.debug(f"成功加载快照: {cache_file}, 包含 {len(data.get('snapshot', {}))} 个文件")
return data
logger.debug(f"快照文件不存在: {cache_file}")
cache_key = f"{storage}_snapshot"
snapshot_data = self._snapshot_cache.get(cache_key, region="snapshots")
if snapshot_data:
data = json.loads(snapshot_data.decode('utf-8'))
logger.debug(f"成功加载快照: {storage}, 包含 {len(data.get('snapshot', {}))} 个文件")
return data
logger.debug(f"快照文件不存在: {storage}")
return None
except Exception as e:
logger.error(f"加载快照失败: {e}")
@@ -793,4 +791,6 @@ class Monitor(metaclass=Singleton):
self._scheduler = None
if self._cache:
self._cache.close()
if self._snapshot_cache:
self._snapshot_cache.close()
self._event.clear()

View File

@@ -1,4 +1,4 @@
from typing import Optional, List
from typing import Optional, List, Dict, Any
from pydantic import BaseModel, Field
@@ -67,3 +67,17 @@ class PluginDashboard(Plugin):
cols: Optional[dict] = Field(default_factory=dict)
# 页面元素
elements: Optional[List[dict]] = Field(default_factory=list)
class PluginMemoryInfo(BaseModel):
"""插件内存信息"""
plugin_id: str = Field(description="插件ID")
plugin_name: str = Field(description="插件名称")
plugin_version: str = Field(description="插件版本")
total_memory_bytes: int = Field(description="总内存使用量(字节)")
total_memory_mb: float = Field(description="总内存使用量(MB)")
object_count: int = Field(description="对象数量")
calculation_time_ms: float = Field(description="计算耗时(毫秒)")
timestamp: float = Field(description="统计时间戳")
error: Optional[str] = Field(default=None, description="错误信息")
object_details: Optional[List[Dict[str, Any]]] = Field(default=None, description="大对象详情")

178
app/utils/memory.py Normal file
View File

@@ -0,0 +1,178 @@
import sys
import time
from collections import deque
from typing import Any, Dict, Set
from app.log import logger
class MemoryCalculator:
"""
内存计算器,用于递归计算对象的内存占用
"""
def __init__(self):
# 缓存已计算的对象ID避免重复计算
self._calculated_ids: Set[int] = set()
# 最大递归深度,防止无限递归
self._max_depth = 10
# 最大对象数量,防止计算过多对象
self._max_objects = 10000
def calculate_object_memory(self, obj: Any, max_depth: int = None, max_objects: int = None) -> Dict[str, Any]:
"""
计算对象的内存占用
:param obj: 要计算的对象
:param max_depth: 最大递归深度
:param max_objects: 最大对象数量
:return: 内存统计信息
"""
if max_depth is None:
max_depth = self._max_depth
if max_objects is None:
max_objects = self._max_objects
# 重置缓存
self._calculated_ids.clear()
start_time = time.time()
object_details = []
try:
# 递归计算内存
memory_info = self._calculate_recursive(obj, depth=0, max_depth=max_depth,
max_objects=max_objects, object_count=0)
total_memory = memory_info['total_memory']
object_count = memory_info['object_count']
object_details = memory_info['object_details']
except Exception as e:
logger.error(f"计算对象内存时出错:{str(e)}")
total_memory = 0
object_count = 0
calculation_time = time.time() - start_time
return {
'total_memory_bytes': total_memory,
'total_memory_mb': round(total_memory / (1024 * 1024), 2),
'object_count': object_count,
'calculation_time_ms': round(calculation_time * 1000, 2),
'object_details': object_details[:10] # 只返回前10个最大的对象
}
def _calculate_recursive(self, obj: Any, depth: int, max_depth: int,
max_objects: int, object_count: int) -> Dict[str, Any]:
"""
递归计算对象内存
"""
if depth > max_depth or object_count > max_objects:
return {
'total_memory': 0,
'object_count': object_count,
'object_details': []
}
total_memory = 0
object_details = []
# 获取对象ID避免重复计算
obj_id = id(obj)
if obj_id in self._calculated_ids:
return {
'total_memory': 0,
'object_count': object_count,
'object_details': []
}
self._calculated_ids.add(obj_id)
object_count += 1
try:
# 计算对象本身的内存
obj_memory = sys.getsizeof(obj)
total_memory += obj_memory
# 记录大对象
if obj_memory > 1024: # 大于1KB的对象
object_details.append({
'type': type(obj).__name__,
'memory_bytes': obj_memory,
'memory_mb': round(obj_memory / (1024 * 1024), 2),
'depth': depth
})
# 递归计算容器对象的内容
if depth < max_depth:
container_memory = self._calculate_container_memory(
obj, depth + 1, max_depth, max_objects, object_count
)
total_memory += container_memory['total_memory']
object_count = container_memory['object_count']
object_details.extend(container_memory['object_details'])
except Exception as e:
logger.debug(f"计算对象 {type(obj).__name__} 内存时出错:{str(e)}")
return {
'total_memory': total_memory,
'object_count': object_count,
'object_details': object_details
}
def _calculate_container_memory(self, obj: Any, depth: int, max_depth: int,
max_objects: int, object_count: int) -> Dict[str, Any]:
"""
计算容器对象的内存
"""
total_memory = 0
object_details = []
try:
# 处理不同类型的容器
if isinstance(obj, (list, tuple, deque)):
for item in obj:
if object_count > max_objects:
break
item_memory = self._calculate_recursive(item, depth, max_depth, max_objects, object_count)
total_memory += item_memory['total_memory']
object_count = item_memory['object_count']
object_details.extend(item_memory['object_details'])
elif isinstance(obj, dict):
for key, value in obj.items():
if object_count > max_objects:
break
# 计算key的内存
key_memory = self._calculate_recursive(key, depth, max_depth, max_objects, object_count)
total_memory += key_memory['total_memory']
object_count = key_memory['object_count']
object_details.extend(key_memory['object_details'])
# 计算value的内存
value_memory = self._calculate_recursive(value, depth, max_depth, max_objects, object_count)
total_memory += value_memory['total_memory']
object_count = value_memory['object_count']
object_details.extend(value_memory['object_details'])
elif hasattr(obj, '__dict__'):
# 处理有__dict__属性的对象
for attr_name, attr_value in obj.__dict__.items():
if object_count > max_objects:
break
# 跳过一些特殊属性
if attr_name.startswith('_') and attr_name not in ['_calculated_ids']:
continue
attr_memory = self._calculate_recursive(attr_value, depth, max_depth, max_objects, object_count)
total_memory += attr_memory['total_memory']
object_count = attr_memory['object_count']
object_details.extend(attr_memory['object_details'])
except Exception as e:
logger.debug(f"计算容器对象 {type(obj).__name__} 内存时出错:{str(e)}")
return {
'total_memory': total_memory,
'object_count': object_count,
'object_details': object_details
}

View File

@@ -229,7 +229,7 @@ class StringUtils:
size = float(size)
d = [(1024 - 1, 'K'), (1024 ** 2 - 1, 'M'), (1024 ** 3 - 1, 'G'), (1024 ** 4 - 1, 'T')]
s = [x[0] for x in d]
index = bisect.bisect_left(s, size) - 1 # noqa
index = bisect.bisect_left(s, size) - 1 # noqa
if index == -1:
return str(size) + "B"
else:
@@ -925,3 +925,16 @@ class StringUtils:
if re.match(r'^[a-zA-Z0-9.-]+(\.[a-zA-Z]{2,})?$', text):
return True
return False
@staticmethod
def is_magnet_link(content: Union[str, bytes]) -> bool:
"""
判断内容是否为磁力链接
"""
if not content:
return False
if isinstance(content, str) and content.startswith("magnet:"):
return True
if isinstance(content, bytes) and content.startswith(b"magnet:"):
return True
return False

View File

@@ -21,7 +21,11 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# 站点数据统计增加站点名称
with contextlib.suppress(Exception):
conn = op.get_bind()
inspector = sa.inspect(conn)
columns = inspector.get_columns('siteuserdata')
# 检查 'name' 字段是否已存在
if not any(c['name'] == 'name' for c in columns):
op.add_column('siteuserdata', sa.Column('name', sa.String(), nullable=True))
# ### end Alembic commands ###

View File

@@ -18,19 +18,18 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with contextlib.suppress(Exception):
# 添加触发类型字段
conn = op.get_bind()
inspector = sa.inspect(conn)
columns = inspector.get_columns('workflow')
if not any(c['name'] == 'trigger_type' for c in columns):
op.add_column('workflow', sa.Column('trigger_type', sa.String(), nullable=True, default='timer'))
with contextlib.suppress(Exception):
# 添加事件类型字段
if not any(c['name'] == 'event_type' for c in columns):
op.add_column('workflow', sa.Column('event_type', sa.String(), nullable=True))
with contextlib.suppress(Exception):
# 添加事件条件字段
if not any(c['name'] == 'event_conditions' for c in columns):
op.add_column('workflow', sa.Column('event_conditions', sa.JSON(), nullable=True, default={}))
# ### end Alembic commands ###
def downgrade() -> None:

View File

@@ -19,13 +19,28 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with contextlib.suppress(Exception):
conn = op.get_bind()
inspector = sa.inspect(conn)
# 检查并添加 downloadhistory.episode_group
dh_columns = inspector.get_columns('downloadhistory')
if not any(c['name'] == 'episode_group' for c in dh_columns):
op.add_column('downloadhistory', sa.Column('episode_group', sa.String, nullable=True))
# 检查并添加 subscribe.episode_group
s_columns = inspector.get_columns('subscribe')
if not any(c['name'] == 'episode_group' for c in s_columns):
op.add_column('subscribe', sa.Column('episode_group', sa.String, nullable=True))
# 检查并添加 subscribehistory.episode_group
sh_columns = inspector.get_columns('subscribehistory')
if not any(c['name'] == 'episode_group' for c in sh_columns):
op.add_column('subscribehistory', sa.Column('episode_group', sa.String, nullable=True))
# 检查并添加 transferhistory.episode_group
th_columns = inspector.get_columns('transferhistory')
if not any(c['name'] == 'episode_group' for c in th_columns):
op.add_column('transferhistory', sa.Column('episode_group', sa.String, nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:

View File

@@ -18,11 +18,11 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# 整理历史记录 增加下载器字段
with contextlib.suppress(Exception):
conn = op.get_bind()
inspector = sa.inspect(conn)
columns = inspector.get_columns('transferhistory')
if not any(c['name'] == 'downloader' for c in columns):
op.add_column('transferhistory', sa.Column('downloader', sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:

View File

@@ -8,6 +8,7 @@ Create Date: 2025-08-19 12:27:08.451371
import sqlalchemy as sa
from alembic import op
from app.log import logger
from app.core.config import settings
# revision identifiers, used by Alembic.
@@ -41,7 +42,7 @@ def fix_postgresql_sequences():
"""))
tables = [row[0] for row in result.fetchall()]
print(f"发现 {len(tables)} 个表需要检查序列")
logger.info(f"发现 {len(tables)} 个表需要检查序列")
for table_name in tables:
fix_table_sequence(connection, table_name)
@@ -54,7 +55,7 @@ def fix_table_sequence(connection, table_name):
try:
# 跳过alembic_version表它没有id列
if table_name == 'alembic_version':
print(f"跳过表 {table_name}这是Alembic版本表")
logger.debug(f"跳过表 {table_name}这是Alembic版本表")
return
# 检查表是否有id列
@@ -67,22 +68,22 @@ def fix_table_sequence(connection, table_name):
id_column = result.fetchone()
if not id_column:
print(f"{table_name} 没有id列跳过")
logger.debug(f"{table_name} 没有id列跳过")
return
is_identity, column_default = id_column
# 检查是否已经是Identity类型
if is_identity == 'YES' or (column_default and 'GENERATED BY DEFAULT AS IDENTITY' in column_default):
print(f"{table_name} 的id列已经是Identity类型跳过")
logger.debug(f"{table_name} 的id列已经是Identity类型跳过")
return
# 检查是否有序列
print(f"{table_name} 存在序列,需要修复")
logger.info(f"{table_name} 存在序列,需要修复")
convert_to_identity(connection, table_name)
except Exception as e:
print(f"修复表 {table_name} 序列时出错: {e}")
logger.error(f"修复表 {table_name} 序列时出错: {e}")
# 回滚当前事务,避免影响后续操作
connection.rollback()
@@ -106,12 +107,12 @@ def convert_to_identity(connection, table_name):
ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (START WITH {next_value})
"""))
print(f"{table_name} 序列已转换为Identity起始值为 {next_value}")
logger.info(f"{table_name} 序列已转换为Identity起始值为 {next_value}")
except Exception as e:
print(f"转换表 {table_name} 序列时出错: {e}")
# 如果是已经存在的Identity错误则忽略
if "already an identity column" in str(e):
print(f"{table_name} 的id列已经是Identity类型忽略此错误")
logger.warn(f"{table_name} 的id列已经是Identity类型忽略此错误: {e}")
return
logger.error(f"转换表 {table_name} 序列时出错: {e}")
raise

View File

@@ -19,10 +19,11 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with contextlib.suppress(Exception):
conn = op.get_bind()
inspector = sa.inspect(conn)
columns = inspector.get_columns('workflow')
if not any(c['name'] == 'flows' for c in columns):
op.add_column('workflow', sa.Column('flows', sa.JSON(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:

View File

@@ -18,11 +18,11 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# 下载历史记录 增加下载器字段
with contextlib.suppress(Exception):
conn = op.get_bind()
inspector = sa.inspect(conn)
columns = inspector.get_columns('downloadhistory')
if not any(c['name'] == 'downloader' for c in columns):
op.add_column('downloadhistory', sa.Column('downloader', sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:

View File

@@ -18,13 +18,23 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# 订阅增加mediaid
with contextlib.suppress(Exception):
conn = op.get_bind()
inspector = sa.inspect(conn)
# 检查并添加 subscribe.mediaid
s_columns = inspector.get_columns('subscribe')
if not any(c['name'] == 'mediaid' for c in s_columns):
op.add_column('subscribe', sa.Column('mediaid', sa.String(), nullable=True))
# 检查并创建索引
s_indexes = inspector.get_indexes('subscribe')
if not any(i['name'] == 'ix_subscribe_mediaid' for i in s_indexes):
op.create_index('ix_subscribe_mediaid', 'subscribe', ['mediaid'], unique=False)
# 检查并添加 subscribehistory.mediaid
sh_columns = inspector.get_columns('subscribehistory')
if not any(c['name'] == 'mediaid' for c in sh_columns):
op.add_column('subscribehistory', sa.Column('mediaid', sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:

View File

@@ -10,6 +10,7 @@ import contextlib
from alembic import op
import sqlalchemy as sa
from app.log import logger
from app.db import SessionFactory
from app.db.models import UserConfig
@@ -21,28 +22,58 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# 支持订阅自定义媒体类别和过滤规则组、自定义识别词
with contextlib.suppress(Exception):
conn = op.get_bind()
inspector = sa.inspect(conn)
# 检查并添加 downloadhistory.media_category
dh_columns = inspector.get_columns('downloadhistory')
if not any(c['name'] == 'media_category' for c in dh_columns):
op.add_column('downloadhistory', sa.Column('media_category', sa.String(), nullable=True))
# 检查并添加 subscribe 表的列
sub_columns = inspector.get_columns('subscribe')
if not any(c['name'] == 'custom_words' for c in sub_columns):
op.add_column('subscribe', sa.Column('custom_words', sa.String(), nullable=True))
if not any(c['name'] == 'media_category' for c in sub_columns):
op.add_column('subscribe', sa.Column('media_category', sa.String(), nullable=True))
if not any(c['name'] == 'filter_groups' for c in sub_columns):
op.add_column('subscribe', sa.Column('filter_groups', sa.JSON(), nullable=True))
# 将String转换为JSON类型
with contextlib.suppress(Exception):
op.alter_column('subscribe', 'note', existing_type=sa.String(), type_=sa.JSON())
op.alter_column('downloadhistory', 'note', existing_type=sa.String(), type_=sa.JSON())
op.alter_column('mediaserveritem', 'note', existing_type=sa.String(), type_=sa.JSON())
op.alter_column('message', 'note', existing_type=sa.String(), type_=sa.JSON())
op.alter_column('plugindata', 'value', existing_type=sa.String(), type_=sa.JSON())
op.alter_column('site', 'note', existing_type=sa.String(), type_=sa.JSON())
op.alter_column('sitestatistic', 'note', existing_type=sa.String(), type_=sa.JSON())
op.alter_column('systemconfig', 'value', existing_type=sa.String(), type_=sa.JSON())
op.alter_column('userconfig', 'value', existing_type=sa.String(), type_=sa.JSON())
# 清空用户配置表中不兼容的数据
# 定义需要检查和转换的表和列
columns_to_alter = {
'subscribe': 'note',
'downloadhistory': 'note',
'mediaserveritem': 'note',
'message': 'note',
'plugindata': 'value',
'site': 'note',
'sitestatistic': 'note',
'systemconfig': 'value',
'userconfig': 'value'
}
for table, column_name in columns_to_alter.items():
try:
cols = inspector.get_columns(table)
# 找到对应的列信息
target_col = next((c for c in cols if c['name'] == column_name), None)
# 如果列存在且类型不是JSON则进行修改
if target_col and not isinstance(target_col['type'], sa.JSON):
# PostgreSQL需要指定USING子句来处理类型转换
if conn.dialect.name == 'postgresql':
op.alter_column(table, column_name,
existing_type=sa.String(),
type_=sa.JSON(),
postgresql_using=f'"{column_name}"::json')
else:
op.alter_column(table, column_name,
existing_type=sa.String(),
type_=sa.JSON())
except Exception as e:
logger.error(f"Could not alter column {column_name} in table {table}: {e}")
with SessionFactory() as db:
UserConfig.truncate(db)
# ### end Alembic commands ###
def downgrade() -> None:

View File

@@ -18,14 +18,19 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# 站点管理、订阅增加下载器选项
with contextlib.suppress(Exception):
conn = op.get_bind()
inspector = sa.inspect(conn)
# 检查并添加 site.downloader
site_columns = inspector.get_columns('site')
if not any(c['name'] == 'downloader' for c in site_columns):
op.add_column('site', sa.Column('downloader', sa.String(), nullable=True))
# 检查并添加 subscribe.downloader
subscribe_columns = inspector.get_columns('subscribe')
if not any(c['name'] == 'downloader' for c in subscribe_columns):
op.add_column('subscribe', sa.Column('downloader', sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
pass

View File

@@ -10,6 +10,8 @@ import contextlib
from alembic import op
import sqlalchemy as sa
from app.log import logger
# revision identifiers, used by Alembic.
revision = 'ecf3c693fdf3'
@@ -19,15 +21,35 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# 将String转换为JSON类型
with contextlib.suppress(Exception):
op.alter_column('subscribehistory', 'sites', existing_type=sa.String(), type_=sa.JSON())
with contextlib.suppress(Exception):
op.add_column('subscribehistory', sa.Column('custom_words', sa.String(), nullable=True))
op.add_column('subscribehistory', sa.Column('media_category', sa.String(), nullable=True))
op.add_column('subscribehistory', sa.Column('filter_groups', sa.JSON(), nullable=True))
# ### end Alembic commands ###
conn = op.get_bind()
inspector = sa.inspect(conn)
table_name = 'subscribehistory'
columns = inspector.get_columns(table_name)
try:
sites_col = next((c for c in columns if c['name'] == 'sites'), None)
# 如果 'sites' 列存在且类型不是 JSON则进行修改
if sites_col and not isinstance(sites_col['type'], sa.JSON):
if conn.dialect.name == 'postgresql':
op.alter_column(table_name, 'sites',
existing_type=sa.String(),
type_=sa.JSON(),
postgresql_using='sites::json')
else:
op.alter_column(table_name, 'sites',
existing_type=sa.String(),
type_=sa.JSON())
except Exception as e:
logger.error(f"Could not alter column 'sites' in table {table_name}: {e}")
if not any(c['name'] == 'custom_words' for c in columns):
op.add_column(table_name, sa.Column('custom_words', sa.String(), nullable=True))
if not any(c['name'] == 'media_category' for c in columns):
op.add_column(table_name, sa.Column('media_category', sa.String(), nullable=True))
if not any(c['name'] == 'filter_groups' for c in columns):
op.add_column(table_name, sa.Column('filter_groups', sa.JSON(), nullable=True))
def downgrade() -> None:

View File

@@ -45,59 +45,6 @@ DB_POSTGRESQL_MAX_OVERFLOW=30
## Docker 部署
### 使用内置 PostgreSQL
如果您使用 Docker 部署MoviePilot 容器内置了 PostgreSQL 服务:
#### 使用 Docker Compose推荐
1. 创建 `docker-compose.yml` 文件:
```yaml
version: '3.8'
services:
moviepilot:
image: jxxghp/moviepilot:latest
container_name: moviepilot
restart: unless-stopped
ports:
- "3000:3000" # 前端端口
- "3001:3001" # API端口
environment:
- DB_TYPE=postgresql
- DB_POSTGRESQL_HOST=localhost
- DB_POSTGRESQL_PORT=5432
- DB_POSTGRESQL_DATABASE=moviepilot
- DB_POSTGRESQL_USERNAME=moviepilot
- DB_POSTGRESQL_PASSWORD=moviepilot
volumes:
- ./config:/config
```
2. 启动服务:
```bash
docker-compose up -d
```
#### 使用 Docker 命令
1. 设置环境变量:
```bash
DB_TYPE=postgresql
```
2. 启动容器时PostgreSQL 服务会自动:
- 在配置目录下创建 `postgresql/` 子目录作为数据目录
- 初始化 PostgreSQL 数据目录
- 启动 PostgreSQL 服务
- 创建数据库和用户
- 配置连接权限
3. 数据持久化:
- PostgreSQL 数据存储在 `${CONFIG_DIR}/postgresql/` 目录中
- 日志文件存储在 `${CONFIG_DIR}/postgresql/logs/` 目录中
- 这些目录会通过 Docker 卷映射持久化保存
### 使用外部 PostgreSQL
如果您想使用外部的 PostgreSQL 服务:
@@ -122,6 +69,36 @@ DB_POSTGRESQL_PASSWORD=your-password
3. 启动应用,数据库表会自动创建
4. 使用数据库迁移工具或手动导入数据
#### 注意事项
完成数据迁移后需要对postgresql中的表进行索引初始值进行更新否则会出现唯一索引已存在的异常
例如:
```json
EventType.SiteUpdated
SiteChain.cache_site_userdata
(psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "siteuserdata_pkey"
DETAIL: Key (id)=(18) already exists.
[SQL: INSERT INTO siteuserdata (domain, name, username, userid, user_level, join_at, bonus, upload, download, ratio, seeding, leeching, seeding_size, leeching_size, seeding_info, message_unread, message_unread_contents, err_msg, updated_day, updated_time) VALUES (%(domain)s, %(name)s, %(username)s, %(userid)s, %(user_level)s, %(join_at)s, %(bonus)s, %(upload)s, %(download)s, %(ratio)s, %(seeding)s, %(leeching)s, %(seeding_size)s, %(leeching_size)s, %(seeding_info)s::JSON, %(message_unread)s, %(message_unread_contents)s::JSON, %(err_msg)s, %(updated_day)s, %(updated_time)s) RETURNING siteuserdata.id]
[parameters: {'domain': 'btschool.club', 'name': '', 'username': None, 'userid': None, 'user_level': None, 'join_at': None, 'bonus': 0.0, 'upload': 0, 'download': 0, 'ratio': 0.0, 'seeding': 0, 'leeching': 0, 'seeding_size': 0, 'leeching_size': 0, 'seeding_info': '[]', 'message_unread': 0, 'message_unread_contents': '[]', 'err_msg': 'cookies', 'updated_day': '2025-08-22', 'updated_time': '09:52:01'}]
(Background on this error at: https://sqlalche.me/e/20/gkpj)
```
需要对每一个表分别执行下面的语句(下面的SQL以`workflowc`数据表为例,每张表请自行修改,其中`user`表因为关键字原因,应该写成`public.user`的方式)
```sql
DO $$
DECLARE
max_id INTEGER;
BEGIN
-- 查询最大 ID 值
SELECT COALESCE(MAX(id), 0) INTO max_id FROM workflow;
-- 调整序列
EXECUTE format('ALTER SEQUENCE workflow_id_seq RESTART WITH %s', max_id + 1);
END $$;
```
### 从 PostgreSQL 迁移到 SQLite
1. 导出 PostgreSQL 数据

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.7.4'
FRONTEND_VERSION = 'v2.7.4'
APP_VERSION = 'v2.7.6'
FRONTEND_VERSION = 'v2.7.6'