Add plugin ContractCheck

This commit is contained in:
xuzhi
2024-03-21 14:32:17 +00:00
parent e6dd0064bb
commit 75c201b640
7 changed files with 1405 additions and 23 deletions

BIN
icons/contract.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 768 KiB

View File

@@ -418,7 +418,7 @@
"IpDetect": {
"name": "本地IP检测",
"description": "如果QB、TR等服务在本地部署当本地IP改变时自动修改其server IP",
"version": "1.0",
"version": "1.1",
"icon": "ipAddress.png",
"author": "DzAvril",
"level": 1
@@ -430,5 +430,13 @@
"icon": "trackereditor_A.png",
"author": "honue",
"level": 1
},
"ContractCheck": {
"name": "契约检查",
"description": "定时检查保种契约达成情况",
"version": "1.0",
"icon": "contract.png",
"author": "DzAvril",
"level": 1
}
}

View File

@@ -0,0 +1,886 @@
import re
import warnings
from datetime import datetime, timedelta
from multiprocessing.dummy import Pool as ThreadPool
from threading import Lock
from typing import Optional, Any, List, Dict, Tuple
import pytz
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from ruamel.yaml import CommentedMap
from app.core.config import settings
from app.core.event import Event
from app.core.event import eventmanager
from app.db.site_oper import SiteOper
from app.helper.browser import PlaywrightHelper
from app.helper.module import ModuleHelper
from app.helper.sites import SitesHelper
from app.log import logger
from app.plugins import _PluginBase
from app.plugins.contractcheck.siteuserinfo import ISiteUserInfo
from app.schemas.types import EventType, NotificationType
from app.utils.http import RequestUtils
from app.utils.string import StringUtils
from app.utils.timer import TimerUtils
warnings.filterwarnings("ignore", category=FutureWarning)
lock = Lock()
class ContractCheck(_PluginBase):
# 插件名称
plugin_name = "契约检查"
# 插件描述
plugin_desc = "定时检查保种契约达成情况"
# 插件图标
plugin_icon = "contract.png"
# 插件版本
plugin_version = "1.0"
# 插件作者
plugin_author = "DzAvril"
# 作者主页
author_url = "https://github.com/DzAvril"
# 插件配置项ID前缀
plugin_config_prefix = "contractcheck_"
# 加载顺序
plugin_order = 1
# 可使用的用户级别
auth_level = 2
class ContractInfo:
def __init__(
self,
site_name: str = "",
official: bool = False,
size: int = 0,
num: int = 0,
duration: int = 0,
date: datetime = datetime.now(),
):
self.site_name: str = site_name
self.official: bool = official
self.size: int = size
self.num: int = num
self.duration: int = duration
self.date: datetime = date
# 私有属性
sites = None
siteoper = None
statistic_sites: list = []
contract_infos: list[ContractInfo] = []
_scheduler: Optional[BackgroundScheduler] = None
_sites_data: dict = {}
_site_schema: List[ISiteUserInfo] = None
# 配置属性
_enabled: bool = False
_onlyonce: bool = False
_cron: str = ""
_notify: bool = False
_queue_cnt: int = 5
_contract_infos: str = ""
def init_plugin(self, config: dict = None):
self.sites = SitesHelper()
self.siteoper = SiteOper()
# 停止现有任务
self.stop_service()
# 配置
if config:
self._enabled = config.get("enabled")
self._onlyonce = config.get("onlyonce")
self._cron = config.get("cron")
self._notify = config.get("notify")
self._queue_cnt = config.get("queue_cnt")
self._contract_infos = config.get("contract_infos")
self.parse_contract_infos(self._contract_infos)
if self._enabled or self._onlyonce:
# 加载模块
self._site_schema = ModuleHelper.load(
"app.plugins.contractcheck.siteuserinfo",
filter_func=lambda _, obj: hasattr(obj, "schema"),
)
self._site_schema.sort(key=lambda x: x.order)
# 站点数据
self._sites_data = {}
# 立即运行一次
if self._onlyonce:
# 定时服务
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
logger.info(f"保种契约检查服务启动,立即运行一次")
self._scheduler.add_job(
self.refresh_all_site_data,
"date",
run_date=datetime.now(tz=pytz.timezone(settings.TZ))
+ timedelta(seconds=3),
)
# 关闭一次性开关
self._onlyonce = False
# 保存配置
self.__update_config()
# 启动任务
if self._scheduler.get_jobs():
self._scheduler.print_jobs()
self._scheduler.start()
def parse_contract_infos(self, infos):
if infos == None:
return
info_list = infos.split("\n")
for info in info_list:
_site_name, _official, _size, _num, _duration, date = info.split("|")
site_id = self._get_site_id(_site_name)
if site_id == None:
logger.error(f"站点{_site_name}不在数据库中,请检查配置!")
continue
date_format = "%Y/%m/%d"
date = datetime.strptime(date, date_format).date()
_official = True if _official == "" else False
c_info: self.ContractInfo = self.ContractInfo(
_site_name,
_official,
int(_size) * 1024 * 1024 * 1024,
int(_num),
int(_duration),
date,
)
self.contract_infos.append(c_info)
self.statistic_sites.append(site_id)
def _get_site_id(self, name):
all_sites = [site for site in self.siteoper.list_order_by_pri()] + [
site for site in self.__custom_sites()
]
for site in all_sites:
if name == site.name:
return site.id
return None
def get_state(self) -> bool:
return self._enabled
@staticmethod
def get_command() -> List[Dict[str, Any]]:
"""
定义远程控制命令
:return: 命令关键字、事件、描述、附带数据
"""
return [
{
"cmd": "/contract_check",
"event": EventType.PluginAction,
"desc": "保种契约检查",
"category": "",
"data": {"action": "contract_check"},
}
]
def get_api(self) -> List[Dict[str, Any]]:
"""
获取插件API
[{
"path": "/xx",
"endpoint": self.xxx,
"methods": ["GET", "POST"],
"summary": "API说明"
}]
"""
pass
def get_service(self) -> List[Dict[str, Any]]:
"""
注册插件公共服务
[{
"id": "服务ID",
"name": "服务名称",
"trigger": "触发器cron/interval/date/CronTrigger.from_crontab()",
"func": self.xxx,
"kwargs": {} # 定时器参数
}]
"""
if self._enabled and self._cron:
return [
{
"id": "ContractCheck",
"name": "契约检查服务",
"trigger": CronTrigger.from_crontab(self._cron),
"func": self.refresh_all_site_data,
"kwargs": {},
}
]
elif self._enabled:
triggers = TimerUtils.random_scheduler(
num_executions=1,
begin_hour=0,
end_hour=1,
min_interval=1,
max_interval=60,
)
ret_jobs = []
for trigger in triggers:
ret_jobs.append(
{
"id": f"ContractCheck|{trigger.hour}:{trigger.minute}",
"name": "契约检查服务",
"trigger": "cron",
"func": self.refresh_all_site_data,
"kwargs": {"hour": trigger.hour, "minute": trigger.minute},
}
)
return ret_jobs
return []
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1、页面配置2、数据结构
"""
return [
{
"component": "VForm",
"content": [
{
"component": "VRow",
"content": [
{
"component": "VCol",
"props": {"cols": 12, "md": 4},
"content": [
{
"component": "VSwitch",
"props": {
"model": "enabled",
"label": "启用插件",
},
}
],
},
{
"component": "VCol",
"props": {"cols": 12, "md": 4},
"content": [
{
"component": "VSwitch",
"props": {
"model": "notify",
"label": "发送通知",
},
}
],
},
{
"component": "VCol",
"props": {"cols": 12, "md": 4},
"content": [
{
"component": "VSwitch",
"props": {
"model": "onlyonce",
"label": "立即运行一次",
},
}
],
},
{
"component": "VCol",
"props": {"cols": 12, "md": 6},
"content": [
{
"component": "VTextField",
"props": {
"model": "cron",
"label": "执行周期",
"placeholder": "5位cron表达式留空自动",
},
}
],
},
{
"component": "VCol",
"props": {"cols": 12, "md": 6},
"content": [
{
"component": "VTextField",
"props": {
"model": "queue_cnt",
"label": "队列数量",
},
}
],
},
{
"component": "VCol",
"props": {"cols": 12, "md": 12},
"content": [
{
"component": "VTextarea",
"props": {
"model": "contract_infos",
"label": "契约信息",
"rows": 6,
"placeholder": "站点|是否官种|契约体积(G)|契约周期(天)|契约数量(没要求填0)|开始时间(2024/01/01)",
},
}
],
},
],
},
{
"component": "VRow",
"content": [
{
"component": "VCol",
"props": {
"cols": 12,
},
"content": [
{
"component": "VAlert",
"props": {
"type": "info",
"variant": "tonal",
"text": "契约格式为:站点名称|是否官种|体积|数量|周期|开始时间。其中站点名称和MP站点显示名称一致是否官种填是或否体积单位是GB周期单位是天时间格式为YYYY/MM/DD。例子憨憨|是|2048|200|365|2024/2/6",
},
}
],
},
{
"component": "VCol",
"props": {
"cols": 12,
},
"content": [
{
"component": "VAlert",
"props": {
"type": "info",
"variant": "tonal",
"text": "部分站点的官种信息靠种子名称里的官组名称过滤可能存在官组信息遗漏的情况导致统计信息有误如遇到此情况请提issue告知",
},
}
],
},
{
"component": "VCol",
"props": {
"cols": 12,
},
"content": [
{
"component": "VAlert",
"props": {
"type": "info",
"variant": "tonal",
"text": "插件作者的PT站点有限没法适配所有站点如有适配站点请与插件作者联系或自行提PR",
},
}
],
},
],
},
],
}
], {
"enabled": False,
"onlyonce": False,
"notify": True,
"cron": "5 1 * * *",
"queue_cnt": 5,
}
def get_page(self) -> List[dict]:
"""
拼装插件详情页面,需要返回页面配置,同时附带数据
"""
logger.info(f"self._sites_data: {self._sites_data} ")
if not self._sites_data:
return [
{
"component": "div",
"text": "暂无数据",
"props": {
"class": "text-center",
},
}
]
# 站点数据明细
site_trs = [
{
"component": "tr",
"props": {"class": "text-sm"},
"content": [
{
"component": "td",
"props": {
"class": "whitespace-nowrap break-keep text-high-emphasis"
},
"text": site,
},
{"component": "td", "text": data.get("is_official")},
{"component": "td", "text": data.get("contract_size")},
{"component": "td", "text": data.get("contract_num")},
{
"component": "td",
"text": str(data.get("contract_duration")) + "",
},
{"component": "td", "text": data.get("contract_start_on")},
{"component": "td", "text": data.get("total_seed_size")},
{"component": "td", "text": data.get("total_seed_num")},
{"component": "td", "text": data.get("official_seed_size")},
{"component": "td", "text": data.get("official_seed_num")},
{
"component": "td",
"props": {
"class": (
"text-success"
if data.get("is_satisfied")
else "text-error"
)
},
"text": "" if data.get("is_satisfied") else "",
},
{"component": "td", "text": data.get("size_gap")},
{"component": "td", "text": data.get("num_gap")},
{"component": "td", "text": str(data.get("duration_gap")) + ""},
],
}
for site, data in self._sites_data.items()
if not data.get("err_msg")
]
# 拼装页面
return [
{
"component": "VRow",
"content": [
# 各站点数据明细
{
"component": "VCol",
"props": {
"cols": 12,
},
"content": [
{
"component": "VTable",
"props": {"hover": True},
"content": [
{
"component": "thead",
"content": [
{
"component": "th",
"props": {"class": "text-start ps-4"},
"text": "契约站点",
},
{
"component": "th",
"props": {"class": "text-start ps-4"},
"text": "是否官种",
},
{
"component": "th",
"props": {"class": "text-start ps-4"},
"text": "契约体积",
},
{
"component": "th",
"props": {"class": "text-start ps-4"},
"text": "契约数量",
},
{
"component": "th",
"props": {"class": "text-start ps-4"},
"text": "契约周期",
},
{
"component": "th",
"props": {"class": "text-start ps-4"},
"text": "开始时间",
},
{
"component": "th",
"props": {"class": "text-start ps-4"},
"text": "保种体积",
},
{
"component": "th",
"props": {"class": "text-start ps-4"},
"text": "保种数量",
},
{
"component": "th",
"props": {"class": "text-start ps-4"},
"text": "官种体积",
},
{
"component": "th",
"props": {"class": "text-start ps-4"},
"text": "官种数量",
},
{
"component": "th",
"props": {"class": "text-start ps-4"},
"text": "是否满足",
},
{
"component": "th",
"props": {"class": "text-start ps-4"},
"text": "需增体积",
},
{
"component": "th",
"props": {"class": "text-start ps-4"},
"text": "需增数量",
},
{
"component": "th",
"props": {"class": "text-start ps-4"},
"text": "剩余时间",
},
],
},
{"component": "tbody", "content": site_trs},
],
}
],
}
],
}
]
def stop_service(self):
"""
退出插件
"""
try:
if self._scheduler:
self._scheduler.remove_all_jobs()
if self._scheduler.running:
self._scheduler.shutdown()
self._scheduler = None
except Exception as e:
logger.error("退出插件失败:%s" % str(e))
def __build_class(self, html_text: str) -> Any:
for site_schema in self._site_schema:
try:
if site_schema.match(html_text):
return site_schema
except Exception as e:
logger.error(f"站点匹配失败 {str(e)}")
return None
def build(self, site_info: CommentedMap) -> Optional[ISiteUserInfo]:
"""
构建站点信息
"""
site_cookie = site_info.get("cookie")
if not site_cookie:
return None
site_name = site_info.get("name")
url = site_info.get("url")
proxy = site_info.get("proxy")
ua = site_info.get("ua")
# 会话管理
with requests.Session() as session:
proxies = settings.PROXY if proxy else None
proxy_server = settings.PROXY_SERVER if proxy else None
render = site_info.get("render")
logger.debug(
f"站点 {site_name} url={url} site_cookie={site_cookie} ua={ua}"
)
if render:
# 演染模式
html_text = PlaywrightHelper().get_page_source(
url=url, cookies=site_cookie, ua=ua, proxies=proxy_server
)
else:
# 普通模式
res = RequestUtils(
cookies=site_cookie, session=session, ua=ua, proxies=proxies
).get_res(url=url)
if res and res.status_code == 200:
if re.search(r"charset=\"?utf-8\"?", res.text, re.IGNORECASE):
res.encoding = "utf-8"
else:
res.encoding = res.apparent_encoding
html_text = res.text
# 第一次登录反爬
if html_text.find("title") == -1:
i = html_text.find("window.location")
if i == -1:
return None
tmp_url = url + html_text[i : html_text.find(";")].replace(
'"', ""
).replace("+", "").replace(" ", "").replace(
"window.location=", ""
)
res = RequestUtils(
cookies=site_cookie, session=session, ua=ua, proxies=proxies
).get_res(url=tmp_url)
if res and res.status_code == 200:
if (
"charset=utf-8" in res.text
or "charset=UTF-8" in res.text
):
res.encoding = "UTF-8"
else:
res.encoding = res.apparent_encoding
html_text = res.text
if not html_text:
return None
else:
logger.error(
"站点 %s 被反爬限制:%s, 状态码:%s"
% (site_name, url, res.status_code)
)
return None
# 兼容假首页情况,假首页通常没有 <link rel="search" 属性
if '"search"' not in html_text and '"csrf-token"' not in html_text:
res = RequestUtils(
cookies=site_cookie, session=session, ua=ua, proxies=proxies
).get_res(url=url + "/index.php")
if res and res.status_code == 200:
if re.search(
r"charset=\"?utf-8\"?", res.text, re.IGNORECASE
):
res.encoding = "utf-8"
else:
res.encoding = res.apparent_encoding
html_text = res.text
if not html_text:
return None
elif res is not None:
logger.error(
f"站点 {site_name} 连接失败,状态码:{res.status_code}"
)
return None
else:
logger.error(f"站点 {site_name} 无法访问:{url}")
return None
# 解析站点类型
if html_text:
site_schema = self.__build_class(html_text)
if not site_schema:
logger.error("站点 %s 无法识别站点类型" % site_name)
return None
return site_schema(
site_name,
url,
site_cookie,
html_text,
session=session,
ua=ua,
proxy=proxy,
)
return None
# 检查契约达成情况,返回是否达成、差多少体积、差多少数量、还剩多少时间
def _check_seed_states(self, contract_info, site_user_info):
is_satisfied = False
is_size_satisfied = False
is_num_satisfied = False
size_gap = 0
num_gap = 0
duration_gap = 0
if contract_info.official:
current_seeding_size = site_user_info.official_seeding_size
else:
current_seeding_size = site_user_info.total_seeding_size
if contract_info.size < current_seeding_size[1]:
is_size_satisfied = True
else:
size_gap = contract_info.size - current_seeding_size[1]
if contract_info.num < current_seeding_size[0]:
is_num_satisfied = True
else:
num_gap = contract_info.num - current_seeding_size[0]
is_satisfied = is_size_satisfied and is_num_satisfied
duration = (datetime.now().date() - contract_info.date).days
if duration < contract_info.duration:
duration_gap = contract_info.duration - duration
return is_satisfied, size_gap, num_gap, duration_gap
def __refresh_site_data(self, site_info: CommentedMap) -> Optional[ISiteUserInfo]:
"""
更新单个site 数据信息
:param site_info:
:return:
"""
site_name = site_info.get("name")
site_url = site_info.get("url")
if not site_url:
return None
try:
site_user_info: ISiteUserInfo = self.build(site_info=site_info)
if site_user_info:
# 开始解析
site_user_info.parse_official_seeding_info()
logger.info(f"站点 {site_name} 解析完成")
# 获取不到数据时,仅返回错误信息,不做历史数据更新
if site_user_info.err_msg:
self._sites_data.update(
{site_name: {"err_msg": site_user_info.err_msg}}
)
return None
contract_info = self.ContractInfo()
for info in self.contract_infos:
if site_name == info.site_name:
contract_info = info
if contract_info is None:
logger.error(f"站点{site_name}不在契约站点列表中,请检查配置")
return site_user_info
is_satisfied, size_gap, num_gap, duration_gap = self._check_seed_states(
contract_info, site_user_info
)
self._sites_data.update(
{
site_name: {
"is_official": "" if contract_info.official else "",
"contract_size": StringUtils.str_filesize(
contract_info.size
),
"contract_num": contract_info.num,
"contract_duration": contract_info.duration,
"contract_start_on": contract_info.date,
"total_seed_num": site_user_info.total_seeding_size[0],
"total_seed_size": StringUtils.str_filesize(
site_user_info.total_seeding_size[1]
),
"official_seed_num": site_user_info.official_seeding_size[
0
],
"official_seed_size": StringUtils.str_filesize(
site_user_info.official_seeding_size[1]
),
"is_satisfied": is_satisfied,
"size_gap": StringUtils.str_filesize(size_gap),
"num_gap": num_gap,
"duration_gap": duration_gap,
"err_msg": site_user_info.err_msg,
}
}
)
return site_user_info
except Exception as e:
logger.error(f"站点 {site_name} 获取流量数据失败:{str(e)}")
return None
@eventmanager.register(EventType.PluginAction)
def refresh(self, event: Event):
"""
刷新站点数据
"""
if event:
event_data = event.event_data
if not event_data or event_data.get("action") != "contract_check":
return
logger.info("收到命令,开始检查保种契约 ...")
self.post_message(
channel=event.event_data.get("channel"),
title="开始检查保种契约 ...",
userid=event.event_data.get("user"),
)
self.refresh_all_site_data()
if event:
self.post_message(
channel=event.event_data.get("channel"),
title="保种契约检查完成!",
userid=event.event_data.get("user"),
)
def refresh_all_site_data(self):
"""
多线程刷新站点下载上传量默认间隔6小时
"""
if not self.sites.get_indexers():
return
logger.info("开始刷新站点数据 ...")
with lock:
all_sites = [
site for site in self.sites.get_indexers() if not site.get("public")
] + self.__custom_sites()
# 没有指定站点,默认使用全部站点
if not self.statistic_sites:
refresh_sites = all_sites
else:
refresh_sites = [
site for site in all_sites if site.get("id") in self.statistic_sites
]
if not refresh_sites:
return
# 并发刷新
with ThreadPool(min(len(refresh_sites), int(self._queue_cnt or 5))) as p:
p.map(self.__refresh_site_data, refresh_sites)
# 通知刷新完成
if self._notify:
notify_message = ""
for site, data in self._sites_data.items():
notify_message += f"------- ***{site}*** -------\n"
if data.get("is_official") == "":
notify_message += "***官种契约:***\n"
else:
notify_message += "***非官种契约:***\n"
notify_message += f'体积:{data.get("contract_size")},数量:{data.get("contract_num")},周期:{data.get("contract_duration")}\n'
notify_message += "***保种情况:***\n"
notify_message += f'保种总体积:{data.get("total_seed_size")},数量:{data.get("total_seed_num")}\n'
notify_message += f'官种体积:{data.get("official_seed_size")},数量:{data.get("official_seed_num")}\n'
if data.get("duration_gap") == 0:
notify_message += "契约已完成,恭喜!!\n\n"
else:
if data.get("is_satisfied"):
notify_message += f"***已满足***契约要求\n"
else:
notify_message += f'***未满足***契约要求,需增加保种体积{data.get("size_gap")},需增保种数量:{data.get("num_gap")}\n'
notify_message += (
f'剩余契约时间***{data.get("duration_gap")}天***\n\n'
)
self.post_message(
mtype=NotificationType.SiteMessage,
title=f"【保种契约检查】",
text=notify_message,
)
logger.info("站点数据刷新完成")
def __custom_sites(self) -> List[Any]:
custom_sites = []
custom_sites_config = self.get_config("CustomSites")
if custom_sites_config and custom_sites_config.get("enabled"):
custom_sites = custom_sites_config.get("sites")
return custom_sites
def __update_config(self):
self.update_config(
{
"enabled": self._enabled,
"onlyonce": self._onlyonce,
"cron": self._cron,
"notify": self._notify,
"queue_cnt": self._queue_cnt,
"contract_infos": self._contract_infos,
}
)

View File

@@ -0,0 +1,317 @@
# -*- coding: utf-8 -*-
import json
import re
from abc import ABCMeta, abstractmethod
from enum import Enum
from typing import Optional
from urllib.parse import urljoin, urlsplit
from requests import Session
from lxml import etree
from app.core.config import settings
from app.helper.cloudflare import under_challenge
from app.log import logger
from app.utils.http import RequestUtils
from app.utils.site import SiteUtils
SITE_BASE_ORDER = 1000
# 站点框架
class SiteSchema(Enum):
DiscuzX = "Discuz!"
Gazelle = "Gazelle"
Ipt = "IPTorrents"
NexusPhp = "NexusPhp"
NexusProject = "NexusProject"
NexusRabbit = "NexusRabbit"
NexusHhanclub = "NexusHhanclub"
SmallHorse = "Small Horse"
Unit3d = "Unit3d"
TorrentLeech = "TorrentLeech"
FileList = "FileList"
TNode = "TNode"
NexusTtg = "NexusTtg"
class ISiteUserInfo(metaclass=ABCMeta):
# 站点模版
schema = SiteSchema.NexusPhp
# 站点解析时判断顺序,值越小越先解析
order = SITE_BASE_ORDER
def __init__(self, site_name: str,
url: str,
site_cookie: str,
index_html: str,
session: Session = None,
ua: str = None,
emulate: bool = False,
proxy: bool = None):
super().__init__()
# 站点信息
self.site_name = None
self.site_url = None
# 用户信息
self.userid = None
# 种子标题,种子大小
self.torrent_title_size = []
# 种子总大小 (数量,大小)
self.total_seeding_size = [0, 0]
# 官种总大小 (数量,大小)
self.official_seeding_size = [0, 0]
# 站点官组
self.official_team = {
"观众": ["Audies", "ADE", "ADWeb", "ADAudio", "ADeBook", "ADMusic"],
"UBits": ["UBits"],
"听听歌": ["TTG", "WiKi", "DoA", "NGB", "ARiN"],
"馒头": ["MTeam", "MTeamTV"],
}
# 错误信息
self.err_msg = None
# 内部数据
self._base_url = None
self._site_cookie = None
self._index_html = None
self._addition_headers = None
# 站点页面
self._user_detail_page = "userdetails.php?id="
self._torrent_seeding_page = "getusertorrentlistajax.php?userid="
self._torrent_seeding_params = None
self._torrent_seeding_headers = None
split_url = urlsplit(url)
self.site_name = site_name
self.site_url = url
self._base_url = f"{split_url.scheme}://{split_url.netloc}"
self._site_cookie = site_cookie
self._index_html = index_html
self._session = session if session else None
self._ua = ua
self._emulate = emulate
self._proxy = proxy
def site_schema(self) -> SiteSchema:
"""
站点解析模型
:return: 站点解析模型
"""
return self.schema
@classmethod
def match(cls, html_text: str) -> bool:
"""
是否匹配当前解析模型
:param html_text: 站点首页html
:return: 是否匹配
"""
pass
# 用于契约检查插件获取保种信息
def parse_official_seeding_info(self):
"""
解析站点保种信息
:return:
"""
if not self._parse_logged_in(self._index_html):
return
self._parse_site_page(self._index_html)
# 某些站点已统计官种,直接解析
if self.site_name == "憨憨":
seeding_size = self._get_page_content(
urljoin(
self._base_url,
f"getusertorrentlistajax.php?userid={self.userid}&type=size",
)
)
if seeding_size:
seeding_size = json.loads(seeding_size)
self.total_seeding_size = (
seeding_size.get("total_count", 0),
self._size_to_byte(seeding_size.get("total_size", 0)),
)
self.official_seeding_size = (
seeding_size.get("total_official_count", 0),
self._size_to_byte(seeding_size.get("total_official_size", 0)),
)
else:
logger.error(f"获取官种信息失败")
elif self.site_name == "春天":
html_text = self._get_page_content(
urljoin(
self._base_url,
f"getusertorrentlistajax.php?userid={self.userid}&type=seeding",
)
)
html = etree.HTML(html_text)
if not html:
return
total_num = int(html.xpath('//body[1]/b[1]/text()')[0])
total_size = html.xpath('//body[1]/b[2]/text()')
official_num = int(html.xpath('//body[1]/b[3]/text()')[0])
official_size = html.xpath('//body[1]/b[4]/text()')
self.total_seeding_size = (total_num if total_num else 0, self._size_to_byte(total_size[0]) if total_size else 0)
self.official_seeding_size = (official_num if official_num else 0, self._size_to_byte(official_size[0]) if official_size else 0)
else:
self._parse_seeding_pages()
if len(self.torrent_title_size) == 0:
logger.error(f"{self.site_name}:获取种子信息失败")
return
total_num = 0
total_size = 0
official_num = 0
official_size = 0
for torrent in self.torrent_title_size:
self.total_seeding_size[0] += 1
self.total_seeding_size[1] += torrent[1]
if any(team in torrent[0] for team in self.official_team.get(self.site_name, [])):
self.official_seeding_size[0] += 1
self.official_seeding_size[1] += torrent[1]
logger.info(f"{self.site_name} 官种信息 {self.official_seeding_size} 总种信息 {self.total_seeding_size}")
# 将各种格式大小统一转为Byte
def _size_to_byte(self, size: str) -> float:
if str is None:
return 0
if size.endswith("TB"):
return float(size[:-2]) * 1024 * 1024 * 1024 * 1024
if size.endswith("GB"):
return float(size[:-2]) * 1024 * 1024 * 1024
elif size.endswith("MB"):
return float(size[:-2]) * 1024 * 1024
elif size.endswith("KB"):
return float(size[:-2]) * 1024
elif size.endswith("B"):
return float(size[:-1])
else:
return 0
def _parse_seeding_pages(self):
if self._torrent_seeding_page:
# 处理特殊站点
if self.site_name == "听听歌":
self._torrent_seeding_page = self._user_detail_page
elif self.site_name == "馒头":
self._torrent_seeding_page = f"getusertorrentlist.php?userid={self.userid}&type=seeding"
# 第一页
next_page = self._parse_user_torrent_seeding_info(
self._get_page_content(urljoin(self._base_url, self._torrent_seeding_page),
self._torrent_seeding_params,
self._torrent_seeding_headers))
# 其他页处理
while next_page:
next_page = self._parse_user_torrent_seeding_info(
self._get_page_content(urljoin(urljoin(self._base_url, self._torrent_seeding_page), next_page),
self._torrent_seeding_params,
self._torrent_seeding_headers),
multi_page=True)
@staticmethod
def _prepare_html_text(html_text):
"""
处理掉HTML中的干扰部分
"""
return re.sub(r"#\d+", "", re.sub(r"\d+px", "", html_text))
def _get_page_content(self, url: str, params: dict = None, headers: dict = None):
"""
:param url: 网页地址
:param params: post参数
:param headers: 额外的请求头
:return:
"""
req_headers = None
proxies = settings.PROXY if self._proxy else None
if self._ua or headers or self._addition_headers:
req_headers = {}
if headers:
req_headers.update(headers)
req_headers.update({
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"User-Agent": f"{self._ua}"
})
if self._addition_headers:
req_headers.update(self._addition_headers)
if params:
res = RequestUtils(cookies=self._site_cookie,
session=self._session,
timeout=60,
proxies=proxies,
headers=req_headers).post_res(url=url, data=params)
else:
res = RequestUtils(cookies=self._site_cookie,
session=self._session,
timeout=60,
proxies=proxies,
headers=req_headers).get_res(url=url)
if res is not None and res.status_code in (200, 500, 403):
# 如果cloudflare 有防护,尝试使用浏览器仿真
if under_challenge(res.text):
logger.warn(
f"{self.site_name} 检测到Cloudflare请更新Cookie和UA")
return ""
if re.search(r"charset=\"?utf-8\"?", res.text, re.IGNORECASE):
res.encoding = "utf-8"
else:
res.encoding = res.apparent_encoding
return res.text
return ""
@abstractmethod
def _parse_site_page(self, html_text: str):
"""
解析站点相关信息页面
:param html_text:
:return:
"""
pass
def _parse_logged_in(self, html_text):
"""
解析用户是否已经登陆
:param html_text:
:return: True/False
"""
logged_in = SiteUtils.is_logged_in(html_text)
if not logged_in:
self.err_msg = "未检测到已登陆请检查cookies是否过期"
logger.warn(f"{self.site_name} 未登录,跳过后续操作")
return logged_in
@abstractmethod
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
解析用户的做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
pass
def to_dict(self):
"""
转化为字典
"""
attributes = [
attr for attr in dir(self)
if not callable(getattr(self, attr)) and not attr.startswith("_")
]
return {
attr: getattr(self, attr).value
if isinstance(getattr(self, attr), SiteSchema)
else getattr(self, attr) for attr in attributes
}

View File

@@ -0,0 +1,95 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.log import logger
from app.plugins.contractcheck.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class NexusPhpSiteUserInfo(ISiteUserInfo):
schema = SiteSchema.NexusPhp
order = SITE_BASE_ORDER * 2
@classmethod
def match(cls, html_text: str) -> bool:
"""
默认使用NexusPhp解析
:param html_text:
:return:
"""
return True
def _parse_site_page(self, html_text: str):
html_text = self._prepare_html_text(html_text)
user_detail = re.search(r"userdetails.php\?id=(\d+)", html_text)
if user_detail and user_detail.group().strip():
self._user_detail_page = user_detail.group().strip().lstrip('/')
self.userid = user_detail.group(1)
self._torrent_seeding_page = f"getusertorrentlistajax.php?userid={self.userid}&type=seeding"
else:
user_detail = re.search(r"(userdetails)", html_text)
if user_detail and user_detail.group().strip():
self._user_detail_page = user_detail.group().strip().lstrip('/')
self.userid = None
self._torrent_seeding_page = None
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
html = etree.HTML(str(html_text).replace(r'\/', '/'))
if not html:
return None
# 首页存在扩展链接,使用扩展链接
seeding_url_text = html.xpath('//a[contains(@href,"torrents.php") '
'and contains(@href,"seeding")]/@href')
if multi_page is False and seeding_url_text and seeding_url_text[0].strip():
self._torrent_seeding_page = seeding_url_text[0].strip()
return self._torrent_seeding_page
title_col = 2
size_col = 3
seeders_col = 4
# 搜索size列
size_col_xpath = '//tr[position()=1]/' \
'td[(img[@class="size"] and img[@alt="size"])' \
' or (text() = "大小")' \
' or (a/img[@class="size" and @alt="size"])]'
if html.xpath(size_col_xpath):
size_col = len(html.xpath(f'{size_col_xpath}/preceding-sibling::td')) + 1
# 搜索title列
title_col_xpath = '//tr[position()=1]/' \
'td[(text() = "标题")]'
if html.xpath(title_col_xpath):
title_col = len(html.xpath(f'{title_col_xpath}/preceding-sibling::td')) + 1
page_torrent_info = []
# 如果 table class="torrents"则增加table[@class="torrents"]
table_class = '//table[@class="torrents"]' if html.xpath('//table[@class="torrents"]') else ''
seeding_sizes = html.xpath(f'{table_class}//tr[position()>1]/td[{size_col}]')
seeding_torrents = html.xpath(f'{table_class}//tr[position()>1]/td[{title_col}]/a/@title')
if seeding_sizes:
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
page_torrent_info.append([seeding_torrents[i], size])
self.torrent_title_size.extend(page_torrent_info)
# 是否存在下页数据
next_page = None
next_page_text = html.xpath('//a[contains(.//text(), "下一页") or contains(.//text(), "下一頁")]/@href')
if next_page_text:
next_page = next_page_text[-1].strip()
# fix up page url
if self.userid not in next_page:
next_page = f'{next_page}&userid={self.userid}&type=seeding'
return next_page

View File

@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
import re
from lxml import etree
from typing import Optional
from app.plugins.contractcheck.siteuserinfo import SITE_BASE_ORDER, SiteSchema
from app.plugins.contractcheck.siteuserinfo.nexus_php import NexusPhpSiteUserInfo
from app.utils.string import StringUtils
class NexusTtgSiteUserInfo(NexusPhpSiteUserInfo):
schema = SiteSchema.NexusTtg
order = SITE_BASE_ORDER + 20
@classmethod
def match(cls, html_text: str) -> bool:
return 'totheglory.im' in html_text
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
html = etree.HTML(str(html_text).replace(r'\/', '/'))
if not html:
return None
title_col = 2
size_col = 4
page_torrent_info = []
table_class = '//div[@id="ka2"]/table'
seeding_sizes = html.xpath(f'{table_class}//tr[position()>1]/td[{size_col}]')
seeding_torrents = html.xpath(f'{table_class}//tr[position()>1]/td[{title_col}]/a/b/text()')
if seeding_sizes:
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
page_torrent_info.append([seeding_torrents[i], size])
self.torrent_title_size.extend(page_torrent_info)
# 不存在下页数据
return False

View File

@@ -9,6 +9,9 @@ from dotenv import set_key
from app.core.module import ModuleManager
from app.scheduler import Scheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
import pytz
class IpDetect(_PluginBase):
@@ -19,7 +22,7 @@ class IpDetect(_PluginBase):
# 插件图标
plugin_icon = "ipAddress.png"
# 插件版本
plugin_version = "1.0"
plugin_version = "1.1"
# 插件作者
plugin_author = "DzAvril"
# 作者主页
@@ -45,9 +48,11 @@ class IpDetect(_PluginBase):
_onlyonce = False
_cron = ""
_setting_keys = []
_scheduler = None
def init_plugin(self, config: dict = None):
logger.info(f"Hello IpDetect, config {config}")
self.stop_service()
if config:
self._enabled = config.get("enabled")
self._onlyonce = config.get("onlyonce")
@@ -116,25 +121,40 @@ class IpDetect(_PluginBase):
self.__update_config()
logger.info(f"_setting_keys: {self._setting_keys}")
if self._onlyonce:
# 定时任务
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
logger.info(f"本地IP检测服务启动立即运行一次")
self._scheduler.add_job(
self.detect_ip,
"date",
run_date=datetime.now(tz=pytz.timezone(settings.TZ))
+ timedelta(seconds=3),
)
self._onlyonce = False
self.__update_config()
self.detect_ip()
# 启动任务
if self._scheduler.get_jobs():
self._scheduler.print_jobs()
self._scheduler.start()
def __update_config(self):
self.update_config({
"enabled": self._enabled,
"onlyonce": self._onlyonce,
"cron": self._cron,
"notify": self._notify,
"enable_qb": self._enable_qb,
"enable_tr": self._enable_tr,
"enable_emby": self._enable_emby,
"enable_emby_play": self._enable_emby_play,
"enable_jellyfin": self._enable_jellyfin,
"enable_jellyfin_play": self._enable_jellyfin_play,
"enable_plex": self._enable_plex,
"enable_plex_play": self._enable_plex_play,
})
self.update_config(
{
"enabled": self._enabled,
"onlyonce": self._onlyonce,
"cron": self._cron,
"notify": self._notify,
"enable_qb": self._enable_qb,
"enable_tr": self._enable_tr,
"enable_emby": self._enable_emby,
"enable_emby_play": self._enable_emby_play,
"enable_jellyfin": self._enable_jellyfin,
"enable_jellyfin_play": self._enable_jellyfin_play,
"enable_plex": self._enable_plex,
"enable_plex_play": self._enable_plex_play,
}
)
def get_state(self) -> bool:
return self._enabled
@@ -185,8 +205,8 @@ class IpDetect(_PluginBase):
old_value,
)
else: # ip:port
ip_pattern = r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):(\d+)'
v = re.sub(ip_pattern, r'{}:\2'.format(v), old_value)
ip_pattern = r"(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):(\d+)"
v = re.sub(ip_pattern, r"{}:\2".format(v), old_value)
if hasattr(settings, k):
if v == "None":
v = None
@@ -219,7 +239,7 @@ class IpDetect(_PluginBase):
return None
def parse_ip(self, ip):
ip_pattern = r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
ip_pattern = r"(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})"
match = re.search(ip_pattern, ip)
if match:
return match.group(1)
@@ -476,7 +496,7 @@ class IpDetect(_PluginBase):
"text": "本插件针对部署在本地的服务如QB下载器、Emby服务等检测到本地IP变化时同步修改服务地址请勾选部署在本地的服务。",
},
}
]
],
},
{
"component": "VCol",
@@ -494,7 +514,6 @@ class IpDetect(_PluginBase):
}
],
},
],
},
],
@@ -517,4 +536,14 @@ class IpDetect(_PluginBase):
pass
def stop_service(self):
pass
"""
退出插件
"""
try:
if self._scheduler:
self._scheduler.remove_all_jobs()
if self._scheduler.running:
self._scheduler.shutdown()
self._scheduler = None
except Exception as e:
logger.error("退出插件失败:%s" % str(e))