Compare commits

...

59 Commits

Author SHA1 Message Date
jxxghp
162ba9307d fix restart 2025-06-10 07:09:59 +08:00
jxxghp
49dae92b8e fix flag path 2025-06-09 21:58:02 +08:00
jxxghp
b484a52b6d v2.5.4
- 插件市场支持手动刷新
- 优化了重置容器时已安装插件的恢复策略
2025-06-09 20:57:44 +08:00
jxxghp
d754091a7c fix log 2025-06-09 20:44:48 +08:00
jxxghp
e2febc24ae feat:插件市场支持强制刷新 2025-06-09 20:33:06 +08:00
jxxghp
d0677edaaa fix 优雅停止 2025-06-09 15:39:11 +08:00
jxxghp
f0aaecd0c7 fix #4413 2025-06-09 14:45:26 +08:00
jxxghp
3518940fec Merge pull request #4413 from cddjr/fix_plugin
修复分身的一些BUG
2025-06-09 14:42:54 +08:00
jxxghp
2e5c92ae0c fix 优雅停止 2025-06-09 13:09:16 +08:00
jxxghp
4ad699dbe6 fix 优雅停止 2025-06-09 13:06:27 +08:00
景大侠
931be9e6aa fix 分身复用原插件配置 2025-06-09 09:54:55 +08:00
景大侠
9656d6fbd0 fix 分身类名使用小写后缀
避免与分身ID不一致,导致误判没有安装
2025-06-09 09:51:06 +08:00
景大侠
c7cbb13044 fix 插件卸载后从系统模块中移除
避免分身时误报插件已存在
2025-06-09 09:50:55 +08:00
jxxghp
327d30dcc2 feat:识别容器是否重置 2025-06-09 09:15:58 +08:00
jxxghp
e4e2079917 fix:插件恢复安全性 2025-06-09 08:30:24 +08:00
jxxghp
0427506572 fix:移除Action类静态属性 2025-06-09 08:18:43 +08:00
jxxghp
ea168edb43 fix:移除Oper类静态属性 2025-06-09 08:08:55 +08:00
jxxghp
aa039c6c05 feat:启停插件自动备份与恢复 2025-06-09 08:04:44 +08:00
jxxghp
3de998051a fix memory snapshot 2025-06-08 21:57:49 +08:00
jxxghp
69ade1ae37 更新内存快照间隔为30分钟,保留的内存快照文件数量减少至20个 2025-06-08 21:48:37 +08:00
jxxghp
1d6133e3b1 fix plugins遍历 2025-06-08 21:39:37 +08:00
jxxghp
203a111d1a remove gc 2025-06-08 21:24:26 +08:00
jxxghp
0a20234268 remove gc 2025-06-08 21:19:15 +08:00
jxxghp
7f8e50f83d fix memory helper 2025-06-08 21:13:37 +08:00
jxxghp
443ef7d41b fix 2025-06-08 21:06:27 +08:00
jxxghp
059ae6595d fix 2025-06-08 20:37:42 +08:00
jxxghp
19c3dad338 fix 2025-06-08 19:41:46 +08:00
jxxghp
81bc51c972 fix pympler 2025-06-08 19:02:25 +08:00
jxxghp
6c17868744 add pympler 2025-06-08 18:55:02 +08:00
jxxghp
a18040ccfa add pympler 2025-06-08 18:54:35 +08:00
jxxghp
0835a75503 更新 thread.py 2025-06-08 14:43:13 +08:00
jxxghp
3ee32757e5 rollback 2025-06-08 14:35:59 +08:00
jxxghp
344abfa8d8 fix memory helper 2025-06-08 14:03:01 +08:00
jxxghp
906b2a3485 fix memory statistics 2025-06-08 11:36:15 +08:00
jxxghp
e0d2b87ed3 wallpaper cache skip empty 2025-06-08 11:30:57 +08:00
jxxghp
83a8c8b42b fix memory threshold 2025-06-08 11:14:16 +08:00
jxxghp
d840ed6c5a fix memory log 2025-06-08 11:08:01 +08:00
jxxghp
0112087be4 refactor #4407 2025-06-08 10:51:59 +08:00
jxxghp
7320084e11 rollback #4379 2025-06-07 22:26:51 +08:00
jxxghp
23929f5eaa fix pool size 2025-06-07 22:00:09 +08:00
jxxghp
c002d4619a 更新 scheduler.py 2025-06-07 20:11:31 +08:00
jxxghp
f60a909bba 更新 version.py 2025-06-07 11:43:04 +08:00
jxxghp
c2c22e3968 Merge pull request #4399 from cddjr/fix_subscribe 2025-06-07 11:42:25 +08:00
jxxghp
f10299b2de Merge pull request #4403 from cddjr/fix_systemconfig 2025-06-07 11:41:36 +08:00
景大侠
1d3563ed97 fix(config): 修复新装的插件会消失的问题 2025-06-07 11:33:28 +08:00
景大侠
f3eb2caa4e fix(subscribe): 避免重复下载已入库的剧集 2025-06-07 02:48:22 +08:00
jxxghp
2364dacd52 添加对 GitHub 容器注册 2025-06-06 22:02:04 +08:00
jxxghp
883f7451c3 fix event log 2025-06-06 21:45:14 +08:00
jxxghp
a534c9bca1 fix 设置保存失败提示 2025-06-06 21:30:11 +08:00
jxxghp
b14202a324 fix logger 2025-06-06 21:18:31 +08:00
jxxghp
a6fae48f07 更新 system.py 2025-06-06 17:15:25 +08:00
jxxghp
963caf2afe fix logger reload 2025-06-06 16:31:00 +08:00
jxxghp
50b0268531 v2.5.3-1 2025-06-06 15:37:44 +08:00
jxxghp
f484b64be3 fix 2025-06-06 15:37:02 +08:00
jxxghp
349535557f 更新 subscribe.py 2025-06-06 14:04:12 +08:00
jxxghp
de4973a270 feat:内存监控开关 2025-06-06 13:49:52 +08:00
jxxghp
e42d2baf8a fix lint 2025-06-05 22:14:14 +08:00
jxxghp
eac435b233 fix lint 2025-06-05 22:13:33 +08:00
jxxghp
447b8564e9 更新 GitHub Actions 工作流 2025-06-05 22:02:52 +08:00
60 changed files with 1034 additions and 549 deletions

View File

@@ -25,7 +25,9 @@ jobs:
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ secrets.DOCKER_USERNAME }}/moviepilot-v2
images: |
${{ secrets.DOCKER_USERNAME }}/moviepilot-v2
ghcr.io/${{ github.repository }}
tags: |
type=raw,value=${{ env.app_version }}
type=raw,value=latest
@@ -42,6 +44,13 @@ jobs:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build Image
uses: docker/build-push-action@v5
with:

30
.github/workflows/issues.yml vendored Normal file
View File

@@ -0,0 +1,30 @@
name: Close inactive issues
on:
workflow_dispatch:
schedule:
# Github Action 只支持 UTC 时间。
# '0 18 * * *' 对应 UTC 时间的 18:00也就是中国时区 (UTC+8) 的第二天凌晨 02:00。
- cron: "0 18 * * *"
jobs:
close-issues:
runs-on: ubuntu-latest
permissions:
issues: write
pull-requests: write
steps:
- uses: actions/stale@v5
with:
# 标记 stale 标签时间
days-before-issue-stale: 30
# 关闭 issues 标签时间
days-before-issue-close: 14
# 自定义标签名
stale-issue-label: "stale"
stale-issue-message: "此问题已过时,因为它已打开 30 天且没有任何活动。"
close-issue-message: "此问题已关闭,因为它在标记为 stale 后,已处于无更新状态 14 天。"
# 忽略所有的 Pull Request只处理 Issue
days-before-pr-stale: -1
days-before-pr-close: -1
repo-token: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -1,16 +1,6 @@
name: Pylint Code Quality Check
on:
# 在推送到任何分支时运行
push:
branches: [ "main", "v2", "develop" ]
paths:
- '**.py'
# 在PR时运行
pull_request:
branches: [ "main", "v2", "develop" ]
paths:
- '**.py'
# 允许手动触发
workflow_dispatch:
@@ -24,12 +14,13 @@ jobs:
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: '3.11'
python-version: '3.12'
cache: 'pip'
- name: Cache pip dependencies
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt', '**/requirements.in') }}
@@ -38,13 +29,17 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install --upgrade pip setuptools wheel
pip install pylint
# 安装项目依赖
if [ -f requirements.txt ]; then
echo "📦 安装 requirements.txt 中的依赖..."
pip install -r requirements.txt
elif [ -f requirements.in ]; then
echo "📦 安装 requirements.in 中的依赖..."
pip install -r requirements.in
else
echo "⚠️ 未找到依赖文件,仅安装 pylint"
fi
- name: Verify pylint config
@@ -83,7 +78,7 @@ jobs:
pylint app/ --score=yes --reports=no | tail -2 || true
- name: Upload pylint report
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
if: always()
with:
name: pylint-report

View File

@@ -26,37 +26,31 @@ class AddDownloadAction(BaseAction):
添加下载资源
"""
# 已添加的下载
_added_downloads = []
_has_error = False
def __init__(self, action_id: str):
super().__init__(action_id)
self.downloadchain = DownloadChain()
self.mediachain = MediaChain()
self._added_downloads = []
self._has_error = False
@classmethod
@property
def name(cls) -> str: # noqa
def name(cls) -> str: # noqa
return "添加下载"
@classmethod
@property
def description(cls) -> str: # noqa
def description(cls) -> str: # noqa
return "根据资源列表添加下载任务"
@classmethod
@property
def data(cls) -> dict: # noqa
def data(cls) -> dict: # noqa
return AddDownloadParams().dict()
@property
def success(self) -> bool:
return not self._has_error
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
"""
将上下文中的torrents添加到下载任务中
"""
@@ -73,13 +67,13 @@ class AddDownloadAction(BaseAction):
if not t.meta_info:
t.meta_info = MetaInfo(title=t.torrent_info.title, subtitle=t.torrent_info.description)
if not t.media_info:
t.media_info = self.mediachain.recognize_media(meta=t.meta_info)
t.media_info = MediaChain().recognize_media(meta=t.meta_info)
if not t.media_info:
self._has_error = True
logger.warning(f"{t.torrent_info.title} 未识别到媒体信息,无法下载")
continue
if params.only_lack:
exists_info = self.downloadchain.media_exists(t.media_info)
exists_info = DownloadChain().media_exists(t.media_info)
if exists_info:
if t.media_info.type == MediaType.MOVIE:
# 电影
@@ -96,14 +90,15 @@ class AddDownloadAction(BaseAction):
exists_episodes = exists_seasons.get(t.meta_info.begin_season)
if exists_episodes:
if set(t.meta_info.episode_list).issubset(exists_episodes):
logger.warning(f"{t.meta_info.title}{t.meta_info.begin_season} 季第 {t.meta_info.episode_list} 集已存在,跳过")
logger.warning(
f"{t.meta_info.title}{t.meta_info.begin_season} 季第 {t.meta_info.episode_list} 集已存在,跳过")
continue
_started = True
did = self.downloadchain.download_single(context=t,
downloader=params.downloader,
save_path=params.save_path,
label=params.labels)
did = DownloadChain().download_single(context=t,
downloader=params.downloader,
save_path=params.save_path,
label=params.labels)
if did:
self._added_downloads.append(did)
# 保存缓存

View File

@@ -19,29 +19,24 @@ class AddSubscribeAction(BaseAction):
添加订阅
"""
_added_subscribes = []
_has_error = False
def __init__(self, action_id: str):
super().__init__(action_id)
self.subscribechain = SubscribeChain()
self.subscribeoper = SubscribeOper()
self._added_subscribes = []
self._has_error = False
@classmethod
@property
def name(cls) -> str: # noqa
def name(cls) -> str: # noqa
return "添加订阅"
@classmethod
@property
def description(cls) -> str: # noqa
def description(cls) -> str: # noqa
return "根据媒体列表添加订阅"
@classmethod
@property
def data(cls) -> dict: # noqa
def data(cls) -> dict: # noqa
return AddSubscribeParams().dict()
@property
@@ -63,19 +58,20 @@ class AddSubscribeAction(BaseAction):
continue
mediainfo = MediaInfo()
mediainfo.from_dict(media.dict())
if self.subscribechain.exists(mediainfo):
subscribechain = SubscribeChain()
if subscribechain.exists(mediainfo):
logger.info(f"{media.title} 已存在订阅")
continue
# 添加订阅
_started = True
sid, message = self.subscribechain.add(mtype=mediainfo.type,
title=mediainfo.title,
year=mediainfo.year,
tmdbid=mediainfo.tmdb_id,
season=mediainfo.season,
doubanid=mediainfo.douban_id,
bangumiid=mediainfo.bangumi_id,
username=settings.SUPERUSER)
sid, message = subscribechain.add(mtype=mediainfo.type,
title=mediainfo.title,
year=mediainfo.year,
tmdbid=mediainfo.tmdb_id,
season=mediainfo.season,
doubanid=mediainfo.douban_id,
bangumiid=mediainfo.bangumi_id,
username=settings.SUPERUSER)
if sid:
self._added_subscribes.append(sid)
# 保存缓存
@@ -84,7 +80,7 @@ class AddSubscribeAction(BaseAction):
if self._added_subscribes:
logger.info(f"已添加 {len(self._added_subscribes)} 个订阅")
for sid in self._added_subscribes:
context.subscribes.append(self.subscribeoper.get(sid))
context.subscribes.append(SubscribeOper().get(sid))
elif _started:
self._has_error = True

