Source code for lalandre_core.queue.worker_config

"""Shared config accessor helpers for Redis-based workers."""

from typing import Any

from ..config import get_config


[docs] def require_gateway_config(field: str) -> Any: """Read a mandatory gateway config field; raise if None.""" value = getattr(get_config().gateway, field, None) if value is None: raise ValueError(f"gateway.{field} must be configured") return value
[docs] def get_reconcile_params(prefix: str) -> tuple[bool, int, int]: """Return ``(enabled, ttl, interval)`` for a worker reconcile loop. ``prefix`` is the worker kind, e.g. ``"embed"``, ``"chunk"``, ``"extract"``. """ workers = get_config().workers enabled: bool = getattr(workers, f"auto_{prefix}_reconcile") ttl: int = getattr(workers, f"auto_{prefix}_reconcile_ttl") interval: int = getattr(workers, f"auto_{prefix}_reconcile_interval") return enabled, ttl, interval