"""
Relation intent detection — regex keywords + embedding similarity fallback.
Determines whether a user question has relational intent (i.e. asks about
links, amendments, chains between legal acts) and if so, which direction
(outgoing, incoming, or both) and which relation types are relevant.
"""
import logging
import re
from dataclasses import dataclass, field
from typing import Any, List, Literal, Optional
import numpy as np
from lalandre_core.utils import normalize_celex
logger = logging.getLogger(__name__)
# ── Direction detection ───────────────────────────────────────────────────────
# Active-voice relation verbs, matched case-insensitively as whole words.
# French alternatives are stems followed by any word characters (so conjugated
# and participle forms all match); English alternatives are the bare verb with
# an optional trailing "s" only, so "amended"/"repealed" do NOT match here.
# NOTE(review): `cit\w+` accepts any word starting with "cit" that has at least
# one more character, not just forms of "citer"/"cite".
_ACTIVE_VOICE_RE = re.compile(
    r"\b(modifi\w*|abrog\w*|remplac\w*|transpos\w*|cit\w+|compl[èeé]t\w*|corrig\w*|supprim\w*"
    r"|amends?|repeals?|replaces?|implements?|cites?|supplements?|corrects?)\b",
    re.IGNORECASE,
)
# Passive-voice constructions, matched case-insensitively. A match means the
# act in the question is the *target* of the relation ("modifié par",
# "has been repealed", "a été abrogé").
#
# Every auxiliary form covers the full verb list, and the French compound past
# is anchored on "été" alone rather than on "a été", so that inverted questions
# ("a-t-elle été modifiée ?") and other auxiliaries ("ont été", "avait été")
# are recognised as passive instead of falling through to the active stems.
# "cit" and "complét" require the participle vowel after the auxiliary so that
# "est citoyen" / "est complet" are not mistaken for passives.
_PASSIVE_VOICE_RE = re.compile(
    # French past participle followed by the agent marker "par".
    r"\b(modifi[ée]\w*\s+par|abrog[ée]\w*\s+par|remplac[ée]\w*\s+par|transpos[ée]\w*\s+par"
    r"|cit[ée]\w*\s+par|compl[ée]t[ée]\w*\s+par|corrig[ée]\w*\s+par|supprim[ée]\w*\s+par"
    # English past participle followed by "by".
    r"|amended\s+by|repealed\s+by|replaced\s+by|implemented\s+by"
    r"|cited\s+by|supplemented\s+by|corrected\s+by"
    # English auxiliary + past participle ("has been amended", "was cited").
    r"|(?:been|was)\s+(?:amended|repealed|replaced|implemented|cited|supplemented|corrected)"
    # French auxiliary + participle stem ("est abrogé", "a été remplacé", "a-t-il été cité").
    r"|(?:est|[ée]t[ée])\s+(?:modifi|abrog|remplac|transpos|corrig|supprim|cit[ée]|compl[ée]t[ée]))",
    re.IGNORECASE,
)
# ── Relation type detection ───────────────────────────────────────────────────
# Words signalling that only direct/immediate relations are wanted. The match is
# whole-word and case-insensitive, so "directive" does not trigger it; only the
# four listed forms match (plural or adverbial forms are not covered).
_DIRECT_RELATION_RE = re.compile(
    r"\b(direct|directe|imm[ée]diat|imm[ée]diate)\b",
    re.IGNORECASE,
)
# Vocabulary (French and English) about links, dependencies, chains or paths
# between texts: "lien", "relation", "entre"/"between", "chemin"/"path",
# "dépend...", "découl...", "historique législatif", "chaîne", "en amont", ...
# Matched case-insensitively with word boundaries around the whole alternation.
_PATH_RELATION_RE = re.compile(
    r"\b(liens?|relations?|reli[ée]\w*|entre|between|chemin|path|connect\w*"
    r"|li[ée]\w*\s+[àa]|rapports?|interactions?|articulations?|impact\w*"
    r"|influenc\w*|d[ée]pend\w*|d[ée]coul\w*|d[ée]riv\w*|historique\s+l[ée]gislatif"
    r"|chain|cha[iî]ne|filiation|parent[ée]|ant[ée]c[ée]dent\w*|successeur\w*"
    r"|en\s+amont|en\s+aval|vis-[àa]-vis)\b",
    re.IGNORECASE,
)
# Ordered (pattern, relation type) pairs. A question is tagged with every
# relation type whose pattern is found in it; the tuple order is the order in
# which types are reported.
#
# Each alternation is wrapped in a non-capturing group so the leading `\b`
# applies to *every* alternative. Without the group, only the first alternative
# is anchored and the others match inside unrelated words (e.g. "vis[ée]"
# inside "révisé", "r[ée]f[ée]ren" inside "préférence", "chang[ée]" inside
# "échange"). Patterns stay open-ended on the right so inflected forms match.
_RELATION_KEYWORDS: tuple[tuple[re.Pattern[str], str], ...] = (
    (
        re.compile(r"\b(?:amend\w*|modifi\w+|chang[ée]\w*|mis\w*\s+[àa]\s+jour|r[ée]vis\w*)", re.IGNORECASE),
        "AMENDS",
    ),
    (re.compile(r"\b(?:abrog\w*|repeal\w*|supprim\w*|annul\w*)", re.IGNORECASE), "REPEALS"),
    (re.compile(r"\b(?:remplac\w*|replace\w*|substitu\w*)", re.IGNORECASE), "REPLACES"),
    (
        # "(?:œ|oe)uvre" accepts both the ligature spelling "œuvre" and the
        # ASCII fallback "oeuvre".
        re.compile(
            r"\b(?:impl[ée]ment\w*|transpos\w*|tradui\w*\s+en\s+droit|mise?\s+en\s+(?:œ|oe)uvre)",
            re.IGNORECASE,
        ),
        "IMPLEMENTS",
    ),
    (
        # "mentionn[ée]" covers both "mentionne(nt)" and the participle "mentionné(e)".
        re.compile(
            r"\b(?:cit[ée]\w*|citation|r[ée]f[ée]ren\w*|mentionn[ée]\w*|vis[ée]\w*|invoqu\w*)",
            re.IGNORECASE,
        ),
        "CITES",
    ),
    (re.compile(r"\b(?:d[ée]rog\w*|exception\w*|exempti\w*)", re.IGNORECASE), "DEROGATES"),
    (
        # "r?ajout" keeps "rajout(er)" matching now that the alternative is word-anchored.
        re.compile(r"\b(?:suppl[ée]ment\w*|compl[ée]t\w*|compl[ée]mentaire\w*|r?ajout\w*)", re.IGNORECASE),
        "SUPPLEMENTS",
    ),
    (re.compile(r"\b(?:correct\w*|rectifi\w*|errat\w*)", re.IGNORECASE), "CORRECTS"),
)
# ── CELEX extraction ─────────────────────────────────────────────────────────
# Patterns for act identifiers appearing in a question, tried in this order.
# Every raw match is handed to `normalize_celex` before being kept.
# NOTE(review): the first two patterns are case-sensitive (no IGNORECASE), and
# the suffix alternatives are limited to CE/UE/EU — confirm whether lowercase
# identifiers or other suffixes should be accepted by `normalize_celex`.
_CELEX_PATTERNS: tuple[re.Pattern[str], ...] = (
    # "3" + four digits + one uppercase letter + four digits, e.g. 32016R0679.
    re.compile(r"\b3\d{4}[A-Z]\d{4}\b"),
    # Identifiers starting with "AMF-" followed by uppercase letters, digits or hyphens.
    re.compile(r"\bAMF-[A-Z0-9-]+\b"),
    # Act type word followed by "YYYY/N/CE|UE|EU", e.g. "Directive 2014/65/UE".
    re.compile(r"(?:Directive|R[èe]glement|Regulation|D[ée]cision|Decision)\s+\d{4}/\d+/(?:CE|UE|EU)", re.IGNORECASE),
    # Parenthesised "(UE)"/"(CE)"/"(EU)", optional "n°"/"no.", then "N/N", e.g. "(UE) n° 575/2013".
    re.compile(r"\(\s*(?:UE|CE|EU)\s*\)\s*(?:n°|no\.?)?\s*\d{1,4}/\d{1,4}", re.IGNORECASE),
    # Bare "YYYY/N/CE|UE|EU" without a leading act type word.
    re.compile(r"\b\d{4}/\d+/(?:CE|UE|EU)\b", re.IGNORECASE),
)
# ── Embedding similarity ─────────────────────────────────────────────────────
# Example questions (French and English) that express relational intent. Their
# embeddings serve as the reference set that a question's embedding is compared
# against in `_has_embedding_intent`.
_RELATION_REFERENCE_PHRASES: tuple[str, ...] = (
    "Quels textes sont liés à cette directive ?",
    "Cette loi a-t-elle été modifiée ou abrogée ?",
    "Quel est l'historique législatif de ce règlement ?",
    "Quels actes dépendent de ce texte ?",
    "Quels textes transposent cette directive en droit national ?",
    "Montrer les relations entre ces deux actes juridiques",
    "Quels textes citent ou référencent ce règlement ?",
    "What acts amend or repeal this regulation?",
    "Show the legislative chain for this directive",
)
# Minimum best similarity (dot product of L2-normalised vectors) between the
# question and any reference phrase for the question to count as relational.
_RELATION_SIMILARITY_THRESHOLD = 0.45
# Lazily filled cache of the row-normalised reference embeddings; stays None
# until `_has_embedding_intent` has embedded the reference phrases once.
_relation_ref_matrix: Optional[np.ndarray] = None
# ── Public types ──────────────────────────────────────────────────────────────
# Which side of a relation the question is about; "both" means no direction
# could be determined from the wording.
Direction = Literal["outgoing", "incoming", "both"]


