Source code for lalandre_chunking.pipeline
"""
Keeps Postgres / Qdrant chunk artefacts in sync and provides a canonical
dict representation of chunk records.
"""
import logging
from dataclasses import dataclass
from typing import Any
from lalandre_core.utils import is_eurlex_celex, is_legifrance_celex
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Chunk record serialisation
# ---------------------------------------------------------------------------
[docs]
def serialize_chunk_records(chunks: list[Any]) -> list[dict[str, Any]]:
    """Turn chunk objects into plain dictionaries for ``insert_chunk_records``.

    Args:
        chunks: Objects exposing the attributes ``subdivision_id``,
            ``chunk_index``, ``content``, ``char_start``, ``char_end``,
            ``token_count`` and ``chunk_metadata``.

    Returns:
        One dictionary per chunk, in input order, keyed by those attribute
        names (in the order listed above).
    """
    # Attribute names double as dictionary keys; the tuple fixes key order.
    exported_fields = (
        "subdivision_id",
        "chunk_index",
        "content",
        "char_start",
        "char_end",
        "token_count",
        "chunk_metadata",
    )
    return [
        {field_name: getattr(chunk, field_name) for field_name in exported_fields}
        for chunk in chunks
    ]
# ---------------------------------------------------------------------------
# Article-level chunking preparation
# ---------------------------------------------------------------------------
[docs]
@dataclass
class ArticleLevelPlan:
    """Outcome of planning article-level chunking for a single act.

    Attributes:
        active: ``True`` when article-level chunking should be applied to
            the act, ``False`` otherwise.
        skip_ids: IDs of paragraph subdivisions whose text has been merged
            into their parent article.
        article_content: Aggregated text keyed by article subdivision ID.
    """

    # Master switch for article-level chunking of the act.
    active: bool
    # Paragraph subdivision IDs folded into a parent article.
    skip_ids: set[int]
    # Article subdivision ID -> aggregated article text.
    article_content: dict[int, str]
[docs]
def prepare_article_level_plan(
    celex: str,
    subdivisions: list[Any],
    article_level_enabled: bool,
) -> ArticleLevelPlan:
    """Compute the :class:`ArticleLevelPlan` that applies to one act.

    Args:
        celex: Normalized CELEX identifier of the act.
        subdivisions: Ordered subdivisions for the act, including child paragraphs.
        article_level_enabled: Value of ``config.chunking.article_level_chunking``.

    Returns:
        An inactive, empty plan when the feature is disabled or the CELEX is
        recognised by neither ``is_eurlex_celex`` nor ``is_legifrance_celex``.
        An active, empty plan for Légifrance acts. Otherwise an active plan
        in which every paragraph whose parent is an article is listed in
        ``skip_ids`` and each such article maps to its own content followed
        by its paragraphs' content (ordered by ``sequence_order``), joined
        with newlines; empty pieces are dropped and articles with no text at
        all are left out of ``article_content``.
    """
    if not article_level_enabled:
        return ArticleLevelPlan(active=False, skip_ids=set(), article_content={})
    if not is_eurlex_celex(celex) and not is_legifrance_celex(celex):
        return ArticleLevelPlan(active=False, skip_ids=set(), article_content={})

    plan = ArticleLevelPlan(active=True, skip_ids=set(), article_content={})

    # Légifrance articles already carry their full text → nothing to aggregate
    if is_legifrance_celex(celex):
        return plan

    # EUR-Lex: articles only hold the title; real text lives in child paragraphs
    by_id = {item.id: item for item in subdivisions}
    paragraphs_by_article: dict[int, list[Any]] = {}
    for candidate in subdivisions:
        if candidate.parent_id is None:
            continue
        owner = by_id.get(candidate.parent_id)
        if owner is None:
            # Parent is not part of the supplied subdivisions.
            continue
        if _normalize_subdivision_type(owner.subdivision_type) != "article":
            continue
        if _normalize_subdivision_type(candidate.subdivision_type) != "paragraph":
            continue
        paragraphs_by_article.setdefault(owner.id, []).append(candidate)
        plan.skip_ids.add(candidate.id)

    for article_id, paragraphs in paragraphs_by_article.items():
        in_order = sorted(paragraphs, key=lambda item: item.sequence_order)
        # Article text first, then its paragraphs; falsy pieces are skipped.
        pieces = [by_id[article_id].content]
        pieces.extend(paragraph.content for paragraph in in_order)
        merged = "\n".join(piece for piece in pieces if piece)
        if merged:
            plan.article_content[article_id] = merged

    return plan
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
[docs]
def make_article_level_chunks(
    *,
    chunker: Any,
    subdivision: Any,
    article_level_plan: ArticleLevelPlan,
) -> list[Any] | None:
    """Build the single whole-article chunk when article-level chunking applies.

    Args:
        chunker: Object providing ``make_single_chunk(subdivision_id=...,
            content=..., metadata=...)``.
        subdivision: Subdivision to chunk; its ``subdivision_type``, ``id``
            and ``content`` attributes are read.
        article_level_plan: Plan describing whether article-level chunking is
            active and holding any aggregated article text.

    Returns:
        ``None`` when the plan is inactive or the subdivision is not an
        article; an empty list when the article has no non-blank text;
        otherwise a one-element list with the chunk returned by
        ``chunker.make_single_chunk``.
    """
    kind = _normalize_subdivision_type(getattr(subdivision, "subdivision_type", None))
    if not article_level_plan.active:
        return None
    if kind != "article":
        return None

    # Prefer the aggregated text from the plan, then the subdivision's own.
    text = article_level_plan.article_content.get(subdivision.id)
    if not text:
        text = subdivision.content
    if not text:
        text = ""
    if not str(text).strip():
        return []

    chunk_metadata = {
        "article_level_chunking": True,
        "subdivision_type": "article",
        "preserve_full_article": True,
    }
    single_chunk = chunker.make_single_chunk(
        subdivision_id=subdivision.id,
        content=text,
        metadata=chunk_metadata,
    )
    return [single_chunk]
def _normalize_subdivision_type(subdivision_type: Any) -> str | None:
if subdivision_type is None:
return None
type_value = getattr(subdivision_type, "value", subdivision_type)
return str(type_value).strip().lower() or None