fix: simplify audiences unread pagination

This commit is contained in:
jxxghp
2026-05-20 21:42:53 +08:00
parent d37954e6bc
commit 014dc2884c
2 changed files with 25 additions and 144 deletions

View File

@@ -1,8 +1,7 @@
# -*- coding: utf-8 -*-
import json
import re
from typing import Optional
from urllib.parse import parse_qsl, urlencode, urljoin, urlsplit, urlunsplit
from urllib.parse import urljoin
from lxml import etree
@@ -20,7 +19,8 @@ class NexusAudiencesSiteUserInfo(NexusPhpSiteUserInfo):
"""
super().__init__(*args, **kwargs)
self._user_mail_unread_page = self.__build_unread_mailbox_page(box=1)
self._sys_mail_unread_page = self.__build_unread_mailbox_page(box=-2)
self._sys_mail_unread_page = None
self.__next_mail_page = 1
def _parse_message_unread(self, html_text):
"""
@@ -64,7 +64,7 @@ class NexusAudiencesSiteUserInfo(NexusPhpSiteUserInfo):
'or @alt="Unread" or @title="未读"]]/td/a[contains(@href, "viewmessage")]/@href'
)
msg_links.extend(message_links)
next_page = self.__parse_next_message_page(html)
next_page = self.__build_next_unread_mailbox_page(bool(message_links))
finally:
if html is not None:
del html
@@ -78,134 +78,16 @@ class NexusAudiencesSiteUserInfo(NexusPhpSiteUserInfo):
"""
return f"messages.php?action=viewmailbox&box={box}&unread=yes"
@classmethod
def __parse_next_message_page(cls, html) -> Optional[str]:
def __build_next_unread_mailbox_page(self, has_unread: bool) -> str:
"""
解析 Audiences 新版分页中的下一页链接,兼容图标按钮和英文无障碍标签
当前页存在未读消息时按 Audiences 的 page 参数规则生成下一页地址
"""
next_pages = []
for link in html.xpath('//a[@href]'):
if cls.__is_next_message_page_link(link):
next_pages.append(cls.__normalize_message_page_link(link.get("href").strip()))
if next_pages:
return next_pages[-1]
return cls.__parse_next_numeric_message_page(html)
@classmethod
def __parse_next_numeric_message_page(cls, html) -> Optional[str]:
"""
从数字分页中推断下一页,兼容只展示页码按钮的私信列表。
"""
current_page = None
page_links = []
for link in html.xpath('//a[@href]'):
page = cls.__extract_message_page(link.get("href"))
if page is None:
continue
page_links.append((page, link.get("href").strip()))
if cls.__is_current_page_link(link):
current_page = page
if current_page is None:
current_page = cls.__extract_current_page_from_markup(html)
if current_page is None or not page_links:
if not has_unread:
return None
next_links = [(page, href) for page, href in page_links if page > current_page]
if not next_links:
return None
_, next_link = sorted(next_links, key=lambda item: item[0])[0]
return cls.__normalize_message_page_link(next_link)
@staticmethod
def __is_next_message_page_link(link) -> bool:
"""
判断分页链接是否指向下一页,避免只识别中文“下一页”文本。
"""
link_class = f" {link.get('class') or ''} ".lower()
if any(flag in link_class for flag in (" disabled ", " disable ", " inactive ")):
return False
link_text = re.sub(r"\s+", " ", link.xpath("string(.)") or "").strip().lower()
title = (link.get("title") or "").strip().lower()
aria_label = (link.get("aria-label") or "").strip().lower()
rel = (link.get("rel") or "").strip().lower()
signal_text = " ".join([link_text, title, aria_label, rel])
if any(keyword in signal_text for keyword in ("下一页", "下一頁", "next")):
return True
if link_text in {">", "", "»"}:
return True
return any(flag in link_class for flag in (" next ", " pager-next ", " page-next "))
@classmethod
def __extract_message_page(cls, href: str) -> Optional[int]:
"""
从 Audiences 私信分页链接中提取页码。
"""
if not href:
return None
query_params = dict(parse_qsl(urlsplit(href).query, keep_blank_values=True))
if query_params.get("action") != "viewmailbox":
return None
return StringUtils.str_int(query_params.get("page"))
@staticmethod
def __is_current_page_link(link) -> bool:
"""
判断页码链接是否表示当前页。
"""
link_class = f" {link.get('class') or ''} ".lower()
parent_class = f" {link.getparent().get('class') or ''} ".lower() if link.getparent() is not None else ""
aria_current = (link.get("aria-current") or "").strip().lower()
current_flags = (" active ", " current ", " selected ")
return aria_current == "page" or any(flag in link_class or flag in parent_class for flag in current_flags)
@staticmethod
def __extract_current_page_from_markup(html) -> Optional[int]:
"""
从非链接的当前页标记中提取 Audiences 的 page 参数值。
"""
current_texts = html.xpath(
'//*[contains(concat(" ", normalize-space(@class), " "), " active ") '
'or contains(concat(" ", normalize-space(@class), " "), " current ") '
'or contains(concat(" ", normalize-space(@class), " "), " selected ") '
'or @aria-current="page"]/text()'
)
for current_text in current_texts:
page_match = re.search(r"\d+", str(current_text))
if page_match:
# Audiences 页码展示从 1 开始,但 URL 中 page=1 表示第二页。
return max(StringUtils.str_int(page_match.group()) - 1, 0)
return None
@classmethod
def __normalize_message_page_link(cls, href: str) -> str:
"""
给翻页链接补齐未读过滤,避免后续页退回站点完整收件箱。
"""
if not href:
return href
url_info = urlsplit(href)
query_params = dict(parse_qsl(url_info.query, keep_blank_values=True))
if query_params.get("action") != "viewmailbox":
return href
query_params.setdefault("unread", "yes")
return urlunsplit(
(
url_info.scheme,
url_info.netloc,
url_info.path,
urlencode(query_params),
url_info.fragment,
)
)
next_page = self.__next_mail_page
self.__next_mail_page += 1
return f"{self._user_mail_unread_page}&page={next_page}"
def _parse_user_traffic_info(self, html_text):
"""

