# Source code for extraction_worker.service_metrics
"""Prometheus extraction metrics recorder."""
from lalandre_core.queue.worker_metrics import build_worker_metrics
from lalandre_core.utils.metrics_utils import normalize_label as _normalize_label
from lalandre_extraction.metrics import ExtractionMetricsRecorder
from prometheus_client import Counter, Histogram
# Job kinds the extraction worker executes; each gets its own label value in
# the shared worker metrics.
_JOB_KINDS = {
    "extract_all",
    "extract_act",
    "summarize_all",
    "summarize_act",
    "build_communities",
}

# Worker-level execution/error metrics, shared with the queue infrastructure.
_job_metrics = build_worker_metrics("extraction", _JOB_KINDS)

# Re-exported callables the worker loop uses to record job outcomes.
observe_job_execution = _job_metrics.observe_execution
observe_job_error = _job_metrics.observe_error
# Histogram buckets (seconds) for LLM call latency: standard Prometheus-style
# spread from 5 ms up to 30 s.
_LATENCY_BUCKETS = (
    0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0,
)

# Buckets for "relations parsed per call" counts (Fibonacci-like growth).
_EXTRACTED_RELATIONS_BUCKETS = (0, 1, 2, 3, 5, 8, 13, 21, 34)
# Per-call latency of extraction LLM requests, labeled by provider, model and
# call outcome.
EXTRACTION_LLM_CALL_DURATION_SECONDS = Histogram(
    "lalandre_extraction_llm_call_duration_seconds",
    "Time spent in extraction LLM calls.",
    labelnames=["provider", "model", "outcome"],
    buckets=_LATENCY_BUCKETS,
)

# Error counter bucketed by provider, model and coarse error type.
EXTRACTION_LLM_ERRORS_TOTAL = Counter(
    "lalandre_extraction_llm_errors_total",
    "Extraction LLM call errors by provider/model/type.",
    labelnames=["provider", "model", "error_type"],
)

# How each LLM response was turned into JSON (parse mode), per provider/model.
EXTRACTION_LLM_JSON_PARSE_TOTAL = Counter(
    "lalandre_extraction_llm_json_parse_total",
    "Extraction LLM JSON parse mode counts.",
    labelnames=["provider", "model", "parse_mode"],
)

# Distribution of parsed relation counts per successful extraction call.
EXTRACTION_LLM_RELATIONS_PER_CALL = Histogram(
    "lalandre_extraction_llm_relations_per_call",
    "Number of parsed relations per successful extraction LLM call.",
    labelnames=["provider", "model"],
    buckets=_EXTRACTED_RELATIONS_BUCKETS,
)
class PrometheusExtractionMetricsRecorder(ExtractionMetricsRecorder):
    """Prometheus-backed recorder for extraction LLM metrics.

    Every ``observe_*`` method passes its label values through
    ``_normalize_label`` before touching the module-level Prometheus
    collectors, so label values stay consistent across call sites.
    """

    def observe_call(
        self,
        *,
        provider: str,
        model: str,
        outcome: str,
        duration_seconds: float,
    ) -> None:
        """Record one extraction LLM call latency and outcome.

        Negative durations (e.g. clock skew) are clamped to 0.0 so the
        histogram never receives an invalid observation.
        """
        EXTRACTION_LLM_CALL_DURATION_SECONDS.labels(
            provider=_normalize_label(provider),
            model=_normalize_label(model),
            outcome=_normalize_label(outcome),
        ).observe(max(float(duration_seconds), 0.0))

    def observe_error(
        self,
        *,
        provider: str,
        model: str,
        error_type: str,
    ) -> None:
        """Record one extraction LLM error bucket."""
        EXTRACTION_LLM_ERRORS_TOTAL.labels(
            provider=_normalize_label(provider),
            model=_normalize_label(model),
            error_type=_normalize_label(error_type),
        ).inc()

    def observe_json_parse(
        self,
        *,
        provider: str,
        model: str,
        parse_mode: str,
    ) -> None:
        """Record how extraction output was parsed into JSON."""
        EXTRACTION_LLM_JSON_PARSE_TOTAL.labels(
            provider=_normalize_label(provider),
            model=_normalize_label(model),
            parse_mode=_normalize_label(parse_mode),
        ).inc()

    def observe_relations(
        self,
        *,
        provider: str,
        model: str,
        count: int,
    ) -> None:
        """Record the number of relations produced per extraction call.

        Negative counts are clamped to 0 before observation.
        """
        EXTRACTION_LLM_RELATIONS_PER_CALL.labels(
            provider=_normalize_label(provider),
            model=_normalize_label(model),
        ).observe(max(int(count), 0))