Skip to main content
Glama
manager.py13 kB
"""Knowledge base manager for RAG functionality.""" import time from typing import Any, Dict, List, Optional from ultimate_mcp_server.services.vector import VectorDatabaseService from ultimate_mcp_server.utils import get_logger logger = get_logger(__name__) class KnowledgeBaseManager: """ Manager for creating and maintaining knowledge bases for RAG applications. The KnowledgeBaseManager provides a high-level interface for working with vector databases as knowledge bases for Retrieval-Augmented Generation (RAG) systems. It abstracts the complexities of vector database operations, focusing on the domain-specific needs of knowledge management for AI applications. Key Features: - Knowledge base lifecycle management (create, delete, list, get) - Document ingestion with metadata support - Vector embedding management for semantic search - Document chunking and processing - Persistence and durability guarantees - Metadata tracking for knowledge base statistics Architecture: The manager sits between RAG applications and the underlying vector database, providing domain-specific operations while delegating storage and embedding to specialized services. It primarily interacts with: 1. Vector Database Service - for persistent storage of embeddings and documents 2. Embedding Service - for converting text to vector representations 3. Text Chunking Service - for breaking documents into optimal retrieval units Technical Characteristics: - Asynchronous API for high throughput in server environments - Thread-safe operations for concurrent access - Consistent error handling and logging - Idempotent operations where possible - Transactional guarantees for critical operations This service is typically accessed through the singleton get_knowledge_base_manager() function, which ensures a single instance is shared across the application. Example Usage: ```python # Get the manager kb_manager = get_knowledge_base_manager() # Create a new knowledge base await kb_manager.create_knowledge_base( name="company_policies", description="Corporate policy documents and guidelines" ) # Add documents with metadata await kb_manager.add_documents( knowledge_base_name="company_policies", documents=[ "All employees must complete annual security training.", "Remote work is available for eligible positions with manager approval." ], metadatas=[ {"source": "security_policy.pdf", "category": "security", "page": 12}, {"source": "hr_handbook.pdf", "category": "remote_work", "page": 45} ], chunk_size=500, chunk_method="semantic" ) # List available knowledge bases kb_list = await kb_manager.list_knowledge_bases() print(f"Found {kb_list['count']} knowledge bases") # Get details about a specific knowledge base kb_info = await kb_manager.get_knowledge_base("company_policies") doc_count = kb_info.get("metadata", {}).get("doc_count", 0) print(f"Knowledge base contains {doc_count} document chunks") ``` """ def __init__(self, vector_service: VectorDatabaseService): """Initialize the knowledge base manager. Args: vector_service: Vector database service for storing embeddings """ self.vector_service = vector_service logger.info("Knowledge base manager initialized", extra={"emoji_key": "success"}) async def create_knowledge_base( self, name: str, description: Optional[str] = None, embedding_model: Optional[str] = None, overwrite: bool = False ) -> Dict[str, Any]: """Create a new knowledge base. Args: name: Knowledge base name description: Optional description embedding_model: Optional embedding model name overwrite: Whether to overwrite existing knowledge base Returns: Knowledge base metadata """ # Check if knowledge base already exists collections = await self.vector_service.list_collections() if name in collections and not overwrite: logger.warning( f"Knowledge base '{name}' already exists", extra={"emoji_key": "warning"} ) return {"status": "exists", "name": name} # Create new collection for knowledge base metadata = { "type": "knowledge_base", "description": description or "", "created_at": time.time(), "doc_count": 0 } # Only add embedding_model if not None (to avoid ChromaDB errors) if embedding_model is not None: metadata["embedding_model"] = embedding_model logger.debug(f"Creating knowledge base with metadata: {metadata}") # Ensure any existing collection is deleted first if overwrite: try: # Force delete any existing collection await self.vector_service.delete_collection(name) logger.debug(f"Force deleted existing collection '{name}' for clean creation") # Add a small delay to ensure deletion completes import asyncio await asyncio.sleep(0.2) except Exception as e: logger.debug(f"Error during force deletion: {str(e)}") try: await self.vector_service.create_collection(name, metadata=metadata) logger.info( f"Created knowledge base '{name}'", extra={"emoji_key": "success"} ) return { "status": "created", "name": name, "metadata": metadata } except Exception as e: logger.error( f"Failed to create knowledge base '{name}': {str(e)}", extra={"emoji_key": "error"} ) raise ValueError(f"Failed to create knowledge base: {str(e)}") from e async def delete_knowledge_base(self, name: str) -> Dict[str, Any]: """Delete a knowledge base. Args: name: Knowledge base name Returns: Deletion status """ # Check if knowledge base exists collections = await self.vector_service.list_collections() if name not in collections: logger.warning( f"Knowledge base '{name}' not found", extra={"emoji_key": "warning"} ) return {"status": "not_found", "name": name} # Delete collection await self.vector_service.delete_collection(name) logger.info( f"Deleted knowledge base '{name}'", extra={"emoji_key": "success"} ) return { "status": "deleted", "name": name } async def list_knowledge_bases(self): """List all knowledge bases. Returns: List of knowledge bases with metadata """ collection_names = await self.vector_service.list_collections() kb_list = [] for name in collection_names: try: metadata = await self.vector_service.get_collection_metadata(name) # Only include collections that are knowledge bases if metadata and metadata.get("type") == "knowledge_base": # Create a simple dict with name and metadata kb = { "name": name, "metadata": metadata } kb_list.append(kb) except Exception as e: logger.error( f"Error getting metadata for collection '{name}': {str(e)}", extra={"emoji_key": "error"} ) return { "count": len(kb_list), "knowledge_bases": kb_list } async def get_knowledge_base(self, name: str) -> Dict[str, Any]: """Get knowledge base metadata. Args: name: Knowledge base name Returns: Knowledge base metadata """ # Check if knowledge base exists collections = await self.vector_service.list_collections() if name not in collections: logger.warning( f"Knowledge base '{name}' not found", extra={"emoji_key": "warning"} ) return {"status": "not_found", "name": name} # Get metadata metadata = await self.vector_service.get_collection_metadata(name) if metadata.get("type") != "knowledge_base": logger.warning( f"Collection '{name}' is not a knowledge base", extra={"emoji_key": "warning"} ) return {"status": "not_knowledge_base", "name": name} return { "status": "found", "name": name, "metadata": metadata } async def add_documents( self, knowledge_base_name: str, documents: List[str], metadatas: Optional[List[Dict[str, Any]]] = None, ids: Optional[List[str]] = None, embedding_model: Optional[str] = None, chunk_size: int = 1000, chunk_overlap: int = 200, chunk_method: str = "semantic" ) -> Dict[str, Any]: """Add documents to a knowledge base. Args: knowledge_base_name: Knowledge base name documents: List of document texts metadatas: Optional list of document metadata ids: Optional list of document IDs embedding_model: Optional embedding model name chunk_size: Chunk size for document processing chunk_overlap: Chunk overlap for document processing chunk_method: Chunking method (token, semantic, etc.) Returns: Document addition status """ logger.debug(f"DEBUG: Adding documents to knowledge base '{knowledge_base_name}'") logger.debug(f"DEBUG: Document count: {len(documents)}") logger.debug(f"DEBUG: First document sample: {documents[0][:100]}...") logger.debug(f"DEBUG: Metadatas: {metadatas[:2] if metadatas else None}") logger.debug(f"DEBUG: Chunk settings - size: {chunk_size}, overlap: {chunk_overlap}, method: {chunk_method}") # Check if knowledge base exists kb_info = await self.get_knowledge_base(knowledge_base_name) if kb_info["status"] != "found": logger.warning( f"Knowledge base '{knowledge_base_name}' not found", extra={"emoji_key": "warning"} ) return {"status": "not_found", "name": knowledge_base_name} try: # Add documents to vector store doc_ids = await self.vector_service.add_texts( collection_name=knowledge_base_name, texts=documents, metadatas=metadatas, ids=ids, embedding_model=embedding_model ) # Update document count in metadata current_metadata = await self.vector_service.get_collection_metadata(knowledge_base_name) doc_count = current_metadata.get("doc_count", 0) + len(documents) # Prepare metadata updates metadata_updates = {"doc_count": doc_count} # Store embedding model in metadata if provided (for consistent retrieval) if embedding_model: metadata_updates["embedding_model"] = embedding_model # Update metadata await self.vector_service.update_collection_metadata( name=knowledge_base_name, metadata=metadata_updates ) logger.info( f"Added {len(documents)} documents to knowledge base '{knowledge_base_name}'", extra={"emoji_key": "success"} ) return { "status": "success", "name": knowledge_base_name, "added_count": len(documents), "ids": doc_ids } except Exception as e: logger.error( f"Error adding documents to knowledge base '{knowledge_base_name}': {str(e)}", extra={"emoji_key": "error"} ) raise

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kappasig920/Ultimate-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server