from typing import Any, List, Dict, Tuple

from app import schemas
from app.log import logger
from app.plugins import _PluginBase
from app.schemas import NotificationType


class SynologyNotify(_PluginBase):
    """Plugin that exposes a webhook endpoint and forwards what it receives as a notification.

    The endpoint registered in ``get_api`` calls ``send_notify``, which relays
    the received parameters through ``post_message`` when the plugin is both
    enabled and allowed to notify.
    """

    # Plugin name
    plugin_name = "群辉Webhook通知"
    # Plugin description
    plugin_desc = "接收群辉webhook通知并推送。"
    # Plugin icon
    plugin_icon = "https://raw.githubusercontent.com/thsrite/MoviePilot-Plugins/main/icons/synology.png"
    # Plugin version
    plugin_version = "1.3"
    # Plugin author
    plugin_author = "thsrite"
    # Author home page
    author_url = "https://github.com/thsrite"
    # Prefix for this plugin's configuration item IDs
    plugin_config_prefix = "synologynotify_"
    # Load order
    plugin_order = 30
    # Minimum user level allowed to use the plugin
    auth_level = 1

    # Runtime configuration, overwritten by init_plugin()
    _enabled = False
    _notify = False
    _msgtype = None

    def init_plugin(self, config: dict = None):
        """Load the plugin settings from ``config``; an empty/None config leaves the defaults."""
        if config:
            self._enabled = config.get("enabled")
            self._notify = config.get("notify")
            self._msgtype = config.get("msgtype")

    def _resolve_msgtype(self) -> NotificationType:
        """Map the configured message-type name to a ``NotificationType`` member.

        Falls back to ``NotificationType.Manual`` when nothing is configured or
        when the configured name is not a member of the enum.
        """
        if not self._msgtype:
            return NotificationType.Manual
        try:
            # Enum lookup by name raises KeyError for unknown names; it never
            # returns a falsy value, so an `or` fallback would be unreachable.
            return NotificationType[str(self._msgtype)]
        except KeyError:
            logger.info(f"未知的消息类型 {self._msgtype},使用默认消息类型")
            return NotificationType.Manual

    def send_notify(self, text: str = None, title: str = None, content: str = None,
                    url: str = None) -> schemas.Response:
        """Webhook endpoint: forward the received message as a notification.

        :param text: message body; when given it is sent under a fixed title
        :param title: notification title, used only when ``text`` is empty
        :param content: notification body, used only when ``text`` is empty
        :param url: link appended to ``content`` when ``text`` is empty
        :return: a success response; it is returned even when the plugin is
                 disabled or notifications are switched off
        """
        logger.info(f"收到webhook消息啦。。。 {text} {title} {content} {url}")

        if self._enabled and self._notify:
            mtype = self._resolve_msgtype()
            if text:
                self.post_message(title="群辉通知", mtype=mtype, text=text)
            else:
                self.post_message(title=title, mtype=mtype,
                                  text=f"{content}\n[查看详情]({url})")

        return schemas.Response(
            success=True,
            message="发送成功"
        )

    def get_state(self) -> bool:
        """Return the configured ``enabled`` flag."""
        return self._enabled

    @staticmethod
    def get_command() -> List[Dict[str, Any]]:
        """This plugin defines no commands (returns None)."""
        pass

    def get_api(self) -> List[Dict[str, Any]]:
        """Return the plugin API definitions.

        Each entry has the form::

            {
                "path": "/xx",
                "endpoint": self.xxx,
                "methods": ["GET", "POST"],
                "summary": "API description"
            }
        """
        return [
            {
                "path": "/webhook",
                "endpoint": self.send_notify,
                "methods": ["GET"],
                "summary": "群辉webhook",
                "description": "接受群辉webhook通知并推送",
            }
        ]

    def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
        """Build the plugin configuration page.

        :return: a pair of (page layout, default values for the form model)
        """
        # One select option per NotificationType member: label is the member
        # value, stored value is the member name (looked up again on send).
        msg_type_options = [
            {"title": item.value, "value": item.name}
            for item in NotificationType
        ]

        return [
            {
                'component': 'VForm',
                'content': [
                    {
                        'component': 'VRow',
                        'content': [
                            {
                                'component': 'VCol',
                                'props': {
                                    'cols': 12,
                                    'md': 6
                                },
                                'content': [
                                    {
                                        'component': 'VSwitch',
                                        'props': {
                                            'model': 'enabled',
                                            'label': '启用插件',
                                        }
                                    }
                                ]
                            },
                            {
                                'component': 'VCol',
                                'props': {
                                    'cols': 12,
                                    'md': 6
                                },
                                'content': [
                                    {
                                        'component': 'VSwitch',
                                        'props': {
                                            'model': 'notify',
                                            'label': '开启通知',
                                        }
                                    }
                                ]
                            },
                        ]
                    },
                    {
                        'component': 'VRow',
                        'content': [
                            {
                                'component': 'VCol',
                                'props': {
                                    'cols': 12
                                },
                                'content': [
                                    {
                                        'component': 'VSelect',
                                        'props': {
                                            'multiple': False,
                                            'chips': True,
                                            'model': 'msgtype',
                                            'label': '消息类型',
                                            'items': msg_type_options
                                        }
                                    }
                                ]
                            }
                        ]
                    },
                    {
                        'component': 'VRow',
                        'content': [
                            {
                                'component': 'VCol',
                                'props': {
                                    'cols': 12,
                                },
                                'content': [
                                    {
                                        'component': 'VAlert',
                                        'props': {
                                            'type': 'info',
                                            'variant': 'tonal',
                                            'text': '群辉webhook配置http://ip:3001/api/v1/plugin/SynologyNotify/webhook?apikey=*****&text=hello world。'
                                                    'text参数类型是消息内容。此插件安装完需要重启生效api。消息类型默认为手动处理通知。'
                                        }
                                    }
                                ]
                            }
                        ]
                    },
                    {
                        'component': 'VRow',
                        'content': [
                            {
                                'component': 'VCol',
                                'props': {
                                    'cols': 12,
                                },
                                'content': [
                                    {
                                        'component': 'VAlert',
                                        'props': {
                                            'type': 'info',
                                            'variant': 'tonal',
                                            'text': '如安装完插件后,群晖发送webhook提示404,重启MoviePilot即可。'
                                        }
                                    }
                                ]
                            }
                        ]
                    }
                ]
            }
        ], {
            "enabled": False,
            "notify": False,
            "msgtype": ""
        }

    def get_page(self) -> List[dict]:
        """This plugin has no detail page (returns None)."""
        pass

    def stop_service(self):
        """Shut the plugin down; there is nothing to release."""
        pass