"""
Token-budget-aware context builder for Graph RAG.
Instead of blindly truncating acts by position, this module manages a
character budget split across three zones:
- **Semantic zone** (60 %): content from Qdrant vector matches
- **Graph zone** (30 %): act titles and descriptions from Neo4j expansion
- **Relation zone** (10 %): relationship descriptions
Each zone is filled with the *highest-ranked* items first, so the LLM
always receives the most relevant content regardless of total volume.
Usage::

    budget = GraphContextBudget(max_chars=20000)
    context = budget.build(
        semantic_results=semantic_results,
        ranked_nodes=ranked_nodes,
        ranked_relationships=ranked_relationships,
    )
"""
import logging
from collections import Counter
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple

from lalandre_core.utils import as_dict, to_optional_int
from lalandre_rag.graph.source_payloads import (
    build_graph_edge_source_item,
    build_graph_node_source_item,
)
logger = logging.getLogger(__name__)
@dataclass
class BudgetAllocation:
    """Character-budget allocation across context zones.

    One instance describes how the total budget is split between the
    semantic, graph, and relation zones (in characters).
    """

    # Characters reserved for semantic (vector-search) content.
    semantic_chars: int
    # Characters reserved for graph-expanded act information.
    graph_chars: int
    # Characters reserved for relationship descriptions.
    relation_chars: int

    @property
    def total(self) -> int:
        """Return the total allocated character budget across all zones."""
        return self.semantic_chars + self.graph_chars + self.relation_chars
@dataclass
class ContextBuildResult:
    """Output of the context builder (see ``GraphContextBudget.build``)."""

    # Final assembled context string handed to the LLM.
    combined_context: str
    # (kind, chunk/subdivision id, act id) -> "S<n>" citation label.
    source_id_map: Dict[Tuple[str, Optional[int], Optional[int]], str]
    # Number of semantic results received (not necessarily all rendered).
    semantic_count: int
    # Number of graph-expanded acts that fit in the budget.
    graph_nodes_used: int
    # Number of relationships that fit in the budget.
    relationships_used: int
    # The per-zone character allocation that was used.
    budget_allocation: BudgetAllocation
    graph_node_refs: List[Dict[str, Any]] = field(default_factory=list)
    relationship_refs: List[Dict[str, Any]] = field(default_factory=list)
    # Characters consumed per zone, keyed "semantic"/"graph"/"relation"/"total".
    chars_used: Dict[str, int] = field(default_factory=dict)
