Source code for lalandre_core.queue.worker_config
"""Shared config accessor helpers for Redis-based workers."""
from typing import Any
from ..config import get_config
[docs]
def require_gateway_config(field: str) -> Any:
    """Return the gateway setting named ``field``, insisting that it is set.

    The attribute is looked up on ``get_config().gateway``. An attribute
    that does not exist is treated exactly like one whose value is ``None``.

    Args:
        field: Name of the attribute to read from the gateway config.

    Returns:
        The configured value, whatever its type.

    Raises:
        ValueError: If the attribute is missing or its value is ``None``.
    """
    gateway = get_config().gateway
    setting = getattr(gateway, field, None)
    if setting is not None:
        return setting
    # Missing attribute and explicit ``None`` both end up here.
    raise ValueError(f"gateway.{field} must be configured")
[docs]
def get_reconcile_params(prefix: str) -> tuple[bool, int, int]:
    """Fetch the reconcile-loop settings for one kind of worker.

    Three attributes are read from ``get_config().workers``:
    ``auto_<prefix>_reconcile``, ``auto_<prefix>_reconcile_ttl`` and
    ``auto_<prefix>_reconcile_interval``.

    Args:
        prefix: The worker kind, e.g. ``"embed"``, ``"chunk"``,
            ``"extract"``.

    Returns:
        The tuple ``(enabled, ttl, interval)``.

    Raises:
        AttributeError: If any of the three attributes is absent from the
            workers config (the lookups have no fallback value).
    """
    section = get_config().workers
    # All three attribute names share this stem.
    stem = f"auto_{prefix}_reconcile"
    enabled: bool = getattr(section, stem)
    ttl: int = getattr(section, stem + "_ttl")
    interval: int = getattr(section, stem + "_interval")
    return (enabled, ttl, interval)