Compare commits

...

26 Commits

Author SHA1 Message Date
jxxghp
2552219991 更新 version.py 2025-08-28 11:11:32 +08:00
jxxghp
a038b698d7 fix haidan 2025-08-28 09:36:19 +08:00
jxxghp
a3b222574e add thetvdb cache 2025-08-28 08:05:10 +08:00
jxxghp
e0cd467293 rollback fix #4856 2025-08-28 07:51:05 +08:00
jxxghp
9c056030d2 fix:捕促115&alipan请求异常 2025-08-27 20:21:06 +08:00
jxxghp
19efa9d4cc fix #4795 2025-08-27 16:15:45 +08:00
jxxghp
90633a6495 fix #4851 2025-08-27 15:57:43 +08:00
jxxghp
edc432fbd8 fix #4846 2025-08-27 12:45:23 +08:00
jxxghp
1b7bdbf516 fix #4834 2025-08-27 08:28:16 +08:00
jxxghp
8c1be70c85 更新 version.py 2025-08-26 12:20:16 +08:00
jxxghp
b8e0c0db9e feat:精细化事件错误 2025-08-26 08:41:47 +08:00
jxxghp
7b7fb6cc82 Merge pull request #4836 from jxxghp/cursor/alter-siteuser-data-userid-to-character-type-9f4d 2025-08-25 22:05:19 +08:00
Cursor Agent
62512ba215 Remove SQLite-specific migration code for userid field
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-25 14:00:33 +00:00
Cursor Agent
e1beb64c01 Simplify userid conversion to integer in Synology Chat module
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-25 13:58:15 +00:00
Cursor Agent
c81f26ddad Remove downgrade methods for PostgreSQL and SQLite userid migration
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-25 13:56:21 +00:00
Cursor Agent
340114c2a1 Remove migration README after completing SiteUserData userid type migration
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-25 13:54:58 +00:00
Cursor Agent
cd7767b331 Checkpoint before follow-up message
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-25 13:54:48 +00:00
Cursor Agent
25289dad8a Migrate SiteUserData userid field from Integer to String type
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-25 13:50:58 +00:00
jxxghp
47c6917129 remove _check_restart_policy 2025-08-25 21:30:53 +08:00
jxxghp
6379cda148 fix 异步定时服务 2025-08-25 21:19:07 +08:00
jxxghp
91a124ab8f fix 异步定时服务 2025-08-25 20:44:38 +08:00
jxxghp
2357a7135e fix run_async 2025-08-25 17:46:06 +08:00
jxxghp
da0b3b3de9 fix:日历缓存 2025-08-25 16:46:10 +08:00
jxxghp
6664fb1716 feat:增加插件和日历的自动缓存 2025-08-25 16:37:02 +08:00
jxxghp
1206f24fa9 修复缓存迭代时的并发问题 2025-08-25 13:11:44 +08:00
jxxghp
ffb5823e84 fix #4829 优化模块导入逻辑,增加对 Async 类的特殊处理 2025-08-25 08:14:43 +08:00
26 changed files with 510 additions and 303 deletions

View File

