Source code for lalandre_rag.retrieval.context.community_reports
"""
Deterministic community report builder for graph-aware global RAG mode.
"""
from collections import Counter
from dataclasses import asdict, dataclass, replace
from typing import Any, Dict, List, Optional, TypedDict

from .models import ContextSlice
[docs]
class ActMeta(TypedDict):
    """Per-act metadata the report builder keeps while assembling communities."""

    # Value of the act's ``celex`` attribute.
    celex: str
    # Value of the act's ``title`` attribute.
    title: str
[docs]
class RelationRow(TypedDict):
    """One normalized relation between two acts, as consumed by the report builder."""

    # Act id at the source end of the relation.
    source_act_id: int
    # Act id at the target end of the relation.
    target_act_id: int
    # Upper-cased relation label.
    relation_type: str
    # Free-text description, or None when the relation carries none.
    description: Optional[str]
[docs]
class RelationTypeCount(TypedDict):
    """Single entry of a community's relation-type histogram."""

    # Relation label being counted.
    relation_type: str
    # Number of relations of that type inside the community.
    count: int
[docs]
class CentralAct(TypedDict):
    """Description of a pivot act listed in a community summary."""

    # Identifier of the act.
    act_id: int
    # ``celex`` value recorded for the act.
    celex: str
    # ``title`` value recorded for the act.
    title: str
    # Number of relation endpoints touching the act inside its community.
    degree: int
