忽略因网络波动导致获取ip错误。自定义的类合并为helper.py。后续核心功能没问题将不再更新

This commit is contained in:
ramen
2024-12-11 21:43:58 +08:00
parent ee8c18278b
commit 9fad5c094a
4 changed files with 292 additions and 290 deletions

View File

@@ -861,20 +861,18 @@
"name": "动态企微可信IP",
"description": "修改企微应用可信IP支持Srever酱等第三方通知。验证码以结尾发送到企业微信应用",
"labels": "消息通知",
"version": "1.5.2",
"version": "1.6.0",
"icon": "Wecom_A.png",
"author": "RamenRa",
"level": 2,
"v2": true,
"history": {
"v1.6.0": "忽略因网络波动导致获取ip错误。自定义的类合并为helper.py。后续核心功能没问题将不再更新",
"v1.5.2": "可以从指定url获取ip,修复不使用cc时cookie失效过快v1可配置第三方为备用通知server酱可以将文本发送到server3,二维码给服务号",
"v1.5.1": "修复v2微信通知可以指定微信通知ID",
"v1.5.0": "支持企微应用通知和第Serve酱等第三方推送。按要求修改插件名称",
"v1.4.1": "完善面板说明",
"v1.4.0": "修复强制更改IP时配置面板延时过长的问题。庆祝v2进入正式版显示了一个没用的参数",
"v1.3.1": "修正一些逻辑判断修改ip成功会通知一次",
"v1.3.0": "兼容v2操作cookie前检查一次CookieCloud",
"v1.2.0": "远程命令/push_qr立即推送一次二维码到pushplus。添加<本地扫码刷新cookie>"
"v1.4.0": "修复强制更改IP时配置面板延时过长的问题。庆祝v2进入正式版显示了一个没用的参数"
}
},
"SyncCookieCloud": {

View File

@@ -19,20 +19,19 @@ from app.helper.cookiecloud import CookieCloudHelper
from app.log import logger
from app.plugins import _PluginBase
from app.schemas.types import EventType
from app.plugins.dynamicwechat.update_help import PyCookieCloud
from app.plugins.dynamicwechat.notify_helper import MySender
from app.plugins.dynamicwechat.helper import PyCookieCloud, MySender
class DynamicWeChat(_PluginBase):
# 插件名称
plugin_name = "动态企微可信IP"
# 插件描述
plugin_desc = "修改企微应用可信IP详细说明查看'作者主页'支持第三方通知。验证码以?结尾发送到企业微信应用"
plugin_desc = "修改企微应用可信IP,详细说明查看'作者主页',支持第三方通知。验证码以?结尾发送到企业微信应用"
# 插件图标
plugin_icon = "Wecom_A.png"
# 插件版本
plugin_version = "1.5.2"
plugin_version = "1.6.0"
# 插件作者
plugin_author = "RamenRa"
# 作者主页
@@ -111,6 +110,7 @@ class DynamicWeChat(_PluginBase):
def init_plugin(self, config: dict = None):
# 清空配置
self._notification_token = ''
self._cron = '*/10 * * * *'
self._ip_changed = True
self._forced_update = False
self._use_cookiecloud = True
@@ -134,12 +134,9 @@ class DynamicWeChat(_PluginBase):
self._my_send = MySender(self._notification_token, func=self.post_message)
else:
self._my_send = MySender(self._notification_token)
if not self._my_send.init_success: # 没有输入通知方式不通知
if not self._my_send.init_success: # 没有输入通知方式,不通知
self._my_send = None
if "||" in self._input_id_list:
parts = self._input_id_list.split("||", 1)
self._ip_urls = parts[1].split(",")
self._current_ip_address = self.get_ip_from_url(random.choice(self._ip_urls))
_, self._current_ip_address = self.get_ip_from_url(self._input_id_list)
# 停止现有任务
self.stop_service()
if (self._enabled or self._onlyonce) and self._input_id_list:
@@ -147,20 +144,24 @@ class DynamicWeChat(_PluginBase):
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
# 运行一次定时服务
if self._onlyonce:
logger.info("立即检测公网IP")
self._scheduler.add_job(func=self.check, trigger='date',
run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
name="检测公网IP") # 添加任务
if not self._forced_update or not self._local_scan:
# logger.info("立即检测公网IP")
self._scheduler.add_job(func=self.check, trigger='date',
run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
name="检测公网IP") # 添加任务
# 关闭一次性开关
self._onlyonce = False
if self._forced_update:
self._scheduler.add_job(func=self.forced_change, trigger='date',
run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
name="强制更新公网IP") # 添加任务
if not self._local_scan:
logger.info("使用Cookie,强制更新公网IP")
self._scheduler.add_job(func=self.forced_change, trigger='date',
run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
name="强制更新公网IP") # 添加任务
self._forced_update = False
if self._local_scan:
logger.info("使用本地扫码登陆")
self._scheduler.add_job(func=self.local_scanning, trigger='date',
run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
name="本地扫码登陆") # 添加任务
@@ -181,6 +182,17 @@ class DynamicWeChat(_PluginBase):
self._scheduler.start()
self.__update_config()
def _send_cookie_false(self):
self._cookie_valid = False
if self._my_send:
result = self._my_send.send(
title="cookie已失效,请及时更新",
content="请在企业微信应用发送/push_qr, 如有验证码以''结束发送到企业微信应用。 如果使用微信通知请确保公网IP还没有变动",
image=None, force_send=False
)
if result:
logger.info(f"cookie失效通知发送失败,原因:{result}")
@eventmanager.register(EventType.PluginAction)
def forced_change(self, event: Event = None):
"""
@@ -208,6 +220,7 @@ class DynamicWeChat(_PluginBase):
self.click_app_management_buttons(page)
else:
logger.error("cookie失效,强制修改IP失败请使用'本地扫码修改IP'")
self._cookie_valid = False
browser.close()
except Exception as err:
logger.error(f"强制修改IP失败{err}")
@@ -226,7 +239,6 @@ class DynamicWeChat(_PluginBase):
event_data = event.event_data
if not event_data or event_data.get("action") != "dynamicwechat":
return
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True, args=['--lang=zh-CN'])
@@ -238,7 +250,7 @@ class DynamicWeChat(_PluginBase):
current_time = datetime.now()
future_time = current_time + timedelta(seconds=110)
self._future_timestamp = int(future_time.timestamp())
logger.info("请重新进入插件面板扫码! 每20秒检查登录状态最大尝试5次")
logger.info("请重新进入插件面板扫码! 每20秒检查登录状态,最大尝试5次")
max_attempts = 5
attempt = 0
while attempt < max_attempts:
@@ -252,7 +264,7 @@ class DynamicWeChat(_PluginBase):
else:
logger.info("用户可能没有扫码或登录失败")
else:
logger.error("未找到二维码任务结束")
logger.error("未找到二维码,任务结束")
logger.info("----------------------本次任务结束----------------------")
browser.close()
except Exception as e:
@@ -279,30 +291,20 @@ class DynamicWeChat(_PluginBase):
self.__update_config()
logger.info("----------------------本次任务结束----------------------")
else:
logger.warning("cookie已失效请及时更新本次不检查公网IP")
logger.warning("cookie已失效请及时更新,本次不检查公网IP")
def CheckIP(self):
if "||" in self._input_id_list:
parts = self._input_id_list.split("||", 1)
ip_urls = parts[1].split(",")
else:
ip_urls = self._ip_urls
retry_urls = random.sample(ip_urls, len(ip_urls))
ip_address = None
for url in retry_urls:
ip_address = self.get_ip_from_url(url)
if ip_address != "获取IP失败" and ip_address:
logger.info(f"IP获取成功: {url}: {ip_address}")
break
url, ip_address = self.get_ip_from_url(self._input_id_list)
if url and ip_address:
logger.info(f"IP获取成功: {url}: {ip_address}")
# 如果所有 URL 请求失败
if ip_address == "获取IP失败" or not ip_address:
logger.error("获取IP失败 不操作IP")
if ip_address == "获取IP失败" or not url:
logger.error("获取IP失败 不操作可信IP")
return False
elif not self._ip_changed: # 上次修改IP失败
logger.info("上次IP修改IP没有成功 继续尝试修改IP")
logger.info("上次IP修改IP失败 继续尝试修改IP")
self._current_ip_address = ip_address
return True
@@ -310,48 +312,54 @@ class DynamicWeChat(_PluginBase):
if ip_address != self._current_ip_address:
logger.info("检测到IP变化")
self._current_ip_address = ip_address
# self._ip_changed = False
return True
else:
return False
def try_connect_cc(self):
if self._use_cookiecloud:
if settings.COOKIECLOUD_KEY and settings.COOKIECLOUD_PASSWORD: # 使用设置里的cookieCloud
if settings.COOKIECLOUD_ENABLE_LOCAL:
self._cc_server = PyCookieCloud(url=self._server, uuid=settings.COOKIECLOUD_KEY,
password=settings.COOKIECLOUD_PASSWORD)
logger.info("使用内建CookieCloud服务器")
else: # 使用设置里的cookieCloud
self._cc_server = PyCookieCloud(url=settings.COOKIECLOUD_HOST, uuid=settings.COOKIECLOUD_KEY,
password=settings.COOKIECLOUD_PASSWORD)
logger.info("使用自定义CookieCloud服务器")
if not self._cc_server.check_connection():
self._cc_server = None
logger.error("没有可用的CookieCloud服务器")
else: # 未设置cookieCloud
self._cc_server = None
logger.error("没有配置CookieCloud的用户KEY和PASSWORD")
if not self._use_cookiecloud: # 不使用CookieCloud
self._cc_server = None
return
if not settings.COOKIECLOUD_KEY or not settings.COOKIECLOUD_PASSWORD: # 没有设置key和password
self._cc_server = None
logger.error("没有配置CookieCloud的用户KEY和PASSWORD")
return
if settings.COOKIECLOUD_ENABLE_LOCAL:
self._cc_server = PyCookieCloud(url=self._server, uuid=settings.COOKIECLOUD_KEY,
password=settings.COOKIECLOUD_PASSWORD)
logger.info("使用内建CookieCloud服务器")
else: # 使用设置里的cookieCloud
self._cc_server = PyCookieCloud(url=settings.COOKIECLOUD_HOST, uuid=settings.COOKIECLOUD_KEY,
password=settings.COOKIECLOUD_PASSWORD)
logger.info("使用自定义CookieCloud服务器")
if not self._cc_server.check_connection():
self._cc_server = None
logger.error("没有可用的CookieCloud服务器")
def get_ip_from_url(self, url):
try:
# 发送 GET 请求
response = requests.get(url)
# 检查响应状态码是否为 200
if response.status_code == 200:
# 解析响应 JSON 数据并获取 IP 地址
ip_address = re.search(self._ip_pattern, response.text)
if ip_address:
return ip_address.group()
else:
return "获取IP失败"
else:
return "获取IP失败"
except Exception as e:
if "104" in str(e):
pass
else:
logger.warning(f"{url} 获取IP失败,Error: {e}")
def get_ip_from_url(self, input_data) -> (str, str):
# 根据输入解析 URL 列表
if isinstance(input_data, str) and "||" in input_data:
_, url_list = input_data.split("||", 1)
urls = url_list.split(",")
elif isinstance(input_data, list):
urls = input_data
else:
urls = self._ip_urls
# 随机化 URL 列表
random.shuffle(urls)
for url in urls:
try:
response = requests.get(url, timeout=3)
if response.status_code == 200:
ip_address = re.search(self._ip_pattern, response.text)
if ip_address:
return url, ip_address.group() # 返回匹配的 IP 地址
except Exception as e:
if "104" not in str(e) or 'Read timed out' not in str(e): # 忽略网络波动,都失败会返回None, "获取IP失败"
logger.warning(f"{url} 获取IP失败, Error: {e}")
return None, "获取IP失败"
def find_qrc(self, page):
# 查找 iframe 元素并切换到它
@@ -395,30 +403,21 @@ class DynamicWeChat(_PluginBase):
time.sleep(3)
img_src, refuse_time = self.find_qrc(page)
if img_src:
if self._my_send:
result = self._my_send.send(title="企业微信登录二维码", image=img_src)
if result:
logger.info(f"二维码发送失败,原因:{result}")
browser.close()
return
logger.info("二维码已经发送,等待用户 90 秒内扫码登录")
# logger.info("如收到短信验证码请以?结束,发送到<企业微信应用> 如: 110301")
time.sleep(90) # 等待用户扫码
login_status = self.check_login_status(page, "")
if login_status:
self._update_cookie(page, context) # 刷新cookie
self.click_app_management_buttons(page)
else:
self._ip_changed = False
if self._my_send: # 统一逻辑,只有用户发送'/push_qr'才会发生二维码
self._ip_changed = False
self._send_cookie_false()
logger.info("已尝试发送cookie失效通知")
else:
self._ip_changed = False
logger.info("cookie已失效")
self._cookie_valid = False
logger.info("cookie已失效,且没有配置通知方式,本次修改可信IP失败")
else: # 如果直接进入企业微信
logger.info("尝试cookie登录")
login_status = self.check_login_status(page, "")
if login_status:
if self.check_login_status(page, ""):
self.click_app_management_buttons(page)
else:
logger.info("发生了意料之外的错误,请附上配置信息到github反馈")
self._send_cookie_false()
self._ip_changed = False
browser.close()
except Exception as e:
@@ -435,7 +434,7 @@ class DynamicWeChat(_PluginBase):
self.try_connect_cc() # 再尝试一次连接
if self._cc_server is None:
return
logger.info("使用二维码登录成功开始刷新cookie")
logger.info("使用二维码登录成功,开始刷新cookie")
try:
if not self._cc_server.check_connection():
logger.error("连接 CookieCloud 失败", self._server)
@@ -443,7 +442,8 @@ class DynamicWeChat(_PluginBase):
current_url = page.url
current_cookies = context.cookies(current_url) # 通过 context 获取 cookies
if current_cookies is None:
logger.error("无法获取当前 cookies")
logger.error("无法从内置浏览器获取 cookies")
self._cookie_valid = False
return
self._saved_cookie = current_cookies
formatted_cookies = {}
@@ -455,20 +455,25 @@ class DynamicWeChat(_PluginBase):
if domain not in formatted_cookies:
formatted_cookies[domain] = []
formatted_cookies[domain].append(cookie)
flag = self._cc_server.update_cookie(formatted_cookies)
if flag:
if self._cc_server.update_cookie(formatted_cookies):
logger.info("更新 CookieCloud 成功")
self._cookie_valid = True
self._is_special_upload = True
else:
self._send_cookie_false()
self._is_special_upload = False
logger.error("更新 CookieCloud 失败")
except Exception as e:
self._send_cookie_false()
self._is_special_upload = False
logger.error(f"CookieCloud更新 cookie 发生错误: {e}")
else:
try:
current_url = page.url
current_cookies = context.cookies(current_url) # 通过 context 获取 cookies
if current_cookies is None:
self._send_cookie_false()
logger.error("更新本地 Cookie失败")
return
else:
@@ -476,6 +481,7 @@ class DynamicWeChat(_PluginBase):
self._saved_cookie = current_cookies # 保存
self._cookie_valid = True
except Exception as e:
self._send_cookie_false()
logger.error(f"更新本地 cookie 发生错误: {e}")
def get_cookie(self):
@@ -483,28 +489,26 @@ class DynamicWeChat(_PluginBase):
return self._saved_cookie
try:
cookie_header = ''
if self._use_cookiecloud:
cookies, msg = self._cookiecloud.download()
if not cookies: # CookieCloud获取cookie失败
logger.error(f"CookieCloud获取cookie失败,失败原因:{msg}")
return
else:
for domain, cookie in cookies.items():
if domain == ".work.weixin.qq.com":
cookie_header = cookie
if '_upload_type=A' in cookie:
self._is_special_upload = True
else:
self._is_special_upload = False
break
if cookie_header == '':
cookie_header = self._cookie_header
else: # 不使用CookieCloud
if not self._use_cookiecloud:
return
cookies, msg = self._cookiecloud.download()
if not cookies: # CookieCloud获取cookie失败
logger.error(f"CookieCloud获取cookie失败,失败原因:{msg}")
return
for domain, cookie in cookies.items():
if domain == ".work.weixin.qq.com":
cookie_header = cookie
if '_upload_type=A' in cookie:
self._is_special_upload = True
else:
self._is_special_upload = False
break
if cookie_header == '':
cookie_header = self._cookie_header
cookie = self.parse_cookie_header(cookie_header)
return cookie
except Exception as e:
logger.error(f"从CookieCloud获取cookie错误错误原因:{e}")
logger.error(f"从CookieCloud获取cookie错误,错误原因:{e}")
return
@staticmethod
@@ -544,27 +548,22 @@ class DynamicWeChat(_PluginBase):
if not cookie_used and self._use_cookiecloud:
# logger.info("尝试从CookieCloud 获取新的 cookie")
cookie = self.get_cookie()
if cookie:
context.add_cookies(cookie)
page = context.new_page()
page.goto(self._wechatUrl)
time.sleep(3)
if self.check_login_status(page, task='refresh_cookie'):
# logger.info("新获取的 cookie 有效")
self._cookie_valid = True
self._saved_cookie = context.cookies() # 保存有效的 cookie
else:
# logger.warning("新获取的 cookie 无效")
self._cookie_valid = False
self._saved_cookie = None # 清空无效的 cookie
if self._my_send:
result = self._my_send.send(
title="cookie已失效请及时更新",
content="请在企业微信应用发送/push_qr 如有验证码以''结束发送到企业微信应用。 如果是使用微信通知请确保公网IP还没有变动",
image=None, force_send=False
)
if result:
logger.info(f"cookie失效通知发送失败原因{result}")
if not cookie:
self._send_cookie_false()
return
context.add_cookies(cookie)
page = context.new_page()
page.goto(self._wechatUrl)
time.sleep(3)
if self.check_login_status(page, task='refresh_cookie'):
# logger.info("新获取的 cookie 有效")
self._cookie_valid = True
self._saved_cookie = context.cookies() # 保存有效的 cookie
else:
# logger.warning("新获取的 cookie 无效")
self._send_cookie_false()
self._saved_cookie = None # 清空无效的 cookie
if self._cookie_valid:
if self._my_send:
self._my_send.reset_limit()
@@ -572,7 +571,7 @@ class DynamicWeChat(_PluginBase):
self._cookie_lifetime = PyCookieCloud.load_cookie_lifetime(self._settings_file_path)
browser.close()
except Exception as e:
self._cookie_valid = False
self._send_cookie_false()
self._saved_cookie = None # 异常时清空 cookie
logger.error(f"cookie 校验过程中发生异常: {e}")
@@ -601,7 +600,7 @@ class DynamicWeChat(_PluginBase):
if task == 'local_scanning':
time.sleep(6)
else:
logger.info("等待30秒请将短信验证码请以''结束发送到<企业微信应用> 如: 110301")
logger.info("等待30秒,请将短信验证码请以''结束,发送到<企业微信应用> 如: 110301")
time.sleep(30) # 多等30秒
if self._verification_code:
# logger.info("输入验证码:" + self._verification_code)
@@ -620,14 +619,15 @@ class DynamicWeChat(_PluginBase):
logger.error("未收到短信验证码")
return False
except Exception as e:
# logger.debug(str(e)) # 基于bug运行请不要将错误输出到日志
# try: # 没有登录成功也没有短信验证码
# logger.debug(str(e)) # 基于bug运行,请不要将错误输出到日志
# try: # 没有登录成功,也没有短信验证码
if self.find_qrc(
page) and not task == 'refresh_cookie' and not task == 'local_scanning': # 延长任务找到的二维码不会被发送所以不算用户没有扫码
page) and not task == 'refresh_cookie' and not task == 'local_scanning': # 延长任务找到的二维码不会被发送,所以不算用户没有扫码
logger.warning(f"用户没有扫描二维码")
return False
def click_app_management_buttons(self, page):
self._cookie_valid = True
bash_url = "https://work.weixin.qq.com/wework_admin/frame#apps/modApiApp/"
# 按钮的选择器和名称
buttons = [
@@ -637,6 +637,7 @@ class DynamicWeChat(_PluginBase):
"//div[contains(@class, 'js_show_ipConfig_dialog')]//a[contains(@class, '_mod_card_operationLink') and text()='配置']",
"配置")
]
_, self._current_ip_address = self.get_ip_from_url(self._input_id_list)
if "||" in self._input_id_list:
parts = self._input_id_list.split("||", 1)
input_id_list = parts[0]
@@ -645,8 +646,12 @@ class DynamicWeChat(_PluginBase):
id_list = input_id_list.split(",")
app_urls = [f"{bash_url}{app_id.strip()}" for app_id in id_list]
for app_url in app_urls:
page.goto(app_url) # 打开应用详情页
app_id = app_url.split("/")[-1]
if app_id.startswith("100000") and len(app_id) == 6:
self._ip_changed = False
logger.warning(f"请根据 https://github.com/RamenRa/MoviePilot-Plugins 的说明进行配置应用ID")
return
page.goto(app_url) # 打开应用详情页
time.sleep(2)
# 依次点击每个按钮
for xpath, name in buttons:
@@ -673,9 +678,9 @@ class DynamicWeChat(_PluginBase):
ip_parts = self._current_ip_address.split('.')
masked_ip = f"{ip_parts[0]}.{len(ip_parts[1]) * '*'}.{len(ip_parts[2]) * '*'}.{ip_parts[3]}"
if self._my_send:
result = self._my_send.send(title="更新可信IP成功",
content='应用: ' + app_id + ' 输入IP' + masked_ip,
force_send=True, diy_channel="WeChat")
self._my_send.send(title="更新可信IP成功",
content='应用: ' + app_id + ' 输入IP' + masked_ip,
force_send=True, diy_channel="WeChat")
def __update_config(self):
"""
@@ -700,7 +705,7 @@ class DynamicWeChat(_PluginBase):
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面只保留必要的配置项并添加 token 配置。
拼装插件配置页面,只保留必要的配置项,并添加 token 配置。
"""
return [
{
@@ -851,7 +856,7 @@ class DynamicWeChat(_PluginBase):
'model': 'input_id_list',
'label': '[必填]应用ID',
'rows': 1,
'placeholder': '输入应用ID多个ID用英文逗号分隔。在企业微信应用页面URL末尾获取'
'placeholder': '输入应用ID,多个ID用英文逗号分隔。在企业微信应用页面URL末尾获取'
}
}
]
@@ -872,7 +877,7 @@ class DynamicWeChat(_PluginBase):
'props': {
'type': 'info',
'variant': 'tonal',
'text': '建议启用内建或自定义CookieCloud。支持微信、Server酱等第三方通知具体请查看作者主页'
'text': '建议启用内建或自定义CookieCloud。支持微信、Server酱等第三方通知,具体请查看作者主页'
}
}
]
@@ -892,7 +897,7 @@ class DynamicWeChat(_PluginBase):
'component': 'VAlert',
'props': {
'type': 'info',
'text': 'Cookie失效时通知用户用户使用/push_qr让插件推送二维码。使用第三方通知时填写对应Token/API'
'text': 'Cookie失效时通知用户,用户使用/push_qr让插件推送二维码。使用第三方通知时填写对应Token/API'
}
}
]
@@ -919,20 +924,20 @@ class DynamicWeChat(_PluginBase):
# 判断二维码是否过期
if current_time > self._future_timestamp:
vaild_text = "二维码已过期"
color = "#ff0000"
vaild_text = "二维码已过期或没有扫码任务"
color = "#ff0000" if self._enabled else "#bbbbbb"
self._qr_code_image = None
else:
# 二维码有效格式化过期时间为 年-月-日 时:分:秒
# 二维码有效,格式化过期时间为 年-月-日 时:分:秒
expiration_time = datetime.fromtimestamp(self._future_timestamp).strftime('%Y-%m-%d %H:%M:%S')
vaild_text = f"二维码有效过期时间: {expiration_time}"
vaild_text = f"二维码有效,过期时间: {expiration_time}"
color = "#32CD32"
# 如果self._qr_code_image为None返回提示信息
# 如果self._qr_code_image为None,返回提示信息
if self._qr_code_image is None:
img_component = {
"component": "div",
"text": "登录二维码都会在此展示二维码有6秒延时。 [适用于Docker版]",
"text": "登录二维码都会在此展示,二维码有6秒延时。 [适用于Docker版]",
"props": {
"style": {
"fontSize": "22px",
@@ -1063,17 +1068,16 @@ class DynamicWeChat(_PluginBase):
if self._my_send:
result = self._my_send.send("企业微信登录二维码", image=image_src)
if result:
logger.info(f"远程推送任务: 二维码发送失败原因:{result}")
logger.info(f"远程推送任务: 二维码发送失败,原因:{result}")
browser.close()
logger.info("----------------------本次任务结束----------------------")
return
logger.info("远程推送任务: 二维码已经发送,等待用户 90 秒内扫码登录")
# logger.info("远程推送任务: 如收到短信验证码请以?结束发送到<企业微信应用> 如: 110301")
logger.info("远程推送任务: 二维码发送成功,等待用户 90 秒内扫码登录。V2'微信通知'的用户,此消息并不准确")
# logger.info("远程推送任务: 如收到短信验证码请以?结束,发送到<企业微信应用> 如: 110301")
time.sleep(90)
login_status = self.check_login_status(page, 'push_qr_code')
if login_status:
if self.check_login_status(page, 'push_qr_code'):
self._update_cookie(page, context) # 刷新cookie
# logger.info("远程推送任务: 没有可用的CookieCloud服务器只修改可信IP")
# logger.info("远程推送任务: 没有可用的CookieCloud服务器,只修改可信IP")
self.click_app_management_buttons(page)
else:
logger.warning("远程推送任务: 没有找到可用的通知方式")
@@ -1125,7 +1129,7 @@ class DynamicWeChat(_PluginBase):
}]
"""
if self._enabled and self._cron:
logger.info(f"{self.plugin_name}定时服务启动时间间隔 {self._cron} ")
# logger.info(f"{self.plugin_name}定时服务启动,时间间隔 {self._cron} ")
return [{
"id": self.__class__.__name__,
"name": f"{self.plugin_name}服务",

View File

@@ -3,6 +3,128 @@ import requests
from app.modules.wechat import WeChat
from app.schemas.types import NotificationType,MessageChannel
import os
import json
import requests
import base64
import hashlib
from typing import Dict, Any
from Crypto import Random
from Crypto.Cipher import AES
def bytes_to_key(data: bytes, salt: bytes, output=48) -> bytes:
# 兼容v2 将bytes_to_key和encrypt导入
assert len(salt) == 8, len(salt)
data += salt
key = hashlib.md5(data).digest()
final_key = key
while len(final_key) < output:
key = hashlib.md5(key + data).digest()
final_key += key
return final_key[:output]
def encrypt(message: bytes, passphrase: bytes) -> bytes:
"""
CryptoJS 加密原文
This is a modified copy of https://stackoverflow.com/questions/36762098/how-to-decrypt-password-from-javascript-cryptojs-aes-encryptpassword-passphras
"""
salt = Random.new().read(8)
key_iv = bytes_to_key(passphrase, salt, 32 + 16)
key = key_iv[:32]
iv = key_iv[32:]
aes = AES.new(key, AES.MODE_CBC, iv)
length = 16 - (len(message) % 16)
data = message + (chr(length) * length).encode()
return base64.b64encode(b"Salted__" + salt + aes.encrypt(data))
class PyCookieCloud:
def __init__(self, url: str, uuid: str, password: str):
self.url: str = url
self.uuid: str = uuid
self.password: str = password
def check_connection(self) -> bool:
"""
Test the connection to the CookieCloud server.
:return: True if the connection is successful, False otherwise.
"""
try:
resp = requests.get(self.url, timeout=3) # 设置超时为3秒
return resp.status_code == 200
except Exception as e:
return False
def update_cookie(self, formatted_cookies: Dict[str, Any]) -> bool:
"""
Update cookie data to CookieCloud.
:param formatted_cookies: cookie value to update.
:return: if update success, return True, else return False.
"""
if '.work.weixin.qq.com' not in formatted_cookies:
formatted_cookies['.work.weixin.qq.com'] = []
formatted_cookies['.work.weixin.qq.com'].append({
'name': '_upload_type',
'value': 'A',
'domain': '.work.weixin.qq.com',
'path': '/',
'expires': -1,
'httpOnly': False,
'secure': False,
'sameSite': 'Lax'
})
cookie = {'cookie_data': formatted_cookies}
raw_data = json.dumps(cookie)
encrypted_data = encrypt(raw_data.encode('utf-8'), self.get_the_key().encode('utf-8')).decode('utf-8')
cookie_cloud_request = requests.post(self.url + '/update',
json={'uuid': self.uuid, 'encrypted': encrypted_data})
if cookie_cloud_request.status_code == 200:
if cookie_cloud_request.json().get('action') == 'done':
return True
return False
def get_the_key(self) -> str:
"""
Get the key used to encrypt and decrypt data.
:return: the key.
"""
md5 = hashlib.md5()
md5.update((self.uuid + '-' + self.password).encode('utf-8'))
return md5.hexdigest()[:16]
@staticmethod
def load_cookie_lifetime(settings_file: str = None): # 返回时间戳 单位秒
if os.path.exists(settings_file):
with open(settings_file, 'r') as file:
settings = json.load(file)
return settings.get('_cookie_lifetime', 0)
else:
return 0
@staticmethod
def save_cookie_lifetime(settings_file, cookie_lifetime): # 传入时间戳 单位秒
with open(settings_file, 'w') as file:
json.dump({'_cookie_lifetime': cookie_lifetime}, file)
@staticmethod
def increase_cookie_lifetime(settings_file, seconds: int):
if os.path.exists(settings_file):
with open(settings_file, 'r') as file:
settings = json.load(file)
current_lifetime = settings.get('_cookie_lifetime', 0)
else:
current_lifetime = 0
new_lifetime = current_lifetime + seconds
# 保存新的 _cookie_lifetime
PyCookieCloud.save_cookie_lifetime(settings_file, new_lifetime)
class MySender:
def __init__(self, token=None, func=None):
@@ -51,9 +173,7 @@ class MySender:
if result is None: # 成功时返回 None
return
except Exception as e:
# 打印错误日志或处理错误
return f"{channel} 通知错误: {e}"
# 切换到下一个通道
pass # 忽略单个错误,继续尝试下一个通道
self.current_index = (self.current_index + 1) % len(self.tokens)
return f"所有的通知方式都发送失败"
@@ -90,13 +210,14 @@ class MySender:
def _send_serverchan(self, title, content, image):
tmp_tokens = self.tokens[self.current_index]
token = tmp_tokens
if ',' in tmp_tokens:
before_comma, after_comma = tmp_tokens.split(',', 1)
if before_comma.startswith('sctp') and image:
token = after_comma # 图片发到公众号
else:
token = before_comma # 发到 server3
else:
token = tmp_tokens
if token.startswith('sctp'):
match = re.match(r'sctp(\d+)t', token)
@@ -156,7 +277,7 @@ class MySender:
def _send_v2_wechat(self, title, content, image, token):
"""V2 微信通知发送"""
if token and ',' in token:
channel, actual_userid = token.split(',', 1)
_, actual_userid = token.split(',', 1)
else:
actual_userid = None
self.post_message_func(

View File

@@ -1,121 +0,0 @@
import os
import json
import requests
import base64
import hashlib
from typing import Dict, Any
from Crypto import Random
from Crypto.Cipher import AES
def bytes_to_key(data: bytes, salt: bytes, output=48) -> bytes:
# 兼容v2 将bytes_to_key和encrypt导入
assert len(salt) == 8, len(salt)
data += salt
key = hashlib.md5(data).digest()
final_key = key
while len(final_key) < output:
key = hashlib.md5(key + data).digest()
final_key += key
return final_key[:output]
def encrypt(message: bytes, passphrase: bytes) -> bytes:
"""
CryptoJS 加密原文
This is a modified copy of https://stackoverflow.com/questions/36762098/how-to-decrypt-password-from-javascript-cryptojs-aes-encryptpassword-passphras
"""
salt = Random.new().read(8)
key_iv = bytes_to_key(passphrase, salt, 32 + 16)
key = key_iv[:32]
iv = key_iv[32:]
aes = AES.new(key, AES.MODE_CBC, iv)
length = 16 - (len(message) % 16)
data = message + (chr(length) * length).encode()
return base64.b64encode(b"Salted__" + salt + aes.encrypt(data))
class PyCookieCloud:
def __init__(self, url: str, uuid: str, password: str):
self.url: str = url
self.uuid: str = uuid
self.password: str = password
def check_connection(self) -> bool:
"""
Test the connection to the CookieCloud server.
:return: True if the connection is successful, False otherwise.
"""
try:
resp = requests.get(self.url, timeout=3) # 设置超时为3秒
return resp.status_code == 200
except Exception as e:
return False
def update_cookie(self, formatted_cookies: Dict[str, Any]) -> bool:
"""
Update cookie data to CookieCloud.
:param formatted_cookies: cookie value to update.
:return: if update success, return True, else return False.
"""
if '.work.weixin.qq.com' not in formatted_cookies:
formatted_cookies['.work.weixin.qq.com'] = []
formatted_cookies['.work.weixin.qq.com'].append({
'name': '_upload_type',
'value': 'A',
'domain': '.work.weixin.qq.com',
'path': '/',
'expires': -1,
'httpOnly': False,
'secure': False,
'sameSite': 'Lax'
})
cookie = {'cookie_data': formatted_cookies}
raw_data = json.dumps(cookie)
encrypted_data = encrypt(raw_data.encode('utf-8'), self.get_the_key().encode('utf-8')).decode('utf-8')
cookie_cloud_request = requests.post(self.url + '/update',
json={'uuid': self.uuid, 'encrypted': encrypted_data})
if cookie_cloud_request.status_code == 200:
if cookie_cloud_request.json().get('action') == 'done':
return True
return False
def get_the_key(self) -> str:
"""
Get the key used to encrypt and decrypt data.
:return: the key.
"""
md5 = hashlib.md5()
md5.update((self.uuid + '-' + self.password).encode('utf-8'))
return md5.hexdigest()[:16]
@staticmethod
def load_cookie_lifetime(settings_file: str = None): # 返回时间戳 单位秒
if os.path.exists(settings_file):
with open(settings_file, 'r') as file:
settings = json.load(file)
return settings.get('_cookie_lifetime', 0)
else:
return 0
@staticmethod
def save_cookie_lifetime(settings_file, cookie_lifetime): # 传入时间戳 单位秒
with open(settings_file, 'w') as file:
json.dump({'_cookie_lifetime': cookie_lifetime}, file)
@staticmethod
def increase_cookie_lifetime(settings_file, seconds: int):
if os.path.exists(settings_file):
with open(settings_file, 'r') as file:
settings = json.load(file)
current_lifetime = settings.get('_cookie_lifetime', 0)
else:
current_lifetime = 0
new_lifetime = current_lifetime + seconds
# 保存新的 _cookie_lifetime
PyCookieCloud.save_cookie_lifetime(settings_file, new_lifetime)