Revert "fix 站点未读消息删除重复代码、依赖于[站点数据统计]插件"

This reverts commit b78767fc
This commit is contained in:
thsrite
2024-04-23 13:43:46 +08:00
parent 9eacfab9ea
commit cc8abcbd04
17 changed files with 2129 additions and 26 deletions

View File

@@ -10,7 +10,7 @@ MoviePilot三方插件市场https://github.com/thsrite/MoviePilot-Plugins/
### 插件新增
- 站点数据统计 1.4 (无未读消息版本)(废弃)
- 站点未读消息 1.7 (需要安装[站点数据统计]插件)
- 站点未读消息 1.6
- [云盘Strm生成 3.7](docs%2FCloudStrm.md)
- [Strm文件模式转换 1.0](docs%2FStrmConvert.md)
- 清理订阅缓存 1.0

View File

@@ -28,12 +28,11 @@
"SiteUnreadMsg": {
"name": "站点未读消息",
"description": "发送站点未读消息。",
"version": "1.7",
"version": "1.6",
"icon": "Synomail_A.png",
"author": "thsrite",
"level": 2,
"history": {
"v1.7": "删除重复代码、依赖于[站点数据统计]插件",
"v1.6": "增加解析失败日志",
"v1.5": "修复馒头未读消息1",
"v1.4": "sync主仓库",

View File

@@ -37,7 +37,7 @@ class SiteUnreadMsg(_PluginBase):
# 插件图标
plugin_icon = "Synomail_A.png"
# 插件版本
plugin_version = "1.7"
plugin_version = "1.6"
# 插件作者
plugin_author = "thsrite"
# 作者主页
@@ -88,7 +88,7 @@ class SiteUnreadMsg(_PluginBase):
if self._enabled or self._onlyonce:
# 加载模块
self._site_schema = ModuleHelper.load('app.plugins.sitestatistic.siteuserinfo',
self._site_schema = ModuleHelper.load('app.plugins.siteunreadmsg.siteuserinfo',
filter_func=lambda _, obj: hasattr(obj, 'schema'))
# 定时服务
@@ -274,27 +274,6 @@ class SiteUnreadMsg(_PluginBase):
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '依赖于[站点数据统计]插件。'
}
}
]
}
]
}
]
}

View File

@@ -0,0 +1,422 @@
# -*- coding: utf-8 -*-
import json
import re
from abc import ABCMeta, abstractmethod
from enum import Enum
from typing import Optional
from urllib.parse import urljoin, urlsplit
from requests import Session
from app.core.config import settings
from app.helper.cloudflare import under_challenge
from app.log import logger
from app.utils.http import RequestUtils
from app.utils.site import SiteUtils
# Base sort order for site parsers; subclasses add offsets to control match priority
SITE_BASE_ORDER = 1000


# Site framework types
class SiteSchema(Enum):
    """Enumeration of PT-site software frameworks a parser can target."""
    DiscuzX = "Discuz!"
    Gazelle = "Gazelle"
    Ipt = "IPTorrents"
    NexusPhp = "NexusPhp"
    NexusProject = "NexusProject"
    NexusRabbit = "NexusRabbit"
    NexusHhanclub = "NexusHhanclub"
    SmallHorse = "Small Horse"
    Unit3d = "Unit3d"
    TorrentLeech = "TorrentLeech"
    FileList = "FileList"
    TNode = "TNode"
    MTorrent = "MTorrent"
class ISiteUserInfo(metaclass=ABCMeta):
    """Abstract base class for per-site user-info scrapers.

    A subclass implements the ``_parse_*`` hooks for one site framework
    (NexusPhp, Gazelle, ...); :meth:`parse` drives the full workflow:
    login check -> site page discovery -> user base/detail info ->
    unread messages -> traffic -> seeding statistics.
    """

    # Site framework template handled by this parser
    schema = SiteSchema.NexusPhp
    # Match order when probing a site; smaller values are tried first
    order = SITE_BASE_ORDER
    # Request mode: "cookie" or "apikey"
    request_mode = "cookie"

    def __init__(self, site_name: str,
                 url: str,
                 site_cookie: str,
                 index_html: str,
                 session: Session = None,
                 ua: str = None,
                 emulate: bool = False,
                 proxy: bool = None):
        """
        :param site_name: display name of the site
        :param url: site root URL
        :param site_cookie: cookie string used for authenticated requests
        :param index_html: pre-fetched HTML of the site index page
        :param session: optional requests session to reuse connections
        :param ua: User-Agent header value
        :param emulate: whether browser emulation was used to fetch pages
        :param proxy: whether to route requests through the configured proxy
        """
        super().__init__()
        # Site information
        self.site_name = None
        self.site_url = None
        # User information
        self.username = None
        self.userid = None
        # Unread messages: count plus list of (title, date, content) tuples
        self.message_unread = 0
        self.message_unread_contents = []
        # Traffic information
        self.upload = 0
        self.download = 0
        self.ratio = 0
        # Torrent statistics
        self.seeding = 0
        self.leeching = 0
        self.uploaded = 0
        self.completed = 0
        self.incomplete = 0
        self.seeding_size = 0
        self.leeching_size = 0
        self.uploaded_size = 0
        self.completed_size = 0
        self.incomplete_size = 0
        # Per-torrent [seeders, size] pairs (serialized to JSON at end of parse())
        self.seeding_info = []
        # User detail page/query configuration
        self._user_basic_page = None
        self._user_basic_params = None
        self._user_basic_headers = None
        self.user_level = None
        self.join_at = None
        self.bonus = 0.0
        # Last error message, if any
        self.err_msg = None
        # Internal request state
        self._base_url = None
        self._site_cookie = None
        self._index_html = None
        self._addition_headers = None
        # Default site page paths (NexusPhp-style); subclasses override as needed
        self._brief_page = "index.php"
        self._user_detail_page = "userdetails.php?id="
        self._user_detail_params = None
        self._user_detail_headers = None
        self._user_traffic_page = "index.php"
        self._user_traffic_params = None
        self._user_traffic_headers = None
        self._user_mail_unread_page = "messages.php?action=viewmailbox&box=1&unread=yes"
        self._sys_mail_unread_page = "messages.php?action=viewmailbox&box=-2&unread=yes"
        self._mail_unread_params = None
        self._mail_unread_headers = None
        self._mail_content_params = None
        self._mail_content_headers = None
        self._torrent_seeding_page = "getusertorrentlistajax.php?userid="
        self._torrent_seeding_params = None
        self._torrent_seeding_headers = None
        split_url = urlsplit(url)
        self.site_name = site_name
        self.site_url = url
        self.site_domain = split_url.netloc
        self._base_url = f"{split_url.scheme}://{split_url.netloc}"
        self._site_cookie = site_cookie
        self._index_html = index_html
        self._session = session if session else None
        self._ua = ua
        self._emulate = emulate
        self._proxy = proxy

    def site_schema(self) -> SiteSchema:
        """
        Return the site framework template this parser handles.
        :return: site schema enum member
        """
        return self.schema

    @classmethod
    def match(cls, html_text: str) -> bool:
        """
        Whether this parser matches the given site.
        :param html_text: site index page HTML
        :return: True if this parser can handle the site
        """
        pass

    def parse(self):
        """
        Parse all site information in sequence.
        :return:
        """
        # Bail out early if the cookie is not logged in
        if not self._parse_logged_in(self._index_html):
            return
        # Discover site-specific page addresses
        self._parse_site_page(self._index_html)
        # User base info: from a dedicated page when configured, else the index page
        if self._user_basic_page:
            self._parse_user_base_info(
                self._get_page_content(
                    url=urljoin(self._base_url, self._user_basic_page),
                    params=self._user_basic_params,
                    headers=self._user_basic_headers
                )
            )
        else:
            self._parse_user_base_info(self._index_html)
        # User detail info (level, join date, bonus, ...)
        if self._user_detail_page:
            self._parse_user_detail_info(
                self._get_page_content(
                    url=urljoin(self._base_url, self._user_detail_page),
                    params=self._user_detail_params,
                    headers=self._user_detail_headers
                )
            )
        # Unread messages (NOTE(review): method name carries a "pase" typo,
        # kept as-is for compatibility with existing subclasses/callers)
        self._pase_unread_msgs()
        # Upload / download / share-ratio figures
        if self._user_traffic_page:
            self._parse_user_traffic_info(
                self._get_page_content(
                    url=urljoin(self._base_url, self._user_traffic_page),
                    params=self._user_traffic_params,
                    headers=self._user_traffic_headers
                )
            )
        # Seeding statistics (possibly paginated)
        self._parse_seeding_pages()
        # Serialize for storage; seeding_info is a JSON string from here on
        self.seeding_info = json.dumps(self.seeding_info)

    def _pase_unread_msgs(self):
        """
        Collect the titles and bodies of all unread messages.
        :return:
        """
        unread_msg_links = []
        if self.message_unread > 0:
            # Both the user mailbox and the system mailbox may hold unread mail;
            # a set also deduplicates when both point at the same page
            links = {self._user_mail_unread_page, self._sys_mail_unread_page}
            for link in links:
                if not link:
                    continue
                msg_links = []
                # First page; the hook returns the next page path when paginated
                next_page = self._parse_message_unread_links(
                    self._get_page_content(
                        url=urljoin(self._base_url, link),
                        params=self._mail_unread_params,
                        headers=self._mail_unread_headers
                    ),
                    msg_links)
                while next_page:
                    next_page = self._parse_message_unread_links(
                        self._get_page_content(
                            url=urljoin(self._base_url, next_page),
                            params=self._mail_unread_params,
                            headers=self._mail_unread_headers
                        ),
                        msg_links
                    )
                unread_msg_links.extend(msg_links)
        # Fetch and parse the content of every collected message link
        for msg_link in unread_msg_links:
            logger.debug(f"{self.site_name} 信息链接 {msg_link}")
            head, date, content = self._parse_message_content(
                self._get_page_content(
                    urljoin(self._base_url, msg_link),
                    params=self._mail_content_params,
                    headers=self._mail_content_headers
                )
            )
            logger.debug(f"{self.site_name} 标题 {head} 时间 {date} 内容 {content}")
            self.message_unread_contents.append((head, date, content))

    def _parse_seeding_pages(self):
        """
        Parse the (possibly paginated) seeding-torrent pages.
        """
        if self._torrent_seeding_page:
            # First page
            next_page = self._parse_user_torrent_seeding_info(
                self._get_page_content(
                    url=urljoin(self._base_url, self._torrent_seeding_page),
                    params=self._torrent_seeding_params,
                    headers=self._torrent_seeding_headers
                )
            )
            # Follow-up pages until the hook signals completion (None/False)
            while next_page is not None and next_page is not False:
                next_page = self._parse_user_torrent_seeding_info(
                    self._get_page_content(
                        url=urljoin(urljoin(self._base_url, self._torrent_seeding_page), next_page),
                        params=self._torrent_seeding_params,
                        headers=self._torrent_seeding_headers
                    ),
                    multi_page=True)

    @staticmethod
    def _prepare_html_text(html_text):
        """
        Strip noise from the HTML that confuses downstream regex/number parsing
        (pixel sizes like "12px" and color refs like "#123456").
        """
        return re.sub(r"#\d+", "", re.sub(r"\d+px", "", html_text))

    @abstractmethod
    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
        """
        Collect unread-message links into msg_links.
        :param html_text:
        :return: next page address, or None when done
        """
        pass

    def _get_page_content(self, url: str, params: dict = None, headers: dict = None):
        """
        Fetch a page, honouring request mode, proxy and extra headers.
        :param url: page URL
        :param params: POST parameters (a GET is issued when None)
        :param headers: extra request headers
        :return: response text ("" on failure)
        """
        req_headers = None
        proxies = settings.PROXY if self._proxy else None
        # NOTE(review): when no UA/extra headers are configured, req_headers stays
        # None and req_headers.get() in the params branch below would raise —
        # presumably a UA is always configured; confirm upstream
        if self._ua or headers or self._addition_headers:
            req_headers = {
                "User-Agent": f"{self._ua}"
            }
            if headers:
                req_headers.update(headers)
            else:
                # Default to form-encoding when no explicit headers are given
                req_headers.update({
                    "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
                })
            if self._addition_headers:
                req_headers.update(self._addition_headers)
        if self.request_mode == "apikey":
            # apikey mode: credentials travel via request headers only
            cookie = None
            session = None
        else:
            # cookie mode
            cookie = self._site_cookie
            session = self._session
        if params:
            if req_headers.get("Content-Type") == "application/json":
                # JSON POST
                res = RequestUtils(cookies=cookie,
                                   session=session,
                                   timeout=60,
                                   proxies=proxies,
                                   headers=req_headers).post_res(url=url, json=params)
            else:
                # Form-encoded POST
                res = RequestUtils(cookies=cookie,
                                   session=session,
                                   timeout=60,
                                   proxies=proxies,
                                   headers=req_headers).post_res(url=url, data=params)
        else:
            res = RequestUtils(cookies=cookie,
                               session=session,
                               timeout=60,
                               proxies=proxies,
                               headers=req_headers).get_res(url=url)
        # 500/403 are accepted alongside 200 — some sites serve usable bodies with them
        if res is not None and res.status_code in (200, 500, 403):
            if req_headers and "application/json" in str(req_headers.get("Accept")):
                # JSON API response: re-serialize so callers always get a string
                return json.dumps(res.json())
            else:
                # If Cloudflare protection kicked in, give up on this page
                if under_challenge(res.text):
                    logger.warn(
                        f"{self.site_name} 检测到Cloudflare请更新Cookie和UA")
                    return ""
                # Honour a declared utf-8 charset; otherwise guess the encoding
                if re.search(r"charset=\"?utf-8\"?", res.text, re.IGNORECASE):
                    res.encoding = "utf-8"
                else:
                    res.encoding = res.apparent_encoding
                return res.text
        return ""

    @abstractmethod
    def _parse_site_page(self, html_text: str):
        """
        Parse site-related page addresses from the index page.
        :param html_text:
        :return:
        """
        pass

    @abstractmethod
    def _parse_user_base_info(self, html_text: str):
        """
        Parse basic user info (id, name).
        :param html_text:
        :return:
        """
        pass

    def _parse_logged_in(self, html_text):
        """
        Check whether the fetched page reflects a logged-in session.
        :param html_text:
        :return: True/False
        """
        logged_in = SiteUtils.is_logged_in(html_text)
        if not logged_in:
            self.err_msg = "未检测到已登陆请检查cookies是否过期"
            logger.warn(f"{self.site_name} 未登录,跳过后续操作")
        return logged_in

    @abstractmethod
    def _parse_user_traffic_info(self, html_text: str):
        """
        Parse the user's upload/download/ratio figures.
        :param html_text:
        :return:
        """
        pass

    @abstractmethod
    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
        """
        Parse the user's seeding statistics.
        :param html_text:
        :param multi_page: whether this is a continuation page
        :return: next page address
        """
        pass

    @abstractmethod
    def _parse_user_detail_info(self, html_text: str):
        """
        Parse the user's detail info:
        join date / level / bonus points etc.
        :param html_text:
        :return:
        """
        pass

    @abstractmethod
    def _parse_message_content(self, html_text):
        """
        Parse a single message page.
        :param html_text:
        :return: head: message, date: time, content: message content
        """
        pass

    def to_dict(self):
        """
        Serialize public, non-callable attributes to a dict
        (SiteSchema members are flattened to their string value).
        """
        attributes = [
            attr for attr in dir(self)
            if not callable(getattr(self, attr)) and not attr.startswith("_")
        ]
        return {
            attr: getattr(self, attr).value
            if isinstance(getattr(self, attr), SiteSchema)
            else getattr(self, attr) for attr in attributes
        }

