#!/usr/bin/env python3
"""Evaluate RAG quality against the running rag-service with RAGAS.
Input data can be JSON or JSONL. Each record contains a ``query`` and may also
define ``ground_truth``, ``mode``, ``top_k``, and ``min_score`` overrides.
Reported metrics include ``faithfulness``, ``answer_relevancy``,
``context_precision``, and ``context_recall`` when ground-truth answers are
available.
"""
from __future__ import annotations
import argparse
import json
import os
import time
import uuid
from dataclasses import dataclass
from importlib import import_module
from pathlib import Path
from typing import Any, Dict, List, Mapping, Optional, Sequence, cast
import requests
from bench_utils import build_ragas_embeddings as _build_ragas_embeddings
from bench_utils import percentile as _percentile
from bench_utils import safe_mean as _safe_mean
# Loose alias for decoded JSON objects exchanged with the rag-service.
JsonObject = Dict[str, object]
# RAGAS metric names this runner knows how to resolve and report.
AVAILABLE_METRICS = ("faithfulness", "answer_relevancy", "context_precision", "context_recall")
def _as_json_object(value: object) -> Optional[JsonObject]:
if not isinstance(value, dict):
return None
result: JsonObject = {}
for key, item in value.items():
if isinstance(key, str):
result[key] = item
return result
def _as_json_list(value: object) -> Optional[List[object]]:
if not isinstance(value, list):
return None
return cast(List[object], value)
@dataclass(frozen=True)
class QAExample:
    """One QA evaluation example loaded from the dataset.

    ``mode``, ``top_k`` and ``min_score`` are per-example overrides; when
    ``None`` the CLI-level defaults apply.
    """

    # User question sent to the /query endpoint (always non-empty).
    query: str
    # Reference answer for ground-truth metrics, when provided.
    ground_truth: Optional[str]
    # Per-example retrieval mode override, or None for the default.
    mode: Optional[str]
    # Per-example positive top_k override, or None for the default.
    top_k: Optional[int]
    # Per-example min_score override, or None for the default.
    min_score: Optional[float]
def _as_optional_float(value: Any) -> Optional[float]:
if value is None or value == "":
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def _as_optional_int(value: Any) -> Optional[int]:
if value is None or value == "":
return None
try:
parsed = int(value)
except (TypeError, ValueError):
return None
return parsed if parsed > 0 else None
def _parse_example(raw: Mapping[str, object], *, index: int) -> QAExample:
    """Build a :class:`QAExample` from one raw dataset record.

    Accepts ``question`` as an alias for ``query`` and ``reference`` as an
    alias for ``ground_truth``.

    Raises:
        ValueError: when the record lacks a non-empty query.
    """
    question = str(raw.get("query") or raw.get("question") or "").strip()
    if not question:
        raise ValueError(f"Dataset item #{index} is missing a non-empty 'query' field")

    def _clean_text(value: object) -> Optional[str]:
        # Normalize optional string fields: missing and blank both become None.
        if value is None:
            return None
        text = str(value).strip()
        return text if text else None

    reference = raw.get("ground_truth")
    if reference is None:
        reference = raw.get("reference")
    return QAExample(
        query=question,
        ground_truth=_clean_text(reference),
        mode=_clean_text(raw.get("mode")),
        top_k=_as_optional_int(raw.get("top_k")),
        min_score=_as_optional_float(raw.get("min_score")),
    )
def load_dataset(dataset_path: Path) -> List[QAExample]:
    """Load QA examples from a JSON or JSONL dataset file.

    JSONL files hold one JSON object per non-blank line; JSON files must hold
    a top-level list of objects.

    Raises:
        FileNotFoundError: when ``dataset_path`` does not exist.
        ValueError: when a record is not an object or lacks a query.
    """
    if not dataset_path.exists():
        raise FileNotFoundError(f"Dataset not found: {dataset_path}")
    examples: list[QAExample] = []
    if dataset_path.suffix.lower() == ".jsonl":
        with dataset_path.open("r", encoding="utf-8") as handle:
            for index, line in enumerate(handle, start=1):
                stripped = line.strip()
                if not stripped:
                    continue  # tolerate blank lines between records
                parsed = _as_json_object(cast(object, json.loads(stripped)))
                if parsed is None:
                    raise ValueError(f"Invalid JSONL item at line {index}: expected object")
                examples.append(_parse_example(parsed, index=index))
        return examples
    parsed_root = cast(object, json.loads(dataset_path.read_text(encoding="utf-8")))
    items = _as_json_list(parsed_root)
    if items is None:
        raise ValueError("JSON dataset must be a list of objects")
    for index, raw_item in enumerate(items, start=1):
        item = _as_json_object(raw_item)
        if item is None:
            raise ValueError(f"Invalid dataset item #{index}: expected object")
        examples.append(_parse_example(item, index=index))
    return examples
