Source code for api_gateway.deps

"""Shared FastAPI dependencies for api-gateway."""

from typing import Optional

import jwt as pyjwt
from fastapi import HTTPException, Request


[docs] def get_runtime_config(request: Request) -> tuple[str, float]: """Return (rag_service_url, timeout_seconds) from app state.""" rag_service_url = getattr(request.app.state, "rag_service_url", None) if not rag_service_url: raise HTTPException(status_code=500, detail="RAG service URL not configured") timeout = getattr(request.app.state, "rag_proxy_timeout_seconds", 300.0) return rag_service_url, float(timeout)
[docs] def extract_user_id(request: Request) -> Optional[str]: """Best-effort extraction of user_id (sub) from JWT without re-validating. When auth is disabled the header won't be present — returns None. """ auth_header = request.headers.get("authorization", "") if not auth_header.startswith("Bearer "): return None try: claims = pyjwt.decode(auth_header[7:], options={"verify_signature": False}) return claims.get("sub") except Exception: return None