"""
Local entity linking utilities for legal acts (UE/France).
"""
import logging
import re
from dataclasses import dataclass, replace
from typing import Callable, Dict, Iterable, List, Optional
from rapidfuzz import fuzz, process
from lalandre_core.utils import normalize_celex, normalize_text
from .heuristics import IDENTIFIER_PATTERNS
# Module-level logger; handlers/levels are configured by the host application.
logger = logging.getLogger(__name__)
ArticleLookup = Callable[[int, str], Optional[int]]
"""Callable that resolves ``(act_id, article_number)`` to a subdivision id."""
@dataclass(frozen=True)
class ActAliasEntry:
    """Canonical act entry and its known alias forms.

    Attributes:
        celex: Canonical CELEX identifier of the act.
        title: Full official title of the act.
        aliases: Known alternative surface forms of the act.
        act_id: Optional internal id of the act.
        eli: Optional European Legislation Identifier URI for interop
            with ELI-aware systems.
        acronyms: Short acronyms (DORA, CRR, MAR, ...) that bypass
            ``min_alias_chars``.
    """

    celex: str
    title: str
    aliases: tuple[str, ...] = ()
    act_id: Optional[int] = None
    eli: Optional[str] = None
    acronyms: tuple[str, ...] = ()
@dataclass(frozen=True)
class LinkResolution:
    """Resolved reference returned by the entity linker.

    Attributes:
        celex: Canonical identifier of the resolved act.
        score: Confidence score in the range 0.0 - 1.0.
        method: Resolution strategy used: ``explicit``, ``exact_alias``
            or ``fuzzy_alias``.
        matched_text: Input text (or identifier substring) that matched.
        act_id: Optional internal id of the resolved act.
        subdivision_id: Optional id of a resolved article subdivision.
        article_number: Optional normalized article number.
        eli: Canonical ELI URI of the resolved act (propagated from the
            matching entry).
    """

    celex: str
    score: float
    method: str
    matched_text: str
    act_id: Optional[int] = None
    subdivision_id: Optional[int] = None
    article_number: Optional[str] = None
    eli: Optional[str] = None
