Merge pull request #613 from RamenRa/master

This commit is contained in:
jxxghp
2024-12-22 11:16:47 +08:00
committed by GitHub
3 changed files with 485 additions and 98 deletions

View File

@@ -875,12 +875,13 @@
"name": "动态企微可信IP",
"description": "修改企微应用可信IP支持Srever酱等第三方通知。验证码以结尾发送到企业微信应用",
"labels": "消息通知",
"version": "1.6.0",
"version": "1.7.0",
"icon": "Wecom_A.png",
"author": "RamenRa",
"level": 2,
"v2": true,
"history": {
"v1.7.0": "使用第三方通知时可IP变动后通知拟支持多网络出口检查。",
"v1.6.0": "忽略因网络波动导致获取ip错误。自定义的类合并为helper.py。后续核心功能没问题将不再更新",
"v1.5.2": "可以从指定url获取ip,修复不使用cc时cookie失效过快v1可配置第三方为备用通知server酱可以将文本发送到server3,二维码给服务号",
"v1.5.1": "修复v2微信通知可以指定微信通知ID",

View File

@@ -31,7 +31,7 @@ class DynamicWeChat(_PluginBase):
# 插件图标
plugin_icon = "Wecom_A.png"
# 插件版本
plugin_version = "1.6.0"
plugin_version = "1.7.0"
# 插件作者
plugin_author = "RamenRa"
# 作者主页
@@ -61,10 +61,20 @@ class DynamicWeChat(_PluginBase):
_is_special_upload = False
# 聚合通知
_my_send = None
# 多wan口支持
wan2 = None
# 当前检测url
wan2_url = None
# IP变动后发送通知开关
_await_ip = False
# 保存cookie
_saved_cookie = None
# 通知方式token/api
_notification_token = ''
# 标记企业微信通知可用
_wechat_available = True
# 标记IP变动后 是否发送通知
_send_notification = False
# 匹配ip地址的正则
_ip_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
@@ -96,8 +106,9 @@ class DynamicWeChat(_PluginBase):
_use_cookiecloud = True
# 登录cookie
_cookie_header = ""
# 内建CookieCloud服务器
_server = f'http://localhost:{settings.NGINX_PORT}/cookiecloud'
# CookieCloud客户端
_cookiecloud = CookieCloudHelper()
# 定时器
_scheduler: Optional[BackgroundScheduler] = None
@@ -124,31 +135,42 @@ class DynamicWeChat(_PluginBase):
self._cron = config.get("cron")
self._onlyonce = config.get("onlyonce")
self._input_id_list = config.get("input_id_list")
self._current_ip_address = config.get("current_ip_address")
# self._current_ip_address = config.get("current_ip_address")
self._forced_update = config.get("forced_update")
self._local_scan = config.get("local_scan")
self._use_cookiecloud = config.get("use_cookiecloud")
self._cookie_header = config.get("cookie_header")
self._ip_changed = config.get("ip_changed")
self._await_ip = config.get("await_ip")
if self.version != "v1":
self._my_send = MySender(self._notification_token, func=self.post_message)
else:
self._my_send = MySender(self._notification_token)
if not self._my_send.init_success: # 没有输入通知方式,不通知
self._my_send = None
_, self._current_ip_address = self.get_ip_from_url(self._input_id_list)
if self._my_send and not self._my_send.other_channel: # 确保跟随通知配置,一定要配置了第三方才可以使用
self._await_ip = False
if "||wan2" in self._input_id_list: # 多wan口
self.wan2 = IpLocationParser(self._settings_file_path)
self._current_ip_address = self.wan2.read_ips("ips") # 从文件中读取
logger.info(f"当前记录的IP{self._current_ip_address}如为空或不一致预计检测1-2轮内会修正")
else:
self.wan2 = None
_, self._current_ip_address = self.get_ip_from_url() # 直接从网页获取
# 停止现有任务
self.stop_service()
if (self._enabled or self._onlyonce) and self._input_id_list:
# 定时服务
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
# 运行一次定时服务
if self._onlyonce:
if not self._forced_update or not self._local_scan:
# logger.info("立即检测公网IP")
self._scheduler.add_job(func=self.check, trigger='date',
run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
name="检测公网IP") # 添加任务
if self._onlyonce: # 多网口ip检测禁用立即检测
if not self.wan2:
if not self._forced_update or not self._local_scan:
# logger.info("立即检测公网IP")
self._scheduler.add_job(func=self.check, trigger='date',
run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
name="检测公网IP") # 添加任务
else:
logger.info("启用多网口检测时禁用‘立即检测一次’功能")
# 关闭一次性开关
self._onlyonce = False
@@ -176,6 +198,15 @@ class DynamicWeChat(_PluginBase):
logger.error(f"定时任务配置错误:{err}")
self.systemmessage.put(f"执行周期配置错误:{err}")
if self.wan2:
try:
self._scheduler.add_job(func=self.get_ip_from_url,
trigger=CronTrigger.from_crontab(self._cron),
name="多wan口公网IP检测")
except Exception as err:
logger.error(f"多wan口公网IP检测定时任务配置错误{err}")
self.systemmessage.put(f"执行周期配置错误:{err}")
# 启动任务
if self._scheduler.get_jobs():
self._scheduler.print_jobs()
@@ -184,13 +215,13 @@ class DynamicWeChat(_PluginBase):
def _send_cookie_false(self):
self._cookie_valid = False
if self._my_send:
result = self._my_send.send(
if self._my_send and not self._await_ip: # 不启用“IP变动后通知”
error = self._my_send.send(
title="cookie已失效,请及时更新",
content="请在企业微信应用发送/push_qr, 如有验证码以''结束发送到企业微信应用。 如果使用微信通知请确保公网IP还没有变动",
image=None, force_send=False
)
if result:
if error:
logger.info(f"cookie失效通知发送失败,原因:{result}")
@eventmanager.register(EventType.PluginAction)
@@ -290,31 +321,70 @@ class DynamicWeChat(_PluginBase):
self.ChangeIP()
self.__update_config()
logger.info("----------------------本次任务结束----------------------")
elif self._await_ip and not self._send_notification:
# logger.info("cookie已失效。但配置了第三方通知继续检测公网IP。当IP变动企业微信通知彻底无法使用时通知用户")
logger.info("开始检测公网IP,等待IP变动后发送通知")
if self.CheckIP(func="public"):
# logger.info(f"配置的第三方通知{self._my_send.other_channel}")
for channel, token in self._my_send.other_channel:
# logger.info(f"正常尝试:{channel} {token}")
error = self._my_send.send(
title="公网IP与企业微信IP不一致",
content="请在企业微信应用发送/push_qr, 如有验证码以''结束发送到企业微信应用。",
image=None, force_send=False, diy_channel=channel, diy_token=token
)
if error:
logger.error(f"通道 {channel} 发送失败,原因:{error}")
else:
self._send_notification = True
break # 发送成功后退出循环
self._wechat_available = False # 标记不可用
logger.info("----------------------本次任务结束----------------------")
else:
logger.warning("cookie已失效请及时更新,本次不检查公网IP")
if self._send_notification:
logger.info("企业微信可信IP和公网IP不一致微信通知可能已经无法使用。第三方通知已经发送。")
else:
logger.info("cookie已失效请及时更新,本次不检查公网IP")
def CheckIP(self):
url, ip_address = self.get_ip_from_url(self._input_id_list)
if url and ip_address:
logger.info(f"IP获取成功: {url}: {ip_address}")
def CheckIP(self, func=None):
if self.wan2:
ip_address = self.wan2.read_ips("url_ip")
url = self.wan2_url
else:
url, ip_address = self.get_ip_from_url()
# 如果所有 URL 请求失败
if ip_address == "获取IP失败" or not url:
logger.error("获取IP失败 不操作可信IP")
return False
elif not self._ip_changed: # 上次修改IP失败
# 成功获取 IP记录日志
if url and ip_address:
logger.info(f"IP获取成功: {url}: {ip_address}")
# 上次修改 IP 失败时,继续尝试修改
if not self._ip_changed and func != "public": # 排除cookie失效 检测公网变动的任务
logger.info("上次IP修改IP失败 继续尝试修改IP")
self._current_ip_address = ip_address
return True
# 检查 IP 是否变化
if ip_address != self._current_ip_address:
logger.info("检测到IP变化")
self._current_ip_address = ip_address
return True
# 如果有 wan2则处理新增的 IP 地址
if self.wan2:
if isinstance(ip_address, str):
url_ips = ip_address.split(";") # 将字符串按分号拆分为多个 IP 地址
else:
url_ips = ip_address
saved_ips = self.wan2.read_ips("ips")
# 检查每个新 IP 是否存在,若不存在则添加并返回 True
for ip in url_ips:
if ip not in saved_ips:
self.wan2.add_ips("ips", ip) # 将url获取到的新IP添加到ips字段
return True
else:
return False
# 检查 IP 是否变化
if ip_address != self._current_ip_address:
logger.info("检测到IP变化")
return True
return False
def try_connect_cc(self):
if not self._use_cookiecloud: # 不使用CookieCloud
@@ -336,30 +406,57 @@ class DynamicWeChat(_PluginBase):
self._cc_server = None
logger.error("没有可用的CookieCloud服务器")
def get_ip_from_url(self, input_data) -> (str, str):
def get_ip_from_url(self) -> (str, str):
# 根据输入解析 URL 列表
if isinstance(input_data, str) and "||" in input_data:
_, url_list = input_data.split("||", 1)
if isinstance(self._input_id_list, str) and "||" in self._input_id_list:
_, url_list = self._input_id_list.split("||", 1)
urls = url_list.split(",")
elif isinstance(input_data, list):
urls = input_data
elif isinstance(self._input_id_list, list):
urls = self._input_id_list
else:
urls = self._ip_urls
# 随机化 URL 列表
random.shuffle(urls)
for url in urls:
try:
response = requests.get(url, timeout=3)
if response.status_code == 200:
ip_address = re.search(self._ip_pattern, response.text)
if ip_address:
return url, ip_address.group() # 返回匹配的 IP 地址
except Exception as e:
if "104" not in str(e) or 'Read timed out' not in str(e): # 忽略网络波动,都失败会返回None, "获取IP失败"
logger.warning(f"{url} 获取IP失败, Error: {e}")
return None, "获取IP失败"
if not self.wan2:
for url in urls:
try:
response = requests.get(url, timeout=3)
if response.status_code == 200:
ip_address = re.search(self._ip_pattern, response.text)
if ip_address:
# self.wan1.overwrite_ips("url_ip", ip_address.group())
return url, ip_address.group() # 返回匹配的 IP 地址
except Exception as e:
if "104" not in str(e) and 'Read timed out' not in str(e): # 忽略网络波动,都失败会返回None, "获取IP失败"
logger.warning(f"{url} 获取IP失败, Error: {e}")
return None, "获取IP失败"
else:
urls = ["https://ip.skk.moe/multi", "https://ip.orz.tools"]
random.shuffle(urls)
# 创建一个 Playwright 实例
with sync_playwright() as p:
browser = None # 定义浏览器变量
for url in urls:
try:
# 启动浏览器
if url == "https://ip.skk.moe/multi":
browser = p.chromium.launch(headless=False, args=['--lang=zh-CN'])
else:
browser = p.chromium.launch(headless=True, args=['--lang=zh-CN'])
page = browser.new_page()
china_ips = self.wan2.get_ipv4(page, url)
if china_ips:
self.wan2_url = url
self.wan2.overwrite_ips("url_ip", china_ips) # 将获取到的IP写入文件 覆盖写入
return url, china_ips # 成功获取到IP后返回
except Exception as e:
logger.warning(f"{url} 多出口IP获取失败, Error: {e}")
finally:
if browser:
browser.close()
browser = None # 重置浏览器变量
return None, "获取IP失败"
def find_qrc(self, page):
# 查找 iframe 元素并切换到它
@@ -475,9 +572,11 @@ class DynamicWeChat(_PluginBase):
if current_cookies is None:
self._send_cookie_false()
logger.error("更新本地 Cookie失败")
self._is_special_upload = False
return
else:
logger.info("更新本地 Cookie成功")
self._is_special_upload = True
self._saved_cookie = current_cookies # 保存
self._cookie_valid = True
except Exception as e:
@@ -498,10 +597,6 @@ class DynamicWeChat(_PluginBase):
for domain, cookie in cookies.items():
if domain == ".work.weixin.qq.com":
cookie_header = cookie
if '_upload_type=A' in cookie:
self._is_special_upload = True
else:
self._is_special_upload = False
break
if cookie_header == '':
cookie_header = self._cookie_header
@@ -511,11 +606,15 @@ class DynamicWeChat(_PluginBase):
logger.error(f"从CookieCloud获取cookie错误,错误原因:{e}")
return
@staticmethod
def parse_cookie_header(cookie_header):
# @staticmethod
def parse_cookie_header(self, cookie_header):
cookies = []
self._is_special_upload = False
for cookie in cookie_header.split(';'):
name, value = cookie.strip().split('=', 1)
if name == '_upload_type' and value == 'A':
self._is_special_upload = True
continue
cookies.append({
'name': name,
'value': value,
@@ -613,6 +712,7 @@ class DynamicWeChat(_PluginBase):
# 等待登录成功的元素出现
success_element = page.wait_for_selector('#check_corp_info', timeout=5000)
if success_element:
self._verification_code = None
logger.info("验证码登录成功!")
return True
else:
@@ -628,6 +728,7 @@ class DynamicWeChat(_PluginBase):
def click_app_management_buttons(self, page):
self._cookie_valid = True
self._my_send.reset_limit() # 解除限制 可以发送cookie失效提醒
bash_url = "https://work.weixin.qq.com/wework_admin/frame#apps/modApiApp/"
# 按钮的选择器和名称
buttons = [
@@ -637,7 +738,10 @@ class DynamicWeChat(_PluginBase):
"//div[contains(@class, 'js_show_ipConfig_dialog')]//a[contains(@class, '_mod_card_operationLink') and text()='配置']",
"配置")
]
_, self._current_ip_address = self.get_ip_from_url(self._input_id_list)
if self.wan2: # 多wan口从文件读取 ip
self._current_ip_address = self.wan2.read_ips("ips")
else:
_, self._current_ip_address = self.get_ip_from_url()
if "||" in self._input_id_list:
parts = self._input_id_list.split("||", 1)
input_id_list = parts[0]
@@ -664,6 +768,7 @@ class DynamicWeChat(_PluginBase):
# logger.info(f"已找到文本框")
input_area = page.locator('textarea.js_ipConfig_textarea')
confirm = page.locator('.js_ipConfig_confirmBtn')
# logger.info(f"即将输入的内容:'{input_ip}'")
input_area.fill(self._current_ip_address) # 填充 IP 地址
confirm.click() # 点击确认按钮
time.sleep(3) # 等待处理
@@ -674,14 +779,25 @@ class DynamicWeChat(_PluginBase):
if "disabled" in str(e):
logger.info(f"应用{app_id} 已被禁用,可能是没有设置接收api")
if self._ip_changed:
self._wechat_available = True # 标记微信通知重新有效
self._send_notification = False # 重置第三方通知已发送标记
masked_ips = [self.mask_ip(ip) for ip in self._current_ip_address.split(';')]
masked_ip_string = ";".join(masked_ips)
logger.info(f"应用: {app_id} 输入IP" + self._current_ip_address)
ip_parts = self._current_ip_address.split('.')
masked_ip = f"{ip_parts[0]}.{len(ip_parts[1]) * '*'}.{len(ip_parts[2]) * '*'}.{ip_parts[3]}"
if self._my_send:
self._my_send.send(title="更新可信IP成功",
content='应用: ' + app_id + ' 输入IP' + masked_ip,
content='应用: ' + app_id + ' 输入IP' + masked_ip_string,
force_send=True, diy_channel="WeChat")
@staticmethod
def mask_ip(ip):
ip_parts = ip.split('.')
if len(ip_parts) == 4: # 确保是有效的 IPv4 地址
# 使用星号替换第二和第三部分
masked_ip = f"{ip_parts[0]}.{len(ip_parts[1]) * '*'}.{len(ip_parts[2]) * '*'}.{ip_parts[3]}"
return masked_ip
return ip # 如果不是有效的 IP 地址,返回原地址
def __update_config(self):
"""
更新配置
@@ -691,8 +807,8 @@ class DynamicWeChat(_PluginBase):
"onlyonce": self._onlyonce,
"cron": self._cron,
"notification_token": self._notification_token,
"current_ip_address": self._current_ip_address,
"ip_changed": self._ip_changed,
# "current_ip_address": self._current_ip_address,
"await_ip": self._await_ip,
"forced_update": self._forced_update,
"local_scan": self._local_scan,
"input_id_list": self._input_id_list,
@@ -798,7 +914,26 @@ class DynamicWeChat(_PluginBase):
}
}
]
}
},
*(
[{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'await_ip',
'label': 'IP变动后通知',
}
}
]
}]
if self._my_send and self._my_send.other_channel else []
)
]
},
{
@@ -877,7 +1012,7 @@ class DynamicWeChat(_PluginBase):
'props': {
'type': 'info',
'variant': 'tonal',
'text': '建议启用内建或自定义CookieCloud。支持微信Server酱等第三方通知,具体请查看作者主页'
'text': '建议启用内建或自定义CookieCloud。支持微信Server酱等第三方通知具体请查看作者主页'
}
}
]
@@ -897,7 +1032,7 @@ class DynamicWeChat(_PluginBase):
'component': 'VAlert',
'props': {
'type': 'info',
'text': 'Cookie失效时通知用户,用户使用/push_qr让插件推送二维码。使用第三方通知时填写对应Token/API'
'text': 'Cookie失效时通知用户用户使用/push_qr让插件推送二维码。使用第三方通知时填写对应Token/API'
}
}
]
@@ -913,6 +1048,7 @@ class DynamicWeChat(_PluginBase):
"forceUpdate": False,
"use_cookiecloud": True,
"use_local_qr": False,
"await_ip": False,
"cookie_header": "",
"notification_token": "",
"input_id_list": ""
@@ -969,7 +1105,7 @@ class DynamicWeChat(_PluginBase):
}
}
}
if self._is_special_upload:
if self._is_special_upload and self._enabled:
# 计算 cookie_lifetime 的天数、小时数和分钟数
cookie_lifetime_days = self._cookie_lifetime // 86400 # 一天的秒数为 86400
cookie_lifetime_hours = (self._cookie_lifetime % 86400) // 3600 # 计算小时数
@@ -1066,12 +1202,24 @@ class DynamicWeChat(_PluginBase):
image_src, refuse_time = self.find_qrc(page)
if image_src:
if self._my_send:
result = self._my_send.send("企业微信登录二维码", image=image_src)
if result:
logger.info(f"远程推送任务: 二维码发送失败,原因:{result}")
browser.close()
logger.info("----------------------本次任务结束----------------------")
return
if not self._wechat_available and self._my_send.other_channel: # 微信通知已经无法使用
for channel, token in self._my_send.other_channel:
# logger.info(f"正常尝试:{channel} {token}")
error = self._my_send.send(
title="企业微信登录二维码",
image=image_src, diy_channel=channel, diy_token=token
)
if error:
logger.warning(f"通道 {channel} 推送二维码失败,原因:{error}")
else:
break # 发送成功后退出循环
else: # 硬发
error = self._my_send.send("企业微信登录二维码", image=image_src)
if error:
logger.info(f"远程推送任务: 二维码发送失败,原因:{error}")
browser.close()
logger.info("----------------------本次任务结束----------------------")
return
logger.info("远程推送任务: 二维码发送成功,等待用户 90 秒内扫码登录。V2'微信通知'的用户,此消息并不准确")
# logger.info("远程推送任务: 如收到短信验证码请以?结束,发送到<企业微信应用> 如: 110301")
time.sleep(90)
@@ -1129,7 +1277,9 @@ class DynamicWeChat(_PluginBase):
}]
"""
if self._enabled and self._cron:
# logger.info(f"{self.plugin_name}定时服务启动,时间间隔 {self._cron} ")
logger.info(f"服务启动")
if self.wan2:
logger.info("多网口检测第一次获取IP可能会失败")
return [{
"id": self.__class__.__name__,
"name": f"{self.plugin_name}服务",

View File

@@ -1,8 +1,4 @@
import re
import requests
from app.modules.wechat import WeChat
from app.schemas.types import NotificationType,MessageChannel
import os
import json
import requests
@@ -12,6 +8,9 @@ from typing import Dict, Any
from Crypto import Random
from Crypto.Cipher import AES
from app.modules.wechat import WeChat
from app.schemas.types import NotificationType, MessageChannel
def bytes_to_key(data: bytes, salt: bytes, output=48) -> bytes:
# 兼容v2 将bytes_to_key和encrypt导入
@@ -100,7 +99,7 @@ class PyCookieCloud:
return md5.hexdigest()[:16]
@staticmethod
def load_cookie_lifetime(settings_file: str = None): # 返回时间戳 单位秒
def load_cookie_lifetime(settings_file: str = None):
if os.path.exists(settings_file):
with open(settings_file, 'r') as file:
settings = json.load(file)
@@ -109,18 +108,21 @@ class PyCookieCloud:
return 0
@staticmethod
def save_cookie_lifetime(settings_file, cookie_lifetime): # 传入时间戳 单位秒
def save_cookie_lifetime(settings_file, cookie_lifetime):
data = {}
if os.path.exists(settings_file):
with open(settings_file, 'r') as file:
data = json.load(file)
# 只更新 _cookie_lifetime 字段,其它字段保持不变
data['_cookie_lifetime'] = cookie_lifetime
with open(settings_file, 'w') as file:
json.dump({'_cookie_lifetime': cookie_lifetime}, file)
json.dump(data, file, indent=4)
@staticmethod
def increase_cookie_lifetime(settings_file, seconds: int):
if os.path.exists(settings_file):
with open(settings_file, 'r') as file:
settings = json.load(file)
current_lifetime = settings.get('_cookie_lifetime', 0)
else:
current_lifetime = 0
current_lifetime = PyCookieCloud.load_cookie_lifetime(settings_file)
new_lifetime = current_lifetime + seconds
# 保存新的 _cookie_lifetime
PyCookieCloud.save_cookie_lifetime(settings_file, new_lifetime)
@@ -135,6 +137,14 @@ class MySender:
self.init_success = bool(self.tokens) # 标识初始化是否成功
self.post_message_func = func # V2 微信模式的 post_message 方法
@property
def other_channel(self):
"""
返回非 WeChat 通道及其对应 token 的列表
:return: [(channel, token), ...]
"""
return [(channel, token) for channel, token in zip(self.channels, self.tokens) if channel != "WeChat"]
@staticmethod
def _detect_channel(token):
"""根据 token 确定通知渠道"""
@@ -149,7 +159,7 @@ class MySender:
else:
return "PushPlus"
def send(self, title, content=None, image=None, force_send=False, diy_channel=None):
def send(self, title, content=None, image=None, force_send=False, diy_channel=None, diy_token=None):
"""发送消息"""
if not self.init_success:
return
@@ -162,14 +172,14 @@ class MySender:
# 如果指定了自定义通道,直接尝试发送
if diy_channel:
return self._try_send(title, content, image, diy_channel)
return self._try_send(title, content, image, channel=diy_channel, diy_token=diy_token)
# 尝试按顺序发送,直到成功或遍历所有通道
for i in range(len(self.tokens)):
for _ in range(len(self.tokens)):
token = self.tokens[self.current_index]
channel = self.channels[self.current_index]
try:
result = self._try_send(title, content, image, channel, token)
result = self._try_send(title, content, image, channel, token=token)
if result is None: # 成功时返回 None
return
except Exception as e:
@@ -177,20 +187,20 @@ class MySender:
self.current_index = (self.current_index + 1) % len(self.tokens)
return f"所有的通知方式都发送失败"
def _try_send(self, title, content, image, channel, token=None):
def _try_send(self, title, content, image, channel, token=None, diy_token=None):
"""尝试使用指定通道发送消息"""
if channel == "WeChat" and self.post_message_func:
return self._send_v2_wechat(title, content, image, token)
elif channel == "WeChat":
return self._send_wechat(title, content, image, token)
elif channel == "ServerChan":
return self._send_serverchan(title, content, image)
return self._send_serverchan(title, content, image, diy_token)
elif channel == "AnPush":
return self._send_anpush(title, content, image)
return self._send_anpush(title, content, image, diy_token)
elif channel == "PushPlus":
return self._send_pushplus(title, content, image)
return self._send_pushplus(title, content, image, diy_token)
else:
raise ValueError(f"Unknown channel: {channel}")
return f"未知的通知方式: {channel}"
@staticmethod
def _send_wechat(title, content, image, token):
@@ -208,8 +218,11 @@ class MySender:
return "微信通知发送错误"
return None
def _send_serverchan(self, title, content, image):
tmp_tokens = self.tokens[self.current_index]
def _send_serverchan(self, title, content, image, diy_token=None):
if diy_token:
tmp_tokens = diy_token
else:
tmp_tokens = self.tokens[self.current_index]
if ',' in tmp_tokens:
before_comma, after_comma = tmp_tokens.split(',', 1)
if before_comma.startswith('sctp') and image:
@@ -237,12 +250,15 @@ class MySender:
return f"Server酱通知错误: {result.get('message')}"
return None
def _send_anpush(self, title, content, image):
token = self.tokens[self.current_index] # 获取当前通道对应的 token
def _send_anpush(self, title, content, image, diy_token=None):
if diy_token:
token = diy_token
else:
token = self.tokens[self.current_index] # 获取当前通道对应的 token
if ',' in token:
channel, token = token.split(',', 1)
else:
return "可能AnPush 没有配置消息通道ID"
return "AnPush可能没有配置消息通道ID"
url = f"https://api.anpush.com/push/{token}"
payload = {
"title": title,
@@ -259,8 +275,11 @@ class MySender:
return "AnPush 消息通道未找到"
return None
def _send_pushplus(self, title, content, image):
token = self.tokens[self.current_index] # 获取当前通道对应的 token
def _send_pushplus(self, title, content, image, diy_token=None):
if diy_token:
token = diy_token
else:
token = self.tokens[self.current_index] # 获取当前通道对应的 token
pushplus_url = f"http://www.pushplus.plus/send/{token}"
# PushPlus发送逻辑
data = {
@@ -294,3 +313,220 @@ class MySender:
def reset_limit(self):
"""解除限制,允许再次发送纯文本消息"""
self.first_text_sent = False
class IpLocationParser:
def __init__(self, settings_file_path, max_ips=4):
self._settings_file_path = settings_file_path
self._max_ips = max_ips # 最大历史IP数量
self._ips = self.read_ips("ips") # 初始化时读取已存储的 IP 地址
@staticmethod
def _parse(page, url):
# 定义 URL 到解析函数的映射
parser_methods = {
"https://ip.orz.tools": IpLocationParser._parse_ip_orz_tools,
"https://ip.skk.moe/multi": IpLocationParser._parse_ip_skk_moe,
"http://revproxy.ustc.edu.cn:8000": IpLocationParser._parse_ip_ustc,
}
parser_method = parser_methods.get(url)
if parser_method is None:
return [], []
return parser_method(page)
@staticmethod
def _remove_duplicates(ipv4_addresses, locations):
"""去重并保持 IP 地址和归属地的对应关系"""
seen = set()
unique_ipv4 = []
unique_locations = []
for ip, location in zip(ipv4_addresses, locations):
if ip not in seen:
seen.add(ip)
unique_ipv4.append(ip)
unique_locations.append(location)
return unique_ipv4, unique_locations
@staticmethod
def _is_valid_ipv4(ip):
"""验证是否是合法的 IPv4 地址"""
return re.match(r'^\d{1,3}(\.\d{1,3}){3}$', ip) is not None
@staticmethod
def _parse_ip_orz_tools(page):
rows = page.query_selector_all('#results3 .row')
# print(f"ip_orz_tools共找到 {len(rows)} 行数据")
ipv4_addresses, locations = [], []
for i, row in enumerate(rows):
row_html = row.inner_html()
# 提取 IP 地址
ip_match = re.search(r'data-name="([^"]+)"', row_html)
if ip_match:
ip = ip_match.group(1).strip()
if not IpLocationParser._is_valid_ipv4(ip):
continue
else:
continue
# 提取位置数据
loc_element = row.query_selector('.loc.cell')
location = loc_element.inner_text().strip() if loc_element else "未知"
ipv4_addresses.append(ip)
locations.append(location)
return IpLocationParser._remove_duplicates(ipv4_addresses, locations)
@staticmethod
def _parse_ip_skk_moe(page):
rows = page.query_selector_all(
'body > div > section > div.x1n2onr6.xw2csxc.x10fe3q7.x116uinm.xdpxx8g > table > tbody > tr'
)
# print(f"skk共找到 {len(rows)} 行数据")
ipv4_addresses, locations = [], []
for i, row in enumerate(rows):
ip_element = row.query_selector('th')
loc_element = row.query_selector('td:nth-child(3)') # 假设归属地在第 3 列
if ip_element and loc_element:
ip = ip_element.inner_text().strip()
if not IpLocationParser._is_valid_ipv4(ip):
continue
location = loc_element.inner_text().strip()
ipv4_addresses.append(ip)
locations.append(location)
return IpLocationParser._remove_duplicates(ipv4_addresses, locations)
@staticmethod
def _parse_ip_ustc(page):
rows = page.query_selector_all(
'body > div:nth-child(4) > center > table > tbody > tr > td:nth-child(2)'
)
# print(f"ip_ustc共找到 {len(rows)} 行数据")
ipv4_addresses, locations = [], []
for row in rows:
row_text = row.inner_text().strip()
# 提取 IP 地址
ip_match = re.match(r'(\d+\.\d+\.\d+\.\d+)', row_text)
if ip_match:
ip = ip_match.group(1).strip()
if not IpLocationParser._is_valid_ipv4(ip):
continue
else:
continue
# 提取归属地
location_match = re.search(r'(China|中国).*', row_text)
location = location_match.group(0).strip() if location_match else "未知"
ipv4_addresses.append(ip)
locations.append(location)
return IpLocationParser._remove_duplicates(ipv4_addresses, locations)
@staticmethod
def get_ipv4(page, url: str) -> str:
"""返回多个中国 IP 地址,逗号分隔"""
# 导航到目标页面
page.goto(url)
# 等待一段时间,让所有动态渲染的内容加载完成
page.wait_for_timeout(10000) # 等待 10 秒钟
# 调用解析器解析数据
ipv4_addresses, locations = IpLocationParser._parse(page, url)
# 筛选出属于中国的 IP 地址
china_ips = [
ip for ip, location in zip(ipv4_addresses, locations)
if 'China' in location or '中国' in location
]
# 返回逗号分隔的字符串
return ';'.join(china_ips)
def _limit_and_deduplicate_ips(self, ips):
"""
去重并限制 IP 地址数量,最多保存 _max_ips 个 IP 地址。
"""
# 去重并保留顺序
unique_ips = list(dict.fromkeys(ips))
return unique_ips[:self._max_ips] # 保留最多 _max_ips 个 IP 地址
def _read_ips_from_json(self, field):
"""
从 JSON 文件中读取指定字段的 IP 地址。
"""
if not os.path.exists(self._settings_file_path):
return "" # 文件不存在,返回空字符串
try:
with open(self._settings_file_path, 'r') as f:
data = json.load(f)
# 获取字段内容并返回分号分隔的字符串
return ";".join(data.get(field, []))
except (json.JSONDecodeError, IOError):
return "" # 读取失败,返回空字符串
def _overwrite_ips_in_json(self, field, new_ips):
"""
覆盖写入指定字段的 IP 地址。
:param field: 要更新的字段名
:param new_ips: 新的 IP 地址列表或分号分隔的字符串
"""
# 如果输入是字符串,将其转换为列表
if isinstance(new_ips, str):
new_ips = new_ips.split(";")
# 去重并限制 IP 数量
new_ips = self._limit_and_deduplicate_ips(new_ips)
# 读取现有数据(如果文件不存在,则初始化空数据)
if os.path.exists(self._settings_file_path):
try:
with open(self._settings_file_path, 'r') as f:
data = json.load(f)
except (json.JSONDecodeError, IOError):
data = {}
else:
data = {}
# 更新指定字段
data[field] = new_ips
# 写入文件
with open(self._settings_file_path, 'w') as f:
json.dump(data, f, indent=4)
def read_ips(self, field) -> str:
"""
获取 JSON 文件中指定字段的所有 IP 地址,返回分号分隔的字符串。
"""
return self._read_ips_from_json(field)
def overwrite_ips(self, field, new_ips):
"""
覆盖写入指定字段的新 IP 地址。
"""
self._overwrite_ips_in_json(field, new_ips)
def add_ips(self, field, new_ips):
"""
增量添加指定字段中的 IP 地址。
:param field: 要更新的字段名
:param new_ips: 要添加的 IP 地址列表或分号分隔的字符串
"""
# 获取当前的 IP 地址
current_ips = self.read_ips(field).split(";") if self.read_ips(field) else []
# 合并新 IP 地址并去重、限制数量
updated_ips = self._limit_and_deduplicate_ips(new_ips.split(";") + current_ips)
# 写入更新后的 IP 地址
self.overwrite_ips(field, updated_ips)