# Source code for rag_service.metrics.retrieval

"""Prometheus backend for lalandre_rag runtime retrieval metrics."""

import math
from typing import Any

from lalandre_core.utils.metrics_utils import (
    classify_error as _classify_error,
)
from lalandre_core.utils.metrics_utils import (
    normalize_label as _normalize_label,
)
from lalandre_rag.retrieval.metrics import RetrievalMetricsRecorder
from prometheus_client import Counter

from .service import PHASE_DURATION_SECONDS

# Counter of retrieval failures. One time series exists per distinct
# (operation, phase, provider, error_type) label combination; it is
# incremented by PrometheusRetrievalMetricsRecorder.observe_error.
RETRIEVAL_ERRORS_TOTAL = Counter(
    "lalandre_rag_retrieval_errors_total",
    "Retrieval pipeline errors by operation, phase, and backend.",
    labelnames=("operation", "phase", "provider", "error_type"),
)


class PrometheusRetrievalMetricsRecorder(RetrievalMetricsRecorder):
    """Prometheus-backed recorder for retrieval phase metrics.

    Phase latencies are observed on ``PHASE_DURATION_SECONDS`` and errors are
    counted on ``RETRIEVAL_ERRORS_TOTAL``.
    """

    def observe_phase(
        self,
        *,
        operation: str,
        phase: str,
        duration_seconds: float,
    ) -> None:
        """Record one retrieval phase latency.

        Args:
            operation: Name of the retrieval operation; normalized and
                recorded under the ``mode`` label.
            phase: Name of the pipeline phase; normalized and recorded under
                the ``phase`` label.
            duration_seconds: Elapsed time in seconds. Negative values are
                clamped to ``0.0``; non-finite values (NaN, +/-inf) are
                dropped without recording anything.
        """
        seconds = float(duration_seconds)
        if not math.isfinite(seconds):
            # A NaN or infinite observation would be added to the metric's
            # running sum and leave it NaN/inf for the rest of the process,
            # so a bad sample is skipped instead of being recorded.
            return
        PHASE_DURATION_SECONDS.labels(
            mode=_normalize_label(operation),
            phase=_normalize_label(phase),
        ).observe(max(seconds, 0.0))

    def observe_error(
        self,
        *,
        operation: str,
        phase: str,
        exc_or_reason: Any,
    ) -> None:
        """Record one retrieval error classified by provider and type.

        Args:
            operation: Name of the retrieval operation; normalized before
                being used as the ``operation`` label.
            phase: Name of the pipeline phase; normalized before being used
                as the ``phase`` label.
            exc_or_reason: Value handed to ``classify_error``, which yields
                the ``provider`` and ``error_type`` label values.
        """
        provider, error_type = _classify_error(exc_or_reason)
        RETRIEVAL_ERRORS_TOTAL.labels(
            operation=_normalize_label(operation),
            phase=_normalize_label(phase),
            provider=provider,
            error_type=error_type,
        ).inc()