diff --git a/app/agent/tools/factory.py b/app/agent/tools/factory.py index 23b1aefa..1503827a 100644 --- a/app/agent/tools/factory.py +++ b/app/agent/tools/factory.py @@ -75,6 +75,8 @@ from app.agent.tools.impl.run_slash_command import RunSlashCommandTool from app.agent.tools.impl.list_slash_commands import ListSlashCommandsTool from app.agent.tools.impl.query_custom_identifiers import QueryCustomIdentifiersTool from app.agent.tools.impl.update_custom_identifiers import UpdateCustomIdentifiersTool +from app.agent.tools.impl.query_system_settings import QuerySystemSettingsTool +from app.agent.tools.impl.update_system_settings import UpdateSystemSettingsTool from app.core.plugin import PluginManager from app.log import logger from app.schemas.message import ChannelCapabilityManager @@ -218,6 +220,8 @@ class MoviePilotToolFactory: ListSlashCommandsTool, QueryCustomIdentifiersTool, UpdateCustomIdentifiersTool, + QuerySystemSettingsTool, + UpdateSystemSettingsTool, ] if MoviePilotToolFactory._should_enable_choice_tool(channel): tool_definitions.append(AskUserChoiceTool) diff --git a/app/agent/tools/impl/_system_setting_utils.py b/app/agent/tools/impl/_system_setting_utils.py new file mode 100644 index 00000000..c434aa8b --- /dev/null +++ b/app/agent/tools/impl/_system_setting_utils.py @@ -0,0 +1,331 @@ +"""系统设置工具共用的键解析与分组元数据。""" + +from dataclasses import dataclass +from typing import Optional + +from app.core.config import Settings +from app.schemas.types import SystemConfigKey + + +@dataclass(frozen=True) +class SettingSpec: + """描述一个可被 Agent 读写的系统设置项。""" + + key: str + source: str + group: str + label: str + + +SYSTEMCONFIG_SETTING_METADATA = { + SystemConfigKey.Downloaders.value: { + "group": "downloaders", + "label": "下载器配置", + }, + SystemConfigKey.MediaServers.value: { + "group": "media_servers", + "label": "媒体服务器配置", + }, + SystemConfigKey.Notifications.value: { + "group": "notifications", + "label": "消息通知配置", + }, + SystemConfigKey.NotificationSwitchs.value: { + "group": "notification_switches", + "label": "通知场景开关", + }, + SystemConfigKey.Directories.value: { + "group": "directories", + "label": "目录配置", + }, + SystemConfigKey.Storages.value: { + "group": "storages", + "label": "存储配置", + }, + SystemConfigKey.IndexerSites.value: { + "group": "search_sites", + "label": "搜索站点范围", + }, + SystemConfigKey.RssSites.value: { + "group": "subscribe_sites", + "label": "订阅站点范围", + }, + SystemConfigKey.UserSiteAuthParams.value: { + "group": "site_auth", + "label": "站点认证参数", + }, + SystemConfigKey.AIAgentConfig.value: { + "group": "ai_agent", + "label": "AI 智能体配置", + }, + SystemConfigKey.CustomIdentifiers.value: { + "group": "custom_identifiers", + "label": "自定义识别词", + }, + SystemConfigKey.CustomReleaseGroups.value: { + "group": "customization", + "label": "自定义制作组/字幕组", + }, + SystemConfigKey.Customization.value: { + "group": "customization", + "label": "自定义占位符", + }, + SystemConfigKey.TransferExcludeWords.value: { + "group": "transfer", + "label": "整理屏蔽词", + }, + SystemConfigKey.TorrentsPriority.value: { + "group": "filter_rules", + "label": "种子优先级规则", + }, + SystemConfigKey.CustomFilterRules.value: { + "group": "filter_rules", + "label": "用户自定义规则", + }, + SystemConfigKey.UserFilterRuleGroups.value: { + "group": "filter_rules", + "label": "用户规则组", + }, + SystemConfigKey.SearchFilterRuleGroups.value: { + "group": "filter_rules", + "label": "搜索默认过滤规则组", + }, + SystemConfigKey.SubscribeFilterRuleGroups.value: { + "group": "filter_rules", + "label": "订阅默认过滤规则组", + }, + SystemConfigKey.BestVersionFilterRuleGroups.value: { + "group": "filter_rules", + "label": "洗版默认过滤规则组", + }, + SystemConfigKey.SubscribeDefaultParams.value: { + "group": "subscribe_defaults", + "label": "订阅默认参数", + }, + SystemConfigKey.DefaultMovieSubscribeConfig.value: { + "group": "subscribe_defaults", + "label": "默认电影订阅规则", + }, + SystemConfigKey.DefaultTvSubscribeConfig.value: { + "group": "subscribe_defaults", + "label": "默认电视剧订阅规则", + }, + SystemConfigKey.UserInstalledPlugins.value: { + "group": "plugins", + "label": "已安装插件列表", + }, + SystemConfigKey.PluginFolders.value: { + "group": "plugins", + "label": "插件文件夹分组配置", + }, + SystemConfigKey.PluginInstallReport.value: { + "group": "plugins", + "label": "插件安装统计", + }, + SystemConfigKey.NotificationSendTime.value: { + "group": "notifications", + "label": "通知发送时间", + }, + SystemConfigKey.NotificationTemplates.value: { + "group": "notifications", + "label": "通知模板", + }, + SystemConfigKey.ScrapingSwitchs.value: { + "group": "scraping", + "label": "刮削开关设置", + }, + SystemConfigKey.FollowSubscribers.value: { + "group": "subscribe_sites", + "label": "Follow 订阅分享者", + }, +} + + +LIST_ITEM_MATCH_FIELD_DEFAULTS = { + SystemConfigKey.Downloaders.value: "name", + SystemConfigKey.MediaServers.value: "name", + SystemConfigKey.Notifications.value: "name", + SystemConfigKey.NotificationSwitchs.value: "type", + SystemConfigKey.Directories.value: "name", + SystemConfigKey.Storages.value: "name", +} + + +GROUP_ALIASES = { + "all": "all", + "全部": "all", + "settings": "settings", + "basic": "settings", + "基础设置": "settings", + "基础配置": "settings", + "systemconfig": "systemconfig", + "system_config": "systemconfig", + "系统设置": "systemconfig", + "系统配置": "systemconfig", + "downloaders": "downloaders", + "downloader": "downloaders", + "下载器": "downloaders", + "media_servers": "media_servers", + "mediaservers": "media_servers", + "media-servers": "media_servers", + "媒体服务器": "media_servers", + "notifications": "notifications", + "notification": "notifications", + "消息通知": "notifications", + "通知": "notifications", + "notification_switches": "notification_switches", + "notification_switchs": "notification_switches", + "通知开关": "notification_switches", + "storages": "storages", + "storage": "storages", + "存储": "storages", + "directories": "directories", + "directory": "directories", + "目录": "directories", + "search_sites": "search_sites", + "indexer_sites": "search_sites", + "搜索站点": "search_sites", + "subscribe_sites": "subscribe_sites", + "rss_sites": "subscribe_sites", + "订阅站点": "subscribe_sites", + "site_auth": "site_auth", + "site_auth_params": "site_auth", + "站点认证": "site_auth", + "ai_agent": "ai_agent", + "agent": "ai_agent", + "智能体": "ai_agent", + "custom_identifiers": "custom_identifiers", + "自定义识别词": "custom_identifiers", + "filter_rules": "filter_rules", + "过滤规则": "filter_rules", + "subscribe_defaults": "subscribe_defaults", + "订阅默认": "subscribe_defaults", + "plugins": "plugins", + "插件": "plugins", + "customization": "customization", + "自定义": "customization", + "transfer": "transfer", + "整理": "transfer", + "scraping": "scraping", + "刮削": "scraping", + "misc": "misc", + "其他": "misc", +} + + +def _normalize_token(value: str) -> str: + return str(value).strip().lower().replace("-", "_") + + +def _build_specs() -> tuple[dict[str, SettingSpec], dict[str, SettingSpec]]: + core_specs = { + key: SettingSpec(key=key, source="settings", group="settings", label=key) + for key in Settings.model_fields.keys() + } + system_specs = {} + for item in SystemConfigKey: + metadata = SYSTEMCONFIG_SETTING_METADATA.get(item.value, {}) + system_specs[item.value] = SettingSpec( + key=item.value, + source="systemconfig", + group=metadata.get("group", "misc"), + label=metadata.get("label", item.value), + ) + return core_specs, system_specs + + +CORE_SETTING_SPECS, SYSTEMCONFIG_SETTING_SPECS = _build_specs() +ALL_SETTING_SPECS = {**CORE_SETTING_SPECS, **SYSTEMCONFIG_SETTING_SPECS} + + +SETTING_KEY_ALIASES = {} +for key in CORE_SETTING_SPECS: + SETTING_KEY_ALIASES[_normalize_token(key)] = key +for item in SystemConfigKey: + SETTING_KEY_ALIASES[_normalize_token(item.value)] = item.value + SETTING_KEY_ALIASES[_normalize_token(item.name)] = item.value + +SINGLE_KEY_GROUP_ALIASES = { + _normalize_token(alias): next( + ( + spec.key + for spec in SYSTEMCONFIG_SETTING_SPECS.values() + if spec.group == canonical_group + ), + None, + ) + for alias, canonical_group in GROUP_ALIASES.items() + if canonical_group not in {"all", "settings", "systemconfig"} + and len( + [ + spec.key + for spec in SYSTEMCONFIG_SETTING_SPECS.values() + if spec.group == canonical_group + ] + ) + == 1 +} + + +def normalize_group(group: Optional[str]) -> str: + if not group: + return "all" + normalized = GROUP_ALIASES.get(_normalize_token(group)) + if not normalized: + raise ValueError( + "group 不支持,支持值包括 all/settings/systemconfig 以及" + " downloaders、media_servers、notifications、storages、directories、" + "search_sites、subscribe_sites、site_auth、ai_agent 等分类别名" + ) + return normalized + + +def resolve_setting_spec(setting_key: Optional[str]) -> Optional[SettingSpec]: + """把精确键名、枚举名或单键分组别名解析为统一的设置定义。""" + + if not setting_key: + return None + + normalized = _normalize_token(setting_key) + resolved_key = SETTING_KEY_ALIASES.get(normalized) or SINGLE_KEY_GROUP_ALIASES.get( + normalized + ) + if not resolved_key: + return None + return ALL_SETTING_SPECS.get(resolved_key) + + +def list_setting_specs( + group: Optional[str] = "all", keyword: Optional[str] = None +) -> list[SettingSpec]: + """按分组和关键字筛选可查询的设置项。""" + + normalized_group = normalize_group(group) + if normalized_group == "all": + specs = list(ALL_SETTING_SPECS.values()) + elif normalized_group == "settings": + specs = list(CORE_SETTING_SPECS.values()) + elif normalized_group == "systemconfig": + specs = list(SYSTEMCONFIG_SETTING_SPECS.values()) + else: + specs = [ + spec + for spec in SYSTEMCONFIG_SETTING_SPECS.values() + if spec.group == normalized_group + ] + + if keyword: + normalized_keyword = _normalize_token(keyword) + specs = [ + spec + for spec in specs + if normalized_keyword in _normalize_token(spec.key) + or normalized_keyword in _normalize_token(spec.group) + or normalized_keyword in _normalize_token(spec.label) + ] + + return sorted(specs, key=lambda spec: (spec.source, spec.group, spec.key)) + + +def get_default_list_match_field(setting_key: str) -> Optional[str]: + return LIST_ITEM_MATCH_FIELD_DEFAULTS.get(setting_key) diff --git a/app/agent/tools/impl/query_custom_identifiers.py b/app/agent/tools/impl/query_custom_identifiers.py index ba8876b1..81a600bb 100644 --- a/app/agent/tools/impl/query_custom_identifiers.py +++ b/app/agent/tools/impl/query_custom_identifiers.py @@ -27,6 +27,7 @@ class QueryCustomIdentifiersTool(MoviePilotTool): "Returns the list of identifier rules used for preprocessing torrent/file names before media recognition. " "Use this tool to check existing rules before adding new ones to avoid duplicates." ) + require_admin: bool = True args_schema: Type[BaseModel] = QueryCustomIdentifiersInput def get_tool_message(self, **kwargs) -> Optional[str]: diff --git a/app/agent/tools/impl/query_directory_settings.py b/app/agent/tools/impl/query_directory_settings.py index f7c16722..1671efbe 100644 --- a/app/agent/tools/impl/query_directory_settings.py +++ b/app/agent/tools/impl/query_directory_settings.py @@ -24,6 +24,7 @@ class QueryDirectorySettingsInput(BaseModel): class QueryDirectorySettingsTool(MoviePilotTool): name: str = "query_directory_settings" description: str = "Query system directory configuration settings (NOT file listings). Returns configured directory paths, storage types, transfer modes, and other directory-related settings. Use 'list_directory' to list actual files and folders in a directory." + require_admin: bool = True args_schema: Type[BaseModel] = QueryDirectorySettingsInput def get_tool_message(self, **kwargs) -> Optional[str]: diff --git a/app/agent/tools/impl/query_downloaders.py b/app/agent/tools/impl/query_downloaders.py index 6113b7b6..f78feed7 100644 --- a/app/agent/tools/impl/query_downloaders.py +++ b/app/agent/tools/impl/query_downloaders.py @@ -19,6 +19,7 @@ class QueryDownloadersInput(BaseModel): class QueryDownloadersTool(MoviePilotTool): name: str = "query_downloaders" description: str = "Query downloader configuration and list all available downloaders. Shows downloader status, connection details, and configuration settings." + require_admin: bool = True args_schema: Type[BaseModel] = QueryDownloadersInput def get_tool_message(self, **kwargs) -> Optional[str]: diff --git a/app/agent/tools/impl/query_system_settings.py b/app/agent/tools/impl/query_system_settings.py new file mode 100644 index 00000000..25bbca8c --- /dev/null +++ b/app/agent/tools/impl/query_system_settings.py @@ -0,0 +1,186 @@ +"""统一查询系统设置工具。""" + +import json +from typing import Optional, Type + +from pydantic import BaseModel, Field + +from app.agent.tools.base import MoviePilotTool +from app.agent.tools.impl._system_setting_utils import ( + SettingSpec, + list_setting_specs, + resolve_setting_spec, +) +from app.core.config import settings +from app.db.systemconfig_oper import SystemConfigOper +from app.log import logger + + +class QuerySystemSettingsInput(BaseModel): + """查询系统设置工具的输入参数模型。""" + + explanation: str = Field( + ..., + description="Clear explanation of why this tool is being used in the current context", + ) + setting_key: Optional[str] = Field( + None, + description=( + "Exact setting key to query. Supports Settings field names like 'APP_DOMAIN' or 'TMDB_API_KEY', " + "SystemConfigKey values like 'Downloaders' or 'MediaServers', enum names, and some single-key aliases " + "such as 'downloaders', 'directories', 'search_sites', 'subscribe_sites', 'site_auth', 'ai_agent', " + "and 'custom_identifiers'." + ), + ) + group: Optional[str] = Field( + "all", + description=( + "Optional group filter when setting_key is not provided. Supports 'all', 'settings', 'systemconfig', " + "and category aliases such as 'downloaders', 'media_servers', 'notifications', 'notification_switches', " + "'storages', 'directories', 'search_sites', 'subscribe_sites', 'site_auth', 'ai_agent', 'filter_rules', " + "'subscribe_defaults', 'plugins', and 'custom_identifiers'. Chinese aliases are also accepted." + ), + ) + keyword: Optional[str] = Field( + None, + description=( + "Optional keyword used to fuzzy match setting keys, group names, or labels when listing settings." + ), + ) + include_values: Optional[bool] = Field( + None, + description=( + "Whether to include full setting values. Default behavior: when a single setting is matched it returns the full value; " + "when multiple settings are matched it returns summaries only unless this is explicitly set to true." + ), + ) + + +class QuerySystemSettingsTool(MoviePilotTool): + name: str = "query_system_settings" + description: str = ( + "Query system settings across both the basic Settings module and all SystemConfig-backed categories. " + "Use this tool to inspect downloaders, media servers, notification channels, storages, directories, search-site ranges, " + "subscribe-site ranges, site auth params, AI agent config, and any other system setting before making changes." + ) + require_admin: bool = True + args_schema: Type[BaseModel] = QuerySystemSettingsInput + + def get_tool_message(self, **kwargs) -> Optional[str]: + """根据查询参数生成友好的提示消息。""" + + setting_key = kwargs.get("setting_key") + group = kwargs.get("group", "all") + keyword = kwargs.get("keyword") + if setting_key: + return f"查询系统设置: {setting_key}" + if keyword: + return f"筛选系统设置: {group} / {keyword}" + return f"查询系统设置分组: {group}" + + @staticmethod + def _load_setting_value(spec: SettingSpec): + if spec.source == "settings": + return getattr(settings, spec.key) + return SystemConfigOper().get(spec.key) + + @staticmethod + def _summarize_value(value) -> dict: + summary = { + "has_value": value is not None, + "value_type": type(value).__name__, + } + if isinstance(value, list): + summary["item_count"] = len(value) + if value: + summary["item_type"] = type(value[0]).__name__ + elif isinstance(value, dict): + keys = list(value.keys()) + summary["item_count"] = len(keys) + summary["keys_preview"] = keys[:10] + if len(keys) > 10: + summary["keys_truncated"] = True + elif isinstance(value, str): + summary["length"] = len(value) + preview = value[:200] + if preview: + summary["value_preview"] = preview + if len(value) > len(preview): + summary["value_truncated"] = True + elif value is not None: + summary["value_preview"] = value + return summary + + async def run( + self, + setting_key: Optional[str] = None, + group: Optional[str] = "all", + keyword: Optional[str] = None, + include_values: Optional[bool] = None, + **kwargs, + ) -> str: + logger.info( + "执行工具: %s, setting_key=%s, group=%s, keyword=%s", + self.name, + setting_key, + group, + keyword, + ) + + try: + if setting_key: + spec = resolve_setting_spec(setting_key) + if not spec: + return json.dumps( + { + "success": False, + "message": f"系统设置项 '{setting_key}' 不存在", + }, + ensure_ascii=False, + ) + specs = [spec] + else: + specs = list_setting_specs(group=group, keyword=keyword) + if not specs: + return json.dumps( + { + "success": False, + "message": "没有找到匹配的系统设置项", + }, + ensure_ascii=False, + ) + + should_include_values = ( + include_values if include_values is not None else len(specs) == 1 + ) + settings_payload = [] + for spec in specs: + value = self._load_setting_value(spec) + item = { + "setting_key": spec.key, + "source": spec.source, + "group": spec.group, + "label": spec.label, + } + item.update(self._summarize_value(value)) + if should_include_values: + item["value"] = value + settings_payload.append(item) + + return json.dumps( + { + "success": True, + "matched_count": len(settings_payload), + "include_values": should_include_values, + "settings": settings_payload, + }, + ensure_ascii=False, + indent=2, + default=str, + ) + except Exception as e: + logger.error(f"查询系统设置失败: {e}", exc_info=True) + return json.dumps( + {"success": False, "message": f"查询系统设置时发生错误: {str(e)}"}, + ensure_ascii=False, + ) diff --git a/app/agent/tools/impl/update_custom_identifiers.py b/app/agent/tools/impl/update_custom_identifiers.py index 7aaaad9c..886b1bcc 100644 --- a/app/agent/tools/impl/update_custom_identifiers.py +++ b/app/agent/tools/impl/update_custom_identifiers.py @@ -52,6 +52,7 @@ class UpdateCustomIdentifiersTool(MoviePilotTool): "Lines starting with '#' are comments. " "The replacement target supports: {[tmdbid=xxx;type=movie/tv;s=xxx;e=xxx]} for direct TMDB ID matching." ) + require_admin: bool = True args_schema: Type[BaseModel] = UpdateCustomIdentifiersInput def get_tool_message(self, **kwargs) -> Optional[str]: diff --git a/app/agent/tools/impl/update_system_settings.py b/app/agent/tools/impl/update_system_settings.py new file mode 100644 index 00000000..ef34f388 --- /dev/null +++ b/app/agent/tools/impl/update_system_settings.py @@ -0,0 +1,305 @@ +"""统一更新系统设置工具。""" + +import copy +import json +from typing import Any, Literal, Optional, Type, Union + +from pydantic import BaseModel, Field + +from app.agent.tools.base import MoviePilotTool +from app.agent.tools.impl._system_setting_utils import ( + SettingSpec, + get_default_list_match_field, + resolve_setting_spec, +) +from app.core.config import settings +from app.core.event import eventmanager +from app.db.systemconfig_oper import SystemConfigOper +from app.log import logger +from app.schemas.event import ConfigChangeEventData +from app.schemas.types import EventType + +SettingValue = Optional[Union[list, dict, bool, int, float, str]] + + +class UpdateSystemSettingsInput(BaseModel): + """更新系统设置工具的输入参数模型。""" + + explanation: str = Field( + ..., + description="Clear explanation of why this tool is being used in the current context", + ) + setting_key: str = Field( + ..., + description=( + "Exact setting key to update. Supports Settings field names, SystemConfigKey values, enum names, and common aliases " + "such as 'downloaders', 'directories', 'search_sites', 'subscribe_sites', 'site_auth', 'ai_agent', and 'custom_identifiers'." + ), + ) + value: SettingValue = Field( + None, + description=( + "The new value or list item payload. For replace: this becomes the entire setting value. For merge_dict: this should be a dict of keys to merge. " + "For upsert_list_item/remove_list_item: this can be a dict item or a scalar list item." + ), + ) + operation: Literal[ + "replace", + "merge_dict", + "upsert_list_item", + "remove_list_item", + ] = Field( + "replace", + description=( + "Update operation. replace replaces the whole value; merge_dict merges dict keys (optionally with remove_keys); " + "upsert_list_item inserts or replaces one item inside a list; remove_list_item removes one item from a list." + ), + ) + remove_keys: Optional[list[str]] = Field( + None, + description="Optional dict keys to delete when operation is merge_dict.", + ) + match_field: Optional[str] = Field( + None, + description=( + "Optional match field for list item upsert/remove. If omitted, common SystemConfig categories use built-in defaults such as 'name' or 'type'." + ), + ) + match_value: SettingValue = Field( + None, + description=( + "Optional explicit value used to locate a list item when operation is upsert_list_item or remove_list_item." + ), + ) + + +class UpdateSystemSettingsTool(MoviePilotTool): + name: str = "update_system_settings" + description: str = ( + "Update system settings across both the basic Settings module and all SystemConfig-backed categories. " + "Supports full replacement, shallow dict merge, and generic list item upsert/remove so the agent can manage downloaders, media servers, notification channels, storages, directories, search-site ranges, subscribe-site ranges, site auth params, AI agent config, and other system settings through one tool." + ) + require_admin: bool = True + args_schema: Type[BaseModel] = UpdateSystemSettingsInput + + def get_tool_message(self, **kwargs) -> Optional[str]: + """根据更新参数生成友好的提示消息。""" + + setting_key = kwargs.get("setting_key", "") + operation = kwargs.get("operation", "replace") + action_map = { + "replace": "覆盖系统设置", + "merge_dict": "合并系统设置", + "upsert_list_item": "更新列表项", + "remove_list_item": "移除列表项", + } + return f"{action_map.get(operation, '更新系统设置')}: {setting_key}" + + @staticmethod + def _load_setting_value(spec: SettingSpec): + if spec.source == "settings": + return getattr(settings, spec.key) + return SystemConfigOper().get(spec.key) + + @staticmethod + def _normalize_systemconfig_value(value: Any): + if isinstance(value, list): + filtered = [item for item in value if item is not None] + return filtered or None + return value + + @staticmethod + def _resolve_list_match( + spec: SettingSpec, + operation: str, + value: Any, + match_field: Optional[str], + match_value: Any, + ) -> tuple[Optional[str], Any]: + resolved_field = match_field or get_default_list_match_field(spec.key) + resolved_value = match_value + + if isinstance(value, dict): + if not resolved_field: + raise ValueError( + f"{operation} 需要提供 match_field,或使用带默认匹配字段的系统配置项" + ) + if resolved_value is None: + resolved_value = value.get(resolved_field) + if resolved_value is None: + raise ValueError( + f"{operation} 缺少匹配值,请在 value.{resolved_field} 或 match_value 中提供" + ) + else: + if resolved_value is None: + resolved_value = value + + return resolved_field, resolved_value + + @classmethod + def _prepare_next_value( + cls, + spec: SettingSpec, + current_value: Any, + value: Any, + operation: str, + remove_keys: Optional[list[str]] = None, + match_field: Optional[str] = None, + match_value: Any = None, + ) -> Any: + remove_keys = remove_keys or [] + if operation == "replace": + return value + + if operation == "merge_dict": + if remove_keys and not isinstance(remove_keys, list): + raise ValueError("remove_keys 必须是字符串列表") + if current_value is not None and not isinstance(current_value, dict): + raise ValueError("merge_dict 仅支持当前值为 dict 的设置项") + if value is not None and not isinstance(value, dict): + raise ValueError("merge_dict 的 value 必须是 dict 或 null") + next_value = dict(current_value or {}) + if value: + next_value.update(value) + for key in remove_keys: + next_value.pop(key, None) + return next_value + + if operation not in {"upsert_list_item", "remove_list_item"}: + raise ValueError(f"不支持的操作: {operation}") + + if current_value is not None and not isinstance(current_value, list): + raise ValueError(f"{operation} 仅支持当前值为 list 的设置项") + + next_items = list(copy.deepcopy(current_value or [])) + resolved_field, resolved_match_value = cls._resolve_list_match( + spec, operation, value, match_field, match_value + ) + + if operation == "upsert_list_item": + if value is None: + raise ValueError("upsert_list_item 必须提供 value") + replaced = False + for index, item in enumerate(next_items): + if resolved_field: + if isinstance(item, dict) and item.get(resolved_field) == resolved_match_value: + next_items[index] = value + replaced = True + break + elif item == resolved_match_value: + next_items[index] = value + replaced = True + break + if not replaced: + next_items.append(value) + return next_items + + return [ + item + for item in next_items + if not ( + isinstance(item, dict) + and resolved_field + and item.get(resolved_field) == resolved_match_value + ) + and not (not resolved_field and item == resolved_match_value) + ] + + async def run( + self, + setting_key: str, + value: SettingValue = None, + operation: str = "replace", + remove_keys: Optional[list[str]] = None, + match_field: Optional[str] = None, + match_value: SettingValue = None, + **kwargs, + ) -> str: + logger.info( + "执行工具: %s, setting_key=%s, operation=%s", + self.name, + setting_key, + operation, + ) + + try: + spec = resolve_setting_spec(setting_key) + if not spec: + return json.dumps( + { + "success": False, + "message": f"系统设置项 '{setting_key}' 不存在", + }, + ensure_ascii=False, + ) + + current_value = self._load_setting_value(spec) + next_value = self._prepare_next_value( + spec=spec, + current_value=current_value, + value=value, + operation=operation, + remove_keys=remove_keys, + match_field=match_field, + match_value=match_value, + ) + + event_value = next_value + changed = False + message = "" + if spec.source == "settings": + success, message = settings.update_setting(spec.key, next_value) + if success is False: + return json.dumps( + { + "success": False, + "message": message or f"更新设置 {spec.key} 失败", + }, + ensure_ascii=False, + ) + changed = success is True + else: + normalized_value = self._normalize_systemconfig_value(next_value) + event_value = normalized_value + success = await SystemConfigOper().async_set(spec.key, normalized_value) + changed = success is True + + if changed: + await eventmanager.async_send_event( + etype=EventType.ConfigChanged, + data=ConfigChangeEventData( + key=spec.key, + value=event_value, + change_type="update", + ), + ) + + saved_value = self._load_setting_value(spec) + if not changed and not message: + message = "配置值未发生变化" + + return json.dumps( + { + "success": True, + "message": message or f"系统设置 {spec.key} 已更新", + "changed": changed, + "operation": operation, + "setting": { + "setting_key": spec.key, + "source": spec.source, + "group": spec.group, + "label": spec.label, + }, + "previous_value": current_value, + "saved_value": saved_value, + }, + ensure_ascii=False, + indent=2, + default=str, + ) + except Exception as e: + logger.error(f"更新系统设置失败: {e}", exc_info=True) + return json.dumps( + {"success": False, "message": f"更新系统设置时发生错误: {str(e)}"}, + ensure_ascii=False, + ) diff --git a/app/agent/tools/manager.py b/app/agent/tools/manager.py index e1f2981b..65d5f171 100644 --- a/app/agent/tools/manager.py +++ b/app/agent/tools/manager.py @@ -23,7 +23,12 @@ class MoviePilotToolsManager: MoviePilot工具管理器(用于HTTP API) """ - def __init__(self, user_id: str = "api_user", session_id: str = uuid.uuid4()): + def __init__( + self, + user_id: str = "api_user", + session_id: str = uuid.uuid4(), + is_admin: bool = True, + ): """ 初始化工具管理器 @@ -33,6 +38,7 @@ class MoviePilotToolsManager: """ self.user_id = user_id self.session_id = session_id + self.is_admin = is_admin self.tools: List[Any] = [] self._load_tools() @@ -64,6 +70,8 @@ class MoviePilotToolsManager: """ tools_list = [] for tool in self.tools: + if getattr(tool, "_require_admin", False) and not self.is_admin: + continue # 获取工具的输入参数模型 args_schema = getattr(tool, "args_schema", None) if args_schema: @@ -215,6 +223,13 @@ class MoviePilotToolsManager: return normalized + def _check_tool_permission(self, tool_instance: Any) -> Optional[str]: + """为 HTTP/MCP/CLI 入口补齐 require_admin 门禁。""" + + if getattr(tool_instance, "_require_admin", False) and not self.is_admin: + return "抱歉,您没有执行此工具的权限。只有系统管理员才能执行工具操作。" + return None + async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: """ 调用工具 @@ -235,6 +250,10 @@ class MoviePilotToolsManager: return error_msg try: + permission_error = self._check_tool_permission(tool_instance) + if permission_error: + return json.dumps({"error": permission_error}, ensure_ascii=False) + # 规范化参数类型 normalized_arguments = self._normalize_arguments(tool_instance, arguments) diff --git a/tests/test_agent_system_settings_tools.py b/tests/test_agent_system_settings_tools.py new file mode 100644 index 00000000..df5303c4 --- /dev/null +++ b/tests/test_agent_system_settings_tools.py @@ -0,0 +1,341 @@ +import asyncio +import importlib.util +import json +import sys +import unittest +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from types import ModuleType +from unittest.mock import AsyncMock, MagicMock, patch + + +ROOT = Path(__file__).resolve().parents[1] + + +def _stub_module(name: str, *, package: bool = False, **attrs): + module = sys.modules.get(name) + if module is None: + module = ModuleType(name) + if package: + module.__path__ = [] + sys.modules[name] = module + for key, value in attrs.items(): + setattr(module, key, value) + return module + + +def _load_module(module_name: str, relative_path: str): + spec = importlib.util.spec_from_file_location(module_name, ROOT / relative_path) + module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = module + assert spec and spec.loader + spec.loader.exec_module(module) + return module + + +class _DummyLogger: + def info(self, *_args, **_kwargs): + pass + + def warning(self, *_args, **_kwargs): + pass + + def error(self, *_args, **_kwargs): + pass + + def debug(self, *_args, **_kwargs): + pass + + +class _DummyMoviePilotTool: + result_max_chars = None + + def __init__(self, session_id: str, user_id: str, **kwargs): + self._session_id = session_id + self._user_id = user_id + self._require_admin = getattr(self.__class__, "require_admin", False) + self.name = getattr(self.__class__, "name", self.__class__.__name__) + + +def _format_tool_result_for_agent(result, **kwargs): + if isinstance(result, str): + return result + return json.dumps(result, ensure_ascii=False, default=str) + + +class _SystemConfigKey(Enum): + Downloaders = "Downloaders" + MediaServers = "MediaServers" + Notifications = "Notifications" + NotificationSwitchs = "NotificationSwitchs" + Directories = "Directories" + Storages = "Storages" + IndexerSites = "IndexerSites" + RssSites = "RssSites" + CustomReleaseGroups = "CustomReleaseGroups" + Customization = "Customization" + CustomIdentifiers = "CustomIdentifiers" + TransferExcludeWords = "TransferExcludeWords" + TorrentsPriority = "TorrentsPriority" + CustomFilterRules = "CustomFilterRules" + UserFilterRuleGroups = "UserFilterRuleGroups" + SearchFilterRuleGroups = "SearchFilterRuleGroups" + SubscribeFilterRuleGroups = "SubscribeFilterRuleGroups" + SubscribeDefaultParams = "SubscribeDefaultParams" + BestVersionFilterRuleGroups = "BestVersionFilterRuleGroups" + SubscribeReport = "SubscribeReport" + UserCustomCSS = "UserCustomCSS" + UserInstalledPlugins = "UserInstalledPlugins" + PluginFolders = "PluginFolders" + DefaultMovieSubscribeConfig = "DefaultMovieSubscribeConfig" + DefaultTvSubscribeConfig = "DefaultTvSubscribeConfig" + UserSiteAuthParams = "UserSiteAuthParams" + FollowSubscribers = "FollowSubscribers" + NotificationSendTime = "NotificationSendTime" + AIAgentConfig = "AIAgentConfig" + NotificationTemplates = "NotificationTemplates" + ScrapingSwitchs = "ScrapingSwitchs" + PluginInstallReport = "PluginInstallReport" + SetupWizardState = "SetupWizardState" + UgreenSessionCache = "UgreenSessionCache" + + +class _EventType(Enum): + ConfigChanged = "ConfigChanged" + + +class _DummySettingsModel: + model_fields = { + "APP_DOMAIN": object(), + "TMDB_API_KEY": object(), + } + + +class _DummySettings: + APP_DOMAIN = "https://old.example.com" + TMDB_API_KEY = "demo-token" + + def update_setting(self, key, value): + setattr(self, key, value) + return True, "" + + +class _DummySystemConfigOper: + def get(self, key): + return None + + async def async_set(self, key, value): + return True + + +class _DummyEventManager: + async def async_send_event(self, *args, **kwargs): + return None + + +@dataclass +class _ConfigChangeEventData: + key: object + value: object = None + change_type: str = "update" + + +class _StubToolFactory: + @staticmethod + def create_tools(*args, **kwargs): + return [] + + +for _package_name in ( + "app", + "app.agent", + "app.agent.tools", + "app.agent.tools.impl", + "app.core", + "app.db", + "app.schemas", +): + _stub_module(_package_name, package=True) + +_stub_module( + "app.agent.tools.base", + MoviePilotTool=_DummyMoviePilotTool, + format_tool_result_for_agent=_format_tool_result_for_agent, +) +_stub_module( + "app.core.config", + Settings=_DummySettingsModel, + settings=_DummySettings(), +) +_stub_module("app.db.systemconfig_oper", SystemConfigOper=_DummySystemConfigOper) +_stub_module("app.log", logger=_DummyLogger()) +_stub_module("app.core.event", eventmanager=_DummyEventManager()) +_stub_module("app.schemas.event", ConfigChangeEventData=_ConfigChangeEventData) +_stub_module( + "app.schemas.types", + SystemConfigKey=_SystemConfigKey, + EventType=_EventType, +) +_stub_module("app.agent.tools.factory", MoviePilotToolFactory=_StubToolFactory) + +_load_module( + "app.agent.tools.impl._system_setting_utils", + "app/agent/tools/impl/_system_setting_utils.py", +) +query_module = _load_module( + "app.agent.tools.impl.query_system_settings", + "app/agent/tools/impl/query_system_settings.py", +) +update_module = _load_module( + "app.agent.tools.impl.update_system_settings", + "app/agent/tools/impl/update_system_settings.py", +) +manager_module = _load_module( + "app.agent.tools.manager", + "app/agent/tools/manager.py", +) + +QuerySystemSettingsTool = query_module.QuerySystemSettingsTool +UpdateSystemSettingsTool = update_module.UpdateSystemSettingsTool +MoviePilotToolsManager = manager_module.MoviePilotToolsManager + + +class TestAgentSystemSettingsTools(unittest.TestCase): + def test_query_system_settings_returns_exact_systemconfig_value(self): + tool = QuerySystemSettingsTool(session_id="session-1", user_id="10001") + config_oper = MagicMock() + config_oper.get.return_value = [{"name": "qb", "enabled": True}] + + with patch.object(query_module, "SystemConfigOper", return_value=config_oper): + result = asyncio.run(tool.run(setting_key="Downloaders")) + + payload = json.loads(result) + self.assertTrue(payload["success"]) + self.assertEqual(payload["matched_count"], 1) + self.assertEqual(payload["settings"][0]["setting_key"], "Downloaders") + self.assertEqual(payload["settings"][0]["value"][0]["name"], "qb") + + def test_query_system_settings_group_defaults_to_summary_for_multiple_items(self): + tool = QuerySystemSettingsTool(session_id="session-1", user_id="10001") + config_oper = MagicMock() + config_oper.get.return_value = [] + + with patch.object(query_module, "SystemConfigOper", return_value=config_oper): + result = asyncio.run(tool.run(group="systemconfig")) + + payload = json.loads(result) + self.assertTrue(payload["success"]) + self.assertFalse(payload["include_values"]) + self.assertGreater(payload["matched_count"], 1) + + def test_update_system_settings_merges_dict_and_emits_event(self): + tool = UpdateSystemSettingsTool(session_id="session-1", user_id="10001") + config_oper = MagicMock() + config_oper.get.side_effect = [ + {"chatgpt": {"enabled": True}}, + {"chatgpt": {"enabled": False}, "gemini": {"enabled": True}}, + ] + config_oper.async_set = AsyncMock(return_value=True) + + with patch.object( + update_module, "SystemConfigOper", return_value=config_oper + ), patch.object( + update_module.eventmanager, + "async_send_event", + new=AsyncMock(), + ) as send_event: + result = asyncio.run( + tool.run( + setting_key="AIAgentConfig", + operation="merge_dict", + value={"chatgpt": {"enabled": False}, "gemini": {"enabled": True}}, + ) + ) + + payload = json.loads(result) + self.assertTrue(payload["success"]) + self.assertTrue(payload["changed"]) + config_oper.async_set.assert_awaited_once_with( + "AIAgentConfig", + {"chatgpt": {"enabled": False}, "gemini": {"enabled": True}}, + ) + send_event.assert_awaited_once() + + def test_update_system_settings_upserts_named_list_item(self): + tool = UpdateSystemSettingsTool(session_id="session-1", user_id="10001") + config_oper = MagicMock() + config_oper.get.side_effect = [ + [{"name": "qb", "enabled": False}], + [{"name": "qb", "enabled": True}], + ] + config_oper.async_set = AsyncMock(return_value=True) + + with patch.object( + update_module, "SystemConfigOper", return_value=config_oper + ), patch.object( + update_module.eventmanager, + "async_send_event", + new=AsyncMock(), + ): + result = asyncio.run( + tool.run( + setting_key="downloaders", + operation="upsert_list_item", + value={"name": "qb", "enabled": True}, + ) + ) + + payload = json.loads(result) + self.assertTrue(payload["success"]) + self.assertEqual(payload["saved_value"], [{"name": "qb", "enabled": True}]) + + def test_update_system_settings_updates_basic_settings(self): + tool = UpdateSystemSettingsTool(session_id="session-1", user_id="10001") + + with patch.object( + update_module.settings, + "update_setting", + return_value=(True, ""), + ) as update_setting, patch.object( + UpdateSystemSettingsTool, + "_load_setting_value", + side_effect=["https://old.example.com", "https://new.example.com"], + ), patch.object( + update_module.eventmanager, + "async_send_event", + new=AsyncMock(), + ) as send_event: + result = asyncio.run( + tool.run(setting_key="APP_DOMAIN", value="https://new.example.com") + ) + + payload = json.loads(result) + self.assertTrue(payload["success"]) + self.assertTrue(payload["changed"]) + update_setting.assert_called_once_with("APP_DOMAIN", "https://new.example.com") + send_event.assert_awaited_once() + + def test_tool_manager_blocks_admin_tools_for_non_admin_context(self): + tool = QuerySystemSettingsTool(session_id="session-1", user_id="10001") + + with patch.object( + manager_module.MoviePilotToolFactory, + "create_tools", + return_value=[tool], + ): + manager = MoviePilotToolsManager(is_admin=False) + result = asyncio.run( + manager.call_tool( + "query_system_settings", + {"setting_key": "Downloaders"}, + ) + ) + + payload = json.loads(result) + self.assertIn("error", payload) + self.assertIn("系统管理员", payload["error"]) + + +if __name__ == "__main__": + unittest.main()