"""
Fallback answer builders for degraded-mode responses.
"""
from typing import Any, Dict, List, Optional

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

from lalandre_core.config import get_config

from .builder import extract_source_ids, validate_citations
# Fail-closed answers (user-facing, French) returned when a sourced mode has
# no usable evidence at all. Keyed by pipeline mode; "rag" doubles as the
# default entry (see build_no_source_blocked_answer).
_NO_SOURCE_MESSAGES: Dict[str, str] = {
    "rag": (
        "Je ne peux pas répondre en mode RAG sans source exploitable. "
        "Reformulez votre question ou utilisez le mode llm_only."
    ),
    "summarize": (
        "Je ne peux pas produire ce résumé sans source exploitable. "
        "Vérifiez le CELEX, reformulez votre demande ou utilisez le mode llm_only."
    ),
    "compare": (
        "Je ne peux pas comparer ces textes sans sources exploitables. "
        "Vérifiez les CELEX, reformulez votre demande ou utilisez le mode llm_only."
    ),
}

# Fail-closed answers used when sources were retrieved but the generated
# citations could not be validated. "rag" again doubles as the default
# (see build_invalid_citation_blocked_answer).
_INVALID_CITATION_MESSAGES: Dict[str, str] = {
    "rag": (
        "Je ne peux pas valider cette réponse RAG car les citations générées "
        "ne sont pas fiables. Les sources récupérées restent disponibles."
    ),
    "summarize": (
        "Je ne peux pas valider ce résumé car les citations générées ne sont "
        "pas fiables. Les sources récupérées restent disponibles."
    ),
    "compare": (
        "Je ne peux pas valider cette comparaison car les citations générées "
        "ne sont pas fiables. Les sources récupérées restent disponibles."
    ),
}

# Human-readable detail per citation-validation failure status; looked up by
# describe_citation_validation_failure from the validation payload's "status".
_CITATION_FAILURE_DETAILS: Dict[str, str] = {
    "empty_answer": "La génération n'a produit aucune réponse exploitable",
    "missing_citations": "La réponse générée ne contenait aucune citation exploitable",
    "unknown_citations": "Les citations générées ne correspondent pas aux sources récupérées",
    "invalid_citations": "Les citations générées restent incohérentes après vérification",
}

# Prompt for the single-shot citation-repair chain (repair_citations_once):
# keep the draft's meaning, fix/add [Sx]/[Gx]/[Rx]/[Cx] citations, invent no
# source, and drop claims not covered by the supplied sources.
_CITATION_REPAIR_PROMPT = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            (
                "Tu corriges une réponse RAG pour qu'elle soit strictement sourcée. "
                "Conserve le sens, ajoute ou corrige les citations "
                "[Sx], [Gx], [Rx] ou [Cx], n'invente aucune source, et "
                "supprime les affirmations non couvertes par les sources "
                "fournies. Retourne uniquement la réponse finale."
            ),
        ),
        (
            "human",
            (
                "Mode: {mode}\n"
                "Question: {question}\n\n"
                "Réponse à corriger:\n{draft_answer}\n\n"
                "Sources disponibles:\n{sources_block}"
            ),
        ),
    ]
)

