Merge pull request #573 from RamenRa/master

This commit is contained in:
jxxghp
2024-11-23 12:39:22 +08:00
committed by GitHub
3 changed files with 234 additions and 159 deletions

View File

@@ -861,12 +861,13 @@
"name": "动态企微可信IP",
"description": "修改企微应用可信IP支持Srever酱等第三方通知。验证码以结尾发送到企业微信应用",
"labels": "消息通知",
"version": "1.5.1",
"version": "1.5.2",
"icon": "Wecom_A.png",
"author": "RamenRa",
"level": 2,
"v2": true,
"history": {
"v1.5.2": "可以从指定url获取ip,修复不使用cc时cookie失效过快v1可配置第三方为备用通知server酱可以将文本发送到server3,二维码给服务号",
"v1.5.1": "修复v2微信通知可以指定微信通知ID",
"v1.5.0": "支持企微应用通知和第Serve酱等第三方推送。按要求修改插件名称",
"v1.4.1": "完善面板说明",

View File

@@ -18,9 +18,10 @@ from app.core.event import eventmanager, Event
from app.helper.cookiecloud import CookieCloudHelper
from app.log import logger
from app.plugins import _PluginBase
from app.schemas.types import EventType
from app.plugins.dynamicwechat.update_help import PyCookieCloud
from app.plugins.dynamicwechat.notify_helper import MySender
from app.schemas.types import EventType
class DynamicWeChat(_PluginBase):
@@ -31,7 +32,7 @@ class DynamicWeChat(_PluginBase):
# 插件图标
plugin_icon = "Wecom_A.png"
# 插件版本
plugin_version = "1.5.1"
plugin_version = "1.5.2"
# 插件作者
plugin_author = "RamenRa"
# 作者主页
@@ -61,6 +62,8 @@ class DynamicWeChat(_PluginBase):
_is_special_upload = False
# 聚合通知
_my_send = None
# 保存cookie
_saved_cookie = None
# 通知方式token/api
_notification_token = ''
@@ -104,6 +107,7 @@ class DynamicWeChat(_PluginBase):
version = settings.VERSION_FLAG # V2
else:
version = "v1"
def init_plugin(self, config: dict = None):
# 清空配置
self._notification_token = ''
@@ -113,9 +117,7 @@ class DynamicWeChat(_PluginBase):
self._local_scan = False
self._input_id_list = ''
self._cookie_header = ""
self._current_ip_address = self.get_ip_from_url(random.choice(self._ip_urls))
self._settings_file_path = self.get_data_path() / "settings.json"
# self._cookie_lifetime = PyCookieCloud.load_cookie_lifetime()
if config:
self._enabled = config.get("enabled")
self._notification_token = config.get("notification_token")
@@ -134,6 +136,10 @@ class DynamicWeChat(_PluginBase):
self._my_send = MySender(self._notification_token)
if not self._my_send.init_success: # 没有输入通知方式,不通知
self._my_send = None
if "||" in self._input_id_list:
parts = self._input_id_list.split("||", 1)
self._ip_urls = parts[1].split(",")
self._current_ip_address = self.get_ip_from_url(random.choice(self._ip_urls))
# 停止现有任务
self.stop_service()
if (self._enabled or self._onlyonce) and self._input_id_list:
@@ -273,10 +279,15 @@ class DynamicWeChat(_PluginBase):
self.__update_config()
logger.info("----------------------本次任务结束----------------------")
else:
logger.warning("cookie已失效请及时更新不检公网IP")
logger.warning("cookie已失效请及时更新本次不检公网IP")
def CheckIP(self):
retry_urls = random.sample(self._ip_urls, len(self._ip_urls))
if "||" in self._input_id_list:
parts = self._input_id_list.split("||", 1)
ip_urls = parts[1].split(",")
else:
ip_urls = self._ip_urls
retry_urls = random.sample(ip_urls, len(ip_urls))
ip_address = None
for url in retry_urls:
@@ -290,10 +301,6 @@ class DynamicWeChat(_PluginBase):
logger.error("获取IP失败 不操作IP")
return False
if self._forced_update:
logger.info("强制更新IP")
self._current_ip_address = ip_address
return True
elif not self._ip_changed: # 上次修改IP失败
logger.info("上次IP修改IP没有成功 继续尝试修改IP")
self._current_ip_address = ip_address
@@ -373,8 +380,6 @@ class DynamicWeChat(_PluginBase):
logger.debug(str(e))
return None, None
def ChangeIP(self):
logger.info("开始请求企业微信管理更改可信IP")
try:
@@ -416,8 +421,8 @@ class DynamicWeChat(_PluginBase):
else:
self._ip_changed = False
browser.close()
except Exception as e:
self._ip_changed = False
logger.error(f"更改可信IP失败: {e}")
finally:
pass
@@ -432,34 +437,50 @@ class DynamicWeChat(_PluginBase):
return
logger.info("使用二维码登录成功开始刷新cookie")
try:
if self._cc_server.check_connection():
current_url = page.url
current_cookies = context.cookies(current_url) # 通过 context 获取 cookies
if current_cookies is None:
logger.error("无法获取当前 cookies")
return
formatted_cookies = {}
for cookie in current_cookies:
domain = cookie.get('domain') # 使用 get() 方法避免 KeyError
if domain is None:
continue # 跳过没有 domain 的 cookie
if domain not in formatted_cookies:
formatted_cookies[domain] = []
formatted_cookies[domain].append(cookie)
flag = self._cc_server.update_cookie(formatted_cookies)
if flag:
logger.info("更新 CookieCloud 成功")
else:
logger.error("更新 CookieCloud 失败")
else:
if not self._cc_server.check_connection():
logger.error("连接 CookieCloud 失败", self._server)
except Exception as e:
logger.error(f"更新 cookie 发生错误: {e}")
else:
logger.error("CookieCloud没有启用或配置错误, 不刷新cookie")
return
current_url = page.url
current_cookies = context.cookies(current_url) # 通过 context 获取 cookies
if current_cookies is None:
logger.error("无法获取当前 cookies")
return
self._saved_cookie = current_cookies
formatted_cookies = {}
for cookie in current_cookies:
domain = cookie.get('domain') # 使用 get() 方法避免 KeyError
if domain is None:
continue # 跳过没有 domain 的 cookie
def get_cookie(self): # 只有从CookieCloud获取cookie成功才返回True
if domain not in formatted_cookies:
formatted_cookies[domain] = []
formatted_cookies[domain].append(cookie)
flag = self._cc_server.update_cookie(formatted_cookies)
if flag:
logger.info("更新 CookieCloud 成功")
self._cookie_valid = True
else:
logger.error("更新 CookieCloud 失败")
except Exception as e:
logger.error(f"CookieCloud更新 cookie 发生错误: {e}")
else:
try:
current_url = page.url
current_cookies = context.cookies(current_url) # 通过 context 获取 cookies
if current_cookies is None:
logger.error("更新本地 Cookie失败")
return
else:
logger.info("更新本地 Cookie成功")
self._saved_cookie = current_cookies # 保存
self._cookie_valid = True
except Exception as e:
logger.error(f"更新本地 cookie 发生错误: {e}")
def get_cookie(self):
if self._saved_cookie and self._cookie_valid:
return self._saved_cookie
try:
cookie_header = ''
if self._use_cookiecloud:
@@ -500,34 +521,60 @@ class DynamicWeChat(_PluginBase):
return cookies
def refresh_cookie(self): # 保活
if self._use_cookiecloud:
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True, args=['--lang=zh-CN'])
context = browser.new_context()
cookie = self.get_cookie()
if cookie:
context.add_cookies(cookie)
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True, args=['--lang=zh-CN'])
context = browser.new_context()
cookie_used = False
if self._saved_cookie:
# logger.info("尝试使用本地保存的 cookie")
context.add_cookies(self._saved_cookie)
page = context.new_page()
page.goto(self._wechatUrl)
time.sleep(3)
if not self.check_login_status(page, task='refresh_cookie'):
self._cookie_valid = False
if self._my_send:
result = self._my_send.send(title="cookie已失效请及时更新",
content="请在企业微信应用发送/push_qr让插件推送二维码。如果是使用微信通知请确保公网IP还没有变动",
image=None, force_send=False) # 标题,内容,图片,是否强制发送
if result:
logger.info(f"cookie失效通知发送失败原因{result}")
else:
if self.check_login_status(page, task='refresh_cookie'):
# logger.info("本地保存的 cookie 有效")
self._cookie_valid = True
if self._my_send:
self._my_send.reset_limit()
PyCookieCloud.increase_cookie_lifetime(self._settings_file_path, 1200)
self._cookie_lifetime = PyCookieCloud.load_cookie_lifetime(self._settings_file_path)
browser.close()
except Exception as e:
logger.error(f"cookie校验失败:{e}")
cookie_used = True
else:
# logger.warning("本地保存的 cookie 无效")
self._cookie_valid = False
self._saved_cookie = None # 清空无效的 cookie
if not cookie_used and self._use_cookiecloud:
# logger.info("尝试从CookieCloud 获取新的 cookie")
cookie = self.get_cookie()
if cookie:
context.add_cookies(cookie)
page = context.new_page()
page.goto(self._wechatUrl)
time.sleep(3)
if self.check_login_status(page, task='refresh_cookie'):
# logger.info("新获取的 cookie 有效")
self._cookie_valid = True
self._saved_cookie = context.cookies() # 保存有效的 cookie
else:
# logger.warning("新获取的 cookie 无效")
self._cookie_valid = False
self._saved_cookie = None # 清空无效的 cookie
if self._my_send:
result = self._my_send.send(
title="cookie已失效请及时更新",
content="请在企业微信应用发送/push_qr 如有验证码以''结束发送到企业微信应用。 如果是使用微信通知请确保公网IP还没有变动",
image=None, force_send=False
)
if result:
logger.info(f"cookie失效通知发送失败原因{result}")
if self._cookie_valid:
if self._my_send:
self._my_send.reset_limit()
PyCookieCloud.increase_cookie_lifetime(self._settings_file_path, 1200)
self._cookie_lifetime = PyCookieCloud.load_cookie_lifetime(self._settings_file_path)
browser.close()
except Exception as e:
self._cookie_valid = False
self._saved_cookie = None # 异常时清空 cookie
logger.error(f"cookie 校验过程中发生异常: {e}")
#
def check_login_status(self, page, task):
@@ -590,45 +637,45 @@ class DynamicWeChat(_PluginBase):
"//div[contains(@class, 'js_show_ipConfig_dialog')]//a[contains(@class, '_mod_card_operationLink') and text()='配置']",
"配置")
]
if self._input_id_list:
id_list = self._input_id_list.split(",")
app_urls = [f"{bash_url}{app_id.strip()}" for app_id in id_list]
for app_url in app_urls:
page.goto(app_url) # 打开应用详情页
app_id = app_url.split("/")[-1]
time.sleep(2)
# 依次点击每个按钮
for xpath, name in buttons:
# 等待按钮出现并可点击
try:
button = page.wait_for_selector(xpath, timeout=5000) # 等待按钮可点击
button.click()
# logger.info(f"已点击 '{name}' 按钮")
page.wait_for_selector('textarea.js_ipConfig_textarea', timeout=5000)
# logger.info(f"已找到文本框")
input_area = page.locator('textarea.js_ipConfig_textarea')
confirm = page.locator('.js_ipConfig_confirmBtn')
input_area.fill(self._current_ip_address) # 填充 IP 地址
confirm.click() # 点击确认按钮
time.sleep(3) # 等待处理
self._ip_changed = True
except Exception as e:
logger.error(f"未能找打开{app_url}或点击 '{name}' 按钮异常: {e}")
self._ip_changed = False
if "disabled" in str(e):
logger.info(f"应用{app_id} 已被禁用,可能是没有设置接收api")
if self._ip_changed:
logger.info(f"应用: {app_id} 输入IP" + self._current_ip_address)
ip_parts = self._current_ip_address.split('.')
masked_ip = f"{ip_parts[0]}.{len(ip_parts[1]) * '*'}.{len(ip_parts[2]) * '*'}.{ip_parts[3]}"
if self._my_send:
result = self._my_send.send(title="更新可信IP成功",
content='应用: ' + app_id + ' 输入IP' + masked_ip,
force_send=True, diy_channel="WeChat")
return
if "||" in self._input_id_list:
parts = self._input_id_list.split("||", 1)
input_id_list = parts[0]
else:
logger.error("未找到应用id修改IP失败")
return
input_id_list = self._input_id_list
id_list = input_id_list.split(",")
app_urls = [f"{bash_url}{app_id.strip()}" for app_id in id_list]
for app_url in app_urls:
page.goto(app_url) # 打开应用详情页
app_id = app_url.split("/")[-1]
time.sleep(2)
# 依次点击每个按钮
for xpath, name in buttons:
# 等待按钮出现并可点击
try:
button = page.wait_for_selector(xpath, timeout=5000) # 等待按钮可点击
button.click()
# logger.info(f"已点击 '{name}' 按钮")
page.wait_for_selector('textarea.js_ipConfig_textarea', timeout=5000)
# logger.info(f"已找到文本框")
input_area = page.locator('textarea.js_ipConfig_textarea')
confirm = page.locator('.js_ipConfig_confirmBtn')
input_area.fill(self._current_ip_address) # 填充 IP 地址
confirm.click() # 点击确认按钮
time.sleep(3) # 等待处理
self._ip_changed = True
except Exception as e:
logger.error(f"未能找打开{app_url}或点击 '{name}' 按钮异常: {e}")
self._ip_changed = False
if "disabled" in str(e):
logger.info(f"应用{app_id} 已被禁用,可能是没有设置接收api")
if self._ip_changed:
logger.info(f"应用: {app_id} 输入IP" + self._current_ip_address)
ip_parts = self._current_ip_address.split('.')
masked_ip = f"{ip_parts[0]}.{len(ip_parts[1]) * '*'}.{len(ip_parts[2]) * '*'}.{ip_parts[3]}"
if self._my_send:
result = self._my_send.send(title="更新可信IP成功",
content='应用: ' + app_id + ' 输入IP' + masked_ip,
force_send=True, diy_channel="WeChat")
def __update_config(self):
"""
@@ -885,7 +932,7 @@ class DynamicWeChat(_PluginBase):
if self._qr_code_image is None:
img_component = {
"component": "div",
"text": "登录二维码都会在此展示二维码有6秒延时,过期时间仅对应‘本地扫码功能’",
"text": "登录二维码都会在此展示二维码有6秒延时。 [适用于Docker版]",
"props": {
"style": {
"fontSize": "22px",
@@ -1018,6 +1065,7 @@ class DynamicWeChat(_PluginBase):
if result:
logger.info(f"远程推送任务: 二维码发送失败,原因:{result}")
browser.close()
logger.info("----------------------本次任务结束----------------------")
return
logger.info("远程推送任务: 二维码已经发送,等待用户 90 秒内扫码登录")
# logger.info("远程推送任务: 如收到短信验证码请以?结束,发送到<企业微信应用> 如: 110301")
@@ -1032,6 +1080,7 @@ class DynamicWeChat(_PluginBase):
else:
logger.warning("远程推送任务: 未找到二维码")
browser.close()
logger.info("----------------------本次任务结束----------------------")
except Exception as e:
logger.error(f"远程推送任务: 推送二维码失败: {e}")
@@ -1063,8 +1112,6 @@ class DynamicWeChat(_PluginBase):
if self.text[:6].isdigit() and len(self.text) == 7:
self._verification_code = self.text[:6]
logger.info(f"收到验证码:{self._verification_code}")
# else:
# logger.info(f"收到消息:{self.text}")
def get_service(self) -> List[Dict[str, Any]]:
"""