View File

@@ -16,11 +16,8 @@ class FetchDownloadsAction(BaseAction):
获取下载任务
"""
_downloads = []
def __init__(self, action_id: str):
super().__init__(action_id)
self.chain = ActionChain()
self._downloads = []
@classmethod
@@ -51,7 +48,7 @@ class FetchDownloadsAction(BaseAction):
if global_vars.is_workflow_stopped(workflow_id):
break
logger.info(f"获取下载任务 {download.download_id} 状态 ...")
torrents = self.chain.list_torrents(hashs=[download.download_id])
torrents = ActionChain().list_torrents(hashs=[download.download_id])
if not torrents:
download.completed = True
continue

View File

@@ -27,10 +27,6 @@ class FetchMediasAction(BaseAction):
获取媒体数据
"""
_inner_sources = []
_medias = []
_has_error = False
def __init__(self, action_id: str):
super().__init__(action_id)

View File

@@ -29,29 +29,24 @@ class FetchRssAction(BaseAction):
获取RSS资源列表
"""
_rss_torrents = []
_has_error = False
def __init__(self, action_id: str):
super().__init__(action_id)
self.rsshelper = RssHelper()
self.chain = ActionChain()
self._rss_torrents = []
self._has_error = False
@classmethod
@property
def name(cls) -> str: # noqa
def name(cls) -> str: # noqa
return "获取RSS资源"
@classmethod
@property
def description(cls) -> str: # noqa
def description(cls) -> str: # noqa
return "订阅RSS地址获取资源"
@classmethod
@property
def data(cls) -> dict: # noqa
def data(cls) -> dict: # noqa
return FetchRssParams().dict()
@property
@@ -74,10 +69,10 @@ class FetchRssAction(BaseAction):
if params.ua:
headers["User-Agent"] = params.ua
rss_items = self.rsshelper.parse(url=params.url,
proxy=settings.PROXY if params.proxy else None,
timeout=params.timeout,
headers=headers)
rss_items = RssHelper().parse(url=params.url,
proxy=settings.PROXY if params.proxy else None,
timeout=params.timeout,
headers=headers)
if rss_items is None or rss_items is False:
logger.error(f'RSS地址 {params.url} 请求失败!')
self._has_error = True
@@ -103,7 +98,7 @@ class FetchRssAction(BaseAction):
meta = MetaInfo(title=torrentinfo.title, subtitle=torrentinfo.description)
mediainfo = None
if params.match_media:
mediainfo = self.chain.recognize_media(meta)
mediainfo = ActionChain().recognize_media(meta)
if not mediainfo:
logger.warning(f"{torrentinfo.title} 未识别到媒体信息")
continue

View File

@@ -29,26 +29,23 @@ class FetchTorrentsAction(BaseAction):
搜索站点资源
"""
_torrents = []
def __init__(self, action_id: str):
super().__init__(action_id)
self.searchchain = SearchChain()
self._torrents = []
@classmethod
@property
def name(cls) -> str: # noqa
def name(cls) -> str: # noqa
return "搜索站点资源"
@classmethod
@property
def description(cls) -> str: # noqa
def description(cls) -> str: # noqa
return "搜索站点种子资源列表"
@classmethod
@property
def data(cls) -> dict: # noqa
def data(cls) -> dict: # noqa
return FetchTorrentsParams().dict()
@property
@@ -60,9 +57,10 @@ class FetchTorrentsAction(BaseAction):
搜索站点,获取资源列表
"""
params = FetchTorrentsParams(**params)
searchchain = SearchChain()
if params.search_type == "keyword":
# 按关键字搜索
torrents = self.searchchain.search_by_title(title=params.name, sites=params.sites)
torrents = searchchain.search_by_title(title=params.name, sites=params.sites)
for torrent in torrents:
if global_vars.is_workflow_stopped(workflow_id):
break
@@ -74,7 +72,7 @@ class FetchTorrentsAction(BaseAction):
continue
# 识别媒体信息
if params.match_media:
torrent.media_info = self.searchchain.recognize_media(torrent.meta_info)
torrent.media_info = searchchain.recognize_media(torrent.meta_info)
if not torrent.media_info:
logger.warning(f"{torrent.torrent_info.title} 未识别到媒体信息")
continue
@@ -84,10 +82,10 @@ class FetchTorrentsAction(BaseAction):
for media in context.medias:
if global_vars.is_workflow_stopped(workflow_id):
break
torrents = self.searchchain.search_by_id(tmdbid=media.tmdb_id,
doubanid=media.douban_id,
mtype=MediaType(media.type),
sites=params.sites)
torrents = searchchain.search_by_id(tmdbid=media.tmdb_id,
doubanid=media.douban_id,
mtype=MediaType(media.type),
sites=params.sites)
for torrent in torrents:
self._torrents.append(torrent)

View File

@@ -22,8 +22,6 @@ class FilterMediasAction(BaseAction):
过滤媒体数据
"""
_medias = []
def __init__(self, action_id: str):
super().__init__(action_id)
self._medias = []

View File

@@ -27,12 +27,8 @@ class FilterTorrentsAction(BaseAction):
过滤资源数据
"""
_torrents = []
def __init__(self, action_id: str):
super().__init__(action_id)
self.torrenthelper = TorrentHelper()
self.chain = ActionChain()
self._torrents = []
@classmethod
@@ -62,7 +58,7 @@ class FilterTorrentsAction(BaseAction):
for torrent in context.torrents:
if global_vars.is_workflow_stopped(workflow_id):
break
if self.torrenthelper.filter_torrent(
if TorrentHelper().filter_torrent(
torrent_info=torrent.torrent_info,
filter_params={
"quality": params.quality,
@@ -73,7 +69,7 @@ class FilterTorrentsAction(BaseAction):
"size": params.size
}
):
if self.chain.filter_torrents(
if ActionChain().filter_torrents(
rule_groups=params.rule_groups,
torrent_list=[torrent.torrent_info],
mediainfo=torrent.media_info

View File

@@ -20,8 +20,6 @@ class InvokePluginAction(BaseAction):
调用插件
"""
_success = False
def __init__(self, action_id: str):
super().__init__(action_id)
self._success = False

View File

@@ -24,12 +24,8 @@ class ScanFileAction(BaseAction):
整理文件
"""
_fileitems = []
_has_error = False
def __init__(self, action_id: str):
super().__init__(action_id)
self.storagechain = StorageChain()
self._fileitems = []
self._has_error = False
@@ -59,12 +55,13 @@ class ScanFileAction(BaseAction):
params = ScanFileParams(**params)
if not params.storage or not params.directory:
return context
fileitem = self.storagechain.get_file_item(params.storage, Path(params.directory))
storagechain = StorageChain()
fileitem = storagechain.get_file_item(params.storage, Path(params.directory))
if not fileitem:
logger.error(f"目录不存在: 【{params.storage}{params.directory}")
self._has_error = True
return context
files = self.storagechain.list_files(fileitem, recursion=True)
files = storagechain.list_files(fileitem, recursion=True)
for file in files:
if global_vars.is_workflow_stopped(workflow_id):
break

View File

@@ -21,13 +21,8 @@ class ScrapeFileAction(BaseAction):
刮削文件
"""
_scraped_files = []
_has_error = False
def __init__(self, action_id: str):
super().__init__(action_id)
self.storagechain = StorageChain()
self.mediachain = MediaChain()
self._scraped_files = []
self._has_error = False
@@ -61,7 +56,7 @@ class ScrapeFileAction(BaseAction):
break
if fileitem in self._scraped_files:
continue
if not self.storagechain.exists(fileitem):
if not StorageChain().exists(fileitem):
continue
# 检查缓存
cache_key = f"{fileitem.path}"
@@ -69,12 +64,13 @@ class ScrapeFileAction(BaseAction):
logger.info(f"{fileitem.path} 已刮削过,跳过")
continue
meta = MetaInfoPath(Path(fileitem.path))
mediainfo = self.mediachain.recognize_media(meta)
mediachain = MediaChain()
mediainfo = mediachain.recognize_media(meta)
if not mediainfo:
_failed_count += 1
logger.info(f"{fileitem.path} 未识别到媒体信息,无法刮削")
continue
self.mediachain.scrape_metadata(fileitem=fileitem, meta=meta, mediainfo=mediainfo)
mediachain.scrape_metadata(fileitem=fileitem, meta=meta, mediainfo=mediainfo)
self._scraped_files.append(fileitem)
# 保存缓存
self.save_cache(workflow_id, cache_key)

View File

