This commit is contained in:
thsrite
2023-11-21 20:40:30 +08:00
parent 22a4ee2177
commit 2f4447c818
26 changed files with 3241 additions and 0 deletions

160
.gitignore vendored Normal file
View File

@@ -0,0 +1,160 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

83
package.json Normal file
View File

@@ -0,0 +1,83 @@
{
"SiteStatisticNoMsg": {
"name": "站点数据统计",
"description": "自动统计和展示站点数据(无站点未读消息)。",
"version": "1.0",
"icon": "statistic.png",
"color": "#324A5E",
"author": "lightolly",
"level": 2
},
"CloudStrm": {
"name": "云盘strm生成",
"description": "监控文件创建生成strm文件。",
"version": "1.6",
"icon": "https://raw.githubusercontent.com/thsrite/MoviePilot-Plugin-Market/main/icons/cloudstrm.png",
"color": "#999999",
"author": "thsrite",
"level": 1
},
"SiteUnreadMsg": {
"name": "站点未读消息",
"description": "发送站点未读消息。",
"version": "1.2",
"icon": "https://raw.githubusercontent.com/thsrite/MoviePilot-Plugin-Market/main/icons/unread.png",
"color": "#4179F4",
"author": "thsrite",
"level": 2
},
"SubscribeClear": {
"name": "清理订阅缓存",
"description": "清理订阅已下载集数。",
"version": "1.0",
"icon": "https://raw.githubusercontent.com/thsrite/MoviePilot-Plugin-Market/main/icons/subscribeclear.png",
"color": "#80bef7",
"author": "thsrite",
"level": 2
},
"DownloadTorrent": {
"name": "下载种子",
"description": "选择下载器,添加种子任务。",
"version": "1.0",
"icon": "https://raw.githubusercontent.com/thsrite/MoviePilot-Plugin-Market/main/icons/download.png",
"color": "#f87878",
"author": "thsrite",
"level": 1
},
"SiteRemoveSafe": {
"name": "安全删除站点",
"description": "删除下载器中该站点辅种,保留该站点没有辅种的种子。",
"version": "1.0",
"icon": "https://raw.githubusercontent.com/thsrite/MoviePilot-Plugin-Market/main/icons/sitesafe.png",
"color": "#6bdd88",
"author": "thsrite",
"level": 1
},
"PluginAutoUpdate": {
"name": "插件自动更新",
"description": "监测已安装插件,自动更新最新版本。",
"version": "1.2",
"icon": "https://raw.githubusercontent.com/thsrite/MoviePilot-Plugin-Market/main/icons/pluginupdate.png",
"color": "#95eb95",
"author": "thsrite",
"level": 1
},
"PluginReInstall": {
"name": "插件强制重装",
"description": "卸载当前插件,强制重装。",
"version": "1.1",
"icon": "https://raw.githubusercontent.com/thsrite/MoviePilot-Plugin-Market/main/icons/reinstall.png",
"color": "#3c78d8",
"author": "thsrite",
"level": 1
},
"SynologyNotify": {
"name": "群辉Webhook通知",
"description": "接收群辉webhook通知并推送。",
"version": "1.1",
"icon": "https://raw.githubusercontent.com/thsrite/MoviePilot-Plugin-Market/main/icons/synology.png",
"color": "#adc8f7",
"author": "thsrite",
"level": 1
}
}

View File

@@ -0,0 +1,139 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class DiscuzUserInfo(ISiteUserInfo):
schema = SiteSchema.DiscuzX
order = SITE_BASE_ORDER + 10
@classmethod
def match(cls, html_text: str) -> bool:
html = etree.HTML(html_text)
if not html:
return False
printable_text = html.xpath("string(.)") if html else ""
return 'Powered by Discuz!' in printable_text
def _parse_user_base_info(self, html_text: str):
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
user_info = html.xpath('//a[contains(@href, "&uid=")]')
if user_info:
user_id_match = re.search(r"&uid=(\d+)", user_info[0].attrib['href'])
if user_id_match and user_id_match.group().strip():
self.userid = user_id_match.group(1)
self._torrent_seeding_page = f"forum.php?&mod=torrents&cat_5up=on"
self._user_detail_page = user_info[0].attrib['href']
self.username = user_info[0].text.strip()
def _parse_site_page(self, html_text: str):
# TODO
pass
def _parse_user_detail_info(self, html_text: str):
"""
解析用户额外信息,加入时间,等级
:param html_text:
:return:
"""
html = etree.HTML(html_text)
if not html:
return None
# 用户等级
user_levels_text = html.xpath('//a[contains(@href, "usergroup")]/text()')
if user_levels_text:
self.user_level = user_levels_text[-1].strip()
# 加入日期
join_at_text = html.xpath('//li[em[text()="注册时间"]]/text()')
if join_at_text:
self.join_at = StringUtils.unify_datetime_str(join_at_text[0].strip())
# 分享率
ratio_text = html.xpath('//li[contains(.//text(), "分享率")]//text()')
if ratio_text:
ratio_match = re.search(r"\(([\d,.]+)\)", ratio_text[0])
if ratio_match and ratio_match.group(1).strip():
self.bonus = StringUtils.str_float(ratio_match.group(1))
# 积分
bouns_text = html.xpath('//li[em[text()="积分"]]/text()')
if bouns_text:
self.bonus = StringUtils.str_float(bouns_text[0].strip())
# 上传
upload_text = html.xpath('//li[em[contains(text(),"上传量")]]/text()')
if upload_text:
self.upload = StringUtils.num_filesize(upload_text[0].strip().split('/')[-1])
# 下载
download_text = html.xpath('//li[em[contains(text(),"下载量")]]/text()')
if download_text:
self.download = StringUtils.num_filesize(download_text[0].strip().split('/')[-1])
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
html = etree.HTML(html_text)
if not html:
return None
size_col = 3
seeders_col = 4
# 搜索size列
if html.xpath('//tr[position()=1]/td[.//img[@class="size"] and .//img[@alt="size"]]'):
size_col = len(html.xpath('//tr[position()=1]/td[.//img[@class="size"] '
'and .//img[@alt="size"]]/preceding-sibling::td')) + 1
# 搜索seeders列
if html.xpath('//tr[position()=1]/td[.//img[@class="seeders"] and .//img[@alt="seeders"]]'):
seeders_col = len(html.xpath('//tr[position()=1]/td[.//img[@class="seeders"] '
'and .//img[@alt="seeders"]]/preceding-sibling::td')) + 1
page_seeding = 0
page_seeding_size = 0
page_seeding_info = []
seeding_sizes = html.xpath(f'//tr[position()>1]/td[{size_col}]')
seeding_seeders = html.xpath(f'//tr[position()>1]/td[{seeders_col}]//text()')
if seeding_sizes and seeding_seeders:
page_seeding = len(seeding_sizes)
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
seeders = StringUtils.str_int(seeding_seeders[i])
page_seeding_size += size
page_seeding_info.append([seeders, size])
self.seeding += page_seeding
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)
# 是否存在下页数据
next_page = None
next_page_text = html.xpath('//a[contains(.//text(), "下一页") or contains(.//text(), "下一頁")]/@href')
if next_page_text:
next_page = next_page_text[-1].strip()
return next_page
def _parse_user_traffic_info(self, html_text: str):
pass
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
return None
def _parse_message_content(self, html_text):
return None, None, None

View File

@@ -0,0 +1,118 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class FileListSiteUserInfo(ISiteUserInfo):
schema = SiteSchema.FileList
order = SITE_BASE_ORDER + 50
@classmethod
def match(cls, html_text: str) -> bool:
html = etree.HTML(html_text)
if not html:
return False
printable_text = html.xpath("string(.)") if html else ""
return 'Powered by FileList' in printable_text
def _parse_site_page(self, html_text: str):
html_text = self._prepare_html_text(html_text)
user_detail = re.search(r"userdetails.php\?id=(\d+)", html_text)
if user_detail and user_detail.group().strip():
self._user_detail_page = user_detail.group().strip().lstrip('/')
self.userid = user_detail.group(1)
self._torrent_seeding_page = f"snatchlist.php?id={self.userid}&action=torrents&type=seeding"
def _parse_user_base_info(self, html_text: str):
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
ret = html.xpath(f'//a[contains(@href, "userdetails") and contains(@href, "{self.userid}")]//text()')
if ret:
self.username = str(ret[0])
def _parse_user_traffic_info(self, html_text: str):
"""
上传/下载/分享率 [做种数/魔力值]
:param html_text:
:return:
"""
return
def _parse_user_detail_info(self, html_text: str):
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
upload_html = html.xpath('//table//tr/td[text()="Uploaded"]/following-sibling::td//text()')
if upload_html:
self.upload = StringUtils.num_filesize(upload_html[0])
download_html = html.xpath('//table//tr/td[text()="Downloaded"]/following-sibling::td//text()')
if download_html:
self.download = StringUtils.num_filesize(download_html[0])
self.ratio = 0 if self.download == 0 else self.upload / self.download
user_level_html = html.xpath('//table//tr/td[text()="Class"]/following-sibling::td//text()')
if user_level_html:
self.user_level = user_level_html[0].strip()
join_at_html = html.xpath('//table//tr/td[contains(text(), "Join")]/following-sibling::td//text()')
if join_at_html:
self.join_at = StringUtils.unify_datetime_str(join_at_html[0].strip())
bonus_html = html.xpath('//a[contains(@href, "shop.php")]')
if bonus_html:
self.bonus = StringUtils.str_float(bonus_html[0].xpath("string(.)").strip())
pass
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
html = etree.HTML(html_text)
if not html:
return None
size_col = 6
seeders_col = 7
page_seeding = 0
page_seeding_size = 0
page_seeding_info = []
seeding_sizes = html.xpath(f'//table/tr[position()>1]/td[{size_col}]')
seeding_seeders = html.xpath(f'//table/tr[position()>1]/td[{seeders_col}]')
if seeding_sizes and seeding_seeders:
page_seeding = len(seeding_sizes)
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
seeders = StringUtils.str_int(seeding_seeders[i].xpath("string(.)").strip())
page_seeding_size += size
page_seeding_info.append([seeders, size])
self.seeding += page_seeding
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)
# 是否存在下页数据
next_page = None
return next_page
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
return None
def _parse_message_content(self, html_text):
return None, None, None

View File

