"""
worker-loop utilities for Redis-based job workers.
Provides reusable building blocks (functions, not a class hierarchy)
"""
import json
import logging
import time
from datetime import datetime, timezone
from time import perf_counter
from typing import Any, Callable, cast

from ..redis_client import create_sync_redis_client
from .job_queue import update_job_status
logger = logging.getLogger(__name__)
# ── Common runtime parameters ────────────────────────────────────────────
class BaseRuntimeParams:
    """Common parameters resolved identically by every worker.

    Attributes:
        redis_client: Synchronous Redis client shared by the worker loop.
        job_ttl_seconds: Job TTL tunable (seconds) forwarded to workers.
        brpop_timeout_seconds: Polling timeout passed to ``BRPOP``.
    """

    # __slots__ keeps per-instance memory small and rejects typo'd attributes.
    __slots__ = ("redis_client", "job_ttl_seconds", "brpop_timeout_seconds")

    def __init__(
        self,
        redis_client: Any,
        job_ttl_seconds: int,
        brpop_timeout_seconds: int,
    ) -> None:
        self.redis_client = redis_client
        self.job_ttl_seconds = job_ttl_seconds
        self.brpop_timeout_seconds = brpop_timeout_seconds
[docs]
def resolve_base_runtime_params(
*,
redis_host: str | None,
redis_port: int | None,
job_ttl_seconds: int | None,
brpop_timeout_seconds: int,
) -> BaseRuntimeParams:
"""Resolve Redis client + base tunables shared by all workers."""
if redis_host is None:
raise ValueError("gateway.redis_host must be configured")
if redis_port is None:
raise ValueError("gateway.redis_port must be configured")
if job_ttl_seconds is None:
raise ValueError("gateway.job_ttl_seconds must be configured")
redis_client = create_sync_redis_client(host=redis_host, port=redis_port)
return BaseRuntimeParams(
redis_client=redis_client,
job_ttl_seconds=job_ttl_seconds,
brpop_timeout_seconds=brpop_timeout_seconds,
)
# ── Job parsing ──────────────────────────────────────────────────────────
def parse_job_payload(
    job_data: dict[str, Any],
) -> tuple[str, str, dict[str, Any]]:
    """Extract ``(job_id, job_type, params)`` from a raw job dict.

    Returns empty strings (and an empty params dict) when required fields
    are missing or have the wrong type, so callers can validate cheaply
    instead of handling exceptions.
    """
    raw_id: Any = job_data.get("job_id")
    raw_type: Any = job_data.get("type")
    raw_params: Any = job_data.get("params", {})
    # Type-check each field; any mismatch degrades to the neutral default.
    job_id: str = raw_id if isinstance(raw_id, str) else ""
    job_type: str = raw_type if isinstance(raw_type, str) else ""
    params: dict[str, Any] = cast(dict[str, Any], raw_params) if isinstance(raw_params, dict) else {}
    return job_id, job_type, params
# ── Instrumented job processing ──────────────────────────────────────────
def instrumented_process_job(
    *,
    runtime: Any,
    job_data: dict[str, Any],
    dispatch: dict[str, Callable[[Any, str, dict[str, Any]], None]],
    observe_execution: Callable[..., None],
    observe_error: Callable[..., None],
) -> None:
    """Parse, dispatch, and instrument a single job.

    ``observe_execution`` is called exactly once per invocation — including
    the invalid-payload and error paths. ``observe_error`` is additionally
    called when the handler raises, and the exception is re-raised.

    Args:
        runtime: Worker runtime instance passed to the dispatched handlers.
        job_data: Raw deserialized job payload fetched from Redis.
        dispatch: Mapping of ``job_type`` to ``handler(runtime, job_id, params)``.
        observe_execution: Observer called with ``job_type``, ``outcome`` and
            ``duration_seconds`` keyword arguments.
        observe_error: Observer called with ``job_type`` and ``exc_or_reason``
            when the handler raises.
    """
    started_at = perf_counter()
    job_id, job_type, params = parse_job_payload(job_data)
    outcome = "invalid_payload"
    if not job_id:
        # Malformed payload: record the metric (job_type may be "") and bail.
        logger.error("Invalid job payload: missing job_id")
        observe_execution(
            job_type=job_type,
            outcome=outcome,
            duration_seconds=perf_counter() - started_at,
        )
        return
    logger.info("Processing job %s (type: %s)", job_id, job_type)
    try:
        handler = dispatch.get(job_type)
        if handler is not None:
            handler(runtime, job_id, params)
            outcome = "success"
        else:
            outcome = "unknown_job_type"
            logger.error("Unknown job type: %s", job_type)
            update_job_status(runtime, job_id, "failed", f"Unknown job type: {job_type}")
    except Exception as exc:
        outcome = "failed"
        observe_error(job_type=job_type, exc_or_reason=exc)
        raise
    finally:
        # Emitted exactly once, even when the handler raises.
        observe_execution(
            job_type=job_type,
            outcome=outcome,
            duration_seconds=perf_counter() - started_at,
        )
