Skip to main content
Glama
performance_benchmark_comprehensive.py44.9 kB
#!/usr/bin/env python3 """ Comprehensive Performance Benchmark for MCP Collective Intelligence Tools This module provides comprehensive performance testing for the 3 working MCP tools: 1. ensemble_reasoning 2. adaptive_model_selection 3. cross_model_validation Tests include: - Response time analysis under different loads - Throughput testing - Concurrent request handling - Memory and CPU usage monitoring - Quality vs speed trade-offs - Tool comparison analysis """ import asyncio import time import json import psutil import statistics import threading from datetime import datetime, timedelta from typing import Dict, List, Any, Optional, Tuple from dataclasses import dataclass, field from pathlib import Path import logging import sys import tracemalloc from concurrent.futures import ThreadPoolExecutor, as_completed # Import MCP tools for testing sys.path.append(str(Path(__file__).parent / "src")) from openrouter_mcp.collective_intelligence.ensemble_reasoning import EnsembleReasoner from openrouter_mcp.collective_intelligence.adaptive_router import AdaptiveRouter from openrouter_mcp.collective_intelligence.cross_validator import CrossValidator from openrouter_mcp.collective_intelligence.base import ( TaskContext, TaskType, ModelProvider, ModelInfo, ModelCapability ) # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) @dataclass class PerformanceMetrics: """Container for performance metrics.""" tool_name: str load_level: int response_times: List[float] = field(default_factory=list) throughput: float = 0.0 success_rate: float = 0.0 error_rate: float = 0.0 avg_response_time: float = 0.0 min_response_time: float = 0.0 max_response_time: float = 0.0 p95_response_time: float = 0.0 p99_response_time: float = 0.0 memory_usage_mb: float = 0.0 cpu_usage_percent: float = 0.0 total_requests: int = 0 successful_requests: int = 0 failed_requests: int = 0 test_duration: float = 0.0 timestamp: datetime = field(default_factory=datetime.now) @dataclass class MockModelInfo: """Mock model info for testing.""" model_id: str name: str capabilities: Dict[ModelCapability, float] availability: float = 0.95 accuracy_score: float = 0.85 cost_per_token: float = 0.001 response_time_avg: float = 1.5 class MockModelProvider(ModelProvider): """Mock model provider for testing without API calls.""" def __init__(self): self.models = self._create_mock_models() self.call_count = 0 self.error_rate = 0.05 # 5% error rate for realistic testing def _create_mock_models(self) -> List[MockModelInfo]: """Create mock models with different capabilities.""" return [ MockModelInfo( model_id="mock-gpt-4", name="Mock GPT-4", capabilities={ ModelCapability.REASONING: 0.95, ModelCapability.CREATIVITY: 0.90, ModelCapability.ACCURACY: 0.92, ModelCapability.CODE: 0.88 }, response_time_avg=2.1 ), MockModelInfo( model_id="mock-claude-3", name="Mock Claude-3", capabilities={ ModelCapability.REASONING: 0.93, ModelCapability.ACCURACY: 0.95, ModelCapability.CREATIVITY: 0.87, ModelCapability.CODE: 0.85 }, response_time_avg=1.8 ), MockModelInfo( model_id="mock-gemini-pro", name="Mock Gemini Pro", capabilities={ ModelCapability.REASONING: 0.89, ModelCapability.MATH: 0.94, ModelCapability.ACCURACY: 0.88, ModelCapability.CODE: 0.90 }, response_time_avg=1.5 ), MockModelInfo( model_id="mock-fast-model", name="Mock Fast Model", capabilities={ ModelCapability.REASONING: 0.75, ModelCapability.CREATIVITY: 0.80, ModelCapability.ACCURACY: 0.78, }, response_time_avg=0.8, cost_per_token=0.0005 ), MockModelInfo( model_id="mock-premium-model", name="Mock Premium Model", capabilities={ ModelCapability.REASONING: 0.98, ModelCapability.CREATIVITY: 0.96, ModelCapability.ACCURACY: 0.97, ModelCapability.CODE: 0.95, ModelCapability.MATH: 0.93 }, response_time_avg=3.2, cost_per_token=0.005 ) ] async def get_available_models(self) -> List[ModelInfo]: """Return mock models.""" # Simulate API call delay await asyncio.sleep(0.1) return self.models async def process_task(self, task: TaskContext, model_id: str) -> Any: """Mock task processing with realistic delays and occasional failures.""" self.call_count += 1 # Find the model model = next((m for m in self.models if m.model_id == model_id), None) if not model: raise ValueError(f"Model {model_id} not found") # Simulate processing time processing_time = model.response_time_avg + (hash(task.content) % 100) / 100.0 await asyncio.sleep(processing_time) # Simulate occasional failures import random if random.random() < self.error_rate: raise Exception(f"Mock error in {model_id}") # Create mock result from openrouter_mcp.collective_intelligence.base import ProcessingResult return ProcessingResult( task_id=task.task_id, model_id=model_id, content=f"Mock response from {model.name} for: {task.content[:50]}...", confidence=0.8 + (hash(task.content) % 20) / 100.0, processing_time=processing_time, metadata={ "mock": True, "model_name": model.name, "task_type": task.task_type.value } ) class ResourceMonitor: """Monitor system resources during testing.""" def __init__(self): self.monitoring = False self.cpu_samples = [] self.memory_samples = [] self.monitor_thread = None def start_monitoring(self): """Start resource monitoring.""" self.monitoring = True self.cpu_samples = [] self.memory_samples = [] self.monitor_thread = threading.Thread(target=self._monitor_loop) self.monitor_thread.start() def stop_monitoring(self) -> Tuple[float, float]: """Stop monitoring and return average CPU and memory usage.""" self.monitoring = False if self.monitor_thread: self.monitor_thread.join() avg_cpu = statistics.mean(self.cpu_samples) if self.cpu_samples else 0.0 avg_memory = statistics.mean(self.memory_samples) if self.memory_samples else 0.0 return avg_cpu, avg_memory def _monitor_loop(self): """Monitor system resources in a loop.""" process = psutil.Process() while self.monitoring: try: # Get CPU percentage for current process cpu_percent = process.cpu_percent() self.cpu_samples.append(cpu_percent) # Get memory usage in MB memory_info = process.memory_info() memory_mb = memory_info.rss / (1024 * 1024) self.memory_samples.append(memory_mb) time.sleep(0.1) # Sample every 100ms except Exception as e: logger.warning(f"Error monitoring resources: {e}") class PerformanceBenchmark: """Main performance benchmark class.""" def __init__(self): self.mock_provider = MockModelProvider() self.resource_monitor = ResourceMonitor() self.results: List[PerformanceMetrics] = [] # Initialize tools with mock provider self.tools = { "ensemble_reasoning": EnsembleReasoner(self.mock_provider), "adaptive_model_selection": AdaptiveRouter(self.mock_provider), "cross_model_validation": CrossValidator(self.mock_provider) } # Test scenarios self.test_scenarios = [ { "name": "Simple Question", "content": "What is the capital of France?", "task_type": TaskType.FACTUAL, "expected_complexity": "low" }, { "name": "Complex Reasoning", "content": "Analyze the economic implications of renewable energy adoption on traditional energy sectors, considering both short-term disruptions and long-term benefits. Provide a balanced assessment.", "task_type": TaskType.REASONING, "expected_complexity": "high" }, { "name": "Code Generation", "content": "Create a Python function that implements a binary search algorithm with error handling, documentation, and unit tests. Optimize for both readability and performance.", "task_type": TaskType.CODE_GENERATION, "expected_complexity": "medium" }, { "name": "Creative Task", "content": "Write a short story about artificial intelligence discovering emotions, incorporating themes of consciousness, empathy, and the nature of humanity.", "task_type": TaskType.CREATIVE, "expected_complexity": "medium" }, { "name": "Analysis Task", "content": "Compare the effectiveness of different machine learning algorithms for time series forecasting, including LSTM, ARIMA, and Prophet. Consider accuracy, computational efficiency, and interpretability.", "task_type": TaskType.ANALYSIS, "expected_complexity": "high" } ] async def run_comprehensive_benchmark(self) -> Dict[str, Any]: """Run comprehensive performance benchmark.""" logger.info("Starting comprehensive performance benchmark") benchmark_start = datetime.now() all_results = {} # Test each tool with different load levels load_levels = [1, 5, 10] for tool_name, tool in self.tools.items(): logger.info(f"\nTesting tool: {tool_name}") tool_results = {} for load_level in load_levels: logger.info(f" Testing with {load_level} concurrent requests") # Run benchmark for this tool and load level metrics = await self._benchmark_tool_with_load(tool_name, tool, load_level) tool_results[f"load_{load_level}"] = metrics self.results.append(metrics) # Small delay between tests await asyncio.sleep(1.0) all_results[tool_name] = tool_results benchmark_end = datetime.now() total_duration = (benchmark_end - benchmark_start).total_seconds() # Generate comprehensive report report = self._generate_comprehensive_report(all_results, total_duration) # Save results await self._save_results(all_results, report) logger.info(f"Comprehensive benchmark completed in {total_duration:.2f} seconds") return report async def _benchmark_tool_with_load( self, tool_name: str, tool: Any, load_level: int ) -> PerformanceMetrics: """Benchmark a specific tool with a given load level.""" # Start resource monitoring self.resource_monitor.start_monitoring() # Start memory tracking tracemalloc.start() test_start = time.time() response_times = [] successful_requests = 0 failed_requests = 0 try: # Create tasks for concurrent execution tasks = [] for i in range(load_level): # Use different scenarios for variety scenario = self.test_scenarios[i % len(self.test_scenarios)] task_context = TaskContext( task_id=f"{tool_name}_load_{load_level}_req_{i}", task_type=scenario["task_type"], content=scenario["content"], metadata={"scenario": scenario["name"], "load_test": True} ) # Create the actual async task async_task = self._execute_single_request(tool, task_context) tasks.append(async_task) # Execute all tasks concurrently start_time = time.time() results = await asyncio.gather(*tasks, return_exceptions=True) end_time = time.time() # Process results for result in results: if isinstance(result, Exception): failed_requests += 1 logger.warning(f"Request failed: {result}") else: successful_requests += 1 response_times.append(result) test_duration = end_time - start_time except Exception as e: logger.error(f"Benchmark failed for {tool_name} with load {load_level}: {e}") test_duration = time.time() - test_start failed_requests = load_level finally: # Stop monitoring avg_cpu, avg_memory = self.resource_monitor.stop_monitoring() # Get memory usage current, peak = tracemalloc.get_traced_memory() tracemalloc.stop() memory_usage_mb = peak / (1024 * 1024) # Calculate metrics total_requests = successful_requests + failed_requests success_rate = successful_requests / total_requests if total_requests > 0 else 0.0 error_rate = failed_requests / total_requests if total_requests > 0 else 0.0 throughput = successful_requests / test_duration if test_duration > 0 else 0.0 # Response time statistics if response_times: avg_response_time = statistics.mean(response_times) min_response_time = min(response_times) max_response_time = max(response_times) p95_response_time = statistics.quantiles(response_times, n=20)[18] if len(response_times) >= 20 else max_response_time p99_response_time = statistics.quantiles(response_times, n=100)[98] if len(response_times) >= 100 else max_response_time else: avg_response_time = min_response_time = max_response_time = p95_response_time = p99_response_time = 0.0 return PerformanceMetrics( tool_name=tool_name, load_level=load_level, response_times=response_times, throughput=throughput, success_rate=success_rate, error_rate=error_rate, avg_response_time=avg_response_time, min_response_time=min_response_time, max_response_time=max_response_time, p95_response_time=p95_response_time, p99_response_time=p99_response_time, memory_usage_mb=max(memory_usage_mb, avg_memory), cpu_usage_percent=avg_cpu, total_requests=total_requests, successful_requests=successful_requests, failed_requests=failed_requests, test_duration=test_duration ) async def _execute_single_request(self, tool: Any, task_context: TaskContext) -> float: """Execute a single request and return response time.""" start_time = time.time() try: # All tools have a process method result = await tool.process(task_context) return time.time() - start_time except Exception as e: logger.debug(f"Request failed: {e}") raise def _generate_comprehensive_report( self, all_results: Dict[str, Any], total_duration: float ) -> Dict[str, Any]: """Generate comprehensive performance analysis report.""" report = { "benchmark_summary": { "timestamp": datetime.now().isoformat(), "total_duration_seconds": total_duration, "tools_tested": list(all_results.keys()), "load_levels_tested": [1, 5, 10], "scenarios_tested": len(self.test_scenarios), "total_test_runs": len(self.results) }, "performance_analysis": {}, "comparison_analysis": {}, "recommendations": {}, "detailed_metrics": {} } # Analyze each tool's performance for tool_name, tool_results in all_results.items(): tool_analysis = self._analyze_tool_performance(tool_name, tool_results) report["performance_analysis"][tool_name] = tool_analysis # Cross-tool comparison report["comparison_analysis"] = self._generate_comparison_analysis(all_results) # Performance recommendations report["recommendations"] = self._generate_recommendations(all_results) # Store detailed metrics report["detailed_metrics"] = self._format_detailed_metrics(all_results) return report def _analyze_tool_performance(self, tool_name: str, tool_results: Dict[str, PerformanceMetrics]) -> Dict[str, Any]: """Analyze performance characteristics of a single tool.""" analysis = { "scalability": {}, "reliability": {}, "efficiency": {}, "resource_usage": {} } load_levels = [1, 5, 10] # Scalability analysis throughputs = [tool_results[f"load_{load}"].throughput for load in load_levels] response_times = [tool_results[f"load_{load}"].avg_response_time for load in load_levels] # Calculate scalability score (how well throughput scales with load) scalability_score = self._calculate_scalability_score(load_levels, throughputs) analysis["scalability"] = { "score": scalability_score, "throughput_trend": self._calculate_trend(load_levels, throughputs), "response_time_trend": self._calculate_trend(load_levels, response_times), "max_throughput": max(throughputs), "throughput_degradation": (throughputs[0] - throughputs[-1]) / throughputs[0] if throughputs[0] > 0 else 0 } # Reliability analysis success_rates = [tool_results[f"load_{load}"].success_rate for load in load_levels] error_rates = [tool_results[f"load_{load}"].error_rate for load in load_levels] analysis["reliability"] = { "avg_success_rate": statistics.mean(success_rates), "min_success_rate": min(success_rates), "error_rate_trend": self._calculate_trend(load_levels, error_rates), "stability_score": 1.0 - statistics.stdev(success_rates) if len(success_rates) > 1 else 1.0 } # Efficiency analysis (response time consistency) p95_times = [tool_results[f"load_{load}"].p95_response_time for load in load_levels] p99_times = [tool_results[f"load_{load}"].p99_response_time for load in load_levels] analysis["efficiency"] = { "avg_response_time": statistics.mean(response_times), "response_time_consistency": 1.0 - (statistics.stdev(response_times) / statistics.mean(response_times)) if statistics.mean(response_times) > 0 else 1.0, "p95_response_time": statistics.mean(p95_times), "p99_response_time": statistics.mean(p99_times), "latency_score": self._calculate_latency_score(response_times, p95_times) } # Resource usage analysis memory_usage = [tool_results[f"load_{load}"].memory_usage_mb for load in load_levels] cpu_usage = [tool_results[f"load_{load}"].cpu_usage_percent for load in load_levels] analysis["resource_usage"] = { "avg_memory_mb": statistics.mean(memory_usage), "max_memory_mb": max(memory_usage), "memory_growth_rate": self._calculate_trend(load_levels, memory_usage), "avg_cpu_percent": statistics.mean(cpu_usage), "max_cpu_percent": max(cpu_usage), "cpu_efficiency": min(cpu_usage) / max(cpu_usage) if max(cpu_usage) > 0 else 1.0 } return analysis def _generate_comparison_analysis(self, all_results: Dict[str, Any]) -> Dict[str, Any]: """Generate cross-tool comparison analysis.""" comparison = { "performance_ranking": {}, "use_case_recommendations": {}, "trade_off_analysis": {} } tools = list(all_results.keys()) load_levels = [1, 5, 10] # Performance ranking by different criteria criteria = ["throughput", "avg_response_time", "success_rate", "memory_usage_mb"] for criterion in criteria: rankings = [] for load in load_levels: load_rankings = [] for tool in tools: metrics = all_results[tool][f"load_{load}"] value = getattr(metrics, criterion, 0) # For response time and memory usage, lower is better if criterion in ["avg_response_time", "memory_usage_mb"]: score = 1.0 / (1.0 + value) if value > 0 else 1.0 else: score = value load_rankings.append((tool, score, value)) # Sort by score (higher is better) load_rankings.sort(key=lambda x: x[1], reverse=True) rankings.append(load_rankings) comparison["performance_ranking"][criterion] = rankings # Use case recommendations comparison["use_case_recommendations"] = { "high_throughput": self._recommend_for_throughput(all_results), "low_latency": self._recommend_for_latency(all_results), "resource_efficient": self._recommend_for_efficiency(all_results), "most_reliable": self._recommend_for_reliability(all_results) } # Trade-off analysis comparison["trade_off_analysis"] = self._analyze_trade_offs(all_results) return comparison def _generate_recommendations(self, all_results: Dict[str, Any]) -> Dict[str, Any]: """Generate actionable recommendations based on performance analysis.""" recommendations = { "best_overall_tool": None, "tool_specific_recommendations": {}, "architecture_recommendations": [], "optimization_opportunities": [] } # Calculate overall scores for each tool overall_scores = {} for tool_name, tool_results in all_results.items(): # Weight different factors weights = { "throughput": 0.25, "latency": 0.25, "reliability": 0.25, "resource_efficiency": 0.25 } throughput_score = statistics.mean([tool_results[f"load_{load}"].throughput for load in [1, 5, 10]]) latency_score = 1.0 / (1.0 + statistics.mean([tool_results[f"load_{load}"].avg_response_time for load in [1, 5, 10]])) reliability_score = statistics.mean([tool_results[f"load_{load}"].success_rate for load in [1, 5, 10]]) efficiency_score = 1.0 / (1.0 + statistics.mean([tool_results[f"load_{load}"].memory_usage_mb for load in [1, 5, 10]])) overall_score = ( throughput_score * weights["throughput"] + latency_score * weights["latency"] + reliability_score * weights["reliability"] + efficiency_score * weights["resource_efficiency"] ) overall_scores[tool_name] = overall_score # Find best overall tool best_tool = max(overall_scores.items(), key=lambda x: x[1]) recommendations["best_overall_tool"] = { "tool": best_tool[0], "score": best_tool[1], "reasoning": f"{best_tool[0]} achieved the highest weighted score across all performance criteria" } # Tool-specific recommendations for tool_name, tool_results in all_results.items(): tool_recs = [] # Analyze performance characteristics load_10_metrics = tool_results["load_10"] load_1_metrics = tool_results["load_1"] if load_10_metrics.success_rate < 0.9: tool_recs.append("Consider implementing circuit breaker pattern for better failure handling under load") if load_10_metrics.avg_response_time > load_1_metrics.avg_response_time * 3: tool_recs.append("Response time degrades significantly under load - consider request queuing or load balancing") if load_10_metrics.memory_usage_mb > 500: tool_recs.append("High memory usage detected - implement memory optimization strategies") if load_10_metrics.throughput < load_1_metrics.throughput * 0.5: tool_recs.append("Throughput doesn't scale well - investigate bottlenecks and consider asynchronous processing") recommendations["tool_specific_recommendations"][tool_name] = tool_recs # Architecture recommendations arch_recs = [] if any(all_results[tool]["load_10"].error_rate > 0.1 for tool in all_results): arch_recs.append("Implement retry mechanisms with exponential backoff for better resilience") if any(all_results[tool]["load_10"].avg_response_time > 5.0 for tool in all_results): arch_recs.append("Consider implementing request timeouts and graceful degradation") arch_recs.append("Implement comprehensive monitoring and alerting for production deployment") arch_recs.append("Consider using connection pooling and caching for better performance") recommendations["architecture_recommendations"] = arch_recs return recommendations def _calculate_scalability_score(self, loads: List[int], throughputs: List[float]) -> float: """Calculate scalability score based on how well throughput scales with load.""" if len(loads) < 2 or max(throughputs) == 0: return 0.0 # Ideal scalability would be linear ideal_throughputs = [throughputs[0] * load / loads[0] for load in loads] # Calculate how close actual throughputs are to ideal differences = [abs(actual - ideal) / ideal for actual, ideal in zip(throughputs, ideal_throughputs) if ideal > 0] if not differences: return 1.0 avg_difference = statistics.mean(differences) return max(0.0, 1.0 - avg_difference) def _calculate_trend(self, x_values: List, y_values: List) -> str: """Calculate trend direction (increasing, decreasing, stable).""" if len(y_values) < 2: return "stable" # Simple linear regression slope n = len(x_values) x_mean = statistics.mean(x_values) y_mean = statistics.mean(y_values) numerator = sum((x - x_mean) * (y - y_mean) for x, y in zip(x_values, y_values)) denominator = sum((x - x_mean) ** 2 for x in x_values) if denominator == 0: return "stable" slope = numerator / denominator if slope > 0.1: return "increasing" elif slope < -0.1: return "decreasing" else: return "stable" def _calculate_latency_score(self, avg_times: List[float], p95_times: List[float]) -> float: """Calculate latency score based on consistency.""" if not avg_times or not p95_times: return 0.0 # Good latency has low average and low variance avg_latency = statistics.mean(avg_times) avg_p95 = statistics.mean(p95_times) # Score based on absolute latency and consistency latency_score = 1.0 / (1.0 + avg_latency) consistency_score = avg_latency / avg_p95 if avg_p95 > 0 else 1.0 return (latency_score + consistency_score) / 2 def _recommend_for_throughput(self, all_results: Dict[str, Any]) -> Dict[str, Any]: """Recommend best tool for high throughput scenarios.""" max_throughput = 0 best_tool = None for tool_name, tool_results in all_results.items(): tool_max = max(tool_results[f"load_{load}"].throughput for load in [1, 5, 10]) if tool_max > max_throughput: max_throughput = tool_max best_tool = tool_name return { "tool": best_tool, "max_throughput": max_throughput, "use_case": "Batch processing, high-volume API services" } def _recommend_for_latency(self, all_results: Dict[str, Any]) -> Dict[str, Any]: """Recommend best tool for low latency scenarios.""" min_latency = float('inf') best_tool = None for tool_name, tool_results in all_results.items(): tool_min = min(tool_results[f"load_{load}"].avg_response_time for load in [1, 5, 10]) if tool_min < min_latency: min_latency = tool_min best_tool = tool_name return { "tool": best_tool, "avg_response_time": min_latency, "use_case": "Real-time applications, interactive systems" } def _recommend_for_efficiency(self, all_results: Dict[str, Any]) -> Dict[str, Any]: """Recommend best tool for resource efficiency.""" best_efficiency = 0 best_tool = None for tool_name, tool_results in all_results.items(): # Efficiency = throughput / memory_usage efficiency = sum( tool_results[f"load_{load}"].throughput / max(tool_results[f"load_{load}"].memory_usage_mb, 1.0) for load in [1, 5, 10] ) / 3 if efficiency > best_efficiency: best_efficiency = efficiency best_tool = tool_name return { "tool": best_tool, "efficiency_score": best_efficiency, "use_case": "Resource-constrained environments, cost optimization" } def _recommend_for_reliability(self, all_results: Dict[str, Any]) -> Dict[str, Any]: """Recommend best tool for reliability.""" best_reliability = 0 best_tool = None for tool_name, tool_results in all_results.items(): avg_success_rate = statistics.mean( tool_results[f"load_{load}"].success_rate for load in [1, 5, 10] ) if avg_success_rate > best_reliability: best_reliability = avg_success_rate best_tool = tool_name return { "tool": best_tool, "avg_success_rate": best_reliability, "use_case": "Mission-critical applications, production systems" } def _analyze_trade_offs(self, all_results: Dict[str, Any]) -> Dict[str, Any]: """Analyze trade-offs between different performance aspects.""" trade_offs = {} for tool_name, tool_results in all_results.items(): load_10 = tool_results["load_10"] # Throughput vs Latency throughput_latency_ratio = load_10.throughput * load_10.avg_response_time # Quality vs Speed (using success rate as quality proxy) quality_speed_ratio = load_10.success_rate / max(load_10.avg_response_time, 0.001) # Resource vs Performance resource_performance_ratio = (load_10.throughput * load_10.success_rate) / max(load_10.memory_usage_mb, 1.0) trade_offs[tool_name] = { "throughput_vs_latency": throughput_latency_ratio, "quality_vs_speed": quality_speed_ratio, "resource_vs_performance": resource_performance_ratio } return trade_offs def _format_detailed_metrics(self, all_results: Dict[str, Any]) -> Dict[str, Any]: """Format detailed metrics for the report.""" detailed = {} for tool_name, tool_results in all_results.items(): tool_detailed = {} for load_key, metrics in tool_results.items(): tool_detailed[load_key] = { "response_times": { "avg": metrics.avg_response_time, "min": metrics.min_response_time, "max": metrics.max_response_time, "p95": metrics.p95_response_time, "p99": metrics.p99_response_time }, "throughput": metrics.throughput, "reliability": { "success_rate": metrics.success_rate, "error_rate": metrics.error_rate, "successful_requests": metrics.successful_requests, "failed_requests": metrics.failed_requests, "total_requests": metrics.total_requests }, "resources": { "memory_usage_mb": metrics.memory_usage_mb, "cpu_usage_percent": metrics.cpu_usage_percent }, "test_info": { "load_level": metrics.load_level, "test_duration": metrics.test_duration, "timestamp": metrics.timestamp.isoformat() } } detailed[tool_name] = tool_detailed return detailed async def _save_results(self, all_results: Dict[str, Any], report: Dict[str, Any]): """Save benchmark results and report to files.""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # Save raw results results_file = f"performance_benchmark_results_{timestamp}.json" with open(results_file, 'w') as f: # Convert metrics to dictionaries for JSON serialization serializable_results = {} for tool_name, tool_data in all_results.items(): serializable_results[tool_name] = {} for load_key, metrics in tool_data.items(): serializable_results[tool_name][load_key] = { "tool_name": metrics.tool_name, "load_level": metrics.load_level, "response_times": metrics.response_times, "throughput": metrics.throughput, "success_rate": metrics.success_rate, "error_rate": metrics.error_rate, "avg_response_time": metrics.avg_response_time, "min_response_time": metrics.min_response_time, "max_response_time": metrics.max_response_time, "p95_response_time": metrics.p95_response_time, "p99_response_time": metrics.p99_response_time, "memory_usage_mb": metrics.memory_usage_mb, "cpu_usage_percent": metrics.cpu_usage_percent, "total_requests": metrics.total_requests, "successful_requests": metrics.successful_requests, "failed_requests": metrics.failed_requests, "test_duration": metrics.test_duration, "timestamp": metrics.timestamp.isoformat() } json.dump(serializable_results, f, indent=2) # Save comprehensive report report_file = f"performance_benchmark_report_{timestamp}.json" with open(report_file, 'w') as f: json.dump(report, f, indent=2) # Generate markdown report markdown_file = f"performance_benchmark_report_{timestamp}.md" markdown_content = self._generate_markdown_report(report) with open(markdown_file, 'w') as f: f.write(markdown_content) logger.info(f"Results saved to: {results_file}") logger.info(f"Report saved to: {report_file}") logger.info(f"Markdown report saved to: {markdown_file}") def _generate_markdown_report(self, report: Dict[str, Any]) -> str: """Generate a markdown version of the performance report.""" md = ["# MCP Collective Intelligence Tools - Performance Benchmark Report"] md.append("") md.append(f"**Generated:** {report['benchmark_summary']['timestamp']}") md.append(f"**Total Duration:** {report['benchmark_summary']['total_duration_seconds']:.2f} seconds") md.append("") # Executive Summary md.append("## Executive Summary") md.append("") best_tool = report['recommendations']['best_overall_tool'] md.append(f"**Best Overall Tool:** {best_tool['tool']} (Score: {best_tool['score']:.3f})") md.append(f"**Reasoning:** {best_tool['reasoning']}") md.append("") # Performance Summary Table md.append("## Performance Summary") md.append("") md.append("| Tool | Load 1 | Load 5 | Load 10 | Avg Success Rate | Avg Response Time |") md.append("|------|--------|--------|---------|------------------|-------------------|") for tool_name in report['detailed_metrics']: load_1 = report['detailed_metrics'][tool_name]['load_1'] load_5 = report['detailed_metrics'][tool_name]['load_5'] load_10 = report['detailed_metrics'][tool_name]['load_10'] avg_success = (load_1['reliability']['success_rate'] + load_5['reliability']['success_rate'] + load_10['reliability']['success_rate']) / 3 avg_response = (load_1['response_times']['avg'] + load_5['response_times']['avg'] + load_10['response_times']['avg']) / 3 md.append(f"| {tool_name} | {load_1['throughput']:.2f} | {load_5['throughput']:.2f} | {load_10['throughput']:.2f} | {avg_success:.1%} | {avg_response:.2f}s |") md.append("") # Detailed Analysis md.append("## Detailed Performance Analysis") md.append("") for tool_name, analysis in report['performance_analysis'].items(): md.append(f"### {tool_name}") md.append("") md.append("**Scalability:**") md.append(f"- Score: {analysis['scalability']['score']:.3f}") md.append(f"- Max Throughput: {analysis['scalability']['max_throughput']:.2f} req/s") md.append(f"- Throughput Trend: {analysis['scalability']['throughput_trend']}") md.append("") md.append("**Reliability:**") md.append(f"- Average Success Rate: {analysis['reliability']['avg_success_rate']:.1%}") md.append(f"- Minimum Success Rate: {analysis['reliability']['min_success_rate']:.1%}") md.append(f"- Stability Score: {analysis['reliability']['stability_score']:.3f}") md.append("") md.append("**Resource Usage:**") md.append(f"- Average Memory: {analysis['resource_usage']['avg_memory_mb']:.1f} MB") md.append(f"- Maximum Memory: {analysis['resource_usage']['max_memory_mb']:.1f} MB") md.append(f"- Average CPU: {analysis['resource_usage']['avg_cpu_percent']:.1f}%") md.append("") # Recommendations md.append("## Recommendations") md.append("") md.append("### Use Case Recommendations") use_cases = report['comparison_analysis']['use_case_recommendations'] md.append(f"**High Throughput:** {use_cases['high_throughput']['tool']} ({use_cases['high_throughput']['max_throughput']:.2f} req/s)") md.append(f"**Low Latency:** {use_cases['low_latency']['tool']} ({use_cases['low_latency']['avg_response_time']:.2f}s)") md.append(f"**Resource Efficient:** {use_cases['resource_efficient']['tool']} (Score: {use_cases['resource_efficient']['efficiency_score']:.3f})") md.append(f"**Most Reliable:** {use_cases['most_reliable']['tool']} ({use_cases['most_reliable']['avg_success_rate']:.1%})") md.append("") md.append("### Architecture Recommendations") for rec in report['recommendations']['architecture_recommendations']: md.append(f"- {rec}") md.append("") # Tool-specific recommendations md.append("### Tool-Specific Recommendations") for tool_name, recs in report['recommendations']['tool_specific_recommendations'].items(): if recs: md.append(f"**{tool_name}:**") for rec in recs: md.append(f"- {rec}") md.append("") return "\n".join(md) async def main(): """Main function to run the comprehensive performance benchmark.""" print("Starting MCP Collective Intelligence Tools Performance Benchmark") print("=" * 70) benchmark = PerformanceBenchmark() try: # Run comprehensive benchmark report = await benchmark.run_comprehensive_benchmark() # Print summary print("\nBenchmark Summary:") print("-" * 50) best_tool = report['recommendations']['best_overall_tool'] print(f"Best Overall Tool: {best_tool['tool']} (Score: {best_tool['score']:.3f})") print("\nUse Case Recommendations:") use_cases = report['comparison_analysis']['use_case_recommendations'] print(f" High Throughput: {use_cases['high_throughput']['tool']}") print(f" Low Latency: {use_cases['low_latency']['tool']}") print(f" Resource Efficient: {use_cases['resource_efficient']['tool']}") print(f" Most Reliable: {use_cases['most_reliable']['tool']}") print(f"\nTotal Test Duration: {report['benchmark_summary']['total_duration_seconds']:.2f} seconds") print(f"Tools Tested: {', '.join(report['benchmark_summary']['tools_tested'])}") print(f"Load Levels: {report['benchmark_summary']['load_levels_tested']}") print("\nDetailed reports have been saved to JSON and Markdown files.") print("Check the generated files for comprehensive analysis and recommendations.") except Exception as e: logger.error(f"Benchmark failed: {e}") import traceback traceback.print_exc() if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/physics91/openrouter-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server