Services API¶
Note
This page is generated automatically from the repository’s maintained Python module inventory.
Maintained FastAPI services and worker entrypoints that compose the production Python runtime.
api_gateway¶
Source: services/api-gateway/__init__.py
API gateway package for authenticated public endpoints.
api_gateway.auth¶
Source: services/api-gateway/auth.py
JWT authentication middleware for the API Gateway. Validates Bearer tokens against Keycloak JWKS.
api_gateway.bootstrap¶
Source: services/api-gateway/bootstrap.py
Bootstrap service: schedules data population jobs via Redis.
- async api_gateway.bootstrap.bootstrap_system(redis_client)[source]¶
Orchestrates the data population pipeline. Only chunk_all is enqueued; workers chain chunk_act -> embed_act[preset*] + extract_act. Returns True if jobs were scheduled, False if locked or failed.
- Parameters:
redis_client (Any)
- Return type:
bool
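The lock-guarded scheduling that bootstrap_system performs can be sketched as below. This is a minimal illustration of the pattern, not the service's code: FakeRedis, the key name bootstrap:lock, and the queue name queue:chunking are hypothetical stand-ins.

```python
import json
import time
import uuid

class FakeRedis:
    """In-memory stand-in for the real Redis client (illustration only)."""
    def __init__(self):
        self.store = {}
        self.queues = {}
    def set(self, key, value, nx=False, ex=None):
        if nx and key in self.store:
            return None          # lock already held by another caller
        self.store[key] = value
        return True
    def lpush(self, queue, payload):
        self.queues.setdefault(queue, []).insert(0, payload)

def bootstrap_system(redis_client) -> bool:
    """Enqueue a single chunk_all job unless another bootstrap holds the lock."""
    # SET NX acts as a distributed lock: only the first caller schedules jobs.
    if not redis_client.set("bootstrap:lock", "1", nx=True, ex=600):
        return False
    job = {"job_id": str(uuid.uuid4()), "type": "chunk_all",
           "created_at": time.time()}
    redis_client.lpush("queue:chunking", json.dumps(job))
    return True

r = FakeRedis()
print(bootstrap_system(r))  # True: first caller wins the lock
print(bootstrap_system(r))  # False: lock already held
```

Because the lock is taken with NX and an expiry, a crashed bootstrap cannot block scheduling forever.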
api_gateway.deps¶
Source: services/api-gateway/deps.py
Shared FastAPI dependencies for api-gateway.
api_gateway.job_service¶
Source: services/api-gateway/job_service.py
Shared job helpers — Redis job submission and status retrieval.
- class api_gateway.job_service.JobResponse(*, job_id, status, message, submitted_at, job_ids=None, presets=None)[source]¶
Bases: BaseModel
Generic job submission response.
Create a new model by parsing and validating input data from keyword arguments; raises pydantic_core.ValidationError if the input data cannot be validated into a valid model. self is explicitly positional-only to allow self as a field name. (This constructor behaviour is inherited from pydantic.BaseModel and is shared by every model documented on this page.)
- Parameters:
job_id (str)
status (str)
message (str)
submitted_at (str)
job_ids (list[str] | None)
presets (list[str] | None)
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model; should be a dictionary conforming to pydantic.ConfigDict. (The same empty default applies to every model on this page.)
- class api_gateway.job_service.JobStatus(*, job_id, status, type, created_at, progress=None, result=None, error=None)[source]¶
Bases: BaseModel
Job status details.
- Parameters:
job_id (str)
status (str)
type (str)
created_at (float)
progress (Dict[str, Any] | None)
result (Dict[str, Any] | None)
error (str | None)
- model_config: ClassVar[ConfigDict] = {}¶
- api_gateway.job_service.require_valid_celex(celex)[source]¶
Normalize and validate CELEX format; raise HTTP 400 if unrecognisable.
- Parameters:
celex (str)
- Return type:
str
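The normalise-then-validate contract of require_valid_celex can be sketched as follows. The regex is an approximate CELEX shape (sector digit, 4-digit year, document-type letters, number) assumed for illustration; the real helper's rules may differ, and it raises HTTP 400 rather than ValueError.

```python
import re

# Approximate CELEX shape, e.g. 32016R0679 (GDPR). Illustrative only.
_CELEX_RE = re.compile(r"[0-9][0-9]{4}[A-Z]{1,2}[0-9]{2,4}")

def require_valid_celex(celex: str) -> str:
    """Normalize a CELEX identifier and reject unrecognisable input."""
    normalized = celex.strip().upper()
    if not _CELEX_RE.fullmatch(normalized):
        # The gateway helper raises HTTPException(400) here instead.
        raise ValueError(f"Unrecognisable CELEX identifier: {celex!r}")
    return normalized

print(require_valid_celex(" 32016r0679 "))  # 32016R0679
```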
api_gateway.main¶
Source: services/api-gateway/main.py
API Gateway: entry point for all API requests.
- async api_gateway.main.get_redis(app)[source]¶
Dependency to get the active Redis client. Initializes it if it doesn’t exist (Lazy Loading pattern).
- Parameters:
app (FastAPI)
- Return type:
Redis
- api_gateway.main.lifespan(app)[source]¶
Manages application startup and shutdown events. 1. Connects to Redis. 2. Optionally triggers the bootstrap sequence. 3. Cleans up connections on shutdown.
- Parameters:
app (FastAPI)
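The startup/shutdown sequence that lifespan manages follows FastAPI's async-context-manager lifespan pattern, which can be sketched with stdlib pieces only (FakeApp and the stubbed Redis connection are stand-ins, not the gateway's actual objects):

```python
import asyncio
from contextlib import asynccontextmanager

class FakeApp:
    """Stand-in for fastapi.FastAPI; only the .state attribute is mimicked."""
    class state:
        redis = None

events = []

@asynccontextmanager
async def lifespan(app):
    app.state.redis = "connected"    # 1. connect to Redis (stubbed here)
    events.append("startup")
    try:
        yield                        # the application serves requests here
    finally:
        app.state.redis = None       # 3. clean up connections on shutdown
        events.append("shutdown")

async def main():
    app = FakeApp()
    async with lifespan(app):
        events.append("serving")

asyncio.run(main())
print(events)  # ['startup', 'serving', 'shutdown']
```

The try/finally guarantees cleanup runs even if the serving phase raises.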
- async api_gateway.main.metrics()[source]¶
Expose Prometheus metrics after refreshing backend health probes.
api_gateway.rate_limit¶
Source: services/api-gateway/rate_limit.py
Rate limiting for the API Gateway using slowapi + Redis.
api_gateway.routers¶
Source: services/api-gateway/routers/__init__.py
API Gateway routers: modular route handlers organized by service domain.
api_gateway.routers.admin_pipeline¶
Source: services/api-gateway/routers/admin_pipeline.py
Admin endpoints for pipeline reset and rebuild orchestration.
- class api_gateway.routers.admin_pipeline.PipelineRunningJob(*, job_id, job_type, status)[source]¶
Bases: BaseModel
Running pipeline job returned by the admin inspection endpoint.
- Parameters:
job_id (str)
job_type (str)
status (str)
- model_config: ClassVar[ConfigDict] = {}¶
- class api_gateway.routers.admin_pipeline.PipelineFullResetResponse(*, job_id, submitted_at, presets, queues_cleared, deleted)[source]¶
Bases: BaseModel
Response payload for the full pipeline reset endpoint.
- Parameters:
job_id (str)
submitted_at (str)
presets (list[str])
queues_cleared (Dict[str, int])
deleted (Dict[str, Dict[str, int]])
- model_config: ClassVar[ConfigDict] = {}¶
api_gateway.routers.config¶
Source: services/api-gateway/routers/config.py
Config router: exposes read-only system configuration (active models, service info).
api_gateway.routers.conversations¶
Source: services/api-gateway/routers/conversations.py
Conversation history proxy endpoints — list / messages / delete.
- async api_gateway.routers.conversations.list_conversations(request)[source]¶
List conversations for the authenticated user.
- Parameters:
request (Request)
api_gateway.routers.embedding¶
Source: services/api-gateway/routers/embedding.py
Embedding preset metadata and deprecated switch endpoint.
- class api_gateway.routers.embedding.EmbeddingPresetItem(*, preset_id, provider, model_name, device, label, enabled, indexing_enabled, queue_name, vector_size)[source]¶
Bases: BaseModel
Description of one configured embedding preset.
- Parameters:
preset_id (str)
provider (str)
model_name (str)
device (str)
label (str)
enabled (bool)
indexing_enabled (bool)
queue_name (str)
vector_size (int)
- model_config: ClassVar[ConfigDict] = {}¶
- class api_gateway.routers.embedding.EmbeddingPresetsResponse(*, presets, default_preset)[source]¶
Bases: BaseModel
List of query-enabled embedding presets and the default selection.
- Parameters:
presets (Dict[str, EmbeddingPresetItem])
default_preset (str)
- model_config: ClassVar[ConfigDict] = {}¶
- class api_gateway.routers.embedding.EmbeddingSwitchRequest(*, preset)[source]¶
Bases: BaseModel
Request payload selecting a new active embedding preset.
- Parameters:
preset (str)
- model_config: ClassVar[ConfigDict] = {}¶
- async api_gateway.routers.embedding.list_presets()[source]¶
Return query-enabled embedding presets from the central application config.
- Return type:
- async api_gateway.routers.embedding.switch_embedding(_=None)[source]¶
The global embedding switch has been removed in favor of per-preset routing.
- Parameters:
_ (EmbeddingSwitchRequest | None)
- Return type:
None
api_gateway.routers.health¶
Source: services/api-gateway/routers/health.py
Health check router: monitors service availability and system health.
api_gateway.routers.jobs¶
Source: services/api-gateway/routers/jobs.py
Job management router: handles async job submission and status tracking for the chunk, embed, and extraction workers.
- async api_gateway.routers.jobs.chunk_all(request, min_content_length=JOB_CHUNK_MIN_CONTENT_LENGTH, redis_conn=Depends(_get_redis_connection))[source]¶
Triggers chunking for all downloaded acts.
- Parameters:
request (Request)
min_content_length (int)
redis_conn (Any)
- async api_gateway.routers.jobs.chunk_act(request, celex, force=False, redis_conn=Depends(_get_redis_connection))[source]¶
Triggers chunking for a specific act.
- Parameters:
request (Request)
celex (str)
force (bool)
redis_conn (Any)
- async api_gateway.routers.jobs.get_chunk_status(job_id, redis_conn=Depends(_get_redis_connection))[source]¶
Checks progress of a chunk job.
- Parameters:
job_id (str)
redis_conn (Any)
- async api_gateway.routers.jobs.embed_all(request, batch_size=JOB_EMBED_BATCH_SIZE, redis_conn=Depends(_get_redis_connection))[source]¶
Triggers embedding for all indexing-enabled presets.
- Parameters:
request (Request)
batch_size (int)
redis_conn (Any)
- async api_gateway.routers.jobs.embed_act(request, celex, batch_size=JOB_EMBED_BATCH_SIZE, redis_conn=Depends(_get_redis_connection))[source]¶
Triggers chunk and act embeddings for a specific act on all indexing-enabled presets.
- Parameters:
request (Request)
celex (str)
batch_size (int)
redis_conn (Any)
- async api_gateway.routers.jobs.get_embed_status(job_id, redis_conn=Depends(_get_redis_connection))[source]¶
Checks progress of an embedding job.
- Parameters:
job_id (str)
redis_conn (Any)
- async api_gateway.routers.jobs.extract_all(request, min_confidence=JOB_EXTRACT_MIN_CONFIDENCE, skip_existing=JOB_EXTRACT_SKIP_EXISTING_DEFAULT, redis_conn=Depends(_get_redis_connection))[source]¶
Triggers relation extraction for all processed chunks.
- Parameters:
request (Request)
min_confidence (float)
skip_existing (bool)
redis_conn (Any)
- async api_gateway.routers.jobs.extract_act(request, celex, min_confidence=JOB_EXTRACT_MIN_CONFIDENCE, force=False, redis_conn=Depends(_get_redis_connection))[source]¶
Triggers relation extraction for a specific act.
- Parameters:
request (Request)
celex (str)
min_confidence (float)
force (bool)
redis_conn (Any)
api_gateway.routers.rag_proxy¶
Source: services/api-gateway/routers/rag_proxy.py
RAG proxy router — /query, /query/stream, /search.
- class api_gateway.routers.rag_proxy.QueryRequest(*, question, conversation_id=None, mode=None, top_k=None, min_score=None, sources=False, graph_depth=None, use_graph=None, graph_use_cypher=False, graph_cypher_timeout_seconds=None, graph_cypher_max_rows=None, graph_generation_timeout_seconds=None, graph_retrieval_timeout_seconds=None, granularity=None, embedding_preset=None, include_full_content=False, celex=None, compare_celex=None)[source]¶
Bases: BaseModel
RAG query request.
All retrieval parameters are optional — the rag-service resolves omitted values from app_config.yaml (single source of truth).
- Parameters:
question (str)
conversation_id (str | None)
mode (str | None)
top_k (int | None)
min_score (float | None)
sources (bool)
graph_depth (int | None)
use_graph (bool | None)
graph_use_cypher (bool)
graph_cypher_timeout_seconds (float | None)
graph_cypher_max_rows (int | None)
graph_generation_timeout_seconds (float | None)
graph_retrieval_timeout_seconds (float | None)
granularity (str | None)
embedding_preset (str | None)
include_full_content (bool)
celex (str | None)
compare_celex (str | None)
- model_config: ClassVar[ConfigDict] = {}¶
- async api_gateway.routers.rag_proxy.query(request, query_request)[source]¶
Proxies user queries to the RAG Service.
- Parameters:
request (Request)
query_request (QueryRequest)
- async api_gateway.routers.rag_proxy.query_stream(request, query_request)[source]¶
Proxies streaming (SSE) queries to the RAG Service.
- Parameters:
request (Request)
query_request (QueryRequest)
- async api_gateway.routers.rag_proxy.search(request, search_request)[source]¶
Proxies search requests to the RAG Service.
- Parameters:
request (Request)
search_request (SearchRequest)
api_gateway.service_metrics¶
Source: services/api-gateway/service_metrics.py
Prometheus helpers for api-gateway runtime metrics.
- api_gateway.service_metrics.observe_http_request(*, path, method, status_code, duration_seconds)[source]¶
Record one HTTP request handled by the API gateway.
- Parameters:
path (str)
method (str)
status_code (int)
duration_seconds (float)
- Return type:
None
- api_gateway.service_metrics.observe_query_request(*, mode, granularity, top_k, duration_seconds, outcome)[source]¶
Record one proxied /query request.
- Parameters:
mode (str | None)
granularity (str | None)
top_k (int)
duration_seconds (float)
outcome (str)
- Return type:
None
- api_gateway.service_metrics.observe_search_request(*, mode, granularity, top_k, duration_seconds, outcome)[source]¶
Record one proxied /search request.
- Parameters:
mode (str | None)
granularity (str | None)
top_k (int)
duration_seconds (float)
outcome (str)
- Return type:
None
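The observe_* helpers above all record a labelled count and latency per request. A stdlib stand-in for the Prometheus counter/histogram pair (label names and metric shapes are assumptions, not the service's actual metric definitions) looks like:

```python
from collections import defaultdict

class HttpMetrics:
    """Stdlib stand-in for a Prometheus counter + histogram pair."""
    def __init__(self):
        self.request_count = defaultdict(int)
        self.latency_sum = defaultdict(float)

    def observe_http_request(self, *, path, method, status_code,
                             duration_seconds):
        label = (path, method, status_code)   # Prometheus-style label set
        self.request_count[label] += 1
        self.latency_sum[label] += duration_seconds

m = HttpMetrics()
m.observe_http_request(path="/query", method="POST", status_code=200,
                       duration_seconds=0.12)
m.observe_http_request(path="/query", method="POST", status_code=200,
                       duration_seconds=0.08)
print(m.request_count[("/query", "POST", 200)])  # 2
```

Keyword-only parameters (the bare `*` in the signature) mirror the documented helpers and prevent label values being passed in the wrong order.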
chunking_worker¶
Source: services/chunking-worker/__init__.py
Chunking worker package.
chunking_worker.bootstrap¶
Source: services/chunking-worker/bootstrap.py
Bootstrap for chunking worker
- class chunking_worker.bootstrap.ChunkingComponents(pg_repo, chunk_vector_repos, act_vector_repos, chunker)[source]¶
Bases: object
Long-lived dependencies used by the chunking worker.
- Parameters:
pg_repo (PostgresRepository)
chunk_vector_repos (dict[str, QdrantRepository])
act_vector_repos (dict[str, QdrantRepository])
chunker (Any)
chunking_worker.service_metrics¶
Source: services/chunking-worker/service_metrics.py
Prometheus metrics helpers for chunking-worker.
chunking_worker.worker¶
Source: services/chunking-worker/worker.py
Chunking worker (Lalandre): processes chunking jobs from the Redis queue.
- class chunking_worker.worker.WorkerRuntime(redis_client, job_ttl_seconds, chunk_db_commit_batch_size, brpop_timeout_seconds, components=None)[source]¶
Bases: QueueRuntime
Runtime state and lazy dependencies for the chunking worker loop.
- Parameters:
redis_client (Any)
job_ttl_seconds (int)
chunk_db_commit_batch_size (int)
brpop_timeout_seconds (int)
components (ChunkingComponents | None)
- chunking_worker.worker.build_runtime()[source]¶
Build the queue runtime used by the chunking worker.
- Return type:
- chunking_worker.worker.process_chunk_all(runtime, job_id, params)[source]¶
Dispatch chunk_act jobs for all acts (event-driven pipeline).
- Parameters:
runtime (WorkerRuntime)
job_id (str)
params (dict[str, Any])
- Return type:
None
- chunking_worker.worker.process_chunk_act(runtime, job_id, params)[source]¶
Process a chunking job for a single act.
- Parameters:
runtime (WorkerRuntime)
job_id (str)
params (dict[str, Any])
- Return type:
None
- chunking_worker.worker.process_job(runtime, job_data)[source]¶
Dispatch one chunking-related job through the instrumented worker loop.
- Parameters:
runtime (WorkerRuntime)
job_data (dict[str, Any])
- Return type:
None
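The worker loop pattern shared by process_job and its siblings — pop a JSON job off a Redis list with BRPOP, then dispatch by job type — can be sketched as follows. FakeRedis, the handler table, and the queue name are illustrative stand-ins for the real instrumented loop:

```python
import json

class FakeRedis:
    """Stand-in exposing the BRPOP shape the worker loop relies on."""
    def __init__(self, jobs):
        self._items = [json.dumps(j) for j in jobs]
    def brpop(self, queue, timeout):
        return (queue, self._items.pop()) if self._items else None

HANDLERS = {}

def process_chunk_act(job_id, params):
    return f"chunked {params['celex']}"

HANDLERS["chunk_act"] = process_chunk_act

def process_job(job_data):
    """Dispatch one job to its registered handler by job type."""
    handler = HANDLERS.get(job_data["type"])
    if handler is None:
        raise ValueError(f"unknown job type: {job_data['type']}")
    return handler(job_data["job_id"], job_data.get("params", {}))

queue = FakeRedis([{"job_id": "j1", "type": "chunk_act",
                    "params": {"celex": "32016R0679"}}])
item = queue.brpop("queue:chunking", timeout=5)
print(process_job(json.loads(item[1])))  # chunked 32016R0679
```

BRPOP blocks up to the timeout and returns None when the queue stays empty, which is how the real loop idles between jobs.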
embedding_service¶
Source: services/embedding-service/__init__.py
Embedding service package entrypoint for documentation and local tooling.
embedding_service.main¶
Source: services/embedding-service/main.py
Embedding service: generates embeddings with support for multiple providers (Mistral, OpenAI, Local).
- embedding_service.main.lifespan(app)[source]¶
Initialize embedding service on startup
- Parameters:
app (FastAPI)
- embedding_service.main.get_embedding_service(request)[source]¶
Get the embedding service instance
- Parameters:
request (Request)
- Return type:
- class embedding_service.main.EmbedRequest(*, texts, use_cache=True)[source]¶
Bases: BaseModel
Request model for embedding.
- Parameters:
texts (List[str])
use_cache (bool)
- model_config: ClassVar[ConfigDict] = {}¶
- class embedding_service.main.EmbedResponse(*, embeddings, model, provider, vector_dimension)[source]¶
Bases: BaseModel
Response model for embedding.
- Parameters:
embeddings (List[List[float]])
model (str)
provider (str)
vector_dimension (int)
- model_config: ClassVar[ConfigDict] = {}¶
- class embedding_service.main.EmbedSingleResponse(*, embedding, model, provider, vector_dimension)[source]¶
Bases: BaseModel
Response model for single embedding.
- Parameters:
embedding (List[float])
model (str)
provider (str)
vector_dimension (int)
- model_config: ClassVar[ConfigDict] = {}¶
- class embedding_service.main.HealthResponse(*, status, service, provider, model, device, vector_dimension)[source]¶
Bases: BaseModel
Response model for health check.
- Parameters:
status (str)
service (str)
provider (str | None)
model (str | None)
device (str | None)
vector_dimension (int)
- model_config: ClassVar[ConfigDict] = {}¶
- class embedding_service.main.ServiceInfoResponse(*, provider, model, device, vector_dimension, batch_size, normalize_embeddings, cache_enabled=None, cache_max_size=None, current_cache_size=None, num_api_keys=None)[source]¶
Bases: BaseModel
Response model for service metadata.
- Parameters:
provider (str | None)
model (str | None)
device (str | None)
vector_dimension (int)
batch_size (int | None)
normalize_embeddings (bool)
cache_enabled (bool | None)
cache_max_size (int | None)
current_cache_size (int | None)
num_api_keys (int | None)
- model_config: ClassVar[ConfigDict] = {}¶
- async embedding_service.main.health_check(request)[source]¶
Health check
- Parameters:
request (Request)
- Return type:
- async embedding_service.main.embed_texts(request, http_request)[source]¶
Generate embeddings for texts using configured provider
Supports:
- Mistral AI (with Redis cache)
- OpenAI
- Local models (with in-memory LRU cache)
- Parameters:
request (EmbedRequest)
http_request (Request)
- Return type:
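The in-memory LRU cache mentioned for the local-provider path can be sketched with OrderedDict; the key scheme (hash of the input text) and the eviction policy are assumptions for illustration, not the service's actual implementation:

```python
import hashlib
from collections import OrderedDict

class EmbeddingCache:
    """In-memory LRU cache keyed by a hash of the input text."""
    def __init__(self, max_size=2):
        self.max_size = max_size
        self._data = OrderedDict()

    @staticmethod
    def _key(text):
        return hashlib.sha256(text.encode("utf-8")).hexdigest()

    def get(self, text):
        key = self._key(text)
        if key not in self._data:
            return None
        self._data.move_to_end(key)         # mark as most recently used
        return self._data[key]

    def put(self, text, vector):
        key = self._key(text)
        self._data[key] = vector
        self._data.move_to_end(key)
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict least recently used

cache = EmbeddingCache(max_size=2)
cache.put("alpha", [0.1, 0.2])
cache.put("beta", [0.3, 0.4])
cache.get("alpha")                  # touch "alpha" so "beta" is evicted next
cache.put("gamma", [0.5, 0.6])
print(cache.get("beta"))   # None: evicted
print(cache.get("alpha"))  # [0.1, 0.2]
```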
- async embedding_service.main.embed_single(text, http_request)[source]¶
Embed a single text (convenience endpoint)
- Parameters:
text (str)
http_request (Request)
- Return type:
- async embedding_service.main.service_info(request)[source]¶
Get embedding service configuration info
- Parameters:
request (Request)
- Return type:
embedding_service.service_metrics¶
Source: services/embedding-service/service_metrics.py
Prometheus metrics helpers for embedding-service.
- embedding_service.service_metrics.observe_http_request(*, path, method, status_code, duration_seconds)[source]¶
Record one HTTP request handled by the embedding service.
- Parameters:
path (str)
method (str)
status_code (int)
duration_seconds (float)
- Return type:
None
embedding_worker¶
Source: services/embedding-worker/__init__.py
Embedding worker package.
embedding_worker.bootstrap¶
Source: services/embedding-worker/bootstrap.py
Bootstrap for embedding worker
- class embedding_worker.bootstrap.EmbeddingComponents(pg_repo, qdrant_chunks, qdrant_acts, embedding_service, payload_builder, preset_id)[source]¶
Bases: object
Long-lived dependencies used by the embedding worker.
- Parameters:
pg_repo (PostgresRepository)
qdrant_chunks (QdrantRepository)
qdrant_acts (QdrantRepository)
embedding_service (EmbeddingService)
payload_builder (PayloadBuilder)
preset_id (str)
embedding_worker.service_metrics¶
Source: services/embedding-worker/service_metrics.py
Prometheus metrics helpers for embedding-worker.
embedding_worker.worker¶
Source: services/embedding-worker/worker.py
Embedding worker: processes embedding jobs from the Redis queue, creating vector embeddings for chunks and acts and storing them in Qdrant.
- class embedding_worker.worker.WorkerRuntime(redis_client, job_ttl_seconds, embed_worker_max_batch_size, embed_qdrant_upsert_batch_size, brpop_timeout_seconds, embed_queue_name, preset_id, components=None)[source]¶
Bases: QueueRuntime
Runtime state and lazy dependencies for the embedding worker loop.
- Parameters:
redis_client (Any)
job_ttl_seconds (int)
embed_worker_max_batch_size (int)
embed_qdrant_upsert_batch_size (int)
brpop_timeout_seconds (int)
embed_queue_name (str)
preset_id (str)
components (EmbeddingComponents | None)
- embedding_worker.worker.build_runtime()[source]¶
Build the queue runtime used by the embedding worker.
- Return type:
- embedding_worker.worker.process_embed_all(runtime, job_id, params)[source]¶
Dispatch embed_act jobs for all acts (event-driven pipeline).
- Parameters:
runtime (WorkerRuntime)
job_id (str)
params (dict[str, Any])
- Return type:
None
- embedding_worker.worker.process_embed_act(runtime, job_id, params)[source]¶
Embed chunks and (EUR-Lex only) whole-act vector for one act.
- Parameters:
runtime (WorkerRuntime)
job_id (str)
params (dict[str, Any])
- Return type:
None
- embedding_worker.worker.process_job(runtime, job_data)[source]¶
Dispatch one embedding-related job through the instrumented worker loop.
- Parameters:
runtime (WorkerRuntime)
job_data (dict[str, Any])
- Return type:
None
extraction_worker¶
Source: services/extraction-worker/__init__.py
Extraction worker package.
extraction_worker.bootstrap¶
Source: services/extraction-worker/bootstrap.py
Bootstrap for extraction worker.
- class extraction_worker.bootstrap.ExtractionComponents(pg_repo, neo4j_repo, relation_graph, act_summary_service)[source]¶
Bases: object
Long-lived dependencies used by the extraction worker.
- Parameters:
pg_repo (PostgresRepository)
neo4j_repo (Neo4jRepository)
relation_graph (RelationGraphService)
act_summary_service (ActSummaryService)
extraction_worker.graph¶
Source: services/extraction-worker/graph/__init__.py
Graph extraction: relation persistence and community detection.
extraction_worker.graph.community_builder¶
Source: services/extraction-worker/graph/community_builder.py
Graph community detection for extraction worker.
Mirrors the logic in scripts/build_communities.py but runs inside the worker process, reusing the already-open Neo4j driver from bootstrap.
Communities are persisted as :Community nodes with BELONGS_TO relationships in Neo4j (no JSON file on disk).
Community summaries are generated by LLM (GraphRAG-style) when a client is available, with deterministic fallback.
- extraction_worker.graph.community_builder.run_community_detection(*, driver, database, resolution=1.0, min_community_size=2, llm_client=None)[source]¶
Full community detection pipeline: extract → detect → summarize → persist to Neo4j.
When llm_client is provided, community summaries are generated by LLM (GraphRAG-style). Falls back to deterministic summaries on failure.
Returns a summary dict with status, counts, and modularity.
- Parameters:
driver (Any)
database (str)
resolution (float)
min_community_size (int)
llm_client (JSONHTTPLLMClient | None)
- Return type:
Dict[str, Any]
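To make the "extract → detect" half of the pipeline concrete, here is a vastly simplified stand-in for the community-detection step: plain connected components over relation edges instead of Louvain, with no summarization or Neo4j persistence. The CELEX identifiers below are examples only.

```python
def detect_communities(edges):
    """Group nodes into connected components via union-find (Louvain stand-in)."""
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path compression
            x = parent[x]
        return x

    def union(a, b):
        parent[find(a)] = find(b)

    for a, b in edges:
        union(a, b)
    groups = {}
    for node in parent:
        groups.setdefault(find(node), []).append(node)
    return list(groups.values())

edges = [("32016R0679", "31995L0046"),
         ("32019R0001", "32016R0679"),
         ("32000L0031", "32002L0058")]
communities = detect_communities(edges)
print(sorted(len(c) for c in communities))  # [2, 3]
```

Louvain additionally optimizes modularity and can split a connected component into several communities, which is why the real pipeline reports a modularity score.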
extraction_worker.graph.relation_service¶
Source: services/extraction-worker/graph/relation_service.py
Relation graph service.
Orchestrates deterministic relation extraction, persistence, and Neo4j sync.
- class extraction_worker.graph.relation_service.RelationGraphService(pg_repo, neo4j_repo, min_confidence, max_chunk_size)[source]¶
Bases: object
Service for extracting and synchronizing legal relationships.
Responsibilities:
- Extract relations from act text using RelationExtractor
- Store relations in PostgreSQL
- Sync relations to Neo4j graph
- Orchestrate the full workflow
Does NOT:
- Parse text (uses RelationExtractor)
- Access databases directly (uses repositories)
Initialize relation graph service
- Parameters:
pg_repo (PostgresRepository) – PostgreSQL repository
neo4j_repo (Neo4jRepository | None) – Neo4j repository (optional, for direct sync)
min_confidence (float | None) – Minimum confidence threshold for relations. If None, uses config.gateway.job_extract_min_confidence.
max_chunk_size (int | None) – Maximum text chunk size for extraction. If None, uses config.chunking.extraction_max_chunk_chars.
- extract_and_store_for_act(act_id=None, celex=None, sync_to_neo4j=True, force=False)[source]¶
Extract relations from a single act and store them
- Parameters:
act_id (int | None) – Act ID (PostgreSQL)
celex (str | None) – CELEX identifier (alternative to act_id)
sync_to_neo4j (bool) – Whether to sync to Neo4j after storage
force (bool) – Re-extract even if already extracted
- Returns:
Dictionary with extraction results
- Return type:
Dict[str, Any]
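The shape of the rows that deterministic extraction produces and extract_and_store_for_act persists can be sketched with a toy regex extractor. The pattern, relation verbs, and confidence value are hypothetical; the real RelationExtractor is considerably richer.

```python
import re

# Toy pattern: a relation verb followed by a CELEX-style target identifier.
REL_RE = re.compile(r"\b(amends|repeals|cites)\s+(3\d{4}[A-Z]\d{4})\b", re.I)

def extract_relations(celex, text, min_confidence=0.0):
    """Return relation rows (source, type, target, confidence) found in text."""
    rows = []
    for verb, target in REL_RE.findall(text):
        rows.append({"source": celex, "type": verb.upper(),
                     "target": target, "confidence": 1.0})
    return [r for r in rows if r["confidence"] >= min_confidence]

rows = extract_relations(
    "32019R0001",
    "This act amends 32016R0679 and repeals 31995L0046.")
print([(r["type"], r["target"]) for r in rows])
# [('AMENDS', '32016R0679'), ('REPEALS', '31995L0046')]
```

Rows below min_confidence are dropped before storage, mirroring the threshold parameter documented above.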
extraction_worker.service_metrics¶
Source: services/extraction-worker/service_metrics.py
Prometheus extraction metrics recorder.
- class extraction_worker.service_metrics.PrometheusExtractionMetricsRecorder[source]¶
Bases: ExtractionMetricsRecorder
Prometheus-backed recorder for extraction LLM metrics.
- observe_call(*, provider, model, outcome, duration_seconds)[source]¶
Record one extraction LLM call latency and outcome.
- Parameters:
provider (str)
model (str)
outcome (str)
duration_seconds (float)
- Return type:
None
- observe_error(*, provider, model, error_type)[source]¶
Record one extraction LLM error bucket.
- Parameters:
provider (str)
model (str)
error_type (str)
- Return type:
None
extraction_worker.worker¶
Source: services/extraction-worker/worker.py
Extraction worker: processes relation extraction jobs from the Redis queue.
- class extraction_worker.worker.WorkerRuntime(redis_client, job_ttl_seconds, brpop_timeout_seconds, components=None)[source]¶
Bases: QueueRuntime
Runtime state and lazy dependencies for the extraction worker loop.
- Parameters:
redis_client (Any)
job_ttl_seconds (int)
brpop_timeout_seconds (int)
components (ExtractionComponents | None)
- extraction_worker.worker.build_runtime()[source]¶
Build the queue runtime used by the extraction worker.
- Return type:
- extraction_worker.worker.process_extract_all(runtime, job_id, params)[source]¶
Dispatch extract_act jobs for all acts (event-driven pipeline).
- Parameters:
runtime (WorkerRuntime)
job_id (str)
params (dict[str, Any])
- Return type:
None
- extraction_worker.worker.process_extract_act(runtime, job_id, params)[source]¶
Process an extraction job for a single act.
- Parameters:
runtime (WorkerRuntime)
job_id (str)
params (dict[str, Any])
- Return type:
None
- extraction_worker.worker.process_summarize_all(runtime, job_id, params)[source]¶
Enqueue summarize jobs for every act missing a canonical summary.
- Parameters:
runtime (WorkerRuntime)
job_id (str)
params (dict[str, Any])
- Return type:
None
- extraction_worker.worker.process_summarize_act(runtime, job_id, params)[source]¶
Refresh the canonical summary for one CELEX identifier.
- Parameters:
runtime (WorkerRuntime)
job_id (str)
params (dict[str, Any])
- Return type:
None
- extraction_worker.worker.process_build_communities(runtime, job_id, params)[source]¶
Detect Louvain communities on the Act graph and write back to Neo4j.
- Parameters:
runtime (WorkerRuntime)
job_id (str)
params (dict[str, Any])
- Return type:
None
- extraction_worker.worker.process_job(runtime, job_data)[source]¶
Dispatch one extraction-related job through the instrumented worker loop.
- Parameters:
runtime (WorkerRuntime)
job_data (dict[str, Any])
- Return type:
None
rag_service¶
Source: services/rag-service/__init__.py
RAG service package.
rag_service.bootstrap¶
Source: services/rag-service/bootstrap.py
Bootstrap for RAG service
- class rag_service.bootstrap.RagComponents(rag_service, retrieval_service, context_service, graph_rag_service, conversation_manager, pg_repo, key_pool=None, entity_linker=None, external_detector=None)[source]¶
Bases: object
Long-lived dependencies exposed by the rag-service bootstrap.
- Parameters:
rag_service (RAGService)
retrieval_service (RetrievalService)
context_service (ContextService)
graph_rag_service (GraphRAGService | None)
conversation_manager (ConversationManager)
pg_repo (PostgresRepository)
key_pool (APIKeyPool | None)
entity_linker (LegalEntityLinker | None)
external_detector (Callable[[str], Any] | None)
- rag_service.bootstrap.ensure_graph_rag_service(components)[source]¶
Initialize GraphRAGService lazily if it is not available yet.
This allows graph mode to recover when Neo4j becomes available after startup.
- Parameters:
components (RagComponents)
- Return type:
GraphRAGService | None
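The recover-after-startup behaviour of ensure_graph_rag_service is a lazy-initialization pattern: retry construction on each call until the backend is reachable, then cache the instance on the container. A minimal sketch (RagComponents and the factory are stand-ins; the real function builds a GraphRAGService against Neo4j):

```python
class RagComponents:
    """Minimal stand-in for the bootstrap container (illustration only)."""
    def __init__(self):
        self.graph_rag_service = None

def ensure_graph_rag_service(components, factory):
    """Build and cache the graph service lazily; return None while unavailable."""
    if components.graph_rag_service is None:
        try:
            components.graph_rag_service = factory()
        except ConnectionError:
            return None          # backend not up yet; caller may retry later
    return components.graph_rag_service

attempts = {"n": 0}

def neo4j_factory():
    attempts["n"] += 1
    if attempts["n"] < 2:
        raise ConnectionError("Neo4j not ready yet")
    return "GraphRAGService"

components = RagComponents()
print(ensure_graph_rag_service(components, neo4j_factory))  # None
print(ensure_graph_rag_service(components, neo4j_factory))  # GraphRAGService
```

Graph mode thus degrades gracefully at startup and recovers on the first call after Neo4j comes up.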
rag_service.conversation¶
Source: services/rag-service/conversation.py
Conversation history management for multi-turn RAG.
Handles loading, trimming, and persisting conversation turns.
History is returned as LangChain BaseMessage objects ready for injection
into a ChatPromptTemplate via MessagesPlaceholder.
- class rag_service.conversation.ConversationContext(conversation_id, is_new, history_messages=<factory>)[source]¶
Bases: object
Result of loading conversation history.
- Parameters:
conversation_id (str)
is_new (bool)
history_messages (List[BaseMessage])
- class rag_service.conversation.ConversationManager(pg_repo, llm=None, max_history_chars=4_000, max_turns=10, summary_max_chars=_DEFAULT_SUMMARY_MAX_CHARS, recent_history_chars=_DEFAULT_RECENT_HISTORY_CHARS)[source]¶
Bases: object
Load and persist multi-turn conversation state backed by PostgreSQL.
- Parameters:
pg_repo (PostgresRepository)
llm (Any | None)
max_history_chars (int)
max_turns (int)
summary_max_chars (int)
recent_history_chars (int)
- load_history(conversation_id, question, user_id=None)[source]¶
Load (or create) a conversation and return its history.
When conversation_id is None, a new conversation is auto-created.
- Parameters:
conversation_id (str | None)
question (str)
user_id (str | None)
- Return type:
- save_turn(conversation_id, question, answer, query_id, mode=None, sources=None, timings=None, steps=None)[source]¶
Persist human + assistant messages and return the assistant message id.
- Parameters:
conversation_id (str)
question (str)
answer (str)
query_id (str)
mode (str | None)
sources (Any | None)
timings (dict | None)
steps (list | None)
- Return type:
str
rag_service.graph¶
Source: services/rag-service/graph/__init__.py
Graph RAG components for rag-service.
rag_service.graph.cypher_branch¶
Source: services/rag-service/graph/cypher_branch.py
Cypher branch — orchestration layer.
Collects Cypher-derived graph evidence to enrich (not replace) the main RAG answer. Pipeline: intent → template → Text2Cypher → NL→Cypher fallback.
Routing and template selection are delegated to intent and templates
modules.
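The intent → template → Text2Cypher → NL→Cypher ordering is a classic fallback chain; a minimal sketch of that control flow (the strategy names come from the pipeline above, the callables are hypothetical):

```python
from typing import Callable, List, Optional, Tuple


def resolve_cypher(strategies: List[Tuple[str, Callable[[], Optional[list]]]]
                   ) -> Tuple[str, str, list]:
    """Try each strategy in order; return the first that yields rows.

    Returns a (status, strategy, rows) triple, mirroring CypherResolution's shape.
    """
    for name, run in strategies:
        try:
            rows = run()
        except Exception:
            continue  # a failing strategy falls through to the next one
        if rows:
            return ("ok", name, rows)
    return ("no_results", "none", [])
```

Keeping each strategy failure non-fatal is what lets Cypher evidence enrich the main RAG answer without ever blocking it.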
- class rag_service.graph.cypher_branch.CypherResolution(status, strategy, generated_cypher, rows, detail)[source]¶
Bases: object
Resolved Cypher-support result returned by the graph branch.
- Parameters:
status (str)
strategy (str)
generated_cypher (str | None)
rows (List[Dict[str, Any]])
detail (str)
- async rag_service.graph.cypher_branch.collect_cypher_support(*, query_request, graph_rag_service, rag_service, config, graph_depth, strict_graph_mode)[source]¶
Collect Cypher-derived graph evidence without replacing the main RAG answer.
- Parameters:
query_request (QueryRequest)
graph_rag_service (Any)
rag_service (Any)
config (Any)
graph_depth (int)
strict_graph_mode (bool)
- Return type:
tuple[Dict[str, Any] | None, Dict[str, Any]]
rag_service.graph.intent¶
Source: services/rag-service/graph/intent.py
Relation intent detection — regex keywords + embedding similarity fallback.
Determines whether a user question has relational intent (i.e. asks about links, amendments, chains between legal acts) and if so, which direction (outgoing, incoming, or both) and which relation types are relevant.
- class rag_service.graph.intent.RelationIntent(has_intent, direction='both', relation_types=<factory>, celex_candidates=<factory>, wants_direct=False, wants_path=False)[source]¶
Bases: object
Result of analyzing a question for relational intent.
- Parameters:
has_intent (bool)
direction (Literal['outgoing', 'incoming', 'both'])
relation_types (List[str])
celex_candidates (List[str])
wants_direct (bool)
wants_path (bool)
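The regex-keyword half of that detection can be sketched as follows; the keyword list and direction cues are illustrative only, and the real module additionally falls back to embedding similarity:

```python
import re

# Hypothetical relation keywords; the maintained list is richer.
RELATION_WORDS = re.compile(
    r"\b(amend(s|ed|ment)?|repeal(s|ed)?|cite(s|d)?)\b", re.I)


def detect_relation_intent(question: str) -> dict:
    """Return a RelationIntent-like dict from keyword matches alone."""
    has_intent = bool(RELATION_WORDS.search(question))
    if re.search(r"\bamended by\b", question, re.I):
        direction = "incoming"   # the act is the target of the relation
    elif has_intent:
        direction = "outgoing"   # the act is the source of the relation
    else:
        direction = "both"
    return {"has_intent": has_intent, "direction": direction}
```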
rag_service.graph.templates¶
Source: services/rag-service/graph/templates.py
Cypher template library — deterministic, direction-aware query strategies.
Each strategy produces a ready-to-execute Cypher query from structured intent. Relations in Neo4j are directed: (source)-[:AMENDS]->(target) means “source amends target”. Templates preserve this semantics.
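Direction-aware template expansion might be sketched like this; the node label, relation handling, and depth syntax are simplified assumptions, but the direction semantics follow the `(source)-[:AMENDS]->(target)` convention stated above:

```python
def build_relation_cypher(celex: str, rel: str = "AMENDS",
                          direction: str = "outgoing", max_depth: int = 1) -> str:
    """Build a directed Cypher pattern; (source)-[:AMENDS]->(target) means
    'source amends target', and the arrow flips for incoming queries."""
    hops = f"*1..{max_depth}" if max_depth > 1 else ""
    if direction == "outgoing":
        pattern = f"(a:Act {{celex: $celex}})-[:{rel}{hops}]->(b:Act)"
    elif direction == "incoming":
        pattern = f"(b:Act)-[:{rel}{hops}]->(a:Act {{celex: $celex}})"
    else:  # both
        pattern = f"(a:Act {{celex: $celex}})-[:{rel}{hops}]-(b:Act)"
    return f"MATCH {pattern} RETURN b.celex AS celex LIMIT $limit"
```

Parameterizing `$celex` and `$limit` (rather than interpolating values) keeps the generated queries safe and cacheable on the Neo4j side.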
- class rag_service.graph.templates.CypherTemplateMatch(strategy, cypher, detail)[source]¶
Bases: object
Deterministic Cypher template selected for a relation intent.
- Parameters:
strategy (str)
cypher (str)
detail (str)
- rag_service.graph.templates.match_template(intent, *, max_graph_depth, row_limit)[source]¶
Select the best deterministic Cypher template from structured intent.
- Parameters:
intent (RelationIntent)
max_graph_depth (int)
row_limit (int)
- Return type:
CypherTemplateMatch | None
rag_service.main¶
Source: services/rag-service/main.py
RAG Service (Lalandre): handles RAG queries (retrieval + generation).
- async rag_service.main.health_check(request)[source]¶
Health check endpoint.
- Parameters:
request (Request)
- Return type:
rag_service.metrics¶
Source: services/rag-service/metrics/__init__.py
Prometheus metrics for rag-service.
rag_service.metrics.retrieval¶
Source: services/rag-service/metrics/retrieval.py
Prometheus backend for lalandre_rag runtime retrieval metrics.
- class rag_service.metrics.retrieval.PrometheusRetrievalMetricsRecorder[source]¶
Bases: RetrievalMetricsRecorder
Prometheus-backed recorder for retrieval phase metrics.
rag_service.metrics.service¶
Source: services/rag-service/metrics/service.py
Prometheus helpers for rag-service runtime metrics.
- rag_service.metrics.service.observe_http_request(*, path, method, status_code, duration_seconds)[source]¶
Record one HTTP request handled by rag-service.
- Parameters:
path (str)
method (str)
status_code (int)
duration_seconds (float)
- Return type:
None
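In prometheus_client terms this is typically a labelled Counter plus a Histogram keyed on the same label set; a stdlib sketch of the equivalent bookkeeping (label names follow the parameters above, bucket handling omitted):

```python
from collections import defaultdict
from typing import Dict, Tuple


class HttpMetrics:
    """Minimal stand-in for a labelled request counter and duration sum."""

    def __init__(self) -> None:
        self.count: Dict[Tuple[str, str, str], int] = defaultdict(int)
        self.duration_sum: Dict[Tuple[str, str, str], float] = defaultdict(float)

    def observe(self, *, path: str, method: str, status_code: int,
                duration_seconds: float) -> None:
        # One time series per (path, method, status) label combination.
        key = (path, method, str(status_code))
        self.count[key] += 1
        self.duration_sum[key] += duration_seconds
```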
- rag_service.metrics.service.observe_query_request(*, mode, granularity, top_k, duration_seconds, outcome, metadata=None)[source]¶
Record one /query request handled by rag-service.
- Parameters:
mode (str | None)
granularity (str | None)
top_k (int | None)
duration_seconds (float)
outcome (str)
metadata (Dict[str, Any] | None)
- Return type:
None
- rag_service.metrics.service.observe_search_request(*, mode, granularity, top_k, duration_seconds, outcome)[source]¶
Record one /search request handled by rag-service.
- Parameters:
mode (str | None)
granularity (str | None)
top_k (int | None)
duration_seconds (float)
outcome (str)
- Return type:
None
- rag_service.metrics.service.observe_provider_error(*, mode, stage, exc_or_reason)[source]¶
Record one provider-side error or fallback reason.
- Parameters:
mode (str | None)
stage (str)
exc_or_reason (Any)
- Return type:
None
- rag_service.metrics.service.observe_provider_fallbacks(*, mode, metadata)[source]¶
Emit provider fallback counters derived from response metadata.
- Parameters:
mode (str | None)
metadata (Dict[str, Any])
- Return type:
None
- rag_service.metrics.service.infer_query_outcome(metadata)[source]¶
Infer the outcome label used for query-level service metrics.
- Parameters:
metadata (Dict[str, Any] | None)
- Return type:
str
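The outcome label is presumably derived from error and fallback markers in the response metadata; a hedged sketch of that mapping (the specific metadata keys here are assumptions, not the service's actual schema):

```python
from typing import Any, Dict, Optional


def infer_outcome(metadata: Optional[Dict[str, Any]]) -> str:
    """Map response metadata to a coarse outcome label for metrics."""
    if not metadata:
        return "success"
    if metadata.get("error"):          # hypothetical error marker
        return "error"
    if metadata.get("fallback_used"):  # hypothetical fallback marker
        return "fallback"
    return "success"
```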
- rag_service.metrics.service.refresh_backend_health(components)[source]¶
Probe initialized dependencies and update backend health gauges.
- Parameters:
components (RagComponents)
- Return type:
None
rag_service.mode_handlers¶
Source: services/rag-service/mode_handlers.py
Mode-specific query handlers for the RAG service.
- rag_service.mode_handlers.handle_llm_only_mode(*, query_request, rag_service)[source]¶
Execute the direct LLM-only answer path.
- Parameters:
query_request (QueryRequest)
rag_service (Any)
- Return type:
tuple[str, Dict[str, Any] | None, Dict[str, Any]]
- rag_service.mode_handlers.handle_summarize_mode(*, query_request, rag_service)[source]¶
Execute summarize mode and return answer, sources, and metadata.
- Parameters:
query_request (QueryRequest)
rag_service (Any)
- Return type:
tuple[str, Dict[str, Any] | None, Dict[str, Any]]
- rag_service.mode_handlers.handle_compare_mode(*, query_request, rag_service)[source]¶
Execute compare mode and return answer, sources, and metadata.
- Parameters:
query_request (QueryRequest)
rag_service (Any)
- Return type:
tuple[str, Dict[str, Any] | None, Dict[str, Any]]
- rag_service.mode_handlers.handle_rag_mode(*, query_request, rag_service, chat_history=None)[source]¶
Execute the default retrieval-augmented generation path.
- Parameters:
query_request (QueryRequest)
rag_service (Any)
chat_history (Any)
- Return type:
tuple[str, Dict[str, Any] | None, Dict[str, Any]]
rag_service.models¶
Source: services/rag-service/models.py
Pydantic models for the RAG service API.
- class rag_service.models.QueryRequest(*, query_id, question, conversation_id=None, mode=None, top_k=None, min_score=None, sources=False, include_full_content=False, graph_depth=None, use_graph=None, graph_generation_timeout_seconds=None, graph_retrieval_timeout_seconds=None, graph_use_cypher=False, graph_cypher_timeout_seconds=None, graph_cypher_max_rows=None, granularity=None, embedding_preset=None, retrieval_depth=None, celex=None, compare_celex=None)[source]¶
Bases: BaseModel
Request model for RAG queries.
All retrieval parameters are optional. When omitted (None), the rag-service resolves them from SearchConfig defaults at request time so that app_config.yaml remains the single source of truth.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- Parameters:
query_id (str)
question (str)
conversation_id (str | None)
mode (str | None)
top_k (int | None)
min_score (float | None)
sources (bool)
include_full_content (bool)
graph_depth (int | None)
use_graph (bool | None)
graph_generation_timeout_seconds (float | None)
graph_retrieval_timeout_seconds (float | None)
graph_use_cypher (bool)
graph_cypher_timeout_seconds (float | None)
graph_cypher_max_rows (int | None)
granularity (str | None)
embedding_preset (str | None)
retrieval_depth (str | None)
celex (str | None)
compare_celex (str | None)
- classmethod resolve_mode_aliases(data)[source]¶
Resolve legacy mode aliases before validation.
- Parameters:
data (Any)
- Return type:
Any
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
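In Pydantic v2 a pre-validation hook like resolve_mode_aliases is usually registered with model_validator(mode="before"); the alias mapping itself can be sketched as a plain function (the alias names below are illustrative, not the service's actual aliases):

```python
from typing import Any

# Hypothetical legacy aliases -> canonical mode names.
MODE_ALIASES = {"llm": "llm_only", "summary": "summarize"}


def resolve_mode_aliases(data: Any) -> Any:
    """Rewrite legacy mode values to their canonical names before validation."""
    if isinstance(data, dict) and data.get("mode") in MODE_ALIASES:
        data = {**data, "mode": MODE_ALIASES[data["mode"]]}
    return data
```

Returning non-dict input unchanged matters in a "before" validator, since Pydantic may pass already-constructed model instances through the same hook.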
- class rag_service.models.HealthResponse(*, status, service, components_initialized)[source]¶
Bases: BaseModel
Health response.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- Parameters:
status (str)
service (str)
components_initialized (bool)
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
rag_service.routers¶
Source: services/rag-service/routers/__init__.py
Route registrations for rag-service endpoints.
rag_service.routers._deps¶
Source: services/rag-service/routers/_deps.py
Shared FastAPI dependencies for rag-service routers.
rag_service.routers.conversations¶
Source: services/rag-service/routers/conversations.py
Conversation history endpoints (list / messages / delete).
- async rag_service.routers.conversations.list_conversations(components=Depends(get_components), user_id=None, limit=50)[source]¶
List conversations for a user, ordered by most recent.
- Parameters:
components (RagComponents)
user_id (str | None)
limit (int)
- async rag_service.routers.conversations.get_conversation_messages(conversation_id, components=Depends(get_components), user_id=None)[source]¶
Get messages for a conversation (with ownership check).
- Parameters:
conversation_id (str)
components (RagComponents)
user_id (str | None)
- async rag_service.routers.conversations.delete_conversation(conversation_id, components=Depends(get_components), user_id=None)[source]¶
Delete a conversation (with ownership check).
- Parameters:
conversation_id (str)
components (RagComponents)
user_id (str | None)
rag_service.routers.query¶
Source: services/rag-service/routers/query.py
POST /query endpoint — synchronous RAG query handler.
- rag_service.routers.query.apply_config_defaults_query(query_request)[source]¶
Fill None fields with config defaults (mutates in place).
- Parameters:
query_request (QueryRequest)
- Return type:
None
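The fill-in-place behaviour can be sketched over a dataclass; the field names mirror QueryRequest, while the Defaults object and its values are hypothetical stand-ins for the SearchConfig defaults:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Defaults:
    mode: str = "rag"
    top_k: int = 10
    min_score: float = 0.0


@dataclass
class Request:
    mode: Optional[str] = None
    top_k: Optional[int] = None
    min_score: Optional[float] = None


def apply_defaults(req: Request, defaults: Defaults) -> None:
    """Fill only the fields the caller left as None (mutates in place)."""
    for field in ("mode", "top_k", "min_score"):
        if getattr(req, field) is None:
            setattr(req, field, getattr(defaults, field))
```

Filling only `None` fields is what keeps explicit caller choices intact while app_config.yaml stays the single source of truth for everything omitted.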
- rag_service.routers.query.validate_query_mode_and_granularity(query_request)[source]¶
Validate the requested query mode and granularity combination.
- Parameters:
query_request (QueryRequest)
- Return type:
None
- rag_service.routers.query.build_query_response(*, query_request, answer, sources, search_metadata)[source]¶
Assemble the normalized /query response payload.
- Parameters:
query_request (QueryRequest)
answer (str)
sources (Dict[str, Any] | None)
search_metadata (Dict[str, Any] | None)
- Return type:
- async rag_service.routers.query.process_query(query_request, components=Depends(get_components), user_id=Depends(extract_user_id))[source]¶
Process a RAG query.
Modes:
- llm_only: LLM generation without retrieval
- rag: standard RAG (retrieval + LLM)
- summarize: summarization of retrieved documents
- compare: compare two acts
- Parameters:
query_request (QueryRequest)
components (RagComponents)
user_id (str | None)
- Return type:
rag_service.routers.search¶
Source: services/rag-service/routers/search.py
POST /search endpoint — hybrid/semantic/lexical search handler.
- rag_service.routers.search.apply_config_defaults_search(search_request)[source]¶
Fill None fields with config defaults (mutates in place).
- Parameters:
search_request (SearchRequest)
- Return type:
None
- async rag_service.routers.search.search(search_request, components=Depends(get_components))[source]¶
Hybrid search (semantic + lexical). Supports three search modes: semantic, lexical, and hybrid.
score_threshold is interpreted on a normalized [0,1] scale across modes. Lexical BM25 and hybrid RRF scores are normalized before filtering.
- Parameters:
search_request (SearchRequest)
components (RagComponents)
- Return type:
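Normalizing scores onto [0,1] before applying score_threshold might look like the following sketch; Reciprocal Rank Fusion with k=60 and min-max normalization are common choices but assumptions here, not necessarily the service's exact scheme:

```python
from typing import Dict, List


def rrf_fuse(rankings: List[List[str]], k: int = 60) -> Dict[str, float]:
    """Reciprocal Rank Fusion: sum 1/(k + rank) over each input ranking."""
    scores: Dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return scores


def normalize(scores: Dict[str, float]) -> Dict[str, float]:
    """Min-max normalize scores onto [0, 1] so one threshold works across modes."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {d: 1.0 for d in scores}
    return {d: (s - lo) / (hi - lo) for d, s in scores.items()}
```

Without a normalization step like this, a single score_threshold would mean very different things for raw BM25 scores, cosine similarities, and RRF sums.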
rag_service.routers.stream¶
Source: services/rag-service/routers/stream.py
POST /query/stream endpoint — SSE streaming query handler.
- async rag_service.routers.stream.process_query_stream(query_request, components=Depends(get_components), user_id=Depends(extract_user_id))[source]¶
SSE streaming variant of /query. Streams LLM tokens as they are generated.
- Parameters:
query_request (QueryRequest)
components (RagComponents)
user_id (str | None)
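Server-Sent Events frames are plain text blocks terminated by a blank line; a sketch of the token-to-frame encoding (the JSON payload shape and the `done` event name are illustrative, and the real endpoint wraps such a generator in a streaming response with media type `text/event-stream`):

```python
import json
from typing import Iterable, Iterator


def sse_frames(tokens: Iterable[str]) -> Iterator[str]:
    """Yield one SSE 'data:' frame per LLM token, then a terminating event."""
    for token in tokens:
        yield f"data: {json.dumps({'token': token})}\n\n"
    yield "event: done\ndata: {}\n\n"
```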
rerank_service¶
Source: services/rerank-service/__init__.py
Rerank service package entrypoint for documentation and local tooling.
rerank_service.main¶
Source: services/rerank-service/main.py
Rerank Service — dedicated cross-encoder reranking for Lalandre.
Loads BAAI/bge-reranker-v2-m3 at startup and exposes a /rerank endpoint.
- rerank_service.main.lifespan(app)[source]¶
Load the reranker model during FastAPI startup.
- Parameters:
app (FastAPI)
- async rerank_service.main.health()[source]¶
Return rerank-service readiness and current model metadata.
- async rerank_service.main.rerank(request)[source]¶
Rerank the supplied documents for the given query.
- Parameters:
request (RerankRequest)
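The score-sort-rank shape of the endpoint can be sketched with a stand-in scorer; in the real service the scorer is the BAAI/bge-reranker-v2-m3 cross-encoder applied to (query, document) pairs, and the output mirrors the RerankResult fields below:

```python
from typing import Callable, List, Optional, Tuple


def rerank(query: str, docs: List[Tuple[str, str]],
           score_fn: Callable[[str, str], float],
           top_k: Optional[int] = None) -> List[dict]:
    """Score each (id, content) pair, sort descending, and assign 1-based ranks."""
    scored = sorted(((score_fn(query, content), doc_id) for doc_id, content in docs),
                    reverse=True)
    if top_k is not None:
        scored = scored[:top_k]
    return [{"id": doc_id, "score": score, "rank": i}
            for i, (score, doc_id) in enumerate(scored, start=1)]
```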
rerank_service.models¶
Source: services/rerank-service/models.py
Request/response models for the rerank service.
- class rerank_service.models.RerankDocument(*, id, content)[source]¶
Bases: BaseModel
Document candidate accepted by the rerank endpoint.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- Parameters:
id (str)
content (str)
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class rerank_service.models.RerankRequest(*, query, documents, top_k=None)[source]¶
Bases: BaseModel
Request payload for reranking a list of document candidates.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- Parameters:
query (str)
documents (list[RerankDocument])
top_k (int | None)
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class rerank_service.models.RerankResult(*, id, score, rank)[source]¶
Bases: BaseModel
Single reranked document with its score and rank.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- Parameters:
id (str)
score (float)
rank (int)
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class rerank_service.models.RerankResponse(*, results, model, duration_ms)[source]¶
Bases: BaseModel
Response payload returned by the rerank endpoint.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- Parameters:
results (list[RerankResult])
model (str)
duration_ms (float)
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].