@@ -541,6 +541,9 @@ class MediaChain(ChainBase):
# 处理目录内的文件
files = __list_files(_fileitem=fileitem)
for file in files:
if file.type == "dir":
# 电影不处理子目录
continue
self.scrape_metadata(fileitem=file,
mediainfo=mediainfo,
init_folder=False,
@@ -640,6 +643,9 @@ class MediaChain(ChainBase):
if recursive:
files = __list_files(_fileitem=fileitem)
for file in files:
if file.type == "dir" and not file.name.lower().startswith("season"):
# 电视剧不处理非季子目录
continue
self.scrape_metadata(fileitem=file,
mediainfo=mediainfo,
parent=fileitem if file.type == "file" else None,

View File

@@ -1184,6 +1184,42 @@ class SubscribeChain(ChainBase):
logger.error(f'follow用户分享订阅 {title} 添加失败:{message}')
logger.info(f'follow用户分享订阅刷新完成共添加 {success_count} 个订阅')
async def cache_calendar(self):
"""
预缓存订阅日历,实际上就是查询一遍所有订阅的媒体信息
前端请示是异常的,所以需要使用异步缓存方法
"""
logger.info(f'开始预缓存订阅日历 ...')
for subscribe in await SubscribeOper().async_list():
if global_vars.is_system_stopped:
break
try:
mtype = MediaType(subscribe.type)
except ValueError:
logger.error(f'订阅 {subscribe.name} 类型错误:{subscribe.type}')
continue
# 识别媒体信息
if mtype == MediaType.MOVIE:
mediainfo: MediaInfo = await self.async_recognize_media(mtype=mtype,
tmdbid=subscribe.tmdbid,
doubanid=subscribe.doubanid,
bangumiid=subscribe.bangumiid,
episode_group=subscribe.episode_group,
cache=False)
if not mediainfo:
logger.warn(
f'未识别到媒体信息,标题:{subscribe.name}tmdbid{subscribe.tmdbid}doubanid{subscribe.doubanid}')
continue
else:
episodes = await TmdbChain().async_tmdb_episodes(tmdbid=subscribe.tmdbid,
season=subscribe.season,
episode_group=subscribe.episode_group)
if not episodes:
logger.warn(
f'未识别到季集信息,标题:{subscribe.name}tmdbid{subscribe.tmdbid}豆瓣ID{subscribe.doubanid},季:{subscribe.season}')
continue
logger.info(f'订阅日历预缓存完成')
@staticmethod
def __update_subscribe_note(subscribe: Subscribe, downloads: Optional[List[Context]]):
"""

View File

@@ -591,7 +591,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
text=__process_msg,
data={
"current": Path(fileitem.path).as_posix(),
"finished":finished_files
"finished": finished_files
})
# 整理
state, err_msg = self.__handle_transfer(task=task, callback=item.callback)
@@ -1471,13 +1471,9 @@ class TransferChain(ChainBase, metaclass=Singleton):
for file in torrent_files:
file_path = save_path / file.name
# 如果存在未被屏蔽的媒体文件,则不删除种子
if (
file_path.suffix in self.all_exts
and not self._is_blocked_by_exclude_words(
str(file_path), transfer_exclude_words
)
and file_path.exists()
):
if (file_path.suffix in self.all_exts
and not self._is_blocked_by_exclude_words(str(file_path), transfer_exclude_words)
and file_path.exists()):
return False
# 所有媒体文件都被屏蔽或不存在,可以删除种子

View File

@@ -474,7 +474,11 @@ class MemoryBackend(CacheBackend):
if region_cache is None:
yield from ()
return
for item in region_cache.items():
# 使用锁保护迭代过程,避免在迭代时缓存被修改
with lock:
# 创建快照避免并发修改问题
items_snapshot = list(region_cache.items())
for item in items_snapshot:
yield item
def close(self) -> None:
@@ -603,7 +607,11 @@ class AsyncMemoryBackend(AsyncCacheBackend):
region_cache = self.__get_region_cache(region)
if region_cache is None:
return
for item in region_cache.items():
# 使用锁保护迭代过程,避免在迭代时缓存被修改
with lock:
# 创建快照避免并发修改问题
items_snapshot = list(region_cache.items())
for item in items_snapshot:
yield item
async def close(self) -> None:
@@ -1385,7 +1393,7 @@ class TTLCache(CacheProxy):
def __init__(self,
region: Optional[str] = DEFAULT_CACHE_REGION,
maxsize: Optional[int] = DEFAULT_CACHE_SIZE,
ttl: Optional[int]= DEFAULT_CACHE_TTL):
ttl: Optional[int] = DEFAULT_CACHE_TTL):
"""
初始化 TTL 缓存
@@ -1404,7 +1412,7 @@ class LRUCache(CacheProxy):
def __init__(self,
region: Optional[str] = DEFAULT_CACHE_REGION,
maxsize: Optional[int]= DEFAULT_CACHE_SIZE
maxsize: Optional[int] = DEFAULT_CACHE_SIZE
):
"""
初始化 LRU 缓存

View File

@@ -450,10 +450,7 @@ class EventManager(metaclass=Singleton):
logger.debug(f"Handler {self.__get_handler_identifier(handler)} is disabled. Skipping execution")
return
try:
self.__invoke_handler_by_type_sync(handler, event)
except Exception as e:
self.__handle_event_error(event, handler, e)
self.__invoke_handler_by_type_sync(handler, event)
async def __safe_invoke_handler_async(self, handler: Callable, event: Event):
"""
@@ -465,10 +462,7 @@ class EventManager(metaclass=Singleton):
logger.debug(f"Handler {self.__get_handler_identifier(handler)} is disabled. Skipping execution")
return
try:
await self.__invoke_handler_by_type_async(handler, event)
except Exception as e:
self.__handle_event_error(event, handler, e)
await self.__invoke_handler_by_type_async(handler, event)
def __invoke_handler_by_type_sync(self, handler: Callable, event: Event):
"""
@@ -486,7 +480,17 @@ class EventManager(metaclass=Singleton):
if class_name in plugin_manager.get_plugin_ids():
# 插件处理器
plugin_manager.run_plugin_method(class_name, method_name, event)
plugin = plugin_manager.running_plugins.get(class_name)
if not plugin:
return
method = getattr(plugin, method_name, None)
if not method:
return
try:
method(event)
except Exception as e:
self.__handle_event_error(event=event, module_name=plugin.name,
class_name=class_name, method_name=method_name, e=e)
elif class_name in module_manager.get_module_ids():
# 模块处理器
module = module_manager.get_running_module(class_name)
@@ -495,16 +499,24 @@ class EventManager(metaclass=Singleton):
method = getattr(module, method_name, None)
if not method:
return
method(event)
try:
method(event)
except Exception as e:
self.__handle_event_error(event=event, module_name=module.get_name(),
class_name=class_name, method_name=method_name, e=e)
else:
# 全局处理器
class_obj = self.__get_class_instance(class_name)
if not class_obj or not hasattr(class_obj, method_name):
return
method = getattr(class_obj, method_name)
method = getattr(class_obj, method_name, None)
if not method:
return
method(event)
try:
method(event)
except Exception as e:
self.__handle_event_error(event=event, module_name=class_name,
class_name=class_name, method_name=method_name, e=e)
async def __invoke_handler_by_type_async(self, handler: Callable, event: Event):
"""
@@ -537,52 +549,62 @@ class EventManager(metaclass=Singleton):
names = handler.__qualname__.split(".")
return names[0], names[1]
@staticmethod
async def __invoke_plugin_method_async(handler: Any, class_name: str, method_name: str, event: Event):
async def __invoke_plugin_method_async(self, handler: Any, class_name: str, method_name: str, event: Event):
"""
异步调用插件方法
"""
plugin = handler.running_plugins.get(class_name)
if plugin and hasattr(plugin, method_name):
method = getattr(plugin, method_name)
if not plugin:
return
method = getattr(plugin, method_name, None)
if not method:
return
try:
if inspect.iscoroutinefunction(method):
await method(event)
else:
# 插件同步函数在异步环境中运行,避免阻塞
await run_in_threadpool(method, event)
except Exception as e:
self.__handle_event_error(event=event, handler=handler, e=e, module_name=plugin.name)
@staticmethod
async def __invoke_module_method_async(handler: Any, class_name: str, method_name: str, event: Event):
async def __invoke_module_method_async(self, handler: Any, class_name: str, method_name: str, event: Event):
"""
异步调用模块方法
"""
module = handler.get_running_module(class_name)
if not module:
return
method = getattr(module, method_name, None)
if not method:
return
if inspect.iscoroutinefunction(method):
await method(event)
else:
method(event)
try:
if inspect.iscoroutinefunction(method):
await method(event)
else:
method(event)
except Exception as e:
self.__handle_event_error(event=event, module_name=module.get_name(),
class_name=class_name, method_name=method_name, e=e)
async def __invoke_global_method_async(self, class_name: str, method_name: str, event: Event):
"""
异步调用全局对象方法
"""
class_obj = self.__get_class_instance(class_name)
if not class_obj or not hasattr(class_obj, method_name):
if not class_obj:
return
method = getattr(class_obj, method_name)
if inspect.iscoroutinefunction(method):
await method(event)
else:
method(event)
method = getattr(class_obj, method_name, None)
if not method:
return
try:
if inspect.iscoroutinefunction(method):
await method(event)
else:
method(event)
except Exception as e:
self.__handle_event_error(event=event, module_name=class_name,
class_name=class_name, method_name=method_name, e=e)
@staticmethod
def __get_class_instance(class_name: str):
@@ -609,7 +631,11 @@ class EventManager(metaclass=Singleton):
module_name = f"app.chain.{class_name[:-5].lower()}"
module = importlib.import_module(module_name)
elif class_name.endswith("Helper"):
module_name = f"app.helper.{class_name[:-6].lower()}"
# 特殊处理 Async 类
if class_name.startswith("Async"):
module_name = f"app.helper.{class_name[5:-6].lower()}"
else:
module_name = f"app.helper.{class_name[:-6].lower()}"
module = importlib.import_module(module_name)
else:
module_name = f"app.{class_name.lower()}"
@@ -649,18 +675,16 @@ class EventManager(metaclass=Singleton):
"""
logger.debug(f"{stage} - {event}")
def __handle_event_error(self, event: Event, handler: Callable, e: Exception):
def __handle_event_error(self, event: Event, module_name: str,
class_name: str, method_name: str, e: Exception):
"""
全局错误处理器,用于处理事件处理中的异常
"""
logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}")
names = handler.__qualname__.split(".")
class_name, method_name = names[0], names[1]
logger.error(f"{module_name} 事件处理出错:{str(e)} - {traceback.format_exc()}")
# 发送系统错误通知
from app.helper.message import MessageHelper
MessageHelper().put(title=f"{event.event_type} 事件处理出错",
MessageHelper().put(title=f"{module_name} 处理事件 {event.event_type} 出错",
message=f"{class_name}.{method_name}{str(e)}",
role="system")
self.send_event(

View File

@@ -1189,6 +1189,7 @@ class PluginManager(metaclass=Singleton):
async def async_get_online_plugins(self, force: bool = False) -> List[schemas.Plugin]:
"""
异步获取所有在线插件信息
:param force: 是否强制刷新(忽略缓存)
"""
if not settings.PLUGIN_MARKET:
return []

View File

@@ -20,7 +20,7 @@ class SiteUserData(Base):
# 用户名
username = Column(String)
# 用户ID
userid = Column(Integer)
userid = Column(String)
# 用户等级
user_level = Column(String)
# 加入时间

View File

@@ -119,6 +119,14 @@ class SubscribeOper(DbOper):
return Subscribe.get_by_state(self._db, state)
return Subscribe.list(self._db)
async def async_list(self, state: Optional[str] = None) -> List[Subscribe]:
"""
异步获取订阅列表
"""
if state:
return await Subscribe.async_get_by_state(self._db, state)
return await Subscribe.async_list(self._db)
def delete(self, sid: int):
"""
删除订阅

View File

@@ -610,14 +610,19 @@ class PluginHelper(metaclass=WeakSingleton):
asset = next((a for a in assets if a.get("name") == asset_name), None)
if not asset:
return False, f"未找到资产文件:{asset_name}"
download_url = asset.get("browser_download_url")
if not download_url:
return False, "资产缺少下载地址"
asset_id = asset.get("id")
if not asset_id:
return False, "资产缺少ID信息"
# 构建资产的API下载URL
download_url = f"https://api.github.com/repos/{user_repo}/releases/assets/{asset_id}"
except Exception as e:
logger.error(f"解析 Release 信息失败:{e}")
return False, f"解析 Release 信息失败:{e}"
res = self.__request_with_fallback(download_url, headers=settings.REPO_GITHUB_HEADERS(repo=user_repo))
# 使用资产的API端点下载需要设置Accept头为application/octet-stream
headers = settings.REPO_GITHUB_HEADERS(repo=user_repo).copy()
headers["Accept"] = "application/octet-stream"
res = self.__request_with_fallback(download_url, headers=headers, is_api=True)
if res is None or res.status_code != 200:
return False, f"下载资产失败:{res.status_code if res else '连接失败'}"
@@ -911,10 +916,10 @@ class PluginHelper(metaclass=WeakSingleton):
"""
# 异步版本直接调用不带缓存的版本(缓存在异步环境下可能有并发问题)
if force:
return await self._async_get_plugins_uncached(repo_url, package_version)
await self._async_get_plugins_cached.cache_clear()
return await self._async_get_plugins_cached(repo_url, package_version)
@cached(maxsize=64, ttl=1800)
@cached(maxsize=128, ttl=1800)
async def _async_get_plugins_cached(self, repo_url: str,
package_version: Optional[str] = None) -> Optional[Dict[str, dict]]:
"""
@@ -1525,15 +1530,21 @@ class PluginHelper(metaclass=WeakSingleton):
asset = next((a for a in assets if a.get("name") == asset_name), None)
if not asset:
return False, f"未找到资产文件:{asset_name}"
download_url = asset.get("browser_download_url")
if not download_url:
return False, "资产缺少下载地址"
asset_id = asset.get("id")
if not asset_id:
return False, "资产缺少ID信息"
# 构建资产的API下载URL
download_url = f"https://api.github.com/repos/{user_repo}/releases/assets/{asset_id}"
except Exception as e:
logger.error(f"解析 Release 信息失败:{e}")
return False, f"解析 Release 信息失败:{e}"
# 使用资产的API端点下载需要设置Accept头为application/octet-stream
headers = settings.REPO_GITHUB_HEADERS(repo=user_repo).copy()
headers["Accept"] = "application/octet-stream"
res = await self.__async_request_with_fallback(download_url,
headers=settings.REPO_GITHUB_HEADERS(repo=user_repo))
headers=headers,
is_api=True)
if res is None or res.status_code != 200:
return False, f"下载资产失败:{res.status_code if res else '连接失败'}"

View File

@@ -226,11 +226,9 @@ class DoubanApi(metaclass=WeakSingleton):
"""
处理HTTP响应
"""
if resp is not None and resp.status_code == 400 and "rate_limit" in resp.text:
return resp.json()
return resp.json() if resp else {}
return resp.json() if resp is not None else None
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True)
def __invoke(self, url: str, **kwargs) -> dict:
"""
GET请求
@@ -242,7 +240,7 @@ class DoubanApi(metaclass=WeakSingleton):
).get_res(url=req_url, params=params)
return self._handle_response(resp)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True)
async def __async_invoke(self, url: str, **kwargs) -> dict:
"""
GET请求异步版本
@@ -265,7 +263,7 @@ class DoubanApi(metaclass=WeakSingleton):
params.pop('_ts')
return req_url, params
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True)
def __post(self, url: str, **kwargs) -> dict:
"""
POST请求
@@ -287,7 +285,7 @@ class DoubanApi(metaclass=WeakSingleton):
).post_res(url=req_url, data=params)
return self._handle_response(resp)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True)
async def __async_post(self, url: str, **kwargs) -> dict:
"""
POST请求异步版本