class GraphContextBudget:
    """Build LLM context from scored and ranked graph results.

    The total character budget is divided into three zones (semantic,
    graph, relation).  Each zone is filled with the highest-ranked items
    first, and any budget a zone leaves unused spills over into the next
    zone (semantic -> graph -> relation).

    Args:
        max_chars: Total character budget for the whole context block.
        semantic_share: Fraction reserved for semantic search content.
        graph_share: Fraction reserved for graph-expanded act information.
        relation_share: Fraction reserved for relationship descriptions.
        min_chars_per_source: Minimum chars reserved for each semantic source.

    Raises:
        ValueError: If the three shares sum to zero or less.
    """

    def __init__(
        self,
        max_chars: int = 20000,
        semantic_share: float = 0.60,
        graph_share: float = 0.30,
        relation_share: float = 0.10,
        min_chars_per_source: int = 200,
    ) -> None:
        total_share = semantic_share + graph_share + relation_share
        # Fail fast with a clear error instead of the cryptic
        # ZeroDivisionError the normalisation below would otherwise raise.
        if total_share <= 0:
            raise ValueError(
                "semantic_share + graph_share + relation_share must be positive"
            )
        self.max_chars = max_chars
        # Normalise the shares so they always sum to 1.0, even when the
        # caller passes values that do not.
        self.semantic_share = semantic_share / total_share
        self.graph_share = graph_share / total_share
        self.relation_share = relation_share / total_share
        self.min_chars_per_source = min_chars_per_source

    def _allocate(self) -> BudgetAllocation:
        """Split ``max_chars`` into per-zone character budgets."""
        return BudgetAllocation(
            semantic_chars=int(self.max_chars * self.semantic_share),
            graph_chars=int(self.max_chars * self.graph_share),
            relation_chars=int(self.max_chars * self.relation_share),
        )

    # ── Semantic zone ─────────────────────────────────────────────────
    def _build_semantic_block(
        self,
        semantic_results: List[Any],
        budget: int,
    ) -> Tuple[str, Dict[Tuple[str, Optional[int], Optional[int]], str], int]:
        """Build context from the top semantic search results.

        Each result is rendered as a ``[S<n> | CELEX: … | location] title``
        header followed by (possibly truncated) content.  Content is
        truncated so that every document still to come can receive at least
        ``min_chars_per_source`` characters.

        Returns:
            Tuple of (text_block, source_id_map, chars_remaining).
        """
        source_id_map: Dict[Tuple[str, Optional[int], Optional[int]], str] = {}
        parts: List[str] = []
        remaining = budget
        total_docs = len(semantic_results)
        for idx, doc in enumerate(semantic_results, start=1):
            if remaining <= 0:
                break
            payload = as_dict(doc.payload)
            content = str(payload.get("content", "") or "")
            celex = str(payload.get("celex", "Unknown") or "Unknown")
            title = str(payload.get("act_title") or payload.get("title") or "Unknown")
            chunk_id = to_optional_int(payload.get("chunk_id"))
            chunk_index = to_optional_int(payload.get("chunk_index"))
            subdivision_id = to_optional_int(payload.get("subdivision_id"))
            act_id = to_optional_int(payload.get("act_id"))
            # Chunk-level hits are cited by chunk; otherwise fall back to
            # the subdivision identifier.
            if chunk_id is not None:
                location = f"chunk {chunk_id}:{chunk_index}"
                key: Tuple[str, Optional[int], Optional[int]] = ("chunk", chunk_id, act_id)
                fallback_key: Tuple[str, Optional[int], Optional[int]] = ("chunk", chunk_id, None)
            else:
                location = f"subdivision {subdivision_id}"
                key = ("subdivision", subdivision_id, act_id)
                fallback_key = ("subdivision", subdivision_id, None)
            source_id = f"S{idx}"
            source_id_map[key] = source_id
            # The act-agnostic fallback key must not overwrite an exact match.
            source_id_map.setdefault(fallback_key, source_id)
            header = f"[{source_id} | CELEX: {celex} | {location}] {title}"
            header_len = len(header) + 1  # +1 for the newline after the header
            remaining -= header_len
            if remaining <= 0:
                # Budget exhausted mid-header: keep the header alone so the
                # citation label still appears, then stop.
                parts.append(header)
                break
            # Reserve a minimum slice for every remaining document so one
            # long document cannot starve the rest.
            remaining_docs = total_docs - idx
            min_for_others = self.min_chars_per_source * remaining_docs
            alloc = max(0, remaining - min_for_others)
            if alloc < self.min_chars_per_source:
                alloc = min(remaining, self.min_chars_per_source)
            content_used = content[:alloc]
            remaining -= len(content_used)
            parts.append(f"{header}\n{content_used}")
        return "\n\n".join(parts), source_id_map, max(remaining, 0)

    # ── Graph zone ────────────────────────────────────────────────────
    @staticmethod
    def _build_graph_block(
        ranked_nodes: List[Dict[str, Any]],
        budget: int,
    ) -> Tuple[str, int, List[Dict[str, Any]]]:
        """Build context from ranked graph-expanded acts.

        Emits one ``- [G<n>] [CELEX] title (relevance: …)`` line per act
        until the budget is exhausted.

        Returns:
            Tuple of (text_block, nodes_used, node_refs).
        """
        if not ranked_nodes or budget <= 0:
            return "", 0, []
        lines: List[str] = ["--- Related Acts (ranked by relevance) ---"]
        remaining = budget - len(lines[0]) - 1
        used = 0
        refs: List[Dict[str, Any]] = []
        for node in ranked_nodes:
            act_id = node.get("id")
            if act_id is None:
                continue
            source_id = f"G{used + 1}"
            source_item = build_graph_node_source_item(
                node=node,
                source_id=source_id,
                sequence_order=used + 1,
            )
            celex = str(source_item["celex"])
            title = str(source_item["title"])
            score = float(source_item["score"])
            line = f"- [{source_id}] [{celex}] {title} (relevance: {score:.2f})"
            line_len = len(line) + 1  # +1 for the joining newline
            if line_len > remaining:
                break
            lines.append(line)
            remaining -= line_len
            used += 1
            refs.append(source_item)
        return "\n".join(lines), used, refs

    # ── Relation zone ─────────────────────────────────────────────────
    @staticmethod
    def _build_relation_block(
        ranked_relationships: List[Dict[str, Any]],
        act_lookup: Dict[int, Dict[str, str]],
        budget: int,
    ) -> Tuple[str, int, List[Dict[str, Any]]]:
        """Build context from ranked relationships.

        Renders ``- [R<n>] CELEX -[TYPE]-> CELEX | description`` lines and,
        if enough budget is left, a final line summarising the distribution
        of relationship types across ALL ranked relationships (not only the
        ones that fit).

        Returns:
            Tuple of (text_block, relationships_used, relationship_refs).
        """
        if not ranked_relationships or budget <= 0:
            return "", 0, []
        lines: List[str] = ["--- Key Regulatory Relationships ---"]
        remaining = budget - len(lines[0]) - 1
        used = 0
        refs: List[Dict[str, Any]] = []
        # Type distribution for the summary line (Counter preserves first-seen
        # order, so tie-breaking below matches the ranked order).
        type_counts = Counter(
            str(rel.get("type", "RELATED_TO")) for rel in ranked_relationships
        )
        for rel in ranked_relationships:
            rtype = str(rel.get("type", "RELATED_TO"))
            src_id = rel.get("start_node")
            tgt_id = rel.get("end_node")
            src_meta = act_lookup.get(int(src_id), {}) if src_id is not None else {}
            tgt_meta = act_lookup.get(int(tgt_id), {}) if tgt_id is not None else {}
            src_celex = src_meta.get("celex", f"ACT-{src_id}") if src_id is not None else "?"
            tgt_celex = tgt_meta.get("celex", f"ACT-{tgt_id}") if tgt_id is not None else "?"
            description = str(rel.get("description", "") or "").strip()
            source_id = f"R{used + 1}"
            relation_excerpt = f"{src_celex} -[{rtype}]-> {tgt_celex}"
            line = f"- [{source_id}] {relation_excerpt}"
            if description:
                line += f" | {description}"
            line_len = len(line) + 1  # +1 for the joining newline
            if line_len > remaining:
                break
            lines.append(line)
            remaining -= line_len
            used += 1
            refs.append(
                build_graph_edge_source_item(
                    relationship=rel,
                    source_id=source_id,
                    sequence_order=used,
                    start_celex=src_celex,
                    end_celex=tgt_celex,
                )
            )
        # Append the distribution summary only when it fits comfortably.
        if type_counts and remaining > 20:
            dist_parts = sorted(type_counts.items(), key=lambda x: x[1], reverse=True)
            dist_str = ", ".join(f"{t}:{c}" for t, c in dist_parts[:8])
            dist_line = f"Distribution: {dist_str}"
            if len(dist_line) + 1 <= remaining:
                lines.append(dist_line)
        return "\n".join(lines), used, refs

    @staticmethod
    def _build_act_lookup(ranked_nodes: List[Dict[str, Any]]) -> Dict[int, Dict[str, str]]:
        """Map act id -> celex/title/label metadata used to render relations."""
        act_lookup: Dict[int, Dict[str, str]] = {}
        for node in ranked_nodes:
            nid = node.get("id")
            if nid is None:
                continue
            celex = str(node.get("celex", f"ACT-{nid}") or f"ACT-{nid}")
            title = str(node.get("title", "Unknown") or "Unknown")
            act_lookup[int(nid)] = {
                "celex": celex,
                "title": title,
                "label": f"{celex} | {title}",
            }
        return act_lookup

    # ── Public API ────────────────────────────────────────────────────
    def build(
        self,
        *,
        semantic_results: List[Any],
        ranked_nodes: List[Dict[str, Any]],
        ranked_relationships: List[Dict[str, Any]],
    ) -> ContextBuildResult:
        """Assemble the full context string from ranked results.

        Unused budget spills from one zone to the next
        (semantic -> graph -> relation).

        Args:
            semantic_results: Scored vector-search hits (objects exposing
                a ``payload`` attribute).
            ranked_nodes: Graph-expanded acts, highest relevance first.
            ranked_relationships: Ranked relationship dicts.

        Returns:
            A ``ContextBuildResult`` with the combined context string and
            per-zone usage statistics.
        """
        alloc = self._allocate()
        # Phase 1: semantic zone gets first claim on the budget.
        sem_block, source_id_map, sem_remaining = self._build_semantic_block(
            semantic_results, alloc.semantic_chars
        )
        # NOTE: budget-delta accounting; the "\n\n" separators between
        # semantic parts are not counted here.
        sem_used = alloc.semantic_chars - sem_remaining
        # Spill unused semantic budget into the graph zone.
        effective_graph_budget = alloc.graph_chars + sem_remaining
        # Phase 2: graph zone.
        act_lookup = self._build_act_lookup(ranked_nodes)
        graph_block, graph_count, graph_refs = self._build_graph_block(
            ranked_nodes, effective_graph_budget
        )
        graph_used = len(graph_block)
        graph_remaining = max(effective_graph_budget - graph_used, 0)
        # Spill unused graph budget into the relation zone.
        effective_rel_budget = alloc.relation_chars + graph_remaining
        # Phase 3: relation zone.
        rel_block, rel_count, rel_refs = self._build_relation_block(
            ranked_relationships, act_lookup, effective_rel_budget
        )
        rel_used = len(rel_block)
        # Assemble only the non-empty sections.
        sections = [s for s in [sem_block, graph_block, rel_block] if s]
        combined = "\n\n".join(sections)
        return ContextBuildResult(
            combined_context=combined,
            source_id_map=source_id_map,
            semantic_count=len(semantic_results),
            graph_nodes_used=graph_count,
            relationships_used=rel_count,
            graph_node_refs=graph_refs,
            relationship_refs=rel_refs,
            budget_allocation=alloc,
            chars_used={
                "semantic": sem_used,
                "graph": graph_used,
                "relation": rel_used,
                "total": len(combined),
            },
        )