# Source code for lalandre_rag.prose_linker

"""Post-process LLM responses to make regulatory references clickable.

Uses the shared ``LegalEntityLinker`` to detect explicit identifiers (CELEX, EU
refs, national authority refs) and combined ``article N du <act>`` patterns in
the final answer text, then wraps each resolved reference in a markdown link
pointing to the library route (``/library/acts/:act_id[#sub-:subdivision_id]``).

Existing markdown links and citation tags (``[S1]``, ``[G1]``, ``[R1]``,
``[C1]``, ``[CM1]``) are preserved — we never rewrite content inside those
regions.
"""

from __future__ import annotations

import logging
import re
from dataclasses import dataclass
from typing import Callable, List, Optional, Sequence, Set

from lalandre_core.linking import (
    IDENTIFIER_PATTERNS,
    LegalEntityLinker,
    is_generic_target,
)

logger = logging.getLogger(__name__)


# Matches existing markdown links [text](url) — we must not wrap inside them.
# NOTE: uses [^\]]* / [^)]* so nested brackets/parens inside a link are not
# handled; sufficient for LLM-emitted markdown.
_MARKDOWN_LINK_RE = re.compile(r"\[[^\]]*\]\([^)]*\)")

# Matches citation tags like [S1], [G1], [R1], [C1], [CM1] (with optional
# ", L<n>" level suffix). Preserved as-is.
_CITATION_TAG_RE = re.compile(r"\[(?:S|G|R|C|CM)\d+(?:,\s*L\d+)?\]")

# Matches a single article number form: "L.533-4", "4", "R.521-1", "Annex I"
# Optional code-book letter (L/R/D/A), optional separator, digits, then any
# run of word chars, dots and dashes. (NOTE(review): the [A-Z] alternative is
# already covered by \w — harmless redundancy.)
_ARTICLE_NUMBER = r"[LRDA]?[.\-]?\d+(?:[A-Z]|[\w\.\-])*"

# Article(s) preceding an identifier, with up to 80 chars of intervening prose.
# Captures: (1) full "article N" prefix text, (2) first article number, (3) identifier.
# Identifier alternatives: CELEX sector-3 number, EU act reference
# (e.g. "2014/65/UE"), or a national-authority reference (AMF/EBA/ESMA/EIOPA).
# IGNORECASE also matches "Article"/"ARTICLES" and lower-cased identifiers.
_ARTICLE_THEN_IDENTIFIER_RE = re.compile(
    r"\b(articles?\s+(" + _ARTICLE_NUMBER + r")"
    r"(?:\s+(?:et|à|&|,)\s+" + _ARTICLE_NUMBER + r")*)"  # optional additional numbers
    r"[^.;:\n\[\]]{0,80}?"  # intervening prose (no sentence break, no brackets)
    r"\b("
    r"3\d{4}[A-Z]\d{4}"
    r"|\d{2,4}/\d{2,4}/(?:CE|UE|EU|EC|CEE)"
    r"|(?:AMF|EBA|ESMA|EIOPA)-[A-Z0-9][A-Z0-9\-/.]{2,}"
    r")\b",
    re.IGNORECASE,
)

# Methods that should be trusted enough to render clickable links.
# Resolutions obtained any other way are ignored by _resolution_to_url.
_TRUSTED_METHODS = frozenset({"explicit", "exact_alias", "fuzzy_alias"})


@dataclass(frozen=True)
class _Replacement:
    """A pending substitution: replace ``text[start:end]`` with ``replacement``.

    Instances are produced by the ``_collect_*`` helpers and consumed by
    ``_apply_replacements``.
    """

    # Half-open span [start, end) into the answer text.
    start: int
    end: int
    # The rendered markdown link that replaces the span.
    replacement: str

