# Source code for lalandre_rag.modes.hybrid_generation

"""
Generation strategies for HybridMode — standard and global modes, QA chain execution.

Extracted from hybrid_mode.py to keep the orchestrator focused on the pipeline.
"""

import asyncio
import concurrent.futures
import logging
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterator, List, Optional, cast

from lalandre_core.config import get_config
from lalandre_core.linking import LegalEntityLinker
from langchain_core.messages import BaseMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

from lalandre_rag.citation_sanitizer import normalize_citation_tags
from lalandre_rag.graph.map_reduce import map_reduce_generate, should_use_map_reduce
from lalandre_rag.prose_linker import link_prose
from lalandre_rag.prose_rewriter import rewrite_to_prose
from lalandre_rag.retrieval.context.community_reports import (
    CommunityReport,
    CommunityReportBuilder,
)
from lalandre_rag.retrieval.context.models import ContextSlice

from ..response import (
    build_sources,
    build_weakly_grounded_answer,
    collect_act_contexts,
    create_blocked_sourced_response,
    create_rag_response,
    describe_citation_validation_failure,
    enforce_cited_answer,
    normalize_sources_payload,
)
from .hybrid_helpers import (
    GraphFetchResult,
    ProgressCallback,
    attach_citation_validation,
    build_ranked_graph_context,
    build_relation_summary,
    build_source_context,
    emit_progress,
    format_reports_block,
)

logger = logging.getLogger(__name__)


def _apply_prose_linker(
    answer: str,
    entity_linker: Optional[LegalEntityLinker],
    *,
    allowed_act_ids: Optional[set[int]] = None,
    external_detector: Optional[Callable[[str], Any]] = None,
) -> str:
    """Best-effort prose linking: never raise, always return valid text.

    When ``allowed_act_ids`` is provided, only links to these acts are emitted
    so the UI never promises a clickable reference that has no RAG chunk
    behind it. When ``external_detector`` is provided, its detections are
    merged with the regex+fuzzy ones (e.g. NER service spans).
    """
    if entity_linker is None or not answer:
        return answer
    try:
        return link_prose(
            answer,
            entity_linker,
            allowed_act_ids=allowed_act_ids,
            external_detector=external_detector,
        )
    except Exception:
        logger.warning("prose_linker post-processing failed; keeping plain answer", exc_info=True)
        return answer


def _apply_prose_rewriter(answer: str, llm: Any) -> str:
    """Best-effort prose rewriting: never raise, always return valid text.

    Uses the lightweight LLM to turn bullet-heavy answers into flowing prose,
    preserving citation tags verbatim. Falls back to the original answer on
    any issue (LLM failure, tag drift, length drift, missing prompt).
    """
    if llm is None or not answer:
        return answer
    try:
        return rewrite_to_prose(answer, llm)
    except Exception:
        logger.warning("prose_rewriter post-processing failed; keeping plain answer", exc_info=True)
        return answer


