修复强制更改IP时配置面板延时过长的问题

This commit is contained in:
ramen
2024-11-06 17:06:27 +08:00
parent 1d8bec5db2
commit a8de218346
4 changed files with 113 additions and 177 deletions

View File

@@ -863,12 +863,13 @@
"name": "修改企业微信可信IP",
"description": "优先使用cookie可本地扫码刷新Cookie当填写两个第三方token时可手机远程更新cookie。",
"labels": "消息通知",
"version": "1.3.1",
"version": "1.4.0",
"icon": "Wecom_A.png",
"author": "RamenRa",
"level": 2,
"v2": true,
"history": {
"v1.4.0": "修复强制更改IP时配置面板延时过长的问题。庆祝v2进入正式版显示了一个没用的参数",
"v1.3.1": "修正一些逻辑判断修改ip成功会通知一次",
"v1.3.0": "兼容v2操作cookie前检查一次CookieCloud",
"v1.2.0": "远程命令/push_qr立即推送一次二维码到pushplus。添加<本地扫码刷新cookie>",

View File

@@ -30,7 +30,7 @@ class DynamicWeChat(_PluginBase):
# 插件图标
plugin_icon = "Wecom_A.png"
# 插件版本
plugin_version = "1.3.1"
plugin_version = "1.4.0"
# 插件作者
plugin_author = "RamenRa"
# 作者主页
@@ -80,11 +80,11 @@ class DynamicWeChat(_PluginBase):
_future_timestamp = 0
# cookie有效检测
# _cookie_valid = False
_cookie_valid = True
# cookie存活时间
_cookie_lifetime = 0
# 使用CookieCloud开关
_use_cookiecloud = True
# 从CookieCloud获取的cookie
_cookie_from_CC = ""
# 登录cookie
_cookie_header = ""
_server = f'http://localhost:{settings.NGINX_PORT}/cookiecloud'
@@ -95,7 +95,6 @@ class DynamicWeChat(_PluginBase):
def init_plugin(self, config: dict = None):
# 清空配置
self._server = f'http://localhost:{settings.NGINX_PORT}/cookiecloud'
self._helloimg_s_token = ''
self._pushplus_token = ''
self._ip_changed = True
@@ -104,8 +103,8 @@ class DynamicWeChat(_PluginBase):
self._local_scan = False
self._input_id_list = ''
self._cookie_header = ""
self._cookie_from_CC = ""
self._current_ip_address = self.get_ip_from_url(self._ip_urls[0])
self._cookie_lifetime = PyCookieCloud.load_cookie_lifetime()
if config:
self._enabled = config.get("enabled")
self._cron = config.get("cron")
@@ -114,13 +113,11 @@ class DynamicWeChat(_PluginBase):
self._current_ip_address = config.get("current_ip_address")
self._pushplus_token = config.get("pushplus_token")
self._helloimg_s_token = config.get("helloimg_s_token")
self._cookie_from_CC = config.get("cookie_from_CC")
self._forced_update = config.get("forced_update")
self._local_scan = config.get("local_scan")
self._use_cookiecloud = config.get("use_cookiecloud")
self._cookie_header = config.get("cookie_header")
self._ip_changed = config.get("ip_changed")
self.try_connect_cc()
# 停止现有任务
self.stop_service()
@@ -128,7 +125,7 @@ class DynamicWeChat(_PluginBase):
# 定时服务
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
# 运行一次定时服务
if self._onlyonce or self._forced_update:
if self._onlyonce:
logger.info("立即检测公网IP")
self._scheduler.add_job(func=self.check, trigger='date',
run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
@@ -136,6 +133,12 @@ class DynamicWeChat(_PluginBase):
# 关闭一次性开关
self._onlyonce = False
if self._forced_update:
self._scheduler.add_job(func=self.forced_change, trigger='date',
run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
name="强制更新公网IP") # 添加任务
self._forced_update = False
if self._local_scan:
self._scheduler.add_job(func=self.local_scanning, trigger='date',
run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
@@ -155,11 +158,24 @@ class DynamicWeChat(_PluginBase):
if self._scheduler.get_jobs():
self._scheduler.print_jobs()
self._scheduler.start()
if self._forced_update:
time.sleep(4)
self._forced_update = False
self.__update_config()
@eventmanager.register(EventType.PluginAction)
def forced_change(self, event: Event = None):
"""
强制修改IP
"""
if not self._enabled:
logger.error("插件未开启")
return
if event:
event_data = event.event_data
if not event_data or event_data.get("action") != "dynamicwechat":
return
self.ChangeIP()
self.__update_config()
logger.info("----------------------本次任务结束----------------------")
@eventmanager.register(EventType.PluginAction)
def local_scanning(self, event: Event = None):
"""
@@ -185,7 +201,7 @@ class DynamicWeChat(_PluginBase):
current_time = datetime.now()
future_time = current_time + timedelta(seconds=110)
self._future_timestamp = int(future_time.timestamp())
logger.info("请重新进入插件面板扫码!每20秒检查登录状态最大尝试5次")
logger.info("请重新进入插件面板扫码! 每20秒检查登录状态最大尝试5次")
max_attempts = 5
attempt = 0
while attempt < max_attempts:
@@ -197,9 +213,9 @@ class DynamicWeChat(_PluginBase):
self.click_app_management_buttons(page)
break
else:
logger.info("未检测到登录,任务结束")
logger.info("用户可能没有扫码或登录失败")
else:
logger.info("未找到二维码,任务结束")
logger.error("未找到二维码,任务结束")
logger.info("----------------------本次任务结束----------------------")
browser.close()
except Exception as e:
@@ -218,10 +234,6 @@ class DynamicWeChat(_PluginBase):
event_data = event.event_data
if not event_data or event_data.get("action") != "dynamicwechat":
return
# logger.info("收到命令开始检测公网IP ...")
# self.post_message(channel=event.event_data.get("channel"),
# title="开始检测公网IP ...",
# userid=event.event_data.get("user"))
logger.info("开始检测公网IP")
if self.CheckIP():
@@ -398,6 +410,7 @@ class DynamicWeChat(_PluginBase):
def _update_cookie(self, page, context):
self._future_timestamp = 0 # 标记二维码失效
PyCookieCloud.save_cookie_lifetime(0) # 重置cookie存活时间
if self._use_cookiecloud:
if not self._cc_server: # 连接失败返回 False
self.try_connect_cc() # 再尝试一次连接
@@ -453,7 +466,6 @@ class DynamicWeChat(_PluginBase):
else: # 不使用CookieCloud
return
cookie = self.parse_cookie_header(cookie_header)
self._cookie_from_CC = cookie
return cookie
except Exception as e:
logger.error(f"从CookieCloud获取cookie错误错误原因:{e}")
@@ -485,7 +497,11 @@ class DynamicWeChat(_PluginBase):
page.goto(self._wechatUrl)
time.sleep(3)
if not self.check_login_status(page, task='refresh_cookie'):
self._cookie_valid = False
logger.info("cookie已失效下次IP变动推送二维码")
else:
PyCookieCloud.increase_cookie_lifetime(1200)
self._cookie_lifetime = PyCookieCloud.load_cookie_lifetime()
browser.close()
except Exception as e:
logger.error(f"cookie校验失败:{e}")
@@ -536,8 +552,8 @@ class DynamicWeChat(_PluginBase):
except Exception as e:
# logger.debug(str(e)) # 基于bug运行请不要将错误输出到日志
# try: # 没有登录成功,也没有短信验证码
if self.find_qrc(page) and not task == 'refresh_cookie': # 延长任务找到的二维码不会被发送,所以不算用户没有扫码
logger.error(f"用户没有扫描二维码")
if self.find_qrc(page) and not task == 'refresh_cookie' and not task == 'local_scanning': # 延长任务找到的二维码不会被发送,所以不算用户没有扫码
logger.warning(f"用户没有扫描二维码")
return False
def click_app_management_buttons(self, page):
@@ -672,7 +688,6 @@ class DynamicWeChat(_PluginBase):
"helloimg_s_token": self._helloimg_s_token,
"pushplus_token": self._pushplus_token,
"input_id_list": self._input_id_list,
"cookie_from_CC": self._cookie_from_CC,
"cookie_header": self._cookie_header,
"use_cookiecloud": self._use_cookiecloud,
})
@@ -772,7 +787,7 @@ class DynamicWeChat(_PluginBase):
'component': 'VSwitch',
'props': {
'model': 'local_scan',
'label': '扫码刷新Cookie和改IP',
'label': '本地扫码修改IP',
}
}
]
@@ -878,7 +893,7 @@ class DynamicWeChat(_PluginBase):
'props': {
'type': 'info',
'variant': 'tonal',
'text': '使用内建CookieCloud 或 自定义 或 填写两个token 至少三选一,否则无法正常使用'
'text': '使用内建CookieCloud 或 自定义 或 填写两个token 至少三选一。任何扫码操作都会更新Cookie'
}
}
]
@@ -920,7 +935,6 @@ class DynamicWeChat(_PluginBase):
"input_id_list": "",
}
def get_page(self) -> List[dict]:
# 获取当前时间戳
current_time = datetime.now().timestamp()
@@ -973,6 +987,34 @@ class DynamicWeChat(_PluginBase):
}
}
# 计算 cookie_lifetime 的天数、小时数和分钟数
cookie_lifetime_days = self._cookie_lifetime // 86400 # 一天的秒数为 86400
cookie_lifetime_hours = (self._cookie_lifetime % 86400) // 3600 # 计算小时数
cookie_lifetime_minutes = (self._cookie_lifetime % 3600) // 60 # 计算分钟数
if self._cookie_valid:
bg_color = "#40bb45"
else:
bg_color = "#ff0000"
cookie_lifetime_text = (
f"Cookie 已使用: {cookie_lifetime_days}{cookie_lifetime_hours}小时{cookie_lifetime_minutes}分钟"
)
cookie_lifetime_component = {
"component": "div",
"text": cookie_lifetime_text,
"props": {
"style": {
"fontSize": "18px",
"color": "#ffffff", # 白色字体
"backgroundColor": bg_color,
"padding": "10px",
"borderRadius": "5px",
"textAlign": "center",
"marginTop": "10px",
"display": "inline-block"
}
}
}
# 页面内容,显示二维码状态信息和二维码图片或提示信息
base_content = [
{
@@ -996,13 +1038,23 @@ class DynamicWeChat(_PluginBase):
"borderRadius": "5px",
"display": "inline-block",
"textAlign": "center",
"marginBottom": "40px"
"marginBottom": "10px"
}
}
}
},
{
"component": "div",
"content": [cookie_lifetime_component],
"props": {
"style": {
"textAlign": "center",
"marginBottom": "10px"
}
}
},
img_component # 二维码图片或提示信息
]
},
img_component # 二维码图片
}
]
return base_content

View File

@@ -1,144 +0,0 @@
from Cryptodome import Random
from Cryptodome.Cipher import AES
import base64
import json
import hashlib
import requests
from playwright.sync_api import sync_playwright
from typing import Dict, Any
class PyCookieCloud:
def __init__(self, url: str, uuid: str, password: str):
self.url: str = url
self.uuid: str = uuid
self.password: str = password
self.BLOCK_SIZE = 16
def check_connection(self) -> bool:
"""
Test the connection to the CookieCloud server.
:return: True if the connection is successful, False otherwise.
"""
try:
resp = requests.get(self.url)
# print(self.url)
if resp.status_code == 200:
return True
else:
return False
except Exception as e:
print(str(e))
return False
def update_cookie(self, cookie: Dict[str, Any]) -> bool:
"""
Update cookie data to CookieCloud.
:param cookie: cookie value to update, if this cookie does not contain 'cookie_data' key, it will be added into 'cookie_data'.
:return: if update success, return True, else return False.
"""
# 确保 cookie 是完整的结构,并直接放入 cookie_data 中
# cookie_data = {
# "cookie_data": cookie # 直接将 cookie 数据放入 cookie_data
# }
raw_data = json.dumps(cookie)
encrypted_data = self.encrypt(raw_data.encode('utf-8'), self.get_the_key().encode('utf-8')).decode('utf-8')
request_data = {'uuid': self.uuid, 'encrypted': encrypted_data}
print("请求数据:", request_data) # 打印请求数据
# headers = {'Content-Type': 'application/json'} # 设置请求头为 JSON
cookie_cloud_request = requests.post(self.url + '/update', json=request_data)
print(cookie_cloud_request) # 打印响应对象
if cookie_cloud_request.status_code != 200:
print("错误信息:", cookie_cloud_request.text) # 打印错误信息
if cookie_cloud_request.status_code == 200:
if cookie_cloud_request.json().get('action') == 'done':
return True
return False
def get_the_key(self) -> str:
"""
Get the key used to encrypt and decrypt data.
:return: the key.
"""
md5 = hashlib.md5()
md5.update((self.uuid + '-' + self.password).encode('utf-8'))
return md5.hexdigest()[:16]
@staticmethod
def bytes_to_key(data, salt, output=48):
# extended from https://gist.github.com/gsakkis/4546068
assert len(salt) == 8, len(salt)
data += salt
key = hashlib.md5(data).digest()
final_key = key
while len(final_key) < output:
key = hashlib.md5(key + data).digest()
final_key += key
return final_key[:output]
def pad(self, data):
length = self.BLOCK_SIZE - (len(data) % self.BLOCK_SIZE)
return data + (chr(length) * length).encode()
def encrypt(self, message: bytes, passphrase: bytes) -> bytes:
# 请替换为实际的加密实现,以下是示例
# 使用 AES 或其他算法进行加密
# 这里只是一个占位符,实际实现请根据需要修改
# def encrypt(message, passphrase):
salt = Random.new().read(8)
key_iv = self.bytes_to_key(passphrase, salt, 32 + 16)
key = key_iv[:32]
iv = key_iv[32:]
aes = AES.new(key, AES.MODE_CBC, iv)
return base64.b64encode(b"Salted__" + salt + aes.encrypt(self.pad(message)))
# return message # 示例:返回原始消息
def main(server: str, url: str, uuid: str, password: str):
with sync_playwright() as p:
browser = p.chromium.launch(headless=False)
page = browser.new_page()
# 打开指定的 URL
page.goto(url)
# 等待 60 秒用户登录
print("请在30秒内完成登录...")
page.wait_for_timeout(30000) # 等待60秒
# 获取 cookies
cookies = page.context.cookies()
# 关闭浏览器
browser.close()
# 创建 PyCookieCloud 实例并上传 cookies
py_cookie_cloud = PyCookieCloud(url=server, uuid=uuid, password=password)
cookie_data = {cookie['name']: cookie['value'] for cookie in cookies} # 转换为字典形式
if py_cookie_cloud.check_connection():
print("连接成功,请稍等片刻...")
result = py_cookie_cloud.update_cookie(cookie_data)
else:
print("连接失败,请检查网络连接")
result = False
if result:
print("Cookies 上传成功!")
else:
print("Cookies 上传失败!")
if __name__ == "__main__":
# 设置参数
server = "http://172.16.8.110:43000/cookiecloud"
target_url = "https://work.weixin.qq.com/wework_admin/loginpage_wx?from=myhome" # 请替换为实际的目标 URL
uuid = "hFQrymvqMBX11d14TTmKb6" # 替换为实际的 UUID
password = "2Bfr3LmzVy3t3bsQ5FLAbZ" # 替换为实际的密码
main(server, target_url, uuid, password)

View File

@@ -1,11 +1,15 @@
from typing import Dict, Any
import os
import json
import requests
import base64
import hashlib
from typing import Dict, Any
from Crypto import Random
from Crypto.Cipher import AES
script_dir = os.path.dirname(os.path.abspath(__file__))
settings_file = os.path.join(script_dir, 'settings.json')
def bytes_to_key(data: bytes, salt: bytes, output=48) -> bytes:
# 兼容v2 将bytes_to_key和encrypt导入
@@ -34,6 +38,7 @@ def encrypt(message: bytes, passphrase: bytes) -> bytes:
data = message + (chr(length) * length).encode()
return base64.b64encode(b"Salted__" + salt + aes.encrypt(data))
class PyCookieCloud:
def __init__(self, url: str, uuid: str, password: str):
self.url: str = url
@@ -63,7 +68,8 @@ class PyCookieCloud:
cookie = {'cookie_data': cookie}
raw_data = json.dumps(cookie)
encrypted_data = encrypt(raw_data.encode('utf-8'), self.get_the_key().encode('utf-8')).decode('utf-8')
cookie_cloud_request = requests.post(self.url + '/update', json={'uuid': self.uuid, 'encrypted': encrypted_data})
cookie_cloud_request = requests.post(self.url + '/update',
json={'uuid': self.uuid, 'encrypted': encrypted_data})
if cookie_cloud_request.status_code == 200:
if cookie_cloud_request.json()['action'] == 'done':
return True
@@ -78,3 +84,24 @@ class PyCookieCloud:
md5 = hashlib.md5()
md5.update((self.uuid + '-' + self.password).encode('utf-8'))
return md5.hexdigest()[:16]
@staticmethod
def load_cookie_lifetime(): # 返回时间戳 单位秒
if os.path.exists(settings_file):
with open(settings_file, 'r') as file:
settings = json.load(file)
return settings.get('_cookie_lifetime', 0)
else:
return 0
@staticmethod
def save_cookie_lifetime(cookie_lifetime): # 传入时间戳 单位秒
with open(settings_file, 'w') as file:
json.dump({'_cookie_lifetime': cookie_lifetime}, file)
@staticmethod
def increase_cookie_lifetime(seconds: int):
current_lifetime = PyCookieCloud.load_cookie_lifetime()
new_lifetime = current_lifetime + seconds
# 保存新的 _cookie_lifetime
PyCookieCloud.save_cookie_lifetime(new_lifetime)