Skip to main content
Glama
memory_manager.py • 18.8 kB
""" Qdrant Memory Manager for MCP Memory Server (Refactored Router). This is now a lightweight router that delegates to specialized memory modules while maintaining backward compatibility with the existing MCP server interface. Refactored from monolithic 1,106-line class for better maintainability. """ import logging import uuid from typing import Dict, Any, List, Optional from datetime import datetime from qdrant_client import QdrantClient from sentence_transformers import SentenceTransformer from src.config import Config from .error_handler import retry_qdrant_operation from .generic_memory_service import GenericMemoryService # Import specialized memory modules from .memory import ( VectorOperations, AgentRegistry, FileMetadataManager, EmbeddingService, CollectionManager ) logger = logging.getLogger(__name__) class QdrantMemoryManager: """ Lightweight router that delegates to specialized memory modules. Maintains backward compatibility with existing MCP server interface while using modular architecture for better maintainability. 
""" def __init__(self) -> None: """Initialize the Memory Manager Router.""" # Legacy interface compatibility self.client: Optional[QdrantClient] = None self.embedding_model: Optional[SentenceTransformer] = None self.collections_initialized = False self.current_agent_id = None self.current_context = {} # Generic memory service for backward compatibility self.generic_service = GenericMemoryService() # Specialized modules (initialized after client setup) self.embedding_service: Optional[EmbeddingService] = None self.collection_manager: Optional[CollectionManager] = None self.vector_operations: Optional[VectorOperations] = None self.agent_registry: Optional[AgentRegistry] = None self.file_metadata_manager: Optional[FileMetadataManager] = None # Initialize synchronously for MCP server compatibility self._sync_initialize() def _agent_id_to_point_id(self, agent_id: str) -> str: """Convert agent ID to valid Qdrant point ID.""" try: uuid.UUID(agent_id) return agent_id # It's already a valid UUID except ValueError: pass # Create a deterministic UUID5 based on the agent_id namespace = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8') return str(uuid.uuid5(namespace, agent_id)) @retry_qdrant_operation(max_attempts=3) def _sync_initialize(self) -> None: """Synchronous initialization for compatibility.""" try: # Initialize the generic memory service synchronously import asyncio # Check if we can run async operations try: asyncio.get_running_loop() logger.warning( "Already in event loop during sync initialization - " "using fallback sync init" ) # Fall back to old sync initialization self._fallback_sync_init() except RuntimeError: # No event loop, run async initialization asyncio.run(self._async_initialize_generic_service()) logger.info( "โœ… QdrantMemoryManager initialized with GenericMemoryService" ) except Exception as e: logger.error(f"โŒ Failed to initialize QdrantMemoryManager: {e}") # Fall back to sync initialization try: self._fallback_sync_init() logger.info("โœ… 
Fallback sync initialization completed") except Exception as fallback_error: logger.error( f"โŒ Fallback initialization also failed: {fallback_error}" ) raise async def _async_initialize_generic_service(self) -> None: """Initialize the generic memory service asynchronously.""" await self.generic_service.initialize() def _fallback_sync_init(self) -> None: """Fallback to sync initialization with modular services.""" # Initialize Qdrant client directly if Config.QDRANT_API_KEY: self.client = QdrantClient( host=Config.QDRANT_HOST, port=Config.QDRANT_PORT, api_key=Config.QDRANT_API_KEY, timeout=60 ) else: self.client = QdrantClient( host=Config.QDRANT_HOST, port=Config.QDRANT_PORT, timeout=60 ) # Test connection collections = self.client.get_collections() logger.info( f"โœ… Connected to Qdrant at {Config.QDRANT_HOST}:" f"{Config.QDRANT_PORT}" ) logger.info( f"Found {len(collections.collections)} existing collections" ) # Initialize embedding model self.embedding_model = SentenceTransformer(Config.EMBEDDING_MODEL) logger.info(f"โœ… Loaded embedding model: {Config.EMBEDDING_MODEL}") # Initialize specialized modules self._initialize_modules() # Initialize collections using collection manager if self.collection_manager: self.collection_manager.sync_initialize_collections() self.collections_initialized = True # Initialize the generic service with the same client and model self.generic_service.client = self.client self.generic_service.embedding_model = self.embedding_model self.generic_service.collection_manager = self.collection_manager self.generic_service.initialized = True def _initialize_modules(self) -> None: """Initialize specialized memory modules.""" try: # Initialize embedding service self.embedding_service = EmbeddingService() # Set embedding model directly (sync initialization) self.embedding_service.embedding_model = self.embedding_model # Initialize collection manager self.collection_manager = CollectionManager(self.client) # Initialize vector operations 
self.vector_operations = VectorOperations( self.client, self.embedding_service ) # Initialize agent registry self.agent_registry = AgentRegistry( self.client, self.embedding_service ) # Initialize file metadata manager self.file_metadata_manager = FileMetadataManager( self.client, self.embedding_service ) logger.info("โœ… Specialized memory modules initialized") except Exception as e: logger.error(f"โŒ Failed to initialize modules: {e}") raise # MCP Server Interface Methods def set_agent_context( self, agent_id: str, context_type: str, description: str ) -> None: """Set the current agent context for memory operations.""" self.current_agent_id = agent_id self.current_context = { "agent_id": agent_id, "context_type": context_type, "description": description, "timestamp": datetime.now().isoformat() } logger.info(f"๐ŸŽฏ Set agent context: {agent_id} ({context_type})") def add_to_global_memory( self, content: str, category: str = "general", importance: float = 0.5 ) -> Dict[str, Any]: """Add content to global memory via GenericMemoryService.""" return self.generic_service.add_to_global_memory( content, category, importance ) def add_to_learned_memory( self, content: str, pattern_type: str = "insight", confidence: float = 0.7 ) -> Dict[str, Any]: """Add learned patterns to memory via GenericMemoryService.""" return self.generic_service.add_to_learned_memory( content, pattern_type, confidence ) def add_to_agent_memory( self, content: str, agent_id: Optional[str] = None, memory_type: str = "general" ) -> Dict[str, Any]: """Add content to agent-specific memory via vector operations.""" try: target_agent_id = agent_id or self.current_agent_id if not target_agent_id: return { "success": False, "error": "No agent ID provided or set in context" } # Ensure agent collection exists via collection manager if self.collection_manager: self.collection_manager.ensure_agent_collection(target_agent_id) # Use vector operations to add content collection_name = 
Config.get_collection_name("agent", target_agent_id) if self.vector_operations: result = self.vector_operations.async_add_to_memory( content=content, collection=collection_name, metadata={ "agent_id": target_agent_id, "memory_type": memory_type } ) if result["success"]: result["message"] = ( f"Added to agent memory for {target_agent_id}" ) return result else: return { "success": False, "error": "Vector operations not initialized" } except Exception as e: logger.error(f"โŒ Failed to add to agent memory: {e}") return {"success": False, "error": str(e)} def query_memory( self, query: str, memory_types: List[str] = None, limit: int = 10, min_score: float = 0.3 ) -> Dict[str, Any]: """Query memory for relevant content via GenericMemoryService.""" return self.generic_service.query_memory( query, memory_types, limit, min_score ) def compare_against_learned_memory( self, situation: str, comparison_type: str = "pattern_match", limit: int = 5 ) -> Dict[str, Any]: """Compare situation against learned patterns via service.""" return self.generic_service.compare_against_learned_memory( situation, comparison_type, limit ) # Async Interface Methods async def initialize(self) -> None: """Initialize Qdrant client and embedding model.""" await self.generic_service.initialize() # Copy initialized components self.client = self.generic_service.client self.embedding_model = self.generic_service.embedding_model self.collections_initialized = True # Initialize specialized modules self._initialize_modules() # Vector Operations Delegation def async_add_to_memory( self, content: str, collection: str, metadata: Optional[Dict[str, Any]] = None, content_hash: Optional[str] = None ) -> Dict[str, Any]: """Add content to specified memory collection.""" if self.vector_operations: return self.vector_operations.async_add_to_memory( content, collection, metadata, content_hash ) return {"success": False, "error": "Vector operations not initialized"} def async_query_memory( self, query: str, 
collection: str, limit: int = 10, min_score: float = 0.3, filters: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Query memory collection for relevant content.""" if self.vector_operations: return self.vector_operations.async_query_memory( query, collection, limit, min_score, filters ) return {"success": False, "error": "Vector operations not initialized"} def async_check_duplicate_with_similarity( self, content: str, collection: str, similarity_threshold: float = 0.95, metadata_filters: Optional[Dict[str, Any]] = None, check_hash_first: bool = True ) -> Dict[str, Any]: """Check for duplicate content using similarity search.""" if self.vector_operations: return self.vector_operations.async_check_duplicate_with_similarity( content, collection, similarity_threshold, metadata_filters, check_hash_first ) return {"is_duplicate": False, "error": "Vector operations not initialized"} def async_check_duplicate( self, content: str, collection: str, metadata_filters: Optional[Dict[str, Any]] = None ) -> bool: """Simple duplicate check by content hash.""" if self.vector_operations: return self.vector_operations.async_check_duplicate( content, collection, metadata_filters ) return False def async_delete_content( self, content_hash: str, collection: str ) -> Dict[str, Any]: """Delete content from collection by hash.""" if self.vector_operations: return self.vector_operations.async_delete_content( content_hash, collection ) return {"success": False, "error": "Vector operations not initialized"} def async_get_collection_info( self, memory_type: str, agent_id: Optional[str] = None ) -> Dict[str, Any]: """Get information about a collection.""" collection = ( Config.get_collection_name(memory_type, agent_id) if agent_id else Config.get_collection_name(memory_type) ) if self.vector_operations: return self.vector_operations.async_get_collection_info(collection) return {"error": "Vector operations not initialized"} # Agent Registry Delegation async def register_agent( self, 
agent_id: str, agent_role: str = "general", memory_layers: List[str] = None, permissions: Dict[str, Any] = None ) -> Dict[str, Any]: """Register a new agent in the agent registry.""" if self.agent_registry: return await self.agent_registry.register_agent( agent_id, agent_role, memory_layers, permissions ) return {"success": False, "error": "Agent registry not initialized"} async def get_agent(self, agent_id: str) -> Dict[str, Any]: """Get agent information from registry.""" if self.agent_registry: return await self.agent_registry.get_agent(agent_id) return {"success": False, "error": "Agent registry not initialized"} async def update_agent_permissions( self, agent_id: str, permissions: Dict[str, Any] ) -> Dict[str, Any]: """Update agent permissions.""" if self.agent_registry: return await self.agent_registry.update_agent_permissions( agent_id, permissions ) return {"success": False, "error": "Agent registry not initialized"} async def list_agents(self) -> Dict[str, Any]: """List all registered agents.""" if self.agent_registry: return await self.agent_registry.list_agents() return {"success": False, "error": "Agent registry not initialized"} async def check_agent_permission( self, agent_id: str, action: str, memory_type: str ) -> bool: """Check if agent has permission for a specific action.""" if self.agent_registry: return await self.agent_registry.check_agent_permission( agent_id, action, memory_type ) return False async def log_agent_action( self, agent_id: str, action: str, context: Dict[str, Any], outcome: str, store_as_learned: bool = False ) -> Dict[str, Any]: """Log an agent action and optionally store as learned memory.""" if self.agent_registry: return await self.agent_registry.log_agent_action( agent_id, action, context, outcome, store_as_learned ) return {"success": False, "error": "Agent registry not initialized"} # File Metadata Management Delegation def add_file_metadata( self, file_path: str, file_hash: str, file_size: int, processing_status: str = 
"processed", chunks_created: int = 0, processing_time: float = 0.0, additional_metadata: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Add file processing metadata to tracking collection.""" if self.file_metadata_manager: return self.file_metadata_manager.add_file_metadata( file_path, file_hash, file_size, processing_status, chunks_created, processing_time, additional_metadata ) return {"success": False, "error": "File metadata manager not initialized"} def get_file_metadata(self, file_path: str) -> Optional[Dict[str, Any]]: """Get file metadata by file path.""" if self.file_metadata_manager: return self.file_metadata_manager.get_file_metadata(file_path) return None def check_file_processed(self, file_path: str, file_hash: str) -> bool: """Check if file has been processed based on path and hash.""" if self.file_metadata_manager: return self.file_metadata_manager.check_file_processed( file_path, file_hash ) return False # Cleanup async def cleanup(self) -> None: """Cleanup resources.""" try: if self.embedding_service: await self.embedding_service.cleanup() if self.generic_service: await self.generic_service.cleanup() logger.info("๐Ÿงน Memory manager cleanup completed") except Exception as e: logger.error(f"โš ๏ธ Error during cleanup: {e}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hannesnortje/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.