mqtt push

This commit is contained in:
xiaohuozi
2024-07-11 20:43:43 +08:00
parent 8ccffcde59
commit 5fe703946f
2 changed files with 291 additions and 0 deletions

BIN
icons/Mqtt_A.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

291
plugins/mqttpush/__init__.py Executable file
View File

@@ -0,0 +1,291 @@
import json
import paho.mqtt.client as mqtt
from typing import Any, List, Dict, Tuple
from app.core.event import eventmanager, Event
from app.log import logger
from app.plugins import _PluginBase
from app.schemas.types import EventType, NotificationType
class MqttClient:
def __init__(self, broker: str, port: int, topic: str, username: str = None, password: str = None):
self._broker = broker
self._port = port
self._topic = topic
self._username = username
self._password = password
self._client = mqtt.Client()
if username and password:
self._client.username_pw_set(username, password)
self._client.connect(broker, port, 60)
def send(self, message: str, qos: int = 0, retain: bool = False):
result = self._client.publish(self._topic, message, qos, retain)
return result
class MqttMsg(_PluginBase):
# 插件名称
plugin_name = "MQTT 消息通知"
# 插件描述
plugin_desc = "支持使用 MQTT 发送消息通知。"
# 插件图标
plugin_icon = "Mqtt_A.png"
# 插件版本
plugin_version = "1.0"
# 插件作者
plugin_author = "lethargicScribe"
# 作者主页
author_url = "https://github.com/lethargicScribe"
# 插件配置项ID前缀
plugin_config_prefix = "mqttmsg_"
# 加载顺序
plugin_order = 27
# 可使用的用户级别
auth_level = 1
# 私有属性
_enabled = False
_broker = None
_port = None
_topic = None
_username = None
_password = None
_msgtypes = []
def init_plugin(self, config: dict = None):
if config:
self._enabled = config.get("enabled")
self._msgtypes = config.get("msgtypes") or []
self._broker = config.get("broker")
self._port = config.get("port")
self._topic = config.get("topic")
self._username = config.get("username")
self._password = config.get("password")
def get_state(self) -> bool:
return self._enabled and (True if self._broker and self._port and self._topic else False)
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1、页面配置2、数据结构
"""
MsgTypeOptions = []
for item in NotificationType:
MsgTypeOptions.append({
"title": item.value,
"value": item.name
})
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '启用插件',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'broker',
'label': 'MQTT Broker',
'placeholder': 'broker.example.com',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'port',
'label': '端口',
'placeholder': '1883',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'topic',
'label': '主题',
'placeholder': 'example/topic',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'username',
'label': '用户名',
'placeholder': 'mqttuser',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'password',
'label': '密码',
'placeholder': 'mqttpassword',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12
},
'content': [
{
'component': 'VSelect',
'props': {
'multiple': True,
'chips': True,
'model': 'msgtypes',
'label': '消息类型',
'items': MsgTypeOptions
}
}
]
}
]
},
]
}
], {
"enabled": False,
'msgtypes': [],
'broker': 'broker.example.com',
'port': 1883,
'topic': 'example/topic',
'username': '',
'password': '',
}
def get_page(self) -> List[dict]:
pass
@eventmanager.register(EventType.NoticeMessage)
def send(self, event: Event):
"""
消息发送事件
"""
if not self.get_state():
return
if not event.event_data:
return
msg_body = event.event_data
# 渠道
channel = msg_body.get("channel")
if channel:
return
# 类型
msg_type: NotificationType = msg_body.get("type")
# 标题
title = msg_body.get("title")
# 文本
text = msg_body.get("text")
if not title and not text:
logger.warn("标题和内容不能同时为空")
return
if (msg_type and self._msgtypes
and msg_type.name not in self._msgtypes):
logger.info(f"消息类型 {msg_type.value} 未开启消息发送")
return
try:
if not self._broker or not self._port or not self._topic:
return False, "参数未配置"
mqtt_client = MqttClient(
broker=self._broker,
port=self._port,
topic=self._topic,
username=self._username,
password=self._password
)
mqtt_client.send(message=text)
except Exception as msg_e:
logger.error(f"MQTT消息发送失败错误信息{str(msg_e)}")
def stop_service(self):
"""
退出插件
"""
pass