Files
archived-MoviePilot/app/api/endpoints/auth.py

71 lines
2.0 KiB
Python

from typing import Any
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from app import schemas
from app.core.auth_bridge import build_token_response, consume_plugin_auth_ticket
from app.core.plugin import PluginManager
from app.db.models.passkey import PassKey
from app.db.models.user import User
router = APIRouter()
class AuthExchangeRequest(BaseModel):
"""
插件认证票据兑换请求。
"""
ticket: str
def _system_auth_providers() -> list[dict[str, Any]]:
"""
获取系统内建的匿名登录方式摘要。
:return: 系统认证提供方列表
"""
has_passkey = bool(PassKey.list(db=None))
return [
{
"id": "system:passkey",
"type": "system",
"method": "passkey",
"name": "通行密钥",
"icon": "material-symbols:passkey",
"enabled": has_passkey,
}
]
@router.get("/providers", summary="查询登录认证提供方", response_model=list[dict])
def auth_providers() -> list[dict[str, Any]]:
"""
查询系统和插件提供的登录认证入口。
:return: 认证提供方摘要列表
"""
providers = _system_auth_providers()
providers.extend(PluginManager().get_plugin_auth_providers())
return [provider for provider in providers if provider.get("enabled", True)]
@router.post("/exchange", summary="兑换插件认证登录票据", response_model=schemas.Token)
def auth_exchange(body: AuthExchangeRequest) -> schemas.Token:
"""
将插件认证成功后生成的一次性票据兑换为系统 Token。
:param body: 票据兑换请求
:return: 标准登录 Token
"""
ticket_data = consume_plugin_auth_ticket(body.ticket)
if not ticket_data:
raise HTTPException(status_code=401, detail="认证票据无效或已过期")
user = User.get(db=None, rid=ticket_data.get("user_id"))
if not user or not user.is_active:
raise HTTPException(status_code=403, detail="用户不存在或已禁用")
return build_token_response(user)