"""
Response Builder
Centralized builder for unified response format across all RAG modes
"""
import re
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, cast
from lalandre_core.config import get_config
from pydantic import BaseModel, Field, model_validator
from lalandre_rag.retrieval.trace import extract_trace
from lalandre_rag.scoring import build_relevance_score_payload
# One source-document payload as carried in a response's ``sources.documents`` list.
SourceDoc = Dict[str, Any]
# Free-form metadata mapping (document payload/metadata or response metadata).
MetadataDict = Dict[str, Any]
# Mapping of act contexts carried in a response's ``sources.acts``.
ActsDict = Dict[str, Any]
[docs]
class SourcesBlock(BaseModel):
    """Sources section of a RAG response, validated by pydantic."""

    # Number of document sources; normalized by ``sync_total`` after validation.
    total: int = 0
    # Source-document payloads.
    documents: List[Dict[str, Any]] = Field(default_factory=list)
    # Act contexts attached to the sources.
    acts: Dict[str, Any] = Field(default_factory=dict)

    @model_validator(mode="after")
    def sync_total(self) -> "SourcesBlock":
        """Force ``total`` to match the number of entries in ``documents``."""
        document_count = len(self.documents)
        if document_count != self.total:
            # Written through ``object.__setattr__`` rather than plain attribute
            # assignment (presumably to sidestep the model's own ``__setattr__``
            # handling inside a validator -- confirm before changing).
            object.__setattr__(self, "total", document_count)
        return self
[docs]
class RAGResponse(BaseModel):
    """Unified, validated response envelope shared by every RAG mode."""

    # Name of the RAG mode that produced the response.
    mode: str
    # Query string the response was built for.
    query: str
    # Generated answer text; ``None`` when no answer was set.
    answer: Optional[str] = None
    # Source documents and act contexts; defaults to an empty ``SourcesBlock``.
    sources: SourcesBlock = Field(default_factory=SourcesBlock)
    # Free-form extra information attached to the response.
    metadata: Dict[str, Any] = Field(default_factory=dict)
def _empty_sources() -> List[SourceDoc]:
    """Return a new, empty list of source documents (typed default factory)."""
    sources: List[SourceDoc] = []
    return sources
def _empty_metadata() -> MetadataDict:
    """Return a new, empty metadata mapping (typed default factory)."""
    metadata: MetadataDict = {}
    return metadata
def _empty_acts() -> ActsDict:
    """Return a new, empty act-context mapping (typed default factory)."""
    acts: ActsDict = {}
    return acts
def _coerce_metadata(value: Any) -> MetadataDict:
    """Return ``value`` itself when it is a dict, otherwise a new empty dict."""
    if not isinstance(value, dict):
        return {}
    # The dict is passed through as-is (no copy); ``cast`` only informs the type checker.
    return cast(MetadataDict, value)
def _as_optional_int(value: Any) -> int | None:
return value if isinstance(value, int) else None
[docs]
@dataclass
class ResponseBuilder:
    """
    Fluent builder for the unified response format.

    Every mutator returns the builder itself, so calls can be chained.

    Usage:
        builder = ResponseBuilder(mode="search", query="test")
        builder.set_answer(None)
        builder.set_sources([{"celex": "123", "title": "Test"}])
        builder.add_metadata("warning", "Test warning")
        response = builder.build()
    """

    # Name of the RAG mode the response is built for.
    mode: str
    # Query string echoed back in the response.
    query: str
    _answer: Optional[str] = None
    _sources: List[SourceDoc] = field(default_factory=_empty_sources)
    _metadata: MetadataDict = field(default_factory=_empty_metadata)
    _acts: ActsDict = field(default_factory=_empty_acts)

    def set_answer(self, answer: Optional[str]) -> "ResponseBuilder":
        """Set the generated answer (``None`` for modes without one)."""
        self._answer = answer
        return self

    def set_sources(self, documents: List[Dict[str, Any]]) -> "ResponseBuilder":
        """Replace all source documents."""
        self._sources = documents
        return self

    def set_acts(self, acts: Dict[str, Any]) -> "ResponseBuilder":
        """Replace all act contexts."""
        self._acts = acts
        return self

    def add_metadata(self, key: str, value: Any) -> "ResponseBuilder":
        """
        Attach a single metadata entry, overwriting any existing value for ``key``.

        This is the method shown in the class usage example; without it the
        documented call raises ``AttributeError``.
        """
        self._metadata[key] = value
        return self

    def build(self) -> Dict[str, Any]:
        """
        Build the response in the unified format.

        The collected state is validated through ``RAGResponse`` and
        ``SourcesBlock`` before being dumped to a plain dict.

        Returns:
            Dict with the validated unified format.
        """
        return RAGResponse(
            mode=self.mode,
            query=self.query,
            answer=self._answer,
            sources=SourcesBlock(
                total=len(self._sources),
                documents=self._sources,
                acts=self._acts,
            ),
            metadata=self._metadata,
        ).model_dump()
