# Source code for lalandre_core.queue.worker_loop

"""
worker-loop utilities for Redis-based job workers.

Provides reusable building blocks (functions, not a class hierarchy).
"""

import json
import logging
import time
from datetime import datetime, timezone
from time import perf_counter
from typing import Any, Callable, cast

from ..redis_client import create_sync_redis_client
from .job_queue import update_job_status

logger = logging.getLogger(__name__)


# ── Common runtime parameters ────────────────────────────────────────────


class BaseRuntimeParams:
    """Common parameters resolved identically by every worker.

    Attributes:
        redis_client: Synchronous Redis client shared by the worker loop.
        job_ttl_seconds: TTL applied to job status keys.
        brpop_timeout_seconds: Polling timeout used for ``BRPOP``.
    """

    # __slots__ keeps the per-instance footprint small for this plain holder.
    __slots__ = ("redis_client", "job_ttl_seconds", "brpop_timeout_seconds")

    def __init__(
        self,
        redis_client: Any,
        job_ttl_seconds: int,
        brpop_timeout_seconds: int,
    ) -> None:
        self.redis_client = redis_client
        self.job_ttl_seconds = job_ttl_seconds
        self.brpop_timeout_seconds = brpop_timeout_seconds
[docs] def resolve_base_runtime_params( *, redis_host: str | None, redis_port: int | None, job_ttl_seconds: int | None, brpop_timeout_seconds: int, ) -> BaseRuntimeParams: """Resolve Redis client + base tunables shared by all workers.""" if redis_host is None: raise ValueError("gateway.redis_host must be configured") if redis_port is None: raise ValueError("gateway.redis_port must be configured") if job_ttl_seconds is None: raise ValueError("gateway.job_ttl_seconds must be configured") redis_client = create_sync_redis_client(host=redis_host, port=redis_port) return BaseRuntimeParams( redis_client=redis_client, job_ttl_seconds=job_ttl_seconds, brpop_timeout_seconds=brpop_timeout_seconds, )
# ── Job parsing ──────────────────────────────────────────────────────────
def parse_job_payload(
    job_data: dict[str, Any],
) -> tuple[str, str, dict[str, Any]]:
    """Extract ``(job_id, job_type, params)`` from a raw job dict.

    Returns empty strings when required fields are missing or have the
    wrong type so that callers can validate cheaply.
    """
    raw_id = job_data.get("job_id")
    raw_type = job_data.get("type")
    raw_params = job_data.get("params", {})

    # Normalize each field to a safe default when the type is wrong.
    if not isinstance(raw_id, str):
        raw_id = ""
    if not isinstance(raw_type, str):
        raw_type = ""
    if not isinstance(raw_params, dict):
        raw_params = {}

    return raw_id, raw_type, cast(dict[str, Any], raw_params)
# ── Instrumented job processing ──────────────────────────────────────────
def instrumented_process_job(
    *,
    runtime: Any,
    job_data: dict[str, Any],
    dispatch: dict[str, Callable[[Any, str, dict[str, Any]], None]],
    observe_execution: Callable[..., None],
    observe_error: Callable[..., None],
) -> None:
    """Parse, dispatch, and instrument a single job.

    Args:
        runtime: Worker runtime instance passed to the dispatched handlers.
        job_data: Raw deserialized job payload fetched from Redis.
        dispatch: Mapping of ``job_type`` to ``handler(runtime, job_id, params)``.
        observe_execution: Observer called in ``finally`` with execution metrics.
        observe_error: Observer called when the handler raises an exception.
    """
    started_at = perf_counter()
    outcome = "invalid_payload"

    job_id, job_type, params = parse_job_payload(job_data)
    if not job_id:
        # Malformed payload: record the observation and bail out early.
        logger.error("Invalid job payload: missing job_id")
        observe_execution(
            job_type=job_type,
            outcome=outcome,
            duration_seconds=perf_counter() - started_at,
        )
        return

    logger.info("Processing job %s (type: %s)", job_id, job_type)
    try:
        handler = dispatch.get(job_type)
        if handler is None:
            outcome = "unknown_job_type"
            logger.error("Unknown job type: %s", job_type)
            update_job_status(runtime, job_id, "failed", f"Unknown job type: {job_type}")
        else:
            handler(runtime, job_id, params)
            outcome = "success"
    except Exception as exc:
        # Handler failure: notify the error observer, then re-raise for the caller.
        outcome = "failed"
        observe_error(job_type=job_type, exc_or_reason=exc)
        raise
    finally:
        # Metrics are emitted on every path, including re-raised failures.
        observe_execution(
            job_type=job_type,
            outcome=outcome,
            duration_seconds=perf_counter() - started_at,
        )
# ── Main BRPOP loop ───────────────────────────────────────────────────── def _in_reconcile_window(hour_start: int, hour_end: int) -> bool: """Return True if the current UTC hour falls within [hour_start, hour_end).""" current_hour = datetime.utcnow().hour if hour_start <= hour_end: return hour_start <= current_hour < hour_end # Wrap around midnight (e.g. 22 → 2) return current_hour >= hour_start or current_hour < hour_end
def run_worker_loop(
    *,
    queue_name: str,
    worker_name: str,
    redis_client: Any,
    brpop_timeout_seconds: int,
    process_job: Callable[[dict[str, Any]], None],
    reconcile_callback: Callable[[], None] | None = None,
    reconcile_interval_seconds: int = 0,
    reconcile_hour_start: int = 22,
    reconcile_hour_end: int = 24,
) -> None:
    """Generic BRPOP loop shared by all workers.

    Args:
        queue_name: Redis list to ``BRPOP`` from.
        worker_name: Human-readable worker name used in log messages.
        redis_client: Synchronous Redis client backing the worker loop.
        brpop_timeout_seconds: Polling timeout passed to ``BRPOP``.
        process_job: Callback invoked with each deserialized payload.
        reconcile_callback: Optional reconciliation callback executed periodically.
        reconcile_interval_seconds: Seconds between two reconciliation runs.
        reconcile_hour_start: UTC hour at which the reconciliation window opens.
        reconcile_hour_end: UTC hour at which the reconciliation window closes.
    """
    # Lazy %-style args: logging skips formatting when the level is disabled.
    logger.info("%s started", worker_name)
    logger.info("Waiting for jobs on '%s' queue...", queue_name)

    last_reconcile = time.time()
    try:
        while True:
            result = redis_client.brpop(
                [queue_name],
                timeout=brpop_timeout_seconds,
            )
            if result is None:
                # Idle poll: run reconciliation only when enabled, due, and
                # inside the configured UTC window.
                if (
                    reconcile_interval_seconds > 0
                    and reconcile_callback is not None
                    and (time.time() - last_reconcile) >= reconcile_interval_seconds
                    and _in_reconcile_window(reconcile_hour_start, reconcile_hour_end)
                ):
                    logger.info("Reconciliation window active, running reconcile...")
                    reconcile_callback()
                    last_reconcile = time.time()
                continue

            _, message_data = result
            try:
                job_data_raw: Any = json.loads(message_data)
                if not isinstance(job_data_raw, dict):
                    raise ValueError("Job payload must be a JSON object")
                process_job(cast(dict[str, Any], job_data_raw))
            except json.JSONDecodeError as e:
                logger.error("Invalid JSON: %s", e)
            except Exception as e:
                # Boundary handler: one bad job must not kill the worker loop.
                logger.error("Job processing error: %s", e, exc_info=True)
    except KeyboardInterrupt:
        logger.info("Worker stopped by user")
    finally:
        logger.info("Worker shutdown complete")