Skip to main content
Glama
system_and_collections_handlers.py17.9 kB
""" System and collections handlers for MCP Memory Server. Handles system health monitoring and generic collection operations. """ from typing import Dict, Any from datetime import datetime try: from ..server_config import get_logger from ..error_handler import error_handler except ImportError: # Fallback for standalone usage import logging def get_logger(name: str): return logging.getLogger(name) class MockErrorHandler: def get_error_stats(self): return {"total_errors": 0} error_handler = MockErrorHandler() logger = get_logger("system-collections-handlers") class SystemAndCollectionsHandlers: """Handles system health and generic collection operations.""" def __init__(self, memory_manager, markdown_processor=None): """Initialize with memory manager and optional processors.""" self.memory_manager = memory_manager self.markdown_processor = markdown_processor def handle_system_health(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle system health check tool call.""" try: # Get health information health_info = { "timestamp": str(datetime.now()), "memory_manager": { "available": self.memory_manager is not None, "initialized": ( self.memory_manager is not None and getattr(self.memory_manager, 'collections_initialized', False) ) }, "error_statistics": error_handler.get_error_stats(), "components": { "markdown_processor": self.markdown_processor is not None } } # Test basic connectivity if memory manager is available if self.memory_manager: try: if (hasattr(self.memory_manager, 'client') and self.memory_manager.client): collections = self.memory_manager.client.get_collections() health_info["components"]["qdrant"] = { "status": "healthy", "collections_count": len(collections.collections) } else: health_info["components"]["qdrant"] = { "status": "unavailable", "error": "No Qdrant client" } if (hasattr(self.memory_manager, 'embedding_model') and self.memory_manager.embedding_model): health_info["components"]["embedding_model"] = { "status": "healthy", "model": str(self.memory_manager.embedding_model) } else: health_info["components"]["embedding_model"] = { "status": "unavailable" } except Exception as e: health_info["components"]["qdrant"] = { "status": "error", "error": str(e) } # Determine overall health status component_issues = [] for component, info in health_info["components"].items(): if (isinstance(info, dict) and info.get("status") in ["unavailable", "error"]): component_issues.append(component) if not component_issues: overall_status = "healthy" status_text = "✅ All systems operational" elif len(component_issues) == len(health_info["components"]): overall_status = "critical" status_text = ( f"❌ Critical: All components have issues: " f"{', '.join(component_issues)}" ) else: overall_status = "degraded" status_text = ( f"⚠️ Degraded: Issues with: {', '.join(component_issues)}" ) health_info["overall_status"] = overall_status # Format health info for display health_text = f"""# System Health Report **Status:** {status_text} **Timestamp:** {health_info['timestamp']} ## Component Status ### Memory Manager - **Available:** {'✅' if health_info['memory_manager']['available'] else '❌'} - **Initialized:** {'✅' if health_info['memory_manager']['initialized'] else '❌'} ### Components """ for component, info in health_info["components"].items(): if isinstance(info, dict): status = info.get("status", "unknown") if status == "healthy": status_icon = "✅" elif status == "unavailable": status_icon = "⚠️" else: status_icon = "❌" health_text += ( f"- **{component.replace('_', ' ').title()}:** " f"{status_icon} {status}\n" ) if "error" in info: health_text += f" - Error: {info['error']}\n" else: health_text += ( f"- **{component.replace('_', ' ').title()}:** " f"{'✅' if info else '❌'}\n" ) # Add error statistics if available error_stats = health_info.get("error_statistics", {}) if error_stats.get("total_errors", 0) > 0: health_text += f""" ## Error Statistics - **Total Errors:** {error_stats.get('total_errors', 0)} - **Recovery Attempts:** {error_stats.get('recovery_attempts', 0)} - **Successful Recoveries:** {error_stats.get('successful_recoveries', 0)} """ if error_stats.get("errors_by_category"): health_text += "\n### Errors by Category\n" for category, count in error_stats["errors_by_category"].items(): health_text += f"- **{category.title()}:** {count}\n" return { "content": [ { "type": "text", "text": health_text } ] } except Exception as e: logger.error(f"Error in system health check: {e}") return { "isError": True, "content": [ { "type": "text", "text": f"❌ Health check failed: {str(e)}" } ] } # Generic Collection Tools async def handle_create_collection( self, arguments: Dict[str, Any] ) -> Dict[str, Any]: """Handle create_collection tool call.""" try: collection_name = arguments.get("collection_name") description = arguments.get("description", "") metadata = arguments.get("metadata", {}) if not collection_name: return { "isError": True, "content": [ {"type": "text", "text": "Collection name is required"} ] } # Use the GenericMemoryService to create the collection result = await self.memory_manager.generic_service.create_collection( collection_name, description, metadata ) return { "content": [ { "type": "text", "text": f"Successfully created collection '{collection_name}'" } ] } except Exception as e: logger.error(f"Error creating collection: {e}") return { "isError": True, "content": [ {"type": "text", "text": f"Failed to create collection: {str(e)}"} ] } async def handle_list_collections( self, arguments: Dict[str, Any] ) -> Dict[str, Any]: """Handle list_collections tool call.""" try: include_stats = arguments.get("include_stats", True) # Get collections from GenericMemoryService collections = await self.memory_manager.generic_service.list_collections() result_text = f"Found {len(collections.collections)} collections:\n\n" for collection in collections.collections: result_text += f"**{collection.name}**\n" if collection.description: result_text += f" Description: {collection.description}\n" if include_stats: stats = await self.memory_manager.generic_service.get_collection_stats( collection.name ) result_text += f" Documents: {stats['document_count']}\n" result_text += f" Created: {collection.created_at}\n" if collection.metadata: result_text += f" Metadata: {collection.metadata}\n" result_text += "\n" return { "content": [ {"type": "text", "text": result_text} ] } except Exception as e: logger.error(f"Error listing collections: {e}") return { "isError": True, "content": [ {"type": "text", "text": f"Failed to list collections: {str(e)}"} ] } async def handle_add_to_collection( self, arguments: Dict[str, Any] ) -> Dict[str, Any]: """Handle add_to_collection tool call.""" try: collection_name = arguments.get("collection_name") content = arguments.get("content") metadata = arguments.get("metadata", {}) importance = arguments.get("importance", 0.5) if not collection_name or not content: return { "isError": True, "content": [ { "type": "text", "text": "Collection name and content are required" } ] } # Use the GenericMemoryService to add content memory_id = await self.memory_manager.generic_service.add_to_collection( collection_name, content, metadata, importance ) return { "content": [ { "type": "text", "text": ( f"Successfully added content to collection " f"'{collection_name}' with ID: {memory_id}" ) } ] } except Exception as e: logger.error(f"Error adding to collection: {e}") return { "isError": True, "content": [ {"type": "text", "text": f"Failed to add to collection: {str(e)}"} ] } async def handle_query_collection( self, arguments: Dict[str, Any] ) -> Dict[str, Any]: """Handle query_collection tool call.""" try: collection_name = arguments.get("collection_name") query = arguments.get("query") limit = arguments.get("limit", 10) min_score = arguments.get("min_score", 0.3) include_metadata = arguments.get("include_metadata", True) if not collection_name or not query: return { "isError": True, "content": [ { "type": "text", "text": "Collection name and query are required" } ] } # Use the GenericMemoryService to query the collection results = await self.memory_manager.generic_service.query_collection( collection_name, query, limit, min_score ) if not results: result_text = ( f"No results found in collection '{collection_name}' " f"for query: '{query}'" ) else: result_text = ( f"Found {len(results)} results in collection " f"'{collection_name}':\n\n" ) for i, result in enumerate(results, 1): result_text += ( f"**Result {i}** " f"(Score: {result.get('score', 'N/A'):.3f})\n" ) result_text += f"{result['content']}\n" if include_metadata and result.get('metadata'): result_text += f"Metadata: {result['metadata']}\n" result_text += "\n" return { "content": [ {"type": "text", "text": result_text} ] } except Exception as e: logger.error(f"Error querying collection: {e}") return { "isError": True, "content": [ {"type": "text", "text": f"Failed to query collection: {str(e)}"} ] } async def handle_delete_collection( self, arguments: Dict[str, Any] ) -> Dict[str, Any]: """Handle delete_collection tool call.""" try: collection_name = arguments.get("collection_name") confirm = arguments.get("confirm", False) if not collection_name: return { "isError": True, "content": [ {"type": "text", "text": "Collection name is required"} ] } if not confirm: return { "isError": True, "content": [ { "type": "text", "text": ( "Confirmation required: set 'confirm' to true " "to delete the collection" ) } ] } # Use the GenericMemoryService to delete the collection await self.memory_manager.generic_service.delete_collection( collection_name ) return { "content": [ { "type": "text", "text": ( f"Successfully deleted collection '{collection_name}' " f"and all its contents" ) } ] } except Exception as e: logger.error(f"Error deleting collection: {e}") return { "isError": True, "content": [ {"type": "text", "text": f"Failed to delete collection: {str(e)}"} ] } async def handle_get_collection_stats( self, arguments: Dict[str, Any] ) -> Dict[str, Any]: """Handle get_collection_stats tool call.""" try: collection_name = arguments.get("collection_name") if not collection_name: return { "isError": True, "content": [ {"type": "text", "text": "Collection name is required"} ] } # Use the GenericMemoryService to get collection stats stats = await self.memory_manager.generic_service.get_collection_stats( collection_name ) result_text = ( f"**Statistics for Collection '{collection_name}'**\n\n" ) result_text += f"Document Count: {stats['document_count']}\n" result_text += f"Total Size: {stats.get('total_size', 'Unknown')}\n" result_text += f"Last Updated: {stats.get('last_updated', 'Unknown')}\n" if stats.get('metadata'): result_text += f"Metadata: {stats['metadata']}\n" return { "content": [ {"type": "text", "text": result_text} ] } except Exception as e: logger.error(f"Error getting collection stats: {e}") return { "isError": True, "content": [ {"type": "text", "text": f"Failed to get collection stats: {str(e)}"} ] }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hannesnortje/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server