View File

@@ -1,89 +1,112 @@
import re
import requests
from app.modules.wechat import WeChat
from app.schemas.types import NotificationType
from app.schemas.types import NotificationType,MessageChannel
class MySender:
def __init__(self, token=None, func=None):
self.token = token
self.channel = self.send_channel() if token else None # 初始化时确定发送渠道
self.first_text_sent = False # 记录是否已发送过纯文本消息
self.init_success = bool(token) # 标识初始化成功
self.post_message_func = func # V2微信模式的 post_message 方法
self.tokens = token.split('||') if token and '||' in token else [token] if token else []
self.channels = [MySender._detect_channel(t) for t in self.tokens]
self.current_index = 0 # 当前使用的 token 和 channel 的索引
self.first_text_sent = False # 是否已发送过纯文本消息
self.init_success = bool(self.tokens) # 标识初始化是否成功
self.post_message_func = func # V2 微信模式的 post_message 方法
def send_channel(self):
if "WeChat" in self.token:
@staticmethod
def _detect_channel(token):
"""根据 token 确定通知渠道"""
if "WeChat" in token:
return "WeChat"
letters_only = ''.join(re.findall(r'[A-Za-z]', self.token))
if self.token.lower().startswith("sct".lower()):
letters_only = ''.join(re.findall(r'[A-Za-z]', token))
if token.lower().startswith("sct"):
return "ServerChan"
elif letters_only.isupper():
return "AnPush"
else:
return "PushPlus"
# 标题,内容,图片,是否强制发送
def send(self, title, content=None, image=None, force_send=False, diy_channel=None):
"""发送消息"""
if not self.init_success:
return # 如果初始化失败,直接返回
return
# 对纯文本消息进行限制
if not image and not force_send:
if self.first_text_sent:
return
else:
self.first_text_sent = True
self.first_text_sent = True
# # 如果是 V2 微信通知直接处理
if self.channel == "WeChat" and self.post_message_func:
return self.send_v2_wechat(title, content, image)
# 如果指定了自定义通道,直接尝试发送
if diy_channel:
return self._try_send(title, content, image, diy_channel)
try:
if not diy_channel:
channel = self.channel
else:
channel = diy_channel
# 尝试按顺序发送,直到成功或遍历所有通道
for i in range(len(self.tokens)):
token = self.tokens[self.current_index]
channel = self.channels[self.current_index]
try:
result = self._try_send(title, content, image, channel, token)
if result is None: # 成功时返回 None
return
except Exception as e:
# 打印错误日志或处理错误
return f"{channel} 通知错误: {e}"
# 切换到下一个通道
self.current_index = (self.current_index + 1) % len(self.tokens)
return f"所有的通知方式都发送失败"
if channel == "WeChat":
return MySender.send_wechat(title, content, image, self.token)
elif channel == "ServerChan":
return self.send_serverchan(title, content, image)
elif channel == "AnPush":
return self.send_anpush(title, content, image)
elif channel == "PushPlus":
return self.send_pushplus(title, content, image)
else:
return "Unknown channel"
except Exception as e:
return f"Error occurred: {str(e)}"
def _try_send(self, title, content, image, channel, token=None):
"""尝试使用指定通道发送消息"""
if channel == "WeChat" and self.post_message_func:
return self._send_v2_wechat(title, content, image, token)
elif channel == "WeChat":
return self._send_wechat(title, content, image, token)
elif channel == "ServerChan":
return self._send_serverchan(title, content, image)
elif channel == "AnPush":
return self._send_anpush(title, content, image)
elif channel == "PushPlus":
return self._send_pushplus(title, content, image)
else:
raise ValueError(f"Unknown channel: {channel}")
@staticmethod
def send_wechat(title, content, image, token):
def _send_wechat(title, content, image, token):
wechat = WeChat()
if ',' in token:
if token and ',' in token:
channel, actual_userid = token.split(',', 1)
else:
actual_userid = None
if image:
send_status = wechat.send_msg(title='企业微信登录二维码', image=image, link=image, userid=actual_userid)
else:
send_status = wechat.send_msg(title=title, text=content)
send_status = wechat.send_msg(title=title, text=content, userid=actual_userid)
if send_status is None:
return "微信通知发送错误"
return None
def send_serverchan(self, title, content, image):
if self.token.startswith('sctp'):
match = re.match(r'sctp(\d+)t', self.token)
def _send_serverchan(self, title, content, image):
tmp_tokens = self.tokens[self.current_index]
token = tmp_tokens
if ',' in tmp_tokens:
before_comma, after_comma = tmp_tokens.split(',', 1)
if before_comma.startswith('sctp') and image:
token = after_comma # 图片发到公众号
else:
token = before_comma # 发到 server3
if token.startswith('sctp'):
match = re.match(r'sctp(\d+)t', token)
if match:
num = match.group(1)
url = f'https://{num}.push.ft07.com/send/{self.token}.send'
url = f'https://{num}.push.ft07.com/send/{token}.send'
else:
raise ValueError('Invalid sendkey format for sctp')
return '错误的Server3 Sendkey'
else:
url = f'https://sctapi.ftqq.com/{self.token}.send'
url = f'https://sctapi.ftqq.com/{token}.send'
params = {'title': title, 'desp': f'![img]({image})' if image else content}
headers = {'Content-Type': 'application/json;charset=utf-8'}
@@ -93,11 +116,12 @@ class MySender:
return f"Server酱通知错误: {result.get('message')}"
return None
def send_anpush(self, title, content, image):
if ',' in self.token:
channel, token = self.token.split(',', 1)
def _send_anpush(self, title, content, image):
token = self.tokens[self.current_index] # 获取当前通道对应的 token
if ',' in token:
channel, token = token.split(',', 1)
else:
return
return "可能AnPush 没有配置消息通道ID"
url = f"https://api.anpush.com/push/{token}"
payload = {
"title": title,
@@ -114,8 +138,9 @@ class MySender:
return "AnPush 消息通道未找到"
return None
def send_pushplus(self, title, content, image):
pushplus_url = f"http://www.pushplus.plus/send/{self.token}"
def _send_pushplus(self, title, content, image):
token = self.tokens[self.current_index] # 获取当前通道对应的 token
pushplus_url = f"http://www.pushplus.plus/send/{token}"
# PushPlus发送逻辑
data = {
"title": title,
@@ -128,12 +153,14 @@ class MySender:
return f"PushPlus send failed: {result.get('msg')}"
return None
def send_v2_wechat(self, title, content, image):
def _send_v2_wechat(self, title, content, image, token):
"""V2 微信通知发送"""
if not self.token or ',' not in self.token:
return '没有指定V2微信用户ID'
channel, actual_userid = self.token.split(',', 1)
if token and ',' in token:
channel, actual_userid = token.split(',', 1)
else:
actual_userid = None
self.post_message_func(
channel=MessageChannel.Wechat,
mtype=NotificationType.Plugin,
title=title,
text=content,
@@ -141,7 +168,7 @@ class MySender:
link=image,
userid=actual_userid
)
return None
return None # 由于self.post_message()了None外没有其他返回值。无法判断是否发送成功V2直接默认成功
def reset_limit(self):
"""解除限制,允许再次发送纯文本消息"""