"""Adaptive response policy for RAG outputs."""
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Literal, Optional, cast
from lalandre_core.config import get_config
IntentClass = Literal["conversational", "documentary"]
EvidenceGrade = Literal["none", "weak", "sufficient"]
CitationStatus = Literal["not_applicable", "valid", "repaired", "invalid"]
ResponsePolicyState = Literal["llm_only", "grounded", "weakly_grounded", "clarify", "hard_block"]
_SOURCE_LIST_KEYS = ("documents", "graph_nodes", "graph_edges", "cypher_rows", "community_reports")
_LEGAL_ANCHOR_RE = re.compile(
r"\b("
r"article\s+\d+|art\.\s*\d+|"
r"directive|regulation|r[èe]glement|decision|d[ée]cision|"
r"3\d{4}[A-Z]\d{4}|"
r"\d{4}/\d+/[A-Z]{2,}"
r")\b",
re.IGNORECASE,
)
@dataclass(frozen=True)
class ResponsePolicyDecision:
    """Final policy decision for a RAG response."""

    # Terminal policy branch chosen for this response.
    state: ResponsePolicyState
    # Machine-readable justification for `state`.
    reason: str
    # Human-facing label attached to the answer.
    label: str
    intent_class: IntentClass
    evidence_grade: EvidenceGrade
    citation_status: CitationStatus
    # Whether retrieved sources may back the answer.
    can_use_sources: bool
    # Whether a Cypher follow-up step should still run.
    should_run_cypher: bool
    clarification_question: Optional[str] = None

    def metadata(self) -> Dict[str, Any]:
        """Return the structured metadata payload for the policy decision."""
        field_order = (
            "state",
            "intent_class",
            "evidence_grade",
            "citation_status",
            "reason",
            "label",
            "clarification_question",
            "can_use_sources",
            "should_run_cypher",
        )
        return {"response_policy": {name: getattr(self, name) for name in field_order}}
def is_anchored_legal_question(*, question: str, retrieval_profile: Optional[str] = None) -> bool:
    """Return whether the user question is anchored enough for strict fail-closed behavior."""
    # Certain retrieval profiles imply anchoring regardless of the wording.
    strict_profiles = ("citation_precision", "relationship_focus")
    if retrieval_profile in strict_profiles:
        return True
    # Otherwise require an explicit anchor token in the question text.
    return _LEGAL_ANCHOR_RE.search(question) is not None
def infer_intent_class(
    *,
    intent_class: Optional[str],
    skip_retrieval: bool = False,
) -> IntentClass:
    """Infer the high-level intent class used by the response policy."""
    # Skipping retrieval is treated the same as an explicit conversational tag.
    conversational = skip_retrieval or intent_class == "conversational"
    return "conversational" if conversational else "documentary"
def infer_evidence_grade(
    *,
    has_sources: bool,
    crag_meta: Optional[Dict[str, Any]] = None,
) -> EvidenceGrade:
    """Infer evidence strength from retrieval availability and CRAG metadata."""
    if not has_sources:
        return "none"
    meta = crag_meta if isinstance(crag_meta, dict) else {}
    evaluations = meta.get("evaluations")
    if not isinstance(evaluations, list) or not evaluations:
        return "sufficient"
    # Only the most recent evaluation drives the downgrade decision.
    last = evaluations[-1]
    if isinstance(last, dict) and last.get("status") in ("PARTIAL", "INSUFFICIENT"):
        return "weak"
    return "sufficient"
def infer_citation_status(
    *,
    validation: Optional[Dict[str, Any]],
    repaired: bool = False,
) -> CitationStatus:
    """Infer citation validity from the validation payload."""
    if not isinstance(validation, dict):
        # No validation ran, so citation checking does not apply.
        return "not_applicable"
    if not validation.get("ok"):
        return "invalid"
    return "repaired" if repaired else "valid"
def decide_pre_generation(
    *,
    intent_class: IntentClass,
    evidence_grade: EvidenceGrade,
    question: str,
    retrieval_profile: Optional[str] = None,
    clarification_question: Optional[str] = None,
    strict_grounding_requested: bool = False,
) -> ResponsePolicyDecision:
    """Choose the policy branch before answer generation starts."""
    anchored = strict_grounding_requested or is_anchored_legal_question(
        question=question,
        retrieval_profile=retrieval_profile,
    )

    def _decision(
        state: ResponsePolicyState,
        reason: str,
        label: str,
        *,
        grade: EvidenceGrade,
        usable: bool,
    ) -> ResponsePolicyDecision:
        # Pre-generation decisions never have a citation status, and source
        # usage and the Cypher flag always move together here.
        return ResponsePolicyDecision(
            state=state,
            reason=reason,
            label=label,
            intent_class=intent_class,
            evidence_grade=grade,
            citation_status="not_applicable",
            can_use_sources=usable,
            should_run_cypher=usable,
            clarification_question=clarification_question,
        )

    if intent_class == "conversational":
        return _decision("llm_only", "conversational", "Réponse conversationnelle", grade="none", usable=False)
    if evidence_grade == "none":
        # Anchored questions fail closed; broad questions ask for precision.
        if anchored:
            return _decision("hard_block", "missing_anchored_evidence", "Réponse bloquée", grade=evidence_grade, usable=False)
        return _decision("clarify", "missing_broad_evidence", "Précision demandée", grade=evidence_grade, usable=False)
    if evidence_grade == "weak":
        return _decision("weakly_grounded", "weak_evidence", "Réponse", grade=evidence_grade, usable=True)
    return _decision("grounded", "grounded", "Réponse", grade=evidence_grade, usable=True)