@@ -4,7 +4,7 @@ from pydantic import Field
from app.actions import BaseAction, ActionChain
from app.schemas import ActionParams, ActionContext, Notification
from core.config import settings
from app.core.config import settings
class SendMessageParams(ActionParams):
@@ -22,7 +22,6 @@ class SendMessageAction(BaseAction):
def __init__(self, action_id: str):
super().__init__(action_id)
self.chain = ActionChain()
@classmethod
@property
@@ -60,7 +59,7 @@ class SendMessageAction(BaseAction):
if not params.client:
params.client = [""]
for client in params.client:
self.chain.post_message(
ActionChain().post_message(
Notification(
source=client,
userid=params.userid,

View File

@@ -26,30 +26,24 @@ class TransferFileAction(BaseAction):
整理文件
"""
_fileitems = []
_has_error = False
def __init__(self, action_id: str):
super().__init__(action_id)
self.transferchain = TransferChain()
self.storagechain = StorageChain()
self.transferhis = TransferHistoryOper()
self._fileitems = []
self._has_error = False
@classmethod
@property
def name(cls) -> str: # noqa
def name(cls) -> str: # noqa
return "整理文件"
@classmethod
@property
def description(cls) -> str: # noqa
def description(cls) -> str: # noqa
return "整理队列中的文件"
@classmethod
@property
def data(cls) -> dict: # noqa
def data(cls) -> dict: # noqa
return TransferFileParams().dict()
@property
@@ -72,6 +66,9 @@ class TransferFileAction(BaseAction):
params = TransferFileParams(**params)
# 失败次数
_failed_count = 0
storagechain = StorageChain()
transferchain = TransferChain()
transferhis = TransferHistoryOper()
if params.source == "downloads":
# 从下载任务中整理文件
for download in context.downloads:
@@ -85,16 +82,16 @@ class TransferFileAction(BaseAction):
if self.check_cache(workflow_id, cache_key):
logger.info(f"{download.path} 已整理过,跳过")
continue
fileitem = self.storagechain.get_file_item(storage="local", path=Path(download.path))
fileitem = storagechain.get_file_item(storage="local", path=Path(download.path))
if not fileitem:
logger.info(f"文件 {download.path} 不存在")
continue
transferd = self.transferhis.get_by_src(fileitem.path, storage=fileitem.storage)
transferd = transferhis.get_by_src(fileitem.path, storage=fileitem.storage)
if transferd:
# 已经整理过的文件不再整理
continue
logger.info(f"开始整理文件 {download.path} ...")
state, errmsg = self.transferchain.do_transfer(fileitem, background=False)
state, errmsg = transferchain.do_transfer(fileitem, background=False)
if not state:
_failed_count += 1
logger.error(f"整理文件 {download.path} 失败: {errmsg}")
@@ -112,13 +109,13 @@ class TransferFileAction(BaseAction):
if self.check_cache(workflow_id, cache_key):
logger.info(f"{fileitem.path} 已整理过,跳过")
continue
transferd = self.transferhis.get_by_src(fileitem.path, storage=fileitem.storage)
transferd = transferhis.get_by_src(fileitem.path, storage=fileitem.storage)
if transferd:
# 已经整理过的文件不再整理
continue
logger.info(f"开始整理文件 {fileitem.path} ...")
state, errmsg = self.transferchain.do_transfer(fileitem, background=False,
continue_callback=check_continue)
state, errmsg = transferchain.do_transfer(fileitem, background=False,
continue_callback=check_continue)
if not state:
_failed_count += 1
logger.error(f"整理文件 {fileitem.path} 失败: {errmsg}")

View File

@@ -7,9 +7,9 @@ from app.core.event import eventmanager
from app.core.security import verify_token
from app.schemas import DiscoverSourceEventData
from app.schemas.types import ChainEventType, MediaType
from chain.bangumi import BangumiChain
from chain.douban import DoubanChain
from chain.tmdb import TmdbChain
from app.chain.bangumi import BangumiChain
from app.chain.douban import DoubanChain
from app.chain.tmdb import TmdbChain
router = APIRouter()

View File

@@ -137,7 +137,7 @@ def register_plugin(plugin_id: str):
@router.get("/", summary="所有插件", response_model=List[schemas.Plugin])
def all_plugins(_: schemas.TokenPayload = Depends(get_current_active_superuser),
state: Optional[str] = "all") -> List[schemas.Plugin]:
state: Optional[str] = "all", force: bool = False) -> List[schemas.Plugin]:
"""
查询所有插件清单包括本地插件和在线插件插件状态installed, market, all
"""
@@ -151,7 +151,7 @@ def all_plugins(_: schemas.TokenPayload = Depends(get_current_active_superuser),
# 未安装的本地插件
not_installed_plugins = [plugin for plugin in local_plugins if not plugin.installed]
# 在线插件
online_plugins = PluginManager().get_online_plugins()
online_plugins = PluginManager().get_online_plugins(force)
if not online_plugins:
# 没有获取在线插件
if state == "market":

View File

@@ -225,7 +225,7 @@ def set_env_setting(env: dict,
if failed_updates:
return schemas.Response(
success=False,
message=f"{', '.join(failed_updates.keys())} 配置项更新失败",
message=f"{', '.join([v[1] for v in failed_updates.values()])}",
data={
"success_updates": success_updates,
"failed_updates": failed_updates

View File

@@ -1,5 +1,4 @@
import copy
import gc
import pickle
import traceback
from abc import ABCMeta
@@ -69,10 +68,6 @@ class ChainBase(metaclass=ABCMeta):
pickle.dump(cache, f) # noqa
except Exception as err:
logger.error(f"保存缓存 {filename} 出错:{str(err)}")
finally:
# 主动资源回收
del cache
gc.collect()
@staticmethod
def remove_cache(filename: str) -> None:

View File

@@ -445,19 +445,6 @@ class DownloadChain(ChainBase):
return 9999
return no_exist[season].total_episode
def _calculate_intersection_ratio(episodes_set: set, target_set: set) -> Tuple[float, set]:
"""
计算种子与目标缺失集之间的交集比例。
:param episodes_set (Set[int]): 当前种子的集数集合。
:param target_set (Set[int]): 当前季缺失的集数集合。
:return: Tuple[float, Set[int]]: - 交集比例0~1- 交集集合Set[int]
"""
cal_intersection = episodes_set & target_set
if not cal_intersection:
return 0.0, set()
cal_ratio = len(cal_intersection) / len(episodes_set)
return cal_ratio, cal_intersection
# 发送资源选择事件,允许外部修改上下文数据
logger.debug(f"Initial contexts: {len(contexts)} items, Downloader: {downloader}")
event_data = ResourceSelectionEventData(
@@ -616,8 +603,6 @@ class DownloadChain(ChainBase):
# 缺失整季的转化为缺失集进行比较
if not need_episodes:
need_episodes = list(range(start_episode, total_episode + 1))
# 计算每个种子的集数与缺失集数的交集比例 shaw
torrent_ratios = []
# 循环种子
for context in contexts:
if global_vars.is_system_stopped:
@@ -644,54 +629,24 @@ class DownloadChain(ChainBase):
# 整季的不处理
if not torrent_episodes:
continue
# 计算交集
# 若种子[5-10],[7-10],[9-10] need_episodes=[9,10,11,12,13,14]
# 计算后的交集比例( len(torrent_episodes ∩ need_episodes) / len(torrent_episodes) )分别 0.33 0.66 1.0
ratio, intersection = _calculate_intersection_ratio(torrent_episodes, set(need_episodes))
if ratio <= (settings.EPISODE_INTERSECTION_MIN_CONFIDENCE or 0.05):
# 可以设定阈值
logger.info(
f"{context.meta_info.title} 与当前缺失集数交集比例过低:{ratio:.2%},跳过")
continue
# 收集候选种子
torrent_ratios.append((context, ratio, len(intersection)))
if not torrent_ratios:
continue
# 按交集比例排序
torrent_ratios.sort(key=lambda x: (x[1], x[2]), reverse=True)
# 按排序后的顺序下载
for context, _, _ in torrent_ratios:
if global_vars.is_system_stopped:
break
# 重新计算与当前need_episodes的交集比例
current_episodes = set(context.meta_info.episode_list)
current_ratio, current_intersection = _calculate_intersection_ratio(current_episodes,
set(need_episodes))
if current_ratio <= (settings.EPISODE_INTERSECTION_MIN_CONFIDENCE or 0.05):
# 可以设定阈值
logger.info(
f"{context.meta_info.title} 与当前缺失集数交集比例过低:{current_ratio:.2%},跳过")
continue
# 下载
logger.info(f"开始下载 {context.meta_info.title} ...")
download_id = self.download_single(context, save_path=save_path,
channel=channel, source=source,
userid=userid, username=username,
downloader=downloader)
if download_id:
# 下载成功
logger.info(f"{context.meta_info.title} 添加下载成功")
downloaded_list.append(context)
# 更新仍需集数
need_episodes = __update_episodes(_mid=need_mid,
_need=need_episodes,
_sea=need_season,
_current=current_intersection)
logger.info(f"{need_season} 剩余需要集:{need_episodes}")
# 如果已经没有需要下载的集数,跳出当前循环
if not need_episodes:
break
# 为需要集的子集则下载
if torrent_episodes.issubset(set(need_episodes)):
# 下载
logger.info(f"开始下载 {meta.title} ...")
download_id = self.download_single(context, save_path=save_path,
channel=channel, source=source,
userid=userid, username=username,
downloader=downloader)
if download_id:
# 下载成功
logger.info(f"{meta.title} 添加下载成功")
downloaded_list.append(context)
# 更新仍需集数
need_episodes = __update_episodes(_mid=need_mid,
_need=need_episodes,
_sea=need_season,
_current=torrent_episodes)
logger.info(f"{need_season} 剩余需要集:{need_episodes}")
# 仍然缺失的剧集从整季中选择需要的集数文件下载仅支持QB和TR
if no_exists:
@@ -841,7 +796,6 @@ class DownloadChain(ChainBase):
totals = {}
mediaserver = MediaServerOper()
if mediainfo.type == MediaType.MOVIE:
# 电影
itemid = mediaserver.get_item_id(mtype=mediainfo.type.value,

View File

@@ -11,7 +11,6 @@ from app.core.config import settings
from app.core.context import MediaInfo, Context
from app.core.meta import MetaBase
from app.db.user_oper import UserOper
from app.helper.memory import memory_optimized
from app.helper.torrent import TorrentHelper
from app.log import logger
from app.schemas import Notification, NotExistMediaInfo, CommingMessage
@@ -118,7 +117,6 @@ class MessageChain(ChainBase):
# 处理消息
self.handle_message(channel=channel, source=source, userid=userid, username=username, text=text)
@memory_optimized(force_gc_after=True, log_memory=True)
def handle_message(self, channel: MessageChannel, source: str,
userid: Union[str, int], username: str, text: str) -> None:
"""
@@ -265,10 +263,6 @@ class MessageChain(ChainBase):
userid=userid,
total=len(contexts))
# 清理内存
del cache_list
del cache_data
elif cache_type in ["Subscribe", "ReSubscribe"]:
# 订阅或洗版媒体
mediainfo: MediaInfo = cache_list[_choice]
@@ -357,9 +351,6 @@ class MessageChain(ChainBase):
items=cache_list[start:end],
userid=userid,
total=len(cache_list))
# 清理内存
del cache_list
del cache_data
elif text.lower() == "n":
# 下一页
@@ -396,9 +387,6 @@ class MessageChain(ChainBase):
source=source,
title=_current_meta.name,
items=cache_list, userid=userid, total=total)
# 清理内存
del cache_list
del cache_data
else:
# 搜索或订阅

View File

@@ -12,7 +12,6 @@ from app.core.context import MediaInfo, TorrentInfo
from app.core.event import eventmanager, Event
from app.core.metainfo import MetaInfo
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.memory import memory_optimized
from app.helper.progress import ProgressHelper
from app.helper.sites import SitesHelper
from app.helper.torrent import TorrentHelper
@@ -98,7 +97,6 @@ class SearchChain(ChainBase):
logger.error(f'加载搜索结果失败:{str(e)} - {traceback.format_exc()}')
return []
@memory_optimized(force_gc_after=True, log_memory=True)
def process(self, mediainfo: MediaInfo,
keyword: Optional[str] = None,
no_exists: Dict[int, Dict[int, NotExistMediaInfo]] = None,

View File

@@ -24,7 +24,6 @@ from app.db.models.subscribe import Subscribe
from app.db.site_oper import SiteOper
from app.db.subscribe_oper import SubscribeOper
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.memory import memory_optimized
from app.helper.subscribe import SubscribeHelper
from app.helper.torrent import TorrentHelper
from app.log import logger
@@ -268,7 +267,6 @@ class SubscribeChain(ChainBase):
return True
return False
@memory_optimized(force_gc_after=True, log_memory=True)
def search(self, sid: Optional[int] = None, state: Optional[str] = 'N', manual: Optional[bool] = False):
"""
订阅搜索
@@ -362,6 +360,8 @@ class SubscribeChain(ChainBase):
# 过滤搜索结果
matched_contexts = []
for context in contexts:
if global_vars.is_system_stopped:
break
torrent_meta = context.meta_info
torrent_info = context.torrent_info
torrent_mediainfo = context.media_info
@@ -540,7 +540,6 @@ class SubscribeChain(ChainBase):
return ret_sites
@memory_optimized(force_gc_after=True, log_memory=True)
def match(self, torrents: Dict[str, List[Context]]):
"""
从缓存中匹配订阅,并自动下载
@@ -557,8 +556,12 @@ class SubscribeChain(ChainBase):
# 预识别所有未识别的种子
processed_torrents: Dict[str, List[Context]] = {}
for domain, contexts in torrents.items():
if global_vars.is_system_stopped:
break
processed_torrents[domain] = []
for context in contexts:
if global_vars.is_system_stopped:
break
# 如果种子未识别,尝试识别
if not context.media_info or (not context.media_info.tmdb_id
and not context.media_info.douban_id):
@@ -629,6 +632,8 @@ class SubscribeChain(ChainBase):
continue
logger.debug(f'开始匹配站点:{domain},共缓存了 {len(contexts)} 个种子...')
for context in contexts:
if global_vars.is_system_stopped:
break
# 提取信息
_context = copy.copy(context)
torrent_meta = _context.meta_info
@@ -770,10 +775,6 @@ class SubscribeChain(ChainBase):
torrent_mediainfo.episode_group = subscribe.episode_group
_match_context.append(_context)
# 清理内存
contexts.clear()
del contexts
if not _match_context:
# 未匹配到资源
logger.info(f'{mediainfo.title_year} 未匹配到符合条件的资源')
@@ -874,6 +875,8 @@ class SubscribeChain(ChainBase):
success_count = 0
subscribeoper = SubscribeOper()
for share_sub in share_subs:
if global_vars.is_system_stopped:
break
uid = share_sub.get("share_uid")
if uid and uid in follow_users:
# 订阅已存在则跳过
@@ -1182,6 +1185,9 @@ class SubscribeChain(ChainBase):
new_episodes = list(range(max(start_episode, start), total_episode + 1))
# 与原集列表取交集
episodes = list(set(episode_list).intersection(set(new_episodes)))
# 交集为空时,说明订阅的剧集均已入库
if not episodes:
return True, {}
# 更新集合
no_exists[mediakey][begin_season] = schemas.NotExistMediaInfo(
season=begin_season,

View File

@@ -32,6 +32,8 @@ class SystemChain(ChainBase):
"""
重启系统
"""
from app.core.config import global_vars
if channel and userid:
self.post_message(Notification(channel=channel, source=source,
title="系统正在重启,请耐心等候!", userid=userid))
@@ -40,6 +42,8 @@ class SystemChain(ChainBase):
"channel": channel.value,
"userid": userid
}, self._restart_file)
# 设置停止标志,通知所有模块准备停止
global_vars.stop_system()
# 重启
SystemHelper.restart()

View File

