Source code for chunking_worker.bootstrap
"""
Bootstrap for chunking worker
"""
import logging
from dataclasses import dataclass
from typing import Any
from lalandre_chunking import get_chunker
from lalandre_core.config import get_config
from lalandre_core.embedding_presets import list_embedding_presets
from lalandre_core.utils.api_key_pool import APIKeyPool
from lalandre_db_postgres import PostgresRepository
from lalandre_db_qdrant.repository import QdrantRepository
from lalandre_embedding import EmbeddingService
logger = logging.getLogger(__name__)
[docs]
@dataclass
class ChunkingComponents:
    """Bundle of long-lived dependencies shared by the chunking worker.

    Attributes:
        pg_repo: Postgres repository handle.
        chunk_vector_repos: Qdrant repositories for chunk collections,
            indexed by a string key (``init_components`` uses the
            embedding preset id).
        act_vector_repos: Qdrant repositories for act collections,
            indexed the same way as ``chunk_vector_repos``.
        chunker: Chunker object; ``init_components`` fills it with the
            result of ``get_chunker``. Typed ``Any`` because its concrete
            class is not visible from this module.
    """

    pg_repo: PostgresRepository
    chunk_vector_repos: dict[str, QdrantRepository]
    act_vector_repos: dict[str, QdrantRepository]
    chunker: Any
[docs]
def init_components() -> ChunkingComponents:
    """Initialize chunking worker components.

    Reads the application config and builds, in this order:

    * a ``PostgresRepository`` from ``config.database.connection_string``;
    * for every preset returned by ``list_embedding_presets()``, one chunk
      and one act ``QdrantRepository``, both keyed by ``preset.preset_id``;
    * an ``EmbeddingService`` (optionally backed by a worker API key pool);
    * the chunker returned by ``get_chunker``.

    Returns:
        ChunkingComponents: The assembled dependencies.

    Raises:
        ValueError: If ``vector.collection_chunks`` or
            ``vector.collection_acts`` is empty in the app config.
    """
    config = get_config()

    # Initialize repositories
    pg_repo = PostgresRepository(config.database.connection_string)

    base_chunk_collection_name = config.vector.collection_chunks
    base_act_collection_name = config.vector.collection_acts
    if not base_chunk_collection_name:
        raise ValueError("vector.collection_chunks must be set in app config")
    if not base_act_collection_name:
        raise ValueError("vector.collection_acts must be set in app config")

    # One chunk collection and one act collection per embedding preset; the
    # concrete collection name is derived from the base name, the preset's
    # model name and its vector size.
    chunk_vector_repos: dict[str, QdrantRepository] = {}
    act_vector_repos: dict[str, QdrantRepository] = {}
    for preset in list_embedding_presets():
        chunk_collection_name = QdrantRepository.make_collection_name(
            base_chunk_collection_name,
            preset.model_name,
            preset.vector_size,
        )
        act_collection_name = QdrantRepository.make_collection_name(
            base_act_collection_name,
            preset.model_name,
            preset.vector_size,
        )
        chunk_vector_repos[preset.preset_id] = QdrantRepository(
            collection_name=chunk_collection_name,
            vector_size=preset.vector_size,
        )
        act_vector_repos[preset.preset_id] = QdrantRepository(
            collection_name=act_collection_name,
            vector_size=preset.vector_size,
        )

    # Workers use keys 6-10 to avoid rate-limit contention with RAG (keys 1-5)
    try:
        worker_key_pool = APIKeyPool.from_env("MISTRAL_API_KEY", start_index=6)
    except ValueError as exc:
        # Best-effort: keep going without a pool, but leave a trace so the
        # fallback is visible in the logs instead of happening silently.
        logger.warning(
            "Could not build worker API key pool (%s); "
            "continuing without a key pool",
            exc,
        )
        worker_key_pool = None

    # Use the service wrapper so SAC chunking inherits token guards and
    # adaptive splitting for oversized sentence windows.
    logger.info("Initializing embedding service for SAC chunking")
    embedding_provider = EmbeddingService(
        provider=config.chunking.embedding.provider,
        model_name=config.chunking.embedding.model_name,
        device=config.chunking.embedding.device,
        key_pool=worker_key_pool,
    )

    max_chunk_size = config.chunking.resolve_max_chunk_size(config.token_limits)
    chunker = get_chunker(
        embedding_provider=embedding_provider,
        min_chunk_size=config.chunking.min_chunk_size,
        max_chunk_size=max_chunk_size,
        chunk_overlap=config.chunking.chunk_overlap,
        chars_per_token=config.token_limits.chars_per_token,
        breakpoint_percentile=config.chunking.breakpoint_percentile,
        breakpoint_max_threshold=config.chunking.breakpoint_max_threshold,
        sentence_window_size=config.chunking.sentence_window_size,
        embedding_batch_size=config.chunking.embedding_batch_size,
    )

    return ChunkingComponents(
        pg_repo=pg_repo,
        chunk_vector_repos=chunk_vector_repos,
        act_vector_repos=act_vector_repos,
        chunker=chunker,
    )