[docs]
@dataclass(frozen=True)
class CommunityReport:
    """Compact summary of one connected component in the relation graph.

    The dataclass is frozen, so attributes cannot be rebound after
    construction; the list attributes themselves are still ordinary lists.
    """

    # Label identifying the community within one batch of reports.
    community_id: str
    # Ids of the acts that belong to the component.
    act_ids: List[int]
    # ``celex`` values of those acts, in the same order as ``act_ids``.
    celexes: List[str]
    # Number of relations whose two endpoints are both inside the component.
    relation_count: int
    # Most frequent relation types with their counts.
    top_relation_types: List[RelationTypeCount]
    # Acts with the highest degree inside the component.
    central_acts: List[CentralAct]
    # Relation descriptions retained as supporting evidence.
    evidences: List[str]
    # Human-readable one-line description of the community.
    summary: str

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the report to a plain dictionary for API responses.

        Returns:
            A dictionary with one key per field, in field-declaration order.
            Lists and nested dictionaries are copied recursively, so mutating
            the payload never alters this (frozen) report.
        """
        return asdict(self)
[docs]
class CommunityReportBuilder:
    """Build deterministic community reports from context slices and act relations.

    The algorithm is intentionally lightweight:

    - keep only relations between acts present in the retrieved context,
    - build connected components,
    - summarize each component with relation distribution and pivot acts.
    """

    def __init__(
        self,
        *,
        max_reports: int = 6,
        min_cluster_size: int = 2,
        max_evidence_per_report: int = 3,
        top_relation_types_limit: int = 5,
        central_acts_limit: int = 3,
        summary_relation_types_limit: int = 3,
    ) -> None:
        """Store the report limits, clamping each one to a sane minimum.

        Args:
            max_reports: Maximum number of reports returned (at least 1).
            min_cluster_size: Smallest component size that yields a report
                (at least 1).
            max_evidence_per_report: Maximum number of relation descriptions
                kept per report (at least 0).
            top_relation_types_limit: Maximum number of histogram entries per
                report (at least 1).
            central_acts_limit: Maximum number of central acts per report
                (at least 1).
            summary_relation_types_limit: Maximum number of relation types
                quoted in the textual summary (at least 1).
        """
        self.max_reports = max(max_reports, 1)
        self.min_cluster_size = max(min_cluster_size, 1)
        self.max_evidence_per_report = max(max_evidence_per_report, 0)
        self.top_relation_types_limit = max(top_relation_types_limit, 1)
        self.central_acts_limit = max(central_acts_limit, 1)
        self.summary_relation_types_limit = max(summary_relation_types_limit, 1)

    def build_reports(self, slices: List[ContextSlice]) -> List[CommunityReport]:
        """Build deterministic community reports from enriched context slices.

        Args:
            slices: Context slices; each one is read through ``slice.act``
                (``act_id``, ``celex``, ``title`` and ``relations``).

        Returns:
            At most ``max_reports`` reports, ordered by decreasing relation
            count, then decreasing act count, then ascending act ids, and
            labelled ``CM1``, ``CM2``, ... in that order. Empty when there are
            no slices or no component reaches ``min_cluster_size``.
        """
        if not slices:
            return []
        act_meta = self._collect_acts(slices)
        if not act_meta:
            return []

        act_ids = set(act_meta)
        relations = self._collect_relations(slices, act_ids)
        adjacency = self._build_adjacency(act_ids=act_ids, relations=relations)

        candidates = [
            self._build_single_report(
                component=component,
                relations=relations,
                act_meta=act_meta,
                community_id="",
            )
            for component in self._connected_components(adjacency)
            if len(component) >= self.min_cluster_size
        ]
        # Components are disjoint, so the sorted act-id lists give a total,
        # input-independent tie-break between equally sized communities.
        candidates.sort(
            key=lambda report: (
                -report.relation_count,
                -len(report.act_ids),
                report.act_ids,
            )
        )
        # Identifiers are assigned only after ranking so they follow the final order.
        return [
            replace(report, community_id=f"CM{index}")
            for index, report in enumerate(candidates[: self.max_reports], start=1)
        ]

    @staticmethod
    def _collect_acts(slices: List[ContextSlice]) -> Dict[int, ActMeta]:
        """Index act metadata by act id; when an id repeats, the last slice wins."""
        return {
            item.act.act_id: {"celex": item.act.celex, "title": item.act.title}
            for item in slices
        }

    @staticmethod
    def _normalize_relation_type(raw: Any) -> str:
        """Return the stripped, upper-cased label, or ``RELATED_TO`` if missing or blank."""
        if raw is None:
            return "RELATED_TO"
        return str(raw).strip().upper() or "RELATED_TO"

    @staticmethod
    def _collect_relations(
        slices: List[ContextSlice],
        valid_act_ids: set[int],
    ) -> List[RelationRow]:
        """Gather the unique relations whose two endpoints are both known acts.

        Args:
            slices: Context slices whose ``act.relations`` entries are read
                with ``.get`` (keys ``source_act_id``, ``target_act_id``,
                ``relation_type`` and ``description``).
            valid_act_ids: Ids of the acts present in the retrieved context.

        Returns:
            Normalized rows in first-seen order. Entries with a non-integer
            endpoint or an endpoint outside ``valid_act_ids`` are skipped, and
            rows identical after normalization are kept once.
        """
        seen: set[tuple[int, int, str, Optional[str]]] = set()
        rows: List[RelationRow] = []
        for item in slices:
            for relation in item.act.relations or ():
                source_id = relation.get("source_act_id")
                target_id = relation.get("target_act_id")
                if not isinstance(source_id, int) or not isinstance(target_id, int):
                    continue
                if source_id not in valid_act_ids or target_id not in valid_act_ids:
                    continue

                relation_type = CommunityReportBuilder._normalize_relation_type(
                    relation.get("relation_type")
                )
                description_raw = relation.get("description")
                # Blank and non-string descriptions are normalized to None.
                description = (
                    description_raw.strip() if isinstance(description_raw, str) else ""
                ) or None

                key = (source_id, target_id, relation_type, description)
                if key in seen:
                    continue
                seen.add(key)
                rows.append(
                    {
                        "source_act_id": source_id,
                        "target_act_id": target_id,
                        "relation_type": relation_type,
                        "description": description,
                    }
                )
        return rows

    @staticmethod
    def _build_adjacency(
        *,
        act_ids: set[int],
        relations: List[RelationRow],
    ) -> Dict[int, set[int]]:
        """Build an undirected adjacency map; acts without relations keep an empty set."""
        adjacency: Dict[int, set[int]] = {act_id: set() for act_id in act_ids}
        for relation in relations:
            source_id = relation["source_act_id"]
            target_id = relation["target_act_id"]
            # Relation direction is ignored: both endpoints see each other.
            adjacency.setdefault(source_id, set()).add(target_id)
            adjacency.setdefault(target_id, set()).add(source_id)
        return adjacency

    @staticmethod
    def _connected_components(adjacency: Dict[int, set[int]]) -> List[List[int]]:
        """Return the connected components of ``adjacency``.

        Each component is a sorted list of act ids, and components are listed
        by ascending smallest member (roots are visited in sorted order).
        """
        visited: set[int] = set()
        components: List[List[int]] = []
        for root in sorted(adjacency):
            if root in visited:
                continue
            # Iterative depth-first traversal; no recursion depth limit to hit.
            stack = [root]
            component: List[int] = []
            while stack:
                node = stack.pop()
                if node in visited:
                    continue
                visited.add(node)
                component.append(node)
                stack.extend(
                    neighbor
                    for neighbor in sorted(adjacency.get(node, ()))
                    if neighbor not in visited
                )
            components.append(sorted(component))
        return components

    def _format_relation_types_label(
        self,
        top_relation_types: List[RelationTypeCount],
    ) -> str:
        """Render the leading relation types as ``TYPE:count`` pairs for the summary."""
        shown = top_relation_types[: self.summary_relation_types_limit]
        label = ", ".join(f"{entry['relation_type']}:{entry['count']}" for entry in shown)
        return label or "aucune relation qualifiée"

    def _build_single_report(
        self,
        *,
        component: List[int],
        relations: List[RelationRow],
        act_meta: Dict[int, ActMeta],
        community_id: str,
    ) -> CommunityReport:
        """Summarize one connected component.

        Args:
            component: Act ids of the component.
            relations: All collected relations; only those with both
                endpoints inside ``component`` are used.
            act_meta: Metadata for every act id in ``component``.
            community_id: Identifier stored on the resulting report.

        Returns:
            A report holding the relation-type histogram, the highest-degree
            acts, up to ``max_evidence_per_report`` distinct relation
            descriptions and a textual summary.
        """
        members = set(component)
        internal_relations = [
            relation
            for relation in relations
            if relation["source_act_id"] in members and relation["target_act_id"] in members
        ]

        type_counts = Counter(relation["relation_type"] for relation in internal_relations)
        degrees: Dict[int, int] = Counter()
        evidences: List[str] = []
        for relation in internal_relations:
            degrees[relation["source_act_id"]] += 1
            degrees[relation["target_act_id"]] += 1
            description = relation.get("description")
            if (
                isinstance(description, str)
                and len(evidences) < self.max_evidence_per_report
                and description not in evidences
            ):
                evidences.append(description)

        # Most frequent first; equal counts are ordered by name so the
        # histogram does not depend on the order relations were encountered.
        ranked_types = sorted(type_counts.items(), key=lambda pair: (-pair[1], pair[0]))
        top_relation_types: List[RelationTypeCount] = [
            {"relation_type": relation_type, "count": count}
            for relation_type, count in ranked_types[: self.top_relation_types_limit]
        ]

        # Highest degree first; equal degrees are ordered by descending celex.
        ranked_acts = sorted(
            component,
            key=lambda act_id: (degrees[act_id], act_meta[act_id]["celex"]),
            reverse=True,
        )
        central_acts: List[CentralAct] = [
            {
                "act_id": act_id,
                "celex": act_meta[act_id]["celex"],
                "title": act_meta[act_id]["title"],
                "degree": degrees[act_id],
            }
            for act_id in ranked_acts[: self.central_acts_limit]
        ]

        relation_types_label = self._format_relation_types_label(top_relation_types)
        summary = (
            f"Communauté de {len(component)} actes et {len(internal_relations)} relations "
            f"(types dominants: {relation_types_label})."
        )
        return CommunityReport(
            community_id=community_id,
            act_ids=component,
            celexes=[act_meta[act_id]["celex"] for act_id in component],
            relation_count=len(internal_relations),
            top_relation_types=top_relation_types,
            central_acts=central_acts,
            evidences=evidences,
            summary=summary,
        )