Source code for scripts.report_generator

#!/usr/bin/env python3
"""
Generate a Markdown benchmark report with embedded charts from a JSON report.

Usage:
    python scripts/report_generator.py benchmarks/2026-03-18_123000.json
    python scripts/report_generator.py benchmarks/2026-03-18_123000.json --compare benchmarks/prev.json
"""

from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any, Dict, List, Optional

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
from bench_utils import format_float as _fmt  # noqa: E402


def _delta_str(old: float, new: float) -> str:
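    """Format the change from old to new as a signed percentage, '=' when negligible, or an absolute '+x.xxx' when old is zero."""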
    if old == 0.0 and new == 0.0:
        return "="
    if old == 0.0:
        return f"+{new:.3f}"
    pct = ((new - old) / abs(old)) * 100.0
    if abs(pct) < 0.05:
        return "="
    sign = "+" if pct > 0 else ""
    return f"{sign}{pct:.1f}%"


def _delta_arrow(old: float, new: float) -> str:
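    """Return '▲' for an increase, '▼' for a decrease, or '' when the values are effectively equal."""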
    if old == 0.0 and new == 0.0:
        return ""
    diff = new - old
    if abs(diff) < 0.001:
        return ""
    return "▲" if diff > 0 else "▼"


def _safe_get(d: Dict[str, Any], *keys: str, default: Any = 0.0) -> Any:
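    """Return the first non-None value among the given keys in d, or default if none is set."""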
    for key in keys:
        val = d.get(key)
        if val is not None:
            return val
    return default


# ---------------------------------------------------------------------------
# Header
# ---------------------------------------------------------------------------


def _render_header(report: Dict[str, Any]) -> str:
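    """Render the report title and a one-row table with date, tag, and service base URL."""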
    timestamp = report.get("timestamp", "N/A")
    tag = report.get("tag", "")
    base_url = report.get("base_url", "N/A")

    lines = [
        "# RAG Benchmark Report",
        "",
        "| **Date** | **Tag** | **Service** |",
        "|----------|---------|-------------|",
        f"| {timestamp} | {tag or '—'} | `{base_url}` |",
        "",
    ]
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------


def _render_config(report: Dict[str, Any]) -> str:
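    """Render the '## Configuration' section from the retrieval and generation blocks of the report."""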
    lines = ["## Configuration", ""]

    retrieval = report.get("retrieval", {})
    generation = report.get("generation", {})

    if retrieval:
        modes = list(retrieval.get("modes", {}).keys())
        lines.append(f"- **Retrieval dataset**: `{retrieval.get('dataset', 'N/A')}`")
        lines.append(f"- **Queries**: {retrieval.get('examples_total', '?')}")
        lines.append(f"- **Top K**: {retrieval.get('top_k', '?')}")
        lines.append(f"- **Modes**: {', '.join(modes)}")

    if generation:
        summary = generation.get("summary", {})
        lines.append(f"- **QA dataset**: `{generation.get('dataset', 'N/A')}`")
        lines.append(f"- **QA queries**: {summary.get('examples_total', '?')}")
        metrics = summary.get("metrics_evaluated", [])
        if metrics:
            lines.append(f"- **RAGAS metrics**: {', '.join(metrics)}")
            lines.append(f"- **Judge**: {summary.get('judge_provider', '?')} / {summary.get('judge_model', '?')}")

    lines.append("")
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Dataset info
# ---------------------------------------------------------------------------


def _render_dataset_info(report: Dict[str, Any]) -> str:
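    """Render the '## Dataset Samples' section: up to three sample queries each for retrieval and QA."""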
    lines = ["## Dataset Samples", ""]

    retrieval = report.get("retrieval", {})
    if retrieval:
        modes = retrieval.get("modes", {})
        first_mode = next(iter(modes.values()), {}) if modes else {}
        details = first_mode.get("details", [])
        if details:
            lines.append("### Retrieval queries (sample)")
            lines.append("")
            for detail in details[:3]:
                query = detail.get("query", "?")
                lines.append(f"- {query}")
            if len(details) > 3:
                lines.append(f"- ... ({len(details) - 3} more)")
            lines.append("")

    generation = report.get("generation", {})
    if generation:
        details = generation.get("details", [])
        if details:
            lines.append("### QA queries (sample)")
            lines.append("")
            for detail in details[:3]:
                query = detail.get("query", "?")
                lines.append(f"- {query}")
            if len(details) > 3:
                lines.append(f"- ... ({len(details) - 3} more)")
            lines.append("")

    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Retrieval table
# ---------------------------------------------------------------------------


def _render_retrieval_table(retrieval: Dict[str, Any]) -> str:
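    """Render the '## Retrieval Metrics' table, one row per retrieval mode."""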
    modes_data = retrieval.get("modes", {})
    if not modes_data:
        return ""

    top_k = retrieval.get("top_k", "K")
    lines = [
        "## Retrieval Metrics",
        "",
        f"| Mode | Hit@{top_k} | MRR | NDCG@{top_k} | P@{top_k} | p50 (ms) | p95 (ms) |",
        "|------|---------|-----|----------|-------|----------|----------|",
    ]

    for mode_name, mode_report in modes_data.items():
        s = mode_report.get("summary", {})
        lines.append(
            f"| {mode_name} "
            f"| {_fmt(s.get('celex_hit_at_k', 0))} "
            f"| {_fmt(s.get('celex_mrr', 0))} "
            f"| {_fmt(s.get('celex_ndcg_at_k', 0))} "
            f"| {_fmt(s.get('celex_precision_at_k', 0))} "
            f"| {s.get('latency_p50_ms', 0):.0f} "
            f"| {s.get('latency_p95_ms', 0):.0f} |"
        )

    lines.append("")
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Generation table
# ---------------------------------------------------------------------------


def _render_generation_table(generation: Dict[str, Any]) -> str:
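    """Render the '## Generation Metrics (RAGAS)' section from the generation summary."""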
    summary = generation.get("summary", {})
    if not summary:
        return ""

    metrics = summary.get("metrics_evaluated", [])

    lines = [
        "## Generation Metrics (RAGAS)",
        "",
    ]

    if not metrics:
        lines.append("*RAGAS not available — simple metrics only.*")
        lines.append("")
    else:
        lines.append("| Metric | Mean | Scored |")
        lines.append("|--------|------|--------|")
        for metric_name in metrics:
            mean_val = summary.get(f"{metric_name}_mean", 0.0)
            scored = summary.get(f"{metric_name}_scored_count", 0)
            label = metric_name.replace("_", " ").title()
            lines.append(f"| {label} | {_fmt(mean_val)} | {scored} |")
        lines.append("")

        if "faithfulness" in metrics:
            threshold = summary.get("faithfulness_threshold", 0.8)
            below_count = summary.get("faithfulness_below_threshold_count", 0)
            below_ratio = summary.get("faithfulness_below_threshold_ratio", 0.0)
            lines.append(f"Faithfulness threshold: **{threshold}** — {below_count} queries below ({below_ratio:.0%})")
            lines.append("")

    lines.append(f"- Avg answer length: {summary.get('avg_answer_chars', 0):.0f} chars")
    lines.append(f"- Avg sources: {summary.get('avg_sources_per_example', 0):.1f}")
    lines.append(
        f"- Latency p50/p95: {summary.get('latency_p50_ms', 0):.0f}ms / {summary.get('latency_p95_ms', 0):.0f}ms"
    )
    lines.append("")
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Worst performers
# ---------------------------------------------------------------------------


def _render_per_query_details(
    retrieval: Optional[Dict[str, Any]],
    generation: Optional[Dict[str, Any]],
    worst_n: int = 5,
) -> str:
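    """Render the '## Worst Performers' section: retrieval misses and low-faithfulness queries, up to worst_n rows each."""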
    lines = ["## Worst Performers", ""]

    if retrieval:
        modes_data = retrieval.get("modes", {})
        for mode_name, mode_report in modes_data.items():
            details = mode_report.get("details", [])
            missed = [d for d in details if d.get("celex_rank") is None]
            if missed:
                lines.append(f"### Retrieval misses — {mode_name} ({len(missed)} queries)")
                lines.append("")
                lines.append("| Query | Results | Latency |")
                lines.append("|-------|---------|---------|")
                for d in missed[:worst_n]:
                    query = d.get("query", "?")[:80]
                    results_count = d.get("results_count", 0)
                    latency = d.get("latency_ms", 0)
                    lines.append(f"| {query} | {results_count} | {latency:.0f}ms |")
                lines.append("")

    if generation:
        details = generation.get("details", [])
        flagged = [d for d in details if d.get("below_threshold")]
        if flagged:
            lines.append(f"### Low faithfulness ({len(flagged)} queries)")
            lines.append("")
            lines.append("| Query | Faithfulness | Contexts | Latency |")
            lines.append("|-------|-------------|----------|---------|")
            for d in sorted(flagged, key=lambda x: x.get("faithfulness", 0))[:worst_n]:
                query = d.get("query", "?")[:80]
                faith = d.get("faithfulness", 0)
                ctx = d.get("contexts_count", 0)
                latency = d.get("latency_ms", 0)
                lines.append(f"| {query} | {_fmt(faith)} | {ctx} | {latency:.0f}ms |")
            lines.append("")

    if len(lines) == 2:
        lines.append("*No issues detected.*")
        lines.append("")

    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Comparison
# ---------------------------------------------------------------------------


def _render_comparison(
    current: Dict[str, Any],
    previous: Dict[str, Any],
) -> str:
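    """Render a delta table comparing the current report against a previous one for shared retrieval modes and RAGAS metrics."""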
    prev_ts = previous.get("timestamp", "?")
    prev_tag = previous.get("tag", "")
    label = f"{prev_tag} ({prev_ts})" if prev_tag else prev_ts

    lines = [
        "## Comparison vs previous",
        "",
        f"Previous: **{label}**",
        "",
        "| Metric | Previous | Current | Delta |",
        "|--------|----------|---------|-------|",
    ]

    # Retrieval deltas
    curr_ret = current.get("retrieval", {}).get("modes", {})
    prev_ret = previous.get("retrieval", {}).get("modes", {})
    for mode_name in curr_ret:
        if mode_name not in prev_ret:
            continue
        cs = curr_ret[mode_name].get("summary", {})
        ps = prev_ret[mode_name].get("summary", {})
        for key, label_name in [
            ("celex_hit_at_k", "Hit@K"),
            ("celex_mrr", "MRR"),
            ("celex_ndcg_at_k", "NDCG"),
            ("latency_p50_ms", "p50 (ms)"),
        ]:
            old_val = ps.get(key, 0.0)
            new_val = cs.get(key, 0.0)
            arrow = _delta_arrow(old_val, new_val)
            delta = _delta_str(old_val, new_val)
            if "ms" in label_name:
                lines.append(f"| {mode_name} {label_name} | {old_val:.0f} | {new_val:.0f} | {arrow} {delta} |")
            else:
                lines.append(f"| {mode_name} {label_name} | {_fmt(old_val)} | {_fmt(new_val)} | {arrow} {delta} |")

    # Generation deltas
    curr_gen = current.get("generation", {}).get("summary", {})
    prev_gen = previous.get("generation", {}).get("summary", {})
    if curr_gen and prev_gen:
        for metric_name in curr_gen.get("metrics_evaluated", []):
            mean_key = f"{metric_name}_mean"
            old_val = prev_gen.get(mean_key, 0.0)
            new_val = curr_gen.get(mean_key, 0.0)
            label_name = metric_name.replace("_", " ").title()
            arrow = _delta_arrow(old_val, new_val)
            delta = _delta_str(old_val, new_val)
            lines.append(f"| {label_name} | {_fmt(old_val)} | {_fmt(new_val)} | {arrow} {delta} |")

    lines.append("")
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Charts
# ---------------------------------------------------------------------------


def _generate_retrieval_bar_chart(
    retrieval: Dict[str, Any],
    charts_dir: Path,
    timestamp_prefix: str,
) -> Optional[str]:
    """Grouped bar chart comparing retrieval modes on key metrics."""
    modes_data = retrieval.get("modes", {})
    if not modes_data:
        return None

    import matplotlib

    matplotlib.use("Agg")
    import matplotlib.pyplot as plt
    import numpy as np

    mode_names = list(modes_data.keys())
    metric_keys = [
        ("celex_hit_at_k", "Hit@K"),
        ("celex_mrr", "MRR"),
        ("celex_ndcg_at_k", "NDCG@K"),
        ("celex_precision_at_k", "P@K"),
    ]

    values = {label: [] for _, label in metric_keys}
    for mode_name in mode_names:
        s = modes_data[mode_name].get("summary", {})
        for key, label in metric_keys:
            values[label].append(s.get(key, 0.0))

    x = np.arange(len(mode_names))
    width = 0.18
    fig, ax = plt.subplots(figsize=(10, 5))

    colors = ["#2196F3", "#4CAF50", "#FF9800", "#9C27B0"]
    for i, (_, label) in enumerate(metric_keys):
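        # Center the group of bars around each x tick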
        offset = (i - len(metric_keys) / 2 + 0.5) * width
        bars = ax.bar(x + offset, values[label], width, label=label, color=colors[i])
        for bar in bars:
            height = bar.get_height()
            if height > 0:
                ax.text(
                    bar.get_x() + bar.get_width() / 2.0,
                    height + 0.01,
                    f"{height:.2f}",
                    ha="center",
                    va="bottom",
                    fontsize=8,
                )

    ax.set_xlabel("Mode")
    ax.set_ylabel("Score")
    ax.set_title("Retrieval Metrics by Mode")
    ax.set_xticks(x)
    ax.set_xticklabels(mode_names)
    ax.set_ylim(0, 1.15)
    ax.legend(loc="upper right")
    ax.grid(axis="y", alpha=0.3)
    fig.tight_layout()

    filename = f"{timestamp_prefix}_retrieval.png"
    filepath = charts_dir / filename
    fig.savefig(filepath, dpi=150)
    plt.close(fig)

    return f"charts/{filename}"


def _generate_ragas_radar_chart(
    generation: Dict[str, Any],
    charts_dir: Path,
    timestamp_prefix: str,
) -> Optional[str]:
    """Radar chart for RAGAS metrics."""
    summary = generation.get("summary", {})
    metrics = summary.get("metrics_evaluated", [])
    if not metrics:
        return None

    import matplotlib

    matplotlib.use("Agg")
    import matplotlib.pyplot as plt
    import numpy as np

    labels = [m.replace("_", " ").title() for m in metrics]
    values = [summary.get(f"{m}_mean", 0.0) for m in metrics]

    # Close the radar polygon
    angles = np.linspace(0, 2 * np.pi, len(labels), endpoint=False).tolist()
    values_plot = values + [values[0]]
    angles_plot = angles + [angles[0]]

    fig, ax = plt.subplots(figsize=(6, 6), subplot_kw=dict(polar=True))
    ax.fill(angles_plot, values_plot, alpha=0.25, color="#2196F3")
    ax.plot(angles_plot, values_plot, "o-", color="#2196F3", linewidth=2)

    ax.set_xticks(angles)
    ax.set_xticklabels(labels, fontsize=10)
    ax.set_ylim(0, 1.0)
    ax.set_yticks([0.2, 0.4, 0.6, 0.8, 1.0])
    ax.set_yticklabels(["0.2", "0.4", "0.6", "0.8", "1.0"], fontsize=8)
    ax.set_title("RAGAS Metrics", pad=20, fontsize=13)

    # Add value annotations
    for angle, value, label in zip(angles, values, labels):
        ax.annotate(f"{value:.2f}", xy=(angle, value), xytext=(5, 5), textcoords="offset points", fontsize=9)

    fig.tight_layout()

    filename = f"{timestamp_prefix}_ragas_radar.png"
    filepath = charts_dir / filename
    fig.savefig(filepath, dpi=150)
    plt.close(fig)

    return f"charts/{filename}"


def _generate_latency_chart(
    report: Dict[str, Any],
    charts_dir: Path,
    timestamp_prefix: str,
) -> Optional[str]:
    """Latency bar chart (p50/p95/p99) for retrieval modes + generation."""
    import matplotlib

    matplotlib.use("Agg")
    import matplotlib.pyplot as plt
    import numpy as np

    categories: List[str] = []
    p50_vals: List[float] = []
    p95_vals: List[float] = []
    p99_vals: List[float] = []

    retrieval = report.get("retrieval", {})
    for mode_name, mode_report in retrieval.get("modes", {}).items():
        s = mode_report.get("summary", {})
        categories.append(f"Retrieval\n{mode_name}")
        p50_vals.append(s.get("latency_p50_ms", 0))
        p95_vals.append(s.get("latency_p95_ms", 0))
        p99_vals.append(s.get("latency_p99_ms", 0))

    generation = report.get("generation", {})
    gen_summary = generation.get("summary", {})
    if gen_summary.get("latency_p50_ms"):
        categories.append("Generation")
        p50_vals.append(gen_summary.get("latency_p50_ms", 0))
        p95_vals.append(gen_summary.get("latency_p95_ms", 0))
        p99_vals.append(gen_summary.get("latency_p99_ms", 0))

    if not categories:
        return None

    x = np.arange(len(categories))
    width = 0.25
    fig, ax = plt.subplots(figsize=(10, 5))

    ax.bar(x - width, p50_vals, width, label="p50", color="#4CAF50")
    ax.bar(x, p95_vals, width, label="p95", color="#FF9800")
    ax.bar(x + width, p99_vals, width, label="p99", color="#F44336")

    ax.set_xlabel("Phase")
    ax.set_ylabel("Latency (ms)")
    ax.set_title("Latency Distribution")
    ax.set_xticks(x)
    ax.set_xticklabels(categories, fontsize=9)
    ax.legend()
    ax.grid(axis="y", alpha=0.3)
    fig.tight_layout()

    filename = f"{timestamp_prefix}_latency.png"
    filepath = charts_dir / filename
    fig.savefig(filepath, dpi=150)
    plt.close(fig)

    return f"charts/{filename}"


# ---------------------------------------------------------------------------
# Main assembly
# ---------------------------------------------------------------------------


def generate_report(
    report: Dict[str, Any],
    output_path: Path,
    charts_dir: Path,
    previous_report: Optional[Dict[str, Any]] = None,
) -> Path:
    """
    Generate a Markdown benchmark report with embedded charts.

    Args:
        report: Parsed JSON benchmark report.
        output_path: Path for the .md file.
        charts_dir: Directory for chart PNGs.
        previous_report: Optional previous report for comparison deltas.

    Returns:
        Path to the written .md file.
    """
    charts_dir.mkdir(parents=True, exist_ok=True)

    # Derive timestamp prefix for chart filenames
    timestamp = report.get("timestamp", "")
    if timestamp:
        timestamp_prefix = timestamp[:19].replace(":", "").replace("-", "").replace("T", "_")
    else:
        timestamp_prefix = "report"

    sections: List[str] = []

    # Header
    sections.append(_render_header(report))

    # Horizontal rule
    sections.append("---\n")

    # Configuration
    sections.append(_render_config(report))

    # Dataset samples
    sections.append(_render_dataset_info(report))

    # Retrieval
    retrieval = report.get("retrieval")
    if retrieval:
        sections.append(_render_retrieval_table(retrieval))
        chart_ref = _generate_retrieval_bar_chart(retrieval, charts_dir, timestamp_prefix)
        if chart_ref:
            sections.append(f"![Retrieval metrics comparison]({chart_ref})\n")

    # Generation
    generation = report.get("generation")
    if generation:
        sections.append(_render_generation_table(generation))
        chart_ref = _generate_ragas_radar_chart(generation, charts_dir, timestamp_prefix)
        if chart_ref:
            sections.append(f"![RAGAS radar chart]({chart_ref})\n")

    # Latency
    latency_ref = _generate_latency_chart(report, charts_dir, timestamp_prefix)
    if latency_ref:
        sections.append("## Latency\n")
        sections.append(f"![Latency distribution]({latency_ref})\n")

    # Worst performers
    sections.append(_render_per_query_details(retrieval, generation))

    # Comparison
    if previous_report:
        sections.append(_render_comparison(report, previous_report))

    # Footer
    sections.append("---\n")
    sections.append("*Generated by `report_generator.py`*\n")

    # Write
    md_content = "\n".join(sections)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(md_content, encoding="utf-8")

    return output_path


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------


def main() -> int:
    """Convert a benchmark JSON report into Markdown plus chart assets."""
    parser = argparse.ArgumentParser(description="Generate Markdown report from JSON benchmark")
    parser.add_argument("json_path", help="Path to JSON benchmark report")
    parser.add_argument("--compare", default=None, help="Path to previous JSON report for comparison")
    parser.add_argument("--output", default=None, help="Output .md path (default: same as JSON with .md)")
    args = parser.parse_args()

    json_path = Path(args.json_path).resolve()
    if not json_path.exists():
        print(f"ERROR: {json_path} not found")
        return 1

    report = json.loads(json_path.read_text(encoding="utf-8"))

    previous_report = None
    if args.compare:
        compare_path = Path(args.compare).resolve()
        if compare_path.exists():
            previous_report = json.loads(compare_path.read_text(encoding="utf-8"))
        else:
            print(f"WARNING: comparison file {compare_path} not found")

    output_path = Path(args.output).resolve() if args.output else json_path.with_suffix(".md")
    charts_dir = json_path.parent / "charts"

    result = generate_report(report, output_path, charts_dir, previous_report)
    print(f"Report generated: {result}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())