View File

@@ -0,0 +1,139 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class DiscuzUserInfo(ISiteUserInfo):
    """User-info parser for Discuz!-based sites."""

    schema = SiteSchema.DiscuzX
    order = SITE_BASE_ORDER + 10

    @classmethod
    def match(cls, html_text: str) -> bool:
        """Match sites whose rendered page text carries the Discuz! footer."""
        html = etree.HTML(html_text)
        if not html:
            return False
        printable_text = html.xpath("string(.)") if html else ""
        return 'Powered by Discuz!' in printable_text

    def _parse_user_base_info(self, html_text: str):
        """Extract user id / name from the first profile link on the page."""
        html_text = self._prepare_html_text(html_text)
        html = etree.HTML(html_text)
        user_info = html.xpath('//a[contains(@href, "&uid=")]')
        if user_info:
            user_id_match = re.search(r"&uid=(\d+)", user_info[0].attrib['href'])
            if user_id_match and user_id_match.group().strip():
                self.userid = user_id_match.group(1)
                # NOTE(review): f-string has no placeholders (F541)
                self._torrent_seeding_page = f"forum.php?&mod=torrents&cat_5up=on"
                self._user_detail_page = user_info[0].attrib['href']
                self.username = user_info[0].text.strip()

    def _parse_site_page(self, html_text: str):
        # TODO
        pass

    def _parse_user_detail_info(self, html_text: str):
        """
        Parse extra user info: join date, level, bonus, traffic.
        :param html_text:
        :return:
        """
        html = etree.HTML(html_text)
        if not html:
            return None
        # User level
        user_levels_text = html.xpath('//a[contains(@href, "usergroup")]/text()')
        if user_levels_text:
            self.user_level = user_levels_text[-1].strip()
        # Join date
        join_at_text = html.xpath('//li[em[text()="注册时间"]]/text()')
        if join_at_text:
            self.join_at = StringUtils.unify_datetime_str(join_at_text[0].strip())
        # Share ratio
        # NOTE(review): this writes self.bonus rather than self.ratio — looks
        # suspicious but is kept as-is; confirm against the site layout
        ratio_text = html.xpath('//li[contains(.//text(), "分享率")]//text()')
        if ratio_text:
            ratio_match = re.search(r"\(([\d,.]+)\)", ratio_text[0])
            if ratio_match and ratio_match.group(1).strip():
                self.bonus = StringUtils.str_float(ratio_match.group(1))
        # Bonus points (may overwrite the value parsed just above)
        bouns_text = html.xpath('//li[em[text()="积分"]]/text()')
        if bouns_text:
            self.bonus = StringUtils.str_float(bouns_text[0].strip())
        # Upload
        upload_text = html.xpath('//li[em[contains(text(),"上传量")]]/text()')
        if upload_text:
            self.upload = StringUtils.num_filesize(upload_text[0].strip().split('/')[-1])
        # Download
        download_text = html.xpath('//li[em[contains(text(),"下载量")]]/text()')
        if download_text:
            self.download = StringUtils.num_filesize(download_text[0].strip().split('/')[-1])

    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
        """
        Seeding statistics.
        :param html_text:
        :param multi_page: whether this is a continuation page
        :return: next page address, or None when done
        """
        html = etree.HTML(html_text)
        if not html:
            return None
        # Fallback column positions, overridden by header detection below
        size_col = 3
        seeders_col = 4
        # Locate the size column by its header image
        if html.xpath('//tr[position()=1]/td[.//img[@class="size"] and .//img[@alt="size"]]'):
            size_col = len(html.xpath('//tr[position()=1]/td[.//img[@class="size"] '
                                      'and .//img[@alt="size"]]/preceding-sibling::td')) + 1
        # Locate the seeders column by its header image
        if html.xpath('//tr[position()=1]/td[.//img[@class="seeders"] and .//img[@alt="seeders"]]'):
            seeders_col = len(html.xpath('//tr[position()=1]/td[.//img[@class="seeders"] '
                                         'and .//img[@alt="seeders"]]/preceding-sibling::td')) + 1
        page_seeding = 0
        page_seeding_size = 0
        page_seeding_info = []
        seeding_sizes = html.xpath(f'//tr[position()>1]/td[{size_col}]')
        seeding_seeders = html.xpath(f'//tr[position()>1]/td[{seeders_col}]//text()')
        if seeding_sizes and seeding_seeders:
            page_seeding = len(seeding_sizes)
            for i in range(0, len(seeding_sizes)):
                size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
                seeders = StringUtils.str_int(seeding_seeders[i])
                page_seeding_size += size
                page_seeding_info.append([seeders, size])
        # Accumulate across pages
        self.seeding += page_seeding
        self.seeding_size += page_seeding_size
        self.seeding_info.extend(page_seeding_info)
        # Next page, if any
        next_page = None
        next_page_text = html.xpath('//a[contains(.//text(), "下一页") or contains(.//text(), "下一頁")]/@href')
        if next_page_text:
            next_page = next_page_text[-1].strip()
        return next_page

    def _parse_user_traffic_info(self, html_text: str):
        # Traffic is parsed in _parse_user_detail_info for Discuz sites
        pass

    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
        # Unread-message parsing not implemented for Discuz sites
        return None

    def _parse_message_content(self, html_text):
        # Unread-message parsing not implemented for Discuz sites
        return None, None, None

