This commit is contained in:
thsrite
2024-04-25 10:18:48 +08:00
parent 8739165e51
commit b6bdef3033
3 changed files with 873 additions and 0 deletions

View File

@@ -342,5 +342,22 @@
"v1.1": "fix bug",
"v1.0": "init"
}
},
"WeChatForward2": {
"name": "微信消息转发(测试)",
"description": "根据正则转发通知到其他WeChat应用。",
"version": "1.6",
"icon": "Wechat_A.png",
"author": "thsrite",
"level": 1,
"history": {
"v1.6": "修改获取指定用户订阅列表方法",
"v1.5": "丰富日志",
"v1.4": "特定消息强制指定userid",
"v1.3": "防重复发送额外消息",
"v1.2": "fix规则",
"v1.1": "自定义发送额外消息",
"v1.0": "根据正则转发通知到其他WeChat应用"
}
}
}

View File

@@ -49,6 +49,36 @@ class WeChatForward(_PluginBase):
# 企业微信获取TokenURL
_token_url = f"{settings.WECHAT_PROXY}/cgi-bin/gettoken?corpid=%s&corpsecret=%s"
example = [
{
'comment': '入库消息',
'appid': 1000003,
'corpid': None,
'appsecret': None,
'pattern': '已入库',
'extra_confs': None,
},
{
'comment': '站点签到数据统计',
'appid': 1000005,
'corpid': None,
'appsecret': None,
'pattern': '自动签到|自动登录|数据统计|刷流任务|药丸签到|订阅下载统计|98堂签到',
'extra_confs': [
{
'pattern': '开始下载',
'userid': 'test,test2',
'msg': '{name} 后台下载任务已提交,请耐心等候入库通知。'
},
{
'pattern': '已添加订阅',
'userid': 'test,test2',
'msg': '{name} 电视剧正在更新,待更新后自动下载。'
}
]
}
]
def init_plugin(self, config: dict = None):
if config:
self._enabled = config.get("enabled")

View File

