Source code for lalandre_core.queue.dispatch_all

"""Generic dispatch-all-acts helper shared by workers."""

import logging
from typing import Any, Callable

from ..utils import normalize_celex
from .job_queue import QueueRuntime, enqueue_job, update_job_status

logger = logging.getLogger(__name__)


[docs] def dispatch_all_act_jobs( *, runtime: QueueRuntime, job_id: str, queue_name: str, job_type: str, label: str, acts: list[Any], build_params: Callable[[Any], dict[str, Any]], skip_filter: Callable[[Any], bool] | None = None, error_label: str = "Processing", ) -> None: """Iterate over acts and enqueue one per-act job with progress tracking. Args: runtime: Queue runtime containing the Redis client and TTL policy. job_id: Parent job identifier used for status updates. queue_name: Redis queue that receives the child jobs. job_type: Job type string for the child jobs, such as ``"chunk_act"``. label: Human-readable label used in log and status messages. acts: Iterable of act objects exposing a ``celex`` attribute. build_params: Callable receiving one CELEX value and returning job params. skip_filter: Optional predicate returning ``True`` when one act should be skipped. error_label: Verb phrase used in failure messages, such as ``"Chunking"``. """ logger.info("[Job %s] Dispatching %s jobs for all acts", job_id, label) update_job_status(runtime, job_id, "running", f"Dispatching {label} jobs...") try: total_acts = len(acts) if total_acts == 0: update_job_status(runtime, job_id, "completed", f"No acts to {label.split('_')[0]}", 100) return queued_count = 0 skipped_count = 0 invalid_count = 0 for index, act in enumerate(acts, start=1): celex = normalize_celex(str(getattr(act, "celex", ""))) if not celex: invalid_count += 1 elif skip_filter is not None and skip_filter(act): skipped_count += 1 else: queued_job_id = enqueue_job( runtime, queue_name=queue_name, job_type=job_type, params=build_params(celex), dedupe_celex=celex, ) if queued_job_id: queued_count += 1 else: skipped_count += 1 progress = int((index / total_acts) * 100) update_job_status( runtime, job_id, "running", ( f"Queued {label} for {index}/{total_acts} acts " f"(queued={queued_count}, skipped={skipped_count}, invalid={invalid_count})" ), progress, ) message = ( f"Queued {queued_count} {label} jobs (skipped {skipped_count} already queued, invalid {invalid_count})" ) logger.info("[Job %s] %s", job_id, message) update_job_status(runtime, job_id, "completed", message, 100) except Exception as e: error_msg = f"{error_label} failed: {e}" logger.error("[Job %s] %s", job_id, error_msg, exc_info=True) update_job_status(runtime, job_id, "failed", error_msg) raise