This commit is contained in:
InfinityPacer
2024-10-10 22:17:13 +08:00
7 changed files with 764 additions and 7 deletions

BIN
icons/Dingding_A.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

View File

@@ -256,8 +256,7 @@
"version": "1.3",
"icon": "Chatgpt_A.png",
"author": "jxxghp",
"level": 1,
"v2": true
"level": 1
},
"NAStoolSync": {
"name": "历史记录同步",
@@ -840,5 +839,15 @@
"v1.1": "支持将极影视评分修改为豆瓣评分",
"v1.0": "同步极影视在看/已看状态到豆瓣"
}
},
"DingdingMsg": {
"name": "钉钉机器人",
"description": "支持使用钉钉机器人发送消息通知。",
"labels": "消息通知,钉钉机器人",
"version": "1.12",
"icon": "Dingding_A.png",
"author": "nnlegenda",
"level": 1,
"v2": true
}
}

View File

@@ -3,12 +3,12 @@
"name": "站点数据统计",
"description": "站点统计数据图表。",
"labels": "站点,仪表板",
"version": "1.0.1",
"version": "1.0.2",
"icon": "statistic.png",
"author": "lightolly,jxxghp",
"level": 2,
"history": {
"v1.0.1": "MoviePilot V2 版本站点数据统计插件"
"v1.0.2": "MoviePilot V2 版本站点数据统计插件"
}
},
"BrushFlow": {
@@ -58,5 +58,17 @@
"history": {
"v1.3": "MoviePilot V2 版本媒体库服务器刷新插件"
}
},
"ChatGPT": {
"name": "ChatGPT",
"description": "消息交互支持与ChatGPT对话。",
"labels": "消息通知,识别",
"version": "2.0",
"icon": "Chatgpt_A.png",
"author": "jxxghp",
"level": 1,
"history": {
"v2.0": "适配MoviePilot V2 版本,采用链式事件机制"
}
}
}

View File

