"""Prometheus metrics helpers for embedding-service."""
from typing import Any
from lalandre_core.utils.metrics_utils import (
LATENCY_BUCKETS as _LATENCY_BUCKETS,
)
from lalandre_core.utils.metrics_utils import (
classify_error as _classify_error,
)
from lalandre_core.utils.metrics_utils import (
normalize_label as _normalize_label,
)
from lalandre_core.utils.metrics_utils import (
status_class as _status_class,
)
from prometheus_client import Counter, Histogram
# --- HTTP-level metrics ---------------------------------------------------
# Request counter keyed by normalized route, method, and status class
# (e.g. "2xx"); labels are normalized by the observe_* helpers below to
# keep cardinality bounded.
HTTP_REQUESTS_TOTAL = Counter(
    "lalandre_embedding_service_http_requests_total",
    "Embedding service HTTP requests by path, method, and status class.",
    ["path", "method", "status_class"],
)
# Request latency histogram; shares the bucket layout used across services
# via LATENCY_BUCKETS from lalandre_core.
HTTP_REQUEST_DURATION_SECONDS = Histogram(
    "lalandre_embedding_service_http_request_duration_seconds",
    "Embedding service HTTP request duration.",
    ["path", "method"],
    buckets=_LATENCY_BUCKETS,
)
# --- Embedding-call metrics -----------------------------------------------
# Calls to the embedding endpoints, broken down by endpoint name,
# backing provider, and outcome (e.g. success/error label values).
EMBED_REQUESTS_TOTAL = Counter(
    "lalandre_embedding_service_embed_requests_total",
    "Embedding endpoint calls by endpoint/provider/outcome.",
    ["endpoint", "provider", "outcome"],
)
# End-to-end embedding call latency, same label set as the counter above
# so latency can be sliced by outcome.
EMBED_DURATION_SECONDS = Histogram(
    "lalandre_embedding_service_embed_duration_seconds",
    "Embedding endpoint duration by endpoint/provider/outcome.",
    ["endpoint", "provider", "outcome"],
    buckets=_LATENCY_BUCKETS,
)
# Distribution of texts-per-call; power-of-two buckets up to 128.
EMBED_BATCH_SIZE = Histogram(
    "lalandre_embedding_service_embed_batch_size",
    "Requested number of texts per embedding call.",
    ["endpoint", "provider"],
    buckets=(1, 2, 4, 8, 16, 32, 64, 128),
)
# Error counter; error_type values come from classify_error in
# lalandre_core (see observe_embed_error below).
EMBED_ERRORS_TOTAL = Counter(
    "lalandre_embedding_service_embed_errors_total",
    "Embedding endpoint errors by endpoint/provider/type.",
    ["endpoint", "provider", "error_type"],
)
def observe_http_request(
    *,
    path: str,
    method: str,
    status_code: int,
    duration_seconds: float,
) -> None:
    """Record one HTTP request handled by the embedding service.

    Increments ``HTTP_REQUESTS_TOTAL`` and observes
    ``HTTP_REQUEST_DURATION_SECONDS`` with normalized label values.

    Args:
        path: Request path; collapsed to a known route or ``"other"``
            to keep label cardinality bounded.
        method: HTTP method; upper-cased then normalized.
        status_code: Response status code, mapped to a status class
            (e.g. ``"2xx"``) by ``status_class``.
        duration_seconds: Wall-clock duration; negative values are
            clamped to 0.0 so the histogram never sees bad input.
    """
    # NOTE(review): removed a stray "[docs]" line (Sphinx artifact) that
    # would have raised NameError at import time.
    normalized_path = _normalize_path(path)
    normalized_method = _normalize_label(method.upper())
    HTTP_REQUESTS_TOTAL.labels(
        path=normalized_path,
        method=normalized_method,
        status_class=_status_class(status_code),
    ).inc()
    HTTP_REQUEST_DURATION_SECONDS.labels(
        path=normalized_path,
        method=normalized_method,
    ).observe(max(float(duration_seconds), 0.0))
def observe_embed_request(
    *,
    endpoint: str,
    provider: str,
    batch_size: int,
    duration_seconds: float,
    outcome: str,
) -> None:
    """Record one embedding request outcome, latency, and batch size.

    Increments ``EMBED_REQUESTS_TOTAL`` and observes both
    ``EMBED_DURATION_SECONDS`` and ``EMBED_BATCH_SIZE`` with normalized
    label values.

    Args:
        endpoint: Logical endpoint name; collapsed to a known value or
            ``"other"`` to keep label cardinality bounded.
        provider: Embedding provider name, normalized as a label.
        batch_size: Number of texts in the request; clamped to at
            least 1 before observation.
        duration_seconds: Wall-clock duration; negative values are
            clamped to 0.0.
        outcome: Request outcome label (e.g. success/error), normalized.
    """
    # NOTE(review): removed a stray "[docs]" line (Sphinx artifact) that
    # would have raised NameError at import time.
    normalized_endpoint = _normalize_endpoint(endpoint)
    normalized_provider = _normalize_label(provider)
    normalized_outcome = _normalize_label(outcome)
    EMBED_REQUESTS_TOTAL.labels(
        endpoint=normalized_endpoint,
        provider=normalized_provider,
        outcome=normalized_outcome,
    ).inc()
    EMBED_DURATION_SECONDS.labels(
        endpoint=normalized_endpoint,
        provider=normalized_provider,
        outcome=normalized_outcome,
    ).observe(max(float(duration_seconds), 0.0))
    EMBED_BATCH_SIZE.labels(
        endpoint=normalized_endpoint,
        provider=normalized_provider,
    ).observe(max(int(batch_size), 1))
def observe_embed_error(
    *,
    endpoint: str,
    provider: str,
    exc_or_reason: Any,
) -> None:
    """Record one embedding request error.

    Args:
        endpoint: Logical endpoint name; collapsed to a known value or
            ``"other"`` to keep label cardinality bounded.
        provider: Embedding provider name, normalized as a label.
        exc_or_reason: Exception instance or reason string; mapped to a
            bounded ``error_type`` label via ``classify_error`` from
            lalandre_core (second element of its return tuple).
    """
    # NOTE(review): removed a stray "[docs]" line (Sphinx artifact) that
    # would have raised NameError at import time.
    _, error_type = _classify_error(exc_or_reason)
    EMBED_ERRORS_TOTAL.labels(
        endpoint=_normalize_endpoint(endpoint),
        provider=_normalize_label(provider),
        error_type=error_type,
    ).inc()
def _normalize_path(path: str) -> str:
if path in {"/", "/health", "/info", "/embed", "/embed/single", "/metrics"}:
return path
return "other"
def _normalize_endpoint(endpoint: str) -> str:
if endpoint in {"embed", "embed_single"}:
return endpoint
return "other"