def _finalize_rag_answer(
    *,
    question: str,
    raw_answer: str,
    streamed_answer: str,
    llm: Any,
    lightweight_llm: Any,
    entity_linker: Optional[LegalEntityLinker],
    external_detector: Optional[Callable[[str], Any]],
    source_artifacts: "SourceArtifacts",
    graph_node_refs: List[Dict[str, Any]],
    relationship_refs: List[Dict[str, Any]],
    final_answer_callback: "FinalAnswerCallback",
    progress_callback: "ProgressCallback",
    phase_timings_ms: Dict[str, float],
) -> tuple[Dict[str, Any], bool]:
    """Run the shared post-LLM pipeline (citations / rewrite / link / response).

    Steps, in order: enforce citations against the validation sources; on
    failure, return a weakly-grounded fallback answer (``blocked=True``);
    otherwise normalize citation tags, rewrite to prose with the lightweight
    LLM, link regulatory mentions, notify ``final_answer_callback`` if the
    text changed since streaming, and build the final response with graph
    evidence merged into its sources.

    Returns ``(response, blocked)``. The standard and global generation modes
    consume this and only need to attach their mode-specific metadata afterwards
    (context_budget, community_context, global_context, etc.).
    """
    sources = source_artifacts.documents
    acts = source_artifacts.acts
    validation_sources = source_artifacts.validation_sources

    # Announce the citation-check phase before the (potentially slow)
    # enforcement call so the UI shows live status.
    emit_progress(
        progress_callback,
        phase="citations",
        status="active",
        label="Vérification des citations",
        detail="Contrôle des preuves citées dans la réponse",
    )
    # Validates the draft against validation_sources and may repair or
    # block the answer; enforcement carries validation/repaired/blocked flags.
    enforcement = enforce_cited_answer(
        mode="rag",
        question=question,
        draft_answer=raw_answer,
        sources=validation_sources,
        llm=llm,
    )
    citation_validation = cast(Dict[str, Any], enforcement["validation"])
    citation_meta = {
        "validation": citation_validation,
        "repaired": bool(enforcement["repaired"]),
        "repair_attempted": bool(enforcement["repair_attempted"]),
        "blocked": bool(enforcement["blocked"]),
    }

    # Blocked path: the draft could not be grounded; ship a fallback answer
    # built from the sources payload instead of the LLM text.
    if enforcement["blocked"]:
        blocked_detail = describe_citation_validation_failure(citation_validation)
        blocked_answer = build_weakly_grounded_answer(
            sources=source_artifacts.payload,
            reason="invalid_citations",
        )
        # Re-emit only when the fallback differs from what was streamed.
        if final_answer_callback is not None and blocked_answer != streamed_answer:
            final_answer_callback(blocked_answer)
        emit_progress(
            progress_callback,
            phase="citations",
            status="done",
            label="Réponse",
            detail=blocked_detail,
            meta=citation_meta,
        )
        weak_response = create_rag_response(
            query=question,
            answer=blocked_answer,
            documents=sources,
            context_summary=None,
            acts=acts,
        )
        # Graph evidence is still attached so the side panel stays complete.
        _merge_graph_evidence_sources(
            response=weak_response,
            documents=sources,
            graph_node_refs=graph_node_refs,
            relationship_refs=relationship_refs,
        )
        weak_response["metadata"].update(
            {
                "citation_validation": citation_validation,
                "citation_failure_detail": blocked_detail,
                "citation_repair_attempted": bool(enforcement["repair_attempted"]),
                "citation_repaired": bool(enforcement["repaired"]),
                "phase_timings_ms": dict(phase_timings_ms),
            }
        )
        return weak_response, True

    # Happy path: normalize tags first, then rewrite, then link — linking
    # last so it operates on the final prose.
    answer = cast(str, enforcement["answer"])
    answer = normalize_citation_tags(answer)
    answer = _apply_prose_rewriter(answer, lightweight_llm)
    # No allowed_act_ids filter: every act resolved by the linker (regex or
    # NER) becomes clickable, even if no chunk/graph evidence backs it in
    # *this* response. The side panel falls back to act-level metadata + a
    # library shortcut via ``NoChunkFallback`` when a chunk is absent.
    # The strict "every link maps to a chunk" guarantee was relaxed because
    # users want full clickability on any regulatory mention.
    answer = _apply_prose_linker(
        answer,
        entity_linker,
        allowed_act_ids=None,
        external_detector=external_detector,
    )
    # Push the post-processed text only if it diverged from the stream.
    if final_answer_callback is not None and answer != streamed_answer:
        final_answer_callback(answer)

    validation_ok = bool(citation_validation.get("ok"))
    emit_progress(
        progress_callback,
        phase="citations",
        status="done",
        label="Citations validées" if validation_ok else "Citations vérifiées",
        detail=(
            "Toutes les affirmations restent ancrées dans des preuves"
            if validation_ok
            else "Certaines citations ne sont pas ancrées sur une source retenue"
        ),
        count=len(citation_validation.get("used") or []),
        meta=citation_meta,
    )

    response = create_rag_response(
        query=question,
        answer=answer,
        documents=sources,
        context_summary=None,
        acts=acts,
    )
    _merge_graph_evidence_sources(
        response=response,
        documents=sources,
        graph_node_refs=graph_node_refs,
        relationship_refs=relationship_refs,
    )
    attach_citation_validation(response=response, answer=answer, sources=validation_sources)
    response["metadata"]["citation_repair_attempted"] = bool(enforcement["repair_attempted"])
    response["metadata"]["citation_repaired"] = bool(enforcement["repaired"])
    return response, False