@@ -0,0 +1,163 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class GazelleSiteUserInfo(ISiteUserInfo):
schema = SiteSchema.Gazelle
order = SITE_BASE_ORDER
@classmethod
def match(cls, html_text: str) -> bool:
html = etree.HTML(html_text)
if not html:
return False
printable_text = html.xpath("string(.)") if html else ""
return "Powered by Gazelle" in printable_text or "DIC Music" in printable_text
def _parse_user_base_info(self, html_text: str):
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
tmps = html.xpath('//a[contains(@href, "user.php?id=")]')
if tmps:
user_id_match = re.search(r"user.php\?id=(\d+)", tmps[0].attrib['href'])
if user_id_match and user_id_match.group().strip():
self.userid = user_id_match.group(1)
self._torrent_seeding_page = f"torrents.php?type=seeding&userid={self.userid}"
self._user_detail_page = f"user.php?id={self.userid}"
self.username = tmps[0].text.strip()
tmps = html.xpath('//*[@id="header-uploaded-value"]/@data-value')
if tmps:
self.upload = StringUtils.num_filesize(tmps[0])
else:
tmps = html.xpath('//li[@id="stats_seeding"]/span/text()')
if tmps:
self.upload = StringUtils.num_filesize(tmps[0])
tmps = html.xpath('//*[@id="header-downloaded-value"]/@data-value')
if tmps:
self.download = StringUtils.num_filesize(tmps[0])
else:
tmps = html.xpath('//li[@id="stats_leeching"]/span/text()')
if tmps:
self.download = StringUtils.num_filesize(tmps[0])
self.ratio = 0.0 if self.download <= 0.0 else round(self.upload / self.download, 3)
tmps = html.xpath('//a[contains(@href, "bonus.php")]/@data-tooltip')
if tmps:
bonus_match = re.search(r"([\d,.]+)", tmps[0])
if bonus_match and bonus_match.group(1).strip():
self.bonus = StringUtils.str_float(bonus_match.group(1))
else:
tmps = html.xpath('//a[contains(@href, "bonus.php")]')
if tmps:
bonus_text = tmps[0].xpath("string(.)")
bonus_match = re.search(r"([\d,.]+)", bonus_text)
if bonus_match and bonus_match.group(1).strip():
self.bonus = StringUtils.str_float(bonus_match.group(1))
def _parse_site_page(self, html_text: str):
# TODO
pass
def _parse_user_detail_info(self, html_text: str):
"""
解析用户额外信息,加入时间,等级
:param html_text:
:return:
"""
html = etree.HTML(html_text)
if not html:
return None
# 用户等级
user_levels_text = html.xpath('//*[@id="class-value"]/@data-value')
if user_levels_text:
self.user_level = user_levels_text[0].strip()
else:
user_levels_text = html.xpath('//li[contains(text(), "用户等级")]/text()')
if user_levels_text:
self.user_level = user_levels_text[0].split(':')[1].strip()
# 加入日期
join_at_text = html.xpath('//*[@id="join-date-value"]/@data-value')
if join_at_text:
self.join_at = StringUtils.unify_datetime_str(join_at_text[0].strip())
else:
join_at_text = html.xpath(
'//div[contains(@class, "box_userinfo_stats")]//li[contains(text(), "加入时间")]/span/text()')
if join_at_text:
self.join_at = StringUtils.unify_datetime_str(join_at_text[0].strip())
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
html = etree.HTML(html_text)
if not html:
return None
size_col = 3
# 搜索size列
if html.xpath('//table[contains(@id, "torrent")]//tr[1]/td'):
size_col = len(html.xpath('//table[contains(@id, "torrent")]//tr[1]/td')) - 3
# 搜索seeders列
seeders_col = size_col + 2
page_seeding = 0
page_seeding_size = 0
page_seeding_info = []
seeding_sizes = html.xpath(f'//table[contains(@id, "torrent")]//tr[position()>1]/td[{size_col}]')
seeding_seeders = html.xpath(f'//table[contains(@id, "torrent")]//tr[position()>1]/td[{seeders_col}]/text()')
if seeding_sizes and seeding_seeders:
page_seeding = len(seeding_sizes)
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
seeders = int(seeding_seeders[i])
page_seeding_size += size
page_seeding_info.append([seeders, size])
if multi_page:
self.seeding += page_seeding
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)
else:
if not self.seeding:
self.seeding = page_seeding
if not self.seeding_size:
self.seeding_size = page_seeding_size
if not self.seeding_info:
self.seeding_info = page_seeding_info
# 是否存在下页数据
next_page = None
next_page_text = html.xpath('//a[contains(.//text(), "Next") or contains(.//text(), "下一页")]/@href')
if next_page_text:
next_page = next_page_text[-1].strip()
return next_page
def _parse_user_traffic_info(self, html_text: str):
# TODO
pass
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
return None
def _parse_message_content(self, html_text):
return None, None, None

View File

@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class IptSiteUserInfo(ISiteUserInfo):
schema = SiteSchema.Ipt
order = SITE_BASE_ORDER + 35
@classmethod
def match(cls, html_text: str) -> bool:
return 'IPTorrents' in html_text
def _parse_user_base_info(self, html_text: str):
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
tmps = html.xpath('//a[contains(@href, "/u/")]//text()')
tmps_id = html.xpath('//a[contains(@href, "/u/")]/@href')
if tmps:
self.username = str(tmps[-1])
if tmps_id:
user_id_match = re.search(r"/u/(\d+)", tmps_id[0])
if user_id_match and user_id_match.group().strip():
self.userid = user_id_match.group(1)
self._user_detail_page = f"user.php?u={self.userid}"
self._torrent_seeding_page = f"peers?u={self.userid}"
tmps = html.xpath('//div[@class = "stats"]/div/div')
if tmps:
self.upload = StringUtils.num_filesize(str(tmps[0].xpath('span/text()')[1]).strip())
self.download = StringUtils.num_filesize(str(tmps[0].xpath('span/text()')[2]).strip())
self.seeding = StringUtils.str_int(tmps[0].xpath('a')[2].xpath('text()')[0])
self.leeching = StringUtils.str_int(tmps[0].xpath('a')[2].xpath('text()')[1])
self.ratio = StringUtils.str_float(str(tmps[0].xpath('span/text()')[0]).strip().replace('-', '0'))
self.bonus = StringUtils.str_float(tmps[0].xpath('a')[3].xpath('text()')[0])
def _parse_site_page(self, html_text: str):
# TODO
pass
def _parse_user_detail_info(self, html_text: str):
html = etree.HTML(html_text)
if not html:
return
user_levels_text = html.xpath('//tr/th[text()="Class"]/following-sibling::td[1]/text()')
if user_levels_text:
self.user_level = user_levels_text[0].strip()
# 加入日期
join_at_text = html.xpath('//tr/th[text()="Join date"]/following-sibling::td[1]/text()')
if join_at_text:
self.join_at = StringUtils.unify_datetime_str(join_at_text[0].split(' (')[0])
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
html = etree.HTML(html_text)
if not html:
return
# seeding start
seeding_end_pos = 3
if html.xpath('//tr/td[text() = "Leechers"]'):
seeding_end_pos = len(html.xpath('//tr/td[text() = "Leechers"]/../preceding-sibling::tr')) + 1
seeding_end_pos = seeding_end_pos - 3
page_seeding = 0
page_seeding_size = 0
seeding_torrents = html.xpath('//tr/td[text() = "Seeders"]/../following-sibling::tr/td[position()=6]/text()')
if seeding_torrents:
page_seeding = seeding_end_pos
for per_size in seeding_torrents[:seeding_end_pos]:
if '(' in per_size and ')' in per_size:
per_size = per_size.split('(')[-1]
per_size = per_size.split(')')[0]
page_seeding_size += StringUtils.num_filesize(per_size)
self.seeding = page_seeding
self.seeding_size = page_seeding_size
def _parse_user_traffic_info(self, html_text: str):
# TODO
pass
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
return None
def _parse_message_content(self, html_text):
return None, None, None

View File

@@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
import re
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import SITE_BASE_ORDER, SiteSchema
from app.plugins.sitestatistic.siteuserinfo.nexus_php import NexusPhpSiteUserInfo
from app.utils.string import StringUtils
class NexusHhanclubSiteUserInfo(NexusPhpSiteUserInfo):
schema = SiteSchema.NexusHhanclub
order = SITE_BASE_ORDER + 20
@classmethod
def match(cls, html_text: str) -> bool:
return 'hhanclub.top' in html_text
def _parse_user_traffic_info(self, html_text):
super()._parse_user_traffic_info(html_text)
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
# 上传、下载、分享率
upload_match = re.search(r"[_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)",
html.xpath('//*[@id="user-info-panel"]/div[2]/div[2]/div[4]/text()')[0])
download_match = re.search(r"[_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)",
html.xpath('//*[@id="user-info-panel"]/div[2]/div[2]/div[5]/text()')[0])
ratio_match = re.search(r"分享率][:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+)",
html.xpath('//*[@id="user-info-panel"]/div[2]/div[1]/div[1]/div/text()')[0])
# 计算分享率
self.upload = StringUtils.num_filesize(upload_match.group(1).strip()) if upload_match else 0
self.download = StringUtils.num_filesize(download_match.group(1).strip()) if download_match else 0
# 优先使用页面上的分享率
calc_ratio = 0.0 if self.download <= 0.0 else round(self.upload / self.download, 3)
self.ratio = StringUtils.str_float(ratio_match.group(1)) if (
ratio_match and ratio_match.group(1).strip()) else calc_ratio
def _parse_user_detail_info(self, html_text: str):
"""
解析用户额外信息,加入时间,等级
:param html_text:
:return:
"""
super()._parse_user_detail_info(html_text)
html = etree.HTML(html_text)
if not html:
return
# 加入时间
join_at_text = html.xpath('//*[@id="mainContent"]/div/div[2]/div[4]/div[3]/span[2]/text()[1]')
if join_at_text:
self.join_at = StringUtils.unify_datetime_str(join_at_text[0].split(' (')[0].strip())
def _get_user_level(self, html):
super()._get_user_level(html)
user_level_path = html.xpath('//*[@id="mainContent"]/div/div[2]/div[2]/div[4]/span[2]/img/@title')
if user_level_path:
self.user_level = user_level_path[0]

View File

