# Source code for api_gateway.routers.jobs

"""
Job Management Router
Handles async job submission and status tracking for chunk, embed, and extraction workers.
"""

import logging
from typing import Any, cast

from api_gateway.job_service import (
    JobResponse,
    JobStatus,
    get_job_status,
    require_valid_celex,
    submit_job,
)
from api_gateway.rate_limit import LIMIT_JOBS, limiter
from fastapi import APIRouter, Depends, HTTPException, Request
from lalandre_core.config import get_gateway_config
from lalandre_core.embedding_presets import list_embedding_presets
from lalandre_core.runtime_values import (
    require_bool,
    require_float,
    require_int,
)

# Gateway configuration is resolved once at import time; the require_* helpers
# coerce/validate each configured value (presumably raising on missing or
# ill-typed values — see lalandre_core.runtime_values) so misconfiguration
# surfaces at startup rather than per-request.
gateway = get_gateway_config()
JOB_CHUNK_MIN_CONTENT_LENGTH = require_int(gateway.job_chunk_min_content_length, "JOB_CHUNK_MIN_CONTENT_LENGTH")
JOB_EMBED_BATCH_SIZE = require_int(gateway.job_embed_batch_size, "JOB_EMBED_BATCH_SIZE")
JOB_EXTRACT_MIN_CONFIDENCE = require_float(gateway.job_extract_min_confidence, "JOB_EXTRACT_MIN_CONFIDENCE")
JOB_EXTRACT_SKIP_EXISTING_DEFAULT = require_bool(
    gateway.job_extract_skip_existing_default, "JOB_EXTRACT_SKIP_EXISTING_DEFAULT"
)

logger = logging.getLogger(__name__)

# All job endpoints are mounted under /api/v1 and grouped as "jobs" in the API docs.
router = APIRouter(prefix="/api/v1", tags=["jobs"])

# Alias for the async Redis client stored on app.state. Kept as Any because the
# concrete client type is not imported in this module.
RedisConnection = Any


def _get_redis_connection(request: Request) -> RedisConnection:
    """FastAPI dependency returning the app-wide Redis connection.

    Raises a 503 HTTPException when no connection was initialized on
    ``app.state`` (e.g. startup has not completed).
    """
    conn = getattr(request.app.state, "redis", None)
    if conn is None:
        raise HTTPException(status_code=503, detail="Redis connection not initialized")
    return cast(RedisConnection, conn)


async def _submit_embedding_jobs(
    *,
    redis_conn: RedisConnection,
    job_type: str,
    params: dict[str, Any],
    message_prefix: str,
) -> JobResponse:
    """Fan one logical embedding request out to every indexing-enabled preset.

    One worker job is queued per preset (on that preset's resolved queue). The
    aggregate response exposes the full job-id list plus the first id in the
    legacy ``job_id`` field, and carries the first job's submission timestamp.
    Raises a 503 HTTPException when no indexing-enabled preset is configured.
    """
    presets = list_embedding_presets(indexing_only=True)
    if not presets:
        raise HTTPException(status_code=503, detail="No indexing-enabled embedding presets are configured")

    # Collect (job_id, preset_id) pairs as each per-preset job is queued.
    submissions: list[tuple[str, str]] = []
    submitted_at: str | None = None

    for preset in presets:
        response = await submit_job(
            redis_conn,
            job_type=job_type,
            channel=preset.resolved_queue_name(),
            params=params,
            message="",
        )
        # Remember the timestamp of the first submission only.
        if submitted_at is None:
            submitted_at = response.submitted_at
        submissions.append((response.job_id, preset.preset_id))

    job_ids = [job_id for job_id, _ in submissions]
    preset_ids = [preset_id for _, preset_id in submissions]

    return JobResponse(
        job_id=job_ids[0],
        job_ids=job_ids,
        presets=preset_ids,
        status="queued",
        message=f"{message_prefix}: {', '.join(preset_ids)}",
        submitted_at=submitted_at or "",
    )


# =============================================================================
# CHUNKING ENDPOINTS
# =============================================================================


@router.post("/chunk/all", response_model=JobResponse)
@limiter.limit(LIMIT_JOBS)
async def chunk_all(
    request: Request,
    min_content_length: int = JOB_CHUNK_MIN_CONTENT_LENGTH,
    redis_conn: RedisConnection = Depends(_get_redis_connection),
):
    """Triggers chunking for all downloaded acts.

    Queues a single ``chunk_all`` job on the ``chunk_jobs`` channel; acts whose
    content is shorter than ``min_content_length`` are skipped by the worker.
    """
    del request  # required in the signature by the rate limiter; unused otherwise
    return await submit_job(
        redis_conn,
        job_type="chunk_all",
        channel="chunk_jobs",
        params={"min_content_length": min_content_length},
        message="Chunking job queued for all acts",
    )
