Source code for lalandre_rag.graph.context_budget

"""
Token-budget-aware context builder for Graph RAG.

Instead of blindly truncating acts by position, this module manages a
character budget split across three zones:

- **Semantic zone** (60 %): content from Qdrant vector matches
- **Graph zone** (30 %):   act titles and descriptions from Neo4j expansion
- **Relation zone** (10 %): relationship descriptions

Each zone is filled with the *highest-ranked* items first, so the LLM
always receives the most relevant content regardless of total volume.

Usage::

    budget = GraphContextBudget(max_chars=20000)
    context = budget.build(
        semantic_results=semantic_results,
        ranked_nodes=ranked_nodes,
        ranked_relationships=ranked_relationships,
    )
"""

import logging
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple

from lalandre_core.utils import as_dict, to_optional_int

from lalandre_rag.graph.source_payloads import (
    build_graph_edge_source_item,
    build_graph_node_source_item,
)

logger = logging.getLogger(__name__)


@dataclass
class BudgetAllocation:
    """Character budgets granted to each of the three context zones."""

    # Number of characters reserved for semantic (vector-search) content.
    semantic_chars: int
    # Number of characters reserved for graph-expanded act information.
    graph_chars: int
    # Number of characters reserved for relationship descriptions.
    relation_chars: int

    @property
    def total(self) -> int:
        """Return the total allocated character budget across all zones."""
        return sum((self.semantic_chars, self.graph_chars, self.relation_chars))
@dataclass
class ContextBuildResult:
    """Output of the context builder.

    Bundles the assembled context string with bookkeeping about which
    sources were included and how the character budget was spent.
    """

    # Final context text handed to the LLM.
    combined_context: str
    # Maps (kind, chunk/subdivision id, act id) -> "S<n>" source label.
    source_id_map: Dict[Tuple[str, Optional[int], Optional[int]], str]
    # Number of semantic results that were considered.
    semantic_count: int
    # Number of graph-expanded acts that made it into the context.
    graph_nodes_used: int
    # Number of relationships that made it into the context.
    relationships_used: int
    # The per-zone budget that was allocated for this build.
    budget_allocation: BudgetAllocation
    # Source-item payloads for the included graph nodes.
    graph_node_refs: List[Dict[str, Any]] = field(default_factory=list)
    # Source-item payloads for the included relationships.
    relationship_refs: List[Dict[str, Any]] = field(default_factory=list)
    # Characters actually consumed per zone (keys: semantic/graph/relation/total).
    # Fix: use the `dict` constructor directly instead of a needless lambda.
    chars_used: Dict[str, int] = field(default_factory=dict)
