"""
Configuration management for the project.
"""
from pathlib import Path
from typing import Any, Dict, Optional, cast, get_args
import yaml
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings, SettingsConfigDict
DEFAULT_APP_CONFIG_FILE = Path(__file__).resolve().with_name("app_config.yaml")
class EnvSettings(BaseSettings):
"""Environment-backed settings loaded before the YAML application config."""
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
env_ignore_empty=True,
extra="ignore",
)
APP_CONFIG_FILE: Optional[str] = None
APP_CONFIG_OVERRIDE_FILE: Optional[str] = None
GATEWAY_ALLOWED_ORIGINS: Optional[str] = None
DB_PASSWORD: Optional[str] = None
QDRANT_API_KEY: Optional[str] = None
NEO4J_PASSWORD: Optional[str] = None
LLM_API_KEY: Optional[str] = None
SEARCH_INTENT_PARSER_API_KEY: Optional[str] = None
MISTRAL_API_KEY: Optional[str] = None
MISTRAL_API_KEY_2: Optional[str] = None
MISTRAL_API_KEY_3: Optional[str] = None
MISTRAL_API_KEY_4: Optional[str] = None
MISTRAL_API_KEY_5: Optional[str] = None
MISTRAL_API_KEY_6: Optional[str] = None
MISTRAL_API_KEY_7: Optional[str] = None
MISTRAL_API_KEY_8: Optional[str] = None
MISTRAL_API_KEY_9: Optional[str] = None
MISTRAL_API_KEY_10: Optional[str] = None
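# Illustrative .env sketch (values are placeholders, not real settings). Only
# the variables declared on EnvSettings above are read; anything else in the
# file is skipped because extra="ignore".
#
#   APP_CONFIG_FILE=config/app_config.yaml
#   APP_CONFIG_OVERRIDE_FILE=config/app_config.local.yaml
#   DB_PASSWORD=change-me
#   QDRANT_API_KEY=change-me
#   GATEWAY_ALLOWED_ORIGINS=http://localhost:3000,http://localhost:5173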
class DatabaseConfig(BaseModel):
"""PostgreSQL database configuration."""
host: Optional[str] = Field(default=None, description="DB host (use 'localhost' outside Docker)")
port: Optional[int] = Field(default=None)
database: Optional[str] = Field(default=None)
user: Optional[str] = Field(default=None)
password: Optional[str] = Field(default=None)
@property
def connection_string(self) -> str:
"""Return a PostgreSQL connection string for the configured database."""
if not all([self.user, self.password, self.host, self.port, self.database]):
raise ValueError(
"Database configuration incomplete. "
"Set database.host, database.port, database.database, "
"database.user, and provide DB_PASSWORD."
)
return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"
class VectorConfig(BaseModel):
"""Qdrant configuration."""
host: Optional[str] = Field(default=None, description="Qdrant host (use 'localhost' outside Docker)")
port: Optional[int] = Field(default=None)
api_key: Optional[str] = Field(default=None, description="API key for authentication (required in Docker)")
collection_chunks: Optional[str] = Field(default=None, description="Base name for chunks collection")
collection_acts: Optional[str] = Field(
default=None, description="Base name for acts collection (one vector per act, full-text embedding)"
)
vector_size: int = Field(default=1024)
timeout: int = Field(default=30, description="Client timeout in seconds")
use_https: bool = Field(
default=False,
description="Use HTTPS for Qdrant connection (required when QDRANT_API_KEY is set in production)",
)
class GraphConfig(BaseModel):
"""Neo4j configuration."""
uri: Optional[str] = Field(default=None, description="Neo4j URI (use 'bolt://localhost:7687' outside Docker)")
user: Optional[str] = Field(default=None)
password: Optional[str] = None
database: Optional[str] = Field(default=None)
max_connection_lifetime: int = Field(default=3600)
max_connection_pool_size: int = Field(default=50)
connection_timeout: int = Field(default=30)
strict_mode: bool = Field(
default=False,
description="If true, graph query budgets are fixed by GRAPH_* env values.",
)
acts_limit: int = Field(
default=10,
description="Max number of related acts injected into graph mode context.",
)
relationships_limit: int = Field(
default=20,
description="Max number of graph relationships injected into context.",
)
depth: int = Field(
default=2,
description="Default graph traversal depth for graph mode.",
)
cypher_timeout_seconds: float = Field(
default=30.0,
description="Default timeout (seconds) for NL->Cypher generation.",
)
cypher_max_rows: int = Field(
default=80,
description="Default maximum Cypher result rows for graph_use_cypher mode.",
)
# ── Relation type weights ────────────────────────────────────────
ranking_relation_weights: Dict[str, float] = Field(
default_factory=lambda: {
"AMENDS": 1.0,
"IMPLEMENTS": 1.0,
"REPEALS": 0.9,
"REPLACES": 0.9,
"CORRECTS": 0.8,
"SUPPLEMENTS": 0.7,
"DEROGATES": 0.6,
"CITES": 0.4,
"RELATED_TO": 0.2,
},
description="Relation type weights for graph node ranking (normalized 0-1).",
)
ranking_default_relation_weight: float = Field(
default=0.3,
description="Default weight for unknown relation types in ranking.",
)
community_relation_weights: Dict[str, float] = Field(
default_factory=lambda: {
"AMENDS": 3.0,
"IMPLEMENTS": 3.0,
"REPEALS": 2.5,
"REPLACES": 2.5,
"CORRECTS": 2.0,
"SUPPLEMENTS": 1.5,
"DEROGATES": 1.0,
"CITES": 0.5,
},
description="Relation type weights for Louvain community detection.",
)
community_default_relation_weight: float = Field(
default=0.5,
description="Default weight for unknown relation types in community detection.",
)
# ── Graph RAG ranking (Level 1) ──────────────────────────────────
ranking_hop_decay: float = Field(
default=0.5,
description="Decay factor per hop in graph node scoring (0-1).",
)
ranking_semantic_boost: float = Field(
default=0.3,
description="Score boost for nodes also found in semantic search.",
)
ranking_relation_weight_factor: float = Field(
default=0.25,
description="Weight of relation-type score in the final ranking formula.",
)
# ── Graph RAG context budget (Level 1) ───────────────────────────
budget_semantic_share: float = Field(
default=0.60,
description="Fraction of context budget for semantic search content.",
)
budget_graph_share: float = Field(
default=0.30,
description="Fraction of context budget for graph-expanded act info.",
)
budget_relation_share: float = Field(
default=0.10,
description="Fraction of context budget for relationship descriptions.",
)
# ── Graph RAG map-reduce (Level 2) ───────────────────────────────
map_reduce_threshold: int = Field(
default=24000,
description="Context char threshold above which map-reduce generation kicks in.",
)
map_reduce_chunk_chars: int = Field(
default=5000,
description="Target chars per chunk for map-reduce splitting.",
)
map_reduce_max_parallel: int = Field(
default=3,
description="Max parallel map calls to avoid overwhelming the LLM.",
)
map_reduce_map_timeout: float = Field(
default=45.0,
description="Timeout (seconds) for each map LLM call.",
)
map_reduce_reduce_timeout: float = Field(
default=50.0,
description="Timeout (seconds) for the reduce LLM call.",
)
# ── Graph expansion query tuning ────────────────────────────────
expansion_relation_types: list[str] = Field(
default_factory=lambda: [
"AMENDS",
"IMPLEMENTS",
"REPEALS",
"REPLACES",
"CORRECTS",
"SUPPLEMENTS",
"DEROGATES",
],
description=(
"Relation types traversed during graph expansion. "
"Excluding low-signal types (e.g. CITES) prevents "
"combinatorial explosion at depth >= 3."
),
)
expansion_max_related_per_node: int = Field(
default=50,
description="Max related acts collected per seed node in graph expansion.",
)
expansion_max_relationships_per_node: int = Field(
default=100,
description="Max relationship lists collected per seed node in graph expansion.",
)
# ── Hybrid RAG graph enrichment ───────────────────────────────────
use_graph_in_rag: bool = Field(
default=True,
description=(
"If true, the RAG mode automatically enriches context with graph "
"relationships and communities when Neo4j is available, even for "
"generic questions (contextual_default profile)."
),
)
hybrid_enrichment_depth: int = Field(
default=2,
description="Graph traversal depth for graph enrichment in hybrid RAG mode.",
)
use_communities_in_rag: bool = Field(
default=True,
description=(
"If true, inject Neo4j community summaries into RAG context "
"(not just global mode). Useful for multi-act queries."
),
)
# ── Community display tuning ─────────────────────────────────────
community_central_act_title_chars: int = Field(
default=60,
description="Max chars for central act title display in community context.",
)
community_central_acts_display: int = Field(
default=3,
description="Max central acts displayed per community in graph context.",
)
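# Sketch of a `graph` override section in app_config.yaml (values illustrative).
# Note that dict-valued fields such as ranking_relation_weights are replaced
# wholesale by the override machinery below, not merged key by key.
#
#   graph:
#     uri: bolt://localhost:7687
#     user: neo4j
#     database: neo4j
#     depth: 2
#     ranking_relation_weights:
#       AMENDS: 1.0
#       CITES: 0.4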
class TokenLimitsConfig(BaseModel):
"""Token limits for API models."""
embedding_max_input_tokens: int = Field(default=8192, description="Max tokens per embedding input (Mistral Embed)")
chars_per_token: float = Field(default=3.3, description="Average characters per token (French)")
embedding_safety_ratio: float = Field(
default=0.9,
description="Safety margin to stay below embedding model max input (0.9 = 10% headroom).",
)
@property
def embedding_max_chars(self) -> int:
"""Max characters for embedding based on token limit."""
return int(self.embedding_max_input_tokens * self.chars_per_token)
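# Worked example with the defaults above:
#   int(8192 tokens * 3.3 chars/token) = 27033 characters per embedding input
# (before any embedding_safety_ratio headroom applied by callers).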
class EmbeddingPresetConfig(BaseModel):
"""Named embedding runtime preset used for indexing and query-time routing."""
preset_id: str = Field(description="Stable preset identifier used by the frontend and RAG routing.")
provider: str = Field(description="Embedding provider name.")
model_name: str = Field(description="Embedding model identifier.")
device: str = Field(default="cpu", description="Runtime device for local models.")
label: str = Field(description="Human-readable label for UI display.")
enabled: bool = Field(
default=True,
description="Whether this preset is available for query-time routing and UI exposure.",
)
indexing_enabled: bool = Field(
default=True,
description="Whether async indexing workers should enqueue embedding jobs for this preset.",
)
queue_name: Optional[str] = Field(
default=None,
description="Dedicated Redis queue for this preset's embedding worker.",
)
vector_size: int = Field(default=1024, description="Expected embedding vector dimension.")
def resolved_queue_name(self) -> str:
"""Return the queue name used by the embedding worker for this preset."""
if self.queue_name and self.queue_name.strip():
return self.queue_name.strip()
normalized_preset = self.preset_id.strip().replace("-", "_")
return f"embed_jobs__{normalized_preset}"
class EmbeddingConfig(BaseModel):
"""Embedding model configuration."""
provider: Optional[str] = Field(default=None)
model_name: Optional[str] = Field(default=None)
batch_size: Optional[int] = Field(default=None, description="Number of texts to embed in a single batch")
device: Optional[str] = Field(default=None)
cache_dir: Optional[str] = Field(default=None)
normalize_embeddings: bool = Field(
default=True, description="Normalize embeddings to unit length (recommended for cosine similarity)"
)
enable_cache: bool = Field(default=True, description="Enable in-memory cache for local models")
cache_max_size: int = Field(default=10000, description="Max number of embeddings to cache in memory")
redis_socket_timeout: int = Field(
default=2,
description="Socket connect/read timeout (seconds) for embedding Redis cache",
)
cache_ttl_seconds: int = Field(
default=604800,
description="Redis TTL (seconds) for embedding cache entries.",
)
retry_min_tokens: int = Field(
default=64,
description="Minimum token limit when retrying after truncation error.",
)
retry_fallback_threshold: int = Field(
default=96,
description="Token limit below which no further reduction is attempted.",
)
retry_reduction_factor: float = Field(
default=0.7,
description="Factor by which to reduce token limit on truncation retry.",
)
class ChunkingEmbeddingConfig(BaseModel):
"""Embedding runtime used internally by the chunking algorithm."""
provider: str = Field(default="mistral")
model_name: str = Field(default="mistral-embed")
device: str = Field(default="cpu")
def _default_embedding_presets() -> list[EmbeddingPresetConfig]:
return [
EmbeddingPresetConfig(
preset_id="mistral",
provider="mistral",
model_name="mistral-embed",
device="cpu",
label="Mistral (API cloud)",
enabled=True,
queue_name="embed_jobs__mistral",
vector_size=1024,
),
EmbeddingPresetConfig(
preset_id="e5-multilingual",
provider="local",
model_name="intfloat/multilingual-e5-large-instruct",
device="cpu",
label="multilingual-e5-large (recommande FR/EU)",
enabled=True,
queue_name="embed_jobs__e5_multilingual",
vector_size=1024,
),
]
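# Sketch of an `embedding_presets` override in app_config.yaml (values
# illustrative). The YAML list replaces the defaults above wholesale:
#
#   embedding_presets:
#     - preset_id: mistral
#       provider: mistral
#       model_name: mistral-embed
#       label: "Mistral (API cloud)"
#       vector_size: 1024
#     - preset_id: e5-multilingual
#       provider: local
#       model_name: intfloat/multilingual-e5-large-instruct
#       label: "multilingual-e5-large (recommended for FR/EU)"
#       queue_name: embed_jobs__e5_multilingual
#       vector_size: 1024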
class SearchConfig(BaseModel):
"""Search configuration."""
default_limit: int = Field(default=10, description="Default top_k when not specified in query")
default_mode: str = Field(default="rag", description="Default query mode when not specified")
default_search_mode: str = Field(default="hybrid", description="Default search mode when not specified")
default_granularity: str = Field(default="all", description="Default retrieval granularity when not specified")
default_embedding_preset: str = Field(
default="mistral",
description="Default embedding preset used when the request does not specify one.",
)
fulltext_language: Optional[str] = Field(default=None)
bm25_normalization: int = Field(default=32)
fusion_method: Optional[str] = Field(default=None, description="Fusion method: rrf or weighted")
lexical_weight: Optional[float] = Field(default=None, description="Weight for BM25 scores (0-1)")
semantic_weight: Optional[float] = Field(default=None, description="Weight for semantic scores (0-1)")
candidate_multiplier: Optional[float] = Field(default=None, description="Multiplier for candidate pool size")
min_candidates: Optional[int] = Field(default=None, description="Minimum candidates for fusion")
max_candidates: int = Field(default=200, description="Hard cap on retrieval candidate pool size")
semantic_per_collection_oversampling: float = Field(
default=1.25,
description="Oversampling factor for per-collection semantic retrieval budget",
)
hnsw_ef: Optional[int] = Field(
default=None,
description="Optional Qdrant ANN runtime parameter (higher = more accurate, slower)",
)
exact_search: bool = Field(
default=False,
description="Use exact vector search in Qdrant (slower, mainly for evaluation)",
)
query_expansion_enabled: bool = Field(
default=True,
description="Enable deterministic multi-query expansion before retrieval.",
)
query_expansion_max_variants: int = Field(
default=3,
description="Maximum number of expanded query variants (including original).",
)
query_expansion_min_query_chars: int = Field(
default=24,
description="Skip expansion for short queries without explicit legal references.",
)
intent_parser_enabled: bool = Field(
default=False,
description=("Enable compact LLM parsing for query rewriting + intent routing before heuristic fallback."),
)
intent_parser_provider: Optional[str] = Field(
default=None,
description=("Optional parser LLM provider override. If unset, generation.provider is used."),
)
intent_parser_model: Optional[str] = Field(
default=None,
description="Optional parser model override. If unset, generation.model_name is used.",
)
intent_parser_base_url: Optional[str] = Field(
default=None,
description="Optional parser endpoint override. If unset, generation.base_url is used.",
)
intent_parser_api_key: Optional[str] = Field(
default=None,
description="Optional parser API key override.",
)
intent_parser_timeout_seconds: float = Field(
default=20.0,
description="Timeout for parser calls (seconds).",
)
intent_parser_temperature: float = Field(
default=0.0,
description="Sampling temperature for parser model.",
)
intent_parser_max_output_tokens: int = Field(
default=180,
description="Max output tokens for parser JSON response.",
)
rerank_enabled: bool = Field(default=True, description="Enable cross-encoder reranking")
rerank_model: str = Field(default="BAAI/bge-reranker-v2-m3", description="Cross-encoder model for reranking")
rerank_device: str = Field(default="cpu", description="Device for reranker: cpu, cuda, mps")
rerank_batch_size: int = Field(default=4, description="Batch size for reranker inference")
rerank_max_candidates: int = Field(default=5, description="Max candidates to rerank")
rerank_max_chars: int = Field(default=256, description="Max chars per document for reranker")
rerank_cache_dir: Optional[str] = Field(
default=None,
description="Optional cache directory for reranker model downloads.",
)
rerank_service_url: Optional[str] = Field(
default=None,
description="URL of the dedicated rerank service (e.g. http://rerank-service:8003). "
"When set, HTTP reranking is used instead of in-process CrossEncoder.",
)
rerank_service_timeout_seconds: float = Field(
default=15.0,
description="HTTP timeout for rerank service calls.",
)
rerank_fallback_to_skip: bool = Field(
default=True,
description="If rerank service is unreachable, skip reranking instead of failing.",
)
rerank_circuit_failure_threshold: int = Field(
default=2,
description="Open circuit breaker after this many consecutive rerank failures.",
)
rerank_circuit_cooldown_seconds: float = Field(
default=30.0,
description="Seconds to keep circuit breaker open before allowing a probe request.",
)
score_threshold_default: Optional[float] = Field(
default=0.15,
description="Default score threshold for retrieval. Results below this are filtered out.",
)
relevance_gate_threshold: Optional[float] = Field(
default=0.35,
description=(
"After retrieval, if the best result score is below this threshold, "
"bypass source-based generation and fall back to direct LLM response. "
"None = disabled (always use sources)."
),
)
max_lexical_query_chars: int = Field(
default=200,
description="Max chars for BM25 lexical query truncation.",
)
fts_max_lexemes: int = Field(
default=12,
description="Max unique lexemes in OR-tsquery. Caps blast radius of long questions.",
)
dynamic_fusion_enabled: bool = Field(
default=True,
description="Boost lexical weight when query contains an explicit legal reference (CELEX, directive…).",
)
lexical_boost_factor: float = Field(
default=1.8,
description="Multiplier applied to lexical_weight when a legal reference is detected in the query.",
)
lexical_boost_max: float = Field(
default=0.75,
description="Hard cap on the boosted lexical weight (0-1).",
)
result_cache_ttl_seconds: int = Field(
default=300,
description="TTL in seconds for Redis query result cache. 0 disables caching.",
)
# ── Query router profile thresholds ──────────────────────────────
query_router_broad_query_min_chars: int = Field(
default=220,
description="Query length (chars) above which a query is classified as global_overview.",
)
query_router_global_overview_min_top_k: int = Field(
default=10,
description="Minimum top_k for the global_overview retrieval profile.",
)
query_router_citation_precision_min_top_k: int = Field(
default=7,
description="Minimum top_k for the citation_precision retrieval profile.",
)
query_router_relationship_focus_min_top_k: int = Field(
default=8,
description="Minimum top_k for the relationship_focus retrieval profile.",
)
query_router_contextual_default_min_top_k: int = Field(
default=6,
description="Minimum top_k for the contextual_default retrieval profile.",
)
# ── Fusion tuning ────────────────────────────────────────────────
fusion_rrf_k: int = Field(
default=60,
description="Reciprocal Rank Fusion k constant. Higher = smoother ranking.",
)
# ── Query expansion strategy weights ─────────────────────────────
query_expansion_max_variants_cap: int = Field(
default=8,
description="Hard cap on the number of expansion variants generated.",
)
query_expansion_abbreviation_weight: float = Field(
default=0.96,
description="Weight for abbreviation normalization expansion variant.",
)
query_expansion_keyword_focus_weight: float = Field(
default=0.92,
description="Weight for keyword focus expansion variant.",
)
query_expansion_reference_focus_weight: float = Field(
default=0.90,
description="Weight for reference focus expansion variant.",
)
query_expansion_bilingual_weight: float = Field(
default=0.88,
description="Weight for bilingual mirror expansion variant.",
)
# ── Adaptive retrieval ──────────────────────────────────────────
adaptive_score_drop_threshold: Optional[float] = Field(
default=0.15,
description=(
"After reranking, if the score drop between consecutive results exceeds "
"this threshold, truncate the result list at that point. None = disabled."
),
)
# ── Agentic pipeline ───────────────────────────────────────────
complementary_max_queries: int = Field(
default=2,
description="Max complementary retrieval queries per request.",
)
complementary_top_k: int = Field(
default=5,
description="top_k for each complementary retrieval pass.",
)
compression_threshold_ratio: float = Field(
default=1.3,
description=(
"Trigger compression when total context chars exceed "
"budget * this ratio. 1.3 = compress when 30%% over budget."
),
)
# ── MMR diversity ──────────────────────────────────────────────
mmr_enabled: bool = Field(
default=True,
description="Enable act-based diversity: limit chunks per act in final result.",
)
mmr_max_per_act: int = Field(
default=2,
description="Max chunks returned per act_id when mmr_enabled=true.",
)
# ── CRAG self-correction loop ──────────────────────────────────
crag_enabled: bool = Field(
default=False,
description=(
"Enable Corrective RAG loop: evaluate retrieval sufficiency and "
"re-retrieve with a refined query when context is insufficient."
),
)
crag_max_iterations: int = Field(
default=1,
description="Max CRAG correction iterations (1 = one retry after initial retrieval).",
)
crag_skip_score_threshold: float = Field(
default=0.82,
description=(
"Skip CRAG entirely when the top retrieval score exceeds this threshold. "
"Set to 1.0 to disable score-based skipping."
),
)
# ── Parallelization ───────────────────────────────────────────
max_parallel_workers: int = Field(
default=4,
description="Max threads for parallel retrieval/context/compression tasks.",
)
# ── Query parser bounds ───────────────────────────────────────
query_parser_max_top_k: int = Field(
default=40,
description="Hard cap on top_k values returned by the LLM query parser.",
)
intent_parser_min_output_tokens: int = Field(
default=80,
description="Minimum floor for intent parser max_output_tokens.",
)
# ── Summary validation ────────────────────────────────────────
summary_min_chars: int = Field(
default=50,
description="Minimum character length for a valid generated summary.",
)
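# Sketch of a `search` override section touching a few of the fields above
# (values illustrative; keys not declared on SearchConfig are rejected):
#
#   search:
#     default_limit: 10
#     fusion_method: rrf
#     lexical_weight: 0.4
#     semantic_weight: 0.6
#     rerank_enabled: true
#     rerank_service_url: http://rerank-service:8003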
class GenerationConfig(BaseModel):
"""LLM generation configuration."""
provider: str = Field(
default="mistral",
description="LLM provider: mistral or openai_compatible",
)
model_name: Optional[str] = Field(default=None)
temperature: Optional[float] = Field(
default=None, description="Temperature for generation (0=deterministic, 2=creative)"
)
max_tokens: int = Field(default=8000, description="Max tokens for LLM response (Mistral Large 2)")
max_context_chars: int = Field(default=20000, description="Max characters to inject as context into the LLM prompt")
summarize_max_context_chars: int = Field(
default=60000,
description="Max characters to inject as context in summarize/compare mode (larger budget than QA)",
)
base_url: Optional[str] = Field(default=None, description="Base URL for openai-compatible LLM endpoints")
mistral_base_url: str = Field(
default="https://api.mistral.ai/v1",
description="Default base URL for the Mistral API",
)
context_window: int = Field(default=32000, description="LLM context window size in tokens")
api_key: Optional[str] = Field(default=None, description="Optional API key for openai-compatible endpoints")
timeout_seconds: float = Field(default=45.0, description="HTTP timeout for LLM requests")
lightweight_model_name: Optional[str] = Field(
default=None,
description=(
"Smaller/faster model used for lightweight agentic tasks (CRAG eval, "
"refinement). Falls back to model_name when not set."
),
)
key_pool_max: int = Field(
default=10,
description="Max API keys to use from the pool (first N). Limits RAG to a subset so workers can use the rest.",
)
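# Sketch of a `generation` section (model name illustrative). api_key is
# usually left out of YAML and injected via the LLM_API_KEY environment
# variable instead:
#
#   generation:
#     provider: mistral
#     model_name: mistral-large-latest
#     temperature: 0.2
#     max_tokens: 8000
#     timeout_seconds: 45.0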
class ChunkingConfig(BaseModel):
"""Chunking configuration."""
min_chunk_size: int = Field(description="Minimum chunk size in characters")
max_chunk_size: int = Field(
description="Maximum chunk size in characters",
)
chunk_overlap: int = Field(
default=0,
ge=0,
description=(
"Character overlap between consecutive chunks. "
"Recommended: 200-400 for legal text (~10-15%% of max_chunk_size)."
),
)
subdivision_max_chars: int = Field(default=30000, description="Max characters per subdivision (~9000 tokens)")
extraction_max_chunk_chars: int = Field(
default=3200,
description="Max characters per chunk for relation extraction (~900-1000 tokens)",
)
breakpoint_percentile: float = Field(
default=90.0,
ge=50.0,
le=99.9,
description=(
"Percentile threshold for semantic breakpoint detection. Higher values = fewer breakpoints = larger chunks."
),
)
breakpoint_max_threshold: float = Field(
default=1.0,
ge=0.0,
le=1.0,
description=(
"Absolute cap on the breakpoint similarity threshold. "
"1.0 = no cap (default). Lower values (e.g. 0.85) prevent splitting "
"text where all sentences remain highly similar."
),
)
sentence_window_size: int = Field(
default=1,
ge=1,
le=5,
description="Number of sentences to combine in a sliding window for smoother similarity curves.",
)
embedding_batch_size: int = Field(
default=32,
ge=1,
description="Batch size for sentence embedding API calls.",
)
article_level_chunking: bool = Field(
default=True,
description=(
"When True, articles from EUR-Lex and Légifrance are kept as a "
"single canonical chunk instead of being split by SAC. Oversized "
"articles stay intact; long-text handling is delegated to the "
"embedding layer."
),
)
embedding: ChunkingEmbeddingConfig = Field(default_factory=ChunkingEmbeddingConfig)
def resolve_max_chunk_size(self, token_limits: "TokenLimitsConfig") -> int:
"""Cap max_chunk_size so it never exceeds the global embedding token budget.
Each embedding model handles its own per-provider limit at embed time
(split + weighted-average for oversized chunks). This guard only
prevents chunks from exceeding the *largest* model's hard ceiling.
"""
safe_limit = int(
token_limits.embedding_max_input_tokens * token_limits.chars_per_token * token_limits.embedding_safety_ratio
)
return min(self.max_chunk_size, safe_limit)
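# Worked example with the TokenLimitsConfig defaults:
#   safe_limit = int(8192 * 3.3 * 0.9) = 24330 chars,
# so a configured max_chunk_size of 30000 is capped to 24330 while a
# max_chunk_size of 3000 passes through unchanged.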
class ContextBudgetConfig(BaseModel):
"""Token/character budgets used to compose RAG context blocks."""
rag_max_sources: int = Field(
default=10,
description="Max source documents injected in standard RAG mode.",
)
rag_min_chars_per_source: int = Field(
default=200,
description="Minimum content budget reserved per source in RAG mode.",
)
rag_relation_lines: int = Field(
default=8,
description="Max relation summary lines appended in RAG mode when enabled.",
)
global_reports_share: float = Field(
default=0.45,
description="Fraction of context reserved for community reports in global mode.",
)
global_sources_share: float = Field(
default=0.55,
description="Fraction of context reserved for source snippets in global mode.",
)
global_max_reports: int = Field(
default=4,
description="Max number of community reports injected in global mode.",
)
global_min_cluster_size: int = Field(
default=2,
description="Minimum acts per community report.",
)
global_max_evidence_per_report: int = Field(
default=3,
description="Max textual evidences kept per community report.",
)
global_max_source_docs: int = Field(
default=7,
description="Max source documents injected in global mode.",
)
# ── Budget fractions for context assembly ────────────────────────
standard_relation_budget_fraction: float = Field(
default=0.15,
description="Fraction of context budget for relation summary in standard QA mode.",
)
global_graph_budget_fraction: float = Field(
default=0.10,
description="Fraction of context budget for graph context in global mode.",
)
# ── Community report sub-limits ──────────────────────────────────
community_top_relation_types: int = Field(
default=5,
description="Max relation types shown per community report.",
)
community_central_acts: int = Field(
default=3,
description="Max central acts shown per community report.",
)
# ── Content preview lengths (response builder) ───────────────────
content_preview_chars: int = Field(
default=200,
description="Default content preview length in source documents.",
)
snippet_preview_chars: int = Field(
default=300,
description="Default snippet length in search results.",
)
fallback_preview_chars: int = Field(
default=180,
description="Fallback preview length when no explicit content available.",
)
# ── Context compression ───────────────────────────────────────
compression_min_chars: int = Field(
default=3000,
description="Minimum total context chars before compression is triggered.",
)
compression_min_budget: int = Field(
default=500,
description="Minimum character budget reserved per act during compression.",
)
def normalized_global_shares(self) -> tuple[float, float]:
"""Return normalized `(reports_share, sources_share)` ratios for global mode."""
reports = max(float(self.global_reports_share), 0.0)
sources = max(float(self.global_sources_share), 0.0)
total = reports + sources
if total <= 0:
return 0.45, 0.55
return reports / total, sources / total
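# Example of the normalization above: the defaults (0.45, 0.55) already sum
# to 1.0 and pass through unchanged, while overrides such as (0.5, 0.3)
# come back as (0.625, 0.375) so the two budgets always sum to 1.0.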
class GatewayConfig(BaseModel):
"""API Gateway configuration."""
redis_host: Optional[str] = Field(default=None)
redis_port: Optional[int] = Field(default=None)
rag_service_url: Optional[str] = None
embedding_service_url: Optional[str] = None
rerank_service_url: Optional[str] = None
allowed_origins: Optional[list[str]] = None
auto_bootstrap: bool = Field(
default=False, description="Auto-start chunking/embedding/extraction on gateway startup."
)
job_ttl_seconds: Optional[int] = Field(default=None)
bootstrap_lock_ttl_seconds: Optional[int] = Field(default=None)
healthcheck_timeout_seconds: float = Field(default=5.0)
rag_proxy_timeout_seconds: float = Field(default=300.0)
rate_limit_query: str = Field(default="20/minute")
rate_limit_stream: str = Field(default="15/minute")
rate_limit_search: str = Field(default="30/minute")
rate_limit_jobs: str = Field(default="10/minute")
job_chunk_min_content_length: Optional[int] = None
job_embed_batch_size: Optional[int] = Field(default=None)
job_extract_min_confidence: Optional[float] = Field(default=None)
job_extract_skip_existing_default: Optional[bool] = Field(default=None)
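# Sketch of a `gateway` section providing the values that get_gateway_config()
# (bottom of this module) requires; hosts and limits are illustrative:
#
#   gateway:
#     redis_host: redis
#     redis_port: 6379
#     rag_service_url: http://rag-service:8001
#     embedding_service_url: http://embedding-service:8002
#     allowed_origins: ["http://localhost:3000"]
#     job_ttl_seconds: 86400
#     job_chunk_min_content_length: 200
#     job_embed_batch_size: 64
#     job_extract_min_confidence: 0.5
#     job_extract_skip_existing_default: true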
class WorkersConfig(BaseModel):
"""Worker runtime tuning."""
brpop_timeout_seconds: int = Field(default=1)
chunk_db_commit_batch_size: int = Field(default=10)
embed_worker_max_batch_size: int = Field(default=32)
embed_qdrant_upsert_batch_size: int = Field(default=1000)
auto_embed_reconcile: bool = Field(default=True)
auto_embed_reconcile_interval: int = Field(default=300)
auto_embed_reconcile_ttl: int = Field(default=600)
auto_chunk_reconcile: bool = Field(default=True)
auto_chunk_reconcile_interval: int = Field(default=300)
auto_chunk_reconcile_ttl: int = Field(default=600)
auto_extract_reconcile: bool = Field(default=True)
auto_extract_reconcile_interval: int = Field(default=600)
auto_extract_reconcile_ttl: int = Field(default=600)
extract_metrics_port: int = Field(
default=9107,
ge=1,
le=65535,
description="Port used by extraction-worker Prometheus metrics endpoint.",
)
embed_metrics_port: int = Field(
default=9108,
ge=1,
le=65535,
description="Port used by embedding-worker Prometheus metrics endpoint.",
)
chunk_metrics_port: int = Field(
default=9109,
ge=1,
le=65535,
description="Port used by chunking-worker Prometheus metrics endpoint.",
)
extract_stale_timeout_minutes: int = Field(
default=60,
description="Reset acts stuck in 'extracting' for longer than this (minutes).",
)
community_resolution: float = Field(default=1.0, description="Leiden resolution parameter for community detection.")
community_min_size: int = Field(default=2, description="Minimum community size kept after detection.")
class LalandreConfig(BaseModel):
"""Main configuration class."""
database: DatabaseConfig = Field(default_factory=DatabaseConfig)
vector: VectorConfig = Field(default_factory=VectorConfig)
graph: GraphConfig = Field(default_factory=GraphConfig)
token_limits: TokenLimitsConfig = Field(default_factory=TokenLimitsConfig)
embedding: EmbeddingConfig = Field(default_factory=EmbeddingConfig)
embedding_presets: list[EmbeddingPresetConfig] = Field(default_factory=_default_embedding_presets)
search: SearchConfig = Field(default_factory=SearchConfig)
generation: GenerationConfig = Field(default_factory=GenerationConfig)
# Required from app_config.yaml; no hardcoded defaults in code.
chunking: ChunkingConfig
context_budget: ContextBudgetConfig = Field(default_factory=ContextBudgetConfig)
gateway: GatewayConfig = Field(default_factory=GatewayConfig)
extraction: ExtractionConfig = Field(default_factory=ExtractionConfig)
workers: WorkersConfig = Field(default_factory=WorkersConfig)
models_cache_dir: Optional[str] = Field(default=None)
def enabled_embedding_presets(self) -> list[EmbeddingPresetConfig]:
"""Return the embedding presets that are enabled for runtime use."""
return [preset for preset in self.embedding_presets if preset.enabled]
def indexing_enabled_embedding_presets(self) -> list[EmbeddingPresetConfig]:
"""Return the embedding presets that are enabled for indexing workflows."""
return [preset for preset in self.embedding_presets if preset.indexing_enabled]
def get_embedding_preset(self, preset_id: str | None) -> Optional[EmbeddingPresetConfig]:
"""Return one embedding preset by ID, or ``None`` when it is unknown."""
if preset_id is None:
return None
for preset in self.embedding_presets:
if preset.preset_id == preset_id:
return preset
return None
def get_default_embedding_preset(self) -> EmbeddingPresetConfig:
"""Return the default enabled embedding preset for query-time operations."""
preset = self.get_embedding_preset(self.search.default_embedding_preset)
if preset is not None and preset.enabled:
return preset
for enabled_preset in self.enabled_embedding_presets():
return enabled_preset
raise ValueError("No enabled embedding preset configured")
@staticmethod
def _field_allows_none(model: BaseModel, field_name: str) -> bool:
field_info = model.__class__.model_fields[field_name]
if field_info.default is None:
return True
annotation = field_info.annotation
if annotation is None:
return False
return type(None) in get_args(annotation)
@staticmethod
def _coerce_section_updates(
model: BaseModel,
raw_updates: Any,
*,
section_name: str,
) -> dict[str, Any]:
if not isinstance(raw_updates, dict):
return {}
allowed_keys = set(model.__class__.model_fields.keys())
ignored_legacy_keys = {
"vector": {"grpc_port", "prefer_grpc"},
}.get(section_name, set())
updates_map: dict[str, Any] = {}
unknown_keys: list[str] = []
raw_updates_dict = cast(dict[Any, Any], raw_updates)
for raw_key, raw_value in raw_updates_dict.items():
if not isinstance(raw_key, str):
continue
if raw_key not in allowed_keys:
if raw_key in ignored_legacy_keys:
continue
unknown_keys.append(raw_key)
continue
if raw_value is None:
if LalandreConfig._field_allows_none(model, raw_key):
updates_map[raw_key] = None
continue
updates_map[raw_key] = raw_value
if unknown_keys:
unknown_keys_display = ", ".join(sorted(set(unknown_keys)))
raise ValueError(f"Unknown keys in app config section '{section_name}': {unknown_keys_display}")
return updates_map
@classmethod
def _apply_app_config_overrides(cls, config: "LalandreConfig", raw_config: dict[str, Any]) -> None:
allowed_sections = {
"database",
"vector",
"graph",
"token_limits",
"embedding",
"embedding_presets",
"search",
"generation",
"chunking",
"context_budget",
"gateway",
"extraction",
"workers",
"models_cache_dir",
}
unknown_sections = [section_name for section_name in raw_config.keys() if section_name not in allowed_sections]
if unknown_sections:
unknown_sections_display = ", ".join(sorted(unknown_sections))
raise ValueError(f"Unknown top-level sections in app config: {unknown_sections_display}")
section_map: tuple[tuple[str, BaseModel, str], ...] = (
("database", config.database, "database"),
("vector", config.vector, "vector"),
("graph", config.graph, "graph"),
("token_limits", config.token_limits, "token_limits"),
("embedding", config.embedding, "embedding"),
("search", config.search, "search"),
("generation", config.generation, "generation"),
("chunking", config.chunking, "chunking"),
("context_budget", config.context_budget, "context_budget"),
("gateway", config.gateway, "gateway"),
("extraction", config.extraction, "extraction"),
("workers", config.workers, "workers"),
)
for section_name, model, attr_name in section_map:
updates = cls._coerce_section_updates(
model,
raw_config.get(section_name),
section_name=section_name,
)
if not updates:
continue
merged = model.model_dump()
merged.update(updates)
setattr(config, attr_name, model.__class__.model_validate(merged))
raw_embedding_presets = raw_config.get("embedding_presets")
if raw_embedding_presets is not None:
if not isinstance(raw_embedding_presets, list):
raise ValueError("App config section 'embedding_presets' must be a list")
config.embedding_presets = [EmbeddingPresetConfig.model_validate(item) for item in raw_embedding_presets]
models_cache_dir = raw_config.get("models_cache_dir")
if isinstance(models_cache_dir, str) and models_cache_dir.strip():
config.models_cache_dir = models_cache_dir.strip()
@classmethod
def _load_app_config_file(cls, config_file: Path, *, required: bool) -> dict[str, Any]:
if not config_file.exists():
if required:
raise FileNotFoundError(f"APP_CONFIG_FILE points to a missing file: {config_file}")
return {}
content_raw = yaml.safe_load(config_file.read_text(encoding="utf-8"))
if content_raw is None:
return {}
if not isinstance(content_raw, dict):
raise ValueError(f"App config file must contain a mapping at root: {config_file}")
content: dict[str, Any] = {}
content_raw_dict = cast(dict[Any, Any], content_raw)
for raw_key, raw_value in content_raw_dict.items():
if isinstance(raw_key, str):
content[raw_key] = raw_value
return content
@staticmethod
def _resolve_app_config_file(raw_path: str) -> Path:
candidate = Path(raw_path).expanduser()
if candidate.is_absolute():
return candidate
if candidate.exists():
return candidate
module_path = Path(__file__).resolve()
search_roots: list[Path] = [Path.cwd(), Path("/app")]
# Detect project root reliably instead of assuming a fixed parent depth.
for parent in module_path.parents:
if (parent / "docker-compose.yaml").exists() and (parent / "packages").exists():
search_roots.append(parent)
break
if (parent / ".git").exists():
search_roots.append(parent)
break
search_roots.append(module_path.parent)
for root in search_roots:
resolved = (root / candidate).resolve()
if resolved.exists():
return resolved
return candidate
@classmethod
def _load_base_config_from_settings(cls, settings: "EnvSettings") -> "LalandreConfig":
app_config_file = (
cls._resolve_app_config_file(settings.APP_CONFIG_FILE)
if settings.APP_CONFIG_FILE
else DEFAULT_APP_CONFIG_FILE
)
app_file_required = settings.APP_CONFIG_FILE is not None
app_raw = cls._load_app_config_file(app_config_file, required=app_file_required)
raw_chunking = app_raw.get("chunking")
if not isinstance(raw_chunking, dict):
raise ValueError("App config must define a 'chunking' section with min_chunk_size and max_chunk_size.")
config = cls(chunking=ChunkingConfig.model_validate(raw_chunking))
cls._apply_app_config_overrides(config, app_raw)
override_file_list = [
raw_path.strip() for raw_path in (settings.APP_CONFIG_OVERRIDE_FILE or "").split(",") if raw_path.strip()
]
for override_path in override_file_list:
override_file = cls._resolve_app_config_file(override_path)
override_raw = cls._load_app_config_file(override_file, required=False)
cls._apply_app_config_overrides(config, override_raw)
return config
@staticmethod
def _apply_connection_env_overrides(config: "LalandreConfig", settings: "EnvSettings") -> None:
if settings.DB_PASSWORD is not None:
config.database.password = settings.DB_PASSWORD
if settings.QDRANT_API_KEY is not None:
config.vector.api_key = settings.QDRANT_API_KEY
if settings.NEO4J_PASSWORD is not None:
config.graph.password = settings.NEO4J_PASSWORD
if settings.LLM_API_KEY is not None:
config.generation.api_key = settings.LLM_API_KEY
if settings.SEARCH_INTENT_PARSER_API_KEY is not None:
config.search.intent_parser_api_key = settings.SEARCH_INTENT_PARSER_API_KEY
if settings.GATEWAY_ALLOWED_ORIGINS is not None:
config.gateway.allowed_origins = [
raw_origin.strip() for raw_origin in settings.GATEWAY_ALLOWED_ORIGINS.split(",") if raw_origin.strip()
]
@classmethod
def from_env(cls) -> "LalandreConfig":
"""Load configuration from environment variables"""
settings = get_env_settings()
config = cls._load_base_config_from_settings(settings)
cls._apply_connection_env_overrides(config, settings)
return config
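# Illustrative override chaining for from_env() (paths are placeholders):
#   APP_CONFIG_FILE=config/app_config.yaml
#   APP_CONFIG_OVERRIDE_FILE=config/app_config.prod.yaml,config/app_config.local.yaml
# The base file is loaded first, each override file is applied in order, and
# the connection secrets from EnvSettings (DB_PASSWORD, QDRANT_API_KEY, ...)
# are applied last.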
_env_settings: Optional[EnvSettings] = None
_config: Optional[LalandreConfig] = None
def get_env_settings() -> EnvSettings:
"""Get or create the environment settings instance."""
global _env_settings
if _env_settings is None:
_env_settings = EnvSettings()
return _env_settings
def get_config() -> LalandreConfig:
"""Get or create global configuration instance."""
global _config
if _config is None:
_config = LalandreConfig.from_env()
return _config
def reset_config() -> None:
"""Invalidate the config singleton so the next call to get_config() reloads from disk."""
global _config
_config = None
def get_postgres_connection_string() -> str:
"""Get PostgreSQL connection string."""
return get_config().database.connection_string
def get_gateway_config() -> GatewayConfig:
"""Get API Gateway configuration with required values enforced."""
gateway = get_config().gateway
missing: list[str] = []
if not gateway.rag_service_url:
missing.append("gateway.rag_service_url")
if not gateway.embedding_service_url:
missing.append("gateway.embedding_service_url")
if not gateway.allowed_origins:
missing.append("gateway.allowed_origins")
if gateway.job_ttl_seconds is None:
missing.append("gateway.job_ttl_seconds")
if gateway.job_chunk_min_content_length is None:
missing.append("gateway.job_chunk_min_content_length")
if gateway.job_embed_batch_size is None:
missing.append("gateway.job_embed_batch_size")
if gateway.job_extract_min_confidence is None:
missing.append("gateway.job_extract_min_confidence")
if gateway.job_extract_skip_existing_default is None:
missing.append("gateway.job_extract_skip_existing_default")
if gateway.rag_proxy_timeout_seconds is None:
missing.append("gateway.rag_proxy_timeout_seconds")
if not gateway.redis_host:
missing.append("gateway.redis_host")
if gateway.redis_port is None:
missing.append("gateway.redis_port")
if missing:
raise RuntimeError("Missing required API Gateway settings: " + ", ".join(missing))
return gateway
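# Minimal usage sketch (assumes a valid app_config.yaml and .env are present);
# run the module directly to check that configuration resolves:
if __name__ == "__main__":
    cfg = get_config()
    print("Default embedding preset:", cfg.get_default_embedding_preset().preset_id)
    print("Qdrant chunks collection:", cfg.vector.collection_chunks)
    # Force a reload after editing app_config.yaml on disk.
    reset_config()
    print("Reloaded generation provider:", get_config().generation.provider)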