class LegalEntityLinker:
    """Resolve legal references to canonical CELEX-like identifiers.

    Resolution proceeds from strongest to weakest signal:

    1. ``explicit`` -- an identifier pattern (e.g. a CELEX number) is found
       directly in the reference text;
    2. ``exact_alias`` -- the normalized text equals an indexed alias;
    3. ``fuzzy_alias`` -- the best fuzzy alias match above
       ``fuzzy_threshold``, provided the closest alias that resolves to a
       *different* act trails by at least ``fuzzy_min_gap``.
    """

    def __init__(
        self,
        entries: Iterable[ActAliasEntry],
        *,
        fuzzy_threshold: float,
        fuzzy_min_gap: float,
        fuzzy_limit: int = 2,
        min_alias_chars: int,
        article_lookup: Optional[ArticleLookup] = None,
    ) -> None:
        """Build the alias index from canonical act entries.

        Args:
            entries: Canonical acts and their alias forms.
            fuzzy_threshold: Minimum fuzzy score (0.0-1.0) for a candidate.
            fuzzy_min_gap: Minimum score gap (0.0-1.0) required between the
                best match and the best candidate of a competing act.
            fuzzy_limit: Number of fuzzy candidates to retrieve (>= 1).
            min_alias_chars: Aliases shorter than this that contain no digit
                are skipped; acronyms are exempt.
            article_lookup: Default resolver from
                ``(act_id, article_number)`` to a subdivision id.
        """
        # Clamp ratio parameters into [0, 1]; they are scaled to
        # rapidfuzz's 0-100 range at query time.
        self.fuzzy_threshold = max(0.0, min(fuzzy_threshold, 1.0))
        self.fuzzy_min_gap = max(0.0, min(fuzzy_min_gap, 1.0))
        self.fuzzy_limit = max(1, fuzzy_limit)
        self.min_alias_chars = min_alias_chars
        self._article_lookup = article_lookup
        self._alias_to_celex: Dict[str, str] = {}
        self._aliases: List[str] = []
        self._celex_to_act_id: Dict[str, int] = {}
        self._celex_to_eli: Dict[str, str] = {}
        for entry in entries:
            if entry.act_id is not None:
                self._celex_to_act_id[entry.celex] = entry.act_id
            if entry.eli:
                self._celex_to_eli[entry.celex] = entry.eli
            for alias in {entry.celex, entry.title, *entry.aliases}:
                self._index_alias(alias, entry.celex, enforce_min_length=True)
            # Acronyms (DORA, CRR, MAR, ...) bypass the min-length filter so
            # short official acronyms remain matchable even without a digit.
            for acronym in entry.acronyms:
                self._index_alias(acronym, entry.celex, enforce_min_length=False)

    def _index_alias(self, alias: str, celex: str, *, enforce_min_length: bool) -> None:
        """Normalize ``alias`` and register it for ``celex`` (first entry wins)."""
        normalized = normalize_text(alias)
        if not normalized:
            return
        if (
            enforce_min_length
            and len(normalized) < self.min_alias_chars
            and not re.search(r"\d", normalized)
        ):
            # Short, digit-free aliases are too noisy to match safely.
            return
        if normalized not in self._alias_to_celex:
            self._alias_to_celex[normalized] = celex
            self._aliases.append(normalized)

    @property
    def alias_count(self) -> int:
        """Return the number of normalized aliases indexed by the linker."""
        return len(self._aliases)

    @staticmethod
    def _parenthesised_acronyms(text: str) -> set[str]:
        """Collect parenthesised short forms like ``(DORA)`` from ``text``."""
        found: set[str] = set()
        for match in re.finditer(r"\(([^)]{2,30})\)", text):
            content = match.group(1).strip()
            # Keep only plausible short forms: >= 3 chars, at least one letter.
            if len(content) >= 3 and any(ch.isalpha() for ch in content):
                found.add(content)
        return found

    @classmethod
    def derive_acronyms(cls, title: str) -> tuple[str, ...]:
        """Extract short acronyms from a title's parenthesised content.

        Returns a tuple of strings like ``("DORA",)`` for a title that
        contains ``Digital Operational Resilience Regulation (DORA)``. The
        caller passes this to ``ActAliasEntry.acronyms`` so the linker can
        match them without applying ``min_alias_chars``.
        """
        if not title:
            return ()
        return tuple(cls._parenthesised_acronyms(title))

    @classmethod
    def derive_aliases(
        cls,
        title: str,
        *,
        eli: Optional[str] = None,
        official_journal_reference: Optional[str] = None,
        form_number: Optional[str] = None,
    ) -> tuple[str, ...]:
        """Derive stable alias candidates from act metadata fields.

        Collects the raw title, identifier substrings found in it,
        parenthesised acronyms, and the optional ELI / OJ reference / form
        number, deduplicated with blanks dropped.
        """
        aliases: set[str] = set()
        # ``title or ""`` keeps this graceful on None/empty input,
        # consistent with the guard in ``derive_acronyms``.
        raw_title = (title or "").strip()
        if raw_title:
            aliases.add(raw_title)
            for pattern in IDENTIFIER_PATTERNS:
                for match in pattern.finditer(raw_title):
                    aliases.add(match.group(0))
            aliases |= cls._parenthesised_acronyms(raw_title)
        for extra in (eli, official_journal_reference, form_number):
            if extra:
                aliases.add(extra.strip())
        return tuple(alias for alias in aliases if alias)

    def resolve(self, reference: str) -> Optional[LinkResolution]:
        """Resolve a free-text legal reference to a canonical CELEX identifier.

        Tries, in order: explicit identifier extraction, exact alias lookup,
        fuzzy alias matching. Returns ``None`` when nothing matches.
        """
        raw = reference.strip()
        if not raw:
            return None
        explicit = self._resolve_explicit_identifier(raw)
        if explicit is not None:
            return self._with_act_id(explicit)
        normalized = normalize_text(raw)
        if not normalized:
            return None
        exact_celex = self._alias_to_celex.get(normalized)
        if exact_celex:
            return self._with_act_id(
                LinkResolution(
                    celex=exact_celex,
                    score=0.99,
                    method="exact_alias",
                    matched_text=raw,
                )
            )
        fuzzy = self._resolve_fuzzy_alias(raw, normalized)
        return self._with_act_id(fuzzy) if fuzzy is not None else None

    def resolve_with_article(
        self,
        reference: str,
        article_number: Optional[str] = None,
        *,
        article_lookup: Optional[ArticleLookup] = None,
    ) -> Optional[LinkResolution]:
        """Resolve a reference and optionally enrich with a subdivision_id for an article.

        If ``article_number`` is provided and the act is known, try to resolve
        the corresponding subdivision id via ``article_lookup`` (or the linker's
        default one). Falls back to returning the base resolution if the article
        can't be resolved.
        """
        resolution = self.resolve(reference)
        if resolution is None:
            return None
        if not article_number:
            return resolution
        normalized_article = article_number.strip()
        if not normalized_article:
            return resolution
        if resolution.act_id is None:
            # Without an act id the lookup cannot run; keep the article text.
            return replace(resolution, article_number=normalized_article)
        lookup = article_lookup or self._article_lookup
        if lookup is None:
            return replace(resolution, article_number=normalized_article)
        try:
            subdivision_id = lookup(resolution.act_id, normalized_article)
        except Exception:
            # The lookup is best-effort: log and degrade to the base result.
            logger.debug(
                "Article lookup raised for act_id=%s article=%s",
                resolution.act_id,
                normalized_article,
                exc_info=True,
            )
            subdivision_id = None
        return replace(
            resolution,
            subdivision_id=subdivision_id,
            article_number=normalized_article,
        )

    def _with_act_id(self, resolution: LinkResolution) -> LinkResolution:
        """Backfill ``act_id`` and ``eli`` from the index when missing."""
        updates: Dict[str, object] = {}
        if resolution.act_id is None:
            act_id = self._celex_to_act_id.get(resolution.celex)
            if act_id is not None:
                updates["act_id"] = act_id
        if resolution.eli is None:
            eli = self._celex_to_eli.get(resolution.celex)
            if eli:
                updates["eli"] = eli
        if not updates:
            return resolution
        return replace(resolution, **updates)

    def _resolve_explicit_identifier(self, text: str) -> Optional[LinkResolution]:
        """Return a resolution when ``text`` contains a recognizable identifier."""
        for pattern in IDENTIFIER_PATTERNS:
            match = pattern.search(text)
            if not match:
                continue
            raw_identifier = match.group(0)
            celex = normalize_celex(raw_identifier)
            if celex:
                return LinkResolution(
                    celex=celex,
                    score=1.0,
                    method="explicit",
                    matched_text=raw_identifier,
                )
        return None

    def _resolve_fuzzy_alias(self, raw: str, normalized: str) -> Optional[LinkResolution]:
        """Fuzzy-match ``normalized`` against the alias index.

        A result is rejected as ambiguous only when the closest candidate
        resolving to a *different* act trails the winner by less than
        ``fuzzy_min_gap``. Competing aliases of the same act never count as
        ambiguity (previously they triggered a spurious rejection).
        """
        if not self._aliases:
            return None
        matches = process.extract(
            normalized,
            self._aliases,
            scorer=fuzz.token_set_ratio,
            score_cutoff=self.fuzzy_threshold * 100.0,
            limit=self.fuzzy_limit,
        )
        if not matches:
            return None
        alias_text = str(matches[0][0])
        score = float(matches[0][1]) / 100.0
        celex = self._alias_to_celex[alias_text]
        # Matches are ordered by decreasing score, so the first candidate
        # that maps to a different act is the strongest competitor.
        for candidate in matches[1:]:
            competitor_alias = str(candidate[0])
            if self._alias_to_celex[competitor_alias] == celex:
                continue
            second_score = float(candidate[1]) / 100.0
            if (score - second_score) < self.fuzzy_min_gap:
                logger.debug(
                    "Ambiguous fuzzy alias rejected: raw=%r best=%r(%.3f) second=%r(%.3f)",
                    raw,
                    alias_text,
                    score,
                    competitor_alias,
                    second_score,
                )
                return None
            # Any later competitor scores lower still, so the gap only grows.
            break
        return LinkResolution(
            celex=celex,
            score=score,
            method="fuzzy_alias",
            matched_text=raw,
        )