View File

@@ -0,0 +1,118 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class FileListSiteUserInfo(ISiteUserInfo):
    """User-info parser for FileList-based sites."""

    schema = SiteSchema.FileList
    order = SITE_BASE_ORDER + 50

    @classmethod
    def match(cls, html_text: str) -> bool:
        """Match sites whose rendered page text carries the FileList footer."""
        html = etree.HTML(html_text)
        if not html:
            return False
        printable_text = html.xpath("string(.)") if html else ""
        return 'Powered by FileList' in printable_text

    def _parse_site_page(self, html_text: str):
        """Derive the user-detail and seeding page URLs from the index page."""
        html_text = self._prepare_html_text(html_text)
        user_detail = re.search(r"userdetails.php\?id=(\d+)", html_text)
        if user_detail and user_detail.group().strip():
            self._user_detail_page = user_detail.group().strip().lstrip('/')
            self.userid = user_detail.group(1)
            self._torrent_seeding_page = f"snatchlist.php?id={self.userid}&action=torrents&type=seeding"

    def _parse_user_base_info(self, html_text: str):
        """Pick the username from the profile link matching the user id."""
        html_text = self._prepare_html_text(html_text)
        html = etree.HTML(html_text)
        ret = html.xpath(f'//a[contains(@href, "userdetails") and contains(@href, "{self.userid}")]//text()')
        if ret:
            self.username = str(ret[0])

    def _parse_user_traffic_info(self, html_text: str):
        """
        Upload / download / ratio — parsed in _parse_user_detail_info instead.
        :param html_text:
        :return:
        """
        return

    def _parse_user_detail_info(self, html_text: str):
        """Parse traffic, level, join date and bonus from the detail page."""
        html_text = self._prepare_html_text(html_text)
        html = etree.HTML(html_text)
        upload_html = html.xpath('//table//tr/td[text()="Uploaded"]/following-sibling::td//text()')
        if upload_html:
            self.upload = StringUtils.num_filesize(upload_html[0])
        download_html = html.xpath('//table//tr/td[text()="Downloaded"]/following-sibling::td//text()')
        if download_html:
            self.download = StringUtils.num_filesize(download_html[0])
        # Guard against division by zero for fresh accounts
        self.ratio = 0 if self.download == 0 else self.upload / self.download
        user_level_html = html.xpath('//table//tr/td[text()="Class"]/following-sibling::td//text()')
        if user_level_html:
            self.user_level = user_level_html[0].strip()
        join_at_html = html.xpath('//table//tr/td[contains(text(), "Join")]/following-sibling::td//text()')
        if join_at_html:
            self.join_at = StringUtils.unify_datetime_str(join_at_html[0].strip())
        bonus_html = html.xpath('//a[contains(@href, "shop.php")]')
        if bonus_html:
            self.bonus = StringUtils.str_float(bonus_html[0].xpath("string(.)").strip())
        pass

    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
        """
        Seeding statistics.
        :param html_text:
        :param multi_page: whether this is a continuation page
        :return: next page address (always None — the snatchlist is single-page here)
        """
        html = etree.HTML(html_text)
        if not html:
            return None
        # Fixed column positions on the snatchlist table
        size_col = 6
        seeders_col = 7
        page_seeding = 0
        page_seeding_size = 0
        page_seeding_info = []
        seeding_sizes = html.xpath(f'//table/tr[position()>1]/td[{size_col}]')
        seeding_seeders = html.xpath(f'//table/tr[position()>1]/td[{seeders_col}]')
        if seeding_sizes and seeding_seeders:
            page_seeding = len(seeding_sizes)
            for i in range(0, len(seeding_sizes)):
                size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
                seeders = StringUtils.str_int(seeding_seeders[i].xpath("string(.)").strip())
                page_seeding_size += size
                page_seeding_info.append([seeders, size])
        self.seeding += page_seeding
        self.seeding_size += page_seeding_size
        self.seeding_info.extend(page_seeding_info)
        # No pagination
        next_page = None
        return next_page

    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
        # Unread-message parsing not implemented for FileList sites
        return None

    def _parse_message_content(self, html_text):
        # Unread-message parsing not implemented for FileList sites
        return None, None, None

View File

@@ -0,0 +1,163 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class GazelleSiteUserInfo(ISiteUserInfo):
    """User-info parser for Gazelle-based sites."""

    schema = SiteSchema.Gazelle
    order = SITE_BASE_ORDER

    @classmethod
    def match(cls, html_text: str) -> bool:
        """Match Gazelle footers, plus the DIC Music variant."""
        html = etree.HTML(html_text)
        if not html:
            return False
        printable_text = html.xpath("string(.)") if html else ""
        return "Powered by Gazelle" in printable_text or "DIC Music" in printable_text

    def _parse_user_base_info(self, html_text: str):
        """Parse user id/name, traffic and bonus from the index page."""
        html_text = self._prepare_html_text(html_text)
        html = etree.HTML(html_text)
        tmps = html.xpath('//a[contains(@href, "user.php?id=")]')
        if tmps:
            user_id_match = re.search(r"user.php\?id=(\d+)", tmps[0].attrib['href'])
            if user_id_match and user_id_match.group().strip():
                self.userid = user_id_match.group(1)
                self._torrent_seeding_page = f"torrents.php?type=seeding&userid={self.userid}"
                self._user_detail_page = f"user.php?id={self.userid}"
                self.username = tmps[0].text.strip()
        # Upload: newer Gazelle exposes a data-value attribute, older ones a stats list
        tmps = html.xpath('//*[@id="header-uploaded-value"]/@data-value')
        if tmps:
            self.upload = StringUtils.num_filesize(tmps[0])
        else:
            # NOTE(review): falls back to the "stats_seeding" element for upload —
            # looks odd, verify against the site layout
            tmps = html.xpath('//li[@id="stats_seeding"]/span/text()')
            if tmps:
                self.upload = StringUtils.num_filesize(tmps[0])
        tmps = html.xpath('//*[@id="header-downloaded-value"]/@data-value')
        if tmps:
            self.download = StringUtils.num_filesize(tmps[0])
        else:
            tmps = html.xpath('//li[@id="stats_leeching"]/span/text()')
            if tmps:
                self.download = StringUtils.num_filesize(tmps[0])
        # Guard against division by zero for fresh accounts
        self.ratio = 0.0 if self.download <= 0.0 else round(self.upload / self.download, 3)
        # Bonus: prefer the bonus link's tooltip, fall back to its text
        tmps = html.xpath('//a[contains(@href, "bonus.php")]/@data-tooltip')
        if tmps:
            bonus_match = re.search(r"([\d,.]+)", tmps[0])
            if bonus_match and bonus_match.group(1).strip():
                self.bonus = StringUtils.str_float(bonus_match.group(1))
        else:
            tmps = html.xpath('//a[contains(@href, "bonus.php")]')
            if tmps:
                bonus_text = tmps[0].xpath("string(.)")
                bonus_match = re.search(r"([\d,.]+)", bonus_text)
                if bonus_match and bonus_match.group(1).strip():
                    self.bonus = StringUtils.str_float(bonus_match.group(1))

    def _parse_site_page(self, html_text: str):
        # TODO
        pass

    def _parse_user_detail_info(self, html_text: str):
        """
        Parse extra user info: join date and level.
        :param html_text:
        :return:
        """
        html = etree.HTML(html_text)
        if not html:
            return None
        # User level: data-value attribute first, Chinese-layout fallback second
        user_levels_text = html.xpath('//*[@id="class-value"]/@data-value')
        if user_levels_text:
            self.user_level = user_levels_text[0].strip()
        else:
            user_levels_text = html.xpath('//li[contains(text(), "用户等级")]/text()')
            if user_levels_text:
                self.user_level = user_levels_text[0].split(':')[1].strip()
        # Join date: same two-layout strategy
        join_at_text = html.xpath('//*[@id="join-date-value"]/@data-value')
        if join_at_text:
            self.join_at = StringUtils.unify_datetime_str(join_at_text[0].strip())
        else:
            join_at_text = html.xpath(
                '//div[contains(@class, "box_userinfo_stats")]//li[contains(text(), "加入时间")]/span/text()')
            if join_at_text:
                self.join_at = StringUtils.unify_datetime_str(join_at_text[0].strip())

    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
        """
        Seeding statistics.
        :param html_text:
        :param multi_page: whether this is a continuation page
        :return: next page address
        """
        html = etree.HTML(html_text)
        if not html:
            return None
        size_col = 3
        # Infer the size column from the header row width
        if html.xpath('//table[contains(@id, "torrent")]//tr[1]/td'):
            size_col = len(html.xpath('//table[contains(@id, "torrent")]//tr[1]/td')) - 3
        # Seeders column sits two to the right of the size column
        seeders_col = size_col + 2
        page_seeding = 0
        page_seeding_size = 0
        page_seeding_info = []
        seeding_sizes = html.xpath(f'//table[contains(@id, "torrent")]//tr[position()>1]/td[{size_col}]')
        seeding_seeders = html.xpath(f'//table[contains(@id, "torrent")]//tr[position()>1]/td[{seeders_col}]/text()')
        if seeding_sizes and seeding_seeders:
            page_seeding = len(seeding_sizes)
            for i in range(0, len(seeding_sizes)):
                size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
                seeders = int(seeding_seeders[i])
                page_seeding_size += size
                page_seeding_info.append([seeders, size])
        if multi_page:
            # Continuation pages accumulate
            self.seeding += page_seeding
            self.seeding_size += page_seeding_size
            self.seeding_info.extend(page_seeding_info)
        else:
            # First page: only set fields that are still empty
            if not self.seeding:
                self.seeding = page_seeding
            if not self.seeding_size:
                self.seeding_size = page_seeding_size
            if not self.seeding_info:
                self.seeding_info = page_seeding_info
        # Next page, if any
        next_page = None
        next_page_text = html.xpath('//a[contains(.//text(), "Next") or contains(.//text(), "下一页")]/@href')
        if next_page_text:
            next_page = next_page_text[-1].strip()
        return next_page

    def _parse_user_traffic_info(self, html_text: str):
        # TODO
        pass

    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
        # Unread-message parsing not implemented for Gazelle sites
        return None

    def _parse_message_content(self, html_text):
        # Unread-message parsing not implemented for Gazelle sites
        return None, None, None

