Source code for lalandre_rag.retrieval.context.community_reports

"""
Deterministic community report builder for graph-aware global RAG mode.
"""

from dataclasses import dataclass
from typing import Any, Dict, List, Optional, TypedDict

from .models import ContextSlice


class ActMeta(TypedDict):
    """Per-act metadata the report builder keeps while assembling reports.

    One entry is stored per act id; it only carries what the reports expose.
    """

    # CELEX identifier of the act, copied from the context slice.
    celex: str
    # Title of the act, copied from the context slice.
    title: str
class RelationRow(TypedDict):
    """One relation between two acts, in the shape the report builder uses."""

    # Identifier of the act the relation starts from.
    source_act_id: int
    # Identifier of the act the relation points to.
    target_act_id: int
    # Label of the relation.
    relation_type: str
    # Optional free-text description; None when there is none.
    description: Optional[str]
class RelationTypeCount(TypedDict):
    """Single bucket of a community's relation-type histogram."""

    # Relation label the bucket counts.
    relation_type: str
    # Number of relations carrying that label.
    count: int
class CentralAct(TypedDict):
    """Description of a pivot act listed in a community summary."""

    # Identifier of the act.
    act_id: int
    # CELEX identifier of the act.
    celex: str
    # Title of the act.
    title: str
    # Degree recorded for the act when the report was built.
    degree: int
@dataclass(frozen=True)
class CommunityReport:
    """Immutable summary of one connected component of the relation graph.

    Attributes are documented inline; ``to_dict`` exposes them unchanged.
    """

    # Identifier of the community.
    community_id: str
    # Identifiers of the acts that belong to the community.
    act_ids: List[int]
    # CELEX identifiers of the community's acts.
    celexes: List[str]
    # Number of relations counted for the community.
    relation_count: int
    # Relation-type histogram entries kept for the community.
    top_relation_types: List[RelationTypeCount]
    # Pivot acts kept for the community.
    central_acts: List[CentralAct]
    # Relation descriptions kept as supporting evidence.
    evidences: List[str]
    # Human-readable summary sentence.
    summary: str

    def to_dict(self) -> Dict[str, Any]:
        """Return the report as a plain dictionary for API responses.

        Keys follow the field declaration order. Values are the attribute
        objects themselves, not copies.
        """
        exported_fields = (
            "community_id",
            "act_ids",
            "celexes",
            "relation_count",
            "top_relation_types",
            "central_acts",
            "evidences",
            "summary",
        )
        return {name: getattr(self, name) for name in exported_fields}
