Compare commits

...

41 Commits

Author SHA1 Message Date
jxxghp
2552219991 更新 version.py 2025-08-28 11:11:32 +08:00
jxxghp
a038b698d7 fix haidan 2025-08-28 09:36:19 +08:00
jxxghp
a3b222574e add thetvdb cache 2025-08-28 08:05:10 +08:00
jxxghp
e0cd467293 rollback fix #4856 2025-08-28 07:51:05 +08:00
jxxghp
9c056030d2 fix:捕促115&alipan请求异常 2025-08-27 20:21:06 +08:00
jxxghp
19efa9d4cc fix #4795 2025-08-27 16:15:45 +08:00
jxxghp
90633a6495 fix #4851 2025-08-27 15:57:43 +08:00
jxxghp
edc432fbd8 fix #4846 2025-08-27 12:45:23 +08:00
jxxghp
1b7bdbf516 fix #4834 2025-08-27 08:28:16 +08:00
jxxghp
8c1be70c85 更新 version.py 2025-08-26 12:20:16 +08:00
jxxghp
b8e0c0db9e feat:精细化事件错误 2025-08-26 08:41:47 +08:00
jxxghp
7b7fb6cc82 Merge pull request #4836 from jxxghp/cursor/alter-siteuser-data-userid-to-character-type-9f4d 2025-08-25 22:05:19 +08:00
Cursor Agent
62512ba215 Remove SQLite-specific migration code for userid field
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-25 14:00:33 +00:00
Cursor Agent
e1beb64c01 Simplify userid conversion to integer in Synology Chat module
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-25 13:58:15 +00:00
Cursor Agent
c81f26ddad Remove downgrade methods for PostgreSQL and SQLite userid migration
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-25 13:56:21 +00:00
Cursor Agent
340114c2a1 Remove migration README after completing SiteUserData userid type migration
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-25 13:54:58 +00:00
Cursor Agent
cd7767b331 Checkpoint before follow-up message
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-25 13:54:48 +00:00
Cursor Agent
25289dad8a Migrate SiteUserData userid field from Integer to String type
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-25 13:50:58 +00:00
jxxghp
47c6917129 remove _check_restart_policy 2025-08-25 21:30:53 +08:00
jxxghp
6379cda148 fix 异步定时服务 2025-08-25 21:19:07 +08:00
jxxghp
91a124ab8f fix 异步定时服务 2025-08-25 20:44:38 +08:00
jxxghp
2357a7135e fix run_async 2025-08-25 17:46:06 +08:00
jxxghp
da0b3b3de9 fix:日历缓存 2025-08-25 16:46:10 +08:00
jxxghp
6664fb1716 feat:增加插件和日历的自动缓存 2025-08-25 16:37:02 +08:00
jxxghp
1206f24fa9 修复缓存迭代时的并发问题 2025-08-25 13:11:44 +08:00
jxxghp
ffb5823e84 fix #4829 优化模块导入逻辑,增加对 Async 类的特殊处理 2025-08-25 08:14:43 +08:00
jxxghp
d45a7fb262 更新 version.py 2025-08-24 19:59:31 +08:00
jxxghp
918d192c0f OpenList自动延迟重试获取文件项 2025-08-24 19:47:00 +08:00
jxxghp
f7cd6eac50 feat:整理手动中止功能 2025-08-24 19:17:41 +08:00
jxxghp
88f4428ff0 fix bug 2025-08-24 17:07:45 +08:00
jxxghp
069ea22ba2 fix bug 2025-08-24 16:55:37 +08:00
jxxghp
8fac8c5307 fix progress step 2025-08-24 16:33:44 +08:00
jxxghp
2285befebb fix cache set 2025-08-24 16:10:48 +08:00
jxxghp
1cd0648e4e fix cache set 2025-08-24 15:36:56 +08:00
jxxghp
0b7ba285c6 fix:优雅停止超时处理 2025-08-24 13:07:52 +08:00
jxxghp
30446c4526 fix cache is_redis 2025-08-24 12:27:14 +08:00
jxxghp
9b843c9ed2 fix:整理记录登记 2025-08-24 12:19:12 +08:00
jxxghp
2ce1c3bef8 feat:整理进度登记 2025-08-24 12:04:05 +08:00
jxxghp
e463094dc7 feat:整理进度 2025-08-24 09:21:55 +08:00
jxxghp
71a9fe10f4 refactor ProgressHelper 2025-08-24 09:02:55 +08:00
jxxghp
ba146e13ef fix 优化cache模块声明 2025-08-24 08:36:37 +08:00
37 changed files with 1422 additions and 664 deletions

View File

