"""
Job Management Router
Handles async job submission and status tracking for chunk, embed, and extraction workers.
"""
import logging
from typing import Any, cast
from api_gateway.job_service import (
JobResponse,
JobStatus,
get_job_status,
require_valid_celex,
submit_job,
)
from api_gateway.rate_limit import LIMIT_JOBS, limiter
from fastapi import APIRouter, Depends, HTTPException, Request
from lalandre_core.config import get_gateway_config
from lalandre_core.embedding_presets import list_embedding_presets
from lalandre_core.runtime_values import (
require_bool,
require_float,
require_int,
)
# Gateway configuration is resolved once at import time. The require_* helpers
# coerce/validate each raw config value under the given name (presumably raising
# with that name on a bad value — confirm in lalandre_core.runtime_values).
gateway = get_gateway_config()
JOB_CHUNK_MIN_CONTENT_LENGTH = require_int(gateway.job_chunk_min_content_length, "JOB_CHUNK_MIN_CONTENT_LENGTH")
JOB_EMBED_BATCH_SIZE = require_int(gateway.job_embed_batch_size, "JOB_EMBED_BATCH_SIZE")
JOB_EXTRACT_MIN_CONFIDENCE = require_float(gateway.job_extract_min_confidence, "JOB_EXTRACT_MIN_CONFIDENCE")
JOB_EXTRACT_SKIP_EXISTING_DEFAULT = require_bool(
    gateway.job_extract_skip_existing_default, "JOB_EXTRACT_SKIP_EXISTING_DEFAULT"
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1", tags=["jobs"])
# Alias for the async Redis client stored on app.state; kept as Any so this
# module does not import the Redis library directly.
RedisConnection = Any
def _get_redis_connection(request: Request) -> RedisConnection:
    """Return the shared Redis connection stored on ``app.state``.

    Raises:
        HTTPException: 503 when the connection has not been initialized.
    """
    connection = getattr(request.app.state, "redis", None)
    if connection is not None:
        return cast(RedisConnection, connection)
    raise HTTPException(status_code=503, detail="Redis connection not initialized")
async def _submit_embedding_jobs(
    *,
    redis_conn: RedisConnection,
    job_type: str,
    params: dict[str, Any],
    message_prefix: str,
) -> JobResponse:
    """Fan one logical embedding job out to every indexing-enabled preset.

    A separate job is queued per preset on that preset's own queue. The
    aggregated response carries all job ids, the preset ids, and the first
    non-None submission timestamp reported by the underlying submissions.

    Raises:
        HTTPException: 503 when no indexing-enabled presets are configured.
    """
    presets = list_embedding_presets(indexing_only=True)
    if not presets:
        raise HTTPException(status_code=503, detail="No indexing-enabled embedding presets are configured")

    queued_job_ids: list[str] = []
    queued_preset_ids: list[str] = []
    first_submitted_at: str | None = None
    for preset in presets:
        submission = await submit_job(
            redis_conn,
            job_type=job_type,
            channel=preset.resolved_queue_name(),
            params=params,
            message="",
        )
        # Keep the earliest non-None timestamp as the aggregate submission time.
        if first_submitted_at is None:
            first_submitted_at = submission.submitted_at
        queued_job_ids.append(submission.job_id)
        queued_preset_ids.append(preset.preset_id)

    return JobResponse(
        job_id=queued_job_ids[0],
        job_ids=queued_job_ids,
        presets=queued_preset_ids,
        status="queued",
        message=f"{message_prefix}: {', '.join(queued_preset_ids)}",
        submitted_at=first_submitted_at or "",
    )
# =============================================================================
# CHUNKING ENDPOINTS
# =============================================================================
@router.post("/chunk/all", response_model=JobResponse)
@limiter.limit(LIMIT_JOBS)
async def chunk_all(
    request: Request,
    min_content_length: int = JOB_CHUNK_MIN_CONTENT_LENGTH,
    redis_conn: RedisConnection = Depends(_get_redis_connection),
):
    """Triggers chunking for all downloaded acts.

    Args:
        min_content_length: Minimum content length forwarded to the chunk worker.
        redis_conn: Injected Redis connection used for job submission.
    """
    del request  # required by the rate-limiter decorator; unused otherwise
    return await submit_job(
        redis_conn,
        job_type="chunk_all",
        channel="chunk_jobs",
        params={"min_content_length": min_content_length},
        message="Chunking job queued for all acts",
    )
@router.post("/chunk/{celex}", response_model=JobResponse)
@limiter.limit(LIMIT_JOBS)
async def chunk_act(
    request: Request,
    celex: str,
    force: bool = False,
    redis_conn: RedisConnection = Depends(_get_redis_connection),
):
    """Triggers chunking for a specific act.

    Args:
        celex: CELEX identifier of the act; validated and normalized before use.
        force: When True, forwarded to the worker to re-chunk the act.
        redis_conn: Injected Redis connection used for job submission.
    """
    del request  # required by the rate-limiter decorator; unused otherwise
    normalized_celex = require_valid_celex(celex)
    return await submit_job(
        redis_conn,
        job_type="chunk_act",
        channel="chunk_jobs",
        params={"celex": normalized_celex, "force": force},
        message=f"Chunking job queued for act: {normalized_celex}",
    )
@router.get("/chunk/{job_id}/status", response_model=JobStatus)
async def get_chunk_status(job_id: str, redis_conn: RedisConnection = Depends(_get_redis_connection)):
    """Checks progress of a chunk job by its job id."""
    return await get_job_status(redis_conn, job_id)
# =============================================================================
# EMBEDDING ENDPOINTS
# =============================================================================
@router.post("/embed/all", response_model=JobResponse)
@limiter.limit(LIMIT_JOBS)
async def embed_all(
    request: Request,
    batch_size: int = JOB_EMBED_BATCH_SIZE,
    redis_conn: RedisConnection = Depends(_get_redis_connection),
):
    """Triggers embedding for all indexing-enabled presets.

    Args:
        batch_size: Embedding batch size forwarded to each preset's worker.
        redis_conn: Injected Redis connection used for job submission.
    """
    del request  # required by the rate-limiter decorator; unused otherwise
    return await _submit_embedding_jobs(
        job_type="embed_all",
        redis_conn=redis_conn,
        params={"batch_size": batch_size},
        message_prefix="Embedding job queued for all indexing-enabled presets",
    )
@router.post("/embed/{celex}", response_model=JobResponse)
@limiter.limit(LIMIT_JOBS)
async def embed_act(
    request: Request,
    celex: str,
    batch_size: int = JOB_EMBED_BATCH_SIZE,
    redis_conn: RedisConnection = Depends(_get_redis_connection),
):
    """Triggers chunk and act embeddings for a specific act on all indexing-enabled presets.

    Args:
        celex: CELEX identifier of the act; validated and normalized before use.
        batch_size: Embedding batch size forwarded to each preset's worker.
        redis_conn: Injected Redis connection used for job submission.
    """
    del request  # required by the rate-limiter decorator; unused otherwise
    normalized_celex = require_valid_celex(celex)
    return await _submit_embedding_jobs(
        job_type="embed_act",
        redis_conn=redis_conn,
        params={"celex": normalized_celex, "batch_size": batch_size},
        message_prefix=f"Embedding job queued for act {normalized_celex} across indexing-enabled presets",
    )
@router.get("/embed/{job_id}/status", response_model=JobStatus)
async def get_embed_status(job_id: str, redis_conn: RedisConnection = Depends(_get_redis_connection)):
    """Checks progress of an embedding job by its job id."""
    return await get_job_status(redis_conn, job_id)
# =============================================================================
# EXTRACTION ENDPOINTS
# =============================================================================