"""Pydantic and dataclass models used by the planning runtime."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, List, Literal, Optional, Union
from pydantic import BaseModel, Field, field_validator, model_validator
class ComplementaryQueryOutput(BaseModel):
    """Structured follow-up retrieval produced by the planner."""

    # Follow-up query text. ``clean_query`` strips it and rejects blank or
    # non-string input, so a validated instance never holds an empty query.
    query: str = Field(min_length=1)
    # Optional hint string; no validator in this class normalizes it.
    level_hint: Optional[str] = None

    @field_validator("query", mode="before")
    @classmethod
    def clean_query(cls, value: Any) -> str:
        """Normalize and validate a complementary query string.

        Runs before pydantic's own type validation of ``query``.

        Args:
            value: Raw input supplied for ``query``.

        Returns:
            The input with leading and trailing whitespace removed.

        Raises:
            ValueError: If ``value`` is not a string or is blank after
                stripping.
        """
        if isinstance(value, str):
            stripped = value.strip()
            if stripped:
                return stripped
        raise ValueError("Complementary query cannot be empty")
class RoutingIntentOutput(BaseModel):
    """Structured output returned by the routing agent."""

    # Keys that are not declared below are dropped instead of rejected.
    model_config = {"extra": "ignore"}

    # Required: one of the four supported retrieval profiles.
    profile: Literal[
        "contextual_default",
        "citation_precision",
        "relationship_focus",
        "global_overview",
    ]
    granularity: Optional[Literal["subdivisions", "chunks", "all", "auto"]] = None
    top_k: int = Field(default=10, ge=1, le=40)
    include_relations_hint: bool = False
    execution_mode: Literal["hybrid", "global"] = "hybrid"
    rationale: str = "LLM parser selected retrieval profile."
    use_graph: bool = False
    normalized_query: Optional[str] = None
    # Falls back to ``profile`` in ``apply_defaults`` when left empty.
    intent_label: Optional[str] = None
    confidence: Optional[float] = Field(default=None, ge=0.0, le=1.0)
    output_validation_retries: int = Field(default=0, ge=0)

    @field_validator("granularity", mode="before")
    @classmethod
    def normalize_granularity(cls, value: Any) -> Optional[str]:
        """Normalize planner granularity hints to supported values.

        Args:
            value: Raw input supplied for ``granularity``.

        Returns:
            The stripped, lower-cased value when it is ``"subdivisions"``,
            ``"chunks"`` or ``"all"``; ``None`` for anything else, including
            non-string input.
        """
        if not isinstance(value, str):
            return None
        normalized = value.strip().lower()
        # "auto" is allowed by the field's Literal but is not in this set, so
        # it collapses to None like any other unrecognized value.
        # NOTE(review): presumably None is how "auto" is represented
        # downstream - confirm before adding "auto" here.
        if normalized in {"subdivisions", "chunks", "all"}:
            return normalized
        return None

    @field_validator("normalized_query", "intent_label", mode="before")
    @classmethod
    def clean_optional_text(cls, value: Any) -> Optional[str]:
        """Trim optional text fields and coerce blanks to ``None``.

        Args:
            value: Raw input for ``normalized_query`` or ``intent_label``.

        Returns:
            The stripped string, or ``None`` when the input is not a string
            or is blank.
        """
        if isinstance(value, str):
            stripped = value.strip()
            if stripped:
                return stripped
        return None

    @field_validator("rationale", mode="before")
    @classmethod
    def clean_rationale(cls, value: Any) -> str:
        """Normalize routing rationales and inject a default fallback.

        Args:
            value: Raw input supplied for ``rationale``.

        Returns:
            The stripped string, or the same default text used for the field
            when the input is not a string or is blank.
        """
        if isinstance(value, str):
            stripped = value.strip()
            if stripped:
                return stripped
        return "LLM parser selected retrieval profile."

    @model_validator(mode="after")
    def apply_defaults(self) -> "RoutingIntentOutput":
        """Fill derived fields that depend on other parsed values.

        Returns:
            This instance, with ``intent_label`` set to ``profile`` when no
            non-empty label was provided.
        """
        if not self.intent_label:
            self.intent_label = self.profile
        return self
class DecompositionOutput(BaseModel):
    """Structured output returned by the decomposition agent."""

    # Keys that are not declared below are dropped instead of rejected.
    model_config = {"extra": "ignore"}

    # At most three entries: ``clean_sub_questions`` truncates the list before
    # the ``max_length`` constraint is evaluated.
    sub_questions: List[str] = Field(default_factory=list, max_length=3)
    synthesize: bool = False
    output_validation_retries: int = Field(default=0, ge=0)

    @field_validator("sub_questions", mode="before")
    @classmethod
    def clean_sub_questions(cls, value: Any) -> List[str]:
        """Normalize decomposition output and cap it to three questions.

        Args:
            value: Raw input supplied for ``sub_questions``.

        Returns:
            The first three non-blank string items, each stripped of
            surrounding whitespace. Non-string items are skipped, and a
            non-list input yields an empty list.
        """
        if not isinstance(value, list):
            return []
        cleaned = [
            item.strip()
            for item in value
            if isinstance(item, str) and item.strip()
        ]
        return cleaned[:3]
class RetrievalPlannerOutput(BaseModel):
    """Structured retrieval plan returned by the planner agent."""

    # Keys that are not declared below are dropped instead of rejected.
    model_config = {"extra": "ignore"}

    primary_query: str = ""
    intent_class: Literal["conversational", "documentary"] = "documentary"
    # Always recomputed from ``intent_class`` in ``cap_complementary_queries``;
    # any value supplied by the caller is overwritten.
    skip_retrieval: bool = False
    needs_complementary: bool = False
    # Truncated to at most two entries after parsing.
    complementary_queries: List[ComplementaryQueryOutput] = Field(default_factory=list)
    needs_compression: bool = False
    clarification_question: Optional[str] = None
    strict_grounding_requested: bool = False
    rationale: str = ""
    output_validation_retries: int = Field(default=0, ge=0)

    @field_validator("primary_query", mode="before")
    @classmethod
    def clean_primary_query(cls, value: Any) -> str:
        """Normalize the planner primary query field.

        Args:
            value: Raw input supplied for ``primary_query``.

        Returns:
            The stripped string, or ``""`` when the input is not a string.
        """
        return value.strip() if isinstance(value, str) else ""

    @field_validator("intent_class", mode="before")
    @classmethod
    def normalize_intent_class(cls, value: Any) -> str:
        """Restrict the planner intent class to supported values.

        Args:
            value: Raw input supplied for ``intent_class``.

        Returns:
            The stripped, lower-cased value when it is ``"conversational"``
            or ``"documentary"``; otherwise ``"documentary"``.
        """
        if isinstance(value, str):
            normalized = value.strip().lower()
            if normalized in {"conversational", "documentary"}:
                return normalized
        return "documentary"

    @field_validator("clarification_question", mode="before")
    @classmethod
    def clean_clarification_question(cls, value: Any) -> Optional[str]:
        """Normalize optional clarification prompts.

        Args:
            value: Raw input supplied for ``clarification_question``.

        Returns:
            The stripped string, or ``None`` when the input is not a string
            or is blank.
        """
        if isinstance(value, str):
            stripped = value.strip()
            if stripped:
                return stripped
        return None

    @field_validator("rationale", mode="before")
    @classmethod
    def clean_rationale(cls, value: Any) -> str:
        """Normalize planner rationales to a stripped string.

        Args:
            value: Raw input supplied for ``rationale``.

        Returns:
            The stripped string, or ``""`` when the input is not a string or
            is blank.
        """
        if isinstance(value, str):
            stripped = value.strip()
            if stripped:
                return stripped
        return ""

    @model_validator(mode="after")
    def cap_complementary_queries(self) -> "RetrievalPlannerOutput":
        """Apply post-parse planner constraints and derived defaults.

        Returns:
            This instance after the following adjustments:

            * ``skip_retrieval`` is set to whether ``intent_class`` is
              ``"conversational"``.
            * ``complementary_queries`` is cut to its first two entries.
            * When retrieval is skipped, the query, complementary and
              compression fields are reset to their empty/false values.

        Raises:
            ValueError: If retrieval is not skipped and ``primary_query`` is
                empty.
        """
        self.skip_retrieval = self.intent_class == "conversational"
        self.complementary_queries = self.complementary_queries[:2]
        if self.skip_retrieval:
            # Nothing will be retrieved, so clear every retrieval-only field.
            self.primary_query = ""
            self.needs_complementary = False
            self.complementary_queries = []
            self.needs_compression = False
        elif not self.primary_query:
            raise ValueError("primary_query is required when retrieval is needed")
        return self
class RetrievalRefinementOutput(BaseModel):
    """Structured refined query returned by the corrective agent."""

    # Keys that are not declared below are dropped instead of rejected.
    model_config = {"extra": "ignore"}

    # Required; ``clean_refined_query`` rejects blank or non-string input.
    refined_query: str = Field(min_length=1)
    rationale: str = ""
    output_validation_retries: int = Field(default=0, ge=0)

    @field_validator("refined_query", mode="before")
    @classmethod
    def clean_refined_query(cls, value: Any) -> str:
        """Normalize the refined retrieval query proposed by the agent.

        Args:
            value: Raw input supplied for ``refined_query``.

        Returns:
            The input with surrounding whitespace removed.

        Raises:
            ValueError: If ``value`` is not a string or is blank after
                stripping.
        """
        if isinstance(value, str):
            stripped = value.strip()
            if stripped:
                return stripped
        raise ValueError("refined_query is required")

    @field_validator("rationale", mode="before")
    @classmethod
    def clean_refine_rationale(cls, value: Any) -> str:
        """Normalize the refinement rationale text.

        Args:
            value: Raw input supplied for ``rationale``.

        Returns:
            The stripped string, or ``""`` when the input is not a string or
            is blank.
        """
        if isinstance(value, str):
            stripped = value.strip()
            if stripped:
                return stripped
        return ""
class RetrievalEvaluationOutput(BaseModel):
    """Structured sufficiency evaluation returned by the CRAG evaluator."""

    # Keys that are not declared below are dropped instead of rejected.
    model_config = {"extra": "ignore"}

    # Matched exactly (upper case); no validator here normalizes the value.
    status: Literal["SUFFICIENT", "PARTIAL", "INSUFFICIENT"] = "SUFFICIENT"
    gap: Optional[str] = None
    output_validation_retries: int = Field(default=0, ge=0)

    @field_validator("gap", mode="before")
    @classmethod
    def clean_gap(cls, value: Any) -> Optional[str]:
        """Normalize optional evidence-gap descriptions.

        Args:
            value: Raw input supplied for ``gap``.

        Returns:
            The stripped string, or ``None`` when the input is not a string,
            is blank, or is the literal text ``"null"`` / ``"none"`` in any
            letter case.
        """
        if not isinstance(value, str):
            return None
        stripped = value.strip()
        # Treat textual stand-ins for "no value" the same as a missing gap.
        if not stripped or stripped.lower() in {"null", "none"}:
            return None
        return stripped
class GraphSupportDecision(BaseModel):
    """Structured graph support decision reserved for future graph/Cypher routing."""

    # Both switches default to off and the rationale to an empty string; this
    # class defines no validators.
    use_graph: bool = False
    use_cypher: bool = False
    rationale: str = ""
class PhaseTraceEvent(BaseModel):
    """Single trace event emitted by the planning runtime."""

    # Required identification of the event; all three are plain strings with
    # no constraints declared here.
    phase: str
    status: str
    label: str

    # Optional details; each defaults to "absent".
    detail: Optional[str] = None
    count: Optional[int] = None
    duration_ms: Optional[float] = None
    # Free-form metadata; a fresh dict is created per instance.
    meta: Dict[str, Any] = Field(default_factory=dict)
    tool: Optional[str] = None
@dataclass
class PlanningEarlyExit:
    """Signals that the planning pipeline hit a terminal condition.

    ``kind``, ``routing_ms`` and ``planner_ms`` are required (in that
    positional order); every other field has a default.
    """

    # Required fields.
    kind: str  # presumably names the exit reason; allowed values are not defined in this module
    routing_ms: float
    planner_ms: float

    # Optional fields.
    retrieval_ms: float = 0.0
    intent_class: Literal["conversational", "documentary"] = "documentary"
    clarification_question: Optional[str] = None
    strict_grounding_requested: bool = False
    agentic_rationale: str = ""
    # default_factory gives each instance its own dict.
    agentic_meta: Dict[str, Any] = field(default_factory=dict)
    best_score: float = 0.0
    gate_threshold: float = 0.0
    candidates_dropped: int = 0
@dataclass
class PlanningContext:
    """Artifacts produced by the planning graph and consumed by generation.

    Every field except ``retrieval_stats`` is required and must be supplied
    by the caller; none of them are validated or normalized here.
    """

    # Retrieved material and plans.
    context_slices: List[Any]
    graph_fetch: Optional[Any]
    retrieval_plan: Any
    agentic_plan: Optional[Any]
    agentic_meta: Dict[str, Any]

    # Retrieval settings recorded under the "effective_" prefix.
    retrieval_query: str
    effective_top_k: int
    effective_granularity: Optional[str]
    effective_include_relations: bool
    community_meta: Dict[str, Any]

    # Per-stage timings (the ``_ms`` suffix suggests milliseconds - confirm
    # against the code that fills them in).
    routing_ms: float
    planner_ms: float
    retrieval_ms: float
    context_enrichment_ms: float
    graph_enrichment_ms: float
    complementary_ms: float
    compression_ms: float

    # The only optional field.
    retrieval_stats: Any = None
# Type alias covering both outcome types defined in this module: an early
# exit (PlanningEarlyExit) or a complete planning context (PlanningContext).
PlanningResult = Union[PlanningEarlyExit, PlanningContext]
@dataclass
class PlanningGraphState:
    """Mutable planning state shared across graph nodes.

    The first twelve fields (``question`` through ``planner_run_id``) are
    required; everything after them starts from a default. List and dict
    defaults use ``default_factory`` so instances never share a container.
    """

    # --- Required inputs -------------------------------------------------
    question: str
    top_k: int
    score_threshold: Optional[float]
    filters: Optional[Dict[str, Any]]
    include_relations: bool
    include_subjects: bool
    collections: Optional[List[str]]
    granularity: Optional[str]
    graph_depth: Optional[int]
    use_graph: Optional[bool]
    embedding_preset: Optional[str]
    planner_run_id: str

    # --- Run bookkeeping -------------------------------------------------
    planner_path: List[str] = field(default_factory=list)
    trace_events: List[PhaseTraceEvent] = field(default_factory=list)
    output_validation_retries: int = 0

    # --- Per-stage timings (``_ms`` suffix suggests milliseconds - confirm)
    decompose_ms: float = 0.0
    routing_ms: float = 0.0
    planner_ms: float = 0.0
    retrieval_ms: float = 0.0
    context_enrichment_ms: float = 0.0
    graph_enrichment_ms: float = 0.0
    complementary_ms: float = 0.0
    compression_ms: float = 0.0

    # --- Settings recorded under the "effective_" prefix -----------------
    effective_top_k: int = 0
    effective_granularity: Optional[str] = None
    effective_include_relations: bool = False

    # --- Intermediate and final artifacts --------------------------------
    decomposition_result: Any = None
    retrieval_plan: Optional[Any] = None
    agentic_plan: Optional[Any] = None
    retrieval_query: Optional[str] = None
    retrieval_results: List[Any] = field(default_factory=list)
    context_slices: List[Any] = field(default_factory=list)
    community_meta: Dict[str, Any] = field(default_factory=dict)
    graph_fetch: Optional[Any] = None
    retrieval_depth: Optional[str] = None
    planning_future: Optional[Any] = None
    graph_prefetch_future: Optional[Any] = None
    retrieval_stats: Any = None
    agentic_meta: Dict[str, Any] = field(default_factory=dict)

    # Stays None unless an early exit is recorded on the state.
    early_exit: Optional[PlanningEarlyExit] = None