Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
test_stress.py (35.3 kB)
"""
Stress Testing for Resource Usage Under Load.

This test suite covers:
- Sustained load testing (1+ hour)
- Memory leak detection over time
- CPU utilization monitoring under stress
- Database connection pool exhaustion
- File descriptor limits testing
- Network connection limits
- Queue overflow scenarios
- System stability under extreme conditions
"""

import asyncio
import gc
import logging
import resource
import threading
import time
import zlib
from dataclasses import dataclass
from typing import Any
from unittest.mock import Mock

import numpy as np
import pandas as pd
import psutil
import pytest

from maverick_mcp.backtesting import VectorBTEngine
from maverick_mcp.backtesting.persistence import BacktestPersistenceManager
from maverick_mcp.backtesting.strategies import STRATEGY_TEMPLATES

logger = logging.getLogger(__name__)


@dataclass
class ResourceSnapshot:
    """Snapshot of system resources at a point in time."""

    # Wall-clock time (time.time()) at which the snapshot was taken.
    timestamp: float
    # Process memory figures, converted from bytes to MiB.
    memory_rss_mb: float
    memory_vms_mb: float
    memory_percent: float
    cpu_percent: float
    threads: int
    file_descriptors: int
    connections: int
    # System-wide swap in use (not per-process), in MiB.
    swap_usage_mb: float


class ResourceMonitor:
    """Monitor system resources over time.

    A daemon thread samples the current process every ``interval`` seconds
    and appends a :class:`ResourceSnapshot` to ``self.snapshots``.
    """

    def __init__(self, interval: float = 1.0):
        self.interval = interval
        self.snapshots: list[ResourceSnapshot] = []
        self.monitoring = False
        self.monitor_thread = None
        self.process = psutil.Process()

    def start_monitoring(self):
        """Start continuous resource monitoring."""
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        logger.info("Resource monitoring started")

    def stop_monitoring(self):
        """Stop resource monitoring."""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join(timeout=2.0)
        logger.info(
            f"Resource monitoring stopped. Collected {len(self.snapshots)} snapshots"
        )

    def _monitor_loop(self):
        """Continuous monitoring loop."""
        while self.monitoring:
            try:
                self.snapshots.append(self._take_snapshot())
            except Exception as e:
                logger.error(f"Error in resource monitoring: {e}")
            # Sleep outside the try block: if snapshots fail persistently the
            # loop must still pause, otherwise it busy-spins and floods the log.
            time.sleep(self.interval)

    def _take_snapshot(self) -> ResourceSnapshot:
        """Take a resource snapshot."""
        memory_info = self.process.memory_info()

        # Get file descriptor count
        try:
            fd_count = self.process.num_fds()
        except AttributeError:
            # Windows doesn't have num_fds()
            fd_count = len(self.process.open_files())

        # Get connection count. psutil 6.0 renamed Process.connections() to
        # net_connections(); prefer the new name and fall back for older psutil.
        try:
            list_connections = getattr(self.process, "net_connections", None)
            if list_connections is None:
                list_connections = self.process.connections
            connections = len(list_connections())
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            connections = 0

        # Get swap usage
        try:
            swap = psutil.swap_memory()
            swap_used_mb = swap.used / 1024 / 1024
        except Exception:
            swap_used_mb = 0

        return ResourceSnapshot(
            timestamp=time.time(),
            memory_rss_mb=memory_info.rss / 1024 / 1024,
            memory_vms_mb=memory_info.vms / 1024 / 1024,
            memory_percent=self.process.memory_percent(),
            cpu_percent=self.process.cpu_percent(),
            threads=self.process.num_threads(),
            file_descriptors=fd_count,
            connections=connections,
            swap_usage_mb=swap_used_mb,
        )

    def get_current_snapshot(self) -> ResourceSnapshot:
        """Get current resource snapshot."""
        return self._take_snapshot()

    def analyze_trends(self) -> dict[str, Any]:
        """Analyze resource usage trends.

        Returns:
            A dict of summary statistics over the collected snapshots, or
            ``{"error": ...}`` when fewer than two snapshots are available.
        """
        if len(self.snapshots) < 2:
            return {"error": "Insufficient data for trend analysis"}

        # Calculate trends
        timestamps = [s.timestamp for s in self.snapshots]
        memories = [s.memory_rss_mb for s in self.snapshots]
        cpus = [s.cpu_percent for s in self.snapshots]
        fds = [s.file_descriptors for s in self.snapshots]

        # Linear regression for memory trend.
        # Regress on seconds elapsed since the first snapshot rather than on
        # raw epoch timestamps: with values around 1e9 the term
        # n*sum(t^2) - sum(t)^2 cancels catastrophically in float64 and the
        # slope becomes noise. The slope itself is invariant under the shift.
        origin = timestamps[0]
        elapsed = [t - origin for t in timestamps]
        n = len(elapsed)
        sum_t = sum(elapsed)
        sum_m = sum(memories)
        sum_tm = sum(t * m for t, m in zip(elapsed, memories, strict=False))
        sum_tt = sum(t * t for t in elapsed)

        denominator = n * sum_tt - sum_t * sum_t
        memory_slope = (
            (n * sum_tm - sum_t * sum_m) / denominator if denominator != 0 else 0
        )

        return {
            "duration_seconds": timestamps[-1] - timestamps[0],
            "initial_memory_mb": memories[0],
            "final_memory_mb": memories[-1],
            "memory_growth_mb": memories[-1] - memories[0],
            "memory_growth_rate_mb_per_hour": memory_slope * 3600,
            "peak_memory_mb": max(memories),
            "avg_cpu_percent": sum(cpus) / len(cpus),
            "peak_cpu_percent": max(cpus),
            "initial_file_descriptors": fds[0],
            "final_file_descriptors": fds[-1],
            "fd_growth": fds[-1] - fds[0],
            "peak_file_descriptors": max(fds),
            "snapshots_count": len(self.snapshots),
        }


