Skip to main content
Glama
test_mcp_simulation.pyβ€’47.5 kB
#!/usr/bin/env python3 """ MCP Simulation Test - Tier 3 Test Suite Tests MCP tool interface, server simulation, and real-world query scenarios. Ensures the MCP layer works correctly as clients would experience it. """ import os import sys import time import json from pathlib import Path from typing import Dict, Any, Tuple, List, Optional from concurrent.futures import ThreadPoolExecutor import importlib.util # Add src to path sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src")) from tests.support.api_client import DuneTestClient, TestQueryManager from tests.support.helpers import PerformanceTimer, TestResultCollector, MCPToolSimulator, TestEnvironment from tests.support import QueryFactory, ExpectedResults, TestDataGenerator def load_env_variables(): """Load environment variables from .env file.""" env_file = Path(__file__).parent.parent.parent / ".env" if env_file.exists(): with open(env_file) as f: for line in f: line = line.strip() if line and not line.startswith("#") and "=" in line: key, value = line.split("=", 1) os.environ[key] = value def test_mcp_server_simulation() -> Tuple[bool, Dict[str, Any]]: """Simulate MCP server startup and tool discovery.""" print("πŸ–₯️ Testing MCP Server Simulation...") timer = PerformanceTimer() try: timer.start() # Load the MCP server components sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src")) # Import configuration and tool classes from spice_mcp.config import Config, DuneConfig from spice_mcp.mcp.tools.execute_query import ExecuteQueryTool from spice_mcp.service_layer.query_service import QueryService from spice_mcp.logging.query_history import QueryHistory print(" βœ“ Successfully imported MCP components") # Simulate server configuration api_key = os.getenv("DUNE_API_KEY") if not api_key: return False, {"error": "DUNE_API_KEY required for MCP simulation"} print(f" βœ“ API key configured: {api_key[:8]}...") # Initialize server configuration config = Config(dune=DuneConfig(api_key=api_key)) timer.checkpoint("config_initialized") # Initialize services query_service = QueryService(config) timer.checkpoint("query_service_initialized") # Initialize query history from pathlib import Path as PathObj history_path = PathObj("/tmp") / "mcp_simulation_history.jsonl" artifact_root = PathObj("/tmp") / "mcp_simulation_artifacts" query_history = QueryHistory(history_path, artifact_root) timer.checkpoint("query_history_initialized") # Initialize tools execute_tool = ExecuteQueryTool(config, query_service, query_history) timer.checkpoint("tools_initialized") # Simulate tool discovery tools = { "execute_query": execute_tool } print(f" βœ“ Initialized {len(tools)} tools") # Validate tool schemas schema_validations = {} for tool_name, tool_instance in tools.items(): schema_validation = MCPToolSimulator.validate_tool_schema(tool_instance) schema_validations[tool_name] = schema_validation if schema_validation['valid']: print(f" βœ“ {tool_name} schema valid") else: print(f" βœ— {tool_name} schema invalid: {schema_validation['errors']}") timer.checkpoint("schemas_validated") # Check server health server_health = { "components_loaded": True, "tools_available": len(tools), "schemas_valid": all(v['valid'] for v in schema_validations.values()), "api_configured": bool(config.dune.api_key), "services_initialized": len([query_service, query_history]) } health_score = sum([ server_health["components_loaded"], server_health["tools_available"] > 0, server_health["schemas_valid"], server_health["api_configured"], server_health["services_initialized"] >= 2 ]) / 5 timer.stop() details = { "server_health": server_health, "health_score": health_score, "schema_validations": schema_validations, "tools_available": list(tools.keys()), "timings": timer.get_report() } return health_score >= 0.8, details except Exception as e: timer.stop() return False, {"error": str(e), "timings": timer.get_report()} def test_mcp_tool_parameter_validation() -> Tuple[bool, Dict[str, Any]]: """Test MCP tool parameter validation and schema compliance.""" print("πŸ”§ Testing MCP Tool Parameter Validation...") timer = PerformanceTimer() try: timer.start() # Import MCP components from spice_mcp.config import Config, DuneConfig from spice_mcp.mcp.tools.execute_query import ExecuteQueryTool from spice_mcp.service_layer.query_service import QueryService from spice_mcp.logging.query_history import QueryHistory from pathlib import Path as PathObj api_key = os.getenv("DUNE_API_KEY") config = Config(dune=DuneConfig(api_key=api_key)) # Initialize services query_service = QueryService(config) history_path = PathObj("/tmp") / "param_validation_history.jsonl" artifact_root = PathObj("/tmp") / "param_validation_artifacts" query_history = QueryHistory(history_path, artifact_root) # Initialize tool execute_tool = ExecuteQueryTool(config, query_service, query_history) # Get tool schema schema = execute_tool.get_parameter_schema() timer.checkpoint("schema_retrieved") print(f" βœ“ Retrieved tool schema: {schema.get('type', 'unknown')}") # Test various parameter combinations parameter_test_cases = [ { "name": "minimal_valid_params", "parameters": {"query": "SELECT 1 as test"}, "should_pass": True }, { "name": "full_valid_params", "parameters": { "query": "SELECT 1 as test, 'hello' as message", "parameters": {"test": "value"}, "performance": "medium", "limit": 10, "format": "json" }, "should_pass": True }, { "name": "missing_required_query", "parameters": {"limit": 10}, "should_pass": False }, { "name": "invalid_performance", "parameters": { "query": "SELECT 1", "performance": "invalid" }, "should_pass": False }, { "name": "invalid_format", "parameters": { "query": "SELECT 1", "format": "invalid" }, "should_pass": False }, { "name": "negative_limit", "parameters": { "query": "SELECT 1", "limit": -1 }, "should_pass": False } ] validation_results = [] for test_case in parameter_test_cases: test_name = test_case["name"] parameters = test_case["parameters"] should_pass = test_case["should_pass"] print(f" Testing {test_name}...") # Simulate parameter validation by calling the tool try: result = MCPToolSimulator.simulate_tool_call(execute_tool, parameters) validation_passed = ( (should_pass and result['success']) or (not should_pass and not result['success']) ) validation_results.append({ "test_name": test_name, "parameters": parameters, "should_pass": should_pass, "actual_success": result['success'], "validation_passed": validation_passed, "result": result }) status = "βœ“" if validation_passed else "βœ—" print(f" {status} {test_name}: {'passed' if validation_passed else 'failed'}") except Exception as e: # Handle exception for parameter validation if not should_pass: validation_results.append({ "test_name": test_name, "parameters": parameters, "should_pass": should_pass, "validation_passed": True, "error": str(e) }) print(f" βœ“ {test_name}: correctly rejected with exception") else: validation_results.append({ "test_name": test_name, "parameters": parameters, "should_pass": should_pass, "validation_passed": False, "error": str(e) }) print(f" βœ— {test_name}: unexpected rejection with exception: {e}") timer.checkpoint("parameter_tests_completed") # Analyze results passed_validations = sum(1 for r in validation_results if r['validation_passed']) total_validations = len(validation_results) validation_rate = passed_validations / total_validations timer.stop() print(f" Parameter validation: {passed_validations}/{total_validations} passed") details = { "tool_schema": schema, "validation_results": validation_results, "passed_validations": passed_validations, "total_validations": total_validations, "validation_rate": validation_rate, "timings": timer.get_report() } return validation_rate >= 0.8, details except Exception as e: timer.stop() return False, {"error": str(e), "timings": timer.get_report()} def test_mcp_request_response_cycle() -> Tuple[bool, Dict[str, Any]]: """Test request/response cycle as MCP client would see it.""" print("πŸ”„ Testing MCP Request/Response Cycle...") timer = PerformanceTimer() try: timer.start() # Import MCP components from spice_mcp.config import Config, DuneConfig from spice_mcp.mcp.tools.execute_query import ExecuteQueryTool from spice_mcp.service_layer.query_service import QueryService from spice_mcp.logging.query_history import QueryHistory from pathlib import Path as PathObj api_key = os.getenv("DUNE_API_KEY") config = Config(dune=DuneConfig(api_key=api_key)) # Initialize services query_service = QueryService(config) history_path = PathObj("/tmp") / "request_response_history.jsonl" artifact_root = PathObj("/tmp") / "request_response_artifacts" query_history = QueryHistory(history_path, artifact_root) # Initialize tool execute_tool = ExecuteQueryTool(config, query_service, query_history) # Test request/response scenarios request_scenarios = [ { "name": "simple_query_request", "request": { "query": "SELECT 1 as test_col, 'hello' as message", "limit": 5 }, "expected_response_keys": ["ok", "error"] }, { "name": "parameterized_query_request", "request": { "query": "SELECT {{test_param}} as value, 42 as constant", "parameters": {"test_param": "hello_world"}, "limit": 10 }, "expected_response_keys": ["ok", "error"] }, { "name": "performance_level_request", "request": { "query": QueryFactory.aggregate_query(), "performance": "medium", "limit": 1 }, "expected_response_keys": ["ok", "error"] } ] response_validations = [] for scenario in request_scenarios: scenario_name = scenario["name"] request = scenario["request"] expected_keys = scenario["expected_response_keys"] print(f" Testing {scenario_name}...") try: # Simulate MCP request response_result = MCPToolSimulator.simulate_tool_call(execute_tool, request) # Validate response structure if response_result['success']: response_data = response_result['data'] # Check expected keys exist missing_keys = [key for key in expected_keys if key not in response_data] response_validations.append({ "scenario_name": scenario_name, "request": request, "response_success": response_result['success'], "response_data": response_data, "missing_keys": missing_keys, "validation_passed": len(missing_keys) == 0, "result": response_result }) print(f" βœ“ {scenario_name}: response structure valid") # Show snippet of response if 'preview' in response_data: preview = response_data['preview'] print(f" Preview: {preview.get('rowcount', 'unknown')} rows") else: response_validations.append({ "scenario_name": scenario_name, "request": request, "response_success": False, "error": response_result.get('error', 'Unknown error'), "validation_passed": False # Failed response is validation failure for these test cases }) print(f" βœ— {scenario_name}: request failed - {response_result.get('error', 'Unknown')}") except Exception as e: response_validations.append({ "scenario_name": scenario_name, "request": request, "response_success": False, "error": str(e), "validation_passed": False, "exception": True }) print(f" βœ— {scenario_name}: exception - {e}") timer.checkpoint("request_response_tests_completed") # Analyze results successful_responses = sum(1 for r in response_validations if r['validation_passed']) total_responses = len(response_validations) success_rate = successful_responses / total_responses timer.stop() print(f" Request/Response cycle: {successful_responses}/{total_responses} successful") details = { "response_validations": response_validations, "successful_responses": successful_responses, "total_responses": total_responses, "success_rate": success_rate, "timings": timer.get_report() } return success_rate >= 0.7, details # Allow some leeway for API variability except Exception as e: timer.stop() return False, {"error": str(e), "timings": timer.get_report()} def test_real_world_query_scenarios() -> Tuple[bool, Dict[str, Any]]: """Test real-world query scenarios that would be used in production.""" print("🌍 Testing Real-World Query Scenarios...") timer = PerformanceTimer() try: timer.start() # Import MCP components from spice_mcp.config import Config, DuneConfig from spice_mcp.mcp.tools.execute_query import ExecuteQueryTool from spice_mcp.service_layer.query_service import QueryService from spice_mcp.logging.query_history import QueryHistory from pathlib import Path as PathObj api_key = os.getenv("DUNE_API_KEY") config = Config(dune=DuneConfig(api_key=api_key)) # Initialize services query_service = QueryService(config) history_path = PathObj("/tmp") / "real_world_history.jsonl" artifact_root = PathObj("/tmp") / "real_world_artifacts" query_history = QueryHistory(history_path, artifact_root) # Initialize tool execute_tool = ExecuteQueryTool(config, query_service, query_history) # Real-world query scenarios real_world_scenarios = [ { "name": "analytics_dashboard_query", "description": "Typical analytics dashboard query with aggregations", "request": { "query": """ SELECT DATE_TRUNC('day', created_at) as date, COUNT(*) as total_users, SUM(CASE WHEN active THEN 1 ELSE 0 END) as active_users, AVG(session_duration) as avg_session FROM user_activity WHERE created_at >= '2024-01-01' GROUP BY date ORDER BY date DESC LIMIT 30 """, "performance": "medium", "limit": 5 }, "expected_structure": ["date", "total", "active", "duration"] }, { "name": "financial_summary_query", "description": "Financial summary with multiple calculations", "request": { "query": """ SELECT SUM(amount) as total_revenue, AVG(amount) as avg_transaction, COUNT(*) as transaction_count, MIN(amount) as min_amount, MAX(amount) as max_amount FROM ( SELECT 100 + (n * 10) as amount FROM generate_series(1, 100) as n ) transactions """, "performance": "medium", "limit": 1 }, "expected_structure": ["revenue", "avg", "count", "min", "max"] }, { "name": "time_series_analysis", "description": "Time series analysis for trend identification", "request": { "query": """ SELECT DATE_TRUNC('week', event_date) as week, category, COUNT(*) as events, SUM(value) as total_value FROM ( SELECT DATE '2024-01-01' + INTERVAL '1 day' * n as event_date, CASE WHEN n % 3 = 0 THEN 'A' WHEN n % 3 = 1 THEN 'B' ELSE 'C' END as category, n * 5 as value FROM generate_series(1, 30) as n ) event_data GROUP BY week, category ORDER BY week DESC, category LIMIT 10 """, "performance": "medium", "limit": 10 }, "expected_structure": ["week", "category", "events", "value"] } ] scenario_results = [] for scenario in real_world_scenarios: scenario_name = scenario["name"] description = scenario["description"] request = scenario["request"] expected_structure = scenario["expected_structure"] print(f" Testing {scenario_name}: {description}") try: # Execute real-world query scenario result = MCPToolSimulator.simulate_tool_call(execute_tool, request) if result['success']: response_data = result['data'] # Validate response structure structure_validation = True if 'preview' in response_data: columns = response_data['preview'].get('columns', []) # Check if expected structure keywords are present missing_structure = [ keyword for keyword in expected_structure if not any(keyword.lower() in col.lower() for col in columns) ] structure_validation = len(missing_structure) == 0 if missing_structure: print(f" ⚠ Missing expected structure: {missing_structure}") scenario_results.append({ "scenario_name": scenario_name, "description": description, "request": request, "success": result['success'], "response_data": response_data, "structure_validation": structure_validation, "validation_passed": structure_validation, "result": result }) status = "βœ“" if structure_validation else "⚠" print(f" {status} {scenario_name}: structure {'validated' if structure_validation else 'needs review'}") # Show preview if 'preview' in response_data: preview = response_data['preview'] print(f" Result: {preview.get('rowcount', 'unknown')} rows, {len(preview.get('columns', []))} columns") else: scenario_results.append({ "scenario_name": scenario_name, "description": description, "request": request, "success": False, "error": result.get('error', 'Unknown error'), "validation_passed": False }) print(f" βœ— {scenario_name}: failed - {result.get('error', 'Unknown')}") except Exception as e: scenario_results.append({ "scenario_name": scenario_name, "description": description, "request": request, "success": False, "error": str(e), "validation_passed": False, "exception": True }) print(f" βœ— {scenario_name}: exception - {e}") timer.checkpoint("real_world_scenarios_completed") # Analyze results successful_scenarios = sum(1 for r in scenario_results if r.get('validation_passed', False)) total_scenarios = len(scenario_results) success_rate = successful_scenarios / total_scenarios timer.stop() print(f" Real-world scenarios: {successful_scenarios}/{total_scenarios} successful") details = { "scenario_results": scenario_results, "successful_scenarios": successful_scenarios, "total_scenarios": total_scenarios, "success_rate": success_rate, "timings": timer.get_report() } return success_rate >= 0.6, details # Allow flexibility for real-world variability except Exception as e: timer.stop() return False, {"error": str(e), "timings": timer.get_report()} def test_mcp_error_handling() -> Tuple[bool, Dict[str, Any]]: """Test error handling through the MCP interface.""" print("⚠️ Testing MCP Error Handling...") timer = PerformanceTimer() try: timer.start() # Import MCP components from spice_mcp.config import Config, DuneConfig from spice_mcp.mcp.tools.execute_query import ExecuteQueryTool from spice_mcp.service_layer.query_service import QueryService from spice_mcp.logging.query_history import QueryHistory from pathlib import Path as PathObj api_key = os.getenv("DUNE_API_KEY") config = Config(dune=DuneConfig(api_key=api_key)) # Initialize services query_service = QueryService(config) history_path = PathObj("/tmp") / "error_handling_history.jsonl" artifact_root = PathObj("/tmp") / "error_handling_artifacts" query_history = QueryHistory(history_path, artifact_root) # Initialize tool execute_tool = ExecuteQueryTool(config, query_service, query_history) # Error scenarios to test error_scenarios = [ { "name": "invalid_sql_syntax", "request": { "query": "SELECTTTT INVALID SQL SYNTAX" }, "expected_error_type": "sql_syntax" }, { "name": "missing_required_parameters", "request": { "query": "SELECT {{missing_param}} as value" }, "expected_error_type": "missing_parameter" }, { "name": "empty_query", "request": { "query": "" }, "expected_error_type": "empty_query" }, { "name": "invalid_parameter_values", "request": { "query": "SELECT 1 as test", "limit": "invalid_number" }, "expected_error_type": "invalid_parameter" }, { "name": "timeout_simulation", "request": { "query": "SELECT 1", # This should work, but we'll test error response format "timeout_seconds": 0.001 # Very short timeout to force error }, "expected_error_type": "timeout" } ] error_handling_results = [] for scenario in error_scenarios: scenario_name = scenario["name"] request = scenario["request"] expected_error_type = scenario["expected_error_type"] print(f" Testing {scenario_name}...") try: # Simulate error scenario result = MCPToolSimulator.simulate_tool_call(execute_tool, request) if result['success']: # Unexpected success - this might be valid for some error scenarios error_handling_results.append({ "scenario_name": scenario_name, "request": request, "expected_error_type": expected_error_type, "actual_success": True, "error_handling_passed": False, # Expected error but got success "reason": "Expected error but got success", "result": result }) print(f" ⚠ {scenario_name}: got success instead of expected error") else: # Expected failure - check error format error_info = result.get('error', 'Unknown error') error_type = result.get('error_type', 'Unknown') # Validate error information has_error_info = bool(error_info) has_error_type = bool(error_type) error_handling_passed = has_error_info and has_error_type error_handling_results.append({ "scenario_name": scenario_name, "request": request, "expected_error_type": expected_error_type, "actual_success": False, "actual_error_type": error_type, "error_info": error_info, "error_handling_passed": error_handling_passed, "result": result }) status = "βœ“" if error_handling_passed else "⚠" print(f" {status} {scenario_name}: {error_type} - {error_info[:100]}...") except Exception as e: # Exception in error handling is not ideal but acceptable error_handling_results.append({ "scenario_name": scenario_name, "request": request, "expected_error_type": expected_error_type, "actual_success": False, "exception": str(e), "error_handling_passed": True, # Exception indicates error was caught "result": "exception" }) print(f" βœ“ {scenario_name}: exception caught - {str(e)[:100]}...") timer.checkpoint("error_scenarios_completed") # Analyze error handling results proper_error_handling = sum(1 for r in error_handling_results if r['error_handling_passed']) total_scenarios = len(error_handling_results) error_handling_rate = proper_error_handling / total_scenarios timer.stop() print(f" Error handling: {proper_error_handling}/{total_scenarios} handled correctly") details = { "error_handling_results": error_handling_results, "proper_error_handling": proper_error_handling, "total_scenarios": total_scenarios, "error_handling_rate": error_handling_rate, "timings": timer.get_report() } return error_handling_rate >= 0.8, details except Exception as e: timer.stop() return False, {"error": str(e), "timings": timer.get_report()} def test_concurrent_mcp_usage() -> Tuple[bool, Dict[str, Any]]: """Test concurrent MCP tool usage scenarios.""" print("πŸš€ Testing Concurrent MCP Usage...") timer = PerformanceTimer() try: timer.start() # Import MCP components from spice_mcp.config import Config, DuneConfig from spice_mcp.mcp.tools.execute_query import ExecuteQueryTool from spice_mcp.service_layer.query_service import QueryService from spice_mcp.logging.query_history import QueryHistory from pathlib import Path as PathObj api_key = os.getenv("DUNE_API_KEY") config = Config(dune=DuneConfig(api_key=api_key)) # Initialize services query_service = QueryService(config) history_path = PathObj("/tmp") / "concurrent_mcp_history.jsonl" artifact_root = PathObj("/tmp") / "concurrent_mcp_artifacts" query_history = QueryHistory(history_path, artifact_root) # Initialize tool execute_tool = ExecuteQueryTool(config, query_service, query_history) # Test concurrent usage scenarios concurrent_scenarios = [ { "name": "simple_concurrent_queries", "concurrent_requests": [ {"query": f"SELECT {i} as query_id, 'concurrent {i}' as description", "limit": 5} for i in range(3) ] }, { "name": "mixed_concurrent_queries", "concurrent_requests": [ {"query": "SELECT 1 as simple", "performance": "low"}, {"query": "SELECT 2 as medium, 3 as value", "performance": "medium"}, {"query": "SELECT SUM(n) as total FROM generate_series(1, 10) as n", "performance": "medium"} ] } ] concurrent_results = [] for scenario in concurrent_scenarios: scenario_name = scenario["name"] requests = scenario["concurrent_requests"] print(f" Testing {scenario_name} with {len(requests)} concurrent requests...") try: # Execute requests concurrently def execute_single_request(request_data, index): """Execute a single MCP request.""" return MCPToolSimulator.simulate_tool_call(execute_tool, request_data), index with ThreadPoolExecutor(max_workers=len(requests)) as executor: futures = [ executor.submit(execute_single_request, req, i) for i, req in enumerate(requests) ] # Collect results scenario_results = [] for future in futures: try: result, index = future.result(timeout=120) # 2 minute timeout scenario_results.append({ "index": index, "result": result, "success": result.get('success', False) }) except Exception as e: scenario_results.append({ "index": index, "result": {"success": False, "error": str(e)}, "success": False, "exception": True }) # Analyze concurrent execution successful_requests = [r for r in scenario_results if r['success']] failed_requests = [r for r in scenario_results if not r['success']] concurrent_results.append({ "scenario_name": scenario_name, "total_requests": len(requests), "successful_requests": len(successful_requests), "failed_requests": len(failed_requests), "success_rate": len(successful_requests) / len(requests), "results": scenario_results }) print(f" βœ“ {scenario_name}: {len(successful_requests)}/{len(requests)} successful") except Exception as e: concurrent_results.append({ "scenario_name": scenario_name, "total_requests": len(requests), "successful_requests": 0, "failed_requests": len(requests), "success_rate": 0.0, "error": str(e) }) print(f" βœ— {scenario_name}: failed - {e}") timer.checkpoint("concurrent_scenarios_completed") # Analyze overall concurrent usage total_concurrent_requests = sum(r['total_requests'] for r in concurrent_results) total_successful_requests = sum(r['successful_requests'] for r in concurrent_results) overall_success_rate = total_successful_requests / total_concurrent_requests if total_concurrent_requests > 0 else 0 timer.stop() print(f" Concurrent usage: {total_successful_requests}/{total_concurrent_requests} requests successful") details = { "concurrent_results": concurrent_results, "total_concurrent_requests": total_concurrent_requests, "total_successful_requests": total_successful_requests, "overall_success_rate": overall_success_rate, "timings": timer.get_report() } return overall_success_rate >= 0.7, details # 70% success rate for concurrent scenarios except Exception as e: timer.stop() return False, {"error": str(e), "timings": timer.get_report()} def test_mcp_performance_characterization() -> Tuple[bool, Dict[str, Any]]: """Test performance characterization of MCP tool layer.""" print("πŸ“Š Testing MCP Performance Characterization...") timer = PerformanceTimer() try: timer.start() # Import MCP components from spice_mcp.config import Config, DuneConfig from spice_mcp.mcp.tools.execute_query import ExecuteQueryTool from spice_mcp.service_layer.query_service import QueryService from spice_mcp.logging.query_history import QueryHistory from pathlib import Path as PathObj api_key = os.getenv("DUNE_API_KEY") config = Config(dune=DuneConfig(api_key=api_key)) # Initialize services query_service = QueryService(config) history_path = PathObj("/tmp") / "performance_history.jsonl" artifact_root = PathObj("/tmp") / "performance_artifacts" query_history = QueryHistory(history_path, artifact_root) # Initialize tool execute_tool = ExecuteQueryTool(config, query_service, query_history) # Performance test scenarios performance_scenarios = [ { "name": "tool_initialization", "test_function": lambda: ExecuteQueryTool(config, query_service, query_history), "expected_max_time": 5.0, "iterations": 5 }, { "name": "schema_retrieval", "test_function": lambda: execute_tool.get_parameter_schema(), "expected_max_time": 1.0, "iterations": 10 }, { "name": "simple_query_execution", "test_function": lambda: MCPToolSimulator.simulate_tool_call( execute_tool, {"query": "SELECT 1 as test_col", "limit": 1} ), "expected_max_time": 20.0, "iterations": 3 }, { "name": "parameterized_query_execution", "test_function": lambda: MCPToolSimulator.simulate_tool_call( execute_tool, { "query": "SELECT {{param}} as value", "parameters": {"param": "test_value"}, "limit": 1 } ), "expected_max_time": 25.0, "iterations": 3 } ] performance_results = [] for scenario in performance_scenarios: scenario_name = scenario["name"] test_function = scenario["test_function"] expected_max_time = scenario["expected_max_time"] iterations = scenario["iterations"] print(f" Testing {scenario_name} ({iterations} iterations)...") execution_times = [] successes = 0 for i in range(iterations): iteration_timer = PerformanceTimer() iteration_timer.start() try: result = test_function() iteration_timer.stop() # For query executions, check success if isinstance(result, dict): if 'success' in result: success = result['success'] else: # Tool initialization, schema retrieval usually succeed success = True else: # Tool objects success = True execution_times.append(iteration_timer.duration) if success: successes += 1 status = "βœ“" if success else "βœ—" print(f" {status} Iteration {i+1}: {iteration_timer.duration:.3f}s") except Exception as e: iteration_timer.stop() execution_times.append(iteration_timer.duration) print(f" βœ— Iteration {i+1}: exception - {str(e)[:50]}...") # Calculate scenario statistics if execution_times: avg_time = sum(execution_times) / len(execution_times) min_time = min(execution_times) max_time = max(execution_times) time_variance = max_time - min_time scenario_performance = { "scenario_name": scenario_name, "iterations": iterations, "execution_times": execution_times, "avg_time": avg_time, "min_time": min_time, "max_time": max_time, "time_variance": time_variance, "successes": successes, "success_rate": successes / iterations, "within_expected_time": avg_time <= expected_max_time } performance_results.append(scenario_performance) time_status = "βœ“" if scenario_performance['within_expected_time'] else "⚠" print(f" {time_status} {scenario_name}: avg {avg_time:.3f}s (expected ≀{expected_max_time}s)") timer.checkpoint("performance_tests_completed") # Analyze overall performance performance_summary = { "total_scenarios": len(performance_results), "scenarios_within_expected_time": sum(1 for r in performance_results if r['within_expected_time']), "overall_success_rate": sum(r['success_rate'] for r in performance_results) / len(performance_results) if performance_results else 0 } performance_ok = ( performance_summary['scenarios_within_expected_time'] >= performance_summary['total_scenarios'] * 0.75 and performance_summary['overall_success_rate'] >= 0.8 ) timer.stop() print(f" Performance summary: {performance_summary['scenarios_within_expected_time']}/{performance_summary['total_scenarios']} scenarios within expected time") print(f" Overall success rate: {performance_summary['overall_success_rate']:.1%}") details = { "performance_results": performance_results, "performance_summary": performance_summary, "performance_ok": performance_ok, "timings": timer.get_report() } return performance_ok, details except Exception as e: timer.stop() return False, {"error": str(e), "timings": timer.get_report()} def main(): """Run MCP simulation test suite.""" print("πŸ–₯️ DUNE MCP SIMULATION TEST SUITE") print("=" * 50) # Load environment load_env_variables() # Check API key if not os.getenv("DUNE_API_KEY"): print("❌ DUNE_API_KEY not found. Please set it in your environment or .env file.") return False # Initialize result collector results = TestResultCollector() results.start_collection() # Run MCP simulation tests tests = [ ("MCP Server Simulation", test_mcp_server_simulation), ("MCP Tool Parameter Validation", test_mcp_tool_parameter_validation), ("MCP Request/Response Cycle", test_mcp_request_response_cycle), ("Real-World Query Scenarios", test_real_world_query_scenarios), ("MCP Error Handling", test_mcp_error_handling), ("Concurrent MCP Usage", test_concurrent_mcp_usage), ("MCP Performance Characterization", test_mcp_performance_characterization), ] passed = 0 total = len(tests) for test_name, test_func in tests: print(f"\n{'-' * 40}") try: success, details = test_func() results.add_result(test_name, success, details) if success: print(f"βœ… {test_name} PASSED") passed += 1 else: error = details.get('error', 'Unknown error') print(f"❌ {test_name} FAILED: {error}") except Exception as e: results.add_result(test_name, False, {"error": str(e)}) print(f"❌ {test_name} EXCEPTION: {e}") results.finish_collection() summary = results.get_summary() # Summary print(f"\n{'=' * 50}") print("🎯 MCP SIMULATION TEST SUMMARY") print(f"βœ… {passed}/{total} tests passed") print(f"⏱️ Total duration: {summary['duration']:.2f}s") if passed >= total * 0.8: # 80% pass rate for simulation tests print("πŸŽ‰ MCP simulation tests passed! Interface is ready for client usage.") return True else: print("⚠️ Some MCP simulation tests failed. Review interface issues.") return False if __name__ == "__main__": success = main() sys.exit(0 if success else 1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/spice-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server