feat: optimize v2 ChatGPT recognition plugin

This commit is contained in:
jxxghp
2026-05-27 06:59:47 +08:00
parent ec0c8cc521
commit b048106d2e
4 changed files with 548 additions and 589 deletions

View File

@@ -116,13 +116,15 @@
},
"ChatGPT": {
"name": "ChatGPT",
"description": "消息交互支持与ChatGPT对话。",
"labels": "消息通知,识别",
"version": "2.1.9",
"description": "使用系统智能助手或 Agent Tokens 管理插件的 LLM 配置增强媒体名称识别。",
"labels": "AI,识别,LLM,媒体识别,Agent Tokens",
"version": "3.0",
"icon": "Chatgpt_A.png",
"author": "jxxghp",
"level": 1,
"system_version": ">=2.13.0",
"history": {
"v3.0": "移除聊天功能,仅保留媒体识别增强;模型来源改为系统智能助手设置或 Agent Tokens 管理插件;重写英文识别提示词并优化配置界面。",
"v2.1.9": "更新依赖库",
"v2.1.8": "修复 OpenAI API >=1.0.0 兼容性问题",
"v2.1.7": "独立安装OpenAi SDK依赖",

View File

@@ -1,23 +1,49 @@
from typing import Any, List, Dict, Tuple
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.core.event import Event, eventmanager
from app.log import logger
from app.plugins import _PluginBase
from app.plugins.chatgpt.openai import OpenAi
from app.schemas.types import EventType, ChainEventType
from app.schemas import NotificationType
from app.schemas import AgentLLMProviderEventData, AgentTokensUsageEventData, NotificationType
from app.schemas.types import ChainEventType, EventType
DEFAULT_RECOGNIZE_PROMPT = """
You are a media filename recognition engine for MoviePilot.
Parse the movie or TV filename provided by the user and return exactly one JSON object:
{"name":string,"version":string,"part":string,"year":string,"resolution":string,"season":number|null,"episode":number|null}
Rules:
- Return JSON only. Do not wrap it in Markdown and do not add explanations.
- Use the most likely official title in "name"; remove release group, source, codec, audio, subtitle, edition, and site tags.
- Preserve meaningful edition information such as Director's Cut, Extended, Theatrical, Remastered, IMAX, Uncut, or Part in "version" or "part".
- If the filename contains Chinese homophones, pinyin, initials, or letter substitutions, infer the most likely real Chinese title.
- Put a four-digit release year in "year" when it is clearly present; otherwise use an empty string.
- Put resolution such as 2160p, 1080p, 720p, 4K, UHD, or HD in "resolution" when present; otherwise use an empty string.
- For TV series, extract numeric season and episode when reliable. Use null when unknown or ambiguous.
- For movies, use null for both season and episode unless the filename clearly describes a split part.
""".strip()
class ChatGPT(_PluginBase):
"""
ChatGPT 识别增强插件,仅保留媒体名称辅助识别能力。
"""
MODEL_SOURCE_SYSTEM = "system"
MODEL_SOURCE_AGENT_TOKENS = "agent_tokens"
# 插件名称
plugin_name = "ChatGPT"
# 插件描述
plugin_desc = "大模型对话与媒体识别增强"
plugin_desc = "使用 MoviePilot 系统智能助手或 Agent Tokens 管理插件的 LLM 配置增强媒体名称识别"
# 插件图标
plugin_icon = "Chatgpt_A.png"
# 插件版本
plugin_version = "2.1.9"
plugin_version = "3.0"
# 插件作者
plugin_author = "jxxghp"
# 作者主页
@@ -29,462 +55,416 @@ class ChatGPT(_PluginBase):
# 可使用的用户级别
auth_level = 1
# 私有属性
openai = None
openai: Optional[OpenAi] = None
_enabled = False
_proxy = False
_compatible = False
_recognize = False
_openai_url = None
_openai_key = None
_model = None
# 存储多个API密钥
_api_keys = []
# 当前使用的密钥索引
_current_key_index = 0
# 密钥失效状态
_key_status = {}
# 是否发送通知
_model_source = MODEL_SOURCE_SYSTEM
_notify = False
# 自定义提示词
_customize_prompt = '接下来我会给你一个电影或电视剧的文件名你需要识别文件名中的名称、版本、分段、年份、分瓣率、季集等信息并按以下JSON格式返回{"name":string,"version":string,"part":string,"year":string,"resolution":string,"season":number|null,"episode":number|null}特别注意返回结果需要严格附合JSON格式不需要有任何其它的字符。如果中文电影或电视剧的文件名中存在谐音字或字母替代的情况请还原最有可能的结果。'
_customize_prompt = DEFAULT_RECOGNIZE_PROMPT
def init_plugin(self, config: dict = None):
if config:
self._enabled = config.get("enabled")
self._proxy = config.get("proxy")
self._compatible = config.get("compatible")
self._recognize = config.get("recognize")
self._openai_url = config.get("openai_url")
self._openai_key = config.get("openai_key")
self._model = config.get("model")
self._notify = config.get("notify")
self._customize_prompt = config.get("customize_prompt")
# 处理多个API密钥
if self._openai_key:
self._api_keys = [key.strip() for key in self._openai_key.split(',') if key.strip()]
# 初始化密钥状态
self._key_status = {key: True for key in self._api_keys}
logger.info(f"ChatGPT插件加载了 {len(self._api_keys)} 个API密钥")
if self._openai_url and self._api_keys:
# 使用第一个密钥初始化
self._current_key_index = 0
self.init_openai(self._api_keys[self._current_key_index])
def init_openai(self, api_key):
"""
初始化OpenAI客户端
初始化插件配置并同步识别事件处理器状态。
"""
if self._openai_url and api_key:
self.openai = OpenAi(api_key=api_key, api_url=self._openai_url,
proxy=settings.PROXY if self._proxy else None,
model=self._model, compatible=bool(self._compatible), customize_prompt=self._customize_prompt)
logger.info(f"ChatGPT插件初始化API客户端成功")
return True
return False
config = config or {}
def switch_to_next_key(self, failed_key):
enabled = bool(config.get("enabled"))
if "recognize" in config:
enabled = enabled and bool(config.get("recognize"))
model_source = self._clean_text(config.get("model_source")) or self.MODEL_SOURCE_SYSTEM
if model_source not in {self.MODEL_SOURCE_SYSTEM, self.MODEL_SOURCE_AGENT_TOKENS}:
model_source = self.MODEL_SOURCE_SYSTEM
self._enabled = enabled
self._model_source = model_source
self._notify = bool(config.get("notify"))
self._customize_prompt = self._clean_text(config.get("customize_prompt")) or DEFAULT_RECOGNIZE_PROMPT
self.openai = None
self._sync_event_handler_state()
def _sync_event_handler_state(self) -> None:
"""
切换到下一个可用的API密钥
:return: (is_switched, error_message) 元组,表示是否切换成功及错误信息
按插件开关启用或禁用链式识别事件处理器。
"""
# 标记当前密钥为失效
self._key_status[failed_key] = False
try:
if self._enabled:
eventmanager.enable_event_handler(self.recognize)
else:
eventmanager.disable_event_handler(self.recognize)
except Exception as exc:
logger.debug(f"同步 ChatGPT 识别事件处理器状态失败: {exc}")
# 寻找下一个可用的密钥
original_index = self._current_key_index
while True:
self._current_key_index = (self._current_key_index + 1) % len(self._api_keys)
next_key = self._api_keys[self._current_key_index]
@staticmethod
def _clean_text(value: Any) -> str:
"""
清理配置或事件中的文本字段。
"""
return str(value or "").strip()
# 如果密钥标记为可用或者已经尝试了所有密钥,则使用该密钥
if self._key_status.get(next_key, True) or self._current_key_index == original_index:
break
@staticmethod
def _event_get(event_data: Any, key: str, default: Any = None) -> Any:
"""
兼容读取字典或事件模型中的字段。
"""
if isinstance(event_data, dict):
return event_data.get(key, default)
return getattr(event_data, key, default)
# 检查是否所有密钥都失效
if all(not status for status in self._key_status.values()):
logger.error("所有API密钥均已失效")
return False, "所有API密钥均已失效请检查配置"
# 使用新密钥重新初始化客户端
next_key = self._api_keys[self._current_key_index]
logger.info(f"切换到下一个API密钥 {next_key}")
success = self.init_openai(next_key)
return success, ""
@staticmethod
def _event_set(event_data: Any, key: str, value: Any) -> None:
"""
兼容写入字典或事件模型中的字段。
"""
if isinstance(event_data, dict):
event_data[key] = value
else:
setattr(event_data, key, value)
def get_state(self) -> bool:
return self._enabled
"""
返回插件是否启用。
"""
return bool(self._enabled)
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
"""
当前插件不注册用户消息命令。
"""
return []
def get_api(self) -> List[Dict[str, Any]]:
pass
"""
当前插件不注册额外 API。
"""
return []
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1、页面配置2、数据结构
构建只面向识别增强的插件配置页面。
"""
return [
{
'component': 'VForm',
'content': [
"component": "VForm",
"content": [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '启用插件',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'proxy',
'label': '使用代理服务器',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'compatible',
'label': '兼容模式',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'recognize',
'label': '辅助识别',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'notify',
'label': '开启通知',
}
}
]
}
]
"component": "VAlert",
"props": {
"type": "info",
"variant": "tonal",
"text": "插件仅在 MoviePilot 原生识别失败后参与名称识别增强,不再处理聊天消息。模型配置可直接使用系统智能助手设置,或通过 Agent Tokens 管理插件动态分配。",
},
},
{
'component': 'VRow',
'content': [
"component": "VRow",
"content": [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
"component": "VCol",
"props": {"cols": 12, "md": 6},
"content": [
{
'component': 'VTextField',
'props': {
'model': 'openai_url',
'label': 'OpenAI API Url',
'placeholder': 'https://api.openai.com',
}
"component": "VSwitch",
"props": {
"model": "enabled",
"label": "启用识别增强",
"hint": "开启后监听媒体名称辅助识别链式事件",
"persistent-hint": True,
},
}
]
],
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
"component": "VCol",
"props": {"cols": 12, "md": 6},
"content": [
{
'component': 'VTextField',
'props': {
'model': 'openai_key',
'label': 'API密钥 (多个密钥以逗号分隔)',
'placeholder': 'sk-xxx,sk-yyy'
}
"component": "VSwitch",
"props": {
"model": "notify",
"label": "调用失败通知",
"hint": "模型配置缺失或调用失败时发送插件通知",
"persistent-hint": True,
},
}
]
],
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'model',
'label': '自定义模型',
'placeholder': 'gpt-3.5-turbo',
}
}
]
}
]
],
},
{
'component': 'VRow',
'content': [
"component": "VRow",
"content": [
{
'component': 'VCol',
'props': {'cols': 12},
'content': [
"component": "VCol",
"props": {"cols": 12},
"content": [
{
'component': 'VTextarea',
'props': {
'rows': 2,
'auto-grow': True,
'model': 'customize_prompt',
'label': '辅助识别提示词',
'hint': '在辅助识别时的给AI的提示词',
'clearable': True,
'persistent-hint': True,
}
"component": "VSelect",
"props": {
"model": "model_source",
"label": "模型来源",
"items": [
{"title": "使用系统智能助手设置", "value": self.MODEL_SOURCE_SYSTEM},
{"title": "使用 Agent Tokens 管理插件", "value": self.MODEL_SOURCE_AGENT_TOKENS},
],
"hint": "Agent Tokens 模式会发出 Agent LLM 供应商链式事件,并读取插件返回的 API Base URL、API Key 与模型 ID。",
"persistent-hint": True,
},
}
]
],
}
]
],
},
{
'component': 'VRow',
'content': [
"component": "VAlert",
"props": {
"type": "warning",
"variant": "tonal",
"text": "选择 Agent Tokens 管理插件时请先启用该插件并至少配置一个已启用、未耗尽且填写了模型地址、API Key 和模型 ID 的供应商。",
},
},
{
"component": "VRow",
"content": [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
"component": "VCol",
"props": {"cols": 12},
"content": [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '开启插件后,消息交互时使用请[问帮你]开头或者以号结尾或者超过10个汉字/单词则会触发ChatGPT回复。'
'开启辅助识别后,内置识别功能无法正常识别种子/文件名称时将使用ChatGTP进行AI辅助识别可以提升动漫等非规范命名的识别成功率。'
'支持输入多个API密钥以逗号分隔在密钥调用失败时将自动切换到下一个可用密钥。'
'开启通知选项后将在API密钥调用失败时发送系统通知。'
}
"component": "VTextarea",
"props": {
"rows": 8,
"auto-grow": True,
"model": "customize_prompt",
"label": "识别增强系统提示词",
"hint": "用于约束模型只返回 MoviePilot 可消费的 JSON 识别结果",
"clearable": True,
"persistent-hint": True,
},
}
]
],
}
]
}
]
],
},
],
}
], {
"enabled": False,
"proxy": False,
"compatible": False,
"recognize": False,
"model_source": self.MODEL_SOURCE_SYSTEM,
"notify": False,
"openai_url": "https://api.openai.com",
"openai_key": "",
"model": "gpt-3.5-turbo",
"customize_prompt": '接下来我会给你一个电影或电视剧的文件名你需要识别文件名中的名称、版本、分段、年份、分瓣率、季集等信息并按以下JSON格式返回{"name":string, '
'"version":string,"part":string,"year":string,"resolution":string,"season":number|null,"episode":number|null}特别注意返回结果需要严格附合JSON格式不需要有任何其它的字符。如果中文电影或电视剧的文件名中存在谐音字或字母替代的情况请还原最有可能的结果。'
"customize_prompt": DEFAULT_RECOGNIZE_PROMPT,
}
def get_page(self) -> List[dict]:
pass
"""
当前插件不提供独立详情页。
"""
return []
def _resolve_system_model_config(self) -> Tuple[Optional[Dict[str, Any]], str]:
"""
直接从 MoviePilot 系统智能助手配置读取 LLM 运行参数。
"""
config = {
"provider": getattr(settings, "LLM_PROVIDER", None),
"model": getattr(settings, "LLM_MODEL", None),
"api_key": getattr(settings, "LLM_API_KEY", None),
"base_url": getattr(settings, "LLM_BASE_URL", None),
"base_url_preset": getattr(settings, "LLM_BASE_URL_PRESET", None),
"user_agent": getattr(settings, "LLM_USER_AGENT", None),
"thinking_level": getattr(settings, "LLM_THINKING_LEVEL", None),
"source": "system",
}
return self._normalize_model_config(config)
def _resolve_agent_tokens_model_config(self) -> Tuple[Optional[Dict[str, Any]], str]:
"""
通过 Agent LLM 供应商链式事件从 Agent Tokens 管理插件读取 LLM 运行参数。
"""
event_data = AgentLLMProviderEventData(
provider=getattr(settings, "LLM_PROVIDER", None),
model=getattr(settings, "LLM_MODEL", None),
api_key=getattr(settings, "LLM_API_KEY", None),
base_url=getattr(settings, "LLM_BASE_URL", None),
base_url_preset=getattr(settings, "LLM_BASE_URL_PRESET", None),
user_agent=getattr(settings, "LLM_USER_AGENT", None),
thinking_level=None,
)
selected_event = eventmanager.send_event(ChainEventType.AgentLLMProvider, event_data)
resolved_data = selected_event.event_data if selected_event else event_data
if not self._clean_text(self._event_get(resolved_data, "selected_provider_id")):
return None, "Agent Tokens 管理插件未返回可用供应商请启用该插件并配置好模型地址、API Key 和模型 ID"
config = {
"provider": self._event_get(resolved_data, "provider"),
"model": self._event_get(resolved_data, "model"),
"api_key": self._event_get(resolved_data, "api_key"),
"base_url": self._event_get(resolved_data, "base_url"),
"base_url_preset": self._event_get(resolved_data, "base_url_preset"),
"user_agent": self._event_get(resolved_data, "user_agent"),
"thinking_level": self._event_get(resolved_data, "thinking_level"),
"selected_provider_id": self._event_get(resolved_data, "selected_provider_id"),
"selected_provider_name": self._event_get(resolved_data, "selected_provider_name"),
"source": self._event_get(resolved_data, "source") or "AgentTokens",
}
return self._normalize_model_config(config)
def _normalize_model_config(self, config: Dict[str, Any]) -> Tuple[Optional[Dict[str, Any]], str]:
"""
标准化模型运行参数并校验必要字段。
"""
normalized = {
"provider": self._clean_text(config.get("provider")) or "openai",
"model": self._clean_text(config.get("model")),
"api_key": self._clean_text(config.get("api_key")),
"base_url": self._clean_text(config.get("base_url")) or None,
"base_url_preset": self._clean_text(config.get("base_url_preset")) or None,
"user_agent": self._clean_text(config.get("user_agent")) or None,
"thinking_level": self._clean_text(config.get("thinking_level")) or None,
"selected_provider_id": self._clean_text(config.get("selected_provider_id")) or None,
"selected_provider_name": self._clean_text(config.get("selected_provider_name")) or None,
"source": self._clean_text(config.get("source")) or None,
}
if not normalized["api_key"]:
return None, "未配置 LLM API Key"
if not normalized["model"]:
return None, "未配置 LLM 模型 ID"
return normalized, ""
def _resolve_model_config(self) -> Tuple[Optional[Dict[str, Any]], str]:
"""
根据配置的模型来源解析本次识别调用的 LLM 运行参数。
"""
if self._model_source == self.MODEL_SOURCE_AGENT_TOKENS:
return self._resolve_agent_tokens_model_config()
return self._resolve_system_model_config()
def init_openai(self, model_config: Dict[str, Any]) -> bool:
"""
使用解析出的 LLM 运行参数初始化识别客户端。
"""
if not model_config:
self.openai = None
return False
self.openai = OpenAi(
api_key=model_config.get("api_key"),
api_url=model_config.get("base_url"),
provider=model_config.get("provider"),
model=model_config.get("model"),
base_url_preset=model_config.get("base_url_preset"),
user_agent=model_config.get("user_agent"),
thinking_level=model_config.get("thinking_level"),
customize_prompt=self._customize_prompt,
)
logger.info(
"ChatGPT 识别增强初始化 LLM 成功,来源:%sProvider%sModel%s",
self._model_source,
model_config.get("provider"),
model_config.get("model"),
)
return True
@staticmethod
def is_api_error(response):
def is_api_error(response: Any) -> Tuple[bool, str]:
"""
判断响应是否表示API错误
:param response: API响应
:return: (is_error, error_message) 元组,表示是否错误及错误信息
判断识别响应是否为模型调用错误
"""
# 检查响应是否为字典且包含errorMsg
if isinstance(response, dict) and response.get("errorMsg"):
return True, response.get("errorMsg")
# 检查响应是否为字符串且包含错误信息
if isinstance(response, str) and "请求ChatGPT出现错误" in response:
return True, response
# 如果没有错误信息,则表示调用成功
return True, str(response.get("errorMsg"))
return False, ""
@eventmanager.register(EventType.UserMessage)
def talk(self, event: Event):
def _notify_error(self, message: str) -> None:
"""
监听用户消息获取ChatGPT回复
按配置发送插件错误通知。
"""
if not self._enabled:
return
if not self.openai:
return
text = event.event_data.get("text")
userid = event.event_data.get("userid")
channel = event.event_data.get("channel")
if not text:
return
if text.startswith("http") or text.startswith("magnet") or text.startswith("ftp"):
return
# 尝试获取响应失败时切换API密钥
retry_count = 0
max_retries = len(self._api_keys)
while retry_count < max_retries:
response = self.openai.get_response(text=text, userid=userid)
# 判断响应是否正常
is_error, error_msg = self.is_api_error(response)
logger.info(f"ChatGPT返回结果{response}")
if is_error:
current_key = self._api_keys[self._current_key_index]
switched, switch_error = self.switch_to_next_key(current_key)
# 发送密钥失效通知
if self._notify:
message = f"API密钥 {current_key} 调用失败: {error_msg}"
self.post_message(channel=channel, title=message, userid=userid)
# 如果所有密钥都失效,发送额外通知
if not switched:
message = switch_error
self.post_message(mtype=NotificationType.Plugin, title="ChatGpt", text=message)
if not switched:
# 所有密钥都失效,发送消息并退出
return
retry_count += 1
else:
# 成功获取响应
self.post_message(channel=channel, title=response, userid=userid)
return
# 所有重试都失败
logger.warning(message)
if self._notify:
self.post_message(channel=channel,
title="无法获取ChatGPT响应所有API密钥都已失效",
userid=userid)
self.post_message(mtype=NotificationType.Plugin, title=self.plugin_name, text=message)
def _record_agent_tokens_usage(
self,
model_config: Dict[str, Any],
usage: Dict[str, int],
success: bool,
error: Optional[str] = None,
) -> None:
"""
将 Agent Tokens 模式下的识别调用用量回写给配额管理插件。
"""
if self._model_source != self.MODEL_SOURCE_AGENT_TOKENS:
return
if not model_config or not model_config.get("selected_provider_id"):
return
usage = usage or {}
event_data = AgentTokensUsageEventData(
session_id=f"chatgpt-recognize-{datetime.now().strftime('%Y%m%d%H%M%S')}",
selected_provider_id=model_config.get("selected_provider_id"),
selected_provider_name=model_config.get("selected_provider_name"),
provider=model_config.get("provider"),
base_url=model_config.get("base_url"),
model=model_config.get("model"),
input_tokens=int(usage.get("input_tokens") or 0),
output_tokens=int(usage.get("output_tokens") or 0),
total_tokens=int(usage.get("total_tokens") or 0),
model_call_count=1,
success=success,
error=error,
finished_at=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
source=self.__class__.__name__,
)
eventmanager.send_event(EventType.AgentTokensUsage, event_data)
def _write_recognition_result(self, event_data: Any, response: Dict[str, Any]) -> None:
"""
将模型识别结果写回 MoviePilot 名称识别链式事件。
"""
for key in ("name", "year", "season", "episode"):
self._event_set(event_data, key, response.get(key))
self._event_set(event_data, "source_plugin", self.__class__.__name__)
@eventmanager.register(ChainEventType.NameRecognize)
def recognize(self, event: Event):
"""
监听识别事件使用ChatGPT辅助识别名称
监听名称识别链式事件,使用 LLM 进行辅助识别
"""
if not self.openai:
if not self._enabled or not event or not event.event_data:
return
if not self._recognize:
if self._event_get(event.event_data, "source_plugin") or self._event_get(event.event_data, "name"):
return
if not event.event_data:
return
title = event.event_data.get("title")
title = self._clean_text(self._event_get(event.event_data, "title"))
if not title:
return
# 尝试获取媒体名称失败时切换API密钥
retry_count = 0
max_retries = len(self._api_keys)
model_config, error = self._resolve_model_config()
if error:
self._notify_error(f"ChatGPT 识别增强不可用:{error}")
return
if not self.init_openai(model_config) or not self.openai:
self._notify_error("ChatGPT 识别增强不可用LLM 客户端初始化失败")
return
while retry_count < max_retries:
response = self.openai.get_media_name(filename=title)
logger.info(f"ChatGPT返回结果{response}")
response = self.openai.get_media_name(filename=title)
logger.info(f"ChatGPT 识别增强返回结果:{response}")
is_error, error_msg = self.is_api_error(response)
usage = self.openai.get_last_usage() if self.openai else {}
self._record_agent_tokens_usage(model_config, usage, success=not is_error, error=error_msg)
# 判断响应是否正常
is_error, error_msg = self.is_api_error(response)
if is_error:
self._notify_error(f"ChatGPT 识别增强调用失败:{error_msg}")
return
if not isinstance(response, dict) or not response.get("name"):
self._notify_error(f"ChatGPT 识别增强未返回有效名称:{title}")
return
# 如果不是错误但返回字典中没有name字段也视为错误
if not is_error and isinstance(response, dict) and not response.get("name"):
is_error = True
error_msg = "未返回有效识别结果"
if is_error:
# 发生错误,尝试切换密钥
current_key = self._api_keys[self._current_key_index]
switched, switch_error = self.switch_to_next_key(current_key)
# 发送密钥失效通知 (通过系统通知,因为这里没有用户交互)
if self._notify:
message = f"API密钥 {current_key} 调用失败: {error_msg}"
self.post_message(mtype=NotificationType.Plugin, title="ChatGpt", text=message)
# 如果所有密钥都失效,发送额外通知
if not switched:
message = switch_error
self.post_message(mtype=NotificationType.Plugin, title="ChatGpt", text=message)
if not switched:
# 所有密钥都失效
return
retry_count += 1
else:
# 成功获取结果
event.event_data = {
'title': title,
'name': response.get("name"),
'year': response.get("year"),
'season': response.get("season"),
'episode': response.get("episode")
}
return
# 所有重试都失败
if self._notify:
logger.error(f"无法识别标题 {title}所有API密钥都已失效")
self.post_message(mtype=NotificationType.Plugin,
title="ChatGpt",
text=f"无法识别标题 {title}所有API密钥都已失效")
self._write_recognition_result(event.event_data, response)
def stop_service(self):
"""
退出插件
停止插件时禁用识别事件处理器。
"""
pass
try:
eventmanager.disable_event_handler(self.recognize)
except Exception as exc:
logger.debug(f"禁用 ChatGPT 识别事件处理器失败: {exc}")

View File

@@ -1,235 +1,213 @@
import asyncio
import inspect
import json
import re
import time
from typing import List, Union
import threading
from typing import Any, Dict, Optional
import openai
from cacheout import Cache
OpenAISessionCache = Cache(maxsize=100, ttl=3600, timer=time.time, default=None)
from app.agent.llm import LLMHelper
from langchain_core.messages import HumanMessage, SystemMessage
class OpenAi:
_api_key: str = None
_api_url: str = None
_model: str = "gpt-3.5-turbo"
_prompt: str = '接下来我会给你一个电影或电视剧的文件名你需要识别文件名中的名称、版本、分段、年份、分瓣率、季集等信息并按以下JSON格式返回{"name":string,"version":string,"part":string,"year":string,"resolution":string,"season":number|null,"episode":number|null}特别注意返回结果需要严格附合JSON格式不需要有任何其它的字符。如果中文电影或电视剧的文件名中存在谐音字或字母替代的情况请还原最有可能的结果。'
_client: openai.OpenAI = None
"""
Lightweight LLM recognition client kept under the original module name for plugin compatibility.
"""
def __init__(self, api_key: str = None, api_url: str = None,
proxy: dict = None, model: str = None,
compatible: bool = False, customize_prompt: str = None):
_JSON_FENCE_PATTERN = re.compile(r"^```(?:json)?\s*([\s\S]*?)\s*```$", re.IGNORECASE)
def __init__(
self,
api_key: str = None,
api_url: str = None,
provider: str = None,
model: str = None,
base_url_preset: str = None,
user_agent: str = None,
thinking_level: str = None,
customize_prompt: str = None,
**kwargs,
):
"""
初始化用于媒体识别的 LLM 客户端运行参数。
"""
self._api_key = api_key
self._api_url = api_url
if model:
self._model = model
if customize_prompt:
self._prompt = customize_prompt
# 初始化 OpenAI 客户端
if self._api_key and self._api_url:
base_url = self._api_url if compatible else self._api_url + "/v1"
http_client = None
if proxy and proxy.get("https"):
import httpx
proxy_url = proxy.get("https")
# httpx 支持字符串格式的代理 URL
http_client = httpx.Client(proxies=proxy_url, timeout=60.0)
self._client = openai.OpenAI(
api_key=self._api_key,
base_url=base_url,
http_client=http_client
)
self._provider = provider or "openai"
self._model = model
self._base_url_preset = base_url_preset
self._user_agent = user_agent
self._thinking_level = thinking_level
self._prompt = customize_prompt or ""
self._last_usage: Dict[str, int] = {}
def get_state(self) -> bool:
return True if self._api_key else False
"""
返回当前客户端是否具备发起识别调用的必要模型配置。
"""
return bool(self._api_key and self._model)
def get_last_usage(self) -> Dict[str, int]:
"""
返回最近一次模型调用提取到的 token 用量。
"""
return dict(self._last_usage or {})
@staticmethod
def __save_session(session_id: str, message: str):
def _run_async_compatible(value: Any) -> Any:
"""
保存会话
:param session_id: 会话ID
:param message: 消息
:return:
在同步插件回调中兼容执行新版 MoviePilot 的异步 LLM 初始化。
"""
seasion = OpenAISessionCache.get(session_id)
if seasion:
seasion.append({
"role": "assistant",
"content": message
})
OpenAISessionCache.set(session_id, seasion)
if not inspect.isawaitable(value):
return value
try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(value)
result: Dict[str, Any] = {}
error: Dict[str, BaseException] = {}
def _worker() -> None:
"""
在独立线程中运行协程,避免嵌套事件循环。
"""
try:
result["value"] = asyncio.run(value)
except BaseException as exc: # noqa: BLE001
error["exc"] = exc
thread = threading.Thread(target=_worker, daemon=True)
thread.start()
thread.join()
if "exc" in error:
raise error["exc"]
return result.get("value")
@staticmethod
def __get_session(session_id: str, message: str) -> List[dict]:
def _lookup_int(data: Any, key: str) -> Optional[int]:
"""
获取会话
:param session_id: 会话ID
:return: 会话上下文
从字典或对象字段中安全读取整数 token 统计。
"""
seasion = OpenAISessionCache.get(session_id)
if seasion:
seasion.append({
"role": "user",
"content": message
})
else:
seasion = [
{
"role": "system",
"content": "请在接下来的对话中请使用中文回复,并且内容尽可能详细。"
},
{
"role": "user",
"content": message
}]
OpenAISessionCache.set(session_id, seasion)
return seasion
if not data:
return None
value = data.get(key) if isinstance(data, dict) else getattr(data, key, None)
try:
return int(value) if value is not None else None
except (TypeError, ValueError):
return None
def __get_model(self, message: Union[str, List[dict]],
prompt: str = None,
user: str = "MoviePilot",
**kwargs):
@classmethod
def _extract_usage(cls, response: Any) -> Dict[str, int]:
"""
获取模型
从 LangChain AIMessage 中提取 token 用量。
"""
if not self._client:
raise ValueError("OpenAI client not initialized. Please check API key and API URL.")
if not isinstance(message, list):
if prompt:
message = [
{
"role": "system",
"content": prompt
},
{
"role": "user",
"content": message
}
]
else:
message = [
{
"role": "user",
"content": message
}
]
# 新版本 API 不支持 user 参数,需要从 kwargs 中移除
kwargs.pop('user', None)
return self._client.chat.completions.create(
model=self._model,
messages=message,
**kwargs
usage_metadata = getattr(response, "usage_metadata", None)
response_metadata = getattr(response, "response_metadata", None) or {}
token_usage = (
response_metadata.get("token_usage")
or response_metadata.get("usage")
or response_metadata.get("usage_metadata")
or {}
)
@staticmethod
def __clear_session(session_id: str):
"""
清除会话
:param session_id: 会话ID
:return:
"""
if OpenAISessionCache.get(session_id):
OpenAISessionCache.delete(session_id)
input_tokens = (
cls._lookup_int(usage_metadata, "input_tokens")
or cls._lookup_int(token_usage, "input_tokens")
or cls._lookup_int(token_usage, "prompt_tokens")
or 0
)
output_tokens = (
cls._lookup_int(usage_metadata, "output_tokens")
or cls._lookup_int(token_usage, "output_tokens")
or cls._lookup_int(token_usage, "completion_tokens")
or 0
)
total_tokens = (
cls._lookup_int(usage_metadata, "total_tokens")
or cls._lookup_int(token_usage, "total_tokens")
or input_tokens + output_tokens
)
return {
"input_tokens": max(input_tokens, 0),
"output_tokens": max(output_tokens, 0),
"total_tokens": max(total_tokens, 0),
}
def get_media_name(self, filename: str):
def _get_llm(self) -> Any:
"""
从文件名中提取媒体名称等要素
:param filename: 文件名
:return: Json
按当前运行参数创建 MoviePilot LLM 实例。
"""
llm = LLMHelper.get_llm(
streaming=False,
provider=self._provider,
model=self._model,
thinking_level=self._thinking_level,
api_key=self._api_key,
base_url=self._api_url,
base_url_preset=self._base_url_preset,
user_agent=self._user_agent,
)
return self._run_async_compatible(llm)
@staticmethod
def _extract_response_text(response: Any) -> str:
"""
从模型响应对象中提取文本内容。
"""
content = getattr(response, "content", response)
return LLMHelper._extract_text_content(content).strip()
@classmethod
def _strip_json_fence(cls, text: str) -> str:
"""
移除模型可能附加的 Markdown JSON 代码块包裹。
"""
text = str(text or "").strip()
match = cls._JSON_FENCE_PATTERN.match(text)
return match.group(1).strip() if match else text
@classmethod
def _extract_json_text(cls, text: str) -> str:
"""
从模型回复中提取第一个 JSON 对象文本。
"""
text = cls._strip_json_fence(text)
if text.startswith("{") and text.endswith("}"):
return text
start = text.find("{")
end = text.rfind("}")
if start >= 0 and end > start:
return text[start:end + 1]
return text
def get_media_name(self, filename: str) -> Dict[str, Any]:
"""
从媒体文件名中提取结构化识别信息。
"""
self._last_usage = {}
if not self.get_state():
return None
return {"errorMsg": "LLM API Key or model is not configured"}
result = ""
try:
_filename_prompt = self._prompt
completion = self.__get_model(prompt=_filename_prompt, message=filename)
result = completion.choices[0].message.content
# 有些模型返回json数据时会使用 ```json ``` 包裹json对象 所以需要进行提取
# 定义正则表达式模式,匹配```json开头和```结尾的内容
pattern = r'^```json\s*([\s\S]*?)\s*```$'
# 使用正则表达式进行匹配
match = re.match(pattern, result.strip())
if match:
# 提取中间的JSON部分
result = match.group(1)
return json.loads(result)
except Exception as e:
llm = self._get_llm()
completion = llm.invoke(
[
SystemMessage(content=self._prompt),
HumanMessage(content=str(filename or "")),
]
)
self._last_usage = self._extract_usage(completion)
result = self._extract_response_text(completion)
json_text = self._extract_json_text(result)
data = json.loads(json_text)
if not isinstance(data, dict):
raise ValueError("LLM response is not a JSON object")
return data
except Exception as exc:
return {
"content": result,
"errorMsg": str(e)
"errorMsg": str(exc),
}
def get_response(self, text: str, userid: str):
"""
聊天对话,获取答案
:param text: 输入文本
:param userid: 用户ID
:return:
"""
if not self.get_state():
return ""
try:
if not userid:
return "用户信息错误"
else:
userid = str(userid)
if text == "#清除":
self.__clear_session(userid)
return "会话已清除"
# 获取历史上下文
messages = self.__get_session(userid, text)
completion = self.__get_model(message=messages, user=userid)
result = completion.choices[0].message.content
if result:
self.__save_session(userid, text)
return result
except openai.RateLimitError as e:
return f"请求被ChatGPT拒绝了{str(e)}"
except openai.APIConnectionError as e:
return f"ChatGPT网络连接失败{str(e)}"
except openai.APITimeoutError as e:
return f"没有接收到ChatGPT的返回消息{str(e)}"
except Exception as e:
return f"请求ChatGPT出现错误{str(e)}"
def translate_to_zh(self, text: str):
"""
翻译为中文
:param text: 输入文本
"""
if not self.get_state():
return False, None
system_prompt = "You are a translation engine that can only translate text and cannot interpret it."
user_prompt = f"translate to zh-CN:\n\n{text}"
result = ""
try:
completion = self.__get_model(prompt=system_prompt,
message=user_prompt,
temperature=0,
top_p=1,
frequency_penalty=0,
presence_penalty=0)
result = completion.choices[0].message.content.strip()
return True, result
except Exception as e:
print(f"{str(e)}{result}")
return False, str(e)
def get_question_answer(self, question: str):
"""
从给定问题和选项中获取正确答案
:param question: 问题及选项
:return: Json
"""
if not self.get_state():
return None
result = ""
try:
_question_prompt = "下面我们来玩一个游戏,你是老师,我是学生,你需要回答我的问题,我会给你一个题目和几个选项,你的回复必须是给定选项中正确答案对应的序号,请直接回复数字"
completion = self.__get_model(prompt=_question_prompt, message=question)
result = completion.choices[0].message.content
return result
except Exception as e:
print(f"{str(e)}{result}")
return {}

View File

@@ -1 +0,0 @@
cacheout~=0.16.0