def _extract_contexts(
    response_payload: Mapping[str, object],
    *,
    max_contexts: int,
) -> tuple[tuple[str, ...], int]:
    """Pull unique, non-empty context strings from a /query response.

    Returns:
        A pair of (deduplicated context texts capped at ``max_contexts``,
        total number of source documents seen).
    """
    sources = _as_json_object(response_payload.get("sources"))
    documents = _as_json_list(sources.get("documents")) if sources is not None else None
    if documents is None:
        return tuple(), 0
    collected: list[str] = []
    already_seen: set[str] = set()
    total_documents = 0
    for raw_doc in documents:
        document = _as_json_object(raw_doc)
        if document is None:
            continue
        total_documents += 1
        # Prefer the full content field; fall back to the preview text.
        content_raw = document.get("content")
        if isinstance(content_raw, str) and content_raw.strip():
            text = content_raw.strip()
        else:
            preview_raw = document.get("content_preview")
            text = str(preview_raw).strip() if preview_raw is not None else ""
        if not text or text in already_seen:
            continue
        already_seen.add(text)
        collected.append(text)
        if len(collected) >= max_contexts:
            break
    return tuple(collected), total_documents
def _query_rag_service(
    *,
    base_url: str,
    query_id: str,
    question: str,
    mode: str,
    top_k: int,
    min_score: Optional[float],
    timeout_seconds: float,
    include_full_content: bool,
) -> tuple[JsonObject, float]:
    """POST one question to the rag-service /query endpoint.

    Returns:
        The parsed JSON response plus the request latency in milliseconds.

    Raises:
        requests.HTTPError: on non-2xx responses.
        ValueError: when the response body is not a JSON object.
    """
    request_body: JsonObject = {
        "query_id": query_id,
        "question": question,
        "mode": mode,
        "top_k": top_k,
        "sources": True,
        "include_full_content": include_full_content,
    }
    if min_score is not None:
        request_body["min_score"] = min_score
    url = base_url.rstrip("/") + "/query"
    started = time.perf_counter()
    response = requests.post(url, json=request_body, timeout=timeout_seconds)
    elapsed_ms = (time.perf_counter() - started) * 1000.0
    response.raise_for_status()
    parsed = _as_json_object(cast(object, response.json()))
    if parsed is None:
        raise ValueError("Unexpected /query response: expected object")
    return parsed, elapsed_ms
@dataclass(frozen=True)
class RagasInputRow:
    """One /query result normalized for RAGAS evaluation.

    NOTE(review): this class was referenced throughout the module but its
    definition was missing from the file; the fields below are reconstructed
    from every keyword usage site — confirm against the original source.
    """

    # Original user question.
    query: str
    # Generated answer text ("" when the service returned none).
    answer: str
    # Deduplicated retrieval contexts, capped at --max-contexts.
    contexts: tuple[str, ...]
    # Reference answer, when the dataset provides one.
    ground_truth: Optional[str]
    # Effective query mode after per-example overrides.
    mode: str
    # Effective top_k after per-example overrides.
    top_k: int
    # Effective min_score, or None when unset.
    min_score: Optional[float]
    # Total number of source documents in the response.
    num_sources: int
    # Unique id sent with the request (useful for log correlation).
    query_id: str
    # End-to-end request latency in milliseconds.
    latency_ms: float


