From b34a6fbf51d179b786cfe4adc4f36303db340331 Mon Sep 17 00:00:00 2001 From: xiaohuozi <1093565302@qq.com> Date: Thu, 11 Jul 2024 22:18:47 +0800 Subject: [PATCH] icon --- plugins/mqttpush/__init__.py | 78 ++++++++++++++++++++++++++++-------- 1 file changed, 61 insertions(+), 17 deletions(-) diff --git a/plugins/mqttpush/__init__.py b/plugins/mqttpush/__init__.py index 28d1d8a..59df267 100755 --- a/plugins/mqttpush/__init__.py +++ b/plugins/mqttpush/__init__.py @@ -1,6 +1,7 @@ +import asyncio +import websockets import json -import paho.mqtt.client as mqtt - +import struct from typing import Any, List, Dict, Tuple from app.core.event import eventmanager, Event @@ -9,21 +10,64 @@ from app.plugins import _PluginBase from app.schemas.types import EventType, NotificationType class MqttClient: - - def __init__(self, broker: str, port: int, topic: str, username: str = None, password: str = None): - self._broker = broker - self._port = port + def __init__( + self, + topic: str, + server: str = "ws://mqtt.example.com:8083/mqtt", + username: str = "", + password: str = "", + ): + self._server = server self._topic = topic self._username = username self._password = password - self._client = mqtt.Client() - if username and password: - self._client.username_pw_set(username, password) - self._client.connect(broker, port, 60) + def __set_url(self, server, topic): + self.url = server.strip("/") + "/" + topic - def send(self, message: str, qos: int = 0, retain: bool = False): - result = self._client.publish(self._topic, message, qos, retain) - return result + def _build_mqtt_connect_packet(self, client_id, keep_alive=60): + # 构建 MQTT 连接报文 + protocol_name = b"MQTT" + protocol_level = 4 + connect_flags = 0x02 | 0x80 | 0x40 # 设置用户名和密码标志 + payload = ( + struct.pack("!H4sBBH", len(protocol_name), protocol_name, protocol_level, connect_flags, keep_alive) + + struct.pack("!H%dsH%dsH%ds" % (len(client_id), len(self._username), len(self._password)), + len(client_id), client_id.encode('utf-8'), + len(self._username), self._username.encode('utf-8'), + len(self._password), self._password.encode('utf-8')) + ) + return b'\x10' + struct.pack('!B', len(payload)) + payload + + def _build_mqtt_publish_packet(self, topic, message): + # 构建 MQTT 发布报文 + payload = message.encode('utf-8') + packet = ( + struct.pack("!BBH%ds" % len(topic), 0x30, 2 + len(topic) + len(payload), len(topic), topic.encode('utf-8')) + + payload + ) + return packet + + async def send(self, message: str, title: str = None, format_as_markdown: bool = False): + async with websockets.connect(self._server) as websocket: + client_id = "mqtt_client" + connect_packet = self._build_mqtt_connect_packet(client_id) + await websocket.send(connect_packet) + connack = await websocket.recv() + print(f"收到 CONNACK 报文: {connack}") + + payload = { + "message": message, + "title": title, + "markdown": format_as_markdown + } + payload_str = json.dumps(payload) + publish_packet = self._build_mqtt_publish_packet(self._topic, payload_str) + await websocket.send(publish_packet) + print("MQTT PUBLISH 报文已发送") + + disconnect_packet = b'\xe0\x00' + await websocket.send(disconnect_packet) + print("MQTT DISCONNECT 报文已发送") class MqttMsg(_PluginBase): # 插件名称 @@ -189,7 +233,7 @@ class MqttMsg(_PluginBase): 'content': [ { 'component': 'VTextField', - 'props': { + 'props': { 'model': 'password', 'label': '密码', 'placeholder': 'mqttpassword', @@ -272,14 +316,14 @@ class MqttMsg(_PluginBase): try: if not self._broker or not self._port or not self._topic: return False, "参数未配置" + mqtt_client = MqttClient( - broker=self._broker, - port=self._port, topic=self._topic, + server=f"ws://{self._broker}:{self._port}/mqtt", username=self._username, password=self._password ) - mqtt_client.send(message=text) + asyncio.run(mqtt_client.send(message=text)) except Exception as msg_e: logger.error(f"MQTT消息发送失败,错误信息:{str(msg_e)}")