@@ -0,0 +1,392 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.log import logger
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class NexusPhpSiteUserInfo(ISiteUserInfo):
schema = SiteSchema.NexusPhp
order = SITE_BASE_ORDER * 2
@classmethod
def match(cls, html_text: str) -> bool:
"""
默认使用NexusPhp解析
:param html_text:
:return:
"""
return True
def _parse_site_page(self, html_text: str):
html_text = self._prepare_html_text(html_text)
user_detail = re.search(r"userdetails.php\?id=(\d+)", html_text)
if user_detail and user_detail.group().strip():
self._user_detail_page = user_detail.group().strip().lstrip('/')
self.userid = user_detail.group(1)
self._torrent_seeding_page = f"getusertorrentlistajax.php?userid={self.userid}&type=seeding"
else:
user_detail = re.search(r"(userdetails)", html_text)
if user_detail and user_detail.group().strip():
self._user_detail_page = user_detail.group().strip().lstrip('/')
self.userid = None
self._torrent_seeding_page = None
def _parse_message_unread(self, html_text):
"""
解析未读短消息数量
:param html_text:
:return:
"""
html = etree.HTML(html_text)
if not html:
return
message_labels = html.xpath('//a[@href="messages.php"]/..')
message_labels.extend(html.xpath('//a[contains(@href, "messages.php")]/..'))
if message_labels:
message_text = message_labels[0].xpath("string(.)")
logger.debug(f"{self.site_name} 消息原始信息 {message_text}")
message_unread_match = re.findall(r"[^Date](信息箱\s*|\(|你有\xa0)(\d+)", message_text)
if message_unread_match and len(message_unread_match[-1]) == 2:
self.message_unread = StringUtils.str_int(message_unread_match[-1][1])
elif message_text.isdigit():
self.message_unread = StringUtils.str_int(message_text)
def _parse_user_base_info(self, html_text: str):
# 合并解析,减少额外请求调用
self._parse_user_traffic_info(html_text)
self._user_traffic_page = None
self._parse_message_unread(html_text)
html = etree.HTML(html_text)
if not html:
return
ret = html.xpath(f'//a[contains(@href, "userdetails") and contains(@href, "{self.userid}")]//b//text()')
if ret:
self.username = str(ret[0])
return
ret = html.xpath(f'//a[contains(@href, "userdetails") and contains(@href, "{self.userid}")]//text()')
if ret:
self.username = str(ret[0])
ret = html.xpath('//a[contains(@href, "userdetails")]//strong//text()')
if ret:
self.username = str(ret[0])
return
def _parse_user_traffic_info(self, html_text):
html_text = self._prepare_html_text(html_text)
upload_match = re.search(r"[^总]上[传傳]量?[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text,
re.IGNORECASE)
self.upload = StringUtils.num_filesize(upload_match.group(1).strip()) if upload_match else 0
download_match = re.search(r"[^总子影力]下[载載]量?[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text,
re.IGNORECASE)
self.download = StringUtils.num_filesize(download_match.group(1).strip()) if download_match else 0
ratio_match = re.search(r"分享率[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+)", html_text)
# 计算分享率
calc_ratio = 0.0 if self.download <= 0.0 else round(self.upload / self.download, 3)
# 优先使用页面上的分享率
self.ratio = StringUtils.str_float(ratio_match.group(1)) if (
ratio_match and ratio_match.group(1).strip()) else calc_ratio
leeching_match = re.search(r"(Torrents leeching|下载中)[\u4E00-\u9FA5\D\s]+(\d+)[\s\S]+<", html_text)
self.leeching = StringUtils.str_int(leeching_match.group(2)) if leeching_match and leeching_match.group(
2).strip() else 0
html = etree.HTML(html_text)
has_ucoin, self.bonus = self._parse_ucoin(html)
if has_ucoin:
return
tmps = html.xpath('//a[contains(@href,"mybonus")]/text()') if html else None
if tmps:
bonus_text = str(tmps[0]).strip()
bonus_match = re.search(r"([\d,.]+)", bonus_text)
if bonus_match and bonus_match.group(1).strip():
self.bonus = StringUtils.str_float(bonus_match.group(1))
return
bonus_match = re.search(r"mybonus.[\[\]:<>/a-zA-Z_\-=\"'\s#;.(使用魔力值豆]+\s*([\d,.]+)[<()&\s]", html_text)
try:
if bonus_match and bonus_match.group(1).strip():
self.bonus = StringUtils.str_float(bonus_match.group(1))
return
bonus_match = re.search(r"[魔力值|\]][\[\]:<>/a-zA-Z_\-=\"'\s#;]+\s*([\d,.]+|\"[\d,.]+\")[<>()&\s]",
html_text,
flags=re.S)
if bonus_match and bonus_match.group(1).strip():
self.bonus = StringUtils.str_float(bonus_match.group(1).strip('"'))
except Exception as err:
logger.error(f"{self.site_name} 解析魔力值出错, 错误信息: {str(err)}")
@staticmethod
def _parse_ucoin(html):
"""
解析ucoin, 统一转换为铜币
:param html:
:return:
"""
if html:
gold, silver, copper = None, None, None
golds = html.xpath('//span[@class = "ucoin-symbol ucoin-gold"]//text()')
if golds:
gold = StringUtils.str_float(str(golds[-1]))
silvers = html.xpath('//span[@class = "ucoin-symbol ucoin-silver"]//text()')
if silvers:
silver = StringUtils.str_float(str(silvers[-1]))
coppers = html.xpath('//span[@class = "ucoin-symbol ucoin-copper"]//text()')
if coppers:
copper = StringUtils.str_float(str(coppers[-1]))
if gold or silver or copper:
gold = gold if gold else 0
silver = silver if silver else 0
copper = copper if copper else 0
return True, gold * 100 * 100 + silver * 100 + copper
return False, 0.0
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
html = etree.HTML(str(html_text).replace(r'\/', '/'))
if not html:
return None
# 首页存在扩展链接,使用扩展链接
seeding_url_text = html.xpath('//a[contains(@href,"torrents.php") '
'and contains(@href,"seeding")]/@href')
if multi_page is False and seeding_url_text and seeding_url_text[0].strip():
self._torrent_seeding_page = seeding_url_text[0].strip()
return self._torrent_seeding_page
size_col = 3
seeders_col = 4
# 搜索size列
size_col_xpath = '//tr[position()=1]/' \
'td[(img[@class="size"] and img[@alt="size"])' \
' or (text() = "大小")' \
' or (a/img[@class="size" and @alt="size"])]'
if html.xpath(size_col_xpath):
size_col = len(html.xpath(f'{size_col_xpath}/preceding-sibling::td')) + 1
# 搜索seeders列
seeders_col_xpath = '//tr[position()=1]/' \
'td[(img[@class="seeders"] and img[@alt="seeders"])' \
' or (text() = "在做种")' \
' or (a/img[@class="seeders" and @alt="seeders"])]'
if html.xpath(seeders_col_xpath):
seeders_col = len(html.xpath(f'{seeders_col_xpath}/preceding-sibling::td')) + 1
page_seeding = 0
page_seeding_size = 0
page_seeding_info = []
# 如果 table class="torrents"则增加table[@class="torrents"]
table_class = '//table[@class="torrents"]' if html.xpath('//table[@class="torrents"]') else ''
seeding_sizes = html.xpath(f'{table_class}//tr[position()>1]/td[{size_col}]')
seeding_seeders = html.xpath(f'{table_class}//tr[position()>1]/td[{seeders_col}]/b/a/text()')
if not seeding_seeders:
seeding_seeders = html.xpath(f'{table_class}//tr[position()>1]/td[{seeders_col}]//text()')
if seeding_sizes and seeding_seeders:
page_seeding = len(seeding_sizes)
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
seeders = StringUtils.str_int(seeding_seeders[i])
page_seeding_size += size
page_seeding_info.append([seeders, size])
self.seeding += page_seeding
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)
# 是否存在下页数据
next_page = None
next_page_text = html.xpath('//a[contains(.//text(), "下一页") or contains(.//text(), "下一頁")]/@href')
if next_page_text:
next_page = next_page_text[-1].strip()
# fix up page url
if self.userid not in next_page:
next_page = f'{next_page}&userid={self.userid}&type=seeding'
return next_page
def _parse_user_detail_info(self, html_text: str):
"""
解析用户额外信息,加入时间,等级
:param html_text:
:return:
"""
html = etree.HTML(html_text)
if not html:
return
self._get_user_level(html)
self._fixup_traffic_info(html)
# 加入日期
join_at_text = html.xpath(
'//tr/td[text()="加入日期" or text()="注册日期" or *[text()="加入日期"]]/following-sibling::td[1]//text()'
'|//div/b[text()="加入日期"]/../text()')
if join_at_text:
self.join_at = StringUtils.unify_datetime_str(join_at_text[0].split(' (')[0].strip())
# 做种体积 & 做种数
# seeding 页面获取不到的话,此处再获取一次
seeding_sizes = html.xpath('//tr/td[text()="当前上传"]/following-sibling::td[1]//'
'table[tr[1][td[4 and text()="尺寸"]]]//tr[position()>1]/td[4]')
seeding_seeders = html.xpath('//tr/td[text()="当前上传"]/following-sibling::td[1]//'
'table[tr[1][td[5 and text()="做种者"]]]//tr[position()>1]/td[5]//text()')
tmp_seeding = len(seeding_sizes)
tmp_seeding_size = 0
tmp_seeding_info = []
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
seeders = StringUtils.str_int(seeding_seeders[i])
tmp_seeding_size += size
tmp_seeding_info.append([seeders, size])
if not self.seeding_size:
self.seeding_size = tmp_seeding_size
if not self.seeding:
self.seeding = tmp_seeding
if not self.seeding_info:
self.seeding_info = tmp_seeding_info
seeding_sizes = html.xpath('//tr/td[text()="做种统计"]/following-sibling::td[1]//text()')
if seeding_sizes:
seeding_match = re.search(r"总做种数:\s+(\d+)", seeding_sizes[0], re.IGNORECASE)
seeding_size_match = re.search(r"总做种体积:\s+([\d,.\s]+[KMGTPI]*B)", seeding_sizes[0], re.IGNORECASE)
tmp_seeding = StringUtils.str_int(seeding_match.group(1)) if (
seeding_match and seeding_match.group(1)) else 0
tmp_seeding_size = StringUtils.num_filesize(
seeding_size_match.group(1).strip()) if seeding_size_match else 0
if not self.seeding_size:
self.seeding_size = tmp_seeding_size
if not self.seeding:
self.seeding = tmp_seeding
self._fixup_torrent_seeding_page(html)
def _fixup_torrent_seeding_page(self, html):
"""
修正种子页面链接
:param html:
:return:
"""
# 单独的种子页面
seeding_url_text = html.xpath('//a[contains(@href,"getusertorrentlist.php") '
'and contains(@href,"seeding")]/@href')
if seeding_url_text:
self._torrent_seeding_page = seeding_url_text[0].strip()
# 从JS调用种获取用户ID
seeding_url_text = html.xpath('//a[contains(@href, "javascript: getusertorrentlistajax") '
'and contains(@href,"seeding")]/@href')
csrf_text = html.xpath('//meta[@name="x-csrf"]/@content')
if not self._torrent_seeding_page and seeding_url_text:
user_js = re.search(r"javascript: getusertorrentlistajax\(\s*'(\d+)", seeding_url_text[0])
if user_js and user_js.group(1).strip():
self.userid = user_js.group(1).strip()
self._torrent_seeding_page = f"getusertorrentlistajax.php?userid={self.userid}&type=seeding"
elif seeding_url_text and csrf_text:
if csrf_text[0].strip():
self._torrent_seeding_page \
= f"ajax_getusertorrentlist.php"
self._torrent_seeding_params = {'userid': self.userid, 'type': 'seeding', 'csrf': csrf_text[0].strip()}
# 分类做种模式
# 临时屏蔽
# seeding_url_text = html.xpath('//tr/td[text()="当前做种"]/following-sibling::td[1]'
# '/table//td/a[contains(@href,"seeding")]/@href')
# if seeding_url_text:
# self._torrent_seeding_page = seeding_url_text
def _get_user_level(self, html):
# 等级 获取同一行等级数据图片格式等级取title信息否则取文本信息
user_levels_text = html.xpath('//tr/td[text()="等級" or text()="等级" or *[text()="等级"]]/'
'following-sibling::td[1]/img[1]/@title')
if user_levels_text:
self.user_level = user_levels_text[0].strip()
return
user_levels_text = html.xpath('//tr/td[text()="等級" or text()="等级"]/'
'following-sibling::td[1 and not(img)]'
'|//tr/td[text()="等級" or text()="等级"]/'
'following-sibling::td[1 and img[not(@title)]]')
if user_levels_text:
self.user_level = user_levels_text[0].xpath("string(.)").strip()
return
user_levels_text = html.xpath('//tr/td[text()="等級" or text()="等级"]/'
'following-sibling::td[1]')
if user_levels_text:
self.user_level = user_levels_text[0].xpath("string(.)").strip()
return
user_levels_text = html.xpath('//a[contains(@href, "userdetails")]/text()')
if not self.user_level and user_levels_text:
for user_level_text in user_levels_text:
user_level_match = re.search(r"\[(.*)]", user_level_text)
if user_level_match and user_level_match.group(1).strip():
self.user_level = user_level_match.group(1).strip()
break
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
html = etree.HTML(html_text)
if not html:
return None
message_links = html.xpath('//tr[not(./td/img[@alt="Read"])]/td/a[contains(@href, "viewmessage")]/@href')
msg_links.extend(message_links)
# 是否存在下页数据
next_page = None
next_page_text = html.xpath('//a[contains(.//text(), "下一页") or contains(.//text(), "下一頁")]/@href')
if next_page_text:
next_page = next_page_text[-1].strip()
return next_page
def _parse_message_content(self, html_text):
html = etree.HTML(html_text)
if not html:
return None, None, None
# 标题
message_head_text = None
message_head = html.xpath('//h1/text()'
'|//div[@class="layui-card-header"]/span[1]/text()')
if message_head:
message_head_text = message_head[-1].strip()
# 消息时间
message_date_text = None
message_date = html.xpath('//h1/following-sibling::table[.//tr/td[@class="colhead"]]//tr[2]/td[2]'
'|//div[@class="layui-card-header"]/span[2]/span[2]')
if message_date:
message_date_text = message_date[0].xpath("string(.)").strip()
# 消息内容
message_content_text = None
message_content = html.xpath('//h1/following-sibling::table[.//tr/td[@class="colhead"]]//tr[3]/td'
'|//div[contains(@class,"layui-card-body")]')
if message_content:
message_content_text = message_content[0].xpath("string(.)").strip()
return message_head_text, message_date_text, message_content_text
def _fixup_traffic_info(self, html):
# fixup bonus
if not self.bonus:
bonus_text = html.xpath('//tr/td[text()="魔力值" or text()="猫粮"]/following-sibling::td[1]/text()')
if bonus_text:
self.bonus = StringUtils.str_float(bonus_text[0].strip())

View File

@@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
import re
from app.plugins.sitestatistic.siteuserinfo import SITE_BASE_ORDER, SiteSchema
from app.plugins.sitestatistic.siteuserinfo.nexus_php import NexusPhpSiteUserInfo
class NexusProjectSiteUserInfo(NexusPhpSiteUserInfo):
schema = SiteSchema.NexusProject
order = SITE_BASE_ORDER + 25
@classmethod
def match(cls, html_text: str) -> bool:
return 'Nexus Project' in html_text
def _parse_site_page(self, html_text: str):
html_text = self._prepare_html_text(html_text)
user_detail = re.search(r"userdetails.php\?id=(\d+)", html_text)
if user_detail and user_detail.group().strip():
self._user_detail_page = user_detail.group().strip().lstrip('/')
self.userid = user_detail.group(1)
self._torrent_seeding_page = f"viewusertorrents.php?id={self.userid}&show=seeding"

View File

@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
import json
from typing import Optional
from lxml import etree
from app.log import logger
from app.plugins.sitestatistic.siteuserinfo import SITE_BASE_ORDER, SiteSchema
from app.plugins.sitestatistic.siteuserinfo.nexus_php import NexusPhpSiteUserInfo
class NexusRabbitSiteUserInfo(NexusPhpSiteUserInfo):
schema = SiteSchema.NexusRabbit
order = SITE_BASE_ORDER + 5
@classmethod
def match(cls, html_text: str) -> bool:
html = etree.HTML(html_text)
if not html:
return False
printable_text = html.xpath("string(.)") if html else ""
return 'Style by Rabbit' in printable_text
def _parse_site_page(self, html_text: str):
super()._parse_site_page(html_text)
self._torrent_seeding_page = f"getusertorrentlistajax.php?page=1&limit=5000000&type=seeding&uid={self.userid}"
self._torrent_seeding_headers = {"Accept": "application/json, text/javascript, */*; q=0.01"}
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
try:
torrents = json.loads(html_text).get('data')
except Exception as e:
logger.error(f"解析做种信息失败: {str(e)}")
return
page_seeding_size = 0
page_seeding_info = []
page_seeding = len(torrents)
for torrent in torrents:
seeders = int(torrent.get('seeders', 0))
size = int(torrent.get('size', 0))
page_seeding_size += int(torrent.get('size', 0))
page_seeding_info.append([seeders, size])
self.seeding += page_seeding
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)

