Source code for extraction_worker.graph.relation_service

"""
Relation graph service.

Orchestrates deterministic relation extraction, persistence, and Neo4j sync.
"""

import logging
import threading
from datetime import date, datetime
from typing import Any, Dict, List, Optional, cast

from lalandre_core.config import get_config
from lalandre_core.linking import ActAliasEntry, LegalEntityLinker
from lalandre_core.utils import normalize_celex
from lalandre_db_neo4j import ActNode, ActRelationship, Neo4jRepository
from lalandre_db_postgres import ActRelationsSQL, ActsSQL, PostgresRepository, SubdivisionsSQL
from lalandre_extraction.relation_extractor import ExtractedRelation, RegulatoryRelationExtractor
from sqlalchemy.orm import Session
from sqlalchemy.sql import func

logger = logging.getLogger(__name__)

EXTRACTION_STATUS_EXTRACTING = "extracting"
EXTRACTION_STATUS_EXTRACTED = "extracted"
EXTRACTION_STATUS_ERROR = "error"


class RelationGraphService:
    """
    Service for extracting and synchronizing legal relationships

    Responsibilities:
    - Extract relations from act text using RelationExtractor
    - Store relations in PostgreSQL
    - Sync relations to Neo4j graph
    - Orchestrate the full workflow

    Does NOT:
    - Parse text (uses RelationExtractor)
    - Access databases directly (uses repositories)
    """

    def __init__(
        self,
        pg_repo: PostgresRepository,
        neo4j_repo: Optional[Neo4jRepository],
        min_confidence: Optional[float],
        max_chunk_size: Optional[int],
    ) -> None:
        """
        Initialize relation graph service

        Args:
            pg_repo: PostgreSQL repository
            neo4j_repo: Neo4j repository (optional, for direct sync)
            min_confidence: Minimum confidence threshold for relations.
                If None, uses config.gateway.job_extract_min_confidence.
            max_chunk_size: Maximum text chunk size for extraction.
                If None, uses config.chunking.extraction_max_chunk_chars.

        Raises:
            ValueError: if neither the argument nor the gateway config supplies
                a minimum confidence value.
        """
        config = get_config()
        # Explicit argument wins; fall back to gateway config.
        resolved_min_confidence = (
            min_confidence if min_confidence is not None else config.gateway.job_extract_min_confidence
        )
        if resolved_min_confidence is None:
            raise ValueError("min_confidence must be provided or configured via gateway.job_extract_min_confidence")
        resolved_max_chunk_size = (
            max_chunk_size if max_chunk_size is not None else config.chunking.extraction_max_chunk_chars
        )
        self.pg_repo = pg_repo
        self.neo4j_repo = neo4j_repo
        self.min_confidence = float(resolved_min_confidence)
        self.extractor: RegulatoryRelationExtractor
        # Entity linker is built lazily (see _ensure_entity_linker) and rebuilt
        # whenever the acts table changes; the lock serializes rebuilds.
        self._entity_linker: Optional[LegalEntityLinker] = None
        self._entity_linker_lock = threading.Lock()
        # Cache keys used to detect changes in the acts table.
        self._entity_linker_act_count: int = 0
        self._entity_linker_max_updated_at: Optional[datetime] = None
        self.max_chunk_size = int(resolved_max_chunk_size)
        self.extractor = RegulatoryRelationExtractor(
            max_chunk_size=self.max_chunk_size,
            validation_enabled=True,
        )

    @staticmethod
    def _set_extraction_status(
        session: Session,
        *,
        act_id: int,
        status: str,
        mark_extracted_at: bool = False,
    ) -> None:
        """Update an act's extraction_status (and optionally extracted_at).

        Issues a bulk UPDATE on the given session; the caller is responsible
        for committing.
        """
        query = session.query(ActsSQL).filter(ActsSQL.id == act_id)
        if mark_extracted_at:
            query.update(
                {
                    ActsSQL.extraction_status: status,
                    ActsSQL.extracted_at: func.now(),
                }
            )
            return
        query.update({ActsSQL.extraction_status: status})

    @staticmethod
    def _enum_to_str(value: Any) -> str:
        """Convert enum-like values to string."""
        # Enum members expose .value; plain values pass through str() unchanged.
        return str(getattr(value, "value", value))

    @staticmethod
    def _to_datetime(value: date | datetime | None) -> datetime | None:
        """Normalize SQL date/datetime values for pydantic models."""
        if value is None:
            return None
        if isinstance(value, datetime):
            return value
        # Plain date: promote to midnight datetime.
        return datetime.combine(value, datetime.min.time())

    def _ensure_entity_linker(self) -> None:
        """Build (or rebuild) the entity linker from the current acts table.

        Uses (row count, max(updated_at)) as a cheap change-detection key:
        if both match the cached values and a linker already exists, this is
        a no-op. Rebuilds are serialized by self._entity_linker_lock.
        """
        with self.pg_repo.get_session() as session:
            current_count, current_updated_at = session.query(
                func.count(ActsSQL.id),
                func.max(ActsSQL.updated_at),
            ).one()
            current_count = int(current_count or 0)
        with self._entity_linker_lock:
            # Rebuild if count/last-update changed or linker not yet created.
            if (
                self._entity_linker is not None
                and current_count == self._entity_linker_act_count
                and current_updated_at == self._entity_linker_max_updated_at
            ):
                return
            if self._entity_linker is not None:
                # NOTE(review): the %s%s pairs render old/new values with no
                # separator (e.g. "count 510" for 5 -> 10); a separator char
                # (likely an arrow) may have been lost upstream — confirm
                # against the original source before relying on this log line.
                logger.info(
                    "Acts changed (count %s%s, updated_at %s%s), rebuilding entity linker",
                    self._entity_linker_act_count,
                    current_count,
                    self._entity_linker_max_updated_at,
                    current_updated_at,
                )
            config = get_config()
            with self.pg_repo.get_session() as session:
                acts = cast(List[ActsSQL], session.query(ActsSQL).all())
            entries: List[ActAliasEntry] = []
            for act in acts:
                aliases = LegalEntityLinker.derive_aliases(
                    title=act.title,
                    eli=act.eli,
                    official_journal_reference=act.official_journal_reference,
                    form_number=act.form_number,
                )
                entries.append(
                    ActAliasEntry(
                        celex=act.celex,
                        title=act.title,
                        aliases=aliases,
                    )
                )
            new_linker = LegalEntityLinker(
                entries,
                fuzzy_threshold=config.extraction.entity_linker_fuzzy_threshold,
                fuzzy_min_gap=config.extraction.entity_linker_fuzzy_min_gap,
                fuzzy_limit=config.extraction.entity_linker_fuzzy_limit,
                min_alias_chars=config.extraction.entity_linker_min_alias_chars,
            )
            # Atomic swap: extractor never sees a None linker mid-extraction.
            self._entity_linker = new_linker
            self._entity_linker_act_count = len(entries)
            self._entity_linker_max_updated_at = current_updated_at
            self.extractor.set_entity_linker(new_linker)
            logger.info(
                "Entity linker initialized: acts=%s aliases=%s",
                len(entries),
                new_linker.alias_count,
            )

    def extract_and_store_for_act(
        self,
        act_id: Optional[int] = None,
        celex: Optional[str] = None,
        sync_to_neo4j: bool = True,
        force: bool = False,
    ) -> Dict[str, Any]:
        """
        Extract relations from a single act and store them

        Args:
            act_id: Act ID (PostgreSQL)
            celex: CELEX identifier (alternative to act_id)
            sync_to_neo4j: Whether to sync to Neo4j after storage
            force: Re-extract even if already extracted

        Returns:
            Dictionary with extraction results

        Raises:
            ValueError: if neither act_id nor celex is provided, or the act
                does not exist.
        """
        # Get act from PostgreSQL
        act_sql: ActsSQL
        full_text: str
        with self.pg_repo.get_session() as session:
            if celex is not None:
                normalized_celex = normalize_celex(celex)
                act_sql_row = cast(
                    Optional[ActsSQL],
                    session.query(ActsSQL).filter(ActsSQL.celex == normalized_celex).first(),
                )
            elif act_id is not None:
                act_sql_row = cast(Optional[ActsSQL], session.query(ActsSQL).filter(ActsSQL.id == act_id).first())
            else:
                raise ValueError("Either act_id or celex must be provided")
            if act_sql_row is None:
                raise ValueError(f"Act not found: {normalize_celex(celex) if celex is not None else act_id}")
            act_sql = act_sql_row
            if sync_to_neo4j:
                # Make sure the source node exists before any relation sync.
                self._ensure_act_node(act_sql)
            # Check if already extracted (skip to avoid wasting API calls)
            extraction_status = act_sql.extraction_status
            if not force and extraction_status == EXTRACTION_STATUS_EXTRACTED:
                logger.info(f"Act {act_sql.celex} already extracted, skipping")
                # Return existing relations count
                existing_count = (
                    session.query(ActRelationsSQL).filter(ActRelationsSQL.source_act_id == act_sql.id).count()
                )
                return {
                    "celex": act_sql.celex,
                    "relations_found": existing_count,
                    "relations_stored": existing_count,
                    "status": "already_extracted",
                }
            # When forcing, delete existing relations first
            if force and extraction_status == EXTRACTION_STATUS_EXTRACTED:
                logger.info(f"Act {act_sql.celex} force re-extraction, deleting existing relations")
                session.query(ActRelationsSQL).filter(ActRelationsSQL.source_act_id == act_sql.id).delete(
                    synchronize_session=False
                )
                session.commit()
            # Mark as extracting
            self._set_extraction_status(
                session,
                act_id=act_sql.id,
                status=EXTRACTION_STATUS_EXTRACTING,
            )
            session.commit()
        # Get full text from subdivisions
        full_text = self._get_act_full_text(act_sql.id)
        if not full_text:
            logger.warning(f"No text content for act {act_sql.celex}")
            # Mark as error
            with self.pg_repo.get_session() as err_session:
                self._set_extraction_status(
                    err_session,
                    act_id=act_sql.id,
                    status=EXTRACTION_STATUS_ERROR,
                )
                err_session.commit()
            return {"celex": act_sql.celex, "relations_found": 0, "relations_stored": 0, "status": "no_content"}
        # Extract relations (single deterministic pipeline)
        logger.info("[EXTRACT] Processing act %s (%s chars)", act_sql.celex, len(full_text))
        self._ensure_entity_linker()
        extractor = self.extractor
        try:
            relations = extractor.extract_relations(
                text=full_text, source_celex=act_sql.celex, min_confidence=self.min_confidence
            )
            # Log only a 5-item preview to keep the log line bounded.
            logger.info(
                "[EXTRACT] Relations raw count=%d preview=%s",
                len(relations),
                [
                    {
                        "type": rel.relation_type.value,
                        "target": rel.target_celex,
                        "conf": round(rel.confidence, 3),
                        "evidence": rel.text_evidence[:120],
                        "method": rel.resolution_method,
                        "score": rel.resolution_score,
                    }
                    for rel in relations[:5]
                ],
            )
        except Exception as e:
            logger.error(f"Failed to extract relations for {act_sql.celex}: {e}")
            # Mark as error
            with self.pg_repo.get_session() as err_session:
                self._set_extraction_status(
                    err_session,
                    act_id=act_sql.id,
                    status=EXTRACTION_STATUS_ERROR,
                )
                err_session.commit()
            return {"celex": act_sql.celex, "relations_found": 0, "relations_stored": 0, "status": "extraction_failed"}
        if not relations:
            logger.info(f"No relations found for act {act_sql.celex}")
            # Mark as extracted even if no relations found
            with self.pg_repo.get_session() as no_rel_session:
                self._set_extraction_status(
                    no_rel_session,
                    act_id=act_sql.id,
                    status=EXTRACTION_STATUS_EXTRACTED,
                    mark_extracted_at=True,
                )
                no_rel_session.commit()
            return {"celex": act_sql.celex, "relations_found": 0, "relations_stored": 0, "status": "no_relations"}
        # Store in PostgreSQL
        store_result = self._store_relations(act_sql, relations)
        stored_count = store_result["stored"]
        logger.info(
            "[STORED] %s relations stored for %s (%s internal, %s external, resolved_existing=%s) sample=%s",
            stored_count,
            act_sql.celex,
            store_result["internal"],
            store_result["external"],
            store_result.get("resolved_existing", 0),
            [
                {
                    "type": rel.relation_type.value,
                    "target": rel.target_celex,
                    "conf": round(rel.confidence, 3),
                    "method": rel.resolution_method,
                    "score": rel.resolution_score,
                }
                for rel in relations[:5]
            ],
        )
        # Sync to Neo4j if requested — always run to resolve relations
        # from previously extracted acts that now target this act.
        synced_count = 0
        if sync_to_neo4j:
            synced_count = self._sync_to_neo4j()
        return {
            "celex": act_sql.celex,
            "relations_found": len(relations),
            "relations_stored": stored_count,
            "relations_internal": store_result["internal"],
            "relations_external": store_result["external"],
            "relations_resolved_existing": store_result.get("resolved_existing", 0),
            "relations_synced": synced_count,
            "status": "success",
        }

    def _get_act_full_text(self, act_id: int) -> str:
        """Get concatenated text from all subdivisions"""
        with self.pg_repo.get_session() as session:
            subdivisions = cast(
                List[SubdivisionsSQL],
                session.query(SubdivisionsSQL)
                .filter(SubdivisionsSQL.act_id == act_id)
                .order_by(SubdivisionsSQL.sequence_order)
                .all(),
            )
            # Title and content are joined per subdivision; empty fields
            # collapse to blank strings rather than "None".
            return "\n\n".join([f"{subdiv.title or ''}\n{subdiv.content or ''}" for subdiv in subdivisions])

    def _store_relations(self, act_sql: ActsSQL, relations: List[ExtractedRelation]) -> Dict[str, int]:
        """
        Store ALL relations in PostgreSQL (even if target act doesn't exist)

        Philosophy:
        - PostgreSQL: Store ALL relations (external references allowed)
        - Neo4j: Only sync relations where both nodes exist (handled separately)

        Returns:
            Counters dict: stored / internal / external / resolved_existing / skipped.
        """
        stored_count = 0
        skipped_duplicate = 0
        skipped_self_ref = 0
        skipped_invalid = 0
        external_refs_count = 0
        resolved_existing_count = 0
        with self.pg_repo.get_session() as session:
            for rel in relations:
                if not self._is_relation_storable(rel):
                    skipped_invalid += 1
                    continue
                # Normalize target CELEX to match database format
                normalized_celex = normalize_celex(rel.target_celex)
                # Try to find target act (but don't fail if not found)
                target_act = cast(
                    Optional[ActsSQL], session.query(ActsSQL).filter(ActsSQL.celex == normalized_celex).first()
                )
                # Determine if this is an external reference
                is_external = target_act is None
                # Skip self-referential relations (act pointing to itself)
                if target_act is not None and act_sql.id == target_act.id:
                    logger.debug(
                        f"Skipping self-referential relation: {act_sql.celex} {rel.relation_type.value} itself"
                    )
                    skipped_self_ref += 1
                    continue
                # Check if already exists (using target_celex instead of target_act_id)
                existing = (
                    session.query(ActRelationsSQL)
                    .filter(
                        ActRelationsSQL.source_act_id == act_sql.id,
                        ActRelationsSQL.target_celex == normalized_celex,
                        ActRelationsSQL.relation_type == rel.relation_type,
                    )
                    .first()
                )
                if existing:
                    existing_rel = cast(ActRelationsSQL, existing)
                    # If the duplicate was previously external and the target
                    # act now exists, resolve it in place and clear the Neo4j
                    # sync marker so it is re-synced.
                    if target_act is not None and (
                        existing_rel.target_act_id is None or not bool(existing_rel.is_resolved)
                    ):
                        existing_row = cast(Any, existing_rel)
                        existing_row.target_act_id = target_act.id
                        existing_row.is_resolved = True
                        existing_row.synced_to_neo4j_at = None
                        resolved_existing_count += 1
                        # NOTE(review): no separator between relation type and
                        # CELEX here and below — possibly lost in extraction;
                        # confirm against the original source.
                        logger.info(
                            f"Resolved existing relation: "
                            f"{rel.relation_type.value}{normalized_celex} "
                            f"(target_act_id={target_act.id})"
                        )
                        continue
                    logger.debug(f"Relation already exists: {rel.relation_type.value}{normalized_celex}")
                    skipped_duplicate += 1
                    continue
                # Create new relation - ALWAYS store, even if target doesn't exist
                # Preserve both rationale (description) and text evidence when available.
                act_relation = ActRelationsSQL(
                    source_act_id=act_sql.id,
                    target_act_id=target_act.id if target_act is not None else None,  # NULL if external
                    target_celex=normalized_celex,  # Always store CELEX
                    relation_type=rel.relation_type,
                    description=rel.relation_description or rel.text_evidence or None,
                    evidence=rel.text_evidence or None,
                    rationale=rel.relation_description or None,
                    resolution_method=rel.resolution_method,
                    resolution_score=rel.resolution_score,
                    target_reference=rel.raw_target_reference,
                    confidence=rel.confidence,
                    source=rel.extraction_method,
                    validated=False,  # Not yet manually validated
                    is_resolved=not is_external,  # True if target exists, False otherwise
                )
                session.add(act_relation)
                stored_count += 1
                if is_external:
                    external_refs_count += 1
                    logger.debug(
                        f"Stored external ref: {rel.relation_type.value}{normalized_celex} "
                        f"(confidence: {rel.confidence:.2f})"
                    )
                else:
                    logger.info(
                        f"Stored: {rel.relation_type.value}{normalized_celex} (confidence: {rel.confidence:.2f})"
                    )
            session.commit()
            # Mark act as extracted
            self._set_extraction_status(
                session,
                act_id=act_sql.id,
                status=EXTRACTION_STATUS_EXTRACTED,
                mark_extracted_at=True,
            )
            session.commit()
        # Report statistics
        total_skipped = skipped_duplicate + skipped_self_ref + skipped_invalid
        internal_count = stored_count - external_refs_count + resolved_existing_count
        logger.info(
            f"Stored {stored_count} relations for {act_sql.celex}: "
            f"{internal_count} internal, {external_refs_count} external, "
            f"{resolved_existing_count} resolved-existing"
        )
        if total_skipped > 0:
            skip_reasons: List[str] = []
            if skipped_duplicate > 0:
                skip_reasons.append(f"{skipped_duplicate} duplicates")
            if skipped_self_ref > 0:
                skip_reasons.append(f"{skipped_self_ref} self-referential")
            if skipped_invalid > 0:
                skip_reasons.append(f"{skipped_invalid} invalid")
            logger.info(f"Skipped {total_skipped} relations: {', '.join(skip_reasons)}")
        return {
            "stored": stored_count,
            "internal": internal_count,
            "external": external_refs_count,
            "resolved_existing": resolved_existing_count,
            "skipped": total_skipped,
        }

    @staticmethod
    def _is_relation_storable(rel: ExtractedRelation) -> bool:
        """Reject relations with an empty target CELEX or out-of-range confidence."""
        target = normalize_celex(rel.target_celex).strip()
        if not target:
            return False
        if rel.confidence < 0.0 or rel.confidence > 1.0:
            return False
        return True

    def _sync_to_neo4j(self) -> int:
        """Sync all relations from PostgreSQL to Neo4j"""
        if not self.neo4j_repo:
            logger.warning("No Neo4j repository configured, skipping sync")
            return 0
        # Direct sync - sync all internal (resolved) relations
        logger.info("Running Neo4j sync (resolve pending + sync unsynced)")
        synced_count = 0
        with self.pg_repo.get_session() as session:
            # Pass 1: try to resolve previously external relations whose
            # target act has since been imported.
            unresolved_relations = cast(
                List[ActRelationsSQL],
                session.query(ActRelationsSQL)
                .filter(
                    ActRelationsSQL.is_resolved.is_(False),
                    ActRelationsSQL.target_act_id.is_(None),
                    ActRelationsSQL.target_celex.isnot(None),
                )
                .all(),
            )
            resolved_now = 0
            for relation in unresolved_relations:
                target_celex = relation.target_celex
                if not target_celex:
                    continue
                target_act = cast(
                    Optional[ActsSQL], session.query(ActsSQL).filter(ActsSQL.celex == target_celex).first()
                )
                if target_act is None:
                    continue
                session.query(ActRelationsSQL).filter(ActRelationsSQL.id == relation.id).update(
                    {"target_act_id": target_act.id, "is_resolved": True, "synced_to_neo4j_at": None}
                )
                resolved_now += 1
            if resolved_now > 0:
                session.commit()
                logger.info(f"Resolved {resolved_now} previously external relations before Neo4j sync")
            # Get all internal relations (where both source and target acts exist)
            relations = cast(
                List[ActRelationsSQL],
                session.query(ActRelationsSQL)
                .filter(
                    ActRelationsSQL.is_resolved.is_(True),
                    ActRelationsSQL.target_act_id.isnot(None),
                    ActRelationsSQL.synced_to_neo4j_at.is_(None),
                )
                .all(),
            )
            logger.info(f"Found {len(relations)} internal relations to sync to Neo4j")
            if relations:
                # Ensure every endpoint node exists before creating edges.
                act_ids = {rel.source_act_id for rel in relations}
                act_ids.update(rel.target_act_id for rel in relations if rel.target_act_id is not None)
                acts = cast(List[ActsSQL], session.query(ActsSQL).filter(ActsSQL.id.in_(act_ids)).all())
                for act in acts:
                    self._ensure_act_node(act)
            for rel in relations:
                try:
                    target_act_id = rel.target_act_id
                    if target_act_id is None:
                        continue
                    # Create ActRelationship model for Neo4j
                    relation_type = self._enum_to_str(rel.relation_type)
                    act_rel = ActRelationship(
                        source_act_id=rel.source_act_id,
                        target_act_id=target_act_id,
                        relation_type=relation_type,
                        effect_date=self._to_datetime(rel.effect_date),
                        description=rel.description,
                        source_subdivision_id=rel.source_subdivision_id,
                        target_subdivision_id=rel.target_subdivision_id,
                    )
                    # Create relationship in Neo4j; only mark as synced on success.
                    if self.neo4j_repo.create_act_relationship(act_rel):
                        session.query(ActRelationsSQL).filter(ActRelationsSQL.id == rel.id).update(
                            {"synced_to_neo4j_at": func.now()}
                        )
                        synced_count += 1
                except Exception as e:
                    # Best-effort per relation: log and keep syncing the rest.
                    logger.error(f"Failed to sync relation {rel.id} to Neo4j: {e}")
                    continue
            session.commit()
        logger.info(f"Successfully synced {synced_count} relations to Neo4j")
        return synced_count

    def _ensure_act_node(self, act_sql: ActsSQL) -> None:
        """Ensure the Act node exists in Neo4j (idempotent)."""
        if not self.neo4j_repo:
            return
        try:
            act_node = ActNode(
                id=act_sql.id,
                celex=act_sql.celex,
                title=act_sql.title,
                act_type=self._enum_to_str(act_sql.act_type),
                language=self._enum_to_str(act_sql.language),
                adoption_date=self._to_datetime(act_sql.adoption_date),
                force_date=self._to_datetime(act_sql.force_date),
                end_date=self._to_datetime(act_sql.end_date),
                sector=act_sql.sector,
                level=act_sql.level,
                official_journal_reference=act_sql.official_journal_reference,
                eli=act_sql.eli,
                url_eurlex=act_sql.url_eurlex,
            )
            self.neo4j_repo.create_act_node(act_node)
        except Exception as e:
            # Node creation is best-effort; a failure here must not abort
            # the surrounding extraction/sync workflow.
            logger.warning(f"Failed to ensure Act node for {act_sql.celex}: {e}")