# Source code for api_gateway.routers.admin_pipeline

"""Admin endpoints for pipeline reset and rebuild orchestration."""

import json
from typing import Any, Dict

from api_gateway.auth import require_admin
from api_gateway.job_service import JobResponse, submit_job
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.concurrency import run_in_threadpool
from lalandre_core.config import get_config, get_gateway_config
from lalandre_core.embedding_presets import list_embedding_presets
from lalandre_core.runtime_values import require_int
from lalandre_db_neo4j import Neo4jRepository
from lalandre_db_postgres import PostgresRepository
from lalandre_db_qdrant import QdrantRepository
from pydantic import BaseModel

router = APIRouter(prefix="/api/v1", tags=["admin"])

# Gateway configuration is resolved once at import time, so the environment must
# provide a valid JOB_CHUNK_MIN_CONTENT_LENGTH before this module is imported.
gateway = get_gateway_config()
JOB_CHUNK_MIN_CONTENT_LENGTH = require_int(gateway.job_chunk_min_content_length, "JOB_CHUNK_MIN_CONTENT_LENGTH")

# Job types that belong to the chunk/embed/extract/summarize pipeline.
# A full reset is refused while any Redis job hash of one of these types
# still reports status "running" (see the running-jobs scan below).
PIPELINE_JOB_TYPES = {
    "chunk_all",
    "chunk_act",
    "embed_all",
    "embed_act",
    "extract_all",
    "extract_act",
    "summarize_all",
    "summarize_act",
    "build_communities",
}


class PipelineRunningJob(BaseModel):
    """Running pipeline job returned by the admin inspection endpoint."""

    # Job identifier with the "job:" Redis key prefix stripped.
    job_id: str
    # One of PIPELINE_JOB_TYPES.
    job_type: str
    # Job state as stored in the Redis job hash (always "running" here).
    status: str
class PipelineFullResetResponse(BaseModel):
    """Response payload for the full pipeline reset endpoint."""

    # Identifier and submission time of the chunk_all rebuild job that was
    # enqueued after the reset completed.
    job_id: str
    submitted_at: str
    # Embedding preset ids that were considered during the reset.
    presets: list[str]
    # Queue name -> number of messages removed from that queue.
    queues_cleared: Dict[str, int]
    # Backend name ("redis" / "postgres" / "neo4j" / "qdrant") -> per-item
    # deletion counts for that backend.
    deleted: Dict[str, Dict[str, int]]
def _get_redis_connection(request: Request) -> Any:
    """Return the app-wide Redis connection or fail fast with HTTP 503.

    Used as a FastAPI dependency; raises HTTPException(503) when the
    application lifespan has not stored a connection on ``app.state.redis``.
    """
    redis_conn = getattr(request.app.state, "redis", None)
    if redis_conn is None:
        raise HTTPException(status_code=503, detail="Redis connection not initialized")
    return redis_conn


async def _find_running_pipeline_jobs(redis_conn: Any) -> list[PipelineRunningJob]:
    """Scan all ``job:*`` hashes and collect pipeline jobs still running.

    Only jobs whose type is in PIPELINE_JOB_TYPES and whose status is exactly
    "running" are returned; everything else is ignored.
    """
    running_jobs: list[PipelineRunningJob] = []
    async for key in redis_conn.scan_iter(match="job:*"):
        job_data = await redis_conn.hgetall(key)
        status = str(job_data.get("status") or "")
        job_type = str(job_data.get("type") or "")
        if status != "running" or job_type not in PIPELINE_JOB_TYPES:
            continue
        running_jobs.append(
            PipelineRunningJob(
                # Strip the "job:" key prefix to recover the bare job id.
                job_id=str(key).split("job:", 1)[-1],
                job_type=job_type,
                status=status,
            )
        )
    return running_jobs


async def _clear_queue(redis_conn: Any, queue_name: str) -> tuple[int, int]:
    """Delete every message in *queue_name* plus the job hashes they reference.

    Returns ``(messages_removed, job_hashes_deleted)``. Messages that are not
    valid JSON objects with a non-empty string ``job_id`` are still removed
    with the queue, but contribute no job-hash deletion.
    """
    raw_messages = await redis_conn.lrange(queue_name, 0, -1)
    job_keys: list[str] = []
    for raw_message in raw_messages:
        try:
            payload = json.loads(raw_message)
        except Exception:
            # Best effort: malformed messages are dropped with the queue.
            continue
        if not isinstance(payload, dict):
            continue
        job_id = payload.get("job_id")
        if isinstance(job_id, str) and job_id:
            job_keys.append(f"job:{job_id}")
    if raw_messages:
        await redis_conn.delete(queue_name)
    if job_keys:
        await redis_conn.delete(*job_keys)
    return len(raw_messages), len(job_keys)


async def _discover_embed_queues(redis_conn: Any) -> list[str]:
    """Return the names of dynamically created per-preset embed queues.

    These follow the ``embed_jobs__*`` naming scheme and may exist for presets
    that are no longer configured, so they are discovered by key scan.
    """
    queue_names: list[str] = []
    async for key in redis_conn.scan_iter(match="embed_jobs__*"):
        queue_names.append(str(key))
    return queue_names


def _reset_postgres_store() -> Dict[str, int]:
    """Delete all derived pipeline rows from Postgres in one transaction.

    Returns per-entity deletion counts. Rolls back and re-raises on any
    failure; the repository is always closed.
    """
    config = get_config()
    repo = PostgresRepository(config.database.connection_string)
    try:
        with repo.get_session() as session:
            try:
                deleted = {
                    "embedding_state": PostgresRepository.clear_embedding_states(session),
                    "act_relations": PostgresRepository.clear_act_relations(session),
                    "act_summaries": PostgresRepository.clear_act_summaries(session),
                    "chunks": PostgresRepository.clear_chunks(session),
                    "acts_reset": PostgresRepository.reset_all_extraction_statuses(session),
                }
                session.commit()
                return deleted
            except Exception:
                session.rollback()
                raise
    finally:
        repo.close()


def _reset_neo4j_store() -> Dict[str, int]:
    """Remove act relationships and community nodes from the Neo4j graph."""
    repo = Neo4jRepository(get_config().graph)
    try:
        return {
            "act_relationships": repo.clear_act_relationships(),
            "communities": repo.clear_communities(),
        }
    finally:
        repo.close()


def _reset_qdrant_store() -> Dict[str, int]:
    """Drop every per-preset Qdrant collection (chunks and acts).

    Returns a mapping of collection name to the number of points it held
    before deletion (0 for collections that did not exist). Raises ValueError
    when the base collection names are missing from app config.
    """
    config = get_config()
    deleted: Dict[str, int] = {}
    base_chunk_collection_name = config.vector.collection_chunks
    base_act_collection_name = config.vector.collection_acts
    if not base_chunk_collection_name:
        raise ValueError("vector.collection_chunks must be set in app config")
    if not base_act_collection_name:
        raise ValueError("vector.collection_acts must be set in app config")
    base_collections: tuple[str, str] = (
        base_chunk_collection_name,
        base_act_collection_name,
    )
    for preset in list_embedding_presets():
        for base_collection in base_collections:
            # Collection names are derived per preset so different embedding
            # models/vector sizes never share a collection.
            collection_name = QdrantRepository.make_collection_name(
                base_collection,
                preset.model_name,
                preset.vector_size,
            )
            repo = QdrantRepository(
                collection_name=collection_name,
                vector_size=preset.vector_size,
            )
            try:
                if not repo.collection_exists():
                    deleted[collection_name] = 0
                    continue
                collection_info = repo.client.get_collection(collection_name)
                deleted[collection_name] = int(getattr(collection_info, "points_count", 0) or 0)
                repo.client.delete_collection(collection_name=collection_name)
            finally:
                repo.close()
    return deleted
@router.post(
    "/admin/pipeline/full-reset",
    response_model=PipelineFullResetResponse,
    dependencies=[Depends(require_admin)],
)
async def full_reset_pipeline(
    request: Request,
    redis_conn: Any = Depends(_get_redis_connection),
) -> PipelineFullResetResponse:
    """Purge the vector/extraction pipeline state, then re-enqueue a clean chunk_all.

    Refuses with HTTP 409 while any pipeline job is still running. Otherwise
    clears every pipeline queue (and the job hashes referenced by queued
    messages), resets the Postgres, Neo4j, and Qdrant stores, and submits a
    fresh ``chunk_all`` job to rebuild from scratch.
    """
    del request  # only present for FastAPI dependency wiring; unused here

    # Guard: never reset underneath an in-flight pipeline job.
    running_jobs = await _find_running_pipeline_jobs(redis_conn)
    if running_jobs:
        raise HTTPException(
            status_code=409,
            detail={
                "message": "Pipeline reset refused while jobs are still running",
                "running_jobs": [job.model_dump() for job in running_jobs],
            },
        )

    presets = list_embedding_presets()
    # dict.fromkeys de-duplicates while preserving order: configured preset
    # queues may overlap with queues discovered by key scan.
    queue_names = list(
        dict.fromkeys(
            [
                "chunk_jobs",
                *[preset.resolved_queue_name() for preset in presets],
                *(await _discover_embed_queues(redis_conn)),
                "extract_jobs",
            ]
        )
    )

    queues_cleared: Dict[str, int] = {}
    deleted_job_hashes = 0
    for queue_name in queue_names:
        cleared_count, deleted_hashes = await _clear_queue(redis_conn, queue_name)
        queues_cleared[queue_name] = cleared_count
        deleted_job_hashes += deleted_hashes

    # The store resets use blocking DB clients, so run them off the event loop.
    postgres_deleted = await run_in_threadpool(_reset_postgres_store)
    neo4j_deleted = await run_in_threadpool(_reset_neo4j_store)
    qdrant_deleted = await run_in_threadpool(_reset_qdrant_store)

    # Kick off the rebuild from the very first pipeline stage.
    rebuild_job: JobResponse = await submit_job(
        redis_conn=redis_conn,
        job_type="chunk_all",
        channel="chunk_jobs",
        params={"min_content_length": JOB_CHUNK_MIN_CONTENT_LENGTH},
        message="Chunking job queued after full pipeline reset",
    )

    return PipelineFullResetResponse(
        job_id=rebuild_job.job_id,
        submitted_at=rebuild_job.submitted_at,
        presets=[preset.preset_id for preset in presets],
        queues_cleared=queues_cleared,
        deleted={
            "redis": {"job_hashes": deleted_job_hashes},
            "postgres": postgres_deleted,
            "neo4j": neo4j_deleted,
            "qdrant": qdrant_deleted,
        },
    )