From f4ca4120bc768dda2ab43871c4f11766c3bf6402 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Fri, 29 May 2026 14:14:16 +0800 Subject: [PATCH] =?UTF-8?q?fix(image-proxy):=20=E9=98=BB=E6=96=AD=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E8=BE=93=E5=87=BA=E8=AF=8A=E6=96=AD=E5=8E=9F=E5=9B=A0?= =?UTF-8?q?=E5=B9=B6=E5=90=88=E5=B9=B6=E9=87=8D=E5=A4=8D=E5=91=8A=E8=AD=A6?= =?UTF-8?q?=20(#5858)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoints/system.py | 6 +- app/utils/coalesce.py | 210 ++++++++++++++ app/utils/security.py | 416 ++++++++++++++++++++++++--- tests/test_coalesce.py | 201 +++++++++++++ tests/test_security_image_url_log.py | 289 +++++++++++++++++++ tests/test_security_utils.py | 170 +++++++++++ 6 files changed, 1248 insertions(+), 44 deletions(-) create mode 100644 app/utils/coalesce.py create mode 100644 tests/test_coalesce.py create mode 100644 tests/test_security_image_url_log.py diff --git a/app/api/endpoints/system.py b/app/api/endpoints/system.py index 26b97962..e038a734 100644 --- a/app/api/endpoints/system.py +++ b/app/api/endpoints/system.py @@ -360,13 +360,11 @@ async def fetch_image( fetch_url = SecurityUtils.strip_url_signature(url) # 验证URL安全性 - if not await SecurityUtils.is_safe_url_async( + if not await SecurityUtils.is_safe_image_url_async( url, allowed_domains, - block_private=True, allowed_private_ranges=settings.IMAGE_PROXY_ALLOWED_PRIVATE_RANGES, - ) and not (fetch_url := SecurityUtils.verify_signed_url(url)): - logger.warn(f"Blocked unsafe image URL: {url}") + ): return None content = await ImageHelper().async_fetch_image( diff --git a/app/utils/coalesce.py b/app/utils/coalesce.py new file mode 100644 index 00000000..1e46375b --- /dev/null +++ b/app/utils/coalesce.py @@ -0,0 +1,210 @@ +""" +通用时间窗口事件合并器。 + +定位:在固定时间窗口内对相同 key 的重复事件做合并,避免下游(通常是日志、告警、上报)被高频重复事件刷爆。 + +典型场景:同一原因的高频拦截 warning、同一目标的连续失败告警、同一错误码的批量上报——首条事件立即输出保留上下文, +后续命中在窗口内合并为一条计数摘要。 +""" + +import asyncio +import inspect +from dataclasses import dataclass +from enum import Enum +from typing import Any, Awaitable, Callable, Dict, Hashable, Optional, Union + +from app.log import logger + + +class CoalesceDecision(Enum): + """ + `EventCoalescer.record` 的返回值,告知调用方对当前事件应采取的动作。 + """ + + # 首次事件:调用方应立即按原样输出(写日志、发告警等) + EMIT = "emit" + # 窗口内已合并:调用方静默,不再输出 + SUPPRESS = "suppress" + + +@dataclass(frozen=True) +class CoalesceSummary: + """ + 窗口结束时回调给 `on_flush` 的聚合摘要,描述该窗口内被合并的事件。 + """ + + # 聚合键,与 `record` 调用方传入的 key 一致 + key: Hashable + # 窗口内同 key 命中总次数,包含首条 EMIT 的事件 + count: int + # 首条事件的 payload,便于摘要里附"样例"以减少信息丢失 + first_payload: Any + # 该窗口的时长(秒),与 coalescer 构造时一致 + window_seconds: float + + +# `on_flush` 回调签名:同步或 async 均可,由 coalescer 内部按需调度 +OnFlushCallback = Callable[[CoalesceSummary], Union[Awaitable[None], None]] + + +@dataclass +class _BucketState: + """ + 单个 key 的窗口内状态;仅供 `EventCoalescer` 内部使用。 + """ + + # 首条事件 payload,原样保留用于 flush 摘要 + first_payload: Any + # 窗口内累计命中次数(含首条) + count: int + # `loop.call_later` 返回的 handle,用于 close() 时取消 + flush_handle: Optional[asyncio.TimerHandle] + + +class EventCoalescer: + """ + 时间窗口内对相同 key 的重复事件做合并; + + 工作流程: + - 首次出现某 key:`record` 返回 `EMIT`,调用方按原样输出事件; + coalescer 通过 `loop.call_later(window_seconds, ...)` 注册 flush。 + - 窗口内同 key 再次出现:`record` 返回 `SUPPRESS`,累加计数。 + - 窗口到期:取出 bucket,若 `count > 1` 则触发 `on_flush(summary)`; + `count == 1` 时认为单次事件已被首条 EMIT 完整表达,不再补摘要。 + + 线程模型:所有公开方法均为 `async`,仅设计在单个事件循环内使用。 + bucket 字典的读写均落在不含 `await` 的同步段内,靠事件循环的协作式调度 + 天然原子,因此不需要显式锁;也避免了模块级实例化的 `asyncio.Lock` 在 + 跨事件循环复用时可能触发的 `RuntimeError`。 + """ + + def __init__( + self, + window_seconds: float, + on_flush: OnFlushCallback, + source: str = "", + ) -> None: + """ + :param window_seconds: 合并窗口时长(秒),必须 > 0 + :param on_flush: 窗口到期且 count>1 时回调;同步或 async 函数均可 + :param source: 业务来源标识,仅用于内部 debug 日志的前缀,便于区分多 + 个 coalescer 的来源;不会出现在 `on_flush` 摘要里 + """ + if window_seconds <= 0: + raise ValueError("window_seconds must be positive") + self._window_seconds = window_seconds + self._on_flush = on_flush + self._source = source + self._buckets: Dict[Hashable, _BucketState] = {} + self._is_flush_async = inspect.iscoroutinefunction(on_flush) + + @property + def window_seconds(self) -> float: + """ + 合并窗口时长(秒),供外部只读。 + """ + return self._window_seconds + + async def record( + self, key: Hashable, payload: Any = None + ) -> CoalesceDecision: + """ + 登记一次事件。 + + :param key: 聚合键,必须可哈希;推荐使用 tuple 组合业务维度(如 + `(host, reason)`),避免不同业务维度互相吞并 + :param payload: 事件附加信息,仅在该 key 在当前窗口内"首次出现" + 时被保留,用于 flush 摘要里附样例 + :return: `EMIT` 表示调用方应立即输出原事件;`SUPPRESS` 表示窗口 + 内已合并,调用方应静默 + """ + bucket = self._buckets.get(key) + if bucket is None: + handle = self._schedule_flush(key) + self._buckets[key] = _BucketState( + first_payload=payload, + count=1, + flush_handle=handle, + ) + return CoalesceDecision.EMIT + bucket.count += 1 + return CoalesceDecision.SUPPRESS + + async def close(self) -> None: + """ + 立即 flush 所有未到期窗口并清空内部状态。 + + 典型用于进程退出路径与单元测试。已注册的 `loop.call_later` 句柄 + 会被取消,避免在事件循环关闭后再被触发;count>1 的 bucket 同步 + 调用 `on_flush`(async on_flush 会被 await)。 + """ + buckets = list(self._buckets.items()) + self._buckets.clear() + for key, bucket in buckets: + if bucket.flush_handle is not None: + bucket.flush_handle.cancel() + await self._emit_summary_if_needed(key, bucket) + + def _schedule_flush(self, key: Hashable) -> asyncio.TimerHandle: + """ + 为指定 key 注册窗口到期 flush。 + + `call_later` 回调本身只能是同步函数,因此用 `asyncio.create_task` + 把异步 flush 链接回事件循环。捕获事件循环异常并降级为 debug 日志, + 避免基础设施层把异常抛回业务调用方。 + """ + loop = asyncio.get_running_loop() + return loop.call_later(self._window_seconds, self._on_flush_timer, key) + + def _on_flush_timer(self, key: Hashable) -> None: + """ + `loop.call_later` 到期回调:从事件循环里把异步 flush 任务接力起来。 + """ + try: + asyncio.get_running_loop().create_task(self._flush_key(key)) + except RuntimeError as exc: + # 事件循环已关闭等罕见路径:记录后丢弃,避免影响其它 bucket + self._log_debug(f"flush 调度失败,已忽略 key={key!r}: {exc}") + + async def _flush_key(self, key: Hashable) -> None: + """ + 窗口到期后的实际 flush 路径:取出 bucket 并按需调用 `on_flush`。 + """ + bucket = self._buckets.pop(key, None) + if bucket is None: + return + await self._emit_summary_if_needed(key, bucket) + + async def _emit_summary_if_needed( + self, key: Hashable, bucket: _BucketState + ) -> None: + """ + 仅当窗口内命中次数 > 1 时输出聚合摘要。 + + on_flush 的同步/异步形态在构造时已识别;该方法负责按形态正确调度, + 并把消费者异常吞掉转为 debug 日志,避免基础设施把上层业务搞崩。 + """ + if bucket.count <= 1: + return + summary = CoalesceSummary( + key=key, + count=bucket.count, + first_payload=bucket.first_payload, + window_seconds=self._window_seconds, + ) + try: + if self._is_flush_async: + await self._on_flush(summary) # type: ignore[misc] + else: + self._on_flush(summary) + except Exception as exc: # noqa: BLE001 - 基础设施不能因消费者异常崩溃 + self._log_debug(f"on_flush 回调异常已吞: key={key!r}, error={exc}") + + def _log_debug(self, message: str) -> None: + """ + 内部 debug 日志统一加 source 前缀,便于排查多 coalescer 共存时的来源。 + """ + if self._source: + logger.debug(f"[EventCoalescer:{self._source}] {message}") + else: + logger.debug(f"[EventCoalescer] {message}") diff --git a/app/utils/security.py b/app/utils/security.py index b73c0e91..054a3030 100644 --- a/app/utils/security.py +++ b/app/utils/security.py @@ -3,6 +3,8 @@ import hmac import ipaddress import socket import threading +from dataclasses import dataclass, field +from enum import Enum from hashlib import sha256 from pathlib import Path from typing import Dict, Iterable, List, Optional, Set, Union @@ -13,6 +15,11 @@ from cachetools import TTLCache from app.core.config import settings from app.log import logger +from app.utils.coalesce import ( + CoalesceDecision, + CoalesceSummary, + EventCoalescer, +) # DNS 解析结果缓存。 @@ -36,6 +43,50 @@ _dns_inflight_locks: Dict[str, asyncio.Lock] = {} _dns_inflight_meta_lock = threading.Lock() +class UrlSafetyReason(str, Enum): + """ + `evaluate_url_safety` 返回的诊断原因枚举。 + + 成员值为稳定的小写蛇形字符串,可直接作为日志字段或告警标签使用, + 扩展枚举时保留既有成员的取值,避免破坏下游聚合系统对原因的归类。 + """ + + # 通过全部校验,URL 可被请求 + ALLOWED = "allowed" + # 协议非 http/https,或 netloc 无效,或域名不在允许列表内 + DOMAIN_NOT_ALLOWED = "domain_not_allowed" + # 已通过域名 allowlist,但 DNS 解析失败(无返回或抛错) + DNS_RESOLUTION_FAILED = "dns_resolution_failed" + # DNS 解析到至少一个非公网地址,且未配置 `allowed_private_ranges` + NON_GLOBAL_DNS_RESULT = "non_global_dns_result" + # 配置了 `allowed_private_ranges`,但仍存在不在允许网段内的解析结果 + MIXED_OR_DISALLOWED_PRIVATE_RESULT = "mixed_or_disallowed_private_result" + + +@dataclass(frozen=True) +class UrlSafetyDiagnosis: + """ + URL 安全校验的结构化诊断结果,由 `evaluate_url_safety(_async)` 返回。 + + `is_safe_url` 仅使用 `allowed` 字段;日志、告警、运维诊断需要细分原因或 + 解析 IP 时通过本对象消费。字段约束: + - `host` 仅在通过域名 allowlist 后才被填充;DOMAIN_NOT_ALLOWED 场景为 None。 + - `ips` 仅在执行过 DNS 阶段后才可能非空;不含纯字符串协议失败场景。 + - `matched_private_ranges` 仅在通过 `allowed_private_ranges` 放行时填充。 + """ + + # 是否放行 + allowed: bool + # 放行/拦截的具体原因 + reason: UrlSafetyReason + # 通过 allowlist 后从 URL 解析出的 hostname,未通过时为 None + host: Optional[str] = None + # DNS 解析结果(含命中或未命中私网放行的 IP),格式化为字符串 + ips: List[str] = field(default_factory=list) + # 命中允许放行的非公网网段,仅 `ALLOWED` 且走私网放行分支时非空 + matched_private_ranges: List[str] = field(default_factory=list) + + def _resolve_addrinfo_to_ips( address_infos: Iterable, ) -> Optional[List[ipaddress._BaseAddress]]: @@ -546,36 +597,26 @@ class SecurityUtils: allowed_private_ranges: Optional[Iterable[str]] = None, ) -> bool: """ - 验证URL是否在允许的域名列表中,包括带有端口的域名(同步版本) + 验证 URL 是否在允许的域名列表中,包括带有端口的域名(同步版本)。 :param url: 需要验证的 URL :param allowed_domains: 允许的域名集合,域名可以包含端口 - :param strict: 是否严格匹配一级域名(默认为 False,允许多级域名) + :param strict: 是否严格匹配一级域名(默认 False,允许多级域名) :param block_private: 是否拦截解析到非公网地址的 URL,防止 SSRF :param allowed_private_ranges: 域名命中后额外允许的非公网 IP/CIDR 网段 - :return: 如果URL合法且在允许的域名列表中,返回 True;否则返回 False + :return: URL 合法且通过安全校验时返回 True,否则返回 False - 注意:`block_private=True` 时会同步调用 `getaddrinfo`;async 上下文请改用 - `is_safe_url_async`。 + 校验细节与失败原因由 `evaluate_url_safety` 返回;本方法只暴露布尔结果, + 作为只关心通过/拒绝判断的调用方的最薄入口。`block_private=True` 时会 + 同步调用 `getaddrinfo`;async 上下文请改用 `is_safe_url_async`。 """ - try: - hostname = SecurityUtils._check_url_allowlist(url, allowed_domains, strict) - if hostname is None: - return False - - if block_private and not SecurityUtils._is_global_hostname(hostname): - private_match = SecurityUtils._is_allowed_private_hostname( - hostname, allowed_private_ranges - ) - if private_match: - SecurityUtils._log_private_range_allowed(url, private_match) - return True - return False - - return True - except Exception as e: - logger.debug(f"Error occurred while validating URL: {e}") - return False + return SecurityUtils.evaluate_url_safety( + url, + allowed_domains, + strict=strict, + block_private=block_private, + allowed_private_ranges=allowed_private_ranges, + ).allowed @staticmethod async def is_safe_url_async( @@ -586,30 +627,194 @@ class SecurityUtils: allowed_private_ranges: Optional[Iterable[str]] = None, ) -> bool: """ - `is_safe_url` 的异步版本,参数与返回值含义不变。 + 判定 URL 是否在允许的域名列表中,包括带有端口的域名。 - DNS 解析通过事件循环线程池执行,并复用 TTL 缓存。 + DNS 解析通过事件循环线程池执行,并复用 TTL 缓存,不阻塞调用方所在的 + 事件循环。参数与返回值含义同 `is_safe_url`;需要失败原因/解析 IP + 等结构化信息时调用 `evaluate_url_safety_async`。 + """ + diagnosis = await SecurityUtils.evaluate_url_safety_async( + url, + allowed_domains, + strict=strict, + block_private=block_private, + allowed_private_ranges=allowed_private_ranges, + ) + return diagnosis.allowed + + @staticmethod + def evaluate_url_safety( + url: str, + allowed_domains: Union[Set[str], List[str]], + strict: bool = False, + block_private: bool = False, + allowed_private_ranges: Optional[Iterable[str]] = None, + ) -> "UrlSafetyDiagnosis": + """ + 在 `is_safe_url` 的判定路径上输出结构化诊断结果(同步版本)。 + + 与 `is_safe_url` 共用同一套校验顺序:协议/域名 allowlist → 可选 DNS 解析 + → 可选非公网放行匹配;本方法额外返回失败原因、解析到的 IP 列表和命中的 + 私网网段,供日志与告警渲染消费。校验中遇到未预期异常时按默认拒绝原则 + 归类为 `DOMAIN_NOT_ALLOWED`,避免任何解析路径漏过 SSRF 校验。 """ try: hostname = SecurityUtils._check_url_allowlist(url, allowed_domains, strict) if hostname is None: - return False - - if block_private and not await SecurityUtils._is_global_hostname_async( - hostname - ): - private_match = await SecurityUtils._is_allowed_private_hostname_async( - hostname, allowed_private_ranges + return UrlSafetyDiagnosis( + allowed=False, + reason=UrlSafetyReason.DOMAIN_NOT_ALLOWED, ) - if private_match: - SecurityUtils._log_private_range_allowed(url, private_match) - return True - return False - - return True - except Exception as e: + if not block_private: + return UrlSafetyDiagnosis( + allowed=True, + reason=UrlSafetyReason.ALLOWED, + host=hostname, + ) + addresses = SecurityUtils._hostname_addresses(hostname) + return SecurityUtils._diagnose_resolved_addresses( + url, hostname, addresses, allowed_private_ranges + ) + except Exception as e: # noqa: BLE001 - 默认拒绝,避免漏过 SSRF 校验 logger.debug(f"Error occurred while validating URL: {e}") - return False + return UrlSafetyDiagnosis( + allowed=False, + reason=UrlSafetyReason.DOMAIN_NOT_ALLOWED, + ) + + @staticmethod + async def evaluate_url_safety_async( + url: str, + allowed_domains: Union[Set[str], List[str]], + strict: bool = False, + block_private: bool = False, + allowed_private_ranges: Optional[Iterable[str]] = None, + ) -> "UrlSafetyDiagnosis": + """ + 输出与 `evaluate_url_safety` 完全一致的结构化诊断结果。 + + DNS 解析通过事件循环线程池执行,并复用 TTL 缓存,不阻塞调用方所在的 + 事件循环;校验顺序、字段含义、异常归类均与同步版本相同。 + """ + try: + hostname = SecurityUtils._check_url_allowlist(url, allowed_domains, strict) + if hostname is None: + return UrlSafetyDiagnosis( + allowed=False, + reason=UrlSafetyReason.DOMAIN_NOT_ALLOWED, + ) + if not block_private: + return UrlSafetyDiagnosis( + allowed=True, + reason=UrlSafetyReason.ALLOWED, + host=hostname, + ) + addresses = await SecurityUtils._hostname_addresses_async(hostname) + return SecurityUtils._diagnose_resolved_addresses( + url, hostname, addresses, allowed_private_ranges + ) + except Exception as e: # noqa: BLE001 - 默认拒绝,避免漏过 SSRF 校验 + logger.debug(f"Error occurred while validating URL: {e}") + return UrlSafetyDiagnosis( + allowed=False, + reason=UrlSafetyReason.DOMAIN_NOT_ALLOWED, + ) + + @staticmethod + async def is_safe_image_url_async( + url: str, + allowed_domains: Union[Set[str], List[str]], + allowed_private_ranges: Optional[Iterable[str]] = None, + ) -> bool: + """ + 判定 URL 是否可作为图片代理请求目标。 + + 校验顺序:协议 + 域名 allowlist + DNS SSRF 拦截 + 非公网放行匹配;标准 + 校验失败时再用 `verify_signed_url` 兜底,允许后端预签名的媒体服务器 + URL 跳过私网拦截。两者皆失败才视为拒绝。 + + 拒绝路径会输出结构化阻断日志:单次拦截立即打印一条 warning,同 + `(host, reason)` 的连续命中在 `_IMAGE_PROXY_BLOCK_LOG_WINDOW_SECONDS` + 窗口内合并为一条聚合摘要,避免媒体详情页一次请求把日志刷爆。日志字段 + 范围严格限定为 URL、host、reason、解析 IP 与允许网段配置;cookies、 + 签名串、token、请求头等敏感材料一律不进入日志。 + """ + diagnosis = await SecurityUtils.evaluate_url_safety_async( + url, + allowed_domains, + block_private=True, + allowed_private_ranges=allowed_private_ranges, + ) + if diagnosis.allowed: + return True + if SecurityUtils.verify_signed_url(url) is not None: + return True + await _emit_image_proxy_block_warning( + url=url, + diagnosis=diagnosis, + signature_carried=_url_carries_signature(url), + allowed_private_ranges=allowed_private_ranges, + ) + return False + + @staticmethod + def _diagnose_resolved_addresses( + url: str, + hostname: str, + addresses: Optional[List[ipaddress._BaseAddress]], + allowed_private_ranges: Optional[Iterable[str]], + ) -> "UrlSafetyDiagnosis": + """ + 对已完成 DNS 解析的地址列表执行非公网放行判断,并归一化诊断结果。 + + - 地址列表为空/None:视为 DNS 不可信,拒绝并标记 `DNS_RESOLUTION_FAILED`。 + - 全部公网地址:直接放行。 + - 存在非公网地址且未配置允许网段:拒绝并标记 `NON_GLOBAL_DNS_RESULT`, + 供日志附带"如使用 fake-ip 需要配置 IMAGE_PROXY_ALLOWED_PRIVATE_RANGES" + 的提示。 + - 存在非公网地址且配置了允许网段但未全部命中:拒绝并标记 + `MIXED_OR_DISALLOWED_PRIVATE_RESULT`,提示存在不允许的解析结果。 + - 全部命中允许网段:放行并附带命中的 IP 与网段,由 + `_log_private_range_allowed` 输出排查日志。 + """ + if not addresses: + return UrlSafetyDiagnosis( + allowed=False, + reason=UrlSafetyReason.DNS_RESOLUTION_FAILED, + host=hostname, + ) + if SecurityUtils._addresses_all_global(addresses): + return UrlSafetyDiagnosis( + allowed=True, + reason=UrlSafetyReason.ALLOWED, + host=hostname, + ips=[str(addr) for addr in addresses], + ) + networks = SecurityUtils._parse_ip_networks(allowed_private_ranges) + if not networks: + return UrlSafetyDiagnosis( + allowed=False, + reason=UrlSafetyReason.NON_GLOBAL_DNS_RESULT, + host=hostname, + ips=[str(addr) for addr in addresses], + ) + match = SecurityUtils._match_private_addresses(addresses, networks) + if match is None: + return UrlSafetyDiagnosis( + allowed=False, + reason=UrlSafetyReason.MIXED_OR_DISALLOWED_PRIVATE_RESULT, + host=hostname, + ips=[str(addr) for addr in addresses], + ) + matched_addresses, matched_networks = match + SecurityUtils._log_private_range_allowed(url, match) + return UrlSafetyDiagnosis( + allowed=True, + reason=UrlSafetyReason.ALLOWED, + host=hostname, + ips=[str(addr) for addr in matched_addresses], + matched_private_ranges=[str(net) for net in matched_networks], + ) @staticmethod def sanitize_url_path(url: str, max_length: int = 120) -> str: @@ -636,3 +841,134 @@ class SecurityUtils: safe_path = f"compressed_{hash_value}{file_extension}" return safe_path + + +# 图片代理阻断日志聚合窗口(秒)。媒体详情页一次请求会批量触发同 host/同原因的拦截, +# 按 (host, reason) 合并后只输出首条 warning + 窗口结束的聚合摘要,避免日志刷屏。 +_IMAGE_PROXY_BLOCK_LOG_WINDOW_SECONDS = 60.0 + +# fake-ip / 旁路 DNS 用户最常因 IMAGE_PROXY_ALLOWED_PRIVATE_RANGES 未配置而踩坑, +# 在 reason=NON_GLOBAL_DNS_RESULT 且当前未配置允许网段时随 warning 一起输出,指向正确的修复开关。 +_IMAGE_PROXY_FAKEIP_HINT = ( + "提示:若使用 fake-ip / 旁路 DNS(常见网段 198.18.0.0/15、100.64.0.0/10)," + "请将对应网段加入 IMAGE_PROXY_ALLOWED_PRIVATE_RANGES" +) + +# URL fragment 中实际携带代理签名但校验失败时附在 reason 末尾的标记。 +# 仅起标识作用,签名串本身不写入日志,避免泄露签名材料。 +_INVALID_SIGNATURE_TAG = "invalid_signature" + + +def _url_carries_signature(url: str) -> bool: + """ + 判断 URL 是否在 fragment 中显式携带代理签名参数 `mp_sig`。 + + 仅做轻量字符串匹配,避免对普通图片 URL 跑完整签名校验路径;未携带签名 + 的外链不会触发 `invalid_signature` 标记,避免阻断日志误导未签名调用方。 + """ + if not url: + return False + fragment_start = url.find("#") + if fragment_start < 0: + return False + return "mp_sig=" in url[fragment_start + 1:] + + +def _format_image_proxy_block_warning( + *, + url: str, + reason: str, + host: Optional[str], + ips: List[str], + allowed_private_ranges: List[str], + hint: Optional[str], +) -> str: + """ + 渲染图片代理首条阻断 warning 文案。 + + 字段范围严格限定为 URL、host、reason、IP 与允许网段配置;hint 仅在 + reason 与配置缺失同时满足时由调用方填充。其余敏感材料(cookies、签名 + 串、token、请求头)不允许进入该日志路径。 + """ + fields = [ + f"url={url}", + f"reason={reason}", + f"host={host or ''}", + f"ips={','.join(ips)}", + f"allowed_private_ranges={','.join(allowed_private_ranges)}", + ] + line = "Blocked unsafe image URL: " + ", ".join(fields) + if hint: + line = f"{line} | {hint}" + return line + + +def _log_image_proxy_block_summary(summary: CoalesceSummary) -> None: + """ + 图片代理阻断日志聚合窗口到期回调,输出窗口内的命中计数与首条样例。 + + summary.key 由 `_emit_image_proxy_block_warning` 固定构造为 + `(host, reason_label)` 二元组;摘要保留首条事件的 URL 与解析 IP, + 避免运维只看到 count 而无法定位是哪批请求被合并。 + """ + host, reason = summary.key + payload = summary.first_payload or {} + sample_ips = ",".join(payload.get("ips") or []) + logger.warn( + "Blocked unsafe image URL (aggregated): " + f"host={host or ''}, reason={reason}, " + f"count={summary.count}, window={summary.window_seconds:g}s, " + f"sample_url={payload.get('url', '')}, sample_ips={sample_ips}" + ) + + +# 图片代理阻断日志聚合器。同 (host, reason) 高频拦截在窗口内合并为一条聚合摘要,避免媒体详情页一次请求把日志刷爆; +# 放行 debug 日志与诊断布尔结果不受聚合影响。 +_image_proxy_block_log_coalescer = EventCoalescer( + window_seconds=_IMAGE_PROXY_BLOCK_LOG_WINDOW_SECONDS, + on_flush=_log_image_proxy_block_summary, + source="image_proxy", +) + + +async def _emit_image_proxy_block_warning( + *, + url: str, + diagnosis: "UrlSafetyDiagnosis", + signature_carried: bool, + allowed_private_ranges: Optional[Iterable[str]], +) -> None: + """ + 把诊断结果转写为结构化阻断 warning,并交由 coalescer 决定是否实际输出。 + + `signature_carried=True` 表示请求 URL 在 fragment 里实际携带了代理签名但 + 校验失败,此时在 reason 末尾追加 `invalid_signature` 标记,便于区分 + "未签名外链直接撞 allowlist"与"签名 URL 已失效"两种排查路径。 + """ + # reason_label 既作为 warning 字段,也作为 coalescer 桶键的一部分;签名 + # 标记拼接到同一字符串里是为了让"带签名失败"的命中与"裸 URL 失败"分桶, + # 各自独立计数与摘要,不要在不引入新桶维度的情况下拆开。 + reason_label = diagnosis.reason.value + if signature_carried: + reason_label = f"{reason_label}+{_INVALID_SIGNATURE_TAG}" + allowed_ranges = [str(r) for r in (allowed_private_ranges or [])] + hint = ( + _IMAGE_PROXY_FAKEIP_HINT + if diagnosis.reason is UrlSafetyReason.NON_GLOBAL_DNS_RESULT + and not allowed_ranges + else None + ) + key = (diagnosis.host or "", reason_label) + payload = {"url": url, "ips": list(diagnosis.ips)} + decision = await _image_proxy_block_log_coalescer.record(key=key, payload=payload) + if decision is CoalesceDecision.EMIT: + logger.warn( + _format_image_proxy_block_warning( + url=url, + reason=reason_label, + host=diagnosis.host, + ips=list(diagnosis.ips), + allowed_private_ranges=allowed_ranges, + hint=hint, + ) + ) diff --git a/tests/test_coalesce.py b/tests/test_coalesce.py new file mode 100644 index 00000000..5eee8615 --- /dev/null +++ b/tests/test_coalesce.py @@ -0,0 +1,201 @@ +""" +`EventCoalescer` 基础设施单元测试。 + +测试策略:用极短窗口(默认 0.05s)驱动真实事件循环触发 flush,避免引入 +对时间 mock 的复杂度;同时通过 `asyncio.sleep` 让出控制权以保证 flush +回调被调度执行。 +""" + +import asyncio +from typing import List +from unittest import IsolatedAsyncioTestCase + +from app.utils.coalesce import ( + CoalesceDecision, + CoalesceSummary, + EventCoalescer, +) + + +# 窗口尽量短,但要大于事件循环单次 tick 的开销,避免 flush 在 record 仍持锁时触发 +_TEST_WINDOW = 0.05 +# 等待窗口到期 + flush 任务完成所需的额外余量 +_TEST_WAIT = _TEST_WINDOW * 4 + + +class EventCoalescerTest(IsolatedAsyncioTestCase): + """ + 覆盖 EventCoalescer 的核心契约:首条 EMIT、窗口内 SUPPRESS、count>1 + 时 flush 摘要、不同 key 互不影响、close() 立即 flush、on_flush 异常 + 被吞、同步/async on_flush 都可用。 + """ + + async def test_first_record_returns_emit(self): + """ + 某 key 在新窗口内的首次出现必须返回 EMIT,确保调用方按原样输出。 + """ + summaries: List[CoalesceSummary] = [] + coalescer = EventCoalescer(_TEST_WINDOW, summaries.append) + + decision = await coalescer.record(("host", "reason"), payload={"i": 1}) + + self.assertIs(decision, CoalesceDecision.EMIT) + await coalescer.close() + + async def test_subsequent_same_key_records_are_suppressed(self): + """ + 同一 key 在窗口内连续命中,第 2 次起返回 SUPPRESS。 + """ + coalescer = EventCoalescer(_TEST_WINDOW, lambda _s: None) + await coalescer.record("k", payload="first") + + for _ in range(3): + self.assertIs( + await coalescer.record("k", payload="ignored"), + CoalesceDecision.SUPPRESS, + ) + await coalescer.close() + + async def test_window_expiry_flushes_summary_when_count_gt_one(self): + """ + 窗口到期且 count>1 时,on_flush 收到包含 count、first_payload、window 的摘要。 + """ + summaries: List[CoalesceSummary] = [] + coalescer = EventCoalescer(_TEST_WINDOW, summaries.append, source="test") + key = ("h", "r") + await coalescer.record(key, payload={"url": "u1"}) + await coalescer.record(key, payload={"url": "u2"}) + await coalescer.record(key, payload={"url": "u3"}) + + await asyncio.sleep(_TEST_WAIT) + + self.assertEqual(len(summaries), 1) + summary = summaries[0] + self.assertEqual(summary.key, key) + self.assertEqual(summary.count, 3) + self.assertEqual(summary.first_payload, {"url": "u1"}) + self.assertEqual(summary.window_seconds, _TEST_WINDOW) + + async def test_window_expiry_does_not_flush_when_count_is_one(self): + """ + 窗口内只出现一次时,首条 EMIT 已表达完整事件,不再补发聚合摘要。 + """ + summaries: List[CoalesceSummary] = [] + coalescer = EventCoalescer(_TEST_WINDOW, summaries.append) + await coalescer.record("solo", payload=None) + + await asyncio.sleep(_TEST_WAIT) + + self.assertEqual(summaries, []) + + async def test_different_keys_do_not_collapse(self): + """ + 不同 key 各自独立计数与 flush,互不吞并。 + """ + summaries: List[CoalesceSummary] = [] + coalescer = EventCoalescer(_TEST_WINDOW, summaries.append) + await coalescer.record("a", payload="a1") + await coalescer.record("b", payload="b1") + await coalescer.record("a", payload="a2") + await coalescer.record("b", payload="b2") + await coalescer.record("a", payload="a3") + + await asyncio.sleep(_TEST_WAIT) + + by_key = {s.key: s for s in summaries} + self.assertEqual(set(by_key.keys()), {"a", "b"}) + self.assertEqual(by_key["a"].count, 3) + self.assertEqual(by_key["a"].first_payload, "a1") + self.assertEqual(by_key["b"].count, 2) + self.assertEqual(by_key["b"].first_payload, "b1") + + async def test_new_window_after_flush_emits_again(self): + """ + 窗口结束后下一条同 key 事件应被视为新窗口的首条,返回 EMIT。 + """ + coalescer = EventCoalescer(_TEST_WINDOW, lambda _s: None) + await coalescer.record("k", payload=1) + await coalescer.record("k", payload=2) + await asyncio.sleep(_TEST_WAIT) + + decision = await coalescer.record("k", payload=3) + + self.assertIs(decision, CoalesceDecision.EMIT) + await coalescer.close() + + async def test_close_flushes_pending_buckets_immediately(self): + """ + close() 必须取消未到期 timer 并立即触发 count>1 的 bucket flush, + 用于进程退出路径。 + """ + # 使用一个足够长的窗口,确保自然到期不会先于 close 触发 + summaries: List[CoalesceSummary] = [] + coalescer = EventCoalescer(1.0, summaries.append) + await coalescer.record("k", payload="first") + await coalescer.record("k", payload="second") + + await coalescer.close() + + self.assertEqual(len(summaries), 1) + self.assertEqual(summaries[0].count, 2) + self.assertEqual(summaries[0].first_payload, "first") + + async def test_close_does_not_emit_when_count_is_one(self): + """ + close() 与正常窗口到期一致,count==1 时不输出摘要。 + """ + summaries: List[CoalesceSummary] = [] + coalescer = EventCoalescer(1.0, summaries.append) + await coalescer.record("k", payload="only") + + await coalescer.close() + + self.assertEqual(summaries, []) + + async def test_async_on_flush_is_awaited(self): + """ + on_flush 为 async 函数时应被正确 await,而不是被丢弃成协程对象。 + """ + awaited: List[CoalesceSummary] = [] + + async def on_flush(summary: CoalesceSummary) -> None: + await asyncio.sleep(0) + awaited.append(summary) + + coalescer = EventCoalescer(_TEST_WINDOW, on_flush) + await coalescer.record("k", payload="a") + await coalescer.record("k", payload="b") + + await asyncio.sleep(_TEST_WAIT) + + self.assertEqual(len(awaited), 1) + self.assertEqual(awaited[0].count, 2) + + async def test_on_flush_exception_is_swallowed(self): + """ + on_flush 抛异常不能影响 coalescer 自身或上层调用方,仅 debug 记录。 + """ + def on_flush(_summary: CoalesceSummary) -> None: + raise RuntimeError("boom") + + coalescer = EventCoalescer(_TEST_WINDOW, on_flush) + await coalescer.record("k", payload="x") + await coalescer.record("k", payload="y") + + await asyncio.sleep(_TEST_WAIT) + + # 异常被吞,新窗口可以继续接受 record + self.assertIs( + await coalescer.record("k", payload="z"), + CoalesceDecision.EMIT, + ) + await coalescer.close() + + async def test_invalid_window_raises(self): + """ + 非正数窗口值在构造期即拒绝,避免运行期出现 0 或负窗口的死循环 flush。 + """ + with self.assertRaises(ValueError): + EventCoalescer(0, lambda _s: None) + with self.assertRaises(ValueError): + EventCoalescer(-1.0, lambda _s: None) diff --git a/tests/test_security_image_url_log.py b/tests/test_security_image_url_log.py new file mode 100644 index 00000000..91fcfaf5 --- /dev/null +++ b/tests/test_security_image_url_log.py @@ -0,0 +1,289 @@ +""" +覆盖 `SecurityUtils.is_safe_image_url_async` 的阻断分支与日志聚合接线: + +- 各 `UrlSafetyReason` 分支落入正确的 warning 字段; +- `NON_GLOBAL_DNS_RESULT` 且未配置允许网段时附 fake-ip 提示; +- 签名 URL 校验通过时静默放行;URL 携带签名但失败时附 `invalid_signature` 标记; +- 同 (host, reason) 高频拦截只输出首条 warning,窗口结束输出聚合摘要; +- 不同 (host, reason) 互不吞并。 +""" + +import asyncio +from typing import List, Optional +from unittest import IsolatedAsyncioTestCase +from unittest.mock import patch + +from app.utils import security as security_module +from app.utils.coalesce import EventCoalescer +from app.utils.security import ( + SecurityUtils, + UrlSafetyDiagnosis, + UrlSafetyReason, +) + + +_TEST_WINDOW = 0.05 +_TEST_WAIT = _TEST_WINDOW * 4 + + +def _diag( + reason: UrlSafetyReason, + *, + host: Optional[str] = "image.tmdb.org", + ips: Optional[List[str]] = None, +) -> UrlSafetyDiagnosis: + """ + 构造测试用 `UrlSafetyDiagnosis`:DOMAIN_NOT_ALLOWED 强制清空 host,保持与 + `evaluate_url_safety_async` 真实输出的字段约束一致。 + """ + if reason is UrlSafetyReason.DOMAIN_NOT_ALLOWED: + host = None + return UrlSafetyDiagnosis( + allowed=False, + reason=reason, + host=host, + ips=ips or [], + ) + + +class IsSafeImageUrlLogTest(IsolatedAsyncioTestCase): + """ + `is_safe_image_url_async` 阻断路径的结构化日志 + 聚合行为校验。 + """ + + async def asyncSetUp(self) -> None: + # 用短窗口实例临时替换模块级 coalescer,便于在测试内驱动窗口到期 flush + self._original_coalescer = security_module._image_proxy_block_log_coalescer + self._coalescer = EventCoalescer( + window_seconds=_TEST_WINDOW, + on_flush=security_module._log_image_proxy_block_summary, + source="image_proxy_test", + ) + security_module._image_proxy_block_log_coalescer = self._coalescer + self._allowed_domains = {"image.tmdb.org"} + + async def asyncTearDown(self) -> None: + await self._coalescer.close() + security_module._image_proxy_block_log_coalescer = self._original_coalescer + + async def _invoke( + self, + diagnosis: UrlSafetyDiagnosis, + *, + url: str = "https://image.tmdb.org/t/p/w500/x.jpg", + signed_clean_url: Optional[str] = None, + allowed_private_ranges: Optional[List[str]] = None, + ): + """ + 以指定诊断结果与签名校验返回值驱动 `is_safe_image_url_async`,捕获 warning。 + """ + async def fake_evaluate(*_args, **_kwargs): + return diagnosis + + warns: List[str] = [] + with patch.object( + SecurityUtils, + "evaluate_url_safety_async", + side_effect=fake_evaluate, + ), patch.object( + SecurityUtils, + "verify_signed_url", + return_value=signed_clean_url, + ), patch.object( + security_module.logger, + "warn", + side_effect=warns.append, + ): + allowed = await SecurityUtils.is_safe_image_url_async( + url, + self._allowed_domains, + allowed_private_ranges=allowed_private_ranges, + ) + return allowed, warns + + async def test_domain_not_allowed_emits_clean_reason_label(self): + """ + 普通外链(未携带 mp_sig)撞 allowlist 失败时,warning 标记 + DOMAIN_NOT_ALLOWED,不附 fake-ip 提示,也不挂签名失败标记, + 避免误导未签名调用方以为必须签名。 + """ + allowed, warns = await self._invoke( + _diag(UrlSafetyReason.DOMAIN_NOT_ALLOWED), + ) + + self.assertFalse(allowed) + self.assertEqual(len(warns), 1) + self.assertIn("reason=domain_not_allowed", warns[0]) + self.assertIn("Blocked unsafe image URL", warns[0]) + self.assertNotIn("fake-ip", warns[0]) + self.assertNotIn("invalid_signature", warns[0]) + + async def test_invalid_signature_tag_only_when_url_signed(self): + """ + URL 显式携带 `#mp_sig=...` 但校验失败时,reason 末尾追加 + `invalid_signature`,便于区分"签名失效"与"未签名外链拦截"。 + """ + allowed, warns = await self._invoke( + _diag(UrlSafetyReason.DOMAIN_NOT_ALLOWED), + url="https://attacker.example.com/x.jpg#mp_sig=deadbeef&mp_purpose=image-proxy", + ) + + self.assertFalse(allowed) + self.assertEqual(len(warns), 1) + self.assertIn( + "reason=domain_not_allowed+invalid_signature", warns[0] + ) + + async def test_non_global_dns_result_lists_ips_with_hint(self): + """ + DNS 解析到非公网且未配置允许网段时,warning 列出解析 IP 并附 fake-ip 提示。 + """ + allowed, warns = await self._invoke( + _diag( + UrlSafetyReason.NON_GLOBAL_DNS_RESULT, + ips=["198.18.16.96", "198.18.16.97"], + ), + ) + + self.assertFalse(allowed) + self.assertEqual(len(warns), 1) + warning = warns[0] + self.assertIn("reason=non_global_dns_result", warning) + self.assertIn("host=image.tmdb.org", warning) + self.assertIn("ips=198.18.16.96,198.18.16.97", warning) + self.assertIn("IMAGE_PROXY_ALLOWED_PRIVATE_RANGES", warning) + self.assertIn("198.18.0.0/15", warning) + + async def test_configured_ranges_skip_fakeip_hint(self): + """ + 已配置 allowed_private_ranges 时不再追加 fake-ip 提示,避免重复引导。 + warning 同时把已生效的网段列在字段里供运维对照。 + """ + _, warns = await self._invoke( + _diag( + UrlSafetyReason.MIXED_OR_DISALLOWED_PRIVATE_RESULT, + ips=["10.0.0.8"], + ), + allowed_private_ranges=["198.18.0.0/15"], + ) + + self.assertEqual(len(warns), 1) + warning = warns[0] + self.assertIn("reason=mixed_or_disallowed_private_result", warning) + self.assertIn("allowed_private_ranges=198.18.0.0/15", warning) + self.assertNotIn("提示", warning) + + async def test_dns_resolution_failed_carries_empty_ips(self): + """ + DNS 解析失败的 warning 携带空 ips 字段,便于运维直接定位 DNS 路径。 + """ + _, warns = await self._invoke( + _diag(UrlSafetyReason.DNS_RESOLUTION_FAILED, ips=[]), + ) + + self.assertEqual(len(warns), 1) + self.assertIn("reason=dns_resolution_failed", warns[0]) + self.assertIn("ips=,", warns[0]) + + async def test_signed_url_success_silently_allows(self): + """ + 标准校验失败但签名 URL 校验通过时返回 True,且不输出 warning, + 避免运维误判后端预签名路径是异常拦截。 + """ + allowed, warns = await self._invoke( + _diag(UrlSafetyReason.DOMAIN_NOT_ALLOWED), + signed_clean_url="https://image.tmdb.org/t/p/w500/x.jpg", + ) + + self.assertTrue(allowed) + self.assertEqual(warns, []) + + async def test_repeated_block_in_window_emits_only_first_warning(self): + """ + 同 (host, reason) 在窗口内的多次命中只输出首条 warning;窗口到期后 + 补一条聚合摘要,count 等于窗口内总命中数,sample_url 来自首条事件。 + """ + diag = _diag( + UrlSafetyReason.NON_GLOBAL_DNS_RESULT, + ips=["198.18.16.96"], + ) + + async def fake_evaluate(*_args, **_kwargs): + return diag + + warns: List[str] = [] + with patch.object( + SecurityUtils, + "evaluate_url_safety_async", + side_effect=fake_evaluate, + ), patch.object( + SecurityUtils, + "verify_signed_url", + return_value=None, + ), patch.object( + security_module.logger, + "warn", + side_effect=warns.append, + ): + for i in range(5): + await SecurityUtils.is_safe_image_url_async( + f"https://image.tmdb.org/t/p/w500/{i}.jpg", + self._allowed_domains, + ) + self.assertEqual(len(warns), 1) + self.assertIn("/0.jpg", warns[0]) + + await asyncio.sleep(_TEST_WAIT) + + self.assertEqual(len(warns), 2) + summary = warns[1] + self.assertIn("aggregated", summary) + self.assertIn("count=5", summary) + self.assertIn("/0.jpg", summary) + self.assertNotIn("/1.jpg", summary) + self.assertNotIn("/4.jpg", summary) + # 摘要附带首条样例的解析 IP,便于直接锁定批量拦截的网络成因 + self.assertIn("sample_ips=198.18.16.96", summary) + + async def test_different_keys_do_not_collapse(self): + """ + 不同 (host, reason) 各自计数与输出,互不吞并。 + """ + warns: List[str] = [] + sequence = { + "evil": _diag(UrlSafetyReason.DOMAIN_NOT_ALLOWED, host=None), + "tmdb": _diag( + UrlSafetyReason.NON_GLOBAL_DNS_RESULT, + host="image.tmdb.org", + ips=["198.18.16.96"], + ), + } + + async def fake_evaluate(url, *_args, **_kwargs): + return sequence["evil"] if "evil" in url else sequence["tmdb"] + + with patch.object( + SecurityUtils, + "evaluate_url_safety_async", + side_effect=fake_evaluate, + ), patch.object( + SecurityUtils, + "verify_signed_url", + return_value=None, + ), patch.object( + security_module.logger, + "warn", + side_effect=warns.append, + ): + await SecurityUtils.is_safe_image_url_async( + "https://evil.example.com/x.jpg", + self._allowed_domains, + ) + await SecurityUtils.is_safe_image_url_async( + "https://image.tmdb.org/t/p/w500/a.jpg", + self._allowed_domains, + ) + + self.assertEqual(len(warns), 2) + self.assertIn("reason=domain_not_allowed", warns[0]) + self.assertIn("reason=non_global_dns_result", warns[1]) diff --git a/tests/test_security_utils.py b/tests/test_security_utils.py index 76beb305..cc7771d7 100644 --- a/tests/test_security_utils.py +++ b/tests/test_security_utils.py @@ -4,6 +4,8 @@ from unittest.mock import patch from app.utils.security import ( SecurityUtils, + UrlSafetyDiagnosis, + UrlSafetyReason, _dns_inflight_locks, _dns_negative_cache, _dns_positive_cache, @@ -681,3 +683,171 @@ class SecurityUtilsTest(TestCase): _dns_inflight_locks, "并发等待者全部退出后必须释放 in-flight 锁字典条目", ) + + +class UrlSafetyDiagnosisTest(TestCase): + """ + 覆盖 `evaluate_url_safety(_async)` 的结构化诊断结果,确保每条 + `UrlSafetyReason` 分支返回的字段满足日志渲染契约。 + """ + + def setUp(self) -> None: + _dns_positive_cache.clear() + _dns_negative_cache.clear() + _dns_inflight_locks.clear() + + def test_domain_not_allowed_returns_reason_and_no_host(self): + """ + 协议或 allowlist 校验未通过时,诊断返回 DOMAIN_NOT_ALLOWED, + 且不暴露 host/ips 字段。 + """ + diag = SecurityUtils.evaluate_url_safety( + "https://attacker.example.com/x.jpg", + {"image.tmdb.org"}, + ) + + self.assertIsInstance(diag, UrlSafetyDiagnosis) + self.assertFalse(diag.allowed) + self.assertIs(diag.reason, UrlSafetyReason.DOMAIN_NOT_ALLOWED) + self.assertIsNone(diag.host) + self.assertEqual(diag.ips, []) + self.assertEqual(diag.matched_private_ranges, []) + + def test_allowed_without_block_private_skips_dns(self): + """ + 未启用 block_private 时直接放行,不发起 DNS 解析,ips 保持为空。 + """ + with patch( + "app.utils.security.socket.getaddrinfo", + side_effect=AssertionError("不应触发 DNS 解析"), + ): + diag = SecurityUtils.evaluate_url_safety( + "https://image.tmdb.org/t/p/w500/x.jpg", + {"image.tmdb.org"}, + ) + + self.assertTrue(diag.allowed) + self.assertIs(diag.reason, UrlSafetyReason.ALLOWED) + self.assertEqual(diag.host, "image.tmdb.org") + self.assertEqual(diag.ips, []) + + def test_dns_resolution_failed_carries_host_without_ips(self): + """ + `block_private=True` 下 DNS 抛错时返回 DNS_RESOLUTION_FAILED, + 附带 host 便于排查但不携带 ips。 + """ + with patch( + "app.utils.security.socket.getaddrinfo", + side_effect=socket.gaierror, + ): + diag = SecurityUtils.evaluate_url_safety( + "https://image.tmdb.org/t/p/w500/x.jpg", + {"image.tmdb.org"}, + block_private=True, + ) + + self.assertFalse(diag.allowed) + self.assertIs(diag.reason, UrlSafetyReason.DNS_RESOLUTION_FAILED) + self.assertEqual(diag.host, "image.tmdb.org") + self.assertEqual(diag.ips, []) + + def test_non_global_dns_result_lists_resolved_ips(self): + """ + 命中 allowlist 但 DNS 解析到非公网且未配置允许网段时,诊断标记 + NON_GLOBAL_DNS_RESULT 并把解析到的 IP 列出来,供日志附带 fake-ip 提示。 + """ + with patch( + "app.utils.security.socket.getaddrinfo", + return_value=[ + (socket.AF_INET, socket.SOCK_STREAM, 0, "", ("198.18.16.96", 0)), + ], + ): + diag = SecurityUtils.evaluate_url_safety( + "https://image.tmdb.org/t/p/w500/x.jpg", + {"image.tmdb.org"}, + block_private=True, + ) + + self.assertFalse(diag.allowed) + self.assertIs(diag.reason, UrlSafetyReason.NON_GLOBAL_DNS_RESULT) + self.assertEqual(diag.host, "image.tmdb.org") + self.assertEqual(diag.ips, ["198.18.16.96"]) + self.assertEqual(diag.matched_private_ranges, []) + + def test_mixed_private_and_public_with_ranges_reports_mixed_reason(self): + """ + 配置了 allowed_private_ranges 但解析结果存在公网或不在允许网段内的私网 + 地址时,诊断必须标记 MIXED_OR_DISALLOWED_PRIVATE_RESULT,避免与"未配置 + 允许网段"场景混淆。 + """ + with patch( + "app.utils.security.socket.getaddrinfo", + return_value=[ + (socket.AF_INET, socket.SOCK_STREAM, 0, "", ("198.18.16.96", 0)), + (socket.AF_INET, socket.SOCK_STREAM, 0, "", ("10.0.0.8", 0)), + ], + ): + diag = SecurityUtils.evaluate_url_safety( + "https://image.tmdb.org/t/p/w500/x.jpg", + {"image.tmdb.org"}, + block_private=True, + allowed_private_ranges=["198.18.0.0/15"], + ) + + self.assertFalse(diag.allowed) + self.assertIs( + diag.reason, UrlSafetyReason.MIXED_OR_DISALLOWED_PRIVATE_RESULT + ) + self.assertEqual(diag.ips, ["198.18.16.96", "10.0.0.8"]) + + def test_allowed_via_configured_private_range_reports_matched_networks(self): + """ + 通过 allowed_private_ranges 放行时返回 ALLOWED,同时把命中的 IP 与 + 网段填入诊断对象,便于排查日志确认放行依据。 + """ + with patch( + "app.utils.security.socket.getaddrinfo", + return_value=[ + (socket.AF_INET, socket.SOCK_STREAM, 0, "", ("198.18.16.96", 0)), + ], + ): + diag = SecurityUtils.evaluate_url_safety( + "https://image.tmdb.org/t/p/w500/x.jpg", + {"image.tmdb.org"}, + block_private=True, + allowed_private_ranges=["198.18.0.0/15"], + ) + + self.assertTrue(diag.allowed) + self.assertIs(diag.reason, UrlSafetyReason.ALLOWED) + self.assertEqual(diag.ips, ["198.18.16.96"]) + self.assertEqual(diag.matched_private_ranges, ["198.18.0.0/15"]) + + def test_async_evaluation_returns_same_diagnosis(self): + """ + 异步版本走事件循环线程池但应保持与同步版本一致的诊断结果。 + """ + import asyncio + + async def fake_getaddrinfo(host, *_args, **_kwargs): + return [ + (socket.AF_INET, socket.SOCK_STREAM, 0, "", ("198.18.16.96", 0)), + ] + + async def run(): + with patch.object( + asyncio.get_running_loop(), + "getaddrinfo", + side_effect=fake_getaddrinfo, + create=True, + ): + return await SecurityUtils.evaluate_url_safety_async( + "https://image.tmdb.org/x.jpg", + {"image.tmdb.org"}, + block_private=True, + ) + + diag = asyncio.run(run()) + self.assertFalse(diag.allowed) + self.assertIs(diag.reason, UrlSafetyReason.NON_GLOBAL_DNS_RESULT) + self.assertEqual(diag.ips, ["198.18.16.96"])