# ── Main BRPOP loop ─────────────────────────────────────────────────────
def _in_reconcile_window(hour_start: int, hour_end: int) -> bool:
"""Return True if the current UTC hour falls within [hour_start, hour_end)."""
current_hour = datetime.utcnow().hour
if hour_start <= hour_end:
return hour_start <= current_hour < hour_end
# Wrap around midnight (e.g. 22 → 2)
return current_hour >= hour_start or current_hour < hour_end
def run_worker_loop(
    *,
    queue_name: str,
    worker_name: str,
    redis_client: Any,
    brpop_timeout_seconds: int,
    process_job: Callable[[dict[str, Any]], None],
    reconcile_callback: Callable[[], None] | None = None,
    reconcile_interval_seconds: int = 0,
    reconcile_hour_start: int = 22,
    reconcile_hour_end: int = 24,
) -> None:
    """Generic BRPOP loop shared by all workers.

    Blocks on ``BRPOP`` until a payload arrives, deserializes it, and hands
    it to ``process_job``. Idle timeouts double as the opportunity to run
    the optional reconciliation callback when the configured UTC window is
    open and the interval has elapsed. Runs until ``KeyboardInterrupt``.

    Args:
        queue_name: Redis list to ``BRPOP`` from.
        worker_name: Human-readable worker name used in log messages.
        redis_client: Synchronous Redis client backing the worker loop.
        brpop_timeout_seconds: Polling timeout passed to ``BRPOP``.
        process_job: Callback invoked with each deserialized payload.
        reconcile_callback: Optional reconciliation callback executed periodically.
        reconcile_interval_seconds: Seconds between two reconciliation runs;
            ``0`` (the default) disables reconciliation.
        reconcile_hour_start: UTC hour at which the reconciliation window opens.
        reconcile_hour_end: UTC hour at which the reconciliation window closes.
    """
    # Lazy %-style args defer string formatting until the record is emitted.
    logger.info("%s started", worker_name)
    logger.info("Waiting for jobs on '%s' queue...", queue_name)
    last_reconcile = time.time()
    try:
        while True:
            result = redis_client.brpop(
                [queue_name],
                timeout=brpop_timeout_seconds,
            )
            if result is None:
                # Idle tick: reconcile only when enabled, due, and in-window.
                if (
                    reconcile_interval_seconds > 0
                    and reconcile_callback is not None
                    and (time.time() - last_reconcile) >= reconcile_interval_seconds
                    and _in_reconcile_window(reconcile_hour_start, reconcile_hour_end)
                ):
                    logger.info("Reconciliation window active, running reconcile...")
                    reconcile_callback()
                    last_reconcile = time.time()
                continue
            _, message_data = result
            try:
                job_data_raw: Any = json.loads(message_data)
                if not isinstance(job_data_raw, dict):
                    raise ValueError("Job payload must be a JSON object")
                job_data: dict[str, Any] = cast(dict[str, Any], job_data_raw)
                process_job(job_data)
            except json.JSONDecodeError as e:
                logger.error("Invalid JSON: %s", e)
            except Exception as e:
                # A single bad job must not kill the worker loop.
                logger.error("Job processing error: %s", e, exc_info=True)
    except KeyboardInterrupt:
        logger.info("Worker stopped by user")
    finally:
        logger.info("Worker shutdown complete")