#!/usr/bin/env python3
"""
Generate a Markdown benchmark report with embedded charts from a JSON report.
Usage:
python scripts/report_generator.py benchmarks/2026-03-18_123000.json
python scripts/report_generator.py benchmarks/2026-03-18_123000.json --compare benchmarks/prev.json
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
from bench_utils import format_float as _fmt # noqa: E402
def _delta_str(old: float, new: float) -> str:
if old == 0.0 and new == 0.0:
return "="
if old == 0.0:
return f"+{new:.3f}"
pct = ((new - old) / abs(old)) * 100.0
if abs(pct) < 0.05:
return "="
sign = "+" if pct > 0 else ""
return f"{sign}{pct:.1f}%"
def _delta_arrow(old: float, new: float) -> str:
if old == 0.0 and new == 0.0:
return ""
diff = new - old
if abs(diff) < 0.001:
return ""
return "▲" if diff > 0 else "▼"
def _safe_get(d: Dict[str, Any], *keys: str, default: Any = 0.0) -> Any:
for key in keys:
val = d.get(key)
if val is not None:
return val
return default
# ---------------------------------------------------------------------------
# Header
# ---------------------------------------------------------------------------
def _render_header(report: Dict[str, Any]) -> str:
timestamp = report.get("timestamp", "N/A")
tag = report.get("tag", "")
base_url = report.get("base_url", "N/A")
lines = [
"# RAG Benchmark Report",
"",
"| **Date** | **Tag** | **Service** |",
"|----------|---------|-------------|",
f"| {timestamp} | {tag or '—'} | `{base_url}` |",
"",
]
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def _render_config(report: Dict[str, Any]) -> str:
lines = ["## Configuration", ""]
retrieval = report.get("retrieval", {})
generation = report.get("generation", {})
if retrieval:
modes = list(retrieval.get("modes", {}).keys())
lines.append(f"- **Retrieval dataset**: `{retrieval.get('dataset', 'N/A')}`")
lines.append(f"- **Queries**: {retrieval.get('examples_total', '?')}")
lines.append(f"- **Top K**: {retrieval.get('top_k', '?')}")
lines.append(f"- **Modes**: {', '.join(modes)}")
if generation:
summary = generation.get("summary", {})
lines.append(f"- **QA dataset**: `{generation.get('dataset', 'N/A')}`")
lines.append(f"- **QA queries**: {summary.get('examples_total', '?')}")
metrics = summary.get("metrics_evaluated", [])
if metrics:
lines.append(f"- **RAGAS metrics**: {', '.join(metrics)}")
lines.append(f"- **Judge**: {summary.get('judge_provider', '?')} / {summary.get('judge_model', '?')}")
lines.append("")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Dataset info
# ---------------------------------------------------------------------------
def _render_dataset_info(report: Dict[str, Any]) -> str:
lines = ["## Dataset Samples", ""]
retrieval = report.get("retrieval", {})
if retrieval:
modes = retrieval.get("modes", {})
first_mode = next(iter(modes.values()), {}) if modes else {}
details = first_mode.get("details", [])
if details:
lines.append("### Retrieval queries (sample)")
lines.append("")
for detail in details[:3]:
query = detail.get("query", "?")
lines.append(f"- {query}")
if len(details) > 3:
lines.append(f"- ... ({len(details) - 3} more)")
lines.append("")
generation = report.get("generation", {})
if generation:
details = generation.get("details", [])
if details:
lines.append("### QA queries (sample)")
lines.append("")
for detail in details[:3]:
query = detail.get("query", "?")
lines.append(f"- {query}")
if len(details) > 3:
lines.append(f"- ... ({len(details) - 3} more)")
lines.append("")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Retrieval table
# ---------------------------------------------------------------------------
def _render_retrieval_table(retrieval: Dict[str, Any]) -> str:
modes_data = retrieval.get("modes", {})
if not modes_data:
return ""
top_k = retrieval.get("top_k", "K")
lines = [
"## Retrieval Metrics",
"",
f"| Mode | Hit@{top_k} | MRR | NDCG@{top_k} | P@{top_k} | p50 (ms) | p95 (ms) |",
"|------|---------|-----|----------|-------|----------|----------|",
]
for mode_name, mode_report in modes_data.items():
s = mode_report.get("summary", {})
lines.append(
f"| {mode_name} "
f"| {_fmt(s.get('celex_hit_at_k', 0))} "
f"| {_fmt(s.get('celex_mrr', 0))} "
f"| {_fmt(s.get('celex_ndcg_at_k', 0))} "
f"| {_fmt(s.get('celex_precision_at_k', 0))} "
f"| {s.get('latency_p50_ms', 0):.0f} "
f"| {s.get('latency_p95_ms', 0):.0f} |"
)
lines.append("")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Generation table
# ---------------------------------------------------------------------------
def _render_generation_table(generation: Dict[str, Any]) -> str:
summary = generation.get("summary", {})
if not summary:
return ""
metrics = summary.get("metrics_evaluated", [])
lines = [
"## Generation Metrics (RAGAS)",
"",
]
if not metrics:
lines.append("*RAGAS not available — simple metrics only.*")
lines.append("")
else:
lines.append("| Metric | Mean | Scored |")
lines.append("|--------|------|--------|")
for metric_name in metrics:
mean_val = summary.get(f"{metric_name}_mean", 0.0)
scored = summary.get(f"{metric_name}_scored_count", 0)
label = metric_name.replace("_", " ").title()
lines.append(f"| {label} | {_fmt(mean_val)} | {scored} |")
lines.append("")
if "faithfulness" in metrics:
threshold = summary.get("faithfulness_threshold", 0.8)
below_count = summary.get("faithfulness_below_threshold_count", 0)
below_ratio = summary.get("faithfulness_below_threshold_ratio", 0.0)
lines.append(f"Faithfulness threshold: **{threshold}** — {below_count} queries below ({below_ratio:.0%})")
lines.append("")
lines.append(f"- Avg answer length: {summary.get('avg_answer_chars', 0):.0f} chars")
lines.append(f"- Avg sources: {summary.get('avg_sources_per_example', 0):.1f}")
lines.append(
f"- Latency p50/p95: {summary.get('latency_p50_ms', 0):.0f}ms / {summary.get('latency_p95_ms', 0):.0f}ms"
)
lines.append("")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Worst performers
# ---------------------------------------------------------------------------
def _render_per_query_details(
retrieval: Optional[Dict[str, Any]],
generation: Optional[Dict[str, Any]],
worst_n: int = 5,
) -> str:
lines = ["## Worst Performers", ""]
if retrieval:
modes_data = retrieval.get("modes", {})
for mode_name, mode_report in modes_data.items():
details = mode_report.get("details", [])
missed = [d for d in details if d.get("celex_rank") is None]
if missed:
lines.append(f"### Retrieval misses — {mode_name} ({len(missed)} queries)")
lines.append("")
lines.append("| Query | Results | Latency |")
lines.append("|-------|---------|---------|")
for d in missed[:worst_n]:
query = d.get("query", "?")[:80]
results_count = d.get("results_count", 0)
latency = d.get("latency_ms", 0)
lines.append(f"| {query} | {results_count} | {latency:.0f}ms |")
lines.append("")
if generation:
details = generation.get("details", [])
flagged = [d for d in details if d.get("below_threshold")]
if flagged:
lines.append(f"### Low faithfulness ({len(flagged)} queries)")
lines.append("")
lines.append("| Query | Faithfulness | Contexts | Latency |")
lines.append("|-------|-------------|----------|---------|")
for d in sorted(flagged, key=lambda x: x.get("faithfulness", 0))[:worst_n]:
query = d.get("query", "?")[:80]
faith = d.get("faithfulness", 0)
ctx = d.get("contexts_count", 0)
latency = d.get("latency_ms", 0)
lines.append(f"| {query} | {_fmt(faith)} | {ctx} | {latency:.0f}ms |")
lines.append("")
if len(lines) == 2:
lines.append("*No issues detected.*")
lines.append("")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Comparison
# ---------------------------------------------------------------------------
def _render_comparison(
current: Dict[str, Any],
previous: Dict[str, Any],
) -> str:
prev_ts = previous.get("timestamp", "?")
prev_tag = previous.get("tag", "")
label = f"{prev_tag} ({prev_ts})" if prev_tag else prev_ts
lines = [
"## Comparison vs previous",
"",
f"Previous: **{label}**",
"",
"| Metric | Previous | Current | Delta |",
"|--------|----------|---------|-------|",
]
# Retrieval deltas
curr_ret = current.get("retrieval", {}).get("modes", {})
prev_ret = previous.get("retrieval", {}).get("modes", {})
for mode_name in curr_ret:
if mode_name not in prev_ret:
continue
cs = curr_ret[mode_name].get("summary", {})
ps = prev_ret[mode_name].get("summary", {})
for key, label_name in [
("celex_hit_at_k", "Hit@K"),
("celex_mrr", "MRR"),
("celex_ndcg_at_k", "NDCG"),
("latency_p50_ms", "p50 (ms)"),
]:
old_val = ps.get(key, 0.0)
new_val = cs.get(key, 0.0)
arrow = _delta_arrow(old_val, new_val)
delta = _delta_str(old_val, new_val)
if "ms" in label_name:
lines.append(f"| {mode_name} {label_name} | {old_val:.0f} | {new_val:.0f} | {arrow} {delta} |")
else:
lines.append(f"| {mode_name} {label_name} | {_fmt(old_val)} | {_fmt(new_val)} | {arrow} {delta} |")
# Generation deltas
curr_gen = current.get("generation", {}).get("summary", {})
prev_gen = previous.get("generation", {}).get("summary", {})
if curr_gen and prev_gen:
for metric_name in curr_gen.get("metrics_evaluated", []):
mean_key = f"{metric_name}_mean"
old_val = prev_gen.get(mean_key, 0.0)
new_val = curr_gen.get(mean_key, 0.0)
label_name = metric_name.replace("_", " ").title()
arrow = _delta_arrow(old_val, new_val)
delta = _delta_str(old_val, new_val)
lines.append(f"| {label_name} | {_fmt(old_val)} | {_fmt(new_val)} | {arrow} {delta} |")
lines.append("")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Charts
# ---------------------------------------------------------------------------
def _generate_retrieval_bar_chart(
retrieval: Dict[str, Any],
charts_dir: Path,
timestamp_prefix: str,
) -> Optional[str]:
"""Grouped bar chart comparing retrieval modes on key metrics."""
modes_data = retrieval.get("modes", {})
if not modes_data:
return None
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
mode_names = list(modes_data.keys())
metric_keys = [
("celex_hit_at_k", "Hit@K"),
("celex_mrr", "MRR"),
("celex_ndcg_at_k", "NDCG@K"),
("celex_precision_at_k", "P@K"),
]
values = {label: [] for _, label in metric_keys}
for mode_name in mode_names:
s = modes_data[mode_name].get("summary", {})
for key, label in metric_keys:
values[label].append(s.get(key, 0.0))
x = np.arange(len(mode_names))
width = 0.18
fig, ax = plt.subplots(figsize=(10, 5))
colors = ["#2196F3", "#4CAF50", "#FF9800", "#9C27B0"]
for i, (_, label) in enumerate(metric_keys):
offset = (i - len(metric_keys) / 2 + 0.5) * width
bars = ax.bar(x + offset, values[label], width, label=label, color=colors[i])
for bar in bars:
height = bar.get_height()
if height > 0:
ax.text(
bar.get_x() + bar.get_width() / 2.0,
height + 0.01,
f"{height:.2f}",
ha="center",
va="bottom",
fontsize=8,
)
ax.set_xlabel("Mode")
ax.set_ylabel("Score")
ax.set_title("Retrieval Metrics by Mode")
ax.set_xticks(x)
ax.set_xticklabels(mode_names)
ax.set_ylim(0, 1.15)
ax.legend(loc="upper right")
ax.grid(axis="y", alpha=0.3)
fig.tight_layout()
filename = f"{timestamp_prefix}_retrieval.png"
filepath = charts_dir / filename
fig.savefig(filepath, dpi=150)
plt.close(fig)
return f"charts/{filename}"
def _generate_ragas_radar_chart(
generation: Dict[str, Any],
charts_dir: Path,
timestamp_prefix: str,
) -> Optional[str]:
"""Radar chart for RAGAS metrics."""
summary = generation.get("summary", {})
metrics = summary.get("metrics_evaluated", [])
if not metrics:
return None
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
labels = [m.replace("_", " ").title() for m in metrics]
values = [summary.get(f"{m}_mean", 0.0) for m in metrics]
# Close the radar polygon
angles = np.linspace(0, 2 * np.pi, len(labels), endpoint=False).tolist()
values_plot = values + [values[0]]
angles_plot = angles + [angles[0]]
fig, ax = plt.subplots(figsize=(6, 6), subplot_kw=dict(polar=True))
ax.fill(angles_plot, values_plot, alpha=0.25, color="#2196F3")
ax.plot(angles_plot, values_plot, "o-", color="#2196F3", linewidth=2)
ax.set_xticks(angles)
ax.set_xticklabels(labels, fontsize=10)
ax.set_ylim(0, 1.0)
ax.set_yticks([0.2, 0.4, 0.6, 0.8, 1.0])
ax.set_yticklabels(["0.2", "0.4", "0.6", "0.8", "1.0"], fontsize=8)
ax.set_title("RAGAS Metrics", pad=20, fontsize=13)
# Add value annotations
for angle, value, label in zip(angles, values, labels):
ax.annotate(f"{value:.2f}", xy=(angle, value), xytext=(5, 5), textcoords="offset points", fontsize=9)
fig.tight_layout()
filename = f"{timestamp_prefix}_ragas_radar.png"
filepath = charts_dir / filename
fig.savefig(filepath, dpi=150)
plt.close(fig)
return f"charts/{filename}"
def _generate_latency_chart(
report: Dict[str, Any],
charts_dir: Path,
timestamp_prefix: str,
) -> Optional[str]:
"""Latency bar chart (p50/p95/p99) for retrieval modes + generation."""
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
categories: List[str] = []
p50_vals: List[float] = []
p95_vals: List[float] = []
p99_vals: List[float] = []
retrieval = report.get("retrieval", {})
for mode_name, mode_report in retrieval.get("modes", {}).items():
s = mode_report.get("summary", {})
categories.append(f"Retrieval\n{mode_name}")
p50_vals.append(s.get("latency_p50_ms", 0))
p95_vals.append(s.get("latency_p95_ms", 0))
p99_vals.append(s.get("latency_p99_ms", 0))
generation = report.get("generation", {})
gen_summary = generation.get("summary", {})
if gen_summary.get("latency_p50_ms"):
categories.append("Generation")
p50_vals.append(gen_summary.get("latency_p50_ms", 0))
p95_vals.append(gen_summary.get("latency_p95_ms", 0))
p99_vals.append(gen_summary.get("latency_p99_ms", 0))
if not categories:
return None
x = np.arange(len(categories))
width = 0.25
fig, ax = plt.subplots(figsize=(10, 5))
ax.bar(x - width, p50_vals, width, label="p50", color="#4CAF50")
ax.bar(x, p95_vals, width, label="p95", color="#FF9800")
ax.bar(x + width, p99_vals, width, label="p99", color="#F44336")
ax.set_xlabel("Phase")
ax.set_ylabel("Latency (ms)")
ax.set_title("Latency Distribution")
ax.set_xticks(x)
ax.set_xticklabels(categories, fontsize=9)
ax.legend()
ax.grid(axis="y", alpha=0.3)
fig.tight_layout()
filename = f"{timestamp_prefix}_latency.png"
filepath = charts_dir / filename
fig.savefig(filepath, dpi=150)
plt.close(fig)
return f"charts/{filename}"
# ---------------------------------------------------------------------------
# Main assembly
# ---------------------------------------------------------------------------
def generate_report(
    report: Dict[str, Any],
    output_path: Path,
    charts_dir: Path,
    previous_report: Optional[Dict[str, Any]] = None,
) -> Path:
    """
    Generate a Markdown benchmark report with embedded charts.

    Chart images are linked as ``charts/<file>.png``, the reference returned
    by the chart helpers, so the links resolve when ``charts_dir`` is a
    directory named ``charts`` next to ``output_path``.

    Args:
        report: Parsed JSON benchmark report.
        output_path: Path for the .md file.
        charts_dir: Directory for chart PNGs.
        previous_report: Optional previous report for comparison deltas.
    Returns:
        Path to the written .md file.
    """
    charts_dir.mkdir(parents=True, exist_ok=True)

    # Derive the chart file-name prefix from the first 19 characters of the
    # timestamp, e.g. "2026-03-18T12:30:00" -> "20260318_123000".
    timestamp = report.get("timestamp", "")
    if timestamp:
        timestamp_prefix = timestamp[:19].replace(":", "").replace("-", "").replace("T", "_")
    else:
        timestamp_prefix = "report"

    sections: List[str] = [
        _render_header(report),
        "---\n",
        _render_config(report),
        _render_dataset_info(report),
    ]

    # Retrieval: table followed by its chart.
    retrieval = report.get("retrieval")
    if retrieval:
        sections.append(_render_retrieval_table(retrieval))
        chart_ref = _generate_retrieval_bar_chart(retrieval, charts_dir, timestamp_prefix)
        if chart_ref:
            # Embed the image; previously only a bare newline was appended,
            # so the chart was written to disk but never shown.
            sections.append(f"![Retrieval Metrics by Mode]({chart_ref})\n")

    # Generation: table followed by its chart.
    generation = report.get("generation")
    if generation:
        sections.append(_render_generation_table(generation))
        chart_ref = _generate_ragas_radar_chart(generation, charts_dir, timestamp_prefix)
        if chart_ref:
            sections.append(f"![RAGAS Metrics]({chart_ref})\n")

    # Latency
    latency_ref = _generate_latency_chart(report, charts_dir, timestamp_prefix)
    if latency_ref:
        sections.append("## Latency\n")
        sections.append(f"![Latency Distribution]({latency_ref})\n")

    # Worst performers
    sections.append(_render_per_query_details(retrieval, generation))

    # Comparison
    if previous_report:
        sections.append(_render_comparison(report, previous_report))

    # Footer
    sections.append("---\n")
    sections.append("*Generated by `report_generator.py`*\n")

    # Write
    md_content = "\n".join(sections)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(md_content, encoding="utf-8")
    return output_path
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> int:
    """Convert a benchmark JSON report into Markdown plus chart assets.

    Reads the report named on the command line, optionally a previous report
    given with ``--compare``, and writes the Markdown file to ``--output``
    (default: the JSON path with a ``.md`` suffix).

    Returns:
        ``0`` on success, ``1`` when the input report is missing or cannot
        be parsed. A missing or unreadable comparison file only produces a
        warning.
    """
    import sys

    parser = argparse.ArgumentParser(description="Generate Markdown report from JSON benchmark")
    parser.add_argument("json_path", help="Path to JSON benchmark report")
    parser.add_argument("--compare", default=None, help="Path to previous JSON report for comparison")
    parser.add_argument("--output", default=None, help="Output .md path (default: same as JSON with .md)")
    args = parser.parse_args()

    json_path = Path(args.json_path).resolve()
    if not json_path.exists():
        print(f"ERROR: {json_path} not found", file=sys.stderr)
        return 1
    try:
        report = json.loads(json_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"ERROR: cannot read {json_path}: {exc}", file=sys.stderr)
        return 1

    previous_report = None
    if args.compare:
        compare_path = Path(args.compare).resolve()
        if compare_path.exists():
            try:
                previous_report = json.loads(compare_path.read_text(encoding="utf-8"))
            except (OSError, ValueError) as exc:
                print(
                    f"WARNING: comparison file {compare_path} could not be read: {exc}",
                    file=sys.stderr,
                )
        else:
            print(f"WARNING: comparison file {compare_path} not found", file=sys.stderr)

    output_path = Path(args.output).resolve() if args.output else json_path.with_suffix(".md")
    # Chart references are relative ("charts/<file>.png"), so the images must
    # sit beside the Markdown file. With the default output path this is the
    # JSON file's directory, as before.
    charts_dir = output_path.parent / "charts"

    result = generate_report(report, output_path, charts_dir, previous_report)
    print(f"Report generated: {result}")
    return 0
# Script entry point: run the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())