mcp_health_check.pyā¢5.24 kB
#!/usr/bin/env python
"""
Nikola TEST MCP MCP Server Health Check Script
This script properly connects to the Nikola TEST MCP MCP server and checks its health by:
1. Establishing a session
2. Requesting server info
3. Listing available tools
"""
import sys
import json
import requests
import argparse
from typing import Dict, Any
import uuid
def create_mcp_session(base_url: str) -> Dict[str, Any]:
"""Create an MCP session and return session info"""
# Generate a session ID
session_id = str(uuid.uuid4())
# MCP requires specific headers for streamable-http
headers = {
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
"X-Session-ID": session_id
}
return {"session_id": session_id, "headers": headers, "base_url": base_url}
def send_mcp_request(session: Dict[str, Any], method: str, params: Dict = None) -> Dict[str, Any]:
"""Send an MCP JSON-RPC request"""
request_data = {
"jsonrpc": "2.0",
"method": method,
"params": params or {},
"id": str(uuid.uuid4())
}
try:
response = requests.post(
f"{session['base_url']}/mcp/",
json=request_data,
headers=session['headers'],
timeout=5
)
# Handle both JSON and SSE responses
if response.headers.get('content-type', '').startswith('text/event-stream'):
# For SSE, we'd need to parse the event stream
return {"status": "ok", "message": "Server returned SSE stream"}
else:
return response.json()
except requests.exceptions.RequestException as e:
return {"error": str(e)}
def check_mcp_server_health(url: str) -> bool:
"""Check if Nikola TEST MCP MCP server is healthy"""
print(f"š Checking Nikola TEST MCP MCP server health at {url}")
# Create session
session = create_mcp_session(url)
print(f"š Created session: {session['session_id']}")
# Try to get server info
print("\n1ļøā£ Testing server.info method...")
result = send_mcp_request(session, "server.info")
if "error" in result and "session" not in str(result.get("error", "")):
print(f"ā Server info failed: {result}")
return False
else:
print(f"ā
Server responded: {json.dumps(result, indent=2)[:200]}...")
# Try to list tools
print("\n2ļøā£ Testing tools/list method...")
result = send_mcp_request(session, "tools/list")
if "error" in result and "session" not in str(result.get("error", "")):
print(f"ā List tools failed: {result}")
return False
else:
print(f"ā
Tools list responded: {json.dumps(result, indent=2)[:200]}...")
# Check if we have the expected tools
if "result" in result and "tools" in result["result"]:
tools = result["result"]["tools"]
tool_names = [tool.get("name", "") for tool in tools]
print(f"š Available tools: {', '.join(tool_names)}")
# Check for expected tools
expected_tools = ["example_tool", "get_api_info"]
missing_tools = [tool for tool in expected_tools if tool not in tool_names]
if missing_tools:
print(f"ā ļø Missing expected tools: {', '.join(missing_tools)}")
else:
print("ā
All expected Nikola TEST MCP tools are available!")
# Alternative: Try connecting with CrewAI adapter
print("\n3ļøā£ Testing CrewAI adapter connection...")
try:
from crewai_tools import MCPServerAdapter
server_params = {
"url": f"{url}/mcp/",
"transport": "streamable-http"
}
with MCPServerAdapter(server_params) as mcp_tools:
tools = list(mcp_tools)
print(f"ā
CrewAI connected successfully! Found {len(tools)} tools")
# Print first few tools
for i, tool in enumerate(tools[:3]):
print(f" - {tool.name}")
if len(tools) > 3:
print(f" ... and {len(tools) - 3} more")
return True
except Exception as e:
print(f"ā ļø CrewAI adapter test failed: {e}")
# This might fail but server could still be healthy
return True
def main():
parser = argparse.ArgumentParser(description="Check Nikola TEST MCP MCP Server Health")
parser.add_argument("--url", default="http://localhost:8000",
help="Nikola TEST MCP MCP server URL (default: http://localhost:8000)")
args = parser.parse_args()
print(f"š Nikola TEST MCP MCP Server Health Check")
print(f"š Server URL: {args.url}")
print(f"š° Expected tools: example_tool, get_api_info")
print("="*50)
if check_mcp_server_health(args.url):
print("\nā
Nikola TEST MCP MCP Server is healthy and responding!")
return 0
else:
print("\nā Nikola TEST MCP MCP Server health check failed!")
return 1
if __name__ == "__main__":
sys.exit(main())