Source code for lalandre_rag.agentic.models

"""Pydantic and dataclass models used by the planning runtime."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Dict, List, Literal, Optional, Union

from pydantic import BaseModel, Field, field_validator, model_validator


[docs] class ComplementaryQueryOutput(BaseModel): """Structured follow-up retrieval produced by the planner.""" query: str = Field(min_length=1) level_hint: Optional[str] = None
[docs] @field_validator("query", mode="before") @classmethod def clean_query(cls, value: Any) -> str: """Normalize and validate a complementary query string.""" if isinstance(value, str) and value.strip(): return value.strip() raise ValueError("Complementary query cannot be empty")
[docs] class RoutingIntentOutput(BaseModel): """Structured output returned by the routing agent.""" model_config = {"extra": "ignore"} profile: Literal[ "contextual_default", "citation_precision", "relationship_focus", "global_overview", ] granularity: Optional[Literal["subdivisions", "chunks", "all", "auto"]] = None top_k: int = Field(default=10, ge=1, le=40) include_relations_hint: bool = False execution_mode: Literal["hybrid", "global"] = "hybrid" rationale: str = "LLM parser selected retrieval profile." use_graph: bool = False normalized_query: Optional[str] = None intent_label: Optional[str] = None confidence: Optional[float] = Field(default=None, ge=0.0, le=1.0) output_validation_retries: int = Field(default=0, ge=0)
[docs] @field_validator("granularity", mode="before") @classmethod def normalize_granularity(cls, value: Any) -> Optional[str]: """Normalize planner granularity hints to supported values.""" if value is None: return None if isinstance(value, str): normalized = value.strip().lower() if normalized in {"subdivisions", "chunks", "all"}: return normalized return None
[docs] @field_validator("normalized_query", "intent_label", mode="before") @classmethod def clean_optional_text(cls, value: Any) -> Optional[str]: """Trim optional text fields and coerce blanks to ``None``.""" if isinstance(value, str) and value.strip(): return value.strip() return None
[docs] @field_validator("rationale", mode="before") @classmethod def clean_rationale(cls, value: Any) -> str: """Normalize routing rationales and inject a default fallback.""" if isinstance(value, str) and value.strip(): return value.strip() return "LLM parser selected retrieval profile."
[docs] @model_validator(mode="after") def apply_defaults(self) -> "RoutingIntentOutput": """Fill derived fields that depend on other parsed values.""" if not self.intent_label: self.intent_label = self.profile return self
[docs] class DecompositionOutput(BaseModel): """Structured output returned by the decomposition agent.""" model_config = {"extra": "ignore"} sub_questions: List[str] = Field(default_factory=list, max_length=3) synthesize: bool = False output_validation_retries: int = Field(default=0, ge=0)
[docs] @field_validator("sub_questions", mode="before") @classmethod def clean_sub_questions(cls, value: Any) -> List[str]: """Normalize decomposition output and cap it to three questions.""" if not isinstance(value, list): return [] cleaned: List[str] = [] for item in value: if isinstance(item, str) and item.strip(): cleaned.append(item.strip()) return cleaned[:3]
[docs] class RetrievalPlannerOutput(BaseModel): """Structured retrieval plan returned by the planner agent.""" model_config = {"extra": "ignore"} primary_query: str = "" intent_class: Literal["conversational", "documentary"] = "documentary" skip_retrieval: bool = False needs_complementary: bool = False complementary_queries: List[ComplementaryQueryOutput] = Field(default_factory=list) needs_compression: bool = False clarification_question: Optional[str] = None strict_grounding_requested: bool = False rationale: str = "" output_validation_retries: int = Field(default=0, ge=0)
[docs] @field_validator("primary_query", mode="before") @classmethod def clean_primary_query(cls, value: Any) -> str: """Normalize the planner primary query field.""" if isinstance(value, str): return value.strip() return ""
[docs] @field_validator("intent_class", mode="before") @classmethod def normalize_intent_class(cls, value: Any) -> str: """Restrict the planner intent class to supported values.""" if isinstance(value, str): normalized = value.strip().lower() if normalized in {"conversational", "documentary"}: return normalized return "documentary"
[docs] @field_validator("clarification_question", mode="before") @classmethod def clean_clarification_question(cls, value: Any) -> Optional[str]: """Normalize optional clarification prompts.""" if isinstance(value, str) and value.strip(): return value.strip() return None
[docs] @field_validator("rationale", mode="before") @classmethod def clean_rationale(cls, value: Any) -> str: """Normalize planner rationales to a stripped string.""" if isinstance(value, str) and value.strip(): return value.strip() return ""
[docs] @model_validator(mode="after") def cap_complementary_queries(self) -> "RetrievalPlannerOutput": """Apply post-parse planner constraints and derived defaults.""" self.skip_retrieval = self.intent_class == "conversational" self.complementary_queries = self.complementary_queries[:2] if self.skip_retrieval: self.primary_query = "" self.needs_complementary = False self.complementary_queries = [] self.needs_compression = False elif not self.primary_query: raise ValueError("primary_query is required when retrieval is needed") return self
[docs] class RetrievalRefinementOutput(BaseModel): """Structured refined query returned by the corrective agent.""" model_config = {"extra": "ignore"} refined_query: str = Field(min_length=1) rationale: str = "" output_validation_retries: int = Field(default=0, ge=0)
[docs] @field_validator("refined_query", mode="before") @classmethod def clean_refined_query(cls, value: Any) -> str: """Normalize the refined retrieval query proposed by the agent.""" if isinstance(value, str) and value.strip(): return value.strip() raise ValueError("refined_query is required")
[docs] @field_validator("rationale", mode="before") @classmethod def clean_refine_rationale(cls, value: Any) -> str: """Normalize the refinement rationale text.""" if isinstance(value, str) and value.strip(): return value.strip() return ""
[docs] class RetrievalEvaluationOutput(BaseModel): """Structured sufficiency evaluation returned by the CRAG evaluator.""" model_config = {"extra": "ignore"} status: Literal["SUFFICIENT", "PARTIAL", "INSUFFICIENT"] = "SUFFICIENT" gap: Optional[str] = None output_validation_retries: int = Field(default=0, ge=0)
[docs] @field_validator("gap", mode="before") @classmethod def clean_gap(cls, value: Any) -> Optional[str]: """Normalize optional evidence-gap descriptions.""" if isinstance(value, str) and value.strip() and value.strip().lower() not in {"null", "none"}: return value.strip() return None
[docs] class GraphSupportDecision(BaseModel): """Structured graph support decision reserved for future graph/Cypher routing.""" use_graph: bool = False use_cypher: bool = False rationale: str = ""
[docs] class PhaseTraceEvent(BaseModel): """Single trace event emitted by the planning runtime.""" phase: str status: str label: str detail: Optional[str] = None count: Optional[int] = None duration_ms: Optional[float] = None meta: Dict[str, Any] = Field(default_factory=dict) tool: Optional[str] = None
[docs] @dataclass class PlanningEarlyExit: """Signals that the planning pipeline hit a terminal condition.""" kind: str routing_ms: float planner_ms: float retrieval_ms: float = 0.0 intent_class: Literal["conversational", "documentary"] = "documentary" clarification_question: Optional[str] = None strict_grounding_requested: bool = False agentic_rationale: str = "" agentic_meta: Dict[str, Any] = field(default_factory=dict) best_score: float = 0.0 gate_threshold: float = 0.0 candidates_dropped: int = 0
[docs] @dataclass class PlanningContext: """Artifacts produced by the planning graph and consumed by generation.""" context_slices: List[Any] graph_fetch: Optional[Any] retrieval_plan: Any agentic_plan: Optional[Any] agentic_meta: Dict[str, Any] retrieval_query: str effective_top_k: int effective_granularity: Optional[str] effective_include_relations: bool community_meta: Dict[str, Any] routing_ms: float planner_ms: float retrieval_ms: float context_enrichment_ms: float graph_enrichment_ms: float complementary_ms: float compression_ms: float retrieval_stats: Any = None
PlanningResult = Union[PlanningEarlyExit, PlanningContext]
[docs] @dataclass class PlanningGraphState: """Mutable planning state shared across graph nodes.""" question: str top_k: int score_threshold: Optional[float] filters: Optional[Dict[str, Any]] include_relations: bool include_subjects: bool collections: Optional[List[str]] granularity: Optional[str] graph_depth: Optional[int] use_graph: Optional[bool] embedding_preset: Optional[str] planner_run_id: str planner_path: List[str] = field(default_factory=list) trace_events: List[PhaseTraceEvent] = field(default_factory=list) output_validation_retries: int = 0 decompose_ms: float = 0.0 routing_ms: float = 0.0 planner_ms: float = 0.0 retrieval_ms: float = 0.0 context_enrichment_ms: float = 0.0 graph_enrichment_ms: float = 0.0 complementary_ms: float = 0.0 compression_ms: float = 0.0 effective_top_k: int = 0 effective_granularity: Optional[str] = None effective_include_relations: bool = False decomposition_result: Any = None retrieval_plan: Optional[Any] = None agentic_plan: Optional[Any] = None retrieval_query: Optional[str] = None retrieval_results: List[Any] = field(default_factory=list) context_slices: List[Any] = field(default_factory=list) community_meta: Dict[str, Any] = field(default_factory=dict) graph_fetch: Optional[Any] = None retrieval_depth: Optional[str] = None planning_future: Optional[Any] = None graph_prefetch_future: Optional[Any] = None retrieval_stats: Any = None agentic_meta: Dict[str, Any] = field(default_factory=dict) early_exit: Optional[PlanningEarlyExit] = None