Source code for lalandre_core.queue.worker_metrics
"""Reusable Prometheus metrics factory for Redis-based workers."""
from dataclasses import dataclass
from typing import Any, Callable
from prometheus_client import Counter, Histogram
from ..utils.metrics_utils import (
LATENCY_BUCKETS,
classify_error,
normalize_label,
)
[docs]
@dataclass(frozen=True)
class WorkerMetrics:
    """Immutable pair of metric-recording callables for one worker.

    Instances are produced by ``build_worker_metrics``; the callables it
    stores here are keyword-only closures over that worker's Prometheus
    instruments.

    Attributes:
        observe_execution: Records one finished job. As built by
            ``build_worker_metrics`` it is called with ``job_type=``,
            ``outcome=`` and ``duration_seconds=``.
        observe_error: Records one job failure. As built by
            ``build_worker_metrics`` it is called with ``job_type=`` and
            ``exc_or_reason=``.
    """

    # Both fields are typed loosely (``...``) so any keyword-only helper fits.
    observe_execution: Callable[..., None]
    observe_error: Callable[..., None]
[docs]
def build_worker_metrics(worker_name: str, valid_job_types: set[str]) -> WorkerMetrics:
    """Create a worker's Prometheus instruments and wrap them in observe helpers.

    Three instruments are created, all named
    ``lalandre_<worker_name>_worker_job_*``:

    * ``..._executions_total`` -- counter labelled by ``job_type`` / ``outcome``
    * ``..._duration_seconds`` -- histogram with the same labels, using
      ``LATENCY_BUCKETS``
    * ``..._errors_total`` -- counter labelled by ``job_type`` / ``error_type``

    Args:
        worker_name: Short name interpolated into the metric names and
            (title-cased) into their help texts, such as ``"embedding"``.
        valid_job_types: Whitelist of known job type labels. Any job type
            that is not in this set (or one of the two built-in pseudo types
            ``"invalid_payload"`` / ``"unknown_job_type"``) after
            normalization is reported under the label ``"unknown"``.

    Returns:
        A ``WorkerMetrics`` holding the two keyword-only recording helpers.
    """
    # NOTE(review): no registry is passed, so the instruments presumably land
    # in prometheus_client's default registry; building metrics twice for the
    # same worker_name would then likely fail on duplicate names -- confirm.
    metric_prefix = f"lalandre_{worker_name}_worker_job"
    display_name = worker_name.title()

    execution_counter = Counter(
        f"{metric_prefix}_executions_total",
        f"{display_name} worker job executions by type and outcome.",
        ["job_type", "outcome"],
    )
    duration_histogram = Histogram(
        f"{metric_prefix}_duration_seconds",
        f"{display_name} worker job duration by type and outcome.",
        ["job_type", "outcome"],
        buckets=LATENCY_BUCKETS,
    )
    error_counter = Counter(
        f"{metric_prefix}_errors_total",
        f"{display_name} worker job failures by type and error class.",
        ["job_type", "error_type"],
    )

    # The two pseudo job types are accepted regardless of the caller's whitelist.
    accepted_job_types = valid_job_types | {"invalid_payload", "unknown_job_type"}
    seeded_outcomes = sorted({"success", "failed", "invalid_payload", "unknown_job_type"})

    # Touch every job_type x outcome combination once so that an idle worker
    # already exposes zero-valued series right after startup. Only the
    # executions counter and the duration histogram are seeded; the errors
    # counter is left to grow on demand.
    seed_pairs = [
        (seed_job_type, seed_outcome)
        for seed_job_type in sorted(accepted_job_types)
        for seed_outcome in seeded_outcomes
    ]
    for seed_job_type, seed_outcome in seed_pairs:
        execution_counter.labels(job_type=seed_job_type, outcome=seed_outcome)
        duration_histogram.labels(job_type=seed_job_type, outcome=seed_outcome)

    def _clamp_job_type(job_type: str) -> str:
        """Return the normalized job type, or ``"unknown"`` if not whitelisted."""
        candidate = normalize_label(job_type)
        if candidate in accepted_job_types:
            return candidate
        return "unknown"

    def observe_execution(*, job_type: str, outcome: str, duration_seconds: float) -> None:
        """Count one job execution and record how long it took."""
        # The outcome is normalized but, unlike the job type, it is not
        # restricted to the pre-seeded set of outcomes.
        series_labels = {
            "job_type": _clamp_job_type(job_type),
            "outcome": normalize_label(outcome),
        }
        execution_counter.labels(**series_labels).inc()
        duration_series = duration_histogram.labels(**series_labels)
        # Negative durations are recorded as zero.
        elapsed = max(float(duration_seconds), 0.0)
        duration_series.observe(elapsed)

    def observe_error(*, job_type: str, exc_or_reason: Any) -> None:
        """Count one job failure under the error type given by ``classify_error``."""
        # classify_error yields a pair; only its second element is used here.
        _, error_label = classify_error(exc_or_reason)
        clamped_job_type = _clamp_job_type(job_type)
        error_counter.labels(job_type=clamped_job_type, error_type=error_label).inc()

    return WorkerMetrics(
        observe_execution=observe_execution,
        observe_error=observe_error,
    )