# Source code for scripts.eval_ragas

#!/usr/bin/env python3
"""Evaluate RAG quality against the running rag-service with RAGAS.

Input data can be JSON or JSONL. Each record contains a ``query`` and may also
define ``ground_truth``, ``mode``, ``top_k``, and ``min_score`` overrides.

Reported metrics include ``faithfulness``, ``answer_relevancy``,
``context_precision``, and ``context_recall`` when ground-truth answers are
available.
"""

from __future__ import annotations

import argparse
import json
import os
import time
import uuid
from dataclasses import dataclass
from importlib import import_module
from pathlib import Path
from typing import Any, Dict, List, Mapping, Optional, Sequence, cast

import requests
from bench_utils import build_ragas_embeddings as _build_ragas_embeddings
from bench_utils import percentile as _percentile
from bench_utils import safe_mean as _safe_mean

# Loose alias for a decoded JSON object whose keys are strings.
JsonObject = Dict[str, object]

# Metric names this script knows how to resolve from ragas.metrics.
AVAILABLE_METRICS = ("faithfulness", "answer_relevancy", "context_precision", "context_recall")


def _as_json_object(value: object) -> Optional[JsonObject]:
    if not isinstance(value, dict):
        return None
    result: JsonObject = {}
    for key, item in value.items():
        if isinstance(key, str):
            result[key] = item
    return result


def _as_json_list(value: object) -> Optional[List[object]]:
    if not isinstance(value, list):
        return None
    return cast(List[object], value)


@dataclass(frozen=True)
class QAExample:
    """One QA evaluation example loaded from the dataset."""

    query: str  # question sent to the RAG service
    ground_truth: Optional[str]  # reference answer, when the dataset has one
    mode: Optional[str]  # per-example override of the query mode
    top_k: Optional[int]  # per-example override of retrieval depth
    min_score: Optional[float]  # per-example override of the score cutoff
@dataclass(frozen=True)
class RagasInputRow:
    """Normalized row sent to the RAGAS evaluation pipeline."""

    query: str  # original user question
    answer: str  # answer text returned by the service
    contexts: tuple[str, ...]  # retrieved context passages
    ground_truth: Optional[str]  # reference answer, when available
    mode: str  # query mode actually used
    top_k: int  # retrieval depth actually used
    min_score: Optional[float]  # score cutoff actually used
    num_sources: int  # number of source documents reported by the service
    query_id: str  # unique id sent with the /query request
    latency_ms: float  # wall-clock latency of the /query call
def _as_optional_float(value: Any) -> Optional[float]: if value is None or value == "": return None try: return float(value) except (TypeError, ValueError): return None def _as_optional_int(value: Any) -> Optional[int]: if value is None or value == "": return None try: parsed = int(value) except (TypeError, ValueError): return None return parsed if parsed > 0 else None def _parse_example(raw: Mapping[str, object], *, index: int) -> QAExample: query = str(raw.get("query") or raw.get("question") or "").strip() if not query: raise ValueError(f"Dataset item #{index} is missing a non-empty 'query' field") ground_truth_raw = raw.get("ground_truth") if ground_truth_raw is None: ground_truth_raw = raw.get("reference") ground_truth = str(ground_truth_raw).strip() if ground_truth_raw is not None else None if ground_truth == "": ground_truth = None mode_raw = raw.get("mode") mode = str(mode_raw).strip() if mode_raw is not None else None if mode == "": mode = None return QAExample( query=query, ground_truth=ground_truth, mode=mode, top_k=_as_optional_int(raw.get("top_k")), min_score=_as_optional_float(raw.get("min_score")), )
def load_dataset(dataset_path: Path) -> List[QAExample]:
    """Load QA examples from a JSON or JSONL dataset file.

    ``.jsonl`` files are parsed line by line (blank lines skipped); any other
    suffix is parsed as a single JSON document whose root must be a list of
    objects.

    Raises:
        FileNotFoundError: when *dataset_path* does not exist.
        ValueError: when a record is not a JSON object, or the JSON root is
            not a list.
    """
    if not dataset_path.exists():
        raise FileNotFoundError(f"Dataset not found: {dataset_path}")

    examples: list[QAExample] = []
    if dataset_path.suffix.lower() == ".jsonl":
        with dataset_path.open("r", encoding="utf-8") as handle:
            for line_no, raw_line in enumerate(handle, start=1):
                text = raw_line.strip()
                if not text:
                    continue
                record = _as_json_object(cast(object, json.loads(text)))
                if record is None:
                    raise ValueError(f"Invalid JSONL item at line {line_no}: expected object")
                examples.append(_parse_example(record, index=line_no))
        return examples

    root = _as_json_list(cast(object, json.loads(dataset_path.read_text(encoding="utf-8"))))
    if root is None:
        raise ValueError("JSON dataset must be a list of objects")
    for position, raw_record in enumerate(root, start=1):
        record = _as_json_object(raw_record)
        if record is None:
            raise ValueError(f"Invalid dataset item #{position}: expected object")
        examples.append(_parse_example(record, index=position))
    return examples
