使用第三方通知时支持IP变动后通知,拟支持多网络出口检查

This commit is contained in:
ramen
2024-12-22 10:57:02 +08:00
parent 8c86177dc1
commit 15cbcaeff1
3 changed files with 485 additions and 98 deletions

View File

@@ -31,7 +31,7 @@ class DynamicWeChat(_PluginBase):
# 插件图标
plugin_icon = "Wecom_A.png"
# 插件版本
plugin_version = "1.6.0"
plugin_version = "1.7.0"
# 插件作者
plugin_author = "RamenRa"
# 作者主页
@@ -61,10 +61,20 @@ class DynamicWeChat(_PluginBase):
_is_special_upload = False
# 聚合通知
_my_send = None
# 多wan口支持
wan2 = None
# 当前检测url
wan2_url = None
# IP变动后发送通知开关
_await_ip = False
# 保存cookie
_saved_cookie = None
# 通知方式token/api
_notification_token = ''
# 标记企业微信通知可用
_wechat_available = True
# 标记IP变动后 是否发送通知
_send_notification = False
# 匹配ip地址的正则
_ip_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
@@ -96,8 +106,9 @@ class DynamicWeChat(_PluginBase):
_use_cookiecloud = True
# 登录cookie
_cookie_header = ""
# 内建CookieCloud服务器
_server = f'http://localhost:{settings.NGINX_PORT}/cookiecloud'
# CookieCloud客户端
_cookiecloud = CookieCloudHelper()
# 定时器
_scheduler: Optional[BackgroundScheduler] = None
@@ -124,31 +135,42 @@ class DynamicWeChat(_PluginBase):
self._cron = config.get("cron")
self._onlyonce = config.get("onlyonce")
self._input_id_list = config.get("input_id_list")
self._current_ip_address = config.get("current_ip_address")
# self._current_ip_address = config.get("current_ip_address")
self._forced_update = config.get("forced_update")
self._local_scan = config.get("local_scan")
self._use_cookiecloud = config.get("use_cookiecloud")
self._cookie_header = config.get("cookie_header")
self._ip_changed = config.get("ip_changed")
self._await_ip = config.get("await_ip")
if self.version != "v1":
self._my_send = MySender(self._notification_token, func=self.post_message)
else:
self._my_send = MySender(self._notification_token)
if not self._my_send.init_success: # 没有输入通知方式,不通知
self._my_send = None
_, self._current_ip_address = self.get_ip_from_url(self._input_id_list)
if self._my_send and not self._my_send.other_channel: # 确保跟随通知配置,一定要配置了第三方才可以使用
self._await_ip = False
if "||wan2" in self._input_id_list: # 多wan口
self.wan2 = IpLocationParser(self._settings_file_path)
self._current_ip_address = self.wan2.read_ips("ips") # 从文件中读取
logger.info(f"当前记录的IP{self._current_ip_address}如为空或不一致预计检测1-2轮内会修正")
else:
self.wan2 = None
_, self._current_ip_address = self.get_ip_from_url() # 直接从网页获取
# 停止现有任务
self.stop_service()
if (self._enabled or self._onlyonce) and self._input_id_list:
# 定时服务
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
# 运行一次定时服务
if self._onlyonce:
if not self._forced_update or not self._local_scan:
# logger.info("立即检测公网IP")
self._scheduler.add_job(func=self.check, trigger='date',
run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
name="检测公网IP") # 添加任务
if self._onlyonce: # 多网口ip检测禁用立即检测
if not self.wan2:
if not self._forced_update or not self._local_scan:
# logger.info("立即检测公网IP")
self._scheduler.add_job(func=self.check, trigger='date',
run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
name="检测公网IP") # 添加任务
else:
logger.info("启用多网口检测时禁用‘立即检测一次’功能")
# 关闭一次性开关
self._onlyonce = False
@@ -176,6 +198,15 @@ class DynamicWeChat(_PluginBase):
logger.error(f"定时任务配置错误:{err}")
self.systemmessage.put(f"执行周期配置错误:{err}")
if self.wan2:
try:
self._scheduler.add_job(func=self.get_ip_from_url,
trigger=CronTrigger.from_crontab(self._cron),
name="多wan口公网IP检测")
except Exception as err:
logger.error(f"多wan口公网IP检测定时任务配置错误{err}")
self.systemmessage.put(f"执行周期配置错误:{err}")
# 启动任务
if self._scheduler.get_jobs():
self._scheduler.print_jobs()
@@ -184,13 +215,13 @@ class DynamicWeChat(_PluginBase):
def _send_cookie_false(self):
self._cookie_valid = False
if self._my_send:
result = self._my_send.send(
if self._my_send and not self._await_ip: # 不启用“IP变动后通知”
error = self._my_send.send(
title="cookie已失效,请及时更新",
content="请在企业微信应用发送/push_qr, 如有验证码以''结束发送到企业微信应用。 如果使用微信通知请确保公网IP还没有变动",
image=None, force_send=False
)
if result:
if error:
logger.info(f"cookie失效通知发送失败,原因:{result}")
@eventmanager.register(EventType.PluginAction)
@@ -290,31 +321,70 @@ class DynamicWeChat(_PluginBase):
self.ChangeIP()
self.__update_config()
logger.info("----------------------本次任务结束----------------------")
elif self._await_ip and not self._send_notification:
# logger.info("cookie已失效。但配置了第三方通知继续检测公网IP。当IP变动企业微信通知彻底无法使用时通知用户")
logger.info("开始检测公网IP,等待IP变动后发送通知")
if self.CheckIP(func="public"):
# logger.info(f"配置的第三方通知{self._my_send.other_channel}")
for channel, token in self._my_send.other_channel:
# logger.info(f"正常尝试:{channel} {token}")
error = self._my_send.send(
title="公网IP与企业微信IP不一致",
content="请在企业微信应用发送/push_qr, 如有验证码以''结束发送到企业微信应用。",
image=None, force_send=False, diy_channel=channel, diy_token=token
)
if error:
logger.error(f"通道 {channel} 发送失败,原因:{error}")
else:
self._send_notification = True
break # 发送成功后退出循环
self._wechat_available = False # 标记不可用
logger.info("----------------------本次任务结束----------------------")
else:
logger.warning("cookie已失效请及时更新,本次不检查公网IP")
if self._send_notification:
logger.info("企业微信可信IP和公网IP不一致微信通知可能已经无法使用。第三方通知已经发送。")
else:
logger.info("cookie已失效请及时更新,本次不检查公网IP")
def CheckIP(self):
url, ip_address = self.get_ip_from_url(self._input_id_list)
if url and ip_address:
logger.info(f"IP获取成功: {url}: {ip_address}")
def CheckIP(self, func=None):
if self.wan2:
ip_address = self.wan2.read_ips("url_ip")
url = self.wan2_url
else:
url, ip_address = self.get_ip_from_url()
# 如果所有 URL 请求失败
if ip_address == "获取IP失败" or not url:
logger.error("获取IP失败 不操作可信IP")
return False
elif not self._ip_changed: # 上次修改IP失败
# 成功获取 IP记录日志
if url and ip_address:
logger.info(f"IP获取成功: {url}: {ip_address}")
# 上次修改 IP 失败时,继续尝试修改
if not self._ip_changed and func != "public": # 排除cookie失效 检测公网变动的任务
logger.info("上次IP修改IP失败 继续尝试修改IP")
self._current_ip_address = ip_address
return True
# 检查 IP 是否变化
if ip_address != self._current_ip_address:
logger.info("检测到IP变化")
self._current_ip_address = ip_address
return True
# 如果有 wan2则处理新增的 IP 地址
if self.wan2:
if isinstance(ip_address, str):
url_ips = ip_address.split(";") # 将字符串按分号拆分为多个 IP 地址
else:
url_ips = ip_address
saved_ips = self.wan2.read_ips("ips")
# 检查每个新 IP 是否存在,若不存在则添加并返回 True
for ip in url_ips:
if ip not in saved_ips:
self.wan2.add_ips("ips", ip) # 将url获取到的新IP添加到ips字段
return True
else:
return False
# 检查 IP 是否变化
if ip_address != self._current_ip_address:
logger.info("检测到IP变化")
return True
return False
def try_connect_cc(self):
if not self._use_cookiecloud: # 不使用CookieCloud
@@ -336,30 +406,57 @@ class DynamicWeChat(_PluginBase):
self._cc_server = None
logger.error("没有可用的CookieCloud服务器")
def get_ip_from_url(self, input_data) -> (str, str):
def get_ip_from_url(self) -> (str, str):
# 根据输入解析 URL 列表
if isinstance(input_data, str) and "||" in input_data:
_, url_list = input_data.split("||", 1)
if isinstance(self._input_id_list, str) and "||" in self._input_id_list:
_, url_list = self._input_id_list.split("||", 1)
urls = url_list.split(",")
elif isinstance(input_data, list):
urls = input_data
elif isinstance(self._input_id_list, list):
urls = self._input_id_list
else:
urls = self._ip_urls
# 随机化 URL 列表
random.shuffle(urls)
for url in urls:
try:
response = requests.get(url, timeout=3)
if response.status_code == 200:
ip_address = re.search(self._ip_pattern, response.text)
if ip_address:
return url, ip_address.group() # 返回匹配的 IP 地址
except Exception as e:
if "104" not in str(e) or 'Read timed out' not in str(e): # 忽略网络波动,都失败会返回None, "获取IP失败"
logger.warning(f"{url} 获取IP失败, Error: {e}")
return None, "获取IP失败"
if not self.wan2:
for url in urls:
try:
response = requests.get(url, timeout=3)
if response.status_code == 200:
ip_address = re.search(self._ip_pattern, response.text)
if ip_address:
# self.wan1.overwrite_ips("url_ip", ip_address.group())
return url, ip_address.group() # 返回匹配的 IP 地址
except Exception as e:
if "104" not in str(e) and 'Read timed out' not in str(e): # 忽略网络波动,都失败会返回None, "获取IP失败"
logger.warning(f"{url} 获取IP失败, Error: {e}")
return None, "获取IP失败"
else:
urls = ["https://ip.skk.moe/multi", "https://ip.orz.tools"]
random.shuffle(urls)
# 创建一个 Playwright 实例
with sync_playwright() as p:
browser = None # 定义浏览器变量
for url in urls:
try:
# 启动浏览器
if url == "https://ip.skk.moe/multi":
browser = p.chromium.launch(headless=False, args=['--lang=zh-CN'])
else:
browser = p.chromium.launch(headless=True, args=['--lang=zh-CN'])
page = browser.new_page()
china_ips = self.wan2.get_ipv4(page, url)
if china_ips:
self.wan2_url = url
self.wan2.overwrite_ips("url_ip", china_ips) # 将获取到的IP写入文件 覆盖写入
return url, china_ips # 成功获取到IP后返回
except Exception as e:
logger.warning(f"{url} 多出口IP获取失败, Error: {e}")
finally:
if browser:
browser.close()
browser = None # 重置浏览器变量
return None, "获取IP失败"
def find_qrc(self, page):
# 查找 iframe 元素并切换到它
@@ -475,9 +572,11 @@ class DynamicWeChat(_PluginBase):
if current_cookies is None:
self._send_cookie_false()
logger.error("更新本地 Cookie失败")
self._is_special_upload = False
return
else:
logger.info("更新本地 Cookie成功")
self._is_special_upload = True
self._saved_cookie = current_cookies # 保存
self._cookie_valid = True
except Exception as e:
@@ -498,10 +597,6 @@ class DynamicWeChat(_PluginBase):
for domain, cookie in cookies.items():
if domain == ".work.weixin.qq.com":
cookie_header = cookie
if '_upload_type=A' in cookie:
self._is_special_upload = True
else:
self._is_special_upload = False
break
if cookie_header == '':
cookie_header = self._cookie_header
@@ -511,11 +606,15 @@ class DynamicWeChat(_PluginBase):
logger.error(f"从CookieCloud获取cookie错误,错误原因:{e}")
return
@staticmethod
def parse_cookie_header(cookie_header):
# @staticmethod
def parse_cookie_header(self, cookie_header):
cookies = []
self._is_special_upload = False
for cookie in cookie_header.split(';'):
name, value = cookie.strip().split('=', 1)
if name == '_upload_type' and value == 'A':
self._is_special_upload = True
continue
cookies.append({
'name': name,
'value': value,
@@ -613,6 +712,7 @@ class DynamicWeChat(_PluginBase):
# 等待登录成功的元素出现
success_element = page.wait_for_selector('#check_corp_info', timeout=5000)
if success_element:
self._verification_code = None
logger.info("验证码登录成功!")
return True
else:
@@ -628,6 +728,7 @@ class DynamicWeChat(_PluginBase):
def click_app_management_buttons(self, page):
self._cookie_valid = True
self._my_send.reset_limit() # 解除限制 可以发送cookie失效提醒
bash_url = "https://work.weixin.qq.com/wework_admin/frame#apps/modApiApp/"
# 按钮的选择器和名称
buttons = [
@@ -637,7 +738,10 @@ class DynamicWeChat(_PluginBase):
"//div[contains(@class, 'js_show_ipConfig_dialog')]//a[contains(@class, '_mod_card_operationLink') and text()='配置']",
"配置")
]
_, self._current_ip_address = self.get_ip_from_url(self._input_id_list)
if self.wan2: # 多wan口从文件读取 ip
self._current_ip_address = self.wan2.read_ips("ips")
else:
_, self._current_ip_address = self.get_ip_from_url()
if "||" in self._input_id_list:
parts = self._input_id_list.split("||", 1)
input_id_list = parts[0]
@@ -664,6 +768,7 @@ class DynamicWeChat(_PluginBase):
# logger.info(f"已找到文本框")
input_area = page.locator('textarea.js_ipConfig_textarea')
confirm = page.locator('.js_ipConfig_confirmBtn')
# logger.info(f"即将输入的内容:'{input_ip}'")
input_area.fill(self._current_ip_address) # 填充 IP 地址
confirm.click() # 点击确认按钮
time.sleep(3) # 等待处理
@@ -674,14 +779,25 @@ class DynamicWeChat(_PluginBase):
if "disabled" in str(e):
logger.info(f"应用{app_id} 已被禁用,可能是没有设置接收api")
if self._ip_changed:
self._wechat_available = True # 标记微信通知重新有效
self._send_notification = False # 重置第三方通知已发送标记
masked_ips = [self.mask_ip(ip) for ip in self._current_ip_address.split(';')]
masked_ip_string = ";".join(masked_ips)
logger.info(f"应用: {app_id} 输入IP" + self._current_ip_address)
ip_parts = self._current_ip_address.split('.')
masked_ip = f"{ip_parts[0]}.{len(ip_parts[1]) * '*'}.{len(ip_parts[2]) * '*'}.{ip_parts[3]}"
if self._my_send:
self._my_send.send(title="更新可信IP成功",
content='应用: ' + app_id + ' 输入IP' + masked_ip,
content='应用: ' + app_id + ' 输入IP' + masked_ip_string,
force_send=True, diy_channel="WeChat")
@staticmethod
def mask_ip(ip):
ip_parts = ip.split('.')
if len(ip_parts) == 4: # 确保是有效的 IPv4 地址
# 使用星号替换第二和第三部分
masked_ip = f"{ip_parts[0]}.{len(ip_parts[1]) * '*'}.{len(ip_parts[2]) * '*'}.{ip_parts[3]}"
return masked_ip
return ip # 如果不是有效的 IP 地址,返回原地址
def __update_config(self):
"""
更新配置
@@ -691,8 +807,8 @@ class DynamicWeChat(_PluginBase):
"onlyonce": self._onlyonce,
"cron": self._cron,
"notification_token": self._notification_token,
"current_ip_address": self._current_ip_address,
"ip_changed": self._ip_changed,
# "current_ip_address": self._current_ip_address,
"await_ip": self._await_ip,
"forced_update": self._forced_update,
"local_scan": self._local_scan,
"input_id_list": self._input_id_list,
@@ -798,7 +914,26 @@ class DynamicWeChat(_PluginBase):
}
}
]
}
},
*(
[{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'await_ip',
'label': 'IP变动后通知',
}
}
]
}]
if self._my_send and self._my_send.other_channel else []
)
]
},
{
@@ -877,7 +1012,7 @@ class DynamicWeChat(_PluginBase):
'props': {
'type': 'info',
'variant': 'tonal',
'text': '建议启用内建或自定义CookieCloud。支持微信Server酱等第三方通知,具体请查看作者主页'
'text': '建议启用内建或自定义CookieCloud。支持微信Server酱等第三方通知具体请查看作者主页'
}
}
]
@@ -897,7 +1032,7 @@ class DynamicWeChat(_PluginBase):
'component': 'VAlert',
'props': {
'type': 'info',
'text': 'Cookie失效时通知用户,用户使用/push_qr让插件推送二维码。使用第三方通知时填写对应Token/API'
'text': 'Cookie失效时通知用户用户使用/push_qr让插件推送二维码。使用第三方通知时填写对应Token/API'
}
}
]
@@ -913,6 +1048,7 @@ class DynamicWeChat(_PluginBase):
"forceUpdate": False,
"use_cookiecloud": True,
"use_local_qr": False,
"await_ip": False,
"cookie_header": "",
"notification_token": "",
"input_id_list": ""
@@ -969,7 +1105,7 @@ class DynamicWeChat(_PluginBase):
}
}
}
if self._is_special_upload:
if self._is_special_upload and self._enabled:
# 计算 cookie_lifetime 的天数、小时数和分钟数
cookie_lifetime_days = self._cookie_lifetime // 86400 # 一天的秒数为 86400
cookie_lifetime_hours = (self._cookie_lifetime % 86400) // 3600 # 计算小时数
@@ -1066,12 +1202,24 @@ class DynamicWeChat(_PluginBase):
image_src, refuse_time = self.find_qrc(page)
if image_src:
if self._my_send:
result = self._my_send.send("企业微信登录二维码", image=image_src)
if result:
logger.info(f"远程推送任务: 二维码发送失败,原因:{result}")
browser.close()
logger.info("----------------------本次任务结束----------------------")
return
if not self._wechat_available and self._my_send.other_channel: # 微信通知已经无法使用
for channel, token in self._my_send.other_channel:
# logger.info(f"正常尝试:{channel} {token}")
error = self._my_send.send(
title="企业微信登录二维码",
image=image_src, diy_channel=channel, diy_token=token
)
if error:
logger.warning(f"通道 {channel} 推送二维码失败,原因:{error}")
else:
break # 发送成功后退出循环
else: # 硬发
error = self._my_send.send("企业微信登录二维码", image=image_src)
if error:
logger.info(f"远程推送任务: 二维码发送失败,原因:{error}")
browser.close()
logger.info("----------------------本次任务结束----------------------")
return
logger.info("远程推送任务: 二维码发送成功,等待用户 90 秒内扫码登录。V2'微信通知'的用户,此消息并不准确")
# logger.info("远程推送任务: 如收到短信验证码请以?结束,发送到<企业微信应用> 如: 110301")
time.sleep(90)
@@ -1129,7 +1277,9 @@ class DynamicWeChat(_PluginBase):
}]
"""
if self._enabled and self._cron:
# logger.info(f"{self.plugin_name}定时服务启动,时间间隔 {self._cron} ")
logger.info(f"服务启动")
if self.wan2:
logger.info("多网口检测第一次获取IP可能会失败")
return [{
"id": self.__class__.__name__,
"name": f"{self.plugin_name}服务",