diff --git a/app/api/endpoints/system.py b/app/api/endpoints/system.py index 0a1d9268..ff8ceabc 100644 --- a/app/api/endpoints/system.py +++ b/app/api/endpoints/system.py @@ -1,11 +1,9 @@ import asyncio import json -import posixpath -import re from collections import deque from datetime import datetime from typing import Any, Optional, Union, Annotated -from urllib.parse import parse_qs, unquote, urljoin, urlparse +from urllib.parse import urljoin, urlparse import aiofiles import pillow_avif # noqa 用于自动注册AVIF支持 @@ -32,7 +30,6 @@ from app.db.user_oper import ( get_current_active_user_async, ) from app.helper.image import ImageHelper -from app.helper.mediaserver import MediaServerHelper from app.helper.message import MessageHelper from app.helper.progress import ProgressHelper from app.helper.rule import RuleHelper @@ -52,21 +49,6 @@ from version import APP_VERSION router = APIRouter() _NETTEST_REDIRECT_STATUS_CODES = {301, 302, 303, 307, 308} -_MEDIA_SERVER_IMAGE_PATH_PATTERNS = ( - re.compile( - r"^/(?:emby/)?Items/[^/]+/Images/" - r"(?:Primary|Art|Backdrop|Banner|Logo|Thumb|Disc|Box|Screenshot|Menu|Chapter)" - r"(?:/[^/]+)?$", - re.IGNORECASE, - ), - re.compile( - r"^/library/metadata/[^/]+/" - r"(?:thumb|art|banner|poster|clearlogo|clearart|background)" - r"(?:/[^/]+)?$", - re.IGNORECASE, - ), - re.compile(r"^/api/v1/sys/img/.+", re.IGNORECASE), -) def _match_nettest_prefix(url: str, prefix: str) -> bool: @@ -359,95 +341,6 @@ async def _close_nettest_response(response: Any) -> None: logger.debug(f"关闭网络测试响应失败: {err}") -def _normalize_proxy_image_path(path: str) -> str: - """ - 归一化代理图片路径,用于识别媒体服务器图片接口。 - - URL path 可能包含编码后的特殊字符,这里先解码再规范化路径,避免 - `%2e%2e` 或重复斜杠绕过后续的媒体图片路径判断。 - """ - decoded_path = unquote(path or "/") - normalized_path = posixpath.normpath(decoded_path) - if not normalized_path.startswith("/"): - normalized_path = f"/{normalized_path}" - return normalized_path - - -def _is_known_media_server_image_path(path: str) -> bool: - """ - 判断路径是否属于已知媒体服务器图片读取接口。 - - 这里仅覆盖 MoviePilot 自身会返回给前端的封面、背景图和图片流接口, - 不允许媒体服务器同 host 下的任意 API 路径继续通过图片代理访问。 - """ - normalized_path = _normalize_proxy_image_path(path) - return any( - pattern.match(normalized_path) - for pattern in _MEDIA_SERVER_IMAGE_PATH_PATTERNS - ) - - -def _is_plex_transcode_image_url(url: str) -> bool: - """ - 校验 Plex 图片转码接口只转码 Plex 自身 metadata 图片路径。 - - Plex 的 posterUrl/artUrl 可能使用 `/photo/:/transcode` 包装真实图片路径, - 因此需要额外检查 query 里的 `url` 仍然是 metadata 图片路径,而不是 - 任意可被 Plex 代取的地址。 - """ - parsed_url = urlparse(url) - if _normalize_proxy_image_path(parsed_url.path) != "/photo/:/transcode": - return False - source_path = parse_qs(parsed_url.query).get("url", [None])[0] - if not source_path: - return False - source_url = urlparse(source_path) - if source_url.scheme or source_url.netloc: - return False - return _is_known_media_server_image_path(source_path) - - -def _is_ugreen_image_stream_url(url: str) -> bool: - """ - 校验绿联本机图片流接口只代理官方 scraper 图片。 - - 绿联本地图片需要带加密鉴权头,目前模块只会把 scraper.ugnas.com 的签名图 - 转成 getImaStream,本检查避免用户把该接口改造成任意远程 URL 中转。 - """ - parsed_url = urlparse(url) - if _normalize_proxy_image_path(parsed_url.path) != "/ugreen/v2/video/getImaStream": - return False - source_url = parse_qs(parsed_url.query).get("name", [None])[0] - if not source_url: - return False - parsed_source = urlparse(source_url) - return ( - parsed_source.scheme in {"http", "https"} - and parsed_source.netloc.lower() == "scraper.ugnas.com" - ) - - -def _is_allowed_media_server_image_url( - url: str, - media_server_domains: set[str], -) -> bool: - """ - 判断内网媒体服务器 URL 是否可作为图片代理目标。 - - 私有地址默认仍然禁止;只有 URL host 精确命中已配置媒体服务器,并且路径是 - 已知图片接口时才允许访问,用于兼容前端媒体库和最近入库图片展示。 - """ - if not media_server_domains: - return False - if not SecurityUtils.is_safe_url(url, media_server_domains, strict=True): - return False - return ( - _is_known_media_server_image_path(urlparse(url).path) - or _is_plex_transcode_image_url(url) - or _is_ugreen_image_stream_url(url) - ) - - async def fetch_image( url: str, proxy: Optional[bool] = None, @@ -455,7 +348,6 @@ async def fetch_image( if_none_match: Optional[str] = None, cookies: Optional[str | dict] = None, allowed_domains: Optional[set[str]] = None, - media_server_domains: Optional[set[str]] = None, ) -> Optional[Response]: """ 处理图片缓存逻辑,支持HTTP缓存和磁盘缓存 @@ -466,17 +358,16 @@ async def fetch_image( if allowed_domains is None: allowed_domains = set(settings.SECURITY_IMAGE_DOMAINS) + fetch_url = SecurityUtils.strip_url_signature(url) # 验证URL安全性 if not SecurityUtils.is_safe_url( url, allowed_domains, block_private=True - ) and not _is_allowed_media_server_image_url( - url, media_server_domains or set() - ): + ) and not (fetch_url := SecurityUtils.verify_signed_url(url)): logger.warn(f"Blocked unsafe image URL: {url}") return None content = await ImageHelper().async_fetch_image( - url=url, + url=fetch_url, proxy=proxy, use_cache=use_cache, cookies=cookies, @@ -491,7 +382,7 @@ async def fetch_image( # 返回缓存图片 return Response( content=content, - media_type=UrlUtils.get_mime_type(url, "image/jpeg"), + media_type=UrlUtils.get_mime_type(fetch_url, "image/jpeg"), headers=headers, ) return None @@ -509,13 +400,6 @@ async def proxy_img( """ 图片代理,可选是否使用代理服务器,支持 HTTP 缓存 """ - # 媒体服务器添加图片代理支持 - hosts = [ - config.config.get("host") - for config in MediaServerHelper().get_configs().values() - if config and config.config and config.config.get("host") - ] - media_server_domains = set(hosts) allowed_domains = set(settings.SECURITY_IMAGE_DOMAINS) cookies = ( MediaServerChain().get_image_cookies(server=None, image_url=imgurl) @@ -529,7 +413,6 @@ async def proxy_img( cookies=cookies, if_none_match=if_none_match, allowed_domains=allowed_domains, - media_server_domains=media_server_domains, ) diff --git a/app/chain/mediaserver.py b/app/chain/mediaserver.py index 6f239817..1718f044 100644 --- a/app/chain/mediaserver.py +++ b/app/chain/mediaserver.py @@ -8,6 +8,7 @@ from app.db.mediaserver_oper import MediaServerOper from app.helper.service import ServiceConfigHelper from app.log import logger from app.schemas import MediaServerLibrary, MediaServerItem, MediaServerSeasonInfo, MediaServerPlayItem +from app.utils.security import SecurityUtils lock = threading.Lock() @@ -17,12 +18,54 @@ class MediaServerChain(ChainBase): 媒体服务器处理链 """ + @staticmethod + def _sign_image_url(url: Optional[str]) -> Optional[str]: + """ + 为返回前端的媒体服务器图片 URL 添加代理签名。 + """ + return SecurityUtils.sign_url(url) if url else url + + def _sign_library_images( + self, libraries: Optional[List[MediaServerLibrary]] + ) -> List[MediaServerLibrary]: + """ + 给媒体库列表中的封面和封面组添加代理签名。 + """ + for library in libraries or []: + if library.image: + library.image = self._sign_image_url(library.image) + if library.image_list: + library.image_list = [ + self._sign_image_url(image) + for image in library.image_list + if image + ] + return libraries or [] + + def _sign_play_item_images( + self, items: Optional[List[MediaServerPlayItem]] + ) -> List[MediaServerPlayItem]: + """ + 给媒体服务器播放条目中的图片 URL 添加代理签名。 + """ + for item in items or []: + if item.image: + item.image = self._sign_image_url(item.image) + return items or [] + def librarys(self, server: str, username: Optional[str] = None, hidden: bool = False) -> List[MediaServerLibrary]: """ 获取媒体服务器所有媒体库 """ - return self.run_module("mediaserver_librarys", server=server, username=username, hidden=hidden) + return self._sign_library_images( + self.run_module( + "mediaserver_librarys", + server=server, + username=username, + hidden=hidden, + ) + ) def items(self, server: str, library_id: Union[str, int], start_index: Optional[int] = 0, limit: Optional[int] = -1) -> Generator[Any, None, None]: @@ -83,22 +126,46 @@ class MediaServerChain(ChainBase): """ 获取媒体服务器正在播放信息 """ - return self.run_module("mediaserver_playing", count=count, server=server, username=username) + return self._sign_play_item_images( + self.run_module( + "mediaserver_playing", + count=count, + server=server, + username=username, + ) + ) def latest(self, server: str, count: Optional[int] = 20, username: Optional[str] = None) -> List[MediaServerPlayItem]: """ 获取媒体服务器最新入库条目 """ - return self.run_module("mediaserver_latest", count=count, server=server, username=username) + return self._sign_play_item_images( + self.run_module( + "mediaserver_latest", + count=count, + server=server, + username=username, + ) + ) def get_latest_wallpapers(self, server: Optional[str] = None, count: Optional[int] = 10, remote: bool = True, username: Optional[str] = None) -> List[str]: """ 获取最新最新入库条目海报作为壁纸,缓存1小时 """ - return self.run_module("mediaserver_latest_images", server=server, count=count, - remote=remote, username=username) + wallpapers = self.run_module( + "mediaserver_latest_images", + server=server, + count=count, + remote=remote, + username=username, + ) + return [ + self._sign_image_url(wallpaper) + for wallpaper in wallpapers or [] + if wallpaper + ] def get_latest_wallpaper(self, server: Optional[str] = None, remote: bool = True, username: Optional[str] = None) -> Optional[str]: diff --git a/app/utils/security.py b/app/utils/security.py index e41a3f2f..5f908882 100644 --- a/app/utils/security.py +++ b/app/utils/security.py @@ -1,16 +1,21 @@ +import hmac import ipaddress import socket +import time from hashlib import sha256 from pathlib import Path from typing import List, Optional, Set, Union -from urllib.parse import quote, urlparse +from urllib.parse import parse_qsl, quote, urlencode, urlparse, urlunparse from anyio import Path as AsyncPath +from app.core.config import settings from app.log import logger class SecurityUtils: + _SIGNED_URL_PURPOSE = "image-proxy" + _SIGNED_URL_EXPIRE_SECONDS = 86400 @staticmethod def is_safe_path(base_path: Path, user_path: Path, @@ -107,6 +112,103 @@ class SecurityUtils: return False return True + @staticmethod + def _url_signature_payload(url: str, expires_at: int, purpose: str) -> bytes: + """ + 构造 URL 签名载荷。 + + 签名覆盖用途、过期时间和完整 URL,确保同一个签名不能挪用到其它 + 内网地址或其它代理用途。 + """ + return f"{purpose}\n{expires_at}\n{url}".encode("utf-8") + + @staticmethod + def _sign_url_payload(url: str, expires_at: int, purpose: str) -> str: + """ + 使用 RESOURCE_SECRET_KEY 对 URL 签名载荷生成 HMAC。 + """ + return hmac.new( + settings.RESOURCE_SECRET_KEY.encode("utf-8"), + SecurityUtils._url_signature_payload(url, expires_at, purpose), + sha256, + ).hexdigest() + + @staticmethod + def strip_url_signature(url: str) -> str: + """ + 移除 URL fragment 中的代理签名信息,得到真正要请求的地址。 + + 图片代理签名放在 fragment 中,浏览器会把它传给 MoviePilot,但 HTTP + 客户端请求媒体服务器前不能把这些内部参数带过去。 + """ + if not url: + return url + parsed_url = urlparse(url) + return urlunparse(parsed_url._replace(fragment="")) + + @staticmethod + def sign_url( + url: str, + expires_in: int = _SIGNED_URL_EXPIRE_SECONDS, + purpose: str = _SIGNED_URL_PURPOSE, + ) -> str: + """ + 给服务端返回的资源 URL 添加临时签名。 + + 该签名用于允许 `/system/img` 代理访问服务端已经确认过的私网图片 URL, + 避免代理端点重新依赖媒体服务器的具体路径规则。 + """ + if not url: + return url + parsed_url = urlparse(url) + if parsed_url.scheme not in {"http", "https"} or not parsed_url.netloc: + return url + clean_url = SecurityUtils.strip_url_signature(url) + expires_at = int(time.time() + expires_in) + signature = SecurityUtils._sign_url_payload(clean_url, expires_at, purpose) + fragment = urlencode( + { + "mp_exp": str(expires_at), + "mp_sig": signature, + "mp_purpose": purpose, + } + ) + return urlunparse(urlparse(clean_url)._replace(fragment=fragment)) + + @staticmethod + def verify_signed_url( + url: str, + purpose: str = _SIGNED_URL_PURPOSE, + ) -> Optional[str]: + """ + 验证 URL fragment 中的代理签名,成功时返回去签名后的真实 URL。 + """ + if not url: + return None + parsed_url = urlparse(url) + if parsed_url.scheme not in {"http", "https"} or not parsed_url.netloc: + return None + fragment_params = dict(parse_qsl(parsed_url.fragment, keep_blank_values=True)) + expires_at = fragment_params.get("mp_exp") + signature = fragment_params.get("mp_sig") + signed_purpose = fragment_params.get("mp_purpose") + if not expires_at or not signature or signed_purpose != purpose: + return None + try: + expires_at_int = int(expires_at) + except ValueError: + return None + if expires_at_int < int(time.time()): + return None + + clean_url = SecurityUtils.strip_url_signature(url) + expected_signature = SecurityUtils._sign_url_payload( + clean_url, expires_at_int, purpose + ) + if not hmac.compare_digest(signature, expected_signature): + return None + return clean_url + @staticmethod def is_safe_url( url: str, diff --git a/tests/test_mediaserver_image_signing.py b/tests/test_mediaserver_image_signing.py new file mode 100644 index 00000000..773f87e0 --- /dev/null +++ b/tests/test_mediaserver_image_signing.py @@ -0,0 +1,82 @@ +import sys +import unittest +from unittest.mock import Mock + +for _module_name in ( + "app.chain.mediaserver", + "app.db.models", + "app.db.user_oper", + "app.helper.message", + "app.utils.crypto", +): + if _module_name in sys.modules and not hasattr( + sys.modules[_module_name], "__file__" + ): + del sys.modules[_module_name] + +from app.chain.mediaserver import MediaServerChain +from app.schemas import MediaServerLibrary, MediaServerPlayItem +from app.utils.security import SecurityUtils + + +class MediaServerImageSigningTest(unittest.TestCase): + @staticmethod + def _build_chain(result): + """ + 构造只带 run_module 的 MediaServerChain,避免单测初始化真实模块管理器。 + """ + chain = MediaServerChain.__new__(MediaServerChain) + chain.run_module = Mock(return_value=result) + return chain + + def test_librarys_signs_image_fields(self): + """ + 媒体库接口返回前需要给 image 和 image_list 加签。 + """ + image = "http://192.168.1.50:8096/Items/lib/Images/Primary" + image_list = [ + "http://192.168.1.50:32400/library/metadata/1/thumb/1", + ] + chain = self._build_chain( + [ + MediaServerLibrary( + id="lib", + image=image, + image_list=image_list, + ) + ] + ) + + result = chain.librarys(server="jellyfin") + + self.assertEqual(SecurityUtils.verify_signed_url(result[0].image), image) + self.assertEqual( + SecurityUtils.verify_signed_url(result[0].image_list[0]), + image_list[0], + ) + + def test_latest_signs_play_item_images(self): + """ + 最近入库接口返回前需要给条目图片加签。 + """ + image = "http://192.168.1.50:8096/Items/item/Images/Backdrop" + chain = self._build_chain([MediaServerPlayItem(id="item", image=image)]) + + result = chain.latest(server="jellyfin") + + self.assertEqual(SecurityUtils.verify_signed_url(result[0].image), image) + + def test_latest_wallpapers_signs_urls(self): + """ + 媒体服务器壁纸 URL 返回前也需要加签。 + """ + wallpaper = "http://192.168.1.50:8096/Items/item/Images/Backdrop" + chain = self._build_chain([wallpaper]) + + result = chain.get_latest_wallpapers() + + self.assertEqual(SecurityUtils.verify_signed_url(result[0]), wallpaper) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_security_utils.py b/tests/test_security_utils.py index 61ae2d53..4cf61c13 100644 --- a/tests/test_security_utils.py +++ b/tests/test_security_utils.py @@ -6,6 +6,44 @@ from app.utils.security import SecurityUtils class SecurityUtilsTest(TestCase): + def test_signed_url_roundtrip_returns_clean_url(self): + """ + URL 签名验证成功后返回不含签名片段的真实请求地址。 + """ + url = "http://192.168.1.50:8096/Items/abc/Images/Primary?api_key=demo" + + signed_url = SecurityUtils.sign_url(url) + + self.assertIn("#mp_exp=", signed_url) + self.assertEqual(SecurityUtils.verify_signed_url(signed_url), url) + self.assertEqual(SecurityUtils.strip_url_signature(signed_url), url) + + def test_signed_url_rejects_tampered_url(self): + """ + 签名绑定完整 URL,签名后修改路径必须校验失败。 + """ + signed_url = SecurityUtils.sign_url( + "http://192.168.1.50:8096/Items/abc/Images/Primary" + ) + tampered_url = signed_url.replace( + "/Items/abc/Images/Primary", + "/System/Info/Public", + ) + + self.assertIsNone(SecurityUtils.verify_signed_url(tampered_url)) + + def test_signed_url_rejects_expired_signature(self): + """ + 已过期签名不能继续放行私网图片代理请求。 + """ + with patch("app.utils.security.time.time", return_value=1000): + signed_url = SecurityUtils.sign_url( + "http://192.168.1.50:8096/Items/abc/Images/Primary", + expires_in=10, + ) + + with patch("app.utils.security.time.time", return_value=1011): + self.assertIsNone(SecurityUtils.verify_signed_url(signed_url)) def test_is_safe_url_keeps_default_allowlist_behavior(self): """ diff --git a/tests/test_system_nettest.py b/tests/test_system_nettest.py index 5a5811a3..3939aed3 100644 --- a/tests/test_system_nettest.py +++ b/tests/test_system_nettest.py @@ -37,7 +37,12 @@ _stub_module("app.chain.media", MediaChain=_Dummy) _stub_module("app.chain.mediaserver", MediaServerChain=_Dummy) _stub_module("app.chain.search", SearchChain=_Dummy) _stub_module("app.chain.system", SystemChain=_Dummy) -_stub_module("app.core.event", eventmanager=_Dummy(), Event=_Dummy) +_stub_module( + "app.core.event", + eventmanager=_Dummy(), + Event=_Dummy, + EventManager=_Dummy, +) _stub_module("app.core.metainfo", MetaInfo=_Dummy) _stub_module("app.core.module", ModuleManager=_Dummy) _stub_module( @@ -82,10 +87,12 @@ from app.api.endpoints import system as system_endpoint class NettestSecurityTest(unittest.TestCase): - def test_fetch_image_allows_private_media_server_image_path(self): + def test_fetch_image_allows_signed_private_url(self): """ - 已配置媒体服务器的内网图片接口需要继续允许代理,保证前端封面显示。 + 服务端签名过的私网图片 URL 可以继续代理,保证前端封面显示。 """ + image_url = "http://192.168.1.50:8096/System/Info/Public" + signed_url = system_endpoint.SecurityUtils.sign_url(image_url) image_helper = Mock() image_helper.async_fetch_image = AsyncMock(return_value=b"image-bytes") @@ -96,14 +103,18 @@ class NettestSecurityTest(unittest.TestCase): ): resp = asyncio.run( system_endpoint.fetch_image( - url="http://192.168.1.50:8096/Items/abc/Images/Primary", - allowed_domains={"http://192.168.1.50:8096"}, - media_server_domains={"http://192.168.1.50:8096"}, + url=signed_url, + allowed_domains=set(), ) ) self.assertEqual(resp.status_code, 200) - image_helper.async_fetch_image.assert_awaited_once() + image_helper.async_fetch_image.assert_awaited_once_with( + url=image_url, + proxy=None, + use_cache=False, + cookies=None, + ) def test_fetch_image_blocks_private_allowed_url_before_request(self): """ @@ -123,39 +134,23 @@ class NettestSecurityTest(unittest.TestCase): self.assertIsNone(resp) - def test_fetch_image_blocks_private_media_server_non_image_path(self): + def test_fetch_image_blocks_tampered_signed_private_url(self): """ - 媒体服务器 host 只放行图片接口,不放行同 host 下的任意 API。 + 私网签名绑定完整 URL,改动路径后不能继续代理。 """ + signed_url = system_endpoint.SecurityUtils.sign_url( + "http://192.168.1.50:8096/Items/abc/Images/Primary" + ).replace("/Items/abc/Images/Primary", "/System/Info/Public") + class FailIfCalled: def __init__(self, *args, **kwargs): - raise AssertionError("fetch_image should block non-image media server paths") + raise AssertionError("fetch_image should block tampered signed URLs") with patch.object(system_endpoint, "ImageHelper", FailIfCalled): resp = asyncio.run( system_endpoint.fetch_image( - url="http://192.168.1.50:8096/System/Info/Public", - allowed_domains={"http://192.168.1.50:8096"}, - media_server_domains={"http://192.168.1.50:8096"}, - ) - ) - - self.assertIsNone(resp) - - def test_fetch_image_blocks_traversal_in_media_server_image_path(self): - """ - 编码后的路径穿越不能借媒体图片前缀绕过私网 SSRF 防护。 - """ - class FailIfCalled: - def __init__(self, *args, **kwargs): - raise AssertionError("fetch_image should block traversal image paths") - - with patch.object(system_endpoint, "ImageHelper", FailIfCalled): - resp = asyncio.run( - system_endpoint.fetch_image( - url="http://192.168.1.50:5666/api/v1/sys/img/%2e%2e/manager/user/list", - allowed_domains={"http://192.168.1.50:5666"}, - media_server_domains={"http://192.168.1.50:5666"}, + url=signed_url, + allowed_domains=set(), ) )