"""Post-process LLM responses to make regulatory references clickable.
Uses the shared ``LegalEntityLinker`` to detect explicit identifiers (CELEX, EU
refs, national authority refs) and combined ``article N du <act>`` patterns in
the final answer text, then wraps each resolved reference in a markdown link
pointing to the library route (``/library/acts/:act_id[#sub-:subdivision_id]``).
Existing markdown links and citation tags (``[S1]``, ``[G1]``, ``[R1]``,
``[C1]``, ``[CM1]``) are preserved — we never rewrite content inside those
regions.
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from typing import Callable, List, Optional, Sequence, Set
from lalandre_core.linking import (
IDENTIFIER_PATTERNS,
LegalEntityLinker,
is_generic_target,
)
# Module-level logger; external-detector failures are reported through it.
logger = logging.getLogger(__name__)
# Matches existing markdown links [text](url) — we must not wrap inside them.
_MARKDOWN_LINK_RE = re.compile(r"\[[^\]]*\]\([^)]*\)")
# Matches citation tags like [S1], [G1], [R1], [C1], [CM1] (with optional
# ", L<n>" level suffix). Preserved as-is.
_CITATION_TAG_RE = re.compile(r"\[(?:S|G|R|C|CM)\d+(?:,\s*L\d+)?\]")
# Matches a single article number form: "L.533-4", "4", "R.521-1", "Annex I"
_ARTICLE_NUMBER = r"[LRDA]?[.\-]?\d+(?:[A-Z]|[\w\.\-])*"
# Article(s) preceding an identifier, with up to 80 chars of intervening prose.
# Captures: (1) full "article N" prefix text, (2) first article number, (3) identifier.
_ARTICLE_THEN_IDENTIFIER_RE = re.compile(
    r"\b(articles?\s+(" + _ARTICLE_NUMBER + r")"
    r"(?:\s+(?:et|à|&|,)\s+" + _ARTICLE_NUMBER + r")*)"  # optional additional numbers
    r"[^.;:\n\[\]]{0,80}?"  # intervening prose (no sentence break, no brackets)
    r"\b("
    r"3\d{4}[A-Z]\d{4}"  # CELEX-style identifier (e.g. 32014L0065)
    r"|\d{2,4}/\d{2,4}/(?:CE|UE|EU|EC|CEE)"  # EU directive/regulation reference
    r"|(?:AMF|EBA|ESMA|EIOPA)-[A-Z0-9][A-Z0-9\-/.]{2,}"  # authority reference
    r")\b",
    re.IGNORECASE,
)
# Resolution methods trusted enough to render clickable links; anything else
# (e.g. unvalidated fallback resolutions) stays plain text.
_TRUSTED_METHODS = frozenset({"explicit", "exact_alias", "fuzzy_alias"})
@dataclass(frozen=True)
class _Replacement:
    """A pending substitution: replace ``text[start:end]`` with ``replacement``."""

    start: int  # inclusive character offset into the answer text
    end: int  # exclusive character offset
    replacement: str  # rendered markdown link that stands in for the span
@dataclass(frozen=True)
class ExternalDetection:
    """A legal-reference span detected by an external detector.

    Used to plug a third-party detector (e.g. a locally-hosted Ref2Link
    service) next to our native regex+fuzzy engine. The external source
    must return spans resolved to an internal ``act_id`` — translation from
    their identifier space (ELI URI, CELEX, …) to our DB id stays the
    caller's responsibility.
    """

    start: int  # inclusive character offset of the span in the answer text
    end: int  # exclusive character offset
    act_id: int  # internal library act id (already translated by the caller)
    subdivision_id: Optional[int] = None  # optional subdivision anchor for the link
    eli: Optional[str] = None  # detector's ELI identifier; not read by this module
# Pluggable detector signature accepted by ``link_prose``.
ExternalDetector = Callable[[str], Sequence[ExternalDetection]]
"""Callable that detects legal references in free text.
Given the answer text, returns a sequence of ``ExternalDetection`` already
resolved to our internal ``act_id``. Errors raised by the detector are caught
and the linker silently falls back to its regex-only behaviour.
"""
def link_prose(
    text: str,
    linker: LegalEntityLinker,
    *,
    min_score: float = 0.85,
    resolve_articles: bool = True,
    allowed_act_ids: Optional[Set[int]] = None,
    external_detector: Optional[ExternalDetector] = None,
) -> str:
    """Return ``text`` with detected regulatory references turned into links.

    Every reference that resolves with enough confidence is wrapped as a
    markdown link to its library page (optionally anchored on an article
    subdivision). Pre-existing markdown links and citation tags are left
    untouched, and neither fallback (unvalidated) resolutions nor generic
    targets are ever linked.

    When ``allowed_act_ids`` is a set, only references resolving to an act
    in that set become links; everything else stays plain text so the UI
    never promises a "click to see the passage" that opens an empty panel.
    Passing ``None`` (the default) disables the filter.

    When ``external_detector`` is given, its detections are merged with the
    native regex+fuzzy ones and take precedence on overlap. This keeps the
    module decoupled from any specific third-party detector (e.g. Ref2Link).
    """
    if not text or not text.strip():
        return text

    forbidden = _find_forbidden_regions(text)
    collected: List[_Replacement] = []

    def claimed() -> List[tuple[int, int]]:
        # Regions already taken by earlier, higher-priority passes.
        return [(rep.start, rep.end) for rep in collected]

    if external_detector is not None:
        collected += _collect_external_replacements(
            text,
            detector=external_detector,
            forbidden=forbidden,
            allowed_act_ids=allowed_act_ids,
        )
    if resolve_articles:
        collected += _collect_article_replacements(
            text,
            linker=linker,
            min_score=min_score,
            forbidden=forbidden,
            claimed_regions=claimed(),
            allowed_act_ids=allowed_act_ids,
        )
    collected += _collect_identifier_replacements(
        text,
        linker=linker,
        min_score=min_score,
        forbidden=forbidden,
        claimed_regions=claimed(),
        allowed_act_ids=allowed_act_ids,
    )
    return _apply_replacements(text, collected)
def _collect_external_replacements(
    text: str,
    *,
    detector: ExternalDetector,
    forbidden: List[tuple[int, int]],
    allowed_act_ids: Optional[Set[int]],
) -> List[_Replacement]:
    """Turn external-detector spans into replacements, dropping invalid ones.

    A failing detector is logged and treated as "no detections" so the
    caller falls back to regex-only behaviour.
    """
    try:
        detections = detector(text)
    except Exception:
        logger.warning(
            "external detector failed; falling back to regex-only detection",
            exc_info=True,
        )
        return []

    results: List[_Replacement] = []
    for detection in detections or ():
        start, end = detection.start, detection.end
        # Reject out-of-bounds or inverted spans from the detector.
        if not (0 <= start < end <= len(text)):
            continue
        if _overlaps_any(start, end, forbidden):
            continue
        if allowed_act_ids is not None and detection.act_id not in allowed_act_ids:
            continue
        anchor = text[start:end]
        trimmed = anchor.strip()
        if not trimmed or is_generic_target(trimmed):
            continue
        target = f"/library/acts/{detection.act_id}"
        if detection.subdivision_id is not None:
            target = f"{target}#sub-{detection.subdivision_id}"
        results.append(
            _Replacement(
                start=start,
                end=end,
                replacement=_render_markdown_link(anchor, target),
            )
        )
    return results
def _find_forbidden_regions(text: str) -> List[tuple[int, int]]:
    """Return spans of existing markdown links and citation tags.

    These regions are never rewritten by the linker.
    """
    # Markdown-link regions first, then citation tags — same order callers saw before.
    return [
        (found.start(), found.end())
        for pattern in (_MARKDOWN_LINK_RE, _CITATION_TAG_RE)
        for found in pattern.finditer(text)
    ]
def _overlaps_any(start: int, end: int, regions: List[tuple[int, int]]) -> bool:
for region_start, region_end in regions:
if start < region_end and end > region_start:
return True
return False
def _collect_article_replacements(
    text: str,
    *,
    linker: LegalEntityLinker,
    min_score: float,
    forbidden: List[tuple[int, int]],
    claimed_regions: List[tuple[int, int]],
    allowed_act_ids: Optional[Set[int]],
) -> List[_Replacement]:
    """Link combined "article N … <identifier>" spans resolved via the linker."""
    skip = forbidden + claimed_regions
    results: List[_Replacement] = []
    for found in _ARTICLE_THEN_IDENTIFIER_RE.finditer(text):
        span_start, span_end = found.start(), found.end()
        if _overlaps_any(span_start, span_end, skip):
            continue
        # Group 2 is the first article number, group 3 the act identifier.
        resolution = linker.resolve_with_article(
            found.group(3).strip(), article_number=found.group(2).strip()
        )
        url = _resolution_to_url(
            resolution, min_score=min_score, allowed_act_ids=allowed_act_ids
        )
        if url is None:
            continue
        results.append(
            _Replacement(
                start=span_start,
                end=span_end,
                # The whole "article N … identifier" span becomes the anchor text.
                replacement=_render_markdown_link(found.group(0), url),
            )
        )
    return results
def _collect_identifier_replacements(
    text: str,
    *,
    linker: LegalEntityLinker,
    min_score: float,
    forbidden: List[tuple[int, int]],
    claimed_regions: List[tuple[int, int]],
    allowed_act_ids: Optional[Set[int]],
) -> List[_Replacement]:
    """Link bare identifiers (CELEX, EU refs, authority refs) found in the text."""
    skip = forbidden + claimed_regions
    results: List[_Replacement] = []
    for pattern in IDENTIFIER_PATTERNS:
        for found in pattern.finditer(text):
            mention = found.group(0)
            # Skip spans already claimed/forbidden, and generic targets.
            if _overlaps_any(found.start(), found.end(), skip) or is_generic_target(mention):
                continue
            url = _resolution_to_url(
                linker.resolve(mention),
                min_score=min_score,
                allowed_act_ids=allowed_act_ids,
            )
            if url is None:
                continue
            results.append(
                _Replacement(
                    start=found.start(),
                    end=found.end(),
                    replacement=_render_markdown_link(mention, url),
                )
            )
    return results
def _resolution_to_url(
    resolution,
    *,
    min_score: float,
    allowed_act_ids: Optional[Set[int]],
) -> Optional[str]:
    """Return the library URL for a trusted, sufficiently-scored resolution.

    Returns ``None`` when the resolution is missing, uses an untrusted
    method, scores below ``min_score``, has no ``act_id``, or falls outside
    ``allowed_act_ids`` (when that filter is active).
    """
    if resolution is None:
        return None
    trusted = (
        resolution.method in _TRUSTED_METHODS
        and resolution.score >= min_score
        and resolution.act_id is not None
    )
    if not trusted:
        return None
    if allowed_act_ids is not None and resolution.act_id not in allowed_act_ids:
        return None
    anchor = "" if resolution.subdivision_id is None else f"#sub-{resolution.subdivision_id}"
    return f"/library/acts/{resolution.act_id}{anchor}"
def _render_markdown_link(text: str, url: str) -> str:
# Protect literal brackets in anchor text (rare in prose, but safe).
safe_text = text.replace("[", r"\[").replace("]", r"\]")
return f"[{safe_text}]({url})"
def _apply_replacements(text: str, replacements: List[_Replacement]) -> str:
    """Apply non-overlapping replacements to ``text``.

    Conflicting spans are resolved by ordering on start position, then on
    descending length — so the longest span starting earliest wins and any
    later span overlapping a kept one is dropped.
    """
    if not replacements:
        return text
    ordered = sorted(replacements, key=lambda rep: (rep.start, rep.start - rep.end))
    # Single forward pass: copy untouched text between kept replacements.
    pieces: List[str] = []
    cursor = 0
    for rep in ordered:
        if rep.start < cursor:
            continue  # overlaps a replacement we already kept
        pieces.append(text[cursor:rep.start])
        pieces.append(rep.replacement)
        cursor = rep.end
    pieces.append(text[cursor:])
    return "".join(pieces)
# Public API: only the prose-linking entry point is exported.
__all__ = ["link_prose"]