Source code for chunking_worker.bootstrap

"""
Bootstrap for chunking worker
"""

import logging
from dataclasses import dataclass
from typing import Any

from lalandre_chunking import get_chunker
from lalandre_core.config import get_config
from lalandre_core.embedding_presets import list_embedding_presets
from lalandre_core.utils.api_key_pool import APIKeyPool
from lalandre_db_postgres import PostgresRepository
from lalandre_db_qdrant.repository import QdrantRepository
from lalandre_embedding import EmbeddingService

logger = logging.getLogger(__name__)


@dataclass
class ChunkingComponents:
    """Bundle of long-lived dependencies shared by the chunking worker.

    Attributes:
        pg_repo: PostgreSQL repository handle.
        chunk_vector_repos: Qdrant repositories for chunk collections,
            keyed by embedding preset id (as populated by
            ``init_components``).
        act_vector_repos: Qdrant repositories for act collections, keyed
            the same way as ``chunk_vector_repos``.
        chunker: Chunker object returned by ``get_chunker``; its concrete
            type is not declared here, hence ``Any``.
    """

    pg_repo: PostgresRepository
    chunk_vector_repos: dict[str, QdrantRepository]
    act_vector_repos: dict[str, QdrantRepository]
    chunker: Any
def _make_preset_repo(base_collection_name: str, preset: Any) -> QdrantRepository:
    """Build the Qdrant repository backing one embedding preset.

    Args:
        base_collection_name: Configured base collection name.
        preset: Embedding preset exposing ``model_name`` and ``vector_size``.

    Returns:
        A repository whose collection name is derived from the base name,
        the preset's model name and its vector size.
    """
    collection_name = QdrantRepository.make_collection_name(
        base_collection_name,
        preset.model_name,
        preset.vector_size,
    )
    return QdrantRepository(
        collection_name=collection_name,
        vector_size=preset.vector_size,
    )


def init_components() -> ChunkingComponents:
    """Initialize the chunking worker's long-lived components.

    Reads the application config, then builds the PostgreSQL repository,
    one chunk and one act Qdrant repository per embedding preset, and the
    chunker (backed by an ``EmbeddingService``).

    Returns:
        A ``ChunkingComponents`` holding the repositories and the chunker.

    Raises:
        ValueError: If ``vector.collection_chunks`` or
            ``vector.collection_acts`` is empty or unset in the app config.
    """
    config = get_config()

    # Validate required settings first so a misconfiguration is reported
    # before any repository object is constructed.
    base_chunk_collection_name = config.vector.collection_chunks
    base_act_collection_name = config.vector.collection_acts
    if not base_chunk_collection_name:
        raise ValueError("vector.collection_chunks must be set in app config")
    if not base_act_collection_name:
        raise ValueError("vector.collection_acts must be set in app config")

    # Initialize repositories
    pg_repo = PostgresRepository(config.database.connection_string)

    chunk_vector_repos: dict[str, QdrantRepository] = {}
    act_vector_repos: dict[str, QdrantRepository] = {}
    for preset in list_embedding_presets():
        chunk_vector_repos[preset.preset_id] = _make_preset_repo(
            base_chunk_collection_name, preset
        )
        act_vector_repos[preset.preset_id] = _make_preset_repo(
            base_act_collection_name, preset
        )

    # Workers use keys 6-10 to avoid rate-limit contention with RAG (keys 1-5)
    try:
        worker_key_pool = APIKeyPool.from_env("MISTRAL_API_KEY", start_index=6)
    except ValueError:
        # Best-effort fallback: keep starting up without a dedicated pool,
        # but leave a trace instead of failing silently. The exception text
        # is deliberately not logged, since it comes from key-handling code.
        logger.warning(
            "APIKeyPool.from_env raised ValueError for MISTRAL_API_KEY "
            "(start_index=6); continuing with key_pool=None"
        )
        worker_key_pool = None

    # Use the service wrapper so SAC chunking inherits token guards and
    # adaptive splitting for oversized sentence windows.
    logger.info("Initializing embedding service for SAC chunking")
    embedding_provider = EmbeddingService(
        provider=config.chunking.embedding.provider,
        model_name=config.chunking.embedding.model_name,
        device=config.chunking.embedding.device,
        key_pool=worker_key_pool,
    )

    max_chunk_size = config.chunking.resolve_max_chunk_size(config.token_limits)
    chunker = get_chunker(
        embedding_provider=embedding_provider,
        min_chunk_size=config.chunking.min_chunk_size,
        max_chunk_size=max_chunk_size,
        chunk_overlap=config.chunking.chunk_overlap,
        chars_per_token=config.token_limits.chars_per_token,
        breakpoint_percentile=config.chunking.breakpoint_percentile,
        breakpoint_max_threshold=config.chunking.breakpoint_max_threshold,
        sentence_window_size=config.chunking.sentence_window_size,
        embedding_batch_size=config.chunking.embedding_batch_size,
    )

    return ChunkingComponents(
        pg_repo=pg_repo,
        chunk_vector_repos=chunk_vector_repos,
        act_vector_repos=act_vector_repos,
        chunker=chunker,
    )