Source code for lalandre_rag.response.builder

"""
Response Builder
Centralized builder for unified response format across all RAG modes
"""

import re
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, cast

from lalandre_core.config import get_config
from pydantic import BaseModel, Field, model_validator

from lalandre_rag.retrieval.trace import extract_trace
from lalandre_rag.scoring import build_relevance_score_payload

SourceDoc = Dict[str, Any]
MetadataDict = Dict[str, Any]
ActsDict = Dict[str, Any]


class SourcesBlock(BaseModel):
    """Sources section of a unified RAG response, validated by pydantic.

    Invariant: after validation, ``total`` always equals ``len(documents)``.
    """

    total: int = 0
    documents: List[Dict[str, Any]] = Field(default_factory=list)
    acts: Dict[str, Any] = Field(default_factory=dict)

    @model_validator(mode="after")
    def sync_total(self) -> "SourcesBlock":
        """Force ``total`` to match the number of document sources."""
        doc_count = len(self.documents)
        if doc_count != self.total:
            # object.__setattr__ bypasses pydantic's assignment validation,
            # so this also works on frozen / validate_assignment models.
            object.__setattr__(self, "total", doc_count)
        return self
class RAGResponse(BaseModel):
    """Validated unified response format for all RAG modes."""

    # RAG mode that produced this response (e.g. "search" — see ResponseBuilder usage).
    mode: str
    # Original user query the response answers.
    query: str
    # Generated answer text; None when the mode produces no answer.
    answer: Optional[str] = None
    # Source documents and act contexts backing the answer.
    sources: SourcesBlock = Field(default_factory=SourcesBlock)
    # Free-form per-mode metadata (e.g. warnings added via ResponseBuilder.add_metadata).
    metadata: Dict[str, Any] = Field(default_factory=dict)
def format_doc_location(
    chunk_id: Optional[int],
    chunk_index: Optional[int],
    subdivision_type: str,
    subdivision_id: int,
) -> str:
    """Format the location label of a document slice (chunk or subdivision).

    Examples: ``"chunk 42:3"`` or ``"article 12"``. Used to build the
    source headers in every RAG mode. Chunk coordinates take precedence
    over the subdivision whenever ``chunk_id`` is provided.
    """
    if chunk_id is None:
        return f"{subdivision_type} {subdivision_id}"
    return f"chunk {chunk_id}:{chunk_index}"
def format_source_header(
    source_id: str,
    celex: str,
    location: str,
    title: str,
    regulatory_level: Optional[str] = None,
) -> str:
    """Build the standard source header inserted into the LLM context.

    Example: ``"[S1 | CELEX: 32016R0679 | L1 | article 5] General Regulation..."``
    The regulatory-level segment is omitted when falsy (None or empty).
    """
    segments = [source_id, f"CELEX: {celex}"]
    if regulatory_level:
        segments.append(regulatory_level)
    segments.append(location)
    return f"[{' | '.join(segments)}] {title}"
def _empty_sources() -> List[SourceDoc]: return [] def _empty_metadata() -> MetadataDict: return {} def _empty_acts() -> ActsDict: return {} def _coerce_metadata(value: Any) -> MetadataDict: if isinstance(value, dict): return cast(MetadataDict, value) return {} def _as_optional_int(value: Any) -> int | None: return value if isinstance(value, int) else None
@dataclass
class ResponseBuilder:
    """Fluent builder producing the unified response format.

    Example:
        response = (
            ResponseBuilder(mode="search", query="test")
            .set_answer(None)
            .set_sources([{"celex": "123", "title": "Test"}])
            .add_metadata("warning", "Test warning")
            .build()
        )
    """

    mode: str
    query: str
    _answer: Optional[str] = None
    _sources: List[SourceDoc] = field(default_factory=_empty_sources)
    _metadata: MetadataDict = field(default_factory=_empty_metadata)
    _acts: ActsDict = field(default_factory=_empty_acts)

    def set_answer(self, answer: Optional[str]) -> "ResponseBuilder":
        """Record the generated answer (may be None) and return self."""
        self._answer = answer
        return self

    def set_sources(self, documents: List[Dict[str, Any]]) -> "ResponseBuilder":
        """Overwrite the full list of source documents and return self."""
        self._sources = documents
        return self

    def add_metadata(self, key: str, value: Any) -> "ResponseBuilder":
        """Store a single metadata entry under *key* and return self."""
        self._metadata[key] = value
        return self

    def set_acts(self, acts: Dict[str, Any]) -> "ResponseBuilder":
        """Overwrite the full act-context mapping and return self."""
        self._acts = acts
        return self

    def build(self) -> Dict[str, Any]:
        """Assemble and validate the response in the unified format.

        Returns:
            The response dict produced by ``RAGResponse.model_dump()``.
        """
        sources_block = SourcesBlock(
            total=len(self._sources),
            documents=self._sources,
            acts=self._acts,
        )
        response = RAGResponse(
            mode=self.mode,
            query=self.query,
            answer=self._answer,
            sources=sources_block,
            metadata=self._metadata,
        )
        return response.model_dump()
