Skip to main content
Glama

MCP Memory Service

#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Health check utility for MCP Memory Service in Claude Desktop.
This script sends MCP protocol requests to check the health of the memory service.
"""

import os
import sys
import json
import signal
import asyncio
import logging
import argparse
import subprocess
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# MCP protocol messages
CHECK_HEALTH_REQUEST = {
    "method": "tools/call",
    "params": {
        "name": "check_database_health",
        "arguments": {}
    },
    "jsonrpc": "2.0",
    "id": 1
}

CHECK_MODEL_REQUEST = {
    "method": "tools/call",
    "params": {
        "name": "check_embedding_model",
        "arguments": {}
    },
    "jsonrpc": "2.0",
    "id": 2
}

# NOTE: the timestamp is computed once, when the module is imported.
STORE_MEMORY_REQUEST = {
    "method": "tools/call",
    "params": {
        "name": "store_memory",
        "arguments": {
            "content": f"Health check test memory created on {datetime.now().isoformat()}",
            "metadata": {
                "tags": ["health-check", "test"],
                "type": "test"
            }
        }
    },
    "jsonrpc": "2.0",
    "id": 3
}

RETRIEVE_MEMORY_REQUEST = {
    "method": "tools/call",
    "params": {
        "name": "retrieve_memory",
        "arguments": {
            "query": "health check test",
            "n_results": 3
        }
    },
    "jsonrpc": "2.0",
    "id": 4
}

SEARCH_TAG_REQUEST = {
    "method": "tools/call",
    "params": {
        "name": "search_by_tag",
        "arguments": {
            "tags": ["health-check"]
        }
    },
    "jsonrpc": "2.0",
    "id": 5
}


async def write_json(writer, data):
    """Serialize *data* as one JSON line (terminated by CRLF) and flush it to *writer*."""
    message = json.dumps(data) + '\r\n'
    writer.write(message.encode())
    await writer.drain()


async def read_json(reader):
    """Read one line from *reader* and decode it as JSON.

    Returns None when the reader yields an empty line (end of stream).
    Raises json.JSONDecodeError if the line is not valid JSON.
    """
    line = await reader.readline()
    if not line:
        return None
    return json.loads(line.decode())


def _response_text(response):
    """Return the text of the first content item of a tools/call response."""
    return response['result']['content'][0]['text']


def _parse_json_payload(text):
    """Decode the JSON document carried in a tool's text reply.

    The reply is expected to be a one-line header followed by JSON. If that
    layout does not parse, the whole text is tried as JSON instead, so a reply
    without a header line is still accepted. When neither parses, the error
    from the header-based attempt is raised.
    """
    try:
        return json.loads(text.split('\n', 1)[1])
    except (IndexError, json.JSONDecodeError) as header_error:
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            raise header_error from None


async def check_health(reader, writer):
    """Check database health.

    Returns the decoded health payload (a dict) or None when the response is
    missing, malformed or cannot be parsed.
    """
    logger.info("=== Check 1: Database Health ===")
    await write_json(writer, CHECK_HEALTH_REQUEST)
    response = await read_json(reader)

    if not response or 'result' not in response:
        logger.error(f"❌ Invalid response: {response}")
        return None

    try:
        text = _response_text(response)
        logger.info("Health check response received")
        data = _parse_json_payload(text)

        # Extract relevant information
        statistics = data.get('statistics', {})
        validation_status = data.get('validation', {}).get('status', 'unknown')
        has_model = statistics.get('has_embedding_model', False)
        memory_count = statistics.get('total_memories', 0)

        if validation_status == 'healthy':
            logger.info(f"✅ Database validation status: {validation_status}")
        else:
            logger.error(f"❌ Database validation status: {validation_status}")

        if has_model:
            logger.info(f"✅ Embedding model loaded: {has_model}")
        else:
            logger.error("❌ Embedding model not loaded")

        logger.info(f"Total memories: {memory_count}")
        return data
    except Exception as e:
        logger.error(f"❌ Error parsing health check response: {e}")
    return None


async def check_embedding_model(reader, writer):
    """Check embedding model status.

    Returns the decoded model payload (a dict, whatever its status) or None
    when the response is missing, malformed or cannot be parsed.
    """
    logger.info("=== Check 2: Embedding Model ===")
    await write_json(writer, CHECK_MODEL_REQUEST)
    response = await read_json(reader)

    if not response or 'result' not in response:
        logger.error(f"❌ Invalid response: {response}")
        return None

    try:
        text = _response_text(response)
        logger.info("Model check response received")
        data = _parse_json_payload(text)

        status = data.get('status', 'unknown')
        if status == 'healthy':
            logger.info(f"✅ Embedding model status: {status}")
            logger.info(f"Model name: {data.get('model_name', 'unknown')}")
            logger.info(f"Dimension: {data.get('embedding_dimension', 0)}")
        else:
            logger.error(f"❌ Embedding model status: {status}")
            logger.error(f"Error: {data.get('error', 'unknown')}")

        return data
    except Exception as e:
        logger.error(f"❌ Error parsing model check response: {e}")
    return None


async def store_memory(reader, writer):
    """Store a test memory.

    Returns True when the reply text contains "successfully", else False.
    """
    logger.info("=== Check 3: Memory Storage ===")
    await write_json(writer, STORE_MEMORY_REQUEST)
    response = await read_json(reader)

    if not response or 'result' not in response:
        logger.error(f"❌ Invalid response: {response}")
        return False

    try:
        text = _response_text(response)
        logger.info(f"Store memory response: {text}")

        if "successfully" in text.lower():
            logger.info("✅ Memory stored successfully")
            return True
        logger.error(f"❌ Memory storage failed: {text}")
        return False
    except Exception as e:
        logger.error(f"❌ Error parsing store memory response: {e}")
    return False


async def retrieve_memory(reader, writer):
    """Retrieve memories using semantic search.

    Returns False when the reply says no matching memories were found (or the
    response is unusable), True otherwise.
    """
    logger.info("=== Check 4: Semantic Search ===")
    await write_json(writer, RETRIEVE_MEMORY_REQUEST)
    response = await read_json(reader)

    if not response or 'result' not in response:
        logger.error(f"❌ Invalid response: {response}")
        return False

    try:
        text = _response_text(response)
        logger.info("Retrieve memory response received")

        if "no matching memories" in text.lower():
            logger.warning("⚠️ No memories found via semantic search")
            return False
        logger.info("✅ Semantic search successful")
        logger.info(f"Response: {text}")
        return True
    except Exception as e:
        logger.error(f"❌ Error parsing retrieve memory response: {e}")
    return False


async def search_by_tag(reader, writer):
    """Search memories by tag.

    Returns False when the reply says no memories were found (or the response
    is unusable), True otherwise.
    """
    logger.info("=== Check 5: Tag Search ===")
    await write_json(writer, SEARCH_TAG_REQUEST)
    response = await read_json(reader)

    if not response or 'result' not in response:
        logger.error(f"❌ Invalid response: {response}")
        return False

    try:
        text = _response_text(response)
        logger.info("Tag search response received")

        if "no memories found" in text.lower():
            logger.warning("⚠️ No memories found via tag search")
            return False
        logger.info("✅ Tag search successful")
        logger.info(f"Response: {text}")
        return True
    except Exception as e:
        logger.error(f"❌ Error parsing tag search response: {e}")
    return False


def _stop_server(server_process):
    """Terminate the server process (and its process group) and reap it."""
    try:
        # The server was started in its own session, so signalling the group
        # also reaches anything the launcher script spawned.
        os.killpg(os.getpgid(server_process.pid), signal.SIGTERM)
    except OSError as e:
        # Best effort: the group may already be gone.
        logger.debug(f"Could not signal server process group: {e}")

    server_process.terminate()
    try:
        server_process.wait(timeout=5)
    except subprocess.TimeoutExpired:
        server_process.kill()
        server_process.wait()  # reap the killed process


async def run_health_check():
    """Run all health checks.

    Starts the server, connects to it on 127.0.0.1:6789, runs the five checks,
    logs a summary, asks the server to shut down and always terminates the
    server process afterwards.

    Returns True when database health, embedding model and memory storage all
    passed; tag search and semantic search only produce warnings.
    """
    # Start the server
    # NOTE(review): `env` replaces the inherited environment (PATH, HOME, ...
    # are not passed on) -- confirm this isolation is intended.
    server_process = subprocess.Popen(
        ['/bin/bash', '/Users/hkr/Documents/GitHub/mcp-memory-service/run_mcp_memory.sh'],
        cwd='/Users/hkr/Documents/GitHub/mcp-memory-service',
        env={
            'MCP_MEMORY_STORAGE_BACKEND': 'sqlite_vec',
            'MCP_MEMORY_SQLITE_PATH': os.path.expanduser('~/Library/Application Support/mcp-memory/sqlite_vec.db'),
            'MCP_MEMORY_BACKUPS_PATH': os.path.expanduser('~/Library/Application Support/mcp-memory/backups'),
            'MCP_MEMORY_USE_ONNX': '1',
            'MCP_MEMORY_USE_HOMEBREW_PYTORCH': '1',
            'PYTHONPATH': '/Users/hkr/Documents/GitHub/mcp-memory-service',
            'LOG_LEVEL': 'INFO'
        },
        # The output is never consumed; unread pipes would block the server
        # once the OS pipe buffer fills up, so discard it instead.
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=True
    )

    logger.info("Server started, waiting for initialization...")
    await asyncio.sleep(5)  # Wait for server to start

    reader = None
    writer = None
    success = False

    try:
        # Connect to the server
        reader, writer = await asyncio.open_connection('127.0.0.1', 6789)
        logger.info("Connected to server")

        # Initialize the server
        await write_json(writer, {
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {
                    "name": "health-check",
                    "version": "1.0.0"
                }
            },
            "jsonrpc": "2.0",
            "id": 0
        })

        init_response = await read_json(reader)
        logger.info(f"Initialization response: {init_response is not None}")

        # Run health checks (tag search runs before semantic search)
        health_data = await check_health(reader, writer)
        model_data = await check_embedding_model(reader, writer)
        store_success = await store_memory(reader, writer)
        search_success = await search_by_tag(reader, writer)
        retrieve_success = await retrieve_memory(reader, writer)

        health_ok = bool(health_data) and health_data.get('validation', {}).get('status') == 'healthy'
        model_ok = bool(model_data) and model_data.get('status') == 'healthy'

        # Summarize results
        logger.info("=== Health Check Summary ===")

        if health_ok:
            logger.info("✅ Database health: GOOD")
        else:
            logger.error("❌ Database health: FAILED")

        if model_ok:
            logger.info("✅ Embedding model: GOOD")
        else:
            logger.error("❌ Embedding model: FAILED")

        if store_success:
            logger.info("✅ Memory storage: GOOD")
        else:
            logger.error("❌ Memory storage: FAILED")

        if search_success:
            logger.info("✅ Tag search: GOOD")
        else:
            logger.warning("⚠️ Tag search: NO RESULTS")

        if retrieve_success:
            logger.info("✅ Semantic search: GOOD")
        else:
            logger.warning("⚠️ Semantic search: NO RESULTS")

        # Only the first three checks decide the overall result.
        success = bool(health_ok and model_ok and store_success)

        # Shutdown server
        await write_json(writer, {
            "method": "shutdown",
            "params": {},
            "jsonrpc": "2.0",
            "id": 99
        })

        shutdown_response = await read_json(reader)
        logger.info(f"Shutdown response: {shutdown_response is not None}")

    except Exception as e:
        logger.error(f"Error during health check: {e}")
        success = False

    finally:
        # Clean up
        if writer:
            writer.close()
            try:
                await writer.wait_closed()
            except Exception as e:
                # A failed close must not prevent the server from being stopped.
                logger.warning(f"Error while closing connection: {e}")

        # Terminate server
        _stop_server(server_process)

    return success


def main():
    """Main entry point: run the health check and exit with 0 on success, 1 on failure."""
    parser = argparse.ArgumentParser(description='MCP Memory Service Health Check')
    parser.parse_args()  # no options yet; still provides --help and rejects stray arguments

    logger.info("Starting health check...")
    success = asyncio.run(run_health_check())

    if success:
        logger.info("Health check completed successfully!")
        sys.exit(0)
    else:
        logger.error("Health check failed!")
        sys.exit(1)


if __name__ == '__main__':
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.