class CommunityReportBuilder:
    """
    Build deterministic community reports from context slices and act relations.

    The algorithm is intentionally lightweight:

    - keep only relations between acts present in the retrieved context,
    - build connected components,
    - summarize each component with relation distribution and pivot acts.
    """

    def __init__(
        self,
        *,
        max_reports: int = 6,
        min_cluster_size: int = 2,
        max_evidence_per_report: int = 3,
        top_relation_types_limit: int = 5,
        central_acts_limit: int = 3,
    ) -> None:
        """Store the report limits, clamping each to its smallest usable value.

        Args:
            max_reports: Maximum number of reports returned (at least 1).
            min_cluster_size: Minimum number of acts a component needs to be
                reported (at least 1).
            max_evidence_per_report: Maximum number of relation descriptions
                kept per report (at least 0).
            top_relation_types_limit: Maximum number of relation types kept
                per report (at least 1).
            central_acts_limit: Maximum number of pivot acts kept per report
                (at least 1).
        """
        self.max_reports = max(max_reports, 1)
        self.min_cluster_size = max(min_cluster_size, 1)
        self.max_evidence_per_report = max(max_evidence_per_report, 0)
        self.top_relation_types_limit = max(top_relation_types_limit, 1)
        self.central_acts_limit = max(central_acts_limit, 1)

    def build_reports(self, slices: List[ContextSlice]) -> List[CommunityReport]:
        """Build deterministic community reports from enriched context slices.

        Args:
            slices: Context slices; each exposes an ``act`` with ``act_id``,
                ``celex``, ``title`` and ``relations``.

        Returns:
            At most ``max_reports`` reports, ordered by descending relation
            count, then descending number of acts. Identifiers ``CM1``,
            ``CM2``, ... are assigned after ranking. An empty list is
            returned when there is nothing to report.
        """
        if not slices:
            return []

        act_meta = self._collect_acts(slices)
        if not act_meta:
            return []

        act_ids = set(act_meta)
        relations = self._collect_relations(slices, act_ids)
        adjacency = self._build_adjacency(act_ids=act_ids, relations=relations)

        candidates = [
            self._build_single_report(
                component=component,
                relations=relations,
                act_meta=act_meta,
                community_id="",
            )
            for component in self._connected_components(adjacency)
            if len(component) >= self.min_cluster_size
        ]

        # list.sort is stable even with reverse=True, so equally ranked
        # reports keep the order in which their components were discovered
        # (ascending smallest act id).
        candidates.sort(
            key=lambda report: (report.relation_count, len(report.act_ids)),
            reverse=True,
        )

        # Reports are frozen, so the final identifier is set by rebuilding.
        return [
            CommunityReport(
                community_id=f"CM{index}",
                act_ids=report.act_ids,
                celexes=report.celexes,
                relation_count=report.relation_count,
                top_relation_types=report.top_relation_types,
                central_acts=report.central_acts,
                evidences=report.evidences,
                summary=report.summary,
            )
            for index, report in enumerate(candidates[: self.max_reports], start=1)
        ]

    @staticmethod
    def _collect_acts(slices: List[ContextSlice]) -> Dict[int, ActMeta]:
        """Map each act id found in ``slices`` to its celex and title.

        When several slices share an act id, the last slice wins.
        """
        return {
            item.act.act_id: {"celex": item.act.celex, "title": item.act.title}
            for item in slices
        }

    @staticmethod
    def _collect_relations(
        slices: List[ContextSlice],
        valid_act_ids: set[int],
    ) -> List[RelationRow]:
        """Return the normalized, de-duplicated relations between known acts.

        A relation is kept only when both endpoints are ``int`` values found
        in ``valid_act_ids``. The relation type is upper-cased and defaults
        to ``"RELATED_TO"`` when missing. The description is stripped, and
        becomes ``None`` when it is not a string or is blank. Rows that are
        identical after normalization are emitted once, in first-seen order.
        """
        seen: set[tuple[int, int, str, Optional[str]]] = set()
        rows: List[RelationRow] = []
        for item in slices:
            act_relations = item.act.relations
            if not act_relations:
                continue
            for relation in act_relations:
                source_id = relation.get("source_act_id")
                target_id = relation.get("target_act_id")
                if not isinstance(source_id, int) or not isinstance(target_id, int):
                    continue
                if source_id not in valid_act_ids or target_id not in valid_act_ids:
                    continue

                relation_type_raw = relation.get("relation_type")
                if relation_type_raw is None:
                    relation_type = "RELATED_TO"
                else:
                    relation_type = str(relation_type_raw).upper()

                description_raw = relation.get("description")
                description: Optional[str] = None
                if isinstance(description_raw, str):
                    # A blank description carries no evidence; store None.
                    description = description_raw.strip() or None

                key = (source_id, target_id, relation_type, description)
                if key in seen:
                    continue
                seen.add(key)
                rows.append(
                    {
                        "source_act_id": source_id,
                        "target_act_id": target_id,
                        "relation_type": relation_type,
                        "description": description,
                    }
                )
        return rows

    @staticmethod
    def _build_adjacency(
        *,
        act_ids: set[int],
        relations: List[RelationRow],
    ) -> Dict[int, set[int]]:
        """Build an undirected adjacency map over ``act_ids``.

        Every act id gets an entry, even when it has no relation, so isolated
        acts still appear as single-node components.
        """
        adjacency: Dict[int, set[int]] = {act_id: set() for act_id in act_ids}
        for relation in relations:
            source_id = relation["source_act_id"]
            target_id = relation["target_act_id"]
            adjacency.setdefault(source_id, set()).add(target_id)
            adjacency.setdefault(target_id, set()).add(source_id)
        return adjacency

    @staticmethod
    def _connected_components(adjacency: Dict[int, set[int]]) -> List[List[int]]:
        """Return the connected components of ``adjacency``.

        Each component is a sorted list of act ids. Components are listed in
        ascending order of their smallest act id, because roots are visited
        in sorted order.
        """
        visited: set[int] = set()
        components: List[List[int]] = []
        for root in sorted(adjacency):
            if root in visited:
                continue
            # Iterative depth-first traversal from the root.
            stack = [root]
            component: List[int] = []
            while stack:
                node = stack.pop()
                if node in visited:
                    continue
                visited.add(node)
                component.append(node)
                stack.extend(
                    neighbor
                    for neighbor in sorted(adjacency.get(node, set()))
                    if neighbor not in visited
                )
            components.append(sorted(component))
        return components

    @staticmethod
    def _rank_relation_types(
        relations: List[RelationRow],
        limit: int,
    ) -> List[RelationTypeCount]:
        """Return the ``limit`` most frequent relation types of ``relations``.

        Entries are ordered by descending count. Equal counts are ordered
        alphabetically by relation type, so the result does not depend on
        the order in which relations were retrieved.
        """
        counts: Dict[str, int] = {}
        for relation in relations:
            relation_type = relation["relation_type"]
            counts[relation_type] = counts.get(relation_type, 0) + 1
        ranked = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
        return [
            {"relation_type": relation_type, "count": count}
            for relation_type, count in ranked[:limit]
        ]

    @staticmethod
    def _format_summary(
        *,
        act_count: int,
        relation_count: int,
        top_relation_types: List[RelationTypeCount],
    ) -> str:
        """Render the one-sentence summary of a community.

        Every entry of ``top_relation_types`` is listed; the caller has
        already limited that list.
        """
        relation_types_label = ", ".join(
            f"{item['relation_type']}:{item['count']}" for item in top_relation_types
        )
        if not relation_types_label:
            relation_types_label = "aucune relation qualifiée"
        return (
            f"Communauté de {act_count} actes et {relation_count} relations "
            f"(types dominants: {relation_types_label})."
        )

    def _build_single_report(
        self,
        *,
        component: List[int],
        relations: List[RelationRow],
        act_meta: Dict[int, ActMeta],
        community_id: str,
    ) -> CommunityReport:
        """Summarize one connected component as a ``CommunityReport``.

        Args:
            component: Act ids of the component.
            relations: All collected relations; only those with both
                endpoints inside the component are used.
            act_meta: Celex and title for every act id in ``component``.
            community_id: Identifier stored on the report.

        Returns:
            The report with its relation-type histogram, pivot acts, evidence
            descriptions and summary sentence.
        """
        component_ids = set(component)
        internal_relations = [
            relation
            for relation in relations
            if relation["source_act_id"] in component_ids
            and relation["target_act_id"] in component_ids
        ]

        degrees: Dict[int, int] = {act_id: 0 for act_id in component}
        evidences: List[str] = []
        for relation in internal_relations:
            source_id = relation["source_act_id"]
            target_id = relation["target_act_id"]
            degrees[source_id] = degrees.get(source_id, 0) + 1
            degrees[target_id] = degrees.get(target_id, 0) + 1

            description = relation.get("description")
            if (
                isinstance(description, str)
                and description not in evidences
                and len(evidences) < self.max_evidence_per_report
            ):
                evidences.append(description)

        top_relation_types = self._rank_relation_types(
            internal_relations,
            self.top_relation_types_limit,
        )

        # Highest degree first; equal degrees are ordered by descending celex.
        ordered_central = sorted(
            component,
            key=lambda act_id: (degrees.get(act_id, 0), act_meta[act_id]["celex"]),
            reverse=True,
        )
        central_acts: List[CentralAct] = [
            {
                "act_id": act_id,
                "celex": act_meta[act_id]["celex"],
                "title": act_meta[act_id]["title"],
                "degree": degrees.get(act_id, 0),
            }
            for act_id in ordered_central[: self.central_acts_limit]
        ]

        summary = self._format_summary(
            act_count=len(component),
            relation_count=len(internal_relations),
            top_relation_types=top_relation_types,
        )

        return CommunityReport(
            community_id=community_id,
            act_ids=component,
            celexes=[act_meta[act_id]["celex"] for act_id in component],
            relation_count=len(internal_relations),
            top_relation_types=top_relation_types,
            central_acts=central_acts,
            evidences=evidences,
            summary=summary,
        )