@dataclass(frozen=True)
class ExternalDetection:
    """A legal-reference span detected by an external detector.

    Used to plug a third-party detector (e.g. a locally-hosted Ref2Link
    service) next to our native regex+fuzzy engine. The external source must
    return spans resolved to an internal ``act_id`` — translation from their
    identifier space (ELI URI, CELEX, …) to our DB id stays the caller's
    responsibility.
    """

    # Half-open span [start, end) into the answer text.
    start: int
    end: int
    # Internal DB id of the resolved act.
    act_id: int
    subdivision_id: Optional[int] = None
    eli: Optional[str] = None


ExternalDetector = Callable[[str], Sequence[ExternalDetection]]
"""Callable that detects legal references in free text.

Given the answer text, returns a sequence of ``ExternalDetection`` already
resolved to our internal ``act_id``. Errors raised by the detector are caught
and the linker silently falls back to its regex-only behaviour.
"""


def _collect_external_replacements(
    text: str,
    *,
    detector: ExternalDetector,
    forbidden: List[tuple[int, int]],
    allowed_act_ids: Optional[Set[int]],
) -> List[_Replacement]:
    """Run *detector* on *text* and convert its valid detections to replacements.

    A detection is dropped when its offsets are out of range or inverted, it
    overlaps a forbidden region (existing link / citation tag), its act is not
    in *allowed_act_ids* (when given), or its span is whitespace-only or a
    generic phrase. Detector failures are logged and yield ``[]`` so linking
    degrades to regex-only behaviour instead of crashing.
    """
    try:
        detections = detector(text)
    except Exception:  # deliberate: a third-party detector must never break linking
        logger.warning(
            "external detector failed; falling back to regex-only detection",
            exc_info=True,
        )
        return []

    collected: List[_Replacement] = []
    for det in detections or ():
        # Reject out-of-range, empty, or inverted spans.
        if det.start < 0 or det.end <= det.start or det.end > len(text):
            continue
        if _overlaps_any(det.start, det.end, forbidden):
            continue
        if allowed_act_ids is not None and det.act_id not in allowed_act_ids:
            continue
        span_text = text[det.start : det.end]
        # Skip whitespace-only spans and generic targets ("the regulation", …).
        if not span_text.strip() or is_generic_target(span_text.strip()):
            continue
        url = f"/library/acts/{det.act_id}"
        if det.subdivision_id is not None:
            url += f"#sub-{det.subdivision_id}"
        collected.append(
            _Replacement(
                start=det.start,
                end=det.end,
                replacement=_render_markdown_link(span_text, url),
            )
        )
    return collected


def _find_forbidden_regions(text: str) -> List[tuple[int, int]]:
    """Return (start, end) spans of existing markdown links and citation tags."""
    return [
        (match.start(), match.end())
        for pattern in (_MARKDOWN_LINK_RE, _CITATION_TAG_RE)
        for match in pattern.finditer(text)
    ]


def _overlaps_any(start: int, end: int, regions: List[tuple[int, int]]) -> bool:
    """Return True when the half-open span [start, end) intersects any region."""
    return any(start < region_end and end > region_start
               for region_start, region_end in regions)


def _collect_article_replacements(
    text: str,
    *,
    linker: LegalEntityLinker,
    min_score: float,
    forbidden: List[tuple[int, int]],
    claimed_regions: List[tuple[int, int]],
    allowed_act_ids: Optional[Set[int]],
) -> List[_Replacement]:
    """Link combined ``article N … <identifier>`` spans found in *text*.

    The whole matched span (article prefix, intervening prose, identifier)
    becomes one link; resolution uses the *first* article number only.
    Untrusted or low-score resolutions are skipped (see ``_resolution_to_url``).
    Spans overlapping *forbidden* or *claimed_regions* are left untouched.
    """
    combined_skip = forbidden + claimed_regions
    collected: List[_Replacement] = []
    for match in _ARTICLE_THEN_IDENTIFIER_RE.finditer(text):
        if _overlaps_any(match.start(), match.end(), combined_skip):
            continue
        article_number = match.group(2).strip()
        identifier = match.group(3).strip()
        resolution = linker.resolve_with_article(identifier, article_number=article_number)
        link = _resolution_to_url(resolution, min_score=min_score, allowed_act_ids=allowed_act_ids)
        if link is None:
            continue
        full_span = match.group(0)
        collected.append(
            _Replacement(
                start=match.start(),
                end=match.end(),
                replacement=_render_markdown_link(full_span, link),
            )
        )
    return collected