@router.post("/chunk/{celex}", response_model=JobResponse)
@limiter.limit(LIMIT_JOBS)
async def chunk_act(
    request: Request,
    celex: str,
    force: bool = False,
    redis_conn: RedisConnection = Depends(_get_redis_connection),
):
    """Triggers chunking for a specific act.

    The CELEX identifier is normalized (and presumably validated) by
    ``require_valid_celex`` before any job is queued; ``force`` re-chunks
    even if chunks already exist (worker-side semantics).
    """
    del request  # required in the signature by the rate limiter; unused otherwise
    normalized_celex = require_valid_celex(celex)
    return await submit_job(
        redis_conn,
        job_type="chunk_act",
        channel="chunk_jobs",
        params={"celex": normalized_celex, "force": force},
        message=f"Chunking job queued for act: {normalized_celex}",
    )
@router.get("/chunk/{job_id}/status", response_model=JobStatus)
async def get_chunk_status(
    job_id: str,
    redis_conn: RedisConnection = Depends(_get_redis_connection),
):
    """Checks progress of a chunk job by its job id."""
    return await get_job_status(redis_conn, job_id)
# =============================================================================
# EMBEDDING ENDPOINTS
# =============================================================================
@router.post("/embed/all", response_model=JobResponse)
@limiter.limit(LIMIT_JOBS)
async def embed_all(
    request: Request,
    batch_size: int = JOB_EMBED_BATCH_SIZE,
    redis_conn: RedisConnection = Depends(_get_redis_connection),
):
    """Triggers embedding for all indexing-enabled presets.

    Fans out one job per preset via ``_submit_embedding_jobs``; returns a 503
    if no indexing-enabled preset is configured.
    """
    del request  # required in the signature by the rate limiter; unused otherwise
    return await _submit_embedding_jobs(
        job_type="embed_all",
        redis_conn=redis_conn,
        params={"batch_size": batch_size},
        message_prefix="Embedding job queued for all indexing-enabled presets",
    )
@router.post("/embed/{celex}", response_model=JobResponse)
@limiter.limit(LIMIT_JOBS)
async def embed_act(
    request: Request,
    celex: str,
    batch_size: int = JOB_EMBED_BATCH_SIZE,
    redis_conn: RedisConnection = Depends(_get_redis_connection),
):
    """Triggers chunk and act embeddings for a specific act on all indexing-enabled presets.

    The CELEX identifier is normalized by ``require_valid_celex`` before the
    per-preset jobs are queued.
    """
    del request  # required in the signature by the rate limiter; unused otherwise
    normalized_celex = require_valid_celex(celex)
    return await _submit_embedding_jobs(
        job_type="embed_act",
        redis_conn=redis_conn,
        params={"celex": normalized_celex, "batch_size": batch_size},
        message_prefix=f"Embedding job queued for act {normalized_celex} across indexing-enabled presets",
    )
@router.get("/embed/{job_id}/status", response_model=JobStatus)
async def get_embed_status(
    job_id: str,
    redis_conn: RedisConnection = Depends(_get_redis_connection),
):
    """Checks progress of an embedding job by its job id."""
    return await get_job_status(redis_conn, job_id)
# =============================================================================
# EXTRACTION ENDPOINTS
# =============================================================================
@router.post("/extract/all", response_model=JobResponse)
@limiter.limit(LIMIT_JOBS)
async def extract_all(
    request: Request,
    min_confidence: float = JOB_EXTRACT_MIN_CONFIDENCE,
    skip_existing: bool = JOB_EXTRACT_SKIP_EXISTING_DEFAULT,
    redis_conn: RedisConnection = Depends(_get_redis_connection),
):
    """Triggers relation extraction for all processed chunks.

    ``min_confidence`` and ``skip_existing`` are forwarded to the worker;
    defaults come from the gateway configuration.
    """
    del request  # required in the signature by the rate limiter; unused otherwise
    return await submit_job(
        redis_conn,
        job_type="extract_all",
        channel="extract_jobs",
        params={
            "min_confidence": min_confidence,
            "skip_existing": skip_existing,
        },
        message="Relation extraction job queued for all acts",
    )
@router.post("/extract/{celex}", response_model=JobResponse)
@limiter.limit(LIMIT_JOBS)
async def extract_act(
    request: Request,
    celex: str,
    min_confidence: float = JOB_EXTRACT_MIN_CONFIDENCE,
    force: bool = False,
    redis_conn: RedisConnection = Depends(_get_redis_connection),
):
    """Triggers relation extraction for a specific act.

    The CELEX identifier is normalized by ``require_valid_celex`` before the
    job is queued; ``force`` re-runs extraction even where results exist
    (worker-side semantics).
    """
    del request  # required in the signature by the rate limiter; unused otherwise
    normalized_celex = require_valid_celex(celex)
    return await submit_job(
        redis_conn,
        job_type="extract_act",
        channel="extract_jobs",
        params={
            "celex": normalized_celex,
            "min_confidence": min_confidence,
            "force": force,
        },
        message=f"Relation extraction job queued for act: {normalized_celex}",
    )
@router.get("/extract/{job_id}/status", response_model=JobStatus)
async def get_extract_status(
    job_id: str,
    redis_conn: RedisConnection = Depends(_get_redis_connection),
):
    """Checks progress of an extract job by its job id."""
    return await get_job_status(redis_conn, job_id)