import json
from typing import Any, Dict, List, Optional, Tuple

import paho.mqtt.client as mqtt

from app.core.event import eventmanager, Event
from app.log import logger
from app.plugins import _PluginBase
from app.schemas.types import EventType, NotificationType


class MqttClient:
    """
    Thin wrapper around a paho-mqtt client bound to a single topic.

    The connection is opened in the constructor; call ``close()`` when done
    so the underlying socket is released.
    """

    def __init__(self, broker: str, port: int, topic: str,
                 username: Optional[str] = None, password: Optional[str] = None):
        """
        Create the client and connect to the broker.

        :param broker: broker host name or address
        :param port: broker TCP port (an int or a numeric string)
        :param topic: topic every message is published to
        :param username: optional user name for authentication
        :param password: optional password for authentication
        :raises ValueError: if ``port`` is not numeric
        :raises TypeError: if ``port`` is ``None`` or another non-numeric type
        :raises OSError: if the connection cannot be established
        """
        self._broker = broker
        # The port may arrive as a string from the configuration form;
        # paho performs integer comparisons on it.
        self._port = int(port)
        self._topic = topic
        self._username = username
        self._password = password

        # paho-mqtt >= 2.0 requires an explicit callback API version, while
        # 1.x does not know the argument. No callbacks are registered here,
        # so the chosen version has no further effect.
        api_version = getattr(mqtt, "CallbackAPIVersion", None)
        if api_version is not None:
            self._client = mqtt.Client(api_version.VERSION2)
        else:
            self._client = mqtt.Client()

        # A user name without a password is valid, so only the user name is
        # required to enable authentication.
        if username:
            self._client.username_pw_set(username, password or None)
        self._client.connect(self._broker, self._port, 60)

    def send(self, message: str, qos: int = 0, retain: bool = False):
        """
        Publish ``message`` to the configured topic.

        :param message: payload to publish
        :param qos: MQTT quality-of-service level
        :param retain: whether the broker should retain the message
        :return: the object returned by paho's ``publish()``
        """
        return self._client.publish(self._topic, message, qos, retain)

    def close(self) -> None:
        """
        Disconnect from the broker (best effort, never raises).
        """
        try:
            self._client.disconnect()
        except Exception as err:
            logger.warn(f"MQTT 断开连接失败:{err}")


