Skip to main content
Glama
retriever.py (23.5 kB)
"""Knowledge base retriever for RAG functionality."""
import time
from typing import Any, Dict, List, Optional

from ultimate_mcp_server.services.knowledge_base.feedback import get_rag_feedback_service
from ultimate_mcp_server.services.knowledge_base.utils import build_metadata_filter
from ultimate_mcp_server.services.vector import VectorDatabaseService
from ultimate_mcp_server.utils import get_logger

logger = get_logger(__name__)


class KnowledgeBaseRetriever:
    """
    Advanced retrieval engine for knowledge base collections in RAG applications.

    The KnowledgeBaseRetriever provides search capabilities for finding the most
    relevant documents within knowledge bases. It offers multiple retrieval
    strategies, from pure semantic vector search to hybrid approaches combining
    vector and keyword matching.

    Key Features:
    - Multiple retrieval methods (vector, hybrid, keyword)
    - Metadata filtering for targeted searches
    - Content-based filtering for keyword matching
    - Configurable similarity thresholds and relevance scoring
    - Feedback mechanisms for continuous retrieval improvement
    - Timing information for every retrieval

    Retrieval Methods:
    1. Vector Search (``retrieve``): Uses embeddings for semantic similarity matching
       - Best for finding conceptually related content
       - Handles paraphrasing and semantic equivalence

    2. Hybrid Search (``retrieve_hybrid``): Runs an unfiltered vector search and a
       keyword (substring) search, then merges both with weighted scoring
       - Balances semantic understanding with exact term matching
       - Addresses vocabulary mismatch problems: documents found by only one of the
         two searches remain eligible

    3. Keyword Filtering (``content_filter`` on ``retrieve``): Limits results to those
       containing specific text
       - Used for explicit term presence requirements
       - Can be combined with metadata filtering

    Architecture:
    The retriever operates as a higher-level service above the vector database,
    working in concert with:
    - Embedding services for query vectorization
    - Vector database services for similarity search
    - Feedback services for result quality improvement
    - Metadata filters for context-aware retrieval

    Usage in RAG Applications:
    This retriever is a critical component in RAG pipelines, responsible for the
    quality and relevance of context provided to LLMs. Tuning retrieval parameters
    significantly impacts the quality of generated responses.

    Example Usage:
        ```python
        # Get retriever instance
        retriever = get_knowledge_base_retriever()

        # Simple vector search
        results = await retriever.retrieve(
            knowledge_base_name="company_policies",
            query="What is our remote work policy?",
            top_k=3,
            min_score=0.7
        )

        # Hybrid search with metadata filtering
        dept_results = await retriever.retrieve_hybrid(
            knowledge_base_name="company_policies",
            query="security requirements for customer data",
            top_k=5,
            vector_weight=0.6,
            keyword_weight=0.4,
            metadata_filter={"department": "security", "status": "active"}
        )

        # Process and use the retrieved documents
        for item in results["results"]:
            print(f"Document (score: {item['score']:.2f}): {item['document'][:100]}...")
            print(f"Source: {item['metadata'].get('source', 'unknown')}")

        # Record which documents were actually useful
        await retriever.record_feedback(
            knowledge_base_name="company_policies",
            query="What is our remote work policy?",
            retrieved_documents=results["results"],
            used_document_ids=["doc123", "doc456"]
        )
        ```
    """

    def __init__(self, vector_service: VectorDatabaseService):
        """Initialize the knowledge base retriever.

        Args:
            vector_service: Vector database service used to list collections, read
                and update their metadata, and fetch collections for querying.
        """
        self.vector_service = vector_service
        self.feedback_service = get_rag_feedback_service()

        # Embedding service used to embed queries for raw ChromaDB collections.
        # Imported locally (presumably to avoid an import cycle at module load).
        from ultimate_mcp_server.services.vector.embeddings import get_embedding_service

        self.embedding_service = get_embedding_service()

        logger.info("Knowledge base retriever initialized", extra={"emoji_key": "success"})

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _first_batch(search_results: Dict[str, Any], key: str) -> List[Any]:
        """Return the first query's entries for ``key`` from a query result.

        Query results nest each field as one list per query text/embedding
        (e.g. ``{"ids": [["a", "b"]]}``). A missing, ``None`` or empty field
        yields ``[]`` instead of raising.
        """
        batches = search_results.get(key)
        if batches is None or len(batches) == 0:
            return []
        first = batches[0]
        return list(first) if first is not None else []

    @staticmethod
    def _metadata_at(metadatas: List[Any], index: int) -> Dict[str, Any]:
        """Return the metadata at ``index``, or ``{}`` when absent or ``None``."""
        if index < len(metadatas) and metadatas[index]:
            return metadatas[index]
        return {}

    @staticmethod
    def _keyword_filter(terms: List[Optional[str]]) -> Optional[Dict[str, Any]]:
        """Build a ``where_document`` filter matching documents containing ANY term.

        Empty/``None`` terms and duplicates are dropped (first occurrence kept).
        Returns ``None`` when no terms remain and a single ``$contains`` clause for
        one term. Otherwise it returns an ``$or`` of ``$contains`` clauses, because
        Chroma validates ``$or`` lists to have at least two entries.
        """
        unique_terms = list(dict.fromkeys(t for t in terms if t))
        if not unique_terms:
            return None
        if len(unique_terms) == 1:
            return {"$contains": unique_terms[0]}
        return {"$or": [{"$contains": term} for term in unique_terms]}

    async def _apply_search_params(
        self,
        knowledge_base_name: str,
        current_metadata: Dict[str, Any],
        search_params: Optional[Dict[str, Any]],
    ) -> None:
        """Best-effort: store ``search_params`` as ``hnsw:<key>`` collection metadata.

        Does nothing when ``search_params`` is empty. A failure is logged as a
        warning and does not abort the search, matching how the embedding-model
        metadata update in ``retrieve`` is handled.
        """
        if not search_params:
            return
        try:
            # NOTE(review): the collection name is passed positionally because call
            # sites disagreed on its keyword (``name`` vs ``collection_name``);
            # confirm VectorDatabaseService's parameter name and switch to a keyword.
            await self.vector_service.update_collection_metadata(
                knowledge_base_name,
                metadata={
                    **current_metadata,
                    **{f"hnsw:{k}": v for k, v in search_params.items()},
                },
            )
        except Exception as e:
            logger.warning(
                f"Failed to apply search parameters to '{knowledge_base_name}': {str(e)}",
                extra={"emoji_key": "warning"},
            )

    async def _embed_query(self, query: str, embedding_model: Optional[str]) -> Optional[Any]:
        """Embed ``query`` with the retriever's embedding service.

        ``create_embeddings`` is called without a model argument; the model is
        the one the embedding service was initialized with. ``embedding_model``
        is only compared against it, so a mismatch shows up in the logs.

        Returns:
            The query embedding, or ``None`` if the service returned nothing.
        """
        query_embeddings = await self.embedding_service.create_embeddings(texts=[query])
        if not query_embeddings:
            logger.error(f"Failed to generate embedding for query: {query}")
            return None

        query_embedding = query_embeddings[0]
        service_model = self.embedding_service.model_name
        logger.debug(
            f"Generated query embedding with model: {service_model}, dimension: {len(query_embedding)}"
        )
        if embedding_model and embedding_model != service_model:
            # Embeddings from different models are not comparable; scores may be meaningless.
            logger.warning(
                f"Query embedded with '{service_model}' but knowledge base expects "
                f"'{embedding_model}'; similarity scores may be unreliable",
                extra={"emoji_key": "warning"},
            )
        return query_embedding

    async def _validate_knowledge_base(self, name: str) -> Dict[str, Any]:
        """Check that collection ``name`` exists and is tagged as a knowledge base.

        Args:
            name: Knowledge base (collection) name.

        Returns:
            ``{"status": "valid", "name": name, "metadata": {...}}`` on success.
            Otherwise ``{"status": "not_found" | "not_knowledge_base", "name": name}``.
        """
        collections = await self.vector_service.list_collections()
        if name not in collections:
            logger.warning(
                f"Knowledge base '{name}' not found",
                extra={"emoji_key": "warning"}
            )
            return {"status": "not_found", "name": name}

        # Missing metadata is treated as empty, so the collection is reported as
        # "not_knowledge_base" instead of raising AttributeError.
        metadata = await self.vector_service.get_collection_metadata(name) or {}
        if metadata.get("type") != "knowledge_base":
            logger.warning(
                f"Collection '{name}' is not a knowledge base",
                extra={"emoji_key": "warning"}
            )
            return {"status": "not_knowledge_base", "name": name}

        return {
            "status": "valid",
            "name": name,
            "metadata": metadata
        }

    # --------------------------------------------------------------- retrieval

    async def retrieve(
        self,
        knowledge_base_name: str,
        query: str,
        top_k: int = 5,
        min_score: float = 0.6,
        metadata_filter: Optional[Dict[str, Any]] = None,
        content_filter: Optional[str] = None,
        embedding_model: Optional[str] = None,
        apply_feedback: bool = True,
        search_params: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Retrieve documents from a knowledge base using vector search.

        Args:
            knowledge_base_name: Knowledge base name.
            query: Query text.
            top_k: Maximum number of results to return.
            min_score: Minimum similarity score (``1 - distance``) a result must reach.
            metadata_filter: Optional metadata filter (field->value or field->{op:value}),
                converted with ``build_metadata_filter``.
            content_filter: Text that returned documents must contain.
            embedding_model: Embedding model name. Defaults to the model stored in
                the knowledge base metadata. If the metadata has none, this value
                is saved there (best effort).
            apply_feedback: Whether to apply feedback score adjustments.
            search_params: Optional ChromaDB search parameters. They are stored as
                ``hnsw:<key>`` collection metadata (best effort).

        Returns:
            On success: ``{"status": "success", "query", "results", "count",
            "retrieval_time"}``. Each result is a dict with ``id``, ``document``,
            ``metadata`` and ``score`` keys, sorted by descending score.
            On failure: ``{"status": "error", "message": ...}``.
        """
        start_time = time.perf_counter()

        kb_info = await self._validate_knowledge_base(knowledge_base_name)
        if kb_info["status"] != "valid":
            logger.warning(
                f"Knowledge base '{knowledge_base_name}' not found or invalid",
                extra={"emoji_key": "warning"}
            )
            return {
                "status": "error",
                "message": f"Knowledge base '{knowledge_base_name}' not found or invalid"
            }

        kb_metadata = kb_info["metadata"]
        logger.debug(f"DEBUG: Knowledge base validated - metadata: {kb_metadata}")

        # Use the same embedding model that was used to create the knowledge base.
        if not embedding_model and kb_metadata.get("embedding_model"):
            embedding_model = kb_metadata["embedding_model"]
            logger.debug(f"Using embedding model from knowledge base metadata: {embedding_model}")

        # If a model was given but none is recorded yet, save it for future searches.
        if embedding_model and not kb_metadata.get("embedding_model"):
            updated_metadata = {**kb_metadata, "embedding_model": embedding_model}
            try:
                await self.vector_service.update_collection_metadata(
                    knowledge_base_name,
                    metadata=updated_metadata,
                )
                # Keep the local copy current so a later metadata write (search_params)
                # does not drop the model that was just recorded.
                kb_metadata = updated_metadata
                logger.debug(f"Updated knowledge base metadata with embedding model: {embedding_model}")
            except Exception as e:
                logger.warning(f"Failed to update knowledge base metadata with embedding model: {str(e)}")

        try:
            collection = await self.vector_service.get_collection(knowledge_base_name)
            logger.debug(f"DEBUG: Retrieved collection type: {type(collection)}")

            await self._apply_search_params(knowledge_base_name, kb_metadata, search_params)

            includes = ["documents", "metadatas", "distances"]
            where_document = {"$contains": content_filter} if content_filter else None
            chroma_filter = build_metadata_filter(metadata_filter) if metadata_filter else None

            logger.debug(f"DEBUG: Search parameters - top_k: {top_k}, min_score: {min_score}, filter: {chroma_filter}, where_document: {where_document}")

            # Over-fetch (2x), presumably to leave headroom for min_score filtering
            # and feedback re-ranking before truncating to top_k.
            n_results = top_k * 2

            if hasattr(collection, 'query') and not hasattr(collection, 'search_by_text'):
                # Raw ChromaDB collection: synchronous query with a precomputed embedding.
                logger.debug("Using ChromaDB direct query with embeddings")
                query_embedding = await self._embed_query(query, embedding_model)
                if query_embedding is None:
                    return {
                        "status": "error",
                        "message": "Failed to generate query embedding"
                    }
                try:
                    search_results = collection.query(
                        query_embeddings=[query_embedding],
                        n_results=n_results,
                        where=chroma_filter,
                        where_document=where_document,
                        include=includes
                    )
                except Exception as e:
                    logger.error(f"ChromaDB query error: {str(e)}")
                    raise
            else:
                # Project VectorCollection: async, embeds the query text itself.
                logger.debug("Using VectorCollection search method")
                search_results = await collection.query(
                    query_texts=[query],
                    n_results=n_results,
                    where=chroma_filter,
                    where_document=where_document,
                    include=includes,
                    embedding_model=embedding_model
                )

            ids = self._first_batch(search_results, "ids")
            documents = self._first_batch(search_results, "documents")
            metadatas = self._first_batch(search_results, "metadatas")
            distances = self._first_batch(search_results, "distances")

            logger.debug(f"DEBUG: Raw search results - keys: {search_results.keys()}")
            logger.debug(f"DEBUG: Documents count: {len(documents)}")
            logger.debug(f"DEBUG: IDs: {ids}")
            logger.debug(f"DEBUG: Distances: {distances}")

            results = []
            for i, (doc_id, doc, distance) in enumerate(zip(ids, documents, distances)):
                # NOTE(review): 1 - distance is a [0, 1] similarity only for cosine
                # distance; with l2/ip spaces it can go negative. Confirm the
                # collection's hnsw:space before tuning min_score.
                similarity = 1.0 - float(distance)
                metadata = self._metadata_at(metadatas, i)

                logger.debug(f"DEBUG: Document {i} - ID: {doc_id}")
                logger.debug(f"DEBUG: Similarity: {similarity} (min required: {min_score})")
                logger.debug(f"DEBUG: Document content (first 100 chars): {doc[:100] if doc else 'Empty'}")
                logger.debug(f"DEBUG: Metadata: {metadata}")

                if similarity < min_score:
                    logger.debug(f"DEBUG: Skipping document {i} due to low similarity: {similarity} < {min_score}")
                    continue

                results.append({
                    "id": doc_id,
                    "document": doc,
                    "metadata": metadata,
                    "score": similarity
                })

            logger.debug(f"DEBUG: After filtering, {len(results)} documents remain.")

            if apply_feedback:
                results = await self.feedback_service.apply_feedback_adjustments(
                    knowledge_base_name=knowledge_base_name,
                    results=results,
                    query=query
                )

            # Re-rank before truncating: feedback may have changed scores.
            results = sorted(results, key=lambda r: r["score"], reverse=True)[:top_k]

            retrieval_time = time.perf_counter() - start_time
            logger.info(
                f"Retrieved {len(results)} documents from '{knowledge_base_name}' in {retrieval_time:.2f}s",
                extra={"emoji_key": "success"}
            )

            return {
                "status": "success",
                "query": query,
                "results": results,
                "count": len(results),
                "retrieval_time": retrieval_time
            }

        except Exception as e:
            logger.error(
                f"Error retrieving from knowledge base '{knowledge_base_name}': {str(e)}",
                extra={"emoji_key": "error"}
            )
            return {
                "status": "error",
                "message": str(e)
            }

    async def retrieve_hybrid(
        self,
        knowledge_base_name: str,
        query: str,
        top_k: int = 5,
        vector_weight: float = 0.7,
        keyword_weight: float = 0.3,
        min_score: float = 0.6,
        metadata_filter: Optional[Dict[str, Any]] = None,
        additional_keywords: Optional[List[str]] = None,
        apply_feedback: bool = True,
        search_params: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Retrieve documents using hybrid (vector + keyword) search.

        A vector search (restricted only by ``metadata_filter``) and a keyword
        search are merged. The keyword search matches documents that contain the
        query text or any of ``additional_keywords``. Each document's score is
        ``vector_score * vector_weight + keyword_score * keyword_weight``, where
        ``vector_score = 1 - distance`` and ``keyword_score`` decays linearly with
        the document's rank in the keyword search (first = 1.0).

        Args:
            knowledge_base_name: Knowledge base name.
            query: Query text.
            top_k: Maximum number of documents to return.
            vector_weight: Weight for the vector search component.
            keyword_weight: Weight for the keyword search component. The keyword
                search is skipped when this is ``<= 0``.
            min_score: Minimum combined score a result must reach.
            metadata_filter: Optional metadata filter, applied to both searches.
            additional_keywords: Extra terms. A document containing any of them
                (or the query text) counts as a keyword match.
            apply_feedback: Whether to apply feedback score adjustments.
            search_params: Optional ChromaDB search parameters. They are stored as
                ``hnsw:<key>`` collection metadata (best effort).

        Returns:
            On success: ``{"status": "success", "query", "results", "count",
            "retrieval_time"}``. Each result has ``id``, ``document``,
            ``metadata``, ``vector_score``, ``keyword_score`` and ``score`` keys,
            sorted by descending score.
            On failure: ``{"status": "error", "message": ...}``.
        """
        start_time = time.perf_counter()

        kb_info = await self._validate_knowledge_base(knowledge_base_name)
        if kb_info["status"] != "valid":
            logger.warning(
                f"Knowledge base '{knowledge_base_name}' not found or invalid",
                extra={"emoji_key": "warning"}
            )
            return {
                "status": "error",
                "message": f"Knowledge base '{knowledge_base_name}' not found or invalid"
            }

        kb_metadata = kb_info["metadata"]

        try:
            # NOTE(review): both queries below are awaited and pass ``embedding_model``,
            # so this method assumes the project's async VectorCollection wrapper rather
            # than a raw ChromaDB collection (compare the branch in ``retrieve``).
            collection = await self.vector_service.get_collection(knowledge_base_name)

            await self._apply_search_params(knowledge_base_name, kb_metadata, search_params)

            chroma_filter = build_metadata_filter(metadata_filter) if metadata_filter else None
            keyword_filter = self._keyword_filter([query, *(additional_keywords or [])])

            combined_results: Dict[str, Dict[str, Any]] = {}

            # Vector leg: semantic nearest neighbours. It is deliberately NOT restricted
            # by the keyword filter, so documents that paraphrase the query can match.
            search_results = await collection.query(
                query_texts=[query],
                n_results=top_k * 3,  # over-fetch; both legs are merged and truncated below
                where=chroma_filter,
                where_document=None,
                include=["documents", "metadatas", "distances"],
                # Same model the knowledge base was created with (None -> default).
                embedding_model=kb_metadata.get("embedding_model")
            )
            vector_metadatas = self._first_batch(search_results, "metadatas")
            vector_rows = zip(
                self._first_batch(search_results, "ids"),
                self._first_batch(search_results, "documents"),
                self._first_batch(search_results, "distances"),
            )
            for i, (doc_id, doc, distance) in enumerate(vector_rows):
                vector_score = 1.0 - float(distance)
                combined_results[doc_id] = {
                    "id": doc_id,
                    "document": doc,
                    "metadata": self._metadata_at(vector_metadatas, i),
                    "vector_score": vector_score,
                    "keyword_score": 0.0,
                    "score": vector_score * vector_weight
                }

            # Keyword leg: documents containing the query text or any extra keyword.
            if keyword_weight > 0 and keyword_filter is not None:
                # NOTE(review): raw ChromaDB rejects a query with neither texts nor
                # embeddings; this relies on the collection wrapper supporting a
                # keyword-only query — confirm.
                keyword_results = await collection.query(
                    query_texts=None,
                    n_results=top_k * 3,
                    where=chroma_filter,
                    where_document=keyword_filter,
                    include=["documents", "metadatas"],
                    embedding_model=None  # No embedding needed for keyword-only search
                )
                keyword_metadatas = self._first_batch(keyword_results, "metadatas")
                keyword_rows = list(zip(
                    self._first_batch(keyword_results, "ids"),
                    self._first_batch(keyword_results, "documents"),
                ))
                for rank, (doc_id, doc) in enumerate(keyword_rows):
                    # Approximate keyword score from position (best = 1.0).
                    keyword_score = 1.0 - (rank / len(keyword_rows))
                    existing = combined_results.get(doc_id)
                    if existing is not None:
                        existing["keyword_score"] = keyword_score
                        existing["score"] += keyword_score * keyword_weight
                    else:
                        combined_results[doc_id] = {
                            "id": doc_id,
                            "document": doc,
                            "metadata": self._metadata_at(keyword_metadatas, rank),
                            "vector_score": 0.0,
                            "keyword_score": keyword_score,
                            "score": keyword_score * keyword_weight
                        }

            results = [r for r in combined_results.values() if r["score"] >= min_score]

            if apply_feedback:
                results = await self.feedback_service.apply_feedback_adjustments(
                    knowledge_base_name=knowledge_base_name,
                    results=results,
                    query=query
                )

            results = sorted(results, key=lambda r: r["score"], reverse=True)[:top_k]

            retrieval_time = time.perf_counter() - start_time
            logger.info(
                f"Hybrid search retrieved {len(results)} documents from '{knowledge_base_name}' in {retrieval_time:.2f}s",
                extra={"emoji_key": "success"}
            )

            return {
                "status": "success",
                "query": query,
                "results": results,
                "count": len(results),
                "retrieval_time": retrieval_time
            }

        except Exception as e:
            logger.error(
                f"Error performing hybrid search on '{knowledge_base_name}': {str(e)}",
                extra={"emoji_key": "error"}
            )
            return {
                "status": "error",
                "message": str(e)
            }

    async def record_feedback(
        self,
        knowledge_base_name: str,
        query: str,
        retrieved_documents: List[Dict[str, Any]],
        used_document_ids: Optional[List[str]] = None,
        explicit_feedback: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Record feedback for retrieval results via the RAG feedback service.

        Args:
            knowledge_base_name: Knowledge base name.
            query: Query text that produced ``retrieved_documents``.
            retrieved_documents: Documents returned by a retrieval call.
            used_document_ids: IDs of documents actually used in the response.
                They are passed on as a set; an empty list is passed as ``None``.
            explicit_feedback: Explicit feedback for documents.

        Returns:
            Whatever ``record_retrieval_feedback`` returns.
        """
        used_ids_set = set(used_document_ids) if used_document_ids else None

        return await self.feedback_service.record_retrieval_feedback(
            knowledge_base_name=knowledge_base_name,
            query=query,
            retrieved_documents=retrieved_documents,
            used_document_ids=used_ids_set,
            explicit_feedback=explicit_feedback
        )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kappasig920/Ultimate-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server