Compare commits

...

29 Commits

Author SHA1 Message Date
jxxghp
996d8ab954 v2.8.4-1
- 修复工作流组件加载问题
- 修改个别智能体工具问题
2025-11-20 13:10:38 +08:00
jxxghp
fac2546a92 Enhance media library query tool with detailed logging and improved error handling. Refactor to use MediaServerChain for media existence checks and item details retrieval. 2025-11-20 13:02:23 +08:00
jxxghp
728ea6172a fix exists_local 2025-11-20 12:43:19 +08:00
jxxghp
f59d225029 fix workflow actions 2025-11-20 12:34:44 +08:00
jxxghp
0b178a715f fix search_media 2025-11-20 12:00:00 +08:00
jxxghp
e06e5328c2 Merge pull request #5148 from jxxghp/cursor/handle-ai-agent-message-processing-error-af53 2025-11-20 09:37:48 +08:00
Cursor Agent
1c14cd0979 Refactor: Use asyncio.run_coroutine_threadsafe for async calls
Co-authored-by: jxxghp <jxxghp@qq.com>
2025-11-20 01:37:17 +00:00
jxxghp
f9141f5ba2 Merge remote-tracking branch 'origin/v2' into v2 2025-11-20 08:19:52 +08:00
jxxghp
48da5c976c fixx loop 2025-11-20 08:15:37 +08:00
jxxghp
fa38c81c08 Merge pull request #5146 from DDS-Derek/dev 2025-11-19 20:47:16 +08:00
DDSDerek
8d5fe5270f Update app/modules/filemanager/storages/u115.py
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-11-19 19:51:14 +08:00
DDSRem
0dc0d66549 fix: known issue 2025-11-19 19:46:46 +08:00
DDSRem
f589fcc2d0 feat(u115): improve stability of the u115 module
1. 优化API请求错误时到处理逻辑
2. 提升hash计算速度
3. 接口级QPS速率限制
4. 使用httpx替换request
5. 优化路径拼接稳定性
6. 代码格式化
2025-11-19 19:39:02 +08:00
jxxghp
edd44a0993 Merge pull request #5143 from jxxghp/cursor/update-mcp-api-documentation-and-readme-a12b 2025-11-19 16:05:23 +08:00
Cursor Agent
2aae496742 Refactor: Improve MCP API documentation for broader client support
Co-authored-by: jxxghp <jxxghp@qq.com>
2025-11-19 08:03:53 +00:00
Cursor Agent
6f72046f86 Refactor: Update MCP API documentation and authentication
Co-authored-by: jxxghp <jxxghp@qq.com>
2025-11-19 07:47:32 +00:00
jxxghp
d4a9b446a6 更新 requirements.in 2025-11-19 14:35:41 +08:00
jxxghp
95f571e9b9 更新 requirements.in 2025-11-19 14:34:27 +08:00
jxxghp
e8aeae5c07 更新 version.py 2025-11-19 14:28:49 +08:00
jxxghp
ddf6dc0343 Merge pull request #5142 from jxxghp/cursor/update-agent-site-tool-documentation-with-priority-81fb 2025-11-19 14:17:31 +08:00
Cursor Agent
36d55a9db7 Refactor: Simplify tool descriptions
Co-authored-by: jxxghp <jxxghp@qq.com>
2025-11-19 06:16:24 +00:00
Cursor Agent
7d41379ad5 Refactor: Clarify site priority in tool descriptions
Co-authored-by: jxxghp <jxxghp@qq.com>
2025-11-19 06:15:18 +00:00
jxxghp
63e928da96 更新 version.py 2025-11-19 14:10:11 +08:00
jxxghp
5c983b64bc fix SiteUserData 2025-11-19 13:47:02 +08:00
jxxghp
b2d36c0e68 Update API key documentation to clarify retrieval methods in security.py and mcp-api.md 2025-11-19 13:42:53 +08:00
jxxghp
6123a1620e add mcp 2025-11-19 13:19:17 +08:00
jxxghp
5ae7c10a00 Enhance MoviePilotAgent to handle empty agent messages gracefully by providing a default error response, ensuring better user experience. Refactor message processing to streamline event loop execution. 2025-11-19 12:51:08 +08:00
jxxghp
b5a6794381 Refactor event loop handling to use GlobalVar.CURRENT_EVENT_LOOP across multiple modules, improving consistency and maintainability. 2025-11-19 08:42:07 +08:00
jxxghp
6b575f836a Add filter_groups parameter to AddSubscribeTool and include SearchSubscribeTool and QueryRuleGroupsTool in MoviePilotToolFactory 2025-11-19 08:31:06 +08:00
32 changed files with 1255 additions and 313 deletions

View File

