Compare commits

...

67 Commits

Author SHA1 Message Date
jxxghp
8902fb50d6 更新 context.py 2025-07-16 22:22:45 +08:00
jxxghp
b6aa013eb3 v2.6.6 2025-07-16 20:25:43 +08:00
jxxghp
034b43bf70 fix context 2025-07-16 19:59:06 +08:00
jxxghp
59e9032286 add subscribe share statistic api 2025-07-16 08:47:54 +08:00
jxxghp
52a98efd0a add subscribe share statistic api 2025-07-16 08:31:28 +08:00
jxxghp
90cc91aa7f Merge pull request #4614 from Aqr-K/feature-ua 2025-07-15 06:47:34 +08:00
Aqr-K
1973a26e83 fix: 去除冗余代码,简化写法 2025-07-14 22:19:48 +08:00
Aqr-K
6519ad25ca fix is_aarch 2025-07-14 22:17:04 +08:00
Aqr-K
cacfde8166 fix 2025-07-14 22:14:52 +08:00
Aqr-K
df85873726 feat(ua): add cup_arch , USER_AGENT value add cup_arch 2025-07-14 22:04:09 +08:00
jxxghp
dfea294cc9 fix ua 2025-07-14 13:42:49 +08:00
jxxghp
d35b855404 fix ua 2025-07-14 13:30:18 +08:00
jxxghp
7a1cbf70e3 feat:特定默认UA 2025-07-14 12:35:08 +08:00
jxxghp
f260990b86 更新 version.py 2025-07-13 15:14:10 +08:00
jxxghp
6affbe9b55 fix #4558 2025-07-13 15:04:41 +08:00
jxxghp
dbe3a10697 fix 2025-07-13 14:53:39 +08:00
jxxghp
3c25306a5d fix #4590 2025-07-13 14:43:48 +08:00
jxxghp
17f4d49731 fix #4594 2025-07-13 14:24:41 +08:00
jxxghp
e213b5cc64 Merge branch 'v2' of https://github.com/jxxghp/MoviePilot into v2 2025-07-13 14:14:26 +08:00
jxxghp
65e5dad44b 优化移动模式下的种子和残留目录删除逻辑 2025-07-13 14:14:24 +08:00
jxxghp
62ad38ea5d Merge pull request #4605 from wikrin/torrent_optimize 2025-07-13 13:25:35 +08:00
Attente
f98f4c1f77 refactor(helper): 优化 TorrentHelper 类
- 添加检查临时目录中是否存在种子文件
- 修改 match_torrent 方法参数类型
- 优化种子文件下载和处理逻辑
2025-07-13 13:16:36 +08:00
jxxghp
e9f02b58b7 Merge pull request #4604 from cddjr/fix_4602 2025-07-13 06:51:36 +08:00
景大侠
05495e481d fix #4602 2025-07-13 01:10:07 +08:00
jxxghp
5bb2167b78 Merge pull request #4603 from cddjr/fix_nettest 2025-07-12 18:34:54 +08:00
景大侠
b4e0ed66cf 完善网络连通性测试的错误描述 2025-07-12 18:15:19 +08:00
jxxghp
70a0563435 add server_type return 2025-07-12 14:52:18 +08:00
jxxghp
955912b832 fix plex 2025-07-12 14:44:45 +08:00
jxxghp
b65ee75b3d Merge pull request #4601 from cddjr/minimal_deps 2025-07-11 21:46:13 +08:00
景大侠
f642493a38 fix 2025-07-11 21:25:10 +08:00
jxxghp
7f1bfb1e07 Merge pull request #4599 from jtcymc/v2 2025-07-11 21:12:16 +08:00
景大侠
8931e2e016 fix 仅安装用户需要使用的插件依赖 2025-07-11 21:04:33 +08:00
shaw
0465fa77c2 fix(filemanager): 检查目标媒体库目录是否设置
- 在文件整理过程中,增加对目标媒体库目录是否设置的检查- 如果目标媒体库目录未设置,返回错误信息并中断整理过程
- 优化了错误处理逻辑,提高了系统的稳定性和可靠性
2025-07-11 20:02:12 +08:00
jxxghp
575d503cb9 Merge pull request #4598 from cddjr/fix_4586 2025-07-11 18:12:57 +08:00
景大侠
a4fdbdb9ad fix 极空间、Unraid误报网络文件系统 2025-07-11 18:03:19 +08:00
jxxghp
b9cb781a4e rollback size 2025-07-11 08:34:02 +08:00
jxxghp
a3adf867b7 fix 2025-07-10 22:48:08 +08:00
jxxghp
d52cbd2f74 feat:资源下载事件保存路径 2025-07-10 22:16:19 +08:00
jxxghp
8d0003db94 更新 version.py 2025-07-10 11:57:54 +08:00
jxxghp
b775e89e77 fix #4581 2025-07-10 10:44:04 +08:00
jxxghp
0e14b097ba fix #4581 2025-07-10 10:39:22 +08:00
jxxghp
51848b8d8d fix #4581 2025-07-10 10:20:00 +08:00
jxxghp
72658c3e60 Merge pull request #4582 from cddjr/fix_rename_related 2025-07-09 20:42:54 +08:00
jxxghp
036cb6f3b0 remove memory helper 2025-07-09 19:11:37 +08:00
jxxghp
1a86d96bfa Merge pull request #4579 from jxxghp/cursor/bc-f8a13fbf-5ca0-4b0b-ae8d-59c208732d44-b74e 2025-07-09 17:43:46 +08:00
Cursor Agent
f67db38a25 Fix memory analysis performance and timeout issues across platforms
Co-authored-by: jxxghp <jxxghp@163.com>
2025-07-09 09:43:34 +00:00
Cursor Agent
028d18826a Refactor memory analysis with ThreadPoolExecutor for cross-platform timeout
Co-authored-by: jxxghp <jxxghp@163.com>
2025-07-09 09:38:06 +00:00
Cursor Agent
29a605f265 Optimize memory analysis with timeout, sampling, and performance improvements
Co-authored-by: jxxghp <jxxghp@163.com>
2025-07-09 08:57:22 +00:00
jxxghp
4b6959470d Merge pull request #4577 from jxxghp/cursor/analyze-memory-usage-discrepancies-6709 2025-07-09 16:08:00 +08:00
Cursor Agent
600767d2bf Remove memory analysis guide and test script
Co-authored-by: jxxghp <jxxghp@163.com>
2025-07-09 08:07:30 +00:00
Cursor Agent
3efbd47ffd Add comprehensive memory analysis tool with guide and test script
Co-authored-by: jxxghp <jxxghp@163.com>
2025-07-09 08:04:10 +00:00
Cursor Agent
d17e85217b Enhance memory analysis with detailed tracking, leak detection, and system insights
Co-authored-by: jxxghp <jxxghp@163.com>
2025-07-09 07:47:23 +00:00
jxxghp
e608089805 add Note Action 2025-07-09 12:22:22 +08:00
jxxghp
b852acec28 fix workflow 2025-07-09 09:34:53 +08:00
jxxghp
2a3ea8315d fix workflow 2025-07-09 00:19:47 +08:00
jxxghp
9271ee833c Merge pull request #4566 from jxxghp/cursor/helper-91dc
新增工作流分享相关接口和helper
2025-07-09 00:12:56 +08:00
Cursor Agent
570d4ad1a3 Fix workflow API by passing database session to WorkflowOper methods
Co-authored-by: jxxghp <jxxghp@163.com>
2025-07-08 15:44:55 +00:00
Cursor Agent
dccdf3231a Checkpoint before follow-up message 2025-07-08 15:42:31 +00:00
Cursor Agent
b8ee777fd2 Refactor workflow sharing with independent config and improved data access
Co-authored-by: jxxghp <jxxghp@163.com>
2025-07-08 15:33:43 +00:00
Cursor Agent
a2fd3a8d90 Implement workflow sharing feature with new API endpoints and helper
Co-authored-by: jxxghp <jxxghp@163.com>
2025-07-08 15:26:16 +00:00
Cursor Agent
bbffb1420b Add workflow sharing, forking, and related API endpoints
Co-authored-by: jxxghp <jxxghp@163.com>
2025-07-08 15:18:01 +00:00
景大侠
8ea0a32879 fix 优化重命名后的媒体文件根路径获取 2025-07-08 22:37:32 +08:00
景大侠
8c27b8c33e fix 文件管理的自动重命名缺少集信息 2025-07-08 22:37:09 +08:00
景大侠
5c61b22c2f fix 未启用重命名时,整理文件的转移路径不正确 2025-07-08 21:49:31 +08:00
jxxghp
9da9d765a0 fix:静态类引用 2025-07-08 21:40:04 +08:00
jxxghp
f64363728e fix:静态类引用 2025-07-08 21:38:34 +08:00
jxxghp
378777dc7c feat:弱引用单例 2025-07-08 21:29:01 +08:00
58 changed files with 1190 additions and 970 deletions

30
app/actions/note.py Normal file
View File

@@ -0,0 +1,30 @@
from app.actions import BaseAction
from app.schemas import ActionContext
class NoteAction(BaseAction):
"""
备注
"""
@classmethod
@property
def name(cls) -> str: # noqa
return "备注"
@classmethod
@property
def description(cls) -> str: # noqa
return "给工作流添加备注"
@classmethod
@property
def data(cls) -> dict: # noqa
return {}
@property
def success(self) -> bool:
return True
def execute(self, workflow_id: int, params: dict, context: ActionContext) -> ActionContext:
return context

View File

