Merge pull request #521 from RamenRa/master

This commit is contained in:
jxxghp
2024-10-28 06:55:12 +08:00
committed by GitHub
3 changed files with 107 additions and 66 deletions

View File

@@ -860,11 +860,13 @@
"name": "修改企业微信可信IP",
"description": "优先使用cookie可本地扫码刷新Cookie当填写两个第三方token时可手机远程更新cookie。",
"labels": "消息通知",
"version": "1.2.0",
"version": "1.3.0",
"icon": "Wecom_A.png",
"author": "RamenRa",
"level": 2,
"v2": true,
"history": {
"v1.3.0": "兼容v2操作cookie前检查一次CookieCloud",
"v1.2.0": "远程命令/push_qr立即推送一次二维码到pushplus。添加<本地扫码刷新cookie>",
"v1.1.5": "将chromium运行设置为headless模式",
"v1.1.4": "放弃self.post_message()的消息推送还原成send_pushplus_message()",

View File

@@ -30,7 +30,7 @@ class DynamicWeChat(_PluginBase):
# 插件图标
plugin_icon = "Wecom_A.png"
# 插件版本
plugin_version = "1.2.0"
plugin_version = "1.3.0"
# 插件作者
plugin_author = "RamenRa"
# 作者主页
@@ -79,7 +79,6 @@ class DynamicWeChat(_PluginBase):
# 过期时间
_future_timestamp = 0
# -------cookie add------------
# cookie有效检测
# _cookie_valid = False
# 使用CookieCloud开关
@@ -89,30 +88,24 @@ class DynamicWeChat(_PluginBase):
# 登录cookie
_cookie_header = ""
_server = f'http://localhost:{settings.NGINX_PORT}/cookiecloud'
# -------cookie END------------
_cookiecloud = CookieCloudHelper()
# 定时器
_scheduler: Optional[BackgroundScheduler] = None
def init_plugin(self, config: dict = None):
self._server = f'http://localhost:{settings.NGINX_PORT}/cookiecloud'
# 清空配置
# self._wechatUrl = 'https://work.weixin.qq.com/wework_admin/loginpage_wx?from=myhome'
# self._urls = []
self._server = f'http://localhost:{settings.NGINX_PORT}/cookiecloud'
self._helloimg_s_token = ''
self._pushplus_token = ''
self._ip_changed = True
self._forced_update = False
# self._cookie_valid = False
self._use_cookiecloud = True
self._local_scan = False
self._input_id_list = ''
self._cookie_header = ""
self._cookie_from_CC = ""
self._current_ip_address = self.get_ip_from_url(self._ip_urls[0])
# logger.info(f"当前公网 IP: {self._current_ip_address}")
# logger.info(f"server host: {self._server} _uuid: {settings.COOKIECLOUD_KEY} _password: {settings.COOKIECLOUD_PASSWORD}")
if config:
self._enabled = config.get("enabled")
self._cron = config.get("cron")
@@ -127,22 +120,11 @@ class DynamicWeChat(_PluginBase):
self._use_cookiecloud = config.get("use_cookiecloud")
self._cookie_header = config.get("cookie_header")
self._ip_changed = config.get("ip_changed")
if self._use_cookiecloud:
if settings.COOKIECLOUD_ENABLE_LOCAL:
self._cc_server = PyCookieCloud(url=self._server, uuid=settings.COOKIECLOUD_KEY,
password=settings.COOKIECLOUD_PASSWORD)
logger.info("使用内建CookieCloud服务器")
else: # 使用设置里的cookieCloud
self._cc_server = PyCookieCloud(url=settings.COOKIECLOUD_HOST, uuid=settings.COOKIECLOUD_KEY,
password=settings.COOKIECLOUD_PASSWORD)
logger.info("使用自定义CookieCloud服务器")
if not self._cc_server.check_connection():
self._cc_server = None
logger.error("没有可用的CookieCloud服务器")
self.try_connect_cc()
# 停止现有任务
self.stop_service()
if self._enabled or self._onlyonce:
if self._enabled or self._onlyonce and self._input_id_list:
# 定时服务
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
# 运行一次定时服务
@@ -199,12 +181,11 @@ class DynamicWeChat(_PluginBase):
page.goto(self._wechatUrl)
time.sleep(3) # 页面加载等待时间
current_time = datetime.now()
future_time = current_time + timedelta(seconds=110)
self._future_timestamp = int(future_time.timestamp())
if self.find_qrc(page):
logger.info("请<重新进入!>插件面板扫码!每20秒检查登录状态最大尝试5次")
current_time = datetime.now()
future_time = current_time + timedelta(seconds=110)
self._future_timestamp = int(future_time.timestamp())
logger.info("请重新进入插件面板扫码!每20秒检查登录状态最大尝试5次")
max_attempts = 5
attempt = 0
while attempt < max_attempts:
@@ -212,13 +193,14 @@ class DynamicWeChat(_PluginBase):
# logger.info(f"第 {attempt} 次检查登录状态...")
time.sleep(20) # 每20秒检查一次
if self.check_login_status(page, task='local_scanning'):
logger.info("登录成功更新cookie")
self._update_cookie(page, context) # 刷新cookie
self.click_app_management_buttons(page)
break
else:
logger.info("未检测到登录,任务结束")
else:
logger.info("未找到二维码,任务结束")
logger.info("----------------------本次任务结束----------------------")
browser.close()
except Exception as e:
logger.error(f"本地扫码任务: 本地扫码失败: {e}")
@@ -248,10 +230,10 @@ class DynamicWeChat(_PluginBase):
# logger.info("检测公网IP完毕")
logger.info("----------------------本次任务结束----------------------")
if event:
self.post_message(channel=event.event_data.get("channel"),
title="检测公网IP完毕",
userid=event.event_data.get("user"))
# if event:
# self.post_message(channel=event.event_data.get("channel"),
# title="检测公网IP完毕",
# userid=event.event_data.get("user"))
def CheckIP(self):
retry_urls = random.sample(self._ip_urls, len(self._ip_urls))
@@ -286,6 +268,24 @@ class DynamicWeChat(_PluginBase):
else:
return False
def try_connect_cc(self):
if self._use_cookiecloud:
if settings.COOKIECLOUD_KEY and settings.COOKIECLOUD_PASSWORD: # 使用设置里的cookieCloud
if settings.COOKIECLOUD_ENABLE_LOCAL:
self._cc_server = PyCookieCloud(url=self._server, uuid=settings.COOKIECLOUD_KEY,
password=settings.COOKIECLOUD_PASSWORD)
logger.info("使用内建CookieCloud服务器")
else: # 使用设置里的cookieCloud
self._cc_server = PyCookieCloud(url=settings.COOKIECLOUD_HOST, uuid=settings.COOKIECLOUD_KEY,
password=settings.COOKIECLOUD_PASSWORD)
logger.info("使用自定义CookieCloud服务器")
if not self._cc_server.check_connection():
self._cc_server = None
logger.error("没有可用的CookieCloud服务器")
else: # 未设置cookieCloud
self._cc_server = None
logger.error("没有配置CookieCloud的用户KEY和PASSWORD")
def get_ip_from_url(self, url):
try:
# 发送 GET 请求
@@ -301,8 +301,10 @@ class DynamicWeChat(_PluginBase):
else:
return "获取IP失败"
except Exception as e:
logger.warning(f"{url}获取IP失败,Error: {e}")
# return "获取IP失败"
if "104" in str(e):
pass
else:
logger.warning(f"{url} 获取IP失败,Error: {e}")
def find_qrc(self, page):
# 查找 iframe 元素并切换到它
@@ -395,35 +397,44 @@ class DynamicWeChat(_PluginBase):
pass
def _update_cookie(self, page, context):
self._future_timestamp = 0 # 标记二维码失效
if not self._cc_server.check_connection: # 连接失败返回 False
self.try_connect_cc() # 再尝试一次连接
if self._cc_server is None:
return
if self._use_cookiecloud and self._cc_server:
logger.info("使用二维码登录成功开始刷新cookie")
try:
# logger.info("debug 开始连接CookieCloud")
if self._cc_server.check_connection():
# logger.info("成功连接CookieCloud")
current_url = page.url
current_cookies = context.cookies(current_url) # 通过 context 获取 cookies
# logger.info("原始 cookies", current_cookies)
if current_cookies is None:
logger.error("无法获取当前 cookies")
return
formatted_cookies = {}
for cookie in current_cookies:
domain = cookie['domain']
domain = cookie.get('domain') # 使用 get() 方法避免 KeyError
if domain is None:
continue # 跳过没有 domain 的 cookie
if domain not in formatted_cookies:
formatted_cookies[domain] = []
formatted_cookies[domain].append(cookie)
flag = self._cc_server.update_cookie({'cookie_data': formatted_cookies})
if flag:
logger.info("更新CookieCloud成功")
logger.info("更新 CookieCloud 成功")
else:
logger.error("更新CookieCloud失败")
logger.error("更新 CookieCloud 失败")
else:
logger.error("连接CookieCloud失败", self._server, settings.COOKIECLOUD_KEY,
settings.COOKIECLOUD_PASSWORD)
logger.error("连接 CookieCloud 失败", self._server)
except Exception as e:
logger.error(f"更新cookie发生错误: {e}")
logger.error(
f"更新 cookie 发生错误: {e}")
else:
logger.error("CookieCloud配置错误, 不刷新cookie")
logger.error("CookieCloud 配置错误, 不刷新 cookie")
# ----------cookie addd-----------------
def get_cookie(self): # 只有从CookieCloud获取cookie成功才返回True
try:
cookie_header = ''
@@ -505,10 +516,13 @@ class DynamicWeChat(_PluginBase):
# 在这里使用更安全的方式来检查元素是否存在
captcha_panel = page.wait_for_selector('.receive_captcha_panel', timeout=5000) # 检查验证码面板
if captcha_panel: # 出现了短信验证界面
logger.info("等待30秒请将短信验证码请以''结束,发送到<企业微信应用> 如: 110301")
time.sleep(30) # 多等30秒
if task == 'local_scanning':
time.sleep(6)
else:
logger.info("等待30秒请将短信验证码请以''结束,发送到<企业微信应用> 如: 110301")
time.sleep(30) # 多等30秒
if self._verification_code:
logger.info("输入验证码:" + self._verification_code)
# logger.info("输入验证码:" + self._verification_code)
for digit in self._verification_code:
page.keyboard.press(digit)
time.sleep(0.3) # 每个数字之间添加少量间隔以确保输入顺利
@@ -654,7 +668,6 @@ class DynamicWeChat(_PluginBase):
"helloimg_s_token": self._helloimg_s_token,
"pushplus_token": self._pushplus_token,
"input_id_list": self._input_id_list,
# "standalone_chrome_address": self._diy_server,
"cookie_from_CC": self._cookie_from_CC,
"cookie_header": self._cookie_header,
"use_cookiecloud": self._use_cookiecloud,
@@ -755,7 +768,7 @@ class DynamicWeChat(_PluginBase):
'component': 'VSwitch',
'props': {
'model': 'local_scan',
'label': '本地扫码刷新Cookie',
'label': '扫码刷新Cookie和改IP',
}
}
]
@@ -912,6 +925,7 @@ class DynamicWeChat(_PluginBase):
if current_time > self._future_timestamp:
vaild_text = "二维码已过期"
color = "#ff0000"
self._qr_code_image = None
else:
# 二维码有效,格式化过期时间为 年-月-日 时:分:秒
expiration_time = datetime.fromtimestamp(self._future_timestamp).strftime('%Y-%m-%d %H:%M:%S')
@@ -922,7 +936,7 @@ class DynamicWeChat(_PluginBase):
if self._qr_code_image is None:
img_component = {
"component": "div",
"text": "本地扫码刷新cookie任务未运行",
"text": "所有的登录二维码都会在此展示,有效时间仅对应‘本地扫码功能’",
"props": {
"style": {
"fontSize": "22px",
@@ -984,7 +998,7 @@ class DynamicWeChat(_PluginBase):
}
]
},
img_component # 添加二维码图片或提示信息
img_component # 二维码图片
]
return base_content
@@ -1017,10 +1031,8 @@ class DynamicWeChat(_PluginBase):
time.sleep(90)
login_status = self.check_login_status(page, 'push_qr_code')
if login_status:
if self._use_cookiecloud and self._cc_server:
self._update_cookie(page, context) # 刷新cookie
else:
logger.info("远程推送任务: 没有可用的CookieCloud服务器只修改可信IP")
self._update_cookie(page, context) # 刷新cookie
logger.info("远程推送任务: 没有可用的CookieCloud服务器只修改可信IP")
self.click_app_management_buttons(page)
else:
logger.warning("远程推送任务: 未配置pushplus_token和helloimg_s_token")
@@ -1092,4 +1104,6 @@ class DynamicWeChat(_PluginBase):
self._scheduler.shutdown()
self._scheduler = None
except Exception as e:
logger.error(str(e))
logger.error(str(e))

