mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-06-13 23:16:46 +00:00
Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8c1be70c85 | ||
|
|
b8e0c0db9e | ||
|
|
7b7fb6cc82 | ||
|
|
62512ba215 | ||
|
|
e1beb64c01 | ||
|
|
c81f26ddad | ||
|
|
340114c2a1 | ||
|
|
cd7767b331 | ||
|
|
25289dad8a | ||
|
|
47c6917129 | ||
|
|
6379cda148 | ||
|
|
91a124ab8f | ||
|
|
2357a7135e | ||
|
|
da0b3b3de9 | ||
|
|
6664fb1716 | ||
|
|
1206f24fa9 | ||
|
|
ffb5823e84 |
@@ -1184,6 +1184,42 @@ class SubscribeChain(ChainBase):
|
||||
logger.error(f'follow用户分享订阅 {title} 添加失败:{message}')
|
||||
logger.info(f'follow用户分享订阅刷新完成,共添加 {success_count} 个订阅')
|
||||
|
||||
async def cache_calendar(self):
|
||||
"""
|
||||
预缓存订阅日历,实际上就是查询一遍所有订阅的媒体信息
|
||||
前端请示是异常的,所以需要使用异步缓存方法
|
||||
"""
|
||||
logger.info(f'开始预缓存订阅日历 ...')
|
||||
for subscribe in await SubscribeOper().async_list():
|
||||
if global_vars.is_system_stopped:
|
||||
break
|
||||
try:
|
||||
mtype = MediaType(subscribe.type)
|
||||
except ValueError:
|
||||
logger.error(f'订阅 {subscribe.name} 类型错误:{subscribe.type}')
|
||||
continue
|
||||
# 识别媒体信息
|
||||
if mtype == MediaType.MOVIE:
|
||||
mediainfo: MediaInfo = await self.async_recognize_media(mtype=mtype,
|
||||
tmdbid=subscribe.tmdbid,
|
||||
doubanid=subscribe.doubanid,
|
||||
bangumiid=subscribe.bangumiid,
|
||||
episode_group=subscribe.episode_group,
|
||||
cache=False)
|
||||
if not mediainfo:
|
||||
logger.warn(
|
||||
f'未识别到媒体信息,标题:{subscribe.name},tmdbid:{subscribe.tmdbid},doubanid:{subscribe.doubanid}')
|
||||
continue
|
||||
else:
|
||||
episodes = await TmdbChain().async_tmdb_episodes(tmdbid=subscribe.tmdbid,
|
||||
season=subscribe.season,
|
||||
episode_group=subscribe.episode_group)
|
||||
if not episodes:
|
||||
logger.warn(
|
||||
f'未识别到季集信息,标题:{subscribe.name},tmdbid:{subscribe.tmdbid},豆瓣ID:{subscribe.doubanid},季:{subscribe.season}')
|
||||
continue
|
||||
logger.info(f'订阅日历预缓存完成')
|
||||
|
||||
@staticmethod
|
||||
def __update_subscribe_note(subscribe: Subscribe, downloads: Optional[List[Context]]):
|
||||
"""
|
||||
|
||||
@@ -474,7 +474,11 @@ class MemoryBackend(CacheBackend):
|
||||
if region_cache is None:
|
||||
yield from ()
|
||||
return
|
||||
for item in region_cache.items():
|
||||
# 使用锁保护迭代过程,避免在迭代时缓存被修改
|
||||
with lock:
|
||||
# 创建快照避免并发修改问题
|
||||
items_snapshot = list(region_cache.items())
|
||||
for item in items_snapshot:
|
||||
yield item
|
||||
|
||||
def close(self) -> None:
|
||||
@@ -603,7 +607,11 @@ class AsyncMemoryBackend(AsyncCacheBackend):
|
||||
region_cache = self.__get_region_cache(region)
|
||||
if region_cache is None:
|
||||
return
|
||||
for item in region_cache.items():
|
||||
# 使用锁保护迭代过程,避免在迭代时缓存被修改
|
||||
with lock:
|
||||
# 创建快照避免并发修改问题
|
||||
items_snapshot = list(region_cache.items())
|
||||
for item in items_snapshot:
|
||||
yield item
|
||||
|
||||
async def close(self) -> None:
|
||||
@@ -1385,7 +1393,7 @@ class TTLCache(CacheProxy):
|
||||
def __init__(self,
|
||||
region: Optional[str] = DEFAULT_CACHE_REGION,
|
||||
maxsize: Optional[int] = DEFAULT_CACHE_SIZE,
|
||||
ttl: Optional[int]= DEFAULT_CACHE_TTL):
|
||||
ttl: Optional[int] = DEFAULT_CACHE_TTL):
|
||||
"""
|
||||
初始化 TTL 缓存
|
||||
|
||||
@@ -1404,7 +1412,7 @@ class LRUCache(CacheProxy):
|
||||
|
||||
def __init__(self,
|
||||
region: Optional[str] = DEFAULT_CACHE_REGION,
|
||||
maxsize: Optional[int]= DEFAULT_CACHE_SIZE
|
||||
maxsize: Optional[int] = DEFAULT_CACHE_SIZE
|
||||
):
|
||||
"""
|
||||
初始化 LRU 缓存
|
||||
|
||||
@@ -450,10 +450,7 @@ class EventManager(metaclass=Singleton):
|
||||
logger.debug(f"Handler {self.__get_handler_identifier(handler)} is disabled. Skipping execution")
|
||||
return
|
||||
|
||||
try:
|
||||
self.__invoke_handler_by_type_sync(handler, event)
|
||||
except Exception as e:
|
||||
self.__handle_event_error(event, handler, e)
|
||||
self.__invoke_handler_by_type_sync(handler, event)
|
||||
|
||||
async def __safe_invoke_handler_async(self, handler: Callable, event: Event):
|
||||
"""
|
||||
@@ -465,10 +462,7 @@ class EventManager(metaclass=Singleton):
|
||||
logger.debug(f"Handler {self.__get_handler_identifier(handler)} is disabled. Skipping execution")
|
||||
return
|
||||
|
||||
try:
|
||||
await self.__invoke_handler_by_type_async(handler, event)
|
||||
except Exception as e:
|
||||
self.__handle_event_error(event, handler, e)
|
||||
await self.__invoke_handler_by_type_async(handler, event)
|
||||
|
||||
def __invoke_handler_by_type_sync(self, handler: Callable, event: Event):
|
||||
"""
|
||||
@@ -486,7 +480,17 @@ class EventManager(metaclass=Singleton):
|
||||
|
||||
if class_name in plugin_manager.get_plugin_ids():
|
||||
# 插件处理器
|
||||
plugin_manager.run_plugin_method(class_name, method_name, event)
|
||||
plugin = plugin_manager.running_plugins.get(class_name)
|
||||
if not plugin:
|
||||
return
|
||||
method = getattr(plugin, method_name, None)
|
||||
if not method:
|
||||
return
|
||||
try:
|
||||
method(event)
|
||||
except Exception as e:
|
||||
self.__handle_event_error(event=event, module_name=plugin.name,
|
||||
class_name=class_name, method_name=method_name, e=e)
|
||||
elif class_name in module_manager.get_module_ids():
|
||||
# 模块处理器
|
||||
module = module_manager.get_running_module(class_name)
|
||||
@@ -495,16 +499,24 @@ class EventManager(metaclass=Singleton):
|
||||
method = getattr(module, method_name, None)
|
||||
if not method:
|
||||
return
|
||||
method(event)
|
||||
try:
|
||||
method(event)
|
||||
except Exception as e:
|
||||
self.__handle_event_error(event=event, module_name=module.get_name(),
|
||||
class_name=class_name, method_name=method_name, e=e)
|
||||
else:
|
||||
# 全局处理器
|
||||
class_obj = self.__get_class_instance(class_name)
|
||||
if not class_obj or not hasattr(class_obj, method_name):
|
||||
return
|
||||
method = getattr(class_obj, method_name)
|
||||
method = getattr(class_obj, method_name, None)
|
||||
if not method:
|
||||
return
|
||||
method(event)
|
||||
try:
|
||||
method(event)
|
||||
except Exception as e:
|
||||
self.__handle_event_error(event=event, module_name=class_name,
|
||||
class_name=class_name, method_name=method_name, e=e)
|
||||
|
||||
async def __invoke_handler_by_type_async(self, handler: Callable, event: Event):
|
||||
"""
|
||||
@@ -537,52 +549,62 @@ class EventManager(metaclass=Singleton):
|
||||
names = handler.__qualname__.split(".")
|
||||
return names[0], names[1]
|
||||
|
||||
@staticmethod
|
||||
async def __invoke_plugin_method_async(handler: Any, class_name: str, method_name: str, event: Event):
|
||||
async def __invoke_plugin_method_async(self, handler: Any, class_name: str, method_name: str, event: Event):
|
||||
"""
|
||||
异步调用插件方法
|
||||
"""
|
||||
plugin = handler.running_plugins.get(class_name)
|
||||
if plugin and hasattr(plugin, method_name):
|
||||
method = getattr(plugin, method_name)
|
||||
if not plugin:
|
||||
return
|
||||
method = getattr(plugin, method_name, None)
|
||||
if not method:
|
||||
return
|
||||
try:
|
||||
if inspect.iscoroutinefunction(method):
|
||||
await method(event)
|
||||
else:
|
||||
# 插件同步函数在异步环境中运行,避免阻塞
|
||||
await run_in_threadpool(method, event)
|
||||
except Exception as e:
|
||||
self.__handle_event_error(event=event, handler=handler, e=e, module_name=plugin.name)
|
||||
|
||||
@staticmethod
|
||||
async def __invoke_module_method_async(handler: Any, class_name: str, method_name: str, event: Event):
|
||||
async def __invoke_module_method_async(self, handler: Any, class_name: str, method_name: str, event: Event):
|
||||
"""
|
||||
异步调用模块方法
|
||||
"""
|
||||
module = handler.get_running_module(class_name)
|
||||
if not module:
|
||||
return
|
||||
|
||||
method = getattr(module, method_name, None)
|
||||
if not method:
|
||||
return
|
||||
|
||||
if inspect.iscoroutinefunction(method):
|
||||
await method(event)
|
||||
else:
|
||||
method(event)
|
||||
try:
|
||||
if inspect.iscoroutinefunction(method):
|
||||
await method(event)
|
||||
else:
|
||||
method(event)
|
||||
except Exception as e:
|
||||
self.__handle_event_error(event=event, module_name=module.get_name(),
|
||||
class_name=class_name, method_name=method_name, e=e)
|
||||
|
||||
async def __invoke_global_method_async(self, class_name: str, method_name: str, event: Event):
|
||||
"""
|
||||
异步调用全局对象方法
|
||||
"""
|
||||
class_obj = self.__get_class_instance(class_name)
|
||||
if not class_obj or not hasattr(class_obj, method_name):
|
||||
if not class_obj:
|
||||
return
|
||||
|
||||
method = getattr(class_obj, method_name)
|
||||
|
||||
if inspect.iscoroutinefunction(method):
|
||||
await method(event)
|
||||
else:
|
||||
method(event)
|
||||
method = getattr(class_obj, method_name, None)
|
||||
if not method:
|
||||
return
|
||||
try:
|
||||
if inspect.iscoroutinefunction(method):
|
||||
await method(event)
|
||||
else:
|
||||
method(event)
|
||||
except Exception as e:
|
||||
self.__handle_event_error(event=event, module_name=class_name,
|
||||
class_name=class_name, method_name=method_name, e=e)
|
||||
|
||||
@staticmethod
|
||||
def __get_class_instance(class_name: str):
|
||||
@@ -609,7 +631,11 @@ class EventManager(metaclass=Singleton):
|
||||
module_name = f"app.chain.{class_name[:-5].lower()}"
|
||||
module = importlib.import_module(module_name)
|
||||
elif class_name.endswith("Helper"):
|
||||
module_name = f"app.helper.{class_name[:-6].lower()}"
|
||||
# 特殊处理 Async 类
|
||||
if class_name.startswith("Async"):
|
||||
module_name = f"app.helper.{class_name[5:-6].lower()}"
|
||||
else:
|
||||
module_name = f"app.helper.{class_name[:-6].lower()}"
|
||||
module = importlib.import_module(module_name)
|
||||
else:
|
||||
module_name = f"app.{class_name.lower()}"
|
||||
@@ -649,18 +675,16 @@ class EventManager(metaclass=Singleton):
|
||||
"""
|
||||
logger.debug(f"{stage} - {event}")
|
||||
|
||||
def __handle_event_error(self, event: Event, handler: Callable, e: Exception):
|
||||
def __handle_event_error(self, event: Event, module_name: str,
|
||||
class_name: str, method_name: str, e: Exception):
|
||||
"""
|
||||
全局错误处理器,用于处理事件处理中的异常
|
||||
"""
|
||||
logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}")
|
||||
|
||||
names = handler.__qualname__.split(".")
|
||||
class_name, method_name = names[0], names[1]
|
||||
logger.error(f"{module_name} 事件处理出错:{str(e)} - {traceback.format_exc()}")
|
||||
|
||||
# 发送系统错误通知
|
||||
from app.helper.message import MessageHelper
|
||||
MessageHelper().put(title=f"{event.event_type} 事件处理出错",
|
||||
MessageHelper().put(title=f"{module_name} 处理事件 {event.event_type} 时出错",
|
||||
message=f"{class_name}.{method_name}:{str(e)}",
|
||||
role="system")
|
||||
self.send_event(
|
||||
|
||||
@@ -1189,6 +1189,7 @@ class PluginManager(metaclass=Singleton):
|
||||
async def async_get_online_plugins(self, force: bool = False) -> List[schemas.Plugin]:
|
||||
"""
|
||||
异步获取所有在线插件信息
|
||||
:param force: 是否强制刷新(忽略缓存)
|
||||
"""
|
||||
if not settings.PLUGIN_MARKET:
|
||||
return []
|
||||
|
||||
@@ -20,7 +20,7 @@ class SiteUserData(Base):
|
||||
# 用户名
|
||||
username = Column(String)
|
||||
# 用户ID
|
||||
userid = Column(Integer)
|
||||
userid = Column(String)
|
||||
# 用户等级
|
||||
user_level = Column(String)
|
||||
# 加入时间
|
||||
|
||||
@@ -119,6 +119,14 @@ class SubscribeOper(DbOper):
|
||||
return Subscribe.get_by_state(self._db, state)
|
||||
return Subscribe.list(self._db)
|
||||
|
||||
async def async_list(self, state: Optional[str] = None) -> List[Subscribe]:
|
||||
"""
|
||||
异步获取订阅列表
|
||||
"""
|
||||
if state:
|
||||
return await Subscribe.async_get_by_state(self._db, state)
|
||||
return await Subscribe.async_list(self._db)
|
||||
|
||||
def delete(self, sid: int):
|
||||
"""
|
||||
删除订阅
|
||||
|
||||
@@ -911,10 +911,10 @@ class PluginHelper(metaclass=WeakSingleton):
|
||||
"""
|
||||
# 异步版本直接调用不带缓存的版本(缓存在异步环境下可能有并发问题)
|
||||
if force:
|
||||
return await self._async_get_plugins_uncached(repo_url, package_version)
|
||||
await self._async_get_plugins_cached.cache_clear()
|
||||
return await self._async_get_plugins_cached(repo_url, package_version)
|
||||
|
||||
@cached(maxsize=64, ttl=1800)
|
||||
@cached(maxsize=128, ttl=1800)
|
||||
async def _async_get_plugins_cached(self, repo_url: str,
|
||||
package_version: Optional[str] = None) -> Optional[Dict[str, dict]]:
|
||||
"""
|
||||
|
||||
@@ -1,7 +1,3 @@
|
||||
import os
|
||||
import signal
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Tuple
|
||||
|
||||
@@ -73,34 +69,6 @@ class SystemHelper:
|
||||
logger.debug(f"获取容器ID失败: {str(e)}")
|
||||
return container_id.strip() if container_id else None
|
||||
|
||||
@staticmethod
|
||||
def _check_restart_policy() -> bool:
|
||||
"""
|
||||
检查当前容器是否配置了自动重启策略
|
||||
"""
|
||||
try:
|
||||
# 获取当前容器ID
|
||||
container_id = SystemHelper._get_container_id()
|
||||
if not container_id:
|
||||
return False
|
||||
|
||||
# 创建 Docker 客户端
|
||||
client = docker.DockerClient(base_url=settings.DOCKER_CLIENT_API)
|
||||
# 获取容器信息
|
||||
container = client.containers.get(container_id)
|
||||
restart_policy = container.attrs.get('HostConfig', {}).get('RestartPolicy', {})
|
||||
policy_name = restart_policy.get('Name', 'no')
|
||||
# 检查是否有有效的重启策略
|
||||
auto_restart_policies = ['always', 'unless-stopped', 'on-failure']
|
||||
has_restart_policy = policy_name in auto_restart_policies
|
||||
|
||||
logger.info(f"容器重启策略: {policy_name}, 支持自动重启: {has_restart_policy}")
|
||||
return has_restart_policy
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"检查重启策略失败: {str(e)}")
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def restart() -> Tuple[bool, str]:
|
||||
"""
|
||||
@@ -109,45 +77,8 @@ class SystemHelper:
|
||||
if not SystemUtils.is_docker():
|
||||
return False, "非Docker环境,无法重启!"
|
||||
|
||||
try:
|
||||
# 检查容器是否配置了自动重启策略
|
||||
has_restart_policy = SystemHelper._check_restart_policy()
|
||||
if has_restart_policy:
|
||||
# 有重启策略,使用优雅退出方式
|
||||
logger.info("检测到容器配置了自动重启策略,使用优雅重启方式...")
|
||||
# 启动优雅退出超时监控
|
||||
SystemHelper._start_graceful_shutdown_monitor()
|
||||
# 发送SIGTERM信号给当前进程,触发优雅停止
|
||||
os.kill(os.getpid(), signal.SIGTERM)
|
||||
return True, ""
|
||||
else:
|
||||
# 没有重启策略,使用Docker API强制重启
|
||||
logger.info("容器未配置自动重启策略,使用Docker API重启...")
|
||||
return SystemHelper._docker_api_restart()
|
||||
except Exception as err:
|
||||
logger.error(f"重启失败: {str(err)}")
|
||||
# 降级为Docker API重启
|
||||
logger.warning("降级为Docker API重启...")
|
||||
return SystemHelper._docker_api_restart()
|
||||
|
||||
@staticmethod
|
||||
def _start_graceful_shutdown_monitor():
|
||||
"""
|
||||
启动优雅退出超时监控
|
||||
如果30秒内进程没有退出,则使用Docker API强制重启
|
||||
"""
|
||||
|
||||
def monitor_thread():
|
||||
time.sleep(30) # 等待30秒
|
||||
logger.warning("优雅退出超时30秒,使用Docker API强制重启...")
|
||||
try:
|
||||
SystemHelper._docker_api_restart()
|
||||
except Exception as e:
|
||||
logger.error(f"强制重启失败: {str(e)}")
|
||||
|
||||
# 在后台线程中启动监控
|
||||
thread = threading.Thread(target=monitor_thread, daemon=True)
|
||||
thread.start()
|
||||
logger.info("正在重启容器...")
|
||||
return SystemHelper._docker_api_restart()
|
||||
|
||||
@staticmethod
|
||||
def _docker_api_restart() -> Tuple[bool, str]:
|
||||
|
||||
@@ -25,7 +25,7 @@ from app.log import logger
|
||||
from app.schemas import ConfigChangeEventData
|
||||
from app.schemas import FileItem
|
||||
from app.schemas.types import SystemConfigKey, EventType
|
||||
from app.utils.singleton import Singleton
|
||||
from app.utils.singleton import SingletonClass
|
||||
|
||||
lock = Lock()
|
||||
snapshot_lock = Lock()
|
||||
@@ -54,7 +54,7 @@ class FileMonitorHandler(FileSystemEventHandler):
|
||||
file_size=Path(event.dest_path).stat().st_size)
|
||||
|
||||
|
||||
class Monitor(metaclass=Singleton):
|
||||
class Monitor(metaclass=SingletonClass):
|
||||
"""
|
||||
目录监控处理链,单例模式
|
||||
"""
|
||||
|
||||
@@ -78,7 +78,7 @@ class FastAPIMonitor:
|
||||
# 告警状态
|
||||
self.alerts: List[str] = []
|
||||
|
||||
logger.info("FastAPI性能监控器已初始化")
|
||||
logger.debug("FastAPI性能监控器已初始化")
|
||||
|
||||
def record_request(self, request: Request, response: Response, response_time: float):
|
||||
"""
|
||||
@@ -172,7 +172,7 @@ class FastAPIMonitor:
|
||||
'count': 0,
|
||||
'total_time': 0,
|
||||
'errors': 0,
|
||||
'avg_time': 0
|
||||
'avg_time': 0.0
|
||||
})
|
||||
|
||||
for req in self.request_history:
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import asyncio
|
||||
import inspect
|
||||
import threading
|
||||
import traceback
|
||||
from datetime import datetime, timedelta
|
||||
@@ -21,13 +23,13 @@ from app.core.config import settings
|
||||
from app.core.event import eventmanager, Event
|
||||
from app.core.plugin import PluginManager
|
||||
from app.db.systemconfig_oper import SystemConfigOper
|
||||
from app.helper.message import MessageHelper
|
||||
from app.helper.sites import SitesHelper # noqa
|
||||
from app.helper.message import MessageHelper
|
||||
from app.helper.wallpaper import WallpaperHelper
|
||||
from app.log import logger
|
||||
from app.schemas import Notification, NotificationType, Workflow, ConfigChangeEventData
|
||||
from app.schemas.types import EventType, SystemConfigKey
|
||||
from app.utils.singleton import Singleton
|
||||
from app.utils.singleton import SingletonClass
|
||||
from app.utils.timer import TimerUtils
|
||||
|
||||
lock = threading.Lock()
|
||||
@@ -37,7 +39,7 @@ class SchedulerChain(ChainBase):
|
||||
pass
|
||||
|
||||
|
||||
class Scheduler(metaclass=Singleton):
|
||||
class Scheduler(metaclass=SingletonClass):
|
||||
"""
|
||||
定时任务管理
|
||||
"""
|
||||
@@ -55,6 +57,8 @@ class Scheduler(metaclass=Singleton):
|
||||
self._auth_count = 0
|
||||
# 用户认证失败消息发送
|
||||
self._auth_message = False
|
||||
# 当前事件循环
|
||||
self.loop = asyncio.get_event_loop()
|
||||
self.init()
|
||||
|
||||
@eventmanager.register(EventType.ConfigChanged)
|
||||
@@ -162,6 +166,19 @@ class Scheduler(metaclass=Singleton):
|
||||
"name": "推荐缓存",
|
||||
"func": RecommendChain().refresh_recommend,
|
||||
"running": False,
|
||||
},
|
||||
"plugin_market_refresh": {
|
||||
"name": "插件市场缓存",
|
||||
"func": PluginManager().async_get_online_plugins,
|
||||
"running": False,
|
||||
"kwargs": {
|
||||
"force": True
|
||||
}
|
||||
},
|
||||
"subscribe_calendar_cache": {
|
||||
"name": "订阅日历缓存",
|
||||
"func": SubscribeChain().cache_calendar,
|
||||
"running": False
|
||||
}
|
||||
}
|
||||
|
||||
@@ -180,7 +197,7 @@ class Scheduler(metaclass=Singleton):
|
||||
id="cookiecloud",
|
||||
name="同步CookieCloud站点",
|
||||
minutes=int(settings.COOKIECLOUD_INTERVAL),
|
||||
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=1),
|
||||
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5),
|
||||
kwargs={
|
||||
'job_id': 'cookiecloud'
|
||||
}
|
||||
@@ -195,7 +212,7 @@ class Scheduler(metaclass=Singleton):
|
||||
id="mediaserver_sync",
|
||||
name="同步媒体服务器",
|
||||
hours=int(settings.MEDIASERVER_SYNC_INTERVAL),
|
||||
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5),
|
||||
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=10),
|
||||
kwargs={
|
||||
'job_id': 'mediaserver_sync'
|
||||
}
|
||||
@@ -301,7 +318,7 @@ class Scheduler(metaclass=Singleton):
|
||||
id="random_wallpager",
|
||||
name="壁纸缓存",
|
||||
minutes=30,
|
||||
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3),
|
||||
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=1),
|
||||
kwargs={
|
||||
'job_id': 'random_wallpager'
|
||||
}
|
||||
@@ -363,12 +380,37 @@ class Scheduler(metaclass=Singleton):
|
||||
id="recommend_refresh",
|
||||
name="推荐缓存",
|
||||
hours=24,
|
||||
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=3),
|
||||
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(seconds=5),
|
||||
kwargs={
|
||||
'job_id': 'recommend_refresh'
|
||||
}
|
||||
)
|
||||
|
||||
# 插件市场缓存
|
||||
self._scheduler.add_job(
|
||||
self.start,
|
||||
"interval",
|
||||
id="plugin_market_refresh",
|
||||
name="插件市场缓存",
|
||||
minutes=30,
|
||||
kwargs={
|
||||
'job_id': 'plugin_market_refresh'
|
||||
}
|
||||
)
|
||||
|
||||
# 订阅日历缓存
|
||||
self._scheduler.add_job(
|
||||
self.start,
|
||||
"interval",
|
||||
id="subscribe_calendar_cache",
|
||||
name="订阅日历缓存",
|
||||
hours=6,
|
||||
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=2),
|
||||
kwargs={
|
||||
'job_id': 'subscribe_calendar_cache'
|
||||
}
|
||||
)
|
||||
|
||||
# 初始化工作流服务
|
||||
self.init_workflow_jobs()
|
||||
|
||||
@@ -409,6 +451,13 @@ class Scheduler(metaclass=Singleton):
|
||||
"""
|
||||
启动定时服务
|
||||
"""
|
||||
|
||||
def __start_coro(coro):
|
||||
"""
|
||||
启动协程
|
||||
"""
|
||||
return asyncio.run_coroutine_threadsafe(coro, self.loop)
|
||||
|
||||
# 获取定时任务
|
||||
job = self.__prepare_job(job_id)
|
||||
if not job:
|
||||
@@ -417,7 +466,13 @@ class Scheduler(metaclass=Singleton):
|
||||
try:
|
||||
if not kwargs:
|
||||
kwargs = job.get("kwargs") or {}
|
||||
job["func"](*args, **kwargs)
|
||||
func = job.get("func")
|
||||
if not func:
|
||||
return
|
||||
if inspect.iscoroutinefunction(func):
|
||||
__start_coro(func(*args, **kwargs))
|
||||
else:
|
||||
job["func"](*args, **kwargs)
|
||||
except Exception as e:
|
||||
logger.error(f"定时任务 {job.get('name')} 执行失败:{str(e)} - {traceback.format_exc()}")
|
||||
MessageHelper().put(title=f"{job.get('name')} 执行失败",
|
||||
@@ -519,7 +574,7 @@ class Scheduler(metaclass=Singleton):
|
||||
except JobLookupError:
|
||||
pass
|
||||
if job_removed:
|
||||
logger.info(f"移除插件服务({plugin_name}):{service.get('name')}")
|
||||
logger.info(f"移除插件服务({plugin_name}):{service.get('name')}") # noqa
|
||||
except Exception as e:
|
||||
logger.error(f"移除插件服务失败:{str(e)} - {job_id}: {service}")
|
||||
SchedulerChain().messagehelper.put(title=f"插件 {plugin_name} 服务移除失败",
|
||||
|
||||
@@ -77,7 +77,7 @@ class SiteUserData(BaseModel):
|
||||
# 用户名
|
||||
username: Optional[str] = None
|
||||
# 用户ID
|
||||
userid: Optional[Union[int, str]] = None
|
||||
userid: Optional[str] = None
|
||||
# 用户等级
|
||||
user_level: Optional[str] = None
|
||||
# 加入时间
|
||||
|
||||
@@ -35,10 +35,10 @@ async def lifespan(app: FastAPI):
|
||||
定义应用的生命周期事件
|
||||
"""
|
||||
print("Starting up...")
|
||||
# 初始化模块
|
||||
init_modules()
|
||||
# 初始化路由
|
||||
init_routers(app)
|
||||
# 初始化模块
|
||||
init_modules()
|
||||
# 恢复插件备份
|
||||
SystemChain().restore_plugins()
|
||||
# 初始化插件
|
||||
|
||||
@@ -1,83 +0,0 @@
|
||||
import asyncio
|
||||
import threading
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Coroutine, Any, TypeVar
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
|
||||
class AsyncUtils:
|
||||
"""
|
||||
异步工具类,用于在同步环境中调用异步方法
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def run_async(coro: Coroutine[Any, Any, T]) -> T:
|
||||
"""
|
||||
在同步环境中安全地执行异步协程
|
||||
|
||||
:param coro: 要执行的协程
|
||||
:return: 协程的返回值
|
||||
:raises: 协程执行过程中的任何异常
|
||||
"""
|
||||
try:
|
||||
# 尝试获取当前运行的事件循环
|
||||
asyncio.get_running_loop()
|
||||
# 如果有运行中的事件循环,在新线程中执行
|
||||
return AsyncUtils._run_in_thread(coro)
|
||||
except RuntimeError:
|
||||
# 没有运行中的事件循环,直接使用 asyncio.run
|
||||
return asyncio.run(coro)
|
||||
|
||||
@staticmethod
|
||||
def _run_in_thread(coro: Coroutine[Any, Any, T]) -> T:
|
||||
"""
|
||||
在新线程中创建事件循环并执行协程
|
||||
|
||||
:param coro: 要执行的协程
|
||||
:return: 协程的返回值
|
||||
"""
|
||||
result = None
|
||||
exception = None
|
||||
|
||||
def _run():
|
||||
nonlocal result, exception
|
||||
try:
|
||||
# 在新线程中创建新的事件循环
|
||||
new_loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(new_loop)
|
||||
try:
|
||||
result = new_loop.run_until_complete(coro)
|
||||
finally:
|
||||
new_loop.close()
|
||||
except Exception as e:
|
||||
exception = e
|
||||
|
||||
# 在新线程中执行
|
||||
thread = threading.Thread(target=_run)
|
||||
thread.start()
|
||||
thread.join()
|
||||
|
||||
if exception:
|
||||
raise exception
|
||||
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def run_async_in_executor(coro: Coroutine[Any, Any, T]) -> T:
|
||||
"""
|
||||
使用线程池执行器在新线程中运行异步协程
|
||||
|
||||
:param coro: 要执行的协程
|
||||
:return: 协程的返回值
|
||||
"""
|
||||
try:
|
||||
# 检查是否有运行中的事件循环
|
||||
asyncio.get_running_loop()
|
||||
# 有运行中的事件循环,使用线程池
|
||||
with ThreadPoolExecutor() as executor:
|
||||
future = executor.submit(asyncio.run, coro)
|
||||
return future.result()
|
||||
except RuntimeError:
|
||||
# 没有运行中的事件循环,直接运行
|
||||
return asyncio.run(coro)
|
||||
80
database/versions/a946dae52526_2_2_1.py
Normal file
80
database/versions/a946dae52526_2_2_1.py
Normal file
@@ -0,0 +1,80 @@
|
||||
"""2.2.1
|
||||
|
||||
Revision ID: a946dae52526
|
||||
Revises: 5b3355c964bb
|
||||
Create Date: 2025-08-20 17:50:00.000000
|
||||
|
||||
"""
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
from app.log import logger
|
||||
from app.core.config import settings
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'a946dae52526'
|
||||
down_revision = '5b3355c964bb'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
"""
|
||||
升级:将SiteUserData表的userid字段从Integer改为String
|
||||
"""
|
||||
connection = op.get_bind()
|
||||
|
||||
if settings.DB_TYPE.lower() == "postgresql":
|
||||
# PostgreSQL数据库迁移
|
||||
migrate_postgresql_userid(connection)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
"""
|
||||
降级:将SiteUserData表的userid字段从String改回Integer
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
def migrate_postgresql_userid(connection):
|
||||
"""
|
||||
PostgreSQL数据库userid字段迁移
|
||||
"""
|
||||
try:
|
||||
logger.info("开始PostgreSQL数据库userid字段迁移...")
|
||||
|
||||
# 1. 创建临时列
|
||||
connection.execute(sa.text("""
|
||||
ALTER TABLE siteuserdata
|
||||
ADD COLUMN userid_new VARCHAR
|
||||
"""))
|
||||
|
||||
# 2. 将现有数据转换为字符串并复制到新列
|
||||
connection.execute(sa.text("""
|
||||
UPDATE siteuserdata
|
||||
SET userid_new = CAST(userid AS VARCHAR)
|
||||
WHERE userid IS NOT NULL
|
||||
"""))
|
||||
|
||||
# 3. 删除旧列
|
||||
connection.execute(sa.text("""
|
||||
ALTER TABLE siteuserdata
|
||||
DROP COLUMN userid
|
||||
"""))
|
||||
|
||||
# 4. 重命名新列
|
||||
connection.execute(sa.text("""
|
||||
ALTER TABLE siteuserdata
|
||||
RENAME COLUMN userid_new TO userid
|
||||
"""))
|
||||
|
||||
logger.info("PostgreSQL数据库userid字段迁移完成")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"PostgreSQL数据库userid字段迁移失败: {e}")
|
||||
raise
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
APP_VERSION = 'v2.7.6'
|
||||
FRONTEND_VERSION = 'v2.7.6'
|
||||
APP_VERSION = 'v2.7.7'
|
||||
FRONTEND_VERSION = 'v2.7.7'
|
||||
|
||||
Reference in New Issue
Block a user