@dataclass(frozen=True)
class RelationIntent:
    """Result of analyzing a question for relational intent.

    Attributes:
        has_intent: True when the question was judged to ask about relations
            between acts (by keywords or by embedding similarity).
        direction: "incoming" when passive wording was detected, "outgoing"
            when only active wording was detected, "both" otherwise.
        relation_types: Relation type names (e.g. "AMENDS", "REPEALS") whose
            keywords occur in the question, in detection order.
        celex_candidates: Normalised act identifiers extracted from the
            question, de-duplicated, in order of first extraction.
        wants_direct: True when the question contains a "direct"/"immediate"
            style qualifier.
        wants_path: True when the question contains link/chain/path vocabulary.
    """

    has_intent: bool
    direction: Direction = "both"
    relation_types: List[str] = field(default_factory=list)
    celex_candidates: List[str] = field(default_factory=list)
    wants_direct: bool = False
    wants_path: bool = False


# Shared "no relational intent" result. The instance is frozen but its list
# fields are ordinary lists, so callers must treat them as read-only.
_NO_INTENT = RelationIntent(has_intent=False)
# ── Internal helpers ──────────────────────────────────────────────────────────
def _extract_celex_candidates(question: str) -> List[str]:
    """Return the normalised act identifiers found in *question*.

    Every pattern in ``_CELEX_PATTERNS`` is scanned in turn; each raw match is
    passed through ``normalize_celex``. Empty results are discarded and each
    identifier is kept once, at the position where it was first produced.
    """
    # An insertion-ordered dict doubles as "seen" set and ordered result.
    ordered: dict[str, None] = {}
    for celex_re in _CELEX_PATTERNS:
        for hit in celex_re.finditer(question):
            identifier = normalize_celex(hit.group(0))
            if identifier:
                ordered.setdefault(identifier, None)
    return list(ordered)
