Source code for lalandre_rag.summaries.service

"""Services for reading, refreshing, and augmenting canonical act summaries."""

from __future__ import annotations

import logging
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional

from lalandre_core.config import get_config
from lalandre_db_postgres import ActsSQL

from .generator import CanonicalSummaryGenerator
from .models import (
    CANONICAL_SUMMARY_KIND,
    CANONICAL_SUMMARY_PROMPT_VERSION,
    SUMMARY_STATUS_FAILED,
    SUMMARY_STATUS_PENDING,
    SUMMARY_STATUS_READY,
    CanonicalSummarySnapshot,
    SummaryTraceRecorder,
)

logger = logging.getLogger(__name__)


def _language_code(value: Any) -> str:
    """Normalize enum-backed or raw language values to DB-compatible strings."""
    return str(getattr(value, "value", value)).strip().lower()


class ActSummaryService:
    """Read and refresh canonical act summaries.

    Summary rows are looked up and upserted through the injected ``pg_repo``
    by act id, language and ``CANONICAL_SUMMARY_KIND``. A ready row is reported
    stale when the act's content hash, the act's current version, the prompt
    version or the model identifier differ from what was stored on the row.
    """

    def __init__(
        self,
        *,
        pg_repo: Any,
        generator: Optional[CanonicalSummaryGenerator] = None,
        prompt_version: str = CANONICAL_SUMMARY_PROMPT_VERSION,
        model_id: Optional[str] = None,
    ) -> None:
        """Create the service.

        Args:
            pg_repo: Repository providing ``get_session`` plus the act/summary
                lookup and upsert helpers used by this class.
            generator: Summary generator; only required by
                :meth:`refresh_canonical_summary_for_celex`.
            prompt_version: Prompt version written to, and compared against,
                stored summary rows.
            model_id: Model identifier compared against stored rows; falls
                back to :meth:`build_runtime_model_id` when falsy.
        """
        self.pg_repo = pg_repo
        self.generator = generator
        self.prompt_version = prompt_version
        self.model_id = model_id or self.build_runtime_model_id()

    @staticmethod
    def build_runtime_model_id() -> str:
        """Build the model identifier recorded for generated summaries.

        Returns ``"<provider>:<model>"`` from the extraction config, or
        ``"deterministic:fallback"`` when either part is empty or unset.
        """
        extraction_cfg = get_config().extraction
        # Treat an unset (None) provider/model like an empty string so the
        # fallback identifier is used instead of raising AttributeError.
        provider = (extraction_cfg.llm_provider or "").strip()
        model = (extraction_cfg.llm_model or "").strip()
        if provider and model:
            return f"{provider}:{model}"
        return "deterministic:fallback"

    def get_canonical_summary_by_celex(self, celex: str) -> Optional[CanonicalSummarySnapshot]:
        """Fetch the current canonical summary snapshot for one act.

        Returns ``None`` when no act matches ``celex``; otherwise a snapshot,
        whose status is ``"missing"`` when no summary row exists yet.
        """
        with self.pg_repo.get_session() as session:
            act = self.pg_repo.get_act_by_celex(session, celex)
            if act is None:
                return None
            return self._build_snapshot(session=session, act=act)

    def _acts_model(self):
        """Return the ``ActsSQL`` model class."""
        return ActsSQL

    def _build_snapshot(self, *, session: Any, act: Any) -> CanonicalSummarySnapshot:
        """Build the snapshot describing ``act``'s canonical summary row.

        When a row exists, its stored ``trace_jsonb`` is merged with a fresh
        lookup trace (the lookup trace wins on key collisions).
        """
        language = _language_code(getattr(act, "language", ""))
        record = self.pg_repo.get_act_summary(
            session,
            act_id=act.id,
            language=language,
            summary_kind=CANONICAL_SUMMARY_KIND,
        )
        if record is None:
            return CanonicalSummarySnapshot(
                act_id=act.id,
                celex=str(getattr(act, "celex", "")),
                language=language,
                status="missing",
                is_stale=False,
                summary=None,
                generated_at=None,
                prompt_version=None,
                model_id=None,
                source_version_id=None,
                error_text=None,
                trace=SummaryTraceRecorder.lookup(status="missing", is_stale=False),
            )

        # The current version is only needed for the staleness check, so it
        # is looked up only once a summary row is known to exist.
        current_version = self.pg_repo.get_current_version_for_act(session, act.id)
        is_stale = self._is_stale(record=record, act=act, current_version=current_version)
        trace = dict(getattr(record, "trace_jsonb", {}) or {})
        trace.update(
            SummaryTraceRecorder.lookup(
                status=str(getattr(record, "status", "")),
                is_stale=is_stale,
            )
        )
        return CanonicalSummarySnapshot(
            act_id=act.id,
            celex=str(getattr(act, "celex", "")),
            language=language,
            status=str(getattr(record, "status", SUMMARY_STATUS_FAILED)),
            is_stale=is_stale,
            summary=getattr(record, "summary_text", None),
            generated_at=getattr(record, "generated_at", None),
            prompt_version=getattr(record, "prompt_version", None),
            model_id=getattr(record, "model_id", None),
            source_version_id=getattr(record, "source_version_id", None),
            error_text=getattr(record, "error_text", None),
            trace=trace,
        )

    def _is_stale(self, *, record: Any, act: Any, current_version: Any) -> bool:
        """Return True when a ready summary row no longer matches its inputs.

        Rows in any status other than ready are never reported stale. The
        stored ``model_id`` is compared with ``self.model_id``; on a successful
        refresh the stored value comes from the generator's output.
        """
        if str(getattr(record, "status", "")) != SUMMARY_STATUS_READY:
            return False
        act_hash = getattr(act, "content_hash", None)
        # Only compare hashes when the act actually has one.
        if act_hash and getattr(record, "content_hash", None) != act_hash:
            return True
        current_version_id = getattr(current_version, "id", None) if current_version is not None else None
        if getattr(record, "source_version_id", None) != current_version_id:
            return True
        if getattr(record, "prompt_version", None) != self.prompt_version:
            return True
        if getattr(record, "model_id", None) != self.model_id:
            return True
        return False

    def refresh_canonical_summary_for_celex(self, celex: str) -> CanonicalSummarySnapshot:
        """Regenerate and persist the canonical summary for one act.

        The row is first committed as pending (keeping any previous summary
        text), then overwritten with the generated summary. If generation or
        persistence fails, the previous summary is restored as ready together
        with the metadata it was generated from, or the row is marked failed
        when there was no previous summary text.

        Raises:
            RuntimeError: if no generator is configured.
            ValueError: if no act matches ``celex``.
        """
        if self.generator is None:
            raise RuntimeError("CanonicalSummaryGenerator is not configured")

        with self.pg_repo.get_session() as session:
            act = self.pg_repo.get_act_by_celex(session, celex)
            if act is None:
                raise ValueError(f"Act {celex} not found")

            act_id = act.id
            act_hash = getattr(act, "content_hash", None)
            current_version = self.pg_repo.get_current_version_for_act(session, act_id)
            version_id = getattr(current_version, "id", None) if current_version is not None else None
            subdivisions = self.pg_repo.list_subdivisions_for_act_version(session, act_id, version_id)
            language = _language_code(getattr(act, "language", ""))
            existing = self.pg_repo.get_act_summary(
                session,
                act_id=act_id,
                language=language,
                summary_kind=CANONICAL_SUMMARY_KIND,
            )
            # Copy the previous row's values now, before the pending upsert
            # overwrites them. If ``existing`` is an ORM instance, reading it
            # after the commit below may reload the *pending* values (e.g.
            # SQLAlchemy's expire_on_commit), and the failure path would then
            # restore the old text under the new provenance, hiding staleness.
            previous = {
                "summary_text": getattr(existing, "summary_text", None),
                "content_hash": getattr(existing, "content_hash", None),
                "source_version_id": getattr(existing, "source_version_id", None),
                "prompt_version": getattr(existing, "prompt_version", None),
                "model_id": getattr(existing, "model_id", None),
                "generated_at": getattr(existing, "generated_at", None),
            }

            # NOTE(review): timestamps are naive UTC (datetime.utcnow is
            # deprecated since Python 3.12); moving to aware datetimes needs
            # the DB column type confirmed first.
            pending_trace = SummaryTraceRecorder.lookup(status=SUMMARY_STATUS_PENDING, is_stale=False)
            self.pg_repo.upsert_act_summary(
                session,
                {
                    "act_id": act_id,
                    "language": language,
                    "summary_kind": CANONICAL_SUMMARY_KIND,
                    "status": SUMMARY_STATUS_PENDING,
                    "summary_text": previous["summary_text"],
                    "content_hash": act_hash,
                    "source_version_id": version_id,
                    "prompt_version": self.prompt_version,
                    "model_id": self.model_id,
                    "generated_at": previous["generated_at"],
                    "last_attempt_at": datetime.utcnow(),
                    "error_text": None,
                    "trace_jsonb": pending_trace,
                },
            )
            session.commit()

            try:
                generated = self.generator.generate(act=act, version=current_version, subdivisions=subdivisions)
                finished_at = datetime.utcnow()
                self.pg_repo.upsert_act_summary(
                    session,
                    {
                        "act_id": act_id,
                        "language": language,
                        "summary_kind": CANONICAL_SUMMARY_KIND,
                        "status": SUMMARY_STATUS_READY,
                        "summary_text": generated["summary_text"],
                        "content_hash": act_hash,
                        "source_version_id": version_id,
                        "prompt_version": self.prompt_version,
                        "model_id": generated["model_id"],
                        "generated_at": finished_at,
                        "last_attempt_at": finished_at,
                        "error_text": None,
                        "trace_jsonb": generated["trace"],
                    },
                )
                session.commit()
            except Exception as exc:
                logger.exception("Canonical summary refresh failed for %s", celex)
                # Discard whatever the failed try block left in the session
                # (e.g. a failed flush/commit) so the fallback upsert can run.
                # The pending row was already committed and is unaffected.
                session.rollback()
                keep_previous = bool(previous["summary_text"])
                fallback_status = SUMMARY_STATUS_READY if keep_previous else SUMMARY_STATUS_FAILED
                self.pg_repo.upsert_act_summary(
                    session,
                    {
                        "act_id": act_id,
                        "language": language,
                        "summary_kind": CANONICAL_SUMMARY_KIND,
                        "status": fallback_status,
                        "summary_text": previous["summary_text"],
                        "content_hash": previous["content_hash"] if keep_previous else act_hash,
                        "source_version_id": (previous["source_version_id"] if keep_previous else version_id),
                        "prompt_version": (previous["prompt_version"] if keep_previous else self.prompt_version),
                        "model_id": previous["model_id"] if keep_previous else self.model_id,
                        "generated_at": previous["generated_at"],
                        "last_attempt_at": datetime.utcnow(),
                        "error_text": str(exc),
                        "trace_jsonb": SummaryTraceRecorder.lookup(
                            status=fallback_status,
                            is_stale=keep_previous,
                            reason=str(exc),
                        ),
                    },
                )
                session.commit()
            return self._build_snapshot(session=session, act=act)

    def list_celex_needing_canonical_summary(self) -> List[str]:
        """List acts whose canonical summaries are missing, failed, or stale.

        Rows left in pending status are not listed, because ``_is_stale`` only
        flags ready rows. NOTE(review): this performs repository lookups per
        act through ``_build_snapshot``.
        """
        needs_refresh = {"missing", SUMMARY_STATUS_FAILED}
        celexes: List[str] = []
        with self.pg_repo.get_session() as session:
            for act in self.pg_repo.list_acts_with_metadata(session):
                snapshot = self._build_snapshot(session=session, act=act)
                if snapshot.status in needs_refresh or snapshot.is_stale:
                    celexes.append(str(getattr(act, "celex", "")))
        return celexes
