check_server_health.py•13.3 kB
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Health check utility for MCP Memory Service in Claude Desktop.
This script sends MCP protocol requests to check the health of the memory service.
"""
import os
import sys
import json
import asyncio
import logging
import argparse
import subprocess
from datetime import datetime
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# MCP protocol messages
CHECK_HEALTH_REQUEST = {
"method": "tools/call",
"params": {
"name": "check_database_health",
"arguments": {}
},
"jsonrpc": "2.0",
"id": 1
}
CHECK_MODEL_REQUEST = {
"method": "tools/call",
"params": {
"name": "check_embedding_model",
"arguments": {}
},
"jsonrpc": "2.0",
"id": 2
}
STORE_MEMORY_REQUEST = {
"method": "tools/call",
"params": {
"name": "store_memory",
"arguments": {
"content": f"Health check test memory created on {datetime.now().isoformat()}",
"metadata": {
"tags": ["health-check", "test"],
"type": "test"
}
}
},
"jsonrpc": "2.0",
"id": 3
}
RETRIEVE_MEMORY_REQUEST = {
"method": "tools/call",
"params": {
"name": "retrieve_memory",
"arguments": {
"query": "health check test",
"n_results": 3
}
},
"jsonrpc": "2.0",
"id": 4
}
SEARCH_TAG_REQUEST = {
"method": "tools/call",
"params": {
"name": "search_by_tag",
"arguments": {
"tags": ["health-check"]
}
},
"jsonrpc": "2.0",
"id": 5
}
async def write_json(writer, data):
"""Write JSON data to the writer."""
message = json.dumps(data) + '\r\n'
writer.write(message.encode())
await writer.drain()
async def read_json(reader):
"""Read JSON data from the reader."""
line = await reader.readline()
if not line:
return None
return json.loads(line.decode())
def parse_mcp_response(
response: dict | None,
operation_name: str,
success_patterns: list[str] | None = None,
failure_patterns: list[str] | None = None,
log_response: bool = True
) -> bool:
"""
Parse MCP protocol response and check for success/failure patterns.
Args:
response: MCP response dictionary
operation_name: Name of operation for logging
success_patterns: Keywords indicating success (empty list means no explicit success check)
failure_patterns: Keywords indicating known failures (warnings, not errors)
log_response: Whether to log the full response text
Returns:
True if operation succeeded, False otherwise
"""
if not response or 'result' not in response:
logger.error(f"❌ Invalid response: {response}")
return False
try:
text = response['result']['content'][0]['text']
if log_response:
logger.info(f"{operation_name.capitalize()} response: {text}")
else:
logger.info(f"{operation_name.capitalize()} response received")
# Check for failure patterns first (warnings, not errors)
if failure_patterns:
for pattern in failure_patterns:
if pattern in text.lower():
logger.warning(f"⚠️ No results found via {operation_name}")
return False
# Check for success patterns
if success_patterns:
for pattern in success_patterns:
if pattern in text.lower():
logger.info(f"✅ {operation_name.capitalize()} successful")
return True
# If we have success patterns but none matched, it's a failure
logger.error(f"❌ {operation_name.capitalize()} failed: {text}")
return False
# No explicit patterns - assume success if we got here
logger.info(f"✅ {operation_name.capitalize()} successful")
logger.info(f"Response: {text}")
return True
except Exception as e:
logger.error(f"❌ Error parsing {operation_name} response: {e}")
return False
async def check_health(reader, writer):
"""Check database health."""
logger.info("=== Check 1: Database Health ===")
await write_json(writer, CHECK_HEALTH_REQUEST)
response = await read_json(reader)
if response and 'result' in response:
try:
text = response['result']['content'][0]['text']
logger.info(f"Health check response received")
data = json.loads(text.split('\n', 1)[1])
# Extract relevant information
validation_status = data.get('validation', {}).get('status', 'unknown')
has_model = data.get('statistics', {}).get('has_embedding_model', False)
memory_count = data.get('statistics', {}).get('total_memories', 0)
if validation_status == 'healthy':
logger.info(f"✅ Database validation status: {validation_status}")
else:
logger.error(f"❌ Database validation status: {validation_status}")
if has_model:
logger.info(f"✅ Embedding model loaded: {has_model}")
else:
logger.error(f"❌ Embedding model not loaded")
logger.info(f"Total memories: {memory_count}")
return data
except Exception as e:
logger.error(f"❌ Error parsing health check response: {e}")
else:
logger.error(f"❌ Invalid response: {response}")
return None
async def check_embedding_model(reader, writer):
"""Check embedding model status."""
logger.info("=== Check 2: Embedding Model ===")
await write_json(writer, CHECK_MODEL_REQUEST)
response = await read_json(reader)
if response and 'result' in response:
try:
text = response['result']['content'][0]['text']
logger.info(f"Model check response received")
data = json.loads(text.split('\n', 1)[1])
status = data.get('status', 'unknown')
if status == 'healthy':
logger.info(f"✅ Embedding model status: {status}")
logger.info(f"Model name: {data.get('model_name', 'unknown')}")
logger.info(f"Dimension: {data.get('embedding_dimension', 0)}")
else:
logger.error(f"❌ Embedding model status: {status}")
logger.error(f"Error: {data.get('error', 'unknown')}")
return data
except Exception as e:
logger.error(f"❌ Error parsing model check response: {e}")
else:
logger.error(f"❌ Invalid response: {response}")
return None
async def store_memory(reader, writer):
"""Store a test memory."""
logger.info("=== Check 3: Memory Storage ===")
await write_json(writer, STORE_MEMORY_REQUEST)
response = await read_json(reader)
return parse_mcp_response(response, "memory storage", success_patterns=["successfully"])
async def retrieve_memory(reader, writer):
"""Retrieve memories using semantic search."""
logger.info("=== Check 4: Semantic Search ===")
await write_json(writer, RETRIEVE_MEMORY_REQUEST)
response = await read_json(reader)
return parse_mcp_response(
response,
"semantic search",
failure_patterns=["no matching memories"],
log_response=False
)
async def search_by_tag(reader, writer):
"""Search memories by tag."""
logger.info("=== Check 5: Tag Search ===")
await write_json(writer, SEARCH_TAG_REQUEST)
response = await read_json(reader)
return parse_mcp_response(
response,
"tag search",
failure_patterns=["no memories found"],
log_response=False
)
async def run_health_check():
"""Run all health checks."""
# Start the server
server_process = subprocess.Popen(
['/bin/bash', '/Users/hkr/Documents/GitHub/mcp-memory-service/run_mcp_memory.sh'],
cwd='/Users/hkr/Documents/GitHub/mcp-memory-service',
env={
'MCP_MEMORY_STORAGE_BACKEND': 'sqlite_vec',
'MCP_MEMORY_SQLITE_PATH': os.path.expanduser('~/Library/Application Support/mcp-memory/sqlite_vec.db'),
'MCP_MEMORY_BACKUPS_PATH': os.path.expanduser('~/Library/Application Support/mcp-memory/backups'),
'MCP_MEMORY_USE_ONNX': '1',
'MCP_MEMORY_USE_HOMEBREW_PYTORCH': '1',
'PYTHONPATH': '/Users/hkr/Documents/GitHub/mcp-memory-service',
'LOG_LEVEL': 'INFO'
},
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
start_new_session=True
)
logger.info("Server started, waiting for initialization...")
await asyncio.sleep(5) # Wait for server to start
reader = None
writer = None
success = False
try:
# Connect to the server
reader, writer = await asyncio.open_connection('127.0.0.1', 6789)
logger.info("Connected to server")
# Initialize the server
await write_json(writer, {
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {
"name": "health-check",
"version": "1.0.0"
}
},
"jsonrpc": "2.0",
"id": 0
})
init_response = await read_json(reader)
logger.info(f"Initialization response: {init_response is not None}")
# Run health checks
health_data = await check_health(reader, writer)
model_data = await check_embedding_model(reader, writer)
store_success = await store_memory(reader, writer)
search_success = await search_by_tag(reader, writer)
retrieve_success = await retrieve_memory(reader, writer)
# Summarize results
logger.info("=== Health Check Summary ===")
if health_data and health_data.get('validation', {}).get('status') == 'healthy':
logger.info("✅ Database health: GOOD")
else:
logger.error("❌ Database health: FAILED")
success = False
if model_data and model_data.get('status') == 'healthy':
logger.info("✅ Embedding model: GOOD")
else:
logger.error("❌ Embedding model: FAILED")
success = False
if store_success:
logger.info("✅ Memory storage: GOOD")
else:
logger.error("❌ Memory storage: FAILED")
success = False
if search_success:
logger.info("✅ Tag search: GOOD")
else:
logger.warning("⚠️ Tag search: NO RESULTS")
if retrieve_success:
logger.info("✅ Semantic search: GOOD")
else:
logger.warning("⚠️ Semantic search: NO RESULTS")
success = (
health_data and health_data.get('validation', {}).get('status') == 'healthy' and
model_data and model_data.get('status') == 'healthy' and
store_success
)
# Shutdown server
await write_json(writer, {
"method": "shutdown",
"params": {},
"jsonrpc": "2.0",
"id": 99
})
shutdown_response = await read_json(reader)
logger.info(f"Shutdown response: {shutdown_response is not None}")
except Exception as e:
logger.error(f"Error during health check: {e}")
success = False
finally:
# Clean up
if writer:
writer.close()
await writer.wait_closed()
# Terminate server
try:
os.killpg(os.getpgid(server_process.pid), 15)
except:
pass
server_process.terminate()
try:
server_process.wait(timeout=5)
except:
server_process.kill()
return success
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(description='MCP Memory Service Health Check')
args = parser.parse_args()
logger.info("Starting health check...")
success = asyncio.run(run_health_check())
if success:
logger.info("Health check completed successfully!")
sys.exit(0)
else:
logger.error("Health check failed!")
sys.exit(1)
if __name__ == '__main__':
main()