This commit is contained in:
xiaohuozi
2024-07-12 10:17:25 +08:00
parent b34a6fbf51
commit b12983114c
2 changed files with 40 additions and 83 deletions

View File

@@ -1,7 +1,6 @@
import asyncio
import websockets
import json
import struct
import paho.mqtt.client as mqtt
from typing import Any, List, Dict, Tuple
from app.core.event import eventmanager, Event
@@ -9,69 +8,31 @@ from app.log import logger
from app.plugins import _PluginBase
from app.schemas.types import EventType, NotificationType
class MqttClient:
def __init__(
self,
topic: str,
server: str = "ws://mqtt.example.com:8083/mqtt",
username: str = "",
password: str = "",
):
self._server = server
self._topic = topic
self._username = username
self._password = password
def __set_url(self, server, topic):
self.url = server.strip("/") + "/" + topic
def _build_mqtt_connect_packet(self, client_id, keep_alive=60):
# 构建 MQTT 连接报文
protocol_name = b"MQTT"
protocol_level = 4
connect_flags = 0x02 | 0x80 | 0x40 # 设置用户名和密码标志
payload = (
struct.pack("!H4sBBH", len(protocol_name), protocol_name, protocol_level, connect_flags, keep_alive) +
struct.pack("!H%dsH%dsH%ds" % (len(client_id), len(self._username), len(self._password)),
len(client_id), client_id.encode('utf-8'),
len(self._username), self._username.encode('utf-8'),
len(self._password), self._password.encode('utf-8'))
)
return b'\x10' + struct.pack('!B', len(payload)) + payload
def __init__(self, server: str, port: int, topic: str, user: str = "", password: str = ""):
self.server = server
self.port = port
self.topic = topic
self.client = mqtt.Client()
if user and password:
self.client.username_pw_set(user, password)
def _build_mqtt_publish_packet(self, topic, message):
# 构建 MQTT 发布报文
payload = message.encode('utf-8')
packet = (
struct.pack("!BBH%ds" % len(topic), 0x30, 2 + len(topic) + len(payload), len(topic), topic.encode('utf-8')) +
payload
)
return packet
def send(self, message: str, title: str = None, format_as_markdown: bool = False):
full_message = {
"title": title,
"message": message,
"markdown": format_as_markdown
}
self.client.connect(self.server, self.port, 60)
self.client.publish(self.topic, json.dumps(full_message))
self.client.disconnect()
async def send(self, message: str, title: str = None, format_as_markdown: bool = False):
async with websockets.connect(self._server) as websocket:
client_id = "mqtt_client"
connect_packet = self._build_mqtt_connect_packet(client_id)
await websocket.send(connect_packet)
connack = await websocket.recv()
print(f"收到 CONNACK 报文: {connack}")
payload = {
"message": message,
"title": title,
"markdown": format_as_markdown
}
payload_str = json.dumps(payload)
publish_packet = self._build_mqtt_publish_packet(self._topic, payload_str)
await websocket.send(publish_packet)
print("MQTT PUBLISH 报文已发送")
disconnect_packet = b'\xe0\x00'
await websocket.send(disconnect_packet)
print("MQTT DISCONNECT 报文已发送")
class MqttMsg(_PluginBase):
# 插件名称
plugin_name = "MQTT 消息通知"
plugin_name = "mqtt 消息通知"
# 插件描述
plugin_desc = "支持使用 MQTT 发送消息通知。"
# 插件图标
@@ -85,16 +46,16 @@ class MqttMsg(_PluginBase):
# 插件配置项ID前缀
plugin_config_prefix = "mqttmsg_"
# 加载顺序
plugin_order = 27
plugin_order = 26
# 可使用的用户级别
auth_level = 1
# 私有属性
_enabled = False
_broker = None
_server = None
_port = None
_topic = None
_username = None
_user = None
_password = None
_msgtypes = []
@@ -102,14 +63,14 @@ class MqttMsg(_PluginBase):
if config:
self._enabled = config.get("enabled")
self._msgtypes = config.get("msgtypes") or []
self._broker = config.get("broker")
self._server = config.get("server")
self._port = config.get("port")
self._topic = config.get("topic")
self._username = config.get("username")
self._user = config.get("user")
self._password = config.get("password")
def get_state(self) -> bool:
return self._enabled and (True if self._broker and self._port and self._topic else False)
return self._enabled and (True if self._server and self._port and self._topic else False)
@staticmethod
def get_command() -> List[Dict[str, Any]]:
@@ -122,6 +83,7 @@ class MqttMsg(_PluginBase):
"""
拼装插件配置页面需要返回两块数据1、页面配置2、数据结构
"""
# 编历 NotificationType 枚举,生成消息类型选项
MsgTypeOptions = []
for item in NotificationType:
MsgTypeOptions.append({
@@ -166,9 +128,9 @@ class MqttMsg(_PluginBase):
{
'component': 'VTextField',
'props': {
'model': 'broker',
'label': 'MQTT Broker',
'placeholder': 'broker.example.com',
'model': 'server',
'label': '服务器',
'placeholder': 'mqtt.example.com',
}
}
]
@@ -202,7 +164,7 @@ class MqttMsg(_PluginBase):
'props': {
'model': 'topic',
'label': '主题',
'placeholder': 'example/topic',
'placeholder': 'MoviePilot',
}
}
]
@@ -217,7 +179,7 @@ class MqttMsg(_PluginBase):
{
'component': 'VTextField',
'props': {
'model': 'username',
'model': 'user',
'label': '用户名',
'placeholder': 'mqttuser',
}
@@ -233,7 +195,7 @@ class MqttMsg(_PluginBase):
'content': [
{
'component': 'VTextField',
'props': {
'props': {
'model': 'password',
'label': '密码',
'placeholder': 'mqttpassword',
@@ -271,10 +233,10 @@ class MqttMsg(_PluginBase):
], {
"enabled": False,
'msgtypes': [],
'broker': 'broker.example.com',
'server': 'mqtt.example.com',
'port': 1883,
'topic': 'example/topic',
'username': '',
'topic': 'MoviePilot',
'user': '',
'password': '',
}
@@ -314,16 +276,10 @@ class MqttMsg(_PluginBase):
return
try:
if not self._broker or not self._port or not self._topic:
if not self._server or not self._port or not self._topic:
return False, "参数未配置"
mqtt_client = MqttClient(
topic=self._topic,
server=f"ws://{self._broker}:{self._port}/mqtt",
username=self._username,
password=self._password
)
asyncio.run(mqtt_client.send(message=text))
mqtt_client = MqttClient(server=self._server, port=self._port, topic=self._topic, user=self._user, password=self._password)
mqtt_client.send(title=title, message=text, format_as_markdown=True)
except Exception as msg_e:
logger.error(f"MQTT消息发送失败错误信息{str(msg_e)}")

View File

@@ -0,0 +1 @@
paho-mqtt==1.6.1