class MqttMsg(_PluginBase):
    """
    Plugin that forwards notification messages to an MQTT topic.
    """

    # Plugin name
    plugin_name = "MQTT 消息通知"
    # Plugin description
    plugin_desc = "支持使用 MQTT 发送消息通知。"
    # Plugin icon
    plugin_icon = "Mqtt_A.png"
    # Plugin version
    plugin_version = "1.0"
    # Plugin author
    plugin_author = "lethargicScribe"
    # Author home page
    author_url = "https://github.com/lethargicScribe"
    # Prefix of the plugin configuration item IDs
    plugin_config_prefix = "mqttmsg_"
    # Load order
    plugin_order = 27
    # User level allowed to use the plugin
    auth_level = 1

    # Private attributes
    _enabled = False
    _broker = None
    _port = None
    _topic = None
    _username = None
    _password = None
    _msgtypes = []

    def init_plugin(self, config: dict = None):
        """
        Load the plugin settings from ``config``.

        Nothing is changed when ``config`` is empty or ``None``.
        """
        if not config:
            return
        self._enabled = config.get("enabled")
        self._msgtypes = config.get("msgtypes") or []
        self._broker = config.get("broker")
        self._port = self._parse_port(config.get("port"))
        self._topic = config.get("topic")
        self._username = config.get("username")
        self._password = config.get("password")

    @staticmethod
    def _parse_port(value: Any) -> Optional[int]:
        """
        Convert a configured port to an int.

        The form field is a text input, so the value may be a string.

        :return: the port number, or ``None`` when it is missing or not a
                 valid TCP port (which leaves the plugin inactive)
        """
        if value is None or value == "":
            return None
        try:
            port = int(value)
        except (TypeError, ValueError):
            port = 0
        if not 0 < port < 65536:
            logger.warn(f"MQTT 端口配置无效:{value}")
            return None
        return port

    def get_state(self) -> bool:
        """
        Return True when the plugin is enabled and broker, port and topic
        are all configured.
        """
        return bool(self._enabled and self._broker and self._port and self._topic)

    @staticmethod
    def get_command() -> List[Dict[str, Any]]:
        """
        No commands are defined; returns None.
        """
        pass

    def get_api(self) -> List[Dict[str, Any]]:
        """
        No API endpoints are defined; returns None.
        """
        pass

    @staticmethod
    def _text_field(model: str, label: str, placeholder: str) -> dict:
        """
        Build one half-width form column holding a single text field.
        """
        return {
            'component': 'VCol',
            'props': {
                'cols': 12,
                'md': 6
            },
            'content': [
                {
                    'component': 'VTextField',
                    'props': {
                        'model': model,
                        'label': label,
                        'placeholder': placeholder,
                    }
                }
            ]
        }

    def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
        """
        Assemble the plugin configuration page.

        :return: a tuple of (page layout, default values of the form model)
        """
        # One selectable option per notification type
        msg_type_options = [
            {"title": item.value, "value": item.name}
            for item in NotificationType
        ]
        return [
            {
                'component': 'VForm',
                'content': [
                    {
                        'component': 'VRow',
                        'content': [
                            {
                                'component': 'VCol',
                                'props': {
                                    'cols': 12,
                                    'md': 6
                                },
                                'content': [
                                    {
                                        'component': 'VSwitch',
                                        'props': {
                                            'model': 'enabled',
                                            'label': '启用插件',
                                        }
                                    }
                                ]
                            }
                        ]
                    },
                    {
                        'component': 'VRow',
                        'content': [
                            self._text_field('broker', 'MQTT Broker', 'broker.example.com'),
                            self._text_field('port', '端口', '1883'),
                            self._text_field('topic', '主题', 'example/topic'),
                            self._text_field('username', '用户名', 'mqttuser'),
                            self._text_field('password', '密码', 'mqttpassword'),
                        ]
                    },
                    {
                        'component': 'VRow',
                        'content': [
                            {
                                'component': 'VCol',
                                'props': {
                                    'cols': 12
                                },
                                'content': [
                                    {
                                        'component': 'VSelect',
                                        'props': {
                                            'multiple': True,
                                            'chips': True,
                                            'model': 'msgtypes',
                                            'label': '消息类型',
                                            'items': msg_type_options
                                        }
                                    }
                                ]
                            }
                        ]
                    },
                ]
            }
        ], {
            "enabled": False,
            'msgtypes': [],
            'broker': 'broker.example.com',
            'port': 1883,
            'topic': 'example/topic',
            'username': '',
            'password': '',
        }

    def get_page(self) -> List[dict]:
        """
        No detail page is defined; returns None.
        """
        pass

    @eventmanager.register(EventType.NoticeMessage)
    def send(self, event: Event):
        """
        Handle a notification event by publishing it to the MQTT topic.

        The message text is used as the payload; when the text is empty the
        title is published instead. Failures are logged, never raised.
        """
        if not self.get_state():
            return

        msg_body = event.event_data
        if not msg_body:
            return

        # Messages that name a channel are not handled by this plugin
        if msg_body.get("channel"):
            return

        # Type, title and text of the notification
        msg_type: NotificationType = msg_body.get("type")
        title = msg_body.get("title")
        text = msg_body.get("text")

        if not title and not text:
            logger.warn("标题和内容不能同时为空")
            return

        # An empty selection means every message type is forwarded
        if msg_type and self._msgtypes and msg_type.name not in self._msgtypes:
            logger.info(f"消息类型 {msg_type.value} 未开启消息发送")
            return

        mqtt_client = None
        try:
            mqtt_client = MqttClient(
                broker=self._broker,
                port=self._port,
                topic=self._topic,
                username=self._username,
                password=self._password
            )
            # Fall back to the title so a title-only message is not
            # published as an empty payload.
            result = mqtt_client.send(message=text or title)
            if result.rc != mqtt.MQTT_ERR_SUCCESS:
                logger.error(f"MQTT消息发送失败,错误信息:{mqtt.error_string(result.rc)}")
        except Exception as msg_e:
            logger.error(f"MQTT消息发送失败,错误信息:{str(msg_e)}")
        finally:
            # A new connection is opened per message; always release it.
            if mqtt_client is not None:
                mqtt_client.close()

    def stop_service(self):
        """
        Stop the plugin. Nothing needs to be released here.
        """
        pass