# Mirror of https://github.com/jxxghp/MoviePilot.git
# Synced 2026-05-29 23:16:48 +00:00 (684 lines, 23 KiB, Python)
import asyncio
import socket
import threading
from unittest import TestCase
from unittest.mock import patch

from app.utils.security import (
    SecurityUtils,
    _dns_inflight_locks,
    _dns_negative_cache,
    _dns_positive_cache,
)


class SecurityUtilsTest(TestCase):
    """
    Unit tests for ``SecurityUtils``: signed URLs, ``is_safe_url`` and its
    async variant, and the module-level DNS caches / in-flight locks.
    """

    @staticmethod
    def _addrinfo(*ips):
        """
        Build a fake ``getaddrinfo`` result with one IPv4/TCP entry per address.

        :param ips: address strings to place in the ``sockaddr`` field
        :return: list of ``(family, type, proto, canonname, sockaddr)`` tuples
        """
        return [
            (socket.AF_INET, socket.SOCK_STREAM, 0, "", (ip, 0))
            for ip in ips
        ]

    def setUp(self) -> None:
        """
        Clear the DNS TTL caches and in-flight locks before every test so
        state cannot leak from one test into another.
        """
        _dns_positive_cache.clear()
        _dns_negative_cache.clear()
        _dns_inflight_locks.clear()

    def test_signed_url_roundtrip_returns_clean_url(self):
        """
        After a successful signature check, the real request URL is returned
        without the signature fragment.
        """
        url = "http://192.168.1.50:8096/Items/abc/Images/Primary?api_key=demo"

        signed_url = SecurityUtils.sign_url(url)

        self.assertIn("#mp_sig=", signed_url)
        self.assertIn("mp_purpose=image-proxy", signed_url)
        self.assertEqual(SecurityUtils.verify_signed_url(signed_url), url)
        self.assertEqual(SecurityUtils.strip_url_signature(signed_url), url)

    def test_signed_url_rejects_tampered_url(self):
        """
        The signature is bound to the full URL; changing the path after
        signing must make verification fail.
        """
        signed_url = SecurityUtils.sign_url(
            "http://192.168.1.50:8096/Items/abc/Images/Primary"
        )
        tampered_url = signed_url.replace(
            "/Items/abc/Images/Primary",
            "/System/Info/Public",
        )

        self.assertIsNone(SecurityUtils.verify_signed_url(tampered_url))

    def test_signed_url_is_deterministic_for_same_inputs(self):
        """
        Signing the same URL with the same RESOURCE_SECRET_KEY must give an
        identical result every time, so browser / Service Worker caches hit
        reliably.
        """
        url = "http://192.168.1.50:8096/Items/abc/Images/Primary"

        first = SecurityUtils.sign_url(url)
        second = SecurityUtils.sign_url(url)

        self.assertEqual(first, second)
        self.assertEqual(SecurityUtils.verify_signed_url(first), url)

    def test_signed_url_invalidated_after_secret_rotation(self):
        """
        Once `RESOURCE_SECRET_KEY` changes (process restart or explicit
        operator rotation) old signatures must become invalid; this is the
        invalidation fallback for the long-lived signature model.
        """
        url = "http://192.168.1.50:8096/Items/abc/Images/Primary"

        with patch(
            "app.utils.security.settings.RESOURCE_SECRET_KEY",
            "old-secret-value-aaaaaaaaaaaaaaaaaaaaaaaa",
        ):
            signed_url = SecurityUtils.sign_url(url)
            # Verify while the old key is still patched in.
            self.assertEqual(SecurityUtils.verify_signed_url(signed_url), url)

        with patch(
            "app.utils.security.settings.RESOURCE_SECRET_KEY",
            "new-secret-value-bbbbbbbbbbbbbbbbbbbbbbbb",
        ):
            self.assertIsNone(SecurityUtils.verify_signed_url(signed_url))

    def test_signed_url_rejects_other_purpose(self):
        """
        The signature is bound to `purpose`; reusing it for a different
        signing purpose must be rejected.
        """
        url = "http://192.168.1.50:8096/Items/abc/Images/Primary"
        signed_url = SecurityUtils.sign_url(url)

        self.assertIsNone(
            SecurityUtils.verify_signed_url(signed_url, purpose="other-purpose")
        )

    def test_is_safe_url_keeps_default_allowlist_behavior(self):
        """
        Default URL validation keeps the historical allowlist behavior so
        non-proxy callers are unaffected.
        """
        self.assertTrue(
            SecurityUtils.is_safe_url(
                "http://192.168.1.50:8096/secret.png",
                {"http://192.168.1.50:8096"},
            )
        )

    def test_is_safe_url_blocks_private_literal_ip_when_enabled(self):
        """
        With SSRF protection enabled, a private IP must not be allowed even
        when it matches the allowlist.
        """
        self.assertFalse(
            SecurityUtils.is_safe_url(
                "http://192.168.1.50:8096/secret.png",
                {"http://192.168.1.50:8096"},
                block_private=True,
            )
        )

    def test_is_safe_url_blocks_loopback_dns_result_when_enabled(self):
        """
        A hostname that resolves to a loopback address must be rejected, so a
        domain name cannot be used to bypass private-address blocking.
        """
        with patch(
            "app.utils.security.socket.getaddrinfo",
            return_value=self._addrinfo("127.0.0.1"),
        ):
            self.assertFalse(
                SecurityUtils.is_safe_url(
                    "http://internal.example.com/secret.png",
                    {"example.com"},
                    block_private=True,
                )
            )

    def test_is_safe_url_blocks_mixed_public_and_private_dns_results(self):
        """
        If a domain has any non-public resolution result, it cannot be used
        as an image proxy target.
        """
        with patch(
            "app.utils.security.socket.getaddrinfo",
            return_value=self._addrinfo("93.184.216.34", "10.0.0.8"),
        ):
            self.assertFalse(
                SecurityUtils.is_safe_url(
                    "https://assets.example.com/poster.jpg",
                    {"example.com"},
                    block_private=True,
                )
            )

    def test_is_safe_url_allows_public_dns_result_when_enabled(self):
        """
        Access stays allowed when every resolved address is public and the
        allowlist matches.
        """
        with patch(
            "app.utils.security.socket.getaddrinfo",
            return_value=self._addrinfo("93.184.216.34"),
        ):
            self.assertTrue(
                SecurityUtils.is_safe_url(
                    "https://assets.example.com/poster.jpg",
                    {"example.com"},
                    block_private=True,
                )
            )

    def test_is_safe_url_rejects_dns_resolution_failure_when_enabled(self):
        """
        When SSRF protection cannot confirm the target address, treat it as a
        failure so no request proceeds after a resolution error.
        """
        with patch(
            "app.utils.security.socket.getaddrinfo",
            side_effect=socket.gaierror,
        ):
            self.assertFalse(
                SecurityUtils.is_safe_url(
                    "https://assets.example.com/poster.jpg",
                    {"example.com"},
                    block_private=True,
                )
            )

    def test_is_safe_url_allows_configured_private_range_after_domain_match(self):
        """
        Once the image domain matches the allowlist, specific non-public
        ranges (e.g. TUN fake-ip) can be allowed through configuration.
        """
        with patch(
            "app.utils.security.socket.getaddrinfo",
            return_value=self._addrinfo("198.18.16.96"),
        ), patch("app.utils.security.logger.debug") as debug_log:
            self.assertTrue(
                SecurityUtils.is_safe_url(
                    "https://img1.doubanio.com/poster.webp",
                    {"doubanio.com"},
                    block_private=True,
                    allowed_private_ranges=["198.18.0.0/15"],
                )
            )

        # Fail with a clear assertion (not AttributeError on ``None``) if
        # nothing was logged.
        debug_log.assert_called()
        debug_message = debug_log.call_args.args[0]
        self.assertIn("ips=198.18.16.96", debug_message)
        self.assertIn("ranges=198.18.0.0/15", debug_message)

    def test_is_safe_url_blocks_configured_private_range_without_domain_match(self):
        """
        The non-public range exception must depend on the domain allowlist;
        on its own it cannot allow arbitrary user URLs.
        """
        with patch(
            "app.utils.security.socket.getaddrinfo",
            return_value=self._addrinfo("198.18.16.96"),
        ):
            self.assertFalse(
                SecurityUtils.is_safe_url(
                    "https://attacker.example.com/poster.webp",
                    {"doubanio.com"},
                    block_private=True,
                    allowed_private_ranges=["198.18.0.0/15"],
                )
            )

    def test_is_safe_url_blocks_private_result_outside_configured_range(self):
        """
        Only explicitly configured non-public ranges are allowed; other
        private resolution results are still blocked as SSRF risks.
        """
        with patch(
            "app.utils.security.socket.getaddrinfo",
            return_value=self._addrinfo("10.0.0.8"),
        ):
            self.assertFalse(
                SecurityUtils.is_safe_url(
                    "https://assets.example.com/poster.jpg",
                    {"example.com"},
                    block_private=True,
                    allowed_private_ranges=["198.18.0.0/15"],
                )
            )

    def test_is_safe_url_blocks_mixed_allowed_and_disallowed_private_results(self):
        """
        Every resolution result of a domain must fall inside the allowed
        ranges, so a partially safe result cannot mask a risky address.
        """
        with patch(
            "app.utils.security.socket.getaddrinfo",
            return_value=self._addrinfo("198.18.16.96", "10.0.0.8"),
        ):
            self.assertFalse(
                SecurityUtils.is_safe_url(
                    "https://assets.example.com/poster.jpg",
                    {"example.com"},
                    block_private=True,
                    allowed_private_ranges=["198.18.0.0/15"],
                )
            )

    def test_is_safe_url_async_uses_event_loop_resolver(self):
        """
        The async variant performs the SSRF check through the event loop's
        non-blocking getaddrinfo, with the same semantics as the sync
        variant: a non-public resolution result is still rejected.
        """

        async def fake_getaddrinfo(host, *_args, **_kwargs):
            self.assertEqual(host, "internal.example.com")
            return self._addrinfo("127.0.0.1")

        async def run() -> bool:
            loop = asyncio.get_running_loop()
            with patch.object(loop, "getaddrinfo", side_effect=fake_getaddrinfo):
                return await SecurityUtils.is_safe_url_async(
                    "http://internal.example.com/secret.png",
                    {"example.com"},
                    block_private=True,
                )

        self.assertFalse(asyncio.run(run()))

    def test_is_safe_url_async_hits_dns_cache(self):
        """
        The async and sync variants share the DNS TTL cache: after warming it
        through the sync path, the async variant should issue no DNS query.
        """
        # Warm the cache through the synchronous path first.
        with patch(
            "app.utils.security.socket.getaddrinfo",
            return_value=self._addrinfo("93.184.216.34"),
        ):
            self.assertTrue(
                SecurityUtils.is_safe_url(
                    "https://assets.example.com/poster.jpg",
                    {"example.com"},
                    block_private=True,
                )
            )

        async def run() -> bool:
            loop = asyncio.get_running_loop()
            with patch.object(
                loop,
                "getaddrinfo",
                side_effect=AssertionError("缓存命中后不应再次发起 DNS 查询"),
            ):
                return await SecurityUtils.is_safe_url_async(
                    "https://assets.example.com/poster.jpg",
                    {"example.com"},
                    block_private=True,
                )

        self.assertTrue(asyncio.run(run()))

    def test_is_safe_url_async_allows_public_dns_result(self):
        """
        The async variant allows the URL when all resolved addresses are
        public and the allowlist matches.
        """

        async def fake_getaddrinfo(host, *_args, **_kwargs):
            self.assertEqual(host, "assets.example.com")
            return self._addrinfo("93.184.216.34")

        async def run() -> bool:
            loop = asyncio.get_running_loop()
            with patch.object(loop, "getaddrinfo", side_effect=fake_getaddrinfo):
                return await SecurityUtils.is_safe_url_async(
                    "https://assets.example.com/poster.jpg",
                    {"example.com"},
                    block_private=True,
                )

        self.assertTrue(asyncio.run(run()))

    def test_dns_resolution_failure_populates_negative_cache(self):
        """
        A DNS resolution failure should populate the negative cache so the
        same target does not repeatedly trigger `getaddrinfo` within a short
        period.
        """
        with patch(
            "app.utils.security.socket.getaddrinfo",
            side_effect=socket.gaierror,
        ) as mock_resolve:
            self.assertFalse(
                SecurityUtils.is_safe_url(
                    "https://assets.example.com/poster.jpg",
                    {"example.com"},
                    block_private=True,
                )
            )
            self.assertEqual(mock_resolve.call_count, 1)
            self.assertIn("assets.example.com", _dns_negative_cache)

            # Same host, different path: expected to be served from the
            # negative cache.
            self.assertFalse(
                SecurityUtils.is_safe_url(
                    "https://assets.example.com/another.jpg",
                    {"example.com"},
                    block_private=True,
                )
            )
            self.assertEqual(
                mock_resolve.call_count,
                1,
                "命中负向缓存后不应再次调用 getaddrinfo",
            )

    def test_literal_ip_skips_dns_cache(self):
        """
        A literal IP in the URL takes the fast path and should neither enter
        the DNS caches nor trigger `getaddrinfo`.
        """
        with patch(
            "app.utils.security.socket.getaddrinfo",
            side_effect=AssertionError("字面量 IP 不应触发 getaddrinfo"),
        ):
            self.assertFalse(
                SecurityUtils.is_safe_url(
                    "http://10.0.0.5:8080/secret.png",
                    {"http://10.0.0.5:8080"},
                    block_private=True,
                )
            )

        self.assertNotIn("10.0.0.5", _dns_positive_cache)
        self.assertNotIn("10.0.0.5", _dns_negative_cache)

    def test_literal_ipv6_in_brackets_is_recognized(self):
        """
        `urlparse` already strips the brackets from IPv6 literals;
        `_literal_ip` also accepts the bracketed form passed in directly.
        """
        self.assertEqual(
            str(SecurityUtils._literal_ip("[::1]")),
            "::1",
        )
        self.assertEqual(
            str(SecurityUtils._literal_ip("::1")),
            "::1",
        )
        self.assertIsNone(SecurityUtils._literal_ip("not-an-ip"))

    def test_is_safe_url_async_dedupes_concurrent_inflight_queries(self):
        """
        Concurrent cache-miss requests for the same hostname should be
        deduplicated through the in-flight lock, triggering only one DNS
        query.
        """
        call_count = 0

        async def run() -> None:
            nonlocal call_count
            loop = asyncio.get_running_loop()
            release = asyncio.Event()

            async def slow_getaddrinfo(host, *_args, **_kwargs):
                nonlocal call_count
                call_count += 1
                # Hold the resolution open until the test releases it.
                await release.wait()
                return self._addrinfo("93.184.216.34")

            with patch.object(loop, "getaddrinfo", side_effect=slow_getaddrinfo):
                tasks = [
                    asyncio.create_task(
                        SecurityUtils.is_safe_url_async(
                            "https://assets.example.com/poster.jpg",
                            {"example.com"},
                            block_private=True,
                        )
                    )
                    for _ in range(5)
                ]
                # Let every task reach the in-flight wait state.
                await asyncio.sleep(0)
                await asyncio.sleep(0)
                release.set()
                results = await asyncio.gather(*tasks)

            self.assertTrue(all(results))
            self.assertEqual(call_count, 1, "并发未命中应去重为单次 DNS 查询")

        asyncio.run(run())

    def test_sync_cache_access_is_thread_safe(self):
        """
        Concurrent threads accessing the DNS cache through the sync path
        should neither raise nor observe inconsistent results.

        TTLCache itself is not thread-safe; reads and writes rely on the
        module-level `_dns_cache_lock` for serialization.
        """
        with patch(
            "app.utils.security.socket.getaddrinfo",
            return_value=self._addrinfo("93.184.216.34"),
        ):
            results: list = []
            errors: list = []

            def worker() -> None:
                try:
                    for _ in range(50):
                        results.append(
                            SecurityUtils.is_safe_url(
                                "https://assets.example.com/poster.jpg",
                                {"example.com"},
                                block_private=True,
                            )
                        )
                except Exception as exc:  # noqa: BLE001 - the test must capture any exception
                    errors.append(exc)

            threads = [threading.Thread(target=worker) for _ in range(8)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()

        self.assertEqual(errors, [])
        self.assertTrue(all(results))
        self.assertEqual(len(results), 8 * 50)

    def test_async_dns_resolution_failure_releases_inflight_lock(self):
        """
        After a DNS resolution failure the entry must be removed from the
        in-flight lock dict, so failed hostnames do not each leave an
        `asyncio.Lock` behind in `_dns_inflight_locks`.
        """

        async def fail_getaddrinfo(*_args, **_kwargs):
            raise socket.gaierror()

        async def run() -> None:
            loop = asyncio.get_running_loop()
            with patch.object(loop, "getaddrinfo", side_effect=fail_getaddrinfo):
                result = await SecurityUtils._hostname_addresses_async(
                    "bad-host.example"
                )
            self.assertIsNone(result)

        asyncio.run(run())
        self.assertNotIn(
            "bad-host.example",
            _dns_inflight_locks,
            "解析失败路径必须释放 in-flight 锁字典条目",
        )

    def test_async_dns_resolution_success_releases_inflight_lock(self):
        """
        After a normal resolution completes, the in-flight lock dict must
        also be cleaned up so hostnames do not accumulate.
        """

        async def fake_getaddrinfo(*_args, **_kwargs):
            return self._addrinfo("93.184.216.34")

        async def run() -> None:
            loop = asyncio.get_running_loop()
            with patch.object(loop, "getaddrinfo", side_effect=fake_getaddrinfo):
                result = await SecurityUtils._hostname_addresses_async(
                    "ok-host.example"
                )
            self.assertIsNotNone(result)

        asyncio.run(run())
        self.assertNotIn(
            "ok-host.example",
            _dns_inflight_locks,
            "正常解析路径必须释放 in-flight 锁字典条目",
        )

    def test_async_dns_concurrent_waiters_release_inflight_lock(self):
        """
        In the concurrent cache-miss scenario, the in-flight lock dict must
        also be cleaned up once all waiters finish. This covers the second
        return path, where a waiter acquires the lock but the cache has
        already been filled by an earlier coroutine.
        """

        async def run() -> None:
            loop = asyncio.get_running_loop()
            release = asyncio.Event()

            async def slow_getaddrinfo(*_args, **_kwargs):
                await release.wait()
                return self._addrinfo("93.184.216.34")

            with patch.object(loop, "getaddrinfo", side_effect=slow_getaddrinfo):
                tasks = [
                    asyncio.create_task(
                        SecurityUtils._hostname_addresses_async("multi-host.example")
                    )
                    for _ in range(5)
                ]
                # Two yields so the tasks can start before the release.
                await asyncio.sleep(0)
                await asyncio.sleep(0)
                release.set()
                await asyncio.gather(*tasks)

        asyncio.run(run())
        self.assertNotIn(
            "multi-host.example",
            _dns_inflight_locks,
            "并发等待者全部退出后必须释放 in-flight 锁字典条目",
        )