class QuestionSummaryService:
    """Prepend cached canonical act summaries to summarize/compare prompts.

    Summaries are framed as background context, not as evidence. Each public
    method returns the (possibly augmented) question plus a metadata dict
    saying whether a cached summary was used and whether it was stale.
    """

    def __init__(self, act_summary_service: Optional[ActSummaryService]) -> None:
        """Store the summary lookup service; ``None`` disables augmentation."""
        self.act_summary_service = act_summary_service

    @staticmethod
    def _metadata(*, cache_hit: Any, stale: Any) -> Dict[str, Any]:
        """Build the metadata dict returned alongside every question."""
        return {
            "summary_source": "question_augmented",
            "summary_cache_hit": cache_hit,
            "summary_stale": stale,
        }

    def augment_question(self, *, celex: Optional[str], question: str) -> tuple[str, Dict[str, Any]]:
        """Augment one summarize question with cached canonical context when available.

        The question is returned unchanged when no CELEX id or service is
        given, or when no usable summary exists for the act.
        """
        service = self.act_summary_service
        if not celex or service is None:
            return question, self._metadata(cache_hit=False, stale=False)

        snapshot = service.get_canonical_summary_by_celex(celex)
        if snapshot is None:
            return question, self._metadata(cache_hit=False, stale=False)
        if not snapshot.available:
            return question, self._metadata(cache_hit=False, stale=bool(snapshot.is_stale))

        preamble = (
            "Contexte canonique de l'acte à prendre en compte comme mémoire de synthèse, "
            "sans le traiter comme une preuve documentaire autonome:\n"
        )
        augmented = f"{question.strip()}\n\n{preamble}{snapshot.summary}"
        return augmented, self._metadata(cache_hit=True, stale=snapshot.is_stale)

    def augment_compare_question(
        self,
        *,
        comparison_question: str,
        celex_list: Iterable[str],
    ) -> tuple[str, Dict[str, Any]]:
        """Augment a compare question with cached summaries for each compared act.

        Acts without a usable summary are skipped; the question is returned
        unchanged when none of the acts has one.
        """
        service = self.act_summary_service
        if service is None:
            return comparison_question, self._metadata(cache_hit=False, stale=False)

        summary_lines: List[str] = []
        any_stale = False
        for celex in celex_list:
            snapshot = service.get_canonical_summary_by_celex(celex)
            if snapshot is None or not snapshot.available:
                continue
            any_stale = any_stale or snapshot.is_stale
            summary_lines.append(f"{celex}: {snapshot.summary}")

        if not summary_lines:
            return comparison_question, self._metadata(cache_hit=False, stale=False)

        preamble = (
            "Résumés canoniques des actes comparés à utiliser comme contexte de cadrage "
            "et non comme preuves documentaires autonomes:\n"
        )
        augmented = f"{comparison_question.strip()}\n\n{preamble}" + "\n\n".join(summary_lines)
        return augmented, self._metadata(cache_hit=True, stale=any_stale)