# Source code for embedding_service.service_metrics

"""Prometheus metrics helpers for embedding-service."""

from typing import Any

from lalandre_core.utils.metrics_utils import (
    LATENCY_BUCKETS as _LATENCY_BUCKETS,
)
from lalandre_core.utils.metrics_utils import (
    classify_error as _classify_error,
)
from lalandre_core.utils.metrics_utils import (
    normalize_label as _normalize_label,
)
from lalandre_core.utils.metrics_utils import (
    status_class as _status_class,
)
from prometheus_client import Counter, Histogram

# --- HTTP-level metrics (fed by observe_http_request) ---

# Request count keyed by normalized path/method plus a status class
# (e.g. "2xx"/"5xx") computed via lalandre_core's status_class helper.
HTTP_REQUESTS_TOTAL = Counter(
    "lalandre_embedding_service_http_requests_total",
    "Embedding service HTTP requests by path, method, and status class.",
    ["path", "method", "status_class"],
)

# Request latency histogram; shares the project-wide bucket layout so
# dashboards can compare services directly.
HTTP_REQUEST_DURATION_SECONDS = Histogram(
    "lalandre_embedding_service_http_request_duration_seconds",
    "Embedding service HTTP request duration.",
    ["path", "method"],
    buckets=_LATENCY_BUCKETS,
)

# --- Embedding-endpoint metrics (fed by observe_embed_request / observe_embed_error) ---

# Count of embedding calls; "outcome" is a normalized free-form label
# supplied by the caller (e.g. success/failure).
EMBED_REQUESTS_TOTAL = Counter(
    "lalandre_embedding_service_embed_requests_total",
    "Embedding endpoint calls by endpoint/provider/outcome.",
    ["endpoint", "provider", "outcome"],
)

# Embedding call latency, with the same shared bucket layout as above.
EMBED_DURATION_SECONDS = Histogram(
    "lalandre_embedding_service_embed_duration_seconds",
    "Embedding endpoint duration by endpoint/provider/outcome.",
    ["endpoint", "provider", "outcome"],
    buckets=_LATENCY_BUCKETS,
)

# Distribution of texts-per-call; power-of-two buckets up to 128
# (values above 128 land in the implicit +Inf bucket).
EMBED_BATCH_SIZE = Histogram(
    "lalandre_embedding_service_embed_batch_size",
    "Requested number of texts per embedding call.",
    ["endpoint", "provider"],
    buckets=(1, 2, 4, 8, 16, 32, 64, 128),
)

# Error count keyed by classified error type (see classify_error in
# lalandre_core.utils.metrics_utils).
EMBED_ERRORS_TOTAL = Counter(
    "lalandre_embedding_service_embed_errors_total",
    "Embedding endpoint errors by endpoint/provider/type.",
    ["endpoint", "provider", "error_type"],
)


def observe_http_request(
    *,
    path: str,
    method: str,
    status_code: int,
    duration_seconds: float,
) -> None:
    """Record a single HTTP request: bump the counter and observe latency.

    Labels are normalized first so unknown paths/methods cannot explode
    metric cardinality; negative durations are clamped to zero.
    """
    label_path = _normalize_path(path)
    label_method = _normalize_label(method.upper())

    HTTP_REQUESTS_TOTAL.labels(
        path=label_path,
        method=label_method,
        status_class=_status_class(status_code),
    ).inc()

    elapsed = max(float(duration_seconds), 0.0)
    HTTP_REQUEST_DURATION_SECONDS.labels(
        path=label_path,
        method=label_method,
    ).observe(elapsed)
def observe_embed_request(
    *,
    endpoint: str,
    provider: str,
    batch_size: int,
    duration_seconds: float,
    outcome: str,
) -> None:
    """Record one embedding call: count it, observe latency and batch size.

    All labels are normalized to keep cardinality bounded. The duration is
    clamped to >= 0 and the batch size to >= 1 before being observed.
    """
    ep = _normalize_endpoint(endpoint)
    prov = _normalize_label(provider)
    result = _normalize_label(outcome)

    EMBED_REQUESTS_TOTAL.labels(
        endpoint=ep,
        provider=prov,
        outcome=result,
    ).inc()

    EMBED_DURATION_SECONDS.labels(
        endpoint=ep,
        provider=prov,
        outcome=result,
    ).observe(max(float(duration_seconds), 0.0))

    EMBED_BATCH_SIZE.labels(
        endpoint=ep,
        provider=prov,
    ).observe(max(int(batch_size), 1))
def observe_embed_error(
    *,
    endpoint: str,
    provider: str,
    exc_or_reason: Any,
) -> None:
    """Count one embedding failure, bucketed by its classified error type.

    ``exc_or_reason`` may be an exception instance or a reason value; the
    shared ``classify_error`` helper maps it to a bounded ``error_type``
    label (first element of its return tuple is discarded here).
    """
    _, kind = _classify_error(exc_or_reason)
    EMBED_ERRORS_TOTAL.labels(
        endpoint=_normalize_endpoint(endpoint),
        provider=_normalize_label(provider),
        error_type=kind,
    ).inc()
def _normalize_path(path: str) -> str: if path in {"/", "/health", "/info", "/embed", "/embed/single", "/metrics"}: return path return "other" def _normalize_endpoint(endpoint: str) -> str: if endpoint in {"embed", "embed_single"}: return endpoint return "other"