Skip to main content
Glama
knowledge_base_manager.py (22.2 kB)
# cmcp/managers/knowledge_base_manager.py

"""
A refactored version of the knowledge base manager for CMCP.

This version delegates all search-related functionality to the new
SearchService, resulting in a cleaner, more maintainable, and focused class.
"""

import os
from typing import Dict, List, Any, Optional
from datetime import datetime, timezone

from cmcp.kb.document_store import DocumentStore
from cmcp.kb.models import DocumentIndex, ImplicitRDFTriple
from cmcp.kb.path import PathComponents, PartialPathComponents
from cmcp.utils.logging import get_logger
from cmcp.config import AppConfig

# New, clean imports from the structured search module
from cmcp.kb.search import (
    SearchService,
    SearchIndexManager,
    SearchIndexRecovery,
    SparseSearchIndex,
    GraphSearchIndex,
    Reranker,
)

logger = get_logger(__name__)


class KnowledgeBaseManager:
    """
    Manages the knowledge base, focusing on document and metadata operations.

    All search functionality is delegated to the SearchService. Document,
    metadata, reference and search methods call ``check_initialized()`` and
    therefore require ``await initialize()`` to have run first.
    """

    def __init__(self,
                 storage_path: str,
                 timeout_default: int,
                 timeout_max: int,
                 search_enabled: bool = True,
                 sparse_index_path: Optional[str] = None,
                 graph_index_path: Optional[str] = None,
                 reranker_model: Optional[str] = None,
                 search_relation_predicates: Optional[List[str]] = None,
                 search_graph_neighbor_limit: int = 1000):
        """Initialize the refactored knowledge base manager.

        No filesystem work happens here; directories, the document store and
        the search service are created by ``initialize()``.

        Args:
            storage_path: Root directory of the document store.
            timeout_default: Stored on the instance (not used by this module's
                methods; presumably read by callers).
            timeout_max: Stored on the instance (same note as above).
            search_enabled: Whether ``initialize()`` should build the search service.
            sparse_index_path: Sparse index directory; defaults to
                ``<storage_path>/search/sparse_idx``.
            graph_index_path: Graph index directory; defaults to
                ``<storage_path>/search/graph_idx``.
            reranker_model: Reranker model name; defaults to
                ``"mixedbread-ai/mxbai-rerank-base-v1"``.
            search_relation_predicates: Predicates handed to the SearchService;
                defaults to ``["references"]``.
            search_graph_neighbor_limit: Neighbor limit handed to the SearchService.
        """
        self.storage_path = storage_path
        self.timeout_default = timeout_default
        self.timeout_max = timeout_max
        self.document_store: Optional[DocumentStore] = None
        self.logger = logger

        # Search configuration is now passed down to the SearchService
        self.search_enabled = search_enabled
        self.sparse_index_path = sparse_index_path or os.path.join(storage_path, "search/sparse_idx")
        self.graph_index_path = graph_index_path or os.path.join(storage_path, "search/graph_idx")
        self.reranker_model = reranker_model or "mixedbread-ai/mxbai-rerank-base-v1"
        self.search_relation_predicates = search_relation_predicates or ["references"]
        self.search_graph_neighbor_limit = search_graph_neighbor_limit

        # The single point of contact for all search operations
        self.search_service: Optional[SearchService] = None

        logger.debug("KnowledgeBaseManager initialized.")

    @classmethod
    def from_env(cls, config: Optional[AppConfig] = None) -> 'KnowledgeBaseManager':
        """Create a KnowledgeBaseManager from ``config.kb_config``.

        Args:
            config: Application config; loaded via ``cmcp.config.load_config``
                when omitted.
        """
        if config is None:
            from cmcp.config import load_config
            config = load_config()

        kb = config.kb_config
        return cls(
            storage_path=kb.storage_path,
            timeout_default=kb.timeout_default,
            timeout_max=kb.timeout_max,
            search_enabled=kb.search_enabled,
            sparse_index_path=kb.sparse_index_path,
            graph_index_path=kb.graph_index_path,
            reranker_model=kb.reranker_model,
            search_relation_predicates=kb.search_relation_predicates,
            search_graph_neighbor_limit=kb.search_graph_neighbor_limit,
        )

    def check_initialized(self) -> None:
        """Raise RuntimeError if ``initialize()`` has not created the document store."""
        if self.document_store is None:
            raise RuntimeError("KnowledgeBaseManager not initialized. Call initialize() first.")

    async def initialize(self) -> None:
        """Initialize the document store and, if enabled, the search service.

        Safe to call repeatedly: already-built components are kept. A failure
        while building the search service is logged and disables search
        (``search_enabled`` becomes False) instead of propagating.
        """
        if self.document_store is None:
            os.makedirs(self.storage_path, exist_ok=True)
            self.document_store = DocumentStore(self.storage_path)
            self.logger.info(f"Initialized knowledge base at: {self.storage_path}")

        # Guard on search_service so a repeated initialize() does not rebuild the
        # indices and reload the reranker model.
        if self.search_enabled and self.search_service is None:
            try:
                self.logger.info("Initializing search service and its components...")
                os.makedirs(self.sparse_index_path, exist_ok=True)
                os.makedirs(self.graph_index_path, exist_ok=True)

                # 1. Low-level components
                sparse_index = SparseSearchIndex(self.sparse_index_path)
                graph_index = GraphSearchIndex(self.graph_index_path)
                reranker = Reranker(self.reranker_model)

                # 2. Synchronous manager
                search_manager = SearchIndexManager(sparse_index, graph_index, reranker)

                # 3. Recovery utility
                search_recovery = SearchIndexRecovery(search_manager, self.document_store)

                # 4. Main async service, passing down config
                self.search_service = SearchService(
                    search_index_manager=search_manager,
                    search_recovery=search_recovery,
                    document_store=self.document_store,
                    search_relation_predicates=self.search_relation_predicates,
                    search_graph_neighbor_limit=self.search_graph_neighbor_limit,
                )
                self.logger.info("Search service initialized successfully.")
            except Exception as e:
                self.logger.error(f"Failed to initialize search service: {e}", exc_info=True)
                self.search_enabled = False

    # === Document Content Methods ===

    async def write_content(self, components: PathComponents, content: str) -> DocumentIndex:
        """Write document content, re-index it, and stamp ``updated_at`` (UTC).

        Returns:
            The index returned by the document store after the timestamp update.
        """
        self.check_initialized()
        self.document_store.write_content(components, content)

        if self.search_enabled and self.search_service:
            await self.search_service.update_document_in_indices(components.urn, content)

        return self.document_store.update_index(components, {"updated_at": datetime.now(timezone.utc)})

    async def read_content(self, components: PathComponents) -> Optional[str]:
        """Return the document's content as given by the document store."""
        self.check_initialized()
        return self.document_store.read_content(components)

    async def check_content(self, components: PathComponents) -> bool:
        """Return whether content exists for a document in the knowledge base."""
        self.check_initialized()
        return self.document_store.check_content(components)

    # === Document Index and Metadata Methods ===

    async def create_document(self, components: PathComponents, metadata: Optional[Dict[str, Any]] = None) -> DocumentIndex:
        """Create a new document index with metadata but no content.

        Raises:
            ValueError: If an index already exists at ``components``.
        """
        self.check_initialized()
        if self.document_store.check_index(components):
            raise ValueError(f"Document already exists at path: {components.path}")

        index = DocumentIndex(
            namespace=components.namespace,
            collection=components.collection,
            name=components.name,
            metadata=(metadata or {}),
        )
        self.document_store.write_index(components, index)
        return index

    async def read_index(self, components: PathComponents) -> DocumentIndex:
        """Return the document's index."""
        self.check_initialized()
        return self.document_store.read_index(components)

    async def check_index(self, components: PathComponents) -> bool:
        """Return whether an index exists for a document in the knowledge base."""
        self.check_initialized()
        return self.document_store.check_index(components)

    async def update_metadata(self, components: PathComponents, metadata: Dict[str, Any]) -> DocumentIndex:
        """Merge ``metadata`` into the document's existing metadata (new keys win)."""
        self.check_initialized()
        current_index = self.document_store.read_index(components)
        updated_metadata = dict(current_index.metadata)
        updated_metadata.update(metadata)
        return self.document_store.update_index(components, {"metadata": updated_metadata})

    async def add_metadata_property(self, components: PathComponents, key: str, value: Any) -> Dict[str, Any]:
        """Add or overwrite a single metadata property and report the result."""
        self.check_initialized()
        index = self.document_store.read_index(components)
        updated_metadata = dict(index.metadata)
        updated_metadata[key] = value
        updated_index = self.document_store.update_index(components, {"metadata": updated_metadata})
        return {"status": "updated", "metadata_count": len(updated_index.metadata), "key": key, "value": value}

    async def remove_metadata_property(self, components: PathComponents, key: str) -> Dict[str, Any]:
        """Remove a metadata property; status is "unchanged" if the key was absent."""
        self.check_initialized()
        index = self.document_store.read_index(components)
        updated_metadata = dict(index.metadata)
        removed = key in updated_metadata
        if removed:
            del updated_metadata[key]
        updated_index = self.document_store.update_index(components, {"metadata": updated_metadata})
        return {"status": "updated" if removed else "unchanged", "metadata_count": len(updated_index.metadata), "key": key, "removed": removed}

    # === Triples: References & Preferences ===

    async def add_reference(self, components: PathComponents, ref_components: PathComponents, relation: str) -> Dict[str, Any]:
        """Link ``components`` to ``ref_components`` with predicate ``relation``.

        The triple is appended to the source's ``references`` and its mirror to
        the target's ``referenced_by``, then indexed by the SearchService.
        Errors from the document store (e.g. a missing document) propagate.
        """
        self.check_initialized()
        source_index = self.document_store.read_index(components)
        triple = ImplicitRDFTriple(predicate=relation, object=ref_components.urn)

        if triple in source_index.references:
            return {"status": "success", "message": "Reference already exists", "added": False}

        source_index.references.append(triple)
        self.document_store.update_index(components, {"references": source_index.references})

        ref_index = self.document_store.read_index(ref_components)
        reverse_triple = ImplicitRDFTriple(predicate=relation, object=components.urn)
        if reverse_triple not in ref_index.referenced_by:
            ref_index.referenced_by.append(reverse_triple)
            self.document_store.update_index(ref_components, {"referenced_by": ref_index.referenced_by})

        if self.search_enabled and self.search_service:
            await self.search_service.add_triple_to_indices(components.urn, relation, ref_components.urn, "reference")

        return {"status": "success", "message": "Reference added", "added": True}

    async def remove_reference(self, components: PathComponents, ref_components: PathComponents, relation: str) -> Dict[str, Any]:
        """Remove a reference and its mirror, then de-index the triple.

        Cleaning the target's ``referenced_by`` is best-effort: a missing target
        or any other failure there is logged, not raised.
        """
        self.check_initialized()
        index = self.document_store.read_index(components)
        ref_to_remove = ImplicitRDFTriple(predicate=relation, object=ref_components.urn)

        if ref_to_remove not in index.references:
            return {"status": "unchanged", "reference_count": len(index.references)}

        updated_refs = [ref for ref in index.references if ref != ref_to_remove]
        self.document_store.update_index(components, {"references": updated_refs})

        # Also remove the reverse reference from the target document
        try:
            ref_index = self.document_store.read_index(ref_components)
            reverse_triple = ImplicitRDFTriple(predicate=relation, object=components.urn)
            if reverse_triple in ref_index.referenced_by:
                updated_referenced_by = [ref for ref in ref_index.referenced_by if ref != reverse_triple]
                self.document_store.update_index(ref_components, {"referenced_by": updated_referenced_by})
        except FileNotFoundError:
            self.logger.warning(f"Referenced document not found when trying to remove reverse reference: {ref_components.urn}")
        except Exception as e:
            self.logger.error(f"Failed to remove reverse reference from {ref_components.urn}: {e}", exc_info=True)

        if self.search_enabled and self.search_service:
            await self.search_service.delete_triple_from_indices(components.urn, relation, ref_components.urn, "reference")

        return {"status": "updated", "reference_count": len(updated_refs)}

    async def add_preference(self, components: PathComponents, preferences: List[ImplicitRDFTriple]) -> Dict[str, Any]:
        """Append preference triples not already present; index only the new ones."""
        self.check_initialized()
        index = self.document_store.read_index(components)
        updated_preferences = list(index.preferences)
        added_prefs = []
        for p in preferences:
            if p not in updated_preferences:
                updated_preferences.append(p)
                added_prefs.append(p)

        updated_index = self.document_store.update_index(components, {"preferences": updated_preferences})

        if self.search_enabled and self.search_service:
            for p in added_prefs:
                await self.search_service.add_triple_to_indices(components.urn, p.predicate, p.object, "preference")

        return {"status": "updated", "preference_count": len(updated_index.preferences)}

    async def remove_preference(self, components: PathComponents, preferences: List[ImplicitRDFTriple]) -> Dict[str, Any]:
        """Remove the given preference triples, preserving the order of the rest.

        Only triples that were actually present are de-indexed.
        """
        self.check_initialized()
        index = self.document_store.read_index(components)
        # Equality-based filtering (as add_preference does) instead of set
        # arithmetic: keeps the stored order and does not require the triples
        # to be hashable.
        to_remove = list(preferences)
        removed_prefs: List[ImplicitRDFTriple] = []
        updated_preferences: List[ImplicitRDFTriple] = []
        for p in index.preferences:
            if p in to_remove:
                if p not in removed_prefs:
                    removed_prefs.append(p)
            else:
                updated_preferences.append(p)

        updated_index = self.document_store.update_index(components, {"preferences": updated_preferences})

        if self.search_enabled and self.search_service:
            for p in removed_prefs:
                await self.search_service.delete_triple_from_indices(components.urn, p.predicate, p.object, "preference")

        return {"status": "updated", "preference_count": len(updated_index.preferences)}

    async def remove_all_preferences(self, components: PathComponents) -> Dict[str, Any]:
        """De-index and then clear every preference triple of a document."""
        self.check_initialized()
        index = self.document_store.read_index(components)

        if self.search_enabled and self.search_service:
            for p in index.preferences:
                await self.search_service.delete_triple_from_indices(components.urn, p.predicate, p.object, "preference")

        self.document_store.update_index(components, {"preferences": []})
        return {"status": "updated", "preference_count": 0}

    # === Filesystem & Lifecycle Methods ===

    async def list_documents(self, components: Optional[PartialPathComponents] = None, recursive: bool = True) -> List[str]:
        """List documents under ``components`` (everything when omitted).

        Args:
            components: Partial path to search under.
            recursive: Use the recursive finder when True, else the shallow one.
        """
        self.check_initialized()
        if components is None:
            components = PartialPathComponents()

        if recursive:
            return self.document_store.find_documents_recursive(components)
        return self.document_store.find_documents_shallow(components)

    @staticmethod
    def _retarget_triples(triples: List[ImplicitRDFTriple], old_urn: str, new_urn: str) -> List[ImplicitRDFTriple]:
        """Return a copy of ``triples`` with every object equal to ``old_urn`` set to ``new_urn``."""
        return [
            ImplicitRDFTriple(predicate=t.predicate, object=new_urn) if t.object == old_urn else t
            for t in triples
        ]

    async def move_document(self, components: PathComponents, new_components: PathComponents) -> DocumentIndex:
        """Move a document and re-point every link that names its old URN.

        After the on-disk move succeeds, reference rewriting and search-index
        updates are best-effort: failures are logged, not raised.

        Returns:
            The index read back from ``new_components``.
        """
        self.check_initialized()
        index = self.document_store.read_index(components)
        old_urn = components.urn
        new_urn = new_components.urn

        # Move file on disk
        self.document_store.move_document(components, new_components)

        # Documents that reference this one: rewrite their outgoing references.
        for reverse_ref in index.referenced_by:
            try:
                referencing_components = PathComponents.parse_path(reverse_ref.object)
                referencing_index = self.document_store.read_index(referencing_components)
                updated_references = self._retarget_triples(referencing_index.references, old_urn, new_urn)
                if updated_references != referencing_index.references:
                    self.document_store.update_index(referencing_components, {"references": updated_references})
            except Exception as e:
                self.logger.error(f"Failed to update reverse reference in {reverse_ref.object}: {e}")

        # Documents this one references: rewrite their back-pointers, which
        # would otherwise still name the old URN.
        for ref in index.references:
            try:
                target_components = PathComponents.parse_path(ref.object)
                target_index = self.document_store.read_index(target_components)
                updated_referenced_by = self._retarget_triples(target_index.referenced_by, old_urn, new_urn)
                if updated_referenced_by != target_index.referenced_by:
                    self.document_store.update_index(target_components, {"referenced_by": updated_referenced_by})
            except Exception as e:
                self.logger.error(f"Failed to update back-reference in {ref.object}: {e}")

        # Delegate to search service
        if self.search_enabled and self.search_service:
            try:
                content = await self.read_content(new_components)
                await self.search_service.move_document_in_indices(old_urn, new_urn, content or "")
            except Exception as e:
                self.logger.error(f"SearchService failed to move document indices: {e}", exc_info=True)

        return self.document_store.read_index(new_components)

    async def _detach_references(self, components: PathComponents, index: DocumentIndex) -> None:
        """Best-effort removal of all outgoing and incoming references of a document.

        Each removal goes through ``remove_reference`` so both sides and the
        search indices are updated together; individual failures are logged
        as warnings.
        """
        for ref in index.references:
            try:
                ref_comp = PathComponents.parse_path(ref.object)
                await self.remove_reference(components, ref_comp, ref.predicate)
            except Exception as e:
                self.logger.warning(f"Failed to clean up outgoing reference from {components.urn} to {ref.object}: {e}")

        for ref_by in index.referenced_by:
            try:
                ref_by_comp = PathComponents.parse_path(ref_by.object)
                await self.remove_reference(ref_by_comp, components, ref_by.predicate)
            except Exception as e:
                self.logger.warning(f"Failed to clean up incoming reference to {components.urn} from {ref_by.object}: {e}")

    async def delete_document(self, components: PathComponents) -> Dict[str, Any]:
        """Delete a document after de-indexing it and detaching its references.

        Returns a "not_found" status instead of raising when no index exists.
        """
        self.check_initialized()
        if not self.document_store.check_index(components):
            return {"status": "not_found", "message": f"Document not found: {components.urn}"}

        index = self.document_store.read_index(components)

        # Delegate de-indexing first
        if self.search_enabled and self.search_service:
            await self.search_service.delete_document_from_indices(components.urn)

        # Remove links in both directions so no other document keeps a reference
        # to, or a back-pointer from, a document that no longer exists.
        await self._detach_references(components, index)

        # Then delete from document store
        self.document_store.delete_document(components)
        return {"status": "deleted", "message": f"Document deleted: {components.urn}"}

    async def archive_document(self, components: PathComponents) -> Dict[str, Any]:
        """Move a document to ``archive/<namespace>/<collection>-<name>``.

        The document is de-indexed and its references are detached before the
        move. If the move fails, the content is re-indexed at the original
        location. Detached references are not restored. An "error" status is
        returned rather than raising.
        """
        self.check_initialized()
        if not self.document_store.check_index(components):
            return {"status": "not_found", "message": f"Document not found: {components.urn}"}

        index = self.document_store.read_index(components)

        # Define the archive path. We put it inside a collection matching the original namespace.
        archive_components = PathComponents(
            namespace="archive",
            collection=components.namespace,
            name=f"{components.collection}-{components.name}",
        )

        # First, remove from search indices
        if self.search_enabled and self.search_service:
            await self.search_service.delete_document_from_indices(components.urn)

        # Clean up references before moving
        await self._detach_references(components, index)

        # Move the document
        try:
            self.document_store.move_document(components, archive_components)
            self.logger.info(f"Archived document {components.path} to {archive_components.path}")
            return {
                "status": "archived",
                "message": f"Document archived: {components.urn}",
                "archive_path": archive_components.path,
                "archive_urn": archive_components.urn,
            }
        except Exception as e:
            self.logger.error(f"Failed to move document to archive: {e}", exc_info=True)
            # Re-index if move fails
            if self.search_enabled and self.search_service:
                try:
                    content = await self.read_content(components)
                    await self.search_service.update_document_in_indices(components.urn, content or "")
                except Exception as reindex_e:
                    self.logger.error(f"Failed to re-index document after failed archive attempt: {reindex_e}")
            return {"status": "error", "message": str(e)}

    # === Search & Recovery (Delegated) ===

    async def search(self, **kwargs) -> List[Dict[str, Any]]:
        """Forward all keyword arguments to ``SearchService.search``.

        Raises:
            RuntimeError: If search is disabled or the service was not built.
        """
        self.check_initialized()
        if not self.search_enabled or not self.search_service:
            raise RuntimeError("Search is disabled or not initialized.")
        return await self.search_service.search(**kwargs)

    async def recover_search_indices(self, rebuild_all: bool = False) -> Dict[str, Any]:
        """Forward to ``SearchService.recover_indices``.

        Raises:
            RuntimeError: If search is disabled or the service was not built.
        """
        self.check_initialized()
        if not self.search_enabled or not self.search_service:
            raise RuntimeError("Search is disabled or not initialized.")
        return await self.search_service.recover_indices(rebuild_all=rebuild_all)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/54rt1n/container-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.