def collect_samples(
    *,
    examples: Sequence[QAExample],
    base_url: str,
    default_mode: str,
    default_top_k: int,
    default_min_score: Optional[float],
    timeout_seconds: float,
    max_contexts: int,
    include_full_content: bool,
) -> List[RagasInputRow]:
    """Query the RAG service and normalize responses for evaluation.

    Per-example overrides (``mode``/``top_k``/``min_score``) take precedence
    over the defaults supplied here.
    """
    rows: list[RagasInputRow] = []
    for index, example in enumerate(examples, start=1):
        # Unique id helps correlate service-side logs with report rows.
        query_id = f"ragas-{index}-{uuid.uuid4().hex[:8]}"
        mode = example.mode if example.mode else default_mode
        top_k = example.top_k if example.top_k is not None else default_top_k
        min_score = example.min_score if example.min_score is not None else default_min_score
        response_payload, latency_ms = _query_rag_service(
            base_url=base_url,
            query_id=query_id,
            question=example.query,
            mode=mode,
            top_k=max(top_k, 1),
            min_score=min_score,
            timeout_seconds=timeout_seconds,
            include_full_content=include_full_content,
        )
        answer_raw = response_payload.get("answer")
        answer = str(answer_raw).strip() if answer_raw is not None else ""
        contexts, source_count = _extract_contexts(response_payload, max_contexts=max_contexts)
        rows.append(
            RagasInputRow(
                query=example.query,
                answer=answer,
                contexts=contexts,
                ground_truth=example.ground_truth,
                mode=mode,
                top_k=top_k,
                min_score=min_score,
                num_sources=source_count,
                query_id=query_id,
                latency_ms=latency_ms,
            )
        )
    return rows
def _build_judge_llm(
    *,
    provider: str,
    model: str,
    base_url: str,
    api_key: Optional[str],
    timeout_seconds: float,
    temperature: float,
) -> Any:
    """Build the judge chat model and wrap it for use by ragas.

    Raises:
        RuntimeError: when ragas or its LangChain LLM wrapper is unavailable.
    """
    try:
        ragas_llms = import_module("ragas.llms")
    except Exception as exc:
        raise RuntimeError("Ragas dependency missing. Install with: poetry add ragas datasets") from exc
    wrapper_factory = cast(Any, getattr(ragas_llms, "LangchainLLMWrapper", None))
    if wrapper_factory is None:
        raise RuntimeError("ragas.llms.LangchainLLMWrapper not available")
    from lalandre_core.llm import build_chat_model, normalize_provider

    resolved_provider = normalize_provider(provider)
    # The CLI still accepts the legacy "openai_compat" alias.
    if provider == "openai_compat":
        resolved_provider = "openai_compatible"
    chat_model = build_chat_model(
        provider=resolved_provider,
        model=model,
        api_key=api_key or "dummy",
        base_url=base_url,
        temperature=temperature,
        timeout_seconds=timeout_seconds,
    )
    return wrapper_factory(chat_model)
def _validate_runtime_dependencies(judge_provider: str) -> None:
try:
import_module("ragas")
import_module("datasets")
except Exception as exc:
raise RuntimeError("Ragas dependencies missing. Install with: poetry add ragas datasets") from exc
if judge_provider == "openai_compat":
try:
import_module("langchain_openai")
except Exception as exc:
raise RuntimeError(
"provider=openai_compat requires langchain-openai. Install with: poetry add langchain-openai"
) from exc
def _build_ragas_dataset(rows: Sequence[RagasInputRow]) -> Any:
    """Convert collected rows into whichever dataset type this ragas supports.

    Newer ragas releases expose ``ragas.EvaluationDataset``; older releases
    expect a HuggingFace ``datasets.Dataset`` instead.
    """
    evaluation_dataset_cls: Any = None
    try:
        evaluation_dataset_cls = cast(Any, getattr(import_module("ragas"), "EvaluationDataset", None))
    except Exception:
        evaluation_dataset_cls = None
    if evaluation_dataset_cls is not None:
        records: list[dict[str, Any]] = []
        for row in rows:
            entry: dict[str, Any] = {
                "user_input": row.query,
                "response": row.answer,
                "retrieved_contexts": list(row.contexts),
            }
            if row.ground_truth:
                entry["reference"] = row.ground_truth
            records.append(entry)
        return evaluation_dataset_cls.from_list(records)
    try:
        dataset_cls = cast(Any, getattr(import_module("datasets"), "Dataset", None))
    except Exception as exc:
        raise RuntimeError("Ragas dataset dependency missing. Install with: poetry add ragas datasets") from exc
    if dataset_cls is None:
        raise RuntimeError("datasets.Dataset not available")
    columns: dict[str, Any] = {
        "question": [row.query for row in rows],
        "answer": [row.answer for row in rows],
        "contexts": [list(row.contexts) for row in rows],
    }
    if any(row.ground_truth for row in rows):
        # Legacy ragas expects references under the pluralized key.
        columns["ground_truths"] = [row.ground_truth or "" for row in rows]
    return dataset_cls.from_dict(columns)
