# Source code for lalandre_rag.graph.service
"""
Graph service for RAG.
Combines semantic search with graph traversal for enhanced regulatory context retrieval.
"""
import logging
from typing import Any, Optional, cast
from lalandre_core.config import get_config, get_env_settings
from lalandre_core.utils.api_key_pool import APIKeyPool
from lalandre_db_neo4j import Neo4jRepository
from lalandre_db_qdrant import QdrantRepository
from lalandre_embedding import EmbeddingService
from .neo4j_adapter import Neo4jGraphRAGAdapter, Text2CypherSearchOutput
logger = logging.getLogger(__name__)
[docs]
class GraphRAGService:
    """
    Graph-enhanced retrieval-augmented generation (Graph RAG) service.

    The service pairs two stores:

    1. Qdrant, for semantic search that locates relevant subdivisions.
    2. Neo4j, for act-level graph context that captures relationships
       between acts and gives a view of the regulatory ecosystem.

    Key capabilities:

    - Subdivision-level semantic search expanded with act-level graph data
    - Discovery of regulatory paths between acts
    - Tracking of temporal relationships (amendments, repeals, etc.)
    - Retrieval of the full regulatory context

    Note: graph operations stay at the act level for performance reasons;
    subdivision details are retrieved from Qdrant/PostgreSQL.
    """

    def __init__(
        self,
        neo4j_repo: Neo4jRepository,
        qdrant_repo: QdrantRepository,
        embedding_service: EmbeddingService,
        key_pool: Optional[APIKeyPool] = None,
    ):
        """
        Store the collaborators and try to build the official adapter.

        Args:
            neo4j_repo: Neo4j repository for graph operations
            qdrant_repo: Qdrant repository for semantic search
            embedding_service: Service for generating embeddings
            key_pool: Optional API key pool, forwarded to the adapter
        """
        self.key_pool = key_pool
        self.embedding = embedding_service
        self.qdrant = qdrant_repo
        self.neo4j = neo4j_repo
        # Must run last: the builder reads self.neo4j, self.qdrant and
        # self.key_pool.
        self._official_adapter = self._build_official_adapter()

    def supports_official_text2cypher(self) -> bool:
        """Tell whether the official Neo4j Text2Cypher adapter was built."""
        adapter = self._official_adapter
        return adapter is not None

    def text_to_cypher_search(
        self,
        *,
        question: str,
        max_graph_depth: int,
        row_limit: int,
    ) -> Text2CypherSearchOutput:
        """
        Forward a graph question to the official Text2Cypher adapter.

        Args:
            question: Question passed through to the adapter
            max_graph_depth: Depth limit passed through to the adapter
            row_limit: Row limit passed through to the adapter

        Returns:
            Whatever the adapter's ``text_to_cypher_search`` returns.

        Raises:
            RuntimeError: If no adapter was built for this service.
        """
        adapter = self._official_adapter
        if adapter is None:
            raise RuntimeError("Neo4j GraphRAG adapter is unavailable")
        return adapter.text_to_cypher_search(
            question=question,
            max_graph_depth=max_graph_depth,
            row_limit=row_limit,
        )

    def _build_official_adapter(self) -> Optional[Neo4jGraphRAGAdapter]:
        """
        Create the Neo4j GraphRAG adapter, or return None when unusable.

        None is returned when the Neo4j repository exposes no driver, or
        when the freshly built adapter reports itself as unavailable (an
        info message is logged in that second case).
        """
        driver = cast(Any, self.neo4j.driver)
        if driver is None:
            return None

        generation = get_config().generation
        env_settings = get_env_settings()

        # Normalise the generation settings: a missing model name becomes
        # "", a missing/zero temperature becomes 0.0, and the API key falls
        # back to the environment-level LLM key.
        model_name = str(generation.model_name or "").strip()
        temperature = float(generation.temperature or 0.0)
        max_tokens = int(generation.max_tokens)
        api_key = generation.api_key or env_settings.LLM_API_KEY

        candidate = Neo4jGraphRAGAdapter(
            neo4j_driver=driver,
            neo4j_database=self.neo4j.settings.database,
            qdrant_client=self.qdrant.client,
            qdrant_collection_name=self.qdrant.collection_name,
            llm_provider=generation.provider,
            llm_model=model_name,
            llm_temperature=temperature,
            llm_max_tokens=max_tokens,
            llm_api_key=api_key,
            mistral_api_key=env_settings.MISTRAL_API_KEY,
            llm_base_url=generation.base_url,
            key_pool=self.key_pool,
            read_only_validator=self.neo4j.validate_read_only_cypher,
            row_serializer=self.neo4j.serialize_neo4j_value,
        )
        if candidate.is_available():
            return candidate

        logger.info("Neo4j GraphRAG dependency unavailable; using legacy graph retrieval")
        return None