Compare commits

...

33 Commits

Author SHA1 Message Date
jxxghp
c83589cac6 rollback telegram 2025-11-18 21:56:55 +08:00
jxxghp
d64492bda5 fix QueryEpisodeScheduleTool 2025-11-18 21:46:11 +08:00
jxxghp
33d6c75924 fix telegram 2025-11-18 21:43:55 +08:00
jxxghp
89f01bad42 fix telegram 2025-11-18 21:32:09 +08:00
jxxghp
767496f81b fix telegram 2025-11-18 21:31:44 +08:00
jxxghp
147a477365 fix site 2025-11-18 21:13:45 +08:00
jxxghp
13171f636f fix 2025-11-18 20:34:53 +08:00
jxxghp
fea3f0d3e0 fix telegram markdown 2025-11-18 19:53:27 +08:00
jxxghp
a3a254c2ea fix telegram markdown 2025-11-18 19:09:36 +08:00
jxxghp
bd9d5f7fc0 Merge pull request #5135 from jxxghp/cursor/handle-telegram-hyphen-escape-error-9f51 2025-11-18 17:49:15 +08:00
Cursor Agent
726738ee9e Refactor: Protect only markdown delimiters, not content
Co-authored-by: jxxghp <jxxghp@qq.com>
2025-11-18 09:43:42 +00:00
jxxghp
725244bb2f fix escape_markdown 2025-11-18 17:30:00 +08:00
jxxghp
d2ac2b8990 feat: add QueryEpisodeScheduleTool to MoviePilotToolFactory
- Included QueryEpisodeScheduleTool in the tool definitions of MoviePilotToolFactory to enhance episode scheduling capabilities.
- Updated factory.py to reflect the addition of this new tool, improving overall functionality.
2025-11-18 17:20:23 +08:00
jxxghp
116569223c fix 2025-11-18 16:44:39 +08:00
jxxghp
05442a019f feat: add UpdateSubscribeTool and additional query tools to MoviePilotToolFactory
- Included UpdateSubscribeTool and QuerySiteUserdataTool in the tool definitions of MoviePilotToolFactory to enhance subscription management and user data querying capabilities.
- Updated the factory.py file to reflect the addition of these new tools, improving overall functionality.
2025-11-18 16:41:15 +08:00
jxxghp
db67080bf8 feat: add ScrapeMetadataTool to MoviePilotToolFactory
- Included ScrapeMetadataTool in the tool definitions of MoviePilotToolFactory to enhance metadata scraping capabilities.
- Updated the factory.py file to reflect the addition of the new tool.
2025-11-18 16:26:18 +08:00
jxxghp
21fabf7436 feat: enhance GetRecommendationsTool and update query tools for improved functionality
- Expanded the GetRecommendationsTool to support additional recommendation sources, including TMDB popular movies and TV shows, as well as various Douban categories.
- Updated the limit for results in QuerySubscribesTool, SearchMediaTool, and QueryTransferHistoryTool from 20 to 50 or 30, respectively, to provide more comprehensive results.
- Removed unnecessary description fields from media objects in QueryPopularSubscribesTool, QuerySubscribeHistoryTool, and QuerySubscribeSharesTool for cleaner output.
2025-11-18 16:21:13 +08:00
jxxghp
a8c6516b31 refactor: remove deprecated tools and add RecognizeMediaTool
- Removed the old tool definitions from __init__.py to streamline the module.
- Added RecognizeMediaTool to factory.py for enhanced media recognition capabilities.
- Updated tool exports to reflect the changes in available tools.
2025-11-18 16:07:11 +08:00
jxxghp
f5ca48a56e fix: update recommendation fetching logic in GetRecommendationsTool
- Adjusted the recommendation fetching methods to accept page and count parameters for better control over result limits.
- Implemented checks to ensure the format of recommendation results is valid, enhancing robustness.
- Added truncation for long overview descriptions to maintain consistency in output.
2025-11-18 13:08:09 +08:00
jxxghp
65ceff9824 v2.8.3
- MoviePilot助手增加了多个智能体工具,大幅提升智能体能力
- 智能体对话支持上下文记忆,发送 `/clear_session` 指令或超过15分钟没动作将清除记忆
2025-11-18 12:52:58 +08:00
jxxghp
ed73cfdcc7 fix: update message formatting in MoviePilotTool for clarity
- Changed the tool message format to use "=>" instead of wrapping with separators, enhancing readability and user experience during tool execution.
2025-11-18 12:43:27 +08:00
jxxghp
9cb79a7827 feat: enhance MoviePilotTool with customizable tool messages
- Added `get_tool_message` method to `MoviePilotTool` and its subclasses for generating user-friendly execution messages based on parameters.
- Improved message formatting for various tools, including `AddDownloadTool`, `AddSubscribeTool`, `DeleteDownloadTool`, and others, to provide clearer feedback during operations.
- This enhancement allows for more personalized and informative messages, improving user experience during tool execution.
2025-11-18 12:42:24 +08:00
jxxghp
984f29005a 更新 message.py 2025-11-18 12:17:12 +08:00
jxxghp
805c3719af feat: add new query tools for enhanced subscription management
- Introduced QuerySubscribeSharesTool, QueryPopularSubscribesTool, and QuerySubscribeHistoryTool to improve subscription querying capabilities.
- Updated __all__ exports in init.py and factory.py to include the new tools.
- Enhanced QuerySubscribesTool to support media type filtering with localized descriptions.
2025-11-18 12:05:06 +08:00
jxxghp
ea646149c0 feat: add new tools for download management and enhance query capabilities
- Introduced DeleteDownloadTool, QueryDirectoriesTool, ListDirectoryTool, QueryTransferHistoryTool, and TransferFileTool to the toolset for improved download management.
- Updated __all__ exports in init.py and factory.py to include the new tools.
- Enhanced QueryDownloadsTool to support querying downloads by hash and title, providing more flexible search options and detailed results.
2025-11-18 11:57:01 +08:00
jxxghp
eae1f8ee4d feat: add UpdateSiteCookieTool and enhance site operations
- Introduced UpdateSiteCookieTool to the toolset for managing site cookies.
- Updated __all__ exports in init.py and factory.py to include UpdateSiteCookieTool.
- Added async_get method in SiteOper for asynchronous retrieval of individual site records, improving database interaction.
2025-11-18 11:34:37 +08:00
jxxghp
8d1de245a6 feat: add TestSiteTool and enhance AddSubscribeTool with advanced filtering options
- Introduced TestSiteTool to the toolset for site testing functionalities.
- Updated __all__ exports in init.py and factory.py to include TestSiteTool.
- Enhanced AddSubscribeTool to support additional parameters for episode management and media quality filtering, improving subscription customization.
2025-11-18 08:51:32 +08:00
jxxghp
b8ef5d1efc feat: add session management to MessageChain
- Implemented session ID creation and reuse based on user activity, with a timeout of 15 minutes.
- Added remote command to clear user sessions, enhancing user session management capabilities.
- Updated Command class to include a new command for clearing sessions.
2025-11-18 08:41:05 +08:00
jxxghp
e1098b34e8 feat: add new tools for subscription management and scheduling
- Introduced DeleteSubscribeTool for managing subscriptions.
- Added QuerySchedulersTool and RunSchedulerTool for enhanced scheduling capabilities.
- Updated __all__ exports in init.py and factory.py to include new tools.
2025-11-18 08:34:00 +08:00
jxxghp
8296f8d2da Enhance tool message formatting in MoviePilotTool: wrap execution messages with separators and icons for better distinction from regular agent messages. 2025-11-17 17:14:05 +08:00
jxxghp
867c83383d Merge pull request #5122 from Seed680/v2 2025-11-17 15:58:53 +08:00
noone
1354119d6d fix(telegram):优化消息发送错误日志记录
- 在 send_msg 方法中细化错误日志,明确指出发送失败的位置
- 在 send_medias_msg 方法中增加标题转义注释并调整日志描述
- 在 send_torrents_msg 方法中补充标题转义逻辑及错误日志说明
2025-11-17 15:22:25 +08:00
noone
53af7f81bb fix:对telegram发送标题进行转义 2025-11-17 15:15:28 +08:00
44 changed files with 3110 additions and 204 deletions

View File

@@ -13,7 +13,7 @@ from langchain_core.runnables.history import RunnableWithMessageHistory
from app.agent.callback import StreamingCallbackHandler
from app.agent.memory import ConversationMemoryManager
from app.agent.prompt import PromptManager
from app.agent.tools import MoviePilotToolFactory
from app.agent.tools.factory import MoviePilotToolFactory
from app.chain import ChainBase
from app.core.config import settings
from app.helper.message import MessageHelper

View File

@@ -1,31 +0,0 @@
"""MoviePilot工具模块"""
from .base import MoviePilotTool
from app.agent.tools.impl.search_media import SearchMediaTool
from app.agent.tools.impl.add_subscribe import AddSubscribeTool
from app.agent.tools.impl.search_torrents import SearchTorrentsTool
from app.agent.tools.impl.add_download import AddDownloadTool
from app.agent.tools.impl.query_subscribes import QuerySubscribesTool
from app.agent.tools.impl.query_downloads import QueryDownloadsTool
from app.agent.tools.impl.query_downloaders import QueryDownloadersTool
from app.agent.tools.impl.query_sites import QuerySitesTool
from app.agent.tools.impl.get_recommendations import GetRecommendationsTool
from app.agent.tools.impl.query_media_library import QueryMediaLibraryTool
from app.agent.tools.impl.send_message import SendMessageTool
from .factory import MoviePilotToolFactory
__all__ = [
"MoviePilotTool",
"SearchMediaTool",
"AddSubscribeTool",
"SearchTorrentsTool",
"AddDownloadTool",
"QuerySubscribesTool",
"QueryDownloadsTool",
"QueryDownloadersTool",
"QuerySitesTool",
"GetRecommendationsTool",
"QueryMediaLibraryTool",
"SendMessageTool",
"MoviePilotToolFactory"
]

View File

@@ -1,6 +1,6 @@
"""MoviePilot工具基类"""
from abc import ABCMeta, abstractmethod
from typing import Callable, Any
from typing import Callable, Any, Optional
from langchain.tools import BaseTool
from pydantic import PrivateAttr
@@ -39,11 +39,33 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta):
if agent_message:
await self.send_tool_message(agent_message, title="MoviePilot助手")
# 发送执行工具说明
explanation = kwargs.get("explanation")
if explanation:
await self.send_tool_message(f"▶️️{explanation}")
# 优先使用工具自定义的提示消息,如果没有则使用 explanation
tool_message = self.get_tool_message(**kwargs)
if not tool_message:
explanation = kwargs.get("explanation")
if explanation:
tool_message = explanation
if tool_message:
formatted_message = f"⚙️ => {tool_message}"
await self.send_tool_message(formatted_message)
return await self.run(**kwargs)
def get_tool_message(self, **kwargs) -> Optional[str]:
"""
获取工具执行时的友好提示消息
子类可以重写此方法,根据实际参数生成个性化的提示消息。
如果返回 None 或空字符串,将回退使用 explanation 参数。
Args:
**kwargs: 工具的所有参数(包括 explanation
Returns:
str: 友好的提示消息,如果返回 None 或空字符串则使用 explanation
"""
return None
@abstractmethod
async def run(self, **kwargs) -> str:
raise NotImplementedError
@@ -68,6 +90,5 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta):
username=self._username,
title=title,
text=message
),
escape_markdown=False
)
)

View File

@@ -4,15 +4,36 @@ from typing import List, Callable
from app.agent.tools.impl.add_download import AddDownloadTool
from app.agent.tools.impl.add_subscribe import AddSubscribeTool
from app.agent.tools.impl.update_subscribe import UpdateSubscribeTool
from app.agent.tools.impl.get_recommendations import GetRecommendationsTool
from app.agent.tools.impl.query_downloaders import QueryDownloadersTool
from app.agent.tools.impl.query_downloads import QueryDownloadsTool
from app.agent.tools.impl.query_media_library import QueryMediaLibraryTool
from app.agent.tools.impl.query_sites import QuerySitesTool
from app.agent.tools.impl.update_site import UpdateSiteTool
from app.agent.tools.impl.query_site_userdata import QuerySiteUserdataTool
from app.agent.tools.impl.test_site import TestSiteTool
from app.agent.tools.impl.query_subscribes import QuerySubscribesTool
from app.agent.tools.impl.query_subscribe_shares import QuerySubscribeSharesTool
from app.agent.tools.impl.query_popular_subscribes import QueryPopularSubscribesTool
from app.agent.tools.impl.query_subscribe_history import QuerySubscribeHistoryTool
from app.agent.tools.impl.delete_subscribe import DeleteSubscribeTool
from app.agent.tools.impl.search_media import SearchMediaTool
from app.agent.tools.impl.recognize_media import RecognizeMediaTool
from app.agent.tools.impl.scrape_metadata import ScrapeMetadataTool
from app.agent.tools.impl.query_episode_schedule import QueryEpisodeScheduleTool
from app.agent.tools.impl.search_torrents import SearchTorrentsTool
from app.agent.tools.impl.send_message import SendMessageTool
from app.agent.tools.impl.query_schedulers import QuerySchedulersTool
from app.agent.tools.impl.run_scheduler import RunSchedulerTool
from app.agent.tools.impl.query_workflows import QueryWorkflowsTool
from app.agent.tools.impl.run_workflow import RunWorkflowTool
from app.agent.tools.impl.update_site_cookie import UpdateSiteCookieTool
from app.agent.tools.impl.delete_download import DeleteDownloadTool
from app.agent.tools.impl.query_directories import QueryDirectoriesTool
from app.agent.tools.impl.list_directory import ListDirectoryTool
from app.agent.tools.impl.query_transfer_history import QueryTransferHistoryTool
from app.agent.tools.impl.transfer_file import TransferFileTool
from app.core.plugin import PluginManager
from app.log import logger
from .base import MoviePilotTool
@@ -29,16 +50,37 @@ class MoviePilotToolFactory:
tools = []
tool_definitions = [
SearchMediaTool,
RecognizeMediaTool,
ScrapeMetadataTool,
QueryEpisodeScheduleTool,
AddSubscribeTool,
UpdateSubscribeTool,
SearchTorrentsTool,
AddDownloadTool,
QuerySubscribesTool,
QuerySubscribeSharesTool,
QueryPopularSubscribesTool,
QuerySubscribeHistoryTool,
DeleteSubscribeTool,
QueryDownloadsTool,
DeleteDownloadTool,
QueryDownloadersTool,
QuerySitesTool,
UpdateSiteTool,
QuerySiteUserdataTool,
TestSiteTool,
UpdateSiteCookieTool,
GetRecommendationsTool,
QueryMediaLibraryTool,
SendMessageTool
QueryDirectoriesTool,
ListDirectoryTool,
QueryTransferHistoryTool,
TransferFileTool,
SendMessageTool,
QuerySchedulersTool,
RunSchedulerTool,
QueryWorkflowsTool,
RunWorkflowTool
]
# 创建内置工具
for ToolClass in tool_definitions:

View File