@@ -30,6 +30,8 @@
API文档https://api.movie-pilot.org
MCP工具API文档详见 [docs/mcp-api.md](docs/mcp-api.md)
本地运行需要 `Python 3.12``Node JS v20.12.1`
- 克隆主项目 [MoviePilot](https://github.com/jxxghp/MoviePilot)

View File

@@ -221,15 +221,20 @@ class MoviePilotAgent:
agent_message = await self.callback_handler.get_message()
# 发送Agent回复给用户通过原渠道
await self.send_agent_message(agent_message)
if agent_message:
# 发送回复
await self.send_agent_message(agent_message)
# 添加Agent回复到记忆
await self.memory_manager.add_memory(
session_id=self.session_id,
user_id=self.user_id,
role="agent",
content=agent_message
)
# 添加Agent回复到记忆
await self.memory_manager.add_memory(
session_id=self.session_id,
user_id=self.user_id,
role="agent",
content=agent_message
)
else:
agent_message = "很抱歉,智能体出错了,未能生成回复内容。"
await self.send_agent_message(agent_message)
return agent_message

View File

@@ -7,6 +7,7 @@ from pydantic import PrivateAttr
from app.agent import StreamingCallbackHandler
from app.chain import ChainBase
from app.log import logger
from app.schemas import Notification
@@ -49,7 +50,10 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta):
if tool_message:
formatted_message = f"⚙️ => {tool_message}"
await self.send_tool_message(formatted_message)
return await self.run(**kwargs)
logger.debug(f'Executing tool {self.name} with args: {kwargs}')
result = await self.run(**kwargs)
logger.debug(f'Tool {self.name} executed with result: {result}')
return result
def get_tool_message(self, **kwargs) -> Optional[str]:
"""

View File

@@ -5,6 +5,7 @@ from typing import List, Callable
from app.agent.tools.impl.add_download import AddDownloadTool
from app.agent.tools.impl.add_subscribe import AddSubscribeTool
from app.agent.tools.impl.update_subscribe import UpdateSubscribeTool
from app.agent.tools.impl.search_subscribe import SearchSubscribeTool
from app.agent.tools.impl.get_recommendations import GetRecommendationsTool
from app.agent.tools.impl.query_downloaders import QueryDownloadersTool
from app.agent.tools.impl.query_downloads import QueryDownloadsTool
@@ -15,6 +16,7 @@ from app.agent.tools.impl.query_site_userdata import QuerySiteUserdataTool
from app.agent.tools.impl.test_site import TestSiteTool
from app.agent.tools.impl.query_subscribes import QuerySubscribesTool
from app.agent.tools.impl.query_subscribe_shares import QuerySubscribeSharesTool
from app.agent.tools.impl.query_rule_groups import QueryRuleGroupsTool
from app.agent.tools.impl.query_popular_subscribes import QueryPopularSubscribesTool
from app.agent.tools.impl.query_subscribe_history import QuerySubscribeHistoryTool
from app.agent.tools.impl.delete_subscribe import DeleteSubscribeTool
@@ -55,11 +57,13 @@ class MoviePilotToolFactory:
QueryEpisodeScheduleTool,
AddSubscribeTool,
UpdateSubscribeTool,
SearchSubscribeTool,
SearchTorrentsTool,
AddDownloadTool,
QuerySubscribesTool,
QuerySubscribeSharesTool,
QueryPopularSubscribesTool,
QueryRuleGroupsTool,
QuerySubscribeHistoryTool,
DeleteSubscribeTool,
QueryDownloadsTool,

View File

@@ -1,6 +1,6 @@
"""添加订阅工具"""
from typing import Optional, Type
from typing import Optional, Type, List
from pydantic import BaseModel, Field
@@ -31,6 +31,8 @@ class AddSubscribeInput(BaseModel):
description="Resolution filter as regular expression (optional, e.g., '1080p|720p|2160p')")
effect: Optional[str] = Field(None,
description="Effect filter as regular expression (optional, e.g., 'HDR|DV|SDR')")
filter_groups: Optional[List[str]] = Field(None,
description="List of filter rule group names to apply (optional, use query_rule_groups tool to get available rule groups)")
class AddSubscribeTool(MoviePilotTool):
@@ -59,11 +61,12 @@ class AddSubscribeTool(MoviePilotTool):
season: Optional[int] = None, tmdb_id: Optional[str] = None,
start_episode: Optional[int] = None, total_episode: Optional[int] = None,
quality: Optional[str] = None, resolution: Optional[str] = None,
effect: Optional[str] = None, **kwargs) -> str:
effect: Optional[str] = None, filter_groups: Optional[List[str]] = None, **kwargs) -> str:
logger.info(
f"执行工具: {self.name}, 参数: title={title}, year={year}, media_type={media_type}, "
f"season={season}, tmdb_id={tmdb_id}, start_episode={start_episode}, "
f"total_episode={total_episode}, quality={quality}, resolution={resolution}, effect={effect}")
f"total_episode={total_episode}, quality={quality}, resolution={resolution}, "
f"effect={effect}, filter_groups={filter_groups}")
try:
subscribe_chain = SubscribeChain()
@@ -87,6 +90,8 @@ class AddSubscribeTool(MoviePilotTool):
subscribe_kwargs['resolution'] = resolution
if effect:
subscribe_kwargs['effect'] = effect
if filter_groups:
subscribe_kwargs['filter_groups'] = filter_groups
sid, message = await subscribe_chain.async_add(
mtype=MediaType(media_type),
@@ -111,6 +116,8 @@ class AddSubscribeTool(MoviePilotTool):
params.append(f"分辨率过滤: {resolution}")
if effect:
params.append(f"特效过滤: {effect}")
if filter_groups:
params.append(f"规则组: {', '.join(filter_groups)}")
if params:
result_msg += f"\n配置参数: {', '.join(params)}"
return result_msg

View File

@@ -6,9 +6,11 @@ from typing import Optional, List, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.db.mediaserver_oper import MediaServerOper
from app.chain.mediaserver import MediaServerChain
from app.core.context import MediaInfo
from app.log import logger
from app.schemas import MediaServerItem
from app.schemas.types import MediaType
class QueryMediaLibraryInput(BaseModel):
@@ -48,11 +50,49 @@ class QueryMediaLibraryTool(MoviePilotTool):
title: Optional[str] = None, year: Optional[str] = None, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: media_type={media_type}, title={title}")
try:
media_server_oper = MediaServerOper()
filtered_medias: List[MediaServerItem] = await media_server_oper.async_exists(title=title, year=year, mtype=media_type)
if filtered_medias:
return json.dumps([m.to_dict() for m in filtered_medias])
return "媒体库中未找到相关媒体"
if not title:
return "请提供媒体标题进行查询"
# 创建 MediaInfo 对象
mediainfo = MediaInfo()
mediainfo.title = title
mediainfo.year = year
# 转换媒体类型
if media_type == "电影":
mediainfo.type = MediaType.MOVIE
elif media_type == "电视剧":
mediainfo.type = MediaType.TV
# media_type == "all" 时不设置类型,让媒体服务器自动判断
# 调用媒体服务器接口实时查询
media_chain = MediaServerChain()
existsinfo = media_chain.media_exists(mediainfo=mediainfo)
if not existsinfo:
return "媒体库中未找到相关媒体"
# 如果找到了,获取详细信息
result_items = []
if existsinfo.itemid and existsinfo.server:
iteminfo = media_chain.iteminfo(server=existsinfo.server, item_id=existsinfo.itemid)
if iteminfo:
# 使用 model_dump() 转换为字典格式
item_dict = iteminfo.model_dump(exclude_none=True)
result_items.append(item_dict)
if result_items:
return json.dumps(result_items, ensure_ascii=False)
# 如果找到了但没有详细信息,返回基本信息
result_dict = {
"type": existsinfo.type.value if existsinfo.type else None,
"server": existsinfo.server,
"server_type": existsinfo.server_type,
"itemid": existsinfo.itemid,
"seasons": existsinfo.seasons if existsinfo.seasons else {}
}
return json.dumps([result_dict], ensure_ascii=False)
except Exception as e:
logger.error(f"查询媒体库失败: {e}", exc_info=True)
return f"查询媒体库时发生错误: {str(e)}"

View File

@@ -0,0 +1,65 @@
"""查询规则组工具"""
import json
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.helper.rule import RuleHelper
from app.log import logger
class QueryRuleGroupsInput(BaseModel):
"""查询规则组工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
class QueryRuleGroupsTool(MoviePilotTool):
name: str = "query_rule_groups"
description: str = "Query all filter rule groups available in the system. Rule groups are used to filter torrents when searching or subscribing. Returns rule group names, media types, and categories, but excludes rule_string to keep results concise."
args_schema: Type[BaseModel] = QueryRuleGroupsInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据查询参数生成友好的提示消息"""
return "正在查询所有规则组"
async def run(self, **kwargs) -> str:
logger.info(f"执行工具: {self.name}")
try:
rule_helper = RuleHelper()
rule_groups = rule_helper.get_rule_groups()
if not rule_groups:
return json.dumps({
"message": "未找到任何规则组",
"rule_groups": []
}, ensure_ascii=False, indent=2)
# 精简字段,过滤掉 rule_string 避免结果过大
simplified_groups = []
for group in rule_groups:
simplified = {
"name": group.name,
"media_type": group.media_type,
"category": group.category
}
simplified_groups.append(simplified)
result = {
"message": f"找到 {len(simplified_groups)} 个规则组",
"rule_groups": simplified_groups
}
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
error_message = f"查询规则组失败: {str(e)}"
logger.error(f"查询规则组失败: {e}", exc_info=True)
return json.dumps({
"success": False,
"message": error_message,
"rule_groups": []
}, ensure_ascii=False)

View File

@@ -21,7 +21,7 @@ class QuerySitesInput(BaseModel):
class QuerySitesTool(MoviePilotTool):
name: str = "query_sites"
description: str = "Query site status and list all configured sites. Shows site name, domain, status, priority, and basic configuration."
description: str = "Query site status and list all configured sites. Shows site name, domain, status, priority, and basic configuration. Site priority (pri): smaller values have higher priority (e.g., pri=1 has higher priority than pri=10)."
args_schema: Type[BaseModel] = QuerySitesInput
def get_tool_message(self, **kwargs) -> Optional[str]:

View File

@@ -1,6 +1,5 @@
"""刮削媒体元数据工具"""
import asyncio
import json
from pathlib import Path
from typing import Optional, Type
@@ -9,6 +8,7 @@ from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.chain.media import MediaChain
from app.core.config import global_vars
from app.core.metainfo import MetaInfoPath
from app.log import logger
from app.schemas import FileItem
@@ -17,9 +17,12 @@ from app.schemas import FileItem
class ScrapeMetadataInput(BaseModel):
"""刮削媒体元数据工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
path: str = Field(..., description="Path to the file or directory to scrape metadata for (e.g., '/path/to/file.mkv' or '/path/to/directory')")
storage: Optional[str] = Field("local", description="Storage type: 'local' for local storage, 'smb', 'alist', etc. for remote storage (default: 'local')")
overwrite: Optional[bool] = Field(False, description="Whether to overwrite existing metadata files (default: False)")
path: str = Field(...,
description="Path to the file or directory to scrape metadata for (e.g., '/path/to/file.mkv' or '/path/to/directory')")
storage: Optional[str] = Field("local",
description="Storage type: 'local' for local storage, 'smb', 'alist', etc. for remote storage (default: 'local')")
overwrite: Optional[bool] = Field(False,
description="Whether to overwrite existing metadata files (default: False)")
class ScrapeMetadataTool(MoviePilotTool):
@@ -32,19 +35,19 @@ class ScrapeMetadataTool(MoviePilotTool):
path = kwargs.get("path", "")
storage = kwargs.get("storage", "local")
overwrite = kwargs.get("overwrite", False)
message = f"正在刮削媒体元数据: {path}"
if storage != "local":
message += f" [存储: {storage}]"
if overwrite:
message += " [覆盖模式]"
return message
async def run(self, path: str, storage: Optional[str] = "local",
overwrite: Optional[bool] = False, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: path={path}, storage={storage}, overwrite={overwrite}")
try:
# 验证路径
if not path:
@@ -52,14 +55,14 @@ class ScrapeMetadataTool(MoviePilotTool):
"success": False,
"message": "刮削路径不能为空"
}, ensure_ascii=False)
# 创建 FileItem
fileitem = FileItem(
storage=storage,
path=path,
type="file" if Path(path).suffix else "dir"
)
# 检查本地存储路径是否存在
if storage == "local":
scrape_path = Path(path)
@@ -68,23 +71,22 @@ class ScrapeMetadataTool(MoviePilotTool):
"success": False,
"message": f"刮削路径不存在: {path}"
}, ensure_ascii=False)
# 识别媒体信息
media_chain = MediaChain()
scrape_path = Path(path)
meta = MetaInfoPath(scrape_path)
mediainfo = await media_chain.async_recognize_by_meta(meta)
if not mediainfo:
return json.dumps({
"success": False,
"message": f"刮削失败,无法识别媒体信息: {path}",
"path": path
}, ensure_ascii=False)
# 在线程池中执行同步的刮削操作
loop = asyncio.get_event_loop()
await loop.run_in_executor(
await global_vars.loop.run_in_executor(
None,
lambda: media_chain.scrape_metadata(
fileitem=fileitem,
@@ -93,7 +95,7 @@ class ScrapeMetadataTool(MoviePilotTool):
overwrite=overwrite
)
)
return json.dumps({
"success": True,
"message": f"{path} 刮削完成",
@@ -106,7 +108,7 @@ class ScrapeMetadataTool(MoviePilotTool):
"season": mediainfo.season
}
}, ensure_ascii=False, indent=2)
except Exception as e:
error_message = f"刮削媒体元数据失败: {str(e)}"
logger.error(f"刮削媒体元数据失败: {e}", exc_info=True)
@@ -115,4 +117,3 @@ class ScrapeMetadataTool(MoviePilotTool):
"message": error_message,
"path": path
}, ensure_ascii=False)

View File

@@ -51,17 +51,8 @@ class SearchMediaTool(MoviePilotTool):
try:
media_chain = MediaChain()
# 构建搜索标题
search_title = title
if year:
search_title = f"{title} {year}"
if media_type:
search_title = f"{search_title} {media_type}"
if season:
search_title = f"{search_title} S{season:02d}"
# 使用 MediaChain.search 方法
meta, results = await media_chain.async_search(title=search_title)
meta, results = await media_chain.async_search(title=title)
# 过滤结果
if results:

View File

@@ -0,0 +1,127 @@
"""搜索订阅缺失剧集工具"""
import json
from typing import Optional, Type, List
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.chain.subscribe import SubscribeChain
from app.core.config import global_vars
from app.db.subscribe_oper import SubscribeOper
from app.log import logger
class SearchSubscribeInput(BaseModel):
"""搜索订阅缺失剧集工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
subscribe_id: int = Field(..., description="The ID of the subscription to search for missing episodes")
manual: Optional[bool] = Field(False, description="Whether this is a manual search (default: False)")
filter_groups: Optional[List[str]] = Field(None,
description="List of filter rule group names to apply for this search (optional, use query_rule_groups tool to get available rule groups. If provided, will temporarily update the subscription's filter groups before searching)")
class SearchSubscribeTool(MoviePilotTool):
name: str = "search_subscribe"
description: str = "Search for missing episodes/resources for a specific subscription. This tool will search torrent sites for the missing episodes of the subscription and automatically download matching resources. Use this when a user wants to search for missing episodes of a specific subscription."
args_schema: Type[BaseModel] = SearchSubscribeInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据搜索参数生成友好的提示消息"""
subscribe_id = kwargs.get("subscribe_id")
manual = kwargs.get("manual", False)
message = f"正在搜索订阅 #{subscribe_id} 的缺失剧集"
if manual:
message += "(手动搜索)"
return message
async def run(self, subscribe_id: int, manual: Optional[bool] = False,
filter_groups: Optional[List[str]] = None, **kwargs) -> str:
logger.info(
f"执行工具: {self.name}, 参数: subscribe_id={subscribe_id}, manual={manual}, filter_groups={filter_groups}")
try:
# 先验证订阅是否存在
subscribe_oper = SubscribeOper()
subscribe = subscribe_oper.get(subscribe_id)
if not subscribe:
return json.dumps({
"success": False,
"message": f"订阅不存在: {subscribe_id}"
}, ensure_ascii=False)
# 获取订阅信息用于返回
subscribe_info = {
"id": subscribe.id,
"name": subscribe.name,
"year": subscribe.year,
"type": subscribe.type,
"season": subscribe.season,
"state": subscribe.state,
"total_episode": subscribe.total_episode,
"lack_episode": subscribe.lack_episode,
"tmdbid": subscribe.tmdbid,
"doubanid": subscribe.doubanid
}
# 检查订阅状态
if subscribe.state == "S":
return json.dumps({
"success": False,
"message": f"订阅 #{subscribe_id} ({subscribe.name}) 已暂停,无法搜索",
"subscribe": subscribe_info
}, ensure_ascii=False)
# 如果提供了 filter_groups 参数,先更新订阅的规则组
if filter_groups is not None:
subscribe_oper.update(subscribe_id, {"filter_groups": filter_groups})
logger.info(f"更新订阅 #{subscribe_id} 的规则组为: {filter_groups}")
# 调用 SubscribeChain 的 search 方法
# search 方法是同步的,需要在异步环境中运行
subscribe_chain = SubscribeChain()
# 在线程池中执行同步的搜索操作
# 当 sid 有值时state 参数会被忽略,直接处理该订阅
await global_vars.loop.run_in_executor(
None,
lambda: subscribe_chain.search(
sid=subscribe_id,
state='R', # 默认状态,当 sid 有值时此参数会被忽略
manual=manual
)
)
# 重新获取订阅信息以获取更新后的状态
updated_subscribe = subscribe_oper.get(subscribe_id)
if updated_subscribe:
subscribe_info.update({
"state": updated_subscribe.state,
"lack_episode": updated_subscribe.lack_episode,
"last_update": updated_subscribe.last_update,
"filter_groups": updated_subscribe.filter_groups
})
# 如果提供了规则组,会在返回信息中显示
result = {
"success": True,
"message": f"订阅 #{subscribe_id} ({subscribe.name}) 搜索完成",
"subscribe": subscribe_info
}
if filter_groups is not None:
result["message"] += f"(已应用规则组: {', '.join(filter_groups)}"
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
error_message = f"搜索订阅缺失剧集失败: {str(e)}"
logger.error(f"搜索订阅缺失剧集失败: {e}", exc_info=True)
return json.dumps({
"success": False,
"message": error_message,
"subscribe_id": subscribe_id
}, ensure_ascii=False)

View File

@@ -20,7 +20,7 @@ class UpdateSiteInput(BaseModel):
site_id: int = Field(..., description="The ID of the site to update")
name: Optional[str] = Field(None, description="Site name (optional)")
url: Optional[str] = Field(None, description="Site URL (optional, will be automatically formatted)")
pri: Optional[int] = Field(None, description="Site priority (optional, higher number = higher priority)")
pri: Optional[int] = Field(None, description="Site priority (optional, smaller value = higher priority, e.g., pri=1 has higher priority than pri=10)")
rss: Optional[str] = Field(None, description="RSS feed URL (optional)")
cookie: Optional[str] = Field(None, description="Site cookie (optional)")
ua: Optional[str] = Field(None, description="User-Agent string (optional)")
@@ -39,7 +39,7 @@ class UpdateSiteInput(BaseModel):
class UpdateSiteTool(MoviePilotTool):
name: str = "update_site"
description: str = "Update site configuration including URL, priority, authentication credentials (cookie, UA, API key), proxy settings, rate limits, and other site properties. Supports updating multiple site attributes at once."
description: str = "Update site configuration including URL, priority, authentication credentials (cookie, UA, API key), proxy settings, rate limits, and other site properties. Supports updating multiple site attributes at once. Site priority (pri): smaller values have higher priority (e.g., pri=1 has higher priority than pri=10)."
args_schema: Type[BaseModel] = UpdateSiteInput
def get_tool_message(self, **kwargs) -> Optional[str]:

187
app/agent/tools/manager.py Normal file
View File

@@ -0,0 +1,187 @@
"""MoviePilot工具管理器
用于HTTP API调用工具
"""
import json
from typing import Any, Dict, List, Optional
from app.agent.tools.factory import MoviePilotToolFactory
from app.log import logger
class ToolDefinition:
"""工具定义"""
def __init__(self, name: str, description: str, input_schema: Dict[str, Any]):
self.name = name
self.description = description
self.input_schema = input_schema
class MoviePilotToolsManager:
"""MoviePilot工具管理器用于HTTP API"""
def __init__(self, user_id: str = "api_user", session_id: str = "api_session"):
"""
初始化工具管理器
Args:
user_id: 用户ID
session_id: 会话ID
"""
self.user_id = user_id
self.session_id = session_id
self.tools: List[Any] = []
self._load_tools()
def _load_tools(self):
"""加载所有MoviePilot工具"""
try:
# 创建工具实例
self.tools = MoviePilotToolFactory.create_tools(
session_id=self.session_id,
user_id=self.user_id,
channel=None,
source="api",
username="API Client",
callback_handler=None
)
logger.info(f"成功加载 {len(self.tools)} 个工具")
except Exception as e:
logger.error(f"加载工具失败: {e}", exc_info=True)
self.tools = []
def list_tools(self) -> List[ToolDefinition]:
"""
列出所有可用的工具
Returns:
工具定义列表
"""
tools_list = []
for tool in self.tools:
# 获取工具的输入参数模型
args_schema = getattr(tool, 'args_schema', None)
if args_schema:
# 将Pydantic模型转换为JSON Schema
input_schema = self._convert_to_json_schema(args_schema)
else:
# 如果没有args_schema使用基本信息
input_schema = {
"type": "object",
"properties": {},
"required": []
}
tools_list.append(ToolDefinition(
name=tool.name,
description=tool.description or "",
input_schema=input_schema
))
return tools_list
def get_tool(self, tool_name: str) -> Optional[Any]:
"""
获取指定工具实例
Args:
tool_name: 工具名称
Returns:
工具实例如果未找到返回None
"""
for tool in self.tools:
if tool.name == tool_name:
return tool
return None
async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
"""
调用工具
Args:
tool_name: 工具名称
arguments: 工具参数
Returns:
工具执行结果(字符串)
"""
tool_instance = self.get_tool(tool_name)
if not tool_instance:
error_msg = json.dumps({
"error": f"工具 '{tool_name}' 未找到"
}, ensure_ascii=False)
return error_msg
try:
# 调用工具的run方法
result = await tool_instance.run(**arguments)
# 确保返回字符串
if isinstance(result, str):
return result
else:
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"调用工具 {tool_name} 时发生错误: {e}", exc_info=True)
error_msg = json.dumps({
"error": f"调用工具 '{tool_name}' 时发生错误: {str(e)}"
}, ensure_ascii=False)
return error_msg
@staticmethod
def _convert_to_json_schema(args_schema: Any) -> Dict[str, Any]:
"""
将Pydantic模型转换为JSON Schema
Args:
args_schema: Pydantic模型类
Returns:
JSON Schema字典
"""
# 获取Pydantic模型的字段信息
schema = args_schema.model_json_schema()
# 构建JSON Schema
properties = {}
required = []
if "properties" in schema:
for field_name, field_info in schema["properties"].items():
# 转换字段类型
field_type = field_info.get("type", "string")
field_description = field_info.get("description", "")
# 处理可选字段
if field_name not in schema.get("required", []):
# 可选字段
default_value = field_info.get("default")
properties[field_name] = {
"type": field_type,
"description": field_description
}
if default_value is not None:
properties[field_name]["default"] = default_value
else:
properties[field_name] = {
"type": field_type,
"description": field_description
}
required.append(field_name)
# 处理枚举类型
if "enum" in field_info:
properties[field_name]["enum"] = field_info["enum"]
# 处理数组类型
if field_type == "array" and "items" in field_info:
properties[field_name]["items"] = field_info["items"]
return {
"type": "object",
"properties": properties,
"required": required
}

View File

@@ -2,7 +2,7 @@ from fastapi import APIRouter
from app.api.endpoints import login, user, webhook, message, site, subscribe, \
media, douban, search, plugin, tmdb, history, system, download, dashboard, \
transfer, mediaserver, bangumi, storage, discover, recommend, workflow, torrent
transfer, mediaserver, bangumi, storage, discover, recommend, workflow, torrent, mcp
api_router = APIRouter()
api_router.include_router(login.router, prefix="/login", tags=["login"])
@@ -28,3 +28,4 @@ api_router.include_router(discover.router, prefix="/discover", tags=["discover"]
api_router.include_router(recommend.router, prefix="/recommend", tags=["recommend"])
api_router.include_router(workflow.router, prefix="/workflow", tags=["workflow"])
api_router.include_router(torrent.router, prefix="/torrent", tags=["torrent"])
api_router.include_router(mcp.router, prefix="/mcp", tags=["mcp"])

161
app/api/endpoints/mcp.py Normal file
View File

@@ -0,0 +1,161 @@
"""工具API端点
通过HTTP API暴露MoviePilot的智能体工具功能
"""
from typing import List, Any, Dict, Annotated
from fastapi import APIRouter, Depends, HTTPException
from app import schemas
from app.agent.tools.manager import MoviePilotToolsManager
from app.core.security import verify_apikey
from app.log import logger
router = APIRouter()
# 全局工具管理器实例单例模式按用户ID缓存
_tools_managers: Dict[str, MoviePilotToolsManager] = {}
def get_tools_manager(user_id: str = "mcp_user", session_id: str = "mcp_session") -> MoviePilotToolsManager:
"""
获取工具管理器实例按用户ID缓存
Args:
user_id: 用户ID
session_id: 会话ID
Returns:
MoviePilotToolsManager实例
"""
global _tools_managers
# 使用用户ID作为缓存键
cache_key = f"{user_id}_{session_id}"
if cache_key not in _tools_managers:
_tools_managers[cache_key] = MoviePilotToolsManager(
user_id=user_id,
session_id=session_id
)
return _tools_managers[cache_key]
@router.get("/tools", summary="列出所有可用工具", response_model=List[Dict[str, Any]])
async def list_tools(
_: Annotated[str, Depends(verify_apikey)]
) -> Any:
"""
获取所有可用的工具列表
返回每个工具的名称、描述和参数定义
"""
try:
manager = get_tools_manager()
# 获取所有工具定义
tools = manager.list_tools()
# 转换为字典格式
tools_list = []
for tool in tools:
tool_dict = {
"name": tool.name,
"description": tool.description,
"inputSchema": tool.input_schema
}
tools_list.append(tool_dict)
return tools_list
except Exception as e:
logger.error(f"获取工具列表失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"获取工具列表失败: {str(e)}")
@router.post("/tools/call", summary="调用工具", response_model=schemas.ToolCallResponse)
async def call_tool(
request: schemas.ToolCallRequest,
) -> Any:
"""
调用指定的工具
Returns:
工具执行结果
"""
try:
# 使用当前用户ID创建管理器实例
manager = get_tools_manager()
# 调用工具
result_text = await manager.call_tool(request.tool_name, request.arguments)
return schemas.ToolCallResponse(
success=True,
result=result_text
)
except Exception as e:
logger.error(f"调用工具 {request.tool_name} 失败: {e}", exc_info=True)
return schemas.ToolCallResponse(
success=False,
error=f"调用工具失败: {str(e)}"
)
@router.get("/tools/{tool_name}", summary="获取工具详情", response_model=Dict[str, Any])
async def get_tool_info(
tool_name: str,
_: Annotated[str, Depends(verify_apikey)]
) -> Any:
"""
获取指定工具的详细信息
Returns:
工具的详细信息,包括名称、描述和参数定义
"""
try:
manager = get_tools_manager()
# 获取所有工具
tools = manager.list_tools()
# 查找指定工具
for tool in tools:
if tool.name == tool_name:
return {
"name": tool.name,
"description": tool.description,
"inputSchema": tool.input_schema
}
raise HTTPException(status_code=404, detail=f"工具 '{tool_name}' 未找到")
except HTTPException:
raise
except Exception as e:
logger.error(f"获取工具信息失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"获取工具信息失败: {str(e)}")
@router.get("/tools/{tool_name}/schema", summary="获取工具参数Schema", response_model=Dict[str, Any])
async def get_tool_schema(
tool_name: str,
_: Annotated[str, Depends(verify_apikey)]
) -> Any:
"""
获取指定工具的参数SchemaJSON Schema格式
Returns:
工具的JSON Schema定义
"""
try:
manager = get_tools_manager()
# 获取所有工具
tools = manager.list_tools()
# 查找指定工具
for tool in tools:
if tool.name == tool_name:
return tool.input_schema
raise HTTPException(status_code=404, detail=f"工具 '{tool_name}' 未找到")
except HTTPException:
raise
except Exception as e:
logger.error(f"获取工具Schema失败: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"获取工具Schema失败: {str(e)}")

View File

@@ -10,7 +10,7 @@ from app.chain.download import DownloadChain
from app.chain.media import MediaChain
from app.chain.search import SearchChain
from app.chain.subscribe import SubscribeChain
from app.core.config import settings
from app.core.config import settings, global_vars
from app.core.context import MediaInfo, Context
from app.core.meta import MetaBase
from app.db.user_oper import UserOper
@@ -174,7 +174,7 @@ class MessageChain(ChainBase):
elif text.startswith('/ai') or text.startswith('/AI'):
# AI智能体处理
self._handle_ai_message(text=text, channel=channel, source=source,
userid=userid, username=username)
userid=userid, username=username)
elif text.startswith('/'):
# 执行命令
self.eventmanager.send_event(
@@ -329,7 +329,8 @@ class MessageChain(ChainBase):
else:
best_version = True
# 转换用户名
mp_name = UserOper().get_name(**{f"{channel.name.lower()}_userid": userid}) if channel else None
mp_name = UserOper().get_name(
**{f"{channel.name.lower()}_userid": userid}) if channel else None
# 添加订阅状态为N
SubscribeChain().add(title=mediainfo.title,
year=mediainfo.year,
@@ -505,7 +506,8 @@ class MessageChain(ChainBase):
# 开始搜索
if not medias:
self.post_message(Notification(
channel=channel, source=source, title=f"{meta.name} 没有找到对应的媒体信息!", userid=userid))
channel=channel, source=source, title=f"{meta.name} 没有找到对应的媒体信息!",
userid=userid))
return
logger.info(f"搜索到 {len(medias)} 条相关媒体信息")
try:
@@ -835,21 +837,22 @@ class MessageChain(ChainBase):
如果用户上次会话在15分钟内则复用相同的会话ID否则创建新的会话ID
"""
current_time = datetime.now()
# 检查用户是否有已存在的会话
if userid in MessageChain._user_sessions:
session_id, last_time = MessageChain._user_sessions[userid]
# 计算时间差
time_diff = current_time - last_time
# 如果时间差小于等于15分钟复用会话ID
if time_diff <= timedelta(minutes=MessageChain._session_timeout_minutes):
# 更新最后使用时间
MessageChain._user_sessions[userid] = (session_id, current_time)
logger.info(f"复用会话ID: {session_id}, 用户: {userid}, 距离上次会话: {time_diff.total_seconds() / 60:.1f}分钟")
logger.info(
f"复用会话ID: {session_id}, 用户: {userid}, 距离上次会话: {time_diff.total_seconds() / 60:.1f}分钟")
return session_id
# 创建新的会话ID
new_session_id = f"user_{userid}_{int(time.time())}"
MessageChain._user_sessions[userid] = (new_session_id, current_time)
@@ -877,28 +880,20 @@ class MessageChain(ChainBase):
if userid in MessageChain._user_sessions:
session_id, _ = MessageChain._user_sessions.pop(userid)
logger.info(f"已清除用户 {userid} 的会话: {session_id}")
# 如果有会话ID同时清除智能体的会话记忆
if session_id:
try:
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(
agent_manager.clear_session(
session_id=session_id,
user_id=str(userid)
)
)
except RuntimeError:
asyncio.run(
agent_manager.clear_session(
session_id=session_id,
user_id=str(userid)
)
)
asyncio.run_coroutine_threadsafe(
agent_manager.clear_session(
session_id=session_id,
user_id=str(userid)
),
global_vars.loop
)
except Exception as e:
logger.warning(f"清除智能体会话记忆失败: {e}")
self.post_message(Notification(
channel=channel,
source=source,
@@ -914,7 +909,7 @@ class MessageChain(ChainBase):
))
def _handle_ai_message(self, text: str, channel: MessageChannel, source: str,
userid: Union[str, int], username: str) -> None:
userid: Union[str, int], username: str) -> None:
"""
处理AI智能体消息
"""
@@ -955,34 +950,20 @@ class MessageChain(ChainBase):
# 生成或复用会话ID
session_id = self._get_or_create_session_id(userid)
# 在事件循环中处理
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(
agent_manager.process_message(
session_id=session_id,
user_id=str(userid),
message=user_message,
channel=channel.value if channel else None,
source=source,
username=username
)
)
except RuntimeError:
# 如果没有事件循环,创建新的
asyncio.run(
agent_manager.process_message(
session_id=session_id,
user_id=str(userid),
message=user_message,
channel=channel.value if channel else None,
source=source,
username=username
)
)
asyncio.run_coroutine_threadsafe(
agent_manager.process_message(
session_id=session_id,
user_id=str(userid),
message=user_message,
channel=channel.value if channel else None,
source=source,
username=username
),
global_vars.loop
)
except Exception as e:
logger.error(f"处理AI智能体消息失败: {e}")
self.messagehelper.put(f"AI智能体处理失败: {str(e)}", role="system", title="MoviePilot助手")

View File

@@ -1,3 +1,4 @@
import asyncio
import copy
import json
import os
@@ -6,6 +7,7 @@ import re
import secrets
import sys
import threading
from asyncio import AbstractEventLoop
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Type
from urllib.parse import urlparse
@@ -852,6 +854,8 @@ class GlobalVar(object):
EMERGENCY_STOP_WORKFLOWS: List[int] = []
# 需应急停止文件整理
EMERGENCY_STOP_TRANSFER: List[str] = []
# 当前事件循环
CURRENT_EVENT_LOOP: AbstractEventLoop = asyncio.get_event_loop()
def stop_system(self):
"""
@@ -916,6 +920,19 @@ class GlobalVar(object):
return True
return False
@property
def loop(self) -> AbstractEventLoop:
"""
当前循环
"""
return self.CURRENT_EVENT_LOOP
def set_loop(self, loop: AbstractEventLoop):
"""
设置循环
"""
self.CURRENT_EVENT_LOOP = loop
# 全局标识
global_vars = GlobalVar()

View File

@@ -11,6 +11,7 @@ from typing import Callable, Dict, List, Optional, Tuple, Union, Any
from fastapi.concurrency import run_in_threadpool
from app.core.config import global_vars
from app.helper.thread import ThreadHelper
from app.log import logger
from app.schemas import ChainEventData
@@ -90,8 +91,6 @@ class EventManager(metaclass=Singleton):
self.__lock = threading.Lock()
# 退出事件
self.__event = threading.Event()
# 当前事件循环
self.loop = asyncio.get_event_loop()
def start(self):
"""
@@ -454,7 +453,7 @@ class EventManager(metaclass=Singleton):
# 对于异步函数,直接在事件循环中运行
asyncio.run_coroutine_threadsafe(
self.__safe_invoke_handler_async(handler, isolated_event),
self.loop
global_vars.loop
)
else:
# 对于同步函数,在线程池中运行

View File

@@ -261,7 +261,7 @@ def verify_apitoken(token: Annotated[str, Security(__get_api_token)]) -> str:
def verify_apikey(apikey: Annotated[str, Security(__get_api_key)]) -> str:
"""
使用 API Key 进行身份认证
:param apikey: API Key从 URL 查询参数中获取 apikey=xxx
:param apikey: API Key从 URL 查询参数中获取 apikey=xxx,或请求头中获取 X-API-KEY=xxx
:return: 返回校验通过的 API Key
"""
return __verify_key(apikey, settings.API_TOKEN, "apikey")

View File

@@ -65,6 +65,14 @@ class MediaServerItem(Base):
@classmethod
@db_query
def exists_by_title(cls, db: Session, title: str, mtype: str, year: str):
if not mtype and not year:
return db.query(cls).filter(cls.title == title).first()
elif not year:
return db.query(cls).filter(cls.title == title,
cls.item_type == mtype).first()
elif not mtype:
return db.query(cls).filter(cls.title == title,
cls.year == str(year)).first()
return db.query(cls).filter(cls.title == title,
cls.item_type == mtype,
cls.year == str(year)).first()
@@ -85,7 +93,16 @@ class MediaServerItem(Base):
@classmethod
@async_db_query
async def async_exists_by_title(cls, db: AsyncSession, title: str, mtype: str, year: str):
result = await db.execute(select(cls).filter(cls.title == title,
if not mtype and not year:
result = await db.execute(select(cls).filter(cls.title == title))
elif not year:
result = await db.execute(select(cls).filter(cls.title == title,
cls.item_type == mtype))
elif not mtype:
result = await db.execute(select(cls).filter(cls.title == title,
cls.year == str(year)))
else:
result = await db.execute(select(cls).filter(cls.title == title,
cls.item_type == mtype,
cls.year == str(year)))
return result.scalars().first()

View File

@@ -1,15 +1,16 @@
import base64
import hashlib
import secrets
import threading
import time
from pathlib import Path
from typing import List, Optional, Tuple, Union
from threading import Lock
from typing import List, Optional, Tuple, Union, Dict
from hashlib import sha256
import oss2
import requests
import httpx
from oss2 import SizedFileAdapter, determine_part_size
from oss2.models import PartInfo
from cryptography.hazmat.primitives import hashes
from app import schemas
from app.core.config import settings, global_vars
@@ -19,8 +20,10 @@ from app.modules.filemanager.storages import transfer_process
from app.schemas.types import StorageSchema
from app.utils.singleton import WeakSingleton
from app.utils.string import StringUtils
from app.utils.limit import QpsRateLimiter
lock = threading.Lock()
lock = Lock()
class NoCheckInException(Exception):
@@ -36,10 +39,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
schema = StorageSchema.U115
# 支持的整理方式
transtype = {
"move": "移动",
"copy": "复制"
}
transtype = {"move": "移动", "copy": "复制"}
# 基础url
base_url = "https://proapi.115.com"
@@ -52,18 +52,28 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
def __init__(self):
super().__init__()
self._auth_state = {}
self.session = requests.Session()
self.session = httpx.Client(follow_redirects=True, timeout=20.0)
self._init_session()
self.qps_limiter: Dict[str, QpsRateLimiter] = {
"/open/ufile/files": QpsRateLimiter(4),
"/open/folder/get_info": QpsRateLimiter(3),
"/open/ufile/move": QpsRateLimiter(2),
"/open/ufile/copy": QpsRateLimiter(2),
"/open/ufile/update": QpsRateLimiter(2),
"/open/ufile/delete": QpsRateLimiter(2),
}
def _init_session(self):
"""
初始化带速率限制的会话
"""
self.session.headers.update({
"User-Agent": "W115Storage/2.0",
"Accept-Encoding": "gzip, deflate",
"Content-Type": "application/x-www-form-urlencoded"
})
self.session.headers.update(
{
"User-Agent": "W115Storage/2.0",
"Accept-Encoding": "gzip, deflate",
"Content-Type": "application/x-www-form-urlencoded",
}
)
def _check_session(self):
"""
@@ -87,10 +97,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
if expires_in and refresh_time + expires_in < int(time.time()):
tokens = self.__refresh_access_token(refresh_token)
if tokens:
self.set_config({
"refresh_time": int(time.time()),
**tokens
})
self.set_config({"refresh_time": int(time.time()), **tokens})
else:
return None
access_token = tokens.get("access_token")
@@ -105,7 +112,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 生成PKCE参数
code_verifier = secrets.token_urlsafe(96)[:128]
code_challenge = base64.b64encode(
hashlib.sha256(code_verifier.encode("utf-8")).digest()
sha256(code_verifier.encode("utf-8")).digest()
).decode("utf-8")
# 请求设备码
resp = self.session.post(
@@ -113,8 +120,8 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
data={
"client_id": settings.U115_APP_ID,
"code_challenge": code_challenge,
"code_challenge_method": "sha256"
}
"code_challenge_method": "sha256",
},
)
if resp is None:
return {}, "网络错误"
@@ -126,13 +133,11 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
"code_verifier": code_verifier,
"uid": result["data"]["uid"],
"time": result["data"]["time"],
"sign": result["data"]["sign"]
"sign": result["data"]["sign"],
}
# 生成二维码内容
return {
"codeContent": result['data']['qrcode']
}, ""
return {"codeContent": result["data"]["qrcode"]}, ""
def check_login(self) -> Optional[Tuple[dict, str]]:
"""
@@ -146,8 +151,8 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
params={
"uid": self._auth_state["uid"],
"time": self._auth_state["time"],
"sign": self._auth_state["sign"]
}
"sign": self._auth_state["sign"],
},
)
if resp is None:
return {}, "网络错误"
@@ -156,11 +161,11 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
return {}, result.get("message")
if result["data"]["status"] == 2:
tokens = self.__get_access_token()
self.set_config({
"refresh_time": int(time.time()),
**tokens
})
return {"status": result["data"]["status"], "tip": result["data"]["msg"]}, ""
self.set_config({"refresh_time": int(time.time()), **tokens})
return {
"status": result["data"]["status"],
"tip": result["data"]["msg"],
}, ""
except Exception as e:
return {}, str(e)
@@ -174,8 +179,8 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
"https://passportapi.115.com/open/deviceCodeToToken",
data={
"uid": self._auth_state["uid"],
"code_verifier": self._auth_state["code_verifier"]
}
"code_verifier": self._auth_state["code_verifier"],
},
)
if resp is None:
raise Exception("获取 access_token 失败")
@@ -190,21 +195,24 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
"""
resp = self.session.post(
"https://passportapi.115.com/open/refreshToken",
data={
"refresh_token": refresh_token
}
data={"refresh_token": refresh_token},
)
if resp is None:
logger.error(f"【115】刷新 access_token 失败refresh_token={refresh_token}")
logger.error(
f"【115】刷新 access_token 失败refresh_token={refresh_token}"
)
return None
result = resp.json()
if result.get("code") != 0:
logger.warn(f"【115】刷新 access_token 失败:{result.get('code')} - {result.get('message')}")
logger.warn(
f"【115】刷新 access_token 失败:{result.get('code')} - {result.get('message')}"
)
return None
return result.get("data")
def _request_api(self, method: str, endpoint: str,
result_key: Optional[str] = None, **kwargs) -> Optional[Union[dict, list]]:
def _request_api(
self, method: str, endpoint: str, result_key: Optional[str] = None, **kwargs
) -> Optional[Union[dict, list]]:
"""
带错误处理和速率限制的API请求
"""
@@ -216,12 +224,13 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 重试次数
retry_times = kwargs.pop("retry_limit", 5)
# qps 速率限制
if endpoint in self.qps_limiter:
self.qps_limiter[endpoint].acquire()
try:
resp = self.session.request(
method, f"{self.base_url}{endpoint}",
**kwargs
)
except requests.exceptions.RequestException as e:
resp = self.session.request(method, f"{self.base_url}{endpoint}", **kwargs)
except httpx.RequestError as e:
logger.error(f"【115】{method} 请求 {endpoint} 网络错误: {str(e)}")
return None
@@ -241,7 +250,21 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
return self._request_api(method, endpoint, result_key, **kwargs)
# 处理请求错误
resp.raise_for_status()
try:
resp.raise_for_status()
except httpx.HTTPStatusError as e:
if retry_times <= 0:
logger.error(
f"【115】{method} 请求 {endpoint} 错误 {e},重试次数用尽!"
)
return None
kwargs["retry_limit"] = retry_times - 1
sleep_duration = 2 ** (5 - retry_times + 1)
logger.info(
f"【115】{method} 请求 {endpoint} 错误 {e},等待 {sleep_duration} 秒后重试..."
)
time.sleep(sleep_duration)
return self._request_api(method, endpoint, result_key, **kwargs)
# 返回数据
ret_data = resp.json()
@@ -251,10 +274,14 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
logger.warn(f"【115】{method} 请求 {endpoint} 出错:{error_msg}")
if "已达到当前访问上限" in error_msg:
if retry_times <= 0:
logger.error(f"【115】{method} 请求 {endpoint} 达到访问上限,重试次数用尽!")
logger.error(
f"【115】{method} 请求 {endpoint} 达到访问上限,重试次数用尽!"
)
return None
kwargs["retry_limit"] = retry_times - 1
logger.info(f"【115】{method} 请求 {endpoint} 达到访问上限,等待 {self.retry_delay} 秒后重试...")
logger.info(
f"【115】{method} 请求 {endpoint} 达到访问上限,等待 {self.retry_delay} 秒后重试..."
)
time.sleep(self.retry_delay)
return self._request_api(method, endpoint, result_key, **kwargs)
return None
@@ -269,26 +296,15 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
计算文件SHA1符合115规范
size: 前多少字节
"""
sha1 = hashlib.sha1()
with open(filepath, 'rb') as f:
sha1 = hashes.Hash(hashes.SHA1())
with open(filepath, "rb") as f:
if size:
chunk = f.read(size)
sha1.update(chunk)
else:
while chunk := f.read(8192):
sha1.update(chunk)
return sha1.hexdigest()
def _delay_get_item(self, path: Path) -> Optional[schemas.FileItem]:
"""
自动延迟重试 get_item 模块
"""
for i in range(1, 4):
time.sleep(2 ** i)
fileitem = self.get_item(path)
if fileitem:
return fileitem
return None
return sha1.finalize().hex()
def init_storage(self):
pass
@@ -304,7 +320,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
return [item]
return []
if fileitem.path == "/":
cid = '0'
cid = "0"
else:
cid = fileitem.fileid
if not cid:
@@ -322,29 +338,37 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
"GET",
"/open/ufile/files",
"data",
params={"cid": int(cid), "limit": 1000, "offset": offset, "cur": True, "show_dir": 1}
params={
"cid": int(cid),
"limit": 1000,
"offset": offset,
"cur": True,
"show_dir": 1,
},
)
if resp is None:
raise FileNotFoundError(f"【115】{fileitem.path} 检索出错!")
if not resp:
break
for item in resp:
# 更新缓存
path = f"{fileitem.path}{item['fn']}"
file_path = path + ("/" if item["fc"] == "0" else "")
items.append(schemas.FileItem(
storage=self.schema.value,
fileid=str(item["fid"]),
parent_fileid=cid,
name=item["fn"],
basename=Path(item["fn"]).stem,
extension=item["ico"] if item["fc"] == "1" else None,
type="dir" if item["fc"] == "0" else "file",
path=file_path,
size=item["fs"] if item["fc"] == "1" else None,
modify_time=item["upt"],
pickcode=item["pc"]
))
parent_path = Path(fileitem.path) # noqa
item_name = item["fn"]
full_path = parent_path / item_name
items.append(
schemas.FileItem(
storage=self.schema.value,
fileid=str(item["fid"]),
parent_fileid=cid,
name=item["fn"],
basename=Path(item["fn"]).stem,
extension=item["ico"] if item["fc"] == "1" else None,
type="dir" if item["fc"] == "0" else "file",
path=full_path.as_posix() + ("/" if item["fc"] == "0" else ""),
size=item["fs"] if item["fc"] == "1" else None,
modify_time=item["upt"],
pickcode=item["pc"],
)
)
if len(resp) < 1000:
break
@@ -352,7 +376,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
return items
def create_folder(self, parent_item: schemas.FileItem, name: str) -> Optional[schemas.FileItem]:
def create_folder(
self, parent_item: schemas.FileItem, name: str
) -> Optional[schemas.FileItem]:
"""
创建目录
"""
@@ -360,10 +386,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
resp = self._request_api(
"POST",
"/open/folder/add",
data={
"pid": int(parent_item.fileid or "0"),
"file_name": name
}
data={"pid": int(parent_item.fileid or "0"), "file_name": name},
)
if not resp:
return None
@@ -376,15 +399,19 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
return schemas.FileItem(
storage=self.schema.value,
fileid=str(resp["data"]["file_id"]),
path=str(new_path) + "/",
path=new_path.as_posix() + "/",
name=name,
basename=name,
type="dir",
modify_time=int(time.time())
modify_time=int(time.time()),
)
def upload(self, target_dir: schemas.FileItem, local_path: Path,
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
def upload(
self,
target_dir: schemas.FileItem,
local_path: Path,
new_name: Optional[str] = None,
) -> Optional[schemas.FileItem]:
"""
实现带秒传、断点续传和二次认证的文件上传
"""
@@ -409,13 +436,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
"file_size": file_size,
"target": target_param,
"fileid": file_sha1,
"preid": file_preid
"preid": file_preid,
}
init_resp = self._request_api(
"POST",
"/open/upload/init",
data=init_data
)
init_resp = self._request_api("POST", "/open/upload/init", data=init_data)
if not init_resp:
return None
if not init_resp.get("state"):
@@ -444,19 +467,15 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 取2392148-2392298之间的内容(包含2392148、2392298)的sha1
f.seek(start)
chunk = f.read(end - start + 1)
sign_val = hashlib.sha1(chunk).hexdigest().upper()
sha1 = hashes.Hash(hashes.SHA1())
sha1.update(chunk)
sign_val = sha1.finalize().hex().upper()
# 重新初始化请求
# sign_keysign_val(根据sign_check计算的值大写的sha1值)
init_data.update({
"pick_code": pick_code,
"sign_key": sign_key,
"sign_val": sign_val
})
init_resp = self._request_api(
"POST",
"/open/upload/init",
data=init_data
init_data.update(
{"pick_code": pick_code, "sign_key": sign_key, "sign_val": sign_val}
)
init_resp = self._request_api("POST", "/open/upload/init", data=init_data)
if not init_resp:
return None
if not init_resp.get("state"):
@@ -485,32 +504,30 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
"GET",
"/open/folder/get_info",
"data",
params={
"file_id": int(file_id)
}
params={"file_id": int(file_id)},
)
if info_resp:
return schemas.FileItem(
storage=self.schema.value,
fileid=str(info_resp["file_id"]),
path=str(target_path) + ("/" if info_resp["file_category"] == "0" else ""),
path=target_path.as_posix()
+ ("/" if info_resp["file_category"] == "0" else ""),
type="file" if info_resp["file_category"] == "1" else "dir",
name=info_resp["file_name"],
basename=Path(info_resp["file_name"]).stem,
extension=Path(info_resp["file_name"]).suffix[1:] if info_resp[
"file_category"] == "1" else None,
extension=Path(info_resp["file_name"]).suffix[1:]
if info_resp["file_category"] == "1"
else None,
pickcode=info_resp["pick_code"],
size=StringUtils.num_filesize(info_resp['size']) if info_resp["file_category"] == "1" else None,
modify_time=info_resp["utime"]
size=StringUtils.num_filesize(info_resp["size"])
if info_resp["file_category"] == "1"
else None,
modify_time=info_resp["utime"],
)
return self._delay_get_item(target_path)
return self.get_item(target_path)
# Step 4: 获取上传凭证
token_resp = self._request_api(
"GET",
"/open/upload/get_token",
"data"
)
token_resp = self._request_api("GET", "/open/upload/get_token", "data")
if not token_resp:
logger.warn("【115】获取上传凭证失败")
return None
@@ -530,8 +547,8 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
"file_size": file_size,
"target": target_param,
"fileid": file_sha1,
"pick_code": pick_code
}
"pick_code": pick_code,
},
)
if resume_resp:
logger.debug(f"【115】上传 Step 5 断点续传结果: {resume_resp}")
@@ -542,25 +559,25 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
auth = oss2.StsAuth(
access_key_id=AccessKeyId,
access_key_secret=AccessKeySecret,
security_token=SecurityToken
security_token=SecurityToken,
)
bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa
# determine_part_size方法用于确定分片大小设置分片大小为 10M
part_size = determine_part_size(file_size, preferred_size=10 * 1024 * 1024)
# 初始化进度条
logger.info(f"【115】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}")
logger.info(
f"【115】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}"
)
progress_callback = transfer_process(local_path.as_posix())
# 初始化分片
upload_id = bucket.init_multipart_upload(object_name,
params={
"encoding-type": "url",
"sequential": ""
}).upload_id
upload_id = bucket.init_multipart_upload(
object_name, params={"encoding-type": "url", "sequential": ""}
).upload_id
parts = []
# 逐个上传分片
with open(local_path, 'rb') as fileobj:
with open(local_path, "rb") as fileobj:
part_number = 1
offset = 0
while offset < file_size:
@@ -569,9 +586,15 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
return None
num_to_upload = min(part_size, file_size - offset)
# 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。
logger.info(f"【115】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}")
result = bucket.upload_part(object_name, upload_id, part_number,
data=SizedFileAdapter(fileobj, num_to_upload))
logger.info(
f"【115】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}"
)
result = bucket.upload_part(
object_name,
upload_id,
part_number,
data=SizedFileAdapter(fileobj, num_to_upload),
)
parts.append(PartInfo(part_number, result.etag))
logger.info(f"【115】{target_name} 分片 {part_number} 上传完成")
offset += num_to_upload
@@ -585,15 +608,18 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 请求头
headers = {
'X-oss-callback': encode_callback(callback["callback"]),
'x-oss-callback-var': encode_callback(callback["callback_var"]),
'x-oss-forbid-overwrite': 'false'
"X-oss-callback": encode_callback(callback["callback"]),
"x-oss-callback-var": encode_callback(callback["callback_var"]),
"x-oss-forbid-overwrite": "false",
}
try:
result = bucket.complete_multipart_upload(object_name, upload_id, parts,
headers=headers)
result = bucket.complete_multipart_upload(
object_name, upload_id, parts, headers=headers
)
if result.status == 200:
logger.debug(f"【115】上传 Step 6 回调结果:{result.resp.response.json()}")
logger.debug(
f"【115】上传 Step 6 回调结果:{result.resp.response.json()}"
)
logger.info(f"【115】{target_name} 上传成功")
else:
logger.warn(f"【115】{target_name} 上传失败,错误码: {result.status}")
@@ -602,10 +628,12 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
if e.code == "FileAlreadyExists":
logger.warn(f"【115】{target_name} 已存在")
else:
logger.error(f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}")
logger.error(
f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}"
)
return None
# 返回结果
return self._delay_get_item(target_path)
return self.get_item(target_path)
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
"""
@@ -617,12 +645,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
return None
download_info = self._request_api(
"POST",
"/open/ufile/downurl",
"data",
data={
"pick_code": detail.pickcode
}
"POST", "/open/ufile/downurl", "data", data={"pick_code": detail.pickcode}
)
if not download_info:
logger.error(f"【115】获取下载链接失败: {fileitem.name}")
@@ -643,28 +666,26 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
progress_callback = transfer_process(Path(fileitem.path).as_posix())
try:
with self.session.get(download_url, stream=True) as r:
with self.session.stream("GET", download_url) as r:
r.raise_for_status()
downloaded_size = 0
with open(local_path, "wb") as f:
for chunk in r.iter_content(chunk_size=self.chunk_size):
for chunk in r.iter_bytes(chunk_size=self.chunk_size):
if global_vars.is_transfer_stopped(fileitem.path):
logger.info(f"【115】{fileitem.path} 下载已取消!")
r.close()
return None
if chunk:
f.write(chunk)
downloaded_size += len(chunk)
# 更新进度
if file_size:
progress = (downloaded_size * 100) / file_size
progress_callback(progress)
f.write(chunk)
downloaded_size += len(chunk)
if file_size:
progress = (downloaded_size * 100) / file_size
progress_callback(progress)
# 完成下载
progress_callback(100)
logger.info(f"【115】下载完成: {fileitem.name}")
except requests.exceptions.RequestException as e:
except httpx.RequestError as e:
logger.error(f"【115】下载网络错误: {fileitem.name} - {str(e)}")
# 删除可能部分下载的文件
if local_path.exists():
@@ -688,14 +709,10 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
"""
try:
self._request_api(
"POST",
"/open/ufile/delete",
data={
"file_ids": int(fileitem.fileid)
}
"POST", "/open/ufile/delete", data={"file_ids": int(fileitem.fileid)}
)
return True
except requests.exceptions.HTTPError:
except httpx.HTTPError:
return False
def rename(self, fileitem: schemas.FileItem, name: str) -> bool:
@@ -705,10 +722,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
resp = self._request_api(
"POST",
"/open/ufile/update",
data={
"file_id": int(fileitem.fileid),
"file_name": name
}
data={"file_id": int(fileitem.fileid), "file_name": name},
)
if not resp:
return False
@@ -725,10 +739,8 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
"POST",
"/open/folder/get_info",
"data",
data={
"path": path.as_posix()
},
no_error_log=True
data={"path": path.as_posix()},
no_error_log=True,
)
if not resp:
return None
@@ -739,10 +751,12 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
type="file" if resp["file_category"] == "1" else "dir",
name=resp["file_name"],
basename=Path(resp["file_name"]).stem,
extension=Path(resp["file_name"]).suffix[1:] if resp["file_category"] == "1" else None,
extension=Path(resp["file_name"]).suffix[1:]
if resp["file_category"] == "1"
else None,
pickcode=resp["pick_code"],
size=resp['size_byte'] if resp["file_category"] == "1" else None,
modify_time=resp["utime"]
size=resp["size_byte"] if resp["file_category"] == "1" else None,
modify_time=resp["utime"],
)
except Exception as e:
logger.debug(f"【115】获取文件信息失败: {str(e)}")
@@ -753,7 +767,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
获取指定路径的文件夹,如不存在则创建
"""
def __find_dir(_fileitem: schemas.FileItem, _name: str) -> Optional[schemas.FileItem]:
def __find_dir(
_fileitem: schemas.FileItem, _name: str
) -> Optional[schemas.FileItem]:
"""
查找下级目录中匹配名称的目录
"""
@@ -808,13 +824,13 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
data={
"file_id": int(fileitem.fileid),
"pid": int(dest_fileitem.fileid),
}
},
)
if not resp:
return False
if resp["state"]:
new_path = Path(path) / fileitem.name
new_item = self._delay_get_item(new_path)
new_item = self.get_item(new_path)
if not new_item:
return False
if self.rename(new_item, new_name):
@@ -840,13 +856,13 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
data={
"file_ids": int(fileitem.fileid),
"to_cid": int(dest_fileitem.fileid),
}
},
)
if not resp:
return False
if resp["state"]:
new_path = Path(path) / fileitem.name
new_file = self._delay_get_item(new_path)
new_file = self.get_item(new_path)
if not new_file:
return False
if self.rename(new_file, new_name):
@@ -864,17 +880,12 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
获取带有企业级配额信息的存储使用情况
"""
try:
resp = self._request_api(
"GET",
"/open/user/info",
"data"
)
resp = self._request_api("GET", "/open/user/info", "data")
if not resp:
return None
space = resp["rt_space_info"]
return schemas.StorageUsage(
total=space["all_total"]["size"],
available=space["all_remain"]["size"]
total=space["all_total"]["size"], available=space["all_remain"]["size"]
)
except NoCheckInException:
return None

View File

@@ -21,7 +21,7 @@ from app.chain.site import SiteChain
from app.chain.subscribe import SubscribeChain
from app.chain.transfer import TransferChain
from app.chain.workflow import WorkflowChain
from app.core.config import settings
from app.core.config import settings, global_vars
from app.core.event import eventmanager, Event
from app.core.plugin import PluginManager
from app.db.systemconfig_oper import SystemConfigOper
@@ -60,8 +60,7 @@ class Scheduler(metaclass=SingletonClass):
self._auth_count = 0
# 用户认证失败消息发送
self._auth_message = False
# 当前事件循环
self.loop = asyncio.get_event_loop()
# 初始化
self.init()
@eventmanager.register(EventType.ConfigChanged)
@@ -475,7 +474,7 @@ class Scheduler(metaclass=SingletonClass):
"""
启动协程
"""
return asyncio.run_coroutine_threadsafe(coro, self.loop)
return asyncio.run_coroutine_threadsafe(coro, global_vars.loop)
# 获取定时任务
job = self.__prepare_job(job_id)

View File

@@ -22,3 +22,5 @@ from .token import *
from .transfer import *
from .user import *
from .workflow import *
from .mcp import *

16
app/schemas/mcp.py Normal file
View File

@@ -0,0 +1,16 @@
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
class ToolCallRequest(BaseModel):
"""工具调用请求模型"""
tool_name: str = Field(..., description="工具名称")
arguments: Dict[str, Any] = Field(default_factory=dict, description="工具参数")
class ToolCallResponse(BaseModel):
"""工具调用响应模型"""
success: bool = Field(..., description="是否成功")
result: Optional[str] = Field(None, description="工具执行结果")
error: Optional[str] = Field(None, description="错误信息")

View File

@@ -75,7 +75,7 @@ class SiteUserData(BaseModel):
# 用户名
username: Optional[str] = None
# 用户ID
userid: Optional[str] = None
userid: Optional[Union[str, int]] = None
# 用户等级
user_level: Optional[str] = None
# 加入时间

View File

@@ -4,6 +4,7 @@ from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.chain.system import SystemChain
from app.core.config import global_vars
from app.helper.system import SystemHelper
from app.startup.command_initializer import init_command, stop_command, restart_command
from app.startup.modules_initializer import init_modules, stop_modules
@@ -35,6 +36,8 @@ async def lifespan(app: FastAPI):
定义应用的生命周期事件
"""
print("Starting up...")
# 存储当前循环
global_vars.set_loop(asyncio.get_event_loop())
# 初始化路由
init_routers(app)
# 初始化模块

