"""
Relation graph service.
Orchestrates deterministic relation extraction, persistence, and Neo4j sync.
"""
import logging
import threading
from datetime import date, datetime
from typing import Any, Dict, List, Optional, cast
from lalandre_core.config import get_config
from lalandre_core.linking import ActAliasEntry, LegalEntityLinker
from lalandre_core.utils import normalize_celex
from lalandre_db_neo4j import ActNode, ActRelationship, Neo4jRepository
from lalandre_db_postgres import ActRelationsSQL, ActsSQL, PostgresRepository, SubdivisionsSQL
from lalandre_extraction.relation_extractor import ExtractedRelation, RegulatoryRelationExtractor
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
logger = logging.getLogger(__name__)
EXTRACTION_STATUS_EXTRACTING = "extracting"
EXTRACTION_STATUS_EXTRACTED = "extracted"
EXTRACTION_STATUS_ERROR = "error"
class RelationGraphService:
"""
Service for extracting and synchronizing legal relationships
Responsibilities:
- Extract relations from act text using RelationExtractor
- Store relations in PostgreSQL
- Sync relations to Neo4j graph
- Orchestrate the full workflow
Does NOT:
- Parse text (uses RelationExtractor)
- Access databases directly (uses repositories)
"""
def __init__(
self,
pg_repo: PostgresRepository,
neo4j_repo: Optional[Neo4jRepository],
min_confidence: Optional[float],
max_chunk_size: Optional[int],
) -> None:
"""
Initialize relation graph service
Args:
pg_repo: PostgreSQL repository
neo4j_repo: Neo4j repository (optional, for direct sync)
min_confidence: Minimum confidence threshold for relations.
If None, uses config.gateway.job_extract_min_confidence.
max_chunk_size: Maximum text chunk size for extraction.
If None, uses config.chunking.extraction_max_chunk_chars.
"""
config = get_config()
resolved_min_confidence = (
min_confidence if min_confidence is not None else config.gateway.job_extract_min_confidence
)
if resolved_min_confidence is None:
raise ValueError("min_confidence must be provided or configured via gateway.job_extract_min_confidence")
resolved_max_chunk_size = (
max_chunk_size if max_chunk_size is not None else config.chunking.extraction_max_chunk_chars
)
self.pg_repo = pg_repo
self.neo4j_repo = neo4j_repo
self.min_confidence = float(resolved_min_confidence)
self.extractor: RegulatoryRelationExtractor
self._entity_linker: Optional[LegalEntityLinker] = None
self._entity_linker_lock = threading.Lock()
self._entity_linker_act_count: int = 0
self._entity_linker_max_updated_at: Optional[datetime] = None
self.max_chunk_size = int(resolved_max_chunk_size)
self.extractor = RegulatoryRelationExtractor(
max_chunk_size=self.max_chunk_size,
validation_enabled=True,
)
@staticmethod
def _set_extraction_status(
session: Session,
*,
act_id: int,
status: str,
mark_extracted_at: bool = False,
) -> None:
query = session.query(ActsSQL).filter(ActsSQL.id == act_id)
if mark_extracted_at:
query.update(
{
ActsSQL.extraction_status: status,
ActsSQL.extracted_at: func.now(),
}
)
return
query.update({ActsSQL.extraction_status: status})
@staticmethod
def _enum_to_str(value: Any) -> str:
"""Convert enum-like values to string."""
return str(getattr(value, "value", value))
@staticmethod
def _to_datetime(value: date | datetime | None) -> datetime | None:
"""Normalize SQL date/datetime values for pydantic models."""
if value is None:
return None
if isinstance(value, datetime):
return value
return datetime.combine(value, datetime.min.time())
    def _ensure_entity_linker(self) -> None:
        """
        Build (or rebuild) the entity linker from the acts table.

        Uses a cheap fingerprint — row count plus max(updated_at) — to decide
        whether the linker is stale; rebuilds only when the fingerprint changed
        or no linker exists yet. The comparison and swap run under a lock so
        concurrent extractions never observe a partially-built linker.
        """
        # Read the current fingerprint (cheap aggregate query) before locking.
        with self.pg_repo.get_session() as session:
            current_count, current_updated_at = session.query(
                func.count(ActsSQL.id),
                func.max(ActsSQL.updated_at),
            ).one()
        current_count = int(current_count or 0)
        with self._entity_linker_lock:
            # Rebuild if count/last-update changed or linker not yet created.
            if (
                self._entity_linker is not None
                and current_count == self._entity_linker_act_count
                and current_updated_at == self._entity_linker_max_updated_at
            ):
                return
            if self._entity_linker is not None:
                logger.info(
                    "Acts changed (count %s → %s, updated_at %s → %s), rebuilding entity linker",
                    self._entity_linker_act_count,
                    current_count,
                    self._entity_linker_max_updated_at,
                    current_updated_at,
                )
            config = get_config()
            # Load every act and derive the aliases it can be referenced by.
            with self.pg_repo.get_session() as session:
                acts = cast(List[ActsSQL], session.query(ActsSQL).all())
                entries: List[ActAliasEntry] = []
                for act in acts:
                    aliases = LegalEntityLinker.derive_aliases(
                        title=act.title,
                        eli=act.eli,
                        official_journal_reference=act.official_journal_reference,
                        form_number=act.form_number,
                    )
                    entries.append(
                        ActAliasEntry(
                            celex=act.celex,
                            title=act.title,
                            aliases=aliases,
                        )
                    )
            new_linker = LegalEntityLinker(
                entries,
                fuzzy_threshold=config.extraction.entity_linker_fuzzy_threshold,
                fuzzy_min_gap=config.extraction.entity_linker_fuzzy_min_gap,
                fuzzy_limit=config.extraction.entity_linker_fuzzy_limit,
                min_alias_chars=config.extraction.entity_linker_min_alias_chars,
            )
            # Atomic swap: extractor never sees a None linker mid-extraction.
            self._entity_linker = new_linker
            self._entity_linker_act_count = len(entries)
            self._entity_linker_max_updated_at = current_updated_at
            self.extractor.set_entity_linker(new_linker)
            logger.info(
                "Entity linker initialized: acts=%s aliases=%s",
                len(entries),
                new_linker.alias_count,
            )
    def extract_and_store_for_act(
        self,
        act_id: Optional[int] = None,
        celex: Optional[str] = None,
        sync_to_neo4j: bool = True,
        force: bool = False,
    ) -> Dict[str, Any]:
        """
        Extract relations from a single act and store them

        Exactly one of ``act_id`` or ``celex`` must identify an existing act.

        Args:
            act_id: Act ID (PostgreSQL)
            celex: CELEX identifier (alternative to act_id)
            sync_to_neo4j: Whether to sync to Neo4j after storage
            force: Re-extract even if already extracted

        Returns:
            Dictionary with extraction results (a "status" plus relation counts)

        Raises:
            ValueError: If neither identifier is given, or no matching act exists.
        """
        # Get act from PostgreSQL
        act_sql: ActsSQL
        full_text: str
        with self.pg_repo.get_session() as session:
            if celex is not None:
                normalized_celex = normalize_celex(celex)
                act_sql_row = cast(
                    Optional[ActsSQL], session.query(ActsSQL).filter(ActsSQL.celex == normalized_celex).first()
                )
            elif act_id is not None:
                act_sql_row = cast(Optional[ActsSQL], session.query(ActsSQL).filter(ActsSQL.id == act_id).first())
            else:
                raise ValueError("Either act_id or celex must be provided")
            if act_sql_row is None:
                raise ValueError(f"Act not found: {normalize_celex(celex) if celex is not None else act_id}")
            act_sql = act_sql_row
            if sync_to_neo4j:
                # Make sure the graph node exists before any edges get created.
                self._ensure_act_node(act_sql)
            # Check if already extracted (skip to avoid wasting API calls)
            extraction_status = act_sql.extraction_status
            if not force and extraction_status == EXTRACTION_STATUS_EXTRACTED:
                logger.info(f"Act {act_sql.celex} already extracted, skipping")
                # Return existing relations count
                existing_count = (
                    session.query(ActRelationsSQL).filter(ActRelationsSQL.source_act_id == act_sql.id).count()
                )
                return {
                    "celex": act_sql.celex,
                    "relations_found": existing_count,
                    "relations_stored": existing_count,
                    "status": "already_extracted",
                }
            # When forcing, delete existing relations first
            if force and extraction_status == EXTRACTION_STATUS_EXTRACTED:
                logger.info(f"Act {act_sql.celex} force re-extraction, deleting existing relations")
                session.query(ActRelationsSQL).filter(ActRelationsSQL.source_act_id == act_sql.id).delete(
                    synchronize_session=False
                )
                session.commit()
            # Mark as extracting
            self._set_extraction_status(
                session,
                act_id=act_sql.id,
                status=EXTRACTION_STATUS_EXTRACTING,
            )
            session.commit()
        # NOTE(review): act_sql is still read below after its session has closed —
        # assumes the repo's session factory uses expire_on_commit=False; confirm.
        # Get full text from subdivisions
        full_text = self._get_act_full_text(act_sql.id)
        if not full_text:
            logger.warning(f"No text content for act {act_sql.celex}")
            # Mark as error
            with self.pg_repo.get_session() as err_session:
                self._set_extraction_status(
                    err_session,
                    act_id=act_sql.id,
                    status=EXTRACTION_STATUS_ERROR,
                )
                err_session.commit()
            return {"celex": act_sql.celex, "relations_found": 0, "relations_stored": 0, "status": "no_content"}
        # Extract relations (single deterministic pipeline)
        logger.info("[EXTRACT] Processing act %s (%s chars)", act_sql.celex, len(full_text))
        self._ensure_entity_linker()
        extractor = self.extractor
        try:
            relations = extractor.extract_relations(
                text=full_text, source_celex=act_sql.celex, min_confidence=self.min_confidence
            )
            # Log a small preview of what was extracted for debugging.
            logger.info(
                "[EXTRACT] Relations raw count=%d preview=%s",
                len(relations),
                [
                    {
                        "type": rel.relation_type.value,
                        "target": rel.target_celex,
                        "conf": round(rel.confidence, 3),
                        "evidence": rel.text_evidence[:120],
                        "method": rel.resolution_method,
                        "score": rel.resolution_score,
                    }
                    for rel in relations[:5]
                ],
            )
        except Exception as e:
            logger.error(f"Failed to extract relations for {act_sql.celex}: {e}")
            # Mark as error
            with self.pg_repo.get_session() as err_session:
                self._set_extraction_status(
                    err_session,
                    act_id=act_sql.id,
                    status=EXTRACTION_STATUS_ERROR,
                )
                err_session.commit()
            return {"celex": act_sql.celex, "relations_found": 0, "relations_stored": 0, "status": "extraction_failed"}
        if not relations:
            logger.info(f"No relations found for act {act_sql.celex}")
            # Mark as extracted even if no relations found
            with self.pg_repo.get_session() as no_rel_session:
                self._set_extraction_status(
                    no_rel_session,
                    act_id=act_sql.id,
                    status=EXTRACTION_STATUS_EXTRACTED,
                    mark_extracted_at=True,
                )
                no_rel_session.commit()
            return {"celex": act_sql.celex, "relations_found": 0, "relations_stored": 0, "status": "no_relations"}
        # Store in PostgreSQL
        store_result = self._store_relations(act_sql, relations)
        stored_count = store_result["stored"]
        logger.info(
            "[STORED] %s relations stored for %s (%s internal, %s external, resolved_existing=%s) sample=%s",
            stored_count,
            act_sql.celex,
            store_result["internal"],
            store_result["external"],
            store_result.get("resolved_existing", 0),
            [
                {
                    "type": rel.relation_type.value,
                    "target": rel.target_celex,
                    "conf": round(rel.confidence, 3),
                    "method": rel.resolution_method,
                    "score": rel.resolution_score,
                }
                for rel in relations[:5]
            ],
        )
        # Sync to Neo4j if requested — always run to resolve relations
        # from previously extracted acts that now target this act.
        synced_count = 0
        if sync_to_neo4j:
            synced_count = self._sync_to_neo4j()
        return {
            "celex": act_sql.celex,
            "relations_found": len(relations),
            "relations_stored": stored_count,
            "relations_internal": store_result["internal"],
            "relations_external": store_result["external"],
            "relations_resolved_existing": store_result.get("resolved_existing", 0),
            "relations_synced": synced_count,
            "status": "success",
        }