@@ -35,6 +35,20 @@ class AddDownloadTool(MoviePilotTool):
description: str = "Add torrent download task to the configured downloader (qBittorrent, Transmission, etc.). Downloads the torrent file and starts the download process with specified settings."
args_schema: Type[BaseModel] = AddDownloadInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据下载参数生成友好的提示消息"""
torrent_title = kwargs.get("torrent_title", "")
site_name = kwargs.get("site_name", "")
downloader = kwargs.get("downloader")
message = f"正在添加下载任务: {torrent_title}"
if site_name:
message += f" (来源: {site_name})"
if downloader:
message += f" [下载器: {downloader}]"
return message
async def run(self, site_name: str, torrent_title: str, torrent_url: str, torrent_description: Optional[str] = None,
downloader: Optional[str] = None, save_path: Optional[str] = None,
labels: Optional[str] = None, **kwargs) -> str:

View File

@@ -21,17 +21,49 @@ class AddSubscribeInput(BaseModel):
description="Season number for TV shows (optional, if not specified will subscribe to all seasons)")
tmdb_id: Optional[str] = Field(None,
description="TMDB database ID for precise media identification (optional but recommended for accuracy)")
start_episode: Optional[int] = Field(None,
description="Starting episode number for TV shows (optional, defaults to 1 if not specified)")
total_episode: Optional[int] = Field(None,
description="Total number of episodes for TV shows (optional, will be auto-detected from TMDB if not specified)")
quality: Optional[str] = Field(None,
description="Quality filter as regular expression (optional, e.g., 'BluRay|WEB-DL|HDTV')")
resolution: Optional[str] = Field(None,
description="Resolution filter as regular expression (optional, e.g., '1080p|720p|2160p')")
effect: Optional[str] = Field(None,
description="Effect filter as regular expression (optional, e.g., 'HDR|DV|SDR')")
class AddSubscribeTool(MoviePilotTool):
name: str = "add_subscribe"
description: str = "Add media subscription to create automated download rules for movies and TV shows. The system will automatically search and download new episodes or releases based on the subscription criteria."
description: str = "Add media subscription to create automated download rules for movies and TV shows. The system will automatically search and download new episodes or releases based on the subscription criteria. Supports advanced filtering options like quality, resolution, and effect filters using regular expressions."
args_schema: Type[BaseModel] = AddSubscribeInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据订阅参数生成友好的提示消息"""
title = kwargs.get("title", "")
year = kwargs.get("year", "")
media_type = kwargs.get("media_type", "")
season = kwargs.get("season")
message = f"正在添加订阅: {title}"
if year:
message += f" ({year})"
if media_type:
message += f" [{media_type}]"
if season:
message += f"{season}"
return message
async def run(self, title: str, year: str, media_type: str,
season: Optional[int] = None, tmdb_id: Optional[str] = None, **kwargs) -> str:
season: Optional[int] = None, tmdb_id: Optional[str] = None,
start_episode: Optional[int] = None, total_episode: Optional[int] = None,
quality: Optional[str] = None, resolution: Optional[str] = None,
effect: Optional[str] = None, **kwargs) -> str:
logger.info(
f"执行工具: {self.name}, 参数: title={title}, year={year}, media_type={media_type}, season={season}, tmdb_id={tmdb_id}")
f"执行工具: {self.name}, 参数: title={title}, year={year}, media_type={media_type}, "
f"season={season}, tmdb_id={tmdb_id}, start_episode={start_episode}, "
f"total_episode={total_episode}, quality={quality}, resolution={resolution}, effect={effect}")
try:
subscribe_chain = SubscribeChain()
@@ -43,16 +75,45 @@ class AddSubscribeTool(MoviePilotTool):
except (ValueError, TypeError):
logger.warning(f"无效的 tmdb_id: {tmdb_id},将忽略")
# 构建额外的订阅参数
subscribe_kwargs = {}
if start_episode is not None:
subscribe_kwargs['start_episode'] = start_episode
if total_episode is not None:
subscribe_kwargs['total_episode'] = total_episode
if quality:
subscribe_kwargs['quality'] = quality
if resolution:
subscribe_kwargs['resolution'] = resolution
if effect:
subscribe_kwargs['effect'] = effect
sid, message = await subscribe_chain.async_add(
mtype=MediaType(media_type),
title=title,
year=year,
tmdbid=tmdbid_int,
season=season,
username=self._user_id
username=self._user_id,
**subscribe_kwargs
)
if sid:
return f"成功添加订阅:{title} ({year})"
result_msg = f"成功添加订阅:{title} ({year})"
if subscribe_kwargs:
params = []
if start_episode is not None:
params.append(f"开始集数: {start_episode}")
if total_episode is not None:
params.append(f"总集数: {total_episode}")
if quality:
params.append(f"质量过滤: {quality}")
if resolution:
params.append(f"分辨率过滤: {resolution}")
if effect:
params.append(f"特效过滤: {effect}")
if params:
result_msg += f"\n配置参数: {', '.join(params)}"
return result_msg
else:
return f"添加订阅失败:{message}"
except Exception as e:

View File