View File

@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class IptSiteUserInfo(ISiteUserInfo):
    """User-info parser for IPTorrents."""

    schema = SiteSchema.Ipt
    order = SITE_BASE_ORDER + 35

    @classmethod
    def match(cls, html_text: str) -> bool:
        """Match on the raw HTML containing the site name."""
        return 'IPTorrents' in html_text

    def _parse_user_base_info(self, html_text: str):
        """Parse user name/id and the stats bar (traffic, seed/leech counts, bonus)."""
        html_text = self._prepare_html_text(html_text)
        html = etree.HTML(html_text)
        tmps = html.xpath('//a[contains(@href, "/u/")]//text()')
        tmps_id = html.xpath('//a[contains(@href, "/u/")]/@href')
        if tmps:
            self.username = str(tmps[-1])
        if tmps_id:
            user_id_match = re.search(r"/u/(\d+)", tmps_id[0])
            if user_id_match and user_id_match.group().strip():
                self.userid = user_id_match.group(1)
                self._user_detail_page = f"user.php?u={self.userid}"
                self._torrent_seeding_page = f"peers?u={self.userid}"
        tmps = html.xpath('//div[@class = "stats"]/div/div')
        if tmps:
            # NOTE(review): positional parsing of the stats bar — spans assumed to be
            # [ratio, upload, download], link[2] seeding/leeching, link[3] bonus; verify
            self.upload = StringUtils.num_filesize(str(tmps[0].xpath('span/text()')[1]).strip())
            self.download = StringUtils.num_filesize(str(tmps[0].xpath('span/text()')[2]).strip())
            self.seeding = StringUtils.str_int(tmps[0].xpath('a')[2].xpath('text()')[0])
            self.leeching = StringUtils.str_int(tmps[0].xpath('a')[2].xpath('text()')[1])
            self.ratio = StringUtils.str_float(str(tmps[0].xpath('span/text()')[0]).strip().replace('-', '0'))
            self.bonus = StringUtils.str_float(tmps[0].xpath('a')[3].xpath('text()')[0])

    def _parse_site_page(self, html_text: str):
        # TODO
        pass

    def _parse_user_detail_info(self, html_text: str):
        """Parse level and join date from the user detail page."""
        html = etree.HTML(html_text)
        if not html:
            return
        user_levels_text = html.xpath('//tr/th[text()="Class"]/following-sibling::td[1]/text()')
        if user_levels_text:
            self.user_level = user_levels_text[0].strip()
        # Join date (drop the trailing "(... ago)" portion)
        join_at_text = html.xpath('//tr/th[text()="Join date"]/following-sibling::td[1]/text()')
        if join_at_text:
            self.join_at = StringUtils.unify_datetime_str(join_at_text[0].split(' (')[0])

    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
        """Seeding statistics from the peers page (single page, no pagination)."""
        html = etree.HTML(html_text)
        if not html:
            return
        # Seeding rows end where the "Leechers" section header begins
        seeding_end_pos = 3
        if html.xpath('//tr/td[text() = "Leechers"]'):
            seeding_end_pos = len(html.xpath('//tr/td[text() = "Leechers"]/../preceding-sibling::tr')) + 1
            seeding_end_pos = seeding_end_pos - 3
        page_seeding = 0
        page_seeding_size = 0
        seeding_torrents = html.xpath('//tr/td[text() = "Seeders"]/../following-sibling::tr/td[position()=6]/text()')
        if seeding_torrents:
            # NOTE(review): seeding count is taken from the row position, not
            # len(seeding_torrents) — looks intentional but verify
            page_seeding = seeding_end_pos
            for per_size in seeding_torrents[:seeding_end_pos]:
                # Sizes may be rendered as "name (1.2 GB)" — keep the bracketed part
                if '(' in per_size and ')' in per_size:
                    per_size = per_size.split('(')[-1]
                    per_size = per_size.split(')')[0]
                page_seeding_size += StringUtils.num_filesize(per_size)
        self.seeding = page_seeding
        self.seeding_size = page_seeding_size

    def _parse_user_traffic_info(self, html_text: str):
        # TODO
        pass

    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
        # Unread-message parsing not implemented for IPTorrents
        return None

    def _parse_message_content(self, html_text):
        # Unread-message parsing not implemented for IPTorrents
        return None, None, None

View File