def _extract_contexts(
    response_payload: Mapping[str, object],
    *,
    max_contexts: int,
) -> tuple[tuple[str, ...], int]:
    """Pull deduplicated context texts out of a ``/query`` response payload.

    Returns ``(contexts, source_count)`` where *source_count* is the number of
    well-formed source documents seen (including those whose text was skipped).
    Each document contributes its ``content`` field, falling back to
    ``content_preview`` when content is absent or blank; at most *max_contexts*
    unique texts are kept.
    """
    sources = _as_json_object(response_payload.get("sources"))
    if sources is None:
        return tuple(), 0
    documents = _as_json_list(sources.get("documents"))
    if documents is None:
        return tuple(), 0

    kept: list[str] = []
    already_seen: set[str] = set()
    total_documents = 0
    for entry in documents:
        doc = _as_json_object(entry)
        if doc is None:
            continue
        total_documents += 1
        body = doc.get("content")
        if isinstance(body, str) and body.strip():
            text = body.strip()
        else:
            # Fall back to the preview when full content is missing/blank.
            preview = doc.get("content_preview")
            text = str(preview).strip() if preview is not None else ""
        if not text or text in already_seen:
            continue
        already_seen.add(text)
        kept.append(text)
        if len(kept) >= max_contexts:
            break
    return tuple(kept), total_documents


def _query_rag_service(
    *,
    base_url: str,
    query_id: str,
    question: str,
    mode: str,
    top_k: int,
    min_score: Optional[float],
    timeout_seconds: float,
    include_full_content: bool,
) -> tuple[JsonObject, float]:
    """POST one question to the rag-service ``/query`` endpoint.

    Returns the parsed JSON response and the request latency in milliseconds.

    Raises:
        requests.HTTPError: on a non-2xx response.
        ValueError: when the response body is not a JSON object.
    """
    endpoint = base_url.rstrip("/") + "/query"
    payload: JsonObject = {
        "query_id": query_id,
        "question": question,
        "mode": mode,
        "top_k": top_k,
        "sources": True,
        "include_full_content": include_full_content,
    }
    if min_score is not None:
        payload["min_score"] = min_score

    started = time.perf_counter()
    response = requests.post(endpoint, json=payload, timeout=timeout_seconds)
    latency_ms = (time.perf_counter() - started) * 1000.0
    response.raise_for_status()

    parsed = _as_json_object(cast(object, response.json()))
    if parsed is None:
        raise ValueError("Unexpected /query response: expected object")
    return parsed, latency_ms
