ChatGPT 适配MoviePilot V2 版本,采用链式事件机制

This commit is contained in:
jxxghp
2024-10-10 19:07:59 +08:00
parent 27b2adef0a
commit 44673b6bf8
4 changed files with 480 additions and 2 deletions

View File

@@ -256,8 +256,7 @@
"version": "1.3",
"icon": "Chatgpt_A.png",
"author": "jxxghp",
"level": 1,
"v2": true
"level": 1
},
"NAStoolSync": {
"name": "历史记录同步",

View File

@@ -58,5 +58,17 @@
"history": {
"v1.3": "MoviePilot V2 版本媒体库服务器刷新插件"
}
},
"ChatGPT": {
"name": "ChatGPT",
"description": "消息交互支持与ChatGPT对话。",
"labels": "消息通知,识别",
"version": "2.0",
"icon": "Chatgpt_A.png",
"author": "jxxghp",
"level": 1,
"history": {
"v2.0": "适配MoviePilot V2 版本,采用链式事件机制"
}
}
}

View File

@@ -0,0 +1,261 @@
from typing import Any, List, Dict, Tuple
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.plugins import _PluginBase
from app.plugins.chatgpt.openai import OpenAi
from app.schemas.types import EventType, ChainEventType
class ChatGPT(_PluginBase):
# 插件名称
plugin_name = "ChatGPT"
# 插件描述
plugin_desc = "消息交互支持与ChatGPT对话。"
# 插件图标
plugin_icon = "Chatgpt_A.png"
# 插件版本
plugin_version = "2.0"
# 插件作者
plugin_author = "jxxghp"
# 作者主页
author_url = "https://github.com/jxxghp"
# 插件配置项ID前缀
plugin_config_prefix = "chatgpt_"
# 加载顺序
plugin_order = 15
# 可使用的用户级别
auth_level = 1
# 私有属性
openai = None
_enabled = False
_proxy = False
_recognize = False
_openai_url = None
_openai_key = None
_model = None
def init_plugin(self, config: dict = None):
if config:
self._enabled = config.get("enabled")
self._proxy = config.get("proxy")
self._recognize = config.get("recognize")
self._openai_url = config.get("openai_url")
self._openai_key = config.get("openai_key")
self._model = config.get("model")
if self._openai_url and self._openai_key:
self.openai = OpenAi(api_key=self._openai_key, api_url=self._openai_url,
proxy=settings.PROXY if self._proxy else None,
model=self._model)
def get_state(self) -> bool:
return self._enabled
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1、页面配置2、数据结构
"""
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '启用插件',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'proxy',
'label': '使用代理服务器',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'recognize',
'label': '辅助识别',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'openai_url',
'label': 'OpenAI API Url',
'placeholder': 'https://api.openai.com',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'openai_key',
'label': 'sk-xxx'
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'model',
'label': '自定义模型',
'placeholder': 'gpt-3.5-turbo',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '开启插件后,消息交互时使用请[问帮你]开头或者以号结尾或者超过10个汉字/单词则会触发ChatGPT回复。'
'开启辅助识别后,内置识别功能无法正常识别种子/文件名称时将使用ChatGTP进行AI辅助识别可以提升动漫等非规范命名的识别成功率。'
}
}
]
}
]
}
]
}
], {
"enabled": False,
"proxy": False,
"recognize": False,
"openai_url": "https://api.openai.com",
"openai_key": "",
"model": "gpt-3.5-turbo"
}
def get_page(self) -> List[dict]:
pass
@eventmanager.register(EventType.UserMessage)
def talk(self, event: Event):
"""
监听用户消息获取ChatGPT回复
"""
if not self._enabled:
return
if not self.openai:
return
text = event.event_data.get("text")
userid = event.event_data.get("userid")
channel = event.event_data.get("channel")
if not text:
return
response = self.openai.get_response(text=text, userid=userid)
if response:
self.post_message(channel=channel, title=response, userid=userid)
@eventmanager.register(ChainEventType.NameRecognize)
def recognize(self, event: Event):
"""
监听识别事件使用ChatGPT辅助识别名称
"""
if not event.event_data:
return
title = event.event_data.get("title")
if not title:
return
# 调用ChatGPT
response = self.openai.get_media_name(filename=title)
logger.info(f"ChatGPT返回结果{response}")
if response:
event.event_data = {
'title': title,
'name': response.get("title"),
'year': response.get("year"),
'season': response.get("season"),
'episode': response.get("episode")
}
else:
event.event_data = {}
def stop_service(self):
"""
退出插件
"""
pass

View File

@@ -0,0 +1,206 @@
import json
import time
from typing import List, Union
import openai
from cacheout import Cache
OpenAISessionCache = Cache(maxsize=100, ttl=3600, timer=time.time, default=None)
class OpenAi:
_api_key: str = None
_api_url: str = None
_model: str = "gpt-3.5-turbo"
def __init__(self, api_key: str = None, api_url: str = None, proxy: dict = None, model: str = None):
self._api_key = api_key
self._api_url = api_url
openai.api_base = self._api_url + "/v1"
openai.api_key = self._api_key
if proxy and proxy.get("https"):
openai.proxy = proxy.get("https")
if model:
self._model = model
def get_state(self) -> bool:
return True if self._api_key else False
@staticmethod
def __save_session(session_id: str, message: str):
"""
保存会话
:param session_id: 会话ID
:param message: 消息
:return:
"""
seasion = OpenAISessionCache.get(session_id)
if seasion:
seasion.append({
"role": "assistant",
"content": message
})
OpenAISessionCache.set(session_id, seasion)
@staticmethod
def __get_session(session_id: str, message: str) -> List[dict]:
"""
获取会话
:param session_id: 会话ID
:return: 会话上下文
"""
seasion = OpenAISessionCache.get(session_id)
if seasion:
seasion.append({
"role": "user",
"content": message
})
else:
seasion = [
{
"role": "system",
"content": "请在接下来的对话中请使用中文回复,并且内容尽可能详细。"
},
{
"role": "user",
"content": message
}]
OpenAISessionCache.set(session_id, seasion)
return seasion
def __get_model(self, message: Union[str, List[dict]],
prompt: str = None,
user: str = "MoviePilot",
**kwargs):
"""
获取模型
"""
if not isinstance(message, list):
if prompt:
message = [
{
"role": "system",
"content": prompt
},
{
"role": "user",
"content": message
}
]
else:
message = [
{
"role": "user",
"content": message
}
]
return openai.ChatCompletion.create(
model=self._model,
user=user,
messages=message,
**kwargs
)
@staticmethod
def __clear_session(session_id: str):
"""
清除会话
:param session_id: 会话ID
:return:
"""
if OpenAISessionCache.get(session_id):
OpenAISessionCache.delete(session_id)
def get_media_name(self, filename: str):
"""
从文件名中提取媒体名称等要素
:param filename: 文件名
:return: Json
"""
if not self.get_state():
return None
result = ""
try:
_filename_prompt = "I will give you a movie/tvshow file name.You need to return a Json." \
"\nPay attention to the correct identification of the film name." \
"\n{\"title\":string,\"version\":string,\"part\":string,\"year\":string,\"resolution\":string,\"season\":number|null,\"episode\":number|null}"
completion = self.__get_model(prompt=_filename_prompt, message=filename)
result = completion.choices[0].message.content
return json.loads(result)
except Exception as e:
print(f"{str(e)}{result}")
return {}
def get_response(self, text: str, userid: str):
"""
聊天对话,获取答案
:param text: 输入文本
:param userid: 用户ID
:return:
"""
if not self.get_state():
return ""
try:
if not userid:
return "用户信息错误"
else:
userid = str(userid)
if text == "#清除":
self.__clear_session(userid)
return "会话已清除"
# 获取历史上下文
messages = self.__get_session(userid, text)
completion = self.__get_model(message=messages, user=userid)
result = completion.choices[0].message.content
if result:
self.__save_session(userid, text)
return result
except openai.error.RateLimitError as e:
return f"请求被ChatGPT拒绝了{str(e)}"
except openai.error.APIConnectionError as e:
return f"ChatGPT网络连接失败{str(e)}"
except openai.error.Timeout as e:
return f"没有接收到ChatGPT的返回消息{str(e)}"
except Exception as e:
return f"请求ChatGPT出现错误{str(e)}"
def translate_to_zh(self, text: str):
"""
翻译为中文
:param text: 输入文本
"""
if not self.get_state():
return False, None
system_prompt = "You are a translation engine that can only translate text and cannot interpret it."
user_prompt = f"translate to zh-CN:\n\n{text}"
result = ""
try:
completion = self.__get_model(prompt=system_prompt,
message=user_prompt,
temperature=0,
top_p=1,
frequency_penalty=0,
presence_penalty=0)
result = completion.choices[0].message.content.strip()
return True, result
except Exception as e:
print(f"{str(e)}{result}")
return False, str(e)
def get_question_answer(self, question: str):
"""
从给定问题和选项中获取正确答案
:param question: 问题及选项
:return: Json
"""
if not self.get_state():
return None
result = ""
try:
_question_prompt = "下面我们来玩一个游戏,你是老师,我是学生,你需要回答我的问题,我会给你一个题目和几个选项,你的回复必须是给定选项中正确答案对应的序号,请直接回复数字"
completion = self.__get_model(prompt=_question_prompt, message=question)
result = completion.choices[0].message.content
return result
except Exception as e:
print(f"{str(e)}{result}")
return {}