@@ -560,6 +560,15 @@ def popular_subscribes(
return SubscribeHelper().get_shares(name=name, page=page, count=count)
@router.get("/share/statistics", summary="查询订阅分享统计", response_model=List[schemas.SubscribeShareStatistics])
def subscribe_share_statistics(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
查询订阅分享统计
返回每个分享人分享的媒体数量以及总的复用人次
"""
return SubscribeHelper().get_share_statistics()
@router.get("/{subscribe_id}", summary="订阅详情", response_model=schemas.Subscribe)
def read_subscribe(
subscribe_id: int,

View File

@@ -11,6 +11,7 @@ from typing import Optional, Union, Annotated
import aiofiles
import pillow_avif # noqa 用于自动注册AVIF支持
from PIL import Image
from app.helper.sites import SitesHelper
from fastapi import APIRouter, Body, Depends, HTTPException, Header, Request, Response
from fastapi.responses import StreamingResponse
@@ -18,10 +19,10 @@ from app import schemas
from app.chain.search import SearchChain
from app.chain.system import SystemChain
from app.core.config import global_vars, settings
from app.core.event import eventmanager
from app.core.metainfo import MetaInfo
from app.core.module import ModuleManager
from app.core.security import verify_apitoken, verify_resource_token, verify_token
from app.core.event import eventmanager
from app.db.models import User
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_superuser
@@ -29,7 +30,6 @@ from app.helper.mediaserver import MediaServerHelper
from app.helper.message import MessageHelper
from app.helper.progress import ProgressHelper
from app.helper.rule import RuleHelper
from app.helper.sites import SitesHelper
from app.helper.subscribe import SubscribeHelper
from app.helper.system import SystemHelper
from app.log import logger
@@ -187,9 +187,11 @@ def get_global_setting(token: str):
"COOKIECLOUD_KEY", "COOKIECLOUD_PASSWORD", "GITHUB_TOKEN", "REPO_GITHUB_TOKEN"}
)
# 追加用户唯一ID和订阅分享管理权限
share_admin = SubscribeHelper().is_admin_user()
info.update({
"USER_UNIQUE_ID": SubscribeHelper().get_user_uuid(),
"SUBSCRIBE_SHARE_MANAGE": SubscribeHelper().is_admin_user(),
"SUBSCRIBE_SHARE_MANAGE": share_admin,
"WORKFLOW_SHARE_MANAGE": share_admin
})
return schemas.Response(success=True,
data=info)
@@ -290,9 +292,9 @@ def get_setting(key: str,
@router.post("/setting/{key}", summary="更新系统设置", response_model=schemas.Response)
def set_setting(
key: str,
value: Annotated[Union[list, dict, bool, int, str] | None, Body()] = None,
_: User = Depends(get_current_active_superuser),
key: str,
value: Annotated[Union[list, dict, bool, int, str] | None, Body()] = None,
_: User = Depends(get_current_active_superuser),
):
"""
更新系统设置(仅管理员)
@@ -452,10 +454,10 @@ def ruletest(title: str,
@router.get("/nettest", summary="测试网络连通性")
def nettest(
url: str,
proxy: bool,
include: Optional[str] = None,
_: schemas.TokenPayload = Depends(verify_token),
url: str,
proxy: bool,
include: Optional[str] = None,
_: schemas.TokenPayload = Depends(verify_token),
):
"""
测试网络连通性
@@ -463,17 +465,27 @@ def nettest(
# 记录开始的毫秒数
start_time = datetime.now()
headers = None
if "github" in url or "{GITHUB_PROXY}" in url:
# 当前使用的加速代理
proxy_name = ""
if "github" in url:
# 这是github的连通性测试
headers = settings.GITHUB_HEADERS
if "{GITHUB_PROXY}" in url:
url = url.replace(
"{GITHUB_PROXY}", UrlUtils.standardize_base_url(settings.GITHUB_PROXY or "")
)
headers = settings.GITHUB_HEADERS
if settings.GITHUB_PROXY:
proxy_name = "Github加速代理"
if "{PIP_PROXY}" in url:
url = url.replace(
"{PIP_PROXY}",
UrlUtils.standardize_base_url(
settings.PIP_PROXY or "https://pypi.org/simple/"
),
)
if settings.PIP_PROXY:
proxy_name = "PIP加速代理"
url = url.replace("{TMDBAPIKEY}", settings.TMDB_API_KEY)
url = url.replace(
"{PIP_PROXY}",
UrlUtils.standardize_base_url(settings.PIP_PROXY or "https://pypi.org/simple/"),
)
result = RequestUtils(
proxies=settings.PROXY if proxy else None,
headers=headers,
@@ -485,21 +497,36 @@ def nettest(
time = round((end_time - start_time).total_seconds() * 1000)
# 计算相关秒数
if result is None:
return schemas.Response(success=False, message="无法连接", data={"time": time})
return schemas.Response(
success=False, message=f"{proxy_name}无法连接", data={"time": time}
)
elif result.status_code == 200:
if include and not re.search(r"%s" % include, result.text, re.IGNORECASE):
# 通常是被加速代理跳转到其它页面了
logger.error(f"{url} 的响应内容不匹配包含规则 {include}")
if proxy_name:
message = f"{proxy_name}已失效,请检查配置"
else:
message = f"无效响应,不匹配 {include}"
return schemas.Response(
success=False,
message=f"无效响应,不匹配 {include}",
message=message,
data={"time": time},
)
return schemas.Response(success=True, data={"time": time})
else:
return schemas.Response(
success=False, message=f"错误码:{result.status_code}", data={"time": time}
)
if proxy_name:
# 加速代理失败
message = f"{proxy_name}已失效,错误码:{result.status_code}"
else:
message = f"错误码:{result.status_code}"
if "github" in url:
# 非加速代理访问github
if result.status_code == 401:
message = "Github Token已失效请检查配置"
elif result.status_code in {403, 429}:
message = "触发限流请配置Github Token"
return schemas.Response(success=False, message=message, data={"time": time})
@router.get("/modulelist", summary="查询已加载的模块ID列表", response_model=schemas.Response)

View File

@@ -8,11 +8,13 @@ from app import schemas
from app.chain.media import MediaChain
from app.chain.storage import StorageChain
from app.chain.transfer import TransferChain
from app.core.config import settings
from app.core.metainfo import MetaInfoPath
from app.core.security import verify_token, verify_apitoken
from app.db import get_db
from app.db.models.transferhistory import TransferHistory
from app.db.user_oper import get_current_active_superuser
from app.helper.directory import DirectoryHelper
from app.schemas import MediaType, FileItem, ManualTransferItem
router = APIRouter()
@@ -35,11 +37,19 @@ def query_name(path: str, filetype: str,
if not new_path:
return schemas.Response(success=False, message="未识别到新名称")
if filetype == "dir":
parents = Path(new_path).parents
if len(parents) > 2:
new_name = parents[1].name
media_path = DirectoryHelper.get_media_root_path(
rename_format=settings.RENAME_FORMAT(mediainfo.type),
rename_path=Path(new_path),
)
if media_path:
new_name = media_path.name
else:
new_name = parents[0].name
# fallback
parents = Path(new_path).parents
if len(parents) > 2:
new_name = parents[1].name
else:
new_name = parents[0].name
else:
new_name = Path(new_path).name
return schemas.Response(success=True, data={

View File

@@ -1,3 +1,4 @@
import json
from datetime import datetime
from typing import List, Any, Optional
@@ -5,14 +6,15 @@ from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app import schemas
from app.chain.workflow import WorkflowChain
from app.core.config import global_vars
from app.core.plugin import PluginManager
from app.core.workflow import WorkFlowManager
from app.db import get_db
from app.db.models.workflow import Workflow
from app.db.models import Workflow
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_user
from app.chain.workflow import WorkflowChain
from app.helper.workflow import WorkflowHelper
from app.scheduler import Scheduler
router = APIRouter()
@@ -24,7 +26,8 @@ def list_workflows(db: Session = Depends(get_db),
"""
获取工作流列表
"""
return Workflow.list(db)
from app.db.workflow_oper import WorkflowOper
return WorkflowOper(db).list()
@router.post("/", summary="创建工作流", response_model=schemas.Response)
@@ -34,13 +37,15 @@ def create_workflow(workflow: schemas.Workflow,
"""
创建工作流
"""
if Workflow.get_by_name(db, workflow.name):
from app.db.workflow_oper import WorkflowOper
if workflow.name and WorkflowOper(db).get_by_name(workflow.name):
return schemas.Response(success=False, message="已存在相同名称的工作流")
if not workflow.add_time:
workflow.add_time = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
if not workflow.state:
workflow.state = "P"
Workflow(**workflow.dict()).create(db)
from app.db.models.workflow import Workflow as WorkflowModel
WorkflowModel(**workflow.dict()).create(db)
return schemas.Response(success=True, message="创建工作流成功")
@@ -60,47 +65,97 @@ def list_actions(_: schemas.TokenPayload = Depends(get_current_active_user)) ->
return WorkFlowManager().list_actions()
@router.get("/{workflow_id}", summary="工作流详情", response_model=schemas.Workflow)
def get_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
@router.post("/share", summary="分享工作流", response_model=schemas.Response)
def workflow_share(
workflow: schemas.WorkflowShare,
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
获取工作流详情
分享工作流
"""
return Workflow.get(db, workflow_id)
if not workflow.id or not workflow.share_title or not workflow.share_user:
return schemas.Response(success=False, message="请填写工作流ID、分享标题和分享人")
state, errmsg = WorkflowHelper().workflow_share(workflow_id=workflow.id,
share_title=workflow.share_title or "",
share_comment=workflow.share_comment or "",
share_user=workflow.share_user or "")
return schemas.Response(success=state, message=errmsg)
@router.put("/{workflow_id}", summary="更新工作流", response_model=schemas.Response)
def update_workflow(workflow: schemas.Workflow,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
@router.delete("/share/{share_id}", summary="删除分享", response_model=schemas.Response)
def workflow_share_delete(
share_id: int,
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
更新工作流
删除分享
"""
wf = Workflow.get(db, workflow.id)
if not wf:
return schemas.Response(success=False, message="工作流不存在")
wf.update(db, workflow.dict())
return schemas.Response(success=True, message="更新成功")
state, errmsg = WorkflowHelper().share_delete(share_id=share_id)
return schemas.Response(success=state, message=errmsg)
@router.delete("/{workflow_id}", summary="删除工作流", response_model=schemas.Response)
def delete_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
@router.post("/fork", summary="复用工作流", response_model=schemas.Response)
def workflow_fork(
workflow: schemas.WorkflowShare,
db: Session = Depends(get_db),
_: schemas.User = Depends(get_current_active_user)) -> Any:
"""
删除工作流
复用工作流
"""
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 删除定时任务
Scheduler().remove_workflow_job(workflow)
# 删除工作流
Workflow.delete(db, workflow_id)
# 删除缓存
SystemConfigOper().delete(f"WorkflowCache-{workflow_id}")
return schemas.Response(success=True, message="删除成功")
if not workflow.name:
return schemas.Response(success=False, message="工作流名称不能为空")
# 解析JSON数据添加错误处理
try:
actions = json.loads(workflow.actions or "[]")
except json.JSONDecodeError:
return schemas.Response(success=False, message="actions字段JSON格式错误")
try:
flows = json.loads(workflow.flows or "[]")
except json.JSONDecodeError:
return schemas.Response(success=False, message="flows字段JSON格式错误")
try:
context = json.loads(workflow.context or "{}")
except json.JSONDecodeError:
return schemas.Response(success=False, message="context字段JSON格式错误")
# 创建工作流
workflow_dict = {
"name": workflow.name,
"description": workflow.description,
"timer": workflow.timer,
"actions": actions,
"flows": flows,
"context": context,
"state": "P" # 默认暂停状态
}
# 检查名称是否重复
if Workflow.get_by_name(db, workflow_dict["name"]):
return schemas.Response(success=False, message="已存在相同名称的工作流")
# 创建新工作流
workflow = Workflow(**workflow_dict)
workflow.create(db)
# 更新复用次数
if workflow.id:
WorkflowHelper().workflow_fork(share_id=workflow.id)
return schemas.Response(success=True, message="复用成功")
@router.get("/shares", summary="查询分享的工作流", response_model=List[schemas.WorkflowShare])
def workflow_shares(
name: Optional[str] = None,
page: Optional[int] = 1,
count: Optional[int] = 30,
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
查询分享的工作流
"""
return WorkflowHelper().get_shares(name=name, page=page, count=count)
@router.post("/{workflow_id}/run", summary="执行工作流", response_model=schemas.Response)
@@ -123,7 +178,8 @@ def start_workflow(workflow_id: int,
"""
启用工作流
"""
workflow = Workflow.get(db, workflow_id)
from app.db.workflow_oper import WorkflowOper
workflow = WorkflowOper(db).get(workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 添加定时任务
@@ -140,7 +196,8 @@ def pause_workflow(workflow_id: int,
"""
停用工作流
"""
workflow = Workflow.get(db, workflow_id)
from app.db.workflow_oper import WorkflowOper
workflow = WorkflowOper(db).get(workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 删除定时任务
@@ -159,7 +216,8 @@ def reset_workflow(workflow_id: int,
"""
重置工作流
"""
workflow = Workflow.get(db, workflow_id)
from app.db.workflow_oper import WorkflowOper
workflow = WorkflowOper(db).get(workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 停止工作流
@@ -169,3 +227,52 @@ def reset_workflow(workflow_id: int,
# 删除缓存
SystemConfigOper().delete(f"WorkflowCache-{workflow_id}")
return schemas.Response(success=True)
@router.get("/{workflow_id}", summary="工作流详情", response_model=schemas.Workflow)
def get_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
获取工作流详情
"""
from app.db.workflow_oper import WorkflowOper
return WorkflowOper(db).get(workflow_id)
@router.put("/{workflow_id}", summary="更新工作流", response_model=schemas.Response)
def update_workflow(workflow: schemas.Workflow,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
更新工作流
"""
from app.db.workflow_oper import WorkflowOper
if not workflow.id:
return schemas.Response(success=False, message="工作流ID不能为空")
wf = WorkflowOper(db).get(workflow.id)
if not wf:
return schemas.Response(success=False, message="工作流不存在")
wf.update(db, workflow.dict())
return schemas.Response(success=True, message="更新成功")
@router.delete("/{workflow_id}", summary="删除工作流", response_model=schemas.Response)
def delete_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
删除工作流
"""
from app.db.workflow_oper import WorkflowOper
workflow = WorkflowOper(db).get(workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 删除定时任务
Scheduler().remove_workflow_job(workflow)
# 删除工作流
from app.db.models.workflow import Workflow as WorkflowModel
WorkflowModel.delete(db, workflow_id)
# 删除缓存
SystemConfigOper().delete(f"WorkflowCache-{workflow_id}")
return schemas.Response(success=True, message="删除成功")

View File

@@ -188,6 +188,9 @@ class DownloadChain(ChainBase):
f"Resource download canceled by event: {event_data.source},"
f"Reason: {event_data.reason}")
return None
# 如果事件修改了下载路径,使用新路径
if event_data.options and event_data.options.get("save_path"):
save_path = event_data.options.get("save_path")
# 补充完整的media数据
if not _media.genre_ids:

View File

@@ -367,12 +367,12 @@ class MediaChain(ChainBase):
overwrite=overwrite)
else:
# 检查目的目录下是否已经有nfo刮削文件
sub_files = storagechain.list_files(fileitem)
if any(f.name.endswith('.nfo') for f in sub_files):
has_nfo_file = storagechain.any_files(fileitem, extensions=['.nfo'])
if has_nfo_file and file_list:
logger.info(f"目录 {fileitem.path} 已有NFO文件开始增量刮削...")
for file_path in file_list:
file_item = storagechain.get_file_item(storage=fileitem.storage,
path=Path(file_path))
path=Path(file_path))
if file_item:
# 对于电视剧文件,应该保存到与视频文件相同的目录
# 而不是电视剧根目录

View File

@@ -178,15 +178,14 @@ class StorageChain(ChainBase):
if mtype:
# 重命名格式
rename_format = settings.TV_RENAME_FORMAT \
if mtype == MediaType.TV else settings.MOVIE_RENAME_FORMAT
# 计算重命名中的文件夹层数
rename_format_level = len(rename_format.split("/")) - 1
if rename_format_level < 1:
rename_format = settings.RENAME_FORMAT(mtype)
media_path = DirectoryHelper.get_media_root_path(
rename_format, rename_path=Path(fileitem.path)
)
if not media_path:
return True
# 处理媒体文件根目录
dir_item = self.get_file_item(storage=fileitem.storage,
path=Path(fileitem.path).parents[rename_format_level - 1])
dir_item = self.get_file_item(storage=fileitem.storage, path=media_path)
else:
# 处理上级目录
dir_item = self.get_parent_item(fileitem)

View File

@@ -6,6 +6,7 @@ from typing import Union, Optional
from app.chain import ChainBase
from app.core.config import settings
from app.core.plugin import PluginManager
from app.log import logger
from app.schemas import Notification, MessageChannel
from app.utils.http import RequestUtils
@@ -136,13 +137,6 @@ class SystemChain(ChainBase):
shutil.rmtree(target_path)
shutil.copytree(item, target_path)
logger.info(f"已恢复插件目录: {item.name}")
# 安装依赖
requirements_file = target_path / "requirements.txt"
if requirements_file.exists():
logger.info(f"正在安装插件 {item.name} 的依赖...")
success, message = PluginHelper.pip_install_with_fallback(requirements_file)
if not success:
logger.warn(f"插件 {item.name} 依赖安装失败: {message}")
restored_count += 1
# 如果是文件
elif item.is_file():
@@ -155,6 +149,9 @@ class SystemChain(ChainBase):
logger.info(f"插件恢复完成,共恢复 {restored_count} 个项目")
# 安装缺少的依赖
PluginManager.install_plugin_missing_dependencies()
# 删除备份目录
try:
shutil.rmtree(backup_dir)

View File

@@ -140,6 +140,16 @@ class TorrentsChain(ChainBase):
:param stype: 强制指定缓存类型spider:爬虫缓存rss:rss缓存
:param sites: 强制指定站点ID列表为空则读取设置的订阅站点
"""
def __is_no_cache_site(_domain: str) -> bool:
"""
判断站点是否不需要缓存
"""
for url_key in settings.NO_CACHE_SITE_KEY.split(','):
if url_key in _domain:
return True
return False
# 刷新类型
if not stype:
stype = settings.SUBSCRIBE_MODE
@@ -178,11 +188,16 @@ class TorrentsChain(ChainBase):
# 取前N条
torrents = torrents[:settings.CONF.refresh]
if torrents:
# 过滤出没有处理过的种子 - 优化:使用集合查找,避免重复创建字符串列表
cached_signatures = {f'{t.torrent_info.title}{t.torrent_info.description}'
for t in torrents_cache.get(domain) or []}
torrents = [torrent for torrent in torrents
if f'{torrent.title}{torrent.description}' not in cached_signatures]
if __is_no_cache_site(domain):
# 不需要缓存的站点,直接处理
logger.info(f'{indexer.get("name")}{len(torrents)} 个种子 (不缓存)')
torrents_cache[domain] = []
else:
# 过滤出没有处理过的种子 - 优化:使用集合查找,避免重复创建字符串列表
cached_signatures = {f'{t.torrent_info.title}{t.torrent_info.description}'
for t in torrents_cache.get(domain) or []}
torrents = [torrent for torrent in torrents
if f'{torrent.title}{torrent.description}' not in cached_signatures]
if torrents:
logger.info(f'{indexer.get("name")}{len(torrents)} 个新种子')
else:

View File

@@ -5,7 +5,6 @@ import threading
import traceback
from copy import deepcopy
from pathlib import Path
from queue import Queue
from time import sleep
from typing import List, Optional, Tuple, Union, Dict, Callable
@@ -16,9 +15,9 @@ from app.chain.storage import StorageChain
from app.chain.tmdb import TmdbChain
from app.core.config import settings, global_vars
from app.core.context import MediaInfo
from app.core.event import eventmanager
from app.core.meta import MetaBase
from app.core.metainfo import MetaInfoPath
from app.core.event import eventmanager
from app.db.downloadhistory_oper import DownloadHistoryOper
from app.db.models.downloadhistory import DownloadHistory
from app.db.models.transferhistory import TransferHistory
@@ -28,11 +27,11 @@ from app.helper.directory import DirectoryHelper
from app.helper.format import FormatParser
from app.helper.progress import ProgressHelper
from app.log import logger
from app.schemas import StorageOperSelectionEventData
from app.schemas import TransferInfo, TransferTorrent, Notification, EpisodeFormat, FileItem, TransferDirectoryConf, \
TransferTask, TransferQueue, TransferJob, TransferJobTask
from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, NotificationType, MessageChannel, \
SystemConfigKey, ChainEventType, ContentType
from app.schemas import StorageOperSelectionEventData
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
@@ -213,6 +212,7 @@ class JobManager:
set(self._season_episodes[mediaid]) - set(task.meta.episode_list)
)
return task
return None
def remove_job(self, task: TransferTask) -> Optional[TransferJob]:
"""
@@ -226,6 +226,7 @@ class JobManager:
if __mediaid__ in self._season_episodes:
self._season_episodes.pop(__mediaid__)
return self._job_view.pop(__mediaid__)
return None
def is_done(self, task: TransferTask) -> bool:
"""
@@ -311,7 +312,7 @@ class JobManager:
def count(self, media: MediaInfo, season: Optional[int] = None) -> int:
"""
获取某项任务总数
获取某项任务成功总数
"""
__mediaid__ = self.__get_media_id(media=media, season=season)
with job_lock:
@@ -322,7 +323,7 @@ class JobManager:
def size(self, media: MediaInfo, season: Optional[int] = None) -> int:
"""
获取某项任务总大小
获取某项任务成功文件总大小
"""
__mediaid__ = self.__get_media_id(media=media, season=season)
with job_lock:
@@ -359,22 +360,20 @@ class TransferChain(ChainBase, metaclass=Singleton):
文件整理处理链
"""
# 可处理的文件后缀
all_exts = settings.RMT_MEDIAEXT
# 待整理任务队列
_queue = Queue()
# 文件整理线程
_transfer_thread = None
# 队列间隔时间(秒)
_transfer_interval = 15
def __init__(self):
super().__init__()
# 可处理的文件后缀
self.all_exts = settings.RMT_MEDIAEXT
# 待整理任务队列
self._queue = queue.Queue()
# 文件整理线程
self._transfer_thread = None
# 队列间隔时间(秒)
self._transfer_interval = 15
# 事件管理器
self.jobview = JobManager()
# 车移成功的文件清单
self._success_target_files: Dict[str, List[str]] = {}
# 启动整理任务
self.__init()
@@ -391,6 +390,44 @@ class TransferChain(ChainBase, metaclass=Singleton):
"""
整理完成后处理
"""
def __do_finished():
"""
完成时发送消息、刮削事件、移除任务等
"""
# 更新文件数量
transferinfo.file_count = self.jobview.count(task.mediainfo, task.meta.begin_season) or 1
# 更新文件大小
transferinfo.total_size = self.jobview.size(task.mediainfo,
task.meta.begin_season) or task.fileitem.size
# 更新文件清单
transferinfo.file_list_new = self._success_target_files.pop(transferinfo.target_diritem.path, [])
# 发送通知,实时手动整理时不发
if transferinfo.need_notify and (task.background or not task.manual):
se_str = None
if task.mediainfo.type == MediaType.TV:
season_episodes = self.jobview.season_episodes(task.mediainfo, task.meta.begin_season)
if season_episodes:
se_str = f"{task.meta.season} {StringUtils.format_ep(season_episodes)}"
else:
se_str = f"{task.meta.season}"
self.send_transfer_message(meta=task.meta,
mediainfo=task.mediainfo,
transferinfo=transferinfo,
season_episode=se_str,
username=task.username)
# 刮削事件
if transferinfo.need_scrape:
self.eventmanager.send_event(EventType.MetadataScrape, {
'meta': task.meta,
'mediainfo': task.mediainfo,
'fileitem': transferinfo.target_diritem,
'file_list': transferinfo.file_list_new,
'overwrite': False
})
# 移除已完成的任务
self.jobview.remove_job(task)
transferhis = TransferHistoryOper()
if not transferinfo.success:
# 转移失败
@@ -416,6 +453,10 @@ class TransferChain(ChainBase, metaclass=Singleton):
))
# 整理失败
self.jobview.fail_task(task)
with task_lock:
# 整理完成且有成功的任务时
if self.jobview.is_finished(task):
__do_finished()
return False, transferinfo.message
# 转移成功
@@ -444,57 +485,49 @@ class TransferChain(ChainBase, metaclass=Singleton):
})
with task_lock:
# 登记转移成功文件清单
target_dir_path = transferinfo.target_diritem.path
target_files = transferinfo.file_list_new
if self._success_target_files.get(target_dir_path):
self._success_target_files[target_dir_path].extend(target_files)
else:
self._success_target_files[target_dir_path] = target_files
# 全部整理成功时
if self.jobview.is_success(task):
# 移动模式删除空目录
if transferinfo.transfer_type in ["move"]:
# 所有成功的业务
tasks = self.jobview.success_tasks(task.mediainfo, task.meta.begin_season)
# 记录已处理的种子hash
processed_hashes = set()
storagechain = StorageChain()
downloadhistoryoper = DownloadHistoryOper()
for t in tasks:
# 下载器hash
if t.download_hash and t.download_hash not in processed_hashes:
processed_hashes.add(t.download_hash)
if self.remove_torrents(t.download_hash, downloader=t.downloader):
logger.info(f"移动模式删除种子成功:{t.download_hash} ")
# 删除残留目录
if t.fileitem:
storagechain.delete_media_file(t.fileitem, delete_self=False)
if not t.download_hash:
continue
# 通过download_hash获取种子保存目录
download_history = downloadhistoryoper.get_by_hash(t.download_hash)
if download_history and download_history.path:
# 检查种子目录下是否还有有效媒体文件
seed_dir_item = storagechain.get_file_item(storage=t.fileitem.storage,
path=Path(download_history.path))
if seed_dir_item and seed_dir_item.type == "dir":
remain_files = storagechain.list_files(seed_dir_item, recursion=True)
has_media = any(
f.extension and f.extension.lower() in [ext.lstrip('.') for ext in self.all_exts]
for f in remain_files if f.type == "file"
)
if not has_media:
if self.remove_torrents(t.download_hash, downloader=t.downloader):
logger.info(f"移动模式删除种子成功:{t.download_hash} ")
# 删除残留目录
if t.fileitem:
storagechain.delete_media_file(t.fileitem, delete_self=False)
else:
logger.info(
f"种子目录 {download_history.path} 还有未整理的媒体文件,暂不删除种子和残留目录")
# 整理完成且有成功的任务时
if self.jobview.is_finished(task):
# 发送通知,实时手动整理时不发
if transferinfo.need_notify and (task.background or not task.manual):
se_str = None
if task.mediainfo.type == MediaType.TV:
season_episodes = self.jobview.season_episodes(task.mediainfo, task.meta.begin_season)
if season_episodes:
se_str = f"{task.meta.season} {StringUtils.format_ep(season_episodes)}"
else:
se_str = f"{task.meta.season}"
# 更新文件数量
transferinfo.file_count = self.jobview.count(task.mediainfo, task.meta.begin_season) or 1
# 更新文件大小
transferinfo.total_size = self.jobview.size(task.mediainfo,
task.meta.begin_season) or task.fileitem.size
self.send_transfer_message(meta=task.meta,
mediainfo=task.mediainfo,
transferinfo=transferinfo,
season_episode=se_str,
username=task.username)
# 刮削事件
if transferinfo.need_scrape:
self.eventmanager.send_event(EventType.MetadataScrape, {
'meta': task.meta,
'mediainfo': task.mediainfo,
'fileitem': transferinfo.target_diritem,
'file_list': transferinfo.file_list_new,
'overwrite': False
})
# 移除已完成的任务
self.jobview.remove_job(task)
__do_finished()
return True, ""
@@ -1101,7 +1134,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 自定义识别
if formaterHandler:
# 开始集、结束集、PART
begin_ep, end_ep, part = formaterHandler.split_episode(file_name=file_path.name, file_meta=file_meta)
begin_ep, end_ep, part = formaterHandler.split_episode(file_name=file_path.name,
file_meta=file_meta)
if begin_ep is not None:
file_meta.begin_episode = begin_ep
file_meta.part = part

View File

@@ -1,6 +1,8 @@
import copy
import json
import os
import platform
import re
import secrets
import sys
import threading
@@ -11,8 +13,10 @@ from dotenv import set_key
from pydantic import BaseModel, BaseSettings, validator, Field
from app.log import logger, log_settings, LogConfigModel
from app.schemas import MediaType
from app.utils.system import SystemUtils
from app.utils.url import UrlUtils
from version import APP_VERSION
class SystemConfModel(BaseModel):
@@ -211,6 +215,8 @@ class ConfigModel(BaseModel):
SITEDATA_REFRESH_INTERVAL: int = 6
# 读取和发送站点消息
SITE_MESSAGE: bool = True
# 不能缓存站点资源的站点域名,多个使用,分隔
NO_CACHE_SITE_KEY: str = "m-team"
# 种子标签
TORRENT_TAG: str = "MOVIEPILOT"
# 下载站点字幕
@@ -229,8 +235,6 @@ class ConfigModel(BaseModel):
COOKIECLOUD_INTERVAL: Optional[int] = 60 * 24
# CookieCloud同步黑名单多个域名,分割
COOKIECLOUD_BLACKLIST: Optional[str] = None
# CookieCloud对应的浏览器UA
USER_AGENT: str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.57"
# 电影重命名格式
MOVIE_RENAME_FORMAT: str = "{{title}}{% if year %} ({{year}}){% endif %}" \
"/{{title}}{% if year %} ({{year}}){% endif %}{% if part %}-{{part}}{% endif %}{% if videoFormat %} - {{videoFormat}}{% endif %}" \
@@ -274,12 +278,6 @@ class ConfigModel(BaseModel):
REPO_GITHUB_TOKEN: Optional[str] = None
# 大内存模式
BIG_MEMORY_MODE: bool = False
# 是否启用内存监控
MEMORY_ANALYSIS: bool = False
# 内存快照间隔(分钟)
MEMORY_SNAPSHOT_INTERVAL: int = 30
# 保留的内存快照文件数量
MEMORY_SNAPSHOT_KEEP_COUNT: int = 20
# 全局图片缓存,将媒体图片缓存到本地
GLOBAL_IMAGE_CACHE: bool = False
# 是否启用编码探测的性能模式
@@ -311,6 +309,8 @@ class ConfigModel(BaseModel):
DEFAULT_SUB: Optional[str] = "zh-cn"
# Docker Client API地址
DOCKER_CLIENT_API: Optional[str] = "tcp://127.0.0.1:38379"
# 工作流数据共享
WORKFLOW_STATISTIC_SHARE: bool = True
class Settings(BaseSettings, ConfigModel, LogConfigModel):
@@ -510,6 +510,13 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
"""
return "v2"
@property
def USER_AGENT(self) -> str:
"""
全局用户代理字符串
"""
return f"{self.PROJECT_NAME}/{APP_VERSION[1:]} ({platform.system()} {platform.release()}; {SystemUtils.cpu_arch()})"
@property
def INNER_CONFIG_PATH(self):
return self.ROOT_PATH / "config"
@@ -655,6 +662,23 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
return None
return UrlUtils.combine_url(host=self.APP_DOMAIN, path=url)
def RENAME_FORMAT(self, media_type: MediaType):
"""
获取指定类型的重命名格式
:param media_type: MediaType.TV 或 MediaType.Movie
:return: 重命名格式
"""
rename_format = (
self.TV_RENAME_FORMAT
if media_type == MediaType.TV
else self.MOVIE_RENAME_FORMAT
)
# 规范重命名格式
rename_format = rename_format.replace("\\", "/")
rename_format = re.sub(r'/+', '/', rename_format)
return rename_format.strip("/")
# 实例化配置
settings = Settings()

View File

@@ -193,7 +193,7 @@ class MediaInfo:
# LOGO
logo_path: str = None
# 评分
vote_average: float = 0.0
vote_average: float = None
# 描述
overview: str = None
# 风格ID
@@ -237,9 +237,9 @@ class MediaInfo:
# 流媒体平台
networks: list = field(default_factory=list)
# 集数
number_of_episodes: int = 0
number_of_episodes: int = None
# 季数
number_of_seasons: int = 0
number_of_seasons: int = None
# 原产国
origin_country: list = field(default_factory=list)
# 原名
@@ -255,9 +255,9 @@ class MediaInfo:
# 标签
tagline: str = None
# 评价数量
vote_count: int = 0
vote_count: int = None
# 流行度
popularity: int = 0
popularity: int = None
# 时长
runtime: int = None
# 下一集
@@ -474,7 +474,16 @@ class MediaInfo:
self.names = info.get('names') or []
# 剩余属性赋值
for key, value in info.items():
if hasattr(self, key) and not getattr(self, key):
if not value:
continue
if not hasattr(self, key):
continue
current_value = getattr(self, key)
if current_value:
continue
if current_value is None:
setattr(self, key, value)
elif type(current_value) == type(value):
setattr(self, key, value)
def set_douban_info(self, info: dict):
@@ -606,7 +615,16 @@ class MediaInfo:
self.production_countries = [{"id": country, "name": country} for country in info.get("countries") or []]
# 剩余属性赋值
for key, value in info.items():
if not value:
continue
if not hasattr(self, key):
continue
current_value = getattr(self, key)
if current_value:
continue
if current_value is None:
setattr(self, key, value)
elif type(current_value) == type(value):
setattr(self, key, value)
def set_bangumi_info(self, info: dict):

View File

@@ -69,9 +69,6 @@ class EventManager(metaclass=Singleton):
EventManager 负责管理和调度广播事件和链式事件,包括订阅、发送和处理事件
"""
# 退出事件
__event = threading.Event()
def __init__(self):
self.__executor = ThreadHelper() # 动态线程池,用于消费事件
self.__consumer_threads = [] # 用于保存启动的事件消费者线程
@@ -81,6 +78,7 @@ class EventManager(metaclass=Singleton):
self.__disabled_handlers = set() # 禁用的事件处理器集合
self.__disabled_classes = set() # 禁用的事件处理器类集合
self.__lock = threading.Lock() # 线程锁
self.__event = threading.Event() # 退出事件
def start(self):
"""

View File

@@ -9,8 +9,6 @@ class CustomizationMatcher(metaclass=Singleton):
"""
识别自定义占位符
"""
customization = None
custom_separator = None
def __init__(self):
self.systemconfig = SystemConfigOper()

View File

@@ -9,7 +9,6 @@ class ReleaseGroupsMatcher(metaclass=Singleton):
"""
识别制作组、字幕组
"""
__release_groups: str = None
# 内置组
RELEASE_GROUPS: dict = {
"0ff": ['FF(?:(?:A|WE)B|CD|E(?:DU|B)|TV)'],

View File

@@ -16,14 +16,14 @@ class ModuleManager(metaclass=Singleton):
模块管理器
"""
# 模块列表
_modules: dict = {}
# 运行态模块列表
_running_modules: dict = {}
# 子模块类型集合
SubType = Union[DownloaderType, MediaServerType, MessageChannel, StorageSchema, OtherModulesType]
def __init__(self):
# 模块列表
self._modules: dict = {}
# 运行态模块列表
self._running_modules: dict = {}
self.load_modules()
def load_modules(self):

View File

@@ -10,6 +10,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Dict, List, Optional, Type, Union, Callable, Tuple
from app.helper.sites import SitesHelper
from fastapi import HTTPException
from starlette import status
from watchdog.events import FileSystemEventHandler
@@ -21,7 +22,6 @@ from app.core.event import eventmanager, Event
from app.db.plugindata_oper import PluginDataOper
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.plugin import PluginHelper
from app.helper.sites import SitesHelper
from app.log import logger
from app.schemas.types import EventType, SystemConfigKey
from app.utils.crypto import RSAUtils
@@ -88,16 +88,15 @@ class PluginManager(metaclass=Singleton):
插件管理器
"""
# 插件列表
_plugins: dict = {}
# 运行态插件列表
_running_plugins: dict = {}
# 配置Key
_config_key: str = "plugin.%s"
# 监听器
_observer: Observer = None
def __init__(self):
# 插件列表
self._plugins: dict = {}
# 运行态插件列表
self._running_plugins: dict = {}
# 配置Key
self._config_key: str = "plugin.%s"
# 监听器
self._observer: Observer = None
# 开发者模式监测插件修改
if settings.DEV or settings.PLUGIN_AUTO_RELOAD:
self.__start_monitor()

View File

@@ -13,10 +13,9 @@ class WorkFlowManager(metaclass=Singleton):
工作流管理器
"""
# 所有动作定义
_actions: Dict[str, Any] = {}
def __init__(self):
# 所有动作定义
self._actions: Dict[str, Any] = {}
self.init()
def init(self):

View File

@@ -37,6 +37,11 @@ class Workflow(Base):
# 最后执行时间
last_time = Column(String)
@staticmethod
@db_query
def list(db):
return db.query(Workflow).all()
@staticmethod
@db_query
def get_enabled_workflows(db):

View File

@@ -25,6 +25,12 @@ class WorkflowOper(DbOper):
"""
return Workflow.get(self._db, wid)
def list(self) -> List[Workflow]:
"""
获取所有工作流列表
"""
return Workflow.list(self._db)
def list_enabled(self) -> List[Workflow]:
"""
获取启用的工作流列表

View File

@@ -1,12 +1,16 @@
import re
from pathlib import Path
from typing import List, Optional
from app import schemas
from app.core.context import MediaInfo
from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas.types import SystemConfigKey
from app.utils.system import SystemUtils
JINJA2_VAR_PATTERN = re.compile(r"\{\{.*?\}\}", re.DOTALL)
class DirectoryHelper:
"""
@@ -109,3 +113,42 @@ class DirectoryHelper:
return matched_dir
return matched_dirs[0]
return None
@staticmethod
def get_media_root_path(rename_format: str, rename_path: Path) -> Optional[Path]:
"""
获取重命名后的媒体文件根路径
:param rename_format: 重命名格式
:param rename_path: 重命名后的路径
:return: 媒体文件根路径
"""
if not rename_format:
logger.error("重命名格式不能为空")
return None
# 计算重命名中的文件夹层数
rename_list = rename_format.split("/")
rename_format_level = len(rename_list) - 1
# 查找标题参数所在层
for level, name in enumerate(rename_list):
matchs = JINJA2_VAR_PATTERN.findall(name)
if not matchs:
continue
# 处理特例,有的人重命名的第一层是年份、分辨率
if any("title" in m for m in matchs):
# 找出含标题的这一层作为媒体根目录
rename_format_level -= level
break
else:
# 假定第一层目录是媒体根目录
logger.warn(f"重命名格式 {rename_format} 缺少标题参数")
if rename_format_level > len(rename_path.parents):
# 通常因为路径以/结尾被Path规范化删除了
logger.error(f"路径 {rename_path} 不匹配重命名格式 {rename_format}")
return None
if rename_format_level <= 0:
# 所有媒体文件都存在一个目录内的特殊需求
rename_format_level = 1
# 媒体根路径
media_root = rename_path.parents[rename_format_level - 1]
return media_root

View File

@@ -8,7 +8,6 @@ import os
class DisplayHelper(metaclass=Singleton):
_display: Display = None
def __init__(self):
if not SystemUtils.is_docker():

View File

@@ -70,6 +70,9 @@ def enable_doh(enable: bool):
class DohHelper(metaclass=Singleton):
"""
DoH帮助类用于处理DNS over HTTPS解析。
"""
def __init__(self):
enable_doh(settings.DOH_ENABLE)

View File

@@ -1,457 +0,0 @@
import gc
import sys
import threading
import time
from datetime import datetime
from typing import Optional
import psutil
from pympler import muppy, summary, asizeof
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.schemas import ConfigChangeEventData
from app.schemas.types import EventType
from app.utils.singleton import Singleton
class MemoryHelper(metaclass=Singleton):
"""
内存管理工具类,用于监控和优化内存使用
"""
def __init__(self):
# 检查间隔(秒) - 从配置获取默认5分钟
self._check_interval = settings.MEMORY_SNAPSHOT_INTERVAL * 60
self._monitoring = False
self._monitor_thread: Optional[threading.Thread] = None
# 内存快照保存目录
self._memory_snapshot_dir = settings.LOG_PATH / "memory_snapshots"
# 保留的快照文件数量
self._keep_count = settings.MEMORY_SNAPSHOT_KEEP_COUNT
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件,更新内存监控设置
:param event: 事件对象
"""
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in ['MEMORY_ANALYSIS', 'MEMORY_SNAPSHOT_INTERVAL', 'MEMORY_SNAPSHOT_KEEP_COUNT']:
return
# 更新配置
if event_data.key == 'MEMORY_SNAPSHOT_INTERVAL':
self._check_interval = settings.MEMORY_SNAPSHOT_INTERVAL * 60
elif event_data.key == 'MEMORY_SNAPSHOT_KEEP_COUNT':
self._keep_count = settings.MEMORY_SNAPSHOT_KEEP_COUNT
self.stop_monitoring()
self.start_monitoring()
def start_monitoring(self):
"""
开始内存监控
"""
if not settings.MEMORY_ANALYSIS:
return
if self._monitoring:
return
# 创建内存快照目录
self._memory_snapshot_dir.mkdir(parents=True, exist_ok=True)
# 初始化内存分析器
self._monitoring = True
self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
self._monitor_thread.start()
logger.info("内存监控已启动")
def stop_monitoring(self):
"""
停止内存监控
"""
self._monitoring = False
if self._monitor_thread:
self._monitor_thread.join(timeout=5)
logger.info("内存监控已停止")
def _monitor_loop(self):
"""
内存监控循环
"""
logger.info("内存监控循环开始")
while self._monitoring:
try:
# 生成内存快照
self._create_memory_snapshot()
time.sleep(self._check_interval)
except Exception as e:
logger.error(f"内存监控出错: {e}")
# 出错后等待1分钟再继续
time.sleep(60)
logger.info("内存监控循环结束")
def _create_memory_snapshot(self):
"""
创建内存快照并保存到文件
"""
try:
# 获取当前时间戳
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
snapshot_file = self._memory_snapshot_dir / f"memory_snapshot_{timestamp}.txt"
# 获取系统内存使用情况
memory_usage = psutil.Process().memory_info().rss
logger.info(f"开始创建内存快照: {snapshot_file}")
# 第一步:写入基本信息和对象类型统计
self._write_basic_info(snapshot_file, memory_usage)
# 第二步:分析并写入类实例内存使用情况
self._append_class_analysis(snapshot_file)
# 第三步:分析并写入大内存变量详情
self._append_variable_analysis(snapshot_file)
logger.info(f"内存快照已保存: {snapshot_file}, 当前内存使用: {memory_usage / 1024 / 1024:.2f} MB")
# 清理过期的快照文件保留最近30个
self._cleanup_old_snapshots()
except Exception as e:
logger.error(f"创建内存快照失败: {e}")
@staticmethod
def _write_basic_info(snapshot_file, memory_usage):
"""
写入基本信息和对象类型统计
"""
# 获取当前进程的内存使用情况
all_objects = muppy.get_objects()
sum1 = summary.summarize(all_objects)
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write(f"内存快照时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"当前进程内存使用: {memory_usage / 1024 / 1024:.2f} MB\n")
f.write("=" * 80 + "\n")
f.write("对象类型统计:\n")
f.write("-" * 80 + "\n")
# 写入对象统计信息
for line in summary.format_(sum1):
f.write(line + "\n")
# 立即刷新到磁盘
f.flush()
logger.debug("基本信息已写入快照文件")
def _append_class_analysis(self, snapshot_file):
"""
分析并追加类实例内存使用情况
"""
with open(snapshot_file, 'a', encoding='utf-8') as f:
f.write("\n" + "=" * 80 + "\n")
f.write("类实例内存使用情况 (按内存大小排序):\n")
f.write("-" * 80 + "\n")
f.write("正在分析中...\n")
# 立即刷新,让用户知道这部分开始了
f.flush()
try:
logger.debug("开始分析类实例内存使用情况")
class_objects = self._get_class_memory_usage()
# 重新打开文件,移除"正在分析中..."并写入实际结果
with open(snapshot_file, 'r', encoding='utf-8') as f:
content = f.read()
# 替换"正在分析中..."
content = content.replace("正在分析中...\n", "")
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write(content)
if class_objects:
# 只显示前100个类
for i, class_info in enumerate(class_objects[:100], 1):
f.write(f"{i:3d}. {class_info['name']:<50} "
f"{class_info['size_mb']:>8.2f} MB ({class_info['count']} 个实例)\n")
else:
f.write("未找到有效的类实例信息\n")
f.flush()
except Exception as e:
logger.error(f"获取类实例信息失败: {e}")
# 即使出错也要更新文件
with open(snapshot_file, 'r', encoding='utf-8') as f:
content = f.read()
content = content.replace("正在分析中...\n", f"获取类实例信息失败: {e}\n")
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write(content)
f.flush()
logger.debug("类实例分析已完成并写入")
def _append_variable_analysis(self, snapshot_file):
"""
分析并追加大内存变量详情
"""
with open(snapshot_file, 'a', encoding='utf-8') as f:
f.write("\n" + "=" * 80 + "\n")
f.write("大内存变量详情 (前100个):\n")
f.write("-" * 80 + "\n")
f.write("正在分析中...\n")
# 立即刷新,让用户知道这部分开始了
f.flush()
try:
logger.debug("开始分析大内存变量")
large_variables = self._get_large_variables(100)
# 重新打开文件,移除"正在分析中..."并写入实际结果
with open(snapshot_file, 'r', encoding='utf-8') as f:
content = f.read()
# 替换最后的"正在分析中..."
content = content.replace("正在分析中...\n", "")
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write(content)
if large_variables:
for i, var_info in enumerate(large_variables, 1):
f.write(
f"{i:3d}. {var_info['name']:<30} {var_info['type']:<15} {var_info['size_mb']:>8.2f} MB\n")
else:
f.write("未找到大内存变量\n")
f.flush()
except Exception as e:
logger.error(f"获取大内存变量信息失败: {e}")
# 即使出错也要更新文件
with open(snapshot_file, 'r', encoding='utf-8') as f:
content = f.read()
content = content.replace("正在分析中...\n", f"获取变量信息失败: {e}\n")
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write(content)
f.flush()
logger.debug("大内存变量分析已完成并写入")
def _cleanup_old_snapshots(self):
"""
清理过期的内存快照文件,只保留最近的指定数量文件
"""
try:
snapshot_files = list(self._memory_snapshot_dir.glob("memory_snapshot_*.txt"))
if len(snapshot_files) > self._keep_count:
# 按修改时间排序,删除最旧的文件
snapshot_files.sort(key=lambda x: x.stat().st_mtime)
for old_file in snapshot_files[:-self._keep_count]:
old_file.unlink()
logger.debug(f"已删除过期内存快照: {old_file}")
except Exception as e:
logger.error(f"清理过期快照失败: {e}")
@staticmethod
def _get_class_memory_usage():
"""
获取所有类实例的内存使用情况,按内存大小排序
"""
class_info = {}
processed_count = 0
error_count = 0
# 获取所有对象
all_objects = muppy.get_objects()
logger.debug(f"开始分析 {len(all_objects)} 个对象的类实例内存使用情况")
for obj in all_objects:
try:
# 跳过类对象本身,统计类的实例
if isinstance(obj, type):
continue
# 获取对象的类名 - 这里可能会出错
obj_class = type(obj)
# 安全地获取类名
try:
if hasattr(obj_class, '__module__') and hasattr(obj_class, '__name__'):
class_name = f"{obj_class.__module__}.{obj_class.__name__}"
else:
class_name = str(obj_class)
except Exception as e:
# 如果获取类名失败,使用简单的类型描述
class_name = f"<unknown_class_{id(obj_class)}>"
logger.debug(f"获取类名失败: {e}")
# 计算对象本身的内存使用(不包括引用对象,避免重复计算)
size_bytes = sys.getsizeof(obj)
if size_bytes < 100: # 跳过太小的对象
continue
size_mb = size_bytes / 1024 / 1024
processed_count += 1
if class_name in class_info:
class_info[class_name]['size_mb'] += size_mb
class_info[class_name]['count'] += 1
else:
class_info[class_name] = {
'name': class_name,
'size_mb': size_mb,
'count': 1
}
except Exception as e:
# 捕获所有可能的异常包括SQLAlchemy、ORM等框架的异常
error_count += 1
if error_count <= 5: # 只记录前5个错误避免日志过多
logger.debug(f"分析对象时出错: {e}")
continue
logger.debug(f"类实例分析完成: 处理了 {processed_count} 个对象, 遇到 {error_count} 个错误")
# 按内存大小排序
sorted_classes = sorted(class_info.values(), key=lambda x: x['size_mb'], reverse=True)
return sorted_classes
def _get_large_variables(self, limit=100):
"""
获取大内存变量信息,按内存大小排序
使用已计算对象集合避免重复计算
"""
large_vars = []
processed_count = 0
calculated_objects = set() # 避免重复计算
# 获取所有对象
all_objects = muppy.get_objects()
logger.debug(f"开始分析 {len(all_objects)} 个对象的内存使用情况")
for obj in all_objects:
# 跳过类对象
if isinstance(obj, type):
continue
# 跳过已经计算过的对象
obj_id = id(obj)
if obj_id in calculated_objects:
continue
try:
# 首先使用 sys.getsizeof 快速筛选
shallow_size = sys.getsizeof(obj)
if shallow_size < 1024: # 只处理大于1KB的对象
continue
# 对于较大的对象,使用 asizeof 进行深度计算
size_bytes = asizeof.asizeof(obj)
# 只处理大于10KB的对象提高分析效率
if size_bytes < 10240:
continue
size_mb = size_bytes / 1024 / 1024
processed_count += 1
calculated_objects.add(obj_id)
# 获取对象信息
var_info = self._get_variable_info(obj, size_mb)
if var_info:
large_vars.append(var_info)
# 如果已经找到足够多的大对象,可以提前结束
if len(large_vars) >= limit * 2: # 多收集一些,后面排序筛选
break
except Exception as e:
# 更广泛的异常捕获
logger.debug(f"分析对象失败: {e}")
continue
logger.debug(f"处理了 {processed_count} 个大对象,找到 {len(large_vars)} 个有效变量")
# 按内存大小排序并返回前N个
large_vars.sort(key=lambda x: x['size_mb'], reverse=True)
return large_vars[:limit]
def _get_variable_info(self, obj, size_mb):
"""
获取变量的描述信息
"""
try:
obj_type = type(obj).__name__
# 尝试获取变量名
var_name = self._get_variable_name(obj)
# 生成描述性信息
if isinstance(obj, dict):
key_count = len(obj)
if key_count > 0:
sample_keys = list(obj.keys())[:3]
var_name += f" ({key_count}项, 键: {sample_keys})"
elif isinstance(obj, (list, tuple, set)):
var_name += f" ({len(obj)}个元素)"
elif isinstance(obj, str):
if len(obj) > 50:
var_name += f" (长度: {len(obj)}, 内容: '{obj[:50]}...')"
else:
var_name += f" ('{obj}')"
elif hasattr(obj, '__class__') and hasattr(obj.__class__, '__name__'):
if hasattr(obj, '__dict__'):
attr_count = len(obj.__dict__)
var_name += f" ({attr_count}个属性)"
return {
'name': var_name,
'type': obj_type,
'size_mb': size_mb
}
except Exception as e:
logger.debug(f"获取变量信息失败: {e}")
return None
@staticmethod
def _get_variable_name(obj):
"""
尝试获取变量名
"""
try:
# 尝试通过gc获取引用该对象的变量名
referrers = gc.get_referrers(obj)
for referrer in referrers:
if isinstance(referrer, dict):
# 检查是否在某个模块的全局变量中
for name, value in referrer.items():
if value is obj and isinstance(name, str):
return name
elif hasattr(referrer, '__dict__'):
# 检查是否在某个实例的属性中
for name, value in referrer.__dict__.items():
if value is obj and isinstance(name, str):
return f"{type(referrer).__name__}.{name}"
# 如果找不到变量名返回对象类型和id
return f"{type(obj).__name__}_{id(obj)}"
except Exception as e:
logger.debug(f"获取变量名失败: {e}")
return f"{type(obj).__name__}_{id(obj)}"

View File

@@ -541,8 +541,6 @@ class MessageQueueManager(metaclass=SingletonClass):
消息发送队列管理器
"""
schedule_periods: List[tuple[int, int, int, int]] = []
def __init__(
self,
send_callback: Optional[Callable] = None,
@@ -554,6 +552,8 @@ class MessageQueueManager(metaclass=SingletonClass):
:param send_callback: 实际发送消息的回调函数
:param check_interval: 时间检查间隔(秒)
"""
self.schedule_periods: List[tuple[int, int, int, int]] = []
self.init_config()
self.queue: queue.Queue[Any] = queue.Queue()

View File

@@ -18,14 +18,14 @@ from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils
from app.utils.singleton import Singleton
from app.utils.singleton import WeakSingleton
from app.utils.system import SystemUtils
from app.utils.url import UrlUtils
PLUGIN_DIR = Path(settings.ROOT_PATH) / "app" / "plugins"
class PluginHelper(metaclass=Singleton):
class PluginHelper(metaclass=WeakSingleton):
"""
插件市场管理,下载安装插件到本地
"""
@@ -649,10 +649,20 @@ class PluginHelper(metaclass=Singleton):
"""
dependencies = {}
try:
install_plugins = {
plugin_id.lower() # 对应插件的小写目录名
for plugin_id in SystemConfigOper().get(
SystemConfigKey.UserInstalledPlugins
) or []
}
for plugin_dir in PLUGIN_DIR.iterdir():
if plugin_dir.is_dir():
requirements_file = plugin_dir / "requirements.txt"
if requirements_file.exists():
if plugin_dir.name not in install_plugins:
# 这个插件不在安装列表中 忽略它的依赖
logger.debug(f"忽略插件 {plugin_dir.name} 的依赖")
continue
# 解析当前插件的 requirements.txt获取依赖项
plugin_deps = self.__parse_requirements(requirements_file)
for pkg_name, version_specifiers in plugin_deps.items():

View File

@@ -1,12 +1,11 @@
from enum import Enum
from typing import Union, Dict, Optional
from typing import Union, Optional
from app.schemas.types import ProgressKey
from app.utils.singleton import Singleton
from app.utils.singleton import WeakSingleton
class ProgressHelper(metaclass=Singleton):
_process_detail: Dict[str, dict] = {}
class ProgressHelper(metaclass=WeakSingleton):
def __init__(self):
self._process_detail = {}

View File

@@ -8,11 +8,11 @@ from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils
from app.utils.singleton import Singleton
from app.utils.singleton import WeakSingleton
from app.utils.system import SystemUtils
class SubscribeHelper(metaclass=Singleton):
class SubscribeHelper(metaclass=WeakSingleton):
"""
订阅数据统计/订阅分享等
"""
@@ -29,6 +29,8 @@ class SubscribeHelper(metaclass=Singleton):
_sub_shares = f"{settings.MP_SERVER_HOST}/subscribe/shares"
_sub_share_statistic = f"{settings.MP_SERVER_HOST}/subscribe/share/statistics"
_sub_fork = f"{settings.MP_SERVER_HOST}/subscribe/fork/%s"
_shares_cache_region = "subscribe_share"
@@ -215,6 +217,18 @@ class SubscribeHelper(metaclass=Singleton):
return res.json()
return []
@cached(maxsize=1, ttl=1800)
def get_share_statistics(self) -> List[dict]:
"""
获取订阅分享统计数据
"""
if not settings.SUBSCRIBE_STATISTIC_SHARE:
return []
res = RequestUtils(proxies=settings.PROXY, timeout=15).get_res(self._sub_share_statistic)
if res and res.status_code == 200:
return res.json()
return []
def get_user_uuid(self) -> str:
"""
获取用户uuid

View File

@@ -1,10 +1,9 @@
import datetime
import re
from pathlib import Path
from typing import Tuple, Optional, List, Union, Dict
from typing import Tuple, Optional, List, Union, Dict, Any
from urllib.parse import unquote
from requests import Response
from torrentool.api import Torrent
from app.core.config import settings
@@ -16,17 +15,17 @@ from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas.types import MediaType, SystemConfigKey
from app.utils.http import RequestUtils
from app.utils.singleton import Singleton
from app.utils.singleton import WeakSingleton
from app.utils.string import StringUtils
class TorrentHelper(metaclass=Singleton):
class TorrentHelper(metaclass=WeakSingleton):
"""
种子帮助类
"""
# 失败的种子:站点链接
_invalid_torrents = []
def __init__(self):
self._invalid_torrents = []
def download_torrent(self, url: str,
cookie: Optional[str] = None,
@@ -40,6 +39,22 @@ class TorrentHelper(metaclass=Singleton):
"""
if url.startswith("magnet:"):
return None, url, "", [], f"磁力链接"
# 构建 torrent 种子文件的存储路径
file_path = (Path(settings.TEMP_PATH) / StringUtils.md5_hash(url)).with_suffix(".torrent")
if file_path.exists():
try:
# 获取种子目录和文件清单
folder_name, file_list = self.get_torrent_info(file_path)
# 无法获取信息,则认为缓存文件无效
if not folder_name and not file_list:
raise ValueError("无效的缓存种子文件")
# 获取种子数据
content = file_path.read_bytes()
# 成功拿到种子数据
return file_path, content, folder_name, file_list, ""
except Exception as err:
logger.error(f"处理缓存的种子文件 {file_path} 时出错: {err},将重新下载")
file_path.unlink(missing_ok=True)
# 请求种子文件
req = RequestUtils(
ua=ua,
@@ -106,10 +121,6 @@ class TorrentHelper(metaclass=Singleton):
if req.content:
# 检查是不是种子文件,如果不是仍然抛出异常
try:
# 读取种子文件名
file_name = self.get_url_filename(req, url)
# 种子文件路径
file_path = Path(settings.TEMP_PATH) / file_name
# 保存到文件
file_path.write_bytes(req.content)
# 获取种子目录和文件清单
@@ -170,7 +181,7 @@ class TorrentHelper(metaclass=Singleton):
return "", []
@staticmethod
def get_url_filename(req: Response, url: str) -> str:
def get_url_filename(req: Any, url: str) -> str:
"""
从下载请求中获取种子文件名
"""
@@ -308,7 +319,7 @@ class TorrentHelper(metaclass=Singleton):
self._invalid_torrents.append(url)
@staticmethod
def match_torrent(mediainfo: MediaInfo, torrent_meta: MetaInfo, torrent: TorrentInfo) -> bool:
def match_torrent(mediainfo: MediaInfo, torrent_meta: MetaBase, torrent: TorrentInfo) -> bool:
"""
检查种子是否匹配媒体信息
:param mediainfo: 需要匹配的媒体信息

132
app/helper/workflow.py Normal file
View File

@@ -0,0 +1,132 @@
import json
from typing import List, Tuple, Optional
from app.core.cache import cached, cache_backend
from app.core.config import settings
from app.db.workflow_oper import WorkflowOper
from app.log import logger
from app.utils.http import RequestUtils
from app.utils.singleton import WeakSingleton
from app.utils.system import SystemUtils
class WorkflowHelper(metaclass=WeakSingleton):
"""
工作流分享等
"""
_workflow_share = f"{settings.MP_SERVER_HOST}/workflow/share"
_workflow_shares = f"{settings.MP_SERVER_HOST}/workflow/shares"
_workflow_fork = f"{settings.MP_SERVER_HOST}/workflow/fork/%s"
_shares_cache_region = "workflow_share"
_share_user_id = None
def __init__(self):
self.get_user_uuid()
def workflow_share(self, workflow_id: int,
share_title: str, share_comment: str, share_user: str) -> Tuple[bool, str]:
"""
分享工作流
"""
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
return False, "当前没有开启工作流数据共享功能"
# 获取工作流信息
workflow = WorkflowOper().get(workflow_id)
if not workflow:
return False, "工作流不存在"
if not workflow.actions or not workflow.flows:
return False, "请分享有动作和流程的工作流"
workflow_dict = workflow.to_dict()
workflow_dict.pop("id", None)
workflow_dict.pop("context", None)
workflow_dict['actions'] = json.dumps(workflow_dict['actions'] or [])
workflow_dict['flows'] = json.dumps(workflow_dict['flows'] or [])
# 发送分享请求
res = RequestUtils(proxies=settings.PROXY or {}, content_type="application/json",
timeout=10).post(self._workflow_share,
json={
"share_title": share_title,
"share_comment": share_comment,
"share_user": share_user,
"share_uid": self._share_user_id,
**workflow_dict
})
if res is None:
return False, "连接MoviePilot服务器失败"
if res.ok:
# 清除 get_shares 的缓存,以便实时看到结果
cache_backend.clear(region=self._shares_cache_region)
return True, ""
else:
return False, res.json().get("message")
def share_delete(self, share_id: int) -> Tuple[bool, str]:
"""
删除分享
"""
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
return False, "当前没有开启工作流数据共享功能"
res = RequestUtils(proxies=settings.PROXY or {},
timeout=5).delete_res(f"{self._workflow_share}/{share_id}",
params={"share_uid": self._share_user_id})
if res is None:
return False, "连接MoviePilot服务器失败"
if res.ok:
# 清除 get_shares 的缓存,以便实时看到结果
cache_backend.clear(region=self._shares_cache_region)
return True, ""
else:
return False, res.json().get("message")
def workflow_fork(self, share_id: int) -> Tuple[bool, str]:
"""
复用分享的工作流
"""
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
return False, "当前没有开启工作流数据共享功能"
res = RequestUtils(proxies=settings.PROXY or {}, timeout=5, headers={
"Content-Type": "application/json"
}).get_res(self._workflow_fork % share_id)
if res is None:
return False, "连接MoviePilot服务器失败"
if res.ok:
return True, ""
else:
return False, res.json().get("message")
@cached(region=_shares_cache_region)
def get_shares(self, name: Optional[str] = None, page: Optional[int] = 1, count: Optional[int] = 30) -> List[dict]:
"""
获取工作流分享数据
"""
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
return []
res = RequestUtils(proxies=settings.PROXY or {}, timeout=15).get_res(self._workflow_shares, params={
"name": name,
"page": page,
"count": count
})
if res and res.status_code == 200:
return res.json()
return []
def get_user_uuid(self) -> str:
"""
获取用户uuid
"""
if not self._share_user_id:
self._share_user_id = SystemUtils.generate_user_unique_id()
logger.info(f"当前用户UUID: {self._share_user_id}")
return self._share_user_id or ""

View File

@@ -12,10 +12,10 @@ import requests
from app.core.cache import cached
from app.core.config import settings
from app.utils.http import RequestUtils
from app.utils.singleton import Singleton
from app.utils.singleton import WeakSingleton
class DoubanApi(metaclass=Singleton):
class DoubanApi(metaclass=WeakSingleton):
_urls = {
# 搜索类
# sort=U:近期热门 T:标记最多 S:评分最高 R:最新上映
@@ -151,7 +151,6 @@ class DoubanApi(metaclass=Singleton):
_api_key2 = "0ab215a8b1977939201640fa14c66bab"
_base_url = "https://frodo.douban.com/api/v2"
_api_url = "https://api.douban.com/v2"
_session = None
def __init__(self):
self._session = requests.Session()

View File

@@ -10,7 +10,7 @@ from app.core.config import settings
from app.core.meta import MetaBase
from app.core.metainfo import MetaInfo
from app.log import logger
from app.utils.singleton import Singleton
from app.utils.singleton import WeakSingleton
from app.schemas.types import MediaType
lock = RLock()
@@ -19,7 +19,7 @@ CACHE_EXPIRE_TIMESTAMP_STR = "cache_expire_timestamp"
EXPIRE_TIMESTAMP = settings.CONF.meta
class DoubanCache(metaclass=Singleton):
class DoubanCache(metaclass=WeakSingleton):
"""
豆瓣缓存数据
{
@@ -29,9 +29,6 @@ class DoubanCache(metaclass=Singleton):
"type": MediaType
}
"""
_meta_data: dict = {}
# 缓存文件路径
_meta_path: Path = None
# TMDB缓存过期
_tmdb_cache_expire: bool = True
@@ -233,3 +230,6 @@ class DoubanCache(metaclass=Singleton):
if not cache_media_info:
return
self._meta_data[key]['title'] = cn_title
def __del__(self):
self.save()

View File

@@ -166,7 +166,8 @@ class Emby:
type=library_type,
image=image,
link=f'{self._playhost or self._host}web/index.html'
f'#!/videos?serverId={self.serverid}&parentId={library.get("Id")}'
f'#!/videos?serverId={self.serverid}&parentId={library.get("Id")}',
server_type= "emby"
)
)
return libraries
@@ -1167,7 +1168,8 @@ class Emby:
type=item_type,
image=image,
link=link,
percent=item.get("UserData", {}).get("PlayedPercentage")
percent=item.get("UserData", {}).get("PlayedPercentage"),
server_type='emby'
))
return ret_resume
else:
@@ -1219,7 +1221,8 @@ class Emby:
type=item_type,
image=image,
link=link,
BackdropImageTags=item.get("BackdropImageTags")
BackdropImageTags=item.get("BackdropImageTags"),
server_type='emby'
))
return ret_latest
else:

View File

@@ -1,6 +1,7 @@
from pathlib import Path
from typing import Optional, List, Tuple, Union, Dict, Callable
from app.chain.tmdb import TmdbChain
from app.core.config import settings
from app.core.context import MediaInfo
from app.core.meta import MetaBase
@@ -139,13 +140,29 @@ class FileManagerModule(_ModuleBase):
"""
handler = TransHandler()
# 重命名格式
rename_format = settings.TV_RENAME_FORMAT \
if mediainfo.type == MediaType.TV else settings.MOVIE_RENAME_FORMAT
rename_format = settings.RENAME_FORMAT(mediainfo.type)
# 获取集信息
episodes_info: Optional[List[TmdbEpisode]] = None
if mediainfo.type == MediaType.TV:
# 判断注意season为0的情况
season_num = mediainfo.season
if season_num is None and meta.season_seq:
if meta.season_seq.isdigit():
season_num = int(meta.season_seq)
# 默认值1
if season_num is None:
season_num = 1
episodes_info = TmdbChain().tmdb_episodes(
tmdbid=mediainfo.tmdb_id,
season=season_num,
episode_group=mediainfo.episode_group,
)
# 获取重命名后的名称
path = handler.get_rename_path(
template_string=rename_format,
rename_dict=handler.get_naming_dict(meta=meta,
mediainfo=mediainfo,
episodes_info=episodes_info,
file_ext=Path(meta.title).suffix)
)
return str(path)
@@ -411,6 +428,12 @@ class FileManagerModule(_ModuleBase):
message=f"{target_path} 不是有效目录")
# 获取目标路径
if target_directory:
# 目标媒体库目录未设置
if not target_directory.library_path:
logger.error(f"目标媒体库目录未设置,无法整理文件,源路径:{fileitem.path}")
return TransferInfo(success=False,
fileitem=fileitem,
message="目标媒体库目录未设置")
# 整理方式
if not transfer_type:
transfer_type = target_directory.transfer_type
@@ -510,8 +533,7 @@ class FileManagerModule(_ModuleBase):
# 媒体分类路径
dir_path = handler.get_dest_dir(mediainfo=mediainfo, target_dir=dest_dir)
# 重命名格式
rename_format = settings.TV_RENAME_FORMAT \
if mediainfo.type == MediaType.TV else settings.MOVIE_RENAME_FORMAT
rename_format = settings.RENAME_FORMAT(mediainfo.type)
# 元数据补上常用属性,尽可能确保重命名后的路径不出现空白
meta = MetaInfo(mediainfo.title)
if meta.type == MediaType.UNKNOWN and mediainfo.type is not None:
@@ -529,18 +551,14 @@ class FileManagerModule(_ModuleBase):
rename_dict=handler.get_naming_dict(meta=meta,
mediainfo=mediainfo)
)
# 计算重命名中的文件夹层数
rename_list = rename_format.split("/")
rename_format_level = len(rename_list) - 1
for level, name in enumerate(rename_list):
# 处理特例,有的人重命名第一层是年份、分辨率
if "{{title}}" in name:
# 找出含标题的这一层作为扫描路径
rename_format_level -= level
break
# 取相对路径的第1层目录
media_path = target_path.parents[rename_format_level - 1]
if dir_path.is_relative_to(media_path):
# 获取重命名后的媒体文件根路径
media_path = DirectoryHelper.get_media_root_path(
rename_format, rename_path=target_path
)
if not media_path:
# 忽略
continue
if dir_path != media_path and dir_path.is_relative_to(media_path):
# 兜底检查,避免不必要的扫盘
logger.warn(f"{media_path} 是媒体库目录 {dir_path} 的父目录,忽略获取媒体文件列表,请检查重命名格式!")
continue

View File

@@ -15,7 +15,7 @@ from app.core.config import settings
from app.log import logger
from app.modules.filemanager import StorageBase
from app.schemas.types import StorageSchema
from app.utils.singleton import Singleton
from app.utils.singleton import WeakSingleton
from app.utils.string import StringUtils
lock = threading.Lock()
@@ -29,7 +29,7 @@ class SessionInvalidException(Exception):
pass
class AliPan(StorageBase, metaclass=Singleton):
class AliPan(StorageBase, metaclass=WeakSingleton):
"""
阿里云盘相关操作
"""
@@ -43,17 +43,12 @@ class AliPan(StorageBase, metaclass=Singleton):
"copy": "复制"
}
# 验证参数
_auth_state = {}
# 上传进度值
_last_progress = 0
# 基础url
base_url = "https://openapi.alipan.com"
def __init__(self):
super().__init__()
self._auth_state = {}
self.session = requests.Session()
self._init_session()
@@ -244,6 +239,7 @@ class AliPan(StorageBase, metaclass=Singleton):
conf = self.get_conf()
conf.update(result)
self.set_config(conf)
return None
def _request_api(self, method: str, endpoint: str,
result_key: Optional[str] = None, **kwargs) -> Optional[Union[dict, list]]:
@@ -369,7 +365,7 @@ class AliPan(StorageBase, metaclass=Singleton):
break
next_marker = resp.get("next_marker")
for item in resp.get("items", []):
items.append(self.__get_fileitem(item, parent=fileitem.path))
items.append(self.__get_fileitem(item, parent=str(fileitem.path)))
if len(resp.get("items")) < 100:
break
return items

View File

@@ -12,11 +12,11 @@ from app.log import logger
from app.modules.filemanager.storages import StorageBase
from app.schemas.types import StorageSchema
from app.utils.http import RequestUtils
from app.utils.singleton import Singleton
from app.utils.singleton import WeakSingleton
from app.utils.url import UrlUtils
class Alist(StorageBase, metaclass=Singleton):
class Alist(StorageBase, metaclass=WeakSingleton):
"""
Alist相关操作
api文档https://oplist.org/zh/
@@ -38,7 +38,7 @@ class Alist(StorageBase, metaclass=Singleton):
"""
初始化
"""
self.__generate_token.clear_cache()
self.__generate_token.clear_cache() # noqa
@property
def __get_base_url(self) -> str:
@@ -376,10 +376,46 @@ class Alist(StorageBase, metaclass=Singleton):
"""
return self.get_folder(Path(fileitem.path).parent)
def __is_empty_dir(self, fileitem: schemas.FileItem) -> bool:
"""
判断目录是否为空
"""
if fileitem.type != "dir":
return False
# 获取目录内容
items = self.list(fileitem)
return len(items) == 0
def delete(self, fileitem: schemas.FileItem) -> bool:
"""
删除文件
删除文件或目录空目录用专用API
"""
# 如果是空目录,优先用 remove_empty_directory
if fileitem.type == "dir" and self.__is_empty_dir(fileitem):
resp = RequestUtils(
headers=self.__get_header_with_token()
).post_res(
self.__get_api_url("/api/fs/remove_empty_directory"),
json={
"src_dir": fileitem.path,
},
)
if resp is None:
logger.warn(f"【OpenList】请求删除空目录 {fileitem.path} 失败无法连接alist服务")
return False
if resp.status_code != 200:
logger.warn(
f"【OpenList】请求删除空目录 {fileitem.path} 失败,状态码:{resp.status_code}"
)
return False
result = resp.json()
if result["code"] != 200:
logger.warn(
f'【OpenList】删除空目录 {fileitem.path} 失败,错误信息:{result["message"]}'
)
return False
return True
# 其它情况(文件或非空目录)
resp = RequestUtils(
headers=self.__get_header_with_token()
).post_res(
@@ -389,20 +425,6 @@ class Alist(StorageBase, metaclass=Singleton):
"names": [fileitem.name],
},
)
"""
{
"names": [
"string"
],
"dir": "string"
}
======================================
{
"code": 200,
"message": "success",
"data": null
}
"""
if resp is None:
logger.warn(f"【OpenList】请求删除文件 {fileitem.path} 失败无法连接alist服务")
return False
@@ -411,7 +433,6 @@ class Alist(StorageBase, metaclass=Singleton):
f"【OpenList】请求删除文件 {fileitem.path} 失败,状态码:{resp.status_code}"
)
return False
result = resp.json()
if result["code"] != 200:
logger.warn(

View File

@@ -12,17 +12,19 @@ from app.core.config import settings
from app.log import logger
from app.modules.filemanager import StorageBase
from app.schemas.types import StorageSchema
from app.utils.singleton import Singleton
from app.utils.singleton import WeakSingleton
lock = threading.Lock()
class SMBConnectionError(Exception):
"""SMB 连接错误"""
"""
SMB 连接错误
"""
pass
class SMB(StorageBase, metaclass=Singleton):
class SMB(StorageBase, metaclass=WeakSingleton):
"""
SMB网络挂载存储相关操作 - 使用 smbclient 高级接口
"""

View File

@@ -18,7 +18,7 @@ from app.core.config import settings
from app.log import logger
from app.modules.filemanager import StorageBase
from app.schemas.types import StorageSchema
from app.utils.singleton import Singleton
from app.utils.singleton import WeakSingleton
from app.utils.string import StringUtils
lock = threading.Lock()
@@ -28,7 +28,7 @@ class NoCheckInException(Exception):
pass
class U115Pan(StorageBase, metaclass=Singleton):
class U115Pan(StorageBase, metaclass=WeakSingleton):
"""
115相关操作
"""
@@ -41,18 +41,12 @@ class U115Pan(StorageBase, metaclass=Singleton):
"move": "移动",
"copy": "复制"
}
# 验证参数
_auth_state = {}
# 上传进度值
_last_progress = 0
# 基础url
base_url = "https://proapi.115.com"
def __init__(self):
super().__init__()
self._auth_state = {}
self.session = requests.Session()
self._init_session()
@@ -492,7 +486,8 @@ class U115Pan(StorageBase, metaclass=Singleton):
type="file" if info_resp["file_category"] == "1" else "dir",
name=info_resp["file_name"],
basename=Path(info_resp["file_name"]).stem,
extension=Path(info_resp["file_name"]).suffix[1:] if info_resp["file_category"] == "1" else None,
extension=Path(info_resp["file_name"]).suffix[1:] if info_resp[
"file_category"] == "1" else None,
pickcode=info_resp["pick_code"],
size=StringUtils.num_filesize(info_resp['size']) if info_resp["file_category"] == "1" else None,
modify_time=info_resp["utime"]

View File

@@ -10,6 +10,7 @@ from app.core.context import MediaInfo
from app.core.event import eventmanager
from app.core.meta import MetaBase
from app.core.metainfo import MetaInfoPath
from app.helper.directory import DirectoryHelper
from app.helper.message import TemplateHelper
from app.log import logger
from app.modules.filemanager.storages import StorageBase
@@ -26,11 +27,10 @@ class TransHandler:
文件转移整理类
"""
result: Optional[TransferInfo] = None
inner_lock: Lock = Lock()
def __init__(self):
self.__reset_result()
self.result = None
def __reset_result(self):
"""
@@ -103,185 +103,211 @@ class TransHandler:
# 重置结果
self.__reset_result()
# 重命名格式
rename_format = settings.TV_RENAME_FORMAT \
if mediainfo.type == MediaType.TV else settings.MOVIE_RENAME_FORMAT
try:
# 判断是否为文件夹
if fileitem.type == "dir":
# 整理整个目录,一般为蓝光原盘
if need_rename:
new_path = self.get_rename_path(
path=target_path,
template_string=rename_format,
rename_dict=self.get_naming_dict(meta=in_meta,
mediainfo=mediainfo)
).parent
else:
new_path = target_path / fileitem.name
# 整理目录
new_diritem, errmsg = self.__transfer_dir(fileitem=fileitem,
mediainfo=mediainfo,
source_oper=source_oper,
target_oper=target_oper,
target_storage=target_storage,
target_path=new_path,
transfer_type=transfer_type)
if not new_diritem:
logger.error(f"文件夹 {fileitem.path} 整理失败:{errmsg}")
self.__set_result(success=False,
message=errmsg,
fileitem=fileitem,
transfer_type=transfer_type,
need_notify=need_notify)
return self.result
# 重命名格式
rename_format = settings.RENAME_FORMAT(mediainfo.type)
logger.info(f"文件夹 {fileitem.path} 整理成功")
# 计算目录下所有文件大小
total_size = sum(file.stat().st_size for file in Path(fileitem.path).rglob('*') if file.is_file())
# 返回整理后的路径
self.__set_result(success=True,
fileitem=fileitem,
target_item=new_diritem,
target_diritem=new_diritem,
total_size=total_size,
need_scrape=need_scrape,
need_notify=need_notify,
transfer_type=transfer_type)
return self.result
else:
# 整理单个文件
if mediainfo.type == MediaType.TV:
# 电视剧
if in_meta.begin_episode is None:
logger.warn(f"文件 {fileitem.path} 整理失败:未识别到文件集数")
# 判断是否为文件夹
if fileitem.type == "dir":
# 整理整个目录,一般为蓝光原盘
if need_rename:
new_path = self.get_rename_path(
path=target_path,
template_string=rename_format,
rename_dict=self.get_naming_dict(meta=in_meta,
mediainfo=mediainfo)
)
new_path = DirectoryHelper.get_media_root_path(
rename_format, rename_path=new_path
)
if not new_path:
self.__set_result(
success=False,
message="重命名格式无效",
fileitem=fileitem,
transfer_type=transfer_type,
need_notify=need_notify,
)
return self.result.copy()
else:
new_path = target_path / fileitem.name
# 整理目录
new_diritem, errmsg = self.__transfer_dir(fileitem=fileitem,
mediainfo=mediainfo,
source_oper=source_oper,
target_oper=target_oper,
target_storage=target_storage,
target_path=new_path,
transfer_type=transfer_type)
if not new_diritem:
logger.error(f"文件夹 {fileitem.path} 整理失败:{errmsg}")
self.__set_result(success=False,
message=f"未识别到文件集数",
message=errmsg,
fileitem=fileitem,
transfer_type=transfer_type,
need_notify=need_notify)
return self.result.copy()
logger.info(f"文件夹 {fileitem.path} 整理成功")
# 计算目录下所有文件大小
total_size = sum(file.stat().st_size for file in Path(fileitem.path).rglob('*') if file.is_file())
# 返回整理后的路径
self.__set_result(success=True,
fileitem=fileitem,
target_item=new_diritem,
target_diritem=new_diritem,
total_size=total_size,
need_scrape=need_scrape,
need_notify=need_notify,
transfer_type=transfer_type)
return self.result.copy()
else:
# 整理单个文件
if mediainfo.type == MediaType.TV:
# 电视剧
if in_meta.begin_episode is None:
logger.warn(f"文件 {fileitem.path} 整理失败:未识别到文件集数")
self.__set_result(success=False,
message="未识别到文件集数",
fileitem=fileitem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
return self.result.copy()
# 文件结束季为空
in_meta.end_season = None
# 文件总季数为1
if in_meta.total_season:
in_meta.total_season = 1
# 文件不可能超过2集
if in_meta.total_episode > 2:
in_meta.total_episode = 1
in_meta.end_episode = None
# 目的文件名
if need_rename:
new_file = self.get_rename_path(
path=target_path,
template_string=rename_format,
rename_dict=self.get_naming_dict(
meta=in_meta,
mediainfo=mediainfo,
episodes_info=episodes_info,
file_ext=f".{fileitem.extension}"
)
)
folder_path = DirectoryHelper.get_media_root_path(
rename_format, rename_path=new_file
)
if not folder_path:
self.__set_result(
success=False,
message="重命名格式无效",
fileitem=fileitem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify,
)
return self.result.copy()
else:
new_file = target_path / fileitem.name
folder_path = target_path
# 判断是否要覆盖
overflag = False
# 目标目录
target_diritem = target_oper.get_folder(folder_path)
if not target_diritem:
logger.error(f"目标目录 {folder_path} 获取失败")
self.__set_result(success=False,
message=f"目标目录 {folder_path} 获取失败",
fileitem=fileitem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
return self.result
# 文件结束季为空
in_meta.end_season = None
# 文件总季数为1
if in_meta.total_season:
in_meta.total_season = 1
# 文件不可能超过2集
if in_meta.total_episode > 2:
in_meta.total_episode = 1
in_meta.end_episode = None
# 目的文件名
if need_rename:
new_file = self.get_rename_path(
path=target_path,
template_string=rename_format,
rename_dict=self.get_naming_dict(
meta=in_meta,
mediainfo=mediainfo,
episodes_info=episodes_info,
file_ext=f".{fileitem.extension}"
)
)
else:
new_file = target_path / fileitem.name
# 判断是否要覆盖
overflag = False
# 计算重命名中的文件夹层级
rename_format_level = len(rename_format.split("/")) - 1
folder_path = new_file.parents[rename_format_level - 1]
# 目标目录
target_diritem = target_oper.get_folder(folder_path)
if not target_diritem:
logger.error(f"目标目录 {folder_path} 获取失败")
self.__set_result(success=False,
message=f"目标目录 {folder_path} 获取失败",
fileitem=fileitem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
return self.result
# 目标文件
target_item = target_oper.get_item(new_file)
if target_item:
# 目标文件已存在
target_file = new_file
if target_storage == "local" and new_file.is_symlink():
target_file = new_file.readlink()
if not target_file.exists():
overflag = True
if not overflag:
return self.result.copy()
# 目标文件
target_item = target_oper.get_item(new_file)
if target_item:
# 目标文件已存在
logger.info(f"目的文件系统中已经存在同名文件 {target_file},当前整理覆盖模式设置为 {overwrite_mode}")
if overwrite_mode == 'always':
# 总是覆盖同名文件
overflag = True
elif overwrite_mode == 'size':
# 存在时大覆盖小
if target_item.size < fileitem.size:
logger.info(f"目标文件文件大小更小,将覆盖:{new_file}")
target_file = new_file
if target_storage == "local" and new_file.is_symlink():
target_file = new_file.readlink()
if not target_file.exists():
overflag = True
else:
if not overflag:
# 目标文件已存在
logger.info(f"目的文件系统中已经存在同名文件 {target_file},当前整理覆盖模式设置为 {overwrite_mode}")
if overwrite_mode == 'always':
# 总是覆盖同名文件
overflag = True
elif overwrite_mode == 'size':
# 存在时大覆盖小
if target_item.size < fileitem.size:
logger.info(f"目标文件文件大小更小,将覆盖:{new_file}")
overflag = True
else:
self.__set_result(success=False,
message=f"媒体库存在同名文件,且质量更好",
fileitem=fileitem,
target_item=target_item,
target_diritem=target_diritem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
return self.result.copy()
elif overwrite_mode == 'never':
# 存在不覆盖
self.__set_result(success=False,
message=f"媒体库存在同名文件,且质量更好",
message=f"媒体库存在同名文件,当前覆盖模式为不覆盖",
fileitem=fileitem,
target_item=target_item,
target_diritem=target_diritem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
return self.result
elif overwrite_mode == 'never':
# 存在不覆盖
self.__set_result(success=False,
message=f"媒体库存在同名文件,当前覆盖模式为不覆盖",
fileitem=fileitem,
target_item=target_item,
target_diritem=target_diritem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
return self.result
elif overwrite_mode == 'latest':
# 仅保留最新版本
logger.info(f"当前整理覆盖模式设置为仅保留最新版本,将覆盖:{new_file}")
overflag = True
else:
if overwrite_mode == 'latest':
# 文件不存在,但仅保留最新版本
logger.info(f"当前整理覆盖模式设置为 {overwrite_mode},仅保留最新版本,正在删除已有版本文件 ...")
self.__delete_version_files(target_oper, new_file)
# 整理文件
new_item, err_msg = self.__transfer_file(fileitem=fileitem,
mediainfo=mediainfo,
target_storage=target_storage,
target_file=new_file,
transfer_type=transfer_type,
over_flag=overflag,
source_oper=source_oper,
target_oper=target_oper)
if not new_item:
logger.error(f"文件 {fileitem.path} 整理失败:{err_msg}")
self.__set_result(success=False,
message=err_msg,
return self.result.copy()
elif overwrite_mode == 'latest':
# 仅保留最新版本
logger.info(f"当前整理覆盖模式设置为仅保留最新版本,将覆盖:{new_file}")
overflag = True
else:
if overwrite_mode == 'latest':
# 文件不存在,但仅保留最新版本
logger.info(f"当前整理覆盖模式设置为 {overwrite_mode},仅保留最新版本,正在删除已有版本文件 ...")
self.__delete_version_files(target_oper, new_file)
# 整理文件
new_item, err_msg = self.__transfer_file(fileitem=fileitem,
mediainfo=mediainfo,
target_storage=target_storage,
target_file=new_file,
transfer_type=transfer_type,
over_flag=overflag,
source_oper=source_oper,
target_oper=target_oper)
if not new_item:
logger.error(f"文件 {fileitem.path} 整理失败:{err_msg}")
self.__set_result(success=False,
message=err_msg,
fileitem=fileitem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
return self.result.copy()
logger.info(f"文件 {fileitem.path} 整理成功")
self.__set_result(success=True,
fileitem=fileitem,
fail_list=[fileitem.path],
target_item=new_item,
target_diritem=target_diritem,
need_scrape=need_scrape,
transfer_type=transfer_type,
need_notify=need_notify)
return self.result
logger.info(f"文件 {fileitem.path} 整理成功")
self.__set_result(success=True,
fileitem=fileitem,
target_item=new_item,
target_diritem=target_diritem,
need_scrape=need_scrape,
transfer_type=transfer_type,
need_notify=need_notify)
return self.result
return self.result.copy()
finally:
self.result = None
@staticmethod
def __transfer_command(fileitem: FileItem, target_storage: str,

View File

@@ -168,7 +168,8 @@ class Jellyfin:
path=library.get("Path"),
type=library_type,
image=image,
link=link
link=link,
server_type="jellyfin"
))
return libraries
@@ -934,7 +935,8 @@ class Jellyfin:
type=item_type,
image=image,
link=link,
percent=item.get("UserData", {}).get("PlayedPercentage")
percent=item.get("UserData", {}).get("PlayedPercentage"),
server_type='jellyfin',
))
return ret_resume
else:
@@ -986,7 +988,8 @@ class Jellyfin:
type=item_type,
image=image,
link=link,
BackdropImageTags=item.get("BackdropImageTags")
BackdropImageTags=item.get("BackdropImageTags"),
server_type='jellyfin'
))
return ret_latest
else:

View File

@@ -154,7 +154,8 @@ class Plex:
type=library_type,
image_list=image_list,
link=f"{self._playhost or self._host}web/index.html#!/media/{self._plex.machineIdentifier}"
f"/com.plexapp.plugins.library?source={library.key}"
f"/com.plexapp.plugins.library?source={library.key}&X-Plex-Token={self._token}",
server_type='plex'
)
)
return libraries
@@ -387,6 +388,8 @@ class Plex:
for path, lib_key in result_dict.items():
logger.info(f"刷新媒体库:{lib_key} - {path}")
self._plex.query(f'/library/sections/{lib_key}/refresh?path={quote_plus(str(Path(path).parent))}')
return None
return None
@staticmethod
def __find_librarie(path: Path, libraries: List[Any]) -> Tuple[str, str]:
@@ -541,6 +544,7 @@ class Plex:
continue
except Exception as err:
logger.error(f"获取媒体库列表出错:{str(err)}")
return None
def get_webhook_message(self, form: any) -> Optional[schemas.WebhookEventInfo]:
"""
@@ -718,7 +722,7 @@ class Plex:
拼装媒体播放链接
:param item_id: 媒体的的ID
"""
return f'{self._playhost or self._host}web/index.html#!/server/{self._plex.machineIdentifier}/details?key={item_id}'
return f'{self._playhost or self._host}web/index.html#!/server/{self._plex.machineIdentifier}/details?key={item_id}&X-Plex-Token={self._token}'
def get_resume(self, num: Optional[int] = 12) -> Optional[List[schemas.MediaServerPlayItem]]:
"""
@@ -752,7 +756,8 @@ class Plex:
type=item_type,
image=image,
link=link,
percent=item.viewOffset / item.duration * 100 if item.viewOffset and item.duration else 0
percent=item.viewOffset / item.duration * 100 if item.viewOffset and item.duration else 0,
server_type='plex'
))
return ret_resume[:num]
@@ -820,7 +825,8 @@ class Plex:
subtitle=item.year,
type=item_type,
image=image,
link=link
link=link,
server_type='plex'
))
offset += num
return ret_resume[:num]

View File

@@ -7,19 +7,19 @@ from ruamel.yaml import CommentedMap
from app.core.config import settings
from app.log import logger
from app.utils.singleton import Singleton
from app.utils.singleton import WeakSingleton
class CategoryHelper(metaclass=Singleton):
class CategoryHelper(metaclass=WeakSingleton):
"""
二级分类
"""
_categorys = {}
_movie_categorys = {}
_tv_categorys = {}
def __init__(self):
self._category_path: Path = settings.CONFIG_PATH / "category.yaml"
self._categorys = {}
self._movie_categorys = {}
self._tv_categorys = {}
self.init()
def init(self):
@@ -69,7 +69,7 @@ class CategoryHelper(metaclass=Singleton):
"""
if not self._movie_categorys:
return []
return self._movie_categorys.keys()
return list(self._movie_categorys.keys())
@property
def tv_categorys(self) -> list:
@@ -78,7 +78,7 @@ class CategoryHelper(metaclass=Singleton):
"""
if not self._tv_categorys:
return []
return self._tv_categorys.keys()
return list(self._tv_categorys.keys())
def get_movie_category(self, tmdb_info) -> str:
"""
@@ -127,7 +127,7 @@ class CategoryHelper(metaclass=Singleton):
continue
elif attr == "production_countries":
# 制片国家
info_values = [str(val.get("iso_3166_1")).upper() for val in info_value]
info_values = [str(val.get("iso_3166_1")).upper() for val in info_value] # type: ignore
else:
if isinstance(info_value, list):
info_values = [str(val).upper() for val in info_value]

View File

@@ -9,7 +9,7 @@ from typing import Optional
from app.core.config import settings
from app.core.meta import MetaBase
from app.log import logger
from app.utils.singleton import Singleton
from app.utils.singleton import WeakSingleton
from app.schemas.types import MediaType
lock = RLock()
@@ -18,7 +18,7 @@ CACHE_EXPIRE_TIMESTAMP_STR = "cache_expire_timestamp"
EXPIRE_TIMESTAMP = settings.CONF.meta
class TmdbCache(metaclass=Singleton):
class TmdbCache(metaclass=WeakSingleton):
"""
TMDB缓存数据
{
@@ -28,9 +28,6 @@ class TmdbCache(metaclass=Singleton):
"type": MediaType
}
"""
_meta_data: dict = {}
# 缓存文件路径
_meta_path: Path = None
# TMDB缓存过期
_tmdb_cache_expire: bool = True
@@ -218,3 +215,6 @@ class TmdbCache(metaclass=Singleton):
if not cache_media_info:
return
self._meta_data[key]['title'] = cn_title
def __del__(self):
self.save()

View File

@@ -320,7 +320,7 @@ class Api:
types=None,
exclude_grouped_video=True,
page=1,
page_size=22,
page_size=20,
sort_by="create_time",
sort="DESC",
) -> Optional[list[Item]]:

View File

@@ -163,6 +163,7 @@ class TrimeMedia:
for img_path in library.posters or []
],
link=f"{self._playhost or self._api.host}/library/{library.guid}",
server_type='trimemedia'
)
)
return libraries
@@ -458,6 +459,7 @@ class TrimeMedia:
if item.duration and item.ts is not None
else 0
),
server_type='trimemedia',
)
def get_items(

View File

@@ -59,26 +59,23 @@ class Monitor(metaclass=Singleton):
目录监控处理链,单例模式
"""
# 退出事件
_event = threading.Event()
# 监控服务
_observers = []
# 定时服务
_scheduler = None
# 存储快照缓存目录
_snapshot_cache_dir = None
# 存储过照间隔(分钟)
_snapshot_interval = 5
# TTL缓存10秒钟有效
_cache = TTLCache(maxsize=1024, ttl=10)
def __init__(self):
super().__init__()
# 退出事件
self._event = threading.Event()
# 监控服务
self._observers = []
# 定时服务
self._scheduler = None
# 存储快照缓存目录
self._snapshot_cache_dir = None
# 存储过照间隔(分钟)
self._snapshot_interval = 5
# TTL缓存10秒钟有效
self._cache = TTLCache(maxsize=1024, ttl=10)
# 监控的文件扩展名
self.all_exts = settings.RMT_MEDIAEXT
# 初始化快照缓存目录
self._snapshot_cache_dir = settings.TEMP_PATH / "snapshots"
@@ -394,6 +391,14 @@ class Monitor(metaclass=Singleton):
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
output = result.stdout.lower()
# 以下本地文件系统含有fuse关键字
local_fs = [
"fuse.shfs", # Unraid
"zfuse.zfsv", # 极空间(zfuse.zfsv2、zfuse.zfsv3、...)
# TBD
]
if any(fs in output for fs in local_fs):
return False
network_fs = ['nfs', 'cifs', 'smbfs', 'fuse', 'sshfs', 'ftpfs']
return any(fs in output for fs in network_fs)
elif system == 'Darwin':

View File

@@ -40,20 +40,20 @@ class Scheduler(metaclass=Singleton):
"""
定时任务管理
"""
# 定时服务
_scheduler = None
# 退出事件
_event = threading.Event()
# 锁
_lock = threading.RLock()
# 各服务的运行状态
_jobs = {}
# 用户认证失败次数
_auth_count = 0
# 用户认证失败消息发送
_auth_message = False
def __init__(self):
# 定时服务
self._scheduler = None
# 退出事件
self._event = threading.Event()
# 锁
self._lock = threading.RLock()
# 各服务的运行状态
self._jobs = {}
# 用户认证失败次数
self._auth_count = 0
# 用户认证失败消息发送
self._auth_message = False
self.init()
@eventmanager.register(EventType.ConfigChanged)
@@ -443,7 +443,7 @@ class Scheduler(metaclass=Singleton):
return
with self._lock:
job_id = f"workflow-{workflow.id}"
service = self._jobs.pop(job_id, None)
service = self._jobs.pop(job_id, {})
if not service:
return
try:

View File

@@ -214,7 +214,7 @@ class ResourceDownloadEventData(ChainEventData):
channel: Optional[MessageChannel] = Field(None, description="通知渠道")
origin: Optional[str] = Field(None, description="来源")
downloader: Optional[str] = Field(None, description="下载器")
options: Optional[dict] = Field(None, description="其他参数")
options: Optional[dict] = Field(default={}, description="其他参数")
# 输出参数
cancel: bool = Field(default=False, description="是否取消下载")

View File

@@ -72,6 +72,8 @@ class MediaServerLibrary(BaseModel):
image_list: Optional[List[str]] = None
# 跳转链接
link: Optional[str] = None
# 服务器类型
server_type: Optional[str] = None
class MediaServerItemUserState(BaseModel):
@@ -175,3 +177,4 @@ class MediaServerPlayItem(BaseModel):
link: Optional[str] = None
percent: Optional[float] = None
BackdropImageTags: Optional[list] = Field(default_factory=list)
server_type: Optional[str] = None

View File

@@ -138,6 +138,15 @@ class SubscribeShare(BaseModel):
count: Optional[int] = 0
class SubscribeShareStatistics(BaseModel):
# 分享人
share_user: Optional[str] = None
# 分享数量
share_count: Optional[int] = 0
# 总复用人次
total_reuse_count: Optional[int] = 0
class SubscribeDownloadFileInfo(BaseModel):
# 种子名称
torrent_title: Optional[str] = None

View File

@@ -82,3 +82,25 @@ class ActionFlow(BaseModel):
source: Optional[str] = Field(default=None, description="源动作")
target: Optional[str] = Field(default=None, description="目标动作")
animated: Optional[bool] = Field(default=True, description="是否动画流程")
class WorkflowShare(BaseModel):
"""
工作流分享信息
"""
id: Optional[int] = Field(default=None, description="分享ID")
share_title: Optional[str] = Field(default=None, description="分享标题")
share_comment: Optional[str] = Field(default=None, description="分享说明")
share_user: Optional[str] = Field(default=None, description="分享人")
share_uid: Optional[str] = Field(default=None, description="分享人唯一ID")
name: Optional[str] = Field(default=None, description="工作流名称")
description: Optional[str] = Field(default=None, description="工作流描述")
timer: Optional[str] = Field(default=None, description="定时器")
actions: Optional[str] = Field(default=None, description="任务列表(JSON字符串)")
flows: Optional[str] = Field(default=None, description="任务流(JSON字符串)")
context: Optional[str] = Field(default=None, description="执行上下文(JSON字符串)")
date: Optional[str] = Field(default=None, description="分享时间")
count: Optional[int] = Field(default=0, description="复用人次")
class Config:
orm_mode = True

View File

@@ -5,7 +5,6 @@ from fastapi import FastAPI
from app.chain.system import SystemChain
from app.startup.command_initializer import init_command, stop_command, restart_command
from app.startup.memory_initializer import init_memory_manager, stop_memory_manager
from app.startup.modules_initializer import init_modules, stop_modules
from app.startup.monitor_initializer import stop_monitor, init_monitor
from app.startup.plugins_initializer import init_plugins, stop_plugins, sync_plugins
@@ -52,8 +51,6 @@ async def lifespan(app: FastAPI):
init_command()
# 初始化工作流
init_workflow()
# 初始化内存管理
init_memory_manager()
# 插件同步到本地
sync_plugins_task = asyncio.create_task(init_extra())
try:
@@ -71,8 +68,6 @@ async def lifespan(app: FastAPI):
print(str(e))
# 备份插件
SystemChain().backup_plugins()
# 停止内存管理器
stop_memory_manager()
# 停止工作流
stop_workflow()
# 停止命令

View File

@@ -1,15 +0,0 @@
from app.helper.memory import MemoryHelper
def init_memory_manager():
"""
初始化内存监控器
"""
MemoryHelper().start_monitoring()
def stop_memory_manager():
"""
停止内存监控器
"""
MemoryHelper().stop_monitoring()

View File

@@ -1,5 +1,7 @@
import sys
import re
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Optional, Union
import chardet
@@ -8,6 +10,7 @@ import urllib3
from requests import Response, Session
from urllib3.exceptions import InsecureRequestWarning
from app.core.config import settings
from app.log import logger
urllib3.disable_warnings(InsecureRequestWarning)
@@ -86,6 +89,7 @@ class AutoCloseResponse:
def __exit__(self, *args):
self.close()
class RequestUtils:
def __init__(self,
@@ -106,6 +110,10 @@ class RequestUtils:
if headers:
self._headers = headers
else:
if ua and ua == settings.USER_AGENT:
caller_name = self.__get_caller()
if caller_name:
ua = f"{settings.USER_AGENT} Plugin/{caller_name}"
self._headers = {
"User-Agent": ua,
"Content-Type": content_type,
@@ -120,6 +128,43 @@ class RequestUtils:
else:
self._cookies = None
@staticmethod
def __get_caller():
"""
获取调用者的名称,识别是否为插件调用
"""
# 调用者名称
caller_name = None
try:
frame = sys._getframe(3) # noqa
except (AttributeError, ValueError):
return None
while frame:
filepath = Path(frame.f_code.co_filename)
parts = filepath.parts
if "app" in parts:
if not caller_name and "plugins" in parts:
try:
plugins_index = parts.index("plugins")
if plugins_index + 1 < len(parts):
plugin_candidate = parts[plugins_index + 1]
if plugin_candidate != "__init__.py":
caller_name = plugin_candidate
break
except ValueError:
pass
if "main.py" in parts:
break
elif len(parts) != 1:
break
try:
frame = frame.f_back
except AttributeError:
break
return caller_name
def request(self, method: str, url: str, raise_exception: bool = False, **kwargs) -> Optional[Response]:
"""
发起HTTP请求

View File

@@ -1,4 +1,6 @@
import abc
import threading
import weakref
class Singleton(abc.ABCMeta, type):
@@ -40,3 +42,17 @@ class AbstractSingletonClass(abc.ABC, metaclass=SingletonClass):
抽像类单例模式(按类)
"""
pass
class WeakSingleton(abc.ABCMeta, type):
"""
弱引用单例模式 - 当没有强引用时自动清理
"""
_instances: weakref.WeakKeyDictionary = weakref.WeakKeyDictionary()
_lock = threading.RLock()
def __call__(cls, *args, **kwargs):
with cls._lock:
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]

View File

@@ -68,35 +68,57 @@ class SystemUtils:
"""
if SystemUtils.is_windows():
return False
return True if "synology" in SystemUtils.execute('uname -a') else False
return "synology" in SystemUtils.execute('uname -a')
@staticmethod
def is_windows() -> bool:
"""
判断是否为Windows系统
"""
return True if os.name == "nt" else False
return os.name == "nt"
@staticmethod
def is_frozen() -> bool:
"""
判断是否为冻结的二进制文件
"""
return True if getattr(sys, 'frozen', False) else False
return getattr(sys, 'frozen', False)
@staticmethod
def is_macos() -> bool:
"""
判断是否为MacOS系统
"""
return True if platform.system() == 'Darwin' else False
return platform.system() == 'Darwin'
@staticmethod
def is_aarch64() -> bool:
"""
判断是否为ARM64架构
"""
return True if platform.machine() == 'aarch64' else False
return platform.machine().lower() in ('aarch64', 'arm64')
@staticmethod
def is_aarch() -> bool:
"""
判断是否为ARM32架构
"""
arch_name = platform.machine().lower()
return arch_name.startswith(('arm', 'aarch')) and arch_name not in ('aarch64', 'arm64')
@staticmethod
def is_x86_64() -> bool:
"""
判断是否为AMD64架构
"""
return platform.machine().lower() in ('amd64', 'x86_64')
@staticmethod
def is_x86_32() -> bool:
"""
判断是否为AMD32架构
"""
return platform.machine().lower() in ('i386', 'i686', 'x86', '386', 'x86_32')
@staticmethod
def platform() -> str:
@@ -112,6 +134,22 @@ class SystemUtils:
else:
return "Linux"
@staticmethod
def cpu_arch() -> str:
"""
获取CPU架构
"""
if SystemUtils.is_x86_64():
return "x86_64"
elif SystemUtils.is_x86_32():
return "x86_32"
elif SystemUtils.is_aarch64():
return "Arm64"
elif SystemUtils.is_aarch():
return "Arm32"
else:
return platform.machine()
@staticmethod
def copy(src: Path, dest: Path) -> Tuple[int, str]:
"""

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.6.3'
FRONTEND_VERSION = 'v2.6.3'
APP_VERSION = 'v2.6.6'
FRONTEND_VERSION = 'v2.6.6'