add SyncCookieCloud

This commit is contained in:
jxxghp
2024-10-18 13:28:36 +08:00
parent 315e88a075
commit 195c50d8e1
6 changed files with 443 additions and 75 deletions

View File

@@ -862,5 +862,21 @@
"history": {
"v1.1.3": "关闭cookie输入框延长cookie任务成功时不输出日志使用设定中的CookieCloud设置"
}
},
"SyncCookieCloud": {
"name": "同步CookieCloud",
"description": "同步MoviePilot站点Cookie到本地CookieCloud。",
"labels": "站点",
"version": "1.4",
"icon": "Cookiecloud_A.png",
"author": "thsrite",
"level": 1,
"history": {
"v1.4": "修复问题",
"v1.3": "感谢MidnightShake共享代码同步时保留MoviePilot不匹配站点的cookie",
"v1.2": "同步到本地CookieCloud",
"v1.1": "修复CookieCloud覆盖到浏览器",
"v1.0": "同步MoviePilot站点Cookie到CookieCloud"
}
}
}

View File

@@ -1,22 +1,24 @@
from app.core.event import eventmanager, Event
import io
import random
import re
import time
import requests
import random
import io
from playwright.sync_api import sync_playwright
from datetime import datetime, timedelta
import pytz
from typing import Optional
from app.schemas.types import EventType
from typing import Tuple, List, Dict, Any
import pytz
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from playwright.sync_api import sync_playwright
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.helper.cookiecloud import CookieCloudHelper
from app.log import logger
from app.plugins import _PluginBase
from app.core.config import settings
from app.helper.cookiecloud import CookieCloudHelper
from typing import Tuple, List, Dict, Any
from app.plugins.dynamicwechat.update_help import PyCookieCloud
from app.schemas.types import EventType, NotificationType
# import UpdateHelp
@@ -53,15 +55,15 @@ class DynamicWeChat(_PluginBase):
_cc_server = None
_push_qr_now = False
#匹配ip地址的正则
# 匹配ip地址的正则
_ip_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
# 获取ip地址的网址列表
_ip_urls = ["https://myip.ipip.net", "https://ddns.oray.com/checkip", "https://ip.3322.net", "https://4.ipw.cn"]
# 当前ip地址
_current_ip_address = '0.0.0.0'
#企业微信登录
# 企业微信登录
_wechatUrl = 'https://work.weixin.qq.com/wework_admin/loginpage_wx?from=myhome'
#检测间隔时间,默认10分钟
# 检测间隔时间,默认10分钟
_refresh_cron = '*/20 * * * *'
# _urls = []
_input_id_list = ''
@@ -267,6 +269,7 @@ class DynamicWeChat(_PluginBase):
logger.warning("未找到二维码")
return False
except Exception as e:
logger.debug(str(e))
return False
def remote_push_qr(self):
@@ -286,14 +289,20 @@ class DynamicWeChat(_PluginBase):
if self.find_qrc(page):
if self._pushplus_token and self._helloimg_s_token:
img_src, refuse_time = self.upload_image(self._qr_code_image)
self.send_pushplus_message(refuse_time, f"企业微信登录二维码<br/><img src='{img_src}' />")
logger.info("二维码已经发送,等待用户 90 秒内扫码登录")
logger.info("如收到短信验证码请以?结束,发送到<企业微信应用> 如: 110301")
time.sleep(90)
login_status = self.check_login_status(page, '')
if login_status:
self._update_cookie(page, context) # 刷新cookie
self.click_app_management_buttons(page)
if img_src:
self.post_message(
mtype=NotificationType.Plugin,
title="企业微信登录二维码",
text=refuse_time,
image=img_src
)
logger.info("二维码已经发送,等待用户 90 秒内扫码登录")
logger.info("如收到短信验证码请以?结束,发送到<企业微信应用> 如: 110301")
time.sleep(90)
login_status = self.check_login_status(page, '')
if login_status:
self._update_cookie(page, context) # 刷新cookie
self.click_app_management_buttons(page)
else:
logger.warning("远程推送任务 未配置pushplus_token 或 helloimg_s_token")
else:
@@ -302,7 +311,6 @@ class DynamicWeChat(_PluginBase):
except Exception as e:
logger.error(f"远程推送任务 推送二维码失败: {e}")
def ChangeIP(self):
logger.info("开始请求企业微信管理更改可信IP")
try:
@@ -321,15 +329,20 @@ class DynamicWeChat(_PluginBase):
if self.find_qrc(page):
if self._pushplus_token and self._helloimg_s_token:
img_src, refuse_time = self.upload_image(self._qr_code_image)
self.send_pushplus_message(refuse_time, f"企业微信登录二维码<br/><img src='{img_src}' />")
logger.info("二维码已经发送,等待用户 90 秒内扫码登录")
logger.info("如收到短信验证码请以?结束,发送到<企业微信应用> 如: 110301")
time.sleep(90) # 等待用户扫码
login_status = self.check_login_status(page, "")
if login_status:
self._update_cookie(page, context) # 刷新cookie
self.click_app_management_buttons(page)
if img_src:
self.post_message(
mtype=NotificationType.Plugin,
title="企业微信登录二维码",
text=refuse_time,
image=img_src
)
logger.info("二维码已经发送,等待用户 90 秒内扫码登录")
logger.info("如收到短信验证码请以?结束,发送到<企业微信应用> 如: 110301")
time.sleep(90) # 等待用户扫码
login_status = self.check_login_status(page, "")
if login_status:
self._update_cookie(page, context) # 刷新cookie
self.click_app_management_buttons(page)
else:
self._ip_changed = False
else:
@@ -401,7 +414,6 @@ class DynamicWeChat(_PluginBase):
if cookie_header == '':
cookie_header = self._cookie_header
else: # 不使用CookieCloud
cookie_header = self._cookie_header
return
cookie = self.parse_cookie_header(cookie_header)
self._cookie_from_CC = cookie
@@ -411,7 +423,8 @@ class DynamicWeChat(_PluginBase):
# logger.info("尝试推送登录二维码")
return
def parse_cookie_header(self, cookie_header):
@staticmethod
def parse_cookie_header(cookie_header):
cookies = []
for cookie in cookie_header.split(';'):
name, value = cookie.strip().split('=', 1)
@@ -435,8 +448,8 @@ class DynamicWeChat(_PluginBase):
page.goto(self._wechatUrl)
time.sleep(3)
if not self.check_login_status(page, task='refresh_cookie'):
# pass
# else:
# pass
# else:
logger.info("cookie已失效下次IP变动推送二维码")
browser.close()
except Exception as e:
@@ -457,7 +470,7 @@ class DynamicWeChat(_PluginBase):
logger.info("登录成功!")
return True
except Exception as e:
# logger.error(f"检查登录状态时发生错误: {e}")
logger.debug(str(e))
pass
try:
@@ -482,6 +495,7 @@ class DynamicWeChat(_PluginBase):
logger.error("未收到短信验证码")
return False
except Exception as e:
logger.debug(str(e))
# try: # 没有登录成功,也没有短信验证码。 查找二维码是否还存在
if self.find_qrc(page):
logger.error(f"用户没有扫描二维码")
@@ -493,15 +507,14 @@ class DynamicWeChat(_PluginBase):
buttons = [
# ("//span[@class='frame_nav_item_title' and text()='应用管理']", "应用管理"),
# ("//div[@class='app_index_item_title ' and contains(text(), 'MoviePilot')]", "MoviePilot"),
(
"//div[contains(@class, 'js_show_ipConfig_dialog')]//a[contains(@class, '_mod_card_operationLink') and text()='配置']",
"配置")
("//div[contains(@class, 'js_show_ipConfig_dialog')]//a[contains(@class, '_mod_card_operationLink') and text()='配置']",
"配置")
]
if self._input_id_list:
id_list = self._input_id_list.split(",")
app_urls = [f"{bash_url}{app_id.strip()}" for app_id in id_list]
for app_url in app_urls:
page.goto(app_url) # 打开应用详情页
page.goto(app_url) # 打开应用详情页
# logger.info(f"已打开{app_url}")
time.sleep(2)
# 依次点击每个按钮
@@ -530,20 +543,6 @@ class DynamicWeChat(_PluginBase):
logger.error("未找到应用id修改IP失败")
return
def send_pushplus_message(self, title, content):
pushplus_url = f"http://www.pushplus.plus/send/{self._pushplus_token}"
pushplus_data = {
"title": title,
"content": content,
"template": "html"
}
# if wait_time > 2:
# # time.sleep(wait_time)
# logger.info(f"pushplus API 调用次数限制,本次不发送 至少间隔 {wait_time} 秒")
# else:
response = requests.post(pushplus_url, json=pushplus_data)
# return response
def upload_image(self, file_obj, permission=1, strategy_id=1, album_id=1):
"""
上传图片到 helloimg 图床,支持传入文件路径或 BytesIO 对象。
@@ -587,6 +586,7 @@ class DynamicWeChat(_PluginBase):
response = requests.post(helloimg_url, headers=headers, files=files, data=helloimg_data)
# 检查响应内容是否符合预期
response_data = None
try:
response_data = response.json()
if not response_data['status']:
@@ -893,7 +893,6 @@ class DynamicWeChat(_PluginBase):
logger.info("远程命令开始推送二维码")
self.remote_push_qr()
@staticmethod
def get_command() -> List[Dict[str, Any]]:
return [
@@ -911,8 +910,6 @@ class DynamicWeChat(_PluginBase):
def get_api(self) -> List[Dict[str, Any]]:
pass
@eventmanager.register(EventType.UserMessage)
def talk(self, event: Event):
"""
@@ -958,19 +955,7 @@ class DynamicWeChat(_PluginBase):
if self._scheduler:
self._scheduler.remove_all_jobs()
if self._scheduler.running:
self._event.set()
self._scheduler.shutdown()
self._event.clear()
self._scheduler = None
except Exception as e:
logger.error(str(e))