View File

@@ -1,5 +1,4 @@
import asyncio
from app.core.config import global_vars
from app.core.plugin import PluginManager
from app.log import logger
@@ -9,7 +8,7 @@ async def sync_plugins() -> bool:
初始化安装插件并动态注册后台任务及API
"""
try:
loop = asyncio.get_event_loop()
loop = global_vars.loop
plugin_manager = PluginManager()
sync_result = await execute_task(loop, plugin_manager.sync, "插件同步到本地")

View File

@@ -382,3 +382,28 @@ def rate_limit_window(max_calls: int, window_seconds: float,
limiter = WindowRateLimiter(max_calls, window_seconds, source, enable_logging)
# 使用通用装饰器逻辑包装该限流器
return rate_limit_handler(limiter, raise_on_limit)
class QpsRateLimiter:
"""
速率控制器,精确控制 QPS
"""
def __init__(self, qps: float | int):
if qps <= 0:
qps = float("inf")
self.interval = 1.0 / qps
self.lock = threading.Lock()
self.next_call_time = time.monotonic()
def acquire(self) -> None:
"""
获取调用许可,阻塞直到满足速率限制
"""
sleep_duration = 0
with self.lock:
now = time.monotonic()
sleep_duration = self.next_call_time - now
self.next_call_time = max(now, self.next_call_time) + self.interval
if sleep_duration > 0:
time.sleep(sleep_duration)

View File

@@ -41,7 +41,7 @@ class WorkFlowManager(metaclass=Singleton):
return False
if obj.__name__ == "BaseAction":
return False
return obj.__module__.startswith("app.actions")
return obj.__module__.startswith("app.workflow.actions")
# 加载所有动作
self._actions = {}

278
docs/mcp-api.md Normal file
View File

@@ -0,0 +1,278 @@
# MoviePilot 工具API文档
MoviePilot的智能体工具已通过HTTP API暴露可以通过RESTful API调用所有工具。
## API端点
所有工具相关的API端点都在 `/api/v1/mcp` 路径下(保持向后兼容)。
### 1. 列出所有工具
**GET** `/api/v1/mcp/tools`
获取所有可用的MCP工具列表。
**认证**: 需要API KEY在请求头中添加 `X-API-KEY: <api_key>` 或在查询参数中添加 `apikey=<api_key>`
**响应示例**:
```json
[
{
"name": "add_subscribe",
"description": "Add media subscription to create automated download rules...",
"inputSchema": {
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "The title of the media to subscribe to"
},
"year": {
"type": "string",
"description": "Release year of the media"
},
...
},
"required": ["title", "year", "media_type"]
}
},
...
]
```
### 2. 调用工具
**POST** `/api/v1/mcp/tools/call`
调用指定的MCP工具。
**认证**: 需要API KEY在请求头中添加 `X-API-KEY: <api_key>` 或在查询参数中添加 `apikey=<api_key>`
**请求体**:
```json
{
"tool_name": "add_subscribe",
"arguments": {
"title": "流浪地球",
"year": "2019",
"media_type": "电影"
}
}
```
**响应示例**:
```json
{
"success": true,
"result": "成功添加订阅:流浪地球 (2019)",
"error": null
}
```
**错误响应示例**:
```json
{
"success": false,
"result": null,
"error": "调用工具失败: 参数验证失败"
}
```
### 3. 获取工具详情
**GET** `/api/v1/mcp/tools/{tool_name}`
获取指定工具的详细信息。
**认证**: 需要API KEY在请求头中添加 `X-API-KEY: <api_key>` 或在查询参数中添加 `apikey=<api_key>`
**路径参数**:
- `tool_name`: 工具名称
**响应示例**:
```json
{
"name": "add_subscribe",
"description": "Add media subscription to create automated download rules...",
"inputSchema": {
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "The title of the media to subscribe to"
},
...
},
"required": ["title", "year", "media_type"]
}
}
```
### 4. 获取工具参数Schema
**GET** `/api/v1/mcp/tools/{tool_name}/schema`
获取指定工具的参数SchemaJSON Schema格式
**认证**: 需要API KEY在请求头中添加 `X-API-KEY: <api_key>` 或在查询参数中添加 `apikey=<api_key>`
**路径参数**:
- `tool_name`: 工具名称
**响应示例**:
```json
{
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "The title of the media to subscribe to"
},
"year": {
"type": "string",
"description": "Release year of the media"
},
...
},
"required": ["title", "year", "media_type"]
}
```
## MCP客户端配置
MoviePilot的MCP工具可以通过HTTP协议在支持MCP的客户端中使用。以下是常见MCP客户端的配置方法
### Claude Desktop (Anthropic)
在Claude Desktop的配置文件中添加MoviePilot的MCP服务器配置
**macOS**: `~/Library/Application Support/Claude/claude_desktop_config.json`
**Windows**: `%APPDATA%\Claude\claude_desktop_config.json`
```json
{
"mcpServers": {
"moviepilot": {
"command": "npx",
"args": [
"-y",
"@modelcontextprotocol/server-http",
"http://localhost:3001/api/v1/mcp"
],
"env": {
"X-API-KEY": "your_api_key_here"
}
}
}
}
```
**注意**: 如果MCP HTTP服务器不支持环境变量传递API Key可以使用查询参数方式
```json
{
"mcpServers": {
"moviepilot": {
"command": "npx",
"args": [
"-y",
"@modelcontextprotocol/server-http",
"http://localhost:3001/api/v1/mcp?apikey=your_api_key_here"
]
}
}
}
```
### 其他支持MCP的聊天客户端
对于其他支持MCP协议的聊天客户端如其他AI聊天助手、对话机器人等通常可以通过配置文件或设置界面添加HTTP协议的MCP服务器。配置格式可能因客户端而异但通常需要以下信息
**配置参数**
1. **服务器类型**: HTTP
2. **服务器地址**: `http://your-moviepilot-host:3001/api/v1/mcp`
3. **认证方式**:
- 在HTTP请求头中添加 `X-API-KEY: <your_api_key>`
- 或在URL查询参数中添加 `apikey=<your_api_key>`
**示例配置**(通用格式):
使用请求头方式:
```json
{
"mcpServers": {
"moviepilot": {
"url": "http://localhost:3001/api/v1/mcp",
"headers": {
"X-API-KEY": "your_api_key_here"
}
}
}
}
```
或使用查询参数方式:
```json
{
"mcpServers": {
"moviepilot": {
"url": "http://localhost:3001/api/v1/mcp?apikey=your_api_key_here"
}
}
}
```
**支持的端点**:
- `GET /tools` - 列出所有工具
- `POST /tools/call` - 调用工具
- `GET /tools/{tool_name}` - 获取工具详情
- `GET /tools/{tool_name}/schema` - 获取工具参数Schema
配置完成后您就可以在聊天对话中使用MoviePilot的各种工具例如
- 添加媒体订阅
- 查询下载历史
- 搜索媒体资源
- 管理媒体服务器
- 等等...
### 获取API Key
API Key可以在MoviePilot的系统设置中生成和查看。请妥善保管您的API Key不要泄露给他人。
## 认证
所有MCP API端点都需要认证。**仅支持API Key认证方式**
- **请求头方式**: 在请求头中添加 `X-API-KEY: <api_key>`
- **查询参数方式**: 在URL查询参数中添加 `apikey=<api_key>`
**获取API Key**: 在MoviePilot系统设置中生成和查看API Key。请妥善保管您的API Key不要泄露给他人。
## 错误处理
API会返回标准的HTTP状态码
- `200 OK`: 请求成功
- `400 Bad Request`: 请求参数错误
- `401 Unauthorized`: 未认证或API Key无效
- `404 Not Found`: 工具不存在
- `500 Internal Server Error`: 服务器内部错误
错误响应格式:
```json
{
"detail": "错误描述信息"
}
```
## 架构说明
工具API通过FastAPI端点暴露使用HTTP协议与客户端通信。所有工具共享相同的实现确保功能一致性。
## 注意事项
1. **用户上下文**: API调用会使用当前认证用户的ID作为工具执行的用户上下文
2. **会话隔离**: 每个API请求使用独立的会话ID
3. **参数验证**: 工具参数会根据JSON Schema进行验证
4. **错误日志**: 所有工具调用错误都会记录到MoviePilot日志系统

View File

@@ -80,11 +80,11 @@ pympler~=1.1
smbprotocol~=1.15.0
setproctitle~=1.3.6
httpx[socks]~=0.28.1
langchain==0.3.27
langchain-core==0.3.76
langchain-community==0.3.29
langchain-openai==0.3.33
langchain-google-genai==2.0.10
langchain-deepseek==0.1.4
langchain-experimental==0.3.4
openai==1.108.2
langchain~=0.3.27
langchain-core~=0.3.76
langchain-community~=0.3.29
langchain-openai~=0.3.33
langchain-google-genai~=2.0.10
langchain-deepseek~=0.1.4
langchain-experimental~=0.3.4
openai~=1.108.2

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.8.3'
APP_VERSION = 'v2.8.4-1'
FRONTEND_VERSION = 'v2.8.3'