View File

@@ -0,0 +1,110 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class SmallHorseSiteUserInfo(ISiteUserInfo):
schema = SiteSchema.SmallHorse
order = SITE_BASE_ORDER + 30
@classmethod
def match(cls, html_text: str) -> bool:
return 'Small Horse' in html_text
def _parse_site_page(self, html_text: str):
html_text = self._prepare_html_text(html_text)
user_detail = re.search(r"user.php\?id=(\d+)", html_text)
if user_detail and user_detail.group().strip():
self._user_detail_page = user_detail.group().strip().lstrip('/')
self.userid = user_detail.group(1)
self._torrent_seeding_page = f"torrents.php?type=seeding&userid={self.userid}"
self._user_traffic_page = f"user.php?id={self.userid}"
def _parse_user_base_info(self, html_text: str):
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
ret = html.xpath('//a[contains(@href, "user.php")]//text()')
if ret:
self.username = str(ret[0])
def _parse_user_traffic_info(self, html_text: str):
"""
上传/下载/分享率 [做种数/魔力值]
:param html_text:
:return:
"""
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
tmps = html.xpath('//ul[@class = "stats nobullet"]')
if tmps:
if tmps[1].xpath("li") and tmps[1].xpath("li")[0].xpath("span//text()"):
self.join_at = StringUtils.unify_datetime_str(tmps[1].xpath("li")[0].xpath("span//text()")[0])
self.upload = StringUtils.num_filesize(str(tmps[1].xpath("li")[2].xpath("text()")[0]).split(":")[1].strip())
self.download = StringUtils.num_filesize(
str(tmps[1].xpath("li")[3].xpath("text()")[0]).split(":")[1].strip())
if tmps[1].xpath("li")[4].xpath("span//text()"):
self.ratio = StringUtils.str_float(str(tmps[1].xpath("li")[4].xpath("span//text()")[0]).replace('', '0'))
else:
self.ratio = StringUtils.str_float(str(tmps[1].xpath("li")[5].xpath("text()")[0]).split(":")[1])
self.bonus = StringUtils.str_float(str(tmps[1].xpath("li")[5].xpath("text()")[0]).split(":")[1])
self.user_level = str(tmps[3].xpath("li")[0].xpath("text()")[0]).split(":")[1].strip()
self.leeching = StringUtils.str_int(
(tmps[4].xpath("li")[6].xpath("text()")[0]).split(":")[1].replace("[", ""))
def _parse_user_detail_info(self, html_text: str):
pass
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
html = etree.HTML(html_text)
if not html:
return None
size_col = 6
seeders_col = 8
page_seeding = 0
page_seeding_size = 0
page_seeding_info = []
seeding_sizes = html.xpath(f'//table[@id="torrent_table"]//tr[position()>1]/td[{size_col}]')
seeding_seeders = html.xpath(f'//table[@id="torrent_table"]//tr[position()>1]/td[{seeders_col}]')
if seeding_sizes and seeding_seeders:
page_seeding = len(seeding_sizes)
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
seeders = StringUtils.str_int(seeding_seeders[i].xpath("string(.)").strip())
page_seeding_size += size
page_seeding_info.append([seeders, size])
self.seeding += page_seeding
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)
# 是否存在下页数据
next_page = None
next_pages = html.xpath('//ul[@class="pagination"]/li[contains(@class,"active")]/following-sibling::li')
if next_pages and len(next_pages) > 1:
page_num = next_pages[0].xpath("string(.)").strip()
if page_num.isdigit():
next_page = f"{self._torrent_seeding_page}&page={page_num}"
return next_page
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
return None
def _parse_message_content(self, html_text):
return None, None, None

View File

