"""
Shared Redis queue helpers for worker services.
Centralizes job enqueue/dedup/status operations used by chunking, embedding,
and extraction workers.
"""
import json
import time
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, TypedDict, cast
from ..utils import normalize_celex
[docs]
@dataclass
class QueueRuntime:
    """Bundle of runtime dependencies handed to every queue helper.

    Attributes:
        redis_client: Client object the helpers issue Redis commands on.
            Annotated as ``Any``; the inline note says it is a
            ``redis.Redis`` instance at runtime.
        job_ttl_seconds: Default expiry, in seconds, that the helpers apply
            to ``job:<id>`` status hashes.
    """

    redis_client: Any  # redis.Redis at runtime
    job_ttl_seconds: int
[docs]
class JobPayload(TypedDict, total=False):
    """Shape of a single job message as it is JSON-serialized onto a queue.

    Declared with ``total=False``, so a type checker treats every key as
    optional.
    """

    # Unique identifier of the job (string form).
    job_id: str
    # Job kind; queue consumers and the dedupe check match on this value.
    type: str
    # Free-form, JSON-serializable arguments for the job.
    params: dict[str, Any]
    # Submission timestamp as a string.
    submitted_at: str
[docs]
def update_job_status(
    runtime: QueueRuntime,
    job_id: str,
    status: str,
    message: str | None = None,
    progress: int | float | None = None,
    ttl: int | None = None,
) -> None:
    """Update the status metadata stored in the ``job:<job_id>`` Redis hash.

    All fields are written with a single ``HSET`` so that readers never
    observe a half-updated hash (e.g. a new status paired with a stale
    message or timestamp).

    Args:
        runtime: Provides the Redis client and the default TTL.
        job_id: Identifier of the job; the hash key is ``job:<job_id>``.
        status: New value for the ``status`` field.
        message: Optional value for the ``message`` field. Falsy values
            (``None`` or an empty string) leave the stored message untouched.
        progress: Optional progress value. Stored in the ``progress`` field
            as the JSON document ``{"value": <progress>}``.
        ttl: Expiry in seconds to (re)apply to the hash. Defaults to
            ``runtime.job_ttl_seconds`` when ``None``.
    """
    client = runtime.redis_client
    job_key = f"job:{job_id}"

    fields: dict[str, Any] = {"status": status}
    if message:
        fields["message"] = message
    if progress is not None:
        fields["progress"] = json.dumps({"value": progress})
    # First element of the client's time() result: the Redis server clock in
    # whole Unix seconds, so the timestamp does not depend on the worker's
    # local clock.
    fields["updated_at"] = client.time()[0]

    # One round trip, one atomic write for every field.
    client.hset(job_key, mapping=fields)
    # Every update refreshes the expiry of the status hash.
    client.expire(job_key, runtime.job_ttl_seconds if ttl is None else ttl)
[docs]
def job_already_queued(
    runtime: QueueRuntime,
    *,
    queue_name: str,
    job_type: str,
    celex: str | None = None,
) -> bool:
    """Report whether the queue already holds a job of the given type.

    The whole list stored at ``queue_name`` is read and scanned. Entries that
    are not valid JSON, or whose decoded value is not an object, are ignored.

    Args:
        runtime: Provides the Redis client used to read the queue.
        queue_name: Key of the Redis list to inspect.
        job_type: Value an entry's ``"type"`` field must equal to match.
        celex: When given, an entry only matches if its ``params["celex"]``
            (passed through ``normalize_celex``) equals this value. The
            argument itself is compared as-is, so callers are expected to
            pass it already normalized. Entries whose ``params`` is not an
            object are skipped; a missing or non-string ``celex`` is treated
            as the empty string.

    Returns:
        ``True`` on the first matching entry, ``False`` otherwise. A failure
        while reading the queue is also reported as ``False`` (best effort).
    """
    try:
        raw_entries: list[str] = runtime.redis_client.lrange(queue_name, 0, -1)
    except Exception:
        # Best effort: an unreadable queue is treated as "nothing queued".
        return False

    for raw_entry in raw_entries:
        try:
            decoded: Any = json.loads(raw_entry)
        except Exception:
            continue

        if not isinstance(decoded, dict):
            continue
        job = cast(dict[str, Any], decoded)
        if job.get("type") != job_type:
            continue

        # Without a CELEX scope, any job of this type counts as a duplicate.
        if celex is None:
            return True

        job_params: Any = job.get("params", {})
        if isinstance(job_params, dict):
            candidate: Any = cast(dict[str, Any], job_params).get("celex")
            if isinstance(candidate, str):
                queued_celex = normalize_celex(candidate)
            else:
                queued_celex = ""
            if queued_celex == celex:
                return True

    return False
[docs]
def enqueue_job(
    runtime: QueueRuntime,
    *,
    queue_name: str,
    job_type: str,
    params: dict[str, Any],
    dedupe_celex: str | None = None,
) -> str | None:
    """Enqueue a job and create its ``job:<id>`` status hash.

    The status hash is written *before* the job is pushed onto the queue.
    Otherwise a fast consumer could pop the job and record its own status
    (e.g. "processing") only to have it overwritten by the late "queued"
    initialization.

    Args:
        runtime: Provides the Redis client and the status-hash TTL.
        queue_name: Key of the Redis list the job is pushed onto.
        job_type: Job kind, stored in both the payload and the status hash.
        params: JSON-serializable job arguments embedded in the payload.
        dedupe_celex: Forwarded to ``job_already_queued`` as its ``celex``
            scope; ``None`` dedupes on job type alone.

    Returns:
        The new job id, or ``None`` when a matching job is already queued and
        nothing was enqueued.

    Raises:
        TypeError: If ``params`` cannot be serialized to JSON. This is raised
            before anything is written to Redis.

    Note:
        The duplicate check and the push are separate Redis calls, so two
        concurrent callers can still both enqueue the same job.
    """
    if job_already_queued(
        runtime,
        queue_name=queue_name,
        job_type=job_type,
        celex=dedupe_celex,
    ):
        return None

    job_id = str(uuid.uuid4())
    payload: JobPayload = {
        "job_id": job_id,
        "type": job_type,
        "params": params,
        "submitted_at": datetime.now(timezone.utc).isoformat(),
    }
    # Serialize up front so a bad `params` fails before any Redis write.
    serialized = json.dumps(payload)

    client = runtime.redis_client
    job_key = f"job:{job_id}"

    # Status hash first: it must exist by the time a consumer can see the job.
    client.hset(
        job_key,
        mapping={
            "status": "queued",
            "type": job_type,
            "created_at": time.time(),
        },
    )
    # If the push below fails, the TTL bounds how long the orphaned hash lives.
    client.expire(job_key, runtime.job_ttl_seconds)

    client.lpush(queue_name, serialized)
    return job_id