@@ -171,15 +171,14 @@ def rename(fileitem: schemas.FileItem,
sub_files: List[schemas.FileItem] = StorageChain().list_files(fileitem)
if sub_files:
# 开始进度
progress = ProgressHelper()
progress.start(ProgressKey.BatchRename)
progress = ProgressHelper(ProgressKey.BatchRename)
progress.start()
total = len(sub_files)
handled = 0
for sub_file in sub_files:
handled += 1
progress.update(value=handled / total * 100,
text=f"正在处理 {sub_file.name} ...",
key=ProgressKey.BatchRename)
text=f"正在处理 {sub_file.name} ...")
if sub_file.type == "dir":
continue
if not sub_file.extension:
@@ -190,19 +189,19 @@ def rename(fileitem: schemas.FileItem,
meta = MetaInfoPath(sub_path)
mediainfo = transferchain.recognize_media(meta)
if not mediainfo:
progress.end(ProgressKey.BatchRename)
progress.end()
return schemas.Response(success=False, message=f"{sub_path.name} 未识别到媒体信息")
new_path = transferchain.recommend_name(meta=meta, mediainfo=mediainfo)
if not new_path:
progress.end(ProgressKey.BatchRename)
progress.end()
return schemas.Response(success=False, message=f"{sub_path.name} 未识别到新名称")
ret: schemas.Response = rename(fileitem=sub_file,
new_name=Path(new_path).name,
recursive=False)
if not ret.success:
progress.end(ProgressKey.BatchRename)
progress.end()
return schemas.Response(success=False, message=f"{sub_path.name} 重命名失败!")
progress.end(ProgressKey.BatchRename)
progress.end()
# 重命名自己
result = StorageChain().rename_file(fileitem, new_name)
if result:

View File

@@ -254,14 +254,14 @@ async def get_progress(request: Request, process_type: str, _: schemas.TokenPayl
"""
实时获取处理进度返回格式为SSE
"""
progress = ProgressHelper()
progress = ProgressHelper(process_type)
async def event_generator():
try:
while not global_vars.is_system_stopped:
if await request.is_disconnected():
break
detail = progress.get(process_type)
detail = progress.get()
yield f"data: {json.dumps(detail)}\n\n"
await asyncio.sleep(0.5)
except asyncio.CancelledError:

View File

@@ -8,7 +8,7 @@ from app import schemas
from app.chain.media import MediaChain
from app.chain.storage import StorageChain
from app.chain.transfer import TransferChain
from app.core.config import settings
from app.core.config import settings, global_vars
from app.core.metainfo import MetaInfoPath
from app.core.security import verify_token, verify_apitoken
from app.db import get_db
@@ -75,6 +75,8 @@ async def remove_queue(fileitem: schemas.FileItem, _: schemas.TokenPayload = Dep
:param _: Token校验
"""
TransferChain().remove_from_queue(fileitem)
# 取消整理
global_vars.stop_transfer(fileitem.path)
return schemas.Response(success=True)

View File

@@ -541,6 +541,9 @@ class MediaChain(ChainBase):
# 处理目录内的文件
files = __list_files(_fileitem=fileitem)
for file in files:
if file.type == "dir":
# 电影不处理子目录
continue
self.scrape_metadata(fileitem=file,
mediainfo=mediainfo,
init_folder=False,
@@ -640,6 +643,9 @@ class MediaChain(ChainBase):
if recursive:
files = __list_files(_fileitem=fileitem)
for file in files:
if file.type == "dir" and not file.name.lower().startswith("season"):
# 电视剧不处理非季子目录
continue
self.scrape_metadata(fileitem=file,
mediainfo=mediainfo,
parent=fileitem if file.type == "file" else None,

View File

@@ -215,12 +215,11 @@ class SearchChain(ChainBase):
return []
# 开始新进度
progress = ProgressHelper()
progress.start(ProgressKey.Search)
progress = ProgressHelper(ProgressKey.Search)
progress.start()
# 开始过滤
progress.update(value=0, text=f'开始过滤,总 {len(torrents)} 个资源,请稍候...',
key=ProgressKey.Search)
progress.update(value=0, text=f'开始过滤,总 {len(torrents)} 个资源,请稍候...')
# 匹配订阅附加参数
if filter_params:
logger.info(f'开始附加参数过滤,附加参数:{filter_params} ...')
@@ -238,7 +237,7 @@ class SearchChain(ChainBase):
logger.info(f"过滤规则/剧集过滤完成,剩余 {len(torrents)} 个资源")
# 过滤完成
progress.update(value=50, text=f'过滤完成,剩余 {len(torrents)} 个资源', key=ProgressKey.Search)
progress.update(value=50, text=f'过滤完成,剩余 {len(torrents)} 个资源')
# 总数
_total = len(torrents)
@@ -251,14 +250,13 @@ class SearchChain(ChainBase):
try:
# 英文标题应该在别名/原标题中,不需要再匹配
logger.info(f"开始匹配结果 标题:{mediainfo.title},原标题:{mediainfo.original_title},别名:{mediainfo.names}")
progress.update(value=51, text=f'开始匹配,总 {_total} 个资源 ...', key=ProgressKey.Search)
progress.update(value=51, text=f'开始匹配,总 {_total} 个资源 ...')
for torrent in torrents:
if global_vars.is_system_stopped:
break
_count += 1
progress.update(value=(_count / _total) * 96,
text=f'正在匹配 {torrent.site_name},已完成 {_count} / {_total} ...',
key=ProgressKey.Search)
text=f'正在匹配 {torrent.site_name},已完成 {_count} / {_total} ...')
if not torrent.title:
continue
@@ -291,8 +289,7 @@ class SearchChain(ChainBase):
# 匹配完成
logger.info(f"匹配完成,共匹配到 {len(_match_torrents)} 个资源")
progress.update(value=97,
text=f'匹配完成,共匹配到 {len(_match_torrents)} 个资源',
key=ProgressKey.Search)
text=f'匹配完成,共匹配到 {len(_match_torrents)} 个资源')
# 去掉mediainfo中多余的数据
mediainfo.clear()
@@ -308,16 +305,14 @@ class SearchChain(ChainBase):
# 排序
progress.update(value=99,
text=f'正在对 {len(contexts)} 个资源进行排序,请稍候...',
key=ProgressKey.Search)
text=f'正在对 {len(contexts)} 个资源进行排序,请稍候...')
contexts = torrenthelper.sort_torrents(contexts)
# 结束进度
logger.info(f'搜索完成,共 {len(contexts)} 个资源')
progress.update(value=100,
text=f'搜索完成,共 {len(contexts)} 个资源',
key=ProgressKey.Search)
progress.end(ProgressKey.Search)
text=f'搜索完成,共 {len(contexts)} 个资源')
progress.end()
# 去重后返回
return self.__remove_duplicate(contexts)
@@ -521,8 +516,8 @@ class SearchChain(ChainBase):
return []
# 开始进度
progress = ProgressHelper()
progress.start(ProgressKey.Search)
progress = ProgressHelper(ProgressKey.Search)
progress.start()
# 开始计时
start_time = datetime.now()
# 总数
@@ -531,8 +526,7 @@ class SearchChain(ChainBase):
finish_count = 0
# 更新进度
progress.update(value=0,
text=f"开始搜索,共 {total_num} 个站点 ...",
key=ProgressKey.Search)
text=f"开始搜索,共 {total_num} 个站点 ...")
# 结果集
results = []
# 多线程
@@ -561,17 +555,15 @@ class SearchChain(ChainBase):
results.extend(result)
logger.info(f"站点搜索进度:{finish_count} / {total_num}")
progress.update(value=finish_count / total_num * 100,
text=f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个站点 ...",
key=ProgressKey.Search)
text=f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个站点 ...")
# 计算耗时
end_time = datetime.now()
# 更新进度
progress.update(value=100,
text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}",
key=ProgressKey.Search)
text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}")
logger.info(f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}")
# 结束进度
progress.end(ProgressKey.Search)
progress.end()
# 返回
return results
@@ -606,8 +598,8 @@ class SearchChain(ChainBase):
return []
# 开始进度
progress = ProgressHelper()
progress.start(ProgressKey.Search)
progress = ProgressHelper(ProgressKey.Search)
progress.start()
# 开始计时
start_time = datetime.now()
# 总数
@@ -616,8 +608,7 @@ class SearchChain(ChainBase):
finish_count = 0
# 更新进度
progress.update(value=0,
text=f"开始搜索,共 {total_num} 个站点 ...",
key=ProgressKey.Search)
text=f"开始搜索,共 {total_num} 个站点 ...")
# 结果集
results = []
@@ -648,18 +639,16 @@ class SearchChain(ChainBase):
results.extend(result)
logger.info(f"站点搜索进度:{finish_count} / {total_num}")
progress.update(value=finish_count / total_num * 100,
text=f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个站点 ...",
key=ProgressKey.Search)
text=f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个站点 ...")
# 计算耗时
end_time = datetime.now()
# 更新进度
progress.update(value=100,
text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}",
key=ProgressKey.Search)
text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}")
logger.info(f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}")
# 结束进度
progress.end(ProgressKey.Search)
progress.end()
# 返回
return results

View File

@@ -1184,6 +1184,42 @@ class SubscribeChain(ChainBase):
logger.error(f'follow用户分享订阅 {title} 添加失败:{message}')
logger.info(f'follow用户分享订阅刷新完成共添加 {success_count} 个订阅')
async def cache_calendar(self):
"""
预缓存订阅日历,实际上就是查询一遍所有订阅的媒体信息
前端请示是异常的,所以需要使用异步缓存方法
"""
logger.info(f'开始预缓存订阅日历 ...')
for subscribe in await SubscribeOper().async_list():
if global_vars.is_system_stopped:
break
try:
mtype = MediaType(subscribe.type)
except ValueError:
logger.error(f'订阅 {subscribe.name} 类型错误:{subscribe.type}')
continue
# 识别媒体信息
if mtype == MediaType.MOVIE:
mediainfo: MediaInfo = await self.async_recognize_media(mtype=mtype,
tmdbid=subscribe.tmdbid,
doubanid=subscribe.doubanid,
bangumiid=subscribe.bangumiid,
episode_group=subscribe.episode_group,
cache=False)
if not mediainfo:
logger.warn(
f'未识别到媒体信息,标题:{subscribe.name}tmdbid{subscribe.tmdbid}doubanid{subscribe.doubanid}')
continue
else:
episodes = await TmdbChain().async_tmdb_episodes(tmdbid=subscribe.tmdbid,
season=subscribe.season,
episode_group=subscribe.episode_group)
if not episodes:
logger.warn(
f'未识别到季集信息,标题:{subscribe.name}tmdbid{subscribe.tmdbid}豆瓣ID{subscribe.doubanid},季:{subscribe.season}')
continue
logger.info(f'订阅日历预缓存完成')
@staticmethod
def __update_subscribe_note(subscribe: Subscribe, downloads: Optional[List[Context]]):
"""

View File

@@ -555,8 +555,10 @@ class TransferChain(ChainBase, metaclass=Singleton):
processed_num = 0
# 失败数量
fail_num = 0
# 已完成文件
finished_files = []
progress = ProgressHelper()
progress = ProgressHelper(ProgressKey.FileTransfer)
while not global_vars.is_system_stopped:
try:
@@ -571,7 +573,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
if __queue_start:
logger.info("开始整理队列处理...")
# 启动进度
progress.start(ProgressKey.FileTransfer)
progress.start()
# 重置计数
processed_num = 0
fail_num = 0
@@ -579,8 +581,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
__process_msg = f"开始整理队列处理,当前共 {total_num} 个文件 ..."
logger.info(__process_msg)
progress.update(value=0,
text=__process_msg,
key=ProgressKey.FileTransfer)
text=__process_msg)
# 队列已开始
__queue_start = False
# 更新进度
@@ -588,7 +589,10 @@ class TransferChain(ChainBase, metaclass=Singleton):
logger.info(__process_msg)
progress.update(value=processed_num / total_num * 100,
text=__process_msg,
key=ProgressKey.FileTransfer)
data={
"current": Path(fileitem.path).as_posix(),
"finished": finished_files
})
# 整理
state, err_msg = self.__handle_transfer(task=task, callback=item.callback)
if not state:
@@ -596,20 +600,20 @@ class TransferChain(ChainBase, metaclass=Singleton):
fail_num += 1
# 更新进度
processed_num += 1
finished_files.append(Path(fileitem.path).as_posix())
__process_msg = f"{fileitem.name} 整理完成"
logger.info(__process_msg)
progress.update(value=processed_num / total_num * 100,
progress.update(value=(processed_num / total_num) * 100,
text=__process_msg,
key=ProgressKey.FileTransfer)
data={})
except queue.Empty:
if not __queue_start:
# 结束进度
__end_msg = f"整理队列处理完成,共整理 {processed_num} 个文件,失败 {fail_num}"
logger.info(__end_msg)
progress.update(value=100,
text=__end_msg,
key=ProgressKey.FileTransfer)
progress.end(ProgressKey.FileTransfer)
text=__end_msg)
progress.end()
# 重置计数
processed_num = 0
fail_num = 0
@@ -1165,15 +1169,16 @@ class TransferChain(ChainBase, metaclass=Singleton):
processed_num = 0
# 失败数量
fail_num = 0
# 已完成文件
finished_files = []
# 启动进度
progress = ProgressHelper()
progress.start(ProgressKey.FileTransfer)
progress = ProgressHelper(ProgressKey.FileTransfer)
progress.start()
__process_msg = f"开始整理,共 {total_num} 个文件 ..."
logger.info(__process_msg)
progress.update(value=0,
text=__process_msg,
key=ProgressKey.FileTransfer)
text=__process_msg)
try:
for transfer_task in transfer_tasks:
if global_vars.is_system_stopped:
@@ -1185,7 +1190,10 @@ class TransferChain(ChainBase, metaclass=Singleton):
logger.info(__process_msg)
progress.update(value=(processed_num + fail_num) / total_num * 100,
text=__process_msg,
key=ProgressKey.FileTransfer)
data={
"current": Path(transfer_task.fileitem.path).as_posix(),
"finished": finished_files,
})
state, err_msg = self.__handle_transfer(
task=transfer_task,
callback=self.__default_callback
@@ -1197,6 +1205,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
fail_num += 1
else:
processed_num += 1
# 记录已完成
finished_files.append(Path(transfer_task.fileitem.path).as_posix())
finally:
transfer_tasks.clear()
del transfer_tasks
@@ -1206,8 +1216,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
logger.info(__end_msg)
progress.update(value=100,
text=__end_msg,
key=ProgressKey.FileTransfer)
progress.end(ProgressKey.FileTransfer)
data={})
progress.end()
error_msg = "".join(err_msgs[:2]) + (f",等{len(err_msgs)}个文件错误!" if len(err_msgs) > 2 else "")
return all_success, error_msg
@@ -1352,12 +1362,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
else:
# 更新媒体图片
self.obtain_images(mediainfo=mediainfo)
# 开始进度
progress = ProgressHelper()
progress.start(ProgressKey.FileTransfer)
progress.update(value=0,
text=f"开始整理 {fileitem.path} ...",
key=ProgressKey.FileTransfer)
# 开始整理
state, errmsg = self.do_transfer(
fileitem=fileitem,
@@ -1378,7 +1383,6 @@ class TransferChain(ChainBase, metaclass=Singleton):
if not state:
return False, errmsg
progress.end(ProgressKey.FileTransfer)
logger.info(f"{fileitem.path} 整理完成")
return True, ""
else:
@@ -1467,13 +1471,9 @@ class TransferChain(ChainBase, metaclass=Singleton):
for file in torrent_files:
file_path = save_path / file.name
# 如果存在未被屏蔽的媒体文件,则不删除种子
if (
file_path.suffix in self.all_exts
and not self._is_blocked_by_exclude_words(
str(file_path), transfer_exclude_words
)
and file_path.exists()
):
if (file_path.suffix in self.all_exts
and not self._is_blocked_by_exclude_words(str(file_path), transfer_exclude_words)
and file_path.exists()):
return False
# 所有媒体文件都被屏蔽或不存在,可以删除种子

View File

@@ -5,11 +5,12 @@ import threading
from abc import ABC, abstractmethod
from functools import wraps
from pathlib import Path
from typing import Any, Dict, Optional, Generator, AsyncGenerator, Tuple
from typing import Any, Dict, Optional, Generator, AsyncGenerator, Tuple, Literal, Union
import aiofiles
import aioshutil
from anyio import Path as AsyncPath
from cachetools import LRUCache as MemoryLRUCache
from cachetools import TTLCache as MemoryTTLCache
from cachetools.keys import hashkey
@@ -19,6 +20,10 @@ from app.log import logger
# 默认缓存区
DEFAULT_CACHE_REGION = "DEFAULT"
# 默认缓存大小
DEFAULT_CACHE_SIZE = 1024
# 默认缓存有效期
DEFAULT_CACHE_TTL = 365 * 24 * 60 * 60
lock = threading.Lock()
@@ -354,19 +359,22 @@ class MemoryBackend(CacheBackend):
基于 `cachetools.TTLCache` 实现的缓存后端
"""
def __init__(self, maxsize: Optional[int] = None, ttl: Optional[int] = None):
def __init__(self, cache_type: Literal['ttl', 'lru'] = 'ttl',
maxsize: Optional[int] = None, ttl: Optional[int] = None):
"""
初始化缓存实例
:param cache_type: 缓存类型,支持 'ttl'(默认)和 'lru'
:param maxsize: 缓存的最大条目数
:param ttl: 默认缓存存活时间,单位秒
"""
self.maxsize = maxsize or 1024 # 未设置时默认最大条目数为 1024
self.ttl = ttl
self.cache_type = cache_type
self.maxsize = maxsize or DEFAULT_CACHE_SIZE
self.ttl = ttl or DEFAULT_CACHE_TTL
# 存储各个 region 的缓存实例region -> TTLCache
self._region_caches: Dict[str, MemoryTTLCache] = {}
self._region_caches: Dict[str, Union[MemoryTTLCache, MemoryLRUCache]] = {}
def __get_region_cache(self, region: str) -> Optional[MemoryTTLCache]:
def __get_region_cache(self, region: str) -> Optional[Union[MemoryTTLCache, MemoryLRUCache]]:
"""
获取指定区域的缓存实例,如果不存在则返回 None
"""
@@ -386,10 +394,14 @@ class MemoryBackend(CacheBackend):
ttl = ttl or self.ttl
maxsize = kwargs.get("maxsize", self.maxsize)
region = self.get_region(region)
# 如果该 key 尚未有缓存实例,则创建一个新的 TTLCache 实例
region_cache = self._region_caches.setdefault(region, MemoryTTLCache(maxsize=maxsize, ttl=ttl))
# 设置缓存值
with lock:
# 如果该 key 尚未有缓存实例,则创建一个新的 TTLCache 实例
region_cache = self._region_caches.setdefault(
region,
MemoryTTLCache(maxsize=maxsize, ttl=ttl) if self.cache_type == 'ttl'
else MemoryLRUCache(maxsize=maxsize)
)
region_cache[key] = value
def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool:
@@ -462,7 +474,11 @@ class MemoryBackend(CacheBackend):
if region_cache is None:
yield from ()
return
for item in region_cache.items():
# 使用锁保护迭代过程,避免在迭代时缓存被修改
with lock:
# 创建快照避免并发修改问题
items_snapshot = list(region_cache.items())
for item in items_snapshot:
yield item
def close(self) -> None:
@@ -477,19 +493,22 @@ class AsyncMemoryBackend(AsyncCacheBackend):
基于 `cachetools.TTLCache` 实现的异步缓存后端
"""
def __init__(self, maxsize: Optional[int] = None, ttl: Optional[int] = None):
def __init__(self, cache_type: Literal['ttl', 'lru'] = 'ttl',
maxsize: Optional[int] = None, ttl: Optional[int] = None):
"""
初始化缓存实例
:param cache_type: 缓存类型,支持 'ttl'(默认)和 'lru'
:param maxsize: 缓存的最大条目数
:param ttl: 默认缓存存活时间,单位秒
"""
self.maxsize = maxsize or 1024 # 未设置时默认最大条目数为 1024
self.ttl = ttl
self.cache_type = cache_type
self.maxsize = maxsize or DEFAULT_CACHE_SIZE
self.ttl = ttl or DEFAULT_CACHE_TTL
# 存储各个 region 的缓存实例region -> TTLCache
self._region_caches: Dict[str, MemoryTTLCache] = {}
self._region_caches: Dict[str, Union[MemoryTTLCache, MemoryLRUCache]] = {}
def __get_region_cache(self, region: str) -> Optional[MemoryTTLCache]:
def __get_region_cache(self, region: str) -> Optional[Union[MemoryTTLCache, MemoryLRUCache]]:
"""
获取指定区域的缓存实例,如果不存在则返回 None
"""
@@ -509,10 +528,14 @@ class AsyncMemoryBackend(AsyncCacheBackend):
ttl = ttl or self.ttl
maxsize = kwargs.get("maxsize", self.maxsize)
region = self.get_region(region)
# 如果该 key 尚未有缓存实例,则创建一个新的 TTLCache 实例
region_cache = self._region_caches.setdefault(region, MemoryTTLCache(maxsize=maxsize, ttl=ttl))
# 设置缓存值
with lock:
# 如果该 key 尚未有缓存实例,则创建一个新的 TTLCache 实例
region_cache = self._region_caches.setdefault(
region,
MemoryTTLCache(maxsize=maxsize, ttl=ttl) if self.cache_type == 'ttl'
else MemoryLRUCache(maxsize=maxsize)
)
region_cache[key] = value
async def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool:
@@ -584,7 +607,11 @@ class AsyncMemoryBackend(AsyncCacheBackend):
region_cache = self.__get_region_cache(region)
if region_cache is None:
return
for item in region_cache.items():
# 使用锁保护迭代过程,避免在迭代时缓存被修改
with lock:
# 创建快照避免并发修改问题
items_snapshot = list(region_cache.items())
for item in items_snapshot:
yield item
async def close(self) -> None:
@@ -1007,10 +1034,13 @@ def AsyncFileCache(base: Path = settings.TEMP_PATH, ttl: Optional[int] = None) -
return AsyncFileBackend(base=base)
def Cache(maxsize: Optional[int] = None, ttl: Optional[int] = None) -> CacheBackend:
def Cache(cache_type: Literal['ttl', 'lru'] = 'ttl',
maxsize: Optional[int] = None,
ttl: Optional[int] = None) -> CacheBackend:
"""
根据配置获取缓存后端实例内存或Redismaxsize仅在未启用Redis时生效
:param cache_type: 缓存类型,仅使用内存缓存时生效,支持 'ttl'(默认)和 'lru'
:param maxsize: 缓存的最大条目数仅使用cachetools时生效
:param ttl: 缓存的默认存活时间,单位秒
:return: 返回缓存后端实例
@@ -1019,13 +1049,16 @@ def Cache(maxsize: Optional[int] = None, ttl: Optional[int] = None) -> CacheBack
return RedisBackend(ttl=ttl)
else:
# 使用内存缓存maxsize需要有值
return MemoryBackend(maxsize=maxsize, ttl=ttl)
return MemoryBackend(cache_type=cache_type, maxsize=maxsize, ttl=ttl)
def AsyncCache(maxsize: Optional[int] = None, ttl: Optional[int] = None) -> AsyncCacheBackend:
def AsyncCache(cache_type: Literal['ttl', 'lru'] = 'ttl',
maxsize: Optional[int] = None,
ttl: Optional[int] = None) -> AsyncCacheBackend:
"""
根据配置获取异步缓存后端实例内存或Redismaxsize仅在未启用Redis时生效
:param cache_type: 缓存类型,仅使用内存缓存时生效,支持 'ttl'(默认)和 'lru'
:param maxsize: 缓存的最大条目数仅使用cachetools时生效
:param ttl: 缓存的默认存活时间,单位秒
:return: 返回异步缓存后端实例
@@ -1034,7 +1067,7 @@ def AsyncCache(maxsize: Optional[int] = None, ttl: Optional[int] = None) -> Asyn
return AsyncRedisBackend(ttl=ttl)
else:
# 使用异步内存缓存maxsize需要有值
return AsyncMemoryBackend(maxsize=maxsize, ttl=ttl)
return AsyncMemoryBackend(cache_type=cache_type, maxsize=maxsize, ttl=ttl)
def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Optional[int] = None,
@@ -1054,13 +1087,13 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt
# 检查是否为异步函数
is_async = inspect.iscoroutinefunction(func)
# 根据函数类型选择对应的缓存后端
# 根据函数类型选择对应的缓存后端没有ttl时默认是 LRU 缓存,否则是 TTL 缓存
if is_async:
# 异步函数使用异步缓存后端
cache_backend = AsyncCache(maxsize=maxsize, ttl=ttl)
cache_backend = AsyncCache(cache_type="ttl" if ttl else "lru", maxsize=maxsize, ttl=ttl)
else:
# 同步函数使用同步缓存后端
cache_backend = Cache(maxsize=maxsize, ttl=ttl)
cache_backend = Cache(cache_type="ttl" if ttl else "lru", maxsize=maxsize, ttl=ttl)
def should_cache(value: Any) -> bool:
"""
@@ -1201,43 +1234,15 @@ class CacheProxy:
缓存代理类,将缓存后端的方法直接代理到实例上
"""
def __init__(self, cache_backend: CacheBackend, region: str, ttl: Optional[int] = None):
def __init__(self, cache_backend: CacheBackend, region: str):
"""
初始化缓存代理
:param cache_backend: 缓存后端实例
:param region: 缓存区域
:param ttl: TTL 时间(仅用于 TTL 缓存)
"""
self._cache_backend = cache_backend
self._region = region
self._ttl = ttl
def __getattr__(self, name):
"""
代理所有未定义的方法到缓存后端
"""
if hasattr(self._cache_backend, name):
method = getattr(self._cache_backend, name)
if callable(method):
# 检查方法签名,自动添加 region 和 ttl 参数
def wrapper(*args, **kwargs):
# 检查方法是否接受 region 参数
sig = inspect.signature(method)
params = sig.parameters
# 如果方法接受 region 参数且未提供,则添加
if 'region' in params and 'region' not in kwargs:
kwargs['region'] = self._region
# 如果方法接受 ttl 参数且未提供,且我们有 ttl 值,则添加
if 'ttl' in params and 'ttl' not in kwargs and self._ttl is not None:
kwargs['ttl'] = self._ttl
return method(*args, **kwargs)
return wrapper
raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
def __getitem__(self, key):
"""
@@ -1253,8 +1258,6 @@ class CacheProxy:
设置缓存项
"""
kwargs = {'region': self._region}
if self._ttl is not None:
kwargs['ttl'] = self._ttl # noqa
self._cache_backend.set(key, value, **kwargs)
def __delitem__(self, key):
@@ -1284,6 +1287,102 @@ class CacheProxy:
"""
return sum(1 for _ in self._cache_backend.items(region=self._region))
def is_redis(self) -> bool:
"""
检查当前缓存后端是否为 Redis
"""
return self._cache_backend.is_redis()
def get(self, key: str, **kwargs) -> Any:
"""
获取缓存值
"""
kwargs.setdefault('region', self._region)
return self._cache_backend.get(key, **kwargs)
def set(self, key: str, value: Any, **kwargs) -> None:
"""
设置缓存值
"""
kwargs.setdefault('region', self._region)
self._cache_backend.set(key, value, **kwargs)
def delete(self, key: str, **kwargs) -> None:
"""
删除缓存值
"""
kwargs.setdefault('region', self._region)
self._cache_backend.delete(key, **kwargs)
def exists(self, key: str, **kwargs) -> bool:
"""
检查缓存键是否存在
"""
kwargs.setdefault('region', self._region)
return self._cache_backend.exists(key, **kwargs)
def clear(self, **kwargs) -> None:
"""
清除缓存
"""
kwargs.setdefault('region', self._region)
self._cache_backend.clear(**kwargs)
def items(self, **kwargs):
"""
获取所有缓存项
"""
kwargs.setdefault('region', self._region)
return self._cache_backend.items(**kwargs)
def keys(self, **kwargs):
"""
获取所有缓存键
"""
kwargs.setdefault('region', self._region)
return self._cache_backend.keys(**kwargs)
def values(self, **kwargs):
"""
获取所有缓存值
"""
kwargs.setdefault('region', self._region)
return self._cache_backend.values(**kwargs)
def update(self, other: Dict[str, Any], **kwargs) -> None:
"""
更新缓存
"""
kwargs.setdefault('region', self._region)
self._cache_backend.update(other, **kwargs)
def pop(self, key: str, default: Any = None, **kwargs) -> Any:
"""
弹出缓存项
"""
kwargs.setdefault('region', self._region)
return self._cache_backend.pop(key, default, **kwargs)
def popitem(self, **kwargs) -> Tuple[str, Any]:
"""
弹出最后一个缓存项
"""
kwargs.setdefault('region', self._region)
return self._cache_backend.popitem(**kwargs)
def setdefault(self, key: str, default: Any = None, **kwargs) -> Any:
"""
设置默认值
"""
kwargs.setdefault('region', self._region)
return self._cache_backend.setdefault(key, default, **kwargs)
def close(self) -> None:
"""
关闭缓存连接
"""
self._cache_backend.close()
class TTLCache(CacheProxy):
"""
@@ -1291,7 +1390,10 @@ class TTLCache(CacheProxy):
使用项目的缓存后端实现,支持 Redis 和内存缓存
"""
def __init__(self, maxsize: int = 1024, ttl: int = None, region: Optional[str] = None):
def __init__(self,
region: Optional[str] = DEFAULT_CACHE_REGION,
maxsize: Optional[int] = DEFAULT_CACHE_SIZE,
ttl: Optional[int] = DEFAULT_CACHE_TTL):
"""
初始化 TTL 缓存
@@ -1299,7 +1401,7 @@ class TTLCache(CacheProxy):
:param ttl: 缓存的存活时间,单位秒
:param region: 缓存的区,为 None 时使用默认区
"""
super().__init__(Cache(maxsize=maxsize, ttl=ttl), region or DEFAULT_CACHE_REGION, ttl)
super().__init__(Cache(cache_type='ttl', maxsize=maxsize, ttl=ttl), region)
class LRUCache(CacheProxy):
@@ -1308,11 +1410,14 @@ class LRUCache(CacheProxy):
使用项目的缓存后端实现,支持 Redis 和内存缓存
"""
def __init__(self, maxsize: int = 1024, region: Optional[str] = None):
def __init__(self,
region: Optional[str] = DEFAULT_CACHE_REGION,
maxsize: Optional[int] = DEFAULT_CACHE_SIZE
):
"""
初始化 LRU 缓存
:param maxsize: 缓存的最大条目数
:param region: 缓存的区,为 None 时使用默认区
"""
super().__init__(Cache(maxsize=maxsize), region or DEFAULT_CACHE_REGION)
super().__init__(Cache(cache_type='lru', maxsize=maxsize), region)

View File

@@ -798,6 +798,8 @@ class GlobalVar(object):
SUBSCRIPTIONS: List[dict] = []
# 需应急停止的工作流
EMERGENCY_STOP_WORKFLOWS: List[int] = []
# 需应急停止文件整理
EMERGENCY_STOP_TRANSFER: List[str] = []
def stop_system(self):
"""
@@ -838,12 +840,30 @@ class GlobalVar(object):
if workflow_id in self.EMERGENCY_STOP_WORKFLOWS:
self.EMERGENCY_STOP_WORKFLOWS.remove(workflow_id)
def is_workflow_stopped(self, workflow_id: int):
def is_workflow_stopped(self, workflow_id: int) -> bool:
"""
是否停止工作流
"""
return self.is_system_stopped or workflow_id in self.EMERGENCY_STOP_WORKFLOWS
def stop_transfer(self, path: str):
"""
停止文件整理
"""
if path not in self.EMERGENCY_STOP_TRANSFER:
self.EMERGENCY_STOP_TRANSFER.append(path)
def is_transfer_stopped(self, path: str) -> bool:
"""
是否停止文件整理
"""
if self.is_system_stopped:
return True
if path in self.EMERGENCY_STOP_TRANSFER:
self.EMERGENCY_STOP_TRANSFER.remove(path)
return True
return False
# 全局标识
global_vars = GlobalVar()

View File

@@ -450,10 +450,7 @@ class EventManager(metaclass=Singleton):
logger.debug(f"Handler {self.__get_handler_identifier(handler)} is disabled. Skipping execution")
return
try:
self.__invoke_handler_by_type_sync(handler, event)
except Exception as e:
self.__handle_event_error(event, handler, e)
self.__invoke_handler_by_type_sync(handler, event)
async def __safe_invoke_handler_async(self, handler: Callable, event: Event):
"""
@@ -465,10 +462,7 @@ class EventManager(metaclass=Singleton):
logger.debug(f"Handler {self.__get_handler_identifier(handler)} is disabled. Skipping execution")
return
try:
await self.__invoke_handler_by_type_async(handler, event)
except Exception as e:
self.__handle_event_error(event, handler, e)
await self.__invoke_handler_by_type_async(handler, event)
def __invoke_handler_by_type_sync(self, handler: Callable, event: Event):
"""
@@ -486,7 +480,17 @@ class EventManager(metaclass=Singleton):
if class_name in plugin_manager.get_plugin_ids():
# 插件处理器
plugin_manager.run_plugin_method(class_name, method_name, event)
plugin = plugin_manager.running_plugins.get(class_name)
if not plugin:
return
method = getattr(plugin, method_name, None)
if not method:
return
try:
method(event)
except Exception as e:
self.__handle_event_error(event=event, module_name=plugin.name,
class_name=class_name, method_name=method_name, e=e)
elif class_name in module_manager.get_module_ids():
# 模块处理器
module = module_manager.get_running_module(class_name)
@@ -495,16 +499,24 @@ class EventManager(metaclass=Singleton):
method = getattr(module, method_name, None)
if not method:
return
method(event)
try:
method(event)
except Exception as e:
self.__handle_event_error(event=event, module_name=module.get_name(),
class_name=class_name, method_name=method_name, e=e)
else:
# 全局处理器
class_obj = self.__get_class_instance(class_name)
if not class_obj or not hasattr(class_obj, method_name):
return
method = getattr(class_obj, method_name)
method = getattr(class_obj, method_name, None)
if not method:
return
method(event)
try:
method(event)
except Exception as e:
self.__handle_event_error(event=event, module_name=class_name,
class_name=class_name, method_name=method_name, e=e)
async def __invoke_handler_by_type_async(self, handler: Callable, event: Event):
"""
@@ -537,52 +549,62 @@ class EventManager(metaclass=Singleton):
names = handler.__qualname__.split(".")
return names[0], names[1]
@staticmethod
async def __invoke_plugin_method_async(handler: Any, class_name: str, method_name: str, event: Event):
async def __invoke_plugin_method_async(self, handler: Any, class_name: str, method_name: str, event: Event):
"""
异步调用插件方法
"""
plugin = handler.running_plugins.get(class_name)
if plugin and hasattr(plugin, method_name):
method = getattr(plugin, method_name)
if not plugin:
return
method = getattr(plugin, method_name, None)
if not method:
return
try:
if inspect.iscoroutinefunction(method):
await method(event)
else:
# 插件同步函数在异步环境中运行,避免阻塞
await run_in_threadpool(method, event)
except Exception as e:
self.__handle_event_error(event=event, handler=handler, e=e, module_name=plugin.name)
@staticmethod
async def __invoke_module_method_async(handler: Any, class_name: str, method_name: str, event: Event):
async def __invoke_module_method_async(self, handler: Any, class_name: str, method_name: str, event: Event):
"""
异步调用模块方法
"""
module = handler.get_running_module(class_name)
if not module:
return
method = getattr(module, method_name, None)
if not method:
return
if inspect.iscoroutinefunction(method):
await method(event)
else:
method(event)
try:
if inspect.iscoroutinefunction(method):
await method(event)
else:
method(event)
except Exception as e:
self.__handle_event_error(event=event, module_name=module.get_name(),
class_name=class_name, method_name=method_name, e=e)
async def __invoke_global_method_async(self, class_name: str, method_name: str, event: Event):
"""
异步调用全局对象方法
"""
class_obj = self.__get_class_instance(class_name)
if not class_obj or not hasattr(class_obj, method_name):
if not class_obj:
return
method = getattr(class_obj, method_name)
if inspect.iscoroutinefunction(method):
await method(event)
else:
method(event)
method = getattr(class_obj, method_name, None)
if not method:
return
try:
if inspect.iscoroutinefunction(method):
await method(event)
else:
method(event)
except Exception as e:
self.__handle_event_error(event=event, module_name=class_name,
class_name=class_name, method_name=method_name, e=e)
@staticmethod
def __get_class_instance(class_name: str):
@@ -609,7 +631,11 @@ class EventManager(metaclass=Singleton):
module_name = f"app.chain.{class_name[:-5].lower()}"
module = importlib.import_module(module_name)
elif class_name.endswith("Helper"):
module_name = f"app.helper.{class_name[:-6].lower()}"
# 特殊处理 Async 类
if class_name.startswith("Async"):
module_name = f"app.helper.{class_name[5:-6].lower()}"
else:
module_name = f"app.helper.{class_name[:-6].lower()}"
module = importlib.import_module(module_name)
else:
module_name = f"app.{class_name.lower()}"
@@ -649,18 +675,16 @@ class EventManager(metaclass=Singleton):
"""
logger.debug(f"{stage} - {event}")
def __handle_event_error(self, event: Event, handler: Callable, e: Exception):
def __handle_event_error(self, event: Event, module_name: str,
class_name: str, method_name: str, e: Exception):
"""
全局错误处理器,用于处理事件处理中的异常
"""
logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}")
names = handler.__qualname__.split(".")
class_name, method_name = names[0], names[1]
logger.error(f"{module_name} 事件处理出错:{str(e)} - {traceback.format_exc()}")
# 发送系统错误通知
from app.helper.message import MessageHelper
MessageHelper().put(title=f"{event.event_type} 事件处理出错",
MessageHelper().put(title=f"{module_name} 处理事件 {event.event_type} 出错",
message=f"{class_name}.{method_name}{str(e)}",
role="system")
self.send_event(

View File

@@ -1189,6 +1189,7 @@ class PluginManager(metaclass=Singleton):
async def async_get_online_plugins(self, force: bool = False) -> List[schemas.Plugin]:
"""
异步获取所有在线插件信息
:param force: 是否强制刷新(忽略缓存)
"""
if not settings.PLUGIN_MARKET:
return []

View File

@@ -20,7 +20,7 @@ class SiteUserData(Base):
# 用户名
username = Column(String)
# 用户ID
userid = Column(Integer)
userid = Column(String)
# 用户等级
user_level = Column(String)
# 加入时间

View File

@@ -119,6 +119,14 @@ class SubscribeOper(DbOper):
return Subscribe.get_by_state(self._db, state)
return Subscribe.list(self._db)
async def async_list(self, state: Optional[str] = None) -> List[Subscribe]:
"""
异步获取订阅列表
"""
if state:
return await Subscribe.async_get_by_state(self._db, state)
return await Subscribe.async_list(self._db)
def delete(self, sid: int):
"""
删除订阅

View File

@@ -610,14 +610,19 @@ class PluginHelper(metaclass=WeakSingleton):
asset = next((a for a in assets if a.get("name") == asset_name), None)
if not asset:
return False, f"未找到资产文件:{asset_name}"
download_url = asset.get("browser_download_url")
if not download_url:
return False, "资产缺少下载地址"
asset_id = asset.get("id")
if not asset_id:
return False, "资产缺少ID信息"
# 构建资产的API下载URL
download_url = f"https://api.github.com/repos/{user_repo}/releases/assets/{asset_id}"
except Exception as e:
logger.error(f"解析 Release 信息失败:{e}")
return False, f"解析 Release 信息失败:{e}"
res = self.__request_with_fallback(download_url, headers=settings.REPO_GITHUB_HEADERS(repo=user_repo))
# 使用资产的API端点下载需要设置Accept头为application/octet-stream
headers = settings.REPO_GITHUB_HEADERS(repo=user_repo).copy()
headers["Accept"] = "application/octet-stream"
res = self.__request_with_fallback(download_url, headers=headers, is_api=True)
if res is None or res.status_code != 200:
return False, f"下载资产失败:{res.status_code if res else '连接失败'}"
@@ -911,10 +916,10 @@ class PluginHelper(metaclass=WeakSingleton):
"""
# 异步版本直接调用不带缓存的版本(缓存在异步环境下可能有并发问题)
if force:
return await self._async_get_plugins_uncached(repo_url, package_version)
await self._async_get_plugins_cached.cache_clear()
return await self._async_get_plugins_cached(repo_url, package_version)
@cached(maxsize=64, ttl=1800)
@cached(maxsize=128, ttl=1800)
async def _async_get_plugins_cached(self, repo_url: str,
package_version: Optional[str] = None) -> Optional[Dict[str, dict]]:
"""
@@ -1525,15 +1530,21 @@ class PluginHelper(metaclass=WeakSingleton):
asset = next((a for a in assets if a.get("name") == asset_name), None)
if not asset:
return False, f"未找到资产文件:{asset_name}"
download_url = asset.get("browser_download_url")
if not download_url:
return False, "资产缺少下载地址"
asset_id = asset.get("id")
if not asset_id:
return False, "资产缺少ID信息"
# 构建资产的API下载URL
download_url = f"https://api.github.com/repos/{user_repo}/releases/assets/{asset_id}"
except Exception as e:
logger.error(f"解析 Release 信息失败:{e}")
return False, f"解析 Release 信息失败:{e}"
# 使用资产的API端点下载需要设置Accept头为application/octet-stream
headers = settings.REPO_GITHUB_HEADERS(repo=user_repo).copy()
headers["Accept"] = "application/octet-stream"
res = await self.__async_request_with_fallback(download_url,
headers=settings.REPO_GITHUB_HEADERS(repo=user_repo))
headers=headers,
is_api=True)
if res is None or res.status_code != 200:
return False, f"下载资产失败:{res.status_code if res else '连接失败'}"