class StressTestRunner:
    """Run various stress tests."""

    def __init__(self, data_provider):
        self.data_provider = data_provider
        self.resource_monitor = ResourceMonitor(interval=2.0)

    async def sustained_load_test(
        self, duration_minutes: int = 60, concurrent_load: int = 10
    ) -> dict[str, Any]:
        """Run sustained load test for extended duration.

        Args:
            duration_minutes: How long new operations keep being scheduled.
            concurrent_load: Maximum number of backtests running at once.

        Returns:
            Operation counts, error rate, throughput and resource trends.
        """
        logger.info(
            f"Starting sustained load test: {duration_minutes} minutes with {concurrent_load} concurrent operations"
        )

        self.resource_monitor.start_monitoring()
        start_time = time.time()
        end_time = start_time + (duration_minutes * 60)

        total_operations = 0
        total_errors = 0
        operation_times = []

        try:
            # Create semaphore for concurrent control
            semaphore = asyncio.Semaphore(concurrent_load)

            async def sustained_operation(operation_id: int):
                """Single sustained operation."""
                nonlocal total_operations, total_errors

                engine = VectorBTEngine(data_provider=self.data_provider)
                symbol = f"STRESS_{operation_id % 20}"  # Cycle through 20 symbols
                strategy = ["sma_cross", "rsi", "macd"][
                    operation_id % 3
                ]  # Cycle through strategies

                try:
                    async with semaphore:
                        op_start = time.time()

                        await engine.run_backtest(
                            symbol=symbol,
                            strategy_type=strategy,
                            parameters=STRATEGY_TEMPLATES[strategy]["parameters"],
                            start_date="2023-01-01",
                            end_date="2023-12-31",
                        )

                        op_time = time.time() - op_start
                        operation_times.append(op_time)
                        total_operations += 1

                        if total_operations % 100 == 0:
                            logger.info(f"Completed {total_operations} operations")

                except Exception as e:
                    total_errors += 1
                    logger.error(f"Operation {operation_id} failed: {e}")

            # Run operations continuously until duration expires
            operation_id = 0
            active_tasks = []

            while time.time() < end_time:
                # Start new operation
                task = asyncio.create_task(sustained_operation(operation_id))
                active_tasks.append(task)
                operation_id += 1

                # Clean up completed tasks
                active_tasks = [t for t in active_tasks if not t.done()]

                # Control task creation rate
                await asyncio.sleep(0.1)

                # Prevent task accumulation
                if len(active_tasks) > concurrent_load * 2:
                    await asyncio.sleep(1.0)

            # Wait for remaining tasks to complete
            if active_tasks:
                await asyncio.gather(*active_tasks, return_exceptions=True)

        finally:
            self.resource_monitor.stop_monitoring()

        actual_duration = time.time() - start_time
        trend_analysis = self.resource_monitor.analyze_trends()

        return {
            "duration_minutes": actual_duration / 60,
            "total_operations": total_operations,
            "total_errors": total_errors,
            "error_rate": total_errors / total_operations
            if total_operations > 0
            else 0,
            # Guarded like the other ratios so a zero-length run cannot divide by zero.
            "operations_per_minute": total_operations / (actual_duration / 60)
            if actual_duration > 0
            else 0,
            "avg_operation_time": sum(operation_times) / len(operation_times)
            if operation_times
            else 0,
            "resource_trends": trend_analysis,
            "concurrent_load": concurrent_load,
        }

    async def memory_leak_detection_test(
        self, iterations: int = 1000
    ) -> dict[str, Any]:
        """Test for memory leaks over many iterations.

        Memory is sampled every 50 iterations (after a forced garbage
        collection) and a least-squares line is fitted to growth vs. iteration.

        Args:
            iterations: Number of sequential backtests to run.

        Returns:
            Memory figures, the fitted leak rate and a ``leak_detected`` flag.
        """
        logger.info(f"Starting memory leak detection test with {iterations} iterations")

        self.resource_monitor.start_monitoring()
        engine = VectorBTEngine(data_provider=self.data_provider)

        initial_memory = self.resource_monitor.get_current_snapshot().memory_rss_mb
        memory_measurements = []

        try:
            for i in range(iterations):
                # Run backtest operation
                symbol = f"LEAK_TEST_{i % 10}"

                await engine.run_backtest(
                    symbol=symbol,
                    strategy_type="sma_cross",
                    parameters=STRATEGY_TEMPLATES["sma_cross"]["parameters"],
                    start_date="2023-01-01",
                    end_date="2023-12-31",
                )

                # Force garbage collection every 50 iterations
                if i % 50 == 0:
                    gc.collect()
                    snapshot = self.resource_monitor.get_current_snapshot()
                    memory_measurements.append(
                        {
                            "iteration": i,
                            "memory_mb": snapshot.memory_rss_mb,
                            "memory_growth": snapshot.memory_rss_mb - initial_memory,
                        }
                    )

                    if i % 200 == 0:
                        logger.info(
                            f"Iteration {i}: Memory = {snapshot.memory_rss_mb:.1f}MB "
                            f"(+{snapshot.memory_rss_mb - initial_memory:.1f}MB)"
                        )

        finally:
            self.resource_monitor.stop_monitoring()

        final_memory = self.resource_monitor.get_current_snapshot().memory_rss_mb
        total_growth = final_memory - initial_memory

        # Analyze memory leak pattern
        if len(memory_measurements) > 2:
            iterations_list = [m["iteration"] for m in memory_measurements]
            growth_list = [m["memory_growth"] for m in memory_measurements]

            # Linear regression to detect memory leak
            n = len(iterations_list)
            sum_x = sum(iterations_list)
            sum_y = sum(growth_list)
            sum_xy = sum(
                x * y for x, y in zip(iterations_list, growth_list, strict=False)
            )
            sum_xx = sum(x * x for x in iterations_list)

            if n * sum_xx != sum_x * sum_x:
                leak_rate = (n * sum_xy - sum_x * sum_y) / (n * sum_xx - sum_x * sum_x)
            else:
                leak_rate = 0

            # Memory leak per 1000 iterations
            leak_per_1000_iterations = leak_rate * 1000
        else:
            leak_rate = 0
            leak_per_1000_iterations = 0

        return {
            "iterations": iterations,
            "initial_memory_mb": initial_memory,
            "final_memory_mb": final_memory,
            "total_memory_growth_mb": total_growth,
            "leak_rate_mb_per_iteration": leak_rate,
            "leak_per_1000_iterations_mb": leak_per_1000_iterations,
            "memory_measurements": memory_measurements,
            "leak_detected": abs(leak_per_1000_iterations)
            > 10.0,  # More than 10MB per 1000 iterations
        }

    async def cpu_stress_test(
        self, duration_minutes: int = 10, cpu_target: float = 0.9
    ) -> dict[str, Any]:
        """Test CPU utilization under stress.

        Args:
            duration_minutes: How long backtests keep being issued.
            cpu_target: Fraction of the CPU count used to size the number of
                background load threads.

        Returns:
            Completed/failed operation counts, response times and CPU figures.
        """
        logger.info(
            f"Starting CPU stress test: {duration_minutes} minutes at {cpu_target * 100}% target"
        )

        self.resource_monitor.start_monitoring()

        # Create CPU-intensive background load
        stop_event = threading.Event()
        cpu_threads = []

        def cpu_intensive_task():
            """CPU-intensive computation."""
            while not stop_event.is_set():
                # Perform CPU-intensive work
                for _ in range(10000):
                    _ = sum(i**2 for i in range(100))
                time.sleep(0.001)  # Brief pause

        try:
            # Start CPU load threads
            num_cpu_threads = max(1, int(psutil.cpu_count() * cpu_target))
            for _ in range(num_cpu_threads):
                thread = threading.Thread(target=cpu_intensive_task, daemon=True)
                thread.start()
                cpu_threads.append(thread)

            # Run backtests under CPU stress
            engine = VectorBTEngine(data_provider=self.data_provider)
            start_time = time.time()
            end_time = start_time + (duration_minutes * 60)

            operations_completed = 0
            cpu_stress_errors = 0
            response_times = []

            while time.time() < end_time:
                try:
                    op_start = time.time()

                    symbol = f"CPU_STRESS_{operations_completed % 5}"
                    await asyncio.wait_for(
                        engine.run_backtest(
                            symbol=symbol,
                            strategy_type="rsi",
                            parameters=STRATEGY_TEMPLATES["rsi"]["parameters"],
                            start_date="2023-01-01",
                            end_date="2023-12-31",
                        ),
                        timeout=30.0,  # Prevent hanging under CPU stress
                    )

                    response_time = time.time() - op_start
                    response_times.append(response_time)
                    operations_completed += 1

                except TimeoutError:
                    cpu_stress_errors += 1
                    logger.warning("Operation timed out under CPU stress")

                except Exception as e:
                    cpu_stress_errors += 1
                    logger.error(f"Operation failed under CPU stress: {e}")

                # Brief pause between operations
                await asyncio.sleep(1.0)

        finally:
            # Stop CPU stress
            stop_event.set()
            for thread in cpu_threads:
                thread.join(timeout=1.0)

            self.resource_monitor.stop_monitoring()

        trend_analysis = self.resource_monitor.analyze_trends()
        attempted = operations_completed + cpu_stress_errors

        return {
            "duration_minutes": duration_minutes,
            "cpu_target_percent": cpu_target * 100,
            "operations_completed": operations_completed,
            "cpu_stress_errors": cpu_stress_errors,
            "error_rate": cpu_stress_errors / attempted if attempted > 0 else 0,
            "avg_response_time": sum(response_times) / len(response_times)
            if response_times
            else 0,
            "max_response_time": max(response_times) if response_times else 0,
            # analyze_trends() returns {"error": ...} when fewer than two
            # snapshots were collected; fall back to 0 instead of a KeyError.
            "avg_cpu_utilization": trend_analysis.get("avg_cpu_percent", 0),
            "peak_cpu_utilization": trend_analysis.get("peak_cpu_percent", 0),
        }

    async def database_connection_exhaustion_test(
        self, db_session, max_connections: int = 50
    ) -> dict[str, Any]:
        """Test database behavior under connection exhaustion.

        Args:
            db_session: Session handed to every ``BacktestPersistenceManager``.
            max_connections: Number of concurrent database operations to start.

        Returns:
            Success/failure counts, total operations and throughput.
        """
        logger.info(
            f"Starting database connection exhaustion test with {max_connections} connections"
        )

        # Generate test data
        engine = VectorBTEngine(data_provider=self.data_provider)
        test_results = []

        for i in range(5):
            result = await engine.run_backtest(
                symbol=f"DB_EXHAUST_{i}",
                strategy_type="macd",
                parameters=STRATEGY_TEMPLATES["macd"]["parameters"],
                start_date="2023-01-01",
                end_date="2023-12-31",
            )
            test_results.append(result)

        # Test connection exhaustion
        async def database_operation(conn_id: int) -> dict[str, Any]:
            """Single database operation holding connection."""
            try:
                with BacktestPersistenceManager(session=db_session) as persistence:
                    # Hold connection and perform operations
                    saved_ids = []

                    for result in test_results:
                        backtest_id = persistence.save_backtest_result(
                            vectorbt_results=result,
                            execution_time=2.0,
                            notes=f"Connection exhaustion test {conn_id}",
                        )
                        saved_ids.append(backtest_id)

                    # Perform queries
                    for backtest_id in saved_ids:
                        persistence.get_backtest_by_id(backtest_id)

                    # Hold connection for some time
                    await asyncio.sleep(2.0)

                    return {
                        "connection_id": conn_id,
                        "operations_completed": len(saved_ids) * 2,  # Save + retrieve
                        "success": True,
                    }

            except Exception as e:
                return {
                    "connection_id": conn_id,
                    "error": str(e),
                    "success": False,
                }

        # Create many concurrent database operations
        start_time = time.time()

        connection_tasks = [database_operation(i) for i in range(max_connections)]

        # Execute with timeout to prevent hanging
        try:
            results = await asyncio.wait_for(
                asyncio.gather(*connection_tasks, return_exceptions=True), timeout=60.0
            )
        except TimeoutError:
            logger.warning("Database connection test timed out")
            results = []

        execution_time = time.time() - start_time

        # Analyze results
        successful_connections = sum(
            1 for r in results if isinstance(r, dict) and r.get("success", False)
        )
        failed_connections = len(results) - successful_connections

        total_operations = sum(
            r.get("operations_completed", 0)
            for r in results
            if isinstance(r, dict) and r.get("success", False)
        )

        return {
            "max_connections_attempted": max_connections,
            "successful_connections": successful_connections,
            "failed_connections": failed_connections,
            "connection_success_rate": successful_connections / max_connections
            if max_connections > 0
            else 0,
            "total_operations": total_operations,
            "execution_time": execution_time,
            "operations_per_second": total_operations / execution_time
            if execution_time > 0
            else 0,
        }

    async def file_descriptor_exhaustion_test(self) -> dict[str, Any]:
        """Test file descriptor usage patterns.

        Returns:
            Initial/final/peak descriptor counts, the rlimits in effect and
            the per-sample measurements (one every 10 operations).
        """
        logger.info("Starting file descriptor exhaustion test")

        initial_snapshot = self.resource_monitor.get_current_snapshot()
        initial_fds = initial_snapshot.file_descriptors

        self.resource_monitor.start_monitoring()

        # Get system file descriptor limit
        try:
            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
        except Exception:
            soft_limit, hard_limit = 1024, 4096  # Default assumptions

        logger.info(
            f"FD limits - Soft: {soft_limit}, Hard: {hard_limit}, Initial: {initial_fds}"
        )

        try:
            engine = VectorBTEngine(data_provider=self.data_provider)

            # Run many operations to stress file descriptor usage
            fd_measurements = []
            max_operations = min(100, soft_limit // 10)  # Conservative approach

            for i in range(max_operations):
                await engine.run_backtest(
                    symbol=f"FD_TEST_{i}",
                    strategy_type="sma_cross",
                    parameters=STRATEGY_TEMPLATES["sma_cross"]["parameters"],
                    start_date="2023-01-01",
                    end_date="2023-12-31",
                )

                if i % 10 == 0:
                    snapshot = self.resource_monitor.get_current_snapshot()
                    fd_measurements.append(
                        {
                            "iteration": i,
                            "file_descriptors": snapshot.file_descriptors,
                            "fd_growth": snapshot.file_descriptors - initial_fds,
                        }
                    )

                    if snapshot.file_descriptors > soft_limit * 0.8:
                        logger.warning(
                            f"High FD usage detected: {snapshot.file_descriptors}/{soft_limit}"
                        )

        finally:
            self.resource_monitor.stop_monitoring()

        final_snapshot = self.resource_monitor.get_current_snapshot()
        final_fds = final_snapshot.file_descriptors
        fd_growth = final_fds - initial_fds

        # Analyze FD usage pattern
        peak_fds = max([m["file_descriptors"] for m in fd_measurements] + [final_fds])
        fd_utilization = peak_fds / soft_limit if soft_limit > 0 else 0

        return {
            "initial_file_descriptors": initial_fds,
            "final_file_descriptors": final_fds,
            "peak_file_descriptors": peak_fds,
            "fd_growth": fd_growth,
            "soft_limit": soft_limit,
            "hard_limit": hard_limit,
            "fd_utilization_percent": fd_utilization * 100,
            "fd_measurements": fd_measurements,
            "operations_completed": max_operations,
        }


class TestStressTesting:
    """Stress testing suite."""

    @pytest.fixture
    async def stress_data_provider(self):
        """Create data provider optimized for stress testing."""
        provider = Mock()

        # Cache to reduce computation overhead during stress tests
        data_cache = {}

        def get_stress_test_data(symbol: str) -> pd.DataFrame:
            """Get cached data for stress testing."""
            if symbol not in data_cache:
                # Generate smaller dataset for faster stress testing
                dates = pd.date_range(start="2023-06-01", end="2023-12-31", freq="D")
                # Derive the seed from a stable checksum: built-in hash() of a
                # str is salted per process, so it would yield different data
                # on every test run.
                seed = zlib.crc32(symbol.encode("utf-8")) % 1000
                np.random.seed(seed)

                returns = np.random.normal(0.001, 0.02, len(dates))
                prices = 100 * np.cumprod(1 + returns)

                data_cache[symbol] = pd.DataFrame(
                    {
                        "Open": prices * np.random.uniform(0.99, 1.01, len(dates)),
                        "High": prices * np.random.uniform(1.01, 1.03, len(dates)),
                        "Low": prices * np.random.uniform(0.97, 0.99, len(dates)),
                        "Close": prices,
                        "Volume": np.random.randint(1000000, 5000000, len(dates)),
                        "Adj Close": prices,
                    },
                    index=dates,
                )

            return data_cache[symbol].copy()

        provider.get_stock_data.side_effect = get_stress_test_data
        return provider

    @pytest.mark.slow
    async def test_sustained_load_15_minutes(self, stress_data_provider):
        """Test sustained load for 15 minutes (abbreviated from 1 hour for CI)."""
        stress_runner = StressTestRunner(stress_data_provider)

        result = await stress_runner.sustained_load_test(
            duration_minutes=15,  # Reduced for CI/testing
            concurrent_load=8,
        )

        # Assertions for sustained load
        assert result["error_rate"] <= 0.05, (
            f"Error rate too high: {result['error_rate']:.3f}"
        )
        assert result["operations_per_minute"] >= 10, (
            f"Throughput too low: {result['operations_per_minute']:.1f} ops/min"
        )

        # Resource growth should be reasonable
        trends = result["resource_trends"]
        assert trends["memory_growth_rate_mb_per_hour"] <= 100, (
            f"Memory growth rate too high: {trends['memory_growth_rate_mb_per_hour']:.1f} MB/hour"
        )

        logger.info(
            f"✓ Sustained load test completed: {result['total_operations']} operations in {result['duration_minutes']:.1f} minutes"
        )
        return result

    async def test_memory_leak_detection(self, stress_data_provider):
        """Test for memory leaks over many iterations."""
        stress_runner = StressTestRunner(stress_data_provider)

        result = await stress_runner.memory_leak_detection_test(iterations=200)

        # Memory leak assertions
        assert not result["leak_detected"], (
            f"Memory leak detected: {result['leak_per_1000_iterations_mb']:.2f} MB per 1000 iterations"
        )
        assert result["total_memory_growth_mb"] <= 300, (
            f"Total memory growth too high: {result['total_memory_growth_mb']:.1f} MB"
        )

        logger.info(
            f"✓ Memory leak test completed: {result['total_memory_growth_mb']:.1f}MB growth over {result['iterations']} iterations"
        )
        return result

    async def test_cpu_stress_resilience(self, stress_data_provider):
        """Test system resilience under CPU stress."""
        stress_runner = StressTestRunner(stress_data_provider)

        result = await stress_runner.cpu_stress_test(
            duration_minutes=5,  # Reduced for testing
            cpu_target=0.7,  # 70% CPU utilization
        )

        # CPU stress assertions
        assert result["error_rate"] <= 0.2, (
            f"Error rate too high under CPU stress: {result['error_rate']:.3f}"
        )
        assert result["avg_response_time"] <= 10.0, (
            f"Response time too slow under CPU stress: {result['avg_response_time']:.2f}s"
        )
        assert result["operations_completed"] >= 10, (
            f"Too few operations completed: {result['operations_completed']}"
        )

        logger.info(
            f"✓ CPU stress test completed: {result['operations_completed']} operations with {result['avg_cpu_utilization']:.1f}% avg CPU"
        )
        return result

    async def test_database_connection_stress(self, stress_data_provider, db_session):
        """Test database performance under connection stress."""
        stress_runner = StressTestRunner(stress_data_provider)

        result = await stress_runner.database_connection_exhaustion_test(
            db_session=db_session,
            max_connections=20,  # Reduced for testing
        )

        # Database stress assertions
        assert result["connection_success_rate"] >= 0.8, (
            f"Connection success rate too low: {result['connection_success_rate']:.3f}"
        )
        assert result["operations_per_second"] >= 5.0, (
            f"Database throughput too low: {result['operations_per_second']:.2f} ops/s"
        )

        logger.info(
            f"✓ Database stress test completed: {result['successful_connections']}/{result['max_connections_attempted']} connections succeeded"
        )
        return result

    async def test_file_descriptor_management(self, stress_data_provider):
        """Test file descriptor usage under stress."""
        stress_runner = StressTestRunner(stress_data_provider)

        result = await stress_runner.file_descriptor_exhaustion_test()

        # File descriptor assertions
        assert result["fd_utilization_percent"] <= 50.0, (
            f"FD utilization too high: {result['fd_utilization_percent']:.1f}%"
        )
        assert result["fd_growth"] <= 100, f"FD growth too high: {result['fd_growth']}"

        logger.info(
            f"✓ File descriptor test completed: {result['peak_file_descriptors']} peak FDs ({result['fd_utilization_percent']:.1f}% utilization)"
        )
        return result

    async def test_queue_overflow_scenarios(self, stress_data_provider):
        """Test queue management under overflow conditions."""
        # Simulate queue overflow by creating more tasks than can be processed
        max_queue_size = 50
        overflow_tasks = 100

        semaphore = asyncio.Semaphore(5)  # Limit concurrent processing
        processed_tasks = 0
        overflow_errors = 0

        async def queue_task(task_id: int):
            nonlocal processed_tasks, overflow_errors

            try:
                async with semaphore:
                    engine = VectorBTEngine(data_provider=stress_data_provider)

                    await engine.run_backtest(
                        symbol=f"QUEUE_{task_id % 10}",
                        strategy_type="sma_cross",
                        parameters=STRATEGY_TEMPLATES["sma_cross"]["parameters"],
                        start_date="2023-06-01",
                        end_date="2023-12-31",
                    )

                    processed_tasks += 1

            except Exception as e:
                overflow_errors += 1
                logger.error(f"Queue task {task_id} failed: {e}")

        # Create tasks faster than they can be processed
        start_time = time.time()

        tasks = []
        for i in range(overflow_tasks):
            task = asyncio.create_task(queue_task(i))
            tasks.append(task)

            # Create tasks rapidly to test queue management
            if i < max_queue_size:
                await asyncio.sleep(0.01)  # Rapid creation
            else:
                await asyncio.sleep(0.1)  # Slower creation after queue fills

        # Wait for all tasks to complete
        await asyncio.gather(*tasks, return_exceptions=True)

        execution_time = time.time() - start_time

        # Queue overflow assertions
        processing_success_rate = processed_tasks / overflow_tasks

        assert processing_success_rate >= 0.8, (
            f"Queue processing success rate too low: {processing_success_rate:.3f}"
        )
        assert execution_time < 120.0, (
            f"Queue processing took too long: {execution_time:.1f}s"
        )

        logger.info(
            f"✓ Queue overflow test completed: {processed_tasks}/{overflow_tasks} tasks processed in {execution_time:.1f}s"
        )

        return {
            "overflow_tasks": overflow_tasks,
            "processed_tasks": processed_tasks,
            "overflow_errors": overflow_errors,
            "processing_success_rate": processing_success_rate,
            "execution_time": execution_time,
        }

    # Marked slow because it runs the 15-minute sustained-load test internally;
    # without the marker, `-m "not slow"` would still execute that long run.
    @pytest.mark.slow
    async def test_comprehensive_stress_suite(self, stress_data_provider, db_session):
        """Run comprehensive stress testing suite."""
        logger.info("Starting Comprehensive Stress Testing Suite...")

        stress_results = {}

        # Run individual stress tests
        stress_results["sustained_load"] = await self.test_sustained_load_15_minutes(
            stress_data_provider
        )
        stress_results["memory_leak"] = await self.test_memory_leak_detection(
            stress_data_provider
        )
        stress_results["cpu_stress"] = await self.test_cpu_stress_resilience(
            stress_data_provider
        )
        stress_results["database_stress"] = await self.test_database_connection_stress(
            stress_data_provider, db_session
        )
        stress_results["file_descriptors"] = await self.test_file_descriptor_management(
            stress_data_provider
        )
        stress_results["queue_overflow"] = await self.test_queue_overflow_scenarios(
            stress_data_provider
        )

        # Aggregate stress test analysis
        total_tests = len(stress_results)
        passed_tests = 0
        critical_failures = []

        for test_name, result in stress_results.items():
            # Simple pass/fail based on whether test completed without major issues
            test_passed = True

            if test_name == "sustained_load" and result["error_rate"] > 0.1:
                test_passed = False
                critical_failures.append(
                    f"Sustained load error rate: {result['error_rate']:.3f}"
                )
            elif test_name == "memory_leak" and result["leak_detected"]:
                test_passed = False
                critical_failures.append(
                    f"Memory leak detected: {result['leak_per_1000_iterations_mb']:.2f} MB/1k iterations"
                )
            elif test_name == "cpu_stress" and result["error_rate"] > 0.3:
                test_passed = False
                critical_failures.append(
                    f"CPU stress error rate: {result['error_rate']:.3f}"
                )

            if test_passed:
                passed_tests += 1

        overall_pass_rate = passed_tests / total_tests

        logger.info(
            f"\n{'=' * 60}\n"
            f"COMPREHENSIVE STRESS TEST REPORT\n"
            f"{'=' * 60}\n"
            f"Total Tests: {total_tests}\n"
            f"Passed: {passed_tests}\n"
            f"Overall Pass Rate: {overall_pass_rate:.1%}\n"
            f"Critical Failures: {len(critical_failures)}\n"
            f"{'=' * 60}\n"
        )

        # Assert overall stress test success
        assert overall_pass_rate >= 0.8, (
            f"Overall stress test pass rate too low: {overall_pass_rate:.1%}"
        )
        assert len(critical_failures) <= 1, (
            f"Too many critical failures: {critical_failures}"
        )

        return {
            "overall_pass_rate": overall_pass_rate,
            "critical_failures": critical_failures,
            "stress_results": stress_results,
        }


if __name__ == "__main__":
    # Run stress testing suite
    pytest.main(
        [
            __file__,
            "-v",
            "--tb=short",
            "--asyncio-mode=auto",
            "--timeout=1800",  # 30 minute timeout for stress tests
            "-m",
            "not slow",  # Skip slow tests by default
        ]
    )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.