View File

@@ -251,10 +251,15 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
# 检查会话
self._check_session()
resp = self.session.request(
method, f"{self.base_url}{endpoint}",
**kwargs
)
try:
resp = self.session.request(
method, f"{self.base_url}{endpoint}",
**kwargs
)
except requests.exceptions.RequestException as e:
logger.error(f"【阿里云盘】{method} 请求 {endpoint} 网络错误: {str(e)}")
return None
if resp is None:
logger.warn(f"【阿里云盘】{method} 请求 {endpoint} 失败!")
return None

View File

@@ -246,6 +246,17 @@ class LocalStorage(StorageBase):
logger.error(f"【本地】移动文件失败:{err}")
return None
@staticmethod
def __should_show_progress(src: Path, dest: Path):
"""
是否显示进度条
"""
src_isnetwork = SystemUtils.is_network_filesystem(src)
dest_isnetwork = SystemUtils.is_network_filesystem(dest)
if src_isnetwork and dest_isnetwork and SystemUtils.is_same_disk(src, dest):
return True
return False
def copy(
self,
fileitem: schemas.FileItem,
@@ -258,8 +269,15 @@ class LocalStorage(StorageBase):
try:
src = Path(fileitem.path)
dest = path / new_name
if self._copy_with_progress(src, dest):
return True
if self.__should_show_progress(src, dest):
if self._copy_with_progress(src, dest):
return True
else:
code, message = SystemUtils.copy(src, dest)
if code == 0:
return True
else:
logger.error(f"【本地】复制文件失败:{message}")
except Exception as err:
logger.error(f"【本地】复制文件失败:{err}")
return False
@@ -276,10 +294,17 @@ class LocalStorage(StorageBase):
try:
src = Path(fileitem.path)
dest = path / new_name
if self._copy_with_progress(src, dest):
# 复制成功删除源文件
src.unlink()
return True
if self.__should_show_progress(src, dest):
if self._copy_with_progress(src, dest):
# 复制成功删除源文件
src.unlink()
return True
else:
code, message = SystemUtils.move(src, dest)
if code == 0:
return True
else:
logger.error(f"【本地】移动文件失败:{message}")
except Exception as err:
logger.error(f"【本地】移动文件失败:{err}")
return False

View File

@@ -205,10 +205,15 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 检查会话
self._check_session()
resp = self.session.request(
method, f"{self.base_url}{endpoint}",
**kwargs
)
try:
resp = self.session.request(
method, f"{self.base_url}{endpoint}",
**kwargs
)
except requests.exceptions.RequestException as e:
logger.error(f"【115】{method} 请求 {endpoint} 网络错误: {str(e)}")
return None
if resp is None:
logger.warn(f"【115】{method} 请求 {endpoint} 失败!")
return None

View File

@@ -85,7 +85,7 @@ class HaiDanSpider:
categories = self._movie_category
# 搜索类型
if keyword.startswith('tt'):
if keyword and keyword.startswith('tt'):
search_area = '4'
else:
search_area = '0'

View File

@@ -125,12 +125,12 @@ class TMDb(object):
def cache(self, cache):
self._cache_enabled = bool(cache)
@cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta, skip_none=True)
def cached_request(self, method, url, data, json,
_ts=datetime.strftime(datetime.now(), '%Y%m%d')):
return self.request(method, url, data, json)
@cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta, skip_none=True)
async def async_cached_request(self, method, url, data, json,
_ts=datetime.strftime(datetime.now(), '%Y%m%d')):
return await self.async_request(method, url, data, json)