def decide_post_generation(
    *,
    intent_class: IntentClass,
    evidence_grade: EvidenceGrade,
    citation_status: CitationStatus,
    question: str,
    has_sources: bool,
    retrieval_profile: Optional[str] = None,
    clarification_question: Optional[str] = None,
    strict_grounding_requested: bool = False,
) -> ResponsePolicyDecision:
    """Choose the policy branch after generation and citation validation."""
    anchored = strict_grounding_requested or is_anchored_legal_question(
        question=question,
        retrieval_profile=retrieval_profile,
    )

    def _decision(
        state: ResponsePolicyState,
        reason: str,
        label: str,
        *,
        grade: EvidenceGrade,
        citations: CitationStatus,
        usable: bool,
    ) -> ResponsePolicyDecision:
        # Source usage and the Cypher flag always move together here.
        return ResponsePolicyDecision(
            state=state,
            reason=reason,
            label=label,
            intent_class=intent_class,
            evidence_grade=grade,
            citation_status=citations,
            can_use_sources=usable,
            should_run_cypher=usable,
            clarification_question=clarification_question,
        )

    if intent_class == "conversational":
        return _decision(
            "llm_only", "conversational", "Réponse conversationnelle",
            grade="none", citations="not_applicable", usable=False,
        )
    if evidence_grade == "none":
        # Anchored questions fail closed; broad questions ask for precision.
        if anchored:
            return _decision(
                "hard_block", "missing_anchored_evidence", "Réponse bloquée",
                grade=evidence_grade, citations=citation_status, usable=False,
            )
        return _decision(
            "clarify", "missing_broad_evidence", "Précision demandée",
            grade=evidence_grade, citations=citation_status, usable=False,
        )
    if citation_status == "invalid" and has_sources:
        # Invalid citations downgrade an otherwise usable answer to weak.
        return _decision(
            "weakly_grounded", "invalid_citations", "Réponse",
            grade="weak", citations=citation_status, usable=True,
        )
    if evidence_grade == "weak":
        return _decision(
            "weakly_grounded", "weak_evidence", "Réponse",
            grade=evidence_grade, citations=citation_status, usable=True,
        )
    return _decision(
        "grounded", "grounded", "Réponse",
        grade=evidence_grade, citations=citation_status, usable=True,
    )
def _extract_preview(doc: Dict[str, Any], *, max_chars: int) -> str:
for key in ("content_used", "snippet", "content_preview", "content", "summary"):
value = doc.get(key)
if isinstance(value, str) and value.strip():
return " ".join(value.split())[:max_chars]
return ""
def flatten_policy_sources(sources: Optional[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Flatten every supported source list into one homogeneous sequence."""
    if not isinstance(sources, dict):
        return []
    flattened: List[Dict[str, Any]] = []
    for key in _SOURCE_LIST_KEYS:
        bucket = sources.get(key)
        if not isinstance(bucket, list):
            continue
        # Keep only dict entries; anything else is silently dropped.
        flattened.extend(entry for entry in bucket if isinstance(entry, dict))
    return flattened
def build_clarification_answer(
    *,
    clarification_question: Optional[str] = None,
) -> str:
    """Build a fail-closed clarification response for underspecified questions."""
    if clarification_question:
        # Echo the specific follow-up question when one is available.
        prefix = "Je peux répondre, mais il me faut une précision pour rester solidement sourcé. "
        return prefix + clarification_question
    # Generic fallback when no targeted clarification was produced.
    return (
        "Je peux répondre, mais la question est trop large pour garantir une réponse RAG fiable. "
        "Précisez le texte visé, le CELEX, l'article ou le point exact à comparer."
    )
def build_weakly_grounded_answer(
    *,
    sources: Optional[Dict[str, Any]],
    reason: str,
    clarification_question: Optional[str] = None,
) -> str:
    """Build a cautious fallback answer from the currently available sources."""
    # NOTE(review): `reason` is accepted but never used in this body; kept for
    # interface compatibility — confirm with callers whether it should matter.
    entries = flatten_policy_sources(sources)
    # Preview budget comes from config but never drops below 80 characters.
    preview_budget = max(int(get_config().context_budget.fallback_preview_chars), 80)
    parts: List[str] = []
    if entries:
        parts.append("Ce que les sources permettent d'établir :")
        # At most three entries are summarized.
        for entry in entries[:3]:
            source_id = cast(str, entry.get("source_id") or "S?")
            label = str(entry.get("title") or entry.get("celex") or entry.get("act_id") or source_id)
            preview = _extract_preview(entry, max_chars=min(preview_budget, 220))
            line = f"- [{source_id}] {label}"
            if preview:
                line = f"{line}: {preview}"
            parts.append(line)
    else:
        parts.append("Aucune source exploitable n'a pu être transformée en synthèse courte.")
    if clarification_question:
        parts.append(f"Pour aller plus loin: {clarification_question}")
    else:
        parts.append(
            "Ce qui reste incertain: le périmètre exact ou la formulation "
            "doit être précisé pour une réponse pleinement fondée."
        )
    return "\n".join(parts)