View File

@@ -149,7 +149,7 @@ def test_audiences_table_unread_links_ignore_content_rows():
next_page = parser._parse_message_unread_links(html_text, msg_links)
assert msg_links == ["messages.php?action=viewmessage&id=4318225"]
assert next_page is None
assert next_page == "messages.php?action=viewmailbox&box=1&unread=yes&page=1"
def test_audiences_readpm_row_is_not_unread_message():
@@ -183,9 +183,9 @@ def test_audiences_readpm_row_is_not_unread_message():
assert msg_links == []
def test_audiences_unread_mailbox_first_page_omits_page_param():
def test_audiences_unread_mailbox_only_uses_user_box():
"""
Audiences 私信首页不传 pagepage=1 实际表示第二页。
Audiences 只使用用户消息箱,首页不传 pagepage=1 实际表示第二页。
"""
parser = NexusAudiencesSiteUserInfo(
site_name="Audiences",
@@ -196,12 +196,12 @@ def test_audiences_unread_mailbox_first_page_omits_page_param():
)
assert parser._user_mail_unread_page == "messages.php?action=viewmailbox&box=1&unread=yes"
assert parser._sys_mail_unread_page == "messages.php?action=viewmailbox&box=-2&unread=yes"
assert parser._sys_mail_unread_page is None
def test_audiences_unread_links_follow_icon_next_page():
def test_audiences_unread_links_increment_page_until_empty():
"""
Audiences 新版分页可能只有图标和 aria-label不能只依赖中文下一页文本
Audiences 每页固定 10 条,有未读行时按 page 参数自增继续翻页
"""
parser = NexusAudiencesSiteUserInfo(
site_name="Audiences",
@@ -223,20 +223,19 @@ def test_audiences_unread_links_follow_icon_next_page():
</td>
</tr>
</table>
<nav class="pagination">
<a class="page-link disabled" href="messages.php?action=viewmailbox&amp;box=1">Prev</a>
<a class="page-link active" href="messages.php?action=viewmailbox&amp;box=1">1</a>
<a class="page-link" href="messages.php?action=viewmailbox&amp;box=1&amp;page=1"
aria-label="Next">
<i class="fas fa-angle-right"></i>
</a>
</nav>
</body>
</html>
"""
msg_links = []
next_page = parser._parse_message_unread_links(html_text, msg_links)
next_next_page = parser._parse_message_unread_links(html_text, msg_links)
stop_page = parser._parse_message_unread_links("<html><body><table></table></body></html>", msg_links)
assert msg_links == ["messages.php?action=viewmessage&id=4318225"]
assert next_page == "messages.php?action=viewmailbox&box=1&page=1&unread=yes"
assert msg_links == [
"messages.php?action=viewmessage&id=4318225",
"messages.php?action=viewmessage&id=4318225",
]
assert next_page == "messages.php?action=viewmailbox&box=1&unread=yes&page=1"
assert next_next_page == "messages.php?action=viewmailbox&box=1&unread=yes&page=2"
assert stop_page is None