@@ -0,0 +1,826 @@
import json
import re
import time
from datetime import datetime
from app.core.config import settings
from app.db.subscribe_oper import SubscribeOper
from app.modules.wechat import WeChat
from app.plugins import _PluginBase
from app.core.event import eventmanager
from app.schemas.types import EventType, MessageChannel, MediaType
from app.utils.http import RequestUtils
from typing import Any, List, Dict, Tuple, Optional
from app.log import logger
class WeChatForward2(_PluginBase):
# 插件名称
plugin_name = "微信消息转发"
# 插件描述
plugin_desc = "根据正则转发通知到其他WeChat应用。"
# 插件图标
plugin_icon = "Wechat_A.png"
# 插件版本
plugin_version = "1.6"
# 插件作者
plugin_author = "thsrite"
# 作者主页
author_url = "https://github.com/thsrite"
# 插件配置项ID前缀
plugin_config_prefix = "wechatforward_"
# 加载顺序
plugin_order = 16
# 可使用的用户级别
auth_level = 1
# 私有属性
_enabled = False
_wechat = None
_pattern = None
_ignore_userid = None
_extra_confs = None
_pattern_token = {}
_extra_msg_history = {}
_specify_confs = {}
# 企业微信发送消息URL
_send_msg_url = f"{settings.WECHAT_PROXY}/cgi-bin/message/send?access_token=%s"
# 企业微信获取TokenURL
_token_url = f"{settings.WECHAT_PROXY}/cgi-bin/gettoken?corpid=%s&corpsecret=%s"
example = [
{
'comment': '入库消息',
'appid': 1000003,
'corpid': None,
'appsecret': None,
'pattern': '已入库',
'extra_confs': None,
},
{
'comment': '站点签到数据统计',
'appid': 1000005,
'corpid': None,
'appsecret': None,
'pattern': '自动签到|自动登录|数据统计|刷流任务|药丸签到|订阅下载统计|98堂签到',
'extra_confs': [
{
'pattern': '开始下载',
'userid': 'test,test2',
'msg': '{name} 后台下载任务已提交,请耐心等候入库通知。'
},
{
'pattern': '已添加订阅',
'userid': 'test,test2',
'msg': '{name} 电视剧正在更新,待更新后自动下载。'
}
]
}
]
def init_plugin(self, config: dict = None):
if config:
self._enabled = config.get("enabled")
self._wechat = config.get("wechat")
self._pattern = config.get("pattern")
self._ignore_userid = config.get("ignore_userid")
self._extra_confs = config.get("extra_confs")
self._specify_confs = config.get("specify_confs")
# 获取token存库
if self._enabled and self._wechat:
self.__save_wechat_token()
def get_state(self) -> bool:
return self._enabled
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1、页面配置2、数据结构
"""
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '开启转发'
}
}
]
},
{
"component": "VCol",
"props": {
"cols": 12,
"md": 4
},
"content": [
{
"component": "VSwitch",
"props": {
"model": "dialog_closed",
"label": "设置站点"
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'wechat',
'rows': '5',
'label': '应用配置',
'placeholder': 'appid:corpid:appsecret一行一个配置'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'pattern',
'rows': '6',
'label': '正则配置',
'placeholder': '对应上方应用配置,一行一个,一一对应'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'extra_confs',
'rows': '4',
'label': '额外消息配置',
'placeholder': '开始下载 > userid > 后台下载任务已提交,请耐心等候入库通知。 > appid'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'specify_confs',
'rows': '2',
'label': '特定消息指定用户',
'placeholder': 'title > text > userid'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'ignore_userid',
'rows': '1',
'label': '忽略userid',
'placeholder': '开始下载|添加下载任务失败'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '根据正则表达式把MoviePilot的消息转发到多个微信应用。'
'应用配置可加注释:'
'appid:corpid:appsecret#站点通知'
}
}
]
}
]
},
{
"component": "VDialog",
"props": {
"model": "dialog_closed",
"max-width": "65rem",
"overlay-class": "v-dialog--scrollable v-overlay--scroll-blocked",
"content-class": "v-card v-card--density-default v-card--variant-elevated rounded-t"
},
"content": [
{
"component": "VCard",
"props": {
"title": "设置站点配置"
},
"content": [
{
"component": "VDialogCloseBtn",
"props": {
"model": "dialog_closed"
}
},
{
"component": "VCardText",
"props": {},
"content": [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAceEditor',
'props': {
'modelvalue': 'site_config',
'lang': 'json',
'theme': 'monokai',
'style': 'height: 30rem',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal'
},
'content': [
{
'component': 'span',
'text': '注意:只有启用站点独立配置时,该配置项才会生效,详细配置参考:'
},
{
'component': 'a',
'props': {
'href': 'https://github.com/InfinityPacer/MoviePilot-Plugins/blob/main/README.md',
'target': '_blank'
},
'text': 'https://github.com/InfinityPacer/MoviePilot-Plugins/blob/main/README.md'
}
]
}
]
}
]
}
]
}
]
}
]
}
]
}
], {
"enabled": False,
"wechat": "",
"pattern": "",
"ignore_userid": "",
"specify_confs": "",
"extra_confs": "",
"site_config": WeChatForward2.get_demo_site_config()
}
@staticmethod
def get_demo_site_config() -> str:
desc = ("// 以下为配置示例请参考https://github.com/InfinityPacer/MoviePilot-Plugins/blob/main/README.md 进行配置\n"
"// 如与全局保持一致的配置项,请勿在站点配置中配置\n"
"// 注意无关内容需使用 // 注释\n")
config = """[{
"sitename": "站点1",
"seed_time": 96,
"hr_seed_time": 144
}, {
"sitename": "站点2",
"hr": "yes",
"size": "10-500",
"seeder": "5-10",
"pubtime": "5-120",
"seed_time": 96,
"save_path": "/downloads/site2",
"proxy_download": true,
"hr_seed_time": 144
}, {
"sitename": "站点3",
"freeleech": "free",
"hr": "yes",
"include": "",
"exclude": "",
"size": "10-500",
"seeder": "1",
"pubtime": "5-120",
"seed_time": 120,
"hr_seed_time": 144,
"seed_ratio": "",
"seed_size": "",
"download_time": "",
"seed_avgspeed": "",
"seed_inactivetime": "",
"save_path": "/downloads/site1",
"proxy_download": false,
"proxy_delete": false,
"qb_category": "刷流",
"auto_qb_category": true
}]"""
return desc + config
def get_page(self) -> List[dict]:
pass
@eventmanager.register(EventType.NoticeMessage)
def send(self, event):
"""
消息转发
"""
if not self._enabled:
return
# 消息体
data = event.event_data
channel = data['channel']
if channel and channel != MessageChannel.Wechat:
return
title = data['title']
text = data['text']
image = data['image']
userid = data['userid']
# 正则匹配
patterns = self._pattern.split("\n")
for index, pattern in enumerate(patterns):
msg_match = re.search(pattern, title)
if msg_match:
access_token, appid = self.__flush_access_token(index=index)
if not access_token:
logger.error("未获取到有效token请检查配置")
continue
# 忽略userid正则表达式
if self._ignore_userid and re.search(self._ignore_userid, title):
userid = None
else:
# 特定消息指定用户
userid = self.__specify_userid(title=title, text=text, userid=userid)
# 发送消息
if image:
self.__send_image_message(title=title, text=text, image_url=image, userid=userid,
access_token=access_token, appid=appid, index=index)
else:
self.__send_message(title=title, text=text, userid=userid, access_token=access_token, appid=appid,
index=index)
# 开始下载 > userid > {name} 后台下载任务已提交,请耐心等候入库通知。 > appid
# 已添加订阅 > userid > {name} 电视剧正在更新,已添加订阅,待更新后自动下载。 > appid
if self._extra_confs:
self.__send_extra_msg(title=title, text=text)
def __specify_userid(self, title, text, userid):
"""
特定消息指定用户
"""
if self._specify_confs:
for specify_conf in self._specify_confs.split("\n"):
if not specify_conf:
continue
# 跳过注释
if str(specify_conf).startswith("#"):
continue
specify = specify_conf.split(" > ")
if len(specify) != 3:
continue
if re.search(specify[0], title) and re.search(specify[1], text):
userid = specify[2]
logger.info(f"消息 {title} {text} 指定用户 {userid}")
break
return userid
def __send_extra_msg(self, title, text):
"""
根据自定义规则发送额外消息
"""
self._extra_msg_history = self.get_data(key="extra_msg") or {}
is_save_history = False
extra_confs = self._extra_confs.split("\n")
for extra_conf in extra_confs:
if not extra_conf:
continue
# 跳过注释
if str(extra_conf).startswith("#"):
continue
extras = str(extra_conf).split(" > ")
if len(extras) != 4:
continue
extra_pattern = extras[0]
extra_userid = extras[1]
extra_title = extras[2]
extra_appid = extras[3]
if str(extra_title).find('{name}') != -1:
extra_title = extra_title.replace('{name}', self.__parse_tv_title(title))
if re.search(extra_pattern, title):
logger.info(f"{title} 正则匹配到额外消息 {extra_pattern}")
# 搜索消息获取消息text中的用户
userid_pattern = r"用户:(.*?)\n"
result = re.search(userid_pattern, text)
if not result:
# 订阅消息获取消息text中的用户
pattern = r"来自用户:(.*?)$"
result = re.search(pattern, text)
if not result:
logger.error("未获取到用户,跳过处理")
continue
# 获取消息text中的用户
user_id = result.group(1)
logger.info(f"获取到消息用户 {user_id}")
if user_id and any(user_id == user for user in extra_userid.split(",")):
if "开始下载" in str(title):
# 判断是否重复发送10分钟内重复消息title、重复userid算重复消息
extra_history_time = self._extra_msg_history.get(
f"{user_id}-{self.__parse_tv_title(title)}") or None
# 只处理下载消息
if extra_history_time:
logger.info(
f"获取到额外消息上次发送时间 {datetime.strptime(extra_history_time, '%Y-%m-%d %H:%M:%S')}")
if (datetime.now() - datetime.strptime(extra_history_time,
'%Y-%m-%d %H:%M:%S')).total_seconds() < 600:
logger.warn(
f"额外消息 {self.__parse_tv_title(title)} 十分钟内重复发送,跳过。")
continue
# 判断当前用户是否订阅,是否订阅后续消息
subscribes = SubscribeOper().list_by_username(username=str(user_id),
state="R",
mtype=MediaType.TV.value)
is_subscribe = False
for subscribe in subscribes:
# 匹配订阅title
if f"{subscribe.name} ({subscribe.year})" in title:
is_subscribe = True
# 电视剧之前该用户订阅下载过,不再发送额外消息
if is_subscribe:
logger.warn(
f"额外消息 {self.__parse_tv_title(title)} 用户 {user_id} 已订阅,不再发送额外消息。")
continue
logger.info(f"消息用户{user_id} 匹配到目标用户 {extra_userid}")
# 发送额外消息
if str(settings.WECHAT_APP_ID) == str(extra_appid):
# 直接发送
WeChat().send_msg(title=extra_title, userid=user_id)
logger.info(f"{settings.WECHAT_APP_ID} 发送额外消息 {extra_title} 成功")
# 保存已发送消息
if "开始下载" in str(title):
self._extra_msg_history[f"{user_id}-{self.__parse_tv_title(title)}"] = time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
is_save_history = True
else:
for wechat_idx in self._pattern_token.keys():
wechat_conf = self._pattern_token.get(wechat_idx)
if (wechat_conf and wechat_conf.get("appid")
and str(wechat_conf.get("appid")) == str(extra_appid)):
access_token, appid = self.__flush_access_token(index=wechat_idx)
if not access_token:
logger.error("未获取到有效token请检查配置")
continue
self.__send_message(title=extra_title,
userid=user_id,
access_token=access_token,
appid=appid,
index=wechat_idx)
logger.info(f"{appid} 发送额外消息 {extra_title} 成功")
# 保存已发送消息
if "开始下载" in str(title):
self._extra_msg_history[
f"{user_id}-{self.__parse_tv_title(title)}"] = time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
is_save_history = True
# 保存额外消息历史
if is_save_history:
self.save_data(key="extra_msg",
value=self._extra_msg_history)
def __parse_tv_title(self, title):
"""
解析title标题
"""
titles = title.split(" ")
_title = ""
for sub_title_str in titles:
# 电影 功夫熊猫 (2008) 开始下载
# 电影 功夫熊猫 (2008) 已添加订阅
# 电视剧 追风者 (2024) S01 E01-E04 开始下载
# 电视剧 追风者 (2024) S01 E01-E04 已添加订阅
if 'E' in sub_title_str:
continue
if '开始下载' in sub_title_str:
continue
if '已添加订阅' in sub_title_str:
continue
_title += f"{sub_title_str} "
return str(_title.rstrip())
def __save_wechat_token(self):
"""
获取并存储wechat token
"""
# 解析配置
wechats = self._wechat.split("\n")
for index, wechat in enumerate(wechats):
# 排除注释
wechat = wechat.split("#")[0]
wechat_config = wechat.split(":")
if len(wechat_config) != 3:
logger.error(f"{wechat} 应用配置不正确")
continue
appid = wechat_config[0]
corpid = wechat_config[1]
appsecret = wechat_config[2]
# 已过期重新获取token
access_token, expires_in, access_token_time = self.__get_access_token(corpid=corpid,
appsecret=appsecret)
if not access_token:
# 没有token获取token
logger.error(f"wechat配置 appid = {appid} 获取token失败请检查配置")
continue
self._pattern_token[index] = {
"appid": appid,
"corpid": corpid,
"appsecret": appsecret,
"access_token": access_token,
"expires_in": expires_in,
"access_token_time": access_token_time,
}
def __flush_access_token(self, index: int, force: bool = False):
"""
获取第i个配置wechat token
"""
wechat_token = self._pattern_token[index]
if not wechat_token:
logger.error(f"未获取到第 {index} 条正则对应的wechat应用token请检查配置")
return None
access_token = wechat_token['access_token']
expires_in = wechat_token['expires_in']
access_token_time = wechat_token['access_token_time']
appid = wechat_token['appid']
corpid = wechat_token['corpid']
appsecret = wechat_token['appsecret']
# 判断token有效期
if force or (datetime.now() - access_token_time).seconds >= expires_in:
# 重新获取token
access_token, expires_in, access_token_time = self.__get_access_token(corpid=corpid,
appsecret=appsecret)
if not access_token:
logger.error(f"wechat配置 appid = {appid} 获取token失败请检查配置")
return None, None
self._pattern_token[index] = {
"appid": appid,
"corpid": corpid,
"appsecret": appsecret,
"access_token": access_token,
"expires_in": expires_in,
"access_token_time": access_token_time,
}
return access_token, appid
def __send_message(self, title: str, text: str = None, userid: str = None, access_token: str = None,
appid: str = None, index: int = None) -> Optional[bool]:
"""
发送文本消息
:param title: 消息标题
:param text: 消息内容
:param userid: 消息发送对象的ID为空则发给所有人
:return: 发送状态,错误信息
"""
if text:
conent = "%s\n%s" % (title, text.replace("\n\n", "\n"))
else:
conent = title
if not userid:
userid = "@all"
req_json = {
"touser": userid,
"msgtype": "text",
"agentid": appid,
"text": {
"content": conent
},
"safe": 0,
"enable_id_trans": 0,
"enable_duplicate_check": 0
}
return self.__post_request(access_token=access_token, req_json=req_json, index=index, title=title)
def __send_image_message(self, title: str, text: str, image_url: str, userid: str = None,
access_token: str = None, appid: str = None, index: int = None) -> Optional[bool]:
"""
发送图文消息
:param title: 消息标题
:param text: 消息内容
:param image_url: 图片地址
:param userid: 消息发送对象的ID为空则发给所有人
:return: 发送状态,错误信息
"""
if text:
text = text.replace("\n\n", "\n")
if not userid:
userid = "@all"
req_json = {
"touser": userid,
"msgtype": "news",
"agentid": appid,
"news": {
"articles": [
{
"title": title,
"description": text,
"picurl": image_url,
"url": ''
}
]
}
}
return self.__post_request(access_token=access_token, req_json=req_json, index=index, title=title)
def __post_request(self, access_token: str, req_json: dict, index: int, title: str, retry: int = 0) -> bool:
message_url = self._send_msg_url % access_token
"""
向微信发送请求
"""
try:
res = RequestUtils(content_type='application/json').post(
message_url,
data=json.dumps(req_json, ensure_ascii=False).encode('utf-8')
)
if res and res.status_code == 200:
ret_json = res.json()
if ret_json.get('errcode') == 0:
logger.info(f"转发 配置{index} 消息 {title} {req_json} 成功")
return True
else:
if ret_json.get('errcode') == 81013:
return False
logger.error(f"转发 配置{index} 消息 {title} {req_json} 失败,错误信息:{ret_json}")
if ret_json.get('errcode') == 42001 or ret_json.get('errcode') == 40014:
logger.info("token已过期正在重新刷新token重试")
# 重新获取token
access_token, appid = self.__flush_access_token(index=index,
force=True)
if access_token:
retry += 1
# 重发请求
if retry <= 3:
return self.__post_request(access_token=access_token,
req_json=req_json,
index=index,
title=title,
retry=retry)
return False
elif res is not None:
logger.error(
f"转发 配置{index} 消息 {title} {req_json} 失败,错误码:{res.status_code},错误原因:{res.reason}")
return False
else:
logger.error(f"转发 配置{index} 消息 {title} {req_json} 失败,未获取到返回信息")
return False
except Exception as err:
logger.error(f"转发 配置{index} 消息 {title} {req_json} 异常,错误信息:{str(err)}")
return False
def __get_access_token(self, corpid: str, appsecret: str):
"""
获取微信Token
:return 微信Token
"""
try:
token_url = self._token_url % (corpid, appsecret)
res = RequestUtils().get_res(token_url)
if res:
ret_json = res.json()
if ret_json.get('errcode') == 0:
access_token = ret_json.get('access_token')
expires_in = ret_json.get('expires_in')
access_token_time = datetime.now()
return access_token, expires_in, access_token_time
else:
logger.error(f"{ret_json.get('errmsg')}")
return None, None, None
else:
logger.error(f"{corpid} {appsecret} 获取token失败")
return None, None, None
except Exception as e:
logger.error(f"获取微信access_token失败错误信息{str(e)}")
return None, None, None
def stop_service(self):
"""
退出插件
"""
pass