@@ -1,4 +1,3 @@
import gc
import re
import traceback
from typing import Dict, List, Union, Optional
@@ -10,7 +9,6 @@ from app.core.context import TorrentInfo, Context, MediaInfo
from app.core.metainfo import MetaInfo
from app.db.site_oper import SiteOper
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.memory import memory_optimized, clear_large_objects
from app.helper.rss import RssHelper
from app.helper.sites import SitesHelper
from app.helper.torrent import TorrentHelper
@@ -71,7 +69,6 @@ class TorrentsChain(ChainBase):
self.remove_cache(self._rss_file)
logger.info(f'种子缓存数据清理完成')
@memory_optimized(force_gc_after=True, log_memory=True)
def browse(self, domain: str, keyword: Optional[str] = None, cat: Optional[str] = None,
page: Optional[int] = 0) -> List[TorrentInfo]:
"""
@@ -88,7 +85,6 @@ class TorrentsChain(ChainBase):
return []
return self.refresh_torrents(site=site, keyword=keyword, cat=cat, page=page)
@memory_optimized(force_gc_after=True, log_memory=True)
def rss(self, domain: str) -> List[TorrentInfo]:
"""
获取站点RSS内容返回种子清单TTL缓存3分钟
@@ -134,7 +130,6 @@ class TorrentsChain(ChainBase):
return ret_torrents
@memory_optimized(force_gc_after=True, log_memory=True)
def refresh(self, stype: Optional[str] = None, sites: List[int] = None) -> Dict[str, List[Context]]:
"""
刷新站点最新资源,识别并缓存起来
@@ -180,7 +175,7 @@ class TorrentsChain(ChainBase):
# 按pubdate降序排列
torrents.sort(key=lambda x: x.pubdate or '', reverse=True)
# 取前N条
torrents = torrents[:settings.CACHE_CONF["refresh"]]
torrents = torrents[:settings.CONF["refresh"]]
if torrents:
# 过滤出没有处理过的种子 - 优化:使用集合查找,避免重复创建字符串列表
cached_signatures = {f'{t.torrent_info.title}{t.torrent_info.description}'
@@ -220,18 +215,8 @@ class TorrentsChain(ChainBase):
else:
torrents_cache[domain].append(context)
# 如果超过了限制条数则移除掉前面的
if len(torrents_cache[domain]) > settings.CACHE_CONF["torrents"]:
# 优化直接删除旧数据无需重复清理数据进缓存前已经clear过
old_contexts = torrents_cache[domain][:-settings.CACHE_CONF["torrents"]]
torrents_cache[domain] = torrents_cache[domain][-settings.CACHE_CONF["torrents"]:]
# 清理旧对象
clear_large_objects(*old_contexts)
# 优化:清理不再需要的临时变量
del meta, mediainfo, context
# 回收资源
torrents.clear()
del torrents
gc.collect()
if len(torrents_cache[domain]) > settings.CONF["torrents"]:
torrents_cache[domain] = torrents_cache[domain][-settings.CONF["torrents"]:]
else:
logger.info(f'{indexer.get("name")} 没有获取到种子')
@@ -243,17 +228,7 @@ class TorrentsChain(ChainBase):
# 去除不在站点范围内的缓存种子
if sites and torrents_cache:
old_cache = torrents_cache
torrents_cache = {k: v for k, v in torrents_cache.items() if k in domains}
# 清理不再使用的缓存数据数据进缓存前已经clear过无需重复清理
removed_contexts = []
for domain, contexts in old_cache.items():
if domain not in domains:
removed_contexts.extend(contexts)
# 批量清理
if removed_contexts:
clear_large_objects(*removed_contexts)
del old_cache
return torrents_cache

View File

@@ -25,7 +25,6 @@ from app.db.systemconfig_oper import SystemConfigOper
from app.db.transferhistory_oper import TransferHistoryOper
from app.helper.directory import DirectoryHelper
from app.helper.format import FormatParser
from app.helper.memory import memory_optimized
from app.helper.progress import ProgressHelper
from app.log import logger
from app.schemas import TransferInfo, TransferTorrent, Notification, EpisodeFormat, FileItem, TransferDirectoryConf, \
@@ -938,7 +937,6 @@ class TransferChain(ChainBase, metaclass=Singleton):
return trans_items
@memory_optimized(force_gc_after=True, log_memory=True)
def do_transfer(self, fileitem: FileItem,
meta: MetaBase = None, mediainfo: MediaInfo = None,
target_directory: TransferDirectoryConf = None,

View File

@@ -196,7 +196,7 @@ class CacheToolsBackend(CacheBackend):
return None
return region_cache.get(key)
def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> None:
def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION):
"""
删除缓存
@@ -205,7 +205,7 @@ class CacheToolsBackend(CacheBackend):
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return None
return
with lock:
del region_cache[key]

View File

@@ -69,8 +69,8 @@ class ConfigModel(BaseModel):
DB_MAX_OVERFLOW: int = 500
# SQLite 的 busy_timeout 参数,默认为 60 秒
DB_TIMEOUT: int = 60
# SQLite 是否启用 WAL 模式,默认关闭
DB_WAL_ENABLE: bool = False
# SQLite 是否启用 WAL 模式,默认开启
DB_WAL_ENABLE: bool = True
# 缓存类型,支持 cachetools 和 redis默认使用 cachetools
CACHE_BACKEND_TYPE: str = "cachetools"
# 缓存连接字符串,仅外部缓存(如 Redis、Memcached需要
@@ -247,6 +247,12 @@ class ConfigModel(BaseModel):
REPO_GITHUB_TOKEN: Optional[str] = None
# 大内存模式
BIG_MEMORY_MODE: bool = False
# 是否启用内存监控
MEMORY_ANALYSIS: bool = False
# 内存快照间隔(分钟)
MEMORY_SNAPSHOT_INTERVAL: int = 60
# 保留的内存快照文件数量
MEMORY_SNAPSHOT_KEEP_COUNT: int = 20
# 全局图片缓存,将媒体图片缓存到本地
GLOBAL_IMAGE_CACHE: bool = False
# 是否启用编码探测的性能模式
@@ -280,8 +286,6 @@ class ConfigModel(BaseModel):
DEFAULT_SUB: Optional[str] = "zh-cn"
# Docker Client API地址
DOCKER_CLIENT_API: Optional[str] = "tcp://127.0.0.1:38379"
# 剧集交集最小置信度 计算后的交集比例( len(torrent_episodes ∩ need_episodes) / len(torrent_episodes) 低于这个阈值表明包含过多不需要的剧集
EPISODE_INTERSECTION_MIN_CONFIDENCE: float = 0.0
class Settings(BaseSettings, ConfigModel, LogConfigModel):
@@ -520,7 +524,7 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
return self.CONFIG_PATH / "cookies"
@property
def CACHE_CONF(self):
def CONF(self):
"""
{
"torrents": "缓存种子数量",
@@ -529,7 +533,9 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
"douban": "豆瓣请求缓存数量",
"fanart": "Fanart请求缓存数量",
"meta": "元数据缓存过期时间(秒)",
"memory": "最大占用内存MB"
"memory": "最大占用内存MB",
"scheduler": "调度器缓存数量"
"threadpool": "线程池数量"
}
"""
if self.BIG_MEMORY_MODE:
@@ -541,7 +547,8 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
"bangumi": 512,
"fanart": 512,
"meta": (self.META_CACHE_EXPIRE or 24) * 3600,
"memory": 2 * 1024
"scheduler": 100,
"threadpool": 100
}
return {
"torrents": 100,
@@ -551,7 +558,8 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
"bangumi": 256,
"fanart": 128,
"meta": (self.META_CACHE_EXPIRE or 2) * 3600,
"memory": 1024
"scheduler": 50,
"threadpool": 50
}
@property

View File

@@ -456,6 +456,9 @@ class EventManager(metaclass=Singleton):
elif class_name.endswith("Chain"):
module_name = f"app.chain.{class_name[:-5].lower()}"
module = importlib.import_module(module_name)
elif class_name.endswith("Helper"):
module_name = f"app.helper.{class_name[:-6].lower()}"
module = importlib.import_module(module_name)
else:
module_name = f"app.{class_name.lower()}"
module = importlib.import_module(module_name)
@@ -465,7 +468,7 @@ class EventManager(metaclass=Singleton):
else:
logger.debug(f"事件处理出错:模块 {module_name} 中没有找到类 {class_name}")
except Exception as e:
logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}")
logger.debug(f"事件处理出错:{str(e)} - {traceback.format_exc()}")
return None
def __broadcast_consumer_loop(self):

View File

@@ -307,6 +307,13 @@ class PluginManager(metaclass=Singleton):
"""
self.stop(plugin_id)
# 从模块列表中移除插件
from sys import modules
try:
del modules[f"app.plugins.{plugin_id.lower()}"]
except KeyError:
pass
def reload_plugin(self, plugin_id: str):
"""
将一个插件重新加载到内存
@@ -412,13 +419,14 @@ class PluginManager(metaclass=Singleton):
return {k: v for k, v in conf.items() if k}
return {}
def save_plugin_config(self, pid: str, conf: dict) -> bool:
def save_plugin_config(self, pid: str, conf: dict, force: bool = False) -> bool:
"""
保存插件配置
:param pid: 插件ID
:param conf: 配置
:param force: 强制保存
"""
if not self._plugins.get(pid):
if not force and not self._plugins.get(pid):
return False
SystemConfigOper().set(self._config_key % pid, conf)
return True
@@ -462,7 +470,9 @@ class PluginManager(metaclass=Singleton):
}]
"""
ret_commands = []
for plugin_id, plugin in self._running_plugins.items():
# 创建字典快照避免并发修改
running_plugins_snapshot = dict(self._running_plugins)
for plugin_id, plugin in running_plugins_snapshot.items():
if pid and pid != plugin_id:
continue
if hasattr(plugin, "get_command") and ObjectUtils.check_method(plugin.get_command):
@@ -522,7 +532,9 @@ class PluginManager(metaclass=Singleton):
}]
"""
ret_services = []
for plugin_id, plugin in self._running_plugins.items():
# 创建字典快照避免并发修改
running_plugins_snapshot = dict(self._running_plugins)
for plugin_id, plugin in running_plugins_snapshot.items():
if pid and pid != plugin_id:
continue
if hasattr(plugin, "get_service") and ObjectUtils.check_method(plugin.get_service):
@@ -545,7 +557,9 @@ class PluginManager(metaclass=Singleton):
}
"""
ret_modules = {}
for plugin_id, plugin in self._running_plugins.items():
# 创建字典快照避免并发修改
running_plugins_snapshot = dict(self._running_plugins)
for plugin_id, plugin in running_plugins_snapshot.items():
if pid and pid != plugin_id:
continue
if hasattr(plugin, "get_module") and ObjectUtils.check_method(plugin.get_module):
@@ -569,7 +583,9 @@ class PluginManager(metaclass=Singleton):
}]
"""
ret_actions = []
for plugin_id, plugin in self._running_plugins.items():
# 创建字典快照避免并发修改
running_plugins_snapshot = dict(self._running_plugins)
for plugin_id, plugin in running_plugins_snapshot.items():
if pid and pid != plugin_id:
continue
if hasattr(plugin, "get_actions") and ObjectUtils.check_method(plugin.get_actions):
@@ -606,7 +622,9 @@ class PluginManager(metaclass=Singleton):
获取插件联邦组件列表
"""
remotes = []
for plugin_id, plugin in self._running_plugins.items():
# 创建字典快照避免并发修改
running_plugins_snapshot = dict(self._running_plugins)
for plugin_id, plugin in running_plugins_snapshot.items():
if pid and pid != plugin_id:
continue
if hasattr(plugin, "get_render_mode"):
@@ -625,7 +643,9 @@ class PluginManager(metaclass=Singleton):
获取所有插件仪表盘元信息
"""
dashboard_meta = []
for plugin_id, plugin in self._running_plugins.items():
# 创建字典快照避免并发修改
running_plugins_snapshot = dict(self._running_plugins)
for plugin_id, plugin in running_plugins_snapshot.items():
if not hasattr(plugin, "get_dashboard") or not ObjectUtils.check_method(plugin.get_dashboard):
continue
try:
@@ -734,7 +754,7 @@ class PluginManager(metaclass=Singleton):
"""
return list(self._running_plugins.keys())
def get_online_plugins(self) -> List[schemas.Plugin]:
def get_online_plugins(self, force: bool = False) -> List[schemas.Plugin]:
"""
获取所有在线插件信息
"""
@@ -755,12 +775,13 @@ class PluginManager(metaclass=Singleton):
if not m:
continue
# 提交任务获取 v1 版本插件,存储 future 到 version 的映射
base_future = executor.submit(self.get_plugins_from_market, m, None)
base_future = executor.submit(self.get_plugins_from_market, m, None, force)
futures_to_version[base_future] = "base_version"
# 提交任务获取高版本插件(如 v2、v3存储 future 到 version 的映射
if settings.VERSION_FLAG:
higher_version_future = executor.submit(self.get_plugins_from_market, m, settings.VERSION_FLAG)
higher_version_future = executor.submit(self.get_plugins_from_market, m,
settings.VERSION_FLAG, force)
futures_to_version[higher_version_future] = "higher_version"
# 按照完成顺序处理结果
@@ -888,11 +909,13 @@ class PluginManager(metaclass=Singleton):
return False
def get_plugins_from_market(self, market: str,
package_version: Optional[str] = None) -> Optional[List[schemas.Plugin]]:
package_version: Optional[str] = None,
force: bool = False) -> Optional[List[schemas.Plugin]]:
"""
从指定的市场获取插件信息
:param market: 市场的 URL 或标识
:param package_version: 首选插件版本 (如 "v2", "v3"),如果不指定则获取 v1 版本
:param force: 是否强制刷新(忽略缓存)
:return: 返回插件的列表,若获取失败返回 []
"""
if not market:
@@ -900,7 +923,7 @@ class PluginManager(metaclass=Singleton):
# 已安装插件
installed_apps = SystemConfigOper().get(SystemConfigKey.UserInstalledPlugins) or []
# 获取在线插件
online_plugins = PluginHelper().get_plugins(market, package_version)
online_plugins = PluginHelper().get_plugins(market, package_version, force)
if online_plugins is None:
logger.warning(
f"获取{package_version if package_version else ''}插件库失败:{market},请检查 GitHub 网络连接")
@@ -1086,7 +1109,7 @@ class PluginManager(metaclass=Singleton):
success, msg = self._modify_plugin_files(
plugin_dir=clone_plugin_dir,
original_id=plugin_id,
suffix=suffix,
suffix=suffix.lower(),
name=name,
description=description,
version=version,
@@ -1116,7 +1139,7 @@ class PluginManager(metaclass=Singleton):
# 默认禁用分身插件,让用户手动配置
clone_config['enable'] = False
clone_config['enabled'] = False
self.save_plugin_config(clone_id, clone_config)
self.save_plugin_config(clone_id, clone_config, force=True)
logger.info(f"已为分身插件 {clone_id} 设置初始配置")
else:
logger.info(f"原插件 {plugin_id} 没有配置,分身插件 {clone_id} 将使用默认配置")

View File

@@ -236,7 +236,6 @@ class DbOper:
"""
数据库操作基类
"""
_db: Session = None
def __init__(self, db: Session = None):
self._db = db

View File

@@ -9,3 +9,4 @@ from .transferhistory import TransferHistory
from .user import User
from .userconfig import UserConfig
from .workflow import Workflow
from .userrequest import UserRequest

View File

@@ -1,4 +1,5 @@
from typing import Any, Union, Optional
import copy
from typing import Any, Optional, Union
from app.db import DbOper
from app.db.models.systemconfig import SystemConfig
@@ -7,14 +8,15 @@ from app.utils.singleton import Singleton
class SystemConfigOper(DbOper, metaclass=Singleton):
# 配置对象
__SYSTEMCONF: dict = {}
"""
系统配置管理
"""
def __init__(self):
"""
加载配置到内存
"""
super().__init__()
self.__SYSTEMCONF = {}
for item in SystemConfig.list(self._db):
self.__SYSTEMCONF[item.key] = item.value
@@ -52,14 +54,16 @@ class SystemConfigOper(DbOper, metaclass=Singleton):
if isinstance(key, SystemConfigKey):
key = key.value
if not key:
return self.__SYSTEMCONF
return self.__SYSTEMCONF.get(key)
return self.all()
# 避免将__SYSTEMCONF内的值引用出去会导致set时误判没有变动
return copy.deepcopy(self.__SYSTEMCONF.get(key))
def all(self):
"""
获取所有系统设置
"""
return self.__SYSTEMCONF or {}
# 避免将__SYSTEMCONF内的值引用出去会导致set时误判没有变动
return copy.deepcopy(self.__SYSTEMCONF)
def delete(self, key: Union[str, SystemConfigKey]) -> bool:
"""

