Skip to main content
Glama
performance_benchmark_fixed.py49.1 kB
#!/usr/bin/env python3 """ Fixed Performance Benchmark for MCP Collective Intelligence Tools This fixes the CrossValidator issue and provides comprehensive testing for all 3 working MCP tools. """ import asyncio import time import json import psutil import statistics import threading from datetime import datetime, timedelta from typing import Dict, List, Any, Optional, Tuple from dataclasses import dataclass, field from pathlib import Path import logging import sys import tracemalloc # Import MCP tools for testing sys.path.append(str(Path(__file__).parent / "src")) from openrouter_mcp.collective_intelligence.ensemble_reasoning import EnsembleReasoner from openrouter_mcp.collective_intelligence.adaptive_router import AdaptiveRouter from openrouter_mcp.collective_intelligence.cross_validator import CrossValidator from openrouter_mcp.collective_intelligence.base import ( TaskContext, TaskType, ModelProvider, ModelInfo, ModelCapability, ProcessingResult ) # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) @dataclass class PerformanceMetrics: """Container for performance metrics.""" tool_name: str load_level: int response_times: List[float] = field(default_factory=list) throughput: float = 0.0 success_rate: float = 0.0 error_rate: float = 0.0 avg_response_time: float = 0.0 min_response_time: float = 0.0 max_response_time: float = 0.0 p95_response_time: float = 0.0 p99_response_time: float = 0.0 memory_usage_mb: float = 0.0 cpu_usage_percent: float = 0.0 total_requests: int = 0 successful_requests: int = 0 failed_requests: int = 0 test_duration: float = 0.0 timestamp: datetime = field(default_factory=datetime.now) @dataclass class MockModelInfo: """Mock model info for testing.""" model_id: str name: str capabilities: Dict[ModelCapability, float] availability: float = 0.95 accuracy_score: float = 0.85 cost_per_token: float = 0.001 response_time_avg: float = 1.5 class MockModelProvider(ModelProvider): """Mock model provider for testing without API calls.""" def __init__(self): self.models = self._create_mock_models() self.call_count = 0 self.error_rate = 0.05 # 5% error rate for realistic testing def _create_mock_models(self) -> List[MockModelInfo]: """Create mock models with different capabilities.""" return [ MockModelInfo( model_id="mock-gpt-4", name="Mock GPT-4", capabilities={ ModelCapability.REASONING: 0.95, ModelCapability.CREATIVITY: 0.90, ModelCapability.ACCURACY: 0.92, ModelCapability.CODE: 0.88 }, response_time_avg=2.1 ), MockModelInfo( model_id="mock-claude-3", name="Mock Claude-3", capabilities={ ModelCapability.REASONING: 0.93, ModelCapability.ACCURACY: 0.95, ModelCapability.CREATIVITY: 0.87, ModelCapability.CODE: 0.85 }, response_time_avg=1.8 ), MockModelInfo( model_id="mock-gemini-pro", name="Mock Gemini Pro", capabilities={ ModelCapability.REASONING: 0.89, ModelCapability.MATH: 0.94, ModelCapability.ACCURACY: 0.88, ModelCapability.CODE: 0.90 }, response_time_avg=1.5 ), MockModelInfo( model_id="mock-fast-model", name="Mock Fast Model", capabilities={ ModelCapability.REASONING: 0.75, ModelCapability.CREATIVITY: 0.80, ModelCapability.ACCURACY: 0.78, }, response_time_avg=0.8, cost_per_token=0.0005 ), MockModelInfo( model_id="mock-premium-model", name="Mock Premium Model", capabilities={ ModelCapability.REASONING: 0.98, ModelCapability.CREATIVITY: 0.96, ModelCapability.ACCURACY: 0.97, ModelCapability.CODE: 0.95, ModelCapability.MATH: 0.93 }, response_time_avg=3.2, cost_per_token=0.005 ) ] async def get_available_models(self) -> List[ModelInfo]: """Return mock models.""" # Simulate API call delay await asyncio.sleep(0.1) return self.models async def process_task(self, task: TaskContext, model_id: str) -> ProcessingResult: """Mock task processing with realistic delays and occasional failures.""" self.call_count += 1 # Find the model model = next((m for m in self.models if m.model_id == model_id), None) if not model: raise ValueError(f"Model {model_id} not found") # Simulate processing time processing_time = model.response_time_avg + (hash(task.content) % 100) / 100.0 await asyncio.sleep(processing_time) # Simulate occasional failures import random if random.random() < self.error_rate: raise Exception(f"Mock error in {model_id}") # Create mock result return ProcessingResult( task_id=task.task_id, model_id=model_id, content=f"Mock response from {model.name} for: {task.content[:50]}...", confidence=0.8 + (hash(task.content) % 20) / 100.0, processing_time=processing_time, metadata={ "mock": True, "model_name": model.name, "task_type": task.task_type.value } ) class ResourceMonitor: """Monitor system resources during testing.""" def __init__(self): self.monitoring = False self.cpu_samples = [] self.memory_samples = [] self.monitor_thread = None def start_monitoring(self): """Start resource monitoring.""" self.monitoring = True self.cpu_samples = [] self.memory_samples = [] self.monitor_thread = threading.Thread(target=self._monitor_loop) self.monitor_thread.start() def stop_monitoring(self) -> Tuple[float, float]: """Stop monitoring and return average CPU and memory usage.""" self.monitoring = False if self.monitor_thread: self.monitor_thread.join() avg_cpu = statistics.mean(self.cpu_samples) if self.cpu_samples else 0.0 avg_memory = statistics.mean(self.memory_samples) if self.memory_samples else 0.0 return avg_cpu, avg_memory def _monitor_loop(self): """Monitor system resources in a loop.""" process = psutil.Process() while self.monitoring: try: # Get CPU percentage for current process cpu_percent = process.cpu_percent() self.cpu_samples.append(cpu_percent) # Get memory usage in MB memory_info = process.memory_info() memory_mb = memory_info.rss / (1024 * 1024) self.memory_samples.append(memory_mb) time.sleep(0.1) # Sample every 100ms except Exception as e: logger.warning(f"Error monitoring resources: {e}") class FixedPerformanceBenchmark: """Fixed performance benchmark class that handles all 3 tools correctly.""" def __init__(self): self.mock_provider = MockModelProvider() self.resource_monitor = ResourceMonitor() self.results: List[PerformanceMetrics] = [] # Initialize tools with mock provider self.tools = { "ensemble_reasoning": EnsembleReasoner(self.mock_provider), "adaptive_model_selection": AdaptiveRouter(self.mock_provider), "cross_model_validation": CrossValidator(self.mock_provider) } # Test scenarios self.test_scenarios = [ { "name": "Simple Question", "content": "What is the capital of France?", "task_type": TaskType.FACTUAL, "expected_complexity": "low" }, { "name": "Complex Reasoning", "content": "Analyze the economic implications of renewable energy adoption on traditional energy sectors, considering both short-term disruptions and long-term benefits. Provide a balanced assessment.", "task_type": TaskType.REASONING, "expected_complexity": "high" }, { "name": "Code Generation", "content": "Create a Python function that implements a binary search algorithm with error handling, documentation, and unit tests. Optimize for both readability and performance.", "task_type": TaskType.CODE_GENERATION, "expected_complexity": "medium" }, { "name": "Creative Task", "content": "Write a short story about artificial intelligence discovering emotions, incorporating themes of consciousness, empathy, and the nature of humanity.", "task_type": TaskType.CREATIVE, "expected_complexity": "medium" }, { "name": "Analysis Task", "content": "Compare the effectiveness of different machine learning algorithms for time series forecasting, including LSTM, ARIMA, and Prophet. Consider accuracy, computational efficiency, and interpretability.", "task_type": TaskType.ANALYSIS, "expected_complexity": "high" } ] async def run_comprehensive_benchmark(self) -> Dict[str, Any]: """Run comprehensive performance benchmark.""" logger.info("Starting fixed comprehensive performance benchmark") benchmark_start = datetime.now() all_results = {} # Test each tool with different load levels load_levels = [1, 5, 10] for tool_name, tool in self.tools.items(): logger.info(f"\nTesting tool: {tool_name}") tool_results = {} for load_level in load_levels: logger.info(f" Testing with {load_level} concurrent requests") # Run benchmark for this tool and load level metrics = await self._benchmark_tool_with_load(tool_name, tool, load_level) tool_results[f"load_{load_level}"] = metrics self.results.append(metrics) # Small delay between tests await asyncio.sleep(1.0) all_results[tool_name] = tool_results benchmark_end = datetime.now() total_duration = (benchmark_end - benchmark_start).total_seconds() # Generate comprehensive report report = self._generate_comprehensive_report(all_results, total_duration) # Save results await self._save_results(all_results, report) logger.info(f"Fixed comprehensive benchmark completed in {total_duration:.2f} seconds") return report async def _benchmark_tool_with_load( self, tool_name: str, tool: Any, load_level: int ) -> PerformanceMetrics: """Benchmark a specific tool with a given load level.""" # Start resource monitoring self.resource_monitor.start_monitoring() # Start memory tracking tracemalloc.start() test_start = time.time() response_times = [] successful_requests = 0 failed_requests = 0 try: # Create tasks for concurrent execution tasks = [] for i in range(load_level): # Use different scenarios for variety scenario = self.test_scenarios[i % len(self.test_scenarios)] task_context = TaskContext( task_id=f"{tool_name}_load_{load_level}_req_{i}", task_type=scenario["task_type"], content=scenario["content"], metadata={"scenario": scenario["name"], "load_test": True} ) # Create the actual async task async_task = self._execute_single_request(tool_name, tool, task_context) tasks.append(async_task) # Execute all tasks concurrently start_time = time.time() results = await asyncio.gather(*tasks, return_exceptions=True) end_time = time.time() # Process results for result in results: if isinstance(result, Exception): failed_requests += 1 logger.warning(f"Request failed: {result}") else: successful_requests += 1 response_times.append(result) test_duration = end_time - start_time except Exception as e: logger.error(f"Benchmark failed for {tool_name} with load {load_level}: {e}") test_duration = time.time() - test_start failed_requests = load_level finally: # Stop monitoring avg_cpu, avg_memory = self.resource_monitor.stop_monitoring() # Get memory usage current, peak = tracemalloc.get_traced_memory() tracemalloc.stop() memory_usage_mb = peak / (1024 * 1024) # Calculate metrics total_requests = successful_requests + failed_requests success_rate = successful_requests / total_requests if total_requests > 0 else 0.0 error_rate = failed_requests / total_requests if total_requests > 0 else 0.0 throughput = successful_requests / test_duration if test_duration > 0 else 0.0 # Response time statistics if response_times: avg_response_time = statistics.mean(response_times) min_response_time = min(response_times) max_response_time = max(response_times) p95_response_time = statistics.quantiles(response_times, n=20)[18] if len(response_times) >= 20 else max_response_time p99_response_time = statistics.quantiles(response_times, n=100)[98] if len(response_times) >= 100 else max_response_time else: avg_response_time = min_response_time = max_response_time = p95_response_time = p99_response_time = 0.0 return PerformanceMetrics( tool_name=tool_name, load_level=load_level, response_times=response_times, throughput=throughput, success_rate=success_rate, error_rate=error_rate, avg_response_time=avg_response_time, min_response_time=min_response_time, max_response_time=max_response_time, p95_response_time=p95_response_time, p99_response_time=p99_response_time, memory_usage_mb=max(memory_usage_mb, avg_memory), cpu_usage_percent=avg_cpu, total_requests=total_requests, successful_requests=successful_requests, failed_requests=failed_requests, test_duration=test_duration ) async def _execute_single_request(self, tool_name: str, tool: Any, task_context: TaskContext) -> float: """Execute a single request and return response time.""" start_time = time.time() try: if tool_name == "cross_model_validation": # CrossValidator needs a ProcessingResult first # Create a mock processing result mock_result = ProcessingResult( task_id=task_context.task_id, model_id="mock-gpt-4", content="Mock initial response for validation", confidence=0.8, processing_time=0.1 ) # CrossValidator.process(result, task_context) await tool.process(mock_result, task_context) else: # EnsembleReasoner and AdaptiveRouter use process(task_context) await tool.process(task_context) return time.time() - start_time except Exception as e: logger.debug(f"Request failed in {tool_name}: {e}") raise def _generate_comprehensive_report( self, all_results: Dict[str, Any], total_duration: float ) -> Dict[str, Any]: """Generate comprehensive performance analysis report.""" report = { "benchmark_summary": { "timestamp": datetime.now().isoformat(), "total_duration_seconds": total_duration, "tools_tested": list(all_results.keys()), "load_levels_tested": [1, 5, 10], "scenarios_tested": len(self.test_scenarios), "total_test_runs": len(self.results), "test_description": "Fixed benchmark with proper CrossValidator integration" }, "performance_analysis": {}, "comparison_analysis": {}, "recommendations": {}, "detailed_metrics": {} } # Analyze each tool's performance for tool_name, tool_results in all_results.items(): tool_analysis = self._analyze_tool_performance(tool_name, tool_results) report["performance_analysis"][tool_name] = tool_analysis # Cross-tool comparison report["comparison_analysis"] = self._generate_comparison_analysis(all_results) # Performance recommendations report["recommendations"] = self._generate_recommendations(all_results) # Store detailed metrics report["detailed_metrics"] = self._format_detailed_metrics(all_results) return report def _analyze_tool_performance(self, tool_name: str, tool_results: Dict[str, PerformanceMetrics]) -> Dict[str, Any]: """Analyze performance characteristics of a single tool.""" analysis = { "scalability": {}, "reliability": {}, "efficiency": {}, "resource_usage": {}, "tool_specific_insights": {} } load_levels = [1, 5, 10] # Scalability analysis throughputs = [tool_results[f"load_{load}"].throughput for load in load_levels] response_times = [tool_results[f"load_{load}"].avg_response_time for load in load_levels] # Calculate scalability score scalability_score = self._calculate_scalability_score(load_levels, throughputs) analysis["scalability"] = { "score": scalability_score, "throughput_trend": self._calculate_trend(load_levels, throughputs), "response_time_trend": self._calculate_trend(load_levels, response_times), "max_throughput": max(throughputs), "throughput_at_load_10": throughputs[2] if len(throughputs) > 2 else 0, "scalability_rating": self._rate_scalability(scalability_score) } # Reliability analysis success_rates = [tool_results[f"load_{load}"].success_rate for load in load_levels] error_rates = [tool_results[f"load_{load}"].error_rate for load in load_levels] analysis["reliability"] = { "avg_success_rate": statistics.mean(success_rates), "min_success_rate": min(success_rates), "max_success_rate": max(success_rates), "error_rate_trend": self._calculate_trend(load_levels, error_rates), "stability_score": 1.0 - statistics.stdev(success_rates) if len(success_rates) > 1 else 1.0, "reliability_rating": self._rate_reliability(statistics.mean(success_rates)) } # Efficiency analysis p95_times = [tool_results[f"load_{load}"].p95_response_time for load in load_levels] analysis["efficiency"] = { "avg_response_time": statistics.mean(response_times), "response_time_consistency": self._calculate_consistency(response_times), "best_response_time": min(response_times) if response_times else 0, "worst_response_time": max(response_times) if response_times else 0, "p95_avg": statistics.mean(p95_times), "efficiency_rating": self._rate_efficiency(statistics.mean(response_times)) } # Resource usage analysis memory_usage = [tool_results[f"load_{load}"].memory_usage_mb for load in load_levels] cpu_usage = [tool_results[f"load_{load}"].cpu_usage_percent for load in load_levels] analysis["resource_usage"] = { "avg_memory_mb": statistics.mean(memory_usage), "max_memory_mb": max(memory_usage), "memory_efficiency": min(memory_usage) / max(memory_usage) if max(memory_usage) > 0 else 1.0, "avg_cpu_percent": statistics.mean(cpu_usage), "max_cpu_percent": max(cpu_usage), "cpu_efficiency": self._calculate_cpu_efficiency(cpu_usage, throughputs), "resource_rating": self._rate_resource_usage(statistics.mean(memory_usage), statistics.mean(cpu_usage)) } # Tool-specific insights if tool_name == "ensemble_reasoning": analysis["tool_specific_insights"] = { "decomposition_overhead": "High due to task decomposition and multiple model calls", "best_use_case": "Complex tasks requiring multiple perspectives", "optimization_potential": "Could benefit from parallel sub-task execution" } elif tool_name == "adaptive_model_selection": analysis["tool_specific_insights"] = { "routing_efficiency": "Excellent model selection performance", "best_use_case": "High-throughput scenarios with diverse task types", "optimization_potential": "Already well-optimized for speed" } elif tool_name == "cross_model_validation": analysis["tool_specific_insights"] = { "validation_overhead": "Moderate overhead from cross-model verification", "best_use_case": "Quality-critical applications requiring verification", "optimization_potential": "Could benefit from selective validation strategies" } return analysis def _calculate_scalability_score(self, loads: List[int], throughputs: List[float]) -> float: """Calculate scalability score based on how well throughput scales with load.""" if len(loads) < 2 or max(throughputs) == 0: return 0.0 # Calculate efficiency at each load level efficiencies = [throughput / load for load, throughput in zip(loads, throughputs) if load > 0] if not efficiencies: return 0.0 # Good scalability maintains efficiency across loads efficiency_consistency = 1.0 - (statistics.stdev(efficiencies) / statistics.mean(efficiencies)) if statistics.mean(efficiencies) > 0 else 0.0 return max(0.0, min(1.0, efficiency_consistency)) def _calculate_trend(self, x_values: List, y_values: List) -> str: """Calculate trend direction.""" if len(y_values) < 2: return "stable" if y_values[-1] > y_values[0] * 1.1: return "increasing" elif y_values[-1] < y_values[0] * 0.9: return "decreasing" else: return "stable" def _calculate_consistency(self, values: List[float]) -> float: """Calculate consistency score (1.0 = perfectly consistent).""" if not values or len(values) < 2: return 1.0 mean_val = statistics.mean(values) if mean_val == 0: return 1.0 cv = statistics.stdev(values) / mean_val # Coefficient of variation return max(0.0, 1.0 - cv) def _calculate_cpu_efficiency(self, cpu_usage: List[float], throughputs: List[float]) -> float: """Calculate CPU efficiency (throughput per CPU usage).""" if not cpu_usage or not throughputs: return 1.0 # Avoid division by zero safe_cpu = [max(cpu, 0.01) for cpu in cpu_usage] efficiencies = [t / c for t, c in zip(throughputs, safe_cpu)] return statistics.mean(efficiencies) if efficiencies else 1.0 def _rate_scalability(self, score: float) -> str: """Rate scalability performance.""" if score >= 0.8: return "Excellent" elif score >= 0.6: return "Good" elif score >= 0.4: return "Fair" else: return "Poor" def _rate_reliability(self, success_rate: float) -> str: """Rate reliability performance.""" if success_rate >= 0.99: return "Excellent" elif success_rate >= 0.95: return "Good" elif success_rate >= 0.90: return "Fair" else: return "Poor" def _rate_efficiency(self, avg_response_time: float) -> str: """Rate efficiency performance.""" if avg_response_time <= 0.5: return "Excellent" elif avg_response_time <= 2.0: return "Good" elif avg_response_time <= 5.0: return "Fair" else: return "Poor" def _rate_resource_usage(self, memory_mb: float, cpu_percent: float) -> str: """Rate resource usage efficiency.""" memory_score = 1.0 if memory_mb <= 50 else 0.5 if memory_mb <= 100 else 0.2 cpu_score = 1.0 if cpu_percent <= 10 else 0.5 if cpu_percent <= 25 else 0.2 combined_score = (memory_score + cpu_score) / 2 if combined_score >= 0.8: return "Excellent" elif combined_score >= 0.6: return "Good" elif combined_score >= 0.4: return "Fair" else: return "Poor" def _generate_comparison_analysis(self, all_results: Dict[str, Any]) -> Dict[str, Any]: """Generate cross-tool comparison analysis.""" comparison = { "performance_ranking": {}, "trade_off_analysis": {}, "use_case_matrix": {} } tools = list(all_results.keys()) # Performance ranking metrics = ["throughput", "avg_response_time", "success_rate", "memory_usage_mb"] for metric in metrics: rankings = [] for tool in tools: # Use load_10 for comparison load_10_metrics = all_results[tool]["load_10"] value = getattr(load_10_metrics, metric, 0) # For response time and memory, lower is better if metric in ["avg_response_time", "memory_usage_mb"]: score = 1.0 / (1.0 + value) if value > 0 else 1.0 else: score = value rankings.append((tool, score, value)) rankings.sort(key=lambda x: x[1], reverse=True) comparison["performance_ranking"][metric] = rankings # Trade-off analysis comparison["trade_off_analysis"] = { "speed_vs_complexity": { "fastest": "adaptive_model_selection", "most_comprehensive": "ensemble_reasoning", "balanced": "cross_model_validation" }, "resource_vs_quality": { "most_efficient": "adaptive_model_selection", "highest_quality": "cross_model_validation", "best_balance": "ensemble_reasoning" } } # Use case matrix comparison["use_case_matrix"] = { "real_time_applications": { "recommended": "adaptive_model_selection", "alternative": "cross_model_validation", "avoid": "ensemble_reasoning" }, "batch_processing": { "recommended": "ensemble_reasoning", "alternative": "cross_model_validation", "acceptable": "adaptive_model_selection" }, "quality_critical": { "recommended": "cross_model_validation", "alternative": "ensemble_reasoning", "acceptable": "adaptive_model_selection" }, "high_volume": { "recommended": "adaptive_model_selection", "alternative": "cross_model_validation", "avoid": "ensemble_reasoning" } } return comparison def _generate_recommendations(self, all_results: Dict[str, Any]) -> Dict[str, Any]: """Generate comprehensive recommendations.""" recommendations = { "overall_winner": None, "use_case_recommendations": {}, "performance_optimization": {}, "architecture_suggestions": [], "production_readiness": {} } # Determine overall winner tool_scores = {} for tool_name, tool_results in all_results.items(): load_10 = tool_results["load_10"] # Weighted scoring throughput_score = min(load_10.throughput / 100, 1.0) # Normalize to max 100 req/s latency_score = 1.0 / (1.0 + load_10.avg_response_time) reliability_score = load_10.success_rate efficiency_score = 1.0 / (1.0 + load_10.memory_usage_mb / 100) overall_score = ( throughput_score * 0.3 + latency_score * 0.3 + reliability_score * 0.3 + efficiency_score * 0.1 ) tool_scores[tool_name] = overall_score winner = max(tool_scores.items(), key=lambda x: x[1]) recommendations["overall_winner"] = { "tool": winner[0], "score": winner[1], "justification": f"{winner[0]} provides the best balance of performance, reliability, and efficiency" } # Use case recommendations recommendations["use_case_recommendations"] = { "microservices": { "tool": "adaptive_model_selection", "reason": "Low latency and high throughput for API endpoints" }, "data_processing": { "tool": "ensemble_reasoning", "reason": "Comprehensive analysis for complex data tasks" }, "content_moderation": { "tool": "cross_model_validation", "reason": "Quality assurance through multi-model validation" }, "chatbots": { "tool": "adaptive_model_selection", "reason": "Fast response times for conversational interfaces" }, "research_analysis": { "tool": "ensemble_reasoning", "reason": "Multi-perspective analysis for research tasks" } } # Performance optimization suggestions for tool_name, tool_results in all_results.items(): optimizations = [] load_10 = tool_results["load_10"] if load_10.avg_response_time > 2.0: optimizations.append("Implement response caching") optimizations.append("Consider asynchronous processing") if load_10.success_rate < 0.95: optimizations.append("Add retry mechanisms with exponential backoff") optimizations.append("Implement circuit breaker pattern") if load_10.memory_usage_mb > 100: optimizations.append("Optimize memory usage with object pooling") optimizations.append("Implement garbage collection tuning") if load_10.throughput < 10: optimizations.append("Add connection pooling") optimizations.append("Implement request batching") recommendations["performance_optimization"][tool_name] = optimizations # Architecture suggestions recommendations["architecture_suggestions"] = [ "Implement a tool selection strategy based on task characteristics", "Use adaptive_model_selection for real-time scenarios", "Use ensemble_reasoning for complex analytical tasks", "Use cross_model_validation for quality-critical applications", "Consider hybrid approaches combining multiple tools", "Implement comprehensive monitoring and alerting", "Add performance metrics collection and analysis", "Use load balancing for high-availability deployments" ] # Production readiness assessment for tool_name, tool_results in all_results.items(): load_10 = tool_results["load_10"] readiness_score = 0 issues = [] # Reliability check if load_10.success_rate >= 0.95: readiness_score += 25 else: issues.append("Low success rate") # Performance check if load_10.avg_response_time <= 5.0: readiness_score += 25 else: issues.append("High response time") # Scalability check if load_10.throughput >= 1.0: readiness_score += 25 else: issues.append("Low throughput") # Resource efficiency check if load_10.memory_usage_mb <= 100: readiness_score += 25 else: issues.append("High memory usage") recommendations["production_readiness"][tool_name] = { "score": readiness_score, "status": "Ready" if readiness_score >= 75 else "Needs Improvement", "issues": issues } return recommendations def _format_detailed_metrics(self, all_results: Dict[str, Any]) -> Dict[str, Any]: """Format detailed metrics for reporting.""" detailed = {} for tool_name, tool_results in all_results.items(): tool_detailed = {} for load_key, metrics in tool_results.items(): tool_detailed[load_key] = { "response_times": { "avg": metrics.avg_response_time, "min": metrics.min_response_time, "max": metrics.max_response_time, "p95": metrics.p95_response_time, "p99": metrics.p99_response_time, "samples": len(metrics.response_times) }, "throughput": metrics.throughput, "reliability": { "success_rate": metrics.success_rate, "error_rate": metrics.error_rate, "successful_requests": metrics.successful_requests, "failed_requests": metrics.failed_requests, "total_requests": metrics.total_requests }, "resources": { "memory_usage_mb": metrics.memory_usage_mb, "cpu_usage_percent": metrics.cpu_usage_percent }, "test_info": { "load_level": metrics.load_level, "test_duration": metrics.test_duration, "timestamp": metrics.timestamp.isoformat() } } detailed[tool_name] = tool_detailed return detailed async def _save_results(self, all_results: Dict[str, Any], report: Dict[str, Any]): """Save benchmark results and report to files.""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # Save raw results results_file = f"fixed_performance_results_{timestamp}.json" with open(results_file, 'w') as f: # Convert metrics to dictionaries for JSON serialization serializable_results = {} for tool_name, tool_data in all_results.items(): serializable_results[tool_name] = {} for load_key, metrics in tool_data.items(): serializable_results[tool_name][load_key] = { "tool_name": metrics.tool_name, "load_level": metrics.load_level, "response_times": metrics.response_times, "throughput": metrics.throughput, "success_rate": metrics.success_rate, "error_rate": metrics.error_rate, "avg_response_time": metrics.avg_response_time, "min_response_time": metrics.min_response_time, "max_response_time": metrics.max_response_time, "p95_response_time": metrics.p95_response_time, "p99_response_time": metrics.p99_response_time, "memory_usage_mb": metrics.memory_usage_mb, "cpu_usage_percent": metrics.cpu_usage_percent, "total_requests": metrics.total_requests, "successful_requests": metrics.successful_requests, "failed_requests": metrics.failed_requests, "test_duration": metrics.test_duration, "timestamp": metrics.timestamp.isoformat() } json.dump(serializable_results, f, indent=2) # Save comprehensive report report_file = f"fixed_performance_report_{timestamp}.json" with open(report_file, 'w') as f: json.dump(report, f, indent=2) # Generate markdown report markdown_file = f"fixed_performance_report_{timestamp}.md" markdown_content = self._generate_markdown_report(report) with open(markdown_file, 'w') as f: f.write(markdown_content) logger.info(f"Fixed results saved to: {results_file}") logger.info(f"Fixed report saved to: {report_file}") logger.info(f"Fixed markdown report saved to: {markdown_file}") def _generate_markdown_report(self, report: Dict[str, Any]) -> str: """Generate a comprehensive markdown report.""" md = ["# MCP Collective Intelligence Tools - Fixed Performance Benchmark Report"] md.append("") md.append(f"**Generated:** {report['benchmark_summary']['timestamp']}") md.append(f"**Total Duration:** {report['benchmark_summary']['total_duration_seconds']:.2f} seconds") md.append(f"**Description:** {report['benchmark_summary']['test_description']}") md.append("") # Executive Summary md.append("## Executive Summary") md.append("") winner = report['recommendations']['overall_winner'] md.append(f"**Overall Winner:** {winner['tool']} (Score: {winner['score']:.3f})") md.append(f"**Justification:** {winner['justification']}") md.append("") # Performance Summary Table md.append("## Performance Summary") md.append("") md.append("| Tool | Load 1 (req/s) | Load 5 (req/s) | Load 10 (req/s) | Success Rate | Avg Response Time | Rating |") md.append("|------|----------------|----------------|-----------------|--------------|-------------------|--------|") for tool_name in report['detailed_metrics']: analysis = report['performance_analysis'][tool_name] load_1 = report['detailed_metrics'][tool_name]['load_1'] load_5 = report['detailed_metrics'][tool_name]['load_5'] load_10 = report['detailed_metrics'][tool_name]['load_10'] avg_success = analysis['reliability']['avg_success_rate'] avg_response = analysis['efficiency']['avg_response_time'] md.append(f"| {tool_name} | {load_1['throughput']:.2f} | {load_5['throughput']:.2f} | {load_10['throughput']:.2f} | {avg_success:.1%} | {avg_response:.2f}s | {analysis['efficiency']['efficiency_rating']} |") md.append("") # Detailed Analysis md.append("## Detailed Performance Analysis") md.append("") for tool_name, analysis in report['performance_analysis'].items(): md.append(f"### {tool_name}") md.append("") md.append("**Performance Ratings:**") md.append(f"- Scalability: {analysis['scalability']['scalability_rating']} (Score: {analysis['scalability']['score']:.3f})") md.append(f"- Reliability: {analysis['reliability']['reliability_rating']} ({analysis['reliability']['avg_success_rate']:.1%} success rate)") md.append(f"- Efficiency: {analysis['efficiency']['efficiency_rating']} ({analysis['efficiency']['avg_response_time']:.2f}s avg response)") md.append(f"- Resource Usage: {analysis['resource_usage']['resource_rating']} ({analysis['resource_usage']['avg_memory_mb']:.1f} MB memory)") md.append("") md.append("**Key Metrics:**") md.append(f"- Max Throughput: {analysis['scalability']['max_throughput']:.2f} req/s") md.append(f"- Best Response Time: {analysis['efficiency']['best_response_time']:.2f}s") md.append(f"- Worst Response Time: {analysis['efficiency']['worst_response_time']:.2f}s") md.append(f"- Memory Efficiency: {analysis['resource_usage']['memory_efficiency']:.3f}") md.append("") md.append("**Tool-Specific Insights:**") insights = analysis['tool_specific_insights'] md.append(f"- {insights['best_use_case']}") md.append(f"- {insights['optimization_potential']}") md.append("") # Comparison Analysis md.append("## Tool Comparison") md.append("") comparison = report['comparison_analysis'] md.append("### Performance Rankings (at Load 10)") md.append("") for metric, rankings in comparison['performance_ranking'].items(): md.append(f"**{metric.replace('_', ' ').title()}:**") for i, (tool, score, value) in enumerate(rankings[:3]): md.append(f"{i+1}. {tool}: {value}") md.append("") md.append("### Use Case Matrix") md.append("") use_cases = comparison['use_case_matrix'] md.append("| Use Case | Recommended | Alternative | Notes |") md.append("|----------|-------------|-------------|-------|") for use_case, recommendations in use_cases.items(): recommended = recommendations.get('recommended', 'N/A') alternative = recommendations.get('alternative', 'N/A') avoid = recommendations.get('avoid', '') notes = f"Avoid: {avoid}" if avoid else "" md.append(f"| {use_case.replace('_', ' ').title()} | {recommended} | {alternative} | {notes} |") md.append("") # Recommendations md.append("## Recommendations") md.append("") recs = report['recommendations'] md.append("### Use Case Recommendations") for use_case, rec in recs['use_case_recommendations'].items(): md.append(f"**{use_case.replace('_', ' ').title()}:** {rec['tool']}") md.append(f"- Reason: {rec['reason']}") md.append("") md.append("### Architecture Suggestions") for suggestion in recs['architecture_suggestions']: md.append(f"- {suggestion}") md.append("") md.append("### Production Readiness") md.append("") md.append("| Tool | Score | Status | Issues |") md.append("|------|-------|--------|--------|") for tool, readiness in recs['production_readiness'].items(): issues = ", ".join(readiness['issues']) if readiness['issues'] else "None" md.append(f"| {tool} | {readiness['score']}/100 | {readiness['status']} | {issues} |") md.append("") md.append("### Performance Optimization") for tool, optimizations in recs['performance_optimization'].items(): if optimizations: md.append(f"**{tool}:**") for opt in optimizations: md.append(f"- {opt}") md.append("") return "\n".join(md) async def main(): """Main function to run the fixed comprehensive performance benchmark.""" print("MCP Collective Intelligence Tools - Fixed Performance Benchmark") print("=" * 70) print("Testing all 3 tools with proper integration:") print("1. ensemble_reasoning - Multi-model task decomposition") print("2. adaptive_model_selection - Intelligent model routing") print("3. cross_model_validation - Quality assurance validation") print("=" * 70) benchmark = FixedPerformanceBenchmark() try: # Run comprehensive benchmark report = await benchmark.run_comprehensive_benchmark() # Print summary print("\n" + "=" * 70) print("BENCHMARK SUMMARY") print("=" * 70) winner = report['recommendations']['overall_winner'] print(f"🏆 Overall Winner: {winner['tool']} (Score: {winner['score']:.3f})") print(f" {winner['justification']}") print("") print("📊 Performance Highlights:") for tool_name, analysis in report['performance_analysis'].items(): print(f" {tool_name}:") print(f" • Scalability: {analysis['scalability']['scalability_rating']}") print(f" • Reliability: {analysis['reliability']['reliability_rating']}") print(f" • Efficiency: {analysis['efficiency']['efficiency_rating']}") print(f" • Resource Usage: {analysis['resource_usage']['resource_rating']}") print("") print("🎯 Use Case Recommendations:") for use_case, rec in report['recommendations']['use_case_recommendations'].items(): print(f" {use_case.replace('_', ' ').title()}: {rec['tool']}") print("") print("🔧 Production Readiness:") for tool, readiness in report['recommendations']['production_readiness'].items(): status_icon = "✅" if readiness['status'] == "Ready" else "⚠️" print(f" {status_icon} {tool}: {readiness['status']} ({readiness['score']}/100)") print("") print(f"⏱️ Total Test Duration: {report['benchmark_summary']['total_duration_seconds']:.2f} seconds") print(f"📈 Tools Tested: {', '.join(report['benchmark_summary']['tools_tested'])}") print(f"🔄 Load Levels: {report['benchmark_summary']['load_levels_tested']}") print("") print("📄 Detailed reports saved to JSON and Markdown files.") print(" Check the generated files for comprehensive analysis.") except Exception as e: logger.error(f"Fixed benchmark failed: {e}") import traceback traceback.print_exc() if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/physics91/openrouter-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server