View File

@@ -9,10 +9,12 @@ import base64
BLOCK_SIZE = 16
def pad(data):
length = BLOCK_SIZE - (len(data) % BLOCK_SIZE)
return data + (chr(length) * length).encode()
def bytes_to_key(data, salt, output=48):
# extended from https://gist.github.com/gsakkis/4546068
assert len(salt) == 8, len(salt)
@@ -24,6 +26,7 @@ def bytes_to_key(data, salt, output=48):
final_key += key
return final_key[:output]
def encrypt(message, passphrase):
salt = Random.new().read(8)
key_iv = bytes_to_key(passphrase, salt, 32 + 16)
@@ -32,6 +35,7 @@ def encrypt(message, passphrase):
aes = AES.new(key, AES.MODE_CBC, iv)
return base64.b64encode(b"Salted__" + salt + aes.encrypt(pad(message)))
class PyCookieCloud:
def __init__(self, url: str, uuid: str, password: str):
self.url: str = url
@@ -51,6 +55,7 @@ class PyCookieCloud:
else:
return False
except Exception as e:
print(str(e))
return False
def update_cookie(self, cookie: Dict[str, Any]) -> bool:
@@ -64,7 +69,8 @@ class PyCookieCloud:
cookie = {'cookie_data': cookie}
raw_data = json.dumps(cookie)
encrypted_data = encrypt(raw_data.encode('utf-8'), self.get_the_key().encode('utf-8')).decode('utf-8')
cookie_cloud_request = requests.post(urljoin(self.url, '/update'), data={'uuid': self.uuid, 'encrypted': encrypted_data})
cookie_cloud_request = requests.post(urljoin(self.url, '/update'),
data={'uuid': self.uuid, 'encrypted': encrypted_data})
if cookie_cloud_request.status_code == 200:
if cookie_cloud_request.json()['action'] == 'done':
return True
@@ -78,4 +84,4 @@ class PyCookieCloud:
"""
md5 = hashlib.md5()
md5.update((self.uuid + '-' + self.password).encode('utf-8'))
return md5.hexdigest()[:16]
return md5.hexdigest()[:16]

View File

@@ -30,6 +30,7 @@ class PyCookieCloud:
else:
return False
except Exception as e:
print(str(e))
return False
def update_cookie(self, cookie: Dict[str, Any]) -> bool:
@@ -70,7 +71,8 @@ class PyCookieCloud:
md5.update((self.uuid + '-' + self.password).encode('utf-8'))
return md5.hexdigest()[:16]
def bytes_to_key(self, data, salt, output=48):
@staticmethod
def bytes_to_key(data, salt, output=48):
# extended from https://gist.github.com/gsakkis/4546068
assert len(salt) == 8, len(salt)
data += salt
@@ -120,7 +122,7 @@ def main(server: str, url: str, uuid: str, password: str):
# 创建 PyCookieCloud 实例并上传 cookies
py_cookie_cloud = PyCookieCloud(url=server, uuid=uuid, password=password)
cookie_data = {cookie['name']: cookie['value'] for cookie in cookies} # 转换为字典形式
if (py_cookie_cloud.check_connection()):
if py_cookie_cloud.check_connection():
print("连接成功,请稍等片刻...")
result = py_cookie_cloud.update_cookie(cookie_data)
else:

View File

@@ -24,6 +24,7 @@ class PyCookieCloud:
else:
return False
except Exception as e:
print(str(e))
return False
def update_cookie(self, cookie: Dict[str, Any]) -> bool:
@@ -52,5 +53,3 @@ class PyCookieCloud:
md5 = hashlib.md5()
md5.update((self.uuid + '-' + self.password).encode('utf-8'))
return md5.hexdigest()[:16]

View File

@@ -0,0 +1,360 @@
import json
from datetime import datetime, timedelta
from hashlib import md5
from urllib.parse import urlparse
import pytz
from app.core.config import settings
from app.db.site_oper import SiteOper
from app.plugins import _PluginBase
from typing import Any, List, Dict, Tuple, Optional
from app.log import logger
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from app.utils.common import encrypt, decrypt
class SyncCookieCloud(_PluginBase):
# 插件名称
plugin_name = "同步CookieCloud"
# 插件描述
plugin_desc = "同步MoviePilot站点Cookie到本地CookieCloud。"
# 插件图标
plugin_icon = "Cookiecloud_A.png"
# 插件版本
plugin_version = "1.4"
# 插件作者
plugin_author = "thsrite"
# 作者主页
author_url = "https://github.com/thsrite"
# 插件配置项ID前缀
plugin_config_prefix = "synccookiecloud_"
# 加载顺序
plugin_order = 28
# 可使用的用户级别
auth_level = 1
# 私有属性
_enabled: bool = False
_onlyonce: bool = False
_cron: str = ""
siteoper = None
_scheduler: Optional[BackgroundScheduler] = None
def init_plugin(self, config: dict = None):
self.siteoper = SiteOper()
# 停止现有任务
self.stop_service()
if config:
self._enabled = config.get("enabled")
self._onlyonce = config.get("onlyonce")
self._cron = config.get("cron")
if self._enabled or self._onlyonce:
# 定时服务
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
# 立即运行一次
if self._onlyonce:
logger.info(f"同步CookieCloud服务启动立即运行一次")
self._scheduler.add_job(self.__sync_to_cookiecloud, 'date',
run_date=datetime.now(
tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
name="同步CookieCloud")
# 关闭一次性开关
self._onlyonce = False
# 保存配置
self.__update_config()
# 周期运行
if self._cron:
try:
self._scheduler.add_job(func=self.__sync_to_cookiecloud,
trigger=CronTrigger.from_crontab(self._cron),
name="同步CookieCloud")
except Exception as err:
logger.error(f"定时任务配置错误:{err}")
# 推送实时消息
self.systemmessage.put(f"执行周期配置错误:{err}")
# 启动任务
if self._scheduler.get_jobs():
self._scheduler.print_jobs()
self._scheduler.start()
def __sync_to_cookiecloud(self):
"""
同步站点cookie到cookiecloud
"""
# 获取所有站点
sites = self.siteoper.list_order_by_pri()
if not sites:
return
if not settings.COOKIECLOUD_ENABLE_LOCAL:
logger.error('本地CookieCloud服务器未启用')
return
cookies = {}
for site in sites:
domain = urlparse(site.url).netloc
cookie = site.cookie
if not cookie:
logger.error(f"站点 {domain} 无cookie跳过处理...")
continue
# 解析cookie
site_cookies = []
for ck in cookie.split(";"):
kv = ck.split("=")
if len(kv) < 2:
continue
site_cookies.append({
"domain": domain,
"name": ck.split("=")[0],
"value": ck.split("=")[1]
})
# 存储cookies
cookies[domain] = site_cookies
if cookies:
decrypted_cookies_data, errmsg = self.__decrypted()
if decrypted_cookies_data:
update_data = self.__build_data(cookies, decrypted_cookies_data)
crypt_key = self._get_crypt_key()
try:
cookies = {'cookie_data': update_data}
encrypted_data = encrypt(json.dumps(cookies).encode('utf-8'), crypt_key).decode('utf-8')
except Exception as e:
logger.error(f"CookieCloud加密失败{e}")
return
ck = {'encrypted': encrypted_data}
cookie_path = settings.COOKIE_PATH / f"{settings.COOKIECLOUD_KEY}.json"
cookie_path.write_bytes(json.dumps(ck).encode('utf-8'))
logger.info(f"同步站点cookie到CookieCloud成功")
else:
logger.error(f"同步站点cookie到CookieCloud失败{errmsg}")
def __decrypted(self):
"""
获取并解密本地CookieCloud数据
"""
encrypt_data = self.__load_local_encrypt_data()
if not encrypt_data:
return {}, "未获取到本地CookieCloud数据"
encrypted = encrypt_data.get("encrypted")
if not encrypted:
return {}, "未获取到cookie密文"
else:
crypt_key = self._get_crypt_key()
try:
decrypted_data = decrypt(encrypted, crypt_key).decode('utf-8')
result = json.loads(decrypted_data)
except Exception as e:
return {}, "cookie解密失败" + str(e)
if not result:
return {}, "cookie解密为空"
if result.get("cookie_data"):
contents = result.get("cookie_data")
else:
contents = result
return contents
@staticmethod
def __load_local_encrypt_data() -> Dict[str, Any]:
"""
加载本地CookieCloud加密数据
"""
file_path = settings.COOKIE_PATH / f"{settings.COOKIECLOUD_KEY}.json"
# 检查文件是否存在
if not file_path.exists():
return {}
# 读取文件
with open(file_path, encoding="utf-8", mode="r") as file:
read_content = file.read()
data = json.loads(read_content.encode("utf-8"))
return data
@staticmethod
def __build_data(in_list: dict, out_list: dict) -> dict:
"""
构建站点数据
"""
# 清除空值
out_list = {key: value for key, value in out_list.items() if value}
temp_list = {}
for domain in in_list.keys():
# 构建站点数据模板
template = {}
for domain_out in out_list:
if domain.endswith(domain_out):
for d in out_list[domain_out]:
for key, value in d.items():
if key not in template:
template[key] = value
# 构建站点新数据
temp_list[domain] = []
for d1 in in_list[domain]:
temp_dict = {k: template.get(k, "") for k in template.keys()}
temp_dict.update(d1)
temp_list[domain].append(temp_dict)
# 覆盖修改源站点数据
for temp_domain in temp_list.keys():
found_match = False
for idx, domain2 in enumerate(out_list):
if temp_domain.endswith(domain2):
out_list[temp_domain] = out_list.pop(domain2)
out_list[temp_domain] = temp_list[temp_domain]
found_match = True
break
if not found_match:
out_list[temp_domain] = temp_list[temp_domain]
return out_list
@staticmethod
def _get_crypt_key() -> bytes:
"""
使用UUID和密码生成CookieCloud的加解密密钥
"""
md5_generator = md5()
md5_generator.update(
(str(settings.COOKIECLOUD_KEY).strip() + '-' + str(settings.COOKIECLOUD_PASSWORD).strip()).encode('utf-8'))
return (md5_generator.hexdigest()[:16]).encode('utf-8')
def __update_config(self):
self.update_config({
"enabled": self._enabled,
"onlyonce": self._onlyonce,
"cron": self._cron
})
def get_state(self) -> bool:
return self._enabled
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1、页面配置2、数据结构
"""
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '启用插件',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'onlyonce',
'label': '立即运行一次',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'cron',
'label': '执行周期',
'placeholder': '5位cron表达式留空自动'
}
}
]
},
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '需要MoviePilot设定-站点启用本地CookieCloud服务器。'
}
}
]
}
]
},
]
}
], {
"enabled": False,
"onlyonce": False,
"cron": "5 1 * * *",
}
def get_page(self) -> List[dict]:
pass
def stop_service(self):
"""
退出插件
"""
try:
if self._scheduler:
self._scheduler.remove_all_jobs()
if self._scheduler.running:
self._scheduler.shutdown()
self._scheduler = None
except Exception as e:
logger.error("退出插件失败:%s" % str(e))