View File

@@ -7,14 +7,15 @@ from app.utils.singleton import Singleton
class UserConfigOper(DbOper, metaclass=Singleton):
# 配置缓存
__USERCONF: Dict[str, Dict[str, Any]] = {}
"""
用户配置管理
"""
def __init__(self):
"""
加载配置到内存
"""
super().__init__()
self.__USERCONF = {}
for item in UserConfig.list(self._db):
self.__set_config_cache(username=item.username, key=item.key, value=item.value)

View File

@@ -1,8 +1,8 @@
from typing import Callable, Any, Optional
import gc
from playwright.sync_api import sync_playwright, Page
from cf_clearance import sync_cf_retry, sync_stealth
from playwright.sync_api import sync_playwright, Page
from app.log import logger
@@ -67,8 +67,6 @@ class PlaywrightHelper:
context.close()
if browser:
browser.close()
# 强制垃圾回收
gc.collect()
except Exception as e:
logger.error(f"Playwright初始化失败: {str(e)}")
@@ -120,8 +118,6 @@ class PlaywrightHelper:
context.close()
if browser:
browser.close()
# 强制垃圾回收
gc.collect()
except Exception as e:
logger.error(f"Playwright初始化失败: {str(e)}")

View File

@@ -22,39 +22,36 @@ _executor = concurrent.futures.ThreadPoolExecutor()
_doh_timeout = 5
_doh_cache: Dict[str, str] = {}
def _patched_getaddrinfo(host, *args, **kwargs):
"""
socket.getaddrinfo的补丁版本。
"""
if host not in settings.DOH_DOMAINS.split(","):
return _orig_getaddrinfo(host, *args, **kwargs)
# 检查主机是否已解析
if host in _doh_cache:
ip = _doh_cache[host]
logger.info("已解析 [%s] 为 [%s] (缓存)", host, ip)
return _orig_getaddrinfo(ip, *args, **kwargs)
# 使用DoH解析主机
futures = []
for resolver in settings.DOH_RESOLVERS.split(","):
futures.append(_executor.submit(_doh_query, resolver, host))
for future in concurrent.futures.as_completed(futures):
ip = future.result()
if ip is not None:
logger.info("已解析 [%s] 为 [%s]", host, ip)
_doh_cache[host] = ip
host = ip
break
return _orig_getaddrinfo(host, *args, **kwargs)
# 对 socket.getaddrinfo 进行补丁
if settings.DOH_ENABLE:
# 保存原始的 socket.getaddrinfo 方法
_orig_getaddrinfo = socket.getaddrinfo
def _patched_getaddrinfo(host, *args, **kwargs):
"""
socket.getaddrinfo的补丁版本。
"""
if host not in settings.DOH_DOMAINS.split(","):
return _orig_getaddrinfo(host, *args, **kwargs)
# 检查主机是否已解析
if host in _doh_cache:
ip = _doh_cache[host]
logger.info("已解析 [%s] 为 [%s] (缓存)", host, ip)
return _orig_getaddrinfo(ip, *args, **kwargs)
# 使用DoH解析主机
futures = []
for resolver in settings.DOH_RESOLVERS.split(","):
futures.append(_executor.submit(_doh_query, resolver, host))
for future in concurrent.futures.as_completed(futures):
ip = future.result()
if ip is not None:
logger.info("已解析 [%s] 为 [%s]", host, ip)
_doh_cache[host] = ip
host = ip
break
return _orig_getaddrinfo(host, *args, **kwargs)
# 替换 socket.getaddrinfo 方法
socket.getaddrinfo = _patched_getaddrinfo

View File

