diff --git a/package.json b/package.json index 910c2d9..c06de77 100644 --- a/package.json +++ b/package.json @@ -256,8 +256,7 @@ "version": "1.3", "icon": "Chatgpt_A.png", "author": "jxxghp", - "level": 1, - "v2": true + "level": 1 }, "NAStoolSync": { "name": "历史记录同步", diff --git a/package.v2.json b/package.v2.json index 0c27974..1dd5632 100644 --- a/package.v2.json +++ b/package.v2.json @@ -58,5 +58,17 @@ "history": { "v1.3": "MoviePilot V2 版本媒体库服务器刷新插件" } + }, + "ChatGPT": { + "name": "ChatGPT", + "description": "消息交互支持与ChatGPT对话。", + "labels": "消息通知,识别", + "version": "2.0", + "icon": "Chatgpt_A.png", + "author": "jxxghp", + "level": 1, + "history": { + "v2.0": "适配MoviePilot V2 版本,采用链式事件机制" + } } } \ No newline at end of file diff --git a/plugins.v2/chatgpt/__init__.py b/plugins.v2/chatgpt/__init__.py new file mode 100644 index 0000000..3ba05b6 --- /dev/null +++ b/plugins.v2/chatgpt/__init__.py @@ -0,0 +1,261 @@ +from typing import Any, List, Dict, Tuple + +from app.core.config import settings +from app.core.event import eventmanager, Event +from app.log import logger +from app.plugins import _PluginBase +from app.plugins.chatgpt.openai import OpenAi +from app.schemas.types import EventType, ChainEventType + + +class ChatGPT(_PluginBase): + # 插件名称 + plugin_name = "ChatGPT" + # 插件描述 + plugin_desc = "消息交互支持与ChatGPT对话。" + # 插件图标 + plugin_icon = "Chatgpt_A.png" + # 插件版本 + plugin_version = "2.0" + # 插件作者 + plugin_author = "jxxghp" + # 作者主页 + author_url = "https://github.com/jxxghp" + # 插件配置项ID前缀 + plugin_config_prefix = "chatgpt_" + # 加载顺序 + plugin_order = 15 + # 可使用的用户级别 + auth_level = 1 + + # 私有属性 + openai = None + _enabled = False + _proxy = False + _recognize = False + _openai_url = None + _openai_key = None + _model = None + + def init_plugin(self, config: dict = None): + if config: + self._enabled = config.get("enabled") + self._proxy = config.get("proxy") + self._recognize = config.get("recognize") + self._openai_url = config.get("openai_url") + self._openai_key = config.get("openai_key") + self._model = config.get("model") + if self._openai_url and self._openai_key: + self.openai = OpenAi(api_key=self._openai_key, api_url=self._openai_url, + proxy=settings.PROXY if self._proxy else None, + model=self._model) + + def get_state(self) -> bool: + return self._enabled + + @staticmethod + def get_command() -> List[Dict[str, Any]]: + pass + + def get_api(self) -> List[Dict[str, Any]]: + pass + + def get_form(self) -> Tuple[List[dict], Dict[str, Any]]: + """ + 拼装插件配置页面,需要返回两块数据:1、页面配置;2、数据结构 + """ + return [ + { + 'component': 'VForm', + 'content': [ + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 4 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'enabled', + 'label': '启用插件', + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 4 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'proxy', + 'label': '使用代理服务器', + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 4 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'recognize', + 'label': '辅助识别', + } + } + ] + } + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 4 + }, + 'content': [ + { + 'component': 'VTextField', + 'props': { + 'model': 'openai_url', + 'label': 'OpenAI API Url', + 'placeholder': 'https://api.openai.com', + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 4 + }, + 'content': [ + { + 'component': 'VTextField', + 'props': { + 'model': 'openai_key', + 'label': 'sk-xxx' + } + } + ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 4 + }, + 'content': [ + { + 'component': 'VTextField', + 'props': { + 'model': 'model', + 'label': '自定义模型', + 'placeholder': 'gpt-3.5-turbo', + } + } + ] + } + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + }, + 'content': [ + { + 'component': 'VAlert', + 'props': { + 'type': 'info', + 'variant': 'tonal', + 'text': '开启插件后,消息交互时使用请[问帮你]开头,或者以?号结尾,或者超过10个汉字/单词,则会触发ChatGPT回复。' + '开启辅助识别后,内置识别功能无法正常识别种子/文件名称时,将使用ChatGTP进行AI辅助识别,可以提升动漫等非规范命名的识别成功率。' + } + } + ] + } + ] + } + ] + } + ], { + "enabled": False, + "proxy": False, + "recognize": False, + "openai_url": "https://api.openai.com", + "openai_key": "", + "model": "gpt-3.5-turbo" + } + + def get_page(self) -> List[dict]: + pass + + @eventmanager.register(EventType.UserMessage) + def talk(self, event: Event): + """ + 监听用户消息,获取ChatGPT回复 + """ + if not self._enabled: + return + if not self.openai: + return + text = event.event_data.get("text") + userid = event.event_data.get("userid") + channel = event.event_data.get("channel") + if not text: + return + response = self.openai.get_response(text=text, userid=userid) + if response: + self.post_message(channel=channel, title=response, userid=userid) + + @eventmanager.register(ChainEventType.NameRecognize) + def recognize(self, event: Event): + """ + 监听识别事件,使用ChatGPT辅助识别名称 + """ + if not event.event_data: + return + title = event.event_data.get("title") + if not title: + return + # 调用ChatGPT + response = self.openai.get_media_name(filename=title) + logger.info(f"ChatGPT返回结果:{response}") + if response: + event.event_data = { + 'title': title, + 'name': response.get("title"), + 'year': response.get("year"), + 'season': response.get("season"), + 'episode': response.get("episode") + } + else: + event.event_data = {} + + def stop_service(self): + """ + 退出插件 + """ + pass diff --git a/plugins.v2/chatgpt/openai.py b/plugins.v2/chatgpt/openai.py new file mode 100644 index 0000000..937ecea --- /dev/null +++ b/plugins.v2/chatgpt/openai.py @@ -0,0 +1,206 @@ +import json +import time +from typing import List, Union + +import openai +from cacheout import Cache + +OpenAISessionCache = Cache(maxsize=100, ttl=3600, timer=time.time, default=None) + + +class OpenAi: + _api_key: str = None + _api_url: str = None + _model: str = "gpt-3.5-turbo" + + def __init__(self, api_key: str = None, api_url: str = None, proxy: dict = None, model: str = None): + self._api_key = api_key + self._api_url = api_url + openai.api_base = self._api_url + "/v1" + openai.api_key = self._api_key + if proxy and proxy.get("https"): + openai.proxy = proxy.get("https") + if model: + self._model = model + + def get_state(self) -> bool: + return True if self._api_key else False + + @staticmethod + def __save_session(session_id: str, message: str): + """ + 保存会话 + :param session_id: 会话ID + :param message: 消息 + :return: + """ + seasion = OpenAISessionCache.get(session_id) + if seasion: + seasion.append({ + "role": "assistant", + "content": message + }) + OpenAISessionCache.set(session_id, seasion) + + @staticmethod + def __get_session(session_id: str, message: str) -> List[dict]: + """ + 获取会话 + :param session_id: 会话ID + :return: 会话上下文 + """ + seasion = OpenAISessionCache.get(session_id) + if seasion: + seasion.append({ + "role": "user", + "content": message + }) + else: + seasion = [ + { + "role": "system", + "content": "请在接下来的对话中请使用中文回复,并且内容尽可能详细。" + }, + { + "role": "user", + "content": message + }] + OpenAISessionCache.set(session_id, seasion) + return seasion + + def __get_model(self, message: Union[str, List[dict]], + prompt: str = None, + user: str = "MoviePilot", + **kwargs): + """ + 获取模型 + """ + if not isinstance(message, list): + if prompt: + message = [ + { + "role": "system", + "content": prompt + }, + { + "role": "user", + "content": message + } + ] + else: + message = [ + { + "role": "user", + "content": message + } + ] + return openai.ChatCompletion.create( + model=self._model, + user=user, + messages=message, + **kwargs + ) + + @staticmethod + def __clear_session(session_id: str): + """ + 清除会话 + :param session_id: 会话ID + :return: + """ + if OpenAISessionCache.get(session_id): + OpenAISessionCache.delete(session_id) + + def get_media_name(self, filename: str): + """ + 从文件名中提取媒体名称等要素 + :param filename: 文件名 + :return: Json + """ + if not self.get_state(): + return None + result = "" + try: + _filename_prompt = "I will give you a movie/tvshow file name.You need to return a Json." \ + "\nPay attention to the correct identification of the film name." \ + "\n{\"title\":string,\"version\":string,\"part\":string,\"year\":string,\"resolution\":string,\"season\":number|null,\"episode\":number|null}" + completion = self.__get_model(prompt=_filename_prompt, message=filename) + result = completion.choices[0].message.content + return json.loads(result) + except Exception as e: + print(f"{str(e)}:{result}") + return {} + + def get_response(self, text: str, userid: str): + """ + 聊天对话,获取答案 + :param text: 输入文本 + :param userid: 用户ID + :return: + """ + if not self.get_state(): + return "" + try: + if not userid: + return "用户信息错误" + else: + userid = str(userid) + if text == "#清除": + self.__clear_session(userid) + return "会话已清除" + # 获取历史上下文 + messages = self.__get_session(userid, text) + completion = self.__get_model(message=messages, user=userid) + result = completion.choices[0].message.content + if result: + self.__save_session(userid, text) + return result + except openai.error.RateLimitError as e: + return f"请求被ChatGPT拒绝了,{str(e)}" + except openai.error.APIConnectionError as e: + return f"ChatGPT网络连接失败:{str(e)}" + except openai.error.Timeout as e: + return f"没有接收到ChatGPT的返回消息:{str(e)}" + except Exception as e: + return f"请求ChatGPT出现错误:{str(e)}" + + def translate_to_zh(self, text: str): + """ + 翻译为中文 + :param text: 输入文本 + """ + if not self.get_state(): + return False, None + system_prompt = "You are a translation engine that can only translate text and cannot interpret it." + user_prompt = f"translate to zh-CN:\n\n{text}" + result = "" + try: + completion = self.__get_model(prompt=system_prompt, + message=user_prompt, + temperature=0, + top_p=1, + frequency_penalty=0, + presence_penalty=0) + result = completion.choices[0].message.content.strip() + return True, result + except Exception as e: + print(f"{str(e)}:{result}") + return False, str(e) + + def get_question_answer(self, question: str): + """ + 从给定问题和选项中获取正确答案 + :param question: 问题及选项 + :return: Json + """ + if not self.get_state(): + return None + result = "" + try: + _question_prompt = "下面我们来玩一个游戏,你是老师,我是学生,你需要回答我的问题,我会给你一个题目和几个选项,你的回复必须是给定选项中正确答案对应的序号,请直接回复数字" + completion = self.__get_model(prompt=_question_prompt, message=question) + result = completion.choices[0].message.content + return result + except Exception as e: + print(f"{str(e)}:{result}") + return {}