@@ -0,0 +1,198 @@
# -*- coding: utf-8 -*-
import json
from typing import Optional, Tuple
from urllib.parse import urljoin
from lxml import etree
from app.log import logger
from app.db.systemconfig_oper import SystemConfigOper
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class MTorrentSiteUserInfo(ISiteUserInfo):
    """User-info parser for M-Team (MTorrent), driven by its JSON API."""

    schema = SiteSchema.MTorrent
    order = SITE_BASE_ORDER + 60
    # M-Team authenticates with an x-api-key header instead of cookies
    request_mode = "apikey"

    # Mapping of sysRole codes to human-readable user levels
    MTeam_sysRoleList = {
        "1": "User",
        "2": "Power User",
        "3": "Elite User",
        "4": "Crazy User",
        "5": "Insane User",
        "6": "Veteran User",
        "7": "Extreme User",
        "8": "Ultimate User",
        "9": "Nexus Master",
        "10": "VIP",
        "11": "Retiree",
        "12": "Uploader",
        "13": "Moderator",
        "14": "Administrator",
        "15": "Sysop",
        "16": "Staff",
        "17": "Offer memberStaff",
        "18": "Bet memberStaff",
    }

    @classmethod
    def match(cls, html_text: str) -> bool:
        """Match on the page <title> containing "M-Team"."""
        html = etree.HTML(html_text)
        if not html:
            return False
        if html.xpath("//title/text()") and "M-Team" in html.xpath("//title/text()")[0]:
            return True
        return False

    def _parse_site_page(self, html_text: str):
        """
        Configure API endpoints and headers (no HTML scraping for M-Team).
        """
        self._user_traffic_page = None
        self._user_detail_page = None
        self._user_basic_page = "api/member/profile"
        # NOTE(review): self.userid is still None when this runs (it is filled in
        # by _parse_user_base_info later) — the profile API apparently tolerates
        # a null uid; confirm
        self._user_basic_params = {
            "uid": self.userid
        }
        self._sys_mail_unread_page = None
        self._user_mail_unread_page = "api/msg/search"
        self._mail_unread_params = {
            "keyword": "",
            "box": "-2",
            "type": "pageNumber",
            "pageSize": 100
        }
        self._torrent_seeding_page = "api/member/getUserTorrentList"
        domain = StringUtils.get_url_host(self.site_url)
        self._torrent_seeding_headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/plain, */*"
        }
        # The API key is stored per-domain in the system configuration
        self._addition_headers = {
            "x-api-key": SystemConfigOper().get(f"site.{domain}.apikey"),
        }

    def _parse_logged_in(self, html_text):
        """
        API-key mode: the login check is skipped for now (always True);
        to be refined later.
        :param html_text:
        :return:
        """
        return True

    def _parse_user_base_info(self, html_text: str):
        """
        Parse the member-profile JSON; traffic and detail info
        (_parse_user_traffic_info / _parse_user_detail_info) are folded in here.
        """
        if not html_text:
            return None
        detail = json.loads(html_text)
        # API convention: code "0" (string) means success
        if not detail or detail.get("code") != "0":
            return
        user_info = detail.get("data", {})
        self.userid = user_info.get("id")
        self.username = user_info.get("username")
        self.user_level = self.MTeam_sysRoleList.get(user_info.get("role") or "1")
        self.join_at = user_info.get("memberStatus", {}).get("createdDate")
        self.upload = int(user_info.get("memberCount", {}).get("uploaded") or '0')
        self.download = int(user_info.get("memberCount", {}).get("downloaded") or '0')
        self.ratio = user_info.get("memberCount", {}).get("shareRate") or 0
        self.bonus = user_info.get("memberCount", {}).get("bonus") or 0
        # NOTE(review): unread count is forced to 0 here, which makes the base
        # class skip mailbox fetching — confirm whether the API exposes a count
        self.message_unread = 0
        # Now that userid is known, prepare the seeding-list query
        self._torrent_seeding_params = {
            "pageNumber": 1,
            "pageSize": 200,
            "type": "SEEDING",
            "userid": self.userid
        }

    def _parse_user_traffic_info(self, html_text: str):
        """
        Traffic info is parsed in _parse_user_base_info.
        """
        pass

    def _parse_user_detail_info(self, html_text: str):
        """
        Detail info is parsed in _parse_user_base_info.
        """
        pass

    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
        """
        Parse one page of the seeding-torrent list JSON.
        :return: "" when another page remains, None when done
        """
        if not html_text:
            return None
        seeding_info = json.loads(html_text)
        if not seeding_info or seeding_info.get("code") != "0":
            return None
        torrents = seeding_info.get("data", {}).get("data", [])
        page_seeding_size = 0
        page_seeding_info = []
        for info in torrents:
            torrent = info.get("torrent", {})
            size = int(torrent.get("size") or '0')
            seeders = int(torrent.get("source") or '0')
            page_seeding_size += size
            page_seeding_info.append([seeders, size])
        self.seeding += len(torrents)
        self.seeding_size += page_seeding_size
        self.seeding_info.extend(page_seeding_info)
        # Query the total seeder count to decide whether another page remains
        seeder_count = 0
        try:
            result = self._get_page_content(
                url=urljoin(self.site_url, "api/tracker/myPeerStatus"),
                params={"uid": self.userid},
            )
            if result:
                seeder_info = json.loads(result)
                seeder_count = int(seeder_info.get("data", {}).get("seeder") or 0)
        except Exception as e:
            logger.error(f"获取做种数失败: {str(e)}")
        if not seeder_count:
            return None
        if self.seeding >= seeder_count:
            return None
        # More pages remain: bump the page number and signal the caller
        self._torrent_seeding_params["pageNumber"] += 1
        return ""

    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
        """
        Parse the message-search JSON; message bodies are read out directly here
        instead of collecting links for _parse_message_content.
        """
        if not html_text:
            return None
        messages_info = json.loads(html_text)
        if not messages_info or messages_info.get("code") != "0":
            return None
        messages = messages_info.get("data", {}).get("data", [])
        for message in messages:
            if not message.get("unread"):
                continue
            head = message.get("title")
            date = message.get("createdDate")
            content = message.get("context")
            if head and date and content:
                self.message_unread_contents.append((head, date, content))
            # Mark the message as read on the site
            # NOTE(review): f-string has no placeholders (F541)
            self._get_page_content(
                url=urljoin(self.site_url, f"api/msg/markRead"),
                params={"msgId": message.get("id")}
            )
        # No pagination
        return None

    def _parse_message_content(self, html_text) -> Tuple[Optional[str], Optional[str], Optional[str]]:
        """
        Message content is already collected in _parse_message_unread_links.
        """
        pass

View File

@@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
import re
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import SITE_BASE_ORDER, SiteSchema
from app.plugins.sitestatistic.siteuserinfo.nexus_php import NexusPhpSiteUserInfo
from app.utils.string import StringUtils
class NexusHhanclubSiteUserInfo(NexusPhpSiteUserInfo):
    """User-info parser for hhanclub.top (a NexusPhp variant with a custom layout)."""

    schema = SiteSchema.NexusHhanclub
    order = SITE_BASE_ORDER + 20

    @classmethod
    def match(cls, html_text: str) -> bool:
        """Match on the raw HTML containing the site domain."""
        return 'hhanclub.top' in html_text

    def _parse_user_traffic_info(self, html_text):
        """Parse upload/download/ratio from the site's custom user-info panel."""
        super()._parse_user_traffic_info(html_text)
        html_text = self._prepare_html_text(html_text)
        html = etree.HTML(html_text)
        # Upload, download and share ratio from fixed panel positions
        # NOTE(review): positional xpaths will raise IndexError if the panel
        # layout changes — verify against the live site
        upload_match = re.search(r"[_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)",
                                 html.xpath('//*[@id="user-info-panel"]/div[2]/div[2]/div[4]/text()')[0])
        download_match = re.search(r"[_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)",
                                   html.xpath('//*[@id="user-info-panel"]/div[2]/div[2]/div[5]/text()')[0])
        ratio_match = re.search(r"分享率][:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+)",
                                html.xpath('//*[@id="user-info-panel"]/div[2]/div[1]/div[1]/div/text()')[0])
        # Compute a fallback ratio from the parsed traffic
        self.upload = StringUtils.num_filesize(upload_match.group(1).strip()) if upload_match else 0
        self.download = StringUtils.num_filesize(download_match.group(1).strip()) if download_match else 0
        # Prefer the ratio shown on the page over the computed one
        calc_ratio = 0.0 if self.download <= 0.0 else round(self.upload / self.download, 3)
        self.ratio = StringUtils.str_float(ratio_match.group(1)) if (
            ratio_match and ratio_match.group(1).strip()) else calc_ratio

    def _parse_user_detail_info(self, html_text: str):
        """
        Parse extra user info: join date (level is handled by _get_user_level).
        :param html_text:
        :return:
        """
        super()._parse_user_detail_info(html_text)
        html = etree.HTML(html_text)
        if not html:
            return
        # Join date (drop the trailing "(... ago)" portion)
        join_at_text = html.xpath('//*[@id="mainContent"]/div/div[2]/div[4]/div[3]/span[2]/text()[1]')
        if join_at_text:
            self.join_at = StringUtils.unify_datetime_str(join_at_text[0].split(' (')[0].strip())

    def _get_user_level(self, html):
        """Read the user level from the level badge image's title attribute."""
        super()._get_user_level(html)
        user_level_path = html.xpath('//*[@id="mainContent"]/div/div[2]/div[2]/div[4]/span[2]/img/@title')
        if user_level_path:
            self.user_level = user_level_path[0]

View File

@@ -0,0 +1,398 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.log import logger
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class NexusPhpSiteUserInfo(ISiteUserInfo):
    """
    Fallback user-info parser for NexusPHP-based sites.

    Scrapes user id, traffic, bonus, level, join date, seeding statistics
    and unread messages from the standard NexusPHP page layout.
    """
    schema = SiteSchema.NexusPhp
    order = SITE_BASE_ORDER * 2

    @classmethod
    def match(cls, html_text: str) -> bool:
        """
        NexusPHP is the default parser; always matches.
        :param html_text:
        :return:
        """
        return True

    def _parse_site_page(self, html_text: str):
        """Locate the userdetails link to derive user id and the seeding-list URL."""
        html_text = self._prepare_html_text(html_text)
        user_detail = re.search(r"userdetails.php\?id=(\d+)", html_text)
        if user_detail and user_detail.group().strip():
            self._user_detail_page = user_detail.group().strip().lstrip('/')
            self.userid = user_detail.group(1)
            self._torrent_seeding_page = f"getusertorrentlistajax.php?userid={self.userid}&type=seeding"
        else:
            # No numeric id visible; remember the page but leave userid unknown
            user_detail = re.search(r"(userdetails)", html_text)
            if user_detail and user_detail.group().strip():
                self._user_detail_page = user_detail.group().strip().lstrip('/')
                self.userid = None
                self._torrent_seeding_page = None

    def _parse_message_unread(self, html_text):
        """
        Parse the number of unread private messages from the index page.
        :param html_text:
        :return:
        """
        html = etree.HTML(html_text)
        if not html:
            return
        message_labels = html.xpath('//a[@href="messages.php"]/..')
        message_labels.extend(html.xpath('//a[contains(@href, "messages.php")]/..'))
        if message_labels:
            message_text = message_labels[0].xpath("string(.)")
            logger.debug(f"{self.site_name} 消息原始信息 {message_text}")
            message_unread_match = re.findall(r"[^Date](信息箱\s*|\(|你有\xa0)(\d+)", message_text)
            if message_unread_match and len(message_unread_match[-1]) == 2:
                self.message_unread = StringUtils.str_int(message_unread_match[-1][1])
            elif message_text.isdigit():
                self.message_unread = StringUtils.str_int(message_text)

    def _parse_user_base_info(self, html_text: str):
        """Parse username (and, as a side effect, traffic and unread-message count)."""
        # Parse traffic from the same page to avoid an extra request
        self._parse_user_traffic_info(html_text)
        self._user_traffic_page = None
        self._parse_message_unread(html_text)
        html = etree.HTML(html_text)
        if not html:
            return
        ret = html.xpath(f'//a[contains(@href, "userdetails") and contains(@href, "{self.userid}")]//b//text()')
        if ret:
            self.username = str(ret[0])
            return
        ret = html.xpath(f'//a[contains(@href, "userdetails") and contains(@href, "{self.userid}")]//text()')
        if ret:
            self.username = str(ret[0])
        ret = html.xpath('//a[contains(@href, "userdetails")]//strong//text()')
        if ret:
            self.username = str(ret[0])
            return

    def _parse_user_traffic_info(self, html_text):
        """Parse upload / download / ratio / leeching count / bonus from free text."""
        html_text = self._prepare_html_text(html_text)
        upload_match = re.search(r"[^总]上[传傳]量?[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text,
                                 re.IGNORECASE)
        self.upload = StringUtils.num_filesize(upload_match.group(1).strip()) if upload_match else 0
        download_match = re.search(r"[^总子影力]下[载載]量?[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text,
                                   re.IGNORECASE)
        self.download = StringUtils.num_filesize(download_match.group(1).strip()) if download_match else 0
        ratio_match = re.search(r"分享率[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+)", html_text)
        # Compute a fallback ratio from the parsed totals
        calc_ratio = 0.0 if self.download <= 0.0 else round(self.upload / self.download, 3)
        # Prefer the ratio displayed on the page over the computed one
        self.ratio = StringUtils.str_float(ratio_match.group(1)) if (
            ratio_match and ratio_match.group(1).strip()) else calc_ratio
        leeching_match = re.search(r"(Torrents leeching|下载中)[\u4E00-\u9FA5\D\s]+(\d+)[\s\S]+<", html_text)
        self.leeching = StringUtils.str_int(leeching_match.group(2)) if leeching_match and leeching_match.group(
            2).strip() else 0
        html = etree.HTML(html_text)
        # UCoin-style sites show gold/silver/copper instead of a single bonus value
        has_ucoin, self.bonus = self._parse_ucoin(html)
        if has_ucoin:
            return
        tmps = html.xpath('//a[contains(@href,"mybonus")]/text()') if html else None
        if tmps:
            bonus_text = str(tmps[0]).strip()
            bonus_match = re.search(r"([\d,.]+)", bonus_text)
            if bonus_match and bonus_match.group(1).strip():
                self.bonus = StringUtils.str_float(bonus_match.group(1))
                return
        bonus_match = re.search(r"mybonus.[\[\]:<>/a-zA-Z_\-=\"'\s#;.(使用魔力值豆]+\s*([\d,.]+)[<()&\s]", html_text)
        try:
            if bonus_match and bonus_match.group(1).strip():
                self.bonus = StringUtils.str_float(bonus_match.group(1))
                return
            bonus_match = re.search(r"[魔力值|\]][\[\]:<>/a-zA-Z_\-=\"'\s#;]+\s*([\d,.]+|\"[\d,.]+\")[<>()&\s]",
                                    html_text,
                                    flags=re.S)
            if bonus_match and bonus_match.group(1).strip():
                self.bonus = StringUtils.str_float(bonus_match.group(1).strip('"'))
        except Exception as err:
            logger.error(f"{self.site_name} 解析魔力值出错, 错误信息: {str(err)}")

    @staticmethod
    def _parse_ucoin(html):
        """
        Parse UCoin amounts, normalized to copper (1 gold = 100 silver = 10000 copper).
        :param html:
        :return: (found, total_in_copper)
        """
        if html:
            gold, silver, copper = None, None, None
            golds = html.xpath('//span[@class = "ucoin-symbol ucoin-gold"]//text()')
            if golds:
                gold = StringUtils.str_float(str(golds[-1]))
            silvers = html.xpath('//span[@class = "ucoin-symbol ucoin-silver"]//text()')
            if silvers:
                silver = StringUtils.str_float(str(silvers[-1]))
            coppers = html.xpath('//span[@class = "ucoin-symbol ucoin-copper"]//text()')
            if coppers:
                copper = StringUtils.str_float(str(coppers[-1]))
            if gold or silver or copper:
                gold = gold if gold else 0
                silver = silver if silver else 0
                copper = copper if copper else 0
                return True, gold * 100 * 100 + silver * 100 + copper
        return False, 0.0

    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
        """
        Parse seeding statistics (count, total size, per-torrent seeders/size).
        :param html_text:
        :param multi_page: whether data spans multiple pages
        :return: URL of the next page, or None
        """
        html = etree.HTML(str(html_text).replace(r'\/', '/'))
        if not html:
            return None
        # The first page may link to a dedicated seeding list; follow it instead
        seeding_url_text = html.xpath('//a[contains(@href,"torrents.php") '
                                      'and contains(@href,"seeding")]/@href')
        if multi_page is False and seeding_url_text and seeding_url_text[0].strip():
            self._torrent_seeding_page = seeding_url_text[0].strip()
            return self._torrent_seeding_page
        size_col = 3
        seeders_col = 4
        # Detect the size column by header icon or caption
        size_col_xpath = '//tr[position()=1]/' \
                         'td[(img[@class="size"] and img[@alt="size"])' \
                         ' or (text() = "大小")' \
                         ' or (a/img[@class="size" and @alt="size"])]'
        if html.xpath(size_col_xpath):
            size_col = len(html.xpath(f'{size_col_xpath}/preceding-sibling::td')) + 1
        # Detect the seeders column by header icon or caption
        seeders_col_xpath = '//tr[position()=1]/' \
                            'td[(img[@class="seeders"] and img[@alt="seeders"])' \
                            ' or (text() = "在做种")' \
                            ' or (a/img[@class="seeders" and @alt="seeders"])]'
        if html.xpath(seeders_col_xpath):
            seeders_col = len(html.xpath(f'{seeders_col_xpath}/preceding-sibling::td')) + 1
        page_seeding = 0
        page_seeding_size = 0
        page_seeding_info = []
        # Scope to table.torrents when present to skip unrelated tables
        table_class = '//table[@class="torrents"]' if html.xpath('//table[@class="torrents"]') else ''
        seeding_sizes = html.xpath(f'{table_class}//tr[position()>1]/td[{size_col}]')
        seeding_seeders = html.xpath(f'{table_class}//tr[position()>1]/td[{seeders_col}]/b/a/text()')
        if not seeding_seeders:
            seeding_seeders = html.xpath(f'{table_class}//tr[position()>1]/td[{seeders_col}]//text()')
        if seeding_sizes and seeding_seeders:
            page_seeding = len(seeding_sizes)
            for i in range(0, len(seeding_sizes)):
                size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
                seeders = StringUtils.str_int(seeding_seeders[i])
                page_seeding_size += size
                page_seeding_info.append([seeders, size])
        self.seeding += page_seeding
        self.seeding_size += page_seeding_size
        self.seeding_info.extend(page_seeding_info)
        # Check for a next page of results
        next_page = None
        next_page_text = html.xpath('//a[contains(.//text(), "下一页") or contains(.//text(), "下一頁") or contains(.//text(), ">")]/@href')
        if next_page_text:
            next_page = next_page_text[-1].strip()
            # Append userid/type when missing; guard against userid being None
            # (the else branch of _parse_site_page sets it to None, and
            # `None not in str` would raise TypeError)
            if self.userid and self.userid not in next_page:
                next_page = f'{next_page}&userid={self.userid}&type=seeding'
        return next_page

    def _parse_user_detail_info(self, html_text: str):
        """
        Parse extra user details: join date, level, and fallback seeding stats.
        :param html_text:
        :return:
        """
        html = etree.HTML(html_text)
        if not html:
            return
        self._get_user_level(html)
        self._fixup_traffic_info(html)
        # Join date (strip the trailing " (...)" elapsed-time suffix)
        join_at_text = html.xpath(
            '//tr/td[text()="加入日期" or text()="注册日期" or *[text()="加入日期"]]/following-sibling::td[1]//text()'
            '|//div/b[text()="加入日期"]/../text()')
        if join_at_text:
            self.join_at = StringUtils.unify_datetime_str(join_at_text[0].split(' (')[0].strip())
        # Seeding size & count: if the seeding page yielded nothing,
        # try the inline "current uploads" table on the details page
        seeding_sizes = html.xpath('//tr/td[text()="当前上传"]/following-sibling::td[1]//'
                                   'table[tr[1][td[4 and text()="尺寸"]]]//tr[position()>1]/td[4]')
        seeding_seeders = html.xpath('//tr/td[text()="当前上传"]/following-sibling::td[1]//'
                                     'table[tr[1][td[5 and text()="做种者"]]]//tr[position()>1]/td[5]//text()')
        tmp_seeding = len(seeding_sizes)
        tmp_seeding_size = 0
        tmp_seeding_info = []
        for i in range(0, len(seeding_sizes)):
            size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
            seeders = StringUtils.str_int(seeding_seeders[i])
            tmp_seeding_size += size
            tmp_seeding_info.append([seeders, size])
        if not self.seeding_size:
            self.seeding_size = tmp_seeding_size
        if not self.seeding:
            self.seeding = tmp_seeding
        if not self.seeding_info:
            self.seeding_info = tmp_seeding_info
        # Alternative layout: a "seeding statistics" summary row
        seeding_sizes = html.xpath('//tr/td[text()="做种统计"]/following-sibling::td[1]//text()')
        if seeding_sizes:
            seeding_match = re.search(r"总做种数:\s+(\d+)", seeding_sizes[0], re.IGNORECASE)
            seeding_size_match = re.search(r"总做种体积:\s+([\d,.\s]+[KMGTPI]*B)", seeding_sizes[0], re.IGNORECASE)
            tmp_seeding = StringUtils.str_int(seeding_match.group(1)) if (
                seeding_match and seeding_match.group(1)) else 0
            tmp_seeding_size = StringUtils.num_filesize(
                seeding_size_match.group(1).strip()) if seeding_size_match else 0
        if not self.seeding_size:
            self.seeding_size = tmp_seeding_size
        if not self.seeding:
            self.seeding = tmp_seeding
        self._fixup_torrent_seeding_page(html)

    def _fixup_torrent_seeding_page(self, html):
        """
        Fix up the seeding-list URL from links found on the details page.
        :param html:
        :return:
        """
        # Dedicated seeding-list page
        seeding_url_text = html.xpath('//a[contains(@href,"getusertorrentlist.php") '
                                      'and contains(@href,"seeding")]/@href')
        if seeding_url_text:
            self._torrent_seeding_page = seeding_url_text[0].strip()
        # Extract the user id from the JS ajax call when present
        seeding_url_text = html.xpath('//a[contains(@href, "javascript: getusertorrentlistajax") '
                                      'and contains(@href,"seeding")]/@href')
        csrf_text = html.xpath('//meta[@name="x-csrf"]/@content')
        if not self._torrent_seeding_page and seeding_url_text:
            user_js = re.search(r"javascript: getusertorrentlistajax\(\s*'(\d+)", seeding_url_text[0])
            if user_js and user_js.group(1).strip():
                self.userid = user_js.group(1).strip()
                self._torrent_seeding_page = f"getusertorrentlistajax.php?userid={self.userid}&type=seeding"
        elif seeding_url_text and csrf_text:
            if csrf_text[0].strip():
                # CSRF-protected variant of the ajax endpoint
                self._torrent_seeding_page = "ajax_getusertorrentlist.php"
                self._torrent_seeding_params = {'userid': self.userid, 'type': 'seeding', 'csrf': csrf_text[0].strip()}
        # Per-category seeding mode -- temporarily disabled
        # seeding_url_text = html.xpath('//tr/td[text()="当前做种"]/following-sibling::td[1]'
        #                               '/table//td/a[contains(@href,"seeding")]/@href')
        # if seeding_url_text:
        #     self._torrent_seeding_page = seeding_url_text

    def _get_user_level(self, html):
        # Level lives in the same row as the "等级" caption; image levels carry
        # the name in @title, text levels in the cell text.
        user_levels_text = html.xpath('//tr/td[text()="等級" or text()="等级" or *[text()="等级"]]/'
                                      'following-sibling::td[1]/img[1]/@title')
        if user_levels_text:
            self.user_level = user_levels_text[0].strip()
            return
        user_levels_text = html.xpath('//tr/td[text()="等級" or text()="等级"]/'
                                      'following-sibling::td[1 and not(img)]'
                                      '|//tr/td[text()="等級" or text()="等级"]/'
                                      'following-sibling::td[1 and img[not(@title)]]')
        if user_levels_text:
            self.user_level = user_levels_text[0].xpath("string(.)").strip()
            return
        user_levels_text = html.xpath('//tr/td[text()="等級" or text()="等级"]/'
                                      'following-sibling::td[1]')
        if user_levels_text:
            self.user_level = user_levels_text[0].xpath("string(.)").strip()
            return
        # Last resort: "[Level]" suffix in the userdetails link text
        user_levels_text = html.xpath('//a[contains(@href, "userdetails")]/text()')
        if not self.user_level and user_levels_text:
            for user_level_text in user_levels_text:
                user_level_match = re.search(r"\[(.*)]", user_level_text)
                if user_level_match and user_level_match.group(1).strip():
                    self.user_level = user_level_match.group(1).strip()
                    break

    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
        """Collect viewmessage links of unread rows; return the next-page URL if any."""
        html = etree.HTML(html_text)
        if not html:
            return None
        message_links = html.xpath('//tr[not(./td/img[@alt="Read"])]/td/a[contains(@href, "viewmessage")]/@href')
        msg_links.extend(message_links)
        # Check for a next page of results
        next_page = None
        next_page_text = html.xpath('//a[contains(.//text(), "下一页") or contains(.//text(), "下一頁")]/@href')
        if next_page_text:
            next_page = next_page_text[-1].strip()
        return next_page

    def _parse_message_content(self, html_text):
        """Parse a message page into (title, date, body); supports classic and layui layouts."""
        html = etree.HTML(html_text)
        if not html:
            return None, None, None
        # Title
        message_head_text = None
        message_head = html.xpath('//h1/text()'
                                  '|//div[@class="layui-card-header"]/span[1]/text()')
        if message_head:
            message_head_text = message_head[-1].strip()
        # Date
        message_date_text = None
        message_date = html.xpath('//h1/following-sibling::table[.//tr/td[@class="colhead"]]//tr[2]/td[2]'
                                  '|//div[@class="layui-card-header"]/span[2]/span[2]')
        if message_date:
            message_date_text = message_date[0].xpath("string(.)").strip()
        # Body
        message_content_text = None
        message_content = html.xpath('//h1/following-sibling::table[.//tr/td[@class="colhead"]]//tr[3]/td'
                                     '|//div[contains(@class,"layui-card-body")]')
        if message_content:
            message_content_text = message_content[0].xpath("string(.)").strip()
        return message_head_text, message_date_text, message_content_text

    def _fixup_traffic_info(self, html):
        # Fallback bonus parse from the details table when the index page had none
        if not self.bonus:
            bonus_text = html.xpath('//tr/td[text()="魔力值" or text()="猫粮"]/following-sibling::td[1]/text()')
            if bonus_text:
                self.bonus = StringUtils.str_float(bonus_text[0].strip())

View File

@@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
import re
from app.plugins.sitestatistic.siteuserinfo import SITE_BASE_ORDER, SiteSchema
from app.plugins.sitestatistic.siteuserinfo.nexus_php import NexusPhpSiteUserInfo
class NexusProjectSiteUserInfo(NexusPhpSiteUserInfo):
    """NexusPHP variant used by "Nexus Project" sites; only the seeding URL differs."""
    schema = SiteSchema.NexusProject
    order = SITE_BASE_ORDER + 25

    @classmethod
    def match(cls, html_text: str) -> bool:
        return 'Nexus Project' in html_text

    def _parse_site_page(self, html_text: str):
        """Derive user id and the site-specific seeding-list URL from the userdetails link."""
        text = self._prepare_html_text(html_text)
        detail_match = re.search(r"userdetails.php\?id=(\d+)", text)
        if not detail_match or not detail_match.group().strip():
            return
        self._user_detail_page = detail_match.group().strip().lstrip('/')
        self.userid = detail_match.group(1)
        self._torrent_seeding_page = f"viewusertorrents.php?id={self.userid}&show=seeding"

View File

@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
import json
from typing import Optional
from lxml import etree
from app.log import logger
from app.plugins.sitestatistic.siteuserinfo import SITE_BASE_ORDER, SiteSchema
from app.plugins.sitestatistic.siteuserinfo.nexus_php import NexusPhpSiteUserInfo
class NexusRabbitSiteUserInfo(NexusPhpSiteUserInfo):
    """
    NexusPHP variant styled "by Rabbit": seeding data comes from a JSON ajax
    endpoint fetched in one oversized page instead of paginated HTML.
    """
    schema = SiteSchema.NexusRabbit
    order = SITE_BASE_ORDER + 5

    @classmethod
    def match(cls, html_text: str) -> bool:
        """Match on the "Style by Rabbit" credit in the rendered page text."""
        html = etree.HTML(html_text)
        if not html:
            return False
        printable_text = html.xpath("string(.)") if html else ""
        return 'Style by Rabbit' in printable_text

    def _parse_site_page(self, html_text: str):
        super()._parse_site_page(html_text)
        # Fetch the entire seeding list in one JSON request
        self._torrent_seeding_page = f"getusertorrentlistajax.php?page=1&limit=5000000&type=seeding&uid={self.userid}"
        self._torrent_seeding_headers = {"Accept": "application/json, text/javascript, */*; q=0.01"}

    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
        """
        Parse seeding statistics from the JSON response.
        :param html_text:
        :param multi_page: whether data spans multiple pages (never, for this site)
        :return: next-page URL (always None here)
        """
        try:
            torrents = json.loads(html_text).get('data')
        except Exception as e:
            logger.error(f"解析做种信息失败: {str(e)}")
            return
        # "data" may be missing or null -> nothing seeding; len(None) would raise
        if not torrents:
            return
        page_seeding_size = 0
        page_seeding_info = []
        page_seeding = len(torrents)
        for torrent in torrents:
            seeders = int(torrent.get('seeders', 0))
            size = int(torrent.get('size', 0))
            page_seeding_size += size
            page_seeding_info.append([seeders, size])
        self.seeding += page_seeding
        self.seeding_size += page_seeding_size
        self.seeding_info.extend(page_seeding_info)