@@ -1,93 +1,74 @@
import gc
import psutil
import sys
import threading
import time
from typing import Optional, Callable, Any
from functools import wraps
from datetime import datetime
from typing import Optional
import psutil
from pympler import muppy, summary, asizeof
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.schemas import ConfigChangeEventData
from app.schemas.types import EventType
from app.utils.singleton import Singleton
class MemoryManager(metaclass=Singleton):
class MemoryHelper(metaclass=Singleton):
"""
内存管理工具类,用于监控和优化内存使用
"""
def __init__(self):
self._memory_threshold = 512 # 内存使用阈值(MB)
self._check_interval = 300 # 检查间隔(秒)
# 检查间隔(秒) - 从配置获取默认5分钟
self._check_interval = settings.MEMORY_SNAPSHOT_INTERVAL * 60
self._monitoring = False
self._monitor_thread: Optional[threading.Thread] = None
@staticmethod
def get_memory_usage() -> dict:
"""
获取当前内存使用情况
"""
process = psutil.Process()
memory_info = process.memory_info()
system_memory = psutil.virtual_memory()
return {
'rss': memory_info.rss / 1024 / 1024, # MB
'vms': memory_info.vms / 1024 / 1024, # MB
'percent': process.memory_percent(),
'system_percent': system_memory.percent,
'system_available': system_memory.available / 1024 / 1024 / 1024 # GB
}
def force_gc(self, generation: Optional[int] = None) -> int:
"""
强制执行垃圾回收
:param generation: 垃圾回收代数None表示所有代数
:return: 回收的对象数量
"""
before_memory = self.get_memory_usage()
logger.info(f"开始强制垃圾回收,当前内存使用: {before_memory['rss']:.2f}MB")
# 内存快照保存目录
self._memory_snapshot_dir = settings.LOG_PATH / "memory_snapshots"
# 保留的快照文件数量
self._keep_count = settings.MEMORY_SNAPSHOT_KEEP_COUNT
if generation is not None:
collected = gc.collect(generation)
else:
collected = gc.collect()
after_memory = self.get_memory_usage()
memory_freed = before_memory['rss'] - after_memory['rss']
if memory_freed > 0:
logger.info(f"垃圾回收完成: 回收对象 {collected} 个, 释放内存 {memory_freed:.2f}MB")
return collected
def check_memory_and_cleanup(self) -> bool:
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
检查内存使用量,如果超过阈值则执行清理
:return: 是否执行了清理
处理配置变更事件,更新内存监控设置
:param event: 事件对象
"""
memory_info = self.get_memory_usage()
current_memory_mb = memory_info['rss']
if current_memory_mb > self._memory_threshold:
logger.warning(f"内存使用超过阈值: {current_memory_mb:.1f}MB > {self._memory_threshold:.1f}MB, 开始清理...")
self.force_gc()
# 再次检查清理效果
after_memory = self.get_memory_usage()
logger.info(f"清理后内存: {after_memory['rss']:.1f}MB")
return True
return False
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in ['MEMORY_ANALYSIS', 'MEMORY_SNAPSHOT_INTERVAL', 'MEMORY_SNAPSHOT_KEEP_COUNT']:
return
# 更新配置
if event_data.key == 'MEMORY_SNAPSHOT_INTERVAL':
self._check_interval = settings.MEMORY_SNAPSHOT_INTERVAL * 60
elif event_data.key == 'MEMORY_SNAPSHOT_KEEP_COUNT':
self._keep_count = settings.MEMORY_SNAPSHOT_KEEP_COUNT
self.stop_monitoring()
self.start_monitoring()
def start_monitoring(self):
"""
开始内存监控
"""
if not settings.MEMORY_ANALYSIS:
return
if self._monitoring:
return
# 创建内存快照目录
self._memory_snapshot_dir.mkdir(parents=True, exist_ok=True)
# 初始化内存分析器
self._monitoring = True
self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
self._monitor_thread.start()
logger.info(f"内存监控已启动 - 阈值: {self._memory_threshold}MB, 检查间隔: {self._check_interval}")
logger.info("内存监控已启动")
def stop_monitoring(self):
"""
停止内存监控
@@ -95,82 +76,382 @@ class MemoryManager(metaclass=Singleton):
self._monitoring = False
if self._monitor_thread:
self._monitor_thread.join(timeout=5)
logger.info("内存监控已停止")
logger.info("内存监控已停止")
def _monitor_loop(self):
"""
内存监控循环
"""
logger.info("内存监控循环开始")
while self._monitoring:
try:
self.check_memory_and_cleanup()
# 生成内存快照
self._create_memory_snapshot()
time.sleep(self._check_interval)
except Exception as e:
logger.error(f"内存监控出错: {e}")
time.sleep(60) # 出错后等待1分钟再继续
def set_threshold(self, threshold_mb: int):
"""
设置内存使用阈值
:param threshold_mb: 内存阈值单位MB500-4096之间
"""
self._memory_threshold = max(512, min(4096, threshold_mb))
logger.info(f"内存阈值已设置为: {self._memory_threshold}MB")
def set_check_interval(self, interval: int):
"""
设置检查间隔
:param interval: 检查间隔单位秒最少60秒
"""
self._check_interval = max(60, interval)
logger.info(f"内存检查间隔已设置为: {self._check_interval}")
def get_threshold(self) -> int:
"""
获取当前内存阈值
:return: 当前阈值(MB)
"""
return self._memory_threshold
# 出错后等待1分钟再继续
time.sleep(60)
logger.info("内存监控循环结束")
def _create_memory_snapshot(self):
"""
创建内存快照并保存到文件
"""
try:
# 获取当前时间戳
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
snapshot_file = self._memory_snapshot_dir / f"memory_snapshot_{timestamp}.txt"
def memory_optimized(force_gc_after: bool = False, log_memory: bool = False):
"""
内存优化装饰器
:param force_gc_after: 函数执行后是否强制垃圾回收
:param log_memory: 是否记录内存使用情况
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
memory_manager = MemoryManager()
if log_memory:
before_memory = memory_manager.get_memory_usage()
logger.info(f"{func.__name__} 执行前内存: {before_memory['rss']:.1f}MB")
# 获取系统内存使用情况
memory_usage = psutil.Process().memory_info().rss
logger.info(f"开始创建内存快照: {snapshot_file}")
# 第一步:写入基本信息和对象类型统计
self._write_basic_info(snapshot_file, memory_usage)
# 第二步:分析并写入类实例内存使用情况
self._append_class_analysis(snapshot_file)
# 第三步:分析并写入大内存变量详情
self._append_variable_analysis(snapshot_file)
logger.info(f"内存快照已保存: {snapshot_file}, 当前内存使用: {memory_usage / 1024 / 1024:.2f} MB")
# 清理过期的快照文件保留最近30个
self._cleanup_old_snapshots()
except Exception as e:
logger.error(f"创建内存快照失败: {e}")
@staticmethod
def _write_basic_info(snapshot_file, memory_usage):
"""
写入基本信息和对象类型统计
"""
# 获取当前进程的内存使用情况
all_objects = muppy.get_objects()
sum1 = summary.summarize(all_objects)
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write(f"内存快照时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"当前进程内存使用: {memory_usage / 1024 / 1024:.2f} MB\n")
f.write("=" * 80 + "\n")
f.write("对象类型统计:\n")
f.write("-" * 80 + "\n")
# 写入对象统计信息
for line in summary.format_(sum1):
f.write(line + "\n")
# 立即刷新到磁盘
f.flush()
logger.debug("基本信息已写入快照文件")
def _append_class_analysis(self, snapshot_file):
"""
分析并追加类实例内存使用情况
"""
with open(snapshot_file, 'a', encoding='utf-8') as f:
f.write("\n" + "=" * 80 + "\n")
f.write("类实例内存使用情况 (按内存大小排序):\n")
f.write("-" * 80 + "\n")
f.write("正在分析中...\n")
# 立即刷新,让用户知道这部分开始了
f.flush()
try:
logger.debug("开始分析类实例内存使用情况")
class_objects = self._get_class_memory_usage()
# 重新打开文件,移除"正在分析中..."并写入实际结果
with open(snapshot_file, 'r', encoding='utf-8') as f:
content = f.read()
# 替换"正在分析中..."
content = content.replace("正在分析中...\n", "")
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write(content)
if class_objects:
# 只显示前100个类
for i, class_info in enumerate(class_objects[:100], 1):
f.write(f"{i:3d}. {class_info['name']:<50} "
f"{class_info['size_mb']:>8.2f} MB ({class_info['count']} 个实例)\n")
else:
f.write("未找到有效的类实例信息\n")
f.flush()
except Exception as e:
logger.error(f"获取类实例信息失败: {e}")
# 即使出错也要更新文件
with open(snapshot_file, 'r', encoding='utf-8') as f:
content = f.read()
content = content.replace("正在分析中...\n", f"获取类实例信息失败: {e}\n")
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write(content)
f.flush()
logger.debug("类实例分析已完成并写入")
def _append_variable_analysis(self, snapshot_file):
"""
分析并追加大内存变量详情
"""
with open(snapshot_file, 'a', encoding='utf-8') as f:
f.write("\n" + "=" * 80 + "\n")
f.write("大内存变量详情 (前100个):\n")
f.write("-" * 80 + "\n")
f.write("正在分析中...\n")
# 立即刷新,让用户知道这部分开始了
f.flush()
try:
logger.debug("开始分析大内存变量")
large_variables = self._get_large_variables(100)
# 重新打开文件,移除"正在分析中..."并写入实际结果
with open(snapshot_file, 'r', encoding='utf-8') as f:
content = f.read()
# 替换最后的"正在分析中..."
content = content.replace("正在分析中...\n", "")
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write(content)
if large_variables:
for i, var_info in enumerate(large_variables, 1):
f.write(
f"{i:3d}. {var_info['name']:<30} {var_info['type']:<15} {var_info['size_mb']:>8.2f} MB\n")
else:
f.write("未找到大内存变量\n")
f.flush()
except Exception as e:
logger.error(f"获取大内存变量信息失败: {e}")
# 即使出错也要更新文件
with open(snapshot_file, 'r', encoding='utf-8') as f:
content = f.read()
content = content.replace("正在分析中...\n", f"获取变量信息失败: {e}\n")
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write(content)
f.flush()
logger.debug("大内存变量分析已完成并写入")
def _cleanup_old_snapshots(self):
"""
清理过期的内存快照文件,只保留最近的指定数量文件
"""
try:
snapshot_files = list(self._memory_snapshot_dir.glob("memory_snapshot_*.txt"))
if len(snapshot_files) > self._keep_count:
# 按修改时间排序,删除最旧的文件
snapshot_files.sort(key=lambda x: x.stat().st_mtime)
for old_file in snapshot_files[:-self._keep_count]:
old_file.unlink()
logger.debug(f"已删除过期内存快照: {old_file}")
except Exception as e:
logger.error(f"清理过期快照失败: {e}")
@staticmethod
def _get_class_memory_usage():
"""
获取所有类实例的内存使用情况,按内存大小排序
"""
class_info = {}
processed_count = 0
error_count = 0
# 获取所有对象
all_objects = muppy.get_objects()
logger.debug(f"开始分析 {len(all_objects)} 个对象的类实例内存使用情况")
for obj in all_objects:
try:
result = func(*args, **kwargs)
return result
finally:
if force_gc_after:
memory_manager.force_gc()
# 跳过类对象本身,统计类的实例
if isinstance(obj, type):
continue
# 获取对象的类名 - 这里可能会出错
obj_class = type(obj)
# 安全地获取类名
try:
if hasattr(obj_class, '__module__') and hasattr(obj_class, '__name__'):
class_name = f"{obj_class.__module__}.{obj_class.__name__}"
else:
class_name = str(obj_class)
except Exception as e:
# 如果获取类名失败,使用简单的类型描述
class_name = f"<unknown_class_{id(obj_class)}>"
logger.debug(f"获取类名失败: {e}")
# 计算对象本身的内存使用(不包括引用对象,避免重复计算)
size_bytes = sys.getsizeof(obj)
if size_bytes < 100: # 跳过太小的对象
continue
size_mb = size_bytes / 1024 / 1024
processed_count += 1
if class_name in class_info:
class_info[class_name]['size_mb'] += size_mb
class_info[class_name]['count'] += 1
else:
class_info[class_name] = {
'name': class_name,
'size_mb': size_mb,
'count': 1
}
except Exception as e:
# 捕获所有可能的异常包括SQLAlchemy、ORM等框架的异常
error_count += 1
if error_count <= 5: # 只记录前5个错误避免日志过多
logger.debug(f"分析对象时出错: {e}")
continue
logger.debug(f"类实例分析完成: 处理了 {processed_count} 个对象, 遇到 {error_count} 个错误")
# 按内存大小排序
sorted_classes = sorted(class_info.values(), key=lambda x: x['size_mb'], reverse=True)
return sorted_classes
def _get_large_variables(self, limit=100):
"""
获取大内存变量信息,按内存大小排序
使用已计算对象集合避免重复计算
"""
large_vars = []
processed_count = 0
calculated_objects = set() # 避免重复计算
# 获取所有对象
all_objects = muppy.get_objects()
logger.debug(f"开始分析 {len(all_objects)} 个对象的内存使用情况")
for obj in all_objects:
# 跳过类对象
if isinstance(obj, type):
continue
# 跳过已经计算过的对象
obj_id = id(obj)
if obj_id in calculated_objects:
continue
try:
# 首先使用 sys.getsizeof 快速筛选
shallow_size = sys.getsizeof(obj)
if shallow_size < 1024: # 只处理大于1KB的对象
continue
# 对于较大的对象,使用 asizeof 进行深度计算
size_bytes = asizeof.asizeof(obj)
if log_memory:
after_memory = memory_manager.get_memory_usage()
logger.info(f"{func.__name__} 执行后内存: {after_memory['rss']:.1f}MB")
return wrapper
return decorator
# 只处理大于10KB的对象提高分析效率
if size_bytes < 10240:
continue
size_mb = size_bytes / 1024 / 1024
processed_count += 1
calculated_objects.add(obj_id)
def clear_large_objects(*objects):
"""
清理大型对象的辅助函数
"""
for obj in objects:
if hasattr(obj, 'clear') and callable(obj.clear):
obj.clear()
elif hasattr(obj, '__dict__'):
obj.__dict__.clear()
del obj
gc.collect()
# 获取对象信息
var_info = self._get_variable_info(obj, size_mb)
if var_info:
large_vars.append(var_info)
# 如果已经找到足够多的大对象,可以提前结束
if len(large_vars) >= limit * 2: # 多收集一些,后面排序筛选
break
except Exception as e:
# 更广泛的异常捕获
logger.debug(f"分析对象失败: {e}")
continue
logger.debug(f"处理了 {processed_count} 个大对象,找到 {len(large_vars)} 个有效变量")
# 按内存大小排序并返回前N个
large_vars.sort(key=lambda x: x['size_mb'], reverse=True)
return large_vars[:limit]
def _get_variable_info(self, obj, size_mb):
"""
获取变量的描述信息
"""
try:
obj_type = type(obj).__name__
# 尝试获取变量名
var_name = self._get_variable_name(obj)
# 生成描述性信息
if isinstance(obj, dict):
key_count = len(obj)
if key_count > 0:
sample_keys = list(obj.keys())[:3]
var_name += f" ({key_count}项, 键: {sample_keys})"
elif isinstance(obj, (list, tuple, set)):
var_name += f" ({len(obj)}个元素)"
elif isinstance(obj, str):
if len(obj) > 50:
var_name += f" (长度: {len(obj)}, 内容: '{obj[:50]}...')"
else:
var_name += f" ('{obj}')"
elif hasattr(obj, '__class__') and hasattr(obj.__class__, '__name__'):
if hasattr(obj, '__dict__'):
attr_count = len(obj.__dict__)
var_name += f" ({attr_count}个属性)"
return {
'name': var_name,
'type': obj_type,
'size_mb': size_mb
}
except Exception as e:
logger.debug(f"获取变量信息失败: {e}")
return None
@staticmethod
def _get_variable_name(obj):
"""
尝试获取变量名
"""
try:
# 尝试通过gc获取引用该对象的变量名
referrers = gc.get_referrers(obj)
for referrer in referrers:
if isinstance(referrer, dict):
# 检查是否在某个模块的全局变量中
for name, value in referrer.items():
if value is obj and isinstance(name, str):
return name
elif hasattr(referrer, '__dict__'):
# 检查是否在某个实例的属性中
for name, value in referrer.__dict__.items():
if value is obj and isinstance(name, str):
return f"{type(referrer).__name__}.{name}"
# 如果找不到变量名返回对象类型和id
return f"{type(obj).__name__}_{id(obj)}"
except Exception as e:
logger.debug(f"获取变量名失败: {e}")
return f"{type(obj).__name__}_{id(obj)}"

View File

@@ -41,12 +41,35 @@ class PluginHelper(metaclass=Singleton):
if self.install_report():
self.systemconfig.set(SystemConfigKey.PluginInstallReport, "1")
@cached(maxsize=64, ttl=1800)
def get_plugins(self, repo_url: str, package_version: Optional[str] = None) -> Optional[Dict[str, dict]]:
def get_plugins(self, repo_url: str, package_version: Optional[str] = None,
force: bool = False) -> Optional[Dict[str, dict]]:
"""
获取Github所有最新插件列表
:param repo_url: Github仓库地址
:param package_version: 首选插件版本 (如 "v2", "v3"),如果不指定则获取 v1 版本
:param force: 是否强制刷新,忽略缓存
"""
# 如果强制刷新,直接调用不带缓存的版本
if force:
return self._get_plugins_uncached(repo_url, package_version)
# 正常情况下调用带缓存的版本
return self._get_plugins_cached(repo_url, package_version)
@cached(maxsize=64, ttl=1800)
def _get_plugins_cached(self, repo_url: str, package_version: Optional[str] = None) -> Optional[Dict[str, dict]]:
"""
获取Github所有最新插件列表使用缓存
:param repo_url: Github仓库地址
:param package_version: 首选插件版本 (如 "v2", "v3"),如果不指定则获取 v1 版本
"""
return self._get_plugins_uncached(repo_url, package_version)
def _get_plugins_uncached(self, repo_url: str, package_version: Optional[str] = None) -> Optional[Dict[str, dict]]:
"""
获取Github所有最新插件列表不使用缓存
:param repo_url: Github仓库地址
:param package_version: 首选插件版本 (如 "v2", "v3"),如果不指定则获取 v1 版本
"""
if not repo_url:
return None

View File

@@ -1,4 +1,3 @@
import gc
import re
import traceback
from typing import List, Tuple, Union, Optional
@@ -18,11 +17,11 @@ class RssHelper:
"""
RSS帮助类解析RSS报文、获取RSS地址等
"""
# RSS解析限制配置
MAX_RSS_SIZE = 50 * 1024 * 1024 # 50MB最大RSS文件大小
MAX_RSS_ITEMS = 1000 # 最大解析条目数
# 各站点RSS链接获取配置
rss_link_conf = {
"default": {
@@ -228,7 +227,8 @@ class RssHelper:
},
}
def parse(self, url, proxy: bool = False, timeout: Optional[int] = 15, headers: dict = None) -> Union[List[dict], None, bool]:
def parse(self, url, proxy: bool = False,
timeout: Optional[int] = 15, headers: dict = None) -> Union[List[dict], None, bool]:
"""
解析RSS订阅URL获取RSS中的种子信息
:param url: RSS地址
@@ -241,7 +241,7 @@ class RssHelper:
ret_array: list = []
if not url:
return False
try:
ret = RequestUtils(proxies=settings.PROXY if proxy else None,
timeout=timeout, headers=headers).get_res(url)
@@ -250,7 +250,7 @@ class RssHelper:
except Exception as err:
logger.error(f"获取RSS失败{str(err)} - {traceback.format_exc()}")
return False
if ret:
ret_xml = None
root = None
@@ -258,9 +258,9 @@ class RssHelper:
# 检查响应大小避免处理过大的RSS文件
raw_data = ret.content
if raw_data and len(raw_data) > self.MAX_RSS_SIZE:
logger.warning(f"RSS文件过大: {len(raw_data)/1024/1024:.1f}MB跳过解析")
logger.warning(f"RSS文件过大: {len(raw_data) / 1024 / 1024:.1f}MB跳过解析")
return False
if raw_data:
try:
result = chardet.detect(raw_data)
@@ -279,7 +279,7 @@ class RssHelper:
ret.encoding = ret.apparent_encoding
if not ret_xml:
ret_xml = ret.text
# 使用lxml.etree解析XML
parser = None
try:
@@ -307,42 +307,39 @@ class RssHelper:
finally:
if parser is not None:
del parser
if root is None:
logger.error("无法解析RSS内容")
return False
# 查找所有item或entry节点
items = root.xpath('.//item | .//entry')
# 限制处理的条目数量
items_count = min(len(items), self.MAX_RSS_ITEMS)
if len(items) > self.MAX_RSS_ITEMS:
logger.warning(f"RSS条目过多: {len(items)},仅处理前{self.MAX_RSS_ITEMS}")
for i, item in enumerate(items[:items_count]):
for item in items[:items_count]:
try:
# 定期执行垃圾回收
if i > 0 and i % 100 == 0:
gc.collect()
# 使用xpath提取信息更高效
title_nodes = item.xpath('.//title')
title = title_nodes[0].text if title_nodes and title_nodes[0].text else ""
if not title:
continue
# 描述
desc_nodes = item.xpath('.//description | .//summary')
description = desc_nodes[0].text if desc_nodes and desc_nodes[0].text else ""
# 种子页面
link_nodes = item.xpath('.//link')
if link_nodes:
link = link_nodes[0].text if hasattr(link_nodes[0], 'text') and link_nodes[0].text else link_nodes[0].get('href', '')
link = link_nodes[0].text if hasattr(link_nodes[0], 'text') and link_nodes[0].text else \
link_nodes[0].get('href', '')
else:
link = ""
# 种子链接
enclosure_nodes = item.xpath('.//enclosure')
enclosure = enclosure_nodes[0].get('url', '') if enclosure_nodes else ""
@@ -351,24 +348,24 @@ class RssHelper:
# 部分RSS只有link没有enclosure
if not enclosure and link:
enclosure = link
# 大小
size = 0
if enclosure_nodes:
size_attr = enclosure_nodes[0].get('length', '0')
if size_attr and str(size_attr).isdigit():
size = int(size_attr)
# 发布日期
pubdate_nodes = item.xpath('.//pubDate | .//published | .//updated')
pubdate = ""
if pubdate_nodes and pubdate_nodes[0].text:
pubdate = StringUtils.get_time(pubdate_nodes[0].text)
# 获取豆瓣昵称
nickname_nodes = item.xpath('.//*[local-name()="creator"]')
nickname = nickname_nodes[0].text if nickname_nodes and nickname_nodes[0].text else ""
# 返回对象
tmp_dict = {
'title': title,
@@ -382,11 +379,11 @@ class RssHelper:
if nickname:
tmp_dict['nickname'] = nickname
ret_array.append(tmp_dict)
except Exception as e1:
logger.debug(f"解析RSS条目失败{str(e1)} - {traceback.format_exc()}")
continue
except Exception as e2:
logger.error(f"解析RSS失败{str(e2)} - {traceback.format_exc()}")
# RSS过期检查
@@ -403,8 +400,7 @@ class RssHelper:
del root
if ret_xml is not None:
del ret_xml
gc.collect()
return ret_array
def get_rss_link(self, url: str, cookie: str, ua: str, proxy: bool = False) -> Tuple[str, str]:
@@ -446,7 +442,7 @@ class RssHelper:
return "", f"获取 {url} RSS链接失败错误码{res.status_code},错误原因:{res.reason}"
else:
return "", f"获取RSS链接失败无法连接 {url} "
# 解析HTML
if html_text:
html = None
@@ -459,7 +455,7 @@ class RssHelper:
finally:
if html is not None:
del html
return "", f"获取RSS链接失败{url}"
except Exception as e:
return "", f"获取 {url} RSS链接失败{str(e)}"

View File

@@ -1,9 +1,15 @@
import os
import signal
from pathlib import Path
from typing import Tuple
import docker
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.schemas import ConfigChangeEventData
from app.schemas.types import EventType
from app.utils.system import SystemUtils
@@ -12,6 +18,22 @@ class SystemHelper:
系统工具类,提供系统相关的操作和判断
"""
__system_flag_file = "/var/log/nginx/__moviepilot__"
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件,更新日志设置
:param event: 事件对象
"""
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in ['DEBUG', 'LOG_LEVEL', 'LOG_MAX_FILE_SIZE', 'LOG_BACKUP_COUNT',
'LOG_FILE_FORMAT', 'LOG_CONSOLE_FORMAT']:
return
logger.update_loggers()
@staticmethod
def can_restart() -> bool:
"""
@@ -23,17 +45,12 @@ class SystemHelper:
)
@staticmethod
def restart() -> Tuple[bool, str]:
def _get_container_id() -> str:
"""
执行Docker重启操作
获取当前容器ID
"""
if not SystemUtils.is_docker():
return False, "非Docker环境无法重启"
container_id = None
try:
# 创建 Docker 客户端
client = docker.DockerClient(base_url=settings.DOCKER_CLIENT_API)
# 获取当前容器的 ID
container_id = None
with open("/proc/self/mountinfo", "r") as f:
data = f.read()
index_resolv_conf = data.find("resolv.conf")
@@ -49,11 +66,100 @@ class SystemHelper:
data.rfind("/", 0, index_second_slash) + 1
)
container_id = data[index_first_slash:index_second_slash]
except Exception as e:
logger.debug(f"获取容器ID失败: {str(e)}")
return container_id.strip() if container_id else None
@staticmethod
def _check_restart_policy() -> bool:
"""
检查当前容器是否配置了自动重启策略
"""
try:
# 获取当前容器ID
container_id = SystemHelper._get_container_id()
if not container_id:
return False
# 创建 Docker 客户端
client = docker.DockerClient(base_url=settings.DOCKER_CLIENT_API)
# 获取容器信息
container = client.containers.get(container_id)
restart_policy = container.attrs.get('HostConfig', {}).get('RestartPolicy', {})
policy_name = restart_policy.get('Name', 'no')
# 检查是否有有效的重启策略
auto_restart_policies = ['always', 'unless-stopped', 'on-failure']
has_restart_policy = policy_name in auto_restart_policies
logger.info(f"容器重启策略: {policy_name}, 支持自动重启: {has_restart_policy}")
return has_restart_policy
except Exception as e:
logger.warning(f"检查重启策略失败: {str(e)}")
return False
@staticmethod
def restart() -> Tuple[bool, str]:
"""
执行Docker重启操作
"""
if not SystemUtils.is_docker():
return False, "非Docker环境无法重启"
try:
# 检查容器是否配置了自动重启策略
has_restart_policy = SystemHelper._check_restart_policy()
if has_restart_policy:
# 有重启策略,使用优雅退出方式
logger.info("检测到容器配置了自动重启策略,使用优雅重启方式...")
# 发送SIGTERM信号给当前进程触发优雅停止
os.kill(os.getpid(), signal.SIGTERM)
return True, ""
else:
# 没有重启策略使用Docker API强制重启
logger.info("容器未配置自动重启策略使用Docker API重启...")
return SystemHelper._docker_api_restart()
except Exception as err:
logger.error(f"重启失败: {str(err)}")
# 降级为Docker API重启
logger.warning("降级为Docker API重启...")
return SystemHelper._docker_api_restart()
@staticmethod
def _docker_api_restart() -> Tuple[bool, str]:
"""
使用Docker API重启容器并尝试优雅停止
"""
try:
# 创建 Docker 客户端
client = docker.DockerClient(base_url=settings.DOCKER_CLIENT_API)
container_id = SystemHelper._get_container_id()
if not container_id:
return False, "获取容器ID失败"
# 重启当前容器
client.containers.get(container_id.strip()).restart()
# 重启容器
client.containers.get(container_id).restart()
return True, ""
except Exception as err:
print(str(err))
return False, f"重启时发生错误:{str(err)}"
except Exception as docker_err:
return False, f"重启时发生错误:{str(docker_err)}"
def set_system_modified(self):
"""
设置系统已修改标志
"""
try:
if SystemUtils.is_docker():
Path(self.__system_flag_file).touch(exist_ok=True)
except Exception as e:
print(f"设置系统修改标志失败: {str(e)}")
def is_system_reset(self) -> bool:
"""
检查系统是否已被重置
:return: 如果系统已重置,返回 True否则返回 False
"""
if SystemUtils.is_docker():
return not Path(self.__system_flag_file).exists()
return False

View File

@@ -2,14 +2,15 @@ from concurrent.futures import ThreadPoolExecutor
from typing import Optional
from app.utils.singleton import Singleton
from app.core.config import settings
class ThreadHelper(metaclass=Singleton):
"""
线程池管理
"""
def __init__(self, max_workers: Optional[int] = 50):
self.pool = ThreadPoolExecutor(max_workers=max_workers)
def __init__(self):
self.pool = ThreadPoolExecutor(max_workers=settings.CONF['threadpool'])
def submit(self, func, *args, **kwargs):
"""

View File

@@ -47,7 +47,7 @@ class WallpaperHelper(metaclass=Singleton):
"""
return TmdbChain().get_random_wallpager()
@cached(maxsize=1, ttl=3600)
@cached(maxsize=1, ttl=3600, skip_empty=True)
def get_tmdb_wallpapers(self, num: int = 10) -> List[str]:
"""
获取7天的TMDB每日壁纸
@@ -71,7 +71,7 @@ class WallpaperHelper(metaclass=Singleton):
print(str(err))
return None
@cached(maxsize=1, ttl=3600)
@cached(maxsize=1, ttl=3600, skip_empty=True)
def get_bing_wallpapers(self, num: int = 7) -> List[str]:
"""
获取7天的Bing每日壁纸
@@ -94,7 +94,7 @@ class WallpaperHelper(metaclass=Singleton):
"""
return MediaServerChain().get_latest_wallpaper()
@cached(maxsize=1, ttl=3600)
@cached(maxsize=1, ttl=3600, skip_empty=True)
def get_mediaserver_wallpapers(self, num: int = 10) -> List[str]:
"""
获取媒体服务器壁纸列表
@@ -111,7 +111,7 @@ class WallpaperHelper(metaclass=Singleton):
return wallpaper_list[0]
return None
@cached(maxsize=1, ttl=3600)
@cached(maxsize=1, ttl=3600, skip_empty=True)
def get_customize_wallpapers(self) -> List[str]:
"""
获取自定义壁纸api壁纸

View File

@@ -99,6 +99,24 @@ class LoggerManager:
# 线程锁
_lock = threading.Lock()
def get_logger(self, name: str) -> logging.Logger:
"""
获取一个指定名称的、独立的日志记录器。
创建一个独立的日志文件,例如 'diag_memory.log'
:param name: 日志记录器的名称,也将用作文件名。
:return: 一个配置好的 logging.Logger 实例。
"""
# 使用名称作为日志文件名
logfile = f"{name}.log"
with LoggerManager._lock:
# 检查是否已经创建过这个 logger
_logger = self._loggers.get(logfile)
if not _logger:
# 如果没有,就使用现有的 __setup_logger 来创建一个新的
_logger = self.__setup_logger(log_file=logfile)
self._loggers[logfile] = _logger
return _logger
@staticmethod
def __get_caller():
"""

View File

@@ -1,5 +1,6 @@
import multiprocessing
import os
import signal
import sys
import threading
@@ -21,7 +22,7 @@ from app.db.init import init_db, update_db
# uvicorn服务
Server = uvicorn.Server(Config(app, host=settings.HOST, port=settings.PORT,
reload=settings.DEV, workers=multiprocessing.cpu_count(),
timeout_graceful_shutdown=5))
timeout_graceful_shutdown=60))
def start_tray():
@@ -70,7 +71,19 @@ def start_tray():
threading.Thread(target=TrayIcon.run, daemon=True).start()
def signal_handler(signum, frame):
"""
信号处理函数,用于优雅停止服务
"""
print(f"收到信号 {signum},开始优雅停止服务...")
Server.should_exit = True
if __name__ == '__main__':
# 注册信号处理器
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# 启动托盘
start_tray()
# 初始化数据库

View File

@@ -30,7 +30,7 @@ class BangumiApi(object):
self._session = requests.Session()
self._req = RequestUtils(session=self._session)
@cached(maxsize=settings.CACHE_CONF["bangumi"], ttl=settings.CACHE_CONF["meta"])
@cached(maxsize=settings.CONF["bangumi"], ttl=settings.CONF["meta"])
def __invoke(self, url, key: Optional[str] = None, **kwargs):
req_url = self._base_url + url
params = {}

View File

@@ -171,14 +171,14 @@ class DoubanApi(metaclass=Singleton):
).digest()
).decode()
@cached(maxsize=settings.CACHE_CONF["douban"], ttl=settings.CACHE_CONF["meta"])
@cached(maxsize=settings.CONF["douban"], ttl=settings.CONF["meta"])
def __invoke_recommend(self, url: str, **kwargs) -> dict:
"""
推荐/发现类API
"""
return self.__invoke(url, **kwargs)
@cached(maxsize=settings.CACHE_CONF["douban"], ttl=settings.CACHE_CONF["meta"])
@cached(maxsize=settings.CONF["douban"], ttl=settings.CONF["meta"])
def __invoke_search(self, url: str, **kwargs) -> dict:
"""
搜索类API
@@ -213,7 +213,7 @@ class DoubanApi(metaclass=Singleton):
return resp.json()
return resp.json() if resp else {}
@cached(maxsize=settings.CACHE_CONF["douban"], ttl=settings.CACHE_CONF["meta"])
@cached(maxsize=settings.CONF["douban"], ttl=settings.CONF["meta"])
def __post(self, url: str, **kwargs) -> dict:
"""
POST请求

View File

@@ -16,7 +16,7 @@ from app.schemas.types import MediaType
lock = RLock()
CACHE_EXPIRE_TIMESTAMP_STR = "cache_expire_timestamp"
EXPIRE_TIMESTAMP = settings.CACHE_CONF["meta"]
EXPIRE_TIMESTAMP = settings.CONF["meta"]
class DoubanCache(metaclass=Singleton):

View File

@@ -420,7 +420,7 @@ class FanartModule(_ModuleBase):
return result
@classmethod
@cached(maxsize=settings.CACHE_CONF["fanart"], ttl=settings.CACHE_CONF["meta"])
@cached(maxsize=settings.CONF["fanart"], ttl=settings.CONF["meta"])
def __request_fanart(cls, media_type: MediaType, queryid: Union[str, int]) -> Optional[dict]:
if media_type == MediaType.MOVIE:
image_url = cls._movie_url % queryid

View File

@@ -15,7 +15,7 @@ from app.schemas.types import MediaType
lock = RLock()
CACHE_EXPIRE_TIMESTAMP_STR = "cache_expire_timestamp"
EXPIRE_TIMESTAMP = settings.CACHE_CONF["meta"]
EXPIRE_TIMESTAMP = settings.CONF["meta"]
class TmdbCache(metaclass=Singleton):

View File

@@ -500,7 +500,7 @@ class TmdbApi:
return ret_info
@cached(maxsize=settings.CACHE_CONF["tmdb"], ttl=settings.CACHE_CONF["meta"])
@cached(maxsize=settings.CONF["tmdb"], ttl=settings.CONF["meta"])
@rate_limit_exponential(source="match_tmdb_web", base_wait=5, max_wait=1800, enable_logging=True)
def match_web(self, name: str, mtype: MediaType) -> Optional[dict]:
"""

View File

@@ -124,7 +124,7 @@ class TMDb(object):
def cache(self, cache):
self._cache_enabled = bool(cache)
@cached(maxsize=settings.CACHE_CONF["tmdb"], ttl=settings.CACHE_CONF["meta"])
@cached(maxsize=settings.CONF["tmdb"], ttl=settings.CONF["meta"])
def cached_request(self, method, url, data, json,
_ts=datetime.strftime(datetime.now(), '%Y%m%d')):
"""

View File

@@ -166,7 +166,7 @@ class Scheduler(metaclass=Singleton):
# 创建定时服务
self._scheduler = BackgroundScheduler(timezone=settings.TZ,
executors={
'default': ThreadPoolExecutor(100)
'default': ThreadPoolExecutor(settings.CONF['scheduler'])
})
# CookieCloud定时同步
@@ -323,7 +323,7 @@ class Scheduler(metaclass=Singleton):
"interval",
id="clear_cache",
name="缓存清理",
hours=settings.CACHE_CONF["meta"] / 3600,
hours=settings.CONF["meta"] / 3600,
kwargs={
'job_id': 'clear_cache'
}

View File

@@ -4,18 +4,18 @@ from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.chain.system import SystemChain
from app.core.config import global_vars
from app.startup.command_initializer import init_command, stop_command, restart_command
from app.startup.memory_initializer import init_memory_manager, stop_memory_manager
from app.startup.modules_initializer import init_modules, stop_modules
from app.startup.monitor_initializer import stop_monitor, init_monitor
from app.startup.plugins_initializer import init_plugins, stop_plugins, sync_plugins
from app.startup.plugins_initializer import init_plugins, stop_plugins, sync_plugins, backup_plugins, restore_plugins
from app.startup.routers_initializer import init_routers
from app.startup.scheduler_initializer import stop_scheduler, init_scheduler, init_plugin_scheduler
from app.startup.workflow_initializer import init_workflow, stop_workflow
from app.helper.system import SystemHelper
async def init_plugin_system():
async def init_extra():
"""
同步插件及重启相关依赖服务
"""
@@ -24,6 +24,8 @@ async def init_plugin_system():
init_plugin_scheduler()
# 重新注册命令
restart_command()
# 设置系统已修改标志
SystemHelper().set_system_modified()
# 重启完成
SystemChain().restart_finish()
@@ -38,6 +40,8 @@ async def lifespan(app: FastAPI):
init_modules()
# 初始化路由
init_routers(app)
# 恢复插件备份
restore_plugins()
# 初始化插件
init_plugins()
# 初始化定时器
@@ -51,14 +55,12 @@ async def lifespan(app: FastAPI):
# 初始化内存管理
init_memory_manager()
# 插件同步到本地
sync_plugins_task = asyncio.create_task(init_plugin_system())
sync_plugins_task = asyncio.create_task(init_extra())
try:
# 在此处 yield表示应用已经启动控制权交回 FastAPI 主事件循环
yield
finally:
print("Shutting down...")
# 停止信号
global_vars.stop_system()
# 取消同步插件任务
try:
sync_plugins_task.cancel()
@@ -67,6 +69,8 @@ async def lifespan(app: FastAPI):
pass
except Exception as e:
print(str(e))
# 备份插件
backup_plugins()
# 停止内存管理器
stop_memory_manager()
# 停止工作流

View File

@@ -1,19 +1,15 @@
from app.core.config import settings
from app.helper.memory import MemoryManager
from app.helper.memory import MemoryHelper
def init_memory_manager():
"""
初始化内存监控器
"""
memory_manager = MemoryManager()
# 设置内存阈值和启动监控
memory_manager.set_threshold(settings.CACHE_CONF['memory'])
memory_manager.start_monitoring()
MemoryHelper().start_monitoring()
def stop_memory_manager():
"""
停止内存监控器
"""
MemoryManager().stop_monitoring()
MemoryHelper().stop_monitoring()

View File

@@ -1,7 +1,11 @@
import asyncio
import shutil
from app.core.config import settings
from app.core.plugin import PluginManager
from app.log import logger
from app.utils.system import SystemUtils
from app.helper.system import SystemHelper
async def sync_plugins() -> bool:
@@ -75,3 +79,106 @@ def stop_plugins():
plugin_manager.stop_monitor()
except Exception as e:
logger.error(f"停止插件时发生错误:{e}", exc_info=True)
def backup_plugins():
"""
备份插件到用户配置目录仅docker环境
"""
# 非docker环境不处理
if not SystemUtils.is_docker():
return
try:
# 使用绝对路径确保准确性
plugins_dir = settings.ROOT_PATH / "app" / "plugins"
backup_dir = settings.CONFIG_PATH / "plugins_backup"
if not plugins_dir.exists():
logger.info("插件目录不存在,跳过备份")
return
# 确保备份目录存在
backup_dir.mkdir(parents=True, exist_ok=True)
# 需要排除的文件和目录
exclude_items = {"__init__.py", "__pycache__", ".DS_Store"}
# 遍历插件目录,备份除排除项外的所有内容
for item in plugins_dir.iterdir():
if item.name in exclude_items:
continue
target_path = backup_dir / item.name
# 如果是目录
if item.is_dir():
if target_path.exists():
shutil.rmtree(target_path)
shutil.copytree(item, target_path)
logger.info(f"已备份插件目录: {item.name}")
# 如果是文件
elif item.is_file():
shutil.copy2(item, target_path)
logger.info(f"已备份插件文件: {item.name}")
logger.info(f"插件备份完成,备份位置: {backup_dir}")
except Exception as e:
logger.error(f"插件备份失败: {str(e)}")
def restore_plugins():
"""
从备份恢复插件到app/plugins目录恢复完成后删除备份仅docker环境
"""
# 非docker环境不处理
if not SystemUtils.is_docker():
return
try:
# 使用绝对路径确保准确性
plugins_dir = settings.ROOT_PATH / "app" / "plugins"
backup_dir = settings.CONFIG_PATH / "plugins_backup"
if not backup_dir.exists():
logger.info("插件备份目录不存在,跳过恢复")
return
# 系统被重置才恢复插件
if SystemHelper().is_system_reset():
# 确保插件目录存在
plugins_dir.mkdir(parents=True, exist_ok=True)
# 遍历备份目录,恢复所有内容
restored_count = 0
for item in backup_dir.iterdir():
target_path = plugins_dir / item.name
# 如果是目录,且目录内有内容
if item.is_dir() and any(item.iterdir()):
if target_path.exists():
shutil.rmtree(target_path)
shutil.copytree(item, target_path)
logger.info(f"已恢复插件目录: {item.name}")
restored_count += 1
# 如果是文件
elif item.is_file():
shutil.copy2(item, target_path)
logger.info(f"已恢复插件文件: {item.name}")
restored_count += 1
logger.info(f"插件恢复完成,共恢复 {restored_count} 个项目")
# 删除备份目录
try:
shutil.rmtree(backup_dir)
logger.info(f"已删除插件备份目录: {backup_dir}")
except Exception as e:
logger.warning(f"删除备份目录失败: {str(e)}")
except Exception as e:
logger.error(f"插件恢复失败: {str(e)}")

View File

@@ -17,6 +17,9 @@ from app import schemas
class SystemUtils:
"""
系统工具类,提供系统相关的操作和信息获取方法。
"""
@staticmethod
def execute(cmd: str) -> str:
@@ -439,7 +442,7 @@ class SystemUtils:
current_process = psutil.Process()
process_memory = current_process.memory_info().rss
system_memory = psutil.virtual_memory().total
process_memory_percent = (process_memory / system_memory) * 100
process_memory_percent = (process_memory / system_memory) * 100
return [process_memory, int(process_memory_percent)]
@staticmethod

View File

@@ -5,18 +5,8 @@
HOST=0.0.0.0
# 【*】超级管理员,设置后一但重启将固化到数据库中,修改将无效(初始化超级管理员密码仅会生成一次,请在日志中查看并自行登录系统修改)
SUPERUSER=admin
# 重启自动升级 release/dev/true/false
MOVIEPILOT_AUTO_UPDATE=release
# 自动检查和更新站点资源包(索引、认证等)
AUTO_UPDATE_RESOURCE=true
# 网络代理服务器地址 http(s)://ip:port、socks5://user:pass@host:port、socks5h://user:pass@host:port
PROXY_HOST=
# 媒体识别来源 themoviedb/douban使用themoviedb时需要确保能正常连接api.themoviedb.org使用douban时不支持二级分类
RECOGNIZE_SOURCE=themoviedb
# OCR服务器地址
OCR_HOST=https://movie-pilot.org
# 搜索多个名称true/false为true时搜索时会同时搜索中英文及原始名称搜索结果会更全面但会增加搜索时间为false时其中一个名称搜索到结果或全部名称搜索完毕即停止
SEARCH_MULTIPLE_NAME=false
# 开发调试模式,仅开发人员使用,打开后将停止后台服务
DEV=false
# 为指定字幕添加.default后缀设置为默认字幕支持为'zh-cn''zh-tw''eng'添加默认字幕未定义或设置为None则不添加
DEFAULT_SUB=zh-cn
# 数据库连接池的大小可适当降低如20-50以减少I/O压力
@@ -25,5 +15,9 @@ DB_POOL_SIZE=100
DB_MAX_OVERFLOW=500
# SQLite 的 busy_timeout 参数可适当增加如180以减少锁定错误
DB_TIMEOUT=60
# 是否开发调试模式,仅开发人员使用,打开后将停止后台服务
DEV=false
# 是否启用内存监控,开启后将定期生成内存快照文件
MEMORY_ANALYSIS=false
# 内存快照间隔(分钟)
MEMORY_SNAPSHOT_INTERVAL=60
# 保留的内存快照文件数量
MEMORY_SNAPSHOT_KEEP_COUNT=20

View File

@@ -70,3 +70,4 @@ cf_clearance~=0.31.0
oss2~=2.19.1
tqdm~=4.67.1
setuptools~=78.1.0
pympler~=1.1

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.5.3'
FRONTEND_VERSION = 'v2.5.3'
APP_VERSION = 'v2.5.4'
FRONTEND_VERSION = 'v2.5.4'