@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
import json
import re
from typing import Optional
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class TNodeSiteUserInfo(ISiteUserInfo):
schema = SiteSchema.TNode
order = SITE_BASE_ORDER + 60
@classmethod
def match(cls, html_text: str) -> bool:
return 'Powered By TNode' in html_text
def _parse_site_page(self, html_text: str):
html_text = self._prepare_html_text(html_text)
# <meta name="x-csrf-token" content="fd169876a7b4846f3a7a16fcd5cccf8d">
csrf_token = re.search(r'<meta name="x-csrf-token" content="(.+?)">', html_text)
if csrf_token:
self._addition_headers = {'X-CSRF-TOKEN': csrf_token.group(1)}
self._user_detail_page = "api/user/getMainInfo"
self._torrent_seeding_page = "api/user/listTorrentActivity?id=&type=seeding&page=1&size=20000"
def _parse_logged_in(self, html_text):
"""
判断是否登录成功, 通过判断是否存在用户信息
暂时跳过检测,待后续优化
:param html_text:
:return:
"""
return True
def _parse_user_base_info(self, html_text: str):
self.username = self.userid
def _parse_user_traffic_info(self, html_text: str):
pass
def _parse_user_detail_info(self, html_text: str):
detail = json.loads(html_text)
if detail.get("status") != 200:
return
user_info = detail.get("data", {})
self.userid = user_info.get("id")
self.username = user_info.get("username")
self.user_level = user_info.get("class", {}).get("name")
self.join_at = user_info.get("regTime", 0)
self.join_at = StringUtils.unify_datetime_str(str(self.join_at))
self.upload = user_info.get("upload")
self.download = user_info.get("download")
self.ratio = 0 if self.download <= 0 else round(self.upload / self.download, 3)
self.bonus = user_info.get("bonus")
self.message_unread = user_info.get("unreadAdmin", 0) + user_info.get("unreadInbox", 0) + user_info.get(
"unreadSystem", 0)
pass
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
解析用户做种信息
"""
seeding_info = json.loads(html_text)
if seeding_info.get("status") != 200:
return
torrents = seeding_info.get("data", {}).get("torrents", [])
page_seeding_size = 0
page_seeding_info = []
for torrent in torrents:
size = torrent.get("size", 0)
seeders = torrent.get("seeding", 0)
page_seeding_size += size
page_seeding_info.append([seeders, size])
self.seeding += len(torrents)
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)
# 是否存在下页数据
next_page = None
return next_page
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
return None
def _parse_message_content(self, html_text):
"""
系统信息 api/message/listSystem?page=1&size=20
收件箱信息 api/message/listInbox?page=1&size=20
管理员信息 api/message/listAdmin?page=1&size=20
:param html_text:
:return:
"""
return None, None, None

View File

@@ -0,0 +1,109 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class TorrentLeechSiteUserInfo(ISiteUserInfo):
schema = SiteSchema.TorrentLeech
order = SITE_BASE_ORDER + 40
@classmethod
def match(cls, html_text: str) -> bool:
return 'TorrentLeech' in html_text
def _parse_site_page(self, html_text: str):
html_text = self._prepare_html_text(html_text)
user_detail = re.search(r"/profile/([^/]+)/", html_text)
if user_detail and user_detail.group().strip():
self._user_detail_page = user_detail.group().strip().lstrip('/')
self.userid = user_detail.group(1)
self._user_traffic_page = f"profile/{self.userid}/view"
self._torrent_seeding_page = f"profile/{self.userid}/seeding"
def _parse_user_base_info(self, html_text: str):
self.username = self.userid
def _parse_user_traffic_info(self, html_text: str):
"""
上传/下载/分享率 [做种数/魔力值]
:param html_text:
:return:
"""
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
upload_html = html.xpath('//div[contains(@class,"profile-uploaded")]//span/text()')
if upload_html:
self.upload = StringUtils.num_filesize(upload_html[0])
download_html = html.xpath('//div[contains(@class,"profile-downloaded")]//span/text()')
if download_html:
self.download = StringUtils.num_filesize(download_html[0])
ratio_html = html.xpath('//div[contains(@class,"profile-ratio")]//span/text()')
if ratio_html:
self.ratio = StringUtils.str_float(ratio_html[0].replace('', '0'))
user_level_html = html.xpath('//table[contains(@class, "profileViewTable")]'
'//tr/td[text()="Class"]/following-sibling::td/text()')
if user_level_html:
self.user_level = user_level_html[0].strip()
join_at_html = html.xpath('//table[contains(@class, "profileViewTable")]'
'//tr/td[text()="Registration date"]/following-sibling::td/text()')
if join_at_html:
self.join_at = StringUtils.unify_datetime_str(join_at_html[0].strip())
bonus_html = html.xpath('//span[contains(@class, "total-TL-points")]/text()')
if bonus_html:
self.bonus = StringUtils.str_float(bonus_html[0].strip())
def _parse_user_detail_info(self, html_text: str):
pass
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
html = etree.HTML(html_text)
if not html:
return None
size_col = 2
seeders_col = 7
page_seeding = 0
page_seeding_size = 0
page_seeding_info = []
seeding_sizes = html.xpath(f'//tbody/tr/td[{size_col}]')
seeding_seeders = html.xpath(f'//tbody/tr/td[{seeders_col}]/text()')
if seeding_sizes and seeding_seeders:
page_seeding = len(seeding_sizes)
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
seeders = StringUtils.str_int(seeding_seeders[i])
page_seeding_size += size
page_seeding_info.append([seeders, size])
self.seeding += page_seeding
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)
# 是否存在下页数据
next_page = None
return next_page
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
return None
def _parse_message_content(self, html_text):
return None, None, None

View File

@@ -0,0 +1,130 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class Unit3dSiteUserInfo(ISiteUserInfo):
schema = SiteSchema.Unit3d
order = SITE_BASE_ORDER + 15
@classmethod
def match(cls, html_text: str) -> bool:
return "unit3d.js" in html_text
def _parse_user_base_info(self, html_text: str):
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
tmps = html.xpath('//a[contains(@href, "/users/") and contains(@href, "settings")]/@href')
if tmps:
user_name_match = re.search(r"/users/(.+)/settings", tmps[0])
if user_name_match and user_name_match.group().strip():
self.username = user_name_match.group(1)
self._torrent_seeding_page = f"/users/{self.username}/active?perPage=100&client=&seeding=include"
self._user_detail_page = f"/users/{self.username}"
tmps = html.xpath('//a[contains(@href, "bonus/earnings")]')
if tmps:
bonus_text = tmps[0].xpath("string(.)")
bonus_match = re.search(r"([\d,.]+)", bonus_text)
if bonus_match and bonus_match.group(1).strip():
self.bonus = StringUtils.str_float(bonus_match.group(1))
def _parse_site_page(self, html_text: str):
# TODO
pass
def _parse_user_detail_info(self, html_text: str):
"""
解析用户额外信息,加入时间,等级
:param html_text:
:return:
"""
html = etree.HTML(html_text)
if not html:
return None
# 用户等级
user_levels_text = html.xpath('//div[contains(@class, "content")]//span[contains(@class, "badge-user")]/text()')
if user_levels_text:
self.user_level = user_levels_text[0].strip()
# 加入日期
join_at_text = html.xpath('//div[contains(@class, "content")]//h4[contains(text(), "注册日期") '
'or contains(text(), "註冊日期") '
'or contains(text(), "Registration date")]/text()')
if join_at_text:
self.join_at = StringUtils.unify_datetime_str(
join_at_text[0].replace('注册日期', '').replace('註冊日期', '').replace('Registration date', ''))
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
html = etree.HTML(html_text)
if not html:
return None
size_col = 9
seeders_col = 2
# 搜索size列
if html.xpath('//thead//th[contains(@class,"size")]'):
size_col = len(html.xpath('//thead//th[contains(@class,"size")][1]/preceding-sibling::th')) + 1
# 搜索seeders列
if html.xpath('//thead//th[contains(@class,"seeders")]'):
seeders_col = len(html.xpath('//thead//th[contains(@class,"seeders")]/preceding-sibling::th')) + 1
page_seeding = 0
page_seeding_size = 0
page_seeding_info = []
seeding_sizes = html.xpath(f'//tr[position()]/td[{size_col}]')
seeding_seeders = html.xpath(f'//tr[position()]/td[{seeders_col}]')
if seeding_sizes and seeding_seeders:
page_seeding = len(seeding_sizes)
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
seeders = StringUtils.str_int(seeding_seeders[i].xpath("string(.)").strip())
page_seeding_size += size
page_seeding_info.append([seeders, size])
self.seeding += page_seeding
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)
# 是否存在下页数据
next_page = None
next_pages = html.xpath('//ul[@class="pagination"]/li[contains(@class,"active")]/following-sibling::li')
if next_pages and len(next_pages) > 1:
page_num = next_pages[0].xpath("string(.)").strip()
if page_num.isdigit():
next_page = f"{self._torrent_seeding_page}&page={page_num}"
return next_page
def _parse_user_traffic_info(self, html_text: str):
html_text = self._prepare_html_text(html_text)
upload_match = re.search(r"[^总]上[传傳]量?[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text,
re.IGNORECASE)
self.upload = StringUtils.num_filesize(upload_match.group(1).strip()) if upload_match else 0
download_match = re.search(r"[^总子影力]下[载載]量?[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text,
re.IGNORECASE)
self.download = StringUtils.num_filesize(download_match.group(1).strip()) if download_match else 0
ratio_match = re.search(r"分享率[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+)", html_text)
self.ratio = StringUtils.str_float(ratio_match.group(1)) if (
ratio_match and ratio_match.group(1).strip()) else 0.0
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
return None
def _parse_message_content(self, html_text):
return None, None, None

View File

@@ -0,0 +1,139 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class DiscuzUserInfo(ISiteUserInfo):
schema = SiteSchema.DiscuzX
order = SITE_BASE_ORDER + 10
@classmethod
def match(cls, html_text: str) -> bool:
html = etree.HTML(html_text)
if not html:
return False
printable_text = html.xpath("string(.)") if html else ""
return 'Powered by Discuz!' in printable_text
def _parse_user_base_info(self, html_text: str):
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
user_info = html.xpath('//a[contains(@href, "&uid=")]')
if user_info:
user_id_match = re.search(r"&uid=(\d+)", user_info[0].attrib['href'])
if user_id_match and user_id_match.group().strip():
self.userid = user_id_match.group(1)
self._torrent_seeding_page = f"forum.php?&mod=torrents&cat_5up=on"
self._user_detail_page = user_info[0].attrib['href']
self.username = user_info[0].text.strip()
def _parse_site_page(self, html_text: str):
# TODO
pass
def _parse_user_detail_info(self, html_text: str):
"""
解析用户额外信息,加入时间,等级
:param html_text:
:return:
"""
html = etree.HTML(html_text)
if not html:
return None
# 用户等级
user_levels_text = html.xpath('//a[contains(@href, "usergroup")]/text()')
if user_levels_text:
self.user_level = user_levels_text[-1].strip()
# 加入日期
join_at_text = html.xpath('//li[em[text()="注册时间"]]/text()')
if join_at_text:
self.join_at = StringUtils.unify_datetime_str(join_at_text[0].strip())
# 分享率
ratio_text = html.xpath('//li[contains(.//text(), "分享率")]//text()')
if ratio_text:
ratio_match = re.search(r"\(([\d,.]+)\)", ratio_text[0])
if ratio_match and ratio_match.group(1).strip():
self.bonus = StringUtils.str_float(ratio_match.group(1))
# 积分
bouns_text = html.xpath('//li[em[text()="积分"]]/text()')
if bouns_text:
self.bonus = StringUtils.str_float(bouns_text[0].strip())
# 上传
upload_text = html.xpath('//li[em[contains(text(),"上传量")]]/text()')
if upload_text:
self.upload = StringUtils.num_filesize(upload_text[0].strip().split('/')[-1])
# 下载
download_text = html.xpath('//li[em[contains(text(),"下载量")]]/text()')
if download_text:
self.download = StringUtils.num_filesize(download_text[0].strip().split('/')[-1])
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
html = etree.HTML(html_text)
if not html:
return None
size_col = 3
seeders_col = 4
# 搜索size列
if html.xpath('//tr[position()=1]/td[.//img[@class="size"] and .//img[@alt="size"]]'):
size_col = len(html.xpath('//tr[position()=1]/td[.//img[@class="size"] '
'and .//img[@alt="size"]]/preceding-sibling::td')) + 1
# 搜索seeders列
if html.xpath('//tr[position()=1]/td[.//img[@class="seeders"] and .//img[@alt="seeders"]]'):
seeders_col = len(html.xpath('//tr[position()=1]/td[.//img[@class="seeders"] '
'and .//img[@alt="seeders"]]/preceding-sibling::td')) + 1
page_seeding = 0
page_seeding_size = 0
page_seeding_info = []
seeding_sizes = html.xpath(f'//tr[position()>1]/td[{size_col}]')
seeding_seeders = html.xpath(f'//tr[position()>1]/td[{seeders_col}]//text()')
if seeding_sizes and seeding_seeders:
page_seeding = len(seeding_sizes)
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
seeders = StringUtils.str_int(seeding_seeders[i])
page_seeding_size += size
page_seeding_info.append([seeders, size])
self.seeding += page_seeding
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)
# 是否存在下页数据
next_page = None
next_page_text = html.xpath('//a[contains(.//text(), "下一页") or contains(.//text(), "下一頁")]/@href')
if next_page_text:
next_page = next_page_text[-1].strip()
return next_page
def _parse_user_traffic_info(self, html_text: str):
pass
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
return None
def _parse_message_content(self, html_text):
return None, None, None

View File

@@ -0,0 +1,118 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class FileListSiteUserInfo(ISiteUserInfo):
schema = SiteSchema.FileList
order = SITE_BASE_ORDER + 50
@classmethod
def match(cls, html_text: str) -> bool:
html = etree.HTML(html_text)
if not html:
return False
printable_text = html.xpath("string(.)") if html else ""
return 'Powered by FileList' in printable_text
def _parse_site_page(self, html_text: str):
html_text = self._prepare_html_text(html_text)
user_detail = re.search(r"userdetails.php\?id=(\d+)", html_text)
if user_detail and user_detail.group().strip():
self._user_detail_page = user_detail.group().strip().lstrip('/')
self.userid = user_detail.group(1)
self._torrent_seeding_page = f"snatchlist.php?id={self.userid}&action=torrents&type=seeding"
def _parse_user_base_info(self, html_text: str):
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
ret = html.xpath(f'//a[contains(@href, "userdetails") and contains(@href, "{self.userid}")]//text()')
if ret:
self.username = str(ret[0])
def _parse_user_traffic_info(self, html_text: str):
"""
上传/下载/分享率 [做种数/魔力值]
:param html_text:
:return:
"""
return
def _parse_user_detail_info(self, html_text: str):
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
upload_html = html.xpath('//table//tr/td[text()="Uploaded"]/following-sibling::td//text()')
if upload_html:
self.upload = StringUtils.num_filesize(upload_html[0])
download_html = html.xpath('//table//tr/td[text()="Downloaded"]/following-sibling::td//text()')
if download_html:
self.download = StringUtils.num_filesize(download_html[0])
self.ratio = 0 if self.download == 0 else self.upload / self.download
user_level_html = html.xpath('//table//tr/td[text()="Class"]/following-sibling::td//text()')
if user_level_html:
self.user_level = user_level_html[0].strip()
join_at_html = html.xpath('//table//tr/td[contains(text(), "Join")]/following-sibling::td//text()')
if join_at_html:
self.join_at = StringUtils.unify_datetime_str(join_at_html[0].strip())
bonus_html = html.xpath('//a[contains(@href, "shop.php")]')
if bonus_html:
self.bonus = StringUtils.str_float(bonus_html[0].xpath("string(.)").strip())
pass
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
html = etree.HTML(html_text)
if not html:
return None
size_col = 6
seeders_col = 7
page_seeding = 0
page_seeding_size = 0
page_seeding_info = []
seeding_sizes = html.xpath(f'//table/tr[position()>1]/td[{size_col}]')
seeding_seeders = html.xpath(f'//table/tr[position()>1]/td[{seeders_col}]')
if seeding_sizes and seeding_seeders:
page_seeding = len(seeding_sizes)
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
seeders = StringUtils.str_int(seeding_seeders[i].xpath("string(.)").strip())
page_seeding_size += size
page_seeding_info.append([seeders, size])
self.seeding += page_seeding
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)
# 是否存在下页数据
next_page = None
return next_page
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
return None
def _parse_message_content(self, html_text):
return None, None, None

View File

@@ -0,0 +1,163 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class GazelleSiteUserInfo(ISiteUserInfo):
schema = SiteSchema.Gazelle
order = SITE_BASE_ORDER
@classmethod
def match(cls, html_text: str) -> bool:
html = etree.HTML(html_text)
if not html:
return False
printable_text = html.xpath("string(.)") if html else ""
return "Powered by Gazelle" in printable_text or "DIC Music" in printable_text
def _parse_user_base_info(self, html_text: str):
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
tmps = html.xpath('//a[contains(@href, "user.php?id=")]')
if tmps:
user_id_match = re.search(r"user.php\?id=(\d+)", tmps[0].attrib['href'])
if user_id_match and user_id_match.group().strip():
self.userid = user_id_match.group(1)
self._torrent_seeding_page = f"torrents.php?type=seeding&userid={self.userid}"
self._user_detail_page = f"user.php?id={self.userid}"
self.username = tmps[0].text.strip()
tmps = html.xpath('//*[@id="header-uploaded-value"]/@data-value')
if tmps:
self.upload = StringUtils.num_filesize(tmps[0])
else:
tmps = html.xpath('//li[@id="stats_seeding"]/span/text()')
if tmps:
self.upload = StringUtils.num_filesize(tmps[0])
tmps = html.xpath('//*[@id="header-downloaded-value"]/@data-value')
if tmps:
self.download = StringUtils.num_filesize(tmps[0])
else:
tmps = html.xpath('//li[@id="stats_leeching"]/span/text()')
if tmps:
self.download = StringUtils.num_filesize(tmps[0])
self.ratio = 0.0 if self.download <= 0.0 else round(self.upload / self.download, 3)
tmps = html.xpath('//a[contains(@href, "bonus.php")]/@data-tooltip')
if tmps:
bonus_match = re.search(r"([\d,.]+)", tmps[0])
if bonus_match and bonus_match.group(1).strip():
self.bonus = StringUtils.str_float(bonus_match.group(1))
else:
tmps = html.xpath('//a[contains(@href, "bonus.php")]')
if tmps:
bonus_text = tmps[0].xpath("string(.)")
bonus_match = re.search(r"([\d,.]+)", bonus_text)
if bonus_match and bonus_match.group(1).strip():
self.bonus = StringUtils.str_float(bonus_match.group(1))
def _parse_site_page(self, html_text: str):
# TODO
pass
def _parse_user_detail_info(self, html_text: str):
"""
解析用户额外信息,加入时间,等级
:param html_text:
:return:
"""
html = etree.HTML(html_text)
if not html:
return None
# 用户等级
user_levels_text = html.xpath('//*[@id="class-value"]/@data-value')
if user_levels_text:
self.user_level = user_levels_text[0].strip()
else:
user_levels_text = html.xpath('//li[contains(text(), "用户等级")]/text()')
if user_levels_text:
self.user_level = user_levels_text[0].split(':')[1].strip()
# 加入日期
join_at_text = html.xpath('//*[@id="join-date-value"]/@data-value')
if join_at_text:
self.join_at = StringUtils.unify_datetime_str(join_at_text[0].strip())
else:
join_at_text = html.xpath(
'//div[contains(@class, "box_userinfo_stats")]//li[contains(text(), "加入时间")]/span/text()')
if join_at_text:
self.join_at = StringUtils.unify_datetime_str(join_at_text[0].strip())
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
html = etree.HTML(html_text)
if not html:
return None
size_col = 3
# 搜索size列
if html.xpath('//table[contains(@id, "torrent")]//tr[1]/td'):
size_col = len(html.xpath('//table[contains(@id, "torrent")]//tr[1]/td')) - 3
# 搜索seeders列
seeders_col = size_col + 2
page_seeding = 0
page_seeding_size = 0
page_seeding_info = []
seeding_sizes = html.xpath(f'//table[contains(@id, "torrent")]//tr[position()>1]/td[{size_col}]')
seeding_seeders = html.xpath(f'//table[contains(@id, "torrent")]//tr[position()>1]/td[{seeders_col}]/text()')
if seeding_sizes and seeding_seeders:
page_seeding = len(seeding_sizes)
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
seeders = int(seeding_seeders[i])
page_seeding_size += size
page_seeding_info.append([seeders, size])
if multi_page:
self.seeding += page_seeding
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)
else:
if not self.seeding:
self.seeding = page_seeding
if not self.seeding_size:
self.seeding_size = page_seeding_size
if not self.seeding_info:
self.seeding_info = page_seeding_info
# 是否存在下页数据
next_page = None
next_page_text = html.xpath('//a[contains(.//text(), "Next") or contains(.//text(), "下一页")]/@href')
if next_page_text:
next_page = next_page_text[-1].strip()
return next_page
def _parse_user_traffic_info(self, html_text: str):
# TODO
pass
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
return None
def _parse_message_content(self, html_text):
return None, None, None

View File

@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class IptSiteUserInfo(ISiteUserInfo):
schema = SiteSchema.Ipt
order = SITE_BASE_ORDER + 35
@classmethod
def match(cls, html_text: str) -> bool:
return 'IPTorrents' in html_text
def _parse_user_base_info(self, html_text: str):
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
tmps = html.xpath('//a[contains(@href, "/u/")]//text()')
tmps_id = html.xpath('//a[contains(@href, "/u/")]/@href')
if tmps:
self.username = str(tmps[-1])
if tmps_id:
user_id_match = re.search(r"/u/(\d+)", tmps_id[0])
if user_id_match and user_id_match.group().strip():
self.userid = user_id_match.group(1)
self._user_detail_page = f"user.php?u={self.userid}"
self._torrent_seeding_page = f"peers?u={self.userid}"
tmps = html.xpath('//div[@class = "stats"]/div/div')
if tmps:
self.upload = StringUtils.num_filesize(str(tmps[0].xpath('span/text()')[1]).strip())
self.download = StringUtils.num_filesize(str(tmps[0].xpath('span/text()')[2]).strip())
self.seeding = StringUtils.str_int(tmps[0].xpath('a')[2].xpath('text()')[0])
self.leeching = StringUtils.str_int(tmps[0].xpath('a')[2].xpath('text()')[1])
self.ratio = StringUtils.str_float(str(tmps[0].xpath('span/text()')[0]).strip().replace('-', '0'))
self.bonus = StringUtils.str_float(tmps[0].xpath('a')[3].xpath('text()')[0])
def _parse_site_page(self, html_text: str):
# TODO
pass
def _parse_user_detail_info(self, html_text: str):
html = etree.HTML(html_text)
if not html:
return
user_levels_text = html.xpath('//tr/th[text()="Class"]/following-sibling::td[1]/text()')
if user_levels_text:
self.user_level = user_levels_text[0].strip()
# 加入日期
join_at_text = html.xpath('//tr/th[text()="Join date"]/following-sibling::td[1]/text()')
if join_at_text:
self.join_at = StringUtils.unify_datetime_str(join_at_text[0].split(' (')[0])
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
html = etree.HTML(html_text)
if not html:
return
# seeding start
seeding_end_pos = 3
if html.xpath('//tr/td[text() = "Leechers"]'):
seeding_end_pos = len(html.xpath('//tr/td[text() = "Leechers"]/../preceding-sibling::tr')) + 1
seeding_end_pos = seeding_end_pos - 3
page_seeding = 0
page_seeding_size = 0
seeding_torrents = html.xpath('//tr/td[text() = "Seeders"]/../following-sibling::tr/td[position()=6]/text()')
if seeding_torrents:
page_seeding = seeding_end_pos
for per_size in seeding_torrents[:seeding_end_pos]:
if '(' in per_size and ')' in per_size:
per_size = per_size.split('(')[-1]
per_size = per_size.split(')')[0]
page_seeding_size += StringUtils.num_filesize(per_size)
self.seeding = page_seeding
self.seeding_size = page_seeding_size
def _parse_user_traffic_info(self, html_text: str):
# TODO
pass
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
return None
def _parse_message_content(self, html_text):
return None, None, None

View File

@@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
import re
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import SITE_BASE_ORDER, SiteSchema
from app.plugins.sitestatistic.siteuserinfo.nexus_php import NexusPhpSiteUserInfo
from app.utils.string import StringUtils
class NexusHhanclubSiteUserInfo(NexusPhpSiteUserInfo):
schema = SiteSchema.NexusHhanclub
order = SITE_BASE_ORDER + 20
@classmethod
def match(cls, html_text: str) -> bool:
return 'hhanclub.top' in html_text
def _parse_user_traffic_info(self, html_text):
super()._parse_user_traffic_info(html_text)
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
# 上传、下载、分享率
upload_match = re.search(r"[_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)",
html.xpath('//*[@id="user-info-panel"]/div[2]/div[2]/div[4]/text()')[0])
download_match = re.search(r"[_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)",
html.xpath('//*[@id="user-info-panel"]/div[2]/div[2]/div[5]/text()')[0])
ratio_match = re.search(r"分享率][:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+)",
html.xpath('//*[@id="user-info-panel"]/div[2]/div[1]/div[1]/div/text()')[0])
# 计算分享率
self.upload = StringUtils.num_filesize(upload_match.group(1).strip()) if upload_match else 0
self.download = StringUtils.num_filesize(download_match.group(1).strip()) if download_match else 0
# 优先使用页面上的分享率
calc_ratio = 0.0 if self.download <= 0.0 else round(self.upload / self.download, 3)
self.ratio = StringUtils.str_float(ratio_match.group(1)) if (
ratio_match and ratio_match.group(1).strip()) else calc_ratio
def _parse_user_detail_info(self, html_text: str):
"""
解析用户额外信息,加入时间,等级
:param html_text:
:return:
"""
super()._parse_user_detail_info(html_text)
html = etree.HTML(html_text)
if not html:
return
# 加入时间
join_at_text = html.xpath('//*[@id="mainContent"]/div/div[2]/div[4]/div[3]/span[2]/text()[1]')
if join_at_text:
self.join_at = StringUtils.unify_datetime_str(join_at_text[0].split(' (')[0].strip())
def _get_user_level(self, html):
super()._get_user_level(html)
user_level_path = html.xpath('//*[@id="mainContent"]/div/div[2]/div[2]/div[4]/span[2]/img/@title')
if user_level_path:
self.user_level = user_level_path[0]

View File

@@ -0,0 +1,392 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.log import logger
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class NexusPhpSiteUserInfo(ISiteUserInfo):
schema = SiteSchema.NexusPhp
order = SITE_BASE_ORDER * 2
@classmethod
def match(cls, html_text: str) -> bool:
"""
默认使用NexusPhp解析
:param html_text:
:return:
"""
return True
def _parse_site_page(self, html_text: str):
html_text = self._prepare_html_text(html_text)
user_detail = re.search(r"userdetails.php\?id=(\d+)", html_text)
if user_detail and user_detail.group().strip():
self._user_detail_page = user_detail.group().strip().lstrip('/')
self.userid = user_detail.group(1)
self._torrent_seeding_page = f"getusertorrentlistajax.php?userid={self.userid}&type=seeding"
else:
user_detail = re.search(r"(userdetails)", html_text)
if user_detail and user_detail.group().strip():
self._user_detail_page = user_detail.group().strip().lstrip('/')
self.userid = None
self._torrent_seeding_page = None
def _parse_message_unread(self, html_text):
"""
解析未读短消息数量
:param html_text:
:return:
"""
html = etree.HTML(html_text)
if not html:
return
message_labels = html.xpath('//a[@href="messages.php"]/..')
message_labels.extend(html.xpath('//a[contains(@href, "messages.php")]/..'))
if message_labels:
message_text = message_labels[0].xpath("string(.)")
logger.debug(f"{self.site_name} 消息原始信息 {message_text}")
message_unread_match = re.findall(r"[^Date](信息箱\s*|\(|你有\xa0)(\d+)", message_text)
if message_unread_match and len(message_unread_match[-1]) == 2:
self.message_unread = StringUtils.str_int(message_unread_match[-1][1])
elif message_text.isdigit():
self.message_unread = StringUtils.str_int(message_text)
def _parse_user_base_info(self, html_text: str):
# 合并解析,减少额外请求调用
self._parse_user_traffic_info(html_text)
self._user_traffic_page = None
self._parse_message_unread(html_text)
html = etree.HTML(html_text)
if not html:
return
ret = html.xpath(f'//a[contains(@href, "userdetails") and contains(@href, "{self.userid}")]//b//text()')
if ret:
self.username = str(ret[0])
return
ret = html.xpath(f'//a[contains(@href, "userdetails") and contains(@href, "{self.userid}")]//text()')
if ret:
self.username = str(ret[0])
ret = html.xpath('//a[contains(@href, "userdetails")]//strong//text()')
if ret:
self.username = str(ret[0])
return
def _parse_user_traffic_info(self, html_text):
html_text = self._prepare_html_text(html_text)
upload_match = re.search(r"[^总]上[传傳]量?[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text,
re.IGNORECASE)
self.upload = StringUtils.num_filesize(upload_match.group(1).strip()) if upload_match else 0
download_match = re.search(r"[^总子影力]下[载載]量?[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text,
re.IGNORECASE)
self.download = StringUtils.num_filesize(download_match.group(1).strip()) if download_match else 0
ratio_match = re.search(r"分享率[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+)", html_text)
# 计算分享率
calc_ratio = 0.0 if self.download <= 0.0 else round(self.upload / self.download, 3)
# 优先使用页面上的分享率
self.ratio = StringUtils.str_float(ratio_match.group(1)) if (
ratio_match and ratio_match.group(1).strip()) else calc_ratio
leeching_match = re.search(r"(Torrents leeching|下载中)[\u4E00-\u9FA5\D\s]+(\d+)[\s\S]+<", html_text)
self.leeching = StringUtils.str_int(leeching_match.group(2)) if leeching_match and leeching_match.group(
2).strip() else 0
html = etree.HTML(html_text)
has_ucoin, self.bonus = self._parse_ucoin(html)
if has_ucoin:
return
tmps = html.xpath('//a[contains(@href,"mybonus")]/text()') if html else None
if tmps:
bonus_text = str(tmps[0]).strip()
bonus_match = re.search(r"([\d,.]+)", bonus_text)
if bonus_match and bonus_match.group(1).strip():
self.bonus = StringUtils.str_float(bonus_match.group(1))
return
bonus_match = re.search(r"mybonus.[\[\]:<>/a-zA-Z_\-=\"'\s#;.(使用魔力值豆]+\s*([\d,.]+)[<()&\s]", html_text)
try:
if bonus_match and bonus_match.group(1).strip():
self.bonus = StringUtils.str_float(bonus_match.group(1))
return
bonus_match = re.search(r"[魔力值|\]][\[\]:<>/a-zA-Z_\-=\"'\s#;]+\s*([\d,.]+|\"[\d,.]+\")[<>()&\s]",
html_text,
flags=re.S)
if bonus_match and bonus_match.group(1).strip():
self.bonus = StringUtils.str_float(bonus_match.group(1).strip('"'))
except Exception as err:
logger.error(f"{self.site_name} 解析魔力值出错, 错误信息: {str(err)}")
@staticmethod
def _parse_ucoin(html):
"""
解析ucoin, 统一转换为铜币
:param html:
:return:
"""
if html:
gold, silver, copper = None, None, None
golds = html.xpath('//span[@class = "ucoin-symbol ucoin-gold"]//text()')
if golds:
gold = StringUtils.str_float(str(golds[-1]))
silvers = html.xpath('//span[@class = "ucoin-symbol ucoin-silver"]//text()')
if silvers:
silver = StringUtils.str_float(str(silvers[-1]))
coppers = html.xpath('//span[@class = "ucoin-symbol ucoin-copper"]//text()')
if coppers:
copper = StringUtils.str_float(str(coppers[-1]))
if gold or silver or copper:
gold = gold if gold else 0
silver = silver if silver else 0
copper = copper if copper else 0
return True, gold * 100 * 100 + silver * 100 + copper
return False, 0.0
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
html = etree.HTML(str(html_text).replace(r'\/', '/'))
if not html:
return None
# 首页存在扩展链接,使用扩展链接
seeding_url_text = html.xpath('//a[contains(@href,"torrents.php") '
'and contains(@href,"seeding")]/@href')
if multi_page is False and seeding_url_text and seeding_url_text[0].strip():
self._torrent_seeding_page = seeding_url_text[0].strip()
return self._torrent_seeding_page
size_col = 3
seeders_col = 4
# 搜索size列
size_col_xpath = '//tr[position()=1]/' \
'td[(img[@class="size"] and img[@alt="size"])' \
' or (text() = "大小")' \
' or (a/img[@class="size" and @alt="size"])]'
if html.xpath(size_col_xpath):
size_col = len(html.xpath(f'{size_col_xpath}/preceding-sibling::td')) + 1
# 搜索seeders列
seeders_col_xpath = '//tr[position()=1]/' \
'td[(img[@class="seeders"] and img[@alt="seeders"])' \
' or (text() = "在做种")' \
' or (a/img[@class="seeders" and @alt="seeders"])]'
if html.xpath(seeders_col_xpath):
seeders_col = len(html.xpath(f'{seeders_col_xpath}/preceding-sibling::td')) + 1
page_seeding = 0
page_seeding_size = 0
page_seeding_info = []
# 如果 table class="torrents"则增加table[@class="torrents"]
table_class = '//table[@class="torrents"]' if html.xpath('//table[@class="torrents"]') else ''
seeding_sizes = html.xpath(f'{table_class}//tr[position()>1]/td[{size_col}]')
seeding_seeders = html.xpath(f'{table_class}//tr[position()>1]/td[{seeders_col}]/b/a/text()')
if not seeding_seeders:
seeding_seeders = html.xpath(f'{table_class}//tr[position()>1]/td[{seeders_col}]//text()')
if seeding_sizes and seeding_seeders:
page_seeding = len(seeding_sizes)
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
seeders = StringUtils.str_int(seeding_seeders[i])
page_seeding_size += size
page_seeding_info.append([seeders, size])
self.seeding += page_seeding
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)
# 是否存在下页数据
next_page = None
next_page_text = html.xpath('//a[contains(.//text(), "下一页") or contains(.//text(), "下一頁")]/@href')
if next_page_text:
next_page = next_page_text[-1].strip()
# fix up page url
if self.userid not in next_page:
next_page = f'{next_page}&userid={self.userid}&type=seeding'
return next_page
def _parse_user_detail_info(self, html_text: str):
"""
解析用户额外信息,加入时间,等级
:param html_text:
:return:
"""
html = etree.HTML(html_text)
if not html:
return
self._get_user_level(html)
self._fixup_traffic_info(html)
# 加入日期
join_at_text = html.xpath(
'//tr/td[text()="加入日期" or text()="注册日期" or *[text()="加入日期"]]/following-sibling::td[1]//text()'
'|//div/b[text()="加入日期"]/../text()')
if join_at_text:
self.join_at = StringUtils.unify_datetime_str(join_at_text[0].split(' (')[0].strip())
# 做种体积 & 做种数
# seeding 页面获取不到的话,此处再获取一次
seeding_sizes = html.xpath('//tr/td[text()="当前上传"]/following-sibling::td[1]//'
'table[tr[1][td[4 and text()="尺寸"]]]//tr[position()>1]/td[4]')
seeding_seeders = html.xpath('//tr/td[text()="当前上传"]/following-sibling::td[1]//'
'table[tr[1][td[5 and text()="做种者"]]]//tr[position()>1]/td[5]//text()')
tmp_seeding = len(seeding_sizes)
tmp_seeding_size = 0
tmp_seeding_info = []
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
seeders = StringUtils.str_int(seeding_seeders[i])
tmp_seeding_size += size
tmp_seeding_info.append([seeders, size])
if not self.seeding_size:
self.seeding_size = tmp_seeding_size
if not self.seeding:
self.seeding = tmp_seeding
if not self.seeding_info:
self.seeding_info = tmp_seeding_info
seeding_sizes = html.xpath('//tr/td[text()="做种统计"]/following-sibling::td[1]//text()')
if seeding_sizes:
seeding_match = re.search(r"总做种数:\s+(\d+)", seeding_sizes[0], re.IGNORECASE)
seeding_size_match = re.search(r"总做种体积:\s+([\d,.\s]+[KMGTPI]*B)", seeding_sizes[0], re.IGNORECASE)
tmp_seeding = StringUtils.str_int(seeding_match.group(1)) if (
seeding_match and seeding_match.group(1)) else 0
tmp_seeding_size = StringUtils.num_filesize(
seeding_size_match.group(1).strip()) if seeding_size_match else 0
if not self.seeding_size:
self.seeding_size = tmp_seeding_size
if not self.seeding:
self.seeding = tmp_seeding
self._fixup_torrent_seeding_page(html)
def _fixup_torrent_seeding_page(self, html):
"""
修正种子页面链接
:param html:
:return:
"""
# 单独的种子页面
seeding_url_text = html.xpath('//a[contains(@href,"getusertorrentlist.php") '
'and contains(@href,"seeding")]/@href')
if seeding_url_text:
self._torrent_seeding_page = seeding_url_text[0].strip()
# 从JS调用种获取用户ID
seeding_url_text = html.xpath('//a[contains(@href, "javascript: getusertorrentlistajax") '
'and contains(@href,"seeding")]/@href')
csrf_text = html.xpath('//meta[@name="x-csrf"]/@content')
if not self._torrent_seeding_page and seeding_url_text:
user_js = re.search(r"javascript: getusertorrentlistajax\(\s*'(\d+)", seeding_url_text[0])
if user_js and user_js.group(1).strip():
self.userid = user_js.group(1).strip()
self._torrent_seeding_page = f"getusertorrentlistajax.php?userid={self.userid}&type=seeding"
elif seeding_url_text and csrf_text:
if csrf_text[0].strip():
self._torrent_seeding_page \
= f"ajax_getusertorrentlist.php"
self._torrent_seeding_params = {'userid': self.userid, 'type': 'seeding', 'csrf': csrf_text[0].strip()}
# 分类做种模式
# 临时屏蔽
# seeding_url_text = html.xpath('//tr/td[text()="当前做种"]/following-sibling::td[1]'
# '/table//td/a[contains(@href,"seeding")]/@href')
# if seeding_url_text:
# self._torrent_seeding_page = seeding_url_text
def _get_user_level(self, html):
# 等级 获取同一行等级数据图片格式等级取title信息否则取文本信息
user_levels_text = html.xpath('//tr/td[text()="等級" or text()="等级" or *[text()="等级"]]/'
'following-sibling::td[1]/img[1]/@title')
if user_levels_text:
self.user_level = user_levels_text[0].strip()
return
user_levels_text = html.xpath('//tr/td[text()="等級" or text()="等级"]/'
'following-sibling::td[1 and not(img)]'
'|//tr/td[text()="等級" or text()="等级"]/'
'following-sibling::td[1 and img[not(@title)]]')
if user_levels_text:
self.user_level = user_levels_text[0].xpath("string(.)").strip()
return
user_levels_text = html.xpath('//tr/td[text()="等級" or text()="等级"]/'
'following-sibling::td[1]')
if user_levels_text:
self.user_level = user_levels_text[0].xpath("string(.)").strip()
return
user_levels_text = html.xpath('//a[contains(@href, "userdetails")]/text()')
if not self.user_level and user_levels_text:
for user_level_text in user_levels_text:
user_level_match = re.search(r"\[(.*)]", user_level_text)
if user_level_match and user_level_match.group(1).strip():
self.user_level = user_level_match.group(1).strip()
break
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
html = etree.HTML(html_text)
if not html:
return None
message_links = html.xpath('//tr[not(./td/img[@alt="Read"])]/td/a[contains(@href, "viewmessage")]/@href')
msg_links.extend(message_links)
# 是否存在下页数据
next_page = None
next_page_text = html.xpath('//a[contains(.//text(), "下一页") or contains(.//text(), "下一頁")]/@href')
if next_page_text:
next_page = next_page_text[-1].strip()
return next_page
def _parse_message_content(self, html_text):
html = etree.HTML(html_text)
if not html:
return None, None, None
# 标题
message_head_text = None
message_head = html.xpath('//h1/text()'
'|//div[@class="layui-card-header"]/span[1]/text()')
if message_head:
message_head_text = message_head[-1].strip()
# 消息时间
message_date_text = None
message_date = html.xpath('//h1/following-sibling::table[.//tr/td[@class="colhead"]]//tr[2]/td[2]'
'|//div[@class="layui-card-header"]/span[2]/span[2]')
if message_date:
message_date_text = message_date[0].xpath("string(.)").strip()
# 消息内容
message_content_text = None
message_content = html.xpath('//h1/following-sibling::table[.//tr/td[@class="colhead"]]//tr[3]/td'
'|//div[contains(@class,"layui-card-body")]')
if message_content:
message_content_text = message_content[0].xpath("string(.)").strip()
return message_head_text, message_date_text, message_content_text
def _fixup_traffic_info(self, html):
# fixup bonus
if not self.bonus:
bonus_text = html.xpath('//tr/td[text()="魔力值" or text()="猫粮"]/following-sibling::td[1]/text()')
if bonus_text:
self.bonus = StringUtils.str_float(bonus_text[0].strip())

View File

@@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
import re
from app.plugins.sitestatistic.siteuserinfo import SITE_BASE_ORDER, SiteSchema
from app.plugins.sitestatistic.siteuserinfo.nexus_php import NexusPhpSiteUserInfo
class NexusProjectSiteUserInfo(NexusPhpSiteUserInfo):
schema = SiteSchema.NexusProject
order = SITE_BASE_ORDER + 25
@classmethod
def match(cls, html_text: str) -> bool:
return 'Nexus Project' in html_text
def _parse_site_page(self, html_text: str):
html_text = self._prepare_html_text(html_text)
user_detail = re.search(r"userdetails.php\?id=(\d+)", html_text)
if user_detail and user_detail.group().strip():
self._user_detail_page = user_detail.group().strip().lstrip('/')
self.userid = user_detail.group(1)
self._torrent_seeding_page = f"viewusertorrents.php?id={self.userid}&show=seeding"

View File

@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
import json
from typing import Optional
from lxml import etree
from app.log import logger
from app.plugins.sitestatistic.siteuserinfo import SITE_BASE_ORDER, SiteSchema
from app.plugins.sitestatistic.siteuserinfo.nexus_php import NexusPhpSiteUserInfo
class NexusRabbitSiteUserInfo(NexusPhpSiteUserInfo):
schema = SiteSchema.NexusRabbit
order = SITE_BASE_ORDER + 5
@classmethod
def match(cls, html_text: str) -> bool:
html = etree.HTML(html_text)
if not html:
return False
printable_text = html.xpath("string(.)") if html else ""
return 'Style by Rabbit' in printable_text
def _parse_site_page(self, html_text: str):
super()._parse_site_page(html_text)
self._torrent_seeding_page = f"getusertorrentlistajax.php?page=1&limit=5000000&type=seeding&uid={self.userid}"
self._torrent_seeding_headers = {"Accept": "application/json, text/javascript, */*; q=0.01"}
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
try:
torrents = json.loads(html_text).get('data')
except Exception as e:
logger.error(f"解析做种信息失败: {str(e)}")
return
page_seeding_size = 0
page_seeding_info = []
page_seeding = len(torrents)
for torrent in torrents:
seeders = int(torrent.get('seeders', 0))
size = int(torrent.get('size', 0))
page_seeding_size += int(torrent.get('size', 0))
page_seeding_info.append([seeders, size])
self.seeding += page_seeding
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)

View File

@@ -0,0 +1,110 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class SmallHorseSiteUserInfo(ISiteUserInfo):
schema = SiteSchema.SmallHorse
order = SITE_BASE_ORDER + 30
@classmethod
def match(cls, html_text: str) -> bool:
return 'Small Horse' in html_text
def _parse_site_page(self, html_text: str):
html_text = self._prepare_html_text(html_text)
user_detail = re.search(r"user.php\?id=(\d+)", html_text)
if user_detail and user_detail.group().strip():
self._user_detail_page = user_detail.group().strip().lstrip('/')
self.userid = user_detail.group(1)
self._torrent_seeding_page = f"torrents.php?type=seeding&userid={self.userid}"
self._user_traffic_page = f"user.php?id={self.userid}"
def _parse_user_base_info(self, html_text: str):
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
ret = html.xpath('//a[contains(@href, "user.php")]//text()')
if ret:
self.username = str(ret[0])
def _parse_user_traffic_info(self, html_text: str):
"""
上传/下载/分享率 [做种数/魔力值]
:param html_text:
:return:
"""
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
tmps = html.xpath('//ul[@class = "stats nobullet"]')
if tmps:
if tmps[1].xpath("li") and tmps[1].xpath("li")[0].xpath("span//text()"):
self.join_at = StringUtils.unify_datetime_str(tmps[1].xpath("li")[0].xpath("span//text()")[0])
self.upload = StringUtils.num_filesize(str(tmps[1].xpath("li")[2].xpath("text()")[0]).split(":")[1].strip())
self.download = StringUtils.num_filesize(
str(tmps[1].xpath("li")[3].xpath("text()")[0]).split(":")[1].strip())
if tmps[1].xpath("li")[4].xpath("span//text()"):
self.ratio = StringUtils.str_float(str(tmps[1].xpath("li")[4].xpath("span//text()")[0]).replace('', '0'))
else:
self.ratio = StringUtils.str_float(str(tmps[1].xpath("li")[5].xpath("text()")[0]).split(":")[1])
self.bonus = StringUtils.str_float(str(tmps[1].xpath("li")[5].xpath("text()")[0]).split(":")[1])
self.user_level = str(tmps[3].xpath("li")[0].xpath("text()")[0]).split(":")[1].strip()
self.leeching = StringUtils.str_int(
(tmps[4].xpath("li")[6].xpath("text()")[0]).split(":")[1].replace("[", ""))
def _parse_user_detail_info(self, html_text: str):
pass
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
html = etree.HTML(html_text)
if not html:
return None
size_col = 6
seeders_col = 8
page_seeding = 0
page_seeding_size = 0
page_seeding_info = []
seeding_sizes = html.xpath(f'//table[@id="torrent_table"]//tr[position()>1]/td[{size_col}]')
seeding_seeders = html.xpath(f'//table[@id="torrent_table"]//tr[position()>1]/td[{seeders_col}]')
if seeding_sizes and seeding_seeders:
page_seeding = len(seeding_sizes)
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
seeders = StringUtils.str_int(seeding_seeders[i].xpath("string(.)").strip())
page_seeding_size += size
page_seeding_info.append([seeders, size])
self.seeding += page_seeding
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)
# 是否存在下页数据
next_page = None
next_pages = html.xpath('//ul[@class="pagination"]/li[contains(@class,"active")]/following-sibling::li')
if next_pages and len(next_pages) > 1:
page_num = next_pages[0].xpath("string(.)").strip()
if page_num.isdigit():
next_page = f"{self._torrent_seeding_page}&page={page_num}"
return next_page
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
return None
def _parse_message_content(self, html_text):
return None, None, None

View File

@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
import json
import re
from typing import Optional
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class TNodeSiteUserInfo(ISiteUserInfo):
schema = SiteSchema.TNode
order = SITE_BASE_ORDER + 60
@classmethod
def match(cls, html_text: str) -> bool:
return 'Powered By TNode' in html_text
def _parse_site_page(self, html_text: str):
html_text = self._prepare_html_text(html_text)
# <meta name="x-csrf-token" content="fd169876a7b4846f3a7a16fcd5cccf8d">
csrf_token = re.search(r'<meta name="x-csrf-token" content="(.+?)">', html_text)
if csrf_token:
self._addition_headers = {'X-CSRF-TOKEN': csrf_token.group(1)}
self._user_detail_page = "api/user/getMainInfo"
self._torrent_seeding_page = "api/user/listTorrentActivity?id=&type=seeding&page=1&size=20000"
def _parse_logged_in(self, html_text):
"""
判断是否登录成功, 通过判断是否存在用户信息
暂时跳过检测,待后续优化
:param html_text:
:return:
"""
return True
def _parse_user_base_info(self, html_text: str):
self.username = self.userid
def _parse_user_traffic_info(self, html_text: str):
pass
def _parse_user_detail_info(self, html_text: str):
detail = json.loads(html_text)
if detail.get("status") != 200:
return
user_info = detail.get("data", {})
self.userid = user_info.get("id")
self.username = user_info.get("username")
self.user_level = user_info.get("class", {}).get("name")
self.join_at = user_info.get("regTime", 0)
self.join_at = StringUtils.unify_datetime_str(str(self.join_at))
self.upload = user_info.get("upload")
self.download = user_info.get("download")
self.ratio = 0 if self.download <= 0 else round(self.upload / self.download, 3)
self.bonus = user_info.get("bonus")
self.message_unread = user_info.get("unreadAdmin", 0) + user_info.get("unreadInbox", 0) + user_info.get(
"unreadSystem", 0)
pass
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
解析用户做种信息
"""
seeding_info = json.loads(html_text)
if seeding_info.get("status") != 200:
return
torrents = seeding_info.get("data", {}).get("torrents", [])
page_seeding_size = 0
page_seeding_info = []
for torrent in torrents:
size = torrent.get("size", 0)
seeders = torrent.get("seeding", 0)
page_seeding_size += size
page_seeding_info.append([seeders, size])
self.seeding += len(torrents)
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)
# 是否存在下页数据
next_page = None
return next_page
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
return None
def _parse_message_content(self, html_text):
"""
系统信息 api/message/listSystem?page=1&size=20
收件箱信息 api/message/listInbox?page=1&size=20
管理员信息 api/message/listAdmin?page=1&size=20
:param html_text:
:return:
"""
return None, None, None

