Nextcloud MCP Server

by cbcoutinho
bm25_hybrid.py
"""BM25 hybrid search algorithm using Qdrant native RRF fusion.""" import logging from typing import Any from qdrant_client import models from qdrant_client.models import FieldCondition, Filter, MatchValue from nextcloud_mcp_server.config import get_settings from nextcloud_mcp_server.embedding import get_bm25_service, get_embedding_service from nextcloud_mcp_server.observability.metrics import record_qdrant_operation from nextcloud_mcp_server.observability.tracing import trace_operation from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client logger = logging.getLogger(__name__) class BM25HybridSearchAlgorithm(SearchAlgorithm): """ Hybrid search combining dense semantic vectors with BM25 sparse vectors. Uses Qdrant's native Reciprocal Rank Fusion (RRF) to automatically merge results from both dense (semantic) and sparse (BM25 keyword) searches. This provides the best of both worlds: semantic understanding for conceptual queries and precise keyword matching for specific terms, acronyms, and codes. The fusion happens efficiently in the database using the prefetch mechanism, eliminating the need for application-layer result merging. """ def __init__(self, score_threshold: float = 0.0, fusion: str = "rrf"): """ Initialize BM25 hybrid search algorithm. Args: score_threshold: Minimum fusion score (0-1, default: 0.0 to allow fusion scoring) Note: Both RRF and DBSF produce normalized scores fusion: Fusion algorithm to use: "rrf" (Reciprocal Rank Fusion, default) or "dbsf" (Distribution-Based Score Fusion) Raises: ValueError: If fusion is not "rrf" or "dbsf" """ if fusion not in ("rrf", "dbsf"): raise ValueError( f"Invalid fusion algorithm '{fusion}'. Must be 'rrf' or 'dbsf'" ) self.score_threshold = score_threshold self.fusion = models.Fusion.RRF if fusion == "rrf" else models.Fusion.DBSF self.fusion_name = fusion @property def name(self) -> str: return "bm25_hybrid" @property def requires_vector_db(self) -> bool: return True async def search( self, query: str, user_id: str, limit: int = 10, doc_type: str | None = None, **kwargs: Any, ) -> list[SearchResult]: """ Execute hybrid search using dense + sparse vectors with native RRF fusion. Returns unverified results from Qdrant. Access verification should be performed separately at the final output stage using verify_search_results(). Deduplicates by (doc_id, doc_type, chunk_start_offset, chunk_end_offset) to show multiple chunks from the same document while avoiding duplicate chunks. 
Args: query: Natural language or keyword search query user_id: User ID for filtering limit: Maximum results to return doc_type: Optional document type filter **kwargs: Additional parameters (score_threshold override) Returns: List of unverified SearchResult objects ranked by RRF fusion score Raises: McpError: If vector sync is not enabled or search fails """ settings = get_settings() score_threshold = kwargs.get("score_threshold", self.score_threshold) logger.info( f"BM25 hybrid search: query='{query}', user={user_id}, " f"limit={limit}, score_threshold={score_threshold}, doc_type={doc_type}, " f"fusion={self.fusion_name}" ) # Generate dense embedding for semantic search with trace_operation("search.get_embedding_service"): embedding_service = get_embedding_service() with trace_operation("search.dense_embedding"): dense_embedding = await embedding_service.embed(query) # Store for reuse by callers (e.g., viz_routes PCA visualization) self.query_embedding = dense_embedding logger.debug(f"Generated dense embedding (dimension={len(dense_embedding)})") # Generate sparse embedding for BM25 keyword search with trace_operation("search.get_bm25_service"): bm25_service = get_bm25_service() with trace_operation("search.sparse_embedding_bm25"): sparse_embedding = await bm25_service.encode_async(query) logger.debug( f"Generated sparse embedding " f"({len(sparse_embedding['indices'])} non-zero terms)" ) # Build Qdrant filter filter_conditions = [ get_placeholder_filter(), # Always exclude placeholders from user-facing queries FieldCondition( key="user_id", match=MatchValue(value=user_id), ), ] # Add doc_type filter if specified if doc_type: filter_conditions.append( FieldCondition( key="doc_type", match=MatchValue(value=doc_type), ) ) query_filter = Filter(must=filter_conditions) # Execute hybrid search with Qdrant native RRF fusion with trace_operation("search.get_qdrant_client"): qdrant_client = await get_qdrant_client() try: # Use prefetch to run both dense and sparse searches # Qdrant will automatically merge results using RRF with trace_operation( "search.qdrant_query", attributes={"query.limit": limit * 2, "query.fusion": self.fusion_name}, ): search_response = await qdrant_client.query_points( collection_name=settings.get_collection_name(), prefetch=[ # Dense semantic search models.Prefetch( query=dense_embedding, using="dense", limit=limit * 2, # Get extra for deduplication filter=query_filter, ), # Sparse BM25 search models.Prefetch( query=models.SparseVector( indices=sparse_embedding["indices"], values=sparse_embedding["values"], ), using="sparse", limit=limit * 2, # Get extra for deduplication filter=query_filter, ), ], # Fusion query (RRF or DBSF based on initialization) query=models.FusionQuery(fusion=self.fusion), limit=limit * 2, # Get extra for deduplication score_threshold=score_threshold, with_payload=True, with_vectors=False, # Don't return vectors to save bandwidth ) record_qdrant_operation("search", "success") except Exception: record_qdrant_operation("search", "error") raise logger.info( f"Qdrant {self.fusion_name.upper()} fusion returned {len(search_response.points)} results " f"(before deduplication)" ) if search_response.points: # Log top 3 fusion scores to help with threshold tuning top_scores = [p.score for p in search_response.points[:3]] logger.debug( f"Top 3 {self.fusion_name.upper()} fusion scores: {top_scores}" ) # Deduplicate by (doc_id, doc_type, chunk_start, chunk_end) # This allows multiple chunks from same doc, but removes duplicate chunks with trace_operation( 
"search.deduplicate", attributes={"dedupe.num_points": len(search_response.points)}, ): seen_chunks = set() results = [] for result in search_response.points: if result.payload is None: continue # doc_id can be int (notes) or str (files - file paths) doc_id = result.payload["doc_id"] doc_type = result.payload.get("doc_type", "note") chunk_start = result.payload.get("chunk_start_offset") chunk_end = result.payload.get("chunk_end_offset") chunk_key = (doc_id, doc_type, chunk_start, chunk_end) # Skip if we've already seen this exact chunk if chunk_key in seen_chunks: continue seen_chunks.add(chunk_key) # Build metadata dict with common fields metadata = { "chunk_index": result.payload.get("chunk_index"), "total_chunks": result.payload.get("total_chunks"), "search_method": f"bm25_hybrid_{self.fusion_name}", } # Add file-specific metadata for PDF viewer if doc_type == "file" and (path := result.payload.get("file_path")): metadata["path"] = path # Add deck_card-specific metadata for frontend URL construction if doc_type == "deck_card": if board_id := result.payload.get("board_id"): metadata["board_id"] = board_id # Return unverified results (verification happens at output stage) results.append( SearchResult( id=doc_id, doc_type=doc_type, title=result.payload.get("title", "Untitled"), excerpt=result.payload.get("excerpt", ""), score=result.score, # Fusion score (RRF or DBSF) metadata=metadata, chunk_start_offset=result.payload.get("chunk_start_offset"), chunk_end_offset=result.payload.get("chunk_end_offset"), page_number=result.payload.get("page_number"), page_count=result.payload.get("page_count"), chunk_index=result.payload.get("chunk_index", 0), total_chunks=result.payload.get("total_chunks", 1), point_id=str(result.id), # Qdrant point ID for batch retrieval ) ) if len(results) >= limit: break logger.info(f"Returning {len(results)} unverified results after deduplication") if results: result_details = [ f"{r.doc_type}_{r.id} (score={r.score:.3f}, title='{r.title}')" for r in results[:5] # Show top 5 ] logger.debug(f"Top results: {', '.join(result_details)}") return results