def build_source_trace(metadata: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Return a compact, traceable provenance subset of source *metadata*.

    Delegates to :func:`extract_trace`, which preserves retrieval
    provenance without duplicating large payload fields.
    """
    trace = extract_trace(metadata)
    return trace
def build_source_document(
    doc: Any,
    *,
    include_relations: bool = False,
    include_subjects: bool = False,
    include_full_content: bool = True,
    include_content_preview: bool = False,
    content_preview_length: Optional[int] = None,
    include_snippet: bool = False,
    snippet_length: Optional[int] = None,
    content_used: Optional[str] = None,
    content_truncated: Optional[bool] = None,
    source_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Build a standardized source document payload from a context slice.

    This keeps response formatting consistent across RAG modes while reusing
    upstream metadata where possible.

    Args:
        doc: A context slice. Two shapes are handled: an object exposing
            ``.act`` and ``.doc`` attributes (nested shape), or a flat object
            carrying celex/title/... attributes directly.
        include_relations / include_subjects: Copy ``doc.relations`` /
            ``doc.subjects`` into the payload (flat shape only).
        include_full_content: Emit the full slice content under ``content``;
            otherwise ``content`` is an empty string.
        include_content_preview / content_preview_length: Emit a truncated
            ``content_preview`` (length defaults to the configured budget).
        include_snippet / snippet_length: Emit a truncated ``snippet``
            (length defaults to the configured budget).
        content_used / content_truncated / source_id: Optional passthrough
            fields, emitted only when not None.

    Returns:
        Dict payload with identification, location, content and trace fields,
        plus the relevance-score fields from ``build_relevance_score_payload``.
    """
    # Resolve preview/snippet lengths from the configured context budget
    # when the caller did not pin them.
    budget_cfg = get_config().context_budget
    if content_preview_length is None:
        content_preview_length = budget_cfg.content_preview_chars
    if snippet_length is None:
        snippet_length = budget_cfg.snippet_preview_chars
    full_content = doc.content or ""
    # ``content_length`` below always reflects the full content, even when
    # the emitted ``content`` field is suppressed.
    content = full_content if include_full_content else ""
    trace_input = getattr(doc, "trace", None)
    metadata: MetadataDict
    source_doc: SourceDoc
    if hasattr(doc, "act") and hasattr(doc, "doc"):
        # Nested shape: act-level fields come from doc.act, slice-level
        # fields from doc.doc.
        act = doc.act
        doc_meta = doc.doc
        metadata = _coerce_metadata(getattr(doc_meta, "payload", None))
        trace_metadata = _coerce_metadata(trace_input)
        # Prefer an explicit trace; fall back to the slice payload metadata.
        trace_source: MetadataDict | None = (
            trace_metadata if trace_metadata else (metadata if metadata else None)
        )
        source_doc = {
            "act_id": act.act_id,
            "celex": act.celex,
            # NOTE(review): unlike the flat branch and build_act_context,
            # this branch has no "or celex or 'Unknown'" fallback, so title
            # may be None here — confirm whether that is intentional.
            "title": act.title,
            "act_type": act.act_type,
            "subdivision_id": doc_meta.subdivision_id,
            "subdivision_type": doc_meta.subdivision_type,
            "sequence_order": doc_meta.sequence_order,
            "content": content,
            "content_length": len(full_content),
            "trace": build_source_trace(trace_source),
            **build_relevance_score_payload(doc.score),
        }
        if act.url_eurlex:
            source_doc["url"] = act.url_eurlex
        if doc_meta.chunk_id is not None:
            source_doc["chunk_id"] = doc_meta.chunk_id
        if doc_meta.chunk_index is not None:
            source_doc["chunk_index"] = doc_meta.chunk_index
        if doc_meta.source_kind:
            source_doc["source_kind"] = doc_meta.source_kind
    else:
        # Flat shape: everything hangs directly off ``doc``.
        metadata = _coerce_metadata(getattr(doc, "metadata", None))
        trace_metadata = _coerce_metadata(trace_input)
        trace_source = trace_metadata if trace_metadata else (metadata if metadata else None)
        source_doc = {
            "celex": doc.celex,
            "title": doc.title or doc.celex or "Unknown",
            "act_id": doc.act_id,
            "act_type": doc.act_type,
            "subdivision_id": doc.subdivision_id,
            "subdivision_type": doc.subdivision_type,
            "sequence_order": doc.sequence_order,
            "content": content,
            "content_length": len(full_content),
            "trace": build_source_trace(trace_source),
            **build_relevance_score_payload(doc.score),
        }
        if doc.url_eurlex:
            source_doc["url"] = doc.url_eurlex
        # Chunk coordinates: attribute first, then metadata fallback.
        chunk_id = getattr(doc, "chunk_id", None)
        chunk_index = getattr(doc, "chunk_index", None)
        if chunk_id is None and "chunk_id" in metadata:
            chunk_id = metadata.get("chunk_id")
        if chunk_index is None and "chunk_index" in metadata:
            chunk_index = metadata.get("chunk_index")
        # Only emit the coordinates when they are genuine ints.
        chunk_id_int = _as_optional_int(chunk_id)
        chunk_index_int = _as_optional_int(chunk_index)
        if chunk_id_int is not None:
            source_doc["chunk_id"] = chunk_id_int
        if chunk_index_int is not None:
            source_doc["chunk_index"] = chunk_index_int
        source_kind = getattr(doc, "source_kind", None)
        if isinstance(source_kind, str) and source_kind:
            source_doc["source_kind"] = source_kind
    # Optional truncated views of the content; "..." marks actual truncation.
    if include_content_preview:
        preview = full_content[:content_preview_length]
        if len(full_content) > content_preview_length:
            preview += "..."
        source_doc["content_preview"] = preview
    if include_snippet:
        snippet = full_content[:snippet_length]
        if len(full_content) > snippet_length:
            snippet += "..."
        source_doc["snippet"] = snippet
    # Optional passthrough fields, included only when explicitly provided.
    if content_used is not None:
        source_doc["content_used"] = content_used
    if content_truncated is not None:
        source_doc["content_truncated"] = content_truncated
    if source_id is not None:
        source_doc["source_id"] = source_id
    # Relations/subjects are only available on the flat shape.
    if include_relations and not hasattr(doc, "act"):
        relations = getattr(doc, "relations", None)
        if relations:
            source_doc["relations"] = relations
    if include_subjects and not hasattr(doc, "act"):
        subjects = getattr(doc, "subjects", None)
        if subjects:
            source_doc["subjects"] = subjects
    return source_doc
def build_act_context(act: Any) -> Dict[str, Any]:
    """Build a normalized act context payload.

    Identification fields are always present (``title`` falls back to the
    CELEX number, then ``"Unknown"``); optional fields are emitted only
    when the corresponding attribute carries a value.
    """
    context: Dict[str, Any] = {
        "act_id": act.act_id,
        "celex": act.celex,
        "title": act.title or act.celex or "Unknown",
        "act_type": act.act_type,
    }
    if act.url_eurlex:
        context["url"] = act.url_eurlex
    # Dates and level are kept whenever not None (falsy non-None values
    # such as "" are still emitted, matching attribute presence).
    for attr in ("adoption_date", "force_date", "regulatory_level"):
        value = getattr(act, attr, None)
        if value is not None:
            context[attr] = value
    # Relations/subjects are kept only when truthy (non-empty).
    for attr in ("relations", "subjects"):
        value = getattr(act, attr, None)
        if value:
            context[attr] = value
    return context
def validate_citations(answer: Optional[str], source_ids: List[str]) -> Dict[str, Any]:
    """
    Check that the citations appearing in *answer* refer to available source IDs.

    Recognized citation formats: [S1], [G1], [R1], [C1], ...

    Returns a report dict with the sorted ``available``/``used``/``missing``/
    ``unused`` ID lists, boolean flags, an overall ``ok`` verdict, and a
    ``status`` string.
    """
    available = sorted({sid for sid in source_ids if sid})
    has_sources = bool(available)

    if not answer:
        # Nothing to scan: an empty answer is only acceptable when there
        # were no sources to cite in the first place.
        return {
            "available": available,
            "used": [],
            "missing": [],
            "unused": available,
            "has_sources": has_sources,
            "has_citations": False,
            "status": "empty_answer" if has_sources else "ok",
            "ok": not has_sources,
        }

    # Collect citation IDs from every [...] bracket group in the answer.
    used_ids = {
        token
        for group in re.findall(r"\[([^\]]+)\]", answer)
        for token in re.findall(r"\b(?:S|G|R|C)\d+\b", group)
    }

    used = sorted(used_ids)
    missing = sorted(used_ids - set(available))
    unused = sorted(set(available) - used_ids)
    has_citations = bool(used_ids)

    # ok when: no sources and no citations, or citations all resolvable.
    ok = (not has_sources and not has_citations) or (has_citations and not missing)
    if ok:
        status = "ok"
    elif has_sources and not has_citations:
        status = "missing_citations"
    elif missing:
        status = "unknown_citations"
    else:
        # Defensive catch-all for any remaining invalid combination.
        status = "invalid_citations"

    return {
        "available": available,
        "used": used,
        "missing": missing,
        "unused": unused,
        "has_sources": has_sources,
        "has_citations": has_citations,
        "status": status,
        "ok": ok,
    }
def collect_act_contexts(docs: List[Any]) -> Dict[str, Dict[str, Any]]:
    """Collect the unique act contexts referenced by a list of context slices.

    Slices without an act, or whose act lacks an ``act_id``, are skipped.
    The first slice seen for a given act wins; insertion order is preserved.
    """
    contexts: Dict[str, Dict[str, Any]] = {}
    for slice_ in docs:
        act = getattr(slice_, "act", None)
        act_id = getattr(act, "act_id", None) if act else None
        if act_id is None:
            continue
        key = str(act_id)
        if key in contexts:
            continue
        contexts[key] = build_act_context(act)
    return contexts