diff --git a/app/api/endpoints/system.py b/app/api/endpoints/system.py index ff8ceabc..f33d4b9e 100644 --- a/app/api/endpoints/system.py +++ b/app/api/endpoints/system.py @@ -361,7 +361,10 @@ async def fetch_image( fetch_url = SecurityUtils.strip_url_signature(url) # 验证URL安全性 if not SecurityUtils.is_safe_url( - url, allowed_domains, block_private=True + url, + allowed_domains, + block_private=True, + allowed_private_ranges=settings.IMAGE_PROXY_ALLOWED_PRIVATE_RANGES, ) and not (fetch_url := SecurityUtils.verify_signed_url(url)): logger.warn(f"Blocked unsafe image URL: {url}") return None diff --git a/app/core/config.py b/app/core/config.py index cb832f35..1f092086 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -510,6 +510,8 @@ class ConfigModel(BaseModel): "qpic.cn", ] ) + # 图片代理允许访问的非公网 IP/CIDR,默认不放行任何非公网解析结果 + IMAGE_PROXY_ALLOWED_PRIVATE_RANGES: list = Field(default=[]) # 允许的图片文件后缀格式 SECURITY_IMAGE_SUFFIXES: list = Field( default=[".jpg", ".jpeg", ".png", ".webp", ".gif", ".svg", ".avif"] diff --git a/app/utils/security.py b/app/utils/security.py index 5f908882..41e5bee7 100644 --- a/app/utils/security.py +++ b/app/utils/security.py @@ -4,7 +4,7 @@ import socket import time from hashlib import sha256 from pathlib import Path -from typing import List, Optional, Set, Union +from typing import Iterable, List, Optional, Set, Union from urllib.parse import parse_qsl, quote, urlencode, urlparse, urlunparse from anyio import Path as AsyncPath @@ -112,6 +112,86 @@ class SecurityUtils: return False return True + @staticmethod + def _parse_ip_networks(ranges: Optional[Iterable[str]]) -> List[ipaddress._BaseNetwork]: + """ + 解析用户配置的 IP/CIDR 网段。 + + 配置错误的条目会被忽略并写入 debug 日志,避免单个无效值导致所有图片代理 + 校验失败。调用方仍然需要先完成域名白名单匹配,不能单独依赖该网段放行。 + """ + networks = [] + for value in ranges or []: + if not value: + continue + try: + networks.append(ipaddress.ip_network(str(value).strip(), strict=False)) + except ValueError: + logger.debug(f"忽略无效的图片代理允许网段配置: {value}") + return networks + + @staticmethod + def _hostname_addresses(hostname: str) -> Optional[List[ipaddress._BaseAddress]]: + """ + 解析主机名并返回全部 IP 地址。 + + 字面量 IP 直接返回自身;DNS 解析失败或结果异常时返回 None,让上层按 + 不安全目标处理。 + """ + if not hostname: + return None + try: + return [ipaddress.ip_address(hostname)] + except ValueError: + pass + + try: + address_infos = socket.getaddrinfo(hostname, None, type=socket.SOCK_STREAM) + except socket.gaierror: + return None + + if not address_infos: + return None + + addresses = [] + for address_info in address_infos: + try: + addresses.append(ipaddress.ip_address(address_info[4][0])) + except ValueError: + return None + return addresses + + @staticmethod + def _is_allowed_private_hostname( + hostname: str, + allowed_private_ranges: Optional[Iterable[str]], + ) -> Optional[tuple[List[ipaddress._BaseAddress], List[ipaddress._BaseNetwork]]]: + """ + 返回主机名命中的显式允许非公网地址和网段。 + + 该能力只用于图片代理的受控例外,例如 TUN fake-ip 或内网 CDN。必须由 + `is_safe_url` 先完成域名 allowlist 校验后再调用,避免把任意用户 URL + 变成 SSRF 绕过入口。 + """ + networks = SecurityUtils._parse_ip_networks(allowed_private_ranges) + if not networks: + return None + addresses = SecurityUtils._hostname_addresses(hostname) + if not addresses: + return None + if all(address.is_global for address in addresses): + return None + + matched_networks = [] + for address in addresses: + matched_for_address = [ + network for network in networks if address in network + ] + if not matched_for_address: + return None + matched_networks.extend(matched_for_address) + return addresses, list(dict.fromkeys(matched_networks)) + @staticmethod def _url_signature_payload(url: str, expires_at: int, purpose: str) -> bytes: """ @@ -215,6 +295,7 @@ class SecurityUtils: allowed_domains: Union[Set[str], List[str]], strict: bool = False, block_private: bool = False, + allowed_private_ranges: Optional[Iterable[str]] = None, ) -> bool: """ 验证URL是否在允许的域名列表中,包括带有端口的域名 @@ -223,6 +304,7 @@ class SecurityUtils: :param allowed_domains: 允许的域名集合,域名可以包含端口 :param strict: 是否严格匹配一级域名(默认为 False,允许多级域名) :param block_private: 是否拦截解析到非公网地址的 URL,防止 SSRF + :param allowed_private_ranges: 域名命中后额外允许的非公网 IP/CIDR 网段 :return: 如果URL合法且在允许的域名列表中,返回 True;否则返回 False """ try: @@ -242,11 +324,9 @@ class SecurityUtils: if not netloc: return False - if block_private and not SecurityUtils._is_global_hostname(parsed_url.hostname or ""): - return False - # 检查每个允许的域名 allowed_domains = {d.lower() for d in allowed_domains} + domain_allowed = False for domain in allowed_domains: parsed_allowed_url = urlparse(domain) allowed_netloc = parsed_allowed_url.netloc or parsed_allowed_url.path @@ -254,13 +334,33 @@ class SecurityUtils: if strict: # 严格模式下,要求完全匹配域名和端口 if netloc == allowed_netloc: - return True + domain_allowed = True + break else: # 非严格模式下,允许子域名匹配 if netloc == allowed_netloc or netloc.endswith('.' + allowed_netloc): - return True + domain_allowed = True + break - return False + if not domain_allowed: + return False + + hostname = parsed_url.hostname or "" + if block_private and not SecurityUtils._is_global_hostname(hostname): + private_match = SecurityUtils._is_allowed_private_hostname( + hostname, allowed_private_ranges + ) + if private_match: + addresses, matched_networks = private_match + logger.debug( + "图片代理允许访问配置的非公网网段: " + f"url={url}, ips={','.join(map(str, addresses))}, " + f"ranges={','.join(map(str, matched_networks))}" + ) + return True + return False + + return True except Exception as e: logger.debug(f"Error occurred while validating URL: {e}") return False diff --git a/tests/test_security_utils.py b/tests/test_security_utils.py index 4cf61c13..222b1243 100644 --- a/tests/test_security_utils.py +++ b/tests/test_security_utils.py @@ -162,3 +162,113 @@ class SecurityUtilsTest(TestCase): block_private=True, ) ) + + def test_is_safe_url_allows_configured_private_range_after_domain_match(self): + """ + 图片域名命中 allowlist 后,可通过配置允许 TUN fake-ip 等特定非公网网段。 + """ + with patch( + "app.utils.security.socket.getaddrinfo", + return_value=[ + ( + socket.AF_INET, + socket.SOCK_STREAM, + 0, + "", + ("198.18.16.96", 0), + ) + ], + ), patch("app.utils.security.logger.debug") as debug_log: + self.assertTrue( + SecurityUtils.is_safe_url( + "https://img1.doubanio.com/poster.webp", + {"doubanio.com"}, + block_private=True, + allowed_private_ranges=["198.18.0.0/15"], + ) + ) + debug_message = debug_log.call_args.args[0] + self.assertIn("ips=198.18.16.96", debug_message) + self.assertIn("ranges=198.18.0.0/15", debug_message) + + def test_is_safe_url_blocks_configured_private_range_without_domain_match(self): + """ + 非公网网段例外必须依附域名白名单,不能单独放行任意用户 URL。 + """ + with patch( + "app.utils.security.socket.getaddrinfo", + return_value=[ + ( + socket.AF_INET, + socket.SOCK_STREAM, + 0, + "", + ("198.18.16.96", 0), + ) + ], + ): + self.assertFalse( + SecurityUtils.is_safe_url( + "https://attacker.example.com/poster.webp", + {"doubanio.com"}, + block_private=True, + allowed_private_ranges=["198.18.0.0/15"], + ) + ) + + def test_is_safe_url_blocks_private_result_outside_configured_range(self): + """ + 仅允许显式配置的非公网网段,其它内网解析结果仍按 SSRF 风险拦截。 + """ + with patch( + "app.utils.security.socket.getaddrinfo", + return_value=[ + ( + socket.AF_INET, + socket.SOCK_STREAM, + 0, + "", + ("10.0.0.8", 0), + ) + ], + ): + self.assertFalse( + SecurityUtils.is_safe_url( + "https://assets.example.com/poster.jpg", + {"example.com"}, + block_private=True, + allowed_private_ranges=["198.18.0.0/15"], + ) + ) + + def test_is_safe_url_blocks_mixed_allowed_and_disallowed_private_results(self): + """ + 同一域名的解析结果必须全部落在允许网段内,避免部分安全结果掩盖风险地址。 + """ + with patch( + "app.utils.security.socket.getaddrinfo", + return_value=[ + ( + socket.AF_INET, + socket.SOCK_STREAM, + 0, + "", + ("198.18.16.96", 0), + ), + ( + socket.AF_INET, + socket.SOCK_STREAM, + 0, + "", + ("10.0.0.8", 0), + ), + ], + ): + self.assertFalse( + SecurityUtils.is_safe_url( + "https://assets.example.com/poster.jpg", + {"example.com"}, + block_private=True, + allowed_private_ranges=["198.18.0.0/15"], + ) + ) diff --git a/tests/test_system_nettest.py b/tests/test_system_nettest.py index 3939aed3..31b2b2ac 100644 --- a/tests/test_system_nettest.py +++ b/tests/test_system_nettest.py @@ -1,4 +1,5 @@ import asyncio +import ipaddress import sys import unittest from types import ModuleType, SimpleNamespace @@ -134,6 +135,47 @@ class NettestSecurityTest(unittest.TestCase): self.assertIsNone(resp) + def test_fetch_image_allows_configured_private_range_after_domain_match(self): + """ + 图片代理在域名白名单命中后,可按配置放行指定非公网解析网段。 + """ + image_helper = Mock() + image_helper.async_fetch_image = AsyncMock(return_value=b"image-bytes") + + with patch.object(system_endpoint, "ImageHelper", return_value=image_helper), patch.object( + system_endpoint.HashUtils, "md5", return_value="etag", create=True + ), patch.object( + system_endpoint.RequestUtils, "generate_cache_headers", return_value={}, create=True + ), patch.object( + system_endpoint.SecurityUtils, + "_is_global_hostname", + return_value=False, + ), patch.object( + system_endpoint.SecurityUtils, + "_hostname_addresses", + return_value=[ipaddress.ip_address("198.18.16.96")], + ), patch.object( + system_endpoint.settings, + "IMAGE_PROXY_ALLOWED_PRIVATE_RANGES", + ["198.18.0.0/15"], + ), patch( + "app.utils.security.logger.debug", + ): + resp = asyncio.run( + system_endpoint.fetch_image( + url="https://img1.doubanio.com/poster.webp", + allowed_domains={"doubanio.com"}, + ) + ) + + self.assertEqual(resp.status_code, 200) + image_helper.async_fetch_image.assert_awaited_once_with( + url="https://img1.doubanio.com/poster.webp", + proxy=None, + use_cache=False, + cookies=None, + ) + def test_fetch_image_blocks_tampered_signed_private_url(self): """ 私网签名绑定完整 URL,改动路径后不能继续代理。