View File

@@ -1,10 +1,39 @@
import hashlib
from typing import Dict, Any
import json
import requests
from app.utils.common import encrypt
import base64
import hashlib
from Crypto import Random
from Crypto.Cipher import AES
def bytes_to_key(data: bytes, salt: bytes, output=48) -> bytes:
# 兼容v2 将bytes_to_key和encrypt导入
assert len(salt) == 8, len(salt)
data += salt
key = hashlib.md5(data).digest()
final_key = key
while len(final_key) < output:
key = hashlib.md5(key + data).digest()
final_key += key
return final_key[:output]
def encrypt(message: bytes, passphrase: bytes) -> bytes:
"""
CryptoJS 加密原文
This is a modified copy of https://stackoverflow.com/questions/36762098/how-to-decrypt-password-from-javascript-cryptojs-aes-encryptpassword-passphras
"""
salt = Random.new().read(8)
key_iv = bytes_to_key(passphrase, salt, 32 + 16)
key = key_iv[:32]
iv = key_iv[32:]
aes = AES.new(key, AES.MODE_CBC, iv)
length = 16 - (len(message) % 16)
data = message + (chr(length) * length).encode()
return base64.b64encode(b"Salted__" + salt + aes.encrypt(data))
class PyCookieCloud:
def __init__(self, url: str, uuid: str, password: str):
self.url: str = url
@@ -18,13 +47,9 @@ class PyCookieCloud:
:return: True if the connection is successful, False otherwise.
"""
try:
resp = requests.get(self.url)
if resp.status_code == 200:
return True
else:
return False
resp = requests.get(self.url, timeout=3) # 设置超时为3秒
return resp.status_code == 200
except Exception as e:
print(str(e))
return False
def update_cookie(self, cookie: Dict[str, Any]) -> bool: