# Source code for lalandre_rag.response.fallbacks

"""
Fallback answer builders for degraded-mode responses.
"""

from typing import Any, Dict, List, Optional

from lalandre_core.config import get_config
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

from .builder import validate_citations

# French user-facing messages returned when a sourced mode has no usable
# source. Keyed by mode name; build_no_source_blocked_answer() falls back to
# the "rag" entry for any mode that is not listed here.
_NO_SOURCE_MESSAGES: Dict[str, str] = {
    "rag": (
        "Je ne peux pas répondre en mode RAG sans source exploitable. "
        "Reformulez votre question ou utilisez le mode llm_only."
    ),
    "summarize": (
        "Je ne peux pas produire ce résumé sans source exploitable. "
        "Vérifiez le CELEX, reformulez votre demande ou utilisez le mode llm_only."
    ),
    "compare": (
        "Je ne peux pas comparer ces textes sans sources exploitables. "
        "Vérifiez les CELEX, reformulez votre demande ou utilisez le mode llm_only."
    ),
}

# French user-facing messages returned when sources were retrieved but the
# generated citations could not be trusted. Keyed by mode name;
# build_invalid_citation_blocked_answer() falls back to the "rag" entry for
# any mode that is not listed here.
_INVALID_CITATION_MESSAGES: Dict[str, str] = {
    "rag": (
        "Je ne peux pas valider cette réponse RAG car les citations générées "
        "ne sont pas fiables. Les sources récupérées restent disponibles."
    ),
    "summarize": (
        "Je ne peux pas valider ce résumé car les citations générées ne sont "
        "pas fiables. Les sources récupérées restent disponibles."
    ),
    "compare": (
        "Je ne peux pas valider cette comparaison car les citations générées "
        "ne sont pas fiables. Les sources récupérées restent disponibles."
    ),
}

# French explanations keyed by the "status" value of a citation-validation
# result; looked up by describe_citation_validation_failure().
# NOTE(review): the status names presumably mirror what validate_citations()
# emits -- confirm against .builder.
_CITATION_FAILURE_DETAILS: Dict[str, str] = {
    "empty_answer": "La génération n'a produit aucune réponse exploitable",
    "missing_citations": "La réponse générée ne contenait aucune citation exploitable",
    "unknown_citations": "Les citations générées ne correspondent pas aux sources récupérées",
    "invalid_citations": "Les citations générées restent incohérentes après vérification",
}

# Chat prompt used by repair_citations_once(). The system message instructs
# the model (in French) to keep the meaning, add or fix [Sx]/[Gx]/[Rx]/[Cx]
# citations, invent no source, drop unsupported claims and return only the
# final answer. The human message expects four template variables: mode,
# question, draft_answer and sources_block.
_CITATION_REPAIR_PROMPT = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            (
                "Tu corriges une réponse RAG pour qu'elle soit strictement sourcée. "
                "Conserve le sens, ajoute ou corrige les citations "
                "[Sx], [Gx], [Rx] ou [Cx], n'invente aucune source, et "
                "supprime les affirmations non couvertes par les sources "
                "fournies. Retourne uniquement la réponse finale."
            ),
        ),
        (
            "human",
            (
                "Mode: {mode}\n"
                "Question: {question}\n\n"
                "Réponse à corriger:\n{draft_answer}\n\n"
                "Sources disponibles:\n{sources_block}"
            ),
        ),
    ]
)

# Names of the list-valued evidence families inside a ``sources`` payload.
# The flatten/normalize/merge helpers below iterate these keys in this order.
_SOURCE_LIST_KEYS = ("documents", "graph_nodes", "graph_edges", "cypher_rows", "community_reports")


def _extract_doc_preview(doc: Dict[str, Any], *, max_chars: Optional[int] = None) -> str:
    """Extract a short content preview from a source document dict."""
    if max_chars is None:
        max_chars = get_config().context_budget.fallback_preview_chars
    for key in ("content_used", "snippet", "content_preview", "content", "summary"):
        value = doc.get(key)
        if isinstance(value, str) and value.strip():
            normalized = " ".join(value.split())
            return normalized[:max_chars]
    return ""


