# Module: extraction_worker.service_metrics

"""Prometheus extraction metrics recorder."""

from lalandre_core.queue.worker_metrics import build_worker_metrics
from lalandre_core.utils.metrics_utils import normalize_label as _normalize_label
from lalandre_extraction.metrics import ExtractionMetricsRecorder
from prometheus_client import Counter, Histogram

# Job names this worker reports; passed once to the shared worker-metrics builder.
_EXTRACTION_JOBS = {
    "extract_all",
    "extract_act",
    "summarize_all",
    "summarize_act",
    "build_communities",
}

_job_metrics = build_worker_metrics("extraction", _EXTRACTION_JOBS)

# Re-exported callables so job code records metrics without touching _job_metrics.
observe_job_execution = _job_metrics.observe_execution
observe_job_error = _job_metrics.observe_error

_LATENCY_BUCKETS = (
    0.005,
    0.01,
    0.025,
    0.05,
    0.1,
    0.25,
    0.5,
    1.0,
    2.5,
    5.0,
    10.0,
    30.0,
)

_EXTRACTED_RELATIONS_BUCKETS = (0, 1, 2, 3, 5, 8, 13, 21, 34)

# Latency of every extraction LLM call, labeled by provider/model/outcome.
EXTRACTION_LLM_CALL_DURATION_SECONDS = Histogram(
    name="lalandre_extraction_llm_call_duration_seconds",
    documentation="Time spent in extraction LLM calls.",
    labelnames=("provider", "model", "outcome"),
    buckets=_LATENCY_BUCKETS,
)

# Error counter bucketed by error type for per-provider/model alerting.
EXTRACTION_LLM_ERRORS_TOTAL = Counter(
    name="lalandre_extraction_llm_errors_total",
    documentation="Extraction LLM call errors by provider/model/type.",
    labelnames=("provider", "model", "error_type"),
)

# Counts how each LLM response was turned into JSON (by parse mode).
EXTRACTION_LLM_JSON_PARSE_TOTAL = Counter(
    name="lalandre_extraction_llm_json_parse_total",
    documentation="Extraction LLM JSON parse mode counts.",
    labelnames=("provider", "model", "parse_mode"),
)

# Distribution of parsed relation counts per successful call.
EXTRACTION_LLM_RELATIONS_PER_CALL = Histogram(
    name="lalandre_extraction_llm_relations_per_call",
    documentation="Number of parsed relations per successful extraction LLM call.",
    labelnames=("provider", "model"),
    buckets=_EXTRACTED_RELATIONS_BUCKETS,
)


class PrometheusExtractionMetricsRecorder(ExtractionMetricsRecorder):
    """Recorder that forwards extraction LLM metrics to Prometheus."""

    @staticmethod
    def _provider_model(provider: str, model: str) -> dict[str, str]:
        # Normalized provider/model label pair shared by every metric below.
        return {
            "provider": _normalize_label(provider),
            "model": _normalize_label(model),
        }

    def observe_call(
        self,
        *,
        provider: str,
        model: str,
        outcome: str,
        duration_seconds: float,
    ) -> None:
        """Record one extraction LLM call latency and outcome."""
        labels = self._provider_model(provider, model)
        labels["outcome"] = _normalize_label(outcome)
        # Clamp to zero so a negative duration (e.g. clock skew) never raises.
        EXTRACTION_LLM_CALL_DURATION_SECONDS.labels(**labels).observe(
            max(float(duration_seconds), 0.0)
        )

    def observe_error(
        self,
        *,
        provider: str,
        model: str,
        error_type: str,
    ) -> None:
        """Record one extraction LLM error bucket."""
        labels = self._provider_model(provider, model)
        labels["error_type"] = _normalize_label(error_type)
        EXTRACTION_LLM_ERRORS_TOTAL.labels(**labels).inc()

    def observe_json_parse(
        self,
        *,
        provider: str,
        model: str,
        parse_mode: str,
    ) -> None:
        """Record how extraction output was parsed into JSON."""
        labels = self._provider_model(provider, model)
        labels["parse_mode"] = _normalize_label(parse_mode)
        EXTRACTION_LLM_JSON_PARSE_TOTAL.labels(**labels).inc()

    def observe_relations(
        self,
        *,
        provider: str,
        model: str,
        count: int,
    ) -> None:
        """Record the number of relations produced per extraction call."""
        labels = self._provider_model(provider, model)
        # Negative counts are clamped to zero before hitting the histogram.
        EXTRACTION_LLM_RELATIONS_PER_CALL.labels(**labels).observe(
            max(int(count), 0)
        )