@@ -0,0 +1,261 @@
from typing import Any, List, Dict, Tuple
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.plugins import _PluginBase
from app.plugins.chatgpt.openai import OpenAi
from app.schemas.types import EventType, ChainEventType
class ChatGPT(_PluginBase):
# 插件名称
plugin_name = "ChatGPT"
# 插件描述
plugin_desc = "消息交互支持与ChatGPT对话。"
# 插件图标
plugin_icon = "Chatgpt_A.png"
# 插件版本
plugin_version = "2.0"
# 插件作者
plugin_author = "jxxghp"
# 作者主页
author_url = "https://github.com/jxxghp"
# 插件配置项ID前缀
plugin_config_prefix = "chatgpt_"
# 加载顺序
plugin_order = 15
# 可使用的用户级别
auth_level = 1
# 私有属性
openai = None
_enabled = False
_proxy = False
_recognize = False
_openai_url = None
_openai_key = None
_model = None
def init_plugin(self, config: dict = None):
if config:
self._enabled = config.get("enabled")
self._proxy = config.get("proxy")
self._recognize = config.get("recognize")
self._openai_url = config.get("openai_url")
self._openai_key = config.get("openai_key")
self._model = config.get("model")
if self._openai_url and self._openai_key:
self.openai = OpenAi(api_key=self._openai_key, api_url=self._openai_url,
proxy=settings.PROXY if self._proxy else None,
model=self._model)
def get_state(self) -> bool:
return self._enabled
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1、页面配置2、数据结构
"""
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '启用插件',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'proxy',
'label': '使用代理服务器',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'recognize',
'label': '辅助识别',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'openai_url',
'label': 'OpenAI API Url',
'placeholder': 'https://api.openai.com',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'openai_key',
'label': 'sk-xxx'
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'model',
'label': '自定义模型',
'placeholder': 'gpt-3.5-turbo',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '开启插件后,消息交互时使用请[问帮你]开头或者以号结尾或者超过10个汉字/单词则会触发ChatGPT回复。'
'开启辅助识别后,内置识别功能无法正常识别种子/文件名称时将使用ChatGTP进行AI辅助识别可以提升动漫等非规范命名的识别成功率。'
}
}
]
}
]
}
]
}
], {
"enabled": False,
"proxy": False,
"recognize": False,
"openai_url": "https://api.openai.com",
"openai_key": "",
"model": "gpt-3.5-turbo"
}
def get_page(self) -> List[dict]:
pass
@eventmanager.register(EventType.UserMessage)
def talk(self, event: Event):
"""
监听用户消息获取ChatGPT回复
"""
if not self._enabled:
return
if not self.openai:
return
text = event.event_data.get("text")
userid = event.event_data.get("userid")
channel = event.event_data.get("channel")
if not text:
return
response = self.openai.get_response(text=text, userid=userid)
if response:
self.post_message(channel=channel, title=response, userid=userid)
@eventmanager.register(ChainEventType.NameRecognize)
def recognize(self, event: Event):
"""
监听识别事件使用ChatGPT辅助识别名称
"""
if not event.event_data:
return
title = event.event_data.get("title")
if not title:
return
# 调用ChatGPT
response = self.openai.get_media_name(filename=title)
logger.info(f"ChatGPT返回结果{response}")
if response:
event.event_data = {
'title': title,
'name': response.get("title"),
'year': response.get("year"),
'season': response.get("season"),
'episode': response.get("episode")
}
else:
event.event_data = {}
def stop_service(self):
"""
退出插件
"""
pass

View File

@@ -0,0 +1,206 @@
import json
import time
from typing import List, Union
import openai
from cacheout import Cache
OpenAISessionCache = Cache(maxsize=100, ttl=3600, timer=time.time, default=None)
class OpenAi:
_api_key: str = None
_api_url: str = None
_model: str = "gpt-3.5-turbo"
def __init__(self, api_key: str = None, api_url: str = None, proxy: dict = None, model: str = None):
self._api_key = api_key
self._api_url = api_url
openai.api_base = self._api_url + "/v1"
openai.api_key = self._api_key
if proxy and proxy.get("https"):
openai.proxy = proxy.get("https")
if model:
self._model = model
def get_state(self) -> bool:
return True if self._api_key else False
@staticmethod
def __save_session(session_id: str, message: str):
"""
保存会话
:param session_id: 会话ID
:param message: 消息
:return:
"""
seasion = OpenAISessionCache.get(session_id)
if seasion:
seasion.append({
"role": "assistant",
"content": message
})
OpenAISessionCache.set(session_id, seasion)
@staticmethod
def __get_session(session_id: str, message: str) -> List[dict]:
"""
获取会话
:param session_id: 会话ID
:return: 会话上下文
"""
seasion = OpenAISessionCache.get(session_id)
if seasion:
seasion.append({
"role": "user",
"content": message
})
else:
seasion = [
{
"role": "system",
"content": "请在接下来的对话中请使用中文回复,并且内容尽可能详细。"
},
{
"role": "user",
"content": message
}]
OpenAISessionCache.set(session_id, seasion)
return seasion
def __get_model(self, message: Union[str, List[dict]],
prompt: str = None,
user: str = "MoviePilot",
**kwargs):
"""
获取模型
"""
if not isinstance(message, list):
if prompt:
message = [
{
"role": "system",
"content": prompt
},
{
"role": "user",
"content": message
}
]
else:
message = [
{
"role": "user",
"content": message
}
]
return openai.ChatCompletion.create(
model=self._model,
user=user,
messages=message,
**kwargs
)
@staticmethod
def __clear_session(session_id: str):
"""
清除会话
:param session_id: 会话ID
:return:
"""
if OpenAISessionCache.get(session_id):
OpenAISessionCache.delete(session_id)
def get_media_name(self, filename: str):
"""
从文件名中提取媒体名称等要素
:param filename: 文件名
:return: Json
"""
if not self.get_state():
return None
result = ""
try:
_filename_prompt = "I will give you a movie/tvshow file name.You need to return a Json." \
"\nPay attention to the correct identification of the film name." \
"\n{\"title\":string,\"version\":string,\"part\":string,\"year\":string,\"resolution\":string,\"season\":number|null,\"episode\":number|null}"
completion = self.__get_model(prompt=_filename_prompt, message=filename)
result = completion.choices[0].message.content
return json.loads(result)
except Exception as e:
print(f"{str(e)}{result}")
return {}
def get_response(self, text: str, userid: str):
"""
聊天对话,获取答案
:param text: 输入文本
:param userid: 用户ID
:return:
"""
if not self.get_state():
return ""
try:
if not userid:
return "用户信息错误"
else:
userid = str(userid)
if text == "#清除":
self.__clear_session(userid)
return "会话已清除"
# 获取历史上下文
messages = self.__get_session(userid, text)
completion = self.__get_model(message=messages, user=userid)
result = completion.choices[0].message.content
if result:
self.__save_session(userid, text)
return result
except openai.error.RateLimitError as e:
return f"请求被ChatGPT拒绝了{str(e)}"
except openai.error.APIConnectionError as e:
return f"ChatGPT网络连接失败{str(e)}"
except openai.error.Timeout as e:
return f"没有接收到ChatGPT的返回消息{str(e)}"
except Exception as e:
return f"请求ChatGPT出现错误{str(e)}"
def translate_to_zh(self, text: str):
"""
翻译为中文
:param text: 输入文本
"""
if not self.get_state():
return False, None
system_prompt = "You are a translation engine that can only translate text and cannot interpret it."
user_prompt = f"translate to zh-CN:\n\n{text}"
result = ""
try:
completion = self.__get_model(prompt=system_prompt,
message=user_prompt,
temperature=0,
top_p=1,
frequency_penalty=0,
presence_penalty=0)
result = completion.choices[0].message.content.strip()
return True, result
except Exception as e:
print(f"{str(e)}{result}")
return False, str(e)
def get_question_answer(self, question: str):
"""
从给定问题和选项中获取正确答案
:param question: 问题及选项
:return: Json
"""
if not self.get_state():
return None
result = ""
try:
_question_prompt = "下面我们来玩一个游戏,你是老师,我是学生,你需要回答我的问题,我会给你一个题目和几个选项,你的回复必须是给定选项中正确答案对应的序号,请直接回复数字"
completion = self.__get_model(prompt=_question_prompt, message=question)
result = completion.choices[0].message.content
return result
except Exception as e:
print(f"{str(e)}{result}")
return {}

View File

@@ -32,7 +32,7 @@ class SiteStatistic(_PluginBase):
# 插件图标
plugin_icon = "statistic.png"
# 插件版本
plugin_version = "1.0.1"
plugin_version = "1.0.2"
# 插件作者
plugin_author = "lightolly,jxxghp"
# 作者主页
@@ -206,8 +206,8 @@ class SiteStatistic(_PluginBase):
data_list: List[SiteUserData] = self.siteoper.get_userdata()
if not data_list:
return "", [], []
# 每个日期只保留最后一条数据
data_list = list({data.updated_day: data for data in data_list}.values())
# 每个日期、每个站点只保留最后一条数据
data_list = list({f"{data.updated_day}_{data.name}": data for data in data_list}.values())
# 按日期倒序排序
data_list.sort(key=lambda x: x.updated_day, reverse=True)
# 今天的日期

View File

@@ -0,0 +1,269 @@
import re
import time
import hmac
import hashlib
import base64
import urllib.parse
from app.plugins import _PluginBase
from app.core.event import eventmanager, Event
from app.schemas.types import EventType, NotificationType
from app.utils.http import RequestUtils
from typing import Any, List, Dict, Tuple
from app.log import logger
class DingdingMsg(_PluginBase):
# 插件名称
plugin_name = "钉钉机器人"
# 插件描述
plugin_desc = "支持使用钉钉机器人发送消息通知。"
# 插件图标
plugin_icon = "Dingding_A.png"
# 插件版本
plugin_version = "1.12"
# 插件作者
plugin_author = "nnlegenda"
# 作者主页
author_url = "https://github.com/nnlegenda"
# 插件配置项ID前缀
plugin_config_prefix = "dingdingmsg_"
# 加载顺序
plugin_order = 25
# 可使用的用户级别
auth_level = 1
# 私有属性
_enabled = False
_token = None
_secret = None
_msgtypes = []
def init_plugin(self, config: dict = None):
if config:
self._enabled = config.get("enabled")
self._token = config.get("token")
self._secret = config.get("secret")
self._msgtypes = config.get("msgtypes") or []
def get_state(self) -> bool:
return self._enabled and (True if self._token else False) and (True if self._secret else False)
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1、页面配置2、数据结构
"""
# 编历 NotificationType 枚举,生成消息类型选项
MsgTypeOptions = []
for item in NotificationType:
MsgTypeOptions.append({
"title": item.value,
"value": item.name
})
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '启用插件',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'token',
'label': '钉钉机器人token',
'placeholder': 'xxxxxx',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'secret',
'label': '加签',
'placeholder': 'SECxxx',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12
},
'content': [
{
'component': 'VSelect',
'props': {
'multiple': True,
'chips': True,
'model': 'msgtypes',
'label': '消息类型',
'items': MsgTypeOptions
}
}
]
}
]
},
]
}
], {
"enabled": False,
'token': '',
'msgtypes': []
}
def get_page(self) -> List[dict]:
pass
@eventmanager.register(EventType.NoticeMessage)
def send(self, event: Event):
"""
消息发送事件
"""
if not self.get_state():
return
if not event.event_data:
return
msg_body = event.event_data
# 渠道
channel = msg_body.get("channel")
if channel:
return
# 类型
msg_type: NotificationType = msg_body.get("type")
# 标题
title = msg_body.get("title")
# 文本
text = msg_body.get("text")
# 封面
cover = msg_body.get("image")
if not title and not text:
logger.warn("标题和内容不能同时为空")
return
if (msg_type and self._msgtypes
and msg_type.name not in self._msgtypes):
logger.info(f"消息类型 {msg_type.value} 未开启消息发送")
return
sc_url = self.url_sign(self._token, self._secret)
try:
if text:
# 对text进行Markdown特殊字符转义
text = re.sub(r"([_`])", r"\\\1", text)
else:
text = ""
if cover:
data = {
"msgtype": "markdown",
"markdown": {
"title": title,
"text": "### %s\n\n"
"![Cover](%s)\n\n"
"> %s\n\n > MoviePilot %s\n" % (title, cover, text, msg_type.value)
}
}
else:
data = {
"msgtype": "markdown",
"markdown": {
"title": title,
"text": "### %s\n\n"
"> %s\n\n > MoviePilot %s\n" % (title, text, msg_type.value)
}
}
res = RequestUtils(content_type="application/json").post_res(sc_url, json=data)
if res and res.status_code == 200:
ret_json = res.json()
errno = ret_json.get('errcode')
error = ret_json.get('errmsg')
if errno == 0:
logger.info("钉钉机器人消息发送成功")
else:
logger.warn(f"钉钉机器人消息发送失败,错误码:{errno},错误原因:{error}")
elif res is not None:
logger.warn(f"钉钉机器人消息发送失败,错误码:{res.status_code},错误原因:{res.reason}")
else:
logger.warn("钉钉机器人消息发送失败,未获取到返回信息")
except Exception as msg_e:
logger.error(f"钉钉机器人消息发送失败,{str(msg_e)}")
def stop_service(self):
"""
退出插件
"""
pass
def url_sign(self, access_token: str, secret: str) -> str:
"""
加签
"""
# 生成时间戳和签名
timestamp = str(round(time.time() * 1000))
secret_enc = secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
# 组合请求的完整 URL
full_url = f'https://oapi.dingtalk.com/robot/send?access_token={access_token}&timestamp={timestamp}&sign={sign}'
return full_url