Source code for lalandre_rag.retrieval.semantic_search

"""
Semantic Search Service
Vector-based search using Qdrant embeddings
"""

import logging
import math
from typing import Any, Dict, List, Optional

from lalandre_core.config import get_config
from lalandre_db_qdrant import QdrantRepository

from .result import RetrievalResult

logger = logging.getLogger(__name__)


class SemanticSearchService:
    """
    Semantic search using Qdrant vector database

    Uses embedding-based similarity search to find semantically related
    documents regardless of exact keyword matches.

    Responsibilities:
    - Execute vector search via Qdrant
    - Support multiple collections (chunks, acts)
    - Apply metadata filters
    - Convert Qdrant results to RetrievalResult format

    Does NOT:
    - Generate embeddings (uses EmbeddingService)
    - Fuse with lexical results (handled by FusionService)
    - Execute lexical search
    """

    def __init__(
        self,
        qdrant_repos: Dict[str, "QdrantRepository"],
        score_threshold: Optional[float] = None,
        hnsw_ef: Optional[int] = None,
        exact_search: Optional[bool] = None,
        per_collection_oversampling: Optional[float] = None,
    ):
        """
        Initialize semantic search service

        Args:
            qdrant_repos: Dictionary of collection_name -> QdrantRepository
                Expected keys typically include 'chunks' and 'acts'
            score_threshold: Minimum similarity score (0-1, None = no filtering)
            hnsw_ef: Value forwarded to the repository as ``hnsw_ef``
                (None = config.search.hnsw_ef; clamped to at least 1 when set)
            exact_search: Value forwarded to the repository as ``exact``
                (None = config.search.exact_search)
            per_collection_oversampling: Multiplier applied to the per-collection
                limit in multi-collection searches
                (None = config.search.semantic_per_collection_oversampling;
                clamped to at least 1.0)
        """
        self.qdrant_repos = qdrant_repos
        self.score_threshold = score_threshold

        config = get_config()
        self.default_limit = config.search.default_limit

        resolved_hnsw_ef = hnsw_ef if hnsw_ef is not None else config.search.hnsw_ef
        self.hnsw_ef = max(int(resolved_hnsw_ef), 1) if resolved_hnsw_ef is not None else None

        self.exact_search = bool(exact_search) if exact_search is not None else config.search.exact_search

        resolved_oversampling = (
            per_collection_oversampling
            if per_collection_oversampling is not None
            else config.search.semantic_per_collection_oversampling
        )
        # Never fetch fewer than the fair share per collection.
        self.per_collection_oversampling = max(float(resolved_oversampling), 1.0)

    def search(
        self,
        query_vector: List[float],
        top_k: Optional[int] = None,
        filters: Optional[Dict[str, Any]] = None,
        collections: Optional[List[str]] = None,
        score_threshold: Optional[float] = None,
        hnsw_ef: Optional[int] = None,
        exact_search: Optional[bool] = None,
    ) -> list["RetrievalResult"]:
        """
        Execute semantic search in one or multiple collections

        Args:
            query_vector: Query embedding vector
            top_k: Number of results (default: config.search.default_limit).
                A value <= 0 yields an empty list without querying.
            filters: Metadata filters (e.g., {"celex": "32016R0679"})
            collections: Collections to search (default: ["chunks"])
            score_threshold: Override instance score threshold
            hnsw_ef: Override instance hnsw_ef
            exact_search: Override instance exact_search flag

        Returns:
            List of RetrievalResult objects sorted by similarity score
        """
        if top_k is None:
            top_k = self.default_limit
        if top_k <= 0:
            # Nothing requested. Without this guard a negative value would
            # reach the final ``[:top_k]`` slice of the multi-collection path
            # and silently drop results from the end instead of returning none.
            return []

        if collections is None:
            collections = ["chunks"]

        threshold = score_threshold if score_threshold is not None else self.score_threshold
        resolved_hnsw_ef = hnsw_ef if hnsw_ef is not None else self.hnsw_ef
        resolved_exact_search = exact_search if exact_search is not None else self.exact_search

        logger.debug(
            "Semantic search: collections=%s, top_k=%s, threshold=%s, hnsw_ef=%s, exact=%s",
            collections,
            top_k,
            threshold,
            resolved_hnsw_ef,
            resolved_exact_search,
        )

        if len(collections) == 1:
            # Single collection search
            return self._search_single(
                query_vector=query_vector,
                collection=collections[0],
                top_k=top_k,
                filters=filters,
                score_threshold=threshold,
                hnsw_ef=resolved_hnsw_ef,
                exact_search=resolved_exact_search,
            )

        # Multi-collection search (an empty list also lands here and yields [])
        return self._search_multi(
            query_vector=query_vector,
            collections=collections,
            top_k=top_k,
            filters=filters,
            score_threshold=threshold,
            hnsw_ef=resolved_hnsw_ef,
            exact_search=resolved_exact_search,
        )

    def _search_single(
        self,
        query_vector: List[float],
        collection: str,
        top_k: int,
        filters: Optional[Dict[str, Any]],
        score_threshold: Optional[float],
        hnsw_ef: Optional[int],
        exact_search: bool,
    ) -> list["RetrievalResult"]:
        """
        Search in a single Qdrant collection

        Args:
            query_vector: Query embedding
            collection: Collection name ('chunks', 'acts', or preset-specific variant)
            top_k: Number of results
            filters: Metadata filters (simple dict, will be converted by QdrantRepository)
            score_threshold: Minimum score
            hnsw_ef: Forwarded to the repository as ``hnsw_ef``
            exact_search: Forwarded to the repository as ``exact``

        Returns:
            List of RetrievalResult objects (empty if the collection is unknown)
        """
        if collection not in self.qdrant_repos:
            logger.warning("Collection '%s' not found in repositories", collection)
            return []

        qdrant_repo = self.qdrant_repos[collection]

        effective_filters: Optional[Dict[str, Any]] = filters
        if collection.startswith("chunks"):
            # Work on a copy so the caller's dict is never mutated; chunk
            # collections default to retrieval-enabled points unless the caller
            # set the flag explicitly.
            effective_filters = dict(filters or {})
            effective_filters.setdefault("retrieval_enabled", True)

        # Filters are passed as a plain dict; the repository builds the Qdrant filter.
        search_results = qdrant_repo.search(
            query_vector=query_vector,
            limit=top_k,
            query_filter=effective_filters,
            score_threshold=score_threshold,
            hnsw_ef=hnsw_ef,
            exact=exact_search,
        )

        results = self._convert_to_retrieval_results(search_results, collection)
        logger.info("Semantic search in '%s': %s results", collection, len(results))
        return results

    def _search_multi(
        self,
        query_vector: List[float],
        collections: List[str],
        top_k: int,
        filters: Optional[Dict[str, Any]],
        score_threshold: Optional[float],
        hnsw_ef: Optional[int],
        exact_search: bool,
    ) -> list["RetrievalResult"]:
        """
        Search across multiple Qdrant collections

        Args:
            query_vector: Query embedding
            collections: List of collection names (duplicates are ignored)
            top_k: Total number of results
            filters: Metadata filters
            score_threshold: Minimum score
            hnsw_ef: Forwarded to each single-collection search
            exact_search: Forwarded to each single-collection search

        Returns:
            List of RetrievalResult objects from all collections, merged and sorted
        """
        valid_collections: List[str] = []
        # dict.fromkeys drops repeated names while keeping first-seen order, so a
        # duplicated name neither shrinks the per-collection budget nor triggers
        # the same query twice.
        for collection in dict.fromkeys(collections):
            if collection in self.qdrant_repos:
                valid_collections.append(collection)
            else:
                logger.warning("Skipping unknown collection: %s", collection)

        if not valid_collections:
            return []

        # Fair share of top_k per collection, then oversampled so that
        # deduplication and uneven score distributions can still fill top_k.
        per_collection_limit = max(1, math.ceil(top_k / len(valid_collections)))
        per_collection_limit = max(1, math.ceil(per_collection_limit * self.per_collection_oversampling))
        logger.debug(
            "Semantic multi-collection budget: total_top_k=%s collections=%s per_collection_limit=%s",
            top_k,
            len(valid_collections),
            per_collection_limit,
        )

        all_results: list["RetrievalResult"] = []
        for collection in valid_collections:
            all_results.extend(
                self._search_single(
                    query_vector=query_vector,
                    collection=collection,
                    top_k=per_collection_limit,
                    filters=filters,
                    score_threshold=score_threshold,
                    hnsw_ef=hnsw_ef,
                    exact_search=exact_search,
                )
            )

        # Deduplicate by subdivision_id, keeping the highest score: results are
        # sorted best-first, so the first occurrence of an id is the one to keep.
        # Falsy ids (0 / missing) are never treated as duplicates.
        all_results.sort(key=lambda item: item.score, reverse=True)
        seen_ids: set[int] = set()
        deduped: list["RetrievalResult"] = []
        for result in all_results:
            key = result.subdivision_id
            if key:
                if key in seen_ids:
                    continue
                seen_ids.add(key)
            deduped.append(result)

        return deduped[:top_k]

    @staticmethod
    def _to_int(value: Any, default: int = 0) -> int:
        """Coerce a payload value to int, returning ``default`` when it cannot be converted."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            # OverflowError covers float infinities, which int() rejects.
            return default

    def _convert_to_retrieval_results(self, search_results: List[Any], collection: str) -> list["RetrievalResult"]:
        """
        Convert Qdrant SearchResult objects to RetrievalResult

        Args:
            search_results: List of Qdrant SearchResult objects
                (each is read through its ``.payload`` and ``.score`` attributes)
            collection: Source collection name

        Returns:
            List of RetrievalResult objects
        """
        results: List[RetrievalResult] = []

        for search_result in search_results:
            # Copy the payload so popping "content" does not mutate the source object.
            payload = dict(search_result.payload or {})
            score = search_result.score

            # Content becomes a dedicated field and is removed from the metadata copy.
            content = str(payload.get("content", "") or "")
            payload.pop("content", None)

            celex_value = payload.get("celex")

            results.append(
                RetrievalResult(
                    content=content,
                    score=float(score),
                    subdivision_id=self._to_int(payload.get("subdivision_id"), 0),
                    act_id=self._to_int(payload.get("act_id"), 0),
                    celex=str(celex_value) if celex_value is not None else None,
                    subdivision_type=str(payload.get("subdivision_type", "") or ""),
                    sequence_order=self._to_int(payload.get("sequence_order"), 0),
                    metadata={
                        **payload,
                        "search_method": "semantic",
                        "collection": collection,
                        "source_collection": collection,
                    },
                )
            )

        return results

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get semantic search statistics

        Returns:
            Dictionary with configuration and collection info
        """
        collections_info: Dict[str, Dict[str, Any]] = {}

        for name, repo in self.qdrant_repos.items():
            # Best-effort reporting: a failing repository is recorded as an
            # error entry instead of aborting the whole statistics call.
            try:
                if repo.collection_exists():
                    collections_info[name] = {"exists": True, "vector_size": repo.vector_size}
                else:
                    collections_info[name] = {"exists": False}
            except Exception as e:
                collections_info[name] = {"error": str(e)}

        return {
            "collections": collections_info,
            "score_threshold": self.score_threshold,
            "default_limit": self.default_limit,
            "search_method": "Qdrant vector similarity",
        }