Skip to main content
Glama

MCP Learning Project

by BerdTan
testing.pyโ€ข14.2 kB
""" MCP Server Testing Framework This module provides comprehensive testing capabilities for MCP servers. It can test server connections, tool functionality, and performance. """ import asyncio import json import logging import time from typing import Dict, List, Optional, Any, Tuple from dataclasses import dataclass from datetime import datetime logger = logging.getLogger(__name__) @dataclass class TestResult: """Result of an MCP server test.""" test_name: str success: bool duration: float details: Dict[str, Any] error_message: Optional[str] = None def __post_init__(self): if not hasattr(self, 'details') or self.details is None: self.details = {} @dataclass class ServerTestSuite: """Complete test suite for an MCP server.""" server_id: str server_name: str tests_run: int tests_passed: int tests_failed: int total_duration: float results: List[TestResult] timestamp: datetime class MCPServerTester: """Comprehensive testing framework for MCP servers.""" def __init__(self): self.test_results: Dict[str, ServerTestSuite] = {} async def test_server_connection(self, host: str, port: int) -> TestResult: """Test basic connection to an MCP server.""" start_time = time.time() try: # Try to connect to the server reader, writer = await asyncio.wait_for( asyncio.open_connection(host, port), timeout=5.0 ) # Send initialization request init_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": { "name": "MCP Tester", "version": "1.0.0" } } } # Send request writer.write(json.dumps(init_request).encode()) await writer.drain() # Read response data = await asyncio.wait_for(reader.read(1024), timeout=5.0) response = json.loads(data.decode()) # Check if initialization was successful if "result" in response: duration = time.time() - start_time writer.close() await writer.wait_closed() return TestResult( test_name="connection_test", success=True, duration=duration, details={ "host": host, "port": port, "response": response } ) else: duration = time.time() - start_time writer.close() await writer.wait_closed() return TestResult( test_name="connection_test", success=False, duration=duration, error_message=f"Server returned error: {response.get('error', 'Unknown error')}", details={ "host": host, "port": port, "response": response } ) except asyncio.TimeoutError: duration = time.time() - start_time return TestResult( test_name="connection_test", success=False, duration=duration, error_message="Connection timeout", details={"host": host, "port": port} ) except Exception as e: duration = time.time() - start_time return TestResult( test_name="connection_test", success=False, duration=duration, error_message=str(e), details={"host": host, "port": port} ) async def test_server_tools(self, host: str, port: int) -> TestResult: """Test listing tools from an MCP server.""" start_time = time.time() try: # Connect to server reader, writer = await asyncio.wait_for( asyncio.open_connection(host, port), timeout=5.0 ) # Send tools/list request tools_request = { "jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {} } # Send request writer.write(json.dumps(tools_request).encode()) await writer.drain() # Read response data = await asyncio.wait_for(reader.read(1024), timeout=5.0) response = json.loads(data.decode()) # Check if tools listing was successful if "result" in response and "tools" in response["result"]: tools = response["result"]["tools"] duration = time.time() - start_time writer.close() await writer.wait_closed() return TestResult( test_name="tools_test", success=True, duration=duration, details={ "host": host, "port": port, "tools_count": len(tools), "tools": tools } ) else: duration = time.time() - start_time writer.close() await writer.wait_closed() return TestResult( test_name="tools_test", success=False, duration=duration, error_message=f"Failed to list tools: {response.get('error', 'Unknown error')}", details={ "host": host, "port": port, "response": response } ) except Exception as e: duration = time.time() - start_time return TestResult( test_name="tools_test", success=False, duration=duration, error_message=str(e), details={"host": host, "port": port} ) async def test_tool_execution(self, host: str, port: int, tool_name: str, arguments: Dict[str, Any] = None) -> TestResult: """Test execution of a specific tool.""" start_time = time.time() if arguments is None: arguments = {} # Ensure arguments is a dict if not isinstance(arguments, dict): arguments = {} try: # Connect to server reader, writer = await asyncio.wait_for( asyncio.open_connection(host, port), timeout=5.0 ) # Send tools/call request call_request = { "jsonrpc": "2.0", "id": 3, "method": "tools/call", "params": { "name": tool_name, "arguments": arguments } } # Send request writer.write(json.dumps(call_request).encode()) await writer.drain() # Read response data = await asyncio.wait_for(reader.read(1024), timeout=10.0) response = json.loads(data.decode()) # Check if tool execution was successful if "result" in response: duration = time.time() - start_time writer.close() await writer.wait_closed() return TestResult( test_name=f"tool_execution_{tool_name}", success=True, duration=duration, details={ "host": host, "port": port, "tool_name": tool_name, "arguments": arguments, "response": response } ) else: duration = time.time() - start_time writer.close() await writer.wait_closed() return TestResult( test_name=f"tool_execution_{tool_name}", success=False, duration=duration, error_message=f"Tool execution failed: {response.get('error', 'Unknown error')}", details={ "host": host, "port": port, "tool_name": tool_name, "arguments": arguments, "response": response } ) except Exception as e: duration = time.time() - start_time return TestResult( test_name=f"tool_execution_{tool_name}", success=False, duration=duration, error_message=str(e), details={ "host": host, "port": port, "tool_name": tool_name, "arguments": arguments } ) async def run_comprehensive_test(self, server_id: str, server_name: str, host: str, port: int) -> ServerTestSuite: """Run a comprehensive test suite on an MCP server.""" start_time = time.time() results = [] logger.info(f"Starting comprehensive test for server: {server_name} ({host}:{port})") # Test 1: Connection test connection_result = await self.test_server_connection(host, port) results.append(connection_result) # Test 2: Tools listing test tools_result = await self.test_server_tools(host, port) results.append(tools_result) # Test 3: Tool execution tests (if tools are available) if tools_result.success and "tools" in tools_result.details: tools = tools_result.details["tools"] for tool in tools[:3]: # Test first 3 tools tool_name = tool.get("name", "unknown") tool_result = await self.test_tool_execution(host, port, tool_name) results.append(tool_result) # Calculate summary total_duration = time.time() - start_time tests_passed = sum(1 for r in results if r.success) tests_failed = len(results) - tests_passed test_suite = ServerTestSuite( server_id=server_id, server_name=server_name, tests_run=len(results), tests_passed=tests_passed, tests_failed=tests_failed, total_duration=total_duration, results=results, timestamp=datetime.now() ) # Store results self.test_results[server_id] = test_suite logger.info(f"Test suite completed for {server_name}: {tests_passed}/{len(results)} tests passed") return test_suite async def get_test_results(self, server_id: str) -> Optional[ServerTestSuite]: """Get test results for a specific server.""" return self.test_results.get(server_id) async def get_all_test_results(self) -> List[ServerTestSuite]: """Get all test results.""" return list(self.test_results.values()) async def clear_test_results(self, server_id: Optional[str] = None): """Clear test results.""" if server_id: self.test_results.pop(server_id, None) else: self.test_results.clear() class PerformanceMonitor: """Monitor performance metrics for MCP servers.""" def __init__(self): self.metrics: Dict[str, List[Dict[str, Any]]] = {} async def record_metric(self, server_id: str, metric_name: str, value: float, timestamp: Optional[datetime] = None): """Record a performance metric.""" if server_id not in self.metrics: self.metrics[server_id] = [] if timestamp is None: timestamp = datetime.now() self.metrics[server_id].append({ "metric_name": metric_name, "value": value, "timestamp": timestamp }) async def get_server_metrics(self, server_id: str, metric_name: Optional[str] = None) -> List[Dict[str, Any]]: """Get metrics for a specific server.""" if server_id not in self.metrics: return [] metrics = self.metrics[server_id] if metric_name: return [m for m in metrics if m["metric_name"] == metric_name] return metrics async def get_average_metric(self, server_id: str, metric_name: str) -> Optional[float]: """Get average value for a specific metric.""" metrics = await self.get_server_metrics(server_id, metric_name) if not metrics: return None values = [m["value"] for m in metrics] return sum(values) / len(values)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BerdTan/mcpharness'

If you have feedback or need assistance with the MCP directory API, please join our Discord server