"""Shared job helpers — Redis job submission and status retrieval."""
import json
import logging
import time
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from fastapi import HTTPException
from lalandre_core.config import get_gateway_config
from lalandre_core.runtime_values import require_int
from lalandre_core.utils import is_valid_celex, normalize_celex
from pydantic import BaseModel
gateway = get_gateway_config()
JOB_TTL_SECONDS = require_int(gateway.job_ttl_seconds, "JOB_TTL_SECONDS")
logger = logging.getLogger(__name__)
RedisConnection = Any
class JobResponse(BaseModel):
    """Generic job submission response.

    Returned by ``submit_job`` after a job payload has been pushed to its
    Redis queue and the tracking key has been initialised.
    """

    # UUID4 string identifying the queued job (key suffix of "job:<job_id>").
    job_id: str
    # Lifecycle state; ``submit_job`` always sets this to "queued".
    status: str
    # Caller-supplied human-readable message (may be empty).
    message: str
    # UTC ISO-8601 timestamp taken at submission time.
    submitted_at: str
    # Batch fields — not populated by submit_job in this module; presumably
    # set by endpoints that submit several jobs at once (TODO confirm callers).
    job_ids: Optional[list[str]] = None
    presets: Optional[list[str]] = None
class JobStatus(BaseModel):
    """Job status details, as read back from the ``job:<job_id>`` Redis hash."""

    # Identifier the status was looked up by.
    job_id: str
    # Current lifecycle state; "unknown" when the hash field is missing.
    status: str
    # Job type discriminator recorded at submission; "unknown" when missing.
    type: str
    # Unix timestamp recorded at submission; 0.0 when missing.
    created_at: float
    # Parsed "progress" hash field; unparseable values are wrapped as
    # {"value": "<raw>"} by get_job_status.
    progress: Optional[Dict[str, Any]] = None
    # Parsed "result" hash field; None unless it is a valid JSON object.
    result: Optional[Dict[str, Any]] = None
    # Error text from the hash, if the worker recorded one.
    error: Optional[str] = None
def require_valid_celex(celex: str) -> str:
    """Normalize and validate CELEX format; raise HTTP 400 if unrecognisable.

    Returns the normalized identifier on success.
    """
    candidate = normalize_celex(celex)
    # Guard-clause shape: accept the happy path immediately.
    if is_valid_celex(candidate):
        return candidate
    raise HTTPException(
        status_code=400,
        detail=(
            f"Invalid or unrecognised CELEX identifier: '{celex}'. "
            "Expected formats: 32016R0679, 2003/41/CE, AMF-RG-..., EBA-..., ESMA-..."
        ),
    )
async def submit_job(
    redis_conn: RedisConnection,
    job_type: str,
    channel: str,
    params: Dict[str, Any],
    message: str = "",
) -> JobResponse:
    """Push a job to a Redis queue and initialise its tracking key.

    Args:
        redis_conn: Async Redis connection (duck-typed; only lpush/hset/expire
            are used).
        job_type: Worker-side job type discriminator.
        channel: Name of the Redis list used as the work queue.
        params: JSON-serialisable job parameters forwarded to the worker.
        message: Optional human-readable message echoed back in the response.

    Returns:
        JobResponse carrying the generated job id with status "queued".
    """
    job_id = str(uuid.uuid4())
    submitted_at = datetime.now(timezone.utc).isoformat()

    # Initialise the tracking hash BEFORE enqueueing the payload: a worker may
    # pop the job (or a client may poll get_job_status) immediately after the
    # lpush, and must not observe a missing "job:<id>" key — get_job_status
    # reports that as a 404 even though the job was just submitted.
    job_key = f"job:{job_id}"
    await redis_conn.hset(
        job_key,
        mapping={
            "status": "queued",
            "type": job_type,
            "created_at": time.time(),
        },
    )
    # Bound the key's lifetime so abandoned jobs do not accumulate in Redis.
    await redis_conn.expire(job_key, JOB_TTL_SECONDS)

    await redis_conn.lpush(
        channel,
        json.dumps(
            {
                "job_id": job_id,
                "type": job_type,
                "params": params,
                "submitted_at": submitted_at,
            }
        ),
    )

    logger.info("Job queued [%s]: %s", job_type, job_id)
    return JobResponse(job_id=job_id, status="queued", message=message, submitted_at=submitted_at)
def _parse_json_object(raw: Any, job_id: str, field: str) -> Optional[Dict[str, Any]]:
    """Best-effort parse of *raw* as a JSON object; return None otherwise.

    Parse failures are logged at debug level only — malformed worker payloads
    must not break status retrieval. Valid JSON that is not an object (e.g. a
    list or scalar) also yields None; the caller decides the fallback.
    """
    try:
        parsed = json.loads(raw)
    except Exception as exc:
        # Renders identically to the original per-field log lines.
        logger.debug("Job %s: %s JSON parse failed: %s", job_id, field, exc)
        return None
    return parsed if isinstance(parsed, dict) else None


async def get_job_status(redis_conn: RedisConnection, job_id: str) -> JobStatus:
    """Retrieve job status from Redis.

    Args:
        redis_conn: Async Redis connection (only hgetall is used).
        job_id: Identifier returned by submit_job.

    Returns:
        JobStatus assembled from the "job:<job_id>" hash.

    Raises:
        HTTPException: 404 when no hash exists for *job_id* (never created,
            or expired past JOB_TTL_SECONDS).
    """
    job_data: Dict[str, Any] = await redis_conn.hgetall(f"job:{job_id}")
    if not job_data:
        raise HTTPException(status_code=404, detail=f"Job {job_id} not found")

    # progress: keep something visible to callers even when the stored value
    # is not a JSON object — wrap the raw value verbatim.
    progress: Optional[Dict[str, Any]] = None
    raw_progress = job_data.get("progress")
    if raw_progress is not None:
        progress = _parse_json_object(raw_progress, job_id, "progress")
        if progress is None:
            progress = {"value": str(raw_progress)}

    # result: only a well-formed JSON object is surfaced; anything else stays None.
    result: Optional[Dict[str, Any]] = None
    raw_result = job_data.get("result")
    if raw_result is not None:
        result = _parse_json_object(raw_result, job_id, "result")

    return JobStatus(
        job_id=job_id,
        status=str(job_data.get("status") or "unknown"),
        type=str(job_data.get("type") or "unknown"),
        created_at=float(job_data.get("created_at") or 0.0),
        progress=progress,
        result=result,
        error=str(job_data["error"]) if job_data.get("error") is not None else None,
    )