兼容v2,操作cookie前检查一次CookieCloud

This commit is contained in:
ramen
2024-10-27 23:26:21 +08:00
parent e1cfd40af5
commit 9f283fb3f9
3 changed files with 85 additions and 52 deletions

View File

@@ -860,11 +860,13 @@
"name": "修改企业微信可信IP",
"description": "优先使用cookie可本地扫码刷新Cookie当填写两个第三方token时可手机远程更新cookie。",
"labels": "消息通知",
"version": "1.2.0",
"version": "1.3.0",
"icon": "Wecom_A.png",
"author": "RamenRa",
"level": 2,
"v2": true,
"history": {
"v1.3.0": "兼容v2操作cookie前检查一次CookieCloud",
"v1.2.0": "远程命令/push_qr立即推送一次二维码到pushplus。添加<本地扫码刷新cookie>",
"v1.1.5": "将chromium运行设置为headless模式",
"v1.1.4": "放弃self.post_message()的消息推送还原成send_pushplus_message()",

View File

@@ -30,7 +30,7 @@ class DynamicWeChat(_PluginBase):
# 插件图标
plugin_icon = "Wecom_A.png"
# 插件版本
plugin_version = "1.2.0"
plugin_version = "1.3.0"
# 插件作者
plugin_author = "RamenRa"
# 作者主页
@@ -96,23 +96,18 @@ class DynamicWeChat(_PluginBase):
_scheduler: Optional[BackgroundScheduler] = None
def init_plugin(self, config: dict = None):
self._server = f'http://localhost:{settings.NGINX_PORT}/cookiecloud'
# 清空配置
# self._wechatUrl = 'https://work.weixin.qq.com/wework_admin/loginpage_wx?from=myhome'
# self._urls = []
self._server = f'http://localhost:{settings.NGINX_PORT}/cookiecloud'
self._helloimg_s_token = ''
self._pushplus_token = ''
self._ip_changed = True
self._forced_update = False
# self._cookie_valid = False
self._use_cookiecloud = True
self._local_scan = False
self._input_id_list = ''
self._cookie_header = ""
self._cookie_from_CC = ""
self._current_ip_address = self.get_ip_from_url(self._ip_urls[0])
# logger.info(f"当前公网 IP: {self._current_ip_address}")
# logger.info(f"server host: {self._server} _uuid: {settings.COOKIECLOUD_KEY} _password: {settings.COOKIECLOUD_PASSWORD}")
if config:
self._enabled = config.get("enabled")
self._cron = config.get("cron")
@@ -127,22 +122,11 @@ class DynamicWeChat(_PluginBase):
self._use_cookiecloud = config.get("use_cookiecloud")
self._cookie_header = config.get("cookie_header")
self._ip_changed = config.get("ip_changed")
if self._use_cookiecloud:
if settings.COOKIECLOUD_ENABLE_LOCAL:
self._cc_server = PyCookieCloud(url=self._server, uuid=settings.COOKIECLOUD_KEY,
password=settings.COOKIECLOUD_PASSWORD)
logger.info("使用内建CookieCloud服务器")
else: # 使用设置里的cookieCloud
self._cc_server = PyCookieCloud(url=settings.COOKIECLOUD_HOST, uuid=settings.COOKIECLOUD_KEY,
password=settings.COOKIECLOUD_PASSWORD)
logger.info("使用自定义CookieCloud服务器")
if not self._cc_server.check_connection():
self._cc_server = None
logger.error("没有可用的CookieCloud服务器")
self.try_connect_cc()
# 停止现有任务
self.stop_service()
if self._enabled or self._onlyonce:
if self._enabled or self._onlyonce and self._input_id_list:
# 定时服务
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
# 运行一次定时服务
@@ -199,12 +183,11 @@ class DynamicWeChat(_PluginBase):
page.goto(self._wechatUrl)
time.sleep(3) # 页面加载等待时间
current_time = datetime.now()
future_time = current_time + timedelta(seconds=110)
self._future_timestamp = int(future_time.timestamp())
if self.find_qrc(page):
logger.info("请<重新进入!>插件面板扫码!每20秒检查登录状态最大尝试5次")
current_time = datetime.now()
future_time = current_time + timedelta(seconds=110)
self._future_timestamp = int(future_time.timestamp())
logger.info("请重新进入插件面板扫码!每20秒检查登录状态最大尝试5次")
max_attempts = 5
attempt = 0
while attempt < max_attempts:
@@ -212,13 +195,14 @@ class DynamicWeChat(_PluginBase):
# logger.info(f"第 {attempt} 次检查登录状态...")
time.sleep(20) # 每20秒检查一次
if self.check_login_status(page, task='local_scanning'):
logger.info("登录成功更新cookie")
self._update_cookie(page, context) # 刷新cookie
self.click_app_management_buttons(page)
break
else:
logger.info("未检测到登录,任务结束")
else:
logger.info("未找到二维码,任务结束")
logger.info("----------------------本次任务结束----------------------")
browser.close()
except Exception as e:
logger.error(f"本地扫码任务: 本地扫码失败: {e}")
@@ -248,10 +232,10 @@ class DynamicWeChat(_PluginBase):
# logger.info("检测公网IP完毕")
logger.info("----------------------本次任务结束----------------------")
if event:
self.post_message(channel=event.event_data.get("channel"),
title="检测公网IP完毕",
userid=event.event_data.get("user"))
# if event:
# self.post_message(channel=event.event_data.get("channel"),
# title="检测公网IP完毕",
# userid=event.event_data.get("user"))
def CheckIP(self):
retry_urls = random.sample(self._ip_urls, len(self._ip_urls))
@@ -286,6 +270,20 @@ class DynamicWeChat(_PluginBase):
else:
return False
def try_connect_cc(self):
if self._use_cookiecloud:
if settings.COOKIECLOUD_ENABLE_LOCAL:
self._cc_server = PyCookieCloud(url=self._server, uuid=settings.COOKIECLOUD_KEY,
password=settings.COOKIECLOUD_PASSWORD)
logger.info("使用内建CookieCloud服务器")
else: # 使用设置里的cookieCloud
self._cc_server = PyCookieCloud(url=settings.COOKIECLOUD_HOST, uuid=settings.COOKIECLOUD_KEY,
password=settings.COOKIECLOUD_PASSWORD)
logger.info("使用自定义CookieCloud服务器")
if not self._cc_server.check_connection():
self._cc_server = None
logger.error("没有可用的CookieCloud服务器")
def get_ip_from_url(self, url):
try:
# 发送 GET 请求
@@ -301,8 +299,10 @@ class DynamicWeChat(_PluginBase):
else:
return "获取IP失败"
except Exception as e:
logger.warning(f"{url}获取IP失败,Error: {e}")
# return "获取IP失败"
if "104" in str(e):
pass
else:
logger.warning(f"{url} 获取IP失败,Error: {e}")
def find_qrc(self, page):
# 查找 iframe 元素并切换到它
@@ -395,6 +395,12 @@ class DynamicWeChat(_PluginBase):
pass
def _update_cookie(self, page, context):
self._future_timestamp = 0 # 标记二维码失效
if not self._cc_server.check_connection: # 连接失败返回 False
self.try_connect_cc() # 再尝试一次连接
if self._cc_server is None:
return
if self._use_cookiecloud and self._cc_server:
logger.info("使用二维码登录成功开始刷新cookie")
try:
@@ -416,14 +422,12 @@ class DynamicWeChat(_PluginBase):
else:
logger.error("更新CookieCloud失败")
else:
logger.error("连接CookieCloud失败", self._server, settings.COOKIECLOUD_KEY,
settings.COOKIECLOUD_PASSWORD)
logger.error("连接CookieCloud失败", self._server)
except Exception as e:
logger.error(f"更新cookie发生错误: {e}")
else:
logger.error("CookieCloud配置错误, 不刷新cookie")
# ----------cookie addd-----------------
def get_cookie(self): # 只有从CookieCloud获取cookie成功才返回True
try:
cookie_header = ''
@@ -508,7 +512,7 @@ class DynamicWeChat(_PluginBase):
logger.info("等待30秒请将短信验证码请以''结束,发送到<企业微信应用> 如: 110301")
time.sleep(30) # 多等30秒
if self._verification_code:
logger.info("输入验证码:" + self._verification_code)
# logger.info("输入验证码:" + self._verification_code)
for digit in self._verification_code:
page.keyboard.press(digit)
time.sleep(0.3) # 每个数字之间添加少量间隔以确保输入顺利
@@ -654,7 +658,6 @@ class DynamicWeChat(_PluginBase):
"helloimg_s_token": self._helloimg_s_token,
"pushplus_token": self._pushplus_token,
"input_id_list": self._input_id_list,
# "standalone_chrome_address": self._diy_server,
"cookie_from_CC": self._cookie_from_CC,
"cookie_header": self._cookie_header,
"use_cookiecloud": self._use_cookiecloud,
@@ -755,7 +758,7 @@ class DynamicWeChat(_PluginBase):
'component': 'VSwitch',
'props': {
'model': 'local_scan',
'label': '本地扫码刷新Cookie',
'label': '扫码刷新Cookie和改IP',
}
}
]
@@ -912,6 +915,7 @@ class DynamicWeChat(_PluginBase):
if current_time > self._future_timestamp:
vaild_text = "二维码已过期"
color = "#ff0000"
self._qr_code_image = None
else:
# 二维码有效,格式化过期时间为 年-月-日 时:分:秒
expiration_time = datetime.fromtimestamp(self._future_timestamp).strftime('%Y-%m-%d %H:%M:%S')
@@ -922,7 +926,7 @@ class DynamicWeChat(_PluginBase):
if self._qr_code_image is None:
img_component = {
"component": "div",
"text": "本地扫码刷新cookie任务未运行",
"text": "所有的登录二维码都会在此展示,有效时间仅对应‘本地扫码功能’",
"props": {
"style": {
"fontSize": "22px",
@@ -984,7 +988,7 @@ class DynamicWeChat(_PluginBase):
}
]
},
img_component # 添加二维码图片或提示信息
img_component # 二维码图片
]
return base_content
@@ -1017,10 +1021,8 @@ class DynamicWeChat(_PluginBase):
time.sleep(90)
login_status = self.check_login_status(page, 'push_qr_code')
if login_status:
if self._use_cookiecloud and self._cc_server:
self._update_cookie(page, context) # 刷新cookie
else:
logger.info("远程推送任务: 没有可用的CookieCloud服务器只修改可信IP")
self._update_cookie(page, context) # 刷新cookie
logger.info("远程推送任务: 没有可用的CookieCloud服务器只修改可信IP")
self.click_app_management_buttons(page)
else:
logger.warning("远程推送任务: 未配置pushplus_token和helloimg_s_token")
@@ -1092,4 +1094,6 @@ class DynamicWeChat(_PluginBase):
self._scheduler.shutdown()
self._scheduler = None
except Exception as e:
logger.error(str(e))
logger.error(str(e))