# Evidence families that may appear as lists inside a ``sources`` payload.
_SOURCE_LIST_KEYS = ("documents", "graph_nodes", "graph_edges", "cypher_rows", "community_reports")
def _extract_doc_preview(doc: Dict[str, Any], *, max_chars: Optional[int] = None) -> str:
"""Extract a short content preview from a source document dict."""
if max_chars is None:
max_chars = get_config().context_budget.fallback_preview_chars
for key in ("content_used", "snippet", "content_preview", "content", "summary"):
value = doc.get(key)
if isinstance(value, str) and value.strip():
normalized = " ".join(value.split())
return normalized[:max_chars]
return ""
def flatten_source_items(
    sources: Optional[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Flatten every evidence list carried in a ``sources`` payload."""
    if not isinstance(sources, dict):
        return []
    flattened: List[Dict[str, Any]] = []
    for family in _SOURCE_LIST_KEYS:
        entries = sources.get(family)
        if not isinstance(entries, list):
            continue
        # Only dict-shaped evidence items are kept; anything else is dropped.
        flattened.extend(entry for entry in entries if isinstance(entry, dict))
    return flattened
def build_retrieval_fallback_answer(
    *,
    mode: str,
    question: str,
    documents: List[Dict[str, Any]],
    reason: str,
) -> str:
    """Build a user-facing fallback answer when LLM generation fails.

    Lists up to 3 retrieved documents so the user still gets value.
    """
    parts: List[str] = [
        f"Mode {mode} en reponse degradee: la generation LLM est indisponible ({reason})."
    ]
    if not documents:
        # Nothing retrieved: echo (a truncated form of) the question instead.
        parts.append(f"Aucune source exploitable n'a pu etre extraite pour: {question[:120]}")
        return "\n".join(parts)
    parts.append("Elements recuperes depuis la base documentaire:")
    for position, doc in enumerate(documents[:3], start=1):
        # Prefer stable identifiers; fall back to a positional label.
        label = str(
            doc.get("celex") or doc.get("source_id") or doc.get("act_id") or f"doc-{position}"
        )
        excerpt = _extract_doc_preview(doc)
        parts.append(f"{position}. {label}: {excerpt}" if excerpt else f"{position}. {label}")
    parts.append("Relancer avec un endpoint LLM valide pour obtenir une synthese complete.")
    return "\n".join(parts)
def build_no_source_blocked_answer(mode: str) -> str:
    """Return the deterministic fail-closed answer for sourced modes."""
    try:
        return _NO_SOURCE_MESSAGES[mode]
    except KeyError:
        # Unknown modes fall back to the generic RAG message.
        return _NO_SOURCE_MESSAGES["rag"]
def build_invalid_citation_blocked_answer(mode: str) -> str:
    """Return the fail-closed answer when sources exist but citations are invalid."""
    message = _INVALID_CITATION_MESSAGES.get(mode)
    # Unknown modes fall back to the generic RAG message.
    return message if message is not None else _INVALID_CITATION_MESSAGES["rag"]
def describe_citation_validation_failure(validation: Optional[Dict[str, Any]]) -> str:
    """Return a user-facing explanation for the current citation-validation failure."""
    fallback = "Les citations n'ont pas pu être réparées de façon fiable"
    if isinstance(validation, dict):
        status = validation.get("status")
        if isinstance(status, str):
            return _CITATION_FAILURE_DETAILS.get(status, fallback)
    return fallback
def create_blocked_sourced_response(
    *,
    mode: str,
    query: str,
    reason: str,
    answer: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    sources: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Return a fail-closed sourced-mode response, preserving sources when available."""
    kept_sources = normalize_sources_payload(sources)
    meta: Dict[str, Any] = {
        "blocked_no_sources": kept_sources is None,
        "blocked_reason": reason,
    }
    if metadata:
        # Caller-supplied metadata wins over the defaults above.
        meta.update(metadata)
    if answer:
        final_answer = answer
    elif reason == "invalid_citations" and kept_sources is not None:
        # Sources survived normalization: blame the citations, not retrieval.
        final_answer = build_invalid_citation_blocked_answer(mode)
    else:
        final_answer = build_no_source_blocked_answer(mode)
    return {
        "mode": mode,
        "query": query,
        "answer": final_answer,
        "sources": kept_sources,
        "metadata": meta,
    }
def normalize_sources_payload(
    sources: Optional[Dict[str, Any]],
) -> Optional[Dict[str, Any]]:
    """Normalize empty source payloads to ``None`` and keep non-empty blocks coherent."""
    if not isinstance(sources, dict):
        return None
    # Keep only the non-empty list-valued evidence families.
    kept: Dict[str, Any] = {
        key: sources[key]
        for key in _SOURCE_LIST_KEYS
        if isinstance(sources.get(key), list) and sources.get(key)
    }
    evidence_count = sum(len(entries) for entries in kept.values())
    if not evidence_count:
        # No evidence at all -> the whole payload collapses to None.
        return None
    kept["total"] = evidence_count
    # Downstream consumers expect a "documents" list even when empty.
    kept.setdefault("documents", [])
    # Dict-shaped metadata blocks pass through untouched.
    for passthrough in ("acts", "graph_query"):
        block = sources.get(passthrough)
        if isinstance(block, dict):
            kept[passthrough] = block
    return kept
def merge_sources_payload(
    base_sources: Optional[Dict[str, Any]],
    extra_sources: Optional[Dict[str, Any]],
) -> Optional[Dict[str, Any]]:
    """Merge two source payloads while preserving all evidence families."""
    # If either side is missing, the other side just gets normalized.
    if not isinstance(base_sources, dict):
        return normalize_sources_payload(extra_sources)
    if not isinstance(extra_sources, dict):
        return normalize_sources_payload(base_sources)
    merged: Dict[str, Any] = {}
    for family in _SOURCE_LIST_KEYS:
        pooled: List[Dict[str, Any]] = []
        for payload in (base_sources, extra_sources):
            entries = payload.get(family)
            if isinstance(entries, list):
                pooled.extend(entry for entry in entries if isinstance(entry, dict))
        if pooled:
            merged[family] = pooled
    # "acts" maps merge key-wise; extra entries override base on collisions.
    combined_acts: Dict[str, Any] = {}
    for candidate in (base_sources.get("acts"), extra_sources.get("acts")):
        if isinstance(candidate, dict):
            combined_acts.update(candidate)
    if combined_acts:
        merged["acts"] = combined_acts
    # "graph_query" is not merged: prefer extra's block, else base's.
    graph_meta = extra_sources.get("graph_query")
    if not isinstance(graph_meta, dict):
        graph_meta = base_sources.get("graph_query")
    if isinstance(graph_meta, dict):
        merged["graph_query"] = graph_meta
    return normalize_sources_payload(merged)
def repair_citations_once(
    *,
    mode: str,
    question: str,
    draft_answer: str,
    sources: List[Dict[str, Any]],
    llm: Any,
) -> Optional[str]:
    """Try a single citation-repair pass. Returns ``None`` on failure."""
    if llm is None or not draft_answer.strip():
        return None
    # Build one "[id] label\nexcerpt" block per usable source document.
    blocks: List[str] = []
    for doc in sources:
        sid = doc.get("source_id")
        if not isinstance(sid, str) or not sid:
            continue
        title = doc.get("title") or doc.get("celex") or doc.get("act_id") or sid
        preview = _extract_doc_preview(doc, max_chars=800)
        if preview:
            blocks.append(f"[{sid}] {title}\n{preview}")
    if not blocks:
        # No usable evidence to repair against.
        return None
    chain = _CITATION_REPAIR_PROMPT | llm | StrOutputParser()
    result = chain.invoke(
        {
            "mode": mode,
            "question": question,
            "draft_answer": draft_answer,
            "sources_block": "\n\n".join(blocks),
        }
    )
    cleaned = result.strip()
    return cleaned if cleaned else None
def enforce_cited_answer(
    *,
    mode: str,
    question: str,
    draft_answer: str,
    sources: List[Dict[str, Any]],
    llm: Any,
) -> Dict[str, Any]:
    """Validate citations without rewriting the draft.

    Preserves the streamed answer verbatim so the UI never sees its text
    flashed and replaced. Validation results are still returned so callers
    can surface citation quality in metadata, but no LLM repair pass runs
    and the answer is never blanked out.
    """
    # Unused, but kept so the signature stays interchangeable with
    # repair-capable implementations.
    del mode, question, llm
    validation = validate_citations(draft_answer, extract_source_ids(sources))
    return {
        "answer": draft_answer,
        "validation": validation,
        "repaired": False,
        "repair_attempted": False,
        "blocked": False,
    }