Source code for lalandre_chunking.pipeline

"""
Keeps Postgres / Qdrant chunk artefacts in sync and provides a canonical
dict representation of chunk records.
"""

import logging
from dataclasses import dataclass
from typing import Any

from lalandre_core.utils import is_eurlex_celex, is_legifrance_celex

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Chunk record serialisation
# ---------------------------------------------------------------------------


def serialize_chunk_records(chunks: list[Any]) -> list[dict[str, Any]]:
    """Turn chunk objects into plain dicts for ``insert_chunk_records``.

    Args:
        chunks: Objects exposing the attributes listed in ``field_names``.

    Returns:
        One dict per chunk, in input order, mapping each field name to the
        value of the attribute with the same name.
    """
    # Order matters: it is the key order of every emitted dict.
    field_names = (
        "subdivision_id",
        "chunk_index",
        "content",
        "char_start",
        "char_end",
        "token_count",
        "chunk_metadata",
    )
    return [{name: getattr(item, name) for name in field_names} for item in chunks]


# ---------------------------------------------------------------------------
# Article-level chunking preparation
# ---------------------------------------------------------------------------


@dataclass
class ArticleLevelPlan:
    """Outcome of planning article-level chunking for a single act.

    Attributes:
        active: True when article-level chunking applies to the act.
        skip_ids: IDs of the paragraph subdivisions that were folded into
            their parent article.
        article_content: Aggregated content keyed by article subdivision ID.
    """

    active: bool
    skip_ids: set[int]
    article_content: dict[int, str]
def prepare_article_level_plan(
    celex: str,
    subdivisions: list[Any],
    article_level_enabled: bool,
) -> ArticleLevelPlan:
    """Compute the :class:`ArticleLevelPlan` that applies to one act.

    An inactive plan is returned when the feature is disabled or when the
    CELEX is neither EUR-Lex nor Légifrance. Légifrance acts get an active
    plan with nothing aggregated. For EUR-Lex acts, each paragraph whose
    parent is an article is recorded in ``skip_ids`` and its content is merged
    into the parent's entry of ``article_content``.

    Args:
        celex: Normalized CELEX identifier of the act.
        subdivisions: Ordered subdivisions for the act, including child
            paragraphs.
        article_level_enabled: Value of
            ``config.chunking.article_level_chunking``.

    Returns:
        The plan describing which paragraphs to skip and which aggregated
        text to use per article.
    """
    supported = article_level_enabled and (
        is_eurlex_celex(celex) or is_legifrance_celex(celex)
    )
    if not supported:
        return ArticleLevelPlan(active=False, skip_ids=set(), article_content={})

    result = ArticleLevelPlan(active=True, skip_ids=set(), article_content={})

    # Légifrance articles already carry their full text, so there is nothing
    # to aggregate for them.
    if is_legifrance_celex(celex):
        return result

    # EUR-Lex: articles only hold the title; the real text lives in the child
    # paragraphs, which are grouped under their parent article here.
    by_id = {node.id: node for node in subdivisions}
    paragraphs_of: dict[int, list[Any]] = {}
    for node in subdivisions:
        owner = None if node.parent_id is None else by_id.get(node.parent_id)
        if owner is None:
            continue
        if _normalize_subdivision_type(owner.subdivision_type) != "article":
            continue
        if _normalize_subdivision_type(node.subdivision_type) != "paragraph":
            continue
        paragraphs_of.setdefault(owner.id, []).append(node)
        result.skip_ids.add(node.id)

    for article_id, paragraphs in paragraphs_of.items():
        ordered = sorted(paragraphs, key=lambda node: node.sequence_order)
        # Article's own content first, then its paragraphs in sequence order.
        texts = [by_id[article_id].content or ""]
        texts.extend(paragraph.content or "" for paragraph in ordered)
        merged = "\n".join(filter(None, texts))
        if merged:
            result.article_content[article_id] = merged

    return result


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def make_article_level_chunks(
    *,
    chunker: Any,
    subdivision: Any,
    article_level_plan: ArticleLevelPlan,
) -> list[Any] | None:
    """Build the single full-article chunk when article-level chunking applies.

    Args:
        chunker: Object providing ``make_single_chunk``.
        subdivision: Subdivision being chunked.
        article_level_plan: Plan for the act the subdivision belongs to.

    Returns:
        ``None`` when the plan is inactive or the subdivision is not an
        article, an empty list when the article has no non-blank content, and
        otherwise a one-element list holding the chunk from
        ``chunker.make_single_chunk``.
    """
    kind = _normalize_subdivision_type(getattr(subdivision, "subdivision_type", None))
    applies = article_level_plan.active and kind == "article"
    if not applies:
        return None

    # Prefer the aggregated text from the plan; fall back to the
    # subdivision's own content.
    text = article_level_plan.article_content.get(subdivision.id) or subdivision.content or ""
    if str(text).strip() == "":
        return []

    chunk_metadata = {
        "article_level_chunking": True,
        "subdivision_type": "article",
        "preserve_full_article": True,
    }
    single = chunker.make_single_chunk(
        subdivision_id=subdivision.id,
        content=text,
        metadata=chunk_metadata,
    )
    return [single]
def _normalize_subdivision_type(subdivision_type: Any) -> str | None: if subdivision_type is None: return None type_value = getattr(subdivision_type, "value", subdivision_type) return str(type_value).strip().lower() or None