Source code for rag_service.metrics.retrieval
"""Prometheus backend for lalandre_rag runtime retrieval metrics."""
from typing import Any
from lalandre_core.utils.metrics_utils import (
classify_error as _classify_error,
)
from lalandre_core.utils.metrics_utils import (
normalize_label as _normalize_label,
)
from lalandre_rag.retrieval.metrics import RetrievalMetricsRecorder
from prometheus_client import Counter
from .service import PHASE_DURATION_SECONDS
RETRIEVAL_ERRORS_TOTAL = Counter(
"lalandre_rag_retrieval_errors_total",
"Retrieval pipeline errors by operation, phase, and backend.",
["operation", "phase", "provider", "error_type"],
)
[docs]
class PrometheusRetrievalMetricsRecorder(RetrievalMetricsRecorder):
"""Prometheus-backed recorder for retrieval phase metrics."""
[docs]
def observe_phase(
self,
*,
operation: str,
phase: str,
duration_seconds: float,
) -> None:
"""Record one retrieval phase latency."""
PHASE_DURATION_SECONDS.labels(
mode=_normalize_label(operation),
phase=_normalize_label(phase),
).observe(max(float(duration_seconds), 0.0))
[docs]
def observe_error(
self,
*,
operation: str,
phase: str,
exc_or_reason: Any,
) -> None:
"""Record one retrieval error classified by provider and type."""
provider, error_type = _classify_error(exc_or_reason)
RETRIEVAL_ERRORS_TOTAL.labels(
operation=_normalize_label(operation),
phase=_normalize_label(phase),
provider=provider,
error_type=error_type,
).inc()