From 9fad5c094ae1646b9e087c0cacc92472ae9b107a Mon Sep 17 00:00:00 2001 From: ramen <1205925392@qq.com> Date: Wed, 11 Dec 2024 21:43:58 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BF=BD=E7=95=A5=E5=9B=A0=E7=BD=91=E7=BB=9C?= =?UTF-8?q?=E6=B3=A2=E5=8A=A8=E5=AF=BC=E8=87=B4=E8=8E=B7=E5=8F=96ip?= =?UTF-8?q?=E9=94=99=E8=AF=AF=E3=80=82=E8=87=AA=E5=AE=9A=E4=B9=89=E7=9A=84?= =?UTF-8?q?=E7=B1=BB=E5=90=88=E5=B9=B6=E4=B8=BAhelper.py=E3=80=82=E5=90=8E?= =?UTF-8?q?=E7=BB=AD=E6=A0=B8=E5=BF=83=E5=8A=9F=E8=83=BD=E6=B2=A1=E9=97=AE?= =?UTF-8?q?=E9=A2=98=E5=B0=86=E4=B8=8D=E5=86=8D=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package.json | 8 +- plugins/dynamicwechat/__init__.py | 322 +++++++++--------- .../{notify_helper.py => helper.py} | 131 ++++++- plugins/dynamicwechat/update_help.py | 121 ------- 4 files changed, 292 insertions(+), 290 deletions(-) rename plugins/dynamicwechat/{notify_helper.py => helper.py} (61%) delete mode 100644 plugins/dynamicwechat/update_help.py diff --git a/package.json b/package.json index 3e0b13d..f27986c 100644 --- a/package.json +++ b/package.json @@ -861,20 +861,18 @@ "name": "动态企微可信IP", "description": "修改企微应用可信IP,支持Srever酱等第三方通知。验证码以?结尾发送到企业微信应用", "labels": "消息通知", - "version": "1.5.2", + "version": "1.6.0", "icon": "Wecom_A.png", "author": "RamenRa", "level": 2, "v2": true, "history": { + "v1.6.0": "忽略因网络波动导致获取ip错误。自定义的类合并为helper.py。后续核心功能没问题将不再更新", "v1.5.2": "可以从指定url获取ip,修复不使用cc时cookie失效过快,v1可配置第三方为备用通知,server酱可以将文本发送到server3,二维码给服务号", "v1.5.1": "修复v2微信通知,可以指定微信通知ID", "v1.5.0": "支持企微应用通知和第Serve酱等第三方推送。按要求修改插件名称", "v1.4.1": "完善面板说明", - "v1.4.0": "修复强制更改IP时配置面板延时过长的问题。庆祝v2进入正式版,显示了一个没用的参数", - "v1.3.1": "修正一些逻辑判断,修改ip成功会通知一次", - "v1.3.0": "兼容v2,操作cookie前检查一次CookieCloud", - "v1.2.0": "远程命令/push_qr,立即推送一次二维码到pushplus。添加<本地扫码刷新cookie>" + "v1.4.0": "修复强制更改IP时配置面板延时过长的问题。庆祝v2进入正式版,显示了一个没用的参数" } }, "SyncCookieCloud": { diff --git a/plugins/dynamicwechat/__init__.py b/plugins/dynamicwechat/__init__.py index 2b977f4..c6d9dd5 100644 --- a/plugins/dynamicwechat/__init__.py +++ b/plugins/dynamicwechat/__init__.py @@ -19,20 +19,19 @@ from app.helper.cookiecloud import CookieCloudHelper from app.log import logger from app.plugins import _PluginBase from app.schemas.types import EventType -from app.plugins.dynamicwechat.update_help import PyCookieCloud -from app.plugins.dynamicwechat.notify_helper import MySender +from app.plugins.dynamicwechat.helper import PyCookieCloud, MySender class DynamicWeChat(_PluginBase): # 插件名称 plugin_name = "动态企微可信IP" # 插件描述 - plugin_desc = "修改企微应用可信IP,详细说明查看'作者主页',支持第三方通知。验证码以?结尾发送到企业微信应用" + plugin_desc = "修改企微应用可信IP,详细说明查看'作者主页',支持第三方通知。验证码以?结尾发送到企业微信应用" # 插件图标 plugin_icon = "Wecom_A.png" # 插件版本 - plugin_version = "1.5.2" + plugin_version = "1.6.0" # 插件作者 plugin_author = "RamenRa" # 作者主页 @@ -111,6 +110,7 @@ class DynamicWeChat(_PluginBase): def init_plugin(self, config: dict = None): # 清空配置 self._notification_token = '' + self._cron = '*/10 * * * *' self._ip_changed = True self._forced_update = False self._use_cookiecloud = True @@ -134,12 +134,9 @@ class DynamicWeChat(_PluginBase): self._my_send = MySender(self._notification_token, func=self.post_message) else: self._my_send = MySender(self._notification_token) - if not self._my_send.init_success: # 没有输入通知方式,不通知 + if not self._my_send.init_success: # 没有输入通知方式,不通知 self._my_send = None - if "||" in self._input_id_list: - parts = self._input_id_list.split("||", 1) - self._ip_urls = parts[1].split(",") - self._current_ip_address = self.get_ip_from_url(random.choice(self._ip_urls)) + _, self._current_ip_address = self.get_ip_from_url(self._input_id_list) # 停止现有任务 self.stop_service() if (self._enabled or self._onlyonce) and self._input_id_list: @@ -147,20 +144,24 @@ class DynamicWeChat(_PluginBase): self._scheduler = BackgroundScheduler(timezone=settings.TZ) # 运行一次定时服务 if self._onlyonce: - logger.info("立即检测公网IP") - self._scheduler.add_job(func=self.check, trigger='date', - run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3), - name="检测公网IP") # 添加任务 + if not self._forced_update or not self._local_scan: + # logger.info("立即检测公网IP") + self._scheduler.add_job(func=self.check, trigger='date', + run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3), + name="检测公网IP") # 添加任务 # 关闭一次性开关 self._onlyonce = False if self._forced_update: - self._scheduler.add_job(func=self.forced_change, trigger='date', - run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3), - name="强制更新公网IP") # 添加任务 + if not self._local_scan: + logger.info("使用Cookie,强制更新公网IP") + self._scheduler.add_job(func=self.forced_change, trigger='date', + run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3), + name="强制更新公网IP") # 添加任务 self._forced_update = False if self._local_scan: + logger.info("使用本地扫码登陆") self._scheduler.add_job(func=self.local_scanning, trigger='date', run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3), name="本地扫码登陆") # 添加任务 @@ -181,6 +182,17 @@ class DynamicWeChat(_PluginBase): self._scheduler.start() self.__update_config() + def _send_cookie_false(self): + self._cookie_valid = False + if self._my_send: + result = self._my_send.send( + title="cookie已失效,请及时更新", + content="请在企业微信应用发送/push_qr, 如有验证码以'?'结束发送到企业微信应用。 如果使用’微信通知‘请确保公网IP还没有变动", + image=None, force_send=False + ) + if result: + logger.info(f"cookie失效通知发送失败,原因:{result}") + @eventmanager.register(EventType.PluginAction) def forced_change(self, event: Event = None): """ @@ -208,6 +220,7 @@ class DynamicWeChat(_PluginBase): self.click_app_management_buttons(page) else: logger.error("cookie失效,强制修改IP失败:请使用'本地扫码修改IP'") + self._cookie_valid = False browser.close() except Exception as err: logger.error(f"强制修改IP失败:{err}") @@ -226,7 +239,6 @@ class DynamicWeChat(_PluginBase): event_data = event.event_data if not event_data or event_data.get("action") != "dynamicwechat": return - try: with sync_playwright() as p: browser = p.chromium.launch(headless=True, args=['--lang=zh-CN']) @@ -238,7 +250,7 @@ class DynamicWeChat(_PluginBase): current_time = datetime.now() future_time = current_time + timedelta(seconds=110) self._future_timestamp = int(future_time.timestamp()) - logger.info("请重新进入插件面板扫码! 每20秒检查登录状态,最大尝试5次") + logger.info("请重新进入插件面板扫码! 每20秒检查登录状态,最大尝试5次") max_attempts = 5 attempt = 0 while attempt < max_attempts: @@ -252,7 +264,7 @@ class DynamicWeChat(_PluginBase): else: logger.info("用户可能没有扫码或登录失败") else: - logger.error("未找到二维码,任务结束") + logger.error("未找到二维码,任务结束") logger.info("----------------------本次任务结束----------------------") browser.close() except Exception as e: @@ -279,30 +291,20 @@ class DynamicWeChat(_PluginBase): self.__update_config() logger.info("----------------------本次任务结束----------------------") else: - logger.warning("cookie已失效请及时更新,本次不检查公网IP") + logger.warning("cookie已失效请及时更新,本次不检查公网IP") def CheckIP(self): - if "||" in self._input_id_list: - parts = self._input_id_list.split("||", 1) - ip_urls = parts[1].split(",") - else: - ip_urls = self._ip_urls - retry_urls = random.sample(ip_urls, len(ip_urls)) - ip_address = None - - for url in retry_urls: - ip_address = self.get_ip_from_url(url) - if ip_address != "获取IP失败" and ip_address: - logger.info(f"IP获取成功: {url}: {ip_address}") - break + url, ip_address = self.get_ip_from_url(self._input_id_list) + if url and ip_address: + logger.info(f"IP获取成功: {url}: {ip_address}") # 如果所有 URL 请求失败 - if ip_address == "获取IP失败" or not ip_address: - logger.error("获取IP失败 不操作IP") + if ip_address == "获取IP失败" or not url: + logger.error("获取IP失败 不操作可信IP") return False elif not self._ip_changed: # 上次修改IP失败 - logger.info("上次IP修改IP没有成功 继续尝试修改IP") + logger.info("上次IP修改IP失败 继续尝试修改IP") self._current_ip_address = ip_address return True @@ -310,48 +312,54 @@ class DynamicWeChat(_PluginBase): if ip_address != self._current_ip_address: logger.info("检测到IP变化") self._current_ip_address = ip_address - # self._ip_changed = False return True else: return False def try_connect_cc(self): - if self._use_cookiecloud: - if settings.COOKIECLOUD_KEY and settings.COOKIECLOUD_PASSWORD: # 使用设置里的cookieCloud - if settings.COOKIECLOUD_ENABLE_LOCAL: - self._cc_server = PyCookieCloud(url=self._server, uuid=settings.COOKIECLOUD_KEY, - password=settings.COOKIECLOUD_PASSWORD) - logger.info("使用内建CookieCloud服务器") - else: # 使用设置里的cookieCloud - self._cc_server = PyCookieCloud(url=settings.COOKIECLOUD_HOST, uuid=settings.COOKIECLOUD_KEY, - password=settings.COOKIECLOUD_PASSWORD) - logger.info("使用自定义CookieCloud服务器") - if not self._cc_server.check_connection(): - self._cc_server = None - logger.error("没有可用的CookieCloud服务器") - else: # 未设置cookieCloud - self._cc_server = None - logger.error("没有配置CookieCloud的用户KEY和PASSWORD") + if not self._use_cookiecloud: # 不使用CookieCloud + self._cc_server = None + return + if not settings.COOKIECLOUD_KEY or not settings.COOKIECLOUD_PASSWORD: # 没有设置key和password + self._cc_server = None + logger.error("没有配置CookieCloud的用户KEY和PASSWORD") + return + if settings.COOKIECLOUD_ENABLE_LOCAL: + self._cc_server = PyCookieCloud(url=self._server, uuid=settings.COOKIECLOUD_KEY, + password=settings.COOKIECLOUD_PASSWORD) + logger.info("使用内建CookieCloud服务器") + else: # 使用设置里的cookieCloud + self._cc_server = PyCookieCloud(url=settings.COOKIECLOUD_HOST, uuid=settings.COOKIECLOUD_KEY, + password=settings.COOKIECLOUD_PASSWORD) + logger.info("使用自定义CookieCloud服务器") + if not self._cc_server.check_connection(): + self._cc_server = None + logger.error("没有可用的CookieCloud服务器") - def get_ip_from_url(self, url): - try: - # 发送 GET 请求 - response = requests.get(url) - # 检查响应状态码是否为 200 - if response.status_code == 200: - # 解析响应 JSON 数据并获取 IP 地址 - ip_address = re.search(self._ip_pattern, response.text) - if ip_address: - return ip_address.group() - else: - return "获取IP失败" - else: - return "获取IP失败" - except Exception as e: - if "104" in str(e): - pass - else: - logger.warning(f"{url} 获取IP失败,Error: {e}") + def get_ip_from_url(self, input_data) -> (str, str): + # 根据输入解析 URL 列表 + if isinstance(input_data, str) and "||" in input_data: + _, url_list = input_data.split("||", 1) + urls = url_list.split(",") + elif isinstance(input_data, list): + urls = input_data + else: + urls = self._ip_urls + + # 随机化 URL 列表 + random.shuffle(urls) + + for url in urls: + try: + response = requests.get(url, timeout=3) + if response.status_code == 200: + ip_address = re.search(self._ip_pattern, response.text) + if ip_address: + return url, ip_address.group() # 返回匹配的 IP 地址 + except Exception as e: + if "104" not in str(e) or 'Read timed out' not in str(e): # 忽略网络波动,都失败会返回None, "获取IP失败" + logger.warning(f"{url} 获取IP失败, Error: {e}") + return None, "获取IP失败" def find_qrc(self, page): # 查找 iframe 元素并切换到它 @@ -395,30 +403,21 @@ class DynamicWeChat(_PluginBase): time.sleep(3) img_src, refuse_time = self.find_qrc(page) if img_src: - if self._my_send: - result = self._my_send.send(title="企业微信登录二维码", image=img_src) - if result: - logger.info(f"二维码发送失败,原因:{result}") - browser.close() - return - logger.info("二维码已经发送,等待用户 90 秒内扫码登录") - # logger.info("如收到短信验证码请以?结束,发送到<企业微信应用> 如: 110301?") - time.sleep(90) # 等待用户扫码 - login_status = self.check_login_status(page, "") - if login_status: - self._update_cookie(page, context) # 刷新cookie - self.click_app_management_buttons(page) - else: - self._ip_changed = False + if self._my_send: # 统一逻辑,只有用户发送'/push_qr'才会发生二维码 + self._ip_changed = False + self._send_cookie_false() + logger.info("已尝试发送cookie失效通知") else: self._ip_changed = False - logger.info("cookie已失效") + self._cookie_valid = False + logger.info("cookie已失效,且没有配置通知方式,本次修改可信IP失败") else: # 如果直接进入企业微信 logger.info("尝试cookie登录") - login_status = self.check_login_status(page, "") - if login_status: + if self.check_login_status(page, ""): self.click_app_management_buttons(page) else: + logger.info("发生了意料之外的错误,请附上配置信息到github反馈") + self._send_cookie_false() self._ip_changed = False browser.close() except Exception as e: @@ -435,7 +434,7 @@ class DynamicWeChat(_PluginBase): self.try_connect_cc() # 再尝试一次连接 if self._cc_server is None: return - logger.info("使用二维码登录成功,开始刷新cookie") + logger.info("使用二维码登录成功,开始刷新cookie") try: if not self._cc_server.check_connection(): logger.error("连接 CookieCloud 失败", self._server) @@ -443,7 +442,8 @@ class DynamicWeChat(_PluginBase): current_url = page.url current_cookies = context.cookies(current_url) # 通过 context 获取 cookies if current_cookies is None: - logger.error("无法获取当前 cookies") + logger.error("无法从内置浏览器获取 cookies") + self._cookie_valid = False return self._saved_cookie = current_cookies formatted_cookies = {} @@ -455,20 +455,25 @@ class DynamicWeChat(_PluginBase): if domain not in formatted_cookies: formatted_cookies[domain] = [] formatted_cookies[domain].append(cookie) - flag = self._cc_server.update_cookie(formatted_cookies) - if flag: + if self._cc_server.update_cookie(formatted_cookies): logger.info("更新 CookieCloud 成功") self._cookie_valid = True + self._is_special_upload = True else: + self._send_cookie_false() + self._is_special_upload = False logger.error("更新 CookieCloud 失败") except Exception as e: + self._send_cookie_false() + self._is_special_upload = False logger.error(f"CookieCloud更新 cookie 发生错误: {e}") else: try: current_url = page.url current_cookies = context.cookies(current_url) # 通过 context 获取 cookies if current_cookies is None: + self._send_cookie_false() logger.error("更新本地 Cookie失败") return else: @@ -476,6 +481,7 @@ class DynamicWeChat(_PluginBase): self._saved_cookie = current_cookies # 保存 self._cookie_valid = True except Exception as e: + self._send_cookie_false() logger.error(f"更新本地 cookie 发生错误: {e}") def get_cookie(self): @@ -483,28 +489,26 @@ class DynamicWeChat(_PluginBase): return self._saved_cookie try: cookie_header = '' - if self._use_cookiecloud: - cookies, msg = self._cookiecloud.download() - if not cookies: # CookieCloud获取cookie失败 - logger.error(f"CookieCloud获取cookie失败,失败原因:{msg}") - return - else: - for domain, cookie in cookies.items(): - if domain == ".work.weixin.qq.com": - cookie_header = cookie - if '_upload_type=A' in cookie: - self._is_special_upload = True - else: - self._is_special_upload = False - break - if cookie_header == '': - cookie_header = self._cookie_header - else: # 不使用CookieCloud + if not self._use_cookiecloud: return + cookies, msg = self._cookiecloud.download() + if not cookies: # CookieCloud获取cookie失败 + logger.error(f"CookieCloud获取cookie失败,失败原因:{msg}") + return + for domain, cookie in cookies.items(): + if domain == ".work.weixin.qq.com": + cookie_header = cookie + if '_upload_type=A' in cookie: + self._is_special_upload = True + else: + self._is_special_upload = False + break + if cookie_header == '': + cookie_header = self._cookie_header cookie = self.parse_cookie_header(cookie_header) return cookie except Exception as e: - logger.error(f"从CookieCloud获取cookie错误,错误原因:{e}") + logger.error(f"从CookieCloud获取cookie错误,错误原因:{e}") return @staticmethod @@ -544,27 +548,22 @@ class DynamicWeChat(_PluginBase): if not cookie_used and self._use_cookiecloud: # logger.info("尝试从CookieCloud 获取新的 cookie") cookie = self.get_cookie() - if cookie: - context.add_cookies(cookie) - page = context.new_page() - page.goto(self._wechatUrl) - time.sleep(3) - if self.check_login_status(page, task='refresh_cookie'): - # logger.info("新获取的 cookie 有效") - self._cookie_valid = True - self._saved_cookie = context.cookies() # 保存有效的 cookie - else: - # logger.warning("新获取的 cookie 无效") - self._cookie_valid = False - self._saved_cookie = None # 清空无效的 cookie - if self._my_send: - result = self._my_send.send( - title="cookie已失效,请及时更新", - content="请在企业微信应用发送/push_qr, 如有验证码以'?'结束发送到企业微信应用。 如果是使用’微信通知‘请确保公网IP还没有变动", - image=None, force_send=False - ) - if result: - logger.info(f"cookie失效通知发送失败,原因:{result}") + if not cookie: + self._send_cookie_false() + return + context.add_cookies(cookie) + page = context.new_page() + page.goto(self._wechatUrl) + time.sleep(3) + if self.check_login_status(page, task='refresh_cookie'): + # logger.info("新获取的 cookie 有效") + self._cookie_valid = True + self._saved_cookie = context.cookies() # 保存有效的 cookie + else: + # logger.warning("新获取的 cookie 无效") + self._send_cookie_false() + self._saved_cookie = None # 清空无效的 cookie + if self._cookie_valid: if self._my_send: self._my_send.reset_limit() @@ -572,7 +571,7 @@ class DynamicWeChat(_PluginBase): self._cookie_lifetime = PyCookieCloud.load_cookie_lifetime(self._settings_file_path) browser.close() except Exception as e: - self._cookie_valid = False + self._send_cookie_false() self._saved_cookie = None # 异常时清空 cookie logger.error(f"cookie 校验过程中发生异常: {e}") @@ -601,7 +600,7 @@ class DynamicWeChat(_PluginBase): if task == 'local_scanning': time.sleep(6) else: - logger.info("等待30秒,请将短信验证码请以'?'结束,发送到<企业微信应用> 如: 110301?") + logger.info("等待30秒,请将短信验证码请以'?'结束,发送到<企业微信应用> 如: 110301?") time.sleep(30) # 多等30秒 if self._verification_code: # logger.info("输入验证码:" + self._verification_code) @@ -620,14 +619,15 @@ class DynamicWeChat(_PluginBase): logger.error("未收到短信验证码") return False except Exception as e: - # logger.debug(str(e)) # 基于bug运行,请不要将错误输出到日志 - # try: # 没有登录成功,也没有短信验证码 + # logger.debug(str(e)) # 基于bug运行,请不要将错误输出到日志 + # try: # 没有登录成功,也没有短信验证码 if self.find_qrc( - page) and not task == 'refresh_cookie' and not task == 'local_scanning': # 延长任务找到的二维码不会被发送,所以不算用户没有扫码 + page) and not task == 'refresh_cookie' and not task == 'local_scanning': # 延长任务找到的二维码不会被发送,所以不算用户没有扫码 logger.warning(f"用户没有扫描二维码") return False def click_app_management_buttons(self, page): + self._cookie_valid = True bash_url = "https://work.weixin.qq.com/wework_admin/frame#apps/modApiApp/" # 按钮的选择器和名称 buttons = [ @@ -637,6 +637,7 @@ class DynamicWeChat(_PluginBase): "//div[contains(@class, 'js_show_ipConfig_dialog')]//a[contains(@class, '_mod_card_operationLink') and text()='配置']", "配置") ] + _, self._current_ip_address = self.get_ip_from_url(self._input_id_list) if "||" in self._input_id_list: parts = self._input_id_list.split("||", 1) input_id_list = parts[0] @@ -645,8 +646,12 @@ class DynamicWeChat(_PluginBase): id_list = input_id_list.split(",") app_urls = [f"{bash_url}{app_id.strip()}" for app_id in id_list] for app_url in app_urls: - page.goto(app_url) # 打开应用详情页 app_id = app_url.split("/")[-1] + if app_id.startswith("100000") and len(app_id) == 6: + self._ip_changed = False + logger.warning(f"请根据 https://github.com/RamenRa/MoviePilot-Plugins 的说明进行配置应用ID") + return + page.goto(app_url) # 打开应用详情页 time.sleep(2) # 依次点击每个按钮 for xpath, name in buttons: @@ -673,9 +678,9 @@ class DynamicWeChat(_PluginBase): ip_parts = self._current_ip_address.split('.') masked_ip = f"{ip_parts[0]}.{len(ip_parts[1]) * '*'}.{len(ip_parts[2]) * '*'}.{ip_parts[3]}" if self._my_send: - result = self._my_send.send(title="更新可信IP成功", - content='应用: ' + app_id + ' 输入IP:' + masked_ip, - force_send=True, diy_channel="WeChat") + self._my_send.send(title="更新可信IP成功", + content='应用: ' + app_id + ' 输入IP:' + masked_ip, + force_send=True, diy_channel="WeChat") def __update_config(self): """ @@ -700,7 +705,7 @@ class DynamicWeChat(_PluginBase): def get_form(self) -> Tuple[List[dict], Dict[str, Any]]: """ - 拼装插件配置页面,只保留必要的配置项,并添加 token 配置。 + 拼装插件配置页面,只保留必要的配置项,并添加 token 配置。 """ return [ { @@ -851,7 +856,7 @@ class DynamicWeChat(_PluginBase): 'model': 'input_id_list', 'label': '[必填]应用ID', 'rows': 1, - 'placeholder': '输入应用ID,多个ID用英文逗号分隔。在企业微信应用页面URL末尾获取' + 'placeholder': '输入应用ID,多个ID用英文逗号分隔。在企业微信应用页面URL末尾获取' } } ] @@ -872,7 +877,7 @@ class DynamicWeChat(_PluginBase): 'props': { 'type': 'info', 'variant': 'tonal', - 'text': '建议启用内建或自定义CookieCloud。支持微信、Server酱等第三方通知,具体请查看作者主页' + 'text': '建议启用内建或自定义CookieCloud。支持微信、Server酱等第三方通知,具体请查看作者主页' } } ] @@ -892,7 +897,7 @@ class DynamicWeChat(_PluginBase): 'component': 'VAlert', 'props': { 'type': 'info', - 'text': 'Cookie失效时通知用户,用户使用/push_qr让插件推送二维码。使用第三方通知时填写对应Token/API' + 'text': 'Cookie失效时通知用户,用户使用/push_qr让插件推送二维码。使用第三方通知时填写对应Token/API' } } ] @@ -919,20 +924,20 @@ class DynamicWeChat(_PluginBase): # 判断二维码是否过期 if current_time > self._future_timestamp: - vaild_text = "二维码已过期" - color = "#ff0000" + vaild_text = "二维码已过期或没有扫码任务" + color = "#ff0000" if self._enabled else "#bbbbbb" self._qr_code_image = None else: - # 二维码有效,格式化过期时间为 年-月-日 时:分:秒 + # 二维码有效,格式化过期时间为 年-月-日 时:分:秒 expiration_time = datetime.fromtimestamp(self._future_timestamp).strftime('%Y-%m-%d %H:%M:%S') - vaild_text = f"二维码有效,过期时间: {expiration_time}" + vaild_text = f"二维码有效,过期时间: {expiration_time}" color = "#32CD32" - # 如果self._qr_code_image为None,返回提示信息 + # 如果self._qr_code_image为None,返回提示信息 if self._qr_code_image is None: img_component = { "component": "div", - "text": "登录二维码都会在此展示,二维码有6秒延时。 [适用于Docker版]", + "text": "登录二维码都会在此展示,二维码有6秒延时。 [适用于Docker版]", "props": { "style": { "fontSize": "22px", @@ -1063,17 +1068,16 @@ class DynamicWeChat(_PluginBase): if self._my_send: result = self._my_send.send("企业微信登录二维码", image=image_src) if result: - logger.info(f"远程推送任务: 二维码发送失败,原因:{result}") + logger.info(f"远程推送任务: 二维码发送失败,原因:{result}") browser.close() logger.info("----------------------本次任务结束----------------------") return - logger.info("远程推送任务: 二维码已经发送,等待用户 90 秒内扫码登录") - # logger.info("远程推送任务: 如收到短信验证码请以?结束,发送到<企业微信应用> 如: 110301?") + logger.info("远程推送任务: 二维码发送成功,等待用户 90 秒内扫码登录。V2'微信通知'的用户,此消息并不准确") + # logger.info("远程推送任务: 如收到短信验证码请以?结束,发送到<企业微信应用> 如: 110301?") time.sleep(90) - login_status = self.check_login_status(page, 'push_qr_code') - if login_status: + if self.check_login_status(page, 'push_qr_code'): self._update_cookie(page, context) # 刷新cookie - # logger.info("远程推送任务: 没有可用的CookieCloud服务器,只修改可信IP") + # logger.info("远程推送任务: 没有可用的CookieCloud服务器,只修改可信IP") self.click_app_management_buttons(page) else: logger.warning("远程推送任务: 没有找到可用的通知方式") @@ -1125,7 +1129,7 @@ class DynamicWeChat(_PluginBase): }] """ if self._enabled and self._cron: - logger.info(f"{self.plugin_name}定时服务启动,时间间隔 {self._cron} ") + # logger.info(f"{self.plugin_name}定时服务启动,时间间隔 {self._cron} ") return [{ "id": self.__class__.__name__, "name": f"{self.plugin_name}服务", diff --git a/plugins/dynamicwechat/notify_helper.py b/plugins/dynamicwechat/helper.py similarity index 61% rename from plugins/dynamicwechat/notify_helper.py rename to plugins/dynamicwechat/helper.py index da1985e..0df72d4 100644 --- a/plugins/dynamicwechat/notify_helper.py +++ b/plugins/dynamicwechat/helper.py @@ -3,6 +3,128 @@ import requests from app.modules.wechat import WeChat from app.schemas.types import NotificationType,MessageChannel +import os +import json +import requests +import base64 +import hashlib +from typing import Dict, Any +from Crypto import Random +from Crypto.Cipher import AES + + +def bytes_to_key(data: bytes, salt: bytes, output=48) -> bytes: + # 兼容v2 将bytes_to_key和encrypt导入 + assert len(salt) == 8, len(salt) + data += salt + key = hashlib.md5(data).digest() + final_key = key + while len(final_key) < output: + key = hashlib.md5(key + data).digest() + final_key += key + return final_key[:output] + + +def encrypt(message: bytes, passphrase: bytes) -> bytes: + """ + CryptoJS 加密原文 + + This is a modified copy of https://stackoverflow.com/questions/36762098/how-to-decrypt-password-from-javascript-cryptojs-aes-encryptpassword-passphras + """ + salt = Random.new().read(8) + key_iv = bytes_to_key(passphrase, salt, 32 + 16) + key = key_iv[:32] + iv = key_iv[32:] + aes = AES.new(key, AES.MODE_CBC, iv) + length = 16 - (len(message) % 16) + data = message + (chr(length) * length).encode() + return base64.b64encode(b"Salted__" + salt + aes.encrypt(data)) + + +class PyCookieCloud: + def __init__(self, url: str, uuid: str, password: str): + self.url: str = url + self.uuid: str = uuid + self.password: str = password + + def check_connection(self) -> bool: + """ + Test the connection to the CookieCloud server. + + :return: True if the connection is successful, False otherwise. + """ + try: + resp = requests.get(self.url, timeout=3) # 设置超时为3秒 + return resp.status_code == 200 + except Exception as e: + return False + + def update_cookie(self, formatted_cookies: Dict[str, Any]) -> bool: + """ + Update cookie data to CookieCloud. + + :param formatted_cookies: cookie value to update. + :return: if update success, return True, else return False. + """ + if '.work.weixin.qq.com' not in formatted_cookies: + formatted_cookies['.work.weixin.qq.com'] = [] + formatted_cookies['.work.weixin.qq.com'].append({ + 'name': '_upload_type', + 'value': 'A', + 'domain': '.work.weixin.qq.com', + 'path': '/', + 'expires': -1, + 'httpOnly': False, + 'secure': False, + 'sameSite': 'Lax' + }) + + cookie = {'cookie_data': formatted_cookies} + raw_data = json.dumps(cookie) + encrypted_data = encrypt(raw_data.encode('utf-8'), self.get_the_key().encode('utf-8')).decode('utf-8') + cookie_cloud_request = requests.post(self.url + '/update', + json={'uuid': self.uuid, 'encrypted': encrypted_data}) + if cookie_cloud_request.status_code == 200: + if cookie_cloud_request.json().get('action') == 'done': + return True + return False + + def get_the_key(self) -> str: + """ + Get the key used to encrypt and decrypt data. + + :return: the key. + """ + md5 = hashlib.md5() + md5.update((self.uuid + '-' + self.password).encode('utf-8')) + return md5.hexdigest()[:16] + + @staticmethod + def load_cookie_lifetime(settings_file: str = None): # 返回时间戳 单位秒 + if os.path.exists(settings_file): + with open(settings_file, 'r') as file: + settings = json.load(file) + return settings.get('_cookie_lifetime', 0) + else: + return 0 + + @staticmethod + def save_cookie_lifetime(settings_file, cookie_lifetime): # 传入时间戳 单位秒 + with open(settings_file, 'w') as file: + json.dump({'_cookie_lifetime': cookie_lifetime}, file) + + @staticmethod + def increase_cookie_lifetime(settings_file, seconds: int): + if os.path.exists(settings_file): + with open(settings_file, 'r') as file: + settings = json.load(file) + current_lifetime = settings.get('_cookie_lifetime', 0) + else: + current_lifetime = 0 + new_lifetime = current_lifetime + seconds + # 保存新的 _cookie_lifetime + PyCookieCloud.save_cookie_lifetime(settings_file, new_lifetime) + class MySender: def __init__(self, token=None, func=None): @@ -51,9 +173,7 @@ class MySender: if result is None: # 成功时返回 None return except Exception as e: - # 打印错误日志或处理错误 - return f"{channel} 通知错误: {e}" - # 切换到下一个通道 + pass # 忽略单个错误,继续尝试下一个通道 self.current_index = (self.current_index + 1) % len(self.tokens) return f"所有的通知方式都发送失败" @@ -90,13 +210,14 @@ class MySender: def _send_serverchan(self, title, content, image): tmp_tokens = self.tokens[self.current_index] - token = tmp_tokens if ',' in tmp_tokens: before_comma, after_comma = tmp_tokens.split(',', 1) if before_comma.startswith('sctp') and image: token = after_comma # 图片发到公众号 else: token = before_comma # 发到 server3 + else: + token = tmp_tokens if token.startswith('sctp'): match = re.match(r'sctp(\d+)t', token) @@ -156,7 +277,7 @@ class MySender: def _send_v2_wechat(self, title, content, image, token): """V2 微信通知发送""" if token and ',' in token: - channel, actual_userid = token.split(',', 1) + _, actual_userid = token.split(',', 1) else: actual_userid = None self.post_message_func( diff --git a/plugins/dynamicwechat/update_help.py b/plugins/dynamicwechat/update_help.py deleted file mode 100644 index e3cd6f7..0000000 --- a/plugins/dynamicwechat/update_help.py +++ /dev/null @@ -1,121 +0,0 @@ -import os -import json -import requests -import base64 -import hashlib -from typing import Dict, Any -from Crypto import Random -from Crypto.Cipher import AES - - -def bytes_to_key(data: bytes, salt: bytes, output=48) -> bytes: - # 兼容v2 将bytes_to_key和encrypt导入 - assert len(salt) == 8, len(salt) - data += salt - key = hashlib.md5(data).digest() - final_key = key - while len(final_key) < output: - key = hashlib.md5(key + data).digest() - final_key += key - return final_key[:output] - - -def encrypt(message: bytes, passphrase: bytes) -> bytes: - """ - CryptoJS 加密原文 - - This is a modified copy of https://stackoverflow.com/questions/36762098/how-to-decrypt-password-from-javascript-cryptojs-aes-encryptpassword-passphras - """ - salt = Random.new().read(8) - key_iv = bytes_to_key(passphrase, salt, 32 + 16) - key = key_iv[:32] - iv = key_iv[32:] - aes = AES.new(key, AES.MODE_CBC, iv) - length = 16 - (len(message) % 16) - data = message + (chr(length) * length).encode() - return base64.b64encode(b"Salted__" + salt + aes.encrypt(data)) - - -class PyCookieCloud: - def __init__(self, url: str, uuid: str, password: str): - self.url: str = url - self.uuid: str = uuid - self.password: str = password - - def check_connection(self) -> bool: - """ - Test the connection to the CookieCloud server. - - :return: True if the connection is successful, False otherwise. - """ - try: - resp = requests.get(self.url, timeout=3) # 设置超时为3秒 - return resp.status_code == 200 - except Exception as e: - return False - - def update_cookie(self, formatted_cookies: Dict[str, Any]) -> bool: - """ - Update cookie data to CookieCloud. - - :param formatted_cookies: cookie value to update. - :return: if update success, return True, else return False. - """ - if '.work.weixin.qq.com' not in formatted_cookies: - formatted_cookies['.work.weixin.qq.com'] = [] - formatted_cookies['.work.weixin.qq.com'].append({ - 'name': '_upload_type', - 'value': 'A', - 'domain': '.work.weixin.qq.com', - 'path': '/', - 'expires': -1, - 'httpOnly': False, - 'secure': False, - 'sameSite': 'Lax' - }) - - cookie = {'cookie_data': formatted_cookies} - raw_data = json.dumps(cookie) - encrypted_data = encrypt(raw_data.encode('utf-8'), self.get_the_key().encode('utf-8')).decode('utf-8') - cookie_cloud_request = requests.post(self.url + '/update', - json={'uuid': self.uuid, 'encrypted': encrypted_data}) - if cookie_cloud_request.status_code == 200: - if cookie_cloud_request.json().get('action') == 'done': - return True - return False - - def get_the_key(self) -> str: - """ - Get the key used to encrypt and decrypt data. - - :return: the key. - """ - md5 = hashlib.md5() - md5.update((self.uuid + '-' + self.password).encode('utf-8')) - return md5.hexdigest()[:16] - - @staticmethod - def load_cookie_lifetime(settings_file: str = None): # 返回时间戳 单位秒 - if os.path.exists(settings_file): - with open(settings_file, 'r') as file: - settings = json.load(file) - return settings.get('_cookie_lifetime', 0) - else: - return 0 - - @staticmethod - def save_cookie_lifetime(settings_file, cookie_lifetime): # 传入时间戳 单位秒 - with open(settings_file, 'w') as file: - json.dump({'_cookie_lifetime': cookie_lifetime}, file) - - @staticmethod - def increase_cookie_lifetime(settings_file, seconds: int): - if os.path.exists(settings_file): - with open(settings_file, 'r') as file: - settings = json.load(file) - current_lifetime = settings.get('_cookie_lifetime', 0) - else: - current_lifetime = 0 - new_lifetime = current_lifetime + seconds - # 保存新的 _cookie_lifetime - PyCookieCloud.save_cookie_lifetime(settings_file, new_lifetime)