[docs]
def build_source_trace(metadata: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Produce the compact trace payload attached to a source document.

    Delegates entirely to ``extract_trace`` so that sources carry a traceable
    subset of their metadata instead of the full payload.
    """
    trace = extract_trace(metadata)
    return trace
[docs]
def _clip_with_ellipsis(text: str, limit: int) -> str:
    """Return the first ``limit`` characters of ``text``, appending "..." when text was cut."""
    clipped = text[:limit]
    if len(text) > limit:
        clipped += "..."
    return clipped


def _select_trace_source(trace_input: Any, metadata: MetadataDict) -> Optional[MetadataDict]:
    """Prefer a non-empty trace dict, then non-empty metadata, else ``None``."""
    trace_metadata = _coerce_metadata(trace_input)
    return trace_metadata or metadata or None


def build_source_document(
    doc: Any,
    *,
    include_relations: bool = False,
    include_subjects: bool = False,
    include_full_content: bool = True,
    include_content_preview: bool = False,
    content_preview_length: Optional[int] = None,
    include_snippet: bool = False,
    snippet_length: Optional[int] = None,
    content_used: Optional[str] = None,
    content_truncated: Optional[bool] = None,
    source_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Build a standardized source document payload from a context slice.

    Two input shapes are supported: an object exposing both ``act`` and ``doc``
    attributes (act-level and subdivision-level fields are read from those),
    or a flat object exposing the same fields directly.

    Args:
        doc: The retrieved item to describe.
        include_relations: Copy a truthy ``doc.relations`` into the payload
            (only for objects without an ``act`` attribute).
        include_subjects: Copy a truthy ``doc.subjects`` into the payload
            (only for objects without an ``act`` attribute).
        include_full_content: Emit the full content under ``"content"``;
            otherwise ``"content"`` is an empty string.
        include_content_preview: Add a truncated ``"content_preview"``.
        content_preview_length: Preview length in characters; defaults to the
            configured ``context_budget.content_preview_chars``.
        include_snippet: Add a truncated ``"snippet"``.
        snippet_length: Snippet length in characters; defaults to the
            configured ``context_budget.snippet_preview_chars``.
        content_used: Stored under ``"content_used"`` when not ``None``.
        content_truncated: Stored under ``"content_truncated"`` when not ``None``.
        source_id: Stored under ``"source_id"`` when not ``None``.

    Returns:
        The source document payload. ``"content_length"`` always reflects the
        full content, even when ``"content"`` itself is omitted.
    """
    full_content = doc.content or ""
    content = full_content if include_full_content else ""
    trace_input = getattr(doc, "trace", None)

    metadata: MetadataDict
    source_doc: SourceDoc
    if hasattr(doc, "act") and hasattr(doc, "doc"):
        act = doc.act
        doc_meta = doc.doc
        metadata = _coerce_metadata(getattr(doc_meta, "payload", None))
        source_doc = {
            "act_id": act.act_id,
            "celex": act.celex,
            "title": act.title,
            "act_type": act.act_type,
            "subdivision_id": doc_meta.subdivision_id,
            "subdivision_type": doc_meta.subdivision_type,
            "sequence_order": doc_meta.sequence_order,
            "content": content,
            "content_length": len(full_content),
            "trace": build_source_trace(_select_trace_source(trace_input, metadata)),
            **build_relevance_score_payload(doc.score),
        }
        if act.url_eurlex:
            source_doc["url"] = act.url_eurlex
        if doc_meta.chunk_id is not None:
            source_doc["chunk_id"] = doc_meta.chunk_id
        if doc_meta.chunk_index is not None:
            source_doc["chunk_index"] = doc_meta.chunk_index
        if doc_meta.source_kind:
            source_doc["source_kind"] = doc_meta.source_kind
    else:
        metadata = _coerce_metadata(getattr(doc, "metadata", None))
        source_doc = {
            "celex": doc.celex,
            "title": doc.title or doc.celex or "Unknown",
            "act_id": doc.act_id,
            "act_type": doc.act_type,
            "subdivision_id": doc.subdivision_id,
            "subdivision_type": doc.subdivision_type,
            "sequence_order": doc.sequence_order,
            "content": content,
            "content_length": len(full_content),
            "trace": build_source_trace(_select_trace_source(trace_input, metadata)),
            **build_relevance_score_payload(doc.score),
        }
        if doc.url_eurlex:
            source_doc["url"] = doc.url_eurlex
        # Chunk identifiers come from the object itself, falling back to its
        # metadata; only integer values are emitted.
        for chunk_key in ("chunk_id", "chunk_index"):
            chunk_value = getattr(doc, chunk_key, None)
            if chunk_value is None:
                chunk_value = metadata.get(chunk_key)
            chunk_int = _as_optional_int(chunk_value)
            if chunk_int is not None:
                source_doc[chunk_key] = chunk_int
        source_kind = getattr(doc, "source_kind", None)
        if isinstance(source_kind, str) and source_kind:
            source_doc["source_kind"] = source_kind

    # Configuration is consulted only when a default length is actually
    # needed, so callers passing explicit lengths (or requesting no preview)
    # do not depend on the global config.
    if include_content_preview:
        preview_limit = content_preview_length
        if preview_limit is None:
            preview_limit = get_config().context_budget.content_preview_chars
        source_doc["content_preview"] = _clip_with_ellipsis(full_content, preview_limit)
    if include_snippet:
        snippet_limit = snippet_length
        if snippet_limit is None:
            snippet_limit = get_config().context_budget.snippet_preview_chars
        source_doc["snippet"] = _clip_with_ellipsis(full_content, snippet_limit)

    # Caller-supplied pass-through fields, emitted only when provided.
    passthrough = (
        ("content_used", content_used),
        ("content_truncated", content_truncated),
        ("source_id", source_id),
    )
    for field_name, field_value in passthrough:
        if field_value is not None:
            source_doc[field_name] = field_value

    # Relations/subjects are copied only from objects without an ``act`` attribute.
    optional_links = (
        (include_relations, "relations"),
        (include_subjects, "subjects"),
    )
    for enabled, attr_name in optional_links:
        if enabled and not hasattr(doc, "act"):
            attr_value = getattr(doc, attr_name, None)
            if attr_value:
                source_doc[attr_name] = attr_value

    return source_doc
[docs]
def build_act_context(act: Any) -> Dict[str, Any]:
    """
    Assemble the normalized context payload describing a single act.

    The identifying fields are always present; the title falls back to the
    CELEX number and then to "Unknown". Optional fields are added only when
    the act carries a usable value for them.
    """
    context: Dict[str, Any] = {
        "act_id": act.act_id,
        "celex": act.celex,
        "title": act.title or act.celex or "Unknown",
        "act_type": act.act_type,
    }

    url = act.url_eurlex
    if url:
        context["url"] = url

    # Kept whenever present, even if falsy (e.g. a level of 0).
    for name in ("adoption_date", "force_date", "regulatory_level"):
        value = getattr(act, name, None)
        if value is not None:
            context[name] = value

    # Kept only when non-empty.
    for name in ("relations", "subjects"):
        value = getattr(act, name, None)
        if value:
            context[name] = value

    return context
[docs]
# Text enclosed in square brackets, e.g. "[S1, G2]".
_BRACKET_BLOCK_RE = re.compile(r"\[([^\]]+)\]")
# A citation identifier inside a bracket block: S, G, R or C followed by digits.
_CITATION_ID_RE = re.compile(r"\b(?:S|G|R|C)\d+\b")


def validate_citations(answer: Optional[str], source_ids: List[str]) -> Dict[str, Any]:
    """
    Validate that citations in the answer refer to available source IDs.

    Expected formats: [S1], [G1], [R1], [C1], ... Several identifiers may share
    one bracket block (e.g. ``[S1, G2]``); identifiers outside brackets are
    ignored.

    Args:
        answer: Generated answer text, or ``None``/empty when there is none.
        source_ids: Identifiers of the sources offered to the model; falsy
            entries are dropped and duplicates collapsed.

    Returns:
        A report dict with the sorted lists ``available``, ``used``,
        ``missing`` (cited but not available) and ``unused`` (available but
        not cited), the flags ``has_sources``, ``has_citations`` and ``ok``,
        and a ``status`` that is one of ``"ok"``, ``"empty_answer"``,
        ``"missing_citations"`` or ``"unknown_citations"``.
    """
    available_ids = {sid for sid in source_ids if sid}
    available = sorted(available_ids)
    has_sources = bool(available_ids)

    if not answer:
        # An empty answer is acceptable only when nothing could have been cited.
        return {
            "available": available,
            "used": [],
            "missing": [],
            "unused": available,
            "has_sources": has_sources,
            "has_citations": False,
            "status": "empty_answer" if has_sources else "ok",
            "ok": not has_sources,
        }

    used_ids = {
        citation
        for block in _BRACKET_BLOCK_RE.findall(answer)
        for citation in _CITATION_ID_RE.findall(block)
    }
    missing = sorted(used_ids - available_ids)
    has_citations = bool(used_ids)
    ok = (not has_sources and not has_citations) or (has_citations and not missing)

    if ok:
        status = "ok"
    elif has_citations:
        # Not ok despite citing something: at least one cited id is unknown.
        status = "unknown_citations"
    else:
        # Not ok without any citation: sources existed but none was cited.
        status = "missing_citations"

    return {
        "available": available,
        "used": sorted(used_ids),
        "missing": missing,
        "unused": sorted(available_ids - used_ids),
        "has_sources": has_sources,
        "has_citations": has_citations,
        "status": status,
        "ok": ok,
    }
[docs]
def collect_act_contexts(docs: List[Any]) -> Dict[str, Dict[str, Any]]:
    """
    Gather one act context per distinct act referenced by ``docs``.

    Items without a truthy ``act`` attribute, or whose act has no ``act_id``,
    are skipped. Contexts are keyed by ``str(act_id)``; the first occurrence
    of each act wins.
    """
    contexts: Dict[str, Dict[str, Any]] = {}
    for item in docs:
        act = getattr(item, "act", None)
        act_id = getattr(act, "act_id", None) if act else None
        if act_id is None:
            continue
        key = str(act_id)
        if key in contexts:
            continue
        contexts[key] = build_act_context(act)
    return contexts