[docs] class GraphContextBudget: """Build LLM context from scored and ranked graph results. Args: max_chars: Total character budget for the whole context block. semantic_share: Fraction reserved for semantic search content. graph_share: Fraction reserved for graph-expanded act information. relation_share: Fraction reserved for relationship descriptions. min_chars_per_source: Minimum chars reserved for each semantic source. """ def __init__( self, max_chars: int = 20000, semantic_share: float = 0.60, graph_share: float = 0.30, relation_share: float = 0.10, min_chars_per_source: int = 200, ) -> None: total_share = semantic_share + graph_share + relation_share self.max_chars = max_chars self.semantic_share = semantic_share / total_share self.graph_share = graph_share / total_share self.relation_share = relation_share / total_share self.min_chars_per_source = min_chars_per_source def _allocate(self) -> BudgetAllocation: return BudgetAllocation( semantic_chars=int(self.max_chars * self.semantic_share), graph_chars=int(self.max_chars * self.graph_share), relation_chars=int(self.max_chars * self.relation_share), ) # ── Semantic zone ───────────────────────────────────────────────── def _build_semantic_block( self, semantic_results: List[Any], budget: int, ) -> Tuple[str, Dict[Tuple[str, Optional[int], Optional[int]], str], int]: """ Build context from the top semantic search results. Returns (text_block, source_id_map, chars_remaining). 
""" source_id_map: Dict[Tuple[str, Optional[int], Optional[int]], str] = {} parts: List[str] = [] remaining = budget total_docs = len(semantic_results) for idx, doc in enumerate(semantic_results, start=1): if remaining <= 0: break payload = as_dict(doc.payload) content = str(payload.get("content", "") or "") celex = str(payload.get("celex", "Unknown") or "Unknown") title = str(payload.get("act_title") or payload.get("title") or "Unknown") chunk_id = to_optional_int(payload.get("chunk_id")) chunk_index = to_optional_int(payload.get("chunk_index")) subdivision_id = to_optional_int(payload.get("subdivision_id")) act_id = to_optional_int(payload.get("act_id")) if chunk_id is not None: location = f"chunk {chunk_id}:{chunk_index}" key: Tuple[str, Optional[int], Optional[int]] = ("chunk", chunk_id, act_id) fallback_key: Tuple[str, Optional[int], Optional[int]] = ("chunk", chunk_id, None) else: location = f"subdivision {subdivision_id}" key = ("subdivision", subdivision_id, act_id) fallback_key = ("subdivision", subdivision_id, None) source_id = f"S{idx}" source_id_map[key] = source_id source_id_map.setdefault(fallback_key, source_id) header = f"[{source_id} | CELEX: {celex} | {location}] {title}" header_len = len(header) + 1 remaining -= header_len if remaining <= 0: parts.append(header) break remaining_docs = total_docs - idx min_for_others = self.min_chars_per_source * remaining_docs alloc = max(0, remaining - min_for_others) if alloc < self.min_chars_per_source: alloc = min(remaining, self.min_chars_per_source) content_used = content[:alloc] remaining -= len(content_used) parts.append(f"{header}\n{content_used}") return "\n\n".join(parts), source_id_map, max(remaining, 0) # ── Graph zone ──────────────────────────────────────────────────── @staticmethod def _build_graph_block( ranked_nodes: List[Dict[str, Any]], budget: int, ) -> Tuple[str, int, List[Dict[str, Any]]]: """ Build context from ranked graph-expanded acts. Returns (text_block, nodes_used, node_refs). 
""" if not ranked_nodes or budget <= 0: return "", 0, [] lines: List[str] = ["--- Related Acts (ranked by relevance) ---"] remaining = budget - len(lines[0]) - 1 used = 0 refs: List[Dict[str, Any]] = [] for node in ranked_nodes: act_id = node.get("id") if act_id is None: continue source_id = f"G{used + 1}" source_item = build_graph_node_source_item( node=node, source_id=source_id, sequence_order=used + 1, ) celex = str(source_item["celex"]) title = str(source_item["title"]) score = float(source_item["score"]) line = f"- [{source_id}] [{celex}] {title} (relevance: {score:.2f})" line_len = len(line) + 1 if line_len > remaining: break lines.append(line) remaining -= line_len used += 1 refs.append(source_item) return "\n".join(lines), used, refs # ── Relation zone ───────────────────────────────────────────────── @staticmethod def _build_relation_block( ranked_relationships: List[Dict[str, Any]], act_lookup: Dict[int, Dict[str, str]], budget: int, ) -> Tuple[str, int, List[Dict[str, Any]]]: """ Build context from ranked relationships. Returns (text_block, relationships_used, relationship_refs). """ if not ranked_relationships or budget <= 0: return "", 0, [] lines: List[str] = ["--- Key Regulatory Relationships ---"] remaining = budget - len(lines[0]) - 1 used = 0 refs: List[Dict[str, Any]] = [] # Also add a summary distribution line type_counts: Dict[str, int] = {} for rel in ranked_relationships: rtype = str(rel.get("type", "RELATED_TO")) type_counts[rtype] = type_counts.get(rtype, 0) + 1 for rel in ranked_relationships: rtype = str(rel.get("type", "RELATED_TO")) src_id = rel.get("start_node") tgt_id = rel.get("end_node") src_meta = act_lookup.get(int(src_id), {}) if src_id is not None else {} tgt_meta = act_lookup.get(int(tgt_id), {}) if tgt_id is not None else {} src_celex = src_meta.get("celex", f"ACT-{src_id}") if src_id is not None else "?" tgt_celex = tgt_meta.get("celex", f"ACT-{tgt_id}") if tgt_id is not None else "?" 
description = str(rel.get("description", "") or "").strip() source_id = f"R{used + 1}" relation_excerpt = f"{src_celex} -[{rtype}]-> {tgt_celex}" line = f"- [{source_id}] {relation_excerpt}" if description: line += f" | {description}" line_len = len(line) + 1 if line_len > remaining: break lines.append(line) remaining -= line_len used += 1 refs.append( build_graph_edge_source_item( relationship=rel, source_id=source_id, sequence_order=used, start_celex=src_celex, end_celex=tgt_celex, ) ) if type_counts and remaining > 20: dist_parts = sorted(type_counts.items(), key=lambda x: x[1], reverse=True) dist_str = ", ".join(f"{t}:{c}" for t, c in dist_parts[:8]) dist_line = f"Distribution: {dist_str}" if len(dist_line) + 1 <= remaining: lines.append(dist_line) return "\n".join(lines), used, refs # ── Public API ────────────────────────────────────────────────────
[docs] def build( self, *, semantic_results: List[Any], ranked_nodes: List[Dict[str, Any]], ranked_relationships: List[Dict[str, Any]], ) -> ContextBuildResult: """ Assemble the full context string from ranked results. Spills unused budget from one zone to the next (semantic → graph → relation). """ alloc = self._allocate() # Phase 1: Semantic (gets first priority for spillover) sem_block, source_id_map, sem_remaining = self._build_semantic_block(semantic_results, alloc.semantic_chars) sem_used = alloc.semantic_chars - sem_remaining # Spill unused semantic budget → graph effective_graph_budget = alloc.graph_chars + sem_remaining # Phase 2: Graph # Build act_labels from ranked_nodes act_lookup: Dict[int, Dict[str, str]] = {} for node in ranked_nodes: nid = node.get("id") if nid is not None: celex = str(node.get("celex", f"ACT-{nid}") or f"ACT-{nid}") title = str(node.get("title", "Unknown") or "Unknown") act_lookup[int(nid)] = { "celex": celex, "title": title, "label": f"{celex} | {title}", } graph_block, graph_count, graph_refs = self._build_graph_block(ranked_nodes, effective_graph_budget) graph_used = len(graph_block) graph_remaining = max(effective_graph_budget - graph_used, 0) # Spill unused graph budget → relation effective_rel_budget = alloc.relation_chars + graph_remaining # Phase 3: Relations rel_block, rel_count, rel_refs = self._build_relation_block( ranked_relationships, act_lookup, effective_rel_budget ) rel_used = len(rel_block) # Assemble sections = [s for s in [sem_block, graph_block, rel_block] if s] combined = "\n\n".join(sections) return ContextBuildResult( combined_context=combined, source_id_map=source_id_map, semantic_count=len(semantic_results), graph_nodes_used=graph_count, relationships_used=rel_count, graph_node_refs=graph_refs, relationship_refs=rel_refs, budget_allocation=alloc, chars_used={ "semantic": sem_used, "graph": graph_used, "relation": rel_used, "total": len(combined), }, )