Source code for lalandre_core.queue.reconcile
"""
shared Redis lock pattern for worker auto-reconcile.
Each worker calls ``with_reconcile_lock`` to acquire a distributed Redis lock,
run its domain-specific reconcile check, and release the lock.
"""
import logging
from typing import Any, Callable
logger = logging.getLogger(__name__)
[docs]
def with_reconcile_lock(
redis_client: Any,
lock_key: str,
lock_ttl: int,
action: Callable[[], None],
) -> None:
"""Acquire a Redis NX lock, execute *action*, then release.
* If the lock is already held, returns silently.
* If Redis is unreachable, logs a warning and returns.
* The lock is **always** released in the ``finally`` block.
"""
try:
acquired = redis_client.set(lock_key, "1", nx=True, ex=lock_ttl)
except Exception as exc:
logger.warning(f"[Reconcile] Redis unavailable, skipping: {exc}")
return
if not acquired:
return
try:
action()
finally:
try:
redis_client.delete(lock_key)
except Exception:
pass