# Source code for lalandre_core.linking.entity_linker

"""
Local entity linking utilities for legal acts (UE/France).
"""

import logging
import re
from dataclasses import dataclass, replace
from typing import Callable, Dict, Iterable, List, Optional

from rapidfuzz import fuzz, process

from lalandre_core.utils import normalize_celex, normalize_text

from .heuristics import IDENTIFIER_PATTERNS

# Module-level logger (standard ``logging.getLogger(__name__)`` pattern);
# handlers/levels are expected to be configured by the application.
logger = logging.getLogger(__name__)


ArticleLookup = Callable[[int, str], Optional[int]]
"""Callable that resolves ``(act_id, article_number)`` to a subdivision id."""


@dataclass(frozen=True)
class ActAliasEntry:
    """Canonical act entry and its known alias forms.

    Attributes:
        celex: Canonical CELEX-like identifier of the act.
        title: Full official title of the act.
        aliases: Additional known alias strings for the act.
        act_id: Optional internal database id of the act.
        eli: Optional European Legislation Identifier URI for interop with
            ELI-aware systems.
        acronyms: Short acronyms (DORA, CRR, MAR, ...) that bypass
            ``min_alias_chars``.
    """

    celex: str
    title: str
    aliases: tuple[str, ...] = ()
    act_id: Optional[int] = None
    eli: Optional[str] = None
    acronyms: tuple[str, ...] = ()
@dataclass(frozen=True)
class LinkResolution:
    """Resolved reference returned by the entity linker.

    Attributes:
        celex: Canonical CELEX-like identifier of the resolved act.
        score: Confidence in the resolution, 0.0 - 1.0.
        method: How the match was made: explicit, exact_alias, fuzzy_alias.
        matched_text: The raw reference text that was matched.
        act_id: Optional internal database id of the resolved act.
        subdivision_id: Optional id of the resolved article subdivision.
        article_number: Optional article number extracted from the reference.
        eli: Canonical ELI URI of the resolved act (propagated from the
            matching entry).
    """

    celex: str
    score: float
    method: str
    matched_text: str
    act_id: Optional[int] = None
    subdivision_id: Optional[int] = None
    article_number: Optional[str] = None
    eli: Optional[str] = None
