Skip to main content
Glama
check_server_health.py — 13.3 kB
#!/usr/bin/env python3 # Copyright 2024 Heinrich Krupp # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Health check utility for MCP Memory Service in Claude Desktop. This script sends MCP protocol requests to check the health of the memory service. """ import os import sys import json import asyncio import logging import argparse import subprocess from datetime import datetime # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # MCP protocol messages CHECK_HEALTH_REQUEST = { "method": "tools/call", "params": { "name": "check_database_health", "arguments": {} }, "jsonrpc": "2.0", "id": 1 } CHECK_MODEL_REQUEST = { "method": "tools/call", "params": { "name": "check_embedding_model", "arguments": {} }, "jsonrpc": "2.0", "id": 2 } STORE_MEMORY_REQUEST = { "method": "tools/call", "params": { "name": "store_memory", "arguments": { "content": f"Health check test memory created on {datetime.now().isoformat()}", "metadata": { "tags": ["health-check", "test"], "type": "test" } } }, "jsonrpc": "2.0", "id": 3 } RETRIEVE_MEMORY_REQUEST = { "method": "tools/call", "params": { "name": "retrieve_memory", "arguments": { "query": "health check test", "n_results": 3 } }, "jsonrpc": "2.0", "id": 4 } SEARCH_TAG_REQUEST = { "method": "tools/call", "params": { "name": "search_by_tag", "arguments": { "tags": ["health-check"] } }, "jsonrpc": "2.0", "id": 5 } async def write_json(writer, data): 
"""Write JSON data to the writer.""" message = json.dumps(data) + '\r\n' writer.write(message.encode()) await writer.drain() async def read_json(reader): """Read JSON data from the reader.""" line = await reader.readline() if not line: return None return json.loads(line.decode()) def parse_mcp_response( response: dict | None, operation_name: str, success_patterns: list[str] | None = None, failure_patterns: list[str] | None = None, log_response: bool = True ) -> bool: """ Parse MCP protocol response and check for success/failure patterns. Args: response: MCP response dictionary operation_name: Name of operation for logging success_patterns: Keywords indicating success (empty list means no explicit success check) failure_patterns: Keywords indicating known failures (warnings, not errors) log_response: Whether to log the full response text Returns: True if operation succeeded, False otherwise """ if not response or 'result' not in response: logger.error(f"❌ Invalid response: {response}") return False try: text = response['result']['content'][0]['text'] if log_response: logger.info(f"{operation_name.capitalize()} response: {text}") else: logger.info(f"{operation_name.capitalize()} response received") # Check for failure patterns first (warnings, not errors) if failure_patterns: for pattern in failure_patterns: if pattern in text.lower(): logger.warning(f"⚠️ No results found via {operation_name}") return False # Check for success patterns if success_patterns: for pattern in success_patterns: if pattern in text.lower(): logger.info(f"✅ {operation_name.capitalize()} successful") return True # If we have success patterns but none matched, it's a failure logger.error(f"❌ {operation_name.capitalize()} failed: {text}") return False # No explicit patterns - assume success if we got here logger.info(f"✅ {operation_name.capitalize()} successful") logger.info(f"Response: {text}") return True except Exception as e: logger.error(f"❌ Error parsing {operation_name} response: {e}") 
return False async def check_health(reader, writer): """Check database health.""" logger.info("=== Check 1: Database Health ===") await write_json(writer, CHECK_HEALTH_REQUEST) response = await read_json(reader) if response and 'result' in response: try: text = response['result']['content'][0]['text'] logger.info(f"Health check response received") data = json.loads(text.split('\n', 1)[1]) # Extract relevant information validation_status = data.get('validation', {}).get('status', 'unknown') has_model = data.get('statistics', {}).get('has_embedding_model', False) memory_count = data.get('statistics', {}).get('total_memories', 0) if validation_status == 'healthy': logger.info(f"✅ Database validation status: {validation_status}") else: logger.error(f"❌ Database validation status: {validation_status}") if has_model: logger.info(f"✅ Embedding model loaded: {has_model}") else: logger.error(f"❌ Embedding model not loaded") logger.info(f"Total memories: {memory_count}") return data except Exception as e: logger.error(f"❌ Error parsing health check response: {e}") else: logger.error(f"❌ Invalid response: {response}") return None async def check_embedding_model(reader, writer): """Check embedding model status.""" logger.info("=== Check 2: Embedding Model ===") await write_json(writer, CHECK_MODEL_REQUEST) response = await read_json(reader) if response and 'result' in response: try: text = response['result']['content'][0]['text'] logger.info(f"Model check response received") data = json.loads(text.split('\n', 1)[1]) status = data.get('status', 'unknown') if status == 'healthy': logger.info(f"✅ Embedding model status: {status}") logger.info(f"Model name: {data.get('model_name', 'unknown')}") logger.info(f"Dimension: {data.get('embedding_dimension', 0)}") else: logger.error(f"❌ Embedding model status: {status}") logger.error(f"Error: {data.get('error', 'unknown')}") return data except Exception as e: logger.error(f"❌ Error parsing model check response: {e}") else: 
logger.error(f"❌ Invalid response: {response}") return None async def store_memory(reader, writer): """Store a test memory.""" logger.info("=== Check 3: Memory Storage ===") await write_json(writer, STORE_MEMORY_REQUEST) response = await read_json(reader) return parse_mcp_response(response, "memory storage", success_patterns=["successfully"]) async def retrieve_memory(reader, writer): """Retrieve memories using semantic search.""" logger.info("=== Check 4: Semantic Search ===") await write_json(writer, RETRIEVE_MEMORY_REQUEST) response = await read_json(reader) return parse_mcp_response( response, "semantic search", failure_patterns=["no matching memories"], log_response=False ) async def search_by_tag(reader, writer): """Search memories by tag.""" logger.info("=== Check 5: Tag Search ===") await write_json(writer, SEARCH_TAG_REQUEST) response = await read_json(reader) return parse_mcp_response( response, "tag search", failure_patterns=["no memories found"], log_response=False ) async def run_health_check(): """Run all health checks.""" # Start the server server_process = subprocess.Popen( ['/bin/bash', '/Users/hkr/Documents/GitHub/mcp-memory-service/run_mcp_memory.sh'], cwd='/Users/hkr/Documents/GitHub/mcp-memory-service', env={ 'MCP_MEMORY_STORAGE_BACKEND': 'sqlite_vec', 'MCP_MEMORY_SQLITE_PATH': os.path.expanduser('~/Library/Application Support/mcp-memory/sqlite_vec.db'), 'MCP_MEMORY_BACKUPS_PATH': os.path.expanduser('~/Library/Application Support/mcp-memory/backups'), 'MCP_MEMORY_USE_ONNX': '1', 'MCP_MEMORY_USE_HOMEBREW_PYTORCH': '1', 'PYTHONPATH': '/Users/hkr/Documents/GitHub/mcp-memory-service', 'LOG_LEVEL': 'INFO' }, stdout=subprocess.PIPE, stderr=subprocess.PIPE, start_new_session=True ) logger.info("Server started, waiting for initialization...") await asyncio.sleep(5) # Wait for server to start reader = None writer = None success = False try: # Connect to the server reader, writer = await asyncio.open_connection('127.0.0.1', 6789) logger.info("Connected 
to server") # Initialize the server await write_json(writer, { "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": { "name": "health-check", "version": "1.0.0" } }, "jsonrpc": "2.0", "id": 0 }) init_response = await read_json(reader) logger.info(f"Initialization response: {init_response is not None}") # Run health checks health_data = await check_health(reader, writer) model_data = await check_embedding_model(reader, writer) store_success = await store_memory(reader, writer) search_success = await search_by_tag(reader, writer) retrieve_success = await retrieve_memory(reader, writer) # Summarize results logger.info("=== Health Check Summary ===") if health_data and health_data.get('validation', {}).get('status') == 'healthy': logger.info("✅ Database health: GOOD") else: logger.error("❌ Database health: FAILED") success = False if model_data and model_data.get('status') == 'healthy': logger.info("✅ Embedding model: GOOD") else: logger.error("❌ Embedding model: FAILED") success = False if store_success: logger.info("✅ Memory storage: GOOD") else: logger.error("❌ Memory storage: FAILED") success = False if search_success: logger.info("✅ Tag search: GOOD") else: logger.warning("⚠️ Tag search: NO RESULTS") if retrieve_success: logger.info("✅ Semantic search: GOOD") else: logger.warning("⚠️ Semantic search: NO RESULTS") success = ( health_data and health_data.get('validation', {}).get('status') == 'healthy' and model_data and model_data.get('status') == 'healthy' and store_success ) # Shutdown server await write_json(writer, { "method": "shutdown", "params": {}, "jsonrpc": "2.0", "id": 99 }) shutdown_response = await read_json(reader) logger.info(f"Shutdown response: {shutdown_response is not None}") except Exception as e: logger.error(f"Error during health check: {e}") success = False finally: # Clean up if writer: writer.close() await writer.wait_closed() # Terminate server try: os.killpg(os.getpgid(server_process.pid), 
15) except: pass server_process.terminate() try: server_process.wait(timeout=5) except: server_process.kill() return success def main(): """Main entry point.""" parser = argparse.ArgumentParser(description='MCP Memory Service Health Check') args = parser.parse_args() logger.info("Starting health check...") success = asyncio.run(run_health_check()) if success: logger.info("Health check completed successfully!") sys.exit(0) else: logger.error("Health check failed!") sys.exit(1) if __name__ == '__main__': main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.