perf(security): make image proxy signature stable to enable client caching (#5835)

This commit is contained in:
InfinityPacer
2026-05-25 16:46:29 +08:00
committed by GitHub
parent d57deb1df1
commit 7ab1a668cb
2 changed files with 65 additions and 35 deletions

View File

@@ -3,7 +3,6 @@ import hmac
import ipaddress
import socket
import threading
import time
from hashlib import sha256
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Set, Union
@@ -57,7 +56,6 @@ def _resolve_addrinfo_to_ips(
class SecurityUtils:
_SIGNED_URL_PURPOSE = "image-proxy"
_SIGNED_URL_EXPIRE_SECONDS = 86400
@staticmethod
def is_safe_path(base_path: Path, user_path: Path,
@@ -377,23 +375,26 @@ class SecurityUtils:
)
@staticmethod
def _url_signature_payload(url: str, expires_at: int, purpose: str) -> bytes:
def _url_signature_payload(url: str, purpose: str) -> bytes:
"""
构造 URL 签名载荷。
签名覆盖用途、过期时间和完整 URL确保同一个签名不能挪用到其它
内网地址或其它代理用途。
签名覆盖用途完整 URL确保同一个签名不能挪用到其它代理用途或其它 URL。
"""
return f"{purpose}\n{expires_at}\n{url}".encode("utf-8")
return f"{purpose}\n{url}".encode("utf-8")
@staticmethod
def _sign_url_payload(url: str, expires_at: int, purpose: str) -> str:
def _sign_url_payload(url: str, purpose: str) -> str:
"""
使用 RESOURCE_SECRET_KEY 对 URL 签名载荷生成 HMAC。
相同 `(url, purpose, RESOURCE_SECRET_KEY)` 组合在进程生命周期内输出
完全一致;签名的失效边界绑定在 `RESOURCE_SECRET_KEY` 上,进程重启
或显式轮换密钥时所有旧签名一起作废。
"""
return hmac.new(
settings.RESOURCE_SECRET_KEY.encode("utf-8"),
SecurityUtils._url_signature_payload(url, expires_at, purpose),
SecurityUtils._url_signature_payload(url, purpose),
sha256,
).hexdigest()
@@ -413,14 +414,19 @@ class SecurityUtils:
@staticmethod
def sign_url(
url: str,
expires_in: int = _SIGNED_URL_EXPIRE_SECONDS,
purpose: str = _SIGNED_URL_PURPOSE,
) -> str:
"""
给服务端返回的资源 URL 添加临时签名。
给服务端返回的资源 URL 添加稳定签名。
签名用于允许 `/system/img` 代理访问服务端已经确认过的私网图片 URL
避免代理端点重新依赖媒体服务器的具体路径规则。
签名作为 `/system/img` 代理放行私网图片 URL 的能力凭证:图片代理默认
拒绝解析到非公网地址的 URL防 SSRF合法媒体服务器 URL 必须由后端
预先签名后才能跳过该限制。
签名为 `(url, purpose, RESOURCE_SECRET_KEY)` 的确定性 HMAC**不带
过期时间**:相同 URL 多次调用结果完全一致,让浏览器与 Service Worker
的缓存能稳定命中;失效边界由 `RESOURCE_SECRET_KEY` 控制——进程重启
自动重生成、或者运维显式轮换后所有历史签名一起作废。
"""
if not url:
return url
@@ -428,11 +434,9 @@ class SecurityUtils:
if parsed_url.scheme not in {"http", "https"} or not parsed_url.netloc:
return url
clean_url = SecurityUtils.strip_url_signature(url)
expires_at = int(time.time() + expires_in)
signature = SecurityUtils._sign_url_payload(clean_url, expires_at, purpose)
signature = SecurityUtils._sign_url_payload(clean_url, purpose)
fragment = urlencode(
{
"mp_exp": str(expires_at),
"mp_sig": signature,
"mp_purpose": purpose,
}
@@ -446,6 +450,9 @@ class SecurityUtils:
) -> Optional[str]:
"""
验证 URL fragment 中的代理签名,成功时返回去签名后的真实 URL。
签名只校验 `(url, purpose, RESOURCE_SECRET_KEY)`,密钥轮换/进程重启
后旧签名自动失效。
"""
if not url:
return None
@@ -453,22 +460,13 @@ class SecurityUtils:
if parsed_url.scheme not in {"http", "https"} or not parsed_url.netloc:
return None
fragment_params = dict(parse_qsl(parsed_url.fragment, keep_blank_values=True))
expires_at = fragment_params.get("mp_exp")
signature = fragment_params.get("mp_sig")
signed_purpose = fragment_params.get("mp_purpose")
if not expires_at or not signature or signed_purpose != purpose:
return None
try:
expires_at_int = int(expires_at)
except ValueError:
return None
if expires_at_int < int(time.time()):
if not signature or signed_purpose != purpose:
return None
clean_url = SecurityUtils.strip_url_signature(url)
expected_signature = SecurityUtils._sign_url_payload(
clean_url, expires_at_int, purpose
)
expected_signature = SecurityUtils._sign_url_payload(clean_url, purpose)
if not hmac.compare_digest(signature, expected_signature):
return None
return clean_url

View File

@@ -27,7 +27,8 @@ class SecurityUtilsTest(TestCase):
signed_url = SecurityUtils.sign_url(url)
self.assertIn("#mp_exp=", signed_url)
self.assertIn("#mp_sig=", signed_url)
self.assertIn("mp_purpose=image-proxy", signed_url)
self.assertEqual(SecurityUtils.verify_signed_url(signed_url), url)
self.assertEqual(SecurityUtils.strip_url_signature(signed_url), url)
@@ -45,19 +46,50 @@ class SecurityUtilsTest(TestCase):
self.assertIsNone(SecurityUtils.verify_signed_url(tampered_url))
def test_signed_url_rejects_expired_signature(self):
def test_signed_url_is_deterministic_for_same_inputs(self):
"""
已过期签名不能继续放行私网图片代理请求。
相同 URL 与 RESOURCE_SECRET_KEY 多次签名结果必须完全一致,
保证浏览器 / Service Worker 缓存能稳定命中。
"""
with patch("app.utils.security.time.time", return_value=1000):
signed_url = SecurityUtils.sign_url(
"http://192.168.1.50:8096/Items/abc/Images/Primary",
expires_in=10,
)
url = "http://192.168.1.50:8096/Items/abc/Images/Primary"
with patch("app.utils.security.time.time", return_value=1011):
first = SecurityUtils.sign_url(url)
second = SecurityUtils.sign_url(url)
self.assertEqual(first, second)
self.assertEqual(SecurityUtils.verify_signed_url(first), url)
def test_signed_url_invalidated_after_secret_rotation(self):
"""
`RESOURCE_SECRET_KEY` 变更(进程重启或运维显式轮换)后旧签名必须作废,
作为签名长期有效模型的失效兜底。
"""
url = "http://192.168.1.50:8096/Items/abc/Images/Primary"
with patch(
"app.utils.security.settings.RESOURCE_SECRET_KEY",
"old-secret-value-aaaaaaaaaaaaaaaaaaaaaaaa",
):
signed_url = SecurityUtils.sign_url(url)
self.assertEqual(SecurityUtils.verify_signed_url(signed_url), url)
with patch(
"app.utils.security.settings.RESOURCE_SECRET_KEY",
"new-secret-value-bbbbbbbbbbbbbbbbbbbbbbbb",
):
self.assertIsNone(SecurityUtils.verify_signed_url(signed_url))
def test_signed_url_rejects_other_purpose(self):
"""
签名绑定 `purpose`,挪用到其它签名用途必须被拒绝。
"""
url = "http://192.168.1.50:8096/Items/abc/Images/Primary"
signed_url = SecurityUtils.sign_url(url)
self.assertIsNone(
SecurityUtils.verify_signed_url(signed_url, purpose="other-purpose")
)
def test_is_safe_url_keeps_default_allowlist_behavior(self):
"""
默认 URL 校验保持历史 allowlist 行为,避免影响非代理调用方。