View File

@@ -7,6 +7,8 @@ import json
import urllib.parse
from http import HTTPStatus
from app.core.cache import cached
from app.core.config import settings
from app.utils.http import RequestUtils
@@ -15,7 +17,7 @@ class Auth:
TVDB认证类
"""
def __init__(self, url, apikey, pin="", proxy=None, timeout: int = 15):
def __init__(self, url: str, apikey: str, pin: str = "", proxy: dict = None, timeout: int = 15):
login_info = {"apikey": apikey}
if pin != "":
login_info["pin"] = pin
@@ -35,13 +37,14 @@ class Auth:
result = response.json()
self.token = result["data"]["token"]
else:
error_msg = f"登录失败,状态码: {response.status_code if response else 'None'}"
if response:
if response is not None:
try:
error_data = response.json()
error_msg = f"Code: {response.status_code}, {error_data.get('message', '未知错误')}"
except Exception as err:
error_msg = f"Code: {response.status_code}, 响应解析失败:{err}"
else:
error_msg = "网络连接失败,未收到响应"
raise Exception(error_msg)
except Exception as e:
raise Exception(f"TVDB认证失败: {str(e)}")
@@ -58,13 +61,14 @@ class Request:
请求处理类
"""
def __init__(self, auth_token, proxy=None, timeout=15):
def __init__(self, auth_token: str, proxy: dict = None, timeout: int = 15):
self.auth_token = auth_token
self.links = None
self.proxy = proxy
self.timeout = timeout
def make_request(self, url, if_modified_since=None):
@cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta, skip_none=True)
def make_request(self, url: str, if_modified_since: bool = None):
"""
向指定的 URL 发起请求并返回数据
"""
@@ -118,7 +122,8 @@ class Url:
def __init__(self):
self.base_url = "https://api4.thetvdb.com/v4/"
def construct(self, url_sect, url_id=None, url_subsect=None, url_lang=None, **kwargs):
def construct(self, url_sect: str, url_id: int = None,
url_subsect: str = None, url_lang: str = None, **kwargs):
"""
构建API URL
"""
@@ -141,7 +146,7 @@ class TVDB:
TVDB API主类
"""
def __init__(self, apikey: str, pin="", proxy=None, timeout: int = 15):
def __init__(self, apikey: str, pin: str = "", proxy: dict = None, timeout: int = 15):
self.url = Url()
login_url = self.url.construct("login")
self.auth = Auth(login_url, apikey, pin, proxy, timeout)
@@ -154,126 +159,126 @@ class TVDB:
"""
return self.request.links
def get_artwork_statuses(self, meta=None, if_modified_since=None) -> list:
def get_artwork_statuses(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回艺术图状态列表
"""
url = self.url.construct("artwork/statuses", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_artwork_types(self, meta=None, if_modified_since=None) -> list:
def get_artwork_types(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回艺术图类型列表
"""
url = self.url.construct("artwork/types", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_artwork(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_artwork(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个艺术图信息的字典
"""
url = self.url.construct("artwork", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_artwork_extended(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_artwork_extended(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个艺术图的扩展信息字典
"""
url = self.url.construct("artwork", id, "extended", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_awards(self, meta=None, if_modified_since=None) -> list:
def get_all_awards(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回奖项列表
"""
url = self.url.construct("awards", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_award(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_award(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个奖项信息的字典
"""
url = self.url.construct("awards", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_award_extended(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_award_extended(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个奖项的扩展信息字典
"""
url = self.url.construct("awards", id, "extended", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_award_categories(self, meta=None, if_modified_since=None) -> list:
def get_all_award_categories(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回奖项类别列表
"""
url = self.url.construct("awards/categories", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_award_category(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_award_category(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个奖项类别信息的字典
"""
url = self.url.construct("awards/categories", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_award_category_extended(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_award_category_extended(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个奖项类别的扩展信息字典
"""
url = self.url.construct("awards/categories", id, "extended", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_content_ratings(self, meta=None, if_modified_since=None) -> list:
def get_content_ratings(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回内容分级列表
"""
url = self.url.construct("content/ratings", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_countries(self, meta=None, if_modified_since=None) -> list:
def get_countries(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回国家列表
"""
url = self.url.construct("countries", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_companies(self, page=None, meta=None, if_modified_since=None) -> list:
def get_all_companies(self, page: int = None, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回公司列表 (可分页)
"""
url = self.url.construct("companies", page=page, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_company_types(self, meta=None, if_modified_since=None) -> list:
def get_company_types(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回公司类型列表
"""
url = self.url.construct("companies/types", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_company(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_company(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个公司信息的字典
"""
url = self.url.construct("companies", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_series(self, page=None, meta=None, if_modified_since=None) -> list:
def get_all_series(self, page: int = None, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回剧集列表 (可分页)
"""
url = self.url.construct("series", page=page, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_series(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_series(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个剧集信息的字典
"""
url = self.url.construct("series", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_series_by_slug(self, slug: str, meta=None, if_modified_since=None) -> dict:
def get_series_by_slug(self, slug: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
通过 slug (别名) 返回单个剧集信息的字典
"""
@@ -288,7 +293,7 @@ class TVDB:
return self.request.make_request(url, if_modified_since)
def get_series_episodes(self, id: int, season_type: str = "default", page: int = 0,
lang: str = None, meta=None, if_modified_since=None, **kwargs) -> dict:
lang: str = None, meta: str = None, if_modified_since: bool = None, **kwargs) -> dict:
"""
返回指定剧集和季类型的各集信息字典 (可分页,可指定语言)
"""
@@ -297,7 +302,7 @@ class TVDB:
)
return self.request.make_request(url, if_modified_since)
def get_series_translation(self, id: int, lang: str, meta=None, if_modified_since=None) -> dict:
def get_series_translation(self, id: int, lang: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回剧集的指定语言翻译信息字典
"""
@@ -318,21 +323,21 @@ class TVDB:
url = self.url.construct("series", id, "nextAired")
return self.request.make_request(url, if_modified_since)
def get_all_movies(self, page=None, meta=None, if_modified_since=None) -> list:
def get_all_movies(self, page: int = None, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回电影列表 (可分页)
"""
url = self.url.construct("movies", page=page, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_movie(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_movie(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个电影信息的字典
"""
url = self.url.construct("movies", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_movie_by_slug(self, slug: str, meta=None, if_modified_since=None) -> dict:
def get_movie_by_slug(self, slug: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
通过 slug (别名) 返回单个电影信息的字典
"""
@@ -346,70 +351,70 @@ class TVDB:
url = self.url.construct("movies", id, "extended", meta=meta, short=short)
return self.request.make_request(url, if_modified_since)
def get_movie_translation(self, id: int, lang: str, meta=None, if_modified_since=None) -> dict:
def get_movie_translation(self, id: int, lang: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回电影的指定语言翻译信息字典
"""
url = self.url.construct("movies", id, "translations", lang, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_seasons(self, page=None, meta=None, if_modified_since=None) -> list:
def get_all_seasons(self, page: int = None, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回季列表 (可分页)
"""
url = self.url.construct("seasons", page=page, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_season(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_season(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单季信息的字典
"""
url = self.url.construct("seasons", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_season_extended(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_season_extended(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单季的扩展信息字典
"""
url = self.url.construct("seasons", id, "extended", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_season_types(self, meta=None, if_modified_since=None) -> list:
def get_season_types(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回季类型列表
"""
url = self.url.construct("seasons/types", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_season_translation(self, id: int, lang: str, meta=None, if_modified_since=None) -> dict:
def get_season_translation(self, id: int, lang: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回季的指定语言翻译信息字典
"""
url = self.url.construct("seasons", id, "translations", lang, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_episodes(self, page=None, meta=None, if_modified_since=None) -> list:
def get_all_episodes(self, page: int = None, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回集列表 (可分页)
"""
url = self.url.construct("episodes", page=page, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_episode(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_episode(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单集信息的字典
"""
url = self.url.construct("episodes", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_episode_extended(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_episode_extended(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单集的扩展信息字典
"""
url = self.url.construct("episodes", id, "extended", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_episode_translation(self, id: int, lang: str, meta=None, if_modified_since=None) -> dict:
def get_episode_translation(self, id: int, lang: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单集的指定语言翻译信息字典
"""
@@ -419,70 +424,70 @@ class TVDB:
# 兼容旧函数名。
get_episodes_translation = get_episode_translation
def get_all_genders(self, meta=None, if_modified_since=None) -> list:
def get_all_genders(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回性别列表
"""
url = self.url.construct("genders", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_genres(self, meta=None, if_modified_since=None) -> list:
def get_all_genres(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回类型(流派)列表
"""
url = self.url.construct("genres", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_genre(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_genre(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个类型(流派)信息的字典
"""
url = self.url.construct("genres", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_languages(self, meta=None, if_modified_since=None) -> list:
def get_all_languages(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回语言列表
"""
url = self.url.construct("languages", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_people(self, page=None, meta=None, if_modified_since=None) -> list:
def get_all_people(self, page: int = None, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回人物列表 (可分页)
"""
url = self.url.construct("people", page=page, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_person(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_person(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个人物信息的字典
"""
url = self.url.construct("people", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_person_extended(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_person_extended(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个人物的扩展信息字典
"""
url = self.url.construct("people", id, "extended", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_person_translation(self, id: int, lang: str, meta=None, if_modified_since=None) -> dict:
def get_person_translation(self, id: int, lang: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回人物的指定语言翻译信息字典
"""
url = self.url.construct("people", id, "translations", lang, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_character(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_character(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回角色信息的字典
"""
url = self.url.construct("characters", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_people_types(self, meta=None, if_modified_since=None) -> list:
def get_people_types(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回人物类型列表
"""
@@ -492,7 +497,7 @@ class TVDB:
# 兼容旧函数名
get_all_people_types = get_people_types
def get_source_types(self, meta=None, if_modified_since=None) -> list:
def get_source_types(self, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回来源类型列表
"""
@@ -509,56 +514,56 @@ class TVDB:
url = self.url.construct("updates", since=since, **kwargs)
return self.request.make_request(url)
def get_all_tag_options(self, page=None, meta=None, if_modified_since=None) -> list:
def get_all_tag_options(self, page: int = None, meta: str = None, if_modified_since: bool = None) -> list:
"""
返回标签选项列表 (可分页)
"""
url = self.url.construct("tags/options", page=page, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_tag_option(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_tag_option(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个标签选项信息的字典
"""
url = self.url.construct("tags/options", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_all_lists(self, page=None, meta=None) -> dict:
def get_all_lists(self, page: int = None, meta=None) -> dict:
"""
返回所有公开的列表信息 (可分页)
"""
url = self.url.construct("lists", page=page, meta=meta)
return self.request.make_request(url)
def get_list(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_list(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个列表信息的字典
"""
url = self.url.construct("lists", id, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_list_by_slug(self, slug: str, meta=None, if_modified_since=None) -> dict:
def get_list_by_slug(self, slug: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
通过 slug (别名) 返回单个列表信息的字典
"""
url = self.url.construct("lists/slug", slug, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_list_extended(self, id: int, meta=None, if_modified_since=None) -> dict:
def get_list_extended(self, id: int, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回单个列表的扩展信息字典
"""
url = self.url.construct("lists", id, "extended", meta=meta)
return self.request.make_request(url, if_modified_since)
def get_list_translation(self, id: int, lang: str, meta=None, if_modified_since=None) -> dict:
def get_list_translation(self, id: int, lang: str, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回列表的指定语言翻译信息字典
"""
url = self.url.construct("lists", id, "translations", lang, meta=meta)
return self.request.make_request(url, if_modified_since)
def get_inspiration_types(self, meta=None, if_modified_since=None) -> dict:
def get_inspiration_types(self, meta: str = None, if_modified_since: bool = None) -> dict:
"""
返回灵感类型列表
"""

View File

@@ -1,7 +1,6 @@
import json
import platform
import re
import subprocess
import threading
import time
import traceback
@@ -10,13 +9,13 @@ from threading import Lock
from typing import Any, Optional, Dict, List
from apscheduler.schedulers.background import BackgroundScheduler
from app.core.cache import TTLCache, FileCache
from watchdog.events import FileSystemEventHandler, FileSystemMovedEvent, FileSystemEvent
from watchdog.observers.polling import PollingObserver
from app.chain import ChainBase
from app.chain.storage import StorageChain
from app.chain.transfer import TransferChain
from app.core.cache import TTLCache, FileCache
from app.core.config import settings
from app.core.event import Event, eventmanager
from app.helper.directory import DirectoryHelper
@@ -25,7 +24,8 @@ from app.log import logger
from app.schemas import ConfigChangeEventData
from app.schemas import FileItem
from app.schemas.types import SystemConfigKey, EventType
from app.utils.singleton import Singleton
from app.utils.singleton import SingletonClass
from app.utils.system import SystemUtils
lock = Lock()
snapshot_lock = Lock()
@@ -54,7 +54,7 @@ class FileMonitorHandler(FileSystemEventHandler):
file_size=Path(event.dest_path).stat().st_size)
class Monitor(metaclass=Singleton):
class Monitor(metaclass=SingletonClass):
"""
目录监控处理链,单例模式
"""
@@ -355,7 +355,8 @@ class Monitor(metaclass=Singleton):
return tips
def should_use_polling(self, directory: Path, monitor_mode: str,
@staticmethod
def should_use_polling(directory: Path, monitor_mode: str,
file_count: int, limits: dict) -> tuple[bool, str]:
"""
判断是否应该使用轮询模式
@@ -369,7 +370,7 @@ class Monitor(metaclass=Singleton):
return True, "用户配置为兼容模式"
# 检查网络文件系统
if self.is_network_filesystem(directory):
if SystemUtils.is_network_filesystem(directory):
return True, "检测到网络文件系统,建议使用兼容模式"
max_watches = limits.get('max_user_watches')
@@ -377,45 +378,6 @@ class Monitor(metaclass=Singleton):
return True, f"目录文件数量({file_count})接近系统限制({max_watches})"
return False, "使用快速模式"
@staticmethod
def is_network_filesystem(directory: Path) -> bool:
"""
检测是否为网络文件系统
:param directory: 目录路径
:return: 是否为网络文件系统
"""
try:
system = platform.system()
if system == 'Linux':
# 检查挂载信息
result = subprocess.run(['df', '-T', str(directory)],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
output = result.stdout.lower()
# 以下本地文件系统含有fuse关键字
local_fs = [
"fuse.shfs", # Unraid
"zfuse.zfsv", # 极空间(zfuse.zfsv2、zfuse.zfsv3、...)
# TBD
]
if any(fs in output for fs in local_fs):
return False
network_fs = ['nfs', 'cifs', 'smbfs', 'fuse', 'sshfs', 'ftpfs']
return any(fs in output for fs in network_fs)
elif system == 'Darwin':
# macOS 检查
result = subprocess.run(['df', '-T', str(directory)],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
output = result.stdout.lower()
return 'nfs' in output or 'smbfs' in output
elif system == 'Windows':
# Windows 检查网络驱动器
return str(directory).startswith('\\\\')
except Exception as e:
logger.debug(f"检测网络文件系统时出错: {e}")
return False
def init(self):
"""
启动监控

View File

@@ -78,7 +78,7 @@ class FastAPIMonitor:
# 告警状态
self.alerts: List[str] = []
logger.info("FastAPI性能监控器已初始化")
logger.debug("FastAPI性能监控器已初始化")
def record_request(self, request: Request, response: Response, response_time: float):
"""
@@ -172,7 +172,7 @@ class FastAPIMonitor:
'count': 0,
'total_time': 0,
'errors': 0,
'avg_time': 0
'avg_time': 0.0
})
for req in self.request_history:

View File

@@ -1,3 +1,5 @@
import asyncio
import inspect
import threading
import traceback
from datetime import datetime, timedelta
@@ -21,13 +23,13 @@ from app.core.config import settings
from app.core.event import eventmanager, Event
from app.core.plugin import PluginManager
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.message import MessageHelper
from app.helper.sites import SitesHelper # noqa
from app.helper.message import MessageHelper
from app.helper.wallpaper import WallpaperHelper
from app.log import logger
from app.schemas import Notification, NotificationType, Workflow, ConfigChangeEventData
from app.schemas.types import EventType, SystemConfigKey
from app.utils.singleton import Singleton
from app.utils.singleton import SingletonClass
from app.utils.timer import TimerUtils
lock = threading.Lock()
@@ -37,7 +39,7 @@ class SchedulerChain(ChainBase):
pass
class Scheduler(metaclass=Singleton):
class Scheduler(metaclass=SingletonClass):
"""
定时任务管理
"""
@@ -55,6 +57,8 @@ class Scheduler(metaclass=Singleton):
self._auth_count = 0
# 用户认证失败消息发送
self._auth_message = False
# 当前事件循环
self.loop = asyncio.get_event_loop()
self.init()
@eventmanager.register(EventType.ConfigChanged)
@@ -162,6 +166,19 @@ class Scheduler(metaclass=Singleton):
"name": "推荐缓存",
"func": RecommendChain().refresh_recommend,
"running": False,
},
"plugin_market_refresh": {
"name": "插件市场缓存",
"func": PluginManager().async_get_online_plugins,
"running": False,
"kwargs": {
"force": True
}
},
"subscribe_calendar_cache": {
"name": "订阅日历缓存",
"func": SubscribeChain().cache_calendar,
"running": False
}
}
@@ -180,7 +197,7 @@ class Scheduler(metaclass=Singleton):
id="cookiecloud",
name="同步CookieCloud站点",
minutes=int(settings.COOKIECLOUD_INTERVAL),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=1),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5),
kwargs={
'job_id': 'cookiecloud'
}
@@ -195,7 +212,7 @@ class Scheduler(metaclass=Singleton):
id="mediaserver_sync",
name="同步媒体服务器",
hours=int(settings.MEDIASERVER_SYNC_INTERVAL),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=10),
kwargs={
'job_id': 'mediaserver_sync'
}
@@ -301,7 +318,7 @@ class Scheduler(metaclass=Singleton):
id="random_wallpager",
name="壁纸缓存",
minutes=30,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=1),
kwargs={
'job_id': 'random_wallpager'
}
@@ -363,12 +380,37 @@ class Scheduler(metaclass=Singleton):
id="recommend_refresh",
name="推荐缓存",
hours=24,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3),
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=5),
kwargs={
'job_id': 'recommend_refresh'
}
)
# 插件市场缓存
self._scheduler.add_job(
self.start,
"interval",
id="plugin_market_refresh",
name="插件市场缓存",
minutes=30,
kwargs={
'job_id': 'plugin_market_refresh'
}
)
# 订阅日历缓存
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_calendar_cache",
name="订阅日历缓存",
hours=6,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=2),
kwargs={
'job_id': 'subscribe_calendar_cache'
}
)
# 初始化工作流服务
self.init_workflow_jobs()
@@ -409,6 +451,13 @@ class Scheduler(metaclass=Singleton):
"""
启动定时服务
"""
def __start_coro(coro):
"""
启动协程
"""
return asyncio.run_coroutine_threadsafe(coro, self.loop)
# 获取定时任务
job = self.__prepare_job(job_id)
if not job:
@@ -417,7 +466,13 @@ class Scheduler(metaclass=Singleton):
try:
if not kwargs:
kwargs = job.get("kwargs") or {}
job["func"](*args, **kwargs)
func = job.get("func")
if not func:
return
if inspect.iscoroutinefunction(func):
__start_coro(func(*args, **kwargs))
else:
job["func"](*args, **kwargs)
except Exception as e:
logger.error(f"定时任务 {job.get('name')} 执行失败:{str(e)} - {traceback.format_exc()}")
MessageHelper().put(title=f"{job.get('name')} 执行失败",
@@ -519,7 +574,7 @@ class Scheduler(metaclass=Singleton):
except JobLookupError:
pass
if job_removed:
logger.info(f"移除插件服务({plugin_name}){service.get('name')}")
logger.info(f"移除插件服务({plugin_name}){service.get('name')}") # noqa
except Exception as e:
logger.error(f"移除插件服务失败:{str(e)} - {job_id}: {service}")
SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务移除失败",

View File

@@ -77,7 +77,7 @@ class SiteUserData(BaseModel):
# 用户名
username: Optional[str] = None
# 用户ID
userid: Optional[Union[int, str]] = None
userid: Optional[str] = None
# 用户等级
user_level: Optional[str] = None
# 加入时间

View File

@@ -35,10 +35,10 @@ async def lifespan(app: FastAPI):
定义应用的生命周期事件
"""
print("Starting up...")
# 初始化模块
init_modules()
# 初始化路由
init_routers(app)
# 初始化模块
init_modules()
# 恢复插件备份
SystemChain().restore_plugins()
# 初始化插件

View File

@@ -1,83 +0,0 @@
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Coroutine, Any, TypeVar
T = TypeVar('T')
class AsyncUtils:
"""
异步工具类,用于在同步环境中调用异步方法
"""
@staticmethod
def run_async(coro: Coroutine[Any, Any, T]) -> T:
"""
在同步环境中安全地执行异步协程
:param coro: 要执行的协程
:return: 协程的返回值
:raises: 协程执行过程中的任何异常
"""
try:
# 尝试获取当前运行的事件循环
asyncio.get_running_loop()
# 如果有运行中的事件循环,在新线程中执行
return AsyncUtils._run_in_thread(coro)
except RuntimeError:
# 没有运行中的事件循环,直接使用 asyncio.run
return asyncio.run(coro)
@staticmethod
def _run_in_thread(coro: Coroutine[Any, Any, T]) -> T:
"""
在新线程中创建事件循环并执行协程
:param coro: 要执行的协程
:return: 协程的返回值
"""
result = None
exception = None
def _run():
nonlocal result, exception
try:
# 在新线程中创建新的事件循环
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
try:
result = new_loop.run_until_complete(coro)
finally:
new_loop.close()
except Exception as e:
exception = e
# 在新线程中执行
thread = threading.Thread(target=_run)
thread.start()
thread.join()
if exception:
raise exception
return result
@staticmethod
def run_async_in_executor(coro: Coroutine[Any, Any, T]) -> T:
"""
使用线程池执行器在新线程中运行异步协程
:param coro: 要执行的协程
:return: 协程的返回值
"""
try:
# 检查是否有运行中的事件循环
asyncio.get_running_loop()
# 有运行中的事件循环,使用线程池
with ThreadPoolExecutor() as executor:
future = executor.submit(asyncio.run, coro)
return future.result()
except RuntimeError:
# 没有运行中的事件循环,直接运行
return asyncio.run(coro)

View File

@@ -527,6 +527,45 @@ class SystemUtils:
print(f"Error occurred: {e}")
return False
@staticmethod
def is_network_filesystem(directory: Path) -> bool:
"""
检测是否为网络文件系统
:param directory: 目录路径
:return: 是否为网络文件系统
"""
try:
system = platform.system()
if system == 'Linux':
# 检查挂载信息
result = subprocess.run(['df', '-T', str(directory)],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
output = result.stdout.lower()
# 以下本地文件系统含有fuse关键字
local_fs = [
"fuse.shfs", # Unraid
"zfuse.zfsv", # 极空间(zfuse.zfsv2、zfuse.zfsv3、...)
# TBD
]
if any(fs in output for fs in local_fs):
return False
network_fs = ['nfs', 'cifs', 'smbfs', 'fuse', 'sshfs', 'ftpfs']
return any(fs in output for fs in network_fs)
elif system == 'Darwin':
# macOS 检查
result = subprocess.run(['df', '-T', str(directory)],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
output = result.stdout.lower()
return 'nfs' in output or 'smbfs' in output
elif system == 'Windows':
# Windows 检查网络驱动器
return str(directory).startswith('\\\\')
except Exception as e:
print(f"Error occurred: {e}")
return False
@staticmethod
def is_same_disk(src: Path, dest: Path) -> bool:
"""

View File

@@ -0,0 +1,80 @@
"""2.2.1
Revision ID: a946dae52526
Revises: 5b3355c964bb
Create Date: 2025-08-20 17:50:00.000000
"""
import sqlalchemy as sa
from alembic import op
from app.log import logger
from app.core.config import settings
# revision identifiers, used by Alembic.
revision = 'a946dae52526'
down_revision = '5b3355c964bb'
branch_labels = None
depends_on = None
def upgrade() -> None:
"""
升级将SiteUserData表的userid字段从Integer改为String
"""
connection = op.get_bind()
if settings.DB_TYPE.lower() == "postgresql":
# PostgreSQL数据库迁移
migrate_postgresql_userid(connection)
def downgrade() -> None:
"""
降级将SiteUserData表的userid字段从String改回Integer
"""
pass
def migrate_postgresql_userid(connection):
"""
PostgreSQL数据库userid字段迁移
"""
try:
logger.info("开始PostgreSQL数据库userid字段迁移...")
# 1. 创建临时列
connection.execute(sa.text("""
ALTER TABLE siteuserdata
ADD COLUMN userid_new VARCHAR
"""))
# 2. 将现有数据转换为字符串并复制到新列
connection.execute(sa.text("""
UPDATE siteuserdata
SET userid_new = CAST(userid AS VARCHAR)
WHERE userid IS NOT NULL
"""))
# 3. 删除旧列
connection.execute(sa.text("""
ALTER TABLE siteuserdata
DROP COLUMN userid
"""))
# 4. 重命名新列
connection.execute(sa.text("""
ALTER TABLE siteuserdata
RENAME COLUMN userid_new TO userid
"""))
logger.info("PostgreSQL数据库userid字段迁移完成")
except Exception as e:
logger.error(f"PostgreSQL数据库userid字段迁移失败: {e}")
raise

View File

@@ -31,23 +31,34 @@ if [ "${ENABLE_SSL}" = "true" ] && \
if [ ! -d "/config/acme.sh" ]; then
INFO "→ 安装acme.sh..."
# 生成安装参数
INSTALL_ARGS=(
"--install-online"
"--home" "/config/acme.sh"
"--config-home" "/config/acme.sh/data"
"--cert-home" "/config/certs"
)
# 设置安装环境变量
export LE_WORKING_DIR="/config/acme.sh"
export LE_CONFIG_HOME="/config/acme.sh/data"
export LE_CERT_HOME="/config/certs"
# 添加邮箱参数(如果设置
# 执行官方安装命令(添加错误处理
INFO "正在下载并安装 acme.sh..."
# 构建安装命令
INSTALL_CMD="curl -sSL https://get.acme.sh | sh -s -- --install-online"
if [ -n "${SSL_EMAIL}" ]; then
INSTALL_ARGS+=("--accountemail" "${SSL_EMAIL}")
INSTALL_CMD="${INSTALL_CMD} --accountemail ${SSL_EMAIL}"
else
WARN "未设置SSL_EMAIL建议配置邮箱用于证书过期提醒"
fi
if ! eval "${INSTALL_CMD}"; then
ERROR "acme.sh 安装失败"
exit 1
fi
# 执行官方安装命令
curl -sSL https://get.acme.sh | sh -s -- "${INSTALL_ARGS[@]}"
# 验证安装是否成功
if [ ! -f "/config/acme.sh/acme.sh" ]; then
ERROR "acme.sh 安装后文件不存在,安装可能失败"
exit 1
fi
INFO "acme.sh 安装成功"
fi
# 签发证书(仅当证书不存在时)
@@ -77,17 +88,24 @@ if [ "${ENABLE_SSL}" = "true" ] && \
fi
done
# 签发证书
/config/acme.sh/acme.sh --issue \
# 签发证书(添加错误处理)
INFO "正在签发证书..."
if ! /config/acme.sh/acme.sh --issue \
--dns "${DNS_PROVIDER}" \
--domain "${SSL_DOMAIN}" \
--key-file /config/certs/"${SSL_DOMAIN}"/privkey.pem \
--fullchain-file /config/certs/"${SSL_DOMAIN}"/fullchain.pem \
--reloadcmd "nginx -s reload" \
--force
--force; then
ERROR "证书签发失败"
exit 1
fi
# 创建稳定符号链接
ln -sf /config/certs/"${SSL_DOMAIN}" /config/certs/latest
INFO "证书签发成功"
else
INFO "证书已存在,跳过签发步骤"
fi
# 配置自动更新任务
@@ -98,4 +116,12 @@ if [ "${ENABLE_SSL}" = "true" ] && \
elif [ "${ENABLE_SSL}" = "true" ] && [ "${AUTO_ISSUE_CERT}" = "true" ] && [ -z "${SSL_DOMAIN}" ]; then
WARN "已启用自动签发证书但未设置SSL_DOMAIN跳过证书管理"
elif [ "${ENABLE_SSL}" = "true" ] && [ "${AUTO_ISSUE_CERT}" = "false" ]; then
INFO "SSL已启用但自动签发证书已禁用将使用手动配置的证书"
# 检查证书文件是否存在
if [ -f "/config/certs/latest/fullchain.pem" ] && [ -f "/config/certs/latest/privkey.pem" ]; then
INFO "检测到证书文件SSL配置正常"
else
WARN "未检测到证书文件,请确保手动配置了正确的证书路径"
fi
fi

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.7.6'
FRONTEND_VERSION = 'v2.7.6'
APP_VERSION = 'v2.7.8'
FRONTEND_VERSION = 'v2.7.8'