# Source code for api_gateway.bootstrap (extracted from rendered docs)

"""Bootstrap service: schedules data population jobs via Redis."""

import logging
from typing import Any, Optional, TypeAlias

from api_gateway.job_service import submit_job
from lalandre_core.config import get_gateway_config
from lalandre_core.runtime_values import require_int

# Gateway configuration is loaded once at import time; the constants below
# are derived from it.
gateway = get_gateway_config()
# Minimum content length an act must have before it is chunked; forwarded
# verbatim to chunk jobs. Units presumably characters — TODO confirm against
# the worker implementation.
JOB_CHUNK_MIN_CONTENT_LENGTH = gateway.job_chunk_min_content_length

logger = logging.getLogger(__name__)

# Redis key used to guard against concurrent bootstrap runs.
LOCK_KEY = "bootstrap:lock"
# Lock lifetime in seconds. NOTE(review): require_int presumably validates
# that the configured value is an integer and raises otherwise — verify
# against lalandre_core.runtime_values.
LOCK_TTL_SECONDS = require_int(gateway.bootstrap_lock_ttl_seconds, "BOOTSTRAP_LOCK_TTL_SECONDS")

# Alias for any async Redis client (duck-typed: only set/delete are used here).
RedisClient: TypeAlias = Any


async def bootstrap_system(redis_client: RedisClient) -> bool:
    """Orchestrate the data population pipeline.

    Only a single ``chunk_all`` job is enqueued; workers chain the
    downstream stages per act (chunk_act -> embed_act[preset*] + extract_act).

    Args:
        redis_client: Async Redis connection used for locking and job dispatch.

    Returns:
        bool: True if jobs were scheduled, False if locked or failed.
    """
    # --- 1. Concurrency Control (Locking) ---
    # Attempt to acquire a short-lived lock. nx=True sets the key only if it
    # does not already exist, so at most one bootstrap run holds the lock at a
    # time; ex= expires it so a crashed run cannot block bootstraps forever.
    is_acquired: Optional[bool] = await redis_client.set(
        LOCK_KEY, "1", nx=True, ex=LOCK_TTL_SECONDS
    )
    if not is_acquired:
        logger.info("Bootstrap skipped: Lock is active (job already running).")
        return False

    logger.info("Bootstrap: scheduling jobs...")

    # --- 2. Pipeline Execution ---
    try:
        # Dispatch chunk jobs. Downstream stages are chained by workers per-act.
        await submit_job(
            redis_conn=redis_client,
            job_type="chunk_all",
            channel="chunk_jobs",
            params={
                "min_content_length": JOB_CHUNK_MIN_CONTENT_LENGTH,
            },
        )
        logger.info(
            "Bootstrap scheduling complete - workers will process in pipeline mode "
            "(chunk_act -> embed_act[preset*] + extract_act)."
        )
        return True
    except Exception:
        # --- 3. Error Handling & Rollback ---
        logger.exception("Bootstrap failed during execution.")
        # Critical: release the lock immediately so retries aren't blocked
        # for the remainder of LOCK_TTL_SECONDS.
        logger.info("Rollback: Releasing bootstrap lock...")
        await redis_client.delete(LOCK_KEY)
        return False