
MCP Standards

by airmcp-com
test_stress_comprehensive.py (22.8 kB)
#!/usr/bin/env python3
"""
Comprehensive Stress Testing Suite
Extended validation beyond basic scalability testing.
"""

import asyncio
import gc
import json
import psutil
import random
import statistics
import sys
import time
import os
from typing import List, Dict, Any, Tuple
from datetime import datetime, timedelta

# Add project root to path
sys.path.insert(0, os.path.abspath('.'))

from src.mcp_standards.memory.v2.test_hybrid_memory import create_test_hybrid_memory


class StressTestMetrics:
    """Comprehensive metrics collection for stress testing"""

    def __init__(self):
        self.start_time = time.time()
        self.operations = []
        self.memory_samples = []
        self.error_log = []
        self.performance_samples = []

    def record_operation(self, operation_type: str, duration: float, success: bool, details: Dict = None):
        """Record a single operation"""
        self.operations.append({
            'type': operation_type,
            'duration': duration,
            'success': success,
            'timestamp': time.time(),
            'details': details or {}
        })

    def record_memory_sample(self):
        """Record current memory usage"""
        try:
            process = psutil.Process()
            memory_info = process.memory_info()
            cpu_percent = process.cpu_percent()
            self.memory_samples.append({
                'timestamp': time.time(),
                'rss_mb': memory_info.rss / 1024 / 1024,
                'vms_mb': memory_info.vms / 1024 / 1024,
                'cpu_percent': cpu_percent,
                'num_fds': process.num_fds() if hasattr(process, 'num_fds') else 0
            })
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass

    def record_error(self, error_type: str, message: str, context: Dict = None):
        """Record an error occurrence"""
        self.error_log.append({
            'type': error_type,
            'message': message,
            'timestamp': time.time(),
            'context': context or {}
        })

    def get_summary(self) -> Dict[str, Any]:
        """Generate comprehensive test summary"""
        total_time = time.time() - self.start_time

        # Operation statistics
        successful_ops = [op for op in self.operations if op['success']]
        failed_ops = [op for op in self.operations if not op['success']]

        # Performance statistics
        durations = [op['duration'] for op in successful_ops]

        # Memory statistics
        if self.memory_samples:
            memory_growth = (
                self.memory_samples[-1]['rss_mb'] - self.memory_samples[0]['rss_mb']
                if len(self.memory_samples) > 1 else 0
            )
            max_memory = max(sample['rss_mb'] for sample in self.memory_samples)
            avg_cpu = statistics.mean(sample['cpu_percent'] for sample in self.memory_samples)
        else:
            memory_growth = 0
            max_memory = 0
            avg_cpu = 0

        return {
            'test_duration_seconds': total_time,
            'total_operations': len(self.operations),
            'successful_operations': len(successful_ops),
            'failed_operations': len(failed_ops),
            'success_rate': len(successful_ops) / len(self.operations) if self.operations else 0,
            'operations_per_second': len(self.operations) / total_time if total_time > 0 else 0,
            'performance': {
                'avg_duration_ms': statistics.mean(durations) * 1000 if durations else 0,
                'min_duration_ms': min(durations) * 1000 if durations else 0,
                'max_duration_ms': max(durations) * 1000 if durations else 0,
                'p95_duration_ms': statistics.quantiles(durations, n=20)[18] * 1000 if len(durations) > 20 else 0,
                'p99_duration_ms': statistics.quantiles(durations, n=100)[98] * 1000 if len(durations) > 100 else 0,
            },
            'memory': {
                'growth_mb': memory_growth,
                'max_usage_mb': max_memory,
                'avg_cpu_percent': avg_cpu,
                'samples_collected': len(self.memory_samples)
            },
            'errors': {
                'total_errors': len(self.error_log),
                'error_types': list(set(error['type'] for error in self.error_log)),
                'error_rate': len(self.error_log) / len(self.operations) if self.operations else 0
            }
        }


async def stress_test_large_dataset(memory_router, metrics: StressTestMetrics, num_patterns: int = 1000):
    """Test system with large dataset of patterns"""
    print(f"📊 Large Dataset Stress Test: {num_patterns} patterns...")

    # Generate diverse patterns
    pattern_templates = [
        "File operation: {action} on {file_type} file {filename} in {directory}",
        "Database query: {operation} {table} with {condition} filter returning {count} results",
        "API request: {method} {endpoint} with {params} parameters taking {duration}ms",
        "System command: {command} executed with {args} arguments in {context} environment",
        "User interaction: {user} performed {action} on {component} at {timestamp}",
        "Network operation: {protocol} connection to {host}:{port} with {status} status",
        "Cache operation: {operation} {key} in {cache_type} cache with {result} outcome",
        "Configuration change: {setting} modified from {old_value} to {new_value} by {user}",
        "Performance metric: {metric} measured at {value} {unit} on {component}",
        "Error handling: {error_type} caught in {module} with {severity} severity"
    ]

    start_time = time.time()
    batch_size = 50

    for batch_start in range(0, num_patterns, batch_size):
        batch_end = min(batch_start + batch_size, num_patterns)
        batch_patterns = []

        # Generate batch of patterns
        for i in range(batch_start, batch_end):
            template = random.choice(pattern_templates)

            # Fill template with random data
            pattern_content = template.format(
                action=random.choice(['create', 'update', 'delete', 'read', 'modify']),
                file_type=random.choice(['json', 'xml', 'csv', 'txt', 'log']),
                filename=f"file_{i}_{random.randint(1000, 9999)}",
                directory=random.choice(['/tmp', '/var/log', '/home/user', '/opt/app']),
                operation=random.choice(['SELECT', 'INSERT', 'UPDATE', 'DELETE']),
                table=f"table_{random.choice(['users', 'orders', 'products', 'logs'])}",
                condition=f"id = {random.randint(1, 1000)}",
                count=random.randint(0, 100),
                method=random.choice(['GET', 'POST', 'PUT', 'DELETE']),
                endpoint=f"/api/v1/{random.choice(['users', 'orders', 'products'])}",
                params=random.randint(0, 10),
                duration=random.randint(10, 1000),
                command=random.choice(['ls', 'grep', 'find', 'awk', 'sed']),
                args=f"--{random.choice(['verbose', 'recursive', 'force'])}",
                context=random.choice(['development', 'staging', 'production']),
                user=f"user_{random.randint(1, 100)}",
                component=random.choice(['button', 'form', 'menu', 'dialog']),
                timestamp=datetime.now().isoformat(),
                protocol=random.choice(['HTTP', 'HTTPS', 'TCP', 'UDP']),
                host=f"server_{random.randint(1, 10)}",
                port=random.choice([80, 443, 8080, 3000]),
                status=random.choice(['success', 'failed', 'timeout']),
                key=f"cache_key_{i}",
                cache_type=random.choice(['redis', 'memcache', 'local']),
                result=random.choice(['hit', 'miss', 'expired']),
                setting=f"config_{random.choice(['timeout', 'retries', 'pool_size'])}",
                old_value=random.randint(1, 100),
                new_value=random.randint(1, 100),
                metric=random.choice(['cpu_usage', 'memory_usage', 'response_time']),
                value=random.uniform(0.1, 99.9),
                unit=random.choice(['%', 'MB', 'ms']),
                error_type=random.choice(['TypeError', 'ValueError', 'ConnectionError']),
                module=f"module_{random.choice(['auth', 'db', 'api', 'cache'])}",
                severity=random.choice(['low', 'medium', 'high', 'critical'])
            )

            batch_patterns.append({
                'content': pattern_content,
                'context': {
                    'batch': batch_start // batch_size,
                    'pattern_id': i,
                    'test_type': 'large_dataset_stress',
                    'complexity': random.choice(['simple', 'medium', 'complex']),
                    'priority': random.choice(['low', 'medium', 'high']),
                    'source': random.choice(['user', 'system', 'api', 'batch'])
                },
                'category': f"stress_test_category_{i % 10}"
            })

        # Store batch concurrently
        batch_start_time = time.time()
        tasks = []

        for pattern in batch_patterns:
            async def store_pattern(p):
                op_start = time.time()
                try:
                    pattern_id = await memory_router.store_pattern(
                        p['content'], p['context'], p['category']
                    )
                    duration = time.time() - op_start
                    metrics.record_operation('store', duration, True, {'pattern_id': pattern_id})
                    return True
                except Exception as e:
                    duration = time.time() - op_start
                    metrics.record_operation('store', duration, False)
                    metrics.record_error('store_error', str(e), {'pattern': p})
                    return False

            tasks.append(store_pattern(pattern))

        # Execute batch
        results = await asyncio.gather(*tasks, return_exceptions=True)
        batch_time = time.time() - batch_start_time

        successful = sum(1 for r in results if r is True)
        batch_num = batch_start // batch_size + 1
        total_batches = (num_patterns + batch_size - 1) // batch_size
        print(f"  Batch {batch_num}/{total_batches}: {successful}/{len(batch_patterns)} patterns stored in {batch_time:.3f}s")

        # Memory sampling
        metrics.record_memory_sample()

        # Brief pause between batches
        await asyncio.sleep(0.1)

    total_time = time.time() - start_time
    print(f"✅ Large Dataset Test: {num_patterns} patterns processed in {total_time:.3f}s")
    return True


async def stress_test_concurrent_load(memory_router, metrics: StressTestMetrics, max_concurrent: int = 20):
    """Test system under extreme concurrent load"""
    print(f"⚡ Extreme Concurrent Load Test: {max_concurrent} concurrent operations...")

    async def concurrent_user(user_id: int, duration_seconds: int = 60):
        """Simulate a single concurrent user"""
        end_time = time.time() + duration_seconds
        operation_count = 0

        while time.time() < end_time:
            # Randomly choose operation (70% search, 30% store)
            if random.random() < 0.7:
                # Search operation
                queries = [
                    "file operation database",
                    "user interaction system",
                    "API request performance",
                    "network connection cache",
                    "error handling configuration"
                ]
                query = random.choice(queries)

                op_start = time.time()
                try:
                    results = await memory_router.search_patterns(
                        query,
                        top_k=random.randint(3, 10),
                        threshold=random.uniform(0.3, 0.8)
                    )
                    duration = time.time() - op_start
                    metrics.record_operation(
                        'search', duration, True,
                        {'user': user_id, 'results': len(results) if results else 0}
                    )
                except Exception as e:
                    duration = time.time() - op_start
                    metrics.record_operation('search', duration, False)
                    metrics.record_error('search_error', str(e), {'user': user_id})
            else:
                # Store operation
                content = f"Concurrent user {user_id} operation {operation_count}: " + \
                          f"Testing system under load at {datetime.now().isoformat()}"
                context = {
                    'user_id': user_id,
                    'operation_count': operation_count,
                    'test_phase': 'concurrent_load'
                }
                category = f"concurrent_user_{user_id % 5}"

                op_start = time.time()
                try:
                    pattern_id = await memory_router.store_pattern(content, context, category)
                    duration = time.time() - op_start
                    metrics.record_operation(
                        'store', duration, True,
                        {'user': user_id, 'pattern_id': pattern_id}
                    )
                except Exception as e:
                    duration = time.time() - op_start
                    metrics.record_operation('store', duration, False)
                    metrics.record_error('store_error', str(e), {'user': user_id})

            operation_count += 1

            # Brief pause between operations for this user
            await asyncio.sleep(random.uniform(0.1, 0.5))

    # Launch concurrent users
    user_tasks = []
    for user_id in range(max_concurrent):
        task = asyncio.create_task(concurrent_user(user_id, 30))  # 30 seconds per user
        user_tasks.append(task)

        # Stagger user startup
        await asyncio.sleep(0.1)

    # Monitor progress
    start_time = time.time()
    while not all(task.done() for task in user_tasks):
        await asyncio.sleep(5)
        metrics.record_memory_sample()

        # Progress update
        elapsed = time.time() - start_time
        completed_tasks = sum(1 for task in user_tasks if task.done())
        print(f"  Progress: {completed_tasks}/{max_concurrent} users completed, {elapsed:.1f}s elapsed")

    # Wait for all users to complete
    await asyncio.gather(*user_tasks, return_exceptions=True)

    total_time = time.time() - start_time
    print(f"✅ Concurrent Load Test: {max_concurrent} users completed in {total_time:.3f}s")
    return True


async def stress_test_memory_stability(memory_router, metrics: StressTestMetrics, duration_minutes: int = 10):
    """Test for memory leaks and stability over extended period"""
    print(f"🧠 Memory Stability Test: {duration_minutes} minutes duration...")

    start_time = time.time()
    end_time = start_time + (duration_minutes * 60)
    operation_count = 0
    gc_count = 0

    while time.time() < end_time:
        # Mixed operations to stress memory
        operations = []

        # Create batch of operations
        for _ in range(20):
            if random.random() < 0.6:
                # Store operation
                content = f"Memory stability test pattern {operation_count}: " + \
                          f"Testing for memory leaks with large content " + \
                          f"{'x' * random.randint(100, 1000)}"  # Variable content size
                operations.append(('store', content, {
                    'operation_count': operation_count,
                    'test_phase': 'memory_stability',
                    'content_size': len(content)
                }, f"memory_category_{operation_count % 3}"))
            else:
                # Search operation
                queries = [
                    "memory stability test pattern",
                    "testing for memory leaks",
                    "large content memory test",
                    "stability pattern operation"
                ]
                operations.append(('search', random.choice(queries), {}, {}))

            operation_count += 1

        # Execute operations
        for op_type, content_or_query, context, category in operations:
            op_start = time.time()
            try:
                if op_type == 'store':
                    result = await memory_router.store_pattern(content_or_query, context, category)
                    duration = time.time() - op_start
                    metrics.record_operation('store', duration, True)
                else:
                    result = await memory_router.search_patterns(content_or_query, top_k=5)
                    duration = time.time() - op_start
                    metrics.record_operation('search', duration, True)
            except Exception as e:
                duration = time.time() - op_start
                metrics.record_operation(op_type, duration, False)
                metrics.record_error(f'{op_type}_error', str(e))

        # Memory monitoring
        metrics.record_memory_sample()

        # Periodic garbage collection
        if operation_count % 200 == 0:
            gc.collect()
            gc_count += 1
            print(f"  Memory check: {operation_count} operations, GC run #{gc_count}")

        # Brief pause
        await asyncio.sleep(0.1)

    # Final garbage collection and memory sample
    gc.collect()
    metrics.record_memory_sample()

    total_time = time.time() - start_time
    print(f"✅ Memory Stability Test: {operation_count} operations in {total_time:.3f}s")
    return True


async def run_comprehensive_stress_tests():
    """Run all stress tests and generate comprehensive report"""
    print("🚀 Comprehensive Stress Testing Suite")
    print("=" * 60)

    metrics = StressTestMetrics()

    try:
        # Initialize memory router
        print("🔧 Initializing test memory router...")
        memory_router = await create_test_hybrid_memory()
        print("✅ Memory router initialized")

        # Initial memory baseline
        metrics.record_memory_sample()

        # Test suite
        tests = [
            ("Large Dataset Stress Test", lambda: stress_test_large_dataset(memory_router, metrics, 1000)),
            ("Extreme Concurrent Load Test", lambda: stress_test_concurrent_load(memory_router, metrics, 15)),
            ("Memory Stability Test", lambda: stress_test_memory_stability(memory_router, metrics, 5)),
        ]

        for test_name, test_func in tests:
            print(f"\n🧪 Running {test_name}...")
            test_start = time.time()

            try:
                success = await test_func()
                test_time = time.time() - test_start
                print(f"✅ {test_name} completed in {test_time:.3f}s")
            except Exception as e:
                test_time = time.time() - test_start
                print(f"❌ {test_name} failed: {e}")
                metrics.record_error('test_failure', str(e), {'test': test_name})

        # Generate comprehensive report
        print(f"\n📊 Comprehensive Stress Test Results")
        print("=" * 60)

        summary = metrics.get_summary()

        print(f"🕒 Test Duration: {summary['test_duration_seconds']:.1f} seconds")
        print(f"📈 Operations: {summary['total_operations']:,} total")
        print(f"✅ Success Rate: {summary['success_rate']:.1%}")
        print(f"⚡ Throughput: {summary['operations_per_second']:.1f} ops/second")

        print(f"\n⏱️ Performance Metrics:")
        perf = summary['performance']
        print(f"  Average: {perf['avg_duration_ms']:.1f}ms")
        print(f"  95th percentile: {perf['p95_duration_ms']:.1f}ms")
        print(f"  99th percentile: {perf['p99_duration_ms']:.1f}ms")
        print(f"  Min/Max: {perf['min_duration_ms']:.1f}ms / {perf['max_duration_ms']:.1f}ms")

        print(f"\n🧠 Memory Analysis:")
        mem = summary['memory']
        print(f"  Memory Growth: {mem['growth_mb']:.1f} MB")
        print(f"  Peak Usage: {mem['max_usage_mb']:.1f} MB")
        print(f"  Average CPU: {mem['avg_cpu_percent']:.1f}%")
        print(f"  Samples: {mem['samples_collected']} collected")

        print(f"\n❌ Error Analysis:")
        errors = summary['errors']
        print(f"  Total Errors: {errors['total_errors']}")
        print(f"  Error Rate: {errors['error_rate']:.1%}")
        print(f"  Error Types: {', '.join(errors['error_types']) if errors['error_types'] else 'None'}")

        # Performance assessment
        print(f"\n🎯 Performance Assessment:")

        if summary['success_rate'] >= 0.95:
            print("✅ Excellent reliability (≥95% success rate)")
        elif summary['success_rate'] >= 0.90:
            print("⚠️ Good reliability (≥90% success rate)")
        else:
            print("❌ Poor reliability (<90% success rate)")

        if perf['p95_duration_ms'] <= 100:
            print("✅ Excellent response times (≤100ms p95)")
        elif perf['p95_duration_ms'] <= 200:
            print("⚠️ Good response times (≤200ms p95)")
        else:
            print("❌ Poor response times (>200ms p95)")

        if mem['growth_mb'] <= 50:
            print("✅ Excellent memory efficiency (≤50MB growth)")
        elif mem['growth_mb'] <= 100:
            print("⚠️ Good memory efficiency (≤100MB growth)")
        else:
            print("❌ Poor memory efficiency (>100MB growth)")

        # Overall assessment
        overall_score = (
            (1 if summary['success_rate'] >= 0.95 else 0.5 if summary['success_rate'] >= 0.90 else 0) +
            (1 if perf['p95_duration_ms'] <= 100 else 0.5 if perf['p95_duration_ms'] <= 200 else 0) +
            (1 if mem['growth_mb'] <= 50 else 0.5 if mem['growth_mb'] <= 100 else 0)
        ) / 3

        print(f"\n🏆 Overall Assessment:")
        if overall_score >= 0.9:
            print("🎉 EXCELLENT - System exceeds all stress testing requirements!")
        elif overall_score >= 0.7:
            print("✅ GOOD - System performs well under stress with minor concerns")
        elif overall_score >= 0.5:
            print("⚠️ ACCEPTABLE - System functions under stress but needs improvement")
        else:
            print("❌ POOR - System struggles under stress, requires optimization")

        return overall_score >= 0.7

    except Exception as e:
        print(f"❌ Stress testing suite failed: {e}")
        import traceback
        traceback.print_exc()
        return False


if __name__ == "__main__":
    success = asyncio.run(run_comprehensive_stress_tests())
    sys.exit(0 if success else 1)
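
The suite is written to be run directly (python test_stress_comprehensive.py), exiting 0 when the overall score is at least 0.7. If you instead want to drive it from another script, a minimal sketch is below; it assumes the file above is importable from the working directory and simply reuses the exported run_comprehensive_stress_tests coroutine.

# Minimal sketch: programmatic invocation of the stress suite.
# Assumes test_stress_comprehensive.py is importable from the current directory.
import asyncio

from test_stress_comprehensive import run_comprehensive_stress_tests

if __name__ == "__main__":
    passed = asyncio.run(run_comprehensive_stress_tests())
    print("stress suite passed" if passed else "stress suite failed")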

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/airmcp-com/mcp-standards'
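
The same endpoint can be queried from Python. The sketch below uses only the URL shown in the curl example above; the response schema is not documented here, so the JSON is printed as-is rather than parsed into specific fields.

# Minimal sketch: fetch this server's directory entry with the standard library.
import json
import urllib.request

URL = "https://glama.ai/api/mcp/v1/servers/airmcp-com/mcp-standards"

with urllib.request.urlopen(URL) as response:
    server_info = json.load(response)

# Pretty-print whatever the API returns.
print(json.dumps(server_info, indent=2))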

If you have feedback or need assistance with the MCP directory API, please join our Discord server.