# Streaming hook: receives each generated token/chunk as it is produced.
TokenCallback = Optional[Callable[[str], None]]
# Receives (sources_payload, preamble_metadata) before generation starts.
PreambleCallback = Optional[Callable[[Optional[Dict[str, Any]], Dict[str, Any]], None]]
# Receives the post-processed answer when it differs from the streamed text.
FinalAnswerCallback = Optional[Callable[[str], None]]


@dataclass
class SourceArtifacts:
    """Source payloads assembled once and shared by the sync and streaming paths."""

    # Act contexts collected from the context slices ({} unless return_sources).
    acts: Dict[str, Any]
    # Document source items built for the frontend ([] unless return_sources).
    documents: List[Dict[str, Any]]
    # Documents plus graph/cypher evidence, used for citation validation.
    validation_sources: List[Dict[str, Any]]
    # Normalized sources payload for the preamble callback (may be None).
    payload: Optional[Dict[str, Any]]
    # Wall-clock time spent building these payloads, in milliseconds.
    build_ms: float
def _merge_graph_evidence_sources( *, response: Dict[str, Any], documents: List[Dict[str, Any]], graph_node_refs: List[Dict[str, Any]], relationship_refs: List[Dict[str, Any]], ) -> None: """Attach graph evidence blocks to an existing RAG response.""" sources = cast(Dict[str, Any], response.get("sources") or {}) sources["total"] = len(documents) + len(graph_node_refs) + len(relationship_refs) if graph_node_refs: sources["graph_nodes"] = graph_node_refs if relationship_refs: sources["graph_edges"] = relationship_refs response["sources"] = sources def _build_community_source_items( reports: List[CommunityReport], ) -> List[Dict[str, Any]]: """Convert community reports into source items for the frontend.""" items: List[Dict[str, Any]] = [] for report in reports: celexes = ", ".join(report.celexes[:5]) pivots = ", ".join(f"{a['celex']}" for a in report.central_acts[:3]) content = f"{report.summary}\nActes centraux: {pivots}" if pivots else report.summary items.append( { "source_id": report.community_id, "source_kind": "community_report", "title": f"Communauté {report.community_id}", "subdivision_type": "COMMUNITY", "content_used": content, "content_preview": f"{celexes}{report.summary[:200]}", "relation_count": report.relation_count, "act_count": len(report.act_ids), } ) return items def _build_source_artifacts( *, context_slices: List[ContextSlice], source_refs: List[Dict[str, Any]], include_relations: bool, include_subjects: bool, include_full_content: bool, return_sources: bool, graph_node_refs: List[Dict[str, Any]], relationship_refs: List[Dict[str, Any]], community_reports: Optional[List[CommunityReport]] = None, cypher_documents: Optional[List[Dict[str, Any]]] = None, cypher_query_meta: Optional[Dict[str, Any]] = None, ) -> SourceArtifacts: """Assemble source payloads once so sync and streaming modes stay aligned.""" sources_build_started_at = time.perf_counter() acts = collect_act_contexts(context_slices) if return_sources else {} documents = ( build_sources( 
refs=source_refs, include_relations=include_relations, include_subjects=include_subjects, include_full_content=include_full_content, ) if return_sources else [] ) community_items = _build_community_source_items(community_reports) if community_reports else [] cypher_items: List[Dict[str, Any]] = list(cypher_documents) if cypher_documents else [] payload = normalize_sources_payload( { "documents": documents, "acts": acts, "graph_nodes": graph_node_refs, "graph_edges": relationship_refs, "cypher_rows": cypher_items if cypher_items else None, "graph_query": cypher_query_meta if cypher_query_meta else None, "community_reports": community_items if community_items else None, } ) validation_sources = [*documents, *graph_node_refs, *relationship_refs, *cypher_items] return SourceArtifacts( acts=acts, documents=documents, validation_sources=validation_sources, payload=payload, build_ms=(time.perf_counter() - sources_build_started_at) * 1000.0, ) def _format_cypher_block(cypher_documents: Optional[List[Dict[str, Any]]]) -> str: """Render the Cypher rows as a citation-prefixed block for the LLM context. Each row is exposed as ``[Cx] ...`` so the LLM can ground assertions on Cypher results just like it would on any other native source. 
""" if not cypher_documents: return "" lines: List[str] = ["=== Résultats Cypher (relations graphe Neo4j) ==="] for item in cypher_documents: source_id = str(item.get("source_id") or "").strip() if not source_id: continue title = str(item.get("title") or source_id).strip() body = item.get("content_used") or item.get("content_preview") or item.get("content") or "" body = str(body).strip() if body: lines.append(f"[{source_id}] {title}{body}") else: lines.append(f"[{source_id}] {title}") if len(lines) == 1: return "" return "\n".join(lines) def _stream_or_invoke_rag_answer( *, use_map_reduce: bool, context: str, question: str, llm: Any, rag_prompt: ChatPromptTemplate, chat_history: Optional[List[BaseMessage]], token_callback: TokenCallback, ) -> str: """Generate an answer either via live streaming or via the blocking path.""" if use_map_reduce: with concurrent.futures.ThreadPoolExecutor( max_workers=min(1, get_config().search.max_parallel_workers) ) as pool: return pool.submit( asyncio.run, map_reduce_generate(context=context, question=question, llm=llm), ).result() if token_callback is None: return run_rag_chain( rag_prompt=rag_prompt, llm=llm, question=question, context=context, chat_history=chat_history, ) streamed_chunks: List[str] = [] for chunk in stream_rag_chain( rag_prompt=rag_prompt, llm=llm, question=question, context=context, chat_history=chat_history, ): text = str(chunk) if not text: continue streamed_chunks.append(text) token_callback(text) return "".join(streamed_chunks) # ── QA chain execution ───────────────────────────────────────────────────
def run_rag_chain(
    *,
    rag_prompt: ChatPromptTemplate,
    llm: Any,
    question: str,
    context: str,
    graph_context: str = "",
    chat_history: Optional[List[BaseMessage]] = None,
) -> str:
    """Execute the QA chain synchronously and return the answer text."""
    # Compose prompt → LLM → string parser into one runnable chain.
    chain = cast(Any, rag_prompt | llm | StrOutputParser())
    payload: Dict[str, Any] = {
        "context": context,
        "graph_context": graph_context,
        "question": question,
    }
    if chat_history:
        payload["chat_history"] = chat_history
    return cast(str, chain.invoke(payload))
