testing.pyโข14.2 kB
"""
MCP Server Testing Framework
This module provides comprehensive testing capabilities for MCP servers.
It can test server connections, tool functionality, and performance.
"""
import asyncio
import json
import logging
import time
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass
from datetime import datetime
logger = logging.getLogger(__name__)
@dataclass
class TestResult:
"""Result of an MCP server test."""
test_name: str
success: bool
duration: float
details: Dict[str, Any]
error_message: Optional[str] = None
def __post_init__(self):
if not hasattr(self, 'details') or self.details is None:
self.details = {}
@dataclass
class ServerTestSuite:
"""Complete test suite for an MCP server."""
server_id: str
server_name: str
tests_run: int
tests_passed: int
tests_failed: int
total_duration: float
results: List[TestResult]
timestamp: datetime
class MCPServerTester:
"""Comprehensive testing framework for MCP servers."""
def __init__(self):
self.test_results: Dict[str, ServerTestSuite] = {}
async def test_server_connection(self, host: str, port: int) -> TestResult:
"""Test basic connection to an MCP server."""
start_time = time.time()
try:
# Try to connect to the server
reader, writer = await asyncio.wait_for(
asyncio.open_connection(host, port),
timeout=5.0
)
# Send initialization request
init_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {
"name": "MCP Tester",
"version": "1.0.0"
}
}
}
# Send request
writer.write(json.dumps(init_request).encode())
await writer.drain()
# Read response
data = await asyncio.wait_for(reader.read(1024), timeout=5.0)
response = json.loads(data.decode())
# Check if initialization was successful
if "result" in response:
duration = time.time() - start_time
writer.close()
await writer.wait_closed()
return TestResult(
test_name="connection_test",
success=True,
duration=duration,
details={
"host": host,
"port": port,
"response": response
}
)
else:
duration = time.time() - start_time
writer.close()
await writer.wait_closed()
return TestResult(
test_name="connection_test",
success=False,
duration=duration,
error_message=f"Server returned error: {response.get('error', 'Unknown error')}",
details={
"host": host,
"port": port,
"response": response
}
)
except asyncio.TimeoutError:
duration = time.time() - start_time
return TestResult(
test_name="connection_test",
success=False,
duration=duration,
error_message="Connection timeout",
details={"host": host, "port": port}
)
except Exception as e:
duration = time.time() - start_time
return TestResult(
test_name="connection_test",
success=False,
duration=duration,
error_message=str(e),
details={"host": host, "port": port}
)
async def test_server_tools(self, host: str, port: int) -> TestResult:
"""Test listing tools from an MCP server."""
start_time = time.time()
try:
# Connect to server
reader, writer = await asyncio.wait_for(
asyncio.open_connection(host, port),
timeout=5.0
)
# Send tools/list request
tools_request = {
"jsonrpc": "2.0",
"id": 2,
"method": "tools/list",
"params": {}
}
# Send request
writer.write(json.dumps(tools_request).encode())
await writer.drain()
# Read response
data = await asyncio.wait_for(reader.read(1024), timeout=5.0)
response = json.loads(data.decode())
# Check if tools listing was successful
if "result" in response and "tools" in response["result"]:
tools = response["result"]["tools"]
duration = time.time() - start_time
writer.close()
await writer.wait_closed()
return TestResult(
test_name="tools_test",
success=True,
duration=duration,
details={
"host": host,
"port": port,
"tools_count": len(tools),
"tools": tools
}
)
else:
duration = time.time() - start_time
writer.close()
await writer.wait_closed()
return TestResult(
test_name="tools_test",
success=False,
duration=duration,
error_message=f"Failed to list tools: {response.get('error', 'Unknown error')}",
details={
"host": host,
"port": port,
"response": response
}
)
except Exception as e:
duration = time.time() - start_time
return TestResult(
test_name="tools_test",
success=False,
duration=duration,
error_message=str(e),
details={"host": host, "port": port}
)
async def test_tool_execution(self, host: str, port: int, tool_name: str, arguments: Dict[str, Any] = None) -> TestResult:
"""Test execution of a specific tool."""
start_time = time.time()
if arguments is None:
arguments = {}
# Ensure arguments is a dict
if not isinstance(arguments, dict):
arguments = {}
try:
# Connect to server
reader, writer = await asyncio.wait_for(
asyncio.open_connection(host, port),
timeout=5.0
)
# Send tools/call request
call_request = {
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {
"name": tool_name,
"arguments": arguments
}
}
# Send request
writer.write(json.dumps(call_request).encode())
await writer.drain()
# Read response
data = await asyncio.wait_for(reader.read(1024), timeout=10.0)
response = json.loads(data.decode())
# Check if tool execution was successful
if "result" in response:
duration = time.time() - start_time
writer.close()
await writer.wait_closed()
return TestResult(
test_name=f"tool_execution_{tool_name}",
success=True,
duration=duration,
details={
"host": host,
"port": port,
"tool_name": tool_name,
"arguments": arguments,
"response": response
}
)
else:
duration = time.time() - start_time
writer.close()
await writer.wait_closed()
return TestResult(
test_name=f"tool_execution_{tool_name}",
success=False,
duration=duration,
error_message=f"Tool execution failed: {response.get('error', 'Unknown error')}",
details={
"host": host,
"port": port,
"tool_name": tool_name,
"arguments": arguments,
"response": response
}
)
except Exception as e:
duration = time.time() - start_time
return TestResult(
test_name=f"tool_execution_{tool_name}",
success=False,
duration=duration,
error_message=str(e),
details={
"host": host,
"port": port,
"tool_name": tool_name,
"arguments": arguments
}
)
async def run_comprehensive_test(self, server_id: str, server_name: str, host: str, port: int) -> ServerTestSuite:
"""Run a comprehensive test suite on an MCP server."""
start_time = time.time()
results = []
logger.info(f"Starting comprehensive test for server: {server_name} ({host}:{port})")
# Test 1: Connection test
connection_result = await self.test_server_connection(host, port)
results.append(connection_result)
# Test 2: Tools listing test
tools_result = await self.test_server_tools(host, port)
results.append(tools_result)
# Test 3: Tool execution tests (if tools are available)
if tools_result.success and "tools" in tools_result.details:
tools = tools_result.details["tools"]
for tool in tools[:3]: # Test first 3 tools
tool_name = tool.get("name", "unknown")
tool_result = await self.test_tool_execution(host, port, tool_name)
results.append(tool_result)
# Calculate summary
total_duration = time.time() - start_time
tests_passed = sum(1 for r in results if r.success)
tests_failed = len(results) - tests_passed
test_suite = ServerTestSuite(
server_id=server_id,
server_name=server_name,
tests_run=len(results),
tests_passed=tests_passed,
tests_failed=tests_failed,
total_duration=total_duration,
results=results,
timestamp=datetime.now()
)
# Store results
self.test_results[server_id] = test_suite
logger.info(f"Test suite completed for {server_name}: {tests_passed}/{len(results)} tests passed")
return test_suite
async def get_test_results(self, server_id: str) -> Optional[ServerTestSuite]:
"""Get test results for a specific server."""
return self.test_results.get(server_id)
async def get_all_test_results(self) -> List[ServerTestSuite]:
"""Get all test results."""
return list(self.test_results.values())
async def clear_test_results(self, server_id: Optional[str] = None):
"""Clear test results."""
if server_id:
self.test_results.pop(server_id, None)
else:
self.test_results.clear()
class PerformanceMonitor:
"""Monitor performance metrics for MCP servers."""
def __init__(self):
self.metrics: Dict[str, List[Dict[str, Any]]] = {}
async def record_metric(self, server_id: str, metric_name: str, value: float, timestamp: Optional[datetime] = None):
"""Record a performance metric."""
if server_id not in self.metrics:
self.metrics[server_id] = []
if timestamp is None:
timestamp = datetime.now()
self.metrics[server_id].append({
"metric_name": metric_name,
"value": value,
"timestamp": timestamp
})
async def get_server_metrics(self, server_id: str, metric_name: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get metrics for a specific server."""
if server_id not in self.metrics:
return []
metrics = self.metrics[server_id]
if metric_name:
return [m for m in metrics if m["metric_name"] == metric_name]
return metrics
async def get_average_metric(self, server_id: str, metric_name: str) -> Optional[float]:
"""Get average value for a specific metric."""
metrics = await self.get_server_metrics(server_id, metric_name)
if not metrics:
return None
values = [m["value"] for m in metrics]
return sum(values) / len(values)