From 15cbcaeff170755cf6a1139b2f06db7890be5108 Mon Sep 17 00:00:00 2001 From: ramen <1205925392@qq.com> Date: Sun, 22 Dec 2024 10:57:02 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8=E7=AC=AC=E4=B8=89=E6=96=B9?= =?UTF-8?q?=E9=80=9A=E7=9F=A5=E6=97=B6=E6=94=AF=E6=8C=81IP=E5=8F=98?= =?UTF-8?q?=E5=8A=A8=E5=90=8E=E9=80=9A=E7=9F=A5=EF=BC=8C=E6=8B=9F=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=A4=9A=E7=BD=91=E7=BB=9C=E5=87=BA=E5=8F=A3=E6=A3=80?= =?UTF-8?q?=E6=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package.json | 3 +- plugins/dynamicwechat/__init__.py | 286 ++++++++++++++++++++++------- plugins/dynamicwechat/helper.py | 294 +++++++++++++++++++++++++++--- 3 files changed, 485 insertions(+), 98 deletions(-) diff --git a/package.json b/package.json index bc79d4a..7b69783 100644 --- a/package.json +++ b/package.json @@ -875,12 +875,13 @@ "name": "动态企微可信IP", "description": "修改企微应用可信IP,支持Srever酱等第三方通知。验证码以?结尾发送到企业微信应用", "labels": "消息通知", - "version": "1.6.0", + "version": "1.7.0", "icon": "Wecom_A.png", "author": "RamenRa", "level": 2, "v2": true, "history": { + "v1.7.0": "使用第三方通知时可IP变动后通知,拟支持多网络出口检查。", "v1.6.0": "忽略因网络波动导致获取ip错误。自定义的类合并为helper.py。后续核心功能没问题将不再更新", "v1.5.2": "可以从指定url获取ip,修复不使用cc时cookie失效过快,v1可配置第三方为备用通知,server酱可以将文本发送到server3,二维码给服务号", "v1.5.1": "修复v2微信通知,可以指定微信通知ID", diff --git a/plugins/dynamicwechat/__init__.py b/plugins/dynamicwechat/__init__.py index c6d9dd5..efd006a 100644 --- a/plugins/dynamicwechat/__init__.py +++ b/plugins/dynamicwechat/__init__.py @@ -31,7 +31,7 @@ class DynamicWeChat(_PluginBase): # 插件图标 plugin_icon = "Wecom_A.png" # 插件版本 - plugin_version = "1.6.0" + plugin_version = "1.7.0" # 插件作者 plugin_author = "RamenRa" # 作者主页 @@ -61,10 +61,20 @@ class DynamicWeChat(_PluginBase): _is_special_upload = False # 聚合通知 _my_send = None + # 多wan口支持 + wan2 = None + # 当前检测url + wan2_url = None + # IP变动后发送通知开关 + _await_ip = False # 保存cookie _saved_cookie = None # 通知方式token/api _notification_token = '' + # 标记企业微信通知可用 + _wechat_available = True + # 标记IP变动后 是否发送通知 + _send_notification = False # 匹配ip地址的正则 _ip_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b' @@ -96,8 +106,9 @@ class DynamicWeChat(_PluginBase): _use_cookiecloud = True # 登录cookie _cookie_header = "" + # 内建CookieCloud服务器 _server = f'http://localhost:{settings.NGINX_PORT}/cookiecloud' - + # CookieCloud客户端 _cookiecloud = CookieCloudHelper() # 定时器 _scheduler: Optional[BackgroundScheduler] = None @@ -124,31 +135,42 @@ class DynamicWeChat(_PluginBase): self._cron = config.get("cron") self._onlyonce = config.get("onlyonce") self._input_id_list = config.get("input_id_list") - self._current_ip_address = config.get("current_ip_address") + # self._current_ip_address = config.get("current_ip_address") self._forced_update = config.get("forced_update") self._local_scan = config.get("local_scan") self._use_cookiecloud = config.get("use_cookiecloud") self._cookie_header = config.get("cookie_header") - self._ip_changed = config.get("ip_changed") + self._await_ip = config.get("await_ip") if self.version != "v1": self._my_send = MySender(self._notification_token, func=self.post_message) else: self._my_send = MySender(self._notification_token) if not self._my_send.init_success: # 没有输入通知方式,不通知 self._my_send = None - _, self._current_ip_address = self.get_ip_from_url(self._input_id_list) + if self._my_send and not self._my_send.other_channel: # 确保跟随通知配置,一定要配置了第三方才可以使用 + self._await_ip = False + if "||wan2" in self._input_id_list: # 多wan口 + self.wan2 = IpLocationParser(self._settings_file_path) + self._current_ip_address = self.wan2.read_ips("ips") # 从文件中读取 + logger.info(f"当前记录的IP:{self._current_ip_address},如为空或不一致,预计检测1-2轮内会修正") + else: + self.wan2 = None + _, self._current_ip_address = self.get_ip_from_url() # 直接从网页获取 # 停止现有任务 self.stop_service() if (self._enabled or self._onlyonce) and self._input_id_list: # 定时服务 self._scheduler = BackgroundScheduler(timezone=settings.TZ) # 运行一次定时服务 - if self._onlyonce: - if not self._forced_update or not self._local_scan: - # logger.info("立即检测公网IP") - self._scheduler.add_job(func=self.check, trigger='date', - run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3), - name="检测公网IP") # 添加任务 + if self._onlyonce: # 多网口ip检测禁用立即检测 + if not self.wan2: + if not self._forced_update or not self._local_scan: + # logger.info("立即检测公网IP") + self._scheduler.add_job(func=self.check, trigger='date', + run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3), + name="检测公网IP") # 添加任务 + else: + logger.info("启用多网口检测时禁用‘立即检测一次’功能") # 关闭一次性开关 self._onlyonce = False @@ -176,6 +198,15 @@ class DynamicWeChat(_PluginBase): logger.error(f"定时任务配置错误:{err}") self.systemmessage.put(f"执行周期配置错误:{err}") + if self.wan2: + try: + self._scheduler.add_job(func=self.get_ip_from_url, + trigger=CronTrigger.from_crontab(self._cron), + name="多wan口公网IP检测") + except Exception as err: + logger.error(f"多wan口公网IP检测定时任务配置错误:{err}") + self.systemmessage.put(f"执行周期配置错误:{err}") + # 启动任务 if self._scheduler.get_jobs(): self._scheduler.print_jobs() @@ -184,13 +215,13 @@ class DynamicWeChat(_PluginBase): def _send_cookie_false(self): self._cookie_valid = False - if self._my_send: - result = self._my_send.send( + if self._my_send and not self._await_ip: # 不启用“IP变动后通知” + error = self._my_send.send( title="cookie已失效,请及时更新", content="请在企业微信应用发送/push_qr, 如有验证码以'?'结束发送到企业微信应用。 如果使用’微信通知‘请确保公网IP还没有变动", image=None, force_send=False ) - if result: + if error: logger.info(f"cookie失效通知发送失败,原因:{result}") @eventmanager.register(EventType.PluginAction) @@ -290,31 +321,70 @@ class DynamicWeChat(_PluginBase): self.ChangeIP() self.__update_config() logger.info("----------------------本次任务结束----------------------") + elif self._await_ip and not self._send_notification: + # logger.info("cookie已失效。但配置了第三方通知,继续检测公网IP。当IP变动企业微信通知彻底无法使用时通知用户") + logger.info("开始检测公网IP,等待IP变动后发送通知") + if self.CheckIP(func="public"): + # logger.info(f"配置的第三方通知{self._my_send.other_channel}") + for channel, token in self._my_send.other_channel: + # logger.info(f"正常尝试:{channel} {token}") + error = self._my_send.send( + title="公网IP与企业微信IP不一致", + content="请在企业微信应用发送/push_qr, 如有验证码以'?'结束发送到企业微信应用。", + image=None, force_send=False, diy_channel=channel, diy_token=token + ) + if error: + logger.error(f"通道 {channel} 发送失败,原因:{error}") + else: + self._send_notification = True + break # 发送成功后退出循环 + self._wechat_available = False # 标记不可用 + logger.info("----------------------本次任务结束----------------------") else: - logger.warning("cookie已失效请及时更新,本次不检查公网IP") + if self._send_notification: + logger.info("企业微信可信IP和公网IP不一致,微信通知可能已经无法使用。第三方通知已经发送。") + else: + logger.info("cookie已失效请及时更新,本次不检查公网IP") - def CheckIP(self): - url, ip_address = self.get_ip_from_url(self._input_id_list) - if url and ip_address: - logger.info(f"IP获取成功: {url}: {ip_address}") + def CheckIP(self, func=None): + if self.wan2: + ip_address = self.wan2.read_ips("url_ip") + url = self.wan2_url + else: + url, ip_address = self.get_ip_from_url() - # 如果所有 URL 请求失败 if ip_address == "获取IP失败" or not url: logger.error("获取IP失败 不操作可信IP") return False - elif not self._ip_changed: # 上次修改IP失败 + # 成功获取 IP,记录日志 + if url and ip_address: + logger.info(f"IP获取成功: {url}: {ip_address}") + + # 上次修改 IP 失败时,继续尝试修改 + if not self._ip_changed and func != "public": # 排除cookie失效 检测公网变动的任务 logger.info("上次IP修改IP失败 继续尝试修改IP") - self._current_ip_address = ip_address return True - # 检查 IP 是否变化 - if ip_address != self._current_ip_address: - logger.info("检测到IP变化") - self._current_ip_address = ip_address - return True + # 如果有 wan2,则处理新增的 IP 地址 + if self.wan2: + if isinstance(ip_address, str): + url_ips = ip_address.split(";") # 将字符串按分号拆分为多个 IP 地址 + else: + url_ips = ip_address + saved_ips = self.wan2.read_ips("ips") + + # 检查每个新 IP 是否存在,若不存在则添加并返回 True + for ip in url_ips: + if ip not in saved_ips: + self.wan2.add_ips("ips", ip) # 将url获取到的新IP添加到ips字段 + return True else: - return False + # 检查 IP 是否变化 + if ip_address != self._current_ip_address: + logger.info("检测到IP变化") + return True + return False def try_connect_cc(self): if not self._use_cookiecloud: # 不使用CookieCloud @@ -336,30 +406,57 @@ class DynamicWeChat(_PluginBase): self._cc_server = None logger.error("没有可用的CookieCloud服务器") - def get_ip_from_url(self, input_data) -> (str, str): + def get_ip_from_url(self) -> (str, str): # 根据输入解析 URL 列表 - if isinstance(input_data, str) and "||" in input_data: - _, url_list = input_data.split("||", 1) + if isinstance(self._input_id_list, str) and "||" in self._input_id_list: + _, url_list = self._input_id_list.split("||", 1) urls = url_list.split(",") - elif isinstance(input_data, list): - urls = input_data + elif isinstance(self._input_id_list, list): + urls = self._input_id_list else: urls = self._ip_urls # 随机化 URL 列表 random.shuffle(urls) - - for url in urls: - try: - response = requests.get(url, timeout=3) - if response.status_code == 200: - ip_address = re.search(self._ip_pattern, response.text) - if ip_address: - return url, ip_address.group() # 返回匹配的 IP 地址 - except Exception as e: - if "104" not in str(e) or 'Read timed out' not in str(e): # 忽略网络波动,都失败会返回None, "获取IP失败" - logger.warning(f"{url} 获取IP失败, Error: {e}") - return None, "获取IP失败" + if not self.wan2: + for url in urls: + try: + response = requests.get(url, timeout=3) + if response.status_code == 200: + ip_address = re.search(self._ip_pattern, response.text) + if ip_address: + # self.wan1.overwrite_ips("url_ip", ip_address.group()) + return url, ip_address.group() # 返回匹配的 IP 地址 + except Exception as e: + if "104" not in str(e) and 'Read timed out' not in str(e): # 忽略网络波动,都失败会返回None, "获取IP失败" + logger.warning(f"{url} 获取IP失败, Error: {e}") + return None, "获取IP失败" + else: + urls = ["https://ip.skk.moe/multi", "https://ip.orz.tools"] + random.shuffle(urls) + # 创建一个 Playwright 实例 + with sync_playwright() as p: + browser = None # 定义浏览器变量 + for url in urls: + try: + # 启动浏览器 + if url == "https://ip.skk.moe/multi": + browser = p.chromium.launch(headless=False, args=['--lang=zh-CN']) + else: + browser = p.chromium.launch(headless=True, args=['--lang=zh-CN']) + page = browser.new_page() + china_ips = self.wan2.get_ipv4(page, url) + if china_ips: + self.wan2_url = url + self.wan2.overwrite_ips("url_ip", china_ips) # 将获取到的IP写入文件 覆盖写入 + return url, china_ips # 成功获取到IP后返回 + except Exception as e: + logger.warning(f"{url} 多出口IP获取失败, Error: {e}") + finally: + if browser: + browser.close() + browser = None # 重置浏览器变量 + return None, "获取IP失败" def find_qrc(self, page): # 查找 iframe 元素并切换到它 @@ -475,9 +572,11 @@ class DynamicWeChat(_PluginBase): if current_cookies is None: self._send_cookie_false() logger.error("更新本地 Cookie失败") + self._is_special_upload = False return else: logger.info("更新本地 Cookie成功") + self._is_special_upload = True self._saved_cookie = current_cookies # 保存 self._cookie_valid = True except Exception as e: @@ -498,10 +597,6 @@ class DynamicWeChat(_PluginBase): for domain, cookie in cookies.items(): if domain == ".work.weixin.qq.com": cookie_header = cookie - if '_upload_type=A' in cookie: - self._is_special_upload = True - else: - self._is_special_upload = False break if cookie_header == '': cookie_header = self._cookie_header @@ -511,11 +606,15 @@ class DynamicWeChat(_PluginBase): logger.error(f"从CookieCloud获取cookie错误,错误原因:{e}") return - @staticmethod - def parse_cookie_header(cookie_header): + # @staticmethod + def parse_cookie_header(self, cookie_header): cookies = [] + self._is_special_upload = False for cookie in cookie_header.split(';'): name, value = cookie.strip().split('=', 1) + if name == '_upload_type' and value == 'A': + self._is_special_upload = True + continue cookies.append({ 'name': name, 'value': value, @@ -613,6 +712,7 @@ class DynamicWeChat(_PluginBase): # 等待登录成功的元素出现 success_element = page.wait_for_selector('#check_corp_info', timeout=5000) if success_element: + self._verification_code = None logger.info("验证码登录成功!") return True else: @@ -628,6 +728,7 @@ class DynamicWeChat(_PluginBase): def click_app_management_buttons(self, page): self._cookie_valid = True + self._my_send.reset_limit() # 解除限制 可以发送cookie失效提醒 bash_url = "https://work.weixin.qq.com/wework_admin/frame#apps/modApiApp/" # 按钮的选择器和名称 buttons = [ @@ -637,7 +738,10 @@ class DynamicWeChat(_PluginBase): "//div[contains(@class, 'js_show_ipConfig_dialog')]//a[contains(@class, '_mod_card_operationLink') and text()='配置']", "配置") ] - _, self._current_ip_address = self.get_ip_from_url(self._input_id_list) + if self.wan2: # 多wan口从文件读取 ip + self._current_ip_address = self.wan2.read_ips("ips") + else: + _, self._current_ip_address = self.get_ip_from_url() if "||" in self._input_id_list: parts = self._input_id_list.split("||", 1) input_id_list = parts[0] @@ -664,6 +768,7 @@ class DynamicWeChat(_PluginBase): # logger.info(f"已找到文本框") input_area = page.locator('textarea.js_ipConfig_textarea') confirm = page.locator('.js_ipConfig_confirmBtn') + # logger.info(f"即将输入的内容:'{input_ip}'") input_area.fill(self._current_ip_address) # 填充 IP 地址 confirm.click() # 点击确认按钮 time.sleep(3) # 等待处理 @@ -674,14 +779,25 @@ class DynamicWeChat(_PluginBase): if "disabled" in str(e): logger.info(f"应用{app_id} 已被禁用,可能是没有设置接收api") if self._ip_changed: + self._wechat_available = True # 标记微信通知重新有效 + self._send_notification = False # 重置第三方通知已发送标记 + masked_ips = [self.mask_ip(ip) for ip in self._current_ip_address.split(';')] + masked_ip_string = ";".join(masked_ips) logger.info(f"应用: {app_id} 输入IP:" + self._current_ip_address) - ip_parts = self._current_ip_address.split('.') - masked_ip = f"{ip_parts[0]}.{len(ip_parts[1]) * '*'}.{len(ip_parts[2]) * '*'}.{ip_parts[3]}" if self._my_send: self._my_send.send(title="更新可信IP成功", - content='应用: ' + app_id + ' 输入IP:' + masked_ip, + content='应用: ' + app_id + ' 输入IP:' + masked_ip_string, force_send=True, diy_channel="WeChat") + @staticmethod + def mask_ip(ip): + ip_parts = ip.split('.') + if len(ip_parts) == 4: # 确保是有效的 IPv4 地址 + # 使用星号替换第二和第三部分 + masked_ip = f"{ip_parts[0]}.{len(ip_parts[1]) * '*'}.{len(ip_parts[2]) * '*'}.{ip_parts[3]}" + return masked_ip + return ip # 如果不是有效的 IP 地址,返回原地址 + def __update_config(self): """ 更新配置 @@ -691,8 +807,8 @@ class DynamicWeChat(_PluginBase): "onlyonce": self._onlyonce, "cron": self._cron, "notification_token": self._notification_token, - "current_ip_address": self._current_ip_address, - "ip_changed": self._ip_changed, + # "current_ip_address": self._current_ip_address, + "await_ip": self._await_ip, "forced_update": self._forced_update, "local_scan": self._local_scan, "input_id_list": self._input_id_list, @@ -798,7 +914,26 @@ class DynamicWeChat(_PluginBase): } } ] - } + }, + *( + [{ + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 4 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'await_ip', + 'label': 'IP变动后通知', + } + } + ] + }] + if self._my_send and self._my_send.other_channel else [] + ) ] }, { @@ -877,7 +1012,7 @@ class DynamicWeChat(_PluginBase): 'props': { 'type': 'info', 'variant': 'tonal', - 'text': '建议启用内建或自定义CookieCloud。支持微信、Server酱等第三方通知,具体请查看作者主页' + 'text': '建议启用内建或自定义CookieCloud。支持微信和Server酱等第三方通知。具体请查看作者主页' } } ] @@ -897,7 +1032,7 @@ class DynamicWeChat(_PluginBase): 'component': 'VAlert', 'props': { 'type': 'info', - 'text': 'Cookie失效时通知用户,用户使用/push_qr让插件推送二维码。使用第三方通知时填写对应Token/API' + 'text': 'Cookie失效时通知用户,用户使用/push_qr让插件推送二维码。使用第三方通知时填写对应Token/API' } } ] @@ -913,6 +1048,7 @@ class DynamicWeChat(_PluginBase): "forceUpdate": False, "use_cookiecloud": True, "use_local_qr": False, + "await_ip": False, "cookie_header": "", "notification_token": "", "input_id_list": "" @@ -969,7 +1105,7 @@ class DynamicWeChat(_PluginBase): } } } - if self._is_special_upload: + if self._is_special_upload and self._enabled: # 计算 cookie_lifetime 的天数、小时数和分钟数 cookie_lifetime_days = self._cookie_lifetime // 86400 # 一天的秒数为 86400 cookie_lifetime_hours = (self._cookie_lifetime % 86400) // 3600 # 计算小时数 @@ -1066,12 +1202,24 @@ class DynamicWeChat(_PluginBase): image_src, refuse_time = self.find_qrc(page) if image_src: if self._my_send: - result = self._my_send.send("企业微信登录二维码", image=image_src) - if result: - logger.info(f"远程推送任务: 二维码发送失败,原因:{result}") - browser.close() - logger.info("----------------------本次任务结束----------------------") - return + if not self._wechat_available and self._my_send.other_channel: # 微信通知已经无法使用 + for channel, token in self._my_send.other_channel: + # logger.info(f"正常尝试:{channel} {token}") + error = self._my_send.send( + title="企业微信登录二维码", + image=image_src, diy_channel=channel, diy_token=token + ) + if error: + logger.warning(f"通道 {channel} 推送二维码失败,原因:{error}") + else: + break # 发送成功后退出循环 + else: # 硬发 + error = self._my_send.send("企业微信登录二维码", image=image_src) + if error: + logger.info(f"远程推送任务: 二维码发送失败,原因:{error}") + browser.close() + logger.info("----------------------本次任务结束----------------------") + return logger.info("远程推送任务: 二维码发送成功,等待用户 90 秒内扫码登录。V2'微信通知'的用户,此消息并不准确") # logger.info("远程推送任务: 如收到短信验证码请以?结束,发送到<企业微信应用> 如: 110301?") time.sleep(90) @@ -1129,7 +1277,9 @@ class DynamicWeChat(_PluginBase): }] """ if self._enabled and self._cron: - # logger.info(f"{self.plugin_name}定时服务启动,时间间隔 {self._cron} ") + logger.info(f"服务启动") + if self.wan2: + logger.info("多网口检测第一次获取IP可能会失败") return [{ "id": self.__class__.__name__, "name": f"{self.plugin_name}服务", diff --git a/plugins/dynamicwechat/helper.py b/plugins/dynamicwechat/helper.py index 0df72d4..94b4d9e 100644 --- a/plugins/dynamicwechat/helper.py +++ b/plugins/dynamicwechat/helper.py @@ -1,8 +1,4 @@ import re -import requests -from app.modules.wechat import WeChat -from app.schemas.types import NotificationType,MessageChannel - import os import json import requests @@ -12,6 +8,9 @@ from typing import Dict, Any from Crypto import Random from Crypto.Cipher import AES +from app.modules.wechat import WeChat +from app.schemas.types import NotificationType, MessageChannel + def bytes_to_key(data: bytes, salt: bytes, output=48) -> bytes: # 兼容v2 将bytes_to_key和encrypt导入 @@ -100,7 +99,7 @@ class PyCookieCloud: return md5.hexdigest()[:16] @staticmethod - def load_cookie_lifetime(settings_file: str = None): # 返回时间戳 单位秒 + def load_cookie_lifetime(settings_file: str = None): if os.path.exists(settings_file): with open(settings_file, 'r') as file: settings = json.load(file) @@ -109,18 +108,21 @@ class PyCookieCloud: return 0 @staticmethod - def save_cookie_lifetime(settings_file, cookie_lifetime): # 传入时间戳 单位秒 + def save_cookie_lifetime(settings_file, cookie_lifetime): + data = {} + if os.path.exists(settings_file): + with open(settings_file, 'r') as file: + data = json.load(file) + + # 只更新 _cookie_lifetime 字段,其它字段保持不变 + data['_cookie_lifetime'] = cookie_lifetime + with open(settings_file, 'w') as file: - json.dump({'_cookie_lifetime': cookie_lifetime}, file) + json.dump(data, file, indent=4) @staticmethod def increase_cookie_lifetime(settings_file, seconds: int): - if os.path.exists(settings_file): - with open(settings_file, 'r') as file: - settings = json.load(file) - current_lifetime = settings.get('_cookie_lifetime', 0) - else: - current_lifetime = 0 + current_lifetime = PyCookieCloud.load_cookie_lifetime(settings_file) new_lifetime = current_lifetime + seconds # 保存新的 _cookie_lifetime PyCookieCloud.save_cookie_lifetime(settings_file, new_lifetime) @@ -135,6 +137,14 @@ class MySender: self.init_success = bool(self.tokens) # 标识初始化是否成功 self.post_message_func = func # V2 微信模式的 post_message 方法 + @property + def other_channel(self): + """ + 返回非 WeChat 通道及其对应 token 的列表 + :return: [(channel, token), ...] + """ + return [(channel, token) for channel, token in zip(self.channels, self.tokens) if channel != "WeChat"] + @staticmethod def _detect_channel(token): """根据 token 确定通知渠道""" @@ -149,7 +159,7 @@ class MySender: else: return "PushPlus" - def send(self, title, content=None, image=None, force_send=False, diy_channel=None): + def send(self, title, content=None, image=None, force_send=False, diy_channel=None, diy_token=None): """发送消息""" if not self.init_success: return @@ -162,14 +172,14 @@ class MySender: # 如果指定了自定义通道,直接尝试发送 if diy_channel: - return self._try_send(title, content, image, diy_channel) + return self._try_send(title, content, image, channel=diy_channel, diy_token=diy_token) # 尝试按顺序发送,直到成功或遍历所有通道 - for i in range(len(self.tokens)): + for _ in range(len(self.tokens)): token = self.tokens[self.current_index] channel = self.channels[self.current_index] try: - result = self._try_send(title, content, image, channel, token) + result = self._try_send(title, content, image, channel, token=token) if result is None: # 成功时返回 None return except Exception as e: @@ -177,20 +187,20 @@ class MySender: self.current_index = (self.current_index + 1) % len(self.tokens) return f"所有的通知方式都发送失败" - def _try_send(self, title, content, image, channel, token=None): + def _try_send(self, title, content, image, channel, token=None, diy_token=None): """尝试使用指定通道发送消息""" if channel == "WeChat" and self.post_message_func: return self._send_v2_wechat(title, content, image, token) elif channel == "WeChat": return self._send_wechat(title, content, image, token) elif channel == "ServerChan": - return self._send_serverchan(title, content, image) + return self._send_serverchan(title, content, image, diy_token) elif channel == "AnPush": - return self._send_anpush(title, content, image) + return self._send_anpush(title, content, image, diy_token) elif channel == "PushPlus": - return self._send_pushplus(title, content, image) + return self._send_pushplus(title, content, image, diy_token) else: - raise ValueError(f"Unknown channel: {channel}") + return f"未知的通知方式: {channel}" @staticmethod def _send_wechat(title, content, image, token): @@ -208,8 +218,11 @@ class MySender: return "微信通知发送错误" return None - def _send_serverchan(self, title, content, image): - tmp_tokens = self.tokens[self.current_index] + def _send_serverchan(self, title, content, image, diy_token=None): + if diy_token: + tmp_tokens = diy_token + else: + tmp_tokens = self.tokens[self.current_index] if ',' in tmp_tokens: before_comma, after_comma = tmp_tokens.split(',', 1) if before_comma.startswith('sctp') and image: @@ -237,12 +250,15 @@ class MySender: return f"Server酱通知错误: {result.get('message')}" return None - def _send_anpush(self, title, content, image): - token = self.tokens[self.current_index] # 获取当前通道对应的 token + def _send_anpush(self, title, content, image, diy_token=None): + if diy_token: + token = diy_token + else: + token = self.tokens[self.current_index] # 获取当前通道对应的 token if ',' in token: channel, token = token.split(',', 1) else: - return "可能AnPush 没有配置消息通道ID" + return "AnPush可能没有配置消息通道ID" url = f"https://api.anpush.com/push/{token}" payload = { "title": title, @@ -259,8 +275,11 @@ class MySender: return "AnPush 消息通道未找到" return None - def _send_pushplus(self, title, content, image): - token = self.tokens[self.current_index] # 获取当前通道对应的 token + def _send_pushplus(self, title, content, image, diy_token=None): + if diy_token: + token = diy_token + else: + token = self.tokens[self.current_index] # 获取当前通道对应的 token pushplus_url = f"http://www.pushplus.plus/send/{token}" # PushPlus发送逻辑 data = { @@ -294,3 +313,220 @@ class MySender: def reset_limit(self): """解除限制,允许再次发送纯文本消息""" self.first_text_sent = False + + +class IpLocationParser: + def __init__(self, settings_file_path, max_ips=4): + self._settings_file_path = settings_file_path + self._max_ips = max_ips # 最大历史IP数量 + self._ips = self.read_ips("ips") # 初始化时读取已存储的 IP 地址 + + @staticmethod + def _parse(page, url): + # 定义 URL 到解析函数的映射 + parser_methods = { + "https://ip.orz.tools": IpLocationParser._parse_ip_orz_tools, + "https://ip.skk.moe/multi": IpLocationParser._parse_ip_skk_moe, + "http://revproxy.ustc.edu.cn:8000": IpLocationParser._parse_ip_ustc, + } + parser_method = parser_methods.get(url) + if parser_method is None: + return [], [] + return parser_method(page) + + @staticmethod + def _remove_duplicates(ipv4_addresses, locations): + """去重并保持 IP 地址和归属地的对应关系""" + seen = set() + unique_ipv4 = [] + unique_locations = [] + + for ip, location in zip(ipv4_addresses, locations): + if ip not in seen: + seen.add(ip) + unique_ipv4.append(ip) + unique_locations.append(location) + + return unique_ipv4, unique_locations + + @staticmethod + def _is_valid_ipv4(ip): + """验证是否是合法的 IPv4 地址""" + return re.match(r'^\d{1,3}(\.\d{1,3}){3}$', ip) is not None + + @staticmethod + def _parse_ip_orz_tools(page): + rows = page.query_selector_all('#results3 .row') + # print(f"ip_orz_tools共找到 {len(rows)} 行数据") + ipv4_addresses, locations = [], [] + + for i, row in enumerate(rows): + row_html = row.inner_html() + + # 提取 IP 地址 + ip_match = re.search(r'data-name="([^"]+)"', row_html) + if ip_match: + ip = ip_match.group(1).strip() + if not IpLocationParser._is_valid_ipv4(ip): + continue + else: + continue + + # 提取位置数据 + loc_element = row.query_selector('.loc.cell') + location = loc_element.inner_text().strip() if loc_element else "未知" + + ipv4_addresses.append(ip) + locations.append(location) + + return IpLocationParser._remove_duplicates(ipv4_addresses, locations) + + @staticmethod + def _parse_ip_skk_moe(page): + rows = page.query_selector_all( + 'body > div > section > div.x1n2onr6.xw2csxc.x10fe3q7.x116uinm.xdpxx8g > table > tbody > tr' + ) + # print(f"skk共找到 {len(rows)} 行数据") + ipv4_addresses, locations = [], [] + + for i, row in enumerate(rows): + ip_element = row.query_selector('th') + loc_element = row.query_selector('td:nth-child(3)') # 假设归属地在第 3 列 + + if ip_element and loc_element: + ip = ip_element.inner_text().strip() + if not IpLocationParser._is_valid_ipv4(ip): + continue + location = loc_element.inner_text().strip() + + ipv4_addresses.append(ip) + locations.append(location) + + return IpLocationParser._remove_duplicates(ipv4_addresses, locations) + + @staticmethod + def _parse_ip_ustc(page): + rows = page.query_selector_all( + 'body > div:nth-child(4) > center > table > tbody > tr > td:nth-child(2)' + ) + # print(f"ip_ustc共找到 {len(rows)} 行数据") + ipv4_addresses, locations = [], [] + + for row in rows: + row_text = row.inner_text().strip() + + # 提取 IP 地址 + ip_match = re.match(r'(\d+\.\d+\.\d+\.\d+)', row_text) + if ip_match: + ip = ip_match.group(1).strip() + if not IpLocationParser._is_valid_ipv4(ip): + continue + else: + continue + + # 提取归属地 + location_match = re.search(r'(China|中国).*', row_text) + location = location_match.group(0).strip() if location_match else "未知" + + ipv4_addresses.append(ip) + locations.append(location) + + return IpLocationParser._remove_duplicates(ipv4_addresses, locations) + + @staticmethod + def get_ipv4(page, url: str) -> str: + """返回多个中国 IP 地址,逗号分隔""" + # 导航到目标页面 + page.goto(url) + # 等待一段时间,让所有动态渲染的内容加载完成 + page.wait_for_timeout(10000) # 等待 10 秒钟 + # 调用解析器解析数据 + ipv4_addresses, locations = IpLocationParser._parse(page, url) + # 筛选出属于中国的 IP 地址 + china_ips = [ + ip for ip, location in zip(ipv4_addresses, locations) + if 'China' in location or '中国' in location + ] + # 返回逗号分隔的字符串 + return ';'.join(china_ips) + + def _limit_and_deduplicate_ips(self, ips): + """ + 去重并限制 IP 地址数量,最多保存 _max_ips 个 IP 地址。 + """ + # 去重并保留顺序 + unique_ips = list(dict.fromkeys(ips)) + return unique_ips[:self._max_ips] # 保留最多 _max_ips 个 IP 地址 + + def _read_ips_from_json(self, field): + """ + 从 JSON 文件中读取指定字段的 IP 地址。 + """ + if not os.path.exists(self._settings_file_path): + return "" # 文件不存在,返回空字符串 + + try: + with open(self._settings_file_path, 'r') as f: + data = json.load(f) + # 获取字段内容并返回分号分隔的字符串 + return ";".join(data.get(field, [])) + except (json.JSONDecodeError, IOError): + return "" # 读取失败,返回空字符串 + + def _overwrite_ips_in_json(self, field, new_ips): + """ + 覆盖写入指定字段的 IP 地址。 + :param field: 要更新的字段名 + :param new_ips: 新的 IP 地址列表或分号分隔的字符串 + """ + # 如果输入是字符串,将其转换为列表 + if isinstance(new_ips, str): + new_ips = new_ips.split(";") + + # 去重并限制 IP 数量 + new_ips = self._limit_and_deduplicate_ips(new_ips) + + # 读取现有数据(如果文件不存在,则初始化空数据) + if os.path.exists(self._settings_file_path): + try: + with open(self._settings_file_path, 'r') as f: + data = json.load(f) + except (json.JSONDecodeError, IOError): + data = {} + else: + data = {} + + # 更新指定字段 + data[field] = new_ips + + # 写入文件 + with open(self._settings_file_path, 'w') as f: + json.dump(data, f, indent=4) + + def read_ips(self, field) -> str: + """ + 获取 JSON 文件中指定字段的所有 IP 地址,返回分号分隔的字符串。 + """ + return self._read_ips_from_json(field) + + def overwrite_ips(self, field, new_ips): + """ + 覆盖写入指定字段的新 IP 地址。 + """ + self._overwrite_ips_in_json(field, new_ips) + + def add_ips(self, field, new_ips): + """ + 增量添加指定字段中的 IP 地址。 + :param field: 要更新的字段名 + :param new_ips: 要添加的 IP 地址列表或分号分隔的字符串 + """ + # 获取当前的 IP 地址 + current_ips = self.read_ips(field).split(";") if self.read_ips(field) else [] + + # 合并新 IP 地址并去重、限制数量 + updated_ips = self._limit_and_deduplicate_ips(new_ips.split(";") + current_ips) + + # 写入更新后的 IP 地址 + self.overwrite_ips(field, updated_ips) +