Source code for rag_service.routers._deps

"""Shared FastAPI dependencies for rag-service routers."""

from typing import Optional

from fastapi import HTTPException, Request
from rag_service.bootstrap import RagComponents


def get_components(request: Request) -> RagComponents:
    """Fetch the rag-service components stored on the FastAPI app state.

    Args:
        request: Incoming request; ``request.app.state.components`` is read.

    Returns:
        The ``RagComponents`` instance found on the application state.

    Raises:
        HTTPException: 503 with "Service not initialized yet" when the
            ``components`` attribute is absent or ``None``; 503 with
            "Service components are not ready" when the stored object is
            not a ``RagComponents`` instance.
    """
    # A missing attribute is treated the same as an explicit None.
    stored = getattr(request.app.state, "components", None)

    if stored is None:
        failure_detail = "Service not initialized yet"
    elif not isinstance(stored, RagComponents):
        failure_detail = "Service components are not ready"
    else:
        return stored

    # Both failure modes are reported as 503; only the detail differs.
    raise HTTPException(status_code=503, detail=failure_detail)
def extract_user_id(request: Request) -> Optional[str]:
    """Read the caller's user id from the ``X-User-Id`` request header.

    Per the original note, the header is set by api-gateway from the JWT
    ``sub`` claim.

    Args:
        request: Incoming request whose headers are inspected.

    Returns:
        The header value, or ``None`` when the header is missing or empty.
    """
    header_value = request.headers.get("x-user-id")
    # Collapse any falsy value (absent header, empty string) to None.
    return header_value if header_value else None