def stream_rag_chain(
    *,
    rag_prompt: ChatPromptTemplate,
    llm: Any,
    question: str,
    context: str,
    graph_context: str = "",
    chat_history: Optional[List[BaseMessage]] = None,
) -> Iterator[str]:
    """Yield answer chunks from the QA chain as they are produced."""
    # Same composition as the blocking path, but consumed as a stream.
    chain = cast(Any, rag_prompt | llm | StrOutputParser())
    payload: Dict[str, Any] = {
        "context": context,
        "graph_context": graph_context,
        "question": question,
    }
    if chat_history:
        payload["chat_history"] = chat_history
    yield from chain.stream(payload)
# ── Generation strategies ────────────────────────────────────────────────
def query_standard_mode(
    *,
    question: str,
    context_slices: List[ContextSlice],
    llm: Any,
    rag_prompt: ChatPromptTemplate,
    include_relations: bool,
    include_subjects: bool,
    include_full_content: bool,
    return_sources: bool,
    graph_fetch: Optional[GraphFetchResult] = None,
    chat_history: Optional[List[BaseMessage]] = None,
    progress_callback: ProgressCallback = None,
    preamble_callback: PreambleCallback = None,
    token_callback: TokenCallback = None,
    final_answer_callback: FinalAnswerCallback = None,
    entity_linker: Optional[LegalEntityLinker] = None,
    external_detector: Optional[Callable[[str], Any]] = None,
    lightweight_llm: Any = None,
    cypher_documents: Optional[List[Dict[str, Any]]] = None,
    cypher_query_meta: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Run the standard hybrid QA path with optional graph support.

    Assembles the context (graph-ranked when ``graph_fetch`` is provided,
    otherwise sources + relation summary + optional community reports),
    generates the answer, and finalizes it through the shared post-LLM
    pipeline. Returns the full RAG response dict.
    """
    config = get_config()
    budget_cfg = config.context_budget
    graph_cfg = config.graph
    max_context_chars = config.generation.max_context_chars
    context_assembly_started_at = time.perf_counter()
    graph_ranking_meta: Dict[str, Any] = {}
    graph_node_refs: List[Dict[str, Any]] = []
    relationship_refs: List[Dict[str, Any]] = []
    community_reports: List[CommunityReport] = []
    if graph_fetch is not None:
        # Graph-ranked path: the graph context is built first and sources
        # fill whatever character budget remains.
        graph_acts_limit = max(graph_cfg.acts_limit, 1)
        graph_rels_limit = max(graph_cfg.relationships_limit, 1)
        (
            context,
            graph_ranking_meta,
            graph_node_refs,
            relationship_refs,
        ) = build_ranked_graph_context(
            fetch_result=graph_fetch,
            semantic_results=[],
            max_context_chars=max_context_chars,
            graph_acts_limit=graph_acts_limit,
            graph_relationships_limit=graph_rels_limit,
            hop_decay=graph_cfg.ranking_hop_decay,
            semantic_boost=graph_cfg.ranking_semantic_boost,
            relation_weight_factor=graph_cfg.ranking_relation_weight_factor,
            budget_semantic_share=0.0,
            budget_graph_share=graph_cfg.budget_graph_share,
            budget_relation_share=graph_cfg.budget_relation_share,
            min_chars_per_source=max(budget_cfg.rag_min_chars_per_source, 0),
            max_depth=graph_cfg.depth,
        )
        graph_chars_used = len(context)
        source_budget = max(max_context_chars - graph_chars_used, 0)
        source_context, source_refs, remaining_source_chars = build_source_context(
            context_slices=context_slices,
            max_context_chars=source_budget,
            min_chars_per_source=max(budget_cfg.rag_min_chars_per_source, 0),
            max_sources=max(budget_cfg.rag_max_sources, 1),
        )
        if source_context:
            context = f"{source_context}\n\n---\n\n{context}"
        cypher_block = _format_cypher_block(cypher_documents)
        if cypher_block:
            context = f"{context}\n\n---\n\n{cypher_block}" if context else cypher_block
    else:
        # Plain path: reserve a slice of the budget for the relation summary,
        # give the rest to the sources.
        relation_budget = (
            int(max_context_chars * budget_cfg.standard_relation_budget_fraction)
            if include_relations
            else 0
        )
        source_budget = max(max_context_chars - relation_budget, 0)
        source_context, source_refs, remaining_source_chars = build_source_context(
            context_slices=context_slices,
            max_context_chars=source_budget,
            min_chars_per_source=max(budget_cfg.rag_min_chars_per_source, 0),
            max_sources=max(budget_cfg.rag_max_sources, 1),
        )
        relation_block = ""
        if include_relations and relation_budget > 0:
            relation_block = build_relation_summary(
                context_slices=context_slices,
                line_limit=max(budget_cfg.rag_relation_lines, 0),
            )
            if relation_block and len(relation_block) > relation_budget:
                relation_block = relation_block[:relation_budget]
        community_block = ""
        if config.graph.use_communities_in_rag:
            # Lightweight community context: at most two reports in standard mode.
            report_builder = CommunityReportBuilder(
                max_reports=min(budget_cfg.global_max_reports, 2),
                min_cluster_size=budget_cfg.global_min_cluster_size,
                max_evidence_per_report=budget_cfg.global_max_evidence_per_report,
                top_relation_types_limit=budget_cfg.community_top_relation_types,
                central_acts_limit=budget_cfg.community_central_acts,
            )
            community_reports = report_builder.build_reports(context_slices)
            if community_reports:
                community_block = format_reports_block(community_reports)
        cypher_block = _format_cypher_block(cypher_documents)
        sections = [source_context]
        if relation_block:
            sections.append(relation_block)
        if community_block:
            sections.append(community_block)
        if cypher_block:
            sections.append(cypher_block)
        context = "\n\n---\n\n".join(section for section in sections if section)
    context_assembly_ms = (time.perf_counter() - context_assembly_started_at) * 1000.0
    # Without any usable source the answer would be ungrounded: block early.
    if not source_refs:
        return create_blocked_sourced_response(
            mode="rag",
            query=question,
            reason="no_source_context",
            metadata={
                "phase_timings_ms": {
                    "context_assembly_ms": round(context_assembly_ms, 1),
                    "generation_ms": 0.0,
                    "sources_build_ms": 0.0,
                },
            },
        )
    source_artifacts = _build_source_artifacts(
        context_slices=context_slices,
        source_refs=source_refs,
        include_relations=include_relations,
        include_subjects=include_subjects,
        include_full_content=include_full_content,
        return_sources=return_sources,
        graph_node_refs=graph_node_refs,
        relationship_refs=relationship_refs,
        community_reports=community_reports if community_reports else None,
        cypher_documents=cypher_documents,
        cypher_query_meta=cypher_query_meta,
    )
    sources_build_ms = source_artifacts.build_ms
    budget_meta: Dict[str, Any] = {
        "mode": "standard_ranked" if graph_fetch is not None else "standard",
        "source_budget_chars": source_budget,
        "remaining_source_chars": remaining_source_chars,
        "sources_used": len(source_refs),
    }
    if graph_ranking_meta:
        budget_meta["graph_ranking"] = graph_ranking_meta
    preamble_metadata: Dict[str, Any] = {
        "context_budget": budget_meta,
        "phase_timings_ms": {
            "context_assembly_ms": round(context_assembly_ms, 1),
            "sources_build_ms": round(sources_build_ms, 1),
            "generation_ms": 0.0,
        },
    }
    if not graph_fetch and community_reports:
        preamble_metadata["community_context"] = {
            "reports_count": len(community_reports),
            "reports": [report.to_dict() for report in community_reports],
        }
    # Hand sources + metadata to the caller before generation starts.
    if preamble_callback is not None:
        preamble_callback(source_artifacts.payload, preamble_metadata)
    # Generation: map-reduce for large contexts, single call otherwise
    use_map_reduce = should_use_map_reduce(context)
    emit_progress(
        progress_callback,
        phase="generation",
        status="active",
        label="Rédaction de la réponse",
        detail=("Synthèse multi-fragments en cours" if use_map_reduce else "Synthèse des preuves en cours"),
        meta={"mode": "map_reduce" if use_map_reduce else "single_pass"},
    )
    generation_started_at = time.perf_counter()
    streamed_answer = _stream_or_invoke_rag_answer(
        use_map_reduce=use_map_reduce,
        context=context,
        question=question,
        llm=llm,
        rag_prompt=rag_prompt,
        chat_history=chat_history,
        token_callback=token_callback,
    )
    answer = streamed_answer
    generation_ms = (time.perf_counter() - generation_started_at) * 1000.0
    emit_progress(
        progress_callback,
        phase="generation",
        status="done",
        label="Réponse rédigée",
        duration_ms=generation_ms,
        meta={"mode": "map_reduce" if use_map_reduce else "single_pass"},
    )
    phase_timings_ms = {
        "context_assembly_ms": round(context_assembly_ms, 1),
        "generation_ms": round(generation_ms, 1),
        "sources_build_ms": round(sources_build_ms, 1),
    }
    response, blocked = _finalize_rag_answer(
        question=question,
        raw_answer=answer,
        streamed_answer=streamed_answer,
        llm=llm,
        lightweight_llm=lightweight_llm,
        entity_linker=entity_linker,
        external_detector=external_detector,
        source_artifacts=source_artifacts,
        graph_node_refs=graph_node_refs,
        relationship_refs=relationship_refs,
        final_answer_callback=final_answer_callback,
        progress_callback=progress_callback,
        phase_timings_ms=phase_timings_ms,
    )
    if blocked:
        return response
    # Attach the standard-mode-specific metadata on top of the shared response.
    response["metadata"]["context_budget"] = budget_meta
    if not graph_fetch and community_reports:
        response["metadata"]["community_context"] = {
            "reports_count": len(community_reports),
            "reports": [report.to_dict() for report in community_reports],
        }
    response["metadata"]["phase_timings_ms"] = phase_timings_ms
    return response
def query_global_mode(
    *,
    question: str,
    context_slices: List[ContextSlice],
    llm: Any,
    rag_prompt: ChatPromptTemplate,
    include_full_content: bool,
    include_subjects: bool,
    return_sources: bool,
    graph_fetch: Optional[GraphFetchResult] = None,
    chat_history: Optional[List[BaseMessage]] = None,
    progress_callback: ProgressCallback = None,
    preamble_callback: PreambleCallback = None,
    token_callback: TokenCallback = None,
    final_answer_callback: FinalAnswerCallback = None,
    entity_linker: Optional[LegalEntityLinker] = None,
    external_detector: Optional[Callable[[str], Any]] = None,
    lightweight_llm: Any = None,
    cypher_documents: Optional[List[Dict[str, Any]]] = None,
    cypher_query_meta: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Run the global GraphRAG path with community reporting.

    Splits the character budget between an optional graph-ranked block,
    community reports, and source documents; generates the answer over the
    combined context and finalizes it through the shared post-LLM pipeline.
    """
    config = get_config()
    budget_cfg = config.context_budget
    graph_cfg = config.graph
    max_context_chars = config.generation.max_context_chars
    reports_share, _ = budget_cfg.normalized_global_shares()
    graph_ranked_block = ""
    graph_ranking_meta: Dict[str, Any] = {}
    graph_node_refs: List[Dict[str, Any]] = []
    relationship_refs: List[Dict[str, Any]] = []
    if graph_fetch is not None:
        graph_acts_limit = max(graph_cfg.acts_limit, 1)
        graph_rels_limit = max(graph_cfg.relationships_limit, 1)
        (
            graph_ranked_block,
            graph_ranking_meta,
            graph_node_refs,
            relationship_refs,
        ) = build_ranked_graph_context(
            fetch_result=graph_fetch,
            semantic_results=[],
            max_context_chars=int(max_context_chars * budget_cfg.global_graph_budget_fraction),
            graph_acts_limit=graph_acts_limit,
            graph_relationships_limit=graph_rels_limit,
            hop_decay=graph_cfg.ranking_hop_decay,
            semantic_boost=graph_cfg.ranking_semantic_boost,
            relation_weight_factor=graph_cfg.ranking_relation_weight_factor,
            budget_semantic_share=0.0,
            budget_graph_share=graph_cfg.budget_graph_share,
            budget_relation_share=graph_cfg.budget_relation_share,
            min_chars_per_source=max(budget_cfg.rag_min_chars_per_source, 0),
            max_depth=graph_cfg.depth,
        )
    # Budget split: graph block first, then the reports share of what is
    # left, and the remainder goes to source documents.
    graph_chars_used = len(graph_ranked_block)
    reports_budget = int((max_context_chars - graph_chars_used) * reports_share)
    sources_budget = max(max_context_chars - reports_budget - graph_chars_used, 0)
    context_assembly_started_at = time.perf_counter()
    report_builder = CommunityReportBuilder(
        max_reports=budget_cfg.global_max_reports,
        min_cluster_size=budget_cfg.global_min_cluster_size,
        max_evidence_per_report=budget_cfg.global_max_evidence_per_report,
        top_relation_types_limit=budget_cfg.community_top_relation_types,
        central_acts_limit=budget_cfg.community_central_acts,
    )
    reports = report_builder.build_reports(context_slices)
    reports_block = format_reports_block(reports)
    if len(reports_block) > reports_budget:
        reports_block = reports_block[:reports_budget]
    source_context, source_refs, remaining_source_chars = build_source_context(
        context_slices=context_slices,
        max_context_chars=sources_budget,
        min_chars_per_source=max(budget_cfg.rag_min_chars_per_source, 0),
        max_sources=max(budget_cfg.global_max_source_docs, 1),
    )
    cypher_block = _format_cypher_block(cypher_documents)
    sections: List[str] = []
    if reports_block:
        sections.append(reports_block)
    if source_context:
        sections.append(source_context)
    if graph_ranked_block:
        sections.append(graph_ranked_block)
    if cypher_block:
        sections.append(cypher_block)
    context = "\n\n---\n\n".join(sections)
    context_assembly_ms = (time.perf_counter() - context_assembly_started_at) * 1000.0
    # Without any usable source the answer would be ungrounded: block early.
    if not source_refs:
        return create_blocked_sourced_response(
            mode="rag",
            query=question,
            reason="no_source_context",
            metadata={
                "phase_timings_ms": {
                    "context_assembly_ms": round(context_assembly_ms, 1),
                    "generation_ms": 0.0,
                    "sources_build_ms": 0.0,
                },
            },
        )
    source_artifacts = _build_source_artifacts(
        context_slices=context_slices,
        source_refs=source_refs,
        include_relations=True,
        include_subjects=include_subjects,
        include_full_content=include_full_content,
        return_sources=return_sources,
        graph_node_refs=graph_node_refs,
        relationship_refs=relationship_refs,
        community_reports=reports if reports else None,
        cypher_documents=cypher_documents,
        cypher_query_meta=cypher_query_meta,
    )
    sources_build_ms = source_artifacts.build_ms
    global_budget_meta: Dict[str, Any] = {
        "mode": "global_ranked" if graph_fetch is not None else "global",
        "reports_budget_chars": reports_budget,
        "sources_budget_chars": sources_budget,
        "remaining_source_chars": remaining_source_chars,
        "sources_used": len(source_refs),
    }
    if graph_ranking_meta:
        global_budget_meta["graph_ranking"] = graph_ranking_meta
    preamble_metadata: Dict[str, Any] = {
        "context_budget": global_budget_meta,
        "global_context": {
            "reports_count": len(reports),
            "reports": [report.to_dict() for report in reports],
        },
        "phase_timings_ms": {
            "context_assembly_ms": round(context_assembly_ms, 1),
            "sources_build_ms": round(sources_build_ms, 1),
            "generation_ms": 0.0,
        },
    }
    # Hand sources + metadata to the caller before generation starts.
    if preamble_callback is not None:
        preamble_callback(source_artifacts.payload, preamble_metadata)
    use_map_reduce = should_use_map_reduce(context)
    emit_progress(
        progress_callback,
        phase="generation",
        status="active",
        label="Rédaction de la réponse",
        detail=("Synthèse multi-fragments en cours" if use_map_reduce else "Synthèse des preuves en cours"),
        meta={"mode": "map_reduce" if use_map_reduce else "single_pass"},
    )
    generation_started_at = time.perf_counter()
    streamed_answer = _stream_or_invoke_rag_answer(
        use_map_reduce=use_map_reduce,
        context=context,
        question=question,
        llm=llm,
        rag_prompt=rag_prompt,
        chat_history=chat_history,
        token_callback=token_callback,
    )
    answer = streamed_answer
    generation_ms = (time.perf_counter() - generation_started_at) * 1000.0
    emit_progress(
        progress_callback,
        phase="generation",
        status="done",
        label="Réponse rédigée",
        duration_ms=generation_ms,
        meta={"mode": "map_reduce" if use_map_reduce else "single_pass"},
    )
    phase_timings_ms = {
        "context_assembly_ms": round(context_assembly_ms, 1),
        "generation_ms": round(generation_ms, 1),
        "sources_build_ms": round(sources_build_ms, 1),
    }
    response, blocked = _finalize_rag_answer(
        question=question,
        raw_answer=answer,
        streamed_answer=streamed_answer,
        llm=llm,
        lightweight_llm=lightweight_llm,
        entity_linker=entity_linker,
        external_detector=external_detector,
        source_artifacts=source_artifacts,
        graph_node_refs=graph_node_refs,
        relationship_refs=relationship_refs,
        final_answer_callback=final_answer_callback,
        progress_callback=progress_callback,
        phase_timings_ms=phase_timings_ms,
    )
    if blocked:
        return response
    # Attach the global-mode-specific metadata on top of the shared response.
    response["metadata"]["context_budget"] = global_budget_meta
    response["metadata"]["global_context"] = {
        "reports_count": len(reports),
        "reports": [report.to_dict() for report in reports],
    }
    response["metadata"]["phase_timings_ms"] = phase_timings_ms
    return response