Source code for lalandre_core.queue.job_queue

"""
Shared Redis queue helpers for worker services.

Centralizes job enqueue/dedup/status operations used by chunking, embedding,
and extraction workers.
"""

import json
import time
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, TypedDict, cast

from ..utils import normalize_celex


@dataclass
class QueueRuntime:
    """Bundle of runtime dependencies shared by the queue helper functions.

    Every helper in this module takes a ``QueueRuntime`` as its first
    argument instead of reaching for a global Redis connection.
    """

    # Redis client used for all queue and status-hash commands. Annotated as
    # ``Any``; expected to be a ``redis.Redis`` instance at runtime.
    redis_client: Any
    # Expiry, in seconds, applied to ``job:<id>`` status hashes when the
    # caller does not supply an explicit TTL.
    job_ttl_seconds: int
class JobPayload(TypedDict, total=False):
    """Shape of one job message as it is JSON-serialized onto a Redis list.

    Declared with ``total=False``, so every key is optional from the type
    checker's point of view.
    """

    # Unique identifier of the job; also the suffix of its ``job:<id>`` hash.
    job_id: str
    # Job kind, compared against ``job_type`` when looking for duplicates.
    type: str
    # Job-specific arguments; a ``"celex"`` entry is used for scoped dedupe.
    params: dict[str, Any]
    # Submission timestamp as an ISO-8601 string.
    submitted_at: str
def update_job_status(
    runtime: QueueRuntime,
    job_id: str,
    status: str,
    message: str | None = None,
    progress: int | float | None = None,
    ttl: int | None = None,
) -> None:
    """Update the status metadata of a job in its ``job:<job_id>`` Redis hash.

    All changed fields are written with a single ``HSET`` so readers never
    observe a new status next to a stale message/progress, and a connection
    failure cannot leave the hash half-updated.

    Args:
        runtime: Provides the Redis client and the default TTL.
        job_id: Identifier of the job whose hash is updated.
        status: New value for the ``status`` field (always written).
        message: Optional human-readable message. Only written when truthy,
            so ``None`` or ``""`` leaves any previous message untouched.
        progress: Optional progress value. Stored as the JSON document
            ``{"value": <progress>}``; ``0`` is written, ``None`` is skipped.
        ttl: Expiry in seconds to (re)apply to the hash. Defaults to
            ``runtime.job_ttl_seconds`` when ``None``.
    """
    if ttl is None:
        ttl = runtime.job_ttl_seconds

    job_key = f"job:{job_id}"
    fields: dict[str, Any] = {"status": status}
    if message:
        fields["message"] = message
    if progress is not None:
        fields["progress"] = json.dumps({"value": progress})
    # First element of the client's TIME reply (Unix seconds for redis-py),
    # i.e. the timestamp comes from the Redis server, not the local clock.
    fields["updated_at"] = runtime.redis_client.time()[0]

    runtime.redis_client.hset(job_key, mapping=fields)
    # Refresh the expiry on every update so active jobs do not vanish.
    runtime.redis_client.expire(job_key, ttl)
def job_already_queued(
    runtime: QueueRuntime,
    *,
    queue_name: str,
    job_type: str,
    celex: str | None = None,
) -> bool:
    """Report whether an equivalent job is already waiting in ``queue_name``.

    The whole Redis list is read and each entry is decoded as JSON. An entry
    matches when its ``"type"`` equals ``job_type`` and, if ``celex`` is
    given, when the normalized ``params["celex"]`` of the entry equals
    ``celex``. Without ``celex`` any job of the same type counts as a match.

    The check is best effort: if the queue cannot be read the function
    returns ``False``, and entries that are not valid JSON objects are
    ignored.
    """
    try:
        queued_entries: list[str] = runtime.redis_client.lrange(queue_name, 0, -1)
    except Exception:
        # Fail open: an unreadable queue is treated as "nothing queued".
        return False

    for entry in queued_entries:
        try:
            decoded: Any = json.loads(entry)
        except Exception:
            continue

        if not isinstance(decoded, dict) or decoded.get("type") != job_type:
            continue

        # Type-level dedupe: one queued job of this type is enough.
        if celex is None:
            return True

        job_params: Any = decoded.get("params", {})
        if isinstance(job_params, dict):
            candidate: Any = job_params.get("celex")
            # Non-string CELEX values are compared as the empty string.
            queued_celex = normalize_celex(candidate) if isinstance(candidate, str) else ""
            if queued_celex == celex:
                return True

    return False
def enqueue_job(
    runtime: QueueRuntime,
    *,
    queue_name: str,
    job_type: str,
    params: dict[str, Any],
    dedupe_celex: str | None = None,
) -> str | None:
    """Enqueue a job and create its status hash, unless a duplicate is queued.

    Deduplication is always applied via ``job_already_queued``: with
    ``dedupe_celex=None`` any queued job of the same ``job_type`` blocks the
    enqueue; with a value, only a queued job of that type with the same
    CELEX does.

    The ``job:<id>`` status hash (and its TTL) is written *before* the
    payload is pushed onto the queue. Pushing first would let a fast worker
    pop the job and record progress, only for this function to overwrite
    the status with ``"queued"`` afterwards. If the push itself fails, the
    exception propagates and the orphaned status hash simply expires.

    Args:
        runtime: Provides the Redis client and the status-hash TTL.
        queue_name: Redis list the serialized job is pushed onto.
        job_type: Stored as the payload ``"type"`` and used for dedupe.
        params: Job arguments; must be JSON-serializable.
        dedupe_celex: Optional CELEX that narrows the duplicate check.

    Returns:
        The new job id, or ``None`` when a matching job is already queued.
    """
    if job_already_queued(
        runtime,
        queue_name=queue_name,
        job_type=job_type,
        celex=dedupe_celex,
    ):
        return None

    job_id = str(uuid.uuid4())
    status_key = f"job:{job_id}"
    payload: JobPayload = {
        "job_id": job_id,
        "type": job_type,
        "params": params,
        "submitted_at": datetime.now(timezone.utc).isoformat(),
    }
    # Serialize up front so non-serializable params fail before any Redis
    # write happens.
    serialized = json.dumps(payload)

    runtime.redis_client.hset(
        status_key,
        mapping={
            "status": "queued",
            "type": job_type,
            "created_at": time.time(),
        },
    )
    runtime.redis_client.expire(status_key, runtime.job_ttl_seconds)
    # Publish last: the job only becomes visible to workers once its status
    # hash exists.
    runtime.redis_client.lpush(queue_name, serialized)
    return job_id