Source code for lalandre_core.queue.reconcile

"""
shared Redis lock pattern for worker auto-reconcile.

Each worker calls ``with_reconcile_lock`` to acquire a distributed Redis lock,
run its domain-specific reconcile check, and release the lock.
"""

import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


[docs] def with_reconcile_lock( redis_client: Any, lock_key: str, lock_ttl: int, action: Callable[[], None], ) -> None: """Acquire a Redis NX lock, execute *action*, then release. * If the lock is already held, returns silently. * If Redis is unreachable, logs a warning and returns. * The lock is **always** released in the ``finally`` block. """ try: acquired = redis_client.set(lock_key, "1", nx=True, ex=lock_ttl) except Exception as exc: logger.warning(f"[Reconcile] Redis unavailable, skipping: {exc}") return if not acquired: return try: action() finally: try: redis_client.delete(lock_key) except Exception: pass