diff --git a/package.json b/package.json index da6ff03..440c381 100644 --- a/package.json +++ b/package.json @@ -861,12 +861,13 @@ "name": "动态企微可信IP", "description": "修改企微应用可信IP,支持Srever酱等第三方通知。验证码以?结尾发送到企业微信应用", "labels": "消息通知", - "version": "1.5.1", + "version": "1.5.2", "icon": "Wecom_A.png", "author": "RamenRa", "level": 2, "v2": true, "history": { + "v1.5.2": "可以从指定url获取ip,修复不使用cc时cookie失效过快,v1可配置第三方为备用通知,server酱可以将文本发送到server3,二维码给服务号", "v1.5.1": "修复v2微信通知,可以指定微信通知ID", "v1.5.0": "支持企微应用通知和第Serve酱等第三方推送。按要求修改插件名称", "v1.4.1": "完善面板说明", diff --git a/plugins/dynamicwechat/__init__.py b/plugins/dynamicwechat/__init__.py index aa16cfc..2b977f4 100644 --- a/plugins/dynamicwechat/__init__.py +++ b/plugins/dynamicwechat/__init__.py @@ -18,9 +18,10 @@ from app.core.event import eventmanager, Event from app.helper.cookiecloud import CookieCloudHelper from app.log import logger from app.plugins import _PluginBase +from app.schemas.types import EventType from app.plugins.dynamicwechat.update_help import PyCookieCloud from app.plugins.dynamicwechat.notify_helper import MySender -from app.schemas.types import EventType + class DynamicWeChat(_PluginBase): @@ -31,7 +32,7 @@ class DynamicWeChat(_PluginBase): # 插件图标 plugin_icon = "Wecom_A.png" # 插件版本 - plugin_version = "1.5.1" + plugin_version = "1.5.2" # 插件作者 plugin_author = "RamenRa" # 作者主页 @@ -61,6 +62,8 @@ class DynamicWeChat(_PluginBase): _is_special_upload = False # 聚合通知 _my_send = None + # 保存cookie + _saved_cookie = None # 通知方式token/api _notification_token = '' @@ -104,6 +107,7 @@ class DynamicWeChat(_PluginBase): version = settings.VERSION_FLAG # V2 else: version = "v1" + def init_plugin(self, config: dict = None): # 清空配置 self._notification_token = '' @@ -113,9 +117,7 @@ class DynamicWeChat(_PluginBase): self._local_scan = False self._input_id_list = '' self._cookie_header = "" - self._current_ip_address = self.get_ip_from_url(random.choice(self._ip_urls)) self._settings_file_path = self.get_data_path() / "settings.json" - # self._cookie_lifetime = PyCookieCloud.load_cookie_lifetime() if config: self._enabled = config.get("enabled") self._notification_token = config.get("notification_token") @@ -134,6 +136,10 @@ class DynamicWeChat(_PluginBase): self._my_send = MySender(self._notification_token) if not self._my_send.init_success: # 没有输入通知方式,不通知 self._my_send = None + if "||" in self._input_id_list: + parts = self._input_id_list.split("||", 1) + self._ip_urls = parts[1].split(",") + self._current_ip_address = self.get_ip_from_url(random.choice(self._ip_urls)) # 停止现有任务 self.stop_service() if (self._enabled or self._onlyonce) and self._input_id_list: @@ -273,10 +279,15 @@ class DynamicWeChat(_PluginBase): self.__update_config() logger.info("----------------------本次任务结束----------------------") else: - logger.warning("cookie已失效请及时更新,不检测公网IP") + logger.warning("cookie已失效请及时更新,本次不检查公网IP") def CheckIP(self): - retry_urls = random.sample(self._ip_urls, len(self._ip_urls)) + if "||" in self._input_id_list: + parts = self._input_id_list.split("||", 1) + ip_urls = parts[1].split(",") + else: + ip_urls = self._ip_urls + retry_urls = random.sample(ip_urls, len(ip_urls)) ip_address = None for url in retry_urls: @@ -290,10 +301,6 @@ class DynamicWeChat(_PluginBase): logger.error("获取IP失败 不操作IP") return False - if self._forced_update: - logger.info("强制更新IP") - self._current_ip_address = ip_address - return True elif not self._ip_changed: # 上次修改IP失败 logger.info("上次IP修改IP没有成功 继续尝试修改IP") self._current_ip_address = ip_address @@ -373,8 +380,6 @@ class DynamicWeChat(_PluginBase): logger.debug(str(e)) return None, None - - def ChangeIP(self): logger.info("开始请求企业微信管理更改可信IP") try: @@ -416,8 +421,8 @@ class DynamicWeChat(_PluginBase): else: self._ip_changed = False browser.close() - except Exception as e: + self._ip_changed = False logger.error(f"更改可信IP失败: {e}") finally: pass @@ -432,34 +437,50 @@ class DynamicWeChat(_PluginBase): return logger.info("使用二维码登录成功,开始刷新cookie") try: - if self._cc_server.check_connection(): - current_url = page.url - current_cookies = context.cookies(current_url) # 通过 context 获取 cookies - if current_cookies is None: - logger.error("无法获取当前 cookies") - return - formatted_cookies = {} - for cookie in current_cookies: - domain = cookie.get('domain') # 使用 get() 方法避免 KeyError - if domain is None: - continue # 跳过没有 domain 的 cookie - - if domain not in formatted_cookies: - formatted_cookies[domain] = [] - formatted_cookies[domain].append(cookie) - flag = self._cc_server.update_cookie(formatted_cookies) - if flag: - logger.info("更新 CookieCloud 成功") - else: - logger.error("更新 CookieCloud 失败") - else: + if not self._cc_server.check_connection(): logger.error("连接 CookieCloud 失败", self._server) - except Exception as e: - logger.error(f"更新 cookie 发生错误: {e}") - else: - logger.error("CookieCloud没有启用或配置错误, 不刷新cookie") + return + current_url = page.url + current_cookies = context.cookies(current_url) # 通过 context 获取 cookies + if current_cookies is None: + logger.error("无法获取当前 cookies") + return + self._saved_cookie = current_cookies + formatted_cookies = {} + for cookie in current_cookies: + domain = cookie.get('domain') # 使用 get() 方法避免 KeyError + if domain is None: + continue # 跳过没有 domain 的 cookie - def get_cookie(self): # 只有从CookieCloud获取cookie成功才返回True + if domain not in formatted_cookies: + formatted_cookies[domain] = [] + formatted_cookies[domain].append(cookie) + flag = self._cc_server.update_cookie(formatted_cookies) + if flag: + logger.info("更新 CookieCloud 成功") + self._cookie_valid = True + else: + logger.error("更新 CookieCloud 失败") + + except Exception as e: + logger.error(f"CookieCloud更新 cookie 发生错误: {e}") + else: + try: + current_url = page.url + current_cookies = context.cookies(current_url) # 通过 context 获取 cookies + if current_cookies is None: + logger.error("更新本地 Cookie失败") + return + else: + logger.info("更新本地 Cookie成功") + self._saved_cookie = current_cookies # 保存 + self._cookie_valid = True + except Exception as e: + logger.error(f"更新本地 cookie 发生错误: {e}") + + def get_cookie(self): + if self._saved_cookie and self._cookie_valid: + return self._saved_cookie try: cookie_header = '' if self._use_cookiecloud: @@ -500,34 +521,60 @@ class DynamicWeChat(_PluginBase): return cookies def refresh_cookie(self): # 保活 - if self._use_cookiecloud: - try: - with sync_playwright() as p: - browser = p.chromium.launch(headless=True, args=['--lang=zh-CN']) - context = browser.new_context() - cookie = self.get_cookie() - if cookie: - context.add_cookies(cookie) + try: + with sync_playwright() as p: + browser = p.chromium.launch(headless=True, args=['--lang=zh-CN']) + context = browser.new_context() + cookie_used = False + if self._saved_cookie: + # logger.info("尝试使用本地保存的 cookie") + context.add_cookies(self._saved_cookie) page = context.new_page() page.goto(self._wechatUrl) time.sleep(3) - if not self.check_login_status(page, task='refresh_cookie'): - self._cookie_valid = False - if self._my_send: - result = self._my_send.send(title="cookie已失效,请及时更新", - content="请在企业微信应用发送/push_qr,让插件推送二维码。如果是使用微信通知请确保公网IP还没有变动", - image=None, force_send=False) # 标题,内容,图片,是否强制发送 - if result: - logger.info(f"cookie失效通知发送失败,原因:{result}") - else: + if self.check_login_status(page, task='refresh_cookie'): + # logger.info("本地保存的 cookie 有效") self._cookie_valid = True - if self._my_send: - self._my_send.reset_limit() - PyCookieCloud.increase_cookie_lifetime(self._settings_file_path, 1200) - self._cookie_lifetime = PyCookieCloud.load_cookie_lifetime(self._settings_file_path) - browser.close() - except Exception as e: - logger.error(f"cookie校验失败:{e}") + cookie_used = True + else: + # logger.warning("本地保存的 cookie 无效") + self._cookie_valid = False + self._saved_cookie = None # 清空无效的 cookie + + if not cookie_used and self._use_cookiecloud: + # logger.info("尝试从CookieCloud 获取新的 cookie") + cookie = self.get_cookie() + if cookie: + context.add_cookies(cookie) + page = context.new_page() + page.goto(self._wechatUrl) + time.sleep(3) + if self.check_login_status(page, task='refresh_cookie'): + # logger.info("新获取的 cookie 有效") + self._cookie_valid = True + self._saved_cookie = context.cookies() # 保存有效的 cookie + else: + # logger.warning("新获取的 cookie 无效") + self._cookie_valid = False + self._saved_cookie = None # 清空无效的 cookie + if self._my_send: + result = self._my_send.send( + title="cookie已失效,请及时更新", + content="请在企业微信应用发送/push_qr, 如有验证码以'?'结束发送到企业微信应用。 如果是使用’微信通知‘请确保公网IP还没有变动", + image=None, force_send=False + ) + if result: + logger.info(f"cookie失效通知发送失败,原因:{result}") + if self._cookie_valid: + if self._my_send: + self._my_send.reset_limit() + PyCookieCloud.increase_cookie_lifetime(self._settings_file_path, 1200) + self._cookie_lifetime = PyCookieCloud.load_cookie_lifetime(self._settings_file_path) + browser.close() + except Exception as e: + self._cookie_valid = False + self._saved_cookie = None # 异常时清空 cookie + logger.error(f"cookie 校验过程中发生异常: {e}") # def check_login_status(self, page, task): @@ -590,45 +637,45 @@ class DynamicWeChat(_PluginBase): "//div[contains(@class, 'js_show_ipConfig_dialog')]//a[contains(@class, '_mod_card_operationLink') and text()='配置']", "配置") ] - if self._input_id_list: - id_list = self._input_id_list.split(",") - app_urls = [f"{bash_url}{app_id.strip()}" for app_id in id_list] - for app_url in app_urls: - page.goto(app_url) # 打开应用详情页 - app_id = app_url.split("/")[-1] - time.sleep(2) - # 依次点击每个按钮 - for xpath, name in buttons: - # 等待按钮出现并可点击 - try: - button = page.wait_for_selector(xpath, timeout=5000) # 等待按钮可点击 - button.click() - # logger.info(f"已点击 '{name}' 按钮") - page.wait_for_selector('textarea.js_ipConfig_textarea', timeout=5000) - # logger.info(f"已找到文本框") - input_area = page.locator('textarea.js_ipConfig_textarea') - confirm = page.locator('.js_ipConfig_confirmBtn') - input_area.fill(self._current_ip_address) # 填充 IP 地址 - confirm.click() # 点击确认按钮 - time.sleep(3) # 等待处理 - self._ip_changed = True - except Exception as e: - logger.error(f"未能找打开{app_url}或点击 '{name}' 按钮异常: {e}") - self._ip_changed = False - if "disabled" in str(e): - logger.info(f"应用{app_id} 已被禁用,可能是没有设置接收api") - if self._ip_changed: - logger.info(f"应用: {app_id} 输入IP:" + self._current_ip_address) - ip_parts = self._current_ip_address.split('.') - masked_ip = f"{ip_parts[0]}.{len(ip_parts[1]) * '*'}.{len(ip_parts[2]) * '*'}.{ip_parts[3]}" - if self._my_send: - result = self._my_send.send(title="更新可信IP成功", - content='应用: ' + app_id + ' 输入IP:' + masked_ip, - force_send=True, diy_channel="WeChat") - return + if "||" in self._input_id_list: + parts = self._input_id_list.split("||", 1) + input_id_list = parts[0] else: - logger.error("未找到应用id,修改IP失败") - return + input_id_list = self._input_id_list + id_list = input_id_list.split(",") + app_urls = [f"{bash_url}{app_id.strip()}" for app_id in id_list] + for app_url in app_urls: + page.goto(app_url) # 打开应用详情页 + app_id = app_url.split("/")[-1] + time.sleep(2) + # 依次点击每个按钮 + for xpath, name in buttons: + # 等待按钮出现并可点击 + try: + button = page.wait_for_selector(xpath, timeout=5000) # 等待按钮可点击 + button.click() + # logger.info(f"已点击 '{name}' 按钮") + page.wait_for_selector('textarea.js_ipConfig_textarea', timeout=5000) + # logger.info(f"已找到文本框") + input_area = page.locator('textarea.js_ipConfig_textarea') + confirm = page.locator('.js_ipConfig_confirmBtn') + input_area.fill(self._current_ip_address) # 填充 IP 地址 + confirm.click() # 点击确认按钮 + time.sleep(3) # 等待处理 + self._ip_changed = True + except Exception as e: + logger.error(f"未能找打开{app_url}或点击 '{name}' 按钮异常: {e}") + self._ip_changed = False + if "disabled" in str(e): + logger.info(f"应用{app_id} 已被禁用,可能是没有设置接收api") + if self._ip_changed: + logger.info(f"应用: {app_id} 输入IP:" + self._current_ip_address) + ip_parts = self._current_ip_address.split('.') + masked_ip = f"{ip_parts[0]}.{len(ip_parts[1]) * '*'}.{len(ip_parts[2]) * '*'}.{ip_parts[3]}" + if self._my_send: + result = self._my_send.send(title="更新可信IP成功", + content='应用: ' + app_id + ' 输入IP:' + masked_ip, + force_send=True, diy_channel="WeChat") def __update_config(self): """ @@ -885,7 +932,7 @@ class DynamicWeChat(_PluginBase): if self._qr_code_image is None: img_component = { "component": "div", - "text": "登录二维码都会在此展示,二维码有6秒延时,过期时间仅对应‘本地扫码功能’", + "text": "登录二维码都会在此展示,二维码有6秒延时。 [适用于Docker版]", "props": { "style": { "fontSize": "22px", @@ -1018,6 +1065,7 @@ class DynamicWeChat(_PluginBase): if result: logger.info(f"远程推送任务: 二维码发送失败,原因:{result}") browser.close() + logger.info("----------------------本次任务结束----------------------") return logger.info("远程推送任务: 二维码已经发送,等待用户 90 秒内扫码登录") # logger.info("远程推送任务: 如收到短信验证码请以?结束,发送到<企业微信应用> 如: 110301?") @@ -1032,6 +1080,7 @@ class DynamicWeChat(_PluginBase): else: logger.warning("远程推送任务: 未找到二维码") browser.close() + logger.info("----------------------本次任务结束----------------------") except Exception as e: logger.error(f"远程推送任务: 推送二维码失败: {e}") @@ -1063,8 +1112,6 @@ class DynamicWeChat(_PluginBase): if self.text[:6].isdigit() and len(self.text) == 7: self._verification_code = self.text[:6] logger.info(f"收到验证码:{self._verification_code}") - # else: - # logger.info(f"收到消息:{self.text}") def get_service(self) -> List[Dict[str, Any]]: """ diff --git a/plugins/dynamicwechat/notify_helper.py b/plugins/dynamicwechat/notify_helper.py index 6c70a5d..da1985e 100644 --- a/plugins/dynamicwechat/notify_helper.py +++ b/plugins/dynamicwechat/notify_helper.py @@ -1,89 +1,112 @@ import re import requests from app.modules.wechat import WeChat -from app.schemas.types import NotificationType +from app.schemas.types import NotificationType,MessageChannel class MySender: def __init__(self, token=None, func=None): - self.token = token - self.channel = self.send_channel() if token else None # 初始化时确定发送渠道 - self.first_text_sent = False # 记录是否已发送过纯文本消息 - self.init_success = bool(token) # 标识初始化成功 - self.post_message_func = func # V2微信模式的 post_message 方法 + self.tokens = token.split('||') if token and '||' in token else [token] if token else [] + self.channels = [MySender._detect_channel(t) for t in self.tokens] + self.current_index = 0 # 当前使用的 token 和 channel 的索引 + self.first_text_sent = False # 是否已发送过纯文本消息 + self.init_success = bool(self.tokens) # 标识初始化是否成功 + self.post_message_func = func # V2 微信模式的 post_message 方法 - def send_channel(self): - if "WeChat" in self.token: + @staticmethod + def _detect_channel(token): + """根据 token 确定通知渠道""" + if "WeChat" in token: return "WeChat" - letters_only = ''.join(re.findall(r'[A-Za-z]', self.token)) - if self.token.lower().startswith("sct".lower()): + letters_only = ''.join(re.findall(r'[A-Za-z]', token)) + if token.lower().startswith("sct"): return "ServerChan" elif letters_only.isupper(): return "AnPush" else: return "PushPlus" - # 标题,内容,图片,是否强制发送 def send(self, title, content=None, image=None, force_send=False, diy_channel=None): + """发送消息""" if not self.init_success: - return # 如果初始化失败,直接返回 + return + # 对纯文本消息进行限制 if not image and not force_send: if self.first_text_sent: return - else: - self.first_text_sent = True + self.first_text_sent = True - # # 如果是 V2 微信通知直接处理 - if self.channel == "WeChat" and self.post_message_func: - return self.send_v2_wechat(title, content, image) + # 如果指定了自定义通道,直接尝试发送 + if diy_channel: + return self._try_send(title, content, image, diy_channel) - try: - if not diy_channel: - channel = self.channel - else: - channel = diy_channel + # 尝试按顺序发送,直到成功或遍历所有通道 + for i in range(len(self.tokens)): + token = self.tokens[self.current_index] + channel = self.channels[self.current_index] + try: + result = self._try_send(title, content, image, channel, token) + if result is None: # 成功时返回 None + return + except Exception as e: + # 打印错误日志或处理错误 + return f"{channel} 通知错误: {e}" + # 切换到下一个通道 + self.current_index = (self.current_index + 1) % len(self.tokens) + return f"所有的通知方式都发送失败" - if channel == "WeChat": - return MySender.send_wechat(title, content, image, self.token) - elif channel == "ServerChan": - return self.send_serverchan(title, content, image) - elif channel == "AnPush": - return self.send_anpush(title, content, image) - elif channel == "PushPlus": - return self.send_pushplus(title, content, image) - else: - return "Unknown channel" - except Exception as e: - return f"Error occurred: {str(e)}" + def _try_send(self, title, content, image, channel, token=None): + """尝试使用指定通道发送消息""" + if channel == "WeChat" and self.post_message_func: + return self._send_v2_wechat(title, content, image, token) + elif channel == "WeChat": + return self._send_wechat(title, content, image, token) + elif channel == "ServerChan": + return self._send_serverchan(title, content, image) + elif channel == "AnPush": + return self._send_anpush(title, content, image) + elif channel == "PushPlus": + return self._send_pushplus(title, content, image) + else: + raise ValueError(f"Unknown channel: {channel}") @staticmethod - def send_wechat(title, content, image, token): + def _send_wechat(title, content, image, token): wechat = WeChat() - if ',' in token: + if token and ',' in token: channel, actual_userid = token.split(',', 1) else: actual_userid = None if image: send_status = wechat.send_msg(title='企业微信登录二维码', image=image, link=image, userid=actual_userid) else: - send_status = wechat.send_msg(title=title, text=content) + send_status = wechat.send_msg(title=title, text=content, userid=actual_userid) if send_status is None: return "微信通知发送错误" return None - def send_serverchan(self, title, content, image): - if self.token.startswith('sctp'): - match = re.match(r'sctp(\d+)t', self.token) + def _send_serverchan(self, title, content, image): + tmp_tokens = self.tokens[self.current_index] + token = tmp_tokens + if ',' in tmp_tokens: + before_comma, after_comma = tmp_tokens.split(',', 1) + if before_comma.startswith('sctp') and image: + token = after_comma # 图片发到公众号 + else: + token = before_comma # 发到 server3 + + if token.startswith('sctp'): + match = re.match(r'sctp(\d+)t', token) if match: num = match.group(1) - url = f'https://{num}.push.ft07.com/send/{self.token}.send' + url = f'https://{num}.push.ft07.com/send/{token}.send' else: - raise ValueError('Invalid sendkey format for sctp') + return '错误的Server3 Sendkey' else: - url = f'https://sctapi.ftqq.com/{self.token}.send' + url = f'https://sctapi.ftqq.com/{token}.send' params = {'title': title, 'desp': f'![img]({image})' if image else content} headers = {'Content-Type': 'application/json;charset=utf-8'} @@ -93,11 +116,12 @@ class MySender: return f"Server酱通知错误: {result.get('message')}" return None - def send_anpush(self, title, content, image): - if ',' in self.token: - channel, token = self.token.split(',', 1) + def _send_anpush(self, title, content, image): + token = self.tokens[self.current_index] # 获取当前通道对应的 token + if ',' in token: + channel, token = token.split(',', 1) else: - return + return "可能AnPush 没有配置消息通道ID" url = f"https://api.anpush.com/push/{token}" payload = { "title": title, @@ -114,8 +138,9 @@ class MySender: return "AnPush 消息通道未找到" return None - def send_pushplus(self, title, content, image): - pushplus_url = f"http://www.pushplus.plus/send/{self.token}" + def _send_pushplus(self, title, content, image): + token = self.tokens[self.current_index] # 获取当前通道对应的 token + pushplus_url = f"http://www.pushplus.plus/send/{token}" # PushPlus发送逻辑 data = { "title": title, @@ -128,12 +153,14 @@ class MySender: return f"PushPlus send failed: {result.get('msg')}" return None - def send_v2_wechat(self, title, content, image): + def _send_v2_wechat(self, title, content, image, token): """V2 微信通知发送""" - if not self.token or ',' not in self.token: - return '没有指定V2微信用户ID' - channel, actual_userid = self.token.split(',', 1) + if token and ',' in token: + channel, actual_userid = token.split(',', 1) + else: + actual_userid = None self.post_message_func( + channel=MessageChannel.Wechat, mtype=NotificationType.Plugin, title=title, text=content, @@ -141,7 +168,7 @@ class MySender: link=image, userid=actual_userid ) - return None + return None # 由于self.post_message()了None外,没有其他返回值。无法判断是否发送成功,V2直接默认成功 def reset_limit(self): """解除限制,允许再次发送纯文本消息"""