View File

@@ -0,0 +1,110 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class SmallHorseSiteUserInfo(ISiteUserInfo):
    # Parser for "Small Horse" (Gazelle-style layout: stats live in
    # //ul[@class="stats nobullet"] lists at fixed positions).
    schema = SiteSchema.SmallHorse
    order = SITE_BASE_ORDER + 30
    @classmethod
    def match(cls, html_text: str) -> bool:
        """Return True when the page belongs to a Small Horse site."""
        return 'Small Horse' in html_text
    def _parse_site_page(self, html_text: str):
        """Locate the user id from the user.php link; derive detail/traffic/seeding URLs."""
        html_text = self._prepare_html_text(html_text)
        user_detail = re.search(r"user.php\?id=(\d+)", html_text)
        if user_detail and user_detail.group().strip():
            self._user_detail_page = user_detail.group().strip().lstrip('/')
            self.userid = user_detail.group(1)
            self._torrent_seeding_page = f"torrents.php?type=seeding&userid={self.userid}"
            self._user_traffic_page = f"user.php?id={self.userid}"
    def _parse_user_base_info(self, html_text: str):
        """Parse the username from the first user.php link text."""
        html_text = self._prepare_html_text(html_text)
        html = etree.HTML(html_text)
        ret = html.xpath('//a[contains(@href, "user.php")]//text()')
        if ret:
            self.username = str(ret[0])
    def _parse_user_traffic_info(self, html_text: str):
        """
        Upload / download / share ratio [seeding count / bonus]
        :param html_text:
        :return:
        """
        html_text = self._prepare_html_text(html_text)
        html = etree.HTML(html_text)
        tmps = html.xpath('//ul[@class = "stats nobullet"]')
        # NOTE(review): tmps[1]/[3]/[4] and li[...] positions assume a fixed
        # stats-list layout; only tmps' existence is guarded -- confirm on live pages
        if tmps:
            if tmps[1].xpath("li") and tmps[1].xpath("li")[0].xpath("span//text()"):
                self.join_at = StringUtils.unify_datetime_str(tmps[1].xpath("li")[0].xpath("span//text()")[0])
            self.upload = StringUtils.num_filesize(str(tmps[1].xpath("li")[2].xpath("text()")[0]).split(":")[1].strip())
            self.download = StringUtils.num_filesize(
                str(tmps[1].xpath("li")[3].xpath("text()")[0]).split(":")[1].strip())
            if tmps[1].xpath("li")[4].xpath("span//text()"):
                # NOTE(review): the empty string in replace('', '0') looks like a
                # mangled infinity symbol ('∞') lost in transit -- confirm upstream
                self.ratio = StringUtils.str_float(str(tmps[1].xpath("li")[4].xpath("span//text()")[0]).replace('', '0'))
            else:
                self.ratio = StringUtils.str_float(str(tmps[1].xpath("li")[5].xpath("text()")[0]).split(":")[1])
            self.bonus = StringUtils.str_float(str(tmps[1].xpath("li")[5].xpath("text()")[0]).split(":")[1])
            self.user_level = str(tmps[3].xpath("li")[0].xpath("text()")[0]).split(":")[1].strip()
            self.leeching = StringUtils.str_int(
                (tmps[4].xpath("li")[6].xpath("text()")[0]).split(":")[1].replace("[", ""))
    def _parse_user_detail_info(self, html_text: str):
        # Nothing extra to parse; everything comes from the traffic page.
        pass
    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
        """
        Parse seeding statistics (size column 6, seeders column 8).
        :param html_text:
        :param multi_page: whether data spans multiple pages
        :return: URL of the next page, or None
        """
        html = etree.HTML(html_text)
        if not html:
            return None
        size_col = 6
        seeders_col = 8
        page_seeding = 0
        page_seeding_size = 0
        page_seeding_info = []
        seeding_sizes = html.xpath(f'//table[@id="torrent_table"]//tr[position()>1]/td[{size_col}]')
        seeding_seeders = html.xpath(f'//table[@id="torrent_table"]//tr[position()>1]/td[{seeders_col}]')
        if seeding_sizes and seeding_seeders:
            page_seeding = len(seeding_sizes)
            for i in range(0, len(seeding_sizes)):
                size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
                seeders = StringUtils.str_int(seeding_seeders[i].xpath("string(.)").strip())
                page_seeding_size += size
                page_seeding_info.append([seeders, size])
        self.seeding += page_seeding
        self.seeding_size += page_seeding_size
        self.seeding_info.extend(page_seeding_info)
        # Check for a next page of results via the pagination widget
        next_page = None
        next_pages = html.xpath('//ul[@class="pagination"]/li[contains(@class,"active")]/following-sibling::li')
        if next_pages and len(next_pages) > 1:
            page_num = next_pages[0].xpath("string(.)").strip()
            if page_num.isdigit():
                next_page = f"{self._torrent_seeding_page}&page={page_num}"
        return next_page
    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
        # Messages are not parsed for this site.
        return None
    def _parse_message_content(self, html_text):
        # Messages are not parsed for this site.
        return None, None, None