View File

@@ -1,55 +1,76 @@
from enum import Enum
from typing import Union, Optional
from app.core.cache import TTLCache
from app.schemas.types import ProgressKey
from app.utils.singleton import WeakSingleton
class ProgressHelper(metaclass=WeakSingleton):
"""
处理进度辅助类
"""
def __init__(self):
self._process_detail = {}
def init_config(self):
pass
def __reset(self, key: Union[ProgressKey, str]):
def __init__(self, key: Union[ProgressKey, str]):
if isinstance(key, Enum):
key = key.value
self._process_detail[key] = {
self._key = key
self._progress = TTLCache(region="progress", maxsize=1024, ttl=24 * 60 * 60)
def __reset(self):
"""
重置进度
"""
self._progress[self._key] = {
"enable": False,
"value": 0,
"text": "请稍候..."
"text": "请稍候...",
"data": {}
}
def start(self, key: Union[ProgressKey, str]):
self.__reset(key)
if isinstance(key, Enum):
key = key.value
self._process_detail[key]['enable'] = True
def end(self, key: Union[ProgressKey, str]):
if isinstance(key, Enum):
key = key.value
if not self._process_detail.get(key):
def start(self):
"""
开始进度
"""
self.__reset()
current = self._progress.get(self._key)
if not current:
return
self._process_detail[key] = {
"enable": False,
"value": 100,
"text": "正在处理..."
}
current['enable'] = True
self._progress[self._key] = current
def update(self, key: Union[ProgressKey, str], value: Union[float, int] = None, text: Optional[str] = None):
if isinstance(key, Enum):
key = key.value
if not self._process_detail.get(key, {}).get('enable'):
def end(self):
"""
结束进度
"""
current = self._progress.get(self._key)
if not current:
return
current.update(
{
"enable": False,
"value": 100,
"text": ""
}
)
self._progress[self._key] = current
def update(self, value: Union[float, int] = None, text: Optional[str] = None, data: dict = None):
"""
更新进度
"""
current = self._progress.get(self._key)
if not current or not current.get('enable'):
return
if value:
self._process_detail[key]['value'] = value
current['value'] = value
if text:
self._process_detail[key]['text'] = text
current['text'] = text
if data:
if not current.get('data'):
current['data'] = {}
current['data'].update(data)
self._progress[self._key] = current
def get(self, key: Union[ProgressKey, str]) -> dict:
if isinstance(key, Enum):
key = key.value
return self._process_detail.get(key)
def get(self) -> dict:
return self._progress.get(self._key)

View File

@@ -1,5 +1,7 @@
import os
import signal
import threading
import time
from pathlib import Path
from typing import Tuple
@@ -41,8 +43,8 @@ class SystemHelper:
判断是否可以内部重启
"""
return (
Path("/var/run/docker.sock").exists()
or settings.DOCKER_CLIENT_API != "tcp://127.0.0.1:38379"
Path("/var/run/docker.sock").exists()
or settings.DOCKER_CLIENT_API != "tcp://127.0.0.1:38379"
)
@staticmethod
@@ -64,7 +66,7 @@ class SystemHelper:
if index_resolv_conf != -1:
index_second_slash = data.rfind(" ", 0, index_resolv_conf)
index_first_slash = (
data.rfind("/", 0, index_second_slash) + 1
data.rfind("/", 0, index_second_slash) + 1
)
container_id = data[index_first_slash:index_second_slash]
except Exception as e:
@@ -113,6 +115,8 @@ class SystemHelper:
if has_restart_policy:
# 有重启策略,使用优雅退出方式
logger.info("检测到容器配置了自动重启策略,使用优雅重启方式...")
# 启动优雅退出超时监控
SystemHelper._start_graceful_shutdown_monitor()
# 发送SIGTERM信号给当前进程触发优雅停止
os.kill(os.getpid(), signal.SIGTERM)
return True, ""
@@ -126,6 +130,25 @@ class SystemHelper:
logger.warning("降级为Docker API重启...")
return SystemHelper._docker_api_restart()
@staticmethod
def _start_graceful_shutdown_monitor():
"""
启动优雅退出超时监控
如果30秒内进程没有退出则使用Docker API强制重启
"""
def monitor_thread():
time.sleep(30) # 等待30秒
logger.warning("优雅退出超时30秒使用Docker API强制重启...")
try:
SystemHelper._docker_api_restart()
except Exception as e:
logger.error(f"强制重启失败: {str(e)}")
# 在后台线程中启动监控
thread = threading.Thread(target=monitor_thread, daemon=True)
thread.start()
@staticmethod
def _docker_api_restart() -> Tuple[bool, str]:
"""

View File

@@ -226,11 +226,9 @@ class DoubanApi(metaclass=WeakSingleton):
"""
处理HTTP响应
"""
if resp is not None and resp.status_code == 400 and "rate_limit" in resp.text:
return resp.json()
return resp.json() if resp else {}
return resp.json() if resp is not None else None
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True)
def __invoke(self, url: str, **kwargs) -> dict:
"""
GET请求
@@ -242,7 +240,7 @@ class DoubanApi(metaclass=WeakSingleton):
).get_res(url=req_url, params=params)
return self._handle_response(resp)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True)
async def __async_invoke(self, url: str, **kwargs) -> dict:
"""
GET请求异步版本
@@ -265,7 +263,7 @@ class DoubanApi(metaclass=WeakSingleton):
params.pop('_ts')
return req_url, params
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True)
def __post(self, url: str, **kwargs) -> dict:
"""
POST请求
@@ -287,7 +285,7 @@ class DoubanApi(metaclass=WeakSingleton):
).post_res(url=req_url, data=params)
return self._handle_response(resp)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True)
async def __async_post(self, url: str, **kwargs) -> dict:
"""
POST请求异步版本