View File

@@ -0,0 +1,109 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class TorrentLeechSiteUserInfo(ISiteUserInfo):
schema = SiteSchema.TorrentLeech
order = SITE_BASE_ORDER + 40
@classmethod
def match(cls, html_text: str) -> bool:
return 'TorrentLeech' in html_text
def _parse_site_page(self, html_text: str):
html_text = self._prepare_html_text(html_text)
user_detail = re.search(r"/profile/([^/]+)/", html_text)
if user_detail and user_detail.group().strip():
self._user_detail_page = user_detail.group().strip().lstrip('/')
self.userid = user_detail.group(1)
self._user_traffic_page = f"profile/{self.userid}/view"
self._torrent_seeding_page = f"profile/{self.userid}/seeding"
def _parse_user_base_info(self, html_text: str):
self.username = self.userid
def _parse_user_traffic_info(self, html_text: str):
"""
上传/下载/分享率 [做种数/魔力值]
:param html_text:
:return:
"""
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
upload_html = html.xpath('//div[contains(@class,"profile-uploaded")]//span/text()')
if upload_html:
self.upload = StringUtils.num_filesize(upload_html[0])
download_html = html.xpath('//div[contains(@class,"profile-downloaded")]//span/text()')
if download_html:
self.download = StringUtils.num_filesize(download_html[0])
ratio_html = html.xpath('//div[contains(@class,"profile-ratio")]//span/text()')
if ratio_html:
self.ratio = StringUtils.str_float(ratio_html[0].replace('', '0'))
user_level_html = html.xpath('//table[contains(@class, "profileViewTable")]'
'//tr/td[text()="Class"]/following-sibling::td/text()')
if user_level_html:
self.user_level = user_level_html[0].strip()
join_at_html = html.xpath('//table[contains(@class, "profileViewTable")]'
'//tr/td[text()="Registration date"]/following-sibling::td/text()')
if join_at_html:
self.join_at = StringUtils.unify_datetime_str(join_at_html[0].strip())
bonus_html = html.xpath('//span[contains(@class, "total-TL-points")]/text()')
if bonus_html:
self.bonus = StringUtils.str_float(bonus_html[0].strip())
def _parse_user_detail_info(self, html_text: str):
pass
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
html = etree.HTML(html_text)
if not html:
return None
size_col = 2
seeders_col = 7
page_seeding = 0
page_seeding_size = 0
page_seeding_info = []
seeding_sizes = html.xpath(f'//tbody/tr/td[{size_col}]')
seeding_seeders = html.xpath(f'//tbody/tr/td[{seeders_col}]/text()')
if seeding_sizes and seeding_seeders:
page_seeding = len(seeding_sizes)
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
seeders = StringUtils.str_int(seeding_seeders[i])
page_seeding_size += size
page_seeding_info.append([seeders, size])
self.seeding += page_seeding
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)
# 是否存在下页数据
next_page = None
return next_page
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
return None
def _parse_message_content(self, html_text):
return None, None, None