def _resolve_metrics(
requested: Sequence[str],
has_ground_truth: bool,
) -> tuple[list[Any], list[str]]:
"""Resolve RAGAS metric objects from names."""
try:
metrics_module = import_module("ragas.metrics")
except Exception as exc:
raise RuntimeError("ragas.metrics not available") from exc
resolved: list[Any] = []
names: list[str] = []
for name in requested:
if name == "context_recall" and not has_ground_truth:
continue
metric_obj = getattr(metrics_module, name, None)
if metric_obj is not None:
resolved.append(metric_obj)
names.append(name)
return resolved, names
def _as_records(result: Any) -> List[Dict[str, Any]]:
    """Extract per-row score records from a ragas evaluation result.

    Tries the pandas export first, then the raw ``scores`` attribute, and
    returns an empty list when neither yields usable records.
    """

    def _coerce(raw_items: List[object]) -> List[Dict[str, Any]]:
        # Keep only items that are JSON objects with string keys.
        coerced: list[Dict[str, Any]] = []
        for raw_item in raw_items:
            item = _as_json_object(raw_item)
            if item is not None:
                coerced.append(cast(Dict[str, Any], item))
        return coerced

    if hasattr(result, "to_pandas"):
        try:
            frame = result.to_pandas()
            rows = _as_json_list(cast(object, frame.to_dict(orient="records")))
            if rows is not None:
                return _coerce(rows)
        except Exception:
            pass  # fall through to the scores attribute
    scores = _as_json_list(cast(object, getattr(result, "scores", None)))
    if scores is not None:
        return _coerce(scores)
    return []
def _extract_metric_score(record: Dict[str, Any], metric_name: str) -> Optional[float]:
candidate_keys = (metric_name, f"{metric_name}_score")
for key in candidate_keys:
value = record.get(key)
if value is None:
continue
try:
parsed = float(value)
except (TypeError, ValueError):
continue
return parsed
return None
def run_ragas_evaluation(
    *,
    rows: Sequence[RagasInputRow],
    judge_provider: str,
    judge_model: str,
    judge_base_url: str,
    judge_api_key: Optional[str],
    judge_timeout_seconds: float,
    judge_temperature: float,
    metric_names: Sequence[str] = AVAILABLE_METRICS,
    faithfulness_threshold: float = 0.8,
    embeddings: Any = None,
) -> Dict[str, Any]:
    """Run RAGAS scoring for the collected rows and summarize the results.

    Args:
        rows: Normalized /query responses to score.
        judge_provider: Provider name for the judge LLM.
        judge_model: Judge model identifier.
        judge_base_url: Judge endpoint base URL.
        judge_api_key: Judge API key, or ``None``.
        judge_timeout_seconds: Per-request timeout for the judge.
        judge_temperature: Judge sampling temperature.
        metric_names: RAGAS metric names to attempt; unresolvable names are
            dropped, and ``context_recall`` is skipped without ground truth.
        faithfulness_threshold: Scores strictly below this are flagged as
            likely hallucinations in both details and summary.
        embeddings: Optional embeddings wrapper forwarded to ragas.

    Returns:
        ``{"summary": <aggregates>, "details": <per-example records>}``.

    Raises:
        RuntimeError: when ragas is missing or no metric could be resolved.
    """
    try:
        ragas_module = import_module("ragas")
    except Exception as exc:
        raise RuntimeError("Ragas not installed. Install with: poetry add ragas datasets") from exc
    evaluate_fn = cast(Any, getattr(ragas_module, "evaluate", None))
    if evaluate_fn is None:
        raise RuntimeError("Ragas evaluate API not available")
    has_ground_truth = any(row.ground_truth for row in rows)
    metrics, resolved_names = _resolve_metrics(metric_names, has_ground_truth)
    if not metrics:
        raise RuntimeError("No RAGAS metrics could be resolved")
    judge_llm = _build_judge_llm(
        provider=judge_provider,
        model=judge_model,
        base_url=judge_base_url,
        api_key=judge_api_key,
        timeout_seconds=judge_timeout_seconds,
        temperature=judge_temperature,
    )
    dataset = _build_ragas_dataset(rows)
    eval_kwargs: Dict[str, Any] = dict(
        dataset=dataset,
        metrics=metrics,
        llm=judge_llm,
    )
    if embeddings is not None:
        eval_kwargs["embeddings"] = embeddings
    try:
        result = evaluate_fn(**eval_kwargs)
    except TypeError:
        # Older ragas releases only accept (dataset, metrics) positionally.
        result = evaluate_fn(dataset, metrics, llm=judge_llm)
    ragas_records = _as_records(result)
    # Collect per-metric scores across all examples.
    metric_values: dict[str, list[float]] = {name: [] for name in resolved_names}
    details: list[dict[str, Any]] = []
    for index, row in enumerate(rows):
        # ragas may return fewer records than rows; tolerate the mismatch.
        metric_record = ragas_records[index] if index < len(ragas_records) else {}
        detail: dict[str, Any] = {
            "query_id": row.query_id,
            "query": row.query,
            "mode": row.mode,
            "top_k": row.top_k,
            "min_score": row.min_score,
            "answer_chars": len(row.answer),
            "contexts_count": len(row.contexts),
            "sources_count": row.num_sources,
            "latency_ms": round(row.latency_ms, 1),
        }
        for metric_name in resolved_names:
            score = _extract_metric_score(metric_record, metric_name)
            detail[metric_name] = score
            if score is not None:
                metric_values[metric_name].append(score)
        # Flag likely hallucinations via the faithfulness threshold.
        faith_score = detail.get("faithfulness")
        if faith_score is not None:
            detail["below_threshold"] = bool(faith_score < faithfulness_threshold)
        details.append(detail)
    # Build the aggregate summary.
    latencies = [row.latency_ms for row in rows]
    summary: Dict[str, Any] = {
        "examples_total": len(rows),
        "examples_with_contexts": sum(1 for row in rows if row.contexts),
        "avg_contexts_per_example": _safe_mean([float(len(row.contexts)) for row in rows]),
        "avg_sources_per_example": _safe_mean([float(row.num_sources) for row in rows]),
        "avg_answer_chars": _safe_mean([float(len(row.answer)) for row in rows]),
        "latency_p50_ms": round(_percentile(latencies, 50), 1),
        "latency_p95_ms": round(_percentile(latencies, 95), 1),
        "latency_p99_ms": round(_percentile(latencies, 99), 1),
        "metrics_evaluated": resolved_names,
        "judge_provider": judge_provider,
        "judge_model": judge_model,
        "judge_temperature": judge_temperature,
        "faithfulness_threshold": faithfulness_threshold,
    }
    for metric_name in resolved_names:
        values = metric_values[metric_name]
        summary[f"{metric_name}_mean"] = _safe_mean(values)
        summary[f"{metric_name}_scored_count"] = len(values)
    if "faithfulness" in metric_values:
        faith_vals = metric_values["faithfulness"]
        below = [s for s in faith_vals if s < faithfulness_threshold]
        summary["faithfulness_below_threshold_count"] = len(below)
        summary["faithfulness_below_threshold_ratio"] = (
            float(len(below)) / float(len(faith_vals)) if faith_vals else 0.0
        )
    return {"summary": summary, "details": details}