View File

@@ -2,9 +2,39 @@ import hashlib
from typing import Dict, Any
import json
import requests
from app.utils.common import encrypt
import base64
from hashlib import md5
from Crypto import Random
from Crypto.Cipher import AES
def bytes_to_key(data: bytes, salt: bytes, output=48) -> bytes:
# 兼容v2 将bytes_to_key和encrypt导入
assert len(salt) == 8, len(salt)
data += salt
key = md5(data).digest()
final_key = key
while len(final_key) < output:
key = md5(key + data).digest()
final_key += key
return final_key[:output]
def encrypt(message: bytes, passphrase: bytes) -> bytes:
"""
CryptoJS 加密原文
This is a modified copy of https://stackoverflow.com/questions/36762098/how-to-decrypt-password-from-javascript-cryptojs-aes-encryptpassword-passphras
"""
salt = Random.new().read(8)
key_iv = bytes_to_key(passphrase, salt, 32 + 16)
key = key_iv[:32]
iv = key_iv[32:]
aes = AES.new(key, AES.MODE_CBC, iv)
length = 16 - (len(message) % 16)
data = message + (chr(length) * length).encode()
return base64.b64encode(b"Salted__" + salt + aes.encrypt(data))
class PyCookieCloud:
def __init__(self, url: str, uuid: str, password: str):
self.url: str = url
@@ -18,11 +48,8 @@ class PyCookieCloud:
:return: True if the connection is successful, False otherwise.
"""
try:
resp = requests.get(self.url)
if resp.status_code == 200:
return True
else:
return False
resp = requests.get(self.url, timeout=3) # 设置超时为3秒
return resp.status_code == 200
except Exception as e:
print(str(e))
return False