Services API

Note

This page is generated automatically from the repository’s maintained Python module inventory.

Maintained FastAPI services and worker entrypoints that compose the production Python runtime.

api_gateway

Source: services/api-gateway/__init__.py

API gateway package for authenticated public endpoints.

api_gateway.auth

Source: services/api-gateway/auth.py

JWT authentication middleware for the API Gateway. Validates Bearer tokens against Keycloak JWKS.

async api_gateway.auth.require_auth(request)[source]

FastAPI dependency that validates JWT Bearer tokens.

Parameters:

request (Request)

Return type:

dict

async api_gateway.auth.require_admin(request)[source]

FastAPI dependency that requires the caller to hold the admin role.

Checks realm_access.roles in the Keycloak JWT. When auth is disabled (no KEYCLOAK_URL) every caller is treated as admin.

Parameters:

request (Request)

Return type:

dict
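
The role check described above can be sketched in isolation. This is an illustrative helper, not the gateway's actual code; it only assumes Keycloak's standard realm_access.roles claim layout:

```python
from typing import Any, Dict

def has_admin_role(claims: Dict[str, Any], auth_enabled: bool = True) -> bool:
    """Return True when the decoded JWT grants the admin role."""
    if not auth_enabled:
        # Mirrors the documented behaviour: with auth disabled (no KEYCLOAK_URL),
        # every caller is treated as admin.
        return True
    roles = claims.get("realm_access", {}).get("roles", [])
    return "admin" in roles
```

In the real dependency this check runs after require_auth has validated the token against the Keycloak JWKS.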

api_gateway.bootstrap

Source: services/api-gateway/bootstrap.py

Bootstrap service: schedules data population jobs via Redis.

async api_gateway.bootstrap.bootstrap_system(redis_client)[source]

Orchestrates the data population pipeline. Only chunk_all is enqueued; workers then chain chunk_act -> embed_act[preset*] + extract_act. Returns True if jobs were scheduled, False if locked or failed.

Parameters:

redis_client (Any)

Return type:

bool

api_gateway.deps

Source: services/api-gateway/deps.py

Shared FastAPI dependencies for api-gateway.

api_gateway.deps.get_runtime_config(request)[source]

Return (rag_service_url, timeout_seconds) from app state.

Parameters:

request (Request)

Return type:

tuple[str, float]

api_gateway.deps.extract_user_id(request)[source]

Best-effort extraction of user_id (sub) from JWT without re-validating.

When auth is disabled the header won’t be present — returns None.

Parameters:

request (Request)

Return type:

str | None
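
Best-effort extraction without re-validation usually means decoding the JWT payload segment directly. A minimal sketch of that idea (illustrative, not the service's implementation):

```python
import base64
import json
from typing import Optional

def extract_sub_unverified(token: str) -> Optional[str]:
    """Read the `sub` claim from a JWT WITHOUT verifying its signature."""
    try:
        payload_b64 = token.split(".")[1]
        payload_b64 += "=" * (-len(payload_b64) % 4)  # restore stripped base64 padding
        claims = json.loads(base64.urlsafe_b64decode(payload_b64))
        return claims.get("sub")
    except (IndexError, ValueError):
        # Malformed or absent token: behave like the disabled-auth case.
        return None
```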

api_gateway.job_service

Source: services/api-gateway/job_service.py

Shared job helpers — Redis job submission and status retrieval.

class api_gateway.job_service.JobResponse(*, job_id, status, message, submitted_at, job_ids=None, presets=None)[source]

Bases: BaseModel

Generic job submission response.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:
  • job_id (str)

  • status (str)

  • message (str)

  • submitted_at (str)

  • job_ids (list[str] | None)

  • presets (list[str] | None)

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class api_gateway.job_service.JobStatus(*, job_id, status, type, created_at, progress=None, result=None, error=None)[source]

Bases: BaseModel

Job status details.

Parameters:
  • job_id (str)

  • status (str)

  • type (str)

  • created_at (float)

  • progress (Dict[str, Any] | None)

  • result (Dict[str, Any] | None)

  • error (str | None)

model_config: ClassVar[ConfigDict] = {}

api_gateway.job_service.require_valid_celex(celex)[source]

Normalize and validate CELEX format; raise HTTP 400 if unrecognisable.

Parameters:

celex (str)

Return type:

str
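
CELEX identifiers have a fixed shape: a sector digit, a four-digit year, one or two document-type letters, and a sequence number (e.g. 32016R0679 for the GDPR). An illustrative normalizer in that spirit; the gateway's exact rules may be broader:

```python
import re

# e.g. 32016R0679: sector 3, year 2016, type R (regulation), number 0679.
_CELEX_RE = re.compile(r"^\d{5}[A-Z]{1,2}\d{4}$")

def normalize_celex(celex: str) -> str:
    """Trim and upper-case, then validate the basic CELEX shape."""
    candidate = celex.strip().upper()
    if not _CELEX_RE.match(candidate):
        # The gateway maps this failure to an HTTP 400 response.
        raise ValueError(f"unrecognisable CELEX: {celex!r}")
    return candidate
```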

async api_gateway.job_service.submit_job(redis_conn, job_type, channel, params, message='')[source]

Push a job to a Redis queue and initialise its tracking key.

Parameters:
  • redis_conn (Any)

  • job_type (str)

  • channel (str)

  • params (Dict[str, Any])

  • message (str)

Return type:

JobResponse

async api_gateway.job_service.get_job_status(redis_conn, job_id)[source]

Retrieve job status from Redis.

Parameters:
  • redis_conn (Any)

  • job_id (str)

Return type:

JobStatus
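
The submit/track contract can be illustrated with a plain dict standing in for Redis. The real functions are async and push to Redis; the key names and payload fields below are assumptions for the sketch:

```python
import json
import time
import uuid
from typing import Any, Dict, Optional

def submit_job(store: Dict[str, Any], job_type: str, channel: str,
               params: Dict[str, Any]) -> str:
    """Push a job onto a queue list and initialise its tracking entry."""
    job_id = str(uuid.uuid4())
    payload = {"job_id": job_id, "type": job_type, "params": params}
    store.setdefault(channel, []).insert(0, json.dumps(payload))  # LPUSH-like
    store[f"job:{job_id}"] = {"job_id": job_id, "status": "queued",
                              "type": job_type, "created_at": time.time()}
    return job_id

def get_job_status(store: Dict[str, Any], job_id: str) -> Optional[Dict[str, Any]]:
    """Look up the tracking entry written at submission time."""
    return store.get(f"job:{job_id}")
```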

api_gateway.main

Source: services/api-gateway/main.py

API Gateway: entry point for all API requests.

async api_gateway.main.get_redis(app)[source]

Dependency to get the active Redis client. Initializes it if it doesn’t exist (Lazy Loading pattern).

Parameters:

app (FastAPI)

Return type:

Redis

api_gateway.main.lifespan(app)[source]

Manages application startup and shutdown events:

  1. Connects to Redis.

  2. Optionally triggers the bootstrap sequence.

  3. Cleans up connections on shutdown.

Parameters:

app (FastAPI)
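
The three steps follow FastAPI's asynccontextmanager lifespan convention. A stubbed sketch of the pattern (the Redis client is faked and all names are illustrative):

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(state: dict):
    state["redis"] = object()        # 1. connect to Redis (stubbed here)
    # 2. a real gateway would optionally trigger the bootstrap sequence next
    try:
        yield
    finally:
        state.pop("redis", None)     # 3. clean up connections on shutdown

async def demo() -> dict:
    state: dict = {}
    async with lifespan(state):
        assert "redis" in state      # connection available while serving
    return state

final_state = asyncio.run(demo())
```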

async api_gateway.main.initialize_routers()[source]

Initialize runtime state for routers.

async api_gateway.main.root()[source]

Basic service info and useful entrypoints.

async api_gateway.main.metrics()[source]

Expose Prometheus metrics after refreshing backend health probes.

async api_gateway.main.not_found_handler(request, exc)[source]

Return a normalized JSON payload for unknown endpoints.

Parameters:
  • request (Request)

  • exc (HTTPException)

async api_gateway.main.internal_error_handler(request, exc)[source]

Return a normalized JSON payload for uncaught internal errors.

Parameters:
  • request (Request)

  • exc (Exception)

api_gateway.rate_limit

Source: services/api-gateway/rate_limit.py

Rate limiting for the API Gateway using slowapi + Redis.

api_gateway.routers

Source: services/api-gateway/routers/__init__.py

API Gateway routers: modular route handlers organized by service domain.

api_gateway.routers.admin_pipeline

Source: services/api-gateway/routers/admin_pipeline.py

Admin endpoints for pipeline reset and rebuild orchestration.

class api_gateway.routers.admin_pipeline.PipelineRunningJob(*, job_id, job_type, status)[source]

Bases: BaseModel

Running pipeline job returned by the admin inspection endpoint.

Parameters:
  • job_id (str)

  • job_type (str)

  • status (str)

model_config: ClassVar[ConfigDict] = {}

class api_gateway.routers.admin_pipeline.PipelineFullResetResponse(*, job_id, submitted_at, presets, queues_cleared, deleted)[source]

Bases: BaseModel

Response payload for the full pipeline reset endpoint.

Parameters:
  • job_id (str)

  • submitted_at (str)

  • presets (list[str])

  • queues_cleared (Dict[str, int])

  • deleted (Dict[str, Dict[str, int]])

model_config: ClassVar[ConfigDict] = {}

async api_gateway.routers.admin_pipeline.full_reset_pipeline(request, redis_conn=Depends(_get_redis_connection))[source]

Purge the vector/extraction pipeline state, then re-enqueue a clean chunk_all.

Parameters:
  • request (Request)

  • redis_conn (Any)

Return type:

PipelineFullResetResponse

api_gateway.routers.config

Source: services/api-gateway/routers/config.py

Config router: exposes read-only system configuration (active models, service info).

async api_gateway.routers.config.get_system_config(request)[source]

Return read-only system configuration: active models and service info. Calls downstream services to retrieve live info where available.

Parameters:

request (Request)

Return type:

dict

async api_gateway.routers.config.reload_rerank(request)[source]

Proxy reload request to the rerank service.

Parameters:

request (Request)

Return type:

dict

api_gateway.routers.conversations

Source: services/api-gateway/routers/conversations.py

Conversation history proxy endpoints — list / messages / delete.

async api_gateway.routers.conversations.list_conversations(request)[source]

List conversations for the authenticated user.

Parameters:

request (Request)

async api_gateway.routers.conversations.get_conversation_messages(conversation_id, request)[source]

Get messages for a conversation.

Parameters:
  • conversation_id (str)

  • request (Request)

async api_gateway.routers.conversations.delete_conversation(conversation_id, request)[source]

Delete a conversation.

Parameters:
  • conversation_id (str)

  • request (Request)

api_gateway.routers.embedding

Source: services/api-gateway/routers/embedding.py

Embedding preset metadata and deprecated switch endpoint.

class api_gateway.routers.embedding.EmbeddingPresetItem(*, preset_id, provider, model_name, device, label, enabled, indexing_enabled, queue_name, vector_size)[source]

Bases: BaseModel

Description of one configured embedding preset.

Parameters:
  • preset_id (str)

  • provider (str)

  • model_name (str)

  • device (str)

  • label (str)

  • enabled (bool)

  • indexing_enabled (bool)

  • queue_name (str)

  • vector_size (int)

model_config: ClassVar[ConfigDict] = {}

class api_gateway.routers.embedding.EmbeddingPresetsResponse(*, presets, default_preset)[source]

Bases: BaseModel

List of query-enabled embedding presets and the default selection.

model_config: ClassVar[ConfigDict] = {}

class api_gateway.routers.embedding.EmbeddingSwitchRequest(*, preset)[source]

Bases: BaseModel

Request payload selecting a new active embedding preset.

Parameters:

preset (str)

model_config: ClassVar[ConfigDict] = {}

async api_gateway.routers.embedding.list_presets()[source]

Return query-enabled embedding presets from the central application config.

Return type:

EmbeddingPresetsResponse

async api_gateway.routers.embedding.switch_embedding(_=None)[source]

The global embedding switch has been removed in favor of per-preset routing.

Parameters:

_ (EmbeddingSwitchRequest | None)

Return type:

None

api_gateway.routers.health

Source: services/api-gateway/routers/health.py

Health check router: monitors service availability and system health.

async api_gateway.routers.health.health_check(request)[source]

Health check for the API Gateway and its dependent services.

Returns:

  • 200 if all services are healthy

  • 503 if any critical service is down

Parameters:

request (Request)

Return type:

dict[str, object]

api_gateway.routers.jobs

Source: services/api-gateway/routers/jobs.py

Job management router: handles async job submission and status tracking for the chunk, embed, and extraction workers.

async api_gateway.routers.jobs.chunk_all(request, min_content_length=JOB_CHUNK_MIN_CONTENT_LENGTH, redis_conn=Depends(_get_redis_connection))[source]

Triggers chunking for all downloaded acts.

Parameters:
  • request (Request)

  • min_content_length (int)

  • redis_conn (Any)

async api_gateway.routers.jobs.chunk_act(request, celex, force=False, redis_conn=Depends(_get_redis_connection))[source]

Triggers chunking for a specific act.

Parameters:
  • request (Request)

  • celex (str)

  • force (bool)

  • redis_conn (Any)

async api_gateway.routers.jobs.get_chunk_status(job_id, redis_conn=Depends(_get_redis_connection))[source]

Checks progress of a chunk job.

Parameters:
  • job_id (str)

  • redis_conn (Any)

async api_gateway.routers.jobs.embed_all(request, batch_size=JOB_EMBED_BATCH_SIZE, redis_conn=Depends(_get_redis_connection))[source]

Triggers embedding for all indexing-enabled presets.

Parameters:
  • request (Request)

  • batch_size (int)

  • redis_conn (Any)

async api_gateway.routers.jobs.embed_act(request, celex, batch_size=JOB_EMBED_BATCH_SIZE, redis_conn=Depends(_get_redis_connection))[source]

Triggers chunk and act embeddings for a specific act on all indexing-enabled presets.

Parameters:
  • request (Request)

  • celex (str)

  • batch_size (int)

  • redis_conn (Any)

async api_gateway.routers.jobs.get_embed_status(job_id, redis_conn=Depends(_get_redis_connection))[source]

Checks progress of an embedding job.

Parameters:
  • job_id (str)

  • redis_conn (Any)

async api_gateway.routers.jobs.extract_all(request, min_confidence=JOB_EXTRACT_MIN_CONFIDENCE, skip_existing=JOB_EXTRACT_SKIP_EXISTING_DEFAULT, redis_conn=Depends(_get_redis_connection))[source]

Triggers relation extraction for all processed chunks.

Parameters:
  • request (Request)

  • min_confidence (float)

  • skip_existing (bool)

  • redis_conn (Any)

async api_gateway.routers.jobs.extract_act(request, celex, min_confidence=JOB_EXTRACT_MIN_CONFIDENCE, force=False, redis_conn=Depends(_get_redis_connection))[source]

Triggers relation extraction for a specific act.

Parameters:
  • request (Request)

  • celex (str)

  • min_confidence (float)

  • force (bool)

  • redis_conn (Any)

async api_gateway.routers.jobs.get_extract_status(job_id, redis_conn=Depends(_get_redis_connection))[source]

Checks progress of an extract job.

Parameters:
  • job_id (str)

  • redis_conn (Any)

api_gateway.routers.rag_proxy

Source: services/api-gateway/routers/rag_proxy.py

RAG proxy router — /query, /query/stream, /search.

class api_gateway.routers.rag_proxy.QueryRequest(*, question, conversation_id=None, mode=None, top_k=None, min_score=None, sources=False, graph_depth=None, use_graph=None, graph_use_cypher=False, graph_cypher_timeout_seconds=None, graph_cypher_max_rows=None, graph_generation_timeout_seconds=None, graph_retrieval_timeout_seconds=None, granularity=None, embedding_preset=None, include_full_content=False, celex=None, compare_celex=None)[source]

Bases: BaseModel

RAG query request.

All retrieval parameters are optional — the rag-service resolves omitted values from app_config.yaml (single source of truth).

Parameters:
  • question (str)

  • conversation_id (str | None)

  • mode (str | None)

  • top_k (int | None)

  • min_score (float | None)

  • sources (bool)

  • graph_depth (int | None)

  • use_graph (bool | None)

  • graph_use_cypher (bool)

  • graph_cypher_timeout_seconds (float | None)

  • graph_cypher_max_rows (int | None)

  • graph_generation_timeout_seconds (float | None)

  • graph_retrieval_timeout_seconds (float | None)

  • granularity (str | None)

  • embedding_preset (str | None)

  • include_full_content (bool)

  • celex (str | None)

  • compare_celex (str | None)

validate_question()[source]

Validate required CELEX fields for summarize and compare modes.

model_config: ClassVar[ConfigDict] = {}

async api_gateway.routers.rag_proxy.query(request, query_request)[source]

Proxies user queries to the RAG Service.

Parameters:
  • request (Request)

  • query_request (QueryRequest)
async api_gateway.routers.rag_proxy.query_stream(request, query_request)[source]

Proxies streaming (SSE) queries to the RAG Service.

Parameters:
  • request (Request)

  • query_request (QueryRequest)
async api_gateway.routers.rag_proxy.search(request, search_request)[source]

Proxies search requests to the RAG Service.

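
A /query payload needs only question; any retrieval parameter left out is resolved by the rag-service from app_config.yaml. An illustrative request body (the field choices below are examples, not recommendations):

```python
# Optional fields left as None are dropped before sending, matching the
# "resolve omitted values server-side" contract described above.
raw = {
    "question": "Which acts amend Regulation (EU) 2016/679?",
    "mode": "graph",        # optional override
    "top_k": 8,             # optional override
    "sources": True,
    "embedding_preset": None,
}
body = {k: v for k, v in raw.items() if v is not None}
```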

api_gateway.service_metrics

Source: services/api-gateway/service_metrics.py

Prometheus helpers for api-gateway runtime metrics.

api_gateway.service_metrics.observe_http_request(*, path, method, status_code, duration_seconds)[source]

Record one HTTP request handled by the API gateway.

Parameters:
  • path (str)

  • method (str)

  • status_code (int)

  • duration_seconds (float)

Return type:

None
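
Hooks like observe_http_request are typically fed by a timing wrapper around the request handler. A generic sketch, with the observe callable standing in for the Prometheus helper:

```python
import time
from typing import Callable

def timed_call(handler: Callable[[], int], observe: Callable[..., None],
               path: str, method: str) -> int:
    """Run a handler, then report its status code and wall-clock duration."""
    start = time.perf_counter()
    status_code = 500  # assume failure unless the handler returns normally
    try:
        status_code = handler()
        return status_code
    finally:
        observe(path=path, method=method, status_code=status_code,
                duration_seconds=time.perf_counter() - start)
```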

api_gateway.service_metrics.observe_query_request(*, mode, granularity, top_k, duration_seconds, outcome)[source]

Record one proxied /query request.

Parameters:
  • mode (str | None)

  • granularity (str | None)

  • top_k (int)

  • duration_seconds (float)

  • outcome (str)

Return type:

None

api_gateway.service_metrics.observe_search_request(*, mode, granularity, top_k, duration_seconds, outcome)[source]

Record one proxied /search request.

Parameters:
  • mode (str | None)

  • granularity (str | None)

  • top_k (int)

  • duration_seconds (float)

  • outcome (str)

Return type:

None

api_gateway.service_metrics.observe_proxy_error(*, endpoint, target, exc_or_reason)[source]

Record a backend proxy error observed by the gateway.

Parameters:
  • endpoint (str)

  • target (str)

  • exc_or_reason (Any)

Return type:

None

async api_gateway.service_metrics.refresh_backend_health(app)[source]

Probe Redis and downstream HTTP services and update health gauges.

Parameters:

app (FastAPI)

Return type:

None

chunking_worker

Source: services/chunking-worker/__init__.py

Chunking worker package.

chunking_worker.bootstrap

Source: services/chunking-worker/bootstrap.py

Bootstrap for chunking worker

class chunking_worker.bootstrap.ChunkingComponents(pg_repo, chunk_vector_repos, act_vector_repos, chunker)[source]

Bases: object

Long-lived dependencies used by the chunking worker.

chunking_worker.bootstrap.init_components()[source]

Initialize chunking worker components

Return type:

ChunkingComponents

chunking_worker.service_metrics

Source: services/chunking-worker/service_metrics.py

Prometheus metrics helpers for chunking-worker.

chunking_worker.worker

Source: services/chunking-worker/worker.py

Chunking worker (Lalandre): processes chunking jobs from the Redis queue.

class chunking_worker.worker.WorkerRuntime(redis_client, job_ttl_seconds, chunk_db_commit_batch_size, brpop_timeout_seconds, components=None)[source]

Bases: QueueRuntime

Runtime state and lazy dependencies for the chunking worker loop.

Parameters:
  • redis_client (Any)

  • job_ttl_seconds (int)

  • chunk_db_commit_batch_size (int)

  • brpop_timeout_seconds (int)

  • components (ChunkingComponents | None)

ensure_components()[source]

Initialize worker dependencies on first use and return them.

Return type:

ChunkingComponents
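
ensure_components is a lazy-initialisation guard: heavyweight dependencies are built on the first job rather than at process start. The pattern in miniature, with stub components and illustrative names:

```python
class Runtime:
    """Builds its expensive dependencies once, on first use."""

    def __init__(self):
        self._components = None

    def ensure_components(self):
        if self._components is None:
            # Stands in for init_components(): DB connections, chunker, etc.
            self._components = {"pg_repo": object(), "chunker": object()}
        return self._components
```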

chunking_worker.worker.build_runtime()[source]

Build the queue runtime used by the chunking worker.

Return type:

WorkerRuntime

chunking_worker.worker.process_chunk_all(runtime, job_id, params)[source]

Dispatch chunk_act jobs for all acts (event-driven pipeline).

Return type:

None

chunking_worker.worker.process_chunk_act(runtime, job_id, params)[source]

Process a chunking job for a single act.

Return type:

None

chunking_worker.worker.process_job(runtime, job_data)[source]

Dispatch one chunking-related job through the instrumented worker loop.

Return type:

None

chunking_worker.worker.main()[source]

Start the chunking worker loop and its Prometheus endpoint.

Return type:

None
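
The loop shared by these worker entrypoints (pop a job, decode it, dispatch by type, record the outcome) can be sketched with a plain list standing in for the Redis queue; the real worker blocks on BRPOP instead:

```python
import json
from typing import Any, Callable, Dict, Optional

Handler = Callable[[str, Dict[str, Any]], None]

def run_worker_once(queue: list, handlers: Dict[str, Handler]) -> Optional[dict]:
    """One loop iteration: pop the oldest job and dispatch it by type."""
    if not queue:
        return None                      # BRPOP would block here instead
    job = json.loads(queue.pop())        # pop from the tail, like BRPOP
    handler = handlers.get(job["type"])
    if handler is None:
        return {"job_id": job["job_id"], "status": "failed",
                "error": f"unknown job type {job['type']!r}"}
    handler(job["job_id"], job.get("params", {}))
    return {"job_id": job["job_id"], "status": "completed"}
```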

embedding_service

Source: services/embedding-service/__init__.py

Embedding service package entrypoint for documentation and local tooling.

embedding_service.main

Source: services/embedding-service/main.py

Embedding service: generates embeddings with support for multiple providers (Mistral, OpenAI, local).

embedding_service.main.lifespan(app)[source]

Initialize embedding service on startup

Parameters:

app (FastAPI)

embedding_service.main.get_embedding_service(request)[source]

Get the embedding service instance

Parameters:

request (Request)

Return type:

EmbeddingService

class embedding_service.main.EmbedRequest(*, texts, use_cache=True)[source]

Bases: BaseModel

Request model for embedding

Parameters:
  • texts (List[str])

  • use_cache (bool)

model_config: ClassVar[ConfigDict] = {}

class embedding_service.main.EmbedResponse(*, embeddings, model, provider, vector_dimension)[source]

Bases: BaseModel

Response model for embedding

Parameters:
  • embeddings (List[List[float]])

  • model (str)

  • provider (str)

  • vector_dimension (int)

model_config: ClassVar[ConfigDict] = {}

class embedding_service.main.EmbedSingleResponse(*, embedding, model, provider, vector_dimension)[source]

Bases: BaseModel

Response model for single embedding

Parameters:
  • embedding (List[float])

  • model (str)

  • provider (str)

  • vector_dimension (int)

model_config: ClassVar[ConfigDict] = {}

class embedding_service.main.HealthResponse(*, status, service, provider, model, device, vector_dimension)[source]

Bases: BaseModel

Response model for health check

Parameters:
  • status (str)

  • service (str)

  • provider (str | None)

  • model (str | None)

  • device (str | None)

  • vector_dimension (int)

model_config: ClassVar[ConfigDict] = {}

class embedding_service.main.ServiceInfoResponse(*, provider, model, device, vector_dimension, batch_size, normalize_embeddings, cache_enabled=None, cache_max_size=None, current_cache_size=None, num_api_keys=None)[source]

Bases: BaseModel

Response model for service metadata

Parameters:
  • provider (str | None)

  • model (str | None)

  • device (str | None)

  • vector_dimension (int)

  • batch_size (int | None)

  • normalize_embeddings (bool)

  • cache_enabled (bool | None)

  • cache_max_size (int | None)

  • current_cache_size (int | None)

  • num_api_keys (int | None)

model_config: ClassVar[ConfigDict] = {}

async embedding_service.main.health_check(request)[source]

Health check

Parameters:

request (Request)

Return type:

HealthResponse

async embedding_service.main.embed_texts(request, http_request)[source]

Generate embeddings for texts using configured provider

Supports:

  • Mistral AI (with Redis cache)

  • OpenAI

  • Local models (with in-memory LRU cache)

Parameters:
  • request (EmbedRequest)

  • http_request (Request)
Return type:

EmbedResponse
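
The in-memory LRU cache mentioned for local providers maps a text to its vector and evicts the least-recently-used entry at capacity. A compact sketch (capacity and names are illustrative):

```python
from collections import OrderedDict
from typing import List, Optional

class EmbeddingLRUCache:
    """Text-to-vector cache with least-recently-used eviction."""

    def __init__(self, max_size: int = 1024):
        self.max_size = max_size
        self._store: "OrderedDict[str, List[float]]" = OrderedDict()

    def get(self, text: str) -> Optional[List[float]]:
        if text not in self._store:
            return None
        self._store.move_to_end(text)         # mark as recently used
        return self._store[text]

    def put(self, text: str, vector: List[float]) -> None:
        self._store[text] = vector
        self._store.move_to_end(text)
        if len(self._store) > self.max_size:
            self._store.popitem(last=False)   # evict the oldest entry
```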

async embedding_service.main.embed_single(text, http_request)[source]

Embed a single text (convenience endpoint)

Parameters:
  • text (str)

  • http_request (Request)

Return type:

EmbedSingleResponse

async embedding_service.main.service_info(request)[source]

Get embedding service configuration info

Parameters:

request (Request)

Return type:

ServiceInfoResponse

async embedding_service.main.reload_model(request)[source]

Hot-reload: re-read the config override file and reinitialize the embedding model. Called by the api-gateway after writing a new embedding.yaml override.

Parameters:

request (Request)

Return type:

ServiceInfoResponse

async embedding_service.main.metrics()[source]

Expose Prometheus metrics for the embedding service.

Return type:

PlainTextResponse

embedding_service.service_metrics

Source: services/embedding-service/service_metrics.py

Prometheus metrics helpers for embedding-service.

embedding_service.service_metrics.observe_http_request(*, path, method, status_code, duration_seconds)[source]

Record one HTTP request handled by the embedding service.

Parameters:
  • path (str)

  • method (str)

  • status_code (int)

  • duration_seconds (float)

Return type:

None

embedding_service.service_metrics.observe_embed_request(*, endpoint, provider, batch_size, duration_seconds, outcome)[source]

Record one embedding request outcome and latency.

Parameters:
  • endpoint (str)

  • provider (str)

  • batch_size (int)

  • duration_seconds (float)

  • outcome (str)

Return type:

None

embedding_service.service_metrics.observe_embed_error(*, endpoint, provider, exc_or_reason)[source]

Record one embedding request error.

Parameters:
  • endpoint (str)

  • provider (str)

  • exc_or_reason (Any)

Return type:

None

embedding_worker

Source: services/embedding-worker/__init__.py

Embedding worker package.

embedding_worker.bootstrap

Source: services/embedding-worker/bootstrap.py

Bootstrap for embedding worker

class embedding_worker.bootstrap.EmbeddingComponents(pg_repo, qdrant_chunks, qdrant_acts, embedding_service, payload_builder, preset_id)[source]

Bases: object

Long-lived dependencies used by the embedding worker.

embedding_worker.bootstrap.init_components()[source]

Initialize embedding worker components

Return type:

EmbeddingComponents

embedding_worker.service_metrics

Source: services/embedding-worker/service_metrics.py

Prometheus metrics helpers for embedding-worker.

embedding_worker.worker

Source: services/embedding-worker/worker.py

Embedding worker: processes embedding jobs from the Redis queue, creating vector embeddings for chunks and acts and storing them in Qdrant.

class embedding_worker.worker.WorkerRuntime(redis_client, job_ttl_seconds, embed_worker_max_batch_size, embed_qdrant_upsert_batch_size, brpop_timeout_seconds, embed_queue_name, preset_id, components=None)[source]

Bases: QueueRuntime

Runtime state and lazy dependencies for the embedding worker loop.

Parameters:
  • redis_client (Any)

  • job_ttl_seconds (int)

  • embed_worker_max_batch_size (int)

  • embed_qdrant_upsert_batch_size (int)

  • brpop_timeout_seconds (int)

  • embed_queue_name (str)

  • preset_id (str)

  • components (EmbeddingComponents | None)

ensure_components()[source]

Initialize worker dependencies on first use and return them.

Return type:

EmbeddingComponents

embedding_worker.worker.build_runtime()[source]

Build the queue runtime used by the embedding worker.

Return type:

WorkerRuntime

embedding_worker.worker.process_embed_all(runtime, job_id, params)[source]

Dispatch embed_act jobs for all acts (event-driven pipeline).

Return type:

None

embedding_worker.worker.process_embed_act(runtime, job_id, params)[source]

Embed chunks and (EUR-Lex only) whole-act vector for one act.

Return type:

None

embedding_worker.worker.process_job(runtime, job_data)[source]

Dispatch one embedding-related job through the instrumented worker loop.

Return type:

None

embedding_worker.worker.main()[source]

Start the embedding worker loop and its Prometheus endpoint.

Return type:

None

extraction_worker

Source: services/extraction-worker/__init__.py

Extraction worker package.

extraction_worker.bootstrap

Source: services/extraction-worker/bootstrap.py

Bootstrap for extraction worker.

class extraction_worker.bootstrap.ExtractionComponents(pg_repo, neo4j_repo, relation_graph, act_summary_service)[source]

Bases: object

Long-lived dependencies used by the extraction worker.

extraction_worker.bootstrap.init_components()[source]

Initialize extraction worker components.

Return type:

ExtractionComponents

extraction_worker.graph

Source: services/extraction-worker/graph/__init__.py

Graph extraction: relation persistence and community detection.

extraction_worker.graph.community_builder

Source: services/extraction-worker/graph/community_builder.py

Graph community detection for extraction worker.

Mirrors the logic in scripts/build_communities.py but runs inside the worker process, reusing the already-open Neo4j driver from bootstrap.

Communities are persisted as :Community nodes with BELONGS_TO relationships in Neo4j (no JSON file on disk).

Community summaries are generated by LLM (GraphRAG-style) when a client is available, with deterministic fallback.

extraction_worker.graph.community_builder.run_community_detection(*, driver, database, resolution=1.0, min_community_size=2, llm_client=None)[source]

Full community detection pipeline: extract → detect → summarize → persist to Neo4j.

When llm_client is provided, community summaries are generated by LLM (GraphRAG-style). Falls back to deterministic summaries on failure.

Returns a summary dict with status, counts, and modularity.

Parameters:
  • driver (Any)

  • database (str)

  • resolution (float)

  • min_community_size (int)

  • llm_client (JSONHTTPLLMClient | None)

Return type:

Dict[str, Any]
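
The shape of the detect step can be illustrated with a much simpler grouping, connected components via union-find, since the real resolution-based detection needs a graph library. This is purely a sketch of the extract, detect, summarize, persist flow, not the worker's algorithm:

```python
from collections import defaultdict
from typing import Hashable, Iterable, List, Tuple

def connected_components(edges: Iterable[Tuple[Hashable, Hashable]]) -> List[list]:
    """Group nodes into communities by connectivity (union-find)."""
    parent: dict = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]   # path halving
            x = parent[x]
        return x

    for a, b in edges:
        parent[find(a)] = find(b)           # union the two components

    groups = defaultdict(list)
    for node in parent:
        groups[find(node)].append(node)
    return [sorted(members) for members in groups.values()]
```

In the worker, each resulting community would then be summarized (by LLM when a client is available) and persisted as a :Community node with BELONGS_TO relationships.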

extraction_worker.graph.relation_service

Source: services/extraction-worker/graph/relation_service.py

Relation graph service.

Orchestrates deterministic relation extraction, persistence, and Neo4j sync.

class extraction_worker.graph.relation_service.RelationGraphService(pg_repo, neo4j_repo, min_confidence, max_chunk_size)[source]

Bases: object

Service for extracting and synchronizing legal relationships

Responsibilities:

  • Extract relations from act text using RelationExtractor

  • Store relations in PostgreSQL

  • Sync relations to Neo4j graph

  • Orchestrate the full workflow

Does NOT:

  • Parse text (uses RelationExtractor)

  • Access databases directly (uses repositories)

Initialize relation graph service

Parameters:
  • pg_repo (PostgresRepository) – PostgreSQL repository

  • neo4j_repo (Neo4jRepository | None) – Neo4j repository (optional, for direct sync)

  • min_confidence (float | None) – Minimum confidence threshold for relations. If None, uses config.gateway.job_extract_min_confidence.

  • max_chunk_size (int | None) – Maximum text chunk size for extraction. If None, uses config.chunking.extraction_max_chunk_chars.

extract_and_store_for_act(act_id=None, celex=None, sync_to_neo4j=True, force=False)[source]

Extract relations from a single act and store them

Parameters:
  • act_id (int | None) – Act ID (PostgreSQL)

  • celex (str | None) – CELEX identifier (alternative to act_id)

  • sync_to_neo4j (bool) – Whether to sync to Neo4j after storage

  • force (bool) – Re-extract even if already extracted

Returns:

Dictionary with extraction results

Return type:

Dict[str, Any]
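
The extract → store → sync workflow above can be sketched with stub repositories. Everything below (the toy extractor, the repository method names, the result keys) is an illustrative assumption, not the real RelationGraphService API:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

@dataclass
class StubPgRepo:
    stored: List[Dict[str, Any]] = field(default_factory=list)

    def store_relations(self, act_id: int, relations: List[Dict[str, Any]]) -> None:
        self.stored.extend(relations)

@dataclass
class StubNeo4jRepo:
    synced: int = 0

    def sync_relations(self, relations: List[Dict[str, Any]]) -> None:
        self.synced += len(relations)

def extract_and_store(act_id: int, text: str, pg: StubPgRepo,
                      neo4j: Optional[StubNeo4jRepo], *,
                      min_confidence: float = 0.5,
                      sync_to_neo4j: bool = True) -> Dict[str, Any]:
    # Pretend extraction: one AMENDS relation per "amends" mention.
    relations = [
        {"act_id": act_id, "type": "AMENDS", "confidence": 0.9}
        for _ in range(text.lower().count("amends"))
    ]
    # Confidence filter mirrors the min_confidence constructor knob.
    kept = [r for r in relations if r["confidence"] >= min_confidence]
    pg.store_relations(act_id, kept)
    if sync_to_neo4j and neo4j is not None:
        neo4j.sync_relations(kept)
    return {"act_id": act_id, "extracted": len(relations), "stored": len(kept)}
```

The real service delegates parsing to RelationExtractor and all persistence to the repositories; the sketch keeps only that orchestration shape.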

extraction_worker.service_metrics

Source: services/extraction-worker/service_metrics.py

Prometheus extraction metrics recorder.

class extraction_worker.service_metrics.PrometheusExtractionMetricsRecorder[source]

Bases: ExtractionMetricsRecorder

Prometheus-backed recorder for extraction LLM metrics.

observe_call(*, provider, model, outcome, duration_seconds)[source]

Record one extraction LLM call latency and outcome.

Parameters:
  • provider (str)

  • model (str)

  • outcome (str)

  • duration_seconds (float)

Return type:

None

observe_error(*, provider, model, error_type)[source]

Record one extraction LLM error bucket.

Parameters:
  • provider (str)

  • model (str)

  • error_type (str)

Return type:

None

observe_json_parse(*, provider, model, parse_mode)[source]

Record how extraction output was parsed into JSON.

Parameters:
  • provider (str)

  • model (str)

  • parse_mode (str)

Return type:

None

observe_relations(*, provider, model, count)[source]

Record the number of relations produced per extraction call.

Parameters:
  • provider (str)

  • model (str)

  • count (int)

Return type:

None
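
The recorder interface above maps onto labeled Prometheus metrics keyed by provider, model, and outcome. As a self-contained stand-in, this sketch mirrors the label semantics with plain dictionaries (the real recorder would use prometheus_client Counter and Histogram objects):

```python
from collections import defaultdict
from typing import Dict, Tuple

class DictExtractionMetricsRecorder:
    """Dict-backed analogue of the Prometheus recorder, for illustration."""

    def __init__(self) -> None:
        # (provider, model, outcome) -> call count
        self.calls: Dict[Tuple[str, str, str], int] = defaultdict(int)
        # (provider, model) -> cumulative latency (Histogram sum analogue)
        self.latency_sum: Dict[Tuple[str, str], float] = defaultdict(float)
        # (provider, model) -> total relations produced
        self.relations: Dict[Tuple[str, str], int] = defaultdict(int)

    def observe_call(self, *, provider: str, model: str, outcome: str,
                     duration_seconds: float) -> None:
        self.calls[(provider, model, outcome)] += 1
        self.latency_sum[(provider, model)] += duration_seconds

    def observe_relations(self, *, provider: str, model: str, count: int) -> None:
        self.relations[(provider, model)] += count

rec = DictExtractionMetricsRecorder()
rec.observe_call(provider="openai", model="gpt-4o-mini",
                 outcome="success", duration_seconds=1.2)
rec.observe_relations(provider="openai", model="gpt-4o-mini", count=7)
```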

extraction_worker.worker

Source: services/extraction-worker/worker.py

Extraction worker: processes relation extraction jobs from the Redis queue.

class extraction_worker.worker.WorkerRuntime(redis_client, job_ttl_seconds, brpop_timeout_seconds, components=None)[source]

Bases: QueueRuntime

Runtime state and lazy dependencies for the extraction worker loop.

Parameters:
  • redis_client (Any)

  • job_ttl_seconds (int)

  • brpop_timeout_seconds (int)

  • components (ExtractionComponents | None)

ensure_components()[source]

Initialize worker dependencies on first use and return them.

Return type:

ExtractionComponents

extraction_worker.worker.build_runtime()[source]

Build the queue runtime used by the extraction worker.

Return type:

WorkerRuntime

extraction_worker.worker.process_extract_all(runtime, job_id, params)[source]

Dispatch extract_act jobs for all acts (event-driven pipeline).

Parameters:
  • runtime (WorkerRuntime)

  • job_id

  • params

Return type:

None

extraction_worker.worker.process_extract_act(runtime, job_id, params)[source]

Process a single-act extraction job.

Parameters:
  • runtime (WorkerRuntime)

  • job_id

  • params

Return type:

None

extraction_worker.worker.process_summarize_all(runtime, job_id, params)[source]

Enqueue summarize jobs for every act missing a canonical summary.

Parameters:
  • runtime (WorkerRuntime)

  • job_id

  • params

Return type:

None

extraction_worker.worker.process_summarize_act(runtime, job_id, params)[source]

Refresh the canonical summary for one CELEX identifier.

Parameters:
  • runtime (WorkerRuntime)

  • job_id

  • params

Return type:

None

extraction_worker.worker.process_build_communities(runtime, job_id, params)[source]

Detect Louvain communities on the Act graph and write back to Neo4j.

Parameters:
  • runtime (WorkerRuntime)

  • job_id

  • params

Return type:

None

extraction_worker.worker.process_job(runtime, job_data)[source]

Dispatch one extraction-related job through the instrumented worker loop.

Parameters:
  • runtime (WorkerRuntime)

  • job_data

Return type:

None
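
The dispatch pattern behind process_job can be sketched as a registry mapping job types to handlers, with a loop popping jobs off a queue (Redis BRPOP in the real worker; a plain list here). The handler registry, job-payload shape, and decorator are assumptions for illustration:

```python
import json
from typing import Any, Callable, Dict, List

HANDLERS: Dict[str, Callable[[str, Dict[str, Any]], None]] = {}
processed: List[str] = []

def handler(job_type: str):
    """Register a function as the handler for one job type."""
    def register(fn):
        HANDLERS[job_type] = fn
        return fn
    return register

@handler("extract_act")
def process_extract_act(job_id: str, params: Dict[str, Any]) -> None:
    processed.append(f"extract_act:{params['celex']}")

def process_job(job_data: str) -> None:
    # One JSON payload per job: {"type": ..., "job_id": ..., "params": {...}}
    job = json.loads(job_data)
    fn = HANDLERS.get(job["type"])
    if fn is None:
        raise ValueError(f"unknown job type: {job['type']}")
    fn(job["job_id"], job.get("params", {}))

queue = [json.dumps({"type": "extract_act", "job_id": "j1",
                     "params": {"celex": "32016R0679"}})]
while queue:
    process_job(queue.pop())
```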

extraction_worker.worker.main()[source]

Start the extraction worker loop and its Prometheus endpoint.

Return type:

None

rag_service

Source: services/rag-service/__init__.py

RAG service package.

rag_service.bootstrap

Source: services/rag-service/bootstrap.py

Bootstrap for RAG service

class rag_service.bootstrap.RagComponents(rag_service, retrieval_service, context_service, graph_rag_service, conversation_manager, pg_repo, key_pool=None, entity_linker=None, external_detector=None)[source]

Bases: object

Long-lived dependencies exposed by the rag-service bootstrap.

Parameters:
  • rag_service

  • retrieval_service

  • context_service

  • graph_rag_service

  • conversation_manager

  • pg_repo (PostgresRepository)

  • key_pool

  • entity_linker

  • external_detector

rag_service.bootstrap.ensure_graph_rag_service(components)[source]

Initialize GraphRAGService lazily if it is not available yet.

This allows graph mode to recover when Neo4j becomes available after startup.

Parameters:

components (RagComponents)

Return type:

GraphRAGService | None

rag_service.bootstrap.init_components()[source]

Initialize RAG service components

rag_service.conversation

Source: services/rag-service/conversation.py

Conversation history management for multi-turn RAG.

Handles loading, trimming, and persisting conversation turns. History is returned as LangChain BaseMessage objects ready for injection into a ChatPromptTemplate via MessagesPlaceholder.

class rag_service.conversation.ConversationContext(conversation_id, is_new, history_messages=<factory>)[source]

Bases: object

Result of loading conversation history.

Parameters:
  • conversation_id (str)

  • is_new (bool)

  • history_messages (List[BaseMessage])

class rag_service.conversation.ConversationManager(pg_repo, llm=None, max_history_chars=4_000, max_turns=10, summary_max_chars=_DEFAULT_SUMMARY_MAX_CHARS, recent_history_chars=_DEFAULT_RECENT_HISTORY_CHARS)[source]

Bases: object

Load and persist multi-turn conversation state backed by PostgreSQL.

Parameters:
  • pg_repo (PostgresRepository)

  • llm (Any | None)

  • max_history_chars (int)

  • max_turns (int)

  • summary_max_chars (int)

  • recent_history_chars (int)

load_history(conversation_id, question, user_id=None)[source]

Load (or create) a conversation and return its history.

When conversation_id is None, auto-creates a new conversation.

Parameters:
  • conversation_id (str | None)

  • question (str)

  • user_id (str | None)

Return type:

ConversationContext

save_turn(conversation_id, question, answer, query_id, mode=None, sources=None, timings=None, steps=None)[source]

Persist human + assistant messages and return the assistant message id.

Parameters:
  • conversation_id (str)

  • question (str)

  • answer (str)

  • query_id (str)

  • mode (str | None)

  • sources (Any | None)

  • timings (dict | None)

  • steps (list | None)

Return type:

str
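
The max_turns and max_history_chars knobs above suggest a two-stage trimming policy: keep only the most recent turns, then drop the oldest remaining turns until the character budget fits. The exact policy and data shape below are assumptions, not the real ConversationManager internals:

```python
from typing import List, Tuple

def trim_history(turns: List[Tuple[str, str]], *,
                 max_turns: int = 10,
                 max_history_chars: int = 4_000) -> List[Tuple[str, str]]:
    """Trim (question, answer) turns to fit turn and character budgets."""
    kept = turns[-max_turns:]  # stage 1: hard cap on turn count
    # stage 2: drop oldest turns until total characters fit the budget
    while kept and sum(len(q) + len(a) for q, a in kept) > max_history_chars:
        kept = kept[1:]
    return kept

history = [(f"q{i}", "a" * 500) for i in range(12)]
trimmed = trim_history(history, max_turns=10, max_history_chars=2_000)
```

Trimming oldest-first preserves the most recent context, which matters most for multi-turn follow-up questions.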

rag_service.graph

Source: services/rag-service/graph/__init__.py

Graph RAG components for rag-service.

rag_service.graph.cypher_branch

Source: services/rag-service/graph/cypher_branch.py

Cypher branch — orchestration layer.

Collects Cypher-derived graph evidence to enrich (not replace) the main RAG answer. Pipeline: intent → template → Text2Cypher → NL→Cypher fallback.

Routing and template selection are delegated to intent and templates modules.

class rag_service.graph.cypher_branch.CypherResolution(status, strategy, generated_cypher, rows, detail)[source]

Bases: object

Resolved Cypher-support result returned by the graph branch.

Parameters:
  • status (str)

  • strategy (str)

  • generated_cypher (str | None)

  • rows (List[Dict[str, Any]])

  • detail (str)

async rag_service.graph.cypher_branch.collect_cypher_support(*, query_request, graph_rag_service, rag_service, config, graph_depth, strict_graph_mode)[source]

Collect Cypher-derived graph evidence without replacing the main RAG answer.

Parameters:
  • query_request (QueryRequest)

  • graph_rag_service (Any)

  • rag_service (Any)

  • config (Any)

  • graph_depth (int)

  • strict_graph_mode (bool)

Return type:

tuple[Dict[str, Any] | None, Dict[str, Any]]

rag_service.graph.intent

Source: services/rag-service/graph/intent.py

Relation intent detection — regex keywords + embedding similarity fallback.

Determines whether a user question has relational intent (i.e. asks about links, amendments, chains between legal acts) and if so, which direction (outgoing, incoming, or both) and which relation types are relevant.

class rag_service.graph.intent.RelationIntent(has_intent, direction='both', relation_types=<factory>, celex_candidates=<factory>, wants_direct=False, wants_path=False)[source]

Bases: object

Result of analyzing a question for relational intent.

Parameters:
  • has_intent (bool)

  • direction (Literal['outgoing', 'incoming', 'both'])

  • relation_types (List[str])

  • celex_candidates (List[str])

  • wants_direct (bool)

  • wants_path (bool)

rag_service.graph.intent.analyze_relation_intent(question, *, embedding_service=None)[source]

Analyze a question and return structured relational intent.

Parameters:
  • question (str)

  • embedding_service (Any)

Return type:

RelationIntent
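
A minimal regex-keyword sketch of the detection described above. The keyword lists, direction cues, and simplified CELEX pattern are illustrative assumptions, not the real intent module (which also has an embedding-similarity fallback):

```python
import re
from dataclasses import dataclass, field
from typing import List

# Simplified CELEX pattern, e.g. 32016R0679 (sector 3, year, type, number).
CELEX_RE = re.compile(r"\b3\d{4}[LRD]\d{4}\b")

@dataclass
class RelationIntent:
    has_intent: bool
    direction: str = "both"
    relation_types: List[str] = field(default_factory=list)
    celex_candidates: List[str] = field(default_factory=list)

def analyze_relation_intent(question: str) -> RelationIntent:
    q = question.lower()
    # Keyword -> relation type mapping (illustrative subset).
    types = [t for kw, t in [("amend", "AMENDS"),
                             ("repeal", "REPEALS"),
                             ("cite", "CITES")] if kw in q]
    # Passive phrasing ("amended by") flips to the incoming direction.
    direction = "both"
    if "amended by" in q or "repealed by" in q:
        direction = "incoming"
    return RelationIntent(
        has_intent=bool(types),
        direction=direction,
        relation_types=types,
        celex_candidates=CELEX_RE.findall(question),
    )
```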

rag_service.graph.templates

Source: services/rag-service/graph/templates.py

Cypher template library — deterministic, direction-aware query strategies.

Each strategy produces a ready-to-execute Cypher query from structured intent. Relations in Neo4j are directed: (source)-[:AMENDS]->(target) means “source amends target”. Templates preserve this semantics.

class rag_service.graph.templates.CypherTemplateMatch(strategy, cypher, detail)[source]

Bases: object

Deterministic Cypher template selected for a relation intent.

Parameters:
  • strategy (str)

  • cypher (str)

  • detail (str)

rag_service.graph.templates.match_template(intent, *, max_graph_depth, row_limit)[source]

Select the best deterministic Cypher template from structured intent.

Parameters:
  • intent (RelationIntent)

  • max_graph_depth

  • row_limit

Return type:

CypherTemplateMatch | None
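
Because relations are stored as directed edges, (source)-[:AMENDS]->(target), "what does X amend" traverses outgoing edges while "what amends X" traverses incoming ones. A sketch of that direction-aware template construction; the query shapes and parameter names are illustrative, not the real template library:

```python
from typing import Dict, Tuple

def build_relation_cypher(celex: str, relation_type: str, direction: str, *,
                          row_limit: int = 25) -> Tuple[str, Dict[str, str]]:
    """Return a parameterized Cypher query and its parameter map."""
    if direction == "outgoing":
        # X -> others: what does X amend/repeal/cite?
        pattern = f"(a:Act {{celex: $celex}})-[r:{relation_type}]->(b:Act)"
    elif direction == "incoming":
        # others -> X: what amends/repeals/cites X?
        pattern = f"(b:Act)-[r:{relation_type}]->(a:Act {{celex: $celex}})"
    else:  # both: undirected match
        pattern = f"(a:Act {{celex: $celex}})-[r:{relation_type}]-(b:Act)"
    cypher = f"MATCH {pattern} RETURN b.celex AS celex LIMIT {row_limit}"
    return cypher, {"celex": celex}

q, params = build_relation_cypher("32016R0679", "AMENDS", "incoming", row_limit=10)
```

Keeping the CELEX value in a parameter map rather than interpolating it into the query is the usual defense against Cypher injection.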

rag_service.main

Source: services/rag-service/main.py

RAG Service - Lalandre. Handles RAG queries (retrieval + generation).

rag_service.main.lifespan(app)[source]

Initialize system components.

Parameters:

app (FastAPI)

async rag_service.main.health_check(request)[source]

Health check endpoint.

Parameters:

request (Request)

Return type:

HealthResponse

async rag_service.main.metrics(request)[source]

Expose Prometheus metrics after refreshing backend health probes.

Parameters:

request (Request)

Return type:

PlainTextResponse

async rag_service.main.get_config_info()[source]

Return read-only generation and search configuration.

rag_service.metrics

Source: services/rag-service/metrics/__init__.py

Prometheus metrics for rag-service.

rag_service.metrics.retrieval

Source: services/rag-service/metrics/retrieval.py

Prometheus backend for lalandre_rag runtime retrieval metrics.

class rag_service.metrics.retrieval.PrometheusRetrievalMetricsRecorder[source]

Bases: RetrievalMetricsRecorder

Prometheus-backed recorder for retrieval phase metrics.

observe_phase(*, operation, phase, duration_seconds)[source]

Record one retrieval phase latency.

Parameters:
  • operation (str)

  • phase (str)

  • duration_seconds (float)

Return type:

None

observe_error(*, operation, phase, exc_or_reason)[source]

Record one retrieval error classified by provider and type.

Parameters:
  • operation (str)

  • phase (str)

  • exc_or_reason (Any)

Return type:

None

rag_service.metrics.service

Source: services/rag-service/metrics/service.py

Prometheus helpers for rag-service runtime metrics.

rag_service.metrics.service.observe_http_request(*, path, method, status_code, duration_seconds)[source]

Record one HTTP request handled by rag-service.

Parameters:
  • path (str)

  • method (str)

  • status_code (int)

  • duration_seconds (float)

Return type:

None

rag_service.metrics.service.observe_query_request(*, mode, granularity, top_k, duration_seconds, outcome, metadata=None)[source]

Record one /query request handled by rag-service.

Parameters:
  • mode (str | None)

  • granularity (str | None)

  • top_k (int | None)

  • duration_seconds (float)

  • outcome (str)

  • metadata (Dict[str, Any] | None)

Return type:

None

rag_service.metrics.service.observe_search_request(*, mode, granularity, top_k, duration_seconds, outcome)[source]

Record one /search request handled by rag-service.

Parameters:
  • mode (str | None)

  • granularity (str | None)

  • top_k (int | None)

  • duration_seconds (float)

  • outcome (str)

Return type:

None

rag_service.metrics.service.observe_provider_error(*, mode, stage, exc_or_reason)[source]

Record one provider-side error or fallback reason.

Parameters:
  • mode (str | None)

  • stage (str)

  • exc_or_reason (Any)

Return type:

None

rag_service.metrics.service.observe_provider_fallbacks(*, mode, metadata)[source]

Emit provider fallback counters derived from response metadata.

Parameters:
  • mode (str | None)

  • metadata (Dict[str, Any])

Return type:

None

rag_service.metrics.service.infer_query_outcome(metadata)[source]

Infer the outcome label used for query-level service metrics.

Parameters:

metadata (Dict[str, Any] | None)

Return type:

str
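
A hypothetical outcome-labeling heuristic of the kind infer_query_outcome performs; the metadata keys checked here ("error", "fallback_used") are assumptions, not the real implementation:

```python
from typing import Any, Dict, Optional

def infer_outcome(metadata: Optional[Dict[str, Any]]) -> str:
    """Map response metadata to a low-cardinality outcome label."""
    if not metadata:
        return "unknown"
    if metadata.get("error"):
        return "error"
    if metadata.get("fallback_used"):
        return "fallback"
    return "success"
```

Collapsing metadata into a small label set keeps the Prometheus label cardinality bounded, which is why the outcome is a string rather than the raw metadata.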

rag_service.metrics.service.refresh_backend_health(components)[source]

Probe initialized dependencies and update backend health gauges.

Parameters:

components (RagComponents)

Return type:

None

rag_service.mode_handlers

Source: services/rag-service/mode_handlers.py

Mode-specific query handlers for the RAG service.

rag_service.mode_handlers.handle_llm_only_mode(*, query_request, rag_service)[source]

Execute the direct LLM-only answer path.

Parameters:
  • query_request (QueryRequest)

  • rag_service (Any)

Return type:

tuple[str, Dict[str, Any] | None, Dict[str, Any]]

rag_service.mode_handlers.handle_summarize_mode(*, query_request, rag_service)[source]

Execute summarize mode and return answer, sources, and metadata.

Parameters:
  • query_request (QueryRequest)

  • rag_service (Any)

Return type:

tuple[str, Dict[str, Any] | None, Dict[str, Any]]

rag_service.mode_handlers.handle_compare_mode(*, query_request, rag_service)[source]

Execute compare mode and return answer, sources, and metadata.

Parameters:
  • query_request (QueryRequest)

  • rag_service (Any)

Return type:

tuple[str, Dict[str, Any] | None, Dict[str, Any]]

rag_service.mode_handlers.handle_rag_mode(*, query_request, rag_service, chat_history=None)[source]

Execute the default retrieval-augmented generation path.

Parameters:
  • query_request (QueryRequest)

  • rag_service (Any)

  • chat_history (Any)

Return type:

tuple[str, Dict[str, Any] | None, Dict[str, Any]]

rag_service.models

Source: services/rag-service/models.py

Pydantic models for the RAG service API.

class rag_service.models.QueryRequest(*, query_id, question, conversation_id=None, mode=None, top_k=None, min_score=None, sources=False, include_full_content=False, graph_depth=None, use_graph=None, graph_generation_timeout_seconds=None, graph_retrieval_timeout_seconds=None, graph_use_cypher=False, graph_cypher_timeout_seconds=None, graph_cypher_max_rows=None, granularity=None, embedding_preset=None, retrieval_depth=None, celex=None, compare_celex=None)[source]

Bases: BaseModel

Request model for RAG queries.

All retrieval parameters are optional. When omitted (None), the rag-service resolves them from SearchConfig defaults at request time so that app_config.yaml remains the single source of truth.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:
  • query_id (str)

  • question (str)

  • conversation_id (str | None)

  • mode (str | None)

  • top_k (int | None)

  • min_score (float | None)

  • sources (bool)

  • include_full_content (bool)

  • graph_depth (int | None)

  • use_graph (bool | None)

  • graph_generation_timeout_seconds (float | None)

  • graph_retrieval_timeout_seconds (float | None)

  • graph_use_cypher (bool)

  • graph_cypher_timeout_seconds (float | None)

  • graph_cypher_max_rows (int | None)

  • granularity (str | None)

  • embedding_preset (str | None)

  • retrieval_depth (str | None)

  • celex (str | None)

  • compare_celex (str | None)

classmethod resolve_mode_aliases(data)[source]

Resolve legacy mode aliases before validation.

Parameters:

data (Any)

Return type:

Any

normalize_celex_fields()[source]

Normalize CELEX fields after model validation succeeds.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class rag_service.models.HealthResponse(*, status, service, components_initialized)[source]

Bases: BaseModel

Health response

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:
  • status (str)

  • service (str)

  • components_initialized (bool)

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

rag_service.routers

Source: services/rag-service/routers/__init__.py

Route registrations for rag-service endpoints.

rag_service.routers._deps

Source: services/rag-service/routers/_deps.py

Shared FastAPI dependencies for rag-service routers.

rag_service.routers._deps.get_components(request)[source]

Return initialized rag-service components from the FastAPI state.

Parameters:

request (Request)

Return type:

RagComponents

rag_service.routers._deps.extract_user_id(request)[source]

Extract user_id from X-User-Id header (set by api-gateway from JWT sub claim).

Parameters:

request (Request)

Return type:

str | None
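
The X-User-Id header read here is produced on the gateway side by best-effort JWT decoding (api_gateway.deps.extract_user_id decodes the sub claim without re-validating the signature). A minimal sketch of that decoding, assuming a standard three-part JWT; never use this for authorization:

```python
import base64
import json
from typing import Optional

def sub_from_bearer(authorization: Optional[str]) -> Optional[str]:
    """Extract the 'sub' claim from a Bearer JWT without validating it."""
    if not authorization or not authorization.startswith("Bearer "):
        return None
    token = authorization[len("Bearer "):]
    parts = token.split(".")
    if len(parts) != 3:  # header.payload.signature
        return None
    payload_b64 = parts[1]
    payload_b64 += "=" * (-len(payload_b64) % 4)  # restore base64url padding
    try:
        claims = json.loads(base64.urlsafe_b64decode(payload_b64))
    except (ValueError, json.JSONDecodeError):
        return None
    return claims.get("sub")
```

Signature validation against the Keycloak JWKS happens once in api_gateway.auth.require_auth; downstream services only need the already-verified identity, which is why this path skips re-validation.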

rag_service.routers.conversations

Source: services/rag-service/routers/conversations.py

Conversation history endpoints (list / messages / delete).

async rag_service.routers.conversations.list_conversations(components=Depends(get_components), user_id=None, limit=50)[source]

List conversations for a user, ordered by most recent.

Parameters:
  • components (RagComponents)

  • user_id (str | None)

  • limit (int)

async rag_service.routers.conversations.get_conversation_messages(conversation_id, components=Depends(get_components), user_id=None)[source]

Get messages for a conversation (with ownership check).

Parameters:
  • conversation_id (str)

  • components (RagComponents)

  • user_id (str | None)

async rag_service.routers.conversations.delete_conversation(conversation_id, components=Depends(get_components), user_id=None)[source]

Delete a conversation (with ownership check).

Parameters:
  • conversation_id (str)

  • components (RagComponents)

  • user_id (str | None)

rag_service.routers.query

Source: services/rag-service/routers/query.py

POST /query endpoint — synchronous RAG query handler.

rag_service.routers.query.apply_config_defaults_query(query_request)[source]

Fill None fields with config defaults (mutates in place).

Parameters:

query_request (QueryRequest)

Return type:

None
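
The "None means use the config default" pattern can be sketched as below: only fields the caller left unset are filled in, so app_config.yaml stays the single source of truth. The field names and default values are illustrative assumptions:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class SearchDefaults:
    """Stand-in for SearchConfig defaults loaded from app_config.yaml."""
    top_k: int = 8
    min_score: float = 0.35
    granularity: str = "chunk"

@dataclass
class QueryParams:
    """Stand-in for the request model: None means 'not specified'."""
    top_k: Optional[int] = None
    min_score: Optional[float] = None
    granularity: Optional[str] = None

def apply_config_defaults(params: QueryParams, defaults: SearchDefaults) -> None:
    """Fill None fields from defaults, mutating params in place."""
    for name in ("top_k", "min_score", "granularity"):
        if getattr(params, name) is None:
            setattr(params, name, getattr(defaults, name))

p = QueryParams(top_k=3)          # caller overrides top_k only
apply_config_defaults(p, SearchDefaults())
```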

rag_service.routers.query.validate_query_mode_and_granularity(query_request)[source]

Validate the requested query mode and granularity combination.

Parameters:

query_request (QueryRequest)

Return type:

None

rag_service.routers.query.build_query_response(*, query_request, answer, sources, search_metadata)[source]

Assemble the normalized /query response payload.

Parameters:
  • query_request (QueryRequest)

  • answer (str)

  • sources (Dict[str, Any] | None)

  • search_metadata (Dict[str, Any] | None)

Return type:

QueryResponse

async rag_service.routers.query.process_query(query_request, components=Depends(get_components), user_id=Depends(extract_user_id))[source]

Process a RAG query

Modes:
  • llm_only: LLM generation without retrieval
  • rag: Standard RAG (retrieval + LLM)
  • summarize: Summarization of retrieved documents
  • compare: Compare two acts

Parameters:
  • query_request (QueryRequest)

  • components (RagComponents)

  • user_id (str | None)

Return type:

QueryResponse

rag_service.routers.stream

Source: services/rag-service/routers/stream.py

POST /query/stream endpoint — SSE streaming query handler.

async rag_service.routers.stream.process_query_stream(query_request, components=Depends(get_components), user_id=Depends(extract_user_id))[source]

SSE streaming variant of /query. Streams LLM tokens as they are generated.

Parameters:
  • query_request (QueryRequest)

  • components (RagComponents)

  • user_id (str | None)

rerank_service

Source: services/rerank-service/__init__.py

Rerank service package entrypoint for documentation and local tooling.

rerank_service.main

Source: services/rerank-service/main.py

Rerank Service — dedicated cross-encoder reranking for Lalandre.

Loads BAAI/bge-reranker-v2-m3 at startup and exposes a /rerank endpoint.

rerank_service.main.lifespan(app)[source]

Load the reranker model during FastAPI startup.

Parameters:

app (FastAPI)

async rerank_service.main.health()[source]

Return rerank-service readiness and current model metadata.

async rerank_service.main.rerank(request)[source]

Rerank the supplied documents for the given query.

Parameters:

request (RerankRequest)

async rerank_service.main.reload_model()[source]

Reload the reranker model from the current config (app_config.yaml).

rerank_service.models

Source: services/rerank-service/models.py

Request/response models for the rerank service.

class rerank_service.models.RerankDocument(*, id, content)[source]

Bases: BaseModel

Document candidate accepted by the rerank endpoint.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:
  • id (str)

  • content (str)

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class rerank_service.models.RerankRequest(*, query, documents, top_k=None)[source]

Bases: BaseModel

Request payload for reranking a list of document candidates.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:
  • query (str)

  • documents (list[RerankDocument])

  • top_k (int | None)

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class rerank_service.models.RerankResult(*, id, score, rank)[source]

Bases: BaseModel

Single reranked document with its score and rank.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:
  • id (str)

  • score (float)

  • rank (int)

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class rerank_service.models.RerankResponse(*, results, model, duration_ms)[source]

Bases: BaseModel

Response payload returned by the rerank endpoint.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:
  • results (list[RerankResult])

  • model (str)

  • duration_ms (float)

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
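
The post-scoring step implied by these models (score, then 1-based rank, then top_k cutoff) can be sketched as follows; the cross-encoder scoring itself (BAAI/bge-reranker-v2-m3) is stubbed out and the dict shape stands in for RerankResult:

```python
from typing import Dict, List, Optional, Tuple

def shape_results(scored: List[Tuple[str, float]],
                  top_k: Optional[int] = None) -> List[Dict[str, object]]:
    """Sort (id, score) pairs by score descending, rank them, apply top_k."""
    ordered = sorted(scored, key=lambda pair: pair[1], reverse=True)
    if top_k is not None:
        ordered = ordered[:top_k]
    return [{"id": doc_id, "score": score, "rank": i}
            for i, (doc_id, score) in enumerate(ordered, start=1)]

results = shape_results([("d1", 0.2), ("d2", 0.9), ("d3", 0.5)], top_k=2)
```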