def collect_simple_metrics(rows: Sequence[RagasInputRow]) -> Dict[str, Any]:
    """Fallback metrics when RAGAS is not installed.

    Produces the same report shape as :func:`run_ragas_evaluation` but with
    only volume and latency statistics, and ``ragas_available`` set False so
    consumers can tell judged scores are absent.
    """
    latencies = [row.latency_ms for row in rows]
    details: list[dict[str, Any]] = []
    for row in rows:
        details.append(
            {
                "query_id": row.query_id,
                "query": row.query,
                "mode": row.mode,
                "top_k": row.top_k,
                "answer_chars": len(row.answer),
                "contexts_count": len(row.contexts),
                "sources_count": row.num_sources,
                "latency_ms": round(row.latency_ms, 1),
            }
        )
    summary: Dict[str, Any] = {
        "examples_total": len(rows),
        "examples_with_contexts": sum(1 for row in rows if row.contexts),
        "avg_contexts_per_example": _safe_mean([float(len(row.contexts)) for row in rows]),
        "avg_sources_per_example": _safe_mean([float(row.num_sources) for row in rows]),
        "avg_answer_chars": _safe_mean([float(len(row.answer)) for row in rows]),
        "latency_p50_ms": round(_percentile(latencies, 50), 1),
        "latency_p95_ms": round(_percentile(latencies, 95), 1),
        "latency_p99_ms": round(_percentile(latencies, 99), 1),
        "metrics_evaluated": [],
        "ragas_available": False,
    }
    return {"summary": summary, "details": details}