def _collect_identifier_replacements(
    text: str,
    *,
    linker: LegalEntityLinker,
    min_score: float,
    forbidden: List[tuple[int, int]],
    claimed_regions: List[tuple[int, int]],
    allowed_act_ids: Optional[Set[int]],
) -> List[_Replacement]:
    """Link bare identifiers (CELEX, EU refs, authority refs) found in *text*.

    Iterates every pattern in ``IDENTIFIER_PATTERNS``; generic targets and
    spans overlapping *forbidden* or *claimed_regions* are skipped, and only
    trusted resolutions (see ``_resolution_to_url``) become links.
    """
    collected: List[_Replacement] = []
    combined_skip = forbidden + claimed_regions
    for pattern in IDENTIFIER_PATTERNS:
        for match in pattern.finditer(text):
            if _overlaps_any(match.start(), match.end(), combined_skip):
                continue
            identifier = match.group(0)
            if is_generic_target(identifier):
                continue
            resolution = linker.resolve(identifier)
            link = _resolution_to_url(resolution, min_score=min_score, allowed_act_ids=allowed_act_ids)
            if link is None:
                continue
            collected.append(
                _Replacement(
                    start=match.start(),
                    end=match.end(),
                    replacement=_render_markdown_link(identifier, link),
                )
            )
    return collected


def _resolution_to_url(
    resolution,
    *,
    min_score: float,
    allowed_act_ids: Optional[Set[int]],
) -> Optional[str]:
    """Turn a linker resolution into a library URL, or ``None`` when untrusted.

    Rejects: no resolution, a method outside ``_TRUSTED_METHODS``, a score
    below *min_score*, a missing ``act_id``, or an act outside
    *allowed_act_ids* (when given). Appends a ``#sub-<id>`` fragment when the
    resolution carries a subdivision.
    """
    if resolution is None:
        return None
    if resolution.method not in _TRUSTED_METHODS:
        return None
    if resolution.score < min_score:
        return None
    if resolution.act_id is None:
        return None
    if allowed_act_ids is not None and resolution.act_id not in allowed_act_ids:
        return None
    url = f"/library/acts/{resolution.act_id}"
    if resolution.subdivision_id is not None:
        url += f"#sub-{resolution.subdivision_id}"
    return url


def _render_markdown_link(text: str, url: str) -> str:
    """Wrap *text* in a markdown link to *url*, escaping literal brackets."""
    # Protect literal brackets in anchor text (rare in prose, but safe).
    safe_text = text.replace("[", r"\[").replace("]", r"\]")
    return f"[{safe_text}]({url})"


def _apply_replacements(text: str, replacements: List[_Replacement]) -> str:
    """Apply *replacements* to *text*, discarding overlapping spans.

    Replacements are ordered by start ascending then length descending, so on
    a conflict the longer (or earlier-starting) span wins and any later span
    overlapping a kept one is dropped. The result is assembled in a single
    left-to-right pass with ``str.join`` — O(n) instead of the quadratic
    repeated-slicing rebuild.
    """
    if not replacements:
        return text
    # Sort by start asc, length desc — longer spans take precedence on conflicts.
    ordered = sorted(
        replacements,
        key=lambda r: (r.start, -(r.end - r.start)),
    )
    parts: List[str] = []
    cursor = 0
    for replacement in ordered:
        if replacement.start < cursor:
            continue  # overlaps an earlier kept replacement
        parts.append(text[cursor : replacement.start])
        parts.append(replacement.replacement)
        cursor = replacement.end
    parts.append(text[cursor:])
    return "".join(parts)


__all__ = ["link_prose"]