View File

@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
import json
import re
from typing import Optional
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class TNodeSiteUserInfo(ISiteUserInfo):
    """User-info parser for sites powered by TNode (pure JSON API)."""
    schema = SiteSchema.TNode
    order = SITE_BASE_ORDER + 60

    @classmethod
    def match(cls, html_text: str) -> bool:
        return 'Powered By TNode' in html_text

    def _parse_site_page(self, html_text: str):
        """Extract the CSRF token from the page meta tag and set the API endpoints."""
        text = self._prepare_html_text(html_text)
        # <meta name="x-csrf-token" content="fd169876a7b4846f3a7a16fcd5cccf8d">
        token_match = re.search(r'<meta name="x-csrf-token" content="(.+?)">', text)
        if token_match:
            self._addition_headers = {'X-CSRF-TOKEN': token_match.group(1)}
        self._user_detail_page = "api/user/getMainInfo"
        self._torrent_seeding_page = "api/user/listTorrentActivity?id=&type=seeding&page=1&size=20000"

    def _parse_logged_in(self, html_text):
        """
        Login detection is skipped for now (pending a proper check);
        always report success.
        :param html_text:
        :return:
        """
        return True

    def _parse_user_base_info(self, html_text: str):
        self.username = self.userid

    def _parse_user_traffic_info(self, html_text: str):
        # Traffic comes from the detail API; nothing to do here.
        pass

    def _parse_user_detail_info(self, html_text: str):
        """Populate user fields from the getMainInfo JSON response."""
        payload = json.loads(html_text)
        if payload.get("status") != 200:
            return
        info = payload.get("data", {})
        self.userid = info.get("id")
        self.username = info.get("username")
        self.user_level = info.get("class", {}).get("name")
        self.join_at = StringUtils.unify_datetime_str(str(info.get("regTime", 0)))
        self.upload = info.get("upload")
        self.download = info.get("download")
        self.ratio = 0 if self.download <= 0 else round(self.upload / self.download, 3)
        self.bonus = info.get("bonus")
        self.message_unread = (info.get("unreadAdmin", 0)
                               + info.get("unreadInbox", 0)
                               + info.get("unreadSystem", 0))

    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
        """
        Parse seeding statistics from the listTorrentActivity JSON response.
        """
        payload = json.loads(html_text)
        if payload.get("status") != 200:
            return
        torrents = payload.get("data", {}).get("torrents", [])
        total_size = 0
        per_torrent = []
        for torrent in torrents:
            size = torrent.get("size", 0)
            per_torrent.append([torrent.get("seeding", 0), size])
            total_size += size
        self.seeding += len(torrents)
        self.seeding_size += total_size
        self.seeding_info.extend(per_torrent)
        # Single JSON page -- never a next page
        return None

    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
        return None

    def _parse_message_content(self, html_text):
        """
        System messages  api/message/listSystem?page=1&size=20
        Inbox messages   api/message/listInbox?page=1&size=20
        Admin messages   api/message/listAdmin?page=1&size=20
        :param html_text:
        :return:
        """
        return None, None, None