def parse_args() -> argparse.Namespace:
    """Parse CLI arguments for the RAGAS evaluation runner.

    Judge endpoint/key defaults come from the ``RAGAS_JUDGE_BASE_URL`` and
    ``RAGAS_JUDGE_API_KEY`` environment variables when set.
    """
    parser = argparse.ArgumentParser(description="Evaluate /query quality with Ragas")
    parser.add_argument("--dataset", required=True, help="Path to JSON or JSONL dataset")
    parser.add_argument("--base-url", default="http://localhost:8001", help="rag-service base URL")
    parser.add_argument("--mode", default="rag", choices=["rag", "graph"], help="Default query mode")
    parser.add_argument("--top-k", type=int, default=12, help="Default top_k if absent in dataset item")
    parser.add_argument("--min-score", type=float, default=None, help="Default min_score if absent in dataset")
    parser.add_argument("--timeout", type=float, default=45.0, help="HTTP timeout for rag-service calls")
    parser.add_argument("--max-contexts", type=int, default=12, help="Max contexts kept per example")
    parser.add_argument(
        "--include-full-content",
        action="store_true",
        default=False,
        help="Request full source content from /query (recommended for faithfulness)",
    )
    parser.add_argument(
        "--metrics",
        default="faithfulness,answer_relevancy,context_precision,context_recall",
        help="Comma-separated RAGAS metrics to evaluate",
    )
    parser.add_argument(
        "--judge-provider",
        default="mistral",
        choices=["mistral", "openai_compat"],
        help="Judge LLM provider used by Ragas",
    )
    parser.add_argument(
        "--judge-model",
        default="mistral-small-latest",
        help="Judge LLM model name",
    )
    parser.add_argument(
        "--judge-base-url",
        default=os.getenv("RAGAS_JUDGE_BASE_URL", "https://api.mistral.ai/v1"),
        help="Judge endpoint base URL",
    )
    parser.add_argument(
        "--judge-api-key",
        default=os.getenv("RAGAS_JUDGE_API_KEY", ""),
        help="Judge API key (required for mistral/openai_compat)",
    )
    parser.add_argument("--judge-timeout", type=float, default=120.0, help="Judge request timeout")
    parser.add_argument("--judge-temperature", type=float, default=0.0, help="Judge generation temperature")
    parser.add_argument(
        "--faithfulness-threshold",
        type=float,
        default=0.8,
        help="Threshold used to flag likely hallucinations",
    )
    parser.add_argument("--output", default="", help="Optional path to write JSON report")
    return parser.parse_args()
def main() -> int:
    """Run the RAGAS CLI workflow and print the JSON report.

    Workflow: validate optional dependencies, load the dataset, query the
    rag-service for every example, score with RAGAS, then print (and
    optionally write) the JSON report.

    Returns:
        Process exit code (0 on success).
    """
    args = parse_args()
    # Fail fast before doing any network work if ragas/datasets are missing.
    _validate_runtime_dependencies(args.judge_provider)
    dataset_path = Path(args.dataset).resolve()
    examples = load_dataset(dataset_path)
    if not examples:
        raise ValueError("Dataset is empty after parsing")
    rows = collect_samples(
        examples=examples,
        base_url=args.base_url,
        default_mode=args.mode,
        default_top_k=max(args.top_k, 1),
        default_min_score=args.min_score,
        timeout_seconds=max(args.timeout, 1.0),
        max_contexts=max(args.max_contexts, 1),
        include_full_content=bool(args.include_full_content),
    )
    metric_names = [m.strip() for m in args.metrics.split(",") if m.strip()]
    judge_api_key_resolved = str(args.judge_api_key).strip() or None
    ragas_embeddings = _build_ragas_embeddings(
        provider=args.judge_provider,
        api_key=judge_api_key_resolved,
    )
    report = run_ragas_evaluation(
        rows=rows,
        judge_provider=args.judge_provider,
        judge_model=args.judge_model,
        judge_base_url=args.judge_base_url,
        judge_api_key=judge_api_key_resolved,
        judge_timeout_seconds=max(args.judge_timeout, 1.0),
        judge_temperature=float(args.judge_temperature),
        # Clamp the threshold into the valid [0, 1] score range.
        faithfulness_threshold=min(max(float(args.faithfulness_threshold), 0.0), 1.0),
        embeddings=ragas_embeddings,
    )
    output_json = json.dumps(report, indent=2, ensure_ascii=False)
    print(output_json)
    if args.output:
        output_path = Path(args.output).resolve()
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(output_json + "\n", encoding="utf-8")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())