View File

@@ -0,0 +1,130 @@
# -*- coding: utf-8 -*-
import re
from typing import Optional
from lxml import etree
from app.plugins.sitestatistic.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER, SiteSchema
from app.utils.string import StringUtils
class Unit3dSiteUserInfo(ISiteUserInfo):
schema = SiteSchema.Unit3d
order = SITE_BASE_ORDER + 15
@classmethod
def match(cls, html_text: str) -> bool:
return "unit3d.js" in html_text
def _parse_user_base_info(self, html_text: str):
html_text = self._prepare_html_text(html_text)
html = etree.HTML(html_text)
tmps = html.xpath('//a[contains(@href, "/users/") and contains(@href, "settings")]/@href')
if tmps:
user_name_match = re.search(r"/users/(.+)/settings", tmps[0])
if user_name_match and user_name_match.group().strip():
self.username = user_name_match.group(1)
self._torrent_seeding_page = f"/users/{self.username}/active?perPage=100&client=&seeding=include"
self._user_detail_page = f"/users/{self.username}"
tmps = html.xpath('//a[contains(@href, "bonus/earnings")]')
if tmps:
bonus_text = tmps[0].xpath("string(.)")
bonus_match = re.search(r"([\d,.]+)", bonus_text)
if bonus_match and bonus_match.group(1).strip():
self.bonus = StringUtils.str_float(bonus_match.group(1))
def _parse_site_page(self, html_text: str):
# TODO
pass
def _parse_user_detail_info(self, html_text: str):
"""
解析用户额外信息,加入时间,等级
:param html_text:
:return:
"""
html = etree.HTML(html_text)
if not html:
return None
# 用户等级
user_levels_text = html.xpath('//div[contains(@class, "content")]//span[contains(@class, "badge-user")]/text()')
if user_levels_text:
self.user_level = user_levels_text[0].strip()
# 加入日期
join_at_text = html.xpath('//div[contains(@class, "content")]//h4[contains(text(), "注册日期") '
'or contains(text(), "註冊日期") '
'or contains(text(), "Registration date")]/text()')
if join_at_text:
self.join_at = StringUtils.unify_datetime_str(
join_at_text[0].replace('注册日期', '').replace('註冊日期', '').replace('Registration date', ''))
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
做种相关信息
:param html_text:
:param multi_page: 是否多页数据
:return: 下页地址
"""
html = etree.HTML(html_text)
if not html:
return None
size_col = 9
seeders_col = 2
# 搜索size列
if html.xpath('//thead//th[contains(@class,"size")]'):
size_col = len(html.xpath('//thead//th[contains(@class,"size")][1]/preceding-sibling::th')) + 1
# 搜索seeders列
if html.xpath('//thead//th[contains(@class,"seeders")]'):
seeders_col = len(html.xpath('//thead//th[contains(@class,"seeders")]/preceding-sibling::th')) + 1
page_seeding = 0
page_seeding_size = 0
page_seeding_info = []
seeding_sizes = html.xpath(f'//tr[position()]/td[{size_col}]')
seeding_seeders = html.xpath(f'//tr[position()]/td[{seeders_col}]')
if seeding_sizes and seeding_seeders:
page_seeding = len(seeding_sizes)
for i in range(0, len(seeding_sizes)):
size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
seeders = StringUtils.str_int(seeding_seeders[i].xpath("string(.)").strip())
page_seeding_size += size
page_seeding_info.append([seeders, size])
self.seeding += page_seeding
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)
# 是否存在下页数据
next_page = None
next_pages = html.xpath('//ul[@class="pagination"]/li[contains(@class,"active")]/following-sibling::li')
if next_pages and len(next_pages) > 1:
page_num = next_pages[0].xpath("string(.)").strip()
if page_num.isdigit():
next_page = f"{self._torrent_seeding_page}&page={page_num}"
return next_page
def _parse_user_traffic_info(self, html_text: str):
html_text = self._prepare_html_text(html_text)
upload_match = re.search(r"[^总]上[传傳]量?[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text,
re.IGNORECASE)
self.upload = StringUtils.num_filesize(upload_match.group(1).strip()) if upload_match else 0
download_match = re.search(r"[^总子影力]下[载載]量?[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text,
re.IGNORECASE)
self.download = StringUtils.num_filesize(download_match.group(1).strip()) if download_match else 0
ratio_match = re.search(r"分享率[:_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+)", html_text)
self.ratio = StringUtils.str_float(ratio_match.group(1)) if (
ratio_match and ratio_match.group(1).strip()) else 0.0
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
return None
def _parse_message_content(self, html_text):
return None, None, None