Source code for lalandre_rag.response.policy

"""Adaptive response policy for RAG outputs."""

from __future__ import annotations

import re
from dataclasses import dataclass
from typing import Any, Dict, List, Literal, Optional, cast

from lalandre_core.config import get_config

IntentClass = Literal["conversational", "documentary"]
EvidenceGrade = Literal["none", "weak", "sufficient"]
CitationStatus = Literal["not_applicable", "valid", "repaired", "invalid"]
ResponsePolicyState = Literal["llm_only", "grounded", "weakly_grounded", "clarify", "hard_block"]

_SOURCE_LIST_KEYS = ("documents", "graph_nodes", "graph_edges", "cypher_rows", "community_reports")
_LEGAL_ANCHOR_RE = re.compile(
    r"\b("
    r"article\s+\d+|art\.\s*\d+|"
    r"directive|regulation|r[èe]glement|decision|d[ée]cision|"
    r"3\d{4}[A-Z]\d{4}|"
    r"\d{4}/\d+/[A-Z]{2,}"
    r")\b",
    re.IGNORECASE,
)
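
# Illustrative sketch (not part of the module): strings this anchor pattern is meant
# to catch: explicit article references, act types, CELEX-like identifiers, and
# "YYYY/N/XX" numbers. The example inputs are assumptions.
#
#     _LEGAL_ANCHOR_RE.search("article 6 du RGPD")       # matches "article 6"
#     _LEGAL_ANCHOR_RE.search("32016R0679")               # matches "32016R0679"
#     _LEGAL_ANCHOR_RE.search("directive 2009/138/CE")    # matches "directive"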


@dataclass(frozen=True)
class ResponsePolicyDecision:
    """Final policy decision for a RAG response."""

    state: ResponsePolicyState
    reason: str
    label: str
    intent_class: IntentClass
    evidence_grade: EvidenceGrade
    citation_status: CitationStatus
    can_use_sources: bool
    should_run_cypher: bool
    clarification_question: Optional[str] = None

    def metadata(self) -> Dict[str, Any]:
        """Return the structured metadata payload for the policy decision."""
        return {
            "response_policy": {
                "state": self.state,
                "intent_class": self.intent_class,
                "evidence_grade": self.evidence_grade,
                "citation_status": self.citation_status,
                "reason": self.reason,
                "label": self.label,
                "clarification_question": self.clarification_question,
                "can_use_sources": self.can_use_sources,
                "should_run_cypher": self.should_run_cypher,
            }
        }

    def legacy_metadata(self) -> Dict[str, Any]:
        """Return backward-compatible metadata fields derived from the decision."""
        legacy: Dict[str, Any] = {}
        if self.state == "llm_only":
            legacy["auto_mode_fallback"] = "llm_only"
            legacy["auto_mode_fallback_reason"] = self.reason
        elif self.state in {"clarify", "hard_block"}:
            legacy["blocked_reason"] = self.reason
            legacy["blocked_no_sources"] = not self.can_use_sources
        return legacy
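

# Illustrative sketch (not part of the module): what a decision and its metadata
# payloads might look like. The field values below are assumptions chosen for
# demonstration only.
#
#     decision = ResponsePolicyDecision(
#         state="grounded",
#         reason="grounded",
#         label="Réponse",
#         intent_class="documentary",
#         evidence_grade="sufficient",
#         citation_status="valid",
#         can_use_sources=True,
#         should_run_cypher=True,
#     )
#     decision.metadata()
#     # -> {"response_policy": {"state": "grounded", "intent_class": "documentary", ...}}
#     decision.legacy_metadata()
#     # -> {}  (only "llm_only", "clarify" and "hard_block" emit legacy fields)

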
def infer_intent_class(
    *,
    intent_class: Optional[str],
    skip_retrieval: bool = False,
) -> IntentClass:
    """Infer the high-level intent class used by the response policy."""
    if intent_class == "conversational" or skip_retrieval:
        return "conversational"
    return "documentary"


def infer_evidence_grade(
    *,
    has_sources: bool,
    crag_meta: Optional[Dict[str, Any]] = None,
) -> EvidenceGrade:
    """Infer evidence strength from retrieval availability and CRAG metadata."""
    if not has_sources:
        return "none"
    if isinstance(crag_meta, dict):
        evaluations = crag_meta.get("evaluations")
        if isinstance(evaluations, list) and evaluations:
            last = evaluations[-1]
            if isinstance(last, dict) and last.get("status") in {"PARTIAL", "INSUFFICIENT"}:
                return "weak"
    return "sufficient"


def infer_citation_status(
    *,
    validation: Optional[Dict[str, Any]],
    repaired: bool = False,
) -> CitationStatus:
    """Infer citation validity from the validation payload."""
    if not isinstance(validation, dict):
        return "not_applicable"
    if validation.get("ok"):
        return "repaired" if repaired else "valid"
    return "invalid"


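# Illustrative sketch (not part of the module): typical inputs for the infer_*
# helpers. The crag_meta and validation payload shapes are assumptions inferred
# from the keys read above ("evaluations", "status", "ok").
#
#     infer_intent_class(intent_class=None, skip_retrieval=True)      # -> "conversational"
#     infer_evidence_grade(
#         has_sources=True,
#         crag_meta={"evaluations": [{"status": "PARTIAL"}]},
#     )                                                                # -> "weak"
#     infer_citation_status(validation={"ok": True}, repaired=True)    # -> "repaired"

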
def decide_pre_generation(
    *,
    intent_class: IntentClass,
    evidence_grade: EvidenceGrade,
    question: str,
    retrieval_profile: Optional[str] = None,
    clarification_question: Optional[str] = None,
    strict_grounding_requested: bool = False,
) -> ResponsePolicyDecision:
    """Choose the policy branch before answer generation starts."""
    anchored = strict_grounding_requested or is_anchored_legal_question(
        question=question,
        retrieval_profile=retrieval_profile,
    )
    if intent_class == "conversational":
        return ResponsePolicyDecision(
            state="llm_only",
            reason="conversational",
            label="Réponse conversationnelle",
            intent_class=intent_class,
            evidence_grade="none",
            citation_status="not_applicable",
            can_use_sources=False,
            should_run_cypher=False,
            clarification_question=clarification_question,
        )
    if evidence_grade == "none":
        if anchored:
            return ResponsePolicyDecision(
                state="hard_block",
                reason="missing_anchored_evidence",
                label="Réponse bloquée",
                intent_class=intent_class,
                evidence_grade=evidence_grade,
                citation_status="not_applicable",
                can_use_sources=False,
                should_run_cypher=False,
                clarification_question=clarification_question,
            )
        return ResponsePolicyDecision(
            state="clarify",
            reason="missing_broad_evidence",
            label="Précision demandée",
            intent_class=intent_class,
            evidence_grade=evidence_grade,
            citation_status="not_applicable",
            can_use_sources=False,
            should_run_cypher=False,
            clarification_question=clarification_question,
        )
    if evidence_grade == "weak":
        return ResponsePolicyDecision(
            state="weakly_grounded",
            reason="weak_evidence",
            label="Réponse",
            intent_class=intent_class,
            evidence_grade=evidence_grade,
            citation_status="not_applicable",
            can_use_sources=True,
            should_run_cypher=True,
            clarification_question=clarification_question,
        )
    return ResponsePolicyDecision(
        state="grounded",
        reason="grounded",
        label="Réponse",
        intent_class=intent_class,
        evidence_grade=evidence_grade,
        citation_status="not_applicable",
        can_use_sources=True,
        should_run_cypher=True,
        clarification_question=clarification_question,
    )


def decide_post_generation(
    *,
    intent_class: IntentClass,
    evidence_grade: EvidenceGrade,
    citation_status: CitationStatus,
    question: str,
    has_sources: bool,
    retrieval_profile: Optional[str] = None,
    clarification_question: Optional[str] = None,
    strict_grounding_requested: bool = False,
) -> ResponsePolicyDecision:
    """Choose the policy branch after generation and citation validation."""
    anchored = strict_grounding_requested or is_anchored_legal_question(
        question=question,
        retrieval_profile=retrieval_profile,
    )
    if intent_class == "conversational":
        return ResponsePolicyDecision(
            state="llm_only",
            reason="conversational",
            label="Réponse conversationnelle",
            intent_class=intent_class,
            evidence_grade="none",
            citation_status="not_applicable",
            can_use_sources=False,
            should_run_cypher=False,
            clarification_question=clarification_question,
        )
    if evidence_grade == "none":
        if anchored:
            return ResponsePolicyDecision(
                state="hard_block",
                reason="missing_anchored_evidence",
                label="Réponse bloquée",
                intent_class=intent_class,
                evidence_grade=evidence_grade,
                citation_status=citation_status,
                can_use_sources=False,
                should_run_cypher=False,
                clarification_question=clarification_question,
            )
        return ResponsePolicyDecision(
            state="clarify",
            reason="missing_broad_evidence",
            label="Précision demandée",
            intent_class=intent_class,
            evidence_grade=evidence_grade,
            citation_status=citation_status,
            can_use_sources=False,
            should_run_cypher=False,
            clarification_question=clarification_question,
        )
    if citation_status == "invalid" and has_sources:
        return ResponsePolicyDecision(
            state="weakly_grounded",
            reason="invalid_citations",
            label="Réponse",
            intent_class=intent_class,
            evidence_grade="weak",
            citation_status=citation_status,
            can_use_sources=True,
            should_run_cypher=True,
            clarification_question=clarification_question,
        )
    if evidence_grade == "weak":
        return ResponsePolicyDecision(
            state="weakly_grounded",
            reason="weak_evidence",
            label="Réponse",
            intent_class=intent_class,
            evidence_grade=evidence_grade,
            citation_status=citation_status,
            can_use_sources=True,
            should_run_cypher=True,
            clarification_question=clarification_question,
        )
    return ResponsePolicyDecision(
        state="grounded",
        reason="grounded",
        label="Réponse",
        intent_class=intent_class,
        evidence_grade=evidence_grade,
        citation_status=citation_status,
        can_use_sources=True,
        should_run_cypher=True,
        clarification_question=clarification_question,
    )


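# Illustrative sketch (not part of the module): how the two decision points might
# be chained around answer generation. Variable names and the example question are
# assumptions.
#
#     pre = decide_pre_generation(
#         intent_class="documentary",
#         evidence_grade="sufficient",
#         question="Que prévoit l'article 6 du règlement (UE) 2016/679 ?",
#     )
#     # pre.state == "grounded": generation proceeds with sources and Cypher enabled.
#
#     post = decide_post_generation(
#         intent_class=pre.intent_class,
#         evidence_grade=pre.evidence_grade,
#         citation_status="invalid",
#         question="Que prévoit l'article 6 du règlement (UE) 2016/679 ?",
#         has_sources=True,
#     )
#     # post.state == "weakly_grounded" with reason "invalid_citations".

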
def _extract_preview(doc: Dict[str, Any], *, max_chars: int) -> str:
    """Return the first non-empty text field of a source item, whitespace-normalised and truncated."""
    for key in ("content_used", "snippet", "content_preview", "content", "summary"):
        value = doc.get(key)
        if isinstance(value, str) and value.strip():
            return " ".join(value.split())[:max_chars]
    return ""


def flatten_policy_sources(sources: Optional[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Flatten every supported source list into one homogeneous sequence."""
    if not isinstance(sources, dict):
        return []
    items: List[Dict[str, Any]] = []
    for key in _SOURCE_LIST_KEYS:
        value = sources.get(key)
        if isinstance(value, list):
            items.extend(item for item in value if isinstance(item, dict))
    return items


def build_clarification_answer(
    *,
    clarification_question: Optional[str] = None,
) -> str:
    """Build a fail-closed clarification response for underspecified questions."""
    if clarification_question:
        return (
            "Je peux répondre, mais il me faut une précision pour rester solidement sourcé. "
            f"{clarification_question}"
        )
    return (
        "Je peux répondre, mais la question est trop large pour garantir une réponse RAG fiable. "
        "Précisez le texte visé, le CELEX, l'article ou le point exact à comparer."
    )


def build_weakly_grounded_answer(
    *,
    sources: Optional[Dict[str, Any]],
    reason: str,
    clarification_question: Optional[str] = None,
) -> str:
    """Build a cautious fallback answer from the currently available sources."""
    items = flatten_policy_sources(sources)
    preview_chars = max(int(get_config().context_budget.fallback_preview_chars), 80)
    lines: List[str] = []
    if items:
        lines.append("Ce que les sources permettent d'établir :")
        for item in items[:3]:
            source_id = cast(str, item.get("source_id") or "S?")
            label = str(item.get("title") or item.get("celex") or item.get("act_id") or source_id)
            preview = _extract_preview(item, max_chars=min(preview_chars, 220))
            if preview:
                lines.append(f"- [{source_id}] {label}: {preview}")
            else:
                lines.append(f"- [{source_id}] {label}")
    else:
        lines.append("Aucune source exploitable n'a pu être transformée en synthèse courte.")
    if clarification_question:
        lines.append(f"Pour aller plus loin: {clarification_question}")
    else:
        lines.append(
            "Ce qui reste incertain: le périmètre exact ou la formulation "
            "doit être précisé pour une réponse pleinement fondée."
        )
    return "\n".join(lines)
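

# Illustrative sketch (not part of the module): building the cautious fallback text
# from a minimal sources payload. The document fields are assumptions drawn from the
# keys read by _extract_preview and build_weakly_grounded_answer.
#
#     answer = build_weakly_grounded_answer(
#         sources={
#             "documents": [
#                 {
#                     "source_id": "S1",
#                     "title": "Règlement (UE) 2016/679",
#                     "snippet": "Le traitement n'est licite que si ...",
#                 }
#             ]
#         },
#         reason="weak_evidence",
#     )
#     # -> "Ce que les sources permettent d'établir :\n"
#     #    "- [S1] Règlement (UE) 2016/679: Le traitement n'est licite que si ...\n"
#     #    "Ce qui reste incertain: ..."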