def flatten_source_items(
    sources: Optional[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Flatten every evidence list carried in a ``sources`` payload.

    Args:
        sources: Payload whose entries named in ``_SOURCE_LIST_KEYS`` may hold
            lists of evidence dicts. Any non-dict value is treated as "no
            sources".

    Returns:
        A new list containing every dict item of each known evidence list, in
        ``_SOURCE_LIST_KEYS`` order. Non-list values and non-dict items are
        skipped.
    """
    if not isinstance(sources, dict):
        return []
    items: List[Dict[str, Any]] = []
    for key in _SOURCE_LIST_KEYS:
        value = sources.get(key)
        if isinstance(value, list):
            items.extend(item for item in value if isinstance(item, dict))
    return items
def build_retrieval_fallback_answer(
    *,
    mode: str,
    question: str,
    documents: List[Dict[str, Any]],
    reason: str,
) -> str:
    """Build a user-facing fallback answer when LLM generation fails.

    Lists up to 3 retrieved documents so the user still gets value.

    Args:
        mode: Mode name echoed in the first line of the answer.
        question: Original question; its first 120 characters are echoed when
            no document is available.
        documents: Retrieved source documents; only the first three are listed.
        reason: Short failure reason echoed in the first line.

    Returns:
        A newline-joined, French, plain-text answer.
    """
    lines = [
        f"Mode {mode} en reponse degradee: la generation LLM est indisponible ({reason}).",
    ]
    if not documents:
        lines.append(f"Aucune source exploitable n'a pu etre extraite pour: {question[:120]}")
        return "\n".join(lines)

    lines.append("Elements recuperes depuis la base documentaire:")
    for idx, doc in enumerate(documents[:3], start=1):
        # First truthy identifier wins; a positional label is the last resort.
        label = str(doc.get("celex") or doc.get("source_id") or doc.get("act_id") or f"doc-{idx}")
        preview = _extract_doc_preview(doc)
        lines.append(f"{idx}. {label}: {preview}" if preview else f"{idx}. {label}")
    lines.append("Relancer avec un endpoint LLM valide pour obtenir une synthese complete.")
    return "\n".join(lines)
def build_no_source_blocked_answer(mode: str) -> str:
    """Return the deterministic fail-closed answer for sourced modes.

    Args:
        mode: Mode name used as the key into ``_NO_SOURCE_MESSAGES``.

    Returns:
        The message registered for ``mode``, or the ``"rag"`` message when the
        mode has no dedicated entry.
    """
    return _NO_SOURCE_MESSAGES.get(mode, _NO_SOURCE_MESSAGES["rag"])
def build_invalid_citation_blocked_answer(mode: str) -> str:
    """Return the fail-closed answer when sources exist but citations are invalid.

    Args:
        mode: Mode name used as the key into ``_INVALID_CITATION_MESSAGES``.

    Returns:
        The message registered for ``mode``, or the ``"rag"`` message when the
        mode has no dedicated entry.
    """
    return _INVALID_CITATION_MESSAGES.get(mode, _INVALID_CITATION_MESSAGES["rag"])
def describe_citation_validation_failure(validation: Optional[Dict[str, Any]]) -> str:
    """Return a user-facing explanation for the current citation-validation failure.

    Args:
        validation: Citation-validation result; only its ``"status"`` entry is
            read. Any non-dict value is accepted and yields the generic message.

    Returns:
        The message registered in ``_CITATION_FAILURE_DETAILS`` for the status,
        or a generic message when the status is missing, not a string, or
        unknown.
    """
    default_detail = "Les citations n'ont pas pu être réparées de façon fiable"
    if not isinstance(validation, dict):
        return default_detail
    status = validation.get("status")
    if not isinstance(status, str):
        return default_detail
    return _CITATION_FAILURE_DETAILS.get(status, default_detail)
def create_blocked_sourced_response(
    *,
    mode: str,
    query: str,
    reason: str,
    answer: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    sources: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Return a fail-closed sourced-mode response, preserving sources when available.

    Args:
        mode: Mode name echoed in the response and used to pick the message.
        query: Original user query, echoed in the response.
        reason: Machine-readable block reason stored under ``blocked_reason``.
        answer: Optional explicit answer; a falsy value selects a default
            message instead.
        metadata: Optional extra metadata. It is applied last, so it can
            override ``blocked_no_sources`` and ``blocked_reason``.
        sources: Optional raw sources payload; it is normalized before use.

    Returns:
        A dict with the keys ``mode``, ``query``, ``answer``, ``sources`` and
        ``metadata``.
    """
    normalized_sources = normalize_sources_payload(sources)
    has_sources = normalized_sources is not None

    response_metadata: Dict[str, Any] = {
        "blocked_no_sources": not has_sources,
        "blocked_reason": reason,
    }
    if metadata:
        response_metadata.update(metadata)

    if answer:
        final_answer = answer
    elif reason == "invalid_citations" and has_sources:
        # Sources exist, so the problem is the citations rather than retrieval.
        final_answer = build_invalid_citation_blocked_answer(mode)
    else:
        final_answer = build_no_source_blocked_answer(mode)

    return {
        "mode": mode,
        "query": query,
        "answer": final_answer,
        "sources": normalized_sources,
        "metadata": response_metadata,
    }
def normalize_sources_payload(
    sources: Optional[Dict[str, Any]],
) -> Optional[Dict[str, Any]]:
    """Normalize empty source payloads to ``None`` and keep non-empty blocks coherent.

    Args:
        sources: Raw sources payload. Any non-dict value is treated as empty.

    Returns:
        ``None`` when no ``_SOURCE_LIST_KEYS`` entry holds a non-empty list.
        Otherwise a new dict holding the non-empty evidence lists, a ``total``
        item count, a ``documents`` key (empty list if absent), and the
        ``acts`` / ``graph_query`` entries when those are dicts.
    """
    if not isinstance(sources, dict):
        return None

    normalized: Dict[str, Any] = {}
    total = 0
    for key in _SOURCE_LIST_KEYS:
        value = sources.get(key)
        if isinstance(value, list) and value:
            normalized[key] = value
            total += len(value)
    if total <= 0:
        return None

    normalized["total"] = total
    # A non-empty payload always exposes "documents", even when only other
    # evidence families carried items.
    normalized.setdefault("documents", [])

    acts_raw = sources.get("acts")
    if isinstance(acts_raw, dict):
        normalized["acts"] = acts_raw
    graph_query_raw = sources.get("graph_query")
    if isinstance(graph_query_raw, dict):
        normalized["graph_query"] = graph_query_raw
    return normalized
def merge_sources_payload(
    base_sources: Optional[Dict[str, Any]],
    extra_sources: Optional[Dict[str, Any]],
) -> Optional[Dict[str, Any]]:
    """Merge two source payloads while preserving all evidence families.

    Args:
        base_sources: First payload; its items come first in each merged list.
        extra_sources: Second payload; its items are appended after the base
            items, and its ``acts`` entries and ``graph_query`` take precedence.

    Returns:
        The normalized merged payload, or ``None`` when the result holds no
        evidence. When only one argument is a dict, that argument is
        normalized and returned on its own.
    """
    if not isinstance(base_sources, dict):
        return normalize_sources_payload(extra_sources)
    if not isinstance(extra_sources, dict):
        return normalize_sources_payload(base_sources)

    merged: Dict[str, Any] = {}
    for key in _SOURCE_LIST_KEYS:
        combined: List[Dict[str, Any]] = [
            item
            for candidate in (base_sources.get(key), extra_sources.get(key))
            if isinstance(candidate, list)
            for item in candidate
            if isinstance(item, dict)
        ]
        if combined:
            merged[key] = combined

    # Later payload wins on key collisions inside "acts".
    acts: Dict[str, Any] = {}
    for raw in (base_sources.get("acts"), extra_sources.get("acts")):
        if isinstance(raw, dict):
            acts.update(raw)
    if acts:
        merged["acts"] = acts

    # Prefer the extra payload's graph query; fall back to the base one.
    graph_query = extra_sources.get("graph_query")
    if not isinstance(graph_query, dict):
        graph_query = base_sources.get("graph_query")
    if isinstance(graph_query, dict):
        merged["graph_query"] = graph_query

    return normalize_sources_payload(merged)
def extract_source_ids(sources: List[Dict[str, Any]]) -> List[str]:
    """Collect available source IDs from a source-doc list.

    Args:
        sources: Source documents. Entries that are not dicts are skipped
            rather than raising.

    Returns:
        The non-empty string ``source_id`` values, in input order. Duplicates
        are kept.
    """
    source_ids: List[str] = []
    for source_doc in sources:
        if not isinstance(source_doc, dict):
            continue
        source_id = source_doc.get("source_id")
        if isinstance(source_id, str) and source_id:
            source_ids.append(source_id)
    return source_ids
def repair_citations_once(
    *,
    mode: str,
    question: str,
    draft_answer: str,
    sources: List[Dict[str, Any]],
    llm: Any,
) -> Optional[str]:
    """Try a single citation-repair pass.

    Args:
        mode: Mode name passed to the repair prompt.
        question: Original user question passed to the repair prompt.
        draft_answer: Answer whose citations should be repaired.
        sources: Source documents; only those with a non-empty string
            ``source_id`` and a non-empty preview are offered to the model.
        llm: Model piped between the repair prompt and the string parser.

    Returns:
        The stripped repaired answer, or ``None`` when there is no model, the
        draft is blank, no source is usable, or the model returns blank text.
        Exceptions raised while invoking the chain are not caught here.
    """
    if llm is None or not draft_answer.strip():
        return None

    source_blocks: List[str] = []
    for source_doc in sources:
        source_id = source_doc.get("source_id")
        if not isinstance(source_id, str) or not source_id:
            continue
        label = source_doc.get("title") or source_doc.get("celex") or source_doc.get("act_id") or source_id
        excerpt = _extract_doc_preview(source_doc, max_chars=800)
        if not excerpt:
            continue
        source_blocks.append(f"[{source_id}] {label}\n{excerpt}")
    if not source_blocks:
        return None

    chain = _CITATION_REPAIR_PROMPT | llm | StrOutputParser()
    repaired = chain.invoke(
        {
            "mode": mode,
            "question": question,
            "draft_answer": draft_answer,
            "sources_block": "\n\n".join(source_blocks),
        }
    )
    repaired_text = repaired.strip()
    return repaired_text or None
def enforce_cited_answer(
    *,
    mode: str,
    question: str,
    draft_answer: str,
    sources: List[Dict[str, Any]],
    llm: Any,
) -> Dict[str, Any]:
    """Validate citations without rewriting the draft.

    Preserves the streamed answer verbatim so the UI never sees its text
    flashed and replaced. Validation results are still returned so callers
    can surface citation quality in metadata, but no LLM repair pass runs and
    the answer is never blanked out.

    Args:
        mode: Unused; kept for signature compatibility.
        question: Unused; kept for signature compatibility.
        draft_answer: Answer to validate; returned unchanged.
        sources: Source documents whose ``source_id`` values are the
            citations the draft is validated against.
        llm: Unused; kept for signature compatibility.

    Returns:
        A dict with ``answer`` (the unchanged draft), ``validation`` (the
        ``validate_citations`` result) and the flags ``repaired``,
        ``repair_attempted`` and ``blocked``, all ``False``.
    """
    del mode, question, llm  # kept for signature compatibility
    source_ids = extract_source_ids(sources)
    validation = validate_citations(draft_answer, source_ids)
    return {
        "answer": draft_answer,
        "validation": validation,
        "repaired": False,
        "repair_attempted": False,
        "blocked": False,
    }