IYUU消息推送 增加 限流

This commit is contained in:
jxxghp
2024-11-13 08:41:06 +08:00
parent e8f4787647
commit 90738219f1
2 changed files with 100 additions and 44 deletions

View File

@@ -905,5 +905,18 @@
"v1.5.1": "修复季度信息未传递的问题. 新增站点列表同步删除",
"v1.5": "修复总集数会同步TMDB变动的问题,增加开关选项"
}
},
"IyuuMsg": {
"name": "IYUU消息推送",
"description": "支持使用IYUU发送消息通知。",
"labels": "消息通知,IYUU",
"version": "1.3",
"icon": "Iyuu_A.png",
"author": "jxxghp",
"level": 1,
"v2": true,
"history": {
"v1.3": "消息限流发送以缓解IYUU服务器压力"
}
}
}

View File

@@ -1,11 +1,14 @@
import threading
from queue import Queue
from time import time, sleep
from typing import Any, List, Dict, Tuple
from urllib.parse import urlencode
from app.plugins import _PluginBase
from app.core.event import eventmanager, Event
from app.log import logger
from app.plugins import _PluginBase
from app.schemas.types import EventType, NotificationType
from app.utils.http import RequestUtils
from typing import Any, List, Dict, Tuple
from app.log import logger
class IyuuMsg(_PluginBase):
@@ -16,7 +19,7 @@ class IyuuMsg(_PluginBase):
# 插件图标
plugin_icon = "Iyuu_A.png"
# 插件版本
plugin_version = "1.2"
plugin_version = "1.3"
# 插件作者
plugin_author = "jxxghp"
# 作者主页
@@ -33,12 +36,30 @@ class IyuuMsg(_PluginBase):
_token = None
_msgtypes = []
# 消息处理线程
processing_thread = None
# 上次发送时间
last_send_time = 0
# 消息队列
message_queue = Queue()
# 消息发送间隔(秒)
send_interval = 5
# 退出事件
__event = threading.Event()
def init_plugin(self, config: dict = None):
self.__event.clear()
if config:
self._enabled = config.get("enabled")
self._token = config.get("token")
self._msgtypes = config.get("msgtypes") or []
if self._enabled and self._token:
# 启动处理队列的后台线程
self.processing_thread = threading.Thread(target=self.process_queue)
self.processing_thread.daemon = True
self.processing_thread.start()
def get_state(self) -> bool:
return self._enabled and (True if self._token else False)
@@ -143,35 +164,52 @@ class IyuuMsg(_PluginBase):
@eventmanager.register(EventType.NoticeMessage)
def send(self, event: Event):
"""
消息发送事件
消息发送事件,将消息加入队列
"""
if not self.get_state():
return
if not event.event_data:
if not self.get_state() or not event.event_data:
return
msg_body = event.event_data
# 渠道
channel = msg_body.get("channel")
if channel:
return
# 类型
msg_type: NotificationType = msg_body.get("type")
# 标题
title = msg_body.get("title")
# 文本
text = msg_body.get("text")
if not title and not text:
# 验证消息的有效性
if not msg_body.get("title") and not msg_body.get("text"):
logger.warn("标题和内容不能同时为空")
return
if (msg_type and self._msgtypes
and msg_type.name not in self._msgtypes):
logger.info(f"消息类型 {msg_type.value} 未开启消息发送")
return
# 将消息加入队列
self.message_queue.put(msg_body)
logger.info("消息已加入队列等待发送")
def process_queue(self):
"""
处理队列中的消息,按间隔时间发送
"""
while True:
if self.__event.is_set():
logger.info("消息发送线程正在退出...")
break
# 获取队列中的下一条消息
msg_body = self.message_queue.get()
# 检查是否满足发送间隔时间
current_time = time()
time_since_last_send = current_time - self.last_send_time
if time_since_last_send < self.send_interval:
sleep(self.send_interval - time_since_last_send)
# 处理消息内容
channel = msg_body.get("channel")
if channel:
continue
msg_type: NotificationType = msg_body.get("type")
title = msg_body.get("title")
text = msg_body.get("text")
# 检查消息类型是否已启用
if msg_type and self._msgtypes and msg_type.name not in self._msgtypes:
logger.info(f"消息类型 {msg_type.value} 未开启消息发送")
continue
# 尝试发送消息
try:
sc_url = "https://iyuu.cn/%s.send?%s" % (self._token, urlencode({"text": title, "desp": text}))
res = RequestUtils().get_res(sc_url)
@@ -181,6 +219,8 @@ class IyuuMsg(_PluginBase):
error = ret_json.get('errmsg')
if errno == 0:
logger.info("IYUU消息发送成功")
# 更新上次发送时间
self.last_send_time = time()
else:
logger.warn(f"IYUU消息发送失败错误码{errno},错误原因:{error}")
elif res is not None:
@@ -190,8 +230,11 @@ class IyuuMsg(_PluginBase):
except Exception as msg_e:
logger.error(f"IYUU消息发送失败{str(msg_e)}")
# 标记任务完成
self.message_queue.task_done()
def stop_service(self):
"""
退出插件
"""
pass
self.__event.set()