View File

@@ -1,10 +1,39 @@
from abc import ABCMeta, abstractmethod
from pathlib import Path
from typing import Optional, List, Dict, Tuple
from typing import Optional, List, Dict, Tuple, Callable, Union
from tqdm import tqdm
from app import schemas
from app.helper.progress import ProgressHelper
from app.helper.storage import StorageHelper
from app.log import logger
from app.utils.crypto import HashUtils
def transfer_process(path: str) -> Callable[[int | float], None]:
"""
传输进度回调
"""
pbar = tqdm(total=100, desc="整理进度", unit="%")
progress = ProgressHelper(HashUtils.md5(path))
progress.start()
def update_progress(percent: Union[int, float]) -> None:
"""
更新进度百分比
"""
percent_value = int(percent)
pbar.n = percent_value
# 更新进度
pbar.refresh()
progress.update(value=percent_value, text=f"{path} 进度:{percent_value}%")
# 完成时结束
if percent_value >= 100:
progress.end()
pbar.close()
return update_progress
class StorageBase(metaclass=ABCMeta):

View File

@@ -1,6 +1,5 @@
import base64
import hashlib
import io
import secrets
import threading
import time
@@ -8,12 +7,12 @@ from pathlib import Path
from typing import List, Optional, Tuple, Union
import requests
from tqdm import tqdm
from app import schemas
from app.core.config import settings
from app.core.config import settings, global_vars
from app.log import logger
from app.modules.filemanager import StorageBase
from app.modules.filemanager.storages import transfer_process
from app.schemas.types import StorageSchema
from app.utils.singleton import WeakSingleton
from app.utils.string import StringUtils
@@ -46,6 +45,9 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
# 基础url
base_url = "https://openapi.alipan.com"
# 文件块大小默认10MB
chunk_size = 10 * 1024 * 1024
def __init__(self):
super().__init__()
self._auth_state = {}
@@ -249,10 +251,15 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
# 检查会话
self._check_session()
resp = self.session.request(
method, f"{self.base_url}{endpoint}",
**kwargs
)
try:
resp = self.session.request(
method, f"{self.base_url}{endpoint}",
**kwargs
)
except requests.exceptions.RequestException as e:
logger.error(f"【阿里云盘】{method} 请求 {endpoint} 网络错误: {str(e)}")
return None
if resp is None:
logger.warn(f"【阿里云盘】{method} 请求 {endpoint} 失败!")
return None
@@ -580,29 +587,6 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
raise Exception(resp.get("message"))
return resp
@staticmethod
def _log_progress(desc: str, total: int) -> tqdm:
"""
创建一个可以输出到日志的进度条
"""
class TqdmToLogger(io.StringIO):
def write(s, buf): # noqa
buf = buf.strip('\r\n\t ')
if buf:
logger.info(buf)
return tqdm(
total=total,
unit='B',
unit_scale=True,
desc=desc,
file=TqdmToLogger(),
mininterval=1.0,
maxinterval=5.0,
miniters=1
)
def upload(self, target_dir: schemas.FileItem, local_path: Path,
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
"""
@@ -643,21 +627,26 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
# 4. 初始化进度条
logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path},分片数:{len(part_info_list)}")
progress_bar = self._log_progress(f"【阿里云盘】{target_name} 上传进度", file_size)
progress_callback = transfer_process(local_path.as_posix())
# 5. 分片上传循环
uploaded_size = 0
with open(local_path, 'rb') as f:
for part_info in part_info_list:
part_num = part_info['part_number']
if global_vars.is_transfer_stopped(local_path.as_posix()):
logger.info(f"【阿里云盘】{target_name} 上传已取消!")
return None
# 计算分片参数
part_num = part_info['part_number']
start = (part_num - 1) * chunk_size
end = min(start + chunk_size, file_size)
current_chunk_size = end - start
# 更新进度条(已存在的分片)
if part_num in uploaded_parts:
progress_bar.update(current_chunk_size)
uploaded_size += current_chunk_size
progress_callback((uploaded_size * 100) / file_size)
continue
# 准备分片数据
@@ -675,7 +664,6 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
upload_url = new_urls[0]['upload_url']
else:
upload_url = part_info['upload_url']
# 执行上传
logger.info(
f"【阿里云盘】开始 第{attempt + 1}次 上传 {target_name} 分片 {part_num} ...")
@@ -694,13 +682,13 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
# 处理上传结果
if success:
uploaded_parts.add(part_num)
progress_bar.update(current_chunk_size)
uploaded_size += current_chunk_size
progress_callback((uploaded_size * 100) / file_size)
else:
raise Exception(f"【阿里云盘】{target_name} 分片 {part_num} 上传失败!")
# 6. 关闭进度条
if progress_bar:
progress_bar.close()
progress_callback(100)
# 7. 完成上传
result = self._complete_upload(drive_id=target_dir.drive_id, file_id=file_id, upload_id=upload_id)
@@ -712,7 +700,7 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
"""
限速处理的下载
实时进度显示的下载
"""
download_info = self._request_api(
"POST",
@@ -723,14 +711,57 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
}
)
if not download_info:
logger.error(f"【阿里云盘】获取下载链接失败: {fileitem.name}")
return None
download_url = download_info.get("url")
if not download_url:
logger.error(f"【阿里云盘】下载链接为空: {fileitem.name}")
return None
local_path = path or settings.TEMP_PATH / fileitem.name
with requests.get(download_url, stream=True) as r:
r.raise_for_status()
with open(local_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
# 获取文件大小
file_size = fileitem.size
# 初始化进度条
logger.info(f"【阿里云盘】开始下载: {fileitem.name} -> {local_path}")
progress_callback = transfer_process(Path(fileitem.path).as_posix())
try:
with requests.get(download_url, stream=True) as r:
r.raise_for_status()
downloaded_size = 0
with open(local_path, "wb") as f:
for chunk in r.iter_content(chunk_size=self.chunk_size):
if global_vars.is_transfer_stopped(fileitem.path):
logger.info(f"【阿里云盘】{fileitem.path} 下载已取消!")
return None
if chunk:
f.write(chunk)
# 更新进度
downloaded_size += len(chunk)
if file_size:
progress = (downloaded_size * 100) / file_size
progress_callback(progress)
# 完成下载
progress_callback(100)
logger.info(f"【阿里云盘】下载完成: {fileitem.name}")
except requests.exceptions.RequestException as e:
logger.error(f"【阿里云盘】下载网络错误: {fileitem.name} - {str(e)}")
# 删除可能部分下载的文件
if local_path.exists():
local_path.unlink()
return None
except Exception as e:
logger.error(f"【阿里云盘】下载失败: {fileitem.name} - {str(e)}")
# 删除可能部分下载的文件
if local_path.exists():
local_path.unlink()
return None
return local_path
def check(self) -> bool:

View File

@@ -1,4 +1,5 @@
import json
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, List
@@ -7,9 +8,9 @@ import requests
from app import schemas
from app.core.cache import cached
from app.core.config import settings
from app.core.config import settings, global_vars
from app.log import logger
from app.modules.filemanager.storages import StorageBase
from app.modules.filemanager.storages import StorageBase, transfer_process
from app.schemas.types import StorageSchema
from app.utils.http import RequestUtils
from app.utils.singleton import WeakSingleton
@@ -31,6 +32,7 @@ class Alist(StorageBase, metaclass=WeakSingleton):
"move": "移动",
}
# 快照检查目录修改时间
snapshot_check_folder_modtime = settings.OPENLIST_SNAPSHOT_CHECK_FOLDER_MODTIME
def __init__(self):
@@ -42,6 +44,17 @@ class Alist(StorageBase, metaclass=WeakSingleton):
"""
self.__generate_token.cache_clear() # noqa
def _delay_get_item(self, path: Path) -> Optional[schemas.FileItem]:
"""
自动延迟重试 get_item 模块
"""
for _ in range(2):
time.sleep(2)
fileitem = self.get_item(path)
if fileitem:
return fileitem
return None
@property
def __get_base_url(self) -> str:
"""
@@ -269,7 +282,7 @@ class Alist(StorageBase, metaclass=WeakSingleton):
logger.warn(f'【OpenList】创建目录 {path} 失败,错误信息:{result["message"]}')
return None
return self.get_item(path)
return self._delay_get_item(path)
def get_folder(self, path: Path) -> Optional[schemas.FileItem]:
"""
@@ -560,6 +573,9 @@ class Alist(StorageBase, metaclass=WeakSingleton):
r.raise_for_status()
with open(local_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
if global_vars.is_transfer_stopped(fileitem.path):
logger.info(f"【OpenList】{fileitem.path} 下载已取消!")
return None
f.write(chunk)
if local_path.exists():
@@ -570,36 +586,81 @@ class Alist(StorageBase, metaclass=WeakSingleton):
self, fileitem: schemas.FileItem, path: Path, new_name: Optional[str] = None, task: bool = False
) -> Optional[schemas.FileItem]:
"""
上传文件
上传文件(带进度)
:param fileitem: 上传目录项
:param path: 本地文件路径
:param new_name: 上传后文件名
:param task: 是否为任务默认为False避免未完成上传时对文件进行操作
"""
encoded_path = UrlUtils.quote((Path(fileitem.path) / path.name).as_posix())
headers = self.__get_header_with_token()
headers.setdefault("Content-Type", "application/octet-stream")
headers.setdefault("As-Task", str(task).lower())
headers.setdefault("File-Path", encoded_path)
with open(path, "rb") as f:
resp = RequestUtils(headers=headers).put_res(
self.__get_api_url("/api/fs/put"),
data=f,
)
try:
# 获取文件大小
target_name = new_name or path.name
target_path = Path(fileitem.path) / target_name
if resp is None:
logger.warn(f"【OpenList】请求上传文件 {path} 失败")
# 初始化进度回调
progress_callback = transfer_process(path.as_posix())
# 准备上传请求
encoded_path = UrlUtils.quote(target_path.as_posix())
headers = self.__get_header_with_token()
headers.setdefault("Content-Type", "application/octet-stream")
headers.setdefault("As-Task", str(task).lower())
headers.setdefault("File-Path", encoded_path)
# 创建自定义的文件流,支持进度回调
class ProgressFileReader:
def __init__(self, file_path: Path, callback):
self.file = open(file_path, 'rb')
self.callback = callback
self.uploaded_size = 0
self.file_size = file_path.stat().st_size
def read(self, size=-1):
if global_vars.is_transfer_stopped(path.as_posix()):
logger.info(f"【OpenList】{path} 上传已取消!")
return None
chunk = self.file.read(size)
if chunk:
self.uploaded_size += len(chunk)
if self.callback:
percent = (self.uploaded_size * 100) / self.file_size
self.callback(percent)
return chunk
def close(self):
self.file.close()
# 使用自定义文件流上传
progress_reader = ProgressFileReader(path, progress_callback)
try:
resp = RequestUtils(headers=headers).put_res(
self.__get_api_url("/api/fs/put"),
data=progress_reader,
)
finally:
progress_reader.close()
if resp is None:
logger.warn(f"【OpenList】请求上传文件 {path} 失败")
return None
if resp.status_code != 200:
logger.warn(f"【OpenList】请求上传文件 {path} 失败,状态码:{resp.status_code}")
return None
# 完成上传
progress_callback(100)
# 获取上传后的文件项
new_item = self._delay_get_item(target_path)
if new_item and new_name and new_name != path.name:
if self.rename(new_item, new_name):
return self._delay_get_item(Path(new_item.path).with_name(new_name))
return new_item
except Exception as e:
logger.error(f"【OpenList】上传文件 {path} 失败:{e}")
return None
if resp.status_code != 200:
logger.warn(f"【OpenList】请求上传文件 {path} 失败,状态码:{resp.status_code}")
return None
new_item = self.get_item(Path(fileitem.path) / path.name)
if new_item and new_name and new_name != path.name:
if self.rename(new_item, new_name):
return self.get_item(Path(new_item.path).with_name(new_name))
return new_item
def detail(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]:
"""
@@ -658,9 +719,9 @@ class Alist(StorageBase, metaclass=WeakSingleton):
return False
# 重命名
if fileitem.name != new_name:
self.rename(
self.get_item(path / fileitem.name), new_name
)
new_item = self._delay_get_item(path / fileitem.name)
if new_item:
self.rename(new_item, new_name)
return True
def move(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:

View File

@@ -3,9 +3,10 @@ from pathlib import Path
from typing import Optional, List
from app import schemas
from app.core.config import global_vars
from app.helper.directory import DirectoryHelper
from app.log import logger
from app.modules.filemanager.storages import StorageBase
from app.modules.filemanager.storages import StorageBase, transfer_process
from app.schemas.types import StorageSchema
from app.utils.system import SystemUtils
@@ -25,6 +26,9 @@ class LocalStorage(StorageBase):
"softlink": "软链接"
}
# 文件块大小默认100MB
chunk_size = 100 * 1024 * 1024
def init_storage(self):
"""
初始化
@@ -95,7 +99,7 @@ class LocalStorage(StorageBase):
# 遍历目录
path_obj = Path(path)
if not path_obj.exists():
logger.warn(f"local】目录不存在:{path}")
logger.warn(f"本地】目录不存在:{path}")
return []
# 如果是文件
@@ -167,7 +171,7 @@ class LocalStorage(StorageBase):
else:
shutil.rmtree(path_obj, ignore_errors=True)
except Exception as e:
logger.error(f"local】删除文件失败:{e}")
logger.error(f"本地】删除文件失败:{e}")
return False
return True
@@ -181,7 +185,7 @@ class LocalStorage(StorageBase):
try:
path_obj.rename(path_obj.parent / name)
except Exception as e:
logger.error(f"local】重命名文件失败:{e}")
logger.error(f"本地】重命名文件失败:{e}")
return False
return True
@@ -191,21 +195,119 @@ class LocalStorage(StorageBase):
"""
return Path(fileitem.path)
def upload(self, fileitem: schemas.FileItem, path: Path,
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
def _copy_with_progress(self, src: Path, dest: Path):
"""
上传文件
:param fileitem: 上传目录项
:param path: 本地文件路径
:param new_name: 上传后文件名
分块复制文件并回调进度
"""
dir_path = Path(fileitem.path)
target_path = dir_path / (new_name or path.name)
code, message = SystemUtils.move(path, target_path)
if code != 0:
logger.error(f"【local】移动文件失败{message}")
return None
return self.get_item(target_path)
total_size = src.stat().st_size
copied_size = 0
progress_callback = transfer_process(src.as_posix())
try:
with open(src, "rb") as fsrc, open(dest, "wb") as fdst:
while True:
if global_vars.is_transfer_stopped(src.as_posix()):
logger.info(f"【本地】{src} 复制已取消!")
return False
buf = fsrc.read(self.chunk_size)
if not buf:
break
fdst.write(buf)
copied_size += len(buf)
# 更新进度
if progress_callback:
percent = copied_size / total_size * 100
progress_callback(percent)
# 保留文件时间戳、权限等信息
shutil.copystat(src, dest)
return True
except Exception as e:
logger.error(f"【本地】复制文件 {src} 失败:{e}")
return False
finally:
progress_callback(100)
def upload(
self,
fileitem: schemas.FileItem,
path: Path,
new_name: Optional[str] = None
) -> Optional[schemas.FileItem]:
"""
上传文件(带进度)
"""
try:
dir_path = Path(fileitem.path)
target_path = dir_path / (new_name or path.name)
if self._copy_with_progress(path, target_path):
# 上传删除源文件
path.unlink()
return self.get_item(target_path)
except Exception as err:
logger.error(f"【本地】移动文件失败:{err}")
return None
@staticmethod
def __should_show_progress(src: Path, dest: Path):
"""
是否显示进度条
"""
src_isnetwork = SystemUtils.is_network_filesystem(src)
dest_isnetwork = SystemUtils.is_network_filesystem(dest)
if src_isnetwork and dest_isnetwork and SystemUtils.is_same_disk(src, dest):
return True
return False
def copy(
self,
fileitem: schemas.FileItem,
path: Path,
new_name: str
) -> bool:
"""
复制文件(带进度)
"""
try:
src = Path(fileitem.path)
dest = path / new_name
if self.__should_show_progress(src, dest):
if self._copy_with_progress(src, dest):
return True
else:
code, message = SystemUtils.copy(src, dest)
if code == 0:
return True
else:
logger.error(f"【本地】复制文件失败:{message}")
except Exception as err:
logger.error(f"【本地】复制文件失败:{err}")
return False
def move(
self,
fileitem: schemas.FileItem,
path: Path,
new_name: str
) -> bool:
"""
移动文件(带进度)
"""
try:
src = Path(fileitem.path)
dest = path / new_name
if self.__should_show_progress(src, dest):
if self._copy_with_progress(src, dest):
# 复制成功删除源文件
src.unlink()
return True
else:
code, message = SystemUtils.move(src, dest)
if code == 0:
return True
else:
logger.error(f"【本地】移动文件失败:{message}")
except Exception as err:
logger.error(f"【本地】移动文件失败:{err}")
return False
def link(self, fileitem: schemas.FileItem, target_file: Path) -> bool:
"""
@@ -214,7 +316,7 @@ class LocalStorage(StorageBase):
file_path = Path(fileitem.path)
code, message = SystemUtils.link(file_path, target_file)
if code != 0:
logger.error(f"local】硬链接文件失败:{message}")
logger.error(f"本地】硬链接文件失败:{message}")
return False
return True
@@ -225,35 +327,7 @@ class LocalStorage(StorageBase):
file_path = Path(fileitem.path)
code, message = SystemUtils.softlink(file_path, target_file)
if code != 0:
logger.error(f"local】软链接文件失败:{message}")
return False
return True
def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
"""
复制文件
:param fileitem: 文件项
:param path: 目标目录
:param new_name: 新文件名
"""
file_path = Path(fileitem.path)
code, message = SystemUtils.copy(file_path, path / new_name)
if code != 0:
logger.error(f"【local】复制文件失败{message}")
return False
return True
def move(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
"""
移动文件
:param fileitem: 文件项
:param path: 目标目录
:param new_name: 新文件名
"""
file_path = Path(fileitem.path)
code, message = SystemUtils.move(file_path, path / new_name)
if code != 0:
logger.error(f"【local】移动文件失败{message}")
logger.error(f"本地】软链接文件失败:{message}")
return False
return True

View File

@@ -6,7 +6,7 @@ from typing import Optional, List
from app import schemas
from app.core.config import settings
from app.log import logger
from app.modules.filemanager.storages import StorageBase
from app.modules.filemanager.storages import StorageBase, transfer_process
from app.schemas.types import StorageSchema
from app.utils.string import StringUtils
from app.utils.system import SystemUtils
@@ -58,6 +58,41 @@ class Rclone(StorageBase):
else:
return None
@staticmethod
def __parse_rclone_progress(line: str) -> Optional[float]:
"""
解析rclone进度输出
"""
if not line:
return None
line = line.strip()
# 检查是否包含百分比
if '%' not in line:
return None
try:
# 尝试多种进度输出格式
if 'ETA' in line:
# 格式: "Transferred: 1.234M / 5.678M, 22%, 1.234MB/s, ETA 2m3s"
percent_str = line.split('%')[0].split()[-1]
return float(percent_str)
elif 'Transferred:' in line and '100%' in line:
# 传输完成
return 100.0
else:
# 其他包含百分比的格式
parts = line.split()
for part in parts:
if '%' in part:
percent_str = part.replace('%', '')
return float(percent_str)
except (ValueError, IndexError):
pass
return None
def __get_rcloneitem(self, item: dict, parent: Optional[str] = "/") -> schemas.FileItem:
"""
获取rclone文件项
@@ -238,47 +273,115 @@ class Rclone(StorageBase):
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
"""
下载文件
带实时进度显示的下载
"""
path = (path or settings.TEMP_PATH) / fileitem.name
local_path = (path or settings.TEMP_PATH) / fileitem.name
# 初始化进度条
logger.info(f"【rclone】开始下载: {fileitem.name} -> {local_path}")
progress_callback = transfer_process(Path(fileitem.path).as_posix())
try:
retcode = subprocess.run(
# 使用rclone的进度显示功能
process = subprocess.Popen(
[
'rclone', 'copyto',
'--progress', # 启用进度显示
'--stats', '1s', # 每秒更新一次统计信息
f'MP:{fileitem.path}',
f'{path}'
f'{local_path}'
],
startupinfo=self.__get_hidden_shell()
).returncode
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
startupinfo=self.__get_hidden_shell(),
universal_newlines=True,
bufsize=1
)
# 监控进度输出
last_progress = 0
for line in process.stdout:
if line:
# 解析rclone的进度输出
progress = self.__parse_rclone_progress(line)
if progress is not None and progress > last_progress:
progress_callback(progress)
last_progress = progress
if progress >= 100:
break
# 等待进程完成
retcode = process.wait()
if retcode == 0:
return path
logger.info(f"【rclone】下载完成: {fileitem.name}")
return local_path
else:
logger.error(f"【rclone】下载失败: {fileitem.name}")
return None
except Exception as err:
logger.error(f"【rclone】复制文件失败:{err}")
return None
logger.error(f"【rclone】下载失败: {fileitem.name} - {err}")
# 删除可能部分下载的文件
if local_path.exists():
local_path.unlink()
return None
def upload(self, fileitem: schemas.FileItem, path: Path,
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
"""
上传文件
带实时进度显示的上传
:param fileitem: 上传目录项
:param path: 本地文件路径
:param new_name: 上传后文件名
"""
target_name = new_name or path.name
new_path = Path(fileitem.path) / target_name
# 初始化进度条
logger.info(f"【rclone】开始上传: {path} -> {new_path}")
progress_callback = transfer_process(path.as_posix())
try:
new_path = Path(fileitem.path) / (new_name or path.name)
retcode = subprocess.run(
# 使用rclone的进度显示功能
process = subprocess.Popen(
[
'rclone', 'copyto',
'--progress', # 启用进度显示
'--stats', '1s', # 每秒更新一次统计信息
path.as_posix(),
f'MP:{new_path}'
],
startupinfo=self.__get_hidden_shell()
).returncode
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
startupinfo=self.__get_hidden_shell(),
universal_newlines=True,
bufsize=1
)
# 监控进度输出
last_progress = 0
for line in process.stdout:
if line:
# 解析rclone的进度输出
progress = self.__parse_rclone_progress(line)
if progress is not None and progress > last_progress:
progress_callback(progress)
last_progress = progress
if progress >= 100:
break
# 等待进程完成
retcode = process.wait()
if retcode == 0:
logger.info(f"【rclone】上传完成: {target_name}")
return self.get_item(new_path)
else:
logger.error(f"【rclone】上传失败: {target_name}")
return None
except Exception as err:
logger.error(f"【rclone】上传文件失败:{err}")
return None
logger.error(f"【rclone】上传失败: {target_name} - {err}")
return None
def detail(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]:
"""
@@ -307,20 +410,53 @@ class Rclone(StorageBase):
:param path: 目标目录
:param new_name: 新文件名
"""
target_path = path / new_name
# 初始化进度条
logger.info(f"【rclone】开始移动: {fileitem.path} -> {target_path}")
progress_callback = transfer_process(Path(fileitem.path).as_posix())
try:
retcode = subprocess.run(
# 使用rclone的进度显示功能
process = subprocess.Popen(
[
'rclone', 'moveto',
'--progress', # 启用进度显示
'--stats', '1s', # 每秒更新一次统计信息
f'MP:{fileitem.path}',
f'MP:{path / new_name}'
f'MP:{target_path}'
],
startupinfo=self.__get_hidden_shell()
).returncode
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
startupinfo=self.__get_hidden_shell(),
universal_newlines=True,
bufsize=1
)
# 监控进度输出
last_progress = 0
for line in process.stdout:
if line:
# 解析rclone的进度输出
progress = self.__parse_rclone_progress(line)
if progress is not None and progress > last_progress:
progress_callback(progress)
last_progress = progress
if progress >= 100:
break
# 等待进程完成
retcode = process.wait()
if retcode == 0:
logger.info(f"【rclone】移动完成: {fileitem.name}")
return True
else:
logger.error(f"【rclone】移动失败: {fileitem.name}")
return False
except Exception as err:
logger.error(f"【rclone】移动文件失败:{err}")
return False
logger.error(f"【rclone】移动失败: {fileitem.name} - {err}")
return False
def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
"""
@@ -329,20 +465,53 @@ class Rclone(StorageBase):
:param path: 目标目录
:param new_name: 新文件名
"""
target_path = path / new_name
# 初始化进度条
logger.info(f"【rclone】开始复制: {fileitem.path} -> {target_path}")
progress_callback = transfer_process(Path(fileitem.path).as_posix())
try:
retcode = subprocess.run(
# 使用rclone的进度显示功能
process = subprocess.Popen(
[
'rclone', 'copyto',
'--progress', # 启用进度显示
'--stats', '1s', # 每秒更新一次统计信息
f'MP:{fileitem.path}',
f'MP:{path / new_name}'
f'MP:{target_path}'
],
startupinfo=self.__get_hidden_shell()
).returncode
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
startupinfo=self.__get_hidden_shell(),
universal_newlines=True,
bufsize=1
)
# 监控进度输出
last_progress = 0
for line in process.stdout:
if line:
# 解析rclone的进度输出
progress = self.__parse_rclone_progress(line)
if progress is not None and progress > last_progress:
progress_callback(progress)
last_progress = progress
if progress >= 100:
break
# 等待进程完成
retcode = process.wait()
if retcode == 0:
logger.info(f"【rclone】复制完成: {fileitem.name}")
return True
else:
logger.error(f"【rclone】复制失败: {fileitem.name}")
return False
except Exception as err:
logger.error(f"【rclone】复制文件失败:{err}")
return False
logger.error(f"【rclone】复制失败: {fileitem.name} - {err}")
return False
def link(self, fileitem: schemas.FileItem, target_file: Path) -> bool:
pass

View File

@@ -8,9 +8,10 @@ from smbclient import ClientConfig, register_session, reset_connection_cache
from smbprotocol.exceptions import SMBException, SMBResponseException, SMBAuthenticationError
from app import schemas
from app.core.config import settings
from app.core.config import settings, global_vars
from app.log import logger
from app.modules.filemanager import StorageBase
from app.modules.filemanager.storages import transfer_process
from app.schemas.types import StorageSchema
from app.utils.singleton import WeakSingleton
@@ -38,6 +39,9 @@ class SMB(StorageBase, metaclass=WeakSingleton):
"copy": "复制",
}
# 文件块大小默认100MB
chunk_size = 100 * 1024 * 1024
def __init__(self):
super().__init__()
self._connected = False
@@ -412,63 +416,99 @@ class SMB(StorageBase, metaclass=WeakSingleton):
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
"""
下载文件
带实时进度显示的下载
"""
local_path = path or settings.TEMP_PATH / fileitem.name
smb_path = self._normalize_path(fileitem.path)
try:
self._check_connection()
smb_path = self._normalize_path(fileitem.path)
local_path = path or settings.TEMP_PATH / fileitem.name
# 确保本地目录存在
local_path.parent.mkdir(parents=True, exist_ok=True)
# 获取文件大小
file_size = fileitem.size
# 初始化进度条
logger.info(f"【SMB】开始下载: {fileitem.name} -> {local_path}")
progress_callback = transfer_process(Path(fileitem.path).as_posix())
# 使用更高效的文件传输方式
with smbclient.open_file(smb_path, mode="rb") as src_file:
with open(local_path, "wb") as dst_file:
# 使用更大的缓冲区提高性能
buffer_size = 1024 * 1024 # 1MB
downloaded_size = 0
while True:
chunk = src_file.read(buffer_size)
if global_vars.is_transfer_stopped(fileitem.path):
logger.info(f"【SMB】{fileitem.path} 下载已取消!")
return None
chunk = src_file.read(self.chunk_size)
if not chunk:
break
dst_file.write(chunk)
downloaded_size += len(chunk)
# 更新进度
if file_size:
progress = (downloaded_size * 100) / file_size
progress_callback(progress)
logger.info(f"【SMB】下载成功: {fileitem.path} -> {local_path}")
# 完成下载
progress_callback(100)
logger.info(f"【SMB】下载完成: {fileitem.name}")
return local_path
except Exception as e:
logger.error(f"【SMB】下载失败: {e}")
logger.error(f"【SMB】下载失败: {fileitem.name} - {e}")
# 删除可能部分下载的文件
if local_path.exists():
local_path.unlink()
return None
def upload(self, fileitem: schemas.FileItem, path: Path,
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
"""
上传文件
带实时进度显示的上传
"""
target_name = new_name or path.name
target_path = Path(fileitem.path) / target_name
smb_path = self._normalize_path(str(target_path))
try:
self._check_connection()
target_name = new_name or path.name
target_path = Path(fileitem.path) / target_name
smb_path = self._normalize_path(str(target_path))
# 获取文件大小
file_size = path.stat().st_size
# 初始化进度条
logger.info(f"【SMB】开始上传: {path} -> {target_path}")
progress_callback = transfer_process(path.as_posix())
# 使用更高效的文件传输方式
with open(path, "rb") as src_file:
with smbclient.open_file(smb_path, mode="wb") as dst_file:
# 使用更大的缓冲区提高性能
buffer_size = 1024 * 1024 # 1MB
uploaded_size = 0
while True:
chunk = src_file.read(buffer_size)
if global_vars.is_transfer_stopped(path.as_posix()):
logger.info(f"【SMB】{path} 上传已取消!")
return None
chunk = src_file.read(self.chunk_size)
if not chunk:
break
dst_file.write(chunk)
uploaded_size += len(chunk)
# 更新进度
if file_size:
progress = (uploaded_size * 100) / file_size
progress_callback(progress)
logger.info(f"【SMB】上传成功: {path} -> {target_path}")
# 完成上传
progress_callback(100)
logger.info(f"【SMB】上传完成: {target_name}")
# 返回上传后的文件信息
return self.get_item(target_path)
except Exception as e:
logger.error(f"【SMB】上传失败: {e}")
logger.error(f"【SMB】上传失败: {target_name} - {e}")
return None
def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:

View File

@@ -1,6 +1,5 @@
import base64
import hashlib
import io
import secrets
import threading
import time
@@ -11,12 +10,12 @@ import oss2
import requests
from oss2 import SizedFileAdapter, determine_part_size
from oss2.models import PartInfo
from tqdm import tqdm
from app import schemas
from app.core.config import settings
from app.core.config import settings, global_vars
from app.log import logger
from app.modules.filemanager import StorageBase
from app.modules.filemanager.storages import transfer_process
from app.schemas.types import StorageSchema
from app.utils.singleton import WeakSingleton
from app.utils.string import StringUtils
@@ -44,6 +43,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 基础url
base_url = "https://proapi.115.com"
# 文件块大小默认10MB
chunk_size = 10 * 1024 * 1024
def __init__(self):
super().__init__()
self._auth_state = {}
@@ -203,10 +205,15 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 检查会话
self._check_session()
resp = self.session.request(
method, f"{self.base_url}{endpoint}",
**kwargs
)
try:
resp = self.session.request(
method, f"{self.base_url}{endpoint}",
**kwargs
)
except requests.exceptions.RequestException as e:
logger.error(f"【115】{method} 请求 {endpoint} 网络错误: {str(e)}")
return None
if resp is None:
logger.warn(f"【115】{method} 请求 {endpoint} 失败!")
return None
@@ -352,29 +359,6 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
modify_time=int(time.time())
)
@staticmethod
def _log_progress(desc: str, total: int) -> tqdm:
"""
创建一个可以输出到日志的进度条
"""
class TqdmToLogger(io.StringIO):
def write(s, buf): # noqa
buf = buf.strip('\r\n\t ')
if buf:
logger.info(buf)
return tqdm(
total=total,
unit='B',
unit_scale=True,
desc=desc,
file=TqdmToLogger(),
mininterval=1.0,
maxinterval=5.0,
miniters=1
)
def upload(self, target_dir: schemas.FileItem, local_path: Path,
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
"""
@@ -539,13 +523,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 初始化进度条
logger.info(f"【115】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}")
progress_bar = tqdm(
total=file_size,
unit='B',
unit_scale=True,
desc="上传进度",
ascii=True
)
progress_callback = transfer_process(local_path.as_posix())
# 初始化分片
upload_id = bucket.init_multipart_upload(object_name,
@@ -559,6 +537,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
part_number = 1
offset = 0
while offset < file_size:
if global_vars.is_transfer_stopped(local_path.as_posix()):
logger.info(f"【115】{local_path} 上传已取消!")
return None
num_to_upload = min(part_size, file_size - offset)
# 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。
logger.info(f"【115】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}")
@@ -569,11 +550,11 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
offset += num_to_upload
part_number += 1
# 更新进度
progress_bar.update(num_to_upload)
progress = (offset * 100) / file_size
progress_callback(progress)
# 关闭进度条
if progress_bar:
progress_bar.close()
# 完成上传
progress_callback(100)
# 请求头
headers = {
@@ -601,11 +582,13 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
"""
限速处理的下载
实时进度显示的下载
"""
detail = self.get_item(Path(fileitem.path))
if not detail:
logger.error(f"【115】获取文件详情失败: {fileitem.name}")
return None
download_info = self._request_api(
"POST",
"/open/ufile/downurl",
@@ -615,14 +598,58 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
}
)
if not download_info:
logger.error(f"【115】获取下载链接失败: {fileitem.name}")
return None
download_url = list(download_info.values())[0].get("url", {}).get("url")
if not download_url:
logger.error(f"【115】下载链接为空: {fileitem.name}")
return None
local_path = path or settings.TEMP_PATH / fileitem.name
with self.session.get(download_url, stream=True) as r:
r.raise_for_status()
with open(local_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
# 获取文件大小
file_size = detail.size
# 初始化进度条
logger.info(f"【115】开始下载: {fileitem.name} -> {local_path}")
progress_callback = transfer_process(Path(fileitem.path).as_posix())
try:
with self.session.get(download_url, stream=True) as r:
r.raise_for_status()
downloaded_size = 0
with open(local_path, "wb") as f:
for chunk in r.iter_content(chunk_size=self.chunk_size):
if global_vars.is_transfer_stopped(fileitem.path):
logger.info(f"【115】{fileitem.path} 下载已取消!")
return None
if chunk:
f.write(chunk)
downloaded_size += len(chunk)
# 更新进度
if file_size:
progress = (downloaded_size * 100) / file_size
progress_callback(progress)
# 完成下载
progress_callback(100)
logger.info(f"【115】下载完成: {fileitem.name}")
except requests.exceptions.RequestException as e:
logger.error(f"【115】下载网络错误: {fileitem.name} - {str(e)}")
# 删除可能部分下载的文件
if local_path.exists():
local_path.unlink()
return None
except Exception as e:
logger.error(f"【115】下载失败: {fileitem.name} - {str(e)}")
# 删除可能部分下载的文件
if local_path.exists():
local_path.unlink()
return None
return local_path
def check(self) -> bool:

View File

@@ -85,7 +85,7 @@ class HaiDanSpider:
categories = self._movie_category
# 搜索类型
if keyword.startswith('tt'):
if keyword and keyword.startswith('tt'):
search_area = '4'
else:
search_area = '0'

View File

@@ -125,12 +125,12 @@ class TMDb(object):
def cache(self, cache):
self._cache_enabled = bool(cache)
@cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta, skip_none=True)
def cached_request(self, method, url, data, json,
_ts=datetime.strftime(datetime.now(), '%Y%m%d')):
return self.request(method, url, data, json)
@cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta, skip_none=True)
async def async_cached_request(self, method, url, data, json,
_ts=datetime.strftime(datetime.now(), '%Y%m%d')):
return await self.async_request(method, url, data, json)

View File

@@ -7,6 +7,8 @@ import json
import urllib.parse
from http import HTTPStatus
from app.core.cache import cached
from app.core.config import settings
from app.utils.http import RequestUtils
@@ -15,7 +17,7 @@ class Auth:
TVDB认证类
"""
def __init__(self, url, apikey, pin="", proxy=None, timeout: int = 15):
def __init__(self, url: str, apikey: str, pin: str = "", proxy: dict = None, timeout: int = 15):
login_info = {"apikey": apikey}
if pin != "":
login_info["pin"] = pin
@@ -35,13 +37,14 @@ class Auth:
result = response.json()
self.token = result["data"]["token"]
else:
error_msg = f"登录失败,状态码: {response.status_code if response else 'None'}"
if response:
if response is not None:
try:
error_data = response.json()
error_msg = f"Code: {response.status_code}, {error_data.get('message', '未知错误')}"
except Exception as err:
error_msg = f"Code: {response.status_code}, 响应解析失败:{err}"
else:
error_msg = "网络连接失败,未收到响应"
raise Exception(error_msg)
except Exception as e:
raise Exception(f"TVDB认证失败: {str(e)}")
@@ -58,13 +61,14 @@ class Request:
请求处理类
"""
def __init__(self, auth_token, proxy=None, timeout=15):
def __init__(self, auth_token: str, proxy: dict = None, timeout: int = 15):
self.auth_token = auth_token
self.links = None
self.proxy = proxy
self.timeout = timeout
def make_request(self, url, if_modified_since=None):
@cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta, skip_none=True)
def make_request(self, url: str, if_modified_since: bool = None):
"""
向指定的 URL 发起请求并返回数据
"""
@@ -118,7 +122,8 @@ class Url:
def __init__(self):
self.base_url = "https://api4.thetvdb.com/v4/"
def construct(self, url_sect, url_id=None, url_subsect=None, url_lang=None, **kwargs):
def construct(self, url_sect: str, url_id: int = None,
url_subsect: str = None, url_lang: str = None, **kwargs):
"""
构建API URL
"""
@@ -141,7 +146,7 @@ class TVDB:
TVDB API主类
"""
def __init__(self, apikey: str, pin="", proxy=None, timeout: int = 15):
def __init__(self, apikey: str, pin: str = "", proxy: dict = None, timeout: int = 15):
self.url = Url()
login_url = self.url.construct("login")
self.auth = Auth(login_url, apikey, pin, proxy, timeout)
@@ -154,126 +159,126 @@ class TVDB:
"""
return self.request.links
def get_artwork_statuses(self, meta=None, if_modified_since=None) -> list:
def get_artwork_statuses(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回艺术图状态列表
"""
url = self.url.construct("artwork/statuses", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_artwork_types(self, meta=None, if_modified_since=None) -> list:
def get_artwork_types(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回艺术图类型列表
"""
url = self.url.construct("artwork/types", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_artwork(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_artwork(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个艺术图信息的字典
"""
url = self.url.construct("artwork", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_artwork_extended(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_artwork_extended(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个艺术图的扩展信息字典
"""
url = self.url.construct("artwork", id, "extended", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_awards(self, meta=None, if_modified_since=None) -> list:
def get_all_awards(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回奖项列表
"""
url = self.url.construct("awards", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_award(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_award(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个奖项信息的字典
"""
url = self.url.construct("awards", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_award_extended(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_award_extended(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个奖项的扩展信息字典
"""
url = self.url.construct("awards", id, "extended", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_award_categories(self, meta=None, if_modified_since=None) -> list:
def get_all_award_categories(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回奖项类别列表
"""
url = self.url.construct("awards/categories", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_award_category(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_award_category(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个奖项类别信息的字典
"""
url = self.url.construct("awards/categories", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_award_category_extended(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_award_category_extended(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个奖项类别的扩展信息字典
"""
url = self.url.construct("awards/categories", id, "extended", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_content_ratings(self, meta=None, if_modified_since=None) -> list:
def get_content_ratings(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回内容分级列表
"""
url = self.url.construct("content/ratings", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_countries(self, meta=None, if_modified_since=None) -> list:
def get_countries(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回国家列表
"""
url = self.url.construct("countries", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_companies(self, page=None, meta=None, if_modified_since=None) -> list:
def get_all_companies(self, page: int = None, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回公司列表 (可分页)
"""
url = self.url.construct("companies", page=page, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_company_types(self, meta=None, if_modified_since=None) -> list:
def get_company_types(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回公司类型列表
"""
url = self.url.construct("companies/types", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_company(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_company(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个公司信息的字典
"""
url = self.url.construct("companies", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_series(self, page=None, meta=None, if_modified_since=None) -> list:
def get_all_series(self, page: int = None, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回剧集列表 (可分页)
"""
url = self.url.construct("series", page=page, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_series(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_series(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个剧集信息的字典
"""
url = self.url.construct("series", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_series_by_slug(self, slug: str, meta=None, if_modified_since=None) -> dict:
def get_series_by_slug(self, slug: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
通过 slug (别名) 返回单个剧集信息的字典
"""
@@ -288,7 +293,7 @@ class TVDB:
return self.request.make_request(url, if_modified_since)
def get_series_episodes(self, id: int, season_type: str = "default", page: int = 0,
lang: str = None, meta=None, if_modified_since=None, **kwargs) -> dict:
lang: str = None, meta: str = None, if_modified_since: bool = None, **kwargs) -> dict:
"""
返回指定剧集和季类型的各集信息字典 (可分页,可指定语言)
"""
@@ -297,7 +302,7 @@ class TVDB:
)
return self.request.make_request(url, if_modified_since)
def get_series_translation(self, id: int, lang: str, meta=None, if_modified_since=None) -> dict:
def get_series_translation(self, id: int, lang: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回剧集的指定语言翻译信息字典
"""
@@ -318,21 +323,21 @@ class TVDB:
url = self.url.construct("series", id, "nextAired")
return self.request.make_request(url, if_modified_since)
def get_all_movies(self, page=None, meta=None, if_modified_since=None) -> list:
def get_all_movies(self, page: int = None, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回电影列表 (可分页)
"""
url = self.url.construct("movies", page=page, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_movie(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_movie(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个电影信息的字典
"""
url = self.url.construct("movies", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_movie_by_slug(self, slug: str, meta=None, if_modified_since=None) -> dict:
def get_movie_by_slug(self, slug: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
通过 slug (别名) 返回单个电影信息的字典
"""
@@ -346,70 +351,70 @@ class TVDB:
url = self.url.construct("movies", id, "extended", meta=meta, short=short)
return self.request.make_request(url, if_modified_since)
def get_movie_translation(self, id: int, lang: str, meta=None, if_modified_since=None) -> dict:
def get_movie_translation(self, id: int, lang: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回电影的指定语言翻译信息字典
"""
url = self.url.construct("movies", id, "translations", lang, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_seasons(self, page=None, meta=None, if_modified_since=None) -> list:
def get_all_seasons(self, page: int = None, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回季列表 (可分页)
"""
url = self.url.construct("seasons", page=page, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_season(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_season(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单季信息的字典
"""
url = self.url.construct("seasons", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_season_extended(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_season_extended(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单季的扩展信息字典
"""
url = self.url.construct("seasons", id, "extended", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_season_types(self, meta=None, if_modified_since=None) -> list:
def get_season_types(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回季类型列表
"""
url = self.url.construct("seasons/types", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_season_translation(self, id: int, lang: str, meta=None, if_modified_since=None) -> dict:
def get_season_translation(self, id: int, lang: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回季的指定语言翻译信息字典
"""
url = self.url.construct("seasons", id, "translations", lang, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_episodes(self, page=None, meta=None, if_modified_since=None) -> list:
def get_all_episodes(self, page: int = None, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回集列表 (可分页)
"""
url = self.url.construct("episodes", page=page, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_episode(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_episode(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单集信息的字典
"""
url = self.url.construct("episodes", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_episode_extended(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_episode_extended(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单集的扩展信息字典
"""
url = self.url.construct("episodes", id, "extended", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_episode_translation(self, id: int, lang: str, meta=None, if_modified_since=None) -> dict:
def get_episode_translation(self, id: int, lang: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单集的指定语言翻译信息字典
"""
@@ -419,70 +424,70 @@ class TVDB:
# 兼容旧函数名。
get_episodes_translation = get_episode_translation
def get_all_genders(self, meta=None, if_modified_since=None) -> list:
def get_all_genders(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回性别列表
"""
url = self.url.construct("genders", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_genres(self, meta=None, if_modified_since=None) -> list:
def get_all_genres(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回类型(流派)列表
"""
url = self.url.construct("genres", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_genre(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_genre(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个类型(流派)信息的字典
"""
url = self.url.construct("genres", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_languages(self, meta=None, if_modified_since=None) -> list:
def get_all_languages(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回语言列表
"""
url = self.url.construct("languages", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_people(self, page=None, meta=None, if_modified_since=None) -> list:
def get_all_people(self, page: int = None, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回人物列表 (可分页)
"""
url = self.url.construct("people", page=page, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_person(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_person(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个人物信息的字典
"""
url = self.url.construct("people", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_person_extended(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_person_extended(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个人物的扩展信息字典
"""
url = self.url.construct("people", id, "extended", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_person_translation(self, id: int, lang: str, meta=None, if_modified_since=None) -> dict:
def get_person_translation(self, id: int, lang: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回人物的指定语言翻译信息字典
"""
url = self.url.construct("people", id, "translations", lang, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_character(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_character(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回角色信息的字典
"""
url = self.url.construct("characters", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_people_types(self, meta=None, if_modified_since=None) -> list:
def get_people_types(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回人物类型列表
"""
@@ -492,7 +497,7 @@ class TVDB:
# 兼容旧函数名
get_all_people_types = get_people_types
def get_source_types(self, meta=None, if_modified_since=None) -> list:
def get_source_types(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回来源类型列表
"""
@@ -509,56 +514,56 @@ class TVDB:
url = self.url.construct("updates", since=since, **kwargs)
return self.request.make_request(url)
def get_all_tag_options(self, page=None, meta=None, if_modified_since=None) -> list:
def get_all_tag_options(self, page: int = None, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回标签选项列表 (可分页)
"""
url = self.url.construct("tags/options", page=page, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_tag_option(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_tag_option(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个标签选项信息的字典
"""
url = self.url.construct("tags/options", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_lists(self, page=None, meta=None) -> dict:
def get_all_lists(self, page: int = None, meta=None) -> dict:
"""
返回所有公开的列表信息 (可分页)
"""
url = self.url.construct("lists", page=page, meta=meta)
return self.request.make_request(url)
def get_list(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_list(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个列表信息的字典
"""
url = self.url.construct("lists", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_list_by_slug(self, slug: str, meta=None, if_modified_since=None) -> dict:
def get_list_by_slug(self, slug: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
通过 slug (别名) 返回单个列表信息的字典
"""
url = self.url.construct("lists/slug", slug, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_list_extended(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_list_extended(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个列表的扩展信息字典
"""
url = self.url.construct("lists", id, "extended", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_list_translation(self, id: int, lang: str, meta=None, if_modified_since=None) -> dict:
def get_list_translation(self, id: int, lang: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回列表的指定语言翻译信息字典
"""
url = self.url.construct("lists", id, "translations", lang, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_inspiration_types(self, meta=None, if_modified_since=None) -> dict:
def get_inspiration_types(self, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回灵感类型列表
"""

View File

@@ -1,7 +1,6 @@
import json
import platform
import re
import subprocess
import threading
import time
import traceback
@@ -10,13 +9,13 @@ from threading import Lock
from typing import Any, Optional, Dict, List
from apscheduler.schedulers.background import BackgroundScheduler
from app.core.cache import TTLCache, FileCache
from watchdog.events import FileSystemEventHandler, FileSystemMovedEvent, FileSystemEvent
from watchdog.observers.polling import PollingObserver
from app.chain import ChainBase
from app.chain.storage import StorageChain
from app.chain.transfer import TransferChain
from app.core.cache import TTLCache, FileCache
from app.core.config import settings
from app.core.event import Event, eventmanager
from app.helper.directory import DirectoryHelper
@@ -25,7 +24,8 @@ from app.log import logger
from app.schemas import ConfigChangeEventData
from app.schemas import FileItem
from app.schemas.types import SystemConfigKey, EventType
from app.utils.singleton import Singleton
from app.utils.singleton import SingletonClass
from app.utils.system import SystemUtils
lock = Lock()
snapshot_lock = Lock()
@@ -54,7 +54,7 @@ class FileMonitorHandler(FileSystemEventHandler):
file_size=Path(event.dest_path).stat().st_size)
class Monitor(metaclass=Singleton):
class Monitor(metaclass=SingletonClass):
"""
目录监控处理链,单例模式
"""
@@ -355,7 +355,8 @@ class Monitor(metaclass=Singleton):
return tips
def should_use_polling(self, directory: Path, monitor_mode: str,
@staticmethod
def should_use_polling(directory: Path, monitor_mode: str,
file_count: int, limits: dict) -> tuple[bool, str]:
"""
判断是否应该使用轮询模式
@@ -369,7 +370,7 @@ class Monitor(metaclass=Singleton):
return True, "用户配置为兼容模式"
# 检查网络文件系统
if self.is_network_filesystem(directory):
if SystemUtils.is_network_filesystem(directory):
return True, "检测到网络文件系统,建议使用兼容模式"
max_watches = limits.get('max_user_watches')
@@ -377,45 +378,6 @@ class Monitor(metaclass=Singleton):
return True, f"目录文件数量({file_count})接近系统限制({max_watches})"
return False, "使用快速模式"
@staticmethod
def is_network_filesystem(directory: Path) -> bool:
"""
检测是否为网络文件系统
:param directory: 目录路径
:return: 是否为网络文件系统
"""
try:
system = platform.system()
if system == 'Linux':
# 检查挂载信息
result = subprocess.run(['df', '-T', str(directory)],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
output = result.stdout.lower()
# 以下本地文件系统含有fuse关键字
local_fs = [
"fuse.shfs", # Unraid
"zfuse.zfsv", # 极空间(zfuse.zfsv2、zfuse.zfsv3、...)
# TBD
]
if any(fs in output for fs in local_fs):
return False
network_fs = ['nfs', 'cifs', 'smbfs', 'fuse', 'sshfs', 'ftpfs']
return any(fs in output for fs in network_fs)
elif system == 'Darwin':
# macOS 检查
result = subprocess.run(['df', '-T', str(directory)],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
output = result.stdout.lower()
return 'nfs' in output or 'smbfs' in output
elif system == 'Windows':
# Windows 检查网络驱动器
return str(directory).startswith('\\\\')
except Exception as e:
logger.debug(f"检测网络文件系统时出错: {e}")
return False
def init(self):
"""
启动监控

View File

@@ -78,7 +78,7 @@ class FastAPIMonitor:
# 告警状态
self.alerts: List[str] = []
logger.info("FastAPI性能监控器已初始化")
logger.debug("FastAPI性能监控器已初始化")
def record_request(self, request: Request, response: Response, response_time: float):
"""
@@ -172,7 +172,7 @@ class FastAPIMonitor:
'count': 0,
'total_time': 0,
'errors': 0,
'avg_time': 0
'avg_time': 0.0
})
for req in self.request_history:

View File

@@ -1,3 +1,5 @@
import asyncio
import inspect
import threading
import traceback
from datetime import datetime, timedelta
@@ -21,13 +23,13 @@ from app.core.config import settings
from app.core.event import eventmanager, Event
from app.core.plugin import PluginManager
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.message import MessageHelper
from app.helper.sites import SitesHelper # noqa
from app.helper.message import MessageHelper
from app.helper.wallpaper import WallpaperHelper
from app.log import logger
from app.schemas import Notification, NotificationType, Workflow, ConfigChangeEventData
from app.schemas.types import EventType, SystemConfigKey
from app.utils.singleton import Singleton
from app.utils.singleton import SingletonClass
from app.utils.timer import TimerUtils
lock = threading.Lock()
@@ -37,7 +39,7 @@ class SchedulerChain(ChainBase):
pass
class Scheduler(metaclass=Singleton):
class Scheduler(metaclass=SingletonClass):
"""
定时任务管理
"""
@@ -55,6 +57,8 @@ class Scheduler(metaclass=Singleton):
self._auth_count = 0
# 用户认证失败消息发送
self._auth_message = False
# 当前事件循环
self.loop = asyncio.get_event_loop()
self.init()
@eventmanager.register(EventType.ConfigChanged)
@@ -162,6 +166,19 @@ class Scheduler(metaclass=Singleton):
"name": "推荐缓存",
"func": RecommendChain().refresh_recommend,
"running": False,
},
"plugin_market_refresh": {
"name": "插件市场缓存",
"func": PluginManager().async_get_online_plugins,
"running": False,
"kwargs": {
"force": True
}
},
"subscribe_calendar_cache": {
"name": "订阅日历缓存",
"func": SubscribeChain().cache_calendar,
"running": False
}
}
@@ -180,7 +197,7 @@ class Scheduler(metaclass=Singleton):
id="cookiecloud",
name="同步CookieCloud站点",
minutes=int(settings.COOKIECLOUD_INTERVAL),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=1),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5),
kwargs={
'job_id': 'cookiecloud'
}
@@ -195,7 +212,7 @@ class Scheduler(metaclass=Singleton):
id="mediaserver_sync",
name="同步媒体服务器",
hours=int(settings.MEDIASERVER_SYNC_INTERVAL),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=10),
kwargs={
'job_id': 'mediaserver_sync'
}
@@ -301,7 +318,7 @@ class Scheduler(metaclass=Singleton):
id="random_wallpager",
name="壁纸缓存",
minutes=30,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=1),
kwargs={
'job_id': 'random_wallpager'
}
@@ -363,12 +380,37 @@ class Scheduler(metaclass=Singleton):
id="recommend_refresh",
name="推荐缓存",
hours=24,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=5),
kwargs={
'job_id': 'recommend_refresh'
}
)
# 插件市场缓存
self._scheduler.add_job(
self.start,
"interval",
id="plugin_market_refresh",
name="插件市场缓存",
minutes=30,
kwargs={
'job_id': 'plugin_market_refresh'
}
)
# 订阅日历缓存
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_calendar_cache",
name="订阅日历缓存",
hours=6,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=2),
kwargs={
'job_id': 'subscribe_calendar_cache'
}
)
# 初始化工作流服务
self.init_workflow_jobs()
@@ -409,6 +451,13 @@ class Scheduler(metaclass=Singleton):
"""
启动定时服务
"""
def __start_coro(coro):
"""
启动协程
"""
return asyncio.run_coroutine_threadsafe(coro, self.loop)
# 获取定时任务
job = self.__prepare_job(job_id)
if not job:
@@ -417,7 +466,13 @@ class Scheduler(metaclass=Singleton):
try:
if not kwargs:
kwargs = job.get("kwargs") or {}
job["func"](*args, **kwargs)
func = job.get("func")
if not func:
return
if inspect.iscoroutinefunction(func):
__start_coro(func(*args, **kwargs))
else:
job["func"](*args, **kwargs)
except Exception as e:
logger.error(f"定时任务 {job.get('name')} 执行失败:{str(e)} - {traceback.format_exc()}")
MessageHelper().put(title=f"{job.get('name')} 执行失败",
@@ -519,7 +574,7 @@ class Scheduler(metaclass=Singleton):
except JobLookupError:
pass
if job_removed:
logger.info(f"移除插件服务({plugin_name}){service.get('name')}")
logger.info(f"移除插件服务({plugin_name}){service.get('name')}") # noqa
except Exception as e:
logger.error(f"移除插件服务失败:{str(e)} - {job_id}: {service}")
SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务移除失败",

View File

@@ -77,7 +77,7 @@ class SiteUserData(BaseModel):
# 用户名
username: Optional[str] = None
# 用户ID
userid: Optional[Union[int, str]] = None
userid: Optional[str] = None
# 用户等级
user_level: Optional[str] = None
# 加入时间

View File

@@ -35,10 +35,10 @@ async def lifespan(app: FastAPI):
定义应用的生命周期事件
"""
print("Starting up...")
# 初始化模块
init_modules()
# 初始化路由
init_routers(app)
# 初始化模块
init_modules()
# 恢复插件备份
SystemChain().restore_plugins()
# 初始化插件

View File

@@ -1,83 +0,0 @@
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Coroutine, Any, TypeVar
T = TypeVar('T')
class AsyncUtils:
"""
异步工具类,用于在同步环境中调用异步方法
"""
@staticmethod
def run_async(coro: Coroutine[Any, Any, T]) -> T:
"""
在同步环境中安全地执行异步协程
:param coro: 要执行的协程
:return: 协程的返回值
:raises: 协程执行过程中的任何异常
"""
try:
# 尝试获取当前运行的事件循环
asyncio.get_running_loop()
# 如果有运行中的事件循环,在新线程中执行
return AsyncUtils._run_in_thread(coro)
except RuntimeError:
# 没有运行中的事件循环,直接使用 asyncio.run
return asyncio.run(coro)
@staticmethod
def _run_in_thread(coro: Coroutine[Any, Any, T]) -> T:
"""
在新线程中创建事件循环并执行协程
:param coro: 要执行的协程
:return: 协程的返回值
"""
result = None
exception = None
def _run():
nonlocal result, exception
try:
# 在新线程中创建新的事件循环
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
try:
result = new_loop.run_until_complete(coro)
finally:
new_loop.close()
except Exception as e:
exception = e
# 在新线程中执行
thread = threading.Thread(target=_run)
thread.start()
thread.join()
if exception:
raise exception
return result
@staticmethod
def run_async_in_executor(coro: Coroutine[Any, Any, T]) -> T:
"""
使用线程池执行器在新线程中运行异步协程
:param coro: 要执行的协程
:return: 协程的返回值
"""
try:
# 检查是否有运行中的事件循环
asyncio.get_running_loop()
# 有运行中的事件循环,使用线程池
with ThreadPoolExecutor() as executor:
future = executor.submit(asyncio.run, coro)
return future.result()
except RuntimeError:
# 没有运行中的事件循环,直接运行
return asyncio.run(coro)

View File

@@ -527,6 +527,45 @@ class SystemUtils:
print(f"Error occurred: {e}")
return False
@staticmethod
def is_network_filesystem(directory: Path) -> bool:
"""
检测是否为网络文件系统
:param directory: 目录路径
:return: 是否为网络文件系统
"""
try:
system = platform.system()
if system == 'Linux':
# 检查挂载信息
result = subprocess.run(['df', '-T', str(directory)],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
output = result.stdout.lower()
# 以下本地文件系统含有fuse关键字
local_fs = [
"fuse.shfs", # Unraid
"zfuse.zfsv", # 极空间(zfuse.zfsv2、zfuse.zfsv3、...)
# TBD
]
if any(fs in output for fs in local_fs):
return False
network_fs = ['nfs', 'cifs', 'smbfs', 'fuse', 'sshfs', 'ftpfs']
return any(fs in output for fs in network_fs)
elif system == 'Darwin':
# macOS 检查
result = subprocess.run(['df', '-T', str(directory)],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
output = result.stdout.lower()
return 'nfs' in output or 'smbfs' in output
elif system == 'Windows':
# Windows 检查网络驱动器
return str(directory).startswith('\\\\')
except Exception as e:
print(f"Error occurred: {e}")
return False
@staticmethod
def is_same_disk(src: Path, dest: Path) -> bool:
"""

View File

@@ -0,0 +1,80 @@
"""2.2.1
Revision ID: a946dae52526
Revises: 5b3355c964bb
Create Date: 2025-08-20 17:50:00.000000
"""
import sqlalchemy as sa
from alembic import op
from app.log import logger
from app.core.config import settings
# revision identifiers, used by Alembic.
revision = 'a946dae52526'
down_revision = '5b3355c964bb'
branch_labels = None
depends_on = None
def upgrade() -> None:
"""
升级将SiteUserData表的userid字段从Integer改为String
"""
connection = op.get_bind()
if settings.DB_TYPE.lower() == "postgresql":
# PostgreSQL数据库迁移
migrate_postgresql_userid(connection)
def downgrade() -> None:
"""
降级将SiteUserData表的userid字段从String改回Integer
"""
pass
def migrate_postgresql_userid(connection):
"""
PostgreSQL数据库userid字段迁移
"""
try:
logger.info("开始PostgreSQL数据库userid字段迁移...")
# 1. 创建临时列
connection.execute(sa.text("""
ALTER TABLE siteuserdata
ADD COLUMN userid_new VARCHAR
"""))
# 2. 将现有数据转换为字符串并复制到新列
connection.execute(sa.text("""
UPDATE siteuserdata
SET userid_new = CAST(userid AS VARCHAR)
WHERE userid IS NOT NULL
"""))
# 3. 删除旧列
connection.execute(sa.text("""
ALTER TABLE siteuserdata
DROP COLUMN userid
"""))
# 4. 重命名新列
connection.execute(sa.text("""
ALTER TABLE siteuserdata
RENAME COLUMN userid_new TO userid
"""))
logger.info("PostgreSQL数据库userid字段迁移完成")
except Exception as e:
logger.error(f"PostgreSQL数据库userid字段迁移失败: {e}")
raise

View File

@@ -31,23 +31,34 @@ if [ "${ENABLE_SSL}" = "true" ] && \
if [ ! -d "/config/acme.sh" ]; then
INFO "→ 安装acme.sh..."
# 生成安装参数
INSTALL_ARGS=(
"--install-online"
"--home" "/config/acme.sh"
"--config-home" "/config/acme.sh/data"
"--cert-home" "/config/certs"
)
# 设置安装环境变量
export LE_WORKING_DIR="/config/acme.sh"
export LE_CONFIG_HOME="/config/acme.sh/data"
export LE_CERT_HOME="/config/certs"
# 添加邮箱参数(如果设置
# 执行官方安装命令(添加错误处理
INFO "正在下载并安装 acme.sh..."
# 构建安装命令
INSTALL_CMD="curl -sSL https://get.acme.sh | sh -s -- --install-online"
if [ -n "${SSL_EMAIL}" ]; then
INSTALL_ARGS+=("--accountemail" "${SSL_EMAIL}")
INSTALL_CMD="${INSTALL_CMD} --accountemail ${SSL_EMAIL}"
else
WARN "未设置SSL_EMAIL建议配置邮箱用于证书过期提醒"
fi
if ! eval "${INSTALL_CMD}"; then
ERROR "acme.sh 安装失败"
exit 1
fi
# 执行官方安装命令
curl -sSL https://get.acme.sh | sh -s -- "${INSTALL_ARGS[@]}"
# 验证安装是否成功
if [ ! -f "/config/acme.sh/acme.sh" ]; then
ERROR "acme.sh 安装后文件不存在,安装可能失败"
exit 1
fi
INFO "acme.sh 安装成功"
fi
# 签发证书(仅当证书不存在时)
@@ -77,17 +88,24 @@ if [ "${ENABLE_SSL}" = "true" ] && \
fi
done
# 签发证书
/config/acme.sh/acme.sh --issue \
# 签发证书(添加错误处理)
INFO "正在签发证书..."
if ! /config/acme.sh/acme.sh --issue \
--dns "${DNS_PROVIDER}" \
--domain "${SSL_DOMAIN}" \
--key-file /config/certs/"${SSL_DOMAIN}"/privkey.pem \
--fullchain-file /config/certs/"${SSL_DOMAIN}"/fullchain.pem \
--reloadcmd "nginx -s reload" \
--force
--force; then
ERROR "证书签发失败"
exit 1
fi
# 创建稳定符号链接
ln -sf /config/certs/"${SSL_DOMAIN}" /config/certs/latest
INFO "证书签发成功"
else
INFO "证书已存在,跳过签发步骤"
fi
# 配置自动更新任务
@@ -98,4 +116,12 @@ if [ "${ENABLE_SSL}" = "true" ] && \
elif [ "${ENABLE_SSL}" = "true" ] && [ "${AUTO_ISSUE_CERT}" = "true" ] && [ -z "${SSL_DOMAIN}" ]; then
WARN "已启用自动签发证书但未设置SSL_DOMAIN跳过证书管理"
elif [ "${ENABLE_SSL}" = "true" ] && [ "${AUTO_ISSUE_CERT}" = "false" ]; then
INFO "SSL已启用但自动签发证书已禁用将使用手动配置的证书"
# 检查证书文件是否存在
if [ -f "/config/certs/latest/fullchain.pem" ] && [ -f "/config/certs/latest/privkey.pem" ]; then
INFO "检测到证书文件SSL配置正常"
else
WARN "未检测到证书文件,请确保手动配置了正确的证书路径"
fi
fi

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.7.5'
FRONTEND_VERSION = 'v2.7.5'
APP_VERSION = 'v2.7.8'
FRONTEND_VERSION = 'v2.7.8'