"""Services for reading, refreshing, and augmenting canonical act summaries."""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional
from lalandre_core.config import get_config
from lalandre_db_postgres import ActsSQL
from .generator import CanonicalSummaryGenerator
from .models import (
CANONICAL_SUMMARY_KIND,
CANONICAL_SUMMARY_PROMPT_VERSION,
SUMMARY_STATUS_FAILED,
SUMMARY_STATUS_PENDING,
SUMMARY_STATUS_READY,
CanonicalSummarySnapshot,
SummaryTraceRecorder,
)
logger = logging.getLogger(__name__)
def _language_code(value: Any) -> str:
"""Normalize enum-backed or raw language values to DB-compatible strings."""
return str(getattr(value, "value", value)).strip().lower()
[docs]
class ActSummaryService:
"""Read and refresh canonical act summaries."""
def __init__(
self,
*,
pg_repo: Any,
generator: Optional[CanonicalSummaryGenerator] = None,
prompt_version: str = CANONICAL_SUMMARY_PROMPT_VERSION,
model_id: Optional[str] = None,
) -> None:
self.pg_repo = pg_repo
self.generator = generator
self.prompt_version = prompt_version
self.model_id = model_id or self.build_runtime_model_id()
[docs]
@staticmethod
def build_runtime_model_id() -> str:
"""Build the model identifier recorded for generated summaries."""
extraction_cfg = get_config().extraction
provider = extraction_cfg.llm_provider.strip()
model = extraction_cfg.llm_model.strip()
if provider and model:
return f"{provider}:{model}"
return "deterministic:fallback"
[docs]
def get_canonical_summary_by_celex(self, celex: str) -> Optional[CanonicalSummarySnapshot]:
"""Fetch the current canonical summary snapshot for one act."""
with self.pg_repo.get_session() as session:
act = self.pg_repo.get_act_by_celex(session, celex)
if act is None:
return None
return self._build_snapshot(session=session, act=act)
def _acts_model(self):
return ActsSQL
def _build_snapshot(self, *, session: Any, act: Any) -> CanonicalSummarySnapshot:
language = _language_code(getattr(act, "language", ""))
current_version = self.pg_repo.get_current_version_for_act(session, act.id)
record = self.pg_repo.get_act_summary(
session,
act_id=act.id,
language=language,
summary_kind=CANONICAL_SUMMARY_KIND,
)
if record is None:
return CanonicalSummarySnapshot(
act_id=act.id,
celex=str(getattr(act, "celex", "")),
language=language,
status="missing",
is_stale=False,
summary=None,
generated_at=None,
prompt_version=None,
model_id=None,
source_version_id=None,
error_text=None,
trace=SummaryTraceRecorder.lookup(status="missing", is_stale=False),
)
is_stale = self._is_stale(record=record, act=act, current_version=current_version)
trace = dict(getattr(record, "trace_jsonb", {}) or {})
trace.update(
SummaryTraceRecorder.lookup(
status=str(getattr(record, "status", "")),
is_stale=is_stale,
)
)
return CanonicalSummarySnapshot(
act_id=act.id,
celex=str(getattr(act, "celex", "")),
language=language,
status=str(getattr(record, "status", SUMMARY_STATUS_FAILED)),
is_stale=is_stale,
summary=getattr(record, "summary_text", None),
generated_at=getattr(record, "generated_at", None),
prompt_version=getattr(record, "prompt_version", None),
model_id=getattr(record, "model_id", None),
source_version_id=getattr(record, "source_version_id", None),
error_text=getattr(record, "error_text", None),
trace=trace,
)
def _is_stale(self, *, record: Any, act: Any, current_version: Any) -> bool:
if str(getattr(record, "status", "")) != SUMMARY_STATUS_READY:
return False
act_hash = getattr(act, "content_hash", None)
if act_hash and getattr(record, "content_hash", None) != act_hash:
return True
current_version_id = getattr(current_version, "id", None) if current_version is not None else None
if getattr(record, "source_version_id", None) != current_version_id:
return True
if getattr(record, "prompt_version", None) != self.prompt_version:
return True
if getattr(record, "model_id", None) != self.model_id:
return True
return False
[docs]
def refresh_canonical_summary_for_celex(self, celex: str) -> CanonicalSummarySnapshot:
"""Regenerate and persist the canonical summary for one act."""
if self.generator is None:
raise RuntimeError("CanonicalSummaryGenerator is not configured")
with self.pg_repo.get_session() as session:
act = self.pg_repo.get_act_by_celex(session, celex)
if act is None:
raise ValueError(f"Act {celex} not found")
current_version = self.pg_repo.get_current_version_for_act(session, act.id)
version_id = getattr(current_version, "id", None) if current_version is not None else None
subdivisions = self.pg_repo.list_subdivisions_for_act_version(session, act.id, version_id)
language = _language_code(getattr(act, "language", ""))
existing = self.pg_repo.get_act_summary(
session,
act_id=act.id,
language=language,
summary_kind=CANONICAL_SUMMARY_KIND,
)
pending_trace = SummaryTraceRecorder.lookup(status=SUMMARY_STATUS_PENDING, is_stale=False)
self.pg_repo.upsert_act_summary(
session,
{
"act_id": act.id,
"language": language,
"summary_kind": CANONICAL_SUMMARY_KIND,
"status": SUMMARY_STATUS_PENDING,
"summary_text": getattr(existing, "summary_text", None),
"content_hash": getattr(act, "content_hash", None),
"source_version_id": version_id,
"prompt_version": self.prompt_version,
"model_id": self.model_id,
"generated_at": getattr(existing, "generated_at", None),
"last_attempt_at": datetime.utcnow(),
"error_text": None,
"trace_jsonb": pending_trace,
},
)
session.commit()
try:
generated = self.generator.generate(act=act, version=current_version, subdivisions=subdivisions)
self.pg_repo.upsert_act_summary(
session,
{
"act_id": act.id,
"language": language,
"summary_kind": CANONICAL_SUMMARY_KIND,
"status": SUMMARY_STATUS_READY,
"summary_text": generated["summary_text"],
"content_hash": getattr(act, "content_hash", None),
"source_version_id": version_id,
"prompt_version": self.prompt_version,
"model_id": generated["model_id"],
"generated_at": datetime.utcnow(),
"last_attempt_at": datetime.utcnow(),
"error_text": None,
"trace_jsonb": generated["trace"],
},
)
session.commit()
except Exception as exc:
logger.exception("Canonical summary refresh failed for %s", celex)
preserved_summary = getattr(existing, "summary_text", None)
preserved_generated_at = getattr(existing, "generated_at", None)
preserved_status = SUMMARY_STATUS_READY if preserved_summary else SUMMARY_STATUS_FAILED
self.pg_repo.upsert_act_summary(
session,
{
"act_id": act.id,
"language": language,
"summary_kind": CANONICAL_SUMMARY_KIND,
"status": preserved_status,
"summary_text": preserved_summary,
"content_hash": (
getattr(existing, "content_hash", None)
if preserved_summary
else getattr(act, "content_hash", None)
),
"source_version_id": (
getattr(existing, "source_version_id", None) if preserved_summary else version_id
),
"prompt_version": (
getattr(existing, "prompt_version", None) if preserved_summary else self.prompt_version
),
"model_id": (getattr(existing, "model_id", None) if preserved_summary else self.model_id),
"generated_at": preserved_generated_at,
"last_attempt_at": datetime.utcnow(),
"error_text": str(exc),
"trace_jsonb": SummaryTraceRecorder.lookup(
status=preserved_status,
is_stale=bool(preserved_summary),
reason=str(exc),
),
},
)
session.commit()
return self._build_snapshot(session=session, act=act)
[docs]
def list_celex_needing_canonical_summary(self) -> List[str]:
"""List acts whose canonical summaries are missing, failed, or stale."""
celexes: List[str] = []
with self.pg_repo.get_session() as session:
acts = self.pg_repo.list_acts_with_metadata(session)
for act in acts:
snapshot = self._build_snapshot(session=session, act=act)
if snapshot.status in {"missing", SUMMARY_STATUS_FAILED} or snapshot.is_stale:
celexes.append(str(getattr(act, "celex", "")))
return celexes
[docs]
class QuestionSummaryService:
"""Augment personalized summarize/compare prompts with canonical summaries."""
def __init__(self, act_summary_service: Optional[ActSummaryService]) -> None:
self.act_summary_service = act_summary_service
[docs]
def augment_question(self, *, celex: Optional[str], question: str) -> tuple[str, Dict[str, Any]]:
"""Augment one summarize question with cached canonical context when available."""
if not celex or self.act_summary_service is None:
return question, {
"summary_source": "question_augmented",
"summary_cache_hit": False,
"summary_stale": False,
}
snapshot = self.act_summary_service.get_canonical_summary_by_celex(celex)
if snapshot is None or not snapshot.available:
return question, {
"summary_source": "question_augmented",
"summary_cache_hit": False,
"summary_stale": bool(snapshot.is_stale) if snapshot is not None else False,
}
augmented = (
f"{question.strip()}\n\n"
"Contexte canonique de l'acte à prendre en compte comme mémoire de synthèse, "
"sans le traiter comme une preuve documentaire autonome:\n"
f"{snapshot.summary}"
)
return augmented, {
"summary_source": "question_augmented",
"summary_cache_hit": True,
"summary_stale": snapshot.is_stale,
}
[docs]
def augment_compare_question(
self,
*,
comparison_question: str,
celex_list: Iterable[str],
) -> tuple[str, Dict[str, Any]]:
"""Augment a compare question with cached summaries for each compared act."""
if self.act_summary_service is None:
return comparison_question, {
"summary_source": "question_augmented",
"summary_cache_hit": False,
"summary_stale": False,
}
blocks: List[str] = []
cache_hit = False
stale = False
for celex in celex_list:
snapshot = self.act_summary_service.get_canonical_summary_by_celex(celex)
if snapshot is None or not snapshot.available:
continue
cache_hit = True
stale = stale or snapshot.is_stale
blocks.append(f"{celex}: {snapshot.summary}")
if not blocks:
return comparison_question, {
"summary_source": "question_augmented",
"summary_cache_hit": False,
"summary_stale": False,
}
augmented = (
f"{comparison_question.strip()}\n\n"
"Résumés canoniques des actes comparés à utiliser comme contexte de cadrage "
"et non comme preuves documentaires autonomes:\n" + "\n\n".join(blocks)
)
return augmented, {
"summary_source": "question_augmented",
"summary_cache_hit": cache_hit,
"summary_stale": stale,
}