Compare commits

...

29 Commits

Author SHA1 Message Date
jxxghp
c060d7e3e0 更新 postgresql-setup.md 2025-08-23 22:26:34 +08:00
jxxghp
ba96678822 v2.7.5 2025-08-23 20:46:36 +08:00
jxxghp
4f6354f383 Merge pull request #4820 from DDS-Derek/dev 2025-08-23 18:46:52 +08:00
DDSRem
2766e80346 fix(database): use logger as log output
Co-Authored-By: Aqr-K <95741669+Aqr-K@users.noreply.github.com>
2025-08-23 18:36:11 +08:00
jxxghp
7cc3777a60 fix async cache 2025-08-23 18:34:47 +08:00
DDSRem
cb1dd9f17d fix(database): upgrade error in pg database
Co-Authored-By: Aqr-K <95741669+Aqr-K@users.noreply.github.com>
2025-08-23 18:12:13 +08:00
jxxghp
31f342fe4f fix torrent 2025-08-23 18:10:33 +08:00
jxxghp
e90359eb08 fix douban 2025-08-23 15:56:30 +08:00
jxxghp
58b0768a30 fix redis key 2025-08-23 15:53:03 +08:00
jxxghp
3b04506893 fix redis key 2025-08-23 15:40:38 +08:00
jxxghp
354165aa0a fix cache 2025-08-23 14:21:50 +08:00
jxxghp
343109836f fix cache 2025-08-23 14:06:44 +08:00
jxxghp
fcadac2adb Merge pull request #4817 from jxxghp/cursor/add-dict-operations-to-cachebackend-3877 2025-08-23 12:42:04 +08:00
Cursor Agent
5e7dcdfe97 Modify cache region key generation to use consistent prefix format
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-23 04:13:25 +00:00
Cursor Agent
2ec9a57391 Remove implementation and migration documentation files
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-23 04:07:04 +00:00
Cursor Agent
973c545723 Checkpoint before follow-up message
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-23 04:06:16 +00:00
Cursor Agent
fd62eecfef Simplify TTLCache, remove dict-like methods, enhance Cache interface
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-23 04:01:17 +00:00
Cursor Agent
b5ca7058c2 Add helper methods for cache backend in sync and async versions
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-23 03:58:04 +00:00
Cursor Agent
57a48f099f Add dict-like operations to CacheBackend with sync and async support
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-23 03:50:52 +00:00
jxxghp
4699f511bf Handle magnet links in torrent parsing and downloader modules (#4815)
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-23 10:51:32 +08:00
jxxghp
cd8f7e72e0 同步错误修复 2025-08-22 17:33:24 +08:00
jxxghp
78803fa284 fix search_imdbid type 2025-08-22 16:37:30 +08:00
jxxghp
2e8d75df16 fix monitor cache 2025-08-22 15:30:49 +08:00
jxxghp
7e3bbfd960 Merge pull request #4807 from carolcoral/v2 2025-08-22 15:23:04 +08:00
jxxghp
1734d53b3c Replace file-based snapshot caching with FileCache implementation (#4809)
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: jxxghp <jxxghp@live.cn>
2025-08-22 13:59:30 +08:00
jxxghp
f37540f4e5 fix get_rss timeout 2025-08-22 11:44:16 +08:00
jxxghp
addb9d836a remove cache singleton 2025-08-22 11:33:53 +08:00
Carol
4184d8c7ac 补充迁移数据库异常的注意事项
add: sqlite迁移到postgresql的注意事项
2025-08-22 10:55:26 +08:00
jxxghp
724c15a68c add 插件内存统计API 2025-08-22 09:46:11 +08:00
30 changed files with 1227 additions and 458 deletions

View File

@@ -13,7 +13,7 @@ from app import schemas
from app.command import Command
from app.core.config import settings
from app.core.plugin import PluginManager
from app.core.security import verify_apikey, verify_token
from app.core.security import verify_apikey, verify_token, verify_apitoken
from app.db.models import User
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_superuser, get_current_active_superuser_async
@@ -21,6 +21,7 @@ from app.factory import app
from app.helper.plugin import PluginHelper
from app.log import logger
from app.scheduler import Scheduler
from app.schemas.plugin import PluginMemoryInfo
from app.schemas.types import SystemConfigKey
PROTECTED_ROUTES = {"/api/v1/openapi.json", "/docs", "/docs/oauth2-redirect", "/redoc"}
@@ -463,6 +464,87 @@ async def update_folder_plugins(folder_name: str, plugin_ids: List[str],
return schemas.Response(success=True, message=f"文件夹 '{folder_name}' 中的插件已更新")
@router.post("/clone/{plugin_id}", summary="创建插件分身", response_model=schemas.Response)
def clone_plugin(plugin_id: str,
clone_data: dict,
_: User = Depends(get_current_active_superuser)) -> Any:
"""
创建插件分身
"""
try:
success, message = PluginManager().clone_plugin(
plugin_id=plugin_id,
suffix=clone_data.get("suffix", ""),
name=clone_data.get("name", ""),
description=clone_data.get("description", ""),
version=clone_data.get("version", ""),
icon=clone_data.get("icon", "")
)
if success:
# 注册插件服务
reload_plugin(message)
# 将分身插件添加到原插件所在的文件夹中
_add_clone_to_plugin_folder(plugin_id, message)
return schemas.Response(success=True, message="插件分身创建成功")
else:
return schemas.Response(success=False, message=message)
except Exception as e:
logger.error(f"创建插件分身失败:{str(e)}")
return schemas.Response(success=False, message=f"创建插件分身失败:{str(e)}")
@router.get("/memory", summary="插件内存使用统计", response_model=List[PluginMemoryInfo])
def plugin_memory_stats(_: Annotated[str, Depends(verify_apitoken)]) -> Any:
"""
获取所有插件的内存使用统计信息
"""
try:
plugin_manager = PluginManager()
memory_stats = plugin_manager.get_plugin_memory_stats()
return memory_stats
except Exception as e:
logger.error(f"获取插件内存统计失败:{str(e)}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取插件内存统计失败:{str(e)}")
@router.get("/memory/{plugin_id}", summary="单个插件内存使用统计", response_model=PluginMemoryInfo)
def plugin_memory_stat(plugin_id: str, _: Annotated[str, Depends(verify_apitoken)]) -> Any:
"""
获取指定插件的内存使用统计信息
"""
try:
plugin_manager = PluginManager()
memory_stats = plugin_manager.get_plugin_memory_stats(plugin_id)
if not memory_stats:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=f"插件 {plugin_id} 不存在或未运行")
return memory_stats[0]
except HTTPException:
raise
except Exception as e:
logger.error(f"获取插件 {plugin_id} 内存统计失败:{str(e)}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取插件内存统计失败:{str(e)}")
@router.delete("/memory/cache", summary="清除插件内存统计缓存")
def clear_plugin_memory_cache(_: Annotated[str, Depends(verify_apitoken)],
plugin_id: Optional[str] = None) -> Any:
"""
清除插件内存统计缓存
"""
try:
plugin_manager = PluginManager()
plugin_manager.clear_plugin_memory_cache(plugin_id)
message = f"已清除插件 {plugin_id} 的内存统计缓存" if plugin_id else "已清除所有插件的内存统计缓存"
return schemas.Response(success=True, message=message)
except Exception as e:
logger.error(f"清除插件内存统计缓存失败:{str(e)}")
return schemas.Response(success=False, message=f"清除缓存失败:{str(e)}")
@router.get("/{plugin_id}", summary="获取插件配置")
async def plugin_config(plugin_id: str,
_: User = Depends(get_current_active_superuser_async)) -> dict:
@@ -528,36 +610,6 @@ def uninstall_plugin(plugin_id: str,
return schemas.Response(success=True)
@router.post("/clone/{plugin_id}", summary="创建插件分身", response_model=schemas.Response)
def clone_plugin(plugin_id: str,
clone_data: dict,
_: User = Depends(get_current_active_superuser)) -> Any:
"""
创建插件分身
"""
try:
success, message = PluginManager().clone_plugin(
plugin_id=plugin_id,
suffix=clone_data.get("suffix", ""),
name=clone_data.get("name", ""),
description=clone_data.get("description", ""),
version=clone_data.get("version", ""),
icon=clone_data.get("icon", "")
)
if success:
# 注册插件服务
reload_plugin(message)
# 将分身插件添加到原插件所在的文件夹中
_add_clone_to_plugin_folder(plugin_id, message)
return schemas.Response(success=True, message="插件分身创建成功")
else:
return schemas.Response(success=False, message=message)
except Exception as e:
logger.error(f"创建插件分身失败:{str(e)}")
return schemas.Response(success=False, message=f"创建插件分身失败:{str(e)}")
def _add_clone_to_plugin_folder(original_plugin_id: str, clone_plugin_id: str):
"""
将分身插件添加到原插件所在的文件夹中

View File

@@ -313,6 +313,11 @@ class SiteChain(ChainBase):
siteoper = SiteOper()
rsshelper = RssHelper()
for domain, cookie in cookies.items():
# 检查系统是否停止
if global_vars.is_system_stopped:
logger.info("系统正在停止中断CookieCloud同步")
return False, "系统正在停止,同步被中断"
# 索引器信息
indexer = siteshelper.get_indexer(domain)
# 数据库的站点信息
@@ -331,7 +336,7 @@ class SiteChain(ChainBase):
cookie=cookie,
ua=site_info.ua or settings.USER_AGENT,
proxy=True if site_info.proxy else False,
timeout=site_info.timeout
timeout=site_info.timeout or 15
)
if rss_url:
logger.info(f"更新站点 {domain} RSS地址 ...")

View File

@@ -16,7 +16,6 @@ from cachetools.keys import hashkey
from app.core.config import settings
from app.helper.redis import RedisHelper, AsyncRedisHelper
from app.log import logger
from app.utils.singleton import Singleton
# 默认缓存区
DEFAULT_CACHE_REGION = "DEFAULT"
@@ -24,11 +23,53 @@ DEFAULT_CACHE_REGION = "DEFAULT"
lock = threading.Lock()
class CacheBackend(ABC, metaclass=Singleton):
class CacheBackend(ABC):
"""
缓存后端基类,定义通用的缓存接口
"""
def __getitem__(self, key: str) -> Any:
"""
获取缓存项,类似 dict[key]
"""
value = self.get(key)
if value is None:
raise KeyError(key)
return value
def __setitem__(self, key: str, value: Any) -> None:
"""
设置缓存项,类似 dict[key] = value
"""
self.set(key, value)
def __delitem__(self, key: str) -> None:
"""
删除缓存项,类似 del dict[key]
"""
if not self.exists(key):
raise KeyError(key)
self.delete(key)
def __contains__(self, key: str) -> bool:
"""
检查键是否存在,类似 key in dict
"""
return self.exists(key)
def __iter__(self):
"""
返回缓存的迭代器,类似 iter(dict)
"""
for key, _ in self.items():
yield key
def __len__(self) -> int:
"""
返回缓存项的数量,类似 len(dict)
"""
return sum(1 for _ in self.items())
@abstractmethod
def set(self, key: str, value: Any, ttl: Optional[int] = None,
region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None:
@@ -94,6 +135,62 @@ class CacheBackend(ABC, metaclass=Singleton):
"""
pass
def keys(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Generator[str, None, None]:
"""
获取所有缓存键,类似 dict.keys()
"""
for key, _ in self.items(region=region):
yield key
def values(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Generator[Any, None, None]:
"""
获取所有缓存值,类似 dict.values()
"""
for _, value in self.items(region=region):
yield value
def update(self, other: Dict[str, Any], region: Optional[str] = DEFAULT_CACHE_REGION,
ttl: Optional[int] = None, **kwargs) -> None:
"""
更新缓存,类似 dict.update()
"""
for key, value in other.items():
self.set(key, value, ttl=ttl, region=region, **kwargs)
def pop(self, key: str, default: Any = None, region: Optional[str] = DEFAULT_CACHE_REGION) -> Any:
"""
弹出缓存项,类似 dict.pop()
"""
value = self.get(key, region=region)
if value is not None:
self.delete(key, region=region)
return value
if default is not None:
return default
raise KeyError(key)
def popitem(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Tuple[str, Any]:
"""
弹出最后一个缓存项,类似 dict.popitem()
"""
items = list(self.items(region=region))
if not items:
raise KeyError("popitem(): cache is empty")
key, value = items[-1]
self.delete(key, region=region)
return key, value
def setdefault(self, key: str, default: Any = None, region: Optional[str] = DEFAULT_CACHE_REGION,
ttl: Optional[int] = None, **kwargs) -> Any:
"""
设置默认值,类似 dict.setdefault()
"""
value = self.get(key, region=region)
if value is None:
self.set(key, default, ttl=ttl, region=region, **kwargs)
return default
return value
@abstractmethod
def close(self) -> None:
"""
@@ -102,42 +199,21 @@ class CacheBackend(ABC, metaclass=Singleton):
pass
@staticmethod
def get_region(region: Optional[str] = DEFAULT_CACHE_REGION):
def get_region(region: Optional[str] = None) -> str:
"""
获取缓存的区
"""
return f"region:{region}" if region else "region:default"
@staticmethod
def get_cache_key(func, args, kwargs):
"""
获取缓存的键,通过哈希函数对函数的参数进行处理
:param func: 被装饰的函数
:param args: 位置参数
:param kwargs: 关键字参数
:return: 缓存键
"""
signature = inspect.signature(func)
# 绑定传入的参数并应用默认值
bound = signature.bind(*args, **kwargs)
bound.apply_defaults()
# 忽略第一个参数,如果它是实例(self)或类(cls)
parameters = list(signature.parameters.keys())
if parameters and parameters[0] in ("self", "cls"):
bound.arguments.pop(parameters[0], None)
# 按照函数签名顺序提取参数值列表
keys = [
bound.arguments[param] for param in signature.parameters if param in bound.arguments
]
# 使用有序参数生成缓存键
return f"{func.__name__}_{hashkey(*keys)}"
@staticmethod
def is_redis() -> bool:
"""
判断当前缓存后端是否为 Redis
"""
return settings.CACHE_BACKEND_TYPE == "redis"
class AsyncCacheBackend(ABC, metaclass=Singleton):
class AsyncCacheBackend(CacheBackend):
"""
缓存后端基类,定义通用的缓存接口(异步)
"""
@@ -207,6 +283,64 @@ class AsyncCacheBackend(ABC, metaclass=Singleton):
"""
pass
async def keys(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> AsyncGenerator[str, None]:
"""
获取所有缓存键,类似 dict.keys()(异步)
"""
async for key, _ in await self.items(region=region):
yield key
async def values(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> AsyncGenerator[Any, None]:
"""
获取所有缓存值,类似 dict.values()(异步)
"""
async for _, value in await self.items(region=region):
yield value
async def update(self, other: Dict[str, Any], region: Optional[str] = DEFAULT_CACHE_REGION,
ttl: Optional[int] = None, **kwargs) -> None:
"""
更新缓存,类似 dict.update()(异步)
"""
for key, value in other.items():
await self.set(key, value, ttl=ttl, region=region, **kwargs)
async def pop(self, key: str, default: Any = None, region: Optional[str] = DEFAULT_CACHE_REGION) -> Any:
"""
弹出缓存项,类似 dict.pop()(异步)
"""
value = await self.get(key, region=region)
if value is not None:
await self.delete(key, region=region)
return value
if default is not None:
return default
raise KeyError(key)
async def popitem(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Tuple[str, Any]:
"""
弹出最后一个缓存项,类似 dict.popitem()(异步)
"""
items = []
async for item in await self.items(region=region):
items.append(item)
if not items:
raise KeyError("popitem(): cache is empty")
key, value = items[-1]
await self.delete(key, region=region)
return key, value
async def setdefault(self, key: str, default: Any = None, region: Optional[str] = DEFAULT_CACHE_REGION,
ttl: Optional[int] = None, **kwargs) -> Any:
"""
设置默认值,类似 dict.setdefault()(异步)
"""
value = await self.get(key, region=region)
if value is None:
await self.set(key, default, ttl=ttl, region=region, **kwargs)
return default
return value
@abstractmethod
async def close(self) -> None:
"""
@@ -214,41 +348,6 @@ class AsyncCacheBackend(ABC, metaclass=Singleton):
"""
pass
@staticmethod
def get_region(region: Optional[str] = DEFAULT_CACHE_REGION):
"""
获取缓存的区
"""
return f"region:{region}" if region else "region:default"
@staticmethod
def get_cache_key(func, args, kwargs):
"""
获取缓存的键,通过哈希函数对函数的参数进行处理
:param func: 被装饰的函数
:param args: 位置参数
:param kwargs: 关键字参数
:return: 缓存键
"""
signature = inspect.signature(func)
# 绑定传入的参数并应用默认值
bound = signature.bind(*args, **kwargs)
bound.apply_defaults()
# 忽略第一个参数,如果它是实例(self)或类(cls)
parameters = list(signature.parameters.keys())
if parameters and parameters[0] in ("self", "cls"):
bound.arguments.pop(parameters[0], None)
# 按照函数签名顺序提取参数值列表
keys = [
bound.arguments[param] for param in signature.parameters if param in bound.arguments
]
# 使用有序参数生成缓存键
return f"{func.__name__}_{hashkey(*keys)}"
@staticmethod
def is_redis() -> bool:
return settings.CACHE_BACKEND_TYPE == "redis"
class MemoryBackend(CacheBackend):
"""
@@ -373,6 +472,128 @@ class MemoryBackend(CacheBackend):
pass
class AsyncMemoryBackend(AsyncCacheBackend):
"""
基于 `cachetools.TTLCache` 实现的异步缓存后端
"""
def __init__(self, maxsize: Optional[int] = None, ttl: Optional[int] = None):
"""
初始化缓存实例
:param maxsize: 缓存的最大条目数
:param ttl: 默认缓存存活时间,单位秒
"""
self.maxsize = maxsize or 1024 # 未设置时默认最大条目数为 1024
self.ttl = ttl
# 存储各个 region 的缓存实例region -> TTLCache
self._region_caches: Dict[str, MemoryTTLCache] = {}
def __get_region_cache(self, region: str) -> Optional[MemoryTTLCache]:
"""
获取指定区域的缓存实例,如果不存在则返回 None
"""
region = self.get_region(region)
return self._region_caches.get(region)
async def set(self, key: str, value: Any, ttl: Optional[int] = None,
region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None:
"""
设置缓存值支持每个 key 独立配置 TTL
:param key: 缓存的键
:param value: 缓存的值
:param ttl: 缓存的存活时间,不传入为永久缓存,单位秒
:param region: 缓存的区
"""
ttl = ttl or self.ttl
maxsize = kwargs.get("maxsize", self.maxsize)
region = self.get_region(region)
# 如果该 key 尚未有缓存实例,则创建一个新的 TTLCache 实例
region_cache = self._region_caches.setdefault(region, MemoryTTLCache(maxsize=maxsize, ttl=ttl))
# 设置缓存值
with lock:
region_cache[key] = value
async def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool:
"""
判断缓存键是否存在
:param key: 缓存的键
:param region: 缓存的区
:return: 存在返回 True否则返回 False
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return False
return key in region_cache
async def get(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> Any:
"""
获取缓存的值
:param key: 缓存的键
:param region: 缓存的区
:return: 返回缓存的值,如果缓存不存在返回 None
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return None
return region_cache.get(key)
async def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION):
"""
删除缓存
:param key: 缓存的键
:param region: 缓存的区
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return
with lock:
del region_cache[key]
async def clear(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> None:
"""
清除指定区域的缓存或全部缓存
:param region: 缓存的区为None时清空所有区缓存
"""
if region:
# 清理指定缓存区
region_cache = self.__get_region_cache(region)
if region_cache:
with lock:
region_cache.clear()
logger.info(f"Cleared cache for region: {region}")
else:
# 清除所有区域的缓存
for region_cache in self._region_caches.values():
with lock:
region_cache.clear()
logger.info("Cleared all cache")
async def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> AsyncGenerator[Tuple[str, Any], None]:
"""
获取指定区域的所有缓存项
:param region: 缓存的区
:return: 返回一个字典,包含所有缓存键值对
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return
for item in region_cache.items():
yield item
async def close(self) -> None:
"""
内存缓存不需要关闭资源
"""
pass
class RedisBackend(CacheBackend):
"""
基于 Redis 实现的缓存后端,支持通过 Redis 存储缓存
@@ -640,7 +861,7 @@ class FileBackend(CacheBackend):
for item in cache_path.iterdir():
if item.is_file():
with open(item, 'r') as f:
yield item.name, f.read()
yield item.as_posix(), f.read()
def close(self) -> None:
"""
@@ -753,7 +974,7 @@ class AsyncFileBackend(AsyncCacheBackend):
async for item in cache_path.iterdir():
if await item.is_file():
async with aiofiles.open(item, 'r') as f:
yield item.name, await f.read()
yield item.as_posix(), await f.read()
async def close(self) -> None:
"""
@@ -801,143 +1022,19 @@ def Cache(maxsize: Optional[int] = None, ttl: Optional[int] = None) -> CacheBack
return MemoryBackend(maxsize=maxsize, ttl=ttl)
class TTLCache:
def AsyncCache(maxsize: Optional[int] = None, ttl: Optional[int] = None) -> AsyncCacheBackend:
"""
TTL缓存类根据配置自动选择使用Redis或cachetoolsmaxsize仅在未启用Redis时生效
根据配置获取异步缓存后端实例内存或Redismaxsize仅在未启用Redis时生效
特性:
- 提供与cachetools.TTLCache相同的接口
- 根据配置自动选择缓存后端
- 支持Redis和cachetools的切换
:param maxsize: 缓存的最大条目数仅使用cachetools时生效
:param ttl: 缓存的默认存活时间,单位秒
:return: 返回异步缓存后端实例
"""
def __init__(self, region: Optional[str] = DEFAULT_CACHE_REGION,
maxsize: int = None, ttl: int = None):
"""
初始化TTL缓存
:param region: 缓存的区,默认为 DEFAULT_CACHE_REGION
:param maxsize: 缓存的最大条目数
:param ttl: 缓存的存活时间,单位秒
"""
self.region = region
self.maxsize = maxsize
self.ttl = ttl
self._backend = Cache(maxsize=maxsize, ttl=ttl)
def __getitem__(self, key: str):
"""
获取缓存项
"""
try:
value = self._backend.get(key, region=self.region)
if value is not None:
return value
except Exception as e:
logger.warning(f"缓存获取失败: {e}")
raise KeyError(key)
def __setitem__(self, key: str, value: Any):
"""
设置缓存项
"""
try:
self._backend.set(key, value, ttl=self.ttl, region=self.region)
except Exception as e:
logger.warning(f"缓存设置失败: {e}")
def __delitem__(self, key: str):
"""
删除缓存项
"""
try:
self._backend.delete(key, region=self.region)
except Exception as e:
logger.warning(f"缓存删除失败: {e}")
def __contains__(self, key: str):
"""
检查键是否存在
"""
try:
return self._backend.exists(key, region=self.region)
except Exception as e:
logger.warning(f"缓存检查失败: {e}")
return False
def __iter__(self):
"""
返回缓存的迭代器
"""
for key, _ in self._backend.items(region=self.region):
yield key
def set(self, key: str, value: Any, ttl: Optional[int] = None):
"""
设置缓存项,支持自定义 TTL
"""
try:
ttl = ttl or self.ttl
self._backend.set(key, value, ttl=ttl, region=self.region)
except Exception as e:
logger.warning(f"缓存设置失败: {e}")
def get(self, key: str, default: Any = None):
"""
获取缓存项,如果不存在返回默认值
"""
try:
value = self._backend.get(key, region=self.region)
if value is not None:
return value
except Exception as e:
logger.warning(f"缓存获取失败: {e}")
return default
def delete(self, key: str):
"""
删除缓存项
"""
try:
self._backend.delete(key, region=self.region)
except Exception as e:
logger.warning(f"缓存删除失败: {e}")
def items(self):
"""
获取缓存的所有键值对
"""
try:
return self._backend.items(region=self.region)
except Exception as e:
logger.warning(f"缓存获取失败: {e}")
return []
def clear(self):
"""
清空缓存
"""
try:
self._backend.clear(region=self.region)
except Exception as e:
logger.warning(f"缓存清空失败: {e}")
def is_redis(self) -> bool:
"""
判断当前缓存后端是否为 Redis
"""
return self._backend.is_redis()
def close(self):
"""
关闭缓存连接
"""
try:
self._backend.close()
except Exception as e:
logger.warning(f"缓存关闭失败: {e}")
if settings.CACHE_BACKEND_TYPE == "redis":
return AsyncRedisBackend(ttl=ttl)
else:
# 使用异步内存缓存maxsize需要有值
return AsyncMemoryBackend(maxsize=maxsize, ttl=ttl)
def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Optional[int] = None,
@@ -952,39 +1049,85 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt
:param skip_empty: 跳过空值缓存(如 None, [], {}, "", set()),默认为 False
:return: 装饰器函数
"""
# 缓存后端实例
cache_backend = Cache(maxsize=maxsize, ttl=ttl)
def should_cache(value: Any) -> bool:
"""
判断是否应该缓存结果,如果返回值是 None 或空值则不缓存
:param value: 要判断的缓存值
:return: 是否缓存结果
"""
if skip_none and value is None:
return False
# if skip_empty and value in [None, [], {}, "", set()]:
if skip_empty and not value:
return False
return True
def is_valid_cache_value(cache_key: str, cached_value: Any, cache_region: str) -> bool:
"""
判断指定的值是否为一个有效的缓存值
:param cache_key: 缓存的键
:param cached_value: 缓存的值
:param cache_region: 缓存的区
:return: 若值是有效的缓存值返回 True否则返回 False
"""
# 如果 skip_none 为 False且 value 为 None需要判断缓存实际是否存在
if not skip_none and cached_value is None:
if not cache_backend.exists(key=cache_key, region=cache_region):
return False
return True
def decorator(func):
# 检查是否为异步函数
is_async = inspect.iscoroutinefunction(func)
# 根据函数类型选择对应的缓存后端
if is_async:
# 异步函数使用异步缓存后端
cache_backend = AsyncCache(maxsize=maxsize, ttl=ttl)
else:
# 同步函数使用同步缓存后端
cache_backend = Cache(maxsize=maxsize, ttl=ttl)
def should_cache(value: Any) -> bool:
"""
判断是否应该缓存结果,如果返回值是 None 或空值则不缓存
:param value: 要判断的缓存值
:return: 是否缓存结果
"""
if skip_none and value is None:
return False
# if skip_empty and value in [None, [], {}, "", set()]:
if skip_empty and not value:
return False
return True
def is_valid_cache_value(_cache_key: str, _cached_value: Any, _cache_region: str) -> bool:
"""
判断指定的值是否为一个有效的缓存值
:param _cache_key: 缓存的键
:param _cached_value: 缓存的值
:param _cache_region: 缓存的区
:return: 若值是有效的缓存值返回 True否则返回 False
"""
# 如果 skip_none 为 False且 value 为 None需要判断缓存实际是否存在
if not skip_none and _cached_value is None:
if not cache_backend.exists(key=_cache_key, region=_cache_region):
return False
return True
async def async_is_valid_cache_value(_cache_key: str, _cached_value: Any, _cache_region: str) -> bool:
"""
判断指定的值是否为一个有效的缓存值(异步版本)
:param _cache_key: 缓存的键
:param _cached_value: 缓存的值
:param _cache_region: 缓存的区
:return: 若值是有效的缓存值返回 True否则返回 False
"""
# 如果 skip_none 为 False且 value 为 None需要判断缓存实际是否存在
if not skip_none and _cached_value is None:
if not await cache_backend.exists(key=_cache_key, region=_cache_region):
return False
return True
def __get_cache_key(args, kwargs) -> str:
"""
根据函数和参数生成缓存键
:param args: 位置参数
:param kwargs: 关键字参数
:return: 缓存键
"""
signature = inspect.signature(func)
# 绑定传入的参数并应用默认值
bound = signature.bind(*args, **kwargs)
bound.apply_defaults()
# 忽略第一个参数,如果它是实例(self)或类(cls)
parameters = list(signature.parameters.keys())
if parameters and parameters[0] in ("self", "cls"):
bound.arguments.pop(parameters[0], None)
# 按照函数签名顺序提取参数值列表
keys = [
bound.arguments[param] for param in signature.parameters if param in bound.arguments
]
# 使用有序参数生成缓存键
return f"{func.__name__}_{hashkey(*keys)}"
# 获取缓存区
cache_region = region if region is not None else f"{func.__module__}.{func.__name__}"
@@ -997,10 +1140,11 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt
@wraps(func)
async def async_wrapper(*args, **kwargs):
# 获取缓存键
cache_key = cache_backend.get_cache_key(func, args, kwargs)
cache_key = __get_cache_key(args, kwargs)
# 尝试获取缓存
cached_value = cache_backend.get(cache_key, region=cache_region)
if should_cache(cached_value) and is_valid_cache_value(cache_key, cached_value, cache_region):
cached_value = await cache_backend.get(cache_key, region=cache_region)
if should_cache(cached_value) and await async_is_valid_cache_value(cache_key, cached_value,
cache_region):
return cached_value
# 执行异步函数并缓存结果
result = await func(*args, **kwargs)
@@ -1008,14 +1152,14 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt
if not should_cache(result):
return result
# 设置缓存(如果有传入的 maxsize 和 ttl则覆盖默认值
cache_backend.set(cache_key, result, ttl=ttl, maxsize=maxsize, region=cache_region)
await cache_backend.set(cache_key, result, ttl=ttl, maxsize=maxsize, region=cache_region)
return result
def cache_clear():
async def cache_clear():
"""
清理缓存区
"""
cache_backend.clear(region=cache_region)
await cache_backend.clear(region=cache_region)
async_wrapper.cache_region = cache_region
async_wrapper.cache_clear = cache_clear
@@ -1025,7 +1169,7 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt
@wraps(func)
def wrapper(*args, **kwargs):
# 获取缓存键
cache_key = cache_backend.get_cache_key(func, args, kwargs)
cache_key = __get_cache_key(args, kwargs)
# 尝试获取缓存
cached_value = cache_backend.get(cache_key, region=cache_region)
if should_cache(cached_value) and is_valid_cache_value(cache_key, cached_value, cache_region):
@@ -1050,3 +1194,125 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt
return wrapper
return decorator
class CacheProxy:
"""
缓存代理类,将缓存后端的方法直接代理到实例上
"""
def __init__(self, cache_backend: CacheBackend, region: str, ttl: Optional[int] = None):
"""
初始化缓存代理
:param cache_backend: 缓存后端实例
:param region: 缓存区域
:param ttl: TTL 时间(仅用于 TTL 缓存)
"""
self._cache_backend = cache_backend
self._region = region
self._ttl = ttl
def __getattr__(self, name):
"""
代理所有未定义的方法到缓存后端
"""
if hasattr(self._cache_backend, name):
method = getattr(self._cache_backend, name)
if callable(method):
# 检查方法签名,自动添加 region 和 ttl 参数
def wrapper(*args, **kwargs):
# 检查方法是否接受 region 参数
sig = inspect.signature(method)
params = sig.parameters
# 如果方法接受 region 参数且未提供,则添加
if 'region' in params and 'region' not in kwargs:
kwargs['region'] = self._region
# 如果方法接受 ttl 参数且未提供,且我们有 ttl 值,则添加
if 'ttl' in params and 'ttl' not in kwargs and self._ttl is not None:
kwargs['ttl'] = self._ttl
return method(*args, **kwargs)
return wrapper
raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
def __getitem__(self, key):
"""
获取缓存项
"""
value = self._cache_backend.get(key, region=self._region)
if value is None:
raise KeyError(key)
return value
def __setitem__(self, key, value):
"""
设置缓存项
"""
kwargs = {'region': self._region}
if self._ttl is not None:
kwargs['ttl'] = self._ttl # noqa
self._cache_backend.set(key, value, **kwargs)
def __delitem__(self, key):
"""
删除缓存项
"""
if not self._cache_backend.exists(key, region=self._region):
raise KeyError(key)
self._cache_backend.delete(key, region=self._region)
def __contains__(self, key):
"""
检查键是否存在
"""
return self._cache_backend.exists(key, region=self._region)
def __iter__(self):
"""
返回缓存的迭代器
"""
for key, _ in self._cache_backend.items(region=self._region):
yield key
def __len__(self):
"""
返回缓存项的数量
"""
return sum(1 for _ in self._cache_backend.items(region=self._region))
class TTLCache(CacheProxy):
"""
基于 TTL 的缓存类,兼容 cachetools.TTLCache 接口
使用项目的缓存后端实现,支持 Redis 和内存缓存
"""
def __init__(self, maxsize: int = 1024, ttl: int = None, region: Optional[str] = None):
"""
初始化 TTL 缓存
:param maxsize: 缓存的最大条目数
:param ttl: 缓存的存活时间,单位秒
:param region: 缓存的区,为 None 时使用默认区
"""
super().__init__(Cache(maxsize=maxsize, ttl=ttl), region or DEFAULT_CACHE_REGION, ttl)
class LRUCache(CacheProxy):
"""
基于 LRU 的缓存类,兼容 cachetools.LRUCache 接口
使用项目的缓存后端实现,支持 Redis 和内存缓存
"""
def __init__(self, maxsize: int = 1024, region: Optional[str] = None):
"""
初始化 LRU 缓存
:param maxsize: 缓存的最大条目数
:param region: 缓存的区,为 None 时使用默认区
"""
super().__init__(Cache(maxsize=maxsize), region or DEFAULT_CACHE_REGION)

View File

@@ -21,7 +21,7 @@ from app.core.config import settings
from app.core.event import eventmanager, Event
from app.db.plugindata_oper import PluginDataOper
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.plugin import PluginHelper
from app.helper.plugin import PluginHelper, PluginMemoryMonitor
from app.helper.sites import SitesHelper # noqa
from app.log import logger
from app.schemas.types import EventType, SystemConfigKey
@@ -98,6 +98,8 @@ class PluginManager(metaclass=Singleton):
self._config_key: str = "plugin.%s"
# 监听器
self._observer: Observer = None
# 内存监控器
self._memory_monitor = PluginMemoryMonitor()
# 开发者模式监测插件修改
if settings.DEV or settings.PLUGIN_AUTO_RELOAD:
self.__start_monitor()
@@ -863,6 +865,28 @@ class PluginManager(metaclass=Singleton):
"""
return list(self._running_plugins.keys())
def get_plugin_memory_stats(self, pid: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取插件内存统计信息
:param pid: 插件ID为空则获取所有插件
:return: 内存统计信息列表
"""
if pid:
plugin_instance = self._running_plugins.get(pid)
if plugin_instance:
return [self._memory_monitor.get_plugin_memory_usage(pid, plugin_instance)]
else:
return []
else:
return self._memory_monitor.get_all_plugins_memory_usage(self._running_plugins)
def clear_plugin_memory_cache(self, pid: Optional[str] = None):
"""
清除插件内存统计缓存
:param pid: 插件ID为空则清除所有缓存
"""
self._memory_monitor.clear_cache(pid)
def get_online_plugins(self, force: bool = False) -> List[schemas.Plugin]:
"""
获取所有在线插件信息

View File

@@ -252,19 +252,19 @@ def __verify_key(key: str, expected_key: str, key_type: str) -> str:
def verify_apitoken(token: Annotated[str, Security(__get_api_token)]) -> str:
"""
使用 API Token 进行身份认证
:param token: API Token从 URL 查询参数中获取
:param token: API Token从 URL 查询参数中获取 token=xxx
:return: 返回校验通过的 API Token
"""
return __verify_key(token, settings.API_TOKEN, "API_TOKEN")
return __verify_key(token, settings.API_TOKEN, "token")
def verify_apikey(apikey: Annotated[str, Security(__get_api_key)]) -> str:
"""
使用 API Key 进行身份认证
:param apikey: API Key从 URL 查询参数或请求头中获取
:param apikey: API Key从 URL 查询参数中获取 apikey=xxx
:return: 返回校验通过的 API Key
"""
return __verify_key(apikey, settings.API_TOKEN, "API_KEY")
return __verify_key(apikey, settings.API_TOKEN, "apikey")
def verify_password(plain_password: str, hashed_password: str) -> bool:

View File

@@ -34,6 +34,7 @@ class SubscribeOper(DbOper):
"backdrop": mediainfo.get_backdrop_image(),
"vote": mediainfo.vote_average,
"description": mediainfo.overview,
"search_imdbid": 1 if kwargs.get('search_imdbid') else 0,
"date": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
})
if not subscribe:

View File

@@ -4,10 +4,11 @@ import json
import shutil
import site
import sys
import time
import traceback
import zipfile
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Set, Callable, Awaitable
from typing import Dict, List, Optional, Tuple, Set, Callable, Awaitable, Any
import aiofiles
import aioshutil
@@ -24,6 +25,7 @@ from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils, AsyncRequestUtils
from app.utils.memory import MemoryCalculator
from app.utils.singleton import WeakSingleton
from app.utils.system import SystemUtils
from app.utils.url import UrlUtils
@@ -1569,3 +1571,87 @@ class PluginHelper(metaclass=WeakSingleton):
except Exception as e:
logger.error(f"解压 Release 压缩包失败:{e}")
return False, f"解压 Release 压缩包失败:{e}"
class PluginMemoryMonitor:
"""
插件内存监控器
"""
def __init__(self):
self._calculator = MemoryCalculator()
self._cache = {}
self._cache_ttl = 300 # 缓存5分钟
def get_plugin_memory_usage(self, plugin_id: str, plugin_instance: Any) -> Dict[str, Any]:
"""
获取插件内存使用情况
:param plugin_id: 插件ID
:param plugin_instance: 插件实例
:return: 内存使用信息
"""
# 检查缓存
if self._is_cache_valid(plugin_id):
return self._cache[plugin_id]
# 计算内存使用
memory_info = self._calculator.calculate_object_memory(plugin_instance)
# 添加插件信息
result = {
'plugin_id': plugin_id,
'plugin_name': getattr(plugin_instance, 'plugin_name', 'Unknown'),
'plugin_version': getattr(plugin_instance, 'plugin_version', 'Unknown'),
'timestamp': time.time(),
**memory_info
}
# 更新缓存
self._cache[plugin_id] = result
return result
def get_all_plugins_memory_usage(self, plugins: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
获取所有插件的内存使用情况
:param plugins: 插件实例字典
:return: 内存使用信息列表
"""
results = []
for plugin_id, plugin_instance in plugins.items():
if plugin_instance:
try:
memory_info = self.get_plugin_memory_usage(plugin_id, plugin_instance)
results.append(memory_info)
except Exception as e:
logger.error(f"获取插件 {plugin_id} 内存使用情况失败:{str(e)}")
results.append({
'plugin_id': plugin_id,
'plugin_name': getattr(plugin_instance, 'plugin_name', 'Unknown'),
'error': str(e),
'total_memory_bytes': 0,
'total_memory_mb': 0,
'object_count': 0,
'calculation_time_ms': 0
})
# 按内存使用量排序
results.sort(key=lambda x: x.get('total_memory_bytes', 0), reverse=True)
return results
def _is_cache_valid(self, plugin_id: str) -> bool:
"""
检查缓存是否有效
"""
if plugin_id not in self._cache:
return False
return time.time() - self._cache[plugin_id]['timestamp'] < self._cache_ttl
def clear_cache(self, plugin_id: Optional[str] = None):
"""
清除缓存
:param plugin_id: 插件ID为空则清除所有缓存
"""
if plugin_id:
self._cache.pop(plugin_id, None)
else:
self._cache.clear()

View File

@@ -1,6 +1,6 @@
import json
import pickle
from typing import Any, Optional, Generator, Tuple, AsyncGenerator
from typing import Any, Optional, Generator, Tuple, AsyncGenerator, Union
from urllib.parse import quote
import redis
@@ -140,20 +140,34 @@ class RedisHelper(metaclass=Singleton):
logger.error(f"Failed to set Redis maxmemory or policy: {e}")
@staticmethod
def get_region(region: Optional[str] = "DEFAULT"):
def __get_region(region: Optional[str] = None):
"""
获取缓存的区
"""
return f"region:{region}" if region else "region:default"
return f"region:{quote(region)}" if region else "region:DEFAULT"
def get_redis_key(self, region: str, key: str) -> str:
def __make_redis_key(self, region: str, key: str) -> str:
"""
获取缓存Key
"""
# 使用region作为缓存键的一部分
region = self.get_region(quote(region))
region = self.__get_region(region)
return f"{region}:key:{quote(key)}"
@staticmethod
def __get_original_key(redis_key: Union[str, bytes]) -> str:
"""
从Redis键中提取原始key
"""
try:
if isinstance(redis_key, bytes):
redis_key = redis_key.decode("utf-8")
parts = redis_key.split(":key:")
return parts[-1]
except Exception as e:
logger.warn(f"Failed to parse redis key: {redis_key}, error: {e}")
return redis_key
def set(self, key: str, value: Any, ttl: Optional[int] = None,
region: Optional[str] = "DEFAULT", **kwargs) -> None:
"""
@@ -167,7 +181,7 @@ class RedisHelper(metaclass=Singleton):
"""
try:
self._connect()
redis_key = self.get_redis_key(region, key)
redis_key = self.__make_redis_key(region, key)
# 对值进行序列化
serialized_value = serialize(value)
kwargs.pop("maxsize", None)
@@ -185,7 +199,7 @@ class RedisHelper(metaclass=Singleton):
"""
try:
self._connect()
redis_key = self.get_redis_key(region, key)
redis_key = self.__make_redis_key(region, key)
return self.client.exists(redis_key) == 1
except Exception as e:
logger.error(f"Failed to exists key: {key} region: {region}, error: {e}")
@@ -201,7 +215,7 @@ class RedisHelper(metaclass=Singleton):
"""
try:
self._connect()
redis_key = self.get_redis_key(region, key)
redis_key = self.__make_redis_key(region, key)
value = self.client.get(redis_key)
if value is not None:
return deserialize(value)
@@ -219,7 +233,7 @@ class RedisHelper(metaclass=Singleton):
"""
try:
self._connect()
redis_key = self.get_redis_key(region, key)
redis_key = self.__make_redis_key(region, key)
self.client.delete(redis_key)
except Exception as e:
logger.error(f"Failed to delete key: {key} in region: {region}, error: {e}")
@@ -233,7 +247,7 @@ class RedisHelper(metaclass=Singleton):
try:
self._connect()
if region:
cache_region = self.get_region(quote(region))
cache_region = self.__get_region(region)
redis_key = f"{cache_region}:key:*"
with self.client.pipeline() as pipe:
for key in self.client.scan_iter(redis_key):
@@ -256,17 +270,17 @@ class RedisHelper(metaclass=Singleton):
try:
self._connect()
if region:
cache_region = self.get_region(quote(region))
cache_region = self.__get_region(region)
redis_key = f"{cache_region}:key:*"
for key in self.client.scan_iter(redis_key):
value = self.client.get(key)
if value is not None:
yield key, deserialize(value)
yield self.__get_original_key(key), deserialize(value)
else:
for key in self.client.scan_iter("*"):
value = self.client.get(key)
if value is not None:
yield key, deserialize(value)
yield self.__get_original_key(key), deserialize(value)
except Exception as e:
logger.error(f"Failed to get items from Redis, region: {region}, error: {e}")
@@ -367,20 +381,34 @@ class AsyncRedisHelper(metaclass=Singleton):
logger.error(f"Failed to set Redis maxmemory or policy (async): {e}")
@staticmethod
def get_region(region: Optional[str] = "DEFAULT"):
def __get_region(region: Optional[str] = "DEFAULT"):
"""
获取缓存的区
"""
return f"region:{region}" if region else "region:default"
def get_redis_key(self, region: str, key: str) -> str:
def __make_redis_key(self, region: str, key: str) -> str:
"""
获取缓存Key
"""
# 使用region作为缓存键的一部分
region = self.get_region(quote(region))
region = self.__get_region(region)
return f"{region}:key:{quote(key)}"
@staticmethod
def __get_original_key(redis_key: Union[str, bytes]) -> str:
"""
从Redis键中提取原始key
"""
try:
if isinstance(redis_key, bytes):
redis_key = redis_key.decode("utf-8")
parts = redis_key.split(":key:")
return parts[-1]
except Exception as e:
logger.warn(f"Failed to parse redis key: {redis_key}, error: {e}")
return redis_key
async def set(self, key: str, value: Any, ttl: Optional[int] = None,
region: Optional[str] = "DEFAULT", **kwargs) -> None:
"""
@@ -394,7 +422,7 @@ class AsyncRedisHelper(metaclass=Singleton):
"""
try:
await self._connect()
redis_key = self.get_redis_key(region, key)
redis_key = self.__make_redis_key(region, key)
# 对值进行序列化
serialized_value = serialize(value)
kwargs.pop("maxsize", None)
@@ -412,7 +440,7 @@ class AsyncRedisHelper(metaclass=Singleton):
"""
try:
await self._connect()
redis_key = self.get_redis_key(region, key)
redis_key = self.__make_redis_key(region, key)
result = await self.client.exists(redis_key)
return result == 1
except Exception as e:
@@ -429,7 +457,7 @@ class AsyncRedisHelper(metaclass=Singleton):
"""
try:
await self._connect()
redis_key = self.get_redis_key(region, key)
redis_key = self.__make_redis_key(region, key)
value = await self.client.get(redis_key)
if value is not None:
return deserialize(value)
@@ -447,7 +475,7 @@ class AsyncRedisHelper(metaclass=Singleton):
"""
try:
await self._connect()
redis_key = self.get_redis_key(region, key)
redis_key = self.__make_redis_key(region, key)
await self.client.delete(redis_key)
except Exception as e:
logger.error(f"Failed to delete key (async): {key} in region: {region}, error: {e}")
@@ -461,7 +489,7 @@ class AsyncRedisHelper(metaclass=Singleton):
try:
await self._connect()
if region:
cache_region = self.get_region(quote(region))
cache_region = self.__get_region(region)
redis_key = f"{cache_region}:key:*"
async with self.client.pipeline() as pipe:
async for key in self.client.scan_iter(redis_key):
@@ -484,17 +512,17 @@ class AsyncRedisHelper(metaclass=Singleton):
try:
await self._connect()
if region:
cache_region = self.get_region(quote(region))
cache_region = self.__get_region(region)
redis_key = f"{cache_region}:key:*"
async for key in self.client.scan_iter(redis_key):
value = await self.client.get(key)
if value is not None:
yield key, deserialize(value)
yield self.__get_original_key(key), deserialize(value)
else:
async for key in self.client.scan_iter("*"):
value = await self.client.get(key)
if value is not None:
yield key, deserialize(value)
yield self.__get_original_key(key), deserialize(value)
except Exception as e:
logger.error(f"Failed to get items from Redis, region: {region}, error: {e}")

View File

@@ -7,6 +7,7 @@ from urllib.parse import unquote
from torrentool.api import Torrent
from app.core.cache import FileCache
from app.core.cache import TTLCache
from app.core.config import settings
from app.core.context import Context, TorrentInfo, MediaInfo
from app.core.meta import MetaBase
@@ -16,17 +17,16 @@ from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas.types import MediaType, SystemConfigKey
from app.utils.http import RequestUtils
from app.utils.singleton import WeakSingleton
from app.utils.string import StringUtils
class TorrentHelper(metaclass=WeakSingleton):
class TorrentHelper:
"""
种子帮助类
"""
def __init__(self):
self._invalid_torrents = []
self._invalid_torrents = TTLCache(maxsize=128, ttl=3600 * 24)
def download_torrent(self, url: str,
cookie: Optional[str] = None,
@@ -199,8 +199,14 @@ class TorrentHelper(metaclass=WeakSingleton):
:param torrent_content: 种子内容
:return: 文件夹名、文件清单,单文件种子返回空文件夹名
"""
if not torrent_content:
return "", []
# 检查是否为磁力链接
if StringUtils.is_magnet_link(torrent_content):
return "", []
try:
# 解析种子内容
torrentinfo = Torrent.from_string(torrent_content)
@@ -346,7 +352,7 @@ class TorrentHelper(metaclass=WeakSingleton):
添加无效种子
"""
if url not in self._invalid_torrents:
self._invalid_torrents.append(url)
self._invalid_torrents[url] = True
@staticmethod
def match_torrent(mediainfo: MediaInfo, torrent_meta: MetaBase, torrent: TorrentInfo) -> bool:

View File

@@ -938,6 +938,8 @@ class DoubanModule(_ModuleBase):
"""
搜索人物信息
"""
if settings.SEARCH_SOURCE and "douban" not in settings.SEARCH_SOURCE:
return None
if not name:
return []
result = self.doubanapi.person_search(keyword=name)
@@ -956,6 +958,8 @@ class DoubanModule(_ModuleBase):
"""
搜索人物信息(异步版本)
"""
if settings.SEARCH_SOURCE and "douban" not in settings.SEARCH_SOURCE:
return None
if not name:
return []
result = await self.doubanapi.async_person_search(keyword=name)

View File

@@ -118,15 +118,17 @@ class QbittorrentModule(_ModuleBase, _DownloaderBase[Qbittorrent]):
if content.exists():
torrent_content = content.read_bytes()
else:
# 缓存处理器
cache_backend = FileCache()
# 读取缓存的种子文件
torrent_content = cache_backend.get(content.as_posix(), region="torrents")
torrent_content = FileCache().get(content.as_posix(), region="torrents")
else:
torrent_content = content
if torrent_content:
torrent_info = Torrent.from_string(torrent_content)
# 检查是否为磁力链接
if StringUtils.is_magnet_link(torrent_content):
return None, torrent_content
else:
torrent_info = Torrent.from_string(torrent_content)
return torrent_info, torrent_content
except Exception as e:
@@ -138,7 +140,11 @@ class QbittorrentModule(_ModuleBase, _DownloaderBase[Qbittorrent]):
# 读取种子的名称
torrent, content = __get_torrent_info()
if not torrent:
# 检查是否为磁力链接
is_magnet = isinstance(content, str) and content.startswith("magnet:") or isinstance(content,
bytes) and content.startswith(
b"magnet:")
if not torrent and not is_magnet:
return None, None, None, f"添加种子任务失败:无法读取种子文件"
# 获取下载器

View File

@@ -639,6 +639,8 @@ class TheMovieDbModule(_ModuleBase):
"""
搜索人物信息
"""
if settings.SEARCH_SOURCE and "themoviedb" not in settings.SEARCH_SOURCE:
return None
if not name:
return []
results = self.tmdb.search_persons(name)
@@ -646,6 +648,19 @@ class TheMovieDbModule(_ModuleBase):
return [MediaPerson(source='themoviedb', **person) for person in results]
return []
async def async_search_persons(self, name: str) -> Optional[List[MediaPerson]]:
"""
异步搜索人物信息
"""
if settings.SEARCH_SOURCE and "themoviedb" not in settings.SEARCH_SOURCE:
return None
if not name:
return []
results = await self.tmdb.async_search_persons(name)
if results:
return [MediaPerson(source='themoviedb', **person) for person in results]
return []
def search_collections(self, name: str) -> Optional[List[MediaInfo]]:
"""
搜索集合信息

View File

@@ -119,15 +119,17 @@ class TransmissionModule(_ModuleBase, _DownloaderBase[Transmission]):
if content.exists():
torrent_content = content.read_bytes()
else:
# 缓存处理器
cache_backend = FileCache()
# 读取缓存的种子文件
torrent_content = cache_backend.get(content.as_posix(), region="torrents")
torrent_content = FileCache().get(content.as_posix(), region="torrents")
else:
torrent_content = content
if torrent_content:
torrent_info = Torrent.from_string(torrent_content)
# 检查是否为磁力链接
if StringUtils.is_magnet_link(torrent_content):
return None, torrent_content
else:
torrent_info = Torrent.from_string(torrent_content)
return torrent_info, torrent_content
except Exception as e:
@@ -139,7 +141,11 @@ class TransmissionModule(_ModuleBase, _DownloaderBase[Transmission]):
# 读取种子的名称
torrent, content = __get_torrent_info()
if not torrent:
# 检查是否为磁力链接
is_magnet = isinstance(content, str) and content.startswith("magnet:") or isinstance(content,
bytes) and content.startswith(
b"magnet:")
if not torrent and not is_magnet:
return None, None, None, f"添加种子任务失败:无法读取种子文件"
# 获取下载器

View File

@@ -10,7 +10,7 @@ from threading import Lock
from typing import Any, Optional, Dict, List
from apscheduler.schedulers.background import BackgroundScheduler
from app.core.cache import TTLCache
from app.core.cache import TTLCache, FileCache
from watchdog.events import FileSystemEventHandler, FileSystemMovedEvent, FileSystemEvent
from watchdog.observers.polling import PollingObserver
@@ -67,17 +67,14 @@ class Monitor(metaclass=Singleton):
self._observers = []
# 定时服务
self._scheduler = None
# 存储快照缓存目录
self._snapshot_cache_dir = None
# 存储过照间隔(分钟)
self._snapshot_interval = 5
# TTL缓存10秒钟有效
self._cache = TTLCache(region="monitor", maxsize=1024, ttl=10)
# 快照文件缓存
self._snapshot_cache = FileCache(base=settings.CACHE_PATH / "snapshots")
# 监控的文件扩展名
self.all_exts = settings.RMT_MEDIAEXT
# 初始化快照缓存目录
self._snapshot_cache_dir = settings.TEMP_PATH / "snapshots"
self._snapshot_cache_dir.mkdir(exist_ok=True)
# 启动目录监控和文件整理
self.init()
@@ -98,14 +95,13 @@ class Monitor(metaclass=Singleton):
def save_snapshot(self, storage: str, snapshot: Dict, file_count: int = 0,
last_snapshot_time: Optional[float] = None):
"""
保存快照到文件
保存快照到文件缓存
:param storage: 存储名称
:param snapshot: 快照数据
:param last_snapshot_time: 上次快照时间戳
:param file_count: 文件数量,用于调整监控间隔
"""
try:
cache_file = self._snapshot_cache_dir / f"{storage}_snapshot.json"
snapshot_time = max((item.get('modify_time', 0) for item in snapshot.values()), default=None)
if snapshot_time is None:
snapshot_time = last_snapshot_time or time.time()
@@ -114,9 +110,11 @@ class Monitor(metaclass=Singleton):
'file_count': file_count,
'snapshot': snapshot
}
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump(snapshot_data, f, ensure_ascii=False, indent=2) # noqa
logger.debug(f"快照已保存到 {cache_file}")
# 使用FileCache保存快照数据
cache_key = f"{storage}_snapshot"
snapshot_json = json.dumps(snapshot_data, ensure_ascii=False, indent=2)
self._snapshot_cache.set(cache_key, snapshot_json.encode('utf-8'), region="snapshots")
logger.debug(f"快照已保存到缓存: {storage}")
except Exception as e:
logger.error(f"保存快照失败: {e}")
@@ -127,9 +125,9 @@ class Monitor(metaclass=Singleton):
:return: 是否成功
"""
try:
cache_file = self._snapshot_cache_dir / f"{storage}_snapshot.json"
if cache_file.exists():
cache_file.unlink()
cache_key = f"{storage}_snapshot"
if self._snapshot_cache.exists(cache_key, region="snapshots"):
self._snapshot_cache.delete(cache_key, region="snapshots")
logger.info(f"快照已重置: {storage}")
return True
logger.debug(f"快照文件不存在,无需重置: {storage}")
@@ -187,18 +185,18 @@ class Monitor(metaclass=Singleton):
def load_snapshot(self, storage: str) -> Optional[Dict]:
"""
从文件加载快照
从文件缓存加载快照
:param storage: 存储名称
:return: 快照数据或None
"""
try:
cache_file = self._snapshot_cache_dir / f"{storage}_snapshot.json"
if cache_file.exists():
with open(cache_file, 'r', encoding='utf-8') as f:
data = json.load(f)
logger.debug(f"成功加载快照: {cache_file}, 包含 {len(data.get('snapshot', {}))} 个文件")
return data
logger.debug(f"快照文件不存在: {cache_file}")
cache_key = f"{storage}_snapshot"
snapshot_data = self._snapshot_cache.get(cache_key, region="snapshots")
if snapshot_data:
data = json.loads(snapshot_data.decode('utf-8'))
logger.debug(f"成功加载快照: {storage}, 包含 {len(data.get('snapshot', {}))} 个文件")
return data
logger.debug(f"快照文件不存在: {storage}")
return None
except Exception as e:
logger.error(f"加载快照失败: {e}")
@@ -793,4 +791,6 @@ class Monitor(metaclass=Singleton):
self._scheduler = None
if self._cache:
self._cache.close()
if self._snapshot_cache:
self._snapshot_cache.close()
self._event.clear()

View File

@@ -1,4 +1,4 @@
from typing import Optional, List
from typing import Optional, List, Dict, Any
from pydantic import BaseModel, Field
@@ -67,3 +67,17 @@ class PluginDashboard(Plugin):
cols: Optional[dict] = Field(default_factory=dict)
# 页面元素
elements: Optional[List[dict]] = Field(default_factory=list)
class PluginMemoryInfo(BaseModel):
"""插件内存信息"""
plugin_id: str = Field(description="插件ID")
plugin_name: str = Field(description="插件名称")
plugin_version: str = Field(description="插件版本")
total_memory_bytes: int = Field(description="总内存使用量(字节)")
total_memory_mb: float = Field(description="总内存使用量(MB)")
object_count: int = Field(description="对象数量")
calculation_time_ms: float = Field(description="计算耗时(毫秒)")
timestamp: float = Field(description="统计时间戳")
error: Optional[str] = Field(default=None, description="错误信息")
object_details: Optional[List[Dict[str, Any]]] = Field(default=None, description="大对象详情")

178
app/utils/memory.py Normal file
View File

@@ -0,0 +1,178 @@
import sys
import time
from collections import deque
from typing import Any, Dict, Set
from app.log import logger
class MemoryCalculator:
"""
内存计算器,用于递归计算对象的内存占用
"""
def __init__(self):
# 缓存已计算的对象ID避免重复计算
self._calculated_ids: Set[int] = set()
# 最大递归深度,防止无限递归
self._max_depth = 10
# 最大对象数量,防止计算过多对象
self._max_objects = 10000
def calculate_object_memory(self, obj: Any, max_depth: int = None, max_objects: int = None) -> Dict[str, Any]:
"""
计算对象的内存占用
:param obj: 要计算的对象
:param max_depth: 最大递归深度
:param max_objects: 最大对象数量
:return: 内存统计信息
"""
if max_depth is None:
max_depth = self._max_depth
if max_objects is None:
max_objects = self._max_objects
# 重置缓存
self._calculated_ids.clear()
start_time = time.time()
object_details = []
try:
# 递归计算内存
memory_info = self._calculate_recursive(obj, depth=0, max_depth=max_depth,
max_objects=max_objects, object_count=0)
total_memory = memory_info['total_memory']
object_count = memory_info['object_count']
object_details = memory_info['object_details']
except Exception as e:
logger.error(f"计算对象内存时出错:{str(e)}")
total_memory = 0
object_count = 0
calculation_time = time.time() - start_time
return {
'total_memory_bytes': total_memory,
'total_memory_mb': round(total_memory / (1024 * 1024), 2),
'object_count': object_count,
'calculation_time_ms': round(calculation_time * 1000, 2),
'object_details': object_details[:10] # 只返回前10个最大的对象
}
def _calculate_recursive(self, obj: Any, depth: int, max_depth: int,
max_objects: int, object_count: int) -> Dict[str, Any]:
"""
递归计算对象内存
"""
if depth > max_depth or object_count > max_objects:
return {
'total_memory': 0,
'object_count': object_count,
'object_details': []
}
total_memory = 0
object_details = []
# 获取对象ID避免重复计算
obj_id = id(obj)
if obj_id in self._calculated_ids:
return {
'total_memory': 0,
'object_count': object_count,
'object_details': []
}
self._calculated_ids.add(obj_id)
object_count += 1
try:
# 计算对象本身的内存
obj_memory = sys.getsizeof(obj)
total_memory += obj_memory
# 记录大对象
if obj_memory > 1024: # 大于1KB的对象
object_details.append({
'type': type(obj).__name__,
'memory_bytes': obj_memory,
'memory_mb': round(obj_memory / (1024 * 1024), 2),
'depth': depth
})
# 递归计算容器对象的内容
if depth < max_depth:
container_memory = self._calculate_container_memory(
obj, depth + 1, max_depth, max_objects, object_count
)
total_memory += container_memory['total_memory']
object_count = container_memory['object_count']
object_details.extend(container_memory['object_details'])
except Exception as e:
logger.debug(f"计算对象 {type(obj).__name__} 内存时出错:{str(e)}")
return {
'total_memory': total_memory,
'object_count': object_count,
'object_details': object_details
}
def _calculate_container_memory(self, obj: Any, depth: int, max_depth: int,
max_objects: int, object_count: int) -> Dict[str, Any]:
"""
计算容器对象的内存
"""
total_memory = 0
object_details = []
try:
# 处理不同类型的容器
if isinstance(obj, (list, tuple, deque)):
for item in obj:
if object_count > max_objects:
break
item_memory = self._calculate_recursive(item, depth, max_depth, max_objects, object_count)
total_memory += item_memory['total_memory']
object_count = item_memory['object_count']
object_details.extend(item_memory['object_details'])
elif isinstance(obj, dict):
for key, value in obj.items():
if object_count > max_objects:
break
# 计算key的内存
key_memory = self._calculate_recursive(key, depth, max_depth, max_objects, object_count)
total_memory += key_memory['total_memory']
object_count = key_memory['object_count']
object_details.extend(key_memory['object_details'])
# 计算value的内存
value_memory = self._calculate_recursive(value, depth, max_depth, max_objects, object_count)
total_memory += value_memory['total_memory']
object_count = value_memory['object_count']
object_details.extend(value_memory['object_details'])
elif hasattr(obj, '__dict__'):
# 处理有__dict__属性的对象
for attr_name, attr_value in obj.__dict__.items():
if object_count > max_objects:
break
# 跳过一些特殊属性
if attr_name.startswith('_') and attr_name not in ['_calculated_ids']:
continue
attr_memory = self._calculate_recursive(attr_value, depth, max_depth, max_objects, object_count)
total_memory += attr_memory['total_memory']
object_count = attr_memory['object_count']
object_details.extend(attr_memory['object_details'])
except Exception as e:
logger.debug(f"计算容器对象 {type(obj).__name__} 内存时出错:{str(e)}")
return {
'total_memory': total_memory,
'object_count': object_count,
'object_details': object_details
}

View File

@@ -229,7 +229,7 @@ class StringUtils:
size = float(size)
d = [(1024 - 1, 'K'), (1024 ** 2 - 1, 'M'), (1024 ** 3 - 1, 'G'), (1024 ** 4 - 1, 'T')]
s = [x[0] for x in d]
index = bisect.bisect_left(s, size) - 1 # noqa
index = bisect.bisect_left(s, size) - 1 # noqa
if index == -1:
return str(size) + "B"
else:
@@ -925,3 +925,16 @@ class StringUtils:
if re.match(r'^[a-zA-Z0-9.-]+(\.[a-zA-Z]{2,})?$', text):
return True
return False
@staticmethod
def is_magnet_link(content: Union[str, bytes]) -> bool:
"""
判断内容是否为磁力链接
"""
if not content:
return False
if isinstance(content, str) and content.startswith("magnet:"):
return True
if isinstance(content, bytes) and content.startswith(b"magnet:"):
return True
return False

View File

@@ -21,7 +21,11 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# 站点数据统计增加站点名称
with contextlib.suppress(Exception):
conn = op.get_bind()
inspector = sa.inspect(conn)
columns = inspector.get_columns('siteuserdata')
# 检查 'name' 字段是否已存在
if not any(c['name'] == 'name' for c in columns):
op.add_column('siteuserdata', sa.Column('name', sa.String(), nullable=True))
# ### end Alembic commands ###

View File

@@ -18,19 +18,18 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with contextlib.suppress(Exception):
# 添加触发类型字段
conn = op.get_bind()
inspector = sa.inspect(conn)
columns = inspector.get_columns('workflow')
if not any(c['name'] == 'trigger_type' for c in columns):
op.add_column('workflow', sa.Column('trigger_type', sa.String(), nullable=True, default='timer'))
with contextlib.suppress(Exception):
# 添加事件类型字段
if not any(c['name'] == 'event_type' for c in columns):
op.add_column('workflow', sa.Column('event_type', sa.String(), nullable=True))
with contextlib.suppress(Exception):
# 添加事件条件字段
if not any(c['name'] == 'event_conditions' for c in columns):
op.add_column('workflow', sa.Column('event_conditions', sa.JSON(), nullable=True, default={}))
# ### end Alembic commands ###
def downgrade() -> None:

View File

@@ -19,13 +19,28 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with contextlib.suppress(Exception):
conn = op.get_bind()
inspector = sa.inspect(conn)
# 检查并添加 downloadhistory.episode_group
dh_columns = inspector.get_columns('downloadhistory')
if not any(c['name'] == 'episode_group' for c in dh_columns):
op.add_column('downloadhistory', sa.Column('episode_group', sa.String, nullable=True))
# 检查并添加 subscribe.episode_group
s_columns = inspector.get_columns('subscribe')
if not any(c['name'] == 'episode_group' for c in s_columns):
op.add_column('subscribe', sa.Column('episode_group', sa.String, nullable=True))
# 检查并添加 subscribehistory.episode_group
sh_columns = inspector.get_columns('subscribehistory')
if not any(c['name'] == 'episode_group' for c in sh_columns):
op.add_column('subscribehistory', sa.Column('episode_group', sa.String, nullable=True))
# 检查并添加 transferhistory.episode_group
th_columns = inspector.get_columns('transferhistory')
if not any(c['name'] == 'episode_group' for c in th_columns):
op.add_column('transferhistory', sa.Column('episode_group', sa.String, nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:

View File

@@ -18,11 +18,11 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# 整理历史记录 增加下载器字段
with contextlib.suppress(Exception):
conn = op.get_bind()
inspector = sa.inspect(conn)
columns = inspector.get_columns('transferhistory')
if not any(c['name'] == 'downloader' for c in columns):
op.add_column('transferhistory', sa.Column('downloader', sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:

View File

@@ -8,6 +8,7 @@ Create Date: 2025-08-19 12:27:08.451371
import sqlalchemy as sa
from alembic import op
from app.log import logger
from app.core.config import settings
# revision identifiers, used by Alembic.
@@ -41,7 +42,7 @@ def fix_postgresql_sequences():
"""))
tables = [row[0] for row in result.fetchall()]
print(f"发现 {len(tables)} 个表需要检查序列")
logger.info(f"发现 {len(tables)} 个表需要检查序列")
for table_name in tables:
fix_table_sequence(connection, table_name)
@@ -54,7 +55,7 @@ def fix_table_sequence(connection, table_name):
try:
# 跳过alembic_version表它没有id列
if table_name == 'alembic_version':
print(f"跳过表 {table_name}这是Alembic版本表")
logger.debug(f"跳过表 {table_name}这是Alembic版本表")
return
# 检查表是否有id列
@@ -67,22 +68,22 @@ def fix_table_sequence(connection, table_name):
id_column = result.fetchone()
if not id_column:
print(f"{table_name} 没有id列跳过")
logger.debug(f"{table_name} 没有id列跳过")
return
is_identity, column_default = id_column
# 检查是否已经是Identity类型
if is_identity == 'YES' or (column_default and 'GENERATED BY DEFAULT AS IDENTITY' in column_default):
print(f"{table_name} 的id列已经是Identity类型跳过")
logger.debug(f"{table_name} 的id列已经是Identity类型跳过")
return
# 检查是否有序列
print(f"{table_name} 存在序列,需要修复")
logger.info(f"{table_name} 存在序列,需要修复")
convert_to_identity(connection, table_name)
except Exception as e:
print(f"修复表 {table_name} 序列时出错: {e}")
logger.error(f"修复表 {table_name} 序列时出错: {e}")
# 回滚当前事务,避免影响后续操作
connection.rollback()
@@ -106,12 +107,12 @@ def convert_to_identity(connection, table_name):
ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (START WITH {next_value})
"""))
print(f"{table_name} 序列已转换为Identity起始值为 {next_value}")
logger.info(f"{table_name} 序列已转换为Identity起始值为 {next_value}")
except Exception as e:
print(f"转换表 {table_name} 序列时出错: {e}")
# 如果是已经存在的Identity错误则忽略
if "already an identity column" in str(e):
print(f"{table_name} 的id列已经是Identity类型忽略此错误")
logger.warn(f"{table_name} 的id列已经是Identity类型忽略此错误: {e}")
return
logger.error(f"转换表 {table_name} 序列时出错: {e}")
raise

View File

@@ -19,10 +19,11 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with contextlib.suppress(Exception):
conn = op.get_bind()
inspector = sa.inspect(conn)
columns = inspector.get_columns('workflow')
if not any(c['name'] == 'flows' for c in columns):
op.add_column('workflow', sa.Column('flows', sa.JSON(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:

View File

@@ -18,11 +18,11 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# 下载历史记录 增加下载器字段
with contextlib.suppress(Exception):
conn = op.get_bind()
inspector = sa.inspect(conn)
columns = inspector.get_columns('downloadhistory')
if not any(c['name'] == 'downloader' for c in columns):
op.add_column('downloadhistory', sa.Column('downloader', sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:

View File

@@ -18,13 +18,23 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# 订阅增加mediaid
with contextlib.suppress(Exception):
conn = op.get_bind()
inspector = sa.inspect(conn)
# 检查并添加 subscribe.mediaid
s_columns = inspector.get_columns('subscribe')
if not any(c['name'] == 'mediaid' for c in s_columns):
op.add_column('subscribe', sa.Column('mediaid', sa.String(), nullable=True))
# 检查并创建索引
s_indexes = inspector.get_indexes('subscribe')
if not any(i['name'] == 'ix_subscribe_mediaid' for i in s_indexes):
op.create_index('ix_subscribe_mediaid', 'subscribe', ['mediaid'], unique=False)
# 检查并添加 subscribehistory.mediaid
sh_columns = inspector.get_columns('subscribehistory')
if not any(c['name'] == 'mediaid' for c in sh_columns):
op.add_column('subscribehistory', sa.Column('mediaid', sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:

View File

@@ -10,6 +10,7 @@ import contextlib
from alembic import op
import sqlalchemy as sa
from app.log import logger
from app.db import SessionFactory
from app.db.models import UserConfig
@@ -21,28 +22,58 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# 支持订阅自定义媒体类别和过滤规则组、自定义识别词
with contextlib.suppress(Exception):
conn = op.get_bind()
inspector = sa.inspect(conn)
# 检查并添加 downloadhistory.media_category
dh_columns = inspector.get_columns('downloadhistory')
if not any(c['name'] == 'media_category' for c in dh_columns):
op.add_column('downloadhistory', sa.Column('media_category', sa.String(), nullable=True))
# 检查并添加 subscribe 表的列
sub_columns = inspector.get_columns('subscribe')
if not any(c['name'] == 'custom_words' for c in sub_columns):
op.add_column('subscribe', sa.Column('custom_words', sa.String(), nullable=True))
if not any(c['name'] == 'media_category' for c in sub_columns):
op.add_column('subscribe', sa.Column('media_category', sa.String(), nullable=True))
if not any(c['name'] == 'filter_groups' for c in sub_columns):
op.add_column('subscribe', sa.Column('filter_groups', sa.JSON(), nullable=True))
# 将String转换为JSON类型
with contextlib.suppress(Exception):
op.alter_column('subscribe', 'note', existing_type=sa.String(), type_=sa.JSON())
op.alter_column('downloadhistory', 'note', existing_type=sa.String(), type_=sa.JSON())
op.alter_column('mediaserveritem', 'note', existing_type=sa.String(), type_=sa.JSON())
op.alter_column('message', 'note', existing_type=sa.String(), type_=sa.JSON())
op.alter_column('plugindata', 'value', existing_type=sa.String(), type_=sa.JSON())
op.alter_column('site', 'note', existing_type=sa.String(), type_=sa.JSON())
op.alter_column('sitestatistic', 'note', existing_type=sa.String(), type_=sa.JSON())
op.alter_column('systemconfig', 'value', existing_type=sa.String(), type_=sa.JSON())
op.alter_column('userconfig', 'value', existing_type=sa.String(), type_=sa.JSON())
# 清空用户配置表中不兼容的数据
# 定义需要检查和转换的表和列
columns_to_alter = {
'subscribe': 'note',
'downloadhistory': 'note',
'mediaserveritem': 'note',
'message': 'note',
'plugindata': 'value',
'site': 'note',
'sitestatistic': 'note',
'systemconfig': 'value',
'userconfig': 'value'
}
for table, column_name in columns_to_alter.items():
try:
cols = inspector.get_columns(table)
# 找到对应的列信息
target_col = next((c for c in cols if c['name'] == column_name), None)
# 如果列存在且类型不是JSON则进行修改
if target_col and not isinstance(target_col['type'], sa.JSON):
# PostgreSQL需要指定USING子句来处理类型转换
if conn.dialect.name == 'postgresql':
op.alter_column(table, column_name,
existing_type=sa.String(),
type_=sa.JSON(),
postgresql_using=f'"{column_name}"::json')
else:
op.alter_column(table, column_name,
existing_type=sa.String(),
type_=sa.JSON())
except Exception as e:
logger.error(f"Could not alter column {column_name} in table {table}: {e}")
with SessionFactory() as db:
UserConfig.truncate(db)
# ### end Alembic commands ###
def downgrade() -> None:

View File

@@ -18,14 +18,19 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# 站点管理、订阅增加下载器选项
with contextlib.suppress(Exception):
conn = op.get_bind()
inspector = sa.inspect(conn)
# 检查并添加 site.downloader
site_columns = inspector.get_columns('site')
if not any(c['name'] == 'downloader' for c in site_columns):
op.add_column('site', sa.Column('downloader', sa.String(), nullable=True))
# 检查并添加 subscribe.downloader
subscribe_columns = inspector.get_columns('subscribe')
if not any(c['name'] == 'downloader' for c in subscribe_columns):
op.add_column('subscribe', sa.Column('downloader', sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
pass

View File

@@ -10,6 +10,8 @@ import contextlib
from alembic import op
import sqlalchemy as sa
from app.log import logger
# revision identifiers, used by Alembic.
revision = 'ecf3c693fdf3'
@@ -19,15 +21,35 @@ depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# 将String转换为JSON类型
with contextlib.suppress(Exception):
op.alter_column('subscribehistory', 'sites', existing_type=sa.String(), type_=sa.JSON())
with contextlib.suppress(Exception):
op.add_column('subscribehistory', sa.Column('custom_words', sa.String(), nullable=True))
op.add_column('subscribehistory', sa.Column('media_category', sa.String(), nullable=True))
op.add_column('subscribehistory', sa.Column('filter_groups', sa.JSON(), nullable=True))
# ### end Alembic commands ###
conn = op.get_bind()
inspector = sa.inspect(conn)
table_name = 'subscribehistory'
columns = inspector.get_columns(table_name)
try:
sites_col = next((c for c in columns if c['name'] == 'sites'), None)
# 如果 'sites' 列存在且类型不是 JSON则进行修改
if sites_col and not isinstance(sites_col['type'], sa.JSON):
if conn.dialect.name == 'postgresql':
op.alter_column(table_name, 'sites',
existing_type=sa.String(),
type_=sa.JSON(),
postgresql_using='sites::json')
else:
op.alter_column(table_name, 'sites',
existing_type=sa.String(),
type_=sa.JSON())
except Exception as e:
logger.error(f"Could not alter column 'sites' in table {table_name}: {e}")
if not any(c['name'] == 'custom_words' for c in columns):
op.add_column(table_name, sa.Column('custom_words', sa.String(), nullable=True))
if not any(c['name'] == 'media_category' for c in columns):
op.add_column(table_name, sa.Column('media_category', sa.String(), nullable=True))
if not any(c['name'] == 'filter_groups' for c in columns):
op.add_column(table_name, sa.Column('filter_groups', sa.JSON(), nullable=True))
def downgrade() -> None:

View File

@@ -45,59 +45,6 @@ DB_POSTGRESQL_MAX_OVERFLOW=30
## Docker 部署
### 使用内置 PostgreSQL
如果您使用 Docker 部署MoviePilot 容器内置了 PostgreSQL 服务:
#### 使用 Docker Compose推荐
1. 创建 `docker-compose.yml` 文件:
```yaml
version: '3.8'
services:
moviepilot:
image: jxxghp/moviepilot:latest
container_name: moviepilot
restart: unless-stopped
ports:
- "3000:3000" # 前端端口
- "3001:3001" # API端口
environment:
- DB_TYPE=postgresql
- DB_POSTGRESQL_HOST=localhost
- DB_POSTGRESQL_PORT=5432
- DB_POSTGRESQL_DATABASE=moviepilot
- DB_POSTGRESQL_USERNAME=moviepilot
- DB_POSTGRESQL_PASSWORD=moviepilot
volumes:
- ./config:/config
```
2. 启动服务:
```bash
docker-compose up -d
```
#### 使用 Docker 命令
1. 设置环境变量:
```bash
DB_TYPE=postgresql
```
2. 启动容器时PostgreSQL 服务会自动:
- 在配置目录下创建 `postgresql/` 子目录作为数据目录
- 初始化 PostgreSQL 数据目录
- 启动 PostgreSQL 服务
- 创建数据库和用户
- 配置连接权限
3. 数据持久化:
- PostgreSQL 数据存储在 `${CONFIG_DIR}/postgresql/` 目录中
- 日志文件存储在 `${CONFIG_DIR}/postgresql/logs/` 目录中
- 这些目录会通过 Docker 卷映射持久化保存
### 使用外部 PostgreSQL
如果您想使用外部的 PostgreSQL 服务:
@@ -122,6 +69,36 @@ DB_POSTGRESQL_PASSWORD=your-password
3. 启动应用,数据库表会自动创建
4. 使用数据库迁移工具或手动导入数据
#### 注意事项
完成数据迁移后需要对postgresql中的表进行索引初始值进行更新否则会出现唯一索引已存在的异常
例如:
```json
EventType.SiteUpdated
SiteChain.cache_site_userdata
(psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "siteuserdata_pkey"
DETAIL: Key (id)=(18) already exists.
[SQL: INSERT INTO siteuserdata (domain, name, username, userid, user_level, join_at, bonus, upload, download, ratio, seeding, leeching, seeding_size, leeching_size, seeding_info, message_unread, message_unread_contents, err_msg, updated_day, updated_time) VALUES (%(domain)s, %(name)s, %(username)s, %(userid)s, %(user_level)s, %(join_at)s, %(bonus)s, %(upload)s, %(download)s, %(ratio)s, %(seeding)s, %(leeching)s, %(seeding_size)s, %(leeching_size)s, %(seeding_info)s::JSON, %(message_unread)s, %(message_unread_contents)s::JSON, %(err_msg)s, %(updated_day)s, %(updated_time)s) RETURNING siteuserdata.id]
[parameters: {'domain': 'btschool.club', 'name': '', 'username': None, 'userid': None, 'user_level': None, 'join_at': None, 'bonus': 0.0, 'upload': 0, 'download': 0, 'ratio': 0.0, 'seeding': 0, 'leeching': 0, 'seeding_size': 0, 'leeching_size': 0, 'seeding_info': '[]', 'message_unread': 0, 'message_unread_contents': '[]', 'err_msg': 'cookies', 'updated_day': '2025-08-22', 'updated_time': '09:52:01'}]
(Background on this error at: https://sqlalche.me/e/20/gkpj)
```
需要对每一个表分别执行下面的语句(下面的SQL以`workflowc`数据表为例,每张表请自行修改,其中`user`表因为关键字原因,应该写成`public.user`的方式)
```sql
DO $$
DECLARE
max_id INTEGER;
BEGIN
-- 查询最大 ID 值
SELECT COALESCE(MAX(id), 0) INTO max_id FROM workflow;
-- 调整序列
EXECUTE format('ALTER SEQUENCE workflow_id_seq RESTART WITH %s', max_id + 1);
END $$;
```
### 从 PostgreSQL 迁移到 SQLite
1. 导出 PostgreSQL 数据

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.7.4'
FRONTEND_VERSION = 'v2.7.4'
APP_VERSION = 'v2.7.5'
FRONTEND_VERSION = 'v2.7.5'