"""Admin endpoints for pipeline reset and rebuild orchestration."""
import json
from typing import Any, Dict
from api_gateway.auth import require_admin
from api_gateway.job_service import JobResponse, submit_job
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.concurrency import run_in_threadpool
from lalandre_core.config import get_config, get_gateway_config
from lalandre_core.embedding_presets import list_embedding_presets
from lalandre_core.runtime_values import require_int
from lalandre_db_neo4j import Neo4jRepository
from lalandre_db_postgres import PostgresRepository
from lalandre_db_qdrant import QdrantRepository
from pydantic import BaseModel

router = APIRouter(prefix="/api/v1", tags=["admin"])

gateway = get_gateway_config()
JOB_CHUNK_MIN_CONTENT_LENGTH = require_int(gateway.job_chunk_min_content_length, "JOB_CHUNK_MIN_CONTENT_LENGTH")
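
# Job types treated as part of the rebuild pipeline. A full reset is refused
# (HTTP 409) while any job of one of these types is still in the "running"
# state; see full_reset_pipeline below.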
PIPELINE_JOB_TYPES = {
    "chunk_all",
    "chunk_act",
    "embed_all",
    "embed_act",
    "extract_all",
    "extract_act",
    "summarize_all",
    "summarize_act",
    "build_communities",
}


class PipelineRunningJob(BaseModel):
    """Running pipeline job returned by the admin inspection endpoint."""

    job_id: str
    job_type: str
    status: str


class PipelineFullResetResponse(BaseModel):
    """Response payload for the full pipeline reset endpoint."""

    job_id: str
    submitted_at: str
    presets: list[str]
    queues_cleared: Dict[str, int]
    deleted: Dict[str, Dict[str, int]]


def _get_redis_connection(request: Request) -> Any:
    redis_conn = getattr(request.app.state, "redis", None)
    if redis_conn is None:
        raise HTTPException(status_code=503, detail="Redis connection not initialized")
    return redis_conn


async def _find_running_pipeline_jobs(redis_conn: Any) -> list[PipelineRunningJob]:
    running_jobs: list[PipelineRunningJob] = []
    async for key in redis_conn.scan_iter(match="job:*"):
        job_data = await redis_conn.hgetall(key)
        status = str(job_data.get("status") or "")
        job_type = str(job_data.get("type") or "")
        if status != "running" or job_type not in PIPELINE_JOB_TYPES:
            continue
        running_jobs.append(
            PipelineRunningJob(
                job_id=str(key).split("job:", 1)[-1],
                job_type=job_type,
                status=status,
            )
        )
    return running_jobs
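
# The helper above assumes the worker-side job bookkeeping convention used by
# the job service: one Redis hash per job stored under "job:<job_id>" with at
# least "type" and "status" fields, e.g. (illustrative, not a verified schema):
#
#   HGETALL job:3f2a...  ->  {"type": "embed_all", "status": "running", ...}
#
# Only hashes whose status is "running" and whose type is in
# PIPELINE_JOB_TYPES block a full reset.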


async def _clear_queue(redis_conn: Any, queue_name: str) -> tuple[int, int]:
    raw_messages = await redis_conn.lrange(queue_name, 0, -1)
    job_keys: list[str] = []
    for raw_message in raw_messages:
        try:
            payload = json.loads(raw_message)
        except Exception:
            continue
        if not isinstance(payload, dict):
            continue
        job_id = payload.get("job_id")
        if isinstance(job_id, str) and job_id:
            job_keys.append(f"job:{job_id}")
    if raw_messages:
        await redis_conn.delete(queue_name)
    if job_keys:
        await redis_conn.delete(*job_keys)
    return len(raw_messages), len(job_keys)
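
# _clear_queue drains a Redis list-based queue and, as a best effort, also
# deletes the "job:<job_id>" hashes referenced by its messages. Messages are
# assumed to be JSON objects carrying a "job_id" field, roughly of the shape
# (hypothetical payload, for illustration only):
#
#   {"job_id": "3f2a...", "type": "embed_all", "params": {...}}
#
# Messages that fail to parse, or that carry no usable job_id, are skipped
# rather than treated as errors.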


async def _discover_embed_queues(redis_conn: Any) -> list[str]:
    queue_names: list[str] = []
    async for key in redis_conn.scan_iter(match="embed_jobs__*"):
        queue_names.append(str(key))
    return queue_names
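
# Embedding queues are discovered by key pattern rather than derived from the
# preset list alone, so queues left behind by presets that were since removed
# from configuration still get drained. The "embed_jobs__*" prefix is assumed
# to match the naming produced by resolved_queue_name().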


def _reset_postgres_store() -> Dict[str, int]:
    config = get_config()
    repo = PostgresRepository(config.database.connection_string)
    try:
        with repo.get_session() as session:
            try:
                deleted = {
                    "embedding_state": PostgresRepository.clear_embedding_states(session),
                    "act_relations": PostgresRepository.clear_act_relations(session),
                    "act_summaries": PostgresRepository.clear_act_summaries(session),
                    "chunks": PostgresRepository.clear_chunks(session),
                    "acts_reset": PostgresRepository.reset_all_extraction_statuses(session),
                }
                session.commit()
                return deleted
            except Exception:
                session.rollback()
                raise
    finally:
        repo.close()
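
# The three _reset_*_store helpers use blocking database clients, so the
# endpoint below runs them via run_in_threadpool instead of calling them
# directly on the event loop.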


def _reset_neo4j_store() -> Dict[str, int]:
    repo = Neo4jRepository(get_config().graph)
    try:
        return {
            "act_relationships": repo.clear_act_relationships(),
            "communities": repo.clear_communities(),
        }
    finally:
        repo.close()


def _reset_qdrant_store() -> Dict[str, int]:
    config = get_config()
    deleted: Dict[str, int] = {}
    base_chunk_collection_name = config.vector.collection_chunks
    base_act_collection_name = config.vector.collection_acts
    if not base_chunk_collection_name:
        raise ValueError("vector.collection_chunks must be set in app config")
    if not base_act_collection_name:
        raise ValueError("vector.collection_acts must be set in app config")
    base_collections: tuple[str, str] = (
        base_chunk_collection_name,
        base_act_collection_name,
    )
    for preset in list_embedding_presets():
        for base_collection in base_collections:
            collection_name = QdrantRepository.make_collection_name(
                base_collection,
                preset.model_name,
                preset.vector_size,
            )
            repo = QdrantRepository(
                collection_name=collection_name,
                vector_size=preset.vector_size,
            )
            try:
                if not repo.collection_exists():
                    deleted[collection_name] = 0
                    continue
                collection_info = repo.client.get_collection(collection_name)
                deleted[collection_name] = int(getattr(collection_info, "points_count", 0) or 0)
                repo.client.delete_collection(collection_name=collection_name)
            finally:
                repo.close()
    return deleted
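
# Qdrant collections are deleted per embedding preset: for each preset, the
# chunk and act base collection names are expanded via
# QdrantRepository.make_collection_name(base, model_name, vector_size), and the
# pre-deletion points_count of every existing collection is recorded in the
# response. The exact composed collection name format is an internal detail of
# QdrantRepository and is not assumed here.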


@router.post(
    "/admin/pipeline/full-reset",
    response_model=PipelineFullResetResponse,
    dependencies=[Depends(require_admin)],
)
async def full_reset_pipeline(
    request: Request,
    redis_conn: Any = Depends(_get_redis_connection),
) -> PipelineFullResetResponse:
    """Purge the vector/extraction pipeline state, then re-enqueue a clean chunk_all."""
    del request
    running_jobs = await _find_running_pipeline_jobs(redis_conn)
    if running_jobs:
        raise HTTPException(
            status_code=409,
            detail={
                "message": "Pipeline reset refused while jobs are still running",
                "running_jobs": [job.model_dump() for job in running_jobs],
            },
        )
    presets = list_embedding_presets()
    queue_names = list(
        dict.fromkeys(
            [
                "chunk_jobs",
                *[preset.resolved_queue_name() for preset in presets],
                *(await _discover_embed_queues(redis_conn)),
                "extract_jobs",
            ]
        )
    )
    queues_cleared: Dict[str, int] = {}
    deleted_job_hashes = 0
    for queue_name in queue_names:
        cleared_count, deleted_hashes = await _clear_queue(redis_conn, queue_name)
        queues_cleared[queue_name] = cleared_count
        deleted_job_hashes += deleted_hashes
    postgres_deleted = await run_in_threadpool(_reset_postgres_store)
    neo4j_deleted = await run_in_threadpool(_reset_neo4j_store)
    qdrant_deleted = await run_in_threadpool(_reset_qdrant_store)
    rebuild_job: JobResponse = await submit_job(
        redis_conn=redis_conn,
        job_type="chunk_all",
        channel="chunk_jobs",
        params={"min_content_length": JOB_CHUNK_MIN_CONTENT_LENGTH},
        message="Chunking job queued after full pipeline reset",
    )
    return PipelineFullResetResponse(
        job_id=rebuild_job.job_id,
        submitted_at=rebuild_job.submitted_at,
        presets=[preset.preset_id for preset in presets],
        queues_cleared=queues_cleared,
        deleted={
            "redis": {"job_hashes": deleted_job_hashes},
            "postgres": postgres_deleted,
            "neo4j": neo4j_deleted,
            "qdrant": qdrant_deleted,
        },
    )
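

# Example invocation (illustrative; host, port and authentication scheme depend
# on the deployment and on what require_admin expects):
#
#   curl -X POST http://localhost:8000/api/v1/admin/pipeline/full-reset \
#        -H "Authorization: Bearer <admin-token>"
#
# A successful response mirrors PipelineFullResetResponse; values below are
# placeholders, not real output:
#
#   {
#     "job_id": "…",
#     "submitted_at": "…",
#     "presets": ["…"],
#     "queues_cleared": {"chunk_jobs": 0, "extract_jobs": 3},
#     "deleted": {
#       "redis": {"job_hashes": 3},
#       "postgres": {"embedding_state": 0, "act_relations": 0, "act_summaries": 0, "chunks": 0, "acts_reset": 0},
#       "neo4j": {"act_relationships": 0, "communities": 0},
#       "qdrant": {"<collection_name>": 0}
#     }
#   }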