@@ -0,0 +1,76 @@
"""删除下载任务工具"""
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.chain.download import DownloadChain
from app.log import logger
class DeleteDownloadInput(BaseModel):
"""删除下载任务工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
task_identifier: str = Field(..., description="Task identifier: can be task hash (unique identifier) or task title/name")
downloader: Optional[str] = Field(None, description="Name of specific downloader (optional, if not provided will search all downloaders)")
delete_files: Optional[bool] = Field(False, description="Whether to delete downloaded files along with the task (default: False, only removes the task from downloader)")
class DeleteDownloadTool(MoviePilotTool):
name: str = "delete_download"
description: str = "Delete a download task from the downloader. Can delete by task hash (unique identifier) or task title/name. Optionally specify the downloader name and whether to delete downloaded files."
args_schema: Type[BaseModel] = DeleteDownloadInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据删除参数生成友好的提示消息"""
task_identifier = kwargs.get("task_identifier", "")
downloader = kwargs.get("downloader")
delete_files = kwargs.get("delete_files", False)
message = f"正在删除下载任务: {task_identifier}"
if downloader:
message += f" [下载器: {downloader}]"
if delete_files:
message += " (包含文件)"
return message
async def run(self, task_identifier: str, downloader: Optional[str] = None,
delete_files: Optional[bool] = False, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: task_identifier={task_identifier}, downloader={downloader}, delete_files={delete_files}")
try:
download_chain = DownloadChain()
# 如果task_identifier看起来像hash通常是40个字符的十六进制字符串
task_hash = None
if len(task_identifier) == 40 and all(c in '0123456789abcdefABCDEF' for c in task_identifier):
# 直接使用hash
task_hash = task_identifier
else:
# 通过标题查找任务
downloads = download_chain.downloading(name=downloader)
for dl in downloads:
# 检查标题或名称是否匹配
if (task_identifier.lower() in (dl.title or "").lower()) or \
(task_identifier.lower() in (dl.name or "").lower()):
task_hash = dl.hash
break
if not task_hash:
return f"未找到匹配的下载任务:{task_identifier},请使用 query_downloads 工具查询可用的下载任务"
# 删除下载任务
# remove_torrents 支持 delete_file 参数,可以控制是否删除文件
result = download_chain.remove_torrents(hashs=[task_hash], downloader=downloader, delete_file=delete_files)
if result:
files_info = "(包含文件)" if delete_files else "(不包含文件)"
return f"成功删除下载任务:{task_identifier} {files_info}"
else:
return f"删除下载任务失败:{task_identifier},请检查任务是否存在或下载器是否可用"
except Exception as e:
logger.error(f"删除下载任务失败: {e}", exc_info=True)
return f"删除下载任务时发生错误: {str(e)}"

View File

@@ -0,0 +1,63 @@
"""删除订阅工具"""
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.core.event import eventmanager
from app.db.subscribe_oper import SubscribeOper
from app.helper.subscribe import SubscribeHelper
from app.log import logger
from app.schemas.types import EventType
class DeleteSubscribeInput(BaseModel):
"""删除订阅工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
subscribe_id: int = Field(..., description="The ID of the subscription to delete (can be obtained from query_subscribes tool)")
class DeleteSubscribeTool(MoviePilotTool):
name: str = "delete_subscribe"
description: str = "Delete a media subscription by its ID. This will remove the subscription and stop automatic downloads for that media."
args_schema: Type[BaseModel] = DeleteSubscribeInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据删除参数生成友好的提示消息"""
subscribe_id = kwargs.get("subscribe_id")
return f"正在删除订阅 (ID: {subscribe_id})"
async def run(self, subscribe_id: int, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: subscribe_id={subscribe_id}")
try:
subscribe_oper = SubscribeOper()
# 获取订阅信息
subscribe = await subscribe_oper.async_get(subscribe_id)
if not subscribe:
return f"订阅 ID {subscribe_id} 不存在"
# 在删除之前获取订阅信息(用于事件)
subscribe_info = subscribe.to_dict()
# 删除订阅
subscribe_oper.delete(subscribe_id)
# 发送事件
await eventmanager.async_send_event(EventType.SubscribeDeleted, {
"subscribe_id": subscribe_id,
"subscribe_info": subscribe_info
})
# 统计订阅
SubscribeHelper().sub_done_async({
"tmdbid": subscribe.tmdbid,
"doubanid": subscribe.doubanid
})
return f"成功删除订阅:{subscribe.name} ({subscribe.year})"
except Exception as e:
logger.error(f"删除订阅失败: {e}", exc_info=True)
return f"删除订阅时发生错误: {str(e)}"

View File

@@ -14,7 +14,21 @@ class GetRecommendationsInput(BaseModel):
"""获取推荐工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
source: Optional[str] = Field("tmdb_trending",
description="Recommendation source: 'tmdb_trending' for TMDB trending content, 'douban_hot' for Douban popular content, 'bangumi_calendar' for Bangumi anime calendar")
description="Recommendation source: "
"'tmdb_trending' for TMDB trending content, "
"'tmdb_movies' for TMDB popular movies, "
"'tmdb_tvs' for TMDB popular TV shows, "
"'douban_hot' for Douban popular content, "
"'douban_movie_hot' for Douban hot movies, "
"'douban_tv_hot' for Douban hot TV shows, "
"'douban_movie_showing' for Douban movies currently showing, "
"'douban_movies' for Douban latest movies, "
"'douban_tvs' for Douban latest TV shows, "
"'douban_movie_top250' for Douban movie TOP250, "
"'douban_tv_weekly_chinese' for Douban Chinese TV weekly chart, "
"'douban_tv_weekly_global' for Douban global TV weekly chart, "
"'douban_tv_animation' for Douban popular animation, "
"'bangumi_calendar' for Bangumi anime calendar")
media_type: Optional[str] = Field("all",
description="Type of media content: '电影' for films, '电视剧' for television series or anime series, 'all' for all types")
limit: Optional[int] = Field(20,
@@ -26,29 +40,98 @@ class GetRecommendationsTool(MoviePilotTool):
description: str = "Get trending and popular media recommendations from various sources. Returns curated lists of popular movies, TV shows, and anime based on different criteria like trending, ratings, or calendar schedules."
args_schema: Type[BaseModel] = GetRecommendationsInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据推荐参数生成友好的提示消息"""
source = kwargs.get("source", "tmdb_trending")
media_type = kwargs.get("media_type", "all")
limit = kwargs.get("limit", 20)
source_map = {
"tmdb_trending": "TMDB流行趋势",
"tmdb_movies": "TMDB热门电影",
"tmdb_tvs": "TMDB热门电视剧",
"douban_hot": "豆瓣热门",
"douban_movie_hot": "豆瓣热门电影",
"douban_tv_hot": "豆瓣热门电视剧",
"douban_movie_showing": "豆瓣正在热映",
"douban_movies": "豆瓣最新电影",
"douban_tvs": "豆瓣最新电视剧",
"douban_movie_top250": "豆瓣电影TOP250",
"douban_tv_weekly_chinese": "豆瓣国产剧集榜",
"douban_tv_weekly_global": "豆瓣全球剧集榜",
"douban_tv_animation": "豆瓣热门动漫",
"bangumi_calendar": "番组计划"
}
source_desc = source_map.get(source, source)
message = f"正在获取推荐: {source_desc}"
if media_type != "all":
message += f" [{media_type}]"
message += f" (限制: {limit}条)"
return message
async def run(self, source: Optional[str] = "tmdb_trending",
media_type: Optional[str] = "all", limit: Optional[int] = 20, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: source={source}, media_type={media_type}, limit={limit}")
try:
name_dicts = {
"tmdb_trending": "TMDB 热门推荐",
"douban_hot": "豆瓣热门推荐",
"bangumi_calendar": "番组计划推荐"
}
recommend_chain = RecommendChain()
results = []
if source == "tmdb_trending":
results = await recommend_chain.async_tmdb_trending(limit=limit)
# async_tmdb_trending 只接受 page 参数,返回固定数量的结果
# 如果需要限制数量,需要在返回后截取
results = await recommend_chain.async_tmdb_trending(page=1)
if limit and limit > 0:
results = results[:limit]
elif source == "tmdb_movies":
# async_tmdb_movies 接受 page 参数,返回固定数量的结果
results = await recommend_chain.async_tmdb_movies(page=1)
if limit and limit > 0:
results = results[:limit]
elif source == "tmdb_tvs":
# async_tmdb_tvs 接受 page 参数,返回固定数量的结果
results = await recommend_chain.async_tmdb_tvs(page=1)
if limit and limit > 0:
results = results[:limit]
elif source == "douban_hot":
if media_type == "movie":
results = await recommend_chain.async_douban_movie_hot(limit=limit)
results = await recommend_chain.async_douban_movie_hot(page=1, count=limit)
elif media_type == "tv":
results = await recommend_chain.async_douban_tv_hot(limit=limit)
results = await recommend_chain.async_douban_tv_hot(page=1, count=limit)
else: # all
results.extend(await recommend_chain.async_douban_movie_hot(limit=limit))
results.extend(await recommend_chain.async_douban_tv_hot(limit=limit))
results.extend(await recommend_chain.async_douban_movie_hot(page=1, count=limit))
results.extend(await recommend_chain.async_douban_tv_hot(page=1, count=limit))
elif source == "douban_movie_hot":
results = await recommend_chain.async_douban_movie_hot(page=1, count=limit)
elif source == "douban_tv_hot":
results = await recommend_chain.async_douban_tv_hot(page=1, count=limit)
elif source == "douban_movie_showing":
results = await recommend_chain.async_douban_movie_showing(page=1, count=limit)
elif source == "douban_movies":
results = await recommend_chain.async_douban_movies(page=1, count=limit)
elif source == "douban_tvs":
results = await recommend_chain.async_douban_tvs(page=1, count=limit)
elif source == "douban_movie_top250":
results = await recommend_chain.async_douban_movie_top250(page=1, count=limit)
elif source == "douban_tv_weekly_chinese":
results = await recommend_chain.async_douban_tv_weekly_chinese(page=1, count=limit)
elif source == "douban_tv_weekly_global":
results = await recommend_chain.async_douban_tv_weekly_global(page=1, count=limit)
elif source == "douban_tv_animation":
results = await recommend_chain.async_douban_tv_animation(page=1, count=limit)
elif source == "bangumi_calendar":
results = await recommend_chain.async_bangumi_calendar(limit=limit)
results = await recommend_chain.async_bangumi_calendar(page=1, count=limit)
else:
# 不支持的推荐来源
supported_sources = [
"tmdb_trending", "tmdb_movies", "tmdb_tvs",
"douban_hot", "douban_movie_hot", "douban_tv_hot",
"douban_movie_showing", "douban_movies", "douban_tvs",
"douban_movie_top250", "douban_tv_weekly_chinese",
"douban_tv_weekly_global", "douban_tv_animation",
"bangumi_calendar"
]
return f"不支持的推荐来源: {source}。支持的来源包括: {', '.join(supported_sources)}"
if results:
# 限制最多20条结果
@@ -57,7 +140,11 @@ class GetRecommendationsTool(MoviePilotTool):
# 精简字段,只保留关键信息
simplified_results = []
for r in limited_results:
# r 已经是字典格式to_dict的结果
# r 应该是字典格式to_dict的结果,但为了安全起见进行检查
if not isinstance(r, dict):
logger.warning(f"推荐结果格式异常,跳过: {type(r)}")
continue
simplified = {
"title": r.get("title"),
"en_title": r.get("en_title"),
@@ -67,7 +154,6 @@ class GetRecommendationsTool(MoviePilotTool):
"tmdb_id": r.get("tmdb_id"),
"imdb_id": r.get("imdb_id"),
"douban_id": r.get("douban_id"),
"overview": r.get("overview", "")[:200] + "..." if r.get("overview") and len(r.get("overview", "")) > 200 else r.get("overview"),
"vote_average": r.get("vote_average"),
"poster_path": r.get("poster_path"),
"detail_link": r.get("detail_link")

View File

@@ -0,0 +1,130 @@
"""查询文件系统目录内容工具"""
import json
from datetime import datetime
from pathlib import Path
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.chain.storage import StorageChain
from app.log import logger
from app.schemas.file import FileItem
from app.utils.string import StringUtils
class ListDirectoryInput(BaseModel):
"""查询文件系统目录内容工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
path: str = Field(..., description="Directory path to list contents (e.g., '/home/user/downloads' or 'C:/Downloads')")
storage: Optional[str] = Field("local", description="Storage type (default: 'local' for local file system, can be 'smb', 'alist', etc.)")
sort_by: Optional[str] = Field("name", description="Sort order: 'name' for alphabetical sorting, 'time' for modification time sorting (default: 'name')")
class ListDirectoryTool(MoviePilotTool):
name: str = "list_directory"
description: str = "List contents of a file system directory. Shows files and subdirectories with their names, types, sizes, and modification times. Returns up to 20 items and the total count if there are more items."
args_schema: Type[BaseModel] = ListDirectoryInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据目录参数生成友好的提示消息"""
path = kwargs.get("path", "")
storage = kwargs.get("storage", "local")
message = f"正在查询目录: {path}"
if storage != "local":
message += f" [存储: {storage}]"
return message
async def run(self, path: str, storage: Optional[str] = "local",
sort_by: Optional[str] = "name", **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: path={path}, storage={storage}, sort_by={sort_by}")
try:
# 规范化路径
if not path:
return "错误:路径不能为空"
# 确保路径格式正确
if storage == "local":
# 本地路径处理
if not path.startswith("/") and not (len(path) > 1 and path[1] == ":"):
# 相对路径,尝试转换为绝对路径
path = str(Path(path).resolve())
else:
# 远程存储路径,确保以/开头
if not path.startswith("/"):
path = "/" + path
# 创建FileItem
fileitem = FileItem(
storage=storage or "local",
path=path,
type="dir"
)
# 查询目录内容
storage_chain = StorageChain()
file_list = storage_chain.list_files(fileitem, recursion=False)
if file_list is None:
return f"无法访问目录:{path},请检查路径是否正确或存储是否可用"
if not file_list:
return f"目录 {path} 为空"
# 排序
if sort_by == "time":
file_list.sort(key=lambda x: x.modify_time or 0, reverse=True)
else:
# 默认按名称排序(目录优先,然后按名称)
file_list.sort(key=lambda x: (
0 if x.type == "dir" else 1,
StringUtils.natural_sort_key(x.name or "")
))
# 限制返回数量
total_count = len(file_list)
limited_list = file_list[:20]
# 转换为字典格式
simplified_items = []
for item in limited_list:
# 格式化文件大小
size_str = None
if item.size:
size_str = StringUtils.str_filesize(item.size)
# 格式化修改时间
modify_time_str = None
if item.modify_time:
try:
modify_time_str = datetime.fromtimestamp(item.modify_time).strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, OSError):
modify_time_str = str(item.modify_time)
simplified = {
"name": item.name,
"type": item.type,
"path": item.path,
"size": size_str,
"modify_time": modify_time_str
}
# 如果是文件,添加扩展名
if item.type == "file" and item.extension:
simplified["extension"] = item.extension
simplified_items.append(simplified)
result_json = json.dumps(simplified_items, ensure_ascii=False, indent=2)
# 如果结果被裁剪,添加提示信息
if total_count > 20:
return f"注意:目录中共有 {total_count} 个项目,为节省上下文空间,仅显示前 20 个项目。\n\n{result_json}"
else:
return result_json
except Exception as e:
logger.error(f"查询目录内容失败: {e}", exc_info=True)
return f"查询目录内容时发生错误: {str(e)}"

View File

@@ -0,0 +1,134 @@
"""查询系统目录设置工具"""
import json
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.helper.directory import DirectoryHelper
from app.log import logger
class QueryDirectoriesInput(BaseModel):
"""查询系统目录设置工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
directory_type: Optional[str] = Field("all",
description="Filter directories by type: 'download' for download directories, 'library' for media library directories, 'all' for all directories")
storage_type: Optional[str] = Field("all",
description="Filter directories by storage type: 'local' for local storage, 'remote' for remote storage, 'all' for all storage types")
name: Optional[str] = Field(None,
description="Filter directories by name (partial match, optional)")
class QueryDirectoriesTool(MoviePilotTool):
name: str = "query_directories"
description: str = "Query system directory configuration and list all configured directories. Shows download directories, media library directories, storage settings, transfer modes, and other directory-related configurations."
args_schema: Type[BaseModel] = QueryDirectoriesInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据查询参数生成友好的提示消息"""
directory_type = kwargs.get("directory_type", "all")
storage_type = kwargs.get("storage_type", "all")
name = kwargs.get("name")
parts = ["正在查询目录配置"]
if directory_type != "all":
type_map = {"download": "下载目录", "library": "媒体库目录"}
parts.append(f"类型: {type_map.get(directory_type, directory_type)}")
if storage_type != "all":
storage_map = {"local": "本地存储", "remote": "远程存储"}
parts.append(f"存储: {storage_map.get(storage_type, storage_type)}")
if name:
parts.append(f"名称: {name}")
return " | ".join(parts) if len(parts) > 1 else parts[0]
async def run(self, directory_type: Optional[str] = "all",
storage_type: Optional[str] = "all",
name: Optional[str] = None, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: directory_type={directory_type}, storage_type={storage_type}, name={name}")
try:
directory_helper = DirectoryHelper()
# 根据目录类型获取目录列表
if directory_type == "download":
dirs = directory_helper.get_download_dirs()
elif directory_type == "library":
dirs = directory_helper.get_library_dirs()
else:
dirs = directory_helper.get_dirs()
# 按存储类型过滤
filtered_dirs = []
for d in dirs:
# 按存储类型过滤
if storage_type == "local":
# 对于下载目录,检查 storage对于媒体库目录检查 library_storage
if directory_type == "download" and d.storage != "local":
continue
elif directory_type == "library" and d.library_storage != "local":
continue
elif directory_type == "all":
# 检查是否有本地存储配置
if d.download_path and d.storage != "local":
continue
if d.library_path and d.library_storage != "local":
continue
elif storage_type == "remote":
# 对于下载目录,检查 storage对于媒体库目录检查 library_storage
if directory_type == "download" and d.storage == "local":
continue
elif directory_type == "library" and d.library_storage == "local":
continue
elif directory_type == "all":
# 检查是否有远程存储配置
if d.download_path and d.storage == "local":
continue
if d.library_path and d.library_storage == "local":
continue
# 按名称过滤(部分匹配)
if name and d.name and name.lower() not in d.name.lower():
continue
filtered_dirs.append(d)
if filtered_dirs:
# 转换为字典格式,只保留关键信息
simplified_dirs = []
for d in filtered_dirs:
simplified = {
"name": d.name,
"priority": d.priority,
"storage": d.storage,
"download_path": d.download_path,
"library_path": d.library_path,
"library_storage": d.library_storage,
"media_type": d.media_type,
"media_category": d.media_category,
"monitor_type": d.monitor_type,
"monitor_mode": d.monitor_mode,
"transfer_type": d.transfer_type,
"overwrite_mode": d.overwrite_mode,
"renaming": d.renaming,
"scraping": d.scraping,
"notify": d.notify,
"download_type_folder": d.download_type_folder,
"download_category_folder": d.download_category_folder,
"library_type_folder": d.library_type_folder,
"library_category_folder": d.library_category_folder
}
simplified_dirs.append(simplified)
result_json = json.dumps(simplified_dirs, ensure_ascii=False, indent=2)
return result_json
return "未找到相关目录配置"
except Exception as e:
logger.error(f"查询系统目录设置失败: {e}", exc_info=True)
return f"查询系统目录设置时发生错误: {str(e)}"

View File

@@ -1,7 +1,7 @@
"""查询下载器工具"""
import json
from typing import Type
from typing import Optional, Type
from pydantic import BaseModel, Field
@@ -21,6 +21,10 @@ class QueryDownloadersTool(MoviePilotTool):
description: str = "Query downloader configuration and list all available downloaders. Shows downloader status, connection details, and configuration settings."
args_schema: Type[BaseModel] = QueryDownloadersInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""生成友好的提示消息"""
return "正在查询下载器配置"
async def run(self, **kwargs) -> str:
logger.info(f"执行工具: {self.name}")
try:

View File

@@ -7,6 +7,7 @@ from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.chain.download import DownloadChain
from app.db.downloadhistory_oper import DownloadHistoryOper
from app.log import logger
@@ -17,27 +18,136 @@ class QueryDownloadsInput(BaseModel):
description="Name of specific downloader to query (optional, if not provided queries all configured downloaders)")
status: Optional[str] = Field("all",
description="Filter downloads by status: 'downloading' for active downloads, 'completed' for finished downloads, 'paused' for paused downloads, 'all' for all downloads")
hash: Optional[str] = Field(None, description="Query specific download task by hash (optional, if provided will search for this specific task regardless of status)")
title: Optional[str] = Field(None, description="Query download tasks by title/name (optional, supports partial match, searches all tasks if provided)")
class QueryDownloadsTool(MoviePilotTool):
name: str = "query_downloads"
description: str = "Query download status and list all active download tasks. Shows download progress, completion status, and task details from configured downloaders."
description: str = "Query download status and list download tasks. Can query all active downloads, or search for specific tasks by hash or title. Shows download progress, completion status, and task details from configured downloaders."
args_schema: Type[BaseModel] = QueryDownloadsInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据查询参数生成友好的提示消息"""
downloader = kwargs.get("downloader")
status = kwargs.get("status", "all")
hash_value = kwargs.get("hash")
title = kwargs.get("title")
parts = ["正在查询下载任务"]
if downloader:
parts.append(f"下载器: {downloader}")
if status != "all":
status_map = {"downloading": "下载中", "completed": "已完成", "paused": "已暂停"}
parts.append(f"状态: {status_map.get(status, status)}")
if hash_value:
parts.append(f"Hash: {hash_value[:8]}...")
elif title:
parts.append(f"标题: {title}")
return " | ".join(parts) if len(parts) > 1 else parts[0]
async def run(self, downloader: Optional[str] = None,
status: Optional[str] = "all", **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: downloader={downloader}, status={status}")
status: Optional[str] = "all",
hash: Optional[str] = None,
title: Optional[str] = None, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: downloader={downloader}, status={status}, hash={hash}, title={title}")
try:
download_chain = DownloadChain()
# 使用 DownloadChain.downloading 方法获取正在下载的任务
downloads = download_chain.downloading(name=downloader)
filtered_downloads = []
for dl in downloads:
if downloader and dl.downloader != downloader:
continue
if status != "all" and dl.status != status:
continue
filtered_downloads.append(dl)
# 如果提供了hash直接查询该hash的任务不限制状态
if hash:
torrents = download_chain.list_torrents(downloader=downloader, hashs=[hash])
if not torrents:
return f"未找到hash为 {hash} 的下载任务(该任务可能已完成、已删除或不存在)"
# 转换为DownloadingTorrent格式
downloads = []
for torrent in torrents:
# 获取下载历史信息
history = DownloadHistoryOper().get_by_hash(torrent.hash)
if history:
torrent.media = {
"tmdbid": history.tmdbid,
"type": history.type,
"title": history.title,
"season": history.seasons,
"episode": history.episodes,
"image": history.image,
}
torrent.userid = history.userid
torrent.username = history.username
downloads.append(torrent)
filtered_downloads = downloads
elif title:
# 如果提供了title查询所有任务并搜索匹配的标题
# 查询所有状态的任务
all_torrents = download_chain.list_torrents(downloader=downloader) or []
filtered_downloads = []
for torrent in all_torrents:
# 检查标题或名称是否匹配
if (title.lower() in (torrent.title or "").lower()) or \
(title.lower() in (torrent.name or "").lower()):
# 获取下载历史信息
history = DownloadHistoryOper().get_by_hash(torrent.hash)
if history:
torrent.media = {
"tmdbid": history.tmdbid,
"type": history.type,
"title": history.title,
"season": history.seasons,
"episode": history.episodes,
"image": history.image,
}
torrent.userid = history.userid
torrent.username = history.username
filtered_downloads.append(torrent)
if not filtered_downloads:
return f"未找到标题包含 '{title}' 的下载任务"
else:
# 根据status决定查询方式
if status == "downloading":
# 如果status为下载中使用downloading方法
downloads = download_chain.downloading(name=downloader)
filtered_downloads = []
for dl in downloads:
if downloader and dl.downloader != downloader:
continue
filtered_downloads.append(dl)
else:
# 其他状态completed、paused、all使用list_torrents查询所有任务
# 查询所有状态的任务
all_torrents = download_chain.list_torrents(downloader=downloader) or []
filtered_downloads = []
for torrent in all_torrents:
if downloader and torrent.downloader != downloader:
continue
# 根据status过滤
if status == "completed":
# 已完成的任务state为seeding或completed
if torrent.state not in ["seeding", "completed"]:
continue
elif status == "paused":
# 已暂停的任务
if torrent.state != "paused":
continue
# status == "all" 时不过滤
# 获取下载历史信息
history = DownloadHistoryOper().get_by_hash(torrent.hash)
if history:
torrent.media = {
"tmdbid": history.tmdbid,
"type": history.type,
"title": history.title,
"season": history.seasons,
"episode": history.episodes,
"image": history.image,
}
torrent.userid = history.userid
torrent.username = history.username
filtered_downloads.append(torrent)
if filtered_downloads:
# 限制最多20条结果
total_count = len(filtered_downloads)
@@ -73,6 +183,13 @@ class QueryDownloadsTool(MoviePilotTool):
# 如果结果被裁剪,添加提示信息
if total_count > 20:
return f"注意:查询结果共找到 {total_count} 条,为节省上下文空间,仅显示前 20 条结果。\n\n{result_json}"
# 如果查询的是特定hash或title添加明确的状态信息
if hash:
return f"找到hash为 {hash} 的下载任务:\n\n{result_json}"
elif title:
return f"找到 {total_count} 个标题包含 '{title}' 的下载任务:\n\n{result_json}"
return result_json
return "未找到相关下载任务"
except Exception as e:

View File

@@ -0,0 +1,116 @@
"""查询剧集上映时间工具"""
import json
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.chain.media import MediaChain
from app.chain.tmdb import TmdbChain
from app.log import logger
from app.schemas import MediaType
class QueryEpisodeScheduleInput(BaseModel):
"""查询剧集上映时间工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
tmdb_id: int = Field(..., description="TMDB ID of the TV series")
season: int = Field(..., description="Season number to query")
episode_group: Optional[str] = Field(None, description="Episode group ID (optional)")
class QueryEpisodeScheduleTool(MoviePilotTool):
name: str = "query_episode_schedule"
description: str = "Query TV series episode air dates and schedule. Returns detailed information for each episode including air date, episode number, title, overview, and other metadata. Filters out episodes without air dates."
args_schema: Type[BaseModel] = QueryEpisodeScheduleInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据查询参数生成友好的提示消息"""
tmdb_id = kwargs.get("tmdb_id")
season = kwargs.get("season")
episode_group = kwargs.get("episode_group")
message = f"正在查询剧集上映时间: TMDB ID {tmdb_id}{season}"
if episode_group:
message += f" (剧集组: {episode_group})"
return message
async def run(self, tmdb_id: int, season: int, episode_group: Optional[str] = None, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: tmdb_id={tmdb_id}, season={season}, episode_group={episode_group}")
try:
# 获取媒体信息(用于获取标题和海报)
media_chain = MediaChain()
mediainfo = await media_chain.async_recognize_media(tmdbid=tmdb_id, mtype=MediaType.TV)
if not mediainfo:
return f"未找到 TMDB ID {tmdb_id} 的媒体信息"
# 获取集列表
tmdb_chain = TmdbChain()
episodes = await tmdb_chain.async_tmdb_episodes(
tmdbid=tmdb_id,
season=season,
episode_group=episode_group
)
if not episodes:
return json.dumps({
"success": False,
"message": f"未找到 TMDB ID {tmdb_id}{season}季的集信息"
}, ensure_ascii=False)
# 过滤掉没有上映日期的集,并构建每集的详细信息
episode_list = []
for episode in episodes:
air_date = episode.air_date
# 过滤掉没有上映日期的数据
if not air_date:
continue
episode_info = {
"episode_number": episode.episode_number,
"name": episode.name,
"air_date": air_date,
"runtime": episode.runtime,
"vote_average": episode.vote_average,
"still_path": episode.still_path,
"episode_type": episode.episode_type,
"season_number": episode.season_number
}
episode_list.append(episode_info)
if not episode_list:
return json.dumps({
"success": False,
"message": f"未找到 TMDB ID {tmdb_id}{season}季的播出时间信息(所有集都没有播出日期)"
}, ensure_ascii=False)
# 按播出日期排序
episode_list.sort(key=lambda x: (x["air_date"] or "", x["episode_number"] or 0))
result = {
"success": True,
"tmdb_id": tmdb_id,
"season": season,
"episode_group": episode_group,
"series_title": mediainfo.title if mediainfo else None,
"series_poster": mediainfo.poster_path if mediainfo else None,
"total_episodes": len(episodes),
"episodes_with_air_date": len(episode_list),
"episodes": episode_list
}
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
error_message = f"查询剧集上映时间失败: {str(e)}"
logger.error(f"查询剧集上映时间失败: {e}", exc_info=True)
return json.dumps({
"success": False,
"message": error_message,
"tmdb_id": tmdb_id,
"season": season
}, ensure_ascii=False)

View File

@@ -27,6 +27,23 @@ class QueryMediaLibraryTool(MoviePilotTool):
description: str = "Check if a specific media resource already exists in the media library (Plex, Emby, Jellyfin). Use this tool to verify whether a movie or TV series has been successfully processed and added to the media server before performing operations like downloading or subscribing."
args_schema: Type[BaseModel] = QueryMediaLibraryInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据查询参数生成友好的提示消息"""
media_type = kwargs.get("media_type", "all")
title = kwargs.get("title")
year = kwargs.get("year")
parts = ["正在查询媒体库"]
if title:
parts.append(f"标题: {title}")
if year:
parts.append(f"年份: {year}")
if media_type != "all":
parts.append(f"类型: {media_type}")
return " | ".join(parts) if len(parts) > 1 else parts[0]
async def run(self, media_type: Optional[str] = "all",
title: Optional[str] = None, year: Optional[str] = None, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: media_type={media_type}, title={title}")

View File

@@ -0,0 +1,152 @@
"""查询热门订阅工具"""
import json
from typing import Optional, Type
import cn2an
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.core.context import MediaInfo
from app.helper.subscribe import SubscribeHelper
from app.log import logger
from app.schemas.types import MediaType
class QueryPopularSubscribesInput(BaseModel):
"""查询热门订阅工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
stype: str = Field(..., description="Media type: '电影' for films, '电视剧' for television series")
page: Optional[int] = Field(1, description="Page number for pagination (default: 1)")
count: Optional[int] = Field(30, description="Number of items per page (default: 30)")
min_sub: Optional[int] = Field(None, description="Minimum number of subscribers filter (optional, e.g., 5)")
genre_id: Optional[int] = Field(None, description="Filter by genre ID (optional)")
min_rating: Optional[float] = Field(None, description="Minimum rating filter (optional, e.g., 7.5)")
max_rating: Optional[float] = Field(None, description="Maximum rating filter (optional, e.g., 10.0)")
sort_type: Optional[str] = Field(None, description="Sort type (optional, e.g., 'count', 'rating')")
class QueryPopularSubscribesTool(MoviePilotTool):
name: str = "query_popular_subscribes"
description: str = "Query popular subscriptions based on user shared data. Shows media with the most subscribers, supports filtering by genre, rating, minimum subscribers, and pagination."
args_schema: Type[BaseModel] = QueryPopularSubscribesInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据查询参数生成友好的提示消息"""
stype = kwargs.get("stype", "")
page = kwargs.get("page", 1)
min_sub = kwargs.get("min_sub")
min_rating = kwargs.get("min_rating")
max_rating = kwargs.get("max_rating")
parts = [f"正在查询热门订阅 [{stype}]"]
if min_sub:
parts.append(f"最少订阅: {min_sub}")
if min_rating:
parts.append(f"最低评分: {min_rating}")
if max_rating:
parts.append(f"最高评分: {max_rating}")
if page > 1:
parts.append(f"{page}")
return " | ".join(parts) if len(parts) > 1 else parts[0]
async def run(self, stype: str,
page: Optional[int] = 1,
count: Optional[int] = 30,
min_sub: Optional[int] = None,
genre_id: Optional[int] = None,
min_rating: Optional[float] = None,
max_rating: Optional[float] = None,
sort_type: Optional[str] = None, **kwargs) -> str:
logger.info(
f"执行工具: {self.name}, 参数: stype={stype}, page={page}, count={count}, min_sub={min_sub}, "
f"genre_id={genre_id}, min_rating={min_rating}, max_rating={max_rating}, sort_type={sort_type}")
try:
if page is None or page < 1:
page = 1
if count is None or count < 1:
count = 30
subscribe_helper = SubscribeHelper()
subscribes = await subscribe_helper.async_get_statistic(
stype=stype,
page=page,
count=count,
genre_id=genre_id,
min_rating=min_rating,
max_rating=max_rating,
sort_type=sort_type
)
if not subscribes:
return "未找到热门订阅数据(可能订阅统计功能未启用)"
# 转换为MediaInfo格式并过滤
ret_medias = []
for sub in subscribes:
# 订阅人数
subscriber_count = sub.get("count", 0)
# 如果设置了最小订阅人数,进行过滤
if min_sub and subscriber_count < min_sub:
continue
media = MediaInfo()
media.type = MediaType(sub.get("type"))
media.tmdb_id = sub.get("tmdbid")
# 处理标题
title = sub.get("name")
season = sub.get("season")
if season and int(season) > 1 and media.tmdb_id:
# 小写数据转大写
season_str = cn2an.an2cn(season, "low")
title = f"{title}{season_str}"
media.title = title
media.year = sub.get("year")
media.douban_id = sub.get("doubanid")
media.bangumi_id = sub.get("bangumiid")
media.tvdb_id = sub.get("tvdbid")
media.imdb_id = sub.get("imdbid")
media.season = sub.get("season")
media.vote_average = sub.get("vote")
media.poster_path = sub.get("poster")
media.backdrop_path = sub.get("backdrop")
media.popularity = subscriber_count
ret_medias.append(media)
if not ret_medias:
return "未找到符合条件的热门订阅"
# 转换为字典格式,只保留关键信息
simplified_medias = []
for media in ret_medias:
media_dict = media.to_dict()
simplified = {
"type": media_dict.get("type"),
"title": media_dict.get("title"),
"year": media_dict.get("year"),
"tmdb_id": media_dict.get("tmdb_id"),
"douban_id": media_dict.get("douban_id"),
"bangumi_id": media_dict.get("bangumi_id"),
"tvdb_id": media_dict.get("tvdb_id"),
"imdb_id": media_dict.get("imdb_id"),
"season": media_dict.get("season"),
"vote_average": media_dict.get("vote_average"),
"poster_path": media_dict.get("poster_path"),
"backdrop_path": media_dict.get("backdrop_path"),
"popularity": media_dict.get("popularity"), # 订阅人数
"subscriber_count": media_dict.get("popularity") # 明确标注为订阅人数
}
simplified_medias.append(simplified)
result_json = json.dumps(simplified_medias, ensure_ascii=False, indent=2)
pagination_info = f"{page} 页,每页 {count} 条,共 {len(simplified_medias)} 条结果"
return f"{pagination_info}\n\n{result_json}"
except Exception as e:
logger.error(f"查询热门订阅失败: {e}", exc_info=True)
return f"查询热门订阅时发生错误: {str(e)}"

View File

@@ -0,0 +1,55 @@
"""查询定时服务工具"""
import json
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.log import logger
from app.scheduler import Scheduler
class QuerySchedulersInput(BaseModel):
"""查询定时服务工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
class QuerySchedulersTool(MoviePilotTool):
name: str = "query_schedulers"
description: str = "Query scheduled tasks and list all available scheduler jobs. Shows job status, next run time, and provider information."
args_schema: Type[BaseModel] = QuerySchedulersInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""生成友好的提示消息"""
return "正在查询定时服务"
async def run(self, **kwargs) -> str:
logger.info(f"执行工具: {self.name}")
try:
scheduler = Scheduler()
schedulers = scheduler.list()
if schedulers:
# 转换为字典列表以便JSON序列化
schedulers_list = []
for s in schedulers:
schedulers_list.append({
"id": s.id,
"name": s.name,
"provider": s.provider,
"status": s.status,
"next_run": s.next_run
})
result_json = json.dumps(schedulers_list, ensure_ascii=False, indent=2)
# 限制最多30条结果
total_count = len(schedulers_list)
if total_count > 30:
limited_schedulers = schedulers_list[:30]
limited_json = json.dumps(limited_schedulers, ensure_ascii=False, indent=2)
return f"注意:查询结果共找到 {total_count} 条,为节省上下文空间,仅显示前 30 条结果。\n\n{limited_json}"
return result_json
return "未找到定时服务"
except Exception as e:
logger.error(f"查询定时服务失败: {e}", exc_info=True)
return f"查询定时服务时发生错误: {str(e)}"

View File

@@ -0,0 +1,136 @@
"""查询站点用户数据工具"""
import json
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.db import AsyncSessionFactory
from app.db.models.site import Site
from app.db.models.siteuserdata import SiteUserData
from app.log import logger
class QuerySiteUserdataInput(BaseModel):
"""查询站点用户数据工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
site_id: int = Field(..., description="The ID of the site to query user data for")
workdate: Optional[str] = Field(None, description="Work date to query (optional, format: 'YYYY-MM-DD', if not specified returns latest data)")
class QuerySiteUserdataTool(MoviePilotTool):
name: str = "query_site_userdata"
description: str = "Query user data for a specific site including username, user level, upload/download statistics, seeding information, bonus points, and other account details. Supports querying data for a specific date or latest data."
args_schema: Type[BaseModel] = QuerySiteUserdataInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据查询参数生成友好的提示消息"""
site_id = kwargs.get("site_id")
workdate = kwargs.get("workdate")
message = f"正在查询站点 #{site_id} 的用户数据"
if workdate:
message += f" (日期: {workdate})"
else:
message += " (最新数据)"
return message
async def run(self, site_id: int, workdate: Optional[str] = None, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: site_id={site_id}, workdate={workdate}")
try:
# 获取数据库会话
async with AsyncSessionFactory() as db:
# 获取站点
site = await Site.async_get(db, site_id)
if not site:
return json.dumps({
"success": False,
"message": f"站点不存在: {site_id}"
}, ensure_ascii=False)
# 获取站点用户数据
user_data_list = await SiteUserData.async_get_by_domain(
db,
domain=site.domain,
workdate=workdate
)
if not user_data_list:
return json.dumps({
"success": False,
"message": f"站点 {site.name} ({site.domain}) 暂无用户数据",
"site_id": site_id,
"site_name": site.name,
"site_domain": site.domain,
"workdate": workdate
}, ensure_ascii=False)
# 格式化用户数据
result = {
"success": True,
"site_id": site_id,
"site_name": site.name,
"site_domain": site.domain,
"workdate": workdate,
"data_count": len(user_data_list),
"user_data": []
}
for user_data in user_data_list:
# 格式化上传/下载量(转换为可读格式)
upload_gb = user_data.upload / (1024 ** 3) if user_data.upload else 0
download_gb = user_data.download / (1024 ** 3) if user_data.download else 0
seeding_size_gb = user_data.seeding_size / (1024 ** 3) if user_data.seeding_size else 0
leeching_size_gb = user_data.leeching_size / (1024 ** 3) if user_data.leeching_size else 0
user_data_dict = {
"domain": user_data.domain,
"name": user_data.name,
"username": user_data.username,
"userid": user_data.userid,
"user_level": user_data.user_level,
"join_at": user_data.join_at,
"bonus": user_data.bonus,
"upload": user_data.upload,
"upload_gb": round(upload_gb, 2),
"download": user_data.download,
"download_gb": round(download_gb, 2),
"ratio": round(user_data.ratio, 2) if user_data.ratio else 0,
"seeding": int(user_data.seeding) if user_data.seeding else 0,
"leeching": int(user_data.leeching) if user_data.leeching else 0,
"seeding_size": user_data.seeding_size,
"seeding_size_gb": round(seeding_size_gb, 2),
"leeching_size": user_data.leeching_size,
"leeching_size_gb": round(leeching_size_gb, 2),
"seeding_info": user_data.seeding_info if user_data.seeding_info else [],
"message_unread": user_data.message_unread,
"message_unread_contents": user_data.message_unread_contents if user_data.message_unread_contents else [],
"err_msg": user_data.err_msg,
"updated_day": user_data.updated_day,
"updated_time": user_data.updated_time
}
result["user_data"].append(user_data_dict)
# 如果有多条数据,只返回最新的(按更新时间排序)
if len(result["user_data"]) > 1:
result["user_data"].sort(
key=lambda x: (x.get("updated_day", ""), x.get("updated_time", "")),
reverse=True
)
result["message"] = f"找到 {len(result['user_data'])} 条数据,显示最新的一条"
result["user_data"] = [result["user_data"][0]]
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
error_message = f"查询站点用户数据失败: {str(e)}"
logger.error(f"查询站点用户数据失败: {e}", exc_info=True)
return json.dumps({
"success": False,
"message": error_message,
"site_id": site_id
}, ensure_ascii=False)

View File

@@ -24,6 +24,22 @@ class QuerySitesTool(MoviePilotTool):
description: str = "Query site status and list all configured sites. Shows site name, domain, status, priority, and basic configuration."
args_schema: Type[BaseModel] = QuerySitesInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据查询参数生成友好的提示消息"""
status = kwargs.get("status", "all")
name = kwargs.get("name")
parts = ["正在查询站点"]
if status != "all":
status_map = {"active": "已启用", "inactive": "已禁用"}
parts.append(f"状态: {status_map.get(status, status)}")
if name:
parts.append(f"名称: {name}")
return " | ".join(parts) if len(parts) > 1 else parts[0]
async def run(self, status: Optional[str] = "all", name: Optional[str] = None, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: status={status}, name={name}")
try:

View File

@@ -0,0 +1,113 @@
"""查询订阅历史工具"""
import json
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.db import AsyncSessionFactory
from app.db.models.subscribehistory import SubscribeHistory
from app.log import logger
class QuerySubscribeHistoryInput(BaseModel):
"""查询订阅历史工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
media_type: Optional[str] = Field("all", description="Filter by media type: '电影' for films, '电视剧' for television series, 'all' for all types (default: 'all')")
name: Optional[str] = Field(None, description="Filter by media name (partial match, optional)")
class QuerySubscribeHistoryTool(MoviePilotTool):
name: str = "query_subscribe_history"
description: str = "Query subscription history records. Shows completed subscriptions with their details including name, type, rating, completion date, and other subscription information. Supports filtering by media type and name. Returns up to 30 records."
args_schema: Type[BaseModel] = QuerySubscribeHistoryInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据查询参数生成友好的提示消息"""
media_type = kwargs.get("media_type", "all")
name = kwargs.get("name")
parts = ["正在查询订阅历史"]
if media_type != "all":
parts.append(f"类型: {media_type}")
if name:
parts.append(f"名称: {name}")
return " | ".join(parts) if len(parts) > 1 else parts[0]
async def run(self, media_type: Optional[str] = "all",
name: Optional[str] = None, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: media_type={media_type}, name={name}")
try:
# 获取数据库会话
async with AsyncSessionFactory() as db:
# 根据类型查询
if media_type == "all":
# 查询所有类型,需要分别查询电影和电视剧
movie_history = await SubscribeHistory.async_list_by_type(db, mtype="movie", page=1, count=100)
tv_history = await SubscribeHistory.async_list_by_type(db, mtype="tv", page=1, count=100)
all_history = list(movie_history) + list(tv_history)
# 按日期排序
all_history.sort(key=lambda x: x.date or "", reverse=True)
else:
# 查询指定类型
all_history = await SubscribeHistory.async_list_by_type(db, mtype=media_type, page=1, count=100)
# 按名称过滤
filtered_history = []
if name:
name_lower = name.lower()
for record in all_history:
if record.name and name_lower in record.name.lower():
filtered_history.append(record)
else:
filtered_history = all_history
if not filtered_history:
return "未找到相关订阅历史记录"
# 限制最多30条
total_count = len(filtered_history)
limited_history = filtered_history[:30]
# 转换为字典格式,只保留关键信息
simplified_records = []
for record in limited_history:
simplified = {
"id": record.id,
"name": record.name,
"year": record.year,
"type": record.type,
"season": record.season,
"tmdbid": record.tmdbid,
"doubanid": record.doubanid,
"bangumiid": record.bangumiid,
"poster": record.poster,
"vote": record.vote,
"total_episode": record.total_episode,
"date": record.date,
"username": record.username
}
# 添加过滤规则信息(如果有)
if record.filter:
simplified["filter"] = record.filter
if record.quality:
simplified["quality"] = record.quality
if record.resolution:
simplified["resolution"] = record.resolution
simplified_records.append(simplified)
result_json = json.dumps(simplified_records, ensure_ascii=False, indent=2)
# 如果结果被裁剪,添加提示信息
if total_count > 30:
return f"注意:查询结果共找到 {total_count} 条,为节省上下文空间,仅显示前 30 条结果。\n\n{result_json}"
return result_json
except Exception as e:
logger.error(f"查询订阅历史失败: {e}", exc_info=True)
return f"查询订阅历史时发生错误: {str(e)}"

View File

@@ -0,0 +1,113 @@
"""查询订阅分享工具"""
import json
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.helper.subscribe import SubscribeHelper
from app.log import logger
class QuerySubscribeSharesInput(BaseModel):
"""查询订阅分享工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
name: Optional[str] = Field(None, description="Filter shares by media name (partial match, optional)")
page: Optional[int] = Field(1, description="Page number for pagination (default: 1)")
count: Optional[int] = Field(30, description="Number of items per page (default: 30)")
genre_id: Optional[int] = Field(None, description="Filter by genre ID (optional)")
min_rating: Optional[float] = Field(None, description="Minimum rating filter (optional, e.g., 7.5)")
max_rating: Optional[float] = Field(None, description="Maximum rating filter (optional, e.g., 10.0)")
sort_type: Optional[str] = Field(None, description="Sort type (optional, e.g., 'count', 'rating')")
class QuerySubscribeSharesTool(MoviePilotTool):
name: str = "query_subscribe_shares"
description: str = "Query shared subscriptions from other users. Shows popular subscriptions shared by the community with filtering and pagination support."
args_schema: Type[BaseModel] = QuerySubscribeSharesInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据查询参数生成友好的提示消息"""
name = kwargs.get("name")
page = kwargs.get("page", 1)
min_rating = kwargs.get("min_rating")
max_rating = kwargs.get("max_rating")
parts = ["正在查询订阅分享"]
if name:
parts.append(f"名称: {name}")
if min_rating:
parts.append(f"最低评分: {min_rating}")
if max_rating:
parts.append(f"最高评分: {max_rating}")
if page > 1:
parts.append(f"{page}")
return " | ".join(parts) if len(parts) > 1 else parts[0]
async def run(self, name: Optional[str] = None,
page: Optional[int] = 1,
count: Optional[int] = 30,
genre_id: Optional[int] = None,
min_rating: Optional[float] = None,
max_rating: Optional[float] = None,
sort_type: Optional[str] = None, **kwargs) -> str:
logger.info(
f"执行工具: {self.name}, 参数: name={name}, page={page}, count={count}, genre_id={genre_id}, "
f"min_rating={min_rating}, max_rating={max_rating}, sort_type={sort_type}")
try:
if page is None or page < 1:
page = 1
if count is None or count < 1:
count = 30
subscribe_helper = SubscribeHelper()
shares = await subscribe_helper.async_get_shares(
name=name,
page=page,
count=count,
genre_id=genre_id,
min_rating=min_rating,
max_rating=max_rating,
sort_type=sort_type
)
if not shares:
return "未找到订阅分享数据(可能订阅分享功能未启用)"
# 简化字段,只保留关键信息
simplified_shares = []
for share in shares:
simplified = {
"id": share.get("id"),
"name": share.get("name"),
"year": share.get("year"),
"type": share.get("type"),
"season": share.get("season"),
"tmdbid": share.get("tmdbid"),
"doubanid": share.get("doubanid"),
"bangumiid": share.get("bangumiid"),
"poster": share.get("poster"),
"vote": share.get("vote"),
"share_title": share.get("share_title"),
"share_comment": share.get("share_comment"),
"share_user": share.get("share_user"),
"fork_count": share.get("fork_count", 0)
}
# 截断过长的描述
if simplified.get("description") and len(simplified["description"]) > 200:
simplified["description"] = simplified["description"][:200] + "..."
simplified_shares.append(simplified)
result_json = json.dumps(simplified_shares, ensure_ascii=False, indent=2)
pagination_info = f"{page} 页,每页 {count} 条,共 {len(simplified_shares)} 条结果"
return f"{pagination_info}\n\n{result_json}"
except Exception as e:
logger.error(f"查询订阅分享失败: {e}", exc_info=True)
return f"查询订阅分享时发生错误: {str(e)}"

View File

@@ -16,7 +16,7 @@ class QuerySubscribesInput(BaseModel):
status: Optional[str] = Field("all",
description="Filter subscriptions by status: 'R' for enabled subscriptions, 'P' for disabled ones, 'all' for all subscriptions")
media_type: Optional[str] = Field("all",
description="Filter by media type: 'movie' for films, 'tv' for television series, 'all' for all types")
description="Filter by media type: '电影' for films, '电视剧' for television series, 'all' for all types")
class QuerySubscribesTool(MoviePilotTool):
@@ -24,6 +24,24 @@ class QuerySubscribesTool(MoviePilotTool):
description: str = "Query subscription status and list all user subscriptions. Shows active subscriptions, their download status, and configuration details."
args_schema: Type[BaseModel] = QuerySubscribesInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据查询参数生成友好的提示消息"""
status = kwargs.get("status", "all")
media_type = kwargs.get("media_type", "all")
parts = ["正在查询订阅"]
# 根据状态过滤条件生成提示
if status != "all":
status_map = {"R": "已启用", "P": "已禁用"}
parts.append(f"状态: {status_map.get(status, status)}")
# 根据媒体类型过滤条件生成提示
if media_type != "all":
parts.append(f"类型: {media_type}")
return " | ".join(parts) if len(parts) > 1 else parts[0]
async def run(self, status: Optional[str] = "all", media_type: Optional[str] = "all", **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: status={status}, media_type={media_type}")
try:
@@ -37,9 +55,9 @@ class QuerySubscribesTool(MoviePilotTool):
continue
filtered_subscribes.append(sub)
if filtered_subscribes:
# 限制最多20条结果
# 限制最多50条结果
total_count = len(filtered_subscribes)
limited_subscribes = filtered_subscribes[:20]
limited_subscribes = filtered_subscribes[:50]
# 精简字段,只保留关键信息
simplified_subscribes = []
for s in limited_subscribes:
@@ -54,7 +72,6 @@ class QuerySubscribesTool(MoviePilotTool):
"bangumiid": s.bangumiid,
"poster": s.poster,
"vote": s.vote,
"description": s.description[:200] + "..." if s.description and len(s.description) > 200 else s.description,
"state": s.state,
"total_episode": s.total_episode,
"lack_episode": s.lack_episode,
@@ -64,8 +81,8 @@ class QuerySubscribesTool(MoviePilotTool):
simplified_subscribes.append(simplified)
result_json = json.dumps(simplified_subscribes, ensure_ascii=False, indent=2)
# 如果结果被裁剪,添加提示信息
if total_count > 20:
return f"注意:查询结果共找到 {total_count} 条,为节省上下文空间,仅显示前 20 条结果。\n\n{result_json}"
if total_count > 50:
return f"注意:查询结果共找到 {total_count} 条,为节省上下文空间,仅显示前 50 条结果。\n\n{result_json}"
return result_json
return "未找到相关订阅"
except Exception as e:

View File

@@ -0,0 +1,133 @@
"""查询整理历史记录工具"""
import json
from typing import Optional, Type
import jieba
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.db import AsyncSessionFactory
from app.db.models.transferhistory import TransferHistory
from app.log import logger
class QueryTransferHistoryInput(BaseModel):
"""查询整理历史记录工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
title: Optional[str] = Field(None, description="Search by title (optional, supports partial match)")
status: Optional[str] = Field("all",
description="Filter by status: 'success' for successful transfers, 'failed' for failed transfers, 'all' for all records (default: 'all')")
page: Optional[int] = Field(1, description="Page number for pagination (default: 1, each page contains 30 records)")
class QueryTransferHistoryTool(MoviePilotTool):
name: str = "query_transfer_history"
description: str = "Query file transfer history records. Shows transfer status, source and destination paths, media information, and transfer details. Supports filtering by title and status."
args_schema: Type[BaseModel] = QueryTransferHistoryInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据查询参数生成友好的提示消息"""
title = kwargs.get("title")
status = kwargs.get("status", "all")
page = kwargs.get("page", 1)
parts = ["正在查询整理历史"]
if title:
parts.append(f"标题: {title}")
if status != "all":
status_map = {"success": "成功", "failed": "失败"}
parts.append(f"状态: {status_map.get(status, status)}")
if page > 1:
parts.append(f"{page}")
return " | ".join(parts) if len(parts) > 1 else parts[0]
async def run(self, title: Optional[str] = None,
status: Optional[str] = "all",
page: Optional[int] = 1, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: title={title}, status={status}, page={page}")
try:
# 处理状态参数
status_bool = None
if status == "success":
status_bool = True
elif status == "failed":
status_bool = False
# 处理页码参数
if page is None or page < 1:
page = 1
# 每页记录数
count = 50
# 获取数据库会话
async with AsyncSessionFactory() as db:
# 处理标题搜索
if title:
# 使用 jieba 分词处理标题
words = jieba.cut(title, HMM=False)
title_search = "%".join(words)
# 查询记录
result = await TransferHistory.async_list_by_title(
db, title=title_search, page=page, count=count, status=status_bool
)
total = await TransferHistory.async_count_by_title(
db, title=title_search, status=status_bool
)
else:
# 查询所有记录
result = await TransferHistory.async_list_by_page(
db, page=page, count=count, status=status_bool
)
total = await TransferHistory.async_count(db, status=status_bool)
if not result:
return "未找到相关整理历史记录"
# 转换为字典格式,只保留关键信息
simplified_records = []
for record in result:
simplified = {
"id": record.id,
"title": record.title,
"year": record.year,
"type": record.type,
"category": record.category,
"seasons": record.seasons,
"episodes": record.episodes,
"src": record.src,
"dest": record.dest,
"mode": record.mode,
"status": "成功" if record.status else "失败",
"date": record.date,
"downloader": record.downloader,
"download_hash": record.download_hash
}
# 如果失败,添加错误信息
if not record.status and record.errmsg:
simplified["errmsg"] = record.errmsg
# 添加媒体ID信息如果有
if record.tmdbid:
simplified["tmdbid"] = record.tmdbid
if record.imdbid:
simplified["imdbid"] = record.imdbid
if record.doubanid:
simplified["doubanid"] = record.doubanid
simplified_records.append(simplified)
result_json = json.dumps(simplified_records, ensure_ascii=False, indent=2)
# 计算总页数
total_pages = (total + count - 1) // count if total > 0 else 1
# 构建分页信息
pagination_info = f"{page}/{total_pages} 页,共 {total} 条记录(每页 {count} 条)"
return f"{pagination_info}\n\n{result_json}"
except Exception as e:
logger.error(f"查询整理历史记录失败: {e}", exc_info=True)
return f"查询整理历史记录时发生错误: {str(e)}"

View File

@@ -0,0 +1,128 @@
"""查询工作流工具"""
import json
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.db import AsyncSessionFactory
from app.db.workflow_oper import WorkflowOper
from app.log import logger
class QueryWorkflowsInput(BaseModel):
"""查询工作流工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
state: Optional[str] = Field("all", description="Filter workflows by state: 'W' for waiting, 'R' for running, 'P' for paused, 'S' for success, 'F' for failed, 'all' for all workflows (default: 'all')")
name: Optional[str] = Field(None, description="Filter workflows by name (partial match, optional)")
trigger_type: Optional[str] = Field("all", description="Filter workflows by trigger type: 'timer' for scheduled, 'event' for event-triggered, 'manual' for manual, 'all' for all types (default: 'all')")
class QueryWorkflowsTool(MoviePilotTool):
name: str = "query_workflows"
description: str = "Query workflow list and status. Shows workflow name, description, trigger type, state, execution count, and other workflow details. Supports filtering by state, name, and trigger type."
args_schema: Type[BaseModel] = QueryWorkflowsInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据查询参数生成友好的提示消息"""
state = kwargs.get("state", "all")
name = kwargs.get("name")
trigger_type = kwargs.get("trigger_type", "all")
parts = ["正在查询工作流"]
if state != "all":
state_map = {"W": "等待", "R": "运行中", "P": "暂停", "S": "成功", "F": "失败"}
parts.append(f"状态: {state_map.get(state, state)}")
if trigger_type != "all":
trigger_map = {"timer": "定时触发", "event": "事件触发", "manual": "手动触发"}
parts.append(f"触发类型: {trigger_map.get(trigger_type, trigger_type)}")
if name:
parts.append(f"名称: {name}")
return " | ".join(parts) if len(parts) > 1 else parts[0]
async def run(self, state: Optional[str] = "all",
name: Optional[str] = None,
trigger_type: Optional[str] = "all", **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: state={state}, name={name}, trigger_type={trigger_type}")
try:
# 获取数据库会话
async with AsyncSessionFactory() as db:
workflow_oper = WorkflowOper(db)
workflows = await workflow_oper.async_list()
# 过滤工作流
filtered_workflows = []
for wf in workflows:
# 按状态过滤
if state != "all" and wf.state != state:
continue
# 按触发类型过滤
if trigger_type != "all":
if trigger_type == "timer" and wf.trigger_type not in ["timer", None]:
continue
elif trigger_type == "event" and wf.trigger_type != "event":
continue
elif trigger_type == "manual" and wf.trigger_type != "manual":
continue
# 按名称过滤(部分匹配)
if name and wf.name and name.lower() not in wf.name.lower():
continue
filtered_workflows.append(wf)
if not filtered_workflows:
return "未找到相关工作流"
# 转换为字典格式,只保留关键信息
simplified_workflows = []
for wf in filtered_workflows:
# 状态说明
state_map = {
"W": "等待",
"R": "运行中",
"P": "暂停",
"S": "成功",
"F": "失败"
}
state_desc = state_map.get(wf.state, wf.state)
# 触发类型说明
trigger_type_map = {
"timer": "定时触发",
"event": "事件触发",
"manual": "手动触发"
}
trigger_type_desc = trigger_type_map.get(wf.trigger_type, wf.trigger_type or "定时触发")
simplified = {
"id": wf.id,
"name": wf.name,
"description": wf.description,
"trigger_type": trigger_type_desc,
"state": state_desc,
"run_count": wf.run_count,
"timer": wf.timer,
"event_type": wf.event_type,
"add_time": wf.add_time,
"last_time": wf.last_time,
"current_action": wf.current_action
}
# 如果有结果,添加结果信息
if wf.result:
simplified["result"] = wf.result
simplified_workflows.append(simplified)
result_json = json.dumps(simplified_workflows, ensure_ascii=False, indent=2)
return result_json
except Exception as e:
logger.error(f"查询工作流失败: {e}", exc_info=True)
return f"查询工作流时发生错误: {str(e)}"

View File

@@ -0,0 +1,162 @@
"""识别媒体信息工具"""
import json
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.chain.media import MediaChain
from app.core.context import Context
from app.core.metainfo import MetaInfo
from app.log import logger
class RecognizeMediaInput(BaseModel):
"""识别媒体信息工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
title: Optional[str] = Field(None, description="The title of the torrent/media to recognize (required for torrent recognition)")
subtitle: Optional[str] = Field(None, description="The subtitle or description of the torrent (optional, helps improve recognition accuracy)")
path: Optional[str] = Field(None, description="The file path to recognize (required for file recognition, mutually exclusive with title)")
class RecognizeMediaTool(MoviePilotTool):
name: str = "recognize_media"
description: str = "Recognize media information from torrent titles or file paths. Supports two modes: 1) Recognize from torrent title and optional subtitle, 2) Recognize from file path. Returns detailed media information including title, year, type, TMDB ID, overview, and other metadata."
args_schema: Type[BaseModel] = RecognizeMediaInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据识别参数生成友好的提示消息"""
title = kwargs.get("title")
subtitle = kwargs.get("subtitle")
path = kwargs.get("path")
if path:
message = f"正在识别文件媒体信息: {path}"
elif title:
message = f"正在识别种子媒体信息: {title}"
if subtitle:
message += f" ({subtitle})"
else:
message = "正在识别媒体信息"
return message
async def run(self, title: Optional[str] = None, subtitle: Optional[str] = None,
path: Optional[str] = None, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: title={title}, subtitle={subtitle}, path={path}")
try:
media_chain = MediaChain()
context = None
# 根据提供的参数选择识别方式
if path:
# 文件路径识别
if not path:
return json.dumps({
"success": False,
"message": "文件路径不能为空"
}, ensure_ascii=False)
context = await media_chain.async_recognize_by_path(path)
if context:
return self._format_context_result(context, "文件")
else:
return json.dumps({
"success": False,
"message": f"无法识别文件媒体信息: {path}",
"path": path
}, ensure_ascii=False)
elif title:
# 种子标题识别
metainfo = MetaInfo(title, subtitle)
mediainfo = await media_chain.async_recognize_by_meta(metainfo)
if mediainfo:
context = Context(meta_info=metainfo, media_info=mediainfo)
return self._format_context_result(context, "种子")
else:
return json.dumps({
"success": False,
"message": f"无法识别种子媒体信息: {title}",
"title": title,
"subtitle": subtitle
}, ensure_ascii=False)
else:
return json.dumps({
"success": False,
"message": "必须提供 title标题或 path文件路径参数之一"
}, ensure_ascii=False)
except Exception as e:
error_message = f"识别媒体信息失败: {str(e)}"
logger.error(f"识别媒体信息失败: {e}", exc_info=True)
return json.dumps({
"success": False,
"message": error_message
}, ensure_ascii=False)
def _format_context_result(self, context: Context, source_type: str) -> str:
"""格式化识别结果为JSON字符串"""
if not context:
return json.dumps({
"success": False,
"message": "识别结果为空"
}, ensure_ascii=False)
context_dict = context.to_dict()
media_info = context_dict.get("media_info")
meta_info = context_dict.get("meta_info")
# 构建简化的结果
result = {
"success": True,
"source_type": source_type,
"media_info": None,
"meta_info": None
}
# 处理媒体信息
if media_info:
result["media_info"] = {
"title": media_info.get("title"),
"en_title": media_info.get("en_title"),
"year": media_info.get("year"),
"type": media_info.get("type"),
"season": media_info.get("season"),
"tmdb_id": media_info.get("tmdb_id"),
"imdb_id": media_info.get("imdb_id"),
"douban_id": media_info.get("douban_id"),
"bangumi_id": media_info.get("bangumi_id"),
"overview": media_info.get("overview"),
"vote_average": media_info.get("vote_average"),
"poster_path": media_info.get("poster_path"),
"backdrop_path": media_info.get("backdrop_path"),
"detail_link": media_info.get("detail_link"),
"title_year": media_info.get("title_year"),
"source": media_info.get("source")
}
# 处理元数据信息
if meta_info:
result["meta_info"] = {
"name": meta_info.get("name"),
"title": meta_info.get("title"),
"year": meta_info.get("year"),
"type": meta_info.get("type"),
"begin_season": meta_info.get("begin_season"),
"end_season": meta_info.get("end_season"),
"begin_episode": meta_info.get("begin_episode"),
"end_episode": meta_info.get("end_episode"),
"total_episode": meta_info.get("total_episode"),
"part": meta_info.get("part"),
"season_episode": meta_info.get("season_episode"),
"episode_list": meta_info.get("episode_list"),
"tmdbid": meta_info.get("tmdbid"),
"doubanid": meta_info.get("doubanid")
}
return json.dumps(result, ensure_ascii=False, indent=2)

View File

@@ -0,0 +1,53 @@
"""运行定时服务工具"""
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.log import logger
from app.scheduler import Scheduler
class RunSchedulerInput(BaseModel):
"""运行定时服务工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
job_id: str = Field(..., description="The ID of the scheduled job to run (can be obtained from query_schedulers tool)")
class RunSchedulerTool(MoviePilotTool):
name: str = "run_scheduler"
description: str = "Manually trigger a scheduled task to run immediately. This will execute the specified scheduler job by its ID."
args_schema: Type[BaseModel] = RunSchedulerInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据运行参数生成友好的提示消息"""
job_id = kwargs.get("job_id", "")
return f"正在运行定时服务 (ID: {job_id})"
async def run(self, job_id: str, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: job_id={job_id}")
try:
scheduler = Scheduler()
# 检查定时服务是否存在
schedulers = scheduler.list()
job_exists = False
job_name = None
for s in schedulers:
if s.id == job_id:
job_exists = True
job_name = s.name
break
if not job_exists:
return f"定时服务 ID {job_id} 不存在,请使用 query_schedulers 工具查询可用的定时服务"
# 运行定时服务
scheduler.start(job_id)
return f"成功触发定时服务:{job_name} (ID: {job_id})"
except Exception as e:
logger.error(f"运行定时服务失败: {e}", exc_info=True)
return f"运行定时服务时发生错误: {str(e)}"

View File

@@ -0,0 +1,72 @@
"""执行工作流工具"""
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.chain.workflow import WorkflowChain
from app.db import AsyncSessionFactory
from app.db.workflow_oper import WorkflowOper
from app.log import logger
class RunWorkflowInput(BaseModel):
"""执行工作流工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
workflow_identifier: str = Field(..., description="Workflow identifier: can be workflow ID (integer as string) or workflow name")
from_begin: Optional[bool] = Field(True, description="Whether to run workflow from the beginning (default: True, if False will continue from last executed action)")
class RunWorkflowTool(MoviePilotTool):
name: str = "run_workflow"
description: str = "Execute a specific workflow manually. Can run workflow by ID or name. Supports running from the beginning or continuing from the last executed action."
args_schema: Type[BaseModel] = RunWorkflowInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据工作流参数生成友好的提示消息"""
workflow_identifier = kwargs.get("workflow_identifier", "")
from_begin = kwargs.get("from_begin", True)
message = f"正在执行工作流: {workflow_identifier}"
if not from_begin:
message += " (从上次位置继续)"
else:
message += " (从头开始)"
return message
async def run(self, workflow_identifier: str,
from_begin: Optional[bool] = True, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: workflow_identifier={workflow_identifier}, from_begin={from_begin}")
try:
# 获取数据库会话
async with AsyncSessionFactory() as db:
workflow_oper = WorkflowOper(db)
# 尝试解析为工作流ID
workflow = None
if workflow_identifier.isdigit():
# 如果是数字尝试作为工作流ID查询
workflow = await workflow_oper.async_get(int(workflow_identifier))
# 如果不是ID或ID查询失败尝试按名称查询
if not workflow:
workflow = await workflow_oper.async_get_by_name(workflow_identifier)
if not workflow:
return f"未找到工作流:{workflow_identifier},请使用 query_workflows 工具查询可用的工作流"
# 执行工作流
workflow_chain = WorkflowChain()
state, errmsg = workflow_chain.process(workflow.id, from_begin=from_begin)
if not state:
return f"执行工作流失败:{workflow.name} (ID: {workflow.id})\n错误原因:{errmsg}"
else:
return f"工作流执行成功:{workflow.name} (ID: {workflow.id})"
except Exception as e:
logger.error(f"执行工作流失败: {e}", exc_info=True)
return f"执行工作流时发生错误: {str(e)}"

View File

@@ -0,0 +1,118 @@
"""刮削媒体元数据工具"""
import asyncio
import json
from pathlib import Path
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.chain.media import MediaChain
from app.core.metainfo import MetaInfoPath
from app.log import logger
from app.schemas import FileItem
class ScrapeMetadataInput(BaseModel):
"""刮削媒体元数据工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
path: str = Field(..., description="Path to the file or directory to scrape metadata for (e.g., '/path/to/file.mkv' or '/path/to/directory')")
storage: Optional[str] = Field("local", description="Storage type: 'local' for local storage, 'smb', 'alist', etc. for remote storage (default: 'local')")
overwrite: Optional[bool] = Field(False, description="Whether to overwrite existing metadata files (default: False)")
class ScrapeMetadataTool(MoviePilotTool):
name: str = "scrape_metadata"
description: str = "Scrape media metadata (NFO files, posters, backgrounds, etc.) for a file or directory. Automatically recognizes media information from the file path and generates metadata files. Supports both local and remote storage."
args_schema: Type[BaseModel] = ScrapeMetadataInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据刮削参数生成友好的提示消息"""
path = kwargs.get("path", "")
storage = kwargs.get("storage", "local")
overwrite = kwargs.get("overwrite", False)
message = f"正在刮削媒体元数据: {path}"
if storage != "local":
message += f" [存储: {storage}]"
if overwrite:
message += " [覆盖模式]"
return message
async def run(self, path: str, storage: Optional[str] = "local",
overwrite: Optional[bool] = False, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: path={path}, storage={storage}, overwrite={overwrite}")
try:
# 验证路径
if not path:
return json.dumps({
"success": False,
"message": "刮削路径不能为空"
}, ensure_ascii=False)
# 创建 FileItem
fileitem = FileItem(
storage=storage,
path=path,
type="file" if Path(path).suffix else "dir"
)
# 检查本地存储路径是否存在
if storage == "local":
scrape_path = Path(path)
if not scrape_path.exists():
return json.dumps({
"success": False,
"message": f"刮削路径不存在: {path}"
}, ensure_ascii=False)
# 识别媒体信息
media_chain = MediaChain()
scrape_path = Path(path)
meta = MetaInfoPath(scrape_path)
mediainfo = await media_chain.async_recognize_by_meta(meta)
if not mediainfo:
return json.dumps({
"success": False,
"message": f"刮削失败,无法识别媒体信息: {path}",
"path": path
}, ensure_ascii=False)
# 在线程池中执行同步的刮削操作
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
lambda: media_chain.scrape_metadata(
fileitem=fileitem,
meta=meta,
mediainfo=mediainfo,
overwrite=overwrite
)
)
return json.dumps({
"success": True,
"message": f"{path} 刮削完成",
"path": path,
"media_info": {
"title": mediainfo.title,
"year": mediainfo.year,
"type": mediainfo.type.value if mediainfo.type else None,
"tmdb_id": mediainfo.tmdb_id,
"season": mediainfo.season
}
}, ensure_ascii=False, indent=2)
except Exception as e:
error_message = f"刮削媒体元数据失败: {str(e)}"
logger.error(f"刮削媒体元数据失败: {e}", exc_info=True)
return json.dumps({
"success": False,
"message": error_message,
"path": path
}, ensure_ascii=False)

View File

@@ -27,6 +27,23 @@ class SearchMediaTool(MoviePilotTool):
description: str = "Search for media resources including movies, TV shows, anime, etc. Supports searching by title, year, type, and other criteria. Returns detailed media information from TMDB database."
args_schema: Type[BaseModel] = SearchMediaInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据搜索参数生成友好的提示消息"""
title = kwargs.get("title", "")
year = kwargs.get("year")
media_type = kwargs.get("media_type")
season = kwargs.get("season")
message = f"正在搜索媒体: {title}"
if year:
message += f" ({year})"
if media_type:
message += f" [{media_type}]"
if season:
message += f"{season}"
return message
async def run(self, title: str, year: Optional[str] = None,
media_type: Optional[str] = None, season: Optional[int] = None, **kwargs) -> str:
logger.info(
@@ -60,9 +77,9 @@ class SearchMediaTool(MoviePilotTool):
filtered_results.append(result)
if filtered_results:
# 限制最多20条结果
# 限制最多30条结果
total_count = len(filtered_results)
limited_results = filtered_results[:20]
limited_results = filtered_results[:30]
# 精简字段,只保留关键信息
simplified_results = []
for r in limited_results:
@@ -83,8 +100,8 @@ class SearchMediaTool(MoviePilotTool):
simplified_results.append(simplified)
result_json = json.dumps(simplified_results, ensure_ascii=False, indent=2)
# 如果结果被裁剪,添加提示信息
if total_count > 20:
return f"注意:搜索结果共找到 {total_count} 条,为节省上下文空间,仅显示前 20 条结果。\n\n{result_json}"
if total_count > 30:
return f"注意:搜索结果共找到 {total_count} 条,为节省上下文空间,仅显示前 30 条结果。\n\n{result_json}"
return result_json
else:
return f"未找到符合条件的媒体资源: {title}"

View File

@@ -33,6 +33,26 @@ class SearchTorrentsTool(MoviePilotTool):
description: str = "Search for torrent files across configured indexer sites based on media information. Returns available torrent downloads with details like file size, quality, and download links."
args_schema: Type[BaseModel] = SearchTorrentsInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据搜索参数生成友好的提示消息"""
title = kwargs.get("title", "")
year = kwargs.get("year")
media_type = kwargs.get("media_type")
season = kwargs.get("season")
filter_pattern = kwargs.get("filter_pattern")
message = f"正在搜索种子: {title}"
if year:
message += f" ({year})"
if media_type:
message += f" [{media_type}]"
if season:
message += f"{season}"
if filter_pattern:
message += f" 过滤: {filter_pattern}"
return message
async def run(self, title: str, year: Optional[str] = None,
media_type: Optional[str] = None, season: Optional[int] = None,
sites: Optional[List[int]] = None, filter_pattern: Optional[str] = None, **kwargs) -> str:

View File

@@ -21,6 +21,20 @@ class SendMessageTool(MoviePilotTool):
description: str = "Send notification message to the user through configured notification channels (Telegram, Slack, WeChat, etc.). Used to inform users about operation results, errors, or important updates."
args_schema: Type[BaseModel] = SendMessageInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据消息参数生成友好的提示消息"""
message = kwargs.get("message", "")
message_type = kwargs.get("message_type", "info")
type_map = {"info": "信息", "success": "成功", "warning": "警告", "error": "错误"}
type_desc = type_map.get(message_type, message_type)
# 截断过长的消息
if len(message) > 50:
message = message[:50] + "..."
return f"正在发送{type_desc}消息: {message}"
async def run(self, message: str, message_type: Optional[str] = None, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: message={message}, message_type={message_type}")
try:

View File

@@ -0,0 +1,72 @@
"""测试站点连通性工具"""
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.chain.site import SiteChain
from app.db.site_oper import SiteOper
from app.log import logger
from app.utils.string import StringUtils
class TestSiteInput(BaseModel):
"""测试站点连通性工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
site_identifier: str = Field(..., description="Site identifier: can be site ID (integer as string), site name, or site domain/URL")
class TestSiteTool(MoviePilotTool):
name: str = "test_site"
description: str = "Test site connectivity and availability. This will check if a site is accessible and can be logged in. Accepts site ID, site name, or site domain/URL as identifier."
args_schema: Type[BaseModel] = TestSiteInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据测试参数生成友好的提示消息"""
site_identifier = kwargs.get("site_identifier", "")
return f"正在测试站点连通性: {site_identifier}"
async def run(self, site_identifier: str, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: site_identifier={site_identifier}")
try:
site_oper = SiteOper()
site_chain = SiteChain()
# 尝试解析为站点ID
site = None
if site_identifier.isdigit():
# 如果是数字尝试作为站点ID查询
site = await site_oper.async_get(int(site_identifier))
# 如果不是ID或ID查询失败尝试按名称或域名查询
if not site:
# 尝试按名称查询
sites = await site_oper.async_list()
for s in sites:
if (site_identifier.lower() in (s.name or "").lower()) or \
(site_identifier.lower() in (s.domain or "").lower()):
site = s
break
# 如果还是没找到尝试从URL提取域名
if not site:
domain = StringUtils.get_url_domain(site_identifier)
if domain:
site = await site_oper.async_get_by_domain(domain)
if not site:
return f"未找到站点:{site_identifier},请使用 query_sites 工具查询可用的站点"
# 测试站点连通性
status, message = site_chain.test(site.domain)
if status:
return f"站点连通性测试成功:{site.name} ({site.domain})\n{message}"
else:
return f"站点连通性测试失败:{site.name} ({site.domain})\n{message}"
except Exception as e:
logger.error(f"测试站点连通性失败: {e}", exc_info=True)
return f"测试站点连通性时发生错误: {str(e)}"

View File

@@ -0,0 +1,134 @@
"""整理文件或目录工具"""
from pathlib import Path
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.chain.transfer import TransferChain
from app.log import logger
from app.schemas import FileItem, MediaType
class TransferFileInput(BaseModel):
"""整理文件或目录工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
file_path: str = Field(..., description="Path to the file or directory to transfer (e.g., '/path/to/file.mkv' or '/path/to/directory')")
storage: Optional[str] = Field("local", description="Storage type of the source file (default: 'local', can be 'smb', 'alist', etc.)")
target_path: Optional[str] = Field(None, description="Target path for the transferred file/directory (optional, uses default library path if not specified)")
target_storage: Optional[str] = Field(None, description="Target storage type (optional, uses default storage if not specified)")
media_type: Optional[str] = Field(None, description="Media type: '电影' for films, '电视剧' for television series (optional, will be auto-detected if not specified)")
tmdbid: Optional[int] = Field(None, description="TMDB ID for precise media identification (optional but recommended for accuracy)")
doubanid: Optional[str] = Field(None, description="Douban ID for media identification (optional)")
season: Optional[int] = Field(None, description="Season number for TV shows (optional)")
transfer_type: Optional[str] = Field(None, description="Transfer mode: 'move' to move files, 'copy' to copy files, 'link' for hard link, 'softlink' for symbolic link (optional, uses default mode if not specified)")
background: Optional[bool] = Field(False, description="Whether to run transfer in background (default: False, runs synchronously)")
class TransferFileTool(MoviePilotTool):
name: str = "transfer_file"
description: str = "Transfer/organize a file or directory to the media library. Automatically recognizes media information and organizes files according to configured rules. Supports custom target paths, media identification, and transfer modes."
args_schema: Type[BaseModel] = TransferFileInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据整理参数生成友好的提示消息"""
file_path = kwargs.get("file_path", "")
media_type = kwargs.get("media_type")
transfer_type = kwargs.get("transfer_type")
background = kwargs.get("background", False)
message = f"正在整理文件: {file_path}"
if media_type:
message += f" [{media_type}]"
if transfer_type:
transfer_map = {"move": "移动", "copy": "复制", "link": "硬链接", "softlink": "软链接"}
message += f" 模式: {transfer_map.get(transfer_type, transfer_type)}"
if background:
message += " [后台运行]"
return message
async def run(self, file_path: str, storage: Optional[str] = "local",
target_path: Optional[str] = None,
target_storage: Optional[str] = None,
media_type: Optional[str] = None,
tmdbid: Optional[int] = None,
doubanid: Optional[str] = None,
season: Optional[int] = None,
transfer_type: Optional[str] = None,
background: Optional[bool] = False, **kwargs) -> str:
logger.info(
f"执行工具: {self.name}, 参数: file_path={file_path}, storage={storage}, target_path={target_path}, "
f"target_storage={target_storage}, media_type={media_type}, tmdbid={tmdbid}, doubanid={doubanid}, "
f"season={season}, transfer_type={transfer_type}, background={background}")
try:
if not file_path:
return "错误:必须提供文件或目录路径"
# 规范化路径
if storage == "local":
# 本地路径处理
if not file_path.startswith("/") and not (len(file_path) > 1 and file_path[1] == ":"):
# 相对路径,尝试转换为绝对路径
file_path = str(Path(file_path).resolve())
else:
# 远程存储路径,确保以/开头
if not file_path.startswith("/"):
file_path = "/" + file_path
# 创建FileItem
fileitem = FileItem(
storage=storage or "local",
path=file_path,
type="dir" if file_path.endswith("/") else "file"
)
# 处理目标路径
target_path_obj = None
if target_path:
target_path_obj = Path(target_path)
# 处理媒体类型
mtype = None
if media_type:
try:
mtype = MediaType(media_type)
except ValueError:
return f"错误:无效的媒体类型 '{media_type}',支持的类型:'movie', 'tv'"
# 调用整理方法
transfer_chain = TransferChain()
state, errormsg = transfer_chain.manual_transfer(
fileitem=fileitem,
target_storage=target_storage,
target_path=target_path_obj,
tmdbid=tmdbid,
doubanid=doubanid,
mtype=mtype,
season=season,
transfer_type=transfer_type,
background=background
)
if not state:
# 处理错误信息
if isinstance(errormsg, list):
error_text = f"整理完成,{len(errormsg)} 个文件转移失败"
if errormsg:
error_text += f"\n" + "\n".join(str(e) for e in errormsg[:5]) # 只显示前5个错误
if len(errormsg) > 5:
error_text += f"\n... 还有 {len(errormsg) - 5} 个错误"
else:
error_text = str(errormsg)
return f"整理失败:{error_text}"
else:
if background:
return f"整理任务已提交到后台运行:{file_path}"
else:
return f"整理成功:{file_path}"
except Exception as e:
logger.error(f"整理文件失败: {e}", exc_info=True)
return f"整理文件时发生错误: {str(e)}"

View File

@@ -0,0 +1,203 @@
"""更新站点工具"""
import json
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.core.event import eventmanager
from app.db import AsyncSessionFactory
from app.db.models.site import Site
from app.log import logger
from app.schemas.types import EventType
from app.utils.string import StringUtils
class UpdateSiteInput(BaseModel):
"""更新站点工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
site_id: int = Field(..., description="The ID of the site to update")
name: Optional[str] = Field(None, description="Site name (optional)")
url: Optional[str] = Field(None, description="Site URL (optional, will be automatically formatted)")
pri: Optional[int] = Field(None, description="Site priority (optional, higher number = higher priority)")
rss: Optional[str] = Field(None, description="RSS feed URL (optional)")
cookie: Optional[str] = Field(None, description="Site cookie (optional)")
ua: Optional[str] = Field(None, description="User-Agent string (optional)")
apikey: Optional[str] = Field(None, description="API key (optional)")
token: Optional[str] = Field(None, description="API token (optional)")
proxy: Optional[int] = Field(None, description="Whether to use proxy: 0 for no, 1 for yes (optional)")
filter: Optional[str] = Field(None, description="Filter rule as regular expression (optional)")
note: Optional[str] = Field(None, description="Site notes/remarks (optional)")
timeout: Optional[int] = Field(None, description="Request timeout in seconds (optional, default: 15)")
limit_interval: Optional[int] = Field(None, description="Rate limit interval in seconds (optional)")
limit_count: Optional[int] = Field(None, description="Rate limit count per interval (optional)")
limit_seconds: Optional[int] = Field(None, description="Rate limit seconds between requests (optional)")
is_active: Optional[bool] = Field(None, description="Whether site is active: True for enabled, False for disabled (optional)")
downloader: Optional[str] = Field(None, description="Downloader name for this site (optional)")
class UpdateSiteTool(MoviePilotTool):
name: str = "update_site"
description: str = "Update site configuration including URL, priority, authentication credentials (cookie, UA, API key), proxy settings, rate limits, and other site properties. Supports updating multiple site attributes at once."
args_schema: Type[BaseModel] = UpdateSiteInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据更新参数生成友好的提示消息"""
site_id = kwargs.get("site_id")
fields_updated = []
if kwargs.get("name"):
fields_updated.append("名称")
if kwargs.get("url"):
fields_updated.append("URL")
if kwargs.get("pri") is not None:
fields_updated.append("优先级")
if kwargs.get("cookie"):
fields_updated.append("Cookie")
if kwargs.get("ua"):
fields_updated.append("User-Agent")
if kwargs.get("proxy") is not None:
fields_updated.append("代理设置")
if kwargs.get("is_active") is not None:
fields_updated.append("启用状态")
if kwargs.get("downloader"):
fields_updated.append("下载器")
if fields_updated:
return f"正在更新站点 #{site_id}: {', '.join(fields_updated)}"
return f"正在更新站点 #{site_id}"
async def run(self, site_id: int,
name: Optional[str] = None,
url: Optional[str] = None,
pri: Optional[int] = None,
rss: Optional[str] = None,
cookie: Optional[str] = None,
ua: Optional[str] = None,
apikey: Optional[str] = None,
token: Optional[str] = None,
proxy: Optional[int] = None,
filter: Optional[str] = None,
note: Optional[str] = None,
timeout: Optional[int] = None,
limit_interval: Optional[int] = None,
limit_count: Optional[int] = None,
limit_seconds: Optional[int] = None,
is_active: Optional[bool] = None,
downloader: Optional[str] = None,
**kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: site_id={site_id}")
try:
# 获取数据库会话
async with AsyncSessionFactory() as db:
# 获取站点
site = await Site.async_get(db, site_id)
if not site:
return json.dumps({
"success": False,
"message": f"站点不存在: {site_id}"
}, ensure_ascii=False)
# 构建更新字典
site_dict = {}
# 基本信息
if name is not None:
site_dict["name"] = name
# URL处理需要校正格式
if url is not None:
_scheme, _netloc = StringUtils.get_url_netloc(url)
site_dict["url"] = f"{_scheme}://{_netloc}/"
if pri is not None:
site_dict["pri"] = pri
if rss is not None:
site_dict["rss"] = rss
# 认证信息
if cookie is not None:
site_dict["cookie"] = cookie
if ua is not None:
site_dict["ua"] = ua
if apikey is not None:
site_dict["apikey"] = apikey
if token is not None:
site_dict["token"] = token
# 配置选项
if proxy is not None:
site_dict["proxy"] = proxy
if filter is not None:
site_dict["filter"] = filter
if note is not None:
site_dict["note"] = note
if timeout is not None:
site_dict["timeout"] = timeout
# 流控设置
if limit_interval is not None:
site_dict["limit_interval"] = limit_interval
if limit_count is not None:
site_dict["limit_count"] = limit_count
if limit_seconds is not None:
site_dict["limit_seconds"] = limit_seconds
# 状态和下载器
if is_active is not None:
site_dict["is_active"] = is_active
if downloader is not None:
site_dict["downloader"] = downloader
# 如果没有要更新的字段
if not site_dict:
return json.dumps({
"success": False,
"message": "没有提供要更新的字段"
}, ensure_ascii=False)
# 更新站点
await site.async_update(db, site_dict)
# 重新获取更新后的站点数据
updated_site = await Site.async_get(db, site_id)
# 发送站点更新事件
await eventmanager.async_send_event(EventType.SiteUpdated, {
"domain": updated_site.domain if updated_site else site.domain
})
# 构建返回结果
result = {
"success": True,
"message": f"站点 #{site_id} 更新成功",
"site_id": site_id,
"updated_fields": list(site_dict.keys())
}
if updated_site:
result["site"] = {
"id": updated_site.id,
"name": updated_site.name,
"domain": updated_site.domain,
"url": updated_site.url,
"pri": updated_site.pri,
"is_active": updated_site.is_active,
"downloader": updated_site.downloader,
"proxy": updated_site.proxy,
"timeout": updated_site.timeout
}
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
error_message = f"更新站点失败: {str(e)}"
logger.error(f"更新站点失败: {e}", exc_info=True)
return json.dumps({
"success": False,
"message": error_message,
"site_id": site_id
}, ensure_ascii=False)

View File

@@ -0,0 +1,88 @@
"""更新站点Cookie和UA工具"""
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.chain.site import SiteChain
from app.db.site_oper import SiteOper
from app.log import logger
from app.utils.string import StringUtils
class UpdateSiteCookieInput(BaseModel):
"""更新站点Cookie和UA工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
site_identifier: str = Field(..., description="Site identifier: can be site ID (integer as string), site name, or site domain/URL")
username: str = Field(..., description="Site login username")
password: str = Field(..., description="Site login password")
two_step_code: Optional[str] = Field(None, description="Two-step verification code or secret key (optional, required for sites with 2FA enabled)")
class UpdateSiteCookieTool(MoviePilotTool):
name: str = "update_site_cookie"
description: str = "Update site Cookie and User-Agent by logging in with username and password. This tool can automatically obtain and update the site's authentication credentials. Supports two-step verification for sites that require it. Accepts site ID, site name, or site domain/URL as identifier."
args_schema: Type[BaseModel] = UpdateSiteCookieInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据更新参数生成友好的提示消息"""
site_identifier = kwargs.get("site_identifier", "")
username = kwargs.get("username", "")
two_step_code = kwargs.get("two_step_code")
message = f"正在更新站点Cookie: {site_identifier} (用户: {username})"
if two_step_code:
message += " [需要两步验证]"
return message
async def run(self, site_identifier: str, username: str, password: str,
two_step_code: Optional[str] = None, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: site_identifier={site_identifier}, username={username}")
try:
site_oper = SiteOper()
site_chain = SiteChain()
# 尝试解析为站点ID
site = None
if site_identifier.isdigit():
# 如果是数字尝试作为站点ID查询
site = await site_oper.async_get(int(site_identifier))
# 如果不是ID或ID查询失败尝试按名称或域名查询
if not site:
# 尝试按名称查询
sites = await site_oper.async_list()
for s in sites:
if (site_identifier.lower() in (s.name or "").lower()) or \
(site_identifier.lower() in (s.domain or "").lower()):
site = s
break
# 如果还是没找到尝试从URL提取域名
if not site:
domain = StringUtils.get_url_domain(site_identifier)
if domain:
site = await site_oper.async_get_by_domain(domain)
if not site:
return f"未找到站点:{site_identifier},请使用 query_sites 工具查询可用的站点"
# 更新站点Cookie和UA
status, message = site_chain.update_cookie(
site_info=site,
username=username,
password=password,
two_step_code=two_step_code
)
if status:
return f"站点【{site.name}】Cookie和UA更新成功\n{message}"
else:
return f"站点【{site.name}】Cookie和UA更新失败\n错误原因:{message}"
except Exception as e:
logger.error(f"更新站点Cookie和UA失败: {e}", exc_info=True)
return f"更新站点Cookie和UA时发生错误: {str(e)}"

View File

@@ -0,0 +1,239 @@
"""更新订阅工具"""
import json
from typing import Optional, Type, List
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.core.event import eventmanager
from app.db import AsyncSessionFactory
from app.db.models.subscribe import Subscribe
from app.log import logger
from app.schemas.types import EventType
class UpdateSubscribeInput(BaseModel):
"""更新订阅工具的输入参数模型"""
explanation: str = Field(..., description="Clear explanation of why this tool is being used in the current context")
subscribe_id: int = Field(..., description="The ID of the subscription to update")
name: Optional[str] = Field(None, description="Subscription name/title (optional)")
year: Optional[str] = Field(None, description="Release year (optional)")
season: Optional[int] = Field(None, description="Season number for TV shows (optional)")
total_episode: Optional[int] = Field(None, description="Total number of episodes (optional)")
lack_episode: Optional[int] = Field(None, description="Number of missing episodes (optional)")
start_episode: Optional[int] = Field(None, description="Starting episode number (optional)")
quality: Optional[str] = Field(None, description="Quality filter as regular expression (optional, e.g., 'BluRay|WEB-DL|HDTV')")
resolution: Optional[str] = Field(None, description="Resolution filter as regular expression (optional, e.g., '1080p|720p|2160p')")
effect: Optional[str] = Field(None, description="Effect filter as regular expression (optional, e.g., 'HDR|DV|SDR')")
include: Optional[str] = Field(None, description="Include filter as regular expression (optional)")
exclude: Optional[str] = Field(None, description="Exclude filter as regular expression (optional)")
filter: Optional[str] = Field(None, description="Filter rule as regular expression (optional)")
state: Optional[str] = Field(None, description="Subscription state: 'R' for enabled, 'P' for disabled, 'S' for paused (optional)")
sites: Optional[List[int]] = Field(None, description="List of site IDs to search from (optional)")
downloader: Optional[str] = Field(None, description="Downloader name (optional)")
save_path: Optional[str] = Field(None, description="Save path for downloaded files (optional)")
best_version: Optional[int] = Field(None, description="Whether to upgrade to best version: 0 for no, 1 for yes (optional)")
custom_words: Optional[str] = Field(None, description="Custom recognition words (optional)")
media_category: Optional[str] = Field(None, description="Custom media category (optional)")
episode_group: Optional[str] = Field(None, description="Episode group ID (optional)")
class UpdateSubscribeTool(MoviePilotTool):
name: str = "update_subscribe"
description: str = "Update subscription properties including filters, episode counts, state, and other settings. Supports updating quality/resolution filters, episode tracking, subscription state, and download configuration."
args_schema: Type[BaseModel] = UpdateSubscribeInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据更新参数生成友好的提示消息"""
subscribe_id = kwargs.get("subscribe_id")
fields_updated = []
if kwargs.get("name"):
fields_updated.append("名称")
if kwargs.get("total_episode") is not None:
fields_updated.append("总集数")
if kwargs.get("lack_episode") is not None:
fields_updated.append("缺失集数")
if kwargs.get("quality"):
fields_updated.append("质量过滤")
if kwargs.get("resolution"):
fields_updated.append("分辨率过滤")
if kwargs.get("state"):
state_map = {"R": "启用", "P": "禁用", "S": "暂停"}
fields_updated.append(f"状态({state_map.get(kwargs.get('state'), kwargs.get('state'))})")
if kwargs.get("sites"):
fields_updated.append("站点")
if kwargs.get("downloader"):
fields_updated.append("下载器")
if fields_updated:
return f"正在更新订阅 #{subscribe_id}: {', '.join(fields_updated)}"
return f"正在更新订阅 #{subscribe_id}"
async def run(self, subscribe_id: int,
name: Optional[str] = None,
year: Optional[str] = None,
season: Optional[int] = None,
total_episode: Optional[int] = None,
lack_episode: Optional[int] = None,
start_episode: Optional[int] = None,
quality: Optional[str] = None,
resolution: Optional[str] = None,
effect: Optional[str] = None,
include: Optional[str] = None,
exclude: Optional[str] = None,
filter: Optional[str] = None,
state: Optional[str] = None,
sites: Optional[List[int]] = None,
downloader: Optional[str] = None,
save_path: Optional[str] = None,
best_version: Optional[int] = None,
custom_words: Optional[str] = None,
media_category: Optional[str] = None,
episode_group: Optional[str] = None,
**kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: subscribe_id={subscribe_id}")
try:
# 获取数据库会话
async with AsyncSessionFactory() as db:
# 获取订阅
subscribe = await Subscribe.async_get(db, subscribe_id)
if not subscribe:
return json.dumps({
"success": False,
"message": f"订阅不存在: {subscribe_id}"
}, ensure_ascii=False)
# 保存旧数据用于事件
old_subscribe_dict = subscribe.to_dict()
# 构建更新字典
subscribe_dict = {}
# 基本信息
if name is not None:
subscribe_dict["name"] = name
if year is not None:
subscribe_dict["year"] = year
if season is not None:
subscribe_dict["season"] = season
# 集数相关
if total_episode is not None:
subscribe_dict["total_episode"] = total_episode
# 如果总集数增加,缺失集数也要相应增加
if total_episode > (subscribe.total_episode or 0):
old_lack = subscribe.lack_episode or 0
subscribe_dict["lack_episode"] = old_lack + (total_episode - (subscribe.total_episode or 0))
# 标记为手动修改过总集数
subscribe_dict["manual_total_episode"] = 1
# 缺失集数处理(只有在没有提供总集数时才单独处理)
# 注意:如果 lack_episode 为 0不更新避免更新为0
if lack_episode is not None and total_episode is None:
if lack_episode > 0:
subscribe_dict["lack_episode"] = lack_episode
# 如果 lack_episode 为 0不添加到更新字典中保持原值或由总集数逻辑处理
if start_episode is not None:
subscribe_dict["start_episode"] = start_episode
# 过滤规则
if quality is not None:
subscribe_dict["quality"] = quality
if resolution is not None:
subscribe_dict["resolution"] = resolution
if effect is not None:
subscribe_dict["effect"] = effect
if include is not None:
subscribe_dict["include"] = include
if exclude is not None:
subscribe_dict["exclude"] = exclude
if filter is not None:
subscribe_dict["filter"] = filter
# 状态
if state is not None:
valid_states = ["R", "P", "S", "N"]
if state not in valid_states:
return json.dumps({
"success": False,
"message": f"无效的订阅状态: {state},有效状态: {', '.join(valid_states)}"
}, ensure_ascii=False)
subscribe_dict["state"] = state
# 下载配置
if sites is not None:
subscribe_dict["sites"] = sites
if downloader is not None:
subscribe_dict["downloader"] = downloader
if save_path is not None:
subscribe_dict["save_path"] = save_path
if best_version is not None:
subscribe_dict["best_version"] = best_version
# 其他配置
if custom_words is not None:
subscribe_dict["custom_words"] = custom_words
if media_category is not None:
subscribe_dict["media_category"] = media_category
if episode_group is not None:
subscribe_dict["episode_group"] = episode_group
# 如果没有要更新的字段
if not subscribe_dict:
return json.dumps({
"success": False,
"message": "没有提供要更新的字段"
}, ensure_ascii=False)
# 更新订阅
await subscribe.async_update(db, subscribe_dict)
# 重新获取更新后的订阅数据
updated_subscribe = await Subscribe.async_get(db, subscribe_id)
# 发送订阅调整事件
await eventmanager.async_send_event(EventType.SubscribeModified, {
"subscribe_id": subscribe_id,
"old_subscribe_info": old_subscribe_dict,
"subscribe_info": updated_subscribe.to_dict() if updated_subscribe else {},
})
# 构建返回结果
result = {
"success": True,
"message": f"订阅 #{subscribe_id} 更新成功",
"subscribe_id": subscribe_id,
"updated_fields": list(subscribe_dict.keys())
}
if updated_subscribe:
result["subscribe"] = {
"id": updated_subscribe.id,
"name": updated_subscribe.name,
"year": updated_subscribe.year,
"type": updated_subscribe.type,
"season": updated_subscribe.season,
"state": updated_subscribe.state,
"total_episode": updated_subscribe.total_episode,
"lack_episode": updated_subscribe.lack_episode,
"start_episode": updated_subscribe.start_episode,
"quality": updated_subscribe.quality,
"resolution": updated_subscribe.resolution,
"effect": updated_subscribe.effect
}
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
error_message = f"更新订阅失败: {str(e)}"
logger.error(f"更新订阅失败: {e}", exc_info=True)
return json.dumps({
"success": False,
"message": error_message,
"subscribe_id": subscribe_id
}, ensure_ascii=False)

View File

@@ -219,10 +219,10 @@ async def read_userdata(
status_code=404,
detail=f"站点 {site_id} 不存在",
)
user_data = await SiteUserData.async_get_by_domain(db, domain=site.domain, workdate=workdate)
if not user_data:
user_datas = await SiteUserData.async_get_by_domain(db, domain=site.domain, workdate=workdate)
if not user_datas:
return schemas.Response(success=False, data=[])
return schemas.Response(success=True, data=user_data)
return schemas.Response(success=True, data=[data.to_dict() for data in user_datas])
@router.get("/test/{site_id}", summary="连接测试", response_model=schemas.Response)

View File

@@ -1,5 +1,7 @@
import asyncio
import re
import time
from datetime import datetime, timedelta
from typing import Any, Optional, Dict, Union, List
from app.agent import agent_manager
@@ -35,6 +37,10 @@ class MessageChain(ChainBase):
_cache_file = "__user_messages__"
# 每页数据量
_page_size: int = 8
# 用户会话信息 {userid: (session_id, last_time)}
_user_sessions: Dict[Union[str, int], tuple] = {}
# 会话超时时间(分钟)
_session_timeout_minutes: int = 15
@staticmethod
def __get_noexits_info(
@@ -822,6 +828,91 @@ class MessageChain(ChainBase):
return buttons
@staticmethod
def _get_or_create_session_id(userid: Union[str, int]) -> str:
"""
获取或创建会话ID
如果用户上次会话在15分钟内则复用相同的会话ID否则创建新的会话ID
"""
current_time = datetime.now()
# 检查用户是否有已存在的会话
if userid in MessageChain._user_sessions:
session_id, last_time = MessageChain._user_sessions[userid]
# 计算时间差
time_diff = current_time - last_time
# 如果时间差小于等于15分钟复用会话ID
if time_diff <= timedelta(minutes=MessageChain._session_timeout_minutes):
# 更新最后使用时间
MessageChain._user_sessions[userid] = (session_id, current_time)
logger.info(f"复用会话ID: {session_id}, 用户: {userid}, 距离上次会话: {time_diff.total_seconds() / 60:.1f}分钟")
return session_id
# 创建新的会话ID
new_session_id = f"user_{userid}_{int(time.time())}"
MessageChain._user_sessions[userid] = (new_session_id, current_time)
logger.info(f"创建新会话ID: {new_session_id}, 用户: {userid}")
return new_session_id
@staticmethod
def clear_user_session(userid: Union[str, int]) -> bool:
"""
清除指定用户的会话信息
返回是否成功清除
"""
if userid in MessageChain._user_sessions:
session_id, _ = MessageChain._user_sessions.pop(userid)
logger.info(f"已清除用户 {userid} 的会话: {session_id}")
return True
return False
def remote_clear_session(self, channel: MessageChannel, userid: Union[str, int], source: Optional[str] = None):
"""
清除用户会话(远程命令接口)
"""
# 获取并清除会话信息
session_id = None
if userid in MessageChain._user_sessions:
session_id, _ = MessageChain._user_sessions.pop(userid)
logger.info(f"已清除用户 {userid} 的会话: {session_id}")
# 如果有会话ID同时清除智能体的会话记忆
if session_id:
try:
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(
agent_manager.clear_session(
session_id=session_id,
user_id=str(userid)
)
)
except RuntimeError:
asyncio.run(
agent_manager.clear_session(
session_id=session_id,
user_id=str(userid)
)
)
except Exception as e:
logger.warning(f"清除智能体会话记忆失败: {e}")
self.post_message(Notification(
channel=channel,
source=source,
title="智能体会话已清除,下次将创建新的会话",
userid=userid
))
else:
self.post_message(Notification(
channel=channel,
source=source,
title="您当前没有活跃的智能体会话",
userid=userid
))
def _handle_ai_message(self, text: str, channel: MessageChannel, source: str,
userid: Union[str, int], username: str) -> None:
"""
@@ -862,17 +953,8 @@ class MessageChain(ChainBase):
))
return
# 发送处理中消息
self.post_message(Notification(
channel=channel,
source=source,
userid=userid,
username=username,
title="MoviePilot助手已收到您的请求请稍候..."
))
# 生成会话ID
session_id = f"user_{userid}_{hash(user_message) % 10000}"
# 生成或复用会话ID
session_id = self._get_or_create_session_id(userid)
# 在事件循环中处理
try:

View File

@@ -5,6 +5,7 @@ from typing import Any, Union, Dict, Optional
from app.chain import ChainBase
from app.chain.download import DownloadChain
from app.chain.message import MessageChain
from app.chain.site import SiteChain
from app.chain.subscribe import SubscribeChain
from app.chain.system import SystemChain
@@ -140,6 +141,12 @@ class Command(metaclass=Singleton):
"description": "当前版本",
"category": "管理",
"data": {}
},
"/clear_session": {
"func": MessageChain().remote_clear_session,
"description": "清除会话",
"category": "管理",
"data": {}
}
}
# 插件命令集合

View File

@@ -29,6 +29,12 @@ class SiteOper(DbOper):
"""
return Site.get(self._db, sid)
async def async_get(self, sid: int) -> Site:
"""
异步查询单个站点
"""
return await Site.async_get(self._db, sid)
def list(self) -> List[Site]:
"""
获取站点列表

View File

@@ -283,8 +283,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
image=message.image, userid=userid, link=message.link,
buttons=message.buttons,
original_message_id=message.original_message_id,
original_chat_id=message.original_chat_id,
escape_markdown=kwargs.get("escape_markdown"))
original_chat_id=message.original_chat_id)
def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None:
"""

View File

@@ -31,8 +31,7 @@ class Telegram:
_callback_handlers: Dict[str, Callable] = {} # 存储回调处理器
_user_chat_mapping: Dict[str, str] = {} # userid -> chat_id mapping for reply targeting
_bot_username: Optional[str] = None # Bot username for mention detection
_escape_chars = r'_*[]()~`>#+-=|{}.!' # Telegram MarkdownV2
_markdown_escape_pattern = re.compile(f'([{re.escape(_escape_chars)}])') # Telegram MarkdownV2 规则转义特殊字符正则pattern
def __init__(self, TELEGRAM_TOKEN: Optional[str] = None, TELEGRAM_CHAT_ID: Optional[str] = None, **kwargs):
"""
初始化参数
@@ -53,7 +52,7 @@ class Telegram:
else:
apihelper.proxy = settings.PROXY
# bot
_bot = telebot.TeleBot(self._telegram_token, parse_mode="MarkdownV2")
_bot = telebot.TeleBot(self._telegram_token, parse_mode="Markdown")
# 记录句柄
self._bot = _bot
# 获取并存储bot用户名用于@检测
@@ -216,8 +215,7 @@ class Telegram:
userid: Optional[str] = None, link: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None,
original_message_id: Optional[int] = None,
original_chat_id: Optional[str] = None,
escape_markdown: bool = True) -> Optional[bool]:
original_chat_id: Optional[str] = None) -> Optional[bool]:
"""
发送Telegram消息
:param title: 消息标题
@@ -228,7 +226,6 @@ class Telegram:
:param buttons: 按钮列表,格式:[[{"text": "按钮文本", "callback_data": "回调数据"}]]
:param original_message_id: 原消息ID如果提供则编辑原消息
:param original_chat_id: 原消息的聊天ID编辑消息时需要
:param escape_markdown: 是否对内容进行Markdown转义
"""
if not self._telegram_token or not self._telegram_chat_id:
@@ -239,20 +236,10 @@ class Telegram:
return False
try:
if title:
# 标题总是转义因为通常标题不包含Markdown格式
title = self.escape_markdown(title)
if text:
if escape_markdown:
# 完全转义模式:转义所有特殊字符
text = self.escape_markdown(text)
else:
# 智能转义模式保留Markdown格式只转义普通文本中的特殊字符
text = self.escape_markdown_smart(text)
if title:
caption = f"*{title}*\n{text}"
else:
caption = text
# 对text进行Markdown特殊字符转义
text = re.sub(r"([_`])", r"\\\1", text)
caption = f"*{title}*\n{text}"
else:
caption = f"*{title}*"
@@ -512,7 +499,7 @@ class Telegram:
if image:
# 如果有图片使用edit_message_media
media = InputMediaPhoto(media=image, caption=text, parse_mode="MarkdownV2")
media = InputMediaPhoto(media=image, caption=text, parse_mode="Markdown")
self._bot.edit_message_media(
chat_id=chat_id,
message_id=message_id,
@@ -525,7 +512,7 @@ class Telegram:
chat_id=chat_id,
message_id=message_id,
text=text,
parse_mode="MarkdownV2",
parse_mode="Markdown",
reply_markup=reply_markup
)
return True
@@ -555,7 +542,7 @@ class Telegram:
ret = self._bot.send_photo(chat_id=userid or self._telegram_chat_id,
photo=photo,
caption=caption,
parse_mode="MarkdownV2",
parse_mode="Markdown",
reply_markup=reply_markup)
if ret is None:
raise RetryException("发送图片消息失败")
@@ -566,12 +553,12 @@ class Telegram:
for i in range(0, len(caption), 4095):
ret = self._bot.send_message(chat_id=userid or self._telegram_chat_id,
text=caption[i:i + 4095],
parse_mode="MarkdownV2",
parse_mode="Markdown",
reply_markup=reply_markup if i == 0 else None)
else:
ret = self._bot.send_message(chat_id=userid or self._telegram_chat_id,
text=caption,
parse_mode="MarkdownV2",
parse_mode="Markdown",
reply_markup=reply_markup)
if ret is None:
raise RetryException("发送文本消息失败")
@@ -610,84 +597,3 @@ class Telegram:
self._bot.stop_polling()
self._polling_thread.join()
logger.info("Telegram消息接收服务已停止")
def escape_markdown(self, text: str) -> str:
# 按 Telegram MarkdownV2 规则转义特殊字符
if not isinstance(text, str):
return str(text) if text is not None else ""
return self._markdown_escape_pattern.sub(r'\\\1', text)
def escape_markdown_smart(self, text: str) -> str:
"""
智能转义Markdown文本只转义不在Markdown标记内的特殊字符
这样可以保留已有的Markdown格式如*粗体*、_斜体_、[链接](url)等),
同时转义普通文本中的特殊字符以避免API错误
注意Telegram MarkdownV2不支持以下语法这些字符会被转义
- 标题语法(#、##、###)会被转义为 \#、\##、\###
- 列表语法(-、*、+)会被转义为 \-、\*、\+
- 引用语法(>)会被转义为 \>
建议使用加粗文本模拟标题:*标题文本*
:param text: 要转义的文本
:return: 转义后的文本
"""
if not isinstance(text, str):
return str(text) if text is not None else ""
# 如果没有特殊字符,直接返回
if not any(char in self._escape_chars for char in text):
return text
# 标记受保护的区域Markdown标记内的内容不转义
protected = [False] * len(text)
# 按优先级匹配Markdown标记从最复杂到最简单
# 1. 链接:[text](url) - 必须最先匹配
link_pattern = r'\[([^\]]*)\]\(([^)]*)\)'
for match in re.finditer(link_pattern, text):
for i in range(match.start(), match.end()):
protected[i] = True
# 2. 粗体:*text*(单个*,不是**
bold_pattern = r'(?<!\*)\*(?!\*)([^*]+?)(?<!\*)\*(?!\*)'
for match in re.finditer(bold_pattern, text):
if not any(protected[match.start():match.end()]):
for i in range(match.start(), match.end()):
protected[i] = True
# 3. 斜体_text_单个_不是__
italic_pattern = r'(?<!_)_(?!_)([^_]+?)(?<!_)_(?!_)'
for match in re.finditer(italic_pattern, text):
if not any(protected[match.start():match.end()]):
for i in range(match.start(), match.end()):
protected[i] = True
# 4. 代码:`text`
code_pattern = r'`([^`]+)`'
for match in re.finditer(code_pattern, text):
if not any(protected[match.start():match.end()]):
for i in range(match.start(), match.end()):
protected[i] = True
# 5. 删除线:~text~
strikethrough_pattern = r'~([^~]+)~'
for match in re.finditer(strikethrough_pattern, text):
if not any(protected[match.start():match.end()]):
for i in range(match.start(), match.end()):
protected[i] = True
# 构建结果:只转义未保护区域的特殊字符
result = []
for i, char in enumerate(text):
if protected[i]:
# 受保护区域Markdown标记内不转义
result.append(char)
elif char in self._escape_chars:
# 未保护区域,转义特殊字符
result.append('\\' + char)
else:
result.append(char)
return ''.join(result)

View File

@@ -37,6 +37,7 @@ beautifulsoup4~=4.13.4
pillow~=11.2.1
pillow-avif-plugin~=1.5.2
pyTelegramBotAPI~=4.27.0
telegramify-markdown~=0.5.2
playwright~=1.53.0
cf_clearance~=0.31.0
torrentool~=1.2.0

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.8.2'
FRONTEND_VERSION = 'v2.8.2'
APP_VERSION = 'v2.8.3'
FRONTEND_VERSION = 'v2.8.3'