def _get_act_full_text(self, act_id: int) -> str:
"""Get concatenated text from all subdivisions"""
with self.pg_repo.get_session() as session:
subdivisions = cast(
List[SubdivisionsSQL],
session.query(SubdivisionsSQL)
.filter(SubdivisionsSQL.act_id == act_id)
.order_by(SubdivisionsSQL.sequence_order)
.all(),
)
return "\n\n".join([f"{subdiv.title or ''}\n{subdiv.content or ''}" for subdiv in subdivisions])
    def _store_relations(self, act_sql: ActsSQL, relations: List[ExtractedRelation]) -> Dict[str, int]:
        """
        Store ALL relations in PostgreSQL (even if target act doesn't exist)

        Philosophy:
        - PostgreSQL: Store ALL relations (external references allowed)
        - Neo4j: Only sync relations where both nodes exist (handled separately)

        Also marks the source act as extracted when done.

        Returns:
            Counters: "stored", "internal", "external", "resolved_existing", "skipped".
        """
        stored_count = 0
        skipped_duplicate = 0
        skipped_self_ref = 0
        skipped_invalid = 0
        external_refs_count = 0
        # Pre-existing rows whose target act has since been ingested and could
        # now be resolved to a concrete target_act_id.
        resolved_existing_count = 0
        with self.pg_repo.get_session() as session:
            for rel in relations:
                if not self._is_relation_storable(rel):
                    skipped_invalid += 1
                    continue
                # Normalize target CELEX to match database format
                normalized_celex = normalize_celex(rel.target_celex)
                # Try to find target act (but don't fail if not found)
                target_act = cast(
                    Optional[ActsSQL], session.query(ActsSQL).filter(ActsSQL.celex == normalized_celex).first()
                )
                # Determine if this is an external reference
                is_external = target_act is None
                # Skip self-referential relations (act pointing to itself)
                if target_act is not None and act_sql.id == target_act.id:
                    logger.debug(
                        f"Skipping self-referential relation: {act_sql.celex} {rel.relation_type.value} itself"
                    )
                    skipped_self_ref += 1
                    continue
                # Check if already exists (using target_celex instead of target_act_id)
                existing = (
                    session.query(ActRelationsSQL)
                    .filter(
                        ActRelationsSQL.source_act_id == act_sql.id,
                        ActRelationsSQL.target_celex == normalized_celex,
                        ActRelationsSQL.relation_type == rel.relation_type,
                    )
                    .first()
                )
                if existing:
                    existing_rel = cast(ActRelationsSQL, existing)
                    # If the target act now exists, upgrade the old external row to
                    # a resolved one and clear its sync marker so the Neo4j sync
                    # picks it up again.
                    if target_act is not None and (
                        existing_rel.target_act_id is None or not bool(existing_rel.is_resolved)
                    ):
                        existing_row = cast(Any, existing_rel)
                        existing_row.target_act_id = target_act.id
                        existing_row.is_resolved = True
                        existing_row.synced_to_neo4j_at = None
                        resolved_existing_count += 1
                        logger.info(
                            f"Resolved existing relation: "
                            f"{rel.relation_type.value} → {normalized_celex} "
                            f"(target_act_id={target_act.id})"
                        )
                        continue
                    logger.debug(f"Relation already exists: {rel.relation_type.value} → {normalized_celex}")
                    skipped_duplicate += 1
                    continue
                # Create new relation - ALWAYS store, even if target doesn't exist
                # Preserve both rationale (description) and text evidence when available.
                act_relation = ActRelationsSQL(
                    source_act_id=act_sql.id,
                    target_act_id=target_act.id if target_act is not None else None,  # NULL if external
                    target_celex=normalized_celex,  # Always store CELEX
                    relation_type=rel.relation_type,
                    description=rel.relation_description or rel.text_evidence or None,
                    evidence=rel.text_evidence or None,
                    rationale=rel.relation_description or None,
                    resolution_method=rel.resolution_method,
                    resolution_score=rel.resolution_score,
                    target_reference=rel.raw_target_reference,
                    confidence=rel.confidence,
                    source=rel.extraction_method,
                    validated=False,  # Not yet manually validated
                    is_resolved=not is_external,  # True if target exists, False otherwise
                )
                session.add(act_relation)
                stored_count += 1
                if is_external:
                    external_refs_count += 1
                    logger.debug(
                        f"Stored external ref: {rel.relation_type.value} → {normalized_celex} "
                        f"(confidence: {rel.confidence:.2f})"
                    )
                else:
                    logger.info(
                        f"Stored: {rel.relation_type.value} → {normalized_celex} (confidence: {rel.confidence:.2f})"
                    )
            session.commit()
            # Mark act as extracted
            self._set_extraction_status(
                session,
                act_id=act_sql.id,
                status=EXTRACTION_STATUS_EXTRACTED,
                mark_extracted_at=True,
            )
            session.commit()
        # Report statistics
        total_skipped = skipped_duplicate + skipped_self_ref + skipped_invalid
        internal_count = stored_count - external_refs_count + resolved_existing_count
        logger.info(
            f"Stored {stored_count} relations for {act_sql.celex}: "
            f"{internal_count} internal, {external_refs_count} external, "
            f"{resolved_existing_count} resolved-existing"
        )
        if total_skipped > 0:
            skip_reasons: List[str] = []
            if skipped_duplicate > 0:
                skip_reasons.append(f"{skipped_duplicate} duplicates")
            if skipped_self_ref > 0:
                skip_reasons.append(f"{skipped_self_ref} self-referential")
            if skipped_invalid > 0:
                skip_reasons.append(f"{skipped_invalid} invalid")
            logger.info(f"Skipped {total_skipped} relations: {', '.join(skip_reasons)}")
        return {
            "stored": stored_count,
            "internal": internal_count,
            "external": external_refs_count,
            "resolved_existing": resolved_existing_count,
            "skipped": total_skipped,
        }
