"""Generic dispatch-all-acts helper shared by workers."""
import logging
from typing import Any, Callable
from ..utils import normalize_celex
from .job_queue import QueueRuntime, enqueue_job, update_job_status
logger = logging.getLogger(__name__)
def dispatch_all_act_jobs(
    *,
    runtime: QueueRuntime,
    job_id: str,
    queue_name: str,
    job_type: str,
    label: str,
    acts: list[Any],
    build_params: Callable[[Any], dict[str, Any]],
    skip_filter: Callable[[Any], bool] | None = None,
    error_label: str = "Processing",
) -> None:
    """Iterate over acts and enqueue one per-act job with progress tracking.

    The parent job is marked ``"running"`` before dispatching, its progress is
    updated after every act, and it is marked ``"completed"`` at the end.

    Every act lands in exactly one bucket:

    * invalid: its ``celex`` attribute is missing, ``None``, or normalizes to
      an empty value;
    * filtered: ``skip_filter(act)`` returned ``True``;
    * already queued: ``enqueue_job`` returned a falsy job id. It is called
      with ``dedupe_celex``, so presumably an equivalent job already exists.
      Confirm this against ``enqueue_job``.
    * queued: a child job was enqueued.

    Args:
        runtime: Queue runtime containing the Redis client and TTL policy.
        job_id: Parent job identifier used for status updates.
        queue_name: Redis queue that receives the child jobs.
        job_type: Job type string for the child jobs, such as ``"chunk_act"``.
        label: Human-readable label used in log and status messages.
        acts: Iterable of act objects exposing a ``celex`` attribute. Any
            iterable is accepted; it is materialized into a list once.
        build_params: Callable receiving one normalized CELEX value and
            returning the child job params.
        skip_filter: Optional predicate, called with the act object, that
            returns ``True`` when the act should be skipped.
        error_label: Verb phrase used in failure messages, such as ``"Chunking"``.

    Raises:
        Exception: Anything raised during dispatch is re-raised after the
            parent job is marked ``"failed"``. This covers errors from
            ``normalize_celex``, ``skip_filter``, ``build_params``,
            ``enqueue_job`` and the progress/completion status updates. An
            error from the initial ``"running"`` status update propagates
            without the job being marked failed.
    """
    logger.info("[Job %s] Dispatching %s jobs for all acts", job_id, label)
    update_job_status(runtime, job_id, "running", f"Dispatching {label} jobs...")
    try:
        # Materialize once so that generators/tuples work and len() is available.
        acts = list(acts)
        total_acts = len(acts)
        if total_acts == 0:
            update_job_status(
                runtime, job_id, "completed", f"No acts to {label.split('_')[0]}", 100
            )
            return

        queued_count = 0
        already_queued_count = 0
        filtered_count = 0
        invalid_count = 0
        for index, act in enumerate(acts, start=1):
            raw_celex = getattr(act, "celex", None)
            # Map a missing or None celex to "" rather than the truthy string
            # "None", so that such acts are counted as invalid and never enqueued.
            celex = normalize_celex("" if raw_celex is None else str(raw_celex))
            if not celex:
                invalid_count += 1
            elif skip_filter is not None and skip_filter(act):
                filtered_count += 1
            else:
                queued_job_id = enqueue_job(
                    runtime,
                    queue_name=queue_name,
                    job_type=job_type,
                    params=build_params(celex),
                    dedupe_celex=celex,
                )
                if queued_job_id:
                    queued_count += 1
                else:
                    already_queued_count += 1

            skipped_count = filtered_count + already_queued_count
            # Integer arithmetic avoids float truncation (e.g. 29/100 -> 28%).
            progress = index * 100 // total_acts
            update_job_status(
                runtime,
                job_id,
                "running",
                (
                    f"Queued {label} for {index}/{total_acts} acts "
                    f"(queued={queued_count}, skipped={skipped_count}, invalid={invalid_count})"
                ),
                progress,
            )

        message = (
            f"Queued {queued_count} {label} jobs "
            f"(skipped {already_queued_count} already queued, {filtered_count} filtered, "
            f"invalid {invalid_count})"
        )
        logger.info("[Job %s] %s", job_id, message)
        update_job_status(runtime, job_id, "completed", message, 100)
    except Exception as e:
        error_msg = f"{error_label} failed: {e}"
        logger.exception("[Job %s] %s", job_id, error_msg)
        update_job_status(runtime, job_id, "failed", error_msg)
        raise