class LegalEntityLinker:
    """
    Resolve legal references to canonical CELEX-like identifiers.

    The linker indexes normalized aliases (titles, explicit identifiers,
    acronyms) for a set of :class:`ActAliasEntry` objects and resolves a
    free-text reference in three tiers, strongest first:

    1. an explicit identifier found in the text (score ``1.0``),
    2. an exact normalized alias match (score ``0.99``),
    3. a fuzzy alias match gated by ``fuzzy_threshold`` and an ambiguity
       gap ``fuzzy_min_gap`` against the runner-up candidate.
    """

    def __init__(
        self,
        entries: Iterable[ActAliasEntry],
        *,
        fuzzy_threshold: float,
        fuzzy_min_gap: float,
        fuzzy_limit: int = 2,
        min_alias_chars: int,
        article_lookup: Optional[ArticleLookup] = None,
    ) -> None:
        """Build the alias index.

        Args:
            entries: Acts to index.
            fuzzy_threshold: Minimum fuzzy score (0.0-1.0) to accept a match.
            fuzzy_min_gap: Minimum score gap (0.0-1.0) between the best match
                and a runner-up pointing at a different act; smaller gaps are
                rejected as ambiguous.
            fuzzy_limit: Number of fuzzy candidates to retrieve (clamped >= 1).
            min_alias_chars: Aliases shorter than this that contain no digit
                are skipped (entry acronyms are exempt).
            article_lookup: Default resolver used by
                :meth:`resolve_with_article` when none is passed per call.
        """
        # Clamp the ratio parameters into [0.0, 1.0] so out-of-range values
        # cannot disable the gating logic.
        self.fuzzy_threshold = max(0.0, min(fuzzy_threshold, 1.0))
        self.fuzzy_min_gap = max(0.0, min(fuzzy_min_gap, 1.0))
        self.fuzzy_limit = max(1, fuzzy_limit)
        self.min_alias_chars = min_alias_chars
        self._article_lookup = article_lookup

        self._alias_to_celex: Dict[str, str] = {}
        self._aliases: List[str] = []
        self._celex_to_act_id: Dict[str, int] = {}
        self._celex_to_eli: Dict[str, str] = {}

        for entry in entries:
            if entry.act_id is not None:
                self._celex_to_act_id[entry.celex] = entry.act_id
            if entry.eli:
                self._celex_to_eli[entry.celex] = entry.eli
            standard_aliases = {entry.celex, entry.title, *entry.aliases}
            for alias in standard_aliases:
                normalized = normalize_text(alias)
                if not normalized:
                    continue
                # Skip short aliases unless they contain a digit (identifiers
                # such as "600/2014" stay matchable regardless of length).
                if len(normalized) < self.min_alias_chars and not re.search(r"\d", normalized):
                    continue
                # First entry wins on alias collisions.
                if normalized not in self._alias_to_celex:
                    self._alias_to_celex[normalized] = entry.celex
                    self._aliases.append(normalized)
            # Acronyms (DORA, CRR, MAR, …) bypass the min-length filter so
            # short official acronyms remain matchable even without a digit.
            for acronym in entry.acronyms:
                normalized = normalize_text(acronym)
                if not normalized:
                    continue
                if normalized not in self._alias_to_celex:
                    self._alias_to_celex[normalized] = entry.celex
                    self._aliases.append(normalized)

    @property
    def alias_count(self) -> int:
        """Return the number of normalized aliases indexed by the linker."""
        return len(self._aliases)

    @staticmethod
    def _parenthesized_candidates(text: str) -> set[str]:
        """Return parenthesised fragments of ``text`` usable as short aliases.

        A fragment qualifies when it is 3-30 characters long and contains at
        least one letter, e.g. ``DORA`` from ``"... Regulation (DORA)"``.
        Shared by :meth:`derive_acronyms` and :meth:`derive_aliases`.
        """
        candidates: set[str] = set()
        for match in re.finditer(r"\(([^)]{2,30})\)", text):
            content = match.group(1).strip()
            if len(content) >= 3 and any(ch.isalpha() for ch in content):
                candidates.add(content)
        return candidates

    @classmethod
    def derive_acronyms(cls, title: str) -> tuple[str, ...]:
        """Extract short acronyms from a title's parenthesised content.

        Returns a tuple of strings like ``("DORA",)`` for a title that
        contains ``Digital Operational Resilience Regulation (DORA)``. The
        caller passes this to ``ActAliasEntry.acronyms`` so the linker can
        match them without applying ``min_alias_chars``.
        """
        if not title:
            return ()
        return tuple(cls._parenthesized_candidates(title))

    @classmethod
    def derive_aliases(
        cls,
        title: str,
        *,
        eli: Optional[str] = None,
        official_journal_reference: Optional[str] = None,
        form_number: Optional[str] = None,
    ) -> tuple[str, ...]:
        """Derive stable alias candidates from act metadata fields.

        Collects the raw title, explicit identifiers embedded in the title,
        parenthesised short names, and any of the optional metadata strings.
        Empty strings are filtered out of the result.
        """
        aliases: set[str] = set()
        raw_title = title.strip()
        if raw_title:
            aliases.add(raw_title)
        # Explicit identifiers embedded in the title (e.g. "2014/65/EU").
        for pattern in IDENTIFIER_PATTERNS:
            for match in pattern.finditer(raw_title):
                aliases.add(match.group(0))
        aliases.update(cls._parenthesized_candidates(raw_title))
        if eli:
            aliases.add(eli.strip())
        if official_journal_reference:
            aliases.add(official_journal_reference.strip())
        if form_number:
            aliases.add(form_number.strip())
        return tuple(alias for alias in aliases if alias)

    def resolve(self, reference: str) -> Optional[LinkResolution]:
        """Resolve a free-text legal reference to a canonical CELEX identifier.

        Tries, in order: explicit identifier extraction, exact normalized
        alias lookup, then fuzzy alias matching. Returns ``None`` for blank
        input or when no tier produces a match.
        """
        raw = reference.strip()
        if not raw:
            return None
        explicit = self._resolve_explicit_identifier(raw)
        if explicit is not None:
            return self._with_act_id(explicit)
        normalized = normalize_text(raw)
        if not normalized:
            return None
        exact_celex = self._alias_to_celex.get(normalized)
        if exact_celex:
            return self._with_act_id(
                LinkResolution(
                    celex=exact_celex,
                    score=0.99,
                    method="exact_alias",
                    matched_text=raw,
                )
            )
        fuzzy = self._resolve_fuzzy_alias(raw, normalized)
        return self._with_act_id(fuzzy) if fuzzy is not None else None

    def resolve_with_article(
        self,
        reference: str,
        article_number: Optional[str] = None,
        *,
        article_lookup: Optional[ArticleLookup] = None,
    ) -> Optional[LinkResolution]:
        """Resolve a reference and optionally enrich with a subdivision_id for an article.

        If ``article_number`` is provided and the act is known, try to resolve
        the corresponding subdivision id via ``article_lookup`` (or the
        linker's default one). Falls back to returning the base resolution if
        the article can't be resolved.
        """
        resolution = self.resolve(reference)
        if resolution is None:
            return None
        if not article_number:
            return resolution
        normalized_article = article_number.strip()
        if not normalized_article:
            return resolution
        # Without an act_id there is nothing to look the article up against;
        # still record the requested article number on the resolution.
        if resolution.act_id is None:
            return replace(resolution, article_number=normalized_article)
        lookup = article_lookup or self._article_lookup
        if lookup is None:
            return replace(resolution, article_number=normalized_article)
        try:
            subdivision_id = lookup(resolution.act_id, normalized_article)
        except Exception:
            # Best-effort enrichment: a failing lookup must not discard the
            # act-level resolution, so log at debug level and continue.
            logger.debug(
                "Article lookup raised for act_id=%s article=%s",
                resolution.act_id,
                normalized_article,
                exc_info=True,
            )
            subdivision_id = None
        return replace(
            resolution,
            subdivision_id=subdivision_id,
            article_number=normalized_article,
        )

    def _with_act_id(self, resolution: LinkResolution) -> LinkResolution:
        """Back-fill act_id and eli on ``resolution`` from the linker's maps."""
        updates: Dict[str, object] = {}
        if resolution.act_id is None:
            act_id = self._celex_to_act_id.get(resolution.celex)
            if act_id is not None:
                updates["act_id"] = act_id
        if resolution.eli is None:
            eli = self._celex_to_eli.get(resolution.celex)
            if eli:
                updates["eli"] = eli
        if not updates:
            return resolution
        return replace(resolution, **updates)

    def _resolve_explicit_identifier(self, text: str) -> Optional[LinkResolution]:
        """Match an explicit identifier pattern in ``text`` (score 1.0)."""
        for pattern in IDENTIFIER_PATTERNS:
            match = pattern.search(text)
            if not match:
                continue
            raw_identifier = match.group(0)
            celex = normalize_celex(raw_identifier)
            if celex:
                return LinkResolution(
                    celex=celex,
                    score=1.0,
                    method="explicit",
                    matched_text=raw_identifier,
                )
        return None

    def _resolve_fuzzy_alias(self, raw: str, normalized: str) -> Optional[LinkResolution]:
        """Fuzzy-match ``normalized`` against the alias index.

        Returns ``None`` when no candidate clears ``fuzzy_threshold`` or when
        the best candidate is ambiguous (a runner-up for a *different* act is
        within ``fuzzy_min_gap`` of the best score).
        """
        if not self._aliases:
            return None
        threshold_percent = self.fuzzy_threshold * 100.0
        matches = process.extract(
            normalized,
            self._aliases,
            scorer=fuzz.token_set_ratio,
            score_cutoff=threshold_percent,
            limit=self.fuzzy_limit,
        )
        if not matches:
            return None
        best_match = matches[0]
        alias_text = str(best_match[0])
        score = float(best_match[1]) / 100.0
        if len(matches) > 1:
            runner_up_alias = str(matches[1][0])
            second_score = float(matches[1][1]) / 100.0
            # A close runner-up is only ambiguous when it resolves to a
            # different act; two aliases of the same act agree by definition
            # (e.g. an act's title and its acronym both matching).
            same_act = (
                self._alias_to_celex[runner_up_alias] == self._alias_to_celex[alias_text]
            )
            if not same_act and (score - second_score) < self.fuzzy_min_gap:
                logger.debug(
                    "Ambiguous fuzzy alias rejected: raw=%r best=%r(%.3f) second=%r(%.3f)",
                    raw,
                    alias_text,
                    score,
                    runner_up_alias,
                    second_score,
                )
                return None
        celex = self._alias_to_celex[alias_text]
        return LinkResolution(
            celex=celex,
            score=score,
            method="fuzzy_alias",
            matched_text=raw,
        )