View File

@@ -0,0 +1,109 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class TorrentLeechSiteUserInfo(ISiteUserInfo):
    """User-info parser for TorrentLeech."""
    schema = SiteSchema.TorrentLeech
    order = SITE_BASE_ORDER + 40

    @classmethod
    def match(cls, html_text: str) -> bool:
        return 'TorrentLeech' in html_text

    def _parse_site_page(self, html_text: str):
        """Derive user id and the profile/seeding page URLs from the profile link."""
        text = self._prepare_html_text(html_text)
        profile_match = re.search(r"/profile/([^/]+)/", text)
        if not profile_match or not profile_match.group().strip():
            return
        self._user_detail_page = profile_match.group().strip().lstrip('/')
        self.userid = profile_match.group(1)
        self._user_traffic_page = f"profile/{self.userid}/view"
        self._torrent_seeding_page = f"profile/{self.userid}/seeding"

    def _parse_user_base_info(self, html_text: str):
        self.username = self.userid

    def _parse_user_traffic_info(self, html_text: str):
        """
        Upload / download / ratio, plus level, join date and bonus points.
        :param html_text:
        :return:
        """
        html = etree.HTML(self._prepare_html_text(html_text))
        uploaded = html.xpath('//div[contains(@class,"profile-uploaded")]//span/text()')
        if uploaded:
            self.upload = StringUtils.num_filesize(uploaded[0])
        downloaded = html.xpath('//div[contains(@class,"profile-downloaded")]//span/text()')
        if downloaded:
            self.download = StringUtils.num_filesize(downloaded[0])
        ratio_nodes = html.xpath('//div[contains(@class,"profile-ratio")]//span/text()')
        if ratio_nodes:
            self.ratio = StringUtils.str_float(ratio_nodes[0].replace('', '0'))
        level_cells = html.xpath('//table[contains(@class, "profileViewTable")]'
                                 '//tr/td[text()="Class"]/following-sibling::td/text()')
        if level_cells:
            self.user_level = level_cells[0].strip()
        reg_cells = html.xpath('//table[contains(@class, "profileViewTable")]'
                               '//tr/td[text()="Registration date"]/following-sibling::td/text()')
        if reg_cells:
            self.join_at = StringUtils.unify_datetime_str(reg_cells[0].strip())
        point_nodes = html.xpath('//span[contains(@class, "total-TL-points")]/text()')
        if point_nodes:
            self.bonus = StringUtils.str_float(point_nodes[0].strip())

    def _parse_user_detail_info(self, html_text: str):
        # Everything needed is on the traffic page.
        pass

    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
        """
        Parse seeding statistics (size column 2, seeders column 7); single page.
        :param html_text:
        :param multi_page: whether data spans multiple pages
        :return: URL of the next page, or None
        """
        html = etree.HTML(html_text)
        if not html:
            return None
        size_col = 2
        seeders_col = 7
        row_count = 0
        total_size = 0
        rows = []
        size_cells = html.xpath(f'//tbody/tr/td[{size_col}]')
        seeder_texts = html.xpath(f'//tbody/tr/td[{seeders_col}]/text()')
        if size_cells and seeder_texts:
            row_count = len(size_cells)
            for i in range(0, len(size_cells)):
                size = StringUtils.num_filesize(size_cells[i].xpath("string(.)").strip())
                total_size += size
                rows.append([StringUtils.str_int(seeder_texts[i]), size])
        self.seeding += row_count
        self.seeding_size += total_size
        self.seeding_info.extend(rows)
        # TorrentLeech's seeding view is not paginated
        return None

    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
        return None

    def _parse_message_content(self, html_text):
        return None, None, None

View File

@@ -0,0 +1,130 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class Unit3dSiteUserInfo(ISiteUserInfo):
    # Parser for UNIT3D-based sites (detected via the bundled unit3d.js asset).
    schema = SiteSchema.Unit3d
    order = SITE_BASE_ORDER + 15
    @classmethod
    def match(cls, html_text: str) -> bool:
        """Return True when the page comes from a UNIT3D site."""
        return "unit3d.js" in html_text
    def _parse_user_base_info(self, html_text: str):
        """Parse username and bonus; derive the detail and seeding page URLs."""
        html_text = self._prepare_html_text(html_text)
        html = etree.HTML(html_text)
        tmps = html.xpath('//a[contains(@href, "/users/") and contains(@href, "settings")]/@href')
        if tmps:
            user_name_match = re.search(r"/users/(.+)/settings", tmps[0])
            if user_name_match and user_name_match.group().strip():
                self.username = user_name_match.group(1)
                self._torrent_seeding_page = f"/users/{self.username}/active?perPage=100&client=&seeding=include"
                self._user_detail_page = f"/users/{self.username}"
        tmps = html.xpath('//a[contains(@href, "bonus/earnings")]')
        if tmps:
            bonus_text = tmps[0].xpath("string(.)")
            bonus_match = re.search(r"([\d,.]+)", bonus_text)
            if bonus_match and bonus_match.group(1).strip():
                self.bonus = StringUtils.str_float(bonus_match.group(1))
    def _parse_site_page(self, html_text: str):
        # TODO: not implemented for UNIT3D yet
        pass
    def _parse_user_detail_info(self, html_text: str):
        """
        Parse extra user details: join date and user level.
        :param html_text:
        :return:
        """
        html = etree.HTML(html_text)
        if not html:
            return None
        # User level (badge text)
        user_levels_text = html.xpath('//div[contains(@class, "content")]//span[contains(@class, "badge-user")]/text()')
        if user_levels_text:
            self.user_level = user_levels_text[0].strip()
        # Join date (label text in zh-CN / zh-TW / English is stripped off)
        join_at_text = html.xpath('//div[contains(@class, "content")]//h4[contains(text(), "注册日期") '
                                  'or contains(text(), "註冊日期") '
                                  'or contains(text(), "Registration date")]/text()')
        if join_at_text:
            self.join_at = StringUtils.unify_datetime_str(
                join_at_text[0].replace('注册日期', '').replace('註冊日期', '').replace('Registration date', ''))
    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
        """
        Parse seeding statistics.
        :param html_text:
        :param multi_page: whether data spans multiple pages
        :return: URL of the next page, or None
        """
        html = etree.HTML(html_text)
        if not html:
            return None
        size_col = 9
        seeders_col = 2
        # Detect the size column from the table header classes
        if html.xpath('//thead//th[contains(@class,"size")]'):
            size_col = len(html.xpath('//thead//th[contains(@class,"size")][1]/preceding-sibling::th')) + 1
        # Detect the seeders column from the table header classes
        if html.xpath('//thead//th[contains(@class,"seeders")]'):
            seeders_col = len(html.xpath('//thead//th[contains(@class,"seeders")]/preceding-sibling::th')) + 1
        page_seeding = 0
        page_seeding_size = 0
        page_seeding_info = []
        seeding_sizes = html.xpath(f'//tr[position()]/td[{size_col}]')
        seeding_seeders = html.xpath(f'//tr[position()]/td[{seeders_col}]')
        if seeding_sizes and seeding_seeders:
            page_seeding = len(seeding_sizes)
            for i in range(0, len(seeding_sizes)):
                size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
                seeders = StringUtils.str_int(seeding_seeders[i].xpath("string(.)").strip())
                page_seeding_size += size
                page_seeding_info.append([seeders, size])
        self.seeding += page_seeding
        self.seeding_size += page_seeding_size
        self.seeding_info.extend(page_seeding_info)
        # Check for a next page of results via the pagination widget
        next_page = None
        next_pages = html.xpath('//ul[@class="pagination"]/li[contains(@class,"active")]/following-sibling::li')
        if next_pages and len(next_pages) > 1:
            page_num = next_pages[0].xpath("string(.)").strip()
            if page_num.isdigit():
                next_page = f"{self._torrent_seeding_page}&page={page_num}"
        return next_page
    def _parse_user_traffic_info(self, html_text: str):
        """Parse upload / download / share ratio from free page text."""
        html_text = self._prepare_html_text(html_text)
        upload_match = re.search(r"[^总]上[传傳]量?[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text,
                                 re.IGNORECASE)
        self.upload = StringUtils.num_filesize(upload_match.group(1).strip()) if upload_match else 0
        download_match = re.search(r"[^总子影力]下[载載]量?[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text,
                                   re.IGNORECASE)
        self.download = StringUtils.num_filesize(download_match.group(1).strip()) if download_match else 0
        ratio_match = re.search(r"分享率[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+)", html_text)
        self.ratio = StringUtils.str_float(ratio_match.group(1)) if (
            ratio_match and ratio_match.group(1).strip()) else 0.0
    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
        # Messages are not parsed for UNIT3D sites.
        return None
    def _parse_message_content(self, html_text):
        # Messages are not parsed for UNIT3D sites.
        return None, None, None