# Source code for lalandre_core.queue.worker_metrics

"""Reusable Prometheus metrics factory for Redis-based workers."""

import math
from dataclasses import dataclass
from typing import Any, Callable

from prometheus_client import Counter, Histogram

from ..utils.metrics_utils import (
    LATENCY_BUCKETS,
    classify_error,
    normalize_label,
)


@dataclass(frozen=True)
class WorkerMetrics:
    """Immutable pair of observe callbacks bound to one worker's instruments.

    ``build_worker_metrics`` returns one of these; the callbacks close over the
    Prometheus counters/histogram it created, so callers never touch the
    instruments directly.
    """

    # Called as observe_execution(job_type=..., outcome=..., duration_seconds=...)
    # once per finished job.
    observe_execution: Callable[..., None]
    # Called as observe_error(job_type=..., exc_or_reason=...) once per failure.
    observe_error: Callable[..., None]
def build_worker_metrics(worker_name: str, valid_job_types: set[str]) -> WorkerMetrics:
    """Create Prometheus counters/histograms for a worker and return observe helpers.

    Three instruments are created, named after ``worker_name``:

    * ``lalandre_<worker_name>_worker_job_executions_total``: Counter with
      ``job_type`` / ``outcome`` labels.
    * ``lalandre_<worker_name>_worker_job_duration_seconds``: Histogram with
      ``job_type`` / ``outcome`` labels and ``LATENCY_BUCKETS`` buckets.
    * ``lalandre_<worker_name>_worker_job_errors_total``: Counter with
      ``job_type`` / ``error_type`` labels.

    No ``registry`` argument is passed, so the instruments go to
    prometheus_client's default registry. Metric names must be unique there,
    so this is presumably meant to be called once per ``worker_name`` per
    process.

    Args:
        worker_name: Short name used in metric names, such as ``"embedding"``.
        valid_job_types: Whitelist of known job type labels for normalization.
            A job type whose normalized form is not in this set (nor one of
            ``"invalid_payload"`` / ``"unknown_job_type"``) is recorded under
            the label ``"unknown"``.

    Returns:
        A :class:`WorkerMetrics` whose helpers record into these instruments.
    """
    executions = Counter(
        f"lalandre_{worker_name}_worker_job_executions_total",
        f"{worker_name.title()} worker job executions by type and outcome.",
        ["job_type", "outcome"],
    )
    duration = Histogram(
        f"lalandre_{worker_name}_worker_job_duration_seconds",
        f"{worker_name.title()} worker job duration by type and outcome.",
        ["job_type", "outcome"],
        buckets=LATENCY_BUCKETS,
    )
    errors = Counter(
        f"lalandre_{worker_name}_worker_job_errors_total",
        f"{worker_name.title()} worker job failures by type and error class.",
        ["job_type", "error_type"],
    )

    # Job type labels accepted regardless of the caller-provided whitelist.
    always_valid = {"invalid_payload", "unknown_job_type"}
    all_valid = valid_job_types | always_valid
    known_outcomes = {"success", "failed", "invalid_payload", "unknown_job_type"}

    # Pre-initialize the primary job_type x outcome series so idle workers still
    # expose zero-valued time series right after startup.
    for job_type in sorted(all_valid):
        for outcome in sorted(known_outcomes):
            executions.labels(job_type=job_type, outcome=outcome)
            duration.labels(job_type=job_type, outcome=outcome)

    def _normalize_job_type(job_type: str) -> str:
        """Map ``job_type`` to a whitelisted label, falling back to ``"unknown"``."""
        normalized = normalize_label(job_type)
        return normalized if normalized in all_valid else "unknown"

    def _sanitize_duration(duration_seconds: float) -> float:
        """Coerce a duration to a finite, non-negative float."""
        seconds = float(duration_seconds)
        # NaN slips through a plain max(x, 0.0) (max(nan, 0.0) is nan), and
        # NaN or +inf would turn the histogram's running sum into NaN/inf for
        # good. Record such values as 0.0, like negatives, so the histogram
        # count stays aligned with the executions counter.
        if not math.isfinite(seconds) or seconds < 0.0:
            return 0.0
        return seconds

    def observe_execution(*, job_type: str, outcome: str, duration_seconds: float) -> None:
        """Count one finished job and record its duration in seconds.

        Negative or non-finite durations are recorded as 0.0.
        """
        njt = _normalize_job_type(job_type)
        # NOTE(review): unlike job_type, outcome is not checked against
        # known_outcomes, so unexpected outcome strings create new series.
        no = normalize_label(outcome)
        executions.labels(job_type=njt, outcome=no).inc()
        duration.labels(job_type=njt, outcome=no).observe(
            _sanitize_duration(duration_seconds)
        )

    def observe_error(*, job_type: str, exc_or_reason: Any) -> None:
        """Count one failure, labeled by normalized job type and error class."""
        _, error_type = classify_error(exc_or_reason)
        errors.labels(job_type=_normalize_job_type(job_type), error_type=error_type).inc()

    return WorkerMetrics(
        observe_execution=observe_execution,
        observe_error=observe_error,
    )