@staticmethod
def _is_relation_storable(rel: ExtractedRelation) -> bool:
target = normalize_celex(rel.target_celex).strip()
if not target:
return False
if rel.confidence < 0.0 or rel.confidence > 1.0:
return False
return True
    def _sync_to_neo4j(self) -> int:
        """
        Sync all relations from PostgreSQL to Neo4j

        Two phases run inside one session:
        1. Resolve: attach target_act_id to previously external relations whose
           target act has since been ingested, resetting their sync marker.
        2. Sync: push every resolved-but-unsynced relation to Neo4j, stamping
           synced_to_neo4j_at on success so it is not re-sent next run.

        Returns:
            Number of relationships successfully created in Neo4j.
        """
        if not self.neo4j_repo:
            logger.warning("No Neo4j repository configured, skipping sync")
            return 0
        # Direct sync - sync all internal (resolved) relations
        logger.info("Running Neo4j sync (resolve pending + sync unsynced)")
        synced_count = 0
        with self.pg_repo.get_session() as session:
            # Phase 1: find external relations that may be resolvable now.
            unresolved_relations = cast(
                List[ActRelationsSQL],
                session.query(ActRelationsSQL)
                .filter(
                    ActRelationsSQL.is_resolved.is_(False),
                    ActRelationsSQL.target_act_id.is_(None),
                    ActRelationsSQL.target_celex.isnot(None),
                )
                .all(),
            )
            resolved_now = 0
            for relation in unresolved_relations:
                target_celex = relation.target_celex
                if not target_celex:
                    continue
                target_act = cast(
                    Optional[ActsSQL], session.query(ActsSQL).filter(ActsSQL.celex == target_celex).first()
                )
                if target_act is None:
                    # Target act still not ingested; leave the relation external.
                    continue
                session.query(ActRelationsSQL).filter(ActRelationsSQL.id == relation.id).update(
                    {"target_act_id": target_act.id, "is_resolved": True, "synced_to_neo4j_at": None}
                )
                resolved_now += 1
            if resolved_now > 0:
                session.commit()
                logger.info(f"Resolved {resolved_now} previously external relations before Neo4j sync")
            # Get all internal relations (where both source and target acts exist)
            relations = cast(
                List[ActRelationsSQL],
                session.query(ActRelationsSQL)
                .filter(
                    ActRelationsSQL.is_resolved.is_(True),
                    ActRelationsSQL.target_act_id.isnot(None),
                    ActRelationsSQL.synced_to_neo4j_at.is_(None),
                )
                .all(),
            )
            logger.info(f"Found {len(relations)} internal relations to sync to Neo4j")
            if relations:
                # Ensure every involved act has a graph node before creating edges.
                act_ids = {rel.source_act_id for rel in relations}
                act_ids.update(rel.target_act_id for rel in relations if rel.target_act_id is not None)
                acts = cast(List[ActsSQL], session.query(ActsSQL).filter(ActsSQL.id.in_(act_ids)).all())
                for act in acts:
                    self._ensure_act_node(act)
            for rel in relations:
                try:
                    target_act_id = rel.target_act_id
                    if target_act_id is None:
                        continue
                    # Create ActRelationship model for Neo4j
                    relation_type = self._enum_to_str(rel.relation_type)
                    act_rel = ActRelationship(
                        source_act_id=rel.source_act_id,
                        target_act_id=target_act_id,
                        relation_type=relation_type,
                        effect_date=self._to_datetime(rel.effect_date),
                        description=rel.description,
                        source_subdivision_id=rel.source_subdivision_id,
                        target_subdivision_id=rel.target_subdivision_id,
                    )
                    # Create relationship in Neo4j
                    if self.neo4j_repo.create_act_relationship(act_rel):
                        session.query(ActRelationsSQL).filter(ActRelationsSQL.id == rel.id).update(
                            {"synced_to_neo4j_at": func.now()}
                        )
                        synced_count += 1
                except Exception as e:
                    # Best-effort per relation: log the failure and keep going.
                    logger.error(f"Failed to sync relation {rel.id} to Neo4j: {e}")
                    continue
            session.commit()
        logger.info(f"Successfully synced {synced_count} relations to Neo4j")
        return synced_count
def _ensure_act_node(self, act_sql: ActsSQL) -> None:
"""Ensure the Act node exists in Neo4j (idempotent)."""
if not self.neo4j_repo:
return
try:
act_node = ActNode(
id=act_sql.id,
celex=act_sql.celex,
title=act_sql.title,
act_type=self._enum_to_str(act_sql.act_type),
language=self._enum_to_str(act_sql.language),
adoption_date=self._to_datetime(act_sql.adoption_date),
force_date=self._to_datetime(act_sql.force_date),
end_date=self._to_datetime(act_sql.end_date),
sector=act_sql.sector,
level=act_sql.level,
official_journal_reference=act_sql.official_journal_reference,
eli=act_sql.eli,
url_eurlex=act_sql.url_eurlex,
)
self.neo4j_repo.create_act_node(act_node)
except Exception as e:
logger.warning(f"Failed to ensure Act node for {act_sql.celex}: {e}")