def _detect_relation_types(question: str) -> List[str]:
    """Return the relation types whose keyword pattern occurs in *question*.

    Types are reported in the order of ``_RELATION_KEYWORDS``, each at most once.
    """
    triggered = (label for keyword_re, label in _RELATION_KEYWORDS if keyword_re.search(question))
    # dict.fromkeys drops repeats while keeping first-seen order.
    return list(dict.fromkeys(triggered))
def _detect_direction(question: str) -> Direction:
    """Infer the relation direction from the voice used in *question*.

    Returns "incoming" if a passive construction is found, otherwise
    "outgoing" if an active verb form is found, otherwise "both".
    """
    # Passive wording ("modifié par", "est abrogé") is the more specific
    # signal: an active stem usually matches inside it as well, so the passive
    # check has to win whenever both would match.
    if _PASSIVE_VOICE_RE.search(question):
        return "incoming"
    return "outgoing" if _ACTIVE_VOICE_RE.search(question) else "both"
def _has_keyword_intent(question: str) -> bool:
    """Tell whether any regex signal marks *question* as relational.

    The signals are: a "direct" qualifier, link/path vocabulary, or at least
    one relation-type keyword.
    """
    if _DIRECT_RELATION_RE.search(question):
        return True
    if _PATH_RELATION_RE.search(question):
        return True
    return len(_detect_relation_types(question)) > 0
def _has_embedding_intent(question: str, embedding_service: Any) -> bool:
    """Decide relational intent by embedding similarity to reference phrases.

    On first use the reference phrases are embedded with
    ``embedding_service.embed_batch``, L2-normalised row by row and cached in
    the module-level ``_relation_ref_matrix``. The question is embedded with
    ``embedding_service.embed_text`` and normalised; the result is True when
    its best dot product against the cached rows reaches
    ``_RELATION_SIMILARITY_THRESHOLD``.

    Any exception is logged as a warning and reported as "no intent", so a
    failing embedding backend degrades to the regex-only answer.
    """
    global _relation_ref_matrix  # noqa: PLW0603
    try:
        if _relation_ref_matrix is None:
            reference_vectors = embedding_service.embed_batch(list(_RELATION_REFERENCE_PHRASES))
            unnormalized = np.array(reference_vectors, dtype=np.float32)
            row_norms = np.linalg.norm(unnormalized, axis=1, keepdims=True)
            # Substitute 1 for zero norms so all-zero rows stay zero instead of dividing by zero.
            safe_norms = np.where(row_norms == 0, 1, row_norms)
            _relation_ref_matrix = unnormalized / safe_norms
            logger.info("Relation intent: cached %d reference embeddings", len(reference_vectors))

        query = np.array(embedding_service.embed_text(question), dtype=np.float32)
        magnitude = np.linalg.norm(query)
        if magnitude == 0:
            # A zero vector has no direction to compare.
            return False

        similarities = _relation_ref_matrix @ (query / magnitude)
        top_score = float(np.max(similarities))
        logger.debug(
            "Relation intent embedding score: %.3f (threshold=%.2f)",
            top_score,
            _RELATION_SIMILARITY_THRESHOLD,
        )
        return top_score >= _RELATION_SIMILARITY_THRESHOLD
    except Exception:
        logger.warning("Relation intent embedding check failed, falling back to regex-only", exc_info=True)
        return False
# ── Public API ────────────────────────────────────────────────────────────────
def analyze_relation_intent(
    question: str,
    *,
    embedding_service: Any = None,
) -> RelationIntent:
    """Analyze a question and return structured relational intent.

    Regex signals are evaluated first. Only when none of them fires and an
    ``embedding_service`` is supplied is the embedding-similarity fallback
    consulted.

    Args:
        question: The user question to analyze.
        embedding_service: Optional object exposing ``embed_batch`` and
            ``embed_text``; when ``None`` the analysis is regex-only.

    Returns:
        The shared no-intent result when nothing indicates relational intent
        (including for an empty or whitespace-only question); otherwise a
        ``RelationIntent`` with ``has_intent=True`` carrying the detected
        direction, relation types, identifier candidates and the
        direct/path flags.
    """
    # A blank question carries no signal: skip the regex passes and, more
    # importantly, never send it to the embedding service.
    if not question or not question.strip():
        return _NO_INTENT

    celex_candidates = _extract_celex_candidates(question)
    relation_types = _detect_relation_types(question)
    direction = _detect_direction(question)
    wants_direct = bool(_DIRECT_RELATION_RE.search(question))
    wants_path = bool(_PATH_RELATION_RE.search(question))

    # Same criterion as `_has_keyword_intent`, but reusing the results computed
    # above instead of running every regex a second time.
    has_intent = wants_direct or wants_path or bool(relation_types)
    if not has_intent and embedding_service is not None:
        has_intent = _has_embedding_intent(question, embedding_service)
    if not has_intent:
        return _NO_INTENT

    return RelationIntent(
        has_intent=True,
        direction=direction,
        relation_types=relation_types,
        celex_candidates=celex_candidates,
        wants_direct=wants_direct,
        wants_path=wants_path,
    )