def collect_samples(
    *,
    examples: Sequence[QAExample],
    base_url: str,
    default_mode: str,
    default_top_k: int,
    default_min_score: Optional[float],
    timeout_seconds: float,
    max_contexts: int,
    include_full_content: bool,
) -> List[RagasInputRow]:
    """Query the RAG service and normalize responses for evaluation.

    Per-example overrides (mode/top_k/min_score) take precedence over the
    supplied defaults. One :class:`RagasInputRow` is produced per example.
    """
    rows: list[RagasInputRow] = []
    for index, example in enumerate(examples, start=1):
        query_id = f"ragas-{index}-{uuid.uuid4().hex[:8]}"
        mode = example.mode or default_mode
        top_k = default_top_k if example.top_k is None else example.top_k
        min_score = default_min_score if example.min_score is None else example.min_score

        response_payload, latency_ms = _query_rag_service(
            base_url=base_url,
            query_id=query_id,
            question=example.query,
            mode=mode,
            top_k=max(top_k, 1),  # service requires a positive top_k
            min_score=min_score,
            timeout_seconds=timeout_seconds,
            include_full_content=include_full_content,
        )

        raw_answer = response_payload.get("answer")
        answer_text = str(raw_answer).strip() if raw_answer is not None else ""
        contexts, source_count = _extract_contexts(response_payload, max_contexts=max_contexts)

        rows.append(
            RagasInputRow(
                query=example.query,
                answer=answer_text,
                contexts=contexts,
                ground_truth=example.ground_truth,
                mode=mode,
                top_k=top_k,
                min_score=min_score,
                num_sources=source_count,
                query_id=query_id,
                latency_ms=latency_ms,
            )
        )
    return rows
def _build_judge_llm( *, provider: str, model: str, base_url: str, api_key: Optional[str], timeout_seconds: float, temperature: float, ) -> Any: try: ragas_llms = import_module("ragas.llms") except Exception as exc: raise RuntimeError("Ragas dependency missing. Install with: poetry add ragas datasets") from exc wrapper_factory = cast(Any, getattr(ragas_llms, "LangchainLLMWrapper", None)) if wrapper_factory is None: raise RuntimeError("ragas.llms.LangchainLLMWrapper not available") from lalandre_core.llm import build_chat_model, normalize_provider # Map legacy "openai_compat" alias used by CLI resolved_provider = normalize_provider(provider) if provider == "openai_compat": resolved_provider = "openai_compatible" llm = build_chat_model( provider=resolved_provider, model=model, api_key=api_key or "dummy", base_url=base_url, temperature=temperature, timeout_seconds=timeout_seconds, ) return wrapper_factory(llm) def _validate_runtime_dependencies(judge_provider: str) -> None: try: import_module("ragas") import_module("datasets") except Exception as exc: raise RuntimeError("Ragas dependencies missing. Install with: poetry add ragas datasets") from exc if judge_provider == "openai_compat": try: import_module("langchain_openai") except Exception as exc: raise RuntimeError( "provider=openai_compat requires langchain-openai. 
Install with: poetry add langchain-openai" ) from exc def _build_ragas_dataset(rows: Sequence[RagasInputRow]) -> Any: try: ragas_module = import_module("ragas") evaluation_dataset_cls = cast( Any, getattr(ragas_module, "EvaluationDataset", None), ) except Exception: evaluation_dataset_cls = None if evaluation_dataset_cls is not None: records: list[dict[str, Any]] = [] for row in rows: record: dict[str, Any] = { "user_input": row.query, "response": row.answer, "retrieved_contexts": list(row.contexts), } if row.ground_truth: record["reference"] = row.ground_truth records.append(record) return evaluation_dataset_cls.from_list(records) try: datasets_module = import_module("datasets") dataset_cls = cast(Any, getattr(datasets_module, "Dataset", None)) except Exception as exc: raise RuntimeError("Ragas dataset dependency missing. Install with: poetry add ragas datasets") from exc if dataset_cls is None: raise RuntimeError("datasets.Dataset not available") payload: dict[str, Any] = { "question": [row.query for row in rows], "answer": [row.answer for row in rows], "contexts": [list(row.contexts) for row in rows], } if any(row.ground_truth for row in rows): payload["ground_truths"] = [row.ground_truth or "" for row in rows] return dataset_cls.from_dict(payload) def _resolve_metrics( requested: Sequence[str], has_ground_truth: bool, ) -> tuple[list[Any], list[str]]: """Resolve RAGAS metric objects from names.""" try: metrics_module = import_module("ragas.metrics") except Exception as exc: raise RuntimeError("ragas.metrics not available") from exc resolved: list[Any] = [] names: list[str] = [] for name in requested: if name == "context_recall" and not has_ground_truth: continue metric_obj = getattr(metrics_module, name, None) if metric_obj is not None: resolved.append(metric_obj) names.append(name) return resolved, names def _as_records(result: Any) -> List[Dict[str, Any]]: if hasattr(result, "to_pandas"): try: pandas_df = result.to_pandas() as_dict_raw = cast(object, 
pandas_df.to_dict(orient="records")) as_dict = _as_json_list(as_dict_raw) if as_dict is not None: records: list[Dict[str, Any]] = [] for raw_item in as_dict: item = _as_json_object(raw_item) if item is not None: records.append(cast(Dict[str, Any], item)) return records except Exception: pass scores = _as_json_list(cast(object, getattr(result, "scores", None))) if scores is not None: records: list[Dict[str, Any]] = [] for raw_item in scores: item = _as_json_object(raw_item) if item is not None: records.append(cast(Dict[str, Any], item)) return records return [] def _extract_metric_score(record: Dict[str, Any], metric_name: str) -> Optional[float]: candidate_keys = (metric_name, f"{metric_name}_score") for key in candidate_keys: value = record.get(key) if value is None: continue try: parsed = float(value) except (TypeError, ValueError): continue return parsed return None
def run_ragas_evaluation(
    *,
    rows: Sequence[RagasInputRow],
    judge_provider: str,
    judge_model: str,
    judge_base_url: str,
    judge_api_key: Optional[str],
    judge_timeout_seconds: float,
    judge_temperature: float,
    metric_names: Sequence[str] = AVAILABLE_METRICS,
    faithfulness_threshold: float = 0.8,
    embeddings: Any = None,
) -> Dict[str, Any]:
    """Run RAGAS scoring for the collected rows and summarize the results.

    Returns a dict with a ``summary`` (aggregates, latency percentiles,
    judge settings) and ``details`` (one record per row with per-metric
    scores and a ``below_threshold`` faithfulness flag).

    Raises:
        RuntimeError: when ragas is missing or no metric could be resolved.
    """
    try:
        ragas_module = import_module("ragas")
    except Exception as exc:
        raise RuntimeError("Ragas not installed. Install with: poetry add ragas datasets") from exc
    evaluate_fn = cast(Any, getattr(ragas_module, "evaluate", None))
    if evaluate_fn is None:
        raise RuntimeError("Ragas evaluate API not available")

    has_ground_truth = any(row.ground_truth for row in rows)
    metrics, resolved_names = _resolve_metrics(metric_names, has_ground_truth)
    if not metrics:
        raise RuntimeError("No RAGAS metrics could be resolved")

    judge_llm = _build_judge_llm(
        provider=judge_provider,
        model=judge_model,
        base_url=judge_base_url,
        api_key=judge_api_key,
        timeout_seconds=judge_timeout_seconds,
        temperature=judge_temperature,
    )
    dataset = _build_ragas_dataset(rows)

    eval_kwargs: Dict[str, Any] = dict(dataset=dataset, metrics=metrics, llm=judge_llm)
    if embeddings is not None:
        eval_kwargs["embeddings"] = embeddings
    try:
        result = evaluate_fn(**eval_kwargs)
    except TypeError:
        # Older ragas releases expose a positional signature.
        result = evaluate_fn(dataset, metrics, llm=judge_llm)
    ragas_records = _as_records(result)

    # Collect per-metric scores and per-row detail records.
    metric_values: dict[str, list[float]] = {name: [] for name in resolved_names}
    details: list[dict[str, Any]] = []
    for row_index, row in enumerate(rows):
        record = ragas_records[row_index] if row_index < len(ragas_records) else {}
        detail: dict[str, Any] = {
            "query_id": row.query_id,
            "query": row.query,
            "mode": row.mode,
            "top_k": row.top_k,
            "min_score": row.min_score,
            "answer_chars": len(row.answer),
            "contexts_count": len(row.contexts),
            "sources_count": row.num_sources,
            "latency_ms": round(row.latency_ms, 1),
        }
        for metric_name in resolved_names:
            score = _extract_metric_score(record, metric_name)
            detail[metric_name] = score
            if score is not None:
                metric_values[metric_name].append(score)
        # Faithfulness threshold flag: likely-hallucination marker.
        faith_score = detail.get("faithfulness")
        if faith_score is not None:
            detail["below_threshold"] = bool(faith_score < faithfulness_threshold)
        details.append(detail)

    # Build summary aggregates.
    latencies = [row.latency_ms for row in rows]
    summary: Dict[str, Any] = {
        "examples_total": len(rows),
        "examples_with_contexts": sum(1 for row in rows if row.contexts),
        "avg_contexts_per_example": _safe_mean([float(len(row.contexts)) for row in rows]),
        "avg_sources_per_example": _safe_mean([float(row.num_sources) for row in rows]),
        "avg_answer_chars": _safe_mean([float(len(row.answer)) for row in rows]),
        "latency_p50_ms": round(_percentile(latencies, 50), 1),
        "latency_p95_ms": round(_percentile(latencies, 95), 1),
        "latency_p99_ms": round(_percentile(latencies, 99), 1),
        "metrics_evaluated": resolved_names,
        "judge_provider": judge_provider,
        "judge_model": judge_model,
        "judge_temperature": judge_temperature,
        "faithfulness_threshold": faithfulness_threshold,
    }
    for metric_name in resolved_names:
        values = metric_values[metric_name]
        summary[f"{metric_name}_mean"] = _safe_mean(values)
        summary[f"{metric_name}_scored_count"] = len(values)
    if "faithfulness" in metric_values:
        faith_vals = metric_values["faithfulness"]
        below = [s for s in faith_vals if s < faithfulness_threshold]
        summary["faithfulness_below_threshold_count"] = len(below)
        summary["faithfulness_below_threshold_ratio"] = (
            float(len(below)) / float(len(faith_vals)) if faith_vals else 0.0
        )
    return {"summary": summary, "details": details}
def collect_simple_metrics(rows: Sequence[RagasInputRow]) -> Dict[str, Any]:
    """Fallback metrics when RAGAS is not installed.

    Produces the same ``{"summary": ..., "details": ...}`` shape as the full
    evaluation, minus any LLM-judged scores (``ragas_available`` is False).
    """
    latencies = [row.latency_ms for row in rows]
    details: list[dict[str, Any]] = [
        {
            "query_id": row.query_id,
            "query": row.query,
            "mode": row.mode,
            "top_k": row.top_k,
            "answer_chars": len(row.answer),
            "contexts_count": len(row.contexts),
            "sources_count": row.num_sources,
            "latency_ms": round(row.latency_ms, 1),
        }
        for row in rows
    ]
    summary: Dict[str, Any] = {
        "examples_total": len(rows),
        "examples_with_contexts": sum(1 for row in rows if row.contexts),
        "avg_contexts_per_example": _safe_mean([float(len(row.contexts)) for row in rows]),
        "avg_sources_per_example": _safe_mean([float(row.num_sources) for row in rows]),
        "avg_answer_chars": _safe_mean([float(len(row.answer)) for row in rows]),
        "latency_p50_ms": round(_percentile(latencies, 50), 1),
        "latency_p95_ms": round(_percentile(latencies, 95), 1),
        "latency_p99_ms": round(_percentile(latencies, 99), 1),
        "metrics_evaluated": [],
        "ragas_available": False,
    }
    return {"summary": summary, "details": details}
[docs] def parse_args() -> argparse.Namespace: """Parse CLI arguments for the RAGAS evaluation runner.""" parser = argparse.ArgumentParser(description="Evaluate /query quality with Ragas") parser.add_argument("--dataset", required=True, help="Path to JSON or JSONL dataset") parser.add_argument("--base-url", default="http://localhost:8001", help="rag-service base URL") parser.add_argument("--mode", default="rag", choices=["rag", "graph"], help="Default query mode") parser.add_argument("--top-k", type=int, default=12, help="Default top_k if absent in dataset item") parser.add_argument("--min-score", type=float, default=None, help="Default min_score if absent in dataset") parser.add_argument("--timeout", type=float, default=45.0, help="HTTP timeout for rag-service calls") parser.add_argument("--max-contexts", type=int, default=12, help="Max contexts kept per example") parser.add_argument( "--include-full-content", action="store_true", default=False, help="Request full source content from /query (recommended for faithfulness)", ) parser.add_argument( "--metrics", default="faithfulness,answer_relevancy,context_precision,context_recall", help="Comma-separated RAGAS metrics to evaluate", ) parser.add_argument( "--judge-provider", default="mistral", choices=["mistral", "openai_compat"], help="Judge LLM provider used by Ragas", ) parser.add_argument( "--judge-model", default="mistral-small-latest", help="Judge LLM model name", ) parser.add_argument( "--judge-base-url", default=os.getenv("RAGAS_JUDGE_BASE_URL", "https://api.mistral.ai/v1"), help="Judge endpoint base URL", ) parser.add_argument( "--judge-api-key", default=os.getenv("RAGAS_JUDGE_API_KEY", ""), help="Judge API key (required for mistral/openai_compat)", ) parser.add_argument("--judge-timeout", type=float, default=120.0, help="Judge request timeout") parser.add_argument("--judge-temperature", type=float, default=0.0, help="Judge generation temperature") parser.add_argument( "--faithfulness-threshold", type=float, 
default=0.8, help="Threshold used to flag likely hallucinations", ) parser.add_argument("--output", default="", help="Optional path to write JSON report") return parser.parse_args()
def main() -> int:
    """Run the RAGAS CLI workflow and print the JSON report.

    Steps: validate optional deps, load the dataset, query the service for
    every example, score with RAGAS, then print (and optionally write) the
    JSON report. Returns 0 on success.
    """
    args = parse_args()
    _validate_runtime_dependencies(args.judge_provider)

    dataset_path = Path(args.dataset).resolve()
    examples = load_dataset(dataset_path)
    if not examples:
        raise ValueError("Dataset is empty after parsing")

    rows = collect_samples(
        examples=examples,
        base_url=args.base_url,
        default_mode=args.mode,
        default_top_k=max(args.top_k, 1),
        default_min_score=args.min_score,
        timeout_seconds=max(args.timeout, 1.0),
        max_contexts=max(args.max_contexts, 1),
        include_full_content=bool(args.include_full_content),
    )

    requested_metrics = [m.strip() for m in args.metrics.split(",") if m.strip()]
    judge_api_key_resolved = str(args.judge_api_key).strip() or None
    ragas_embeddings = _build_ragas_embeddings(
        provider=args.judge_provider,
        api_key=judge_api_key_resolved,
    )
    report = run_ragas_evaluation(
        rows=rows,
        judge_provider=args.judge_provider,
        judge_model=args.judge_model,
        judge_base_url=args.judge_base_url,
        judge_api_key=judge_api_key_resolved,
        judge_timeout_seconds=max(args.judge_timeout, 1.0),
        judge_temperature=float(args.judge_temperature),
        metric_names=requested_metrics,
        # Clamp the threshold to the valid [0, 1] score range.
        faithfulness_threshold=min(max(float(args.faithfulness_threshold), 0.0), 1.0),
        embeddings=ragas_embeddings,
    )

    report_json = json.dumps(report, indent=2, ensure_ascii=False)
    print(report_json)
    if args.output:
        output_path = Path(args.output).resolve()
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(report_json + "\n", encoding="utf-8")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    raise SystemExit(main())