# Source code for api_gateway.bootstrap
"""Bootstrap service: schedules data population jobs via Redis."""
import logging
from typing import Any, Optional, TypeAlias
from api_gateway.job_service import submit_job
from lalandre_core.config import get_gateway_config
from lalandre_core.runtime_values import require_int
# Load the gateway configuration once at import time; all tunables below are
# derived from it.
gateway = get_gateway_config()
# Minimum content length threshold forwarded verbatim to the chunk_all job
# params (see bootstrap_system below).
JOB_CHUNK_MIN_CONTENT_LENGTH = gateway.job_chunk_min_content_length
logger = logging.getLogger(__name__)
# Redis key used as a mutex so only one bootstrap run is active at a time.
LOCK_KEY = "bootstrap:lock"
# Lock lifetime in seconds; require_int fails fast with the env-var name in
# the error if the configured value is not a valid integer.
LOCK_TTL_SECONDS = require_int(gateway.bootstrap_lock_ttl_seconds, "BOOTSTRAP_LOCK_TTL_SECONDS")
# Only duck-typed async set/delete are needed, so the client is typed as Any.
RedisClient: TypeAlias = Any
# [docs]
async def bootstrap_system(redis_client: RedisClient) -> bool:
    """
    Orchestrate the data population pipeline.

    We enqueue only chunk_all; workers chain chunk_act -> embed_act[preset*] + extract_act.

    Args:
        redis_client: Async Redis connection used both for the bootstrap lock
            and for job dispatch.

    Returns:
        bool: True if jobs were scheduled, False if locked or failed.
    """
    # --- 1. Concurrency Control (Locking) ---
    # Attempt to acquire a short-lived lock.
    # NX=True ensures we only set the key if it does not already exist;
    # redis returns a truthy value on success and None when the key exists.
    is_acquired: Optional[bool] = await redis_client.set(LOCK_KEY, "1", nx=True, ex=LOCK_TTL_SECONDS)
    if not is_acquired:
        logger.info("Bootstrap skipped: Lock is active (job already running).")
        return False
    logger.info("Bootstrap: scheduling jobs...")
    # --- 2. Pipeline Execution ---
    try:
        # Step A: Dispatch chunk jobs.
        # Downstream stages are chained by workers per-act.
        await submit_job(
            redis_conn=redis_client,
            job_type="chunk_all",
            channel="chunk_jobs",
            params={
                "min_content_length": JOB_CHUNK_MIN_CONTENT_LENGTH,
            },
        )
        logger.info(
            "Bootstrap scheduling complete - workers will process in pipeline mode "
            "(chunk_act -> embed_act[preset*] + extract_act)."
        )
        return True
    except Exception:
        # --- 3. Error Handling & Rollback ---
        logger.exception("Bootstrap failed during execution.")
        # Critical: release the lock immediately so retries aren't blocked
        # for the full LOCK_TTL_SECONDS window.
        logger.info("Rollback: Releasing bootstrap lock...")
        try:
            await redis_client.delete(LOCK_KEY)
        except Exception:
            # Best-effort rollback: a failing delete must not escape and break
            # the documented "return False on failure" contract. The TTL will
            # expire the lock on its own.
            logger.exception("Rollback failed: could not release bootstrap lock.")
        return False