
MaverickMCP

by wshobson
MIT License
test_load.py (24.6 kB)
""" Load Testing for Concurrent Users and Backtest Operations. This test suite covers: - Concurrent user load testing (10, 50, 100 users) - Response time and throughput measurement - Memory usage under concurrent load - Database performance with multiple connections - API rate limiting behavior - Queue management and task distribution - System stability under sustained load """ import asyncio import logging import os import random import statistics import time from dataclasses import dataclass from typing import Any from unittest.mock import Mock import numpy as np import pandas as pd import psutil import pytest from maverick_mcp.backtesting import VectorBTEngine from maverick_mcp.backtesting.persistence import BacktestPersistenceManager from maverick_mcp.backtesting.strategies import STRATEGY_TEMPLATES logger = logging.getLogger(__name__) @dataclass class LoadTestResult: """Data class for load test results.""" concurrent_users: int total_requests: int successful_requests: int failed_requests: int total_duration: float avg_response_time: float min_response_time: float max_response_time: float p50_response_time: float p95_response_time: float p99_response_time: float requests_per_second: float errors_per_second: float memory_usage_mb: float cpu_usage_percent: float class LoadTestRunner: """Load test runner with realistic user simulation.""" def __init__(self, data_provider): self.data_provider = data_provider self.results = [] self.active_requests = 0 async def simulate_user_session( self, user_id: int, session_config: dict[str, Any] ) -> dict[str, Any]: """Simulate a realistic user session with multiple backtests.""" session_start = time.time() user_results = [] response_times = [] symbols = session_config.get("symbols", ["AAPL"]) strategies = session_config.get("strategies", ["sma_cross"]) think_time_range = session_config.get("think_time", (0.5, 2.0)) engine = VectorBTEngine(data_provider=self.data_provider) for symbol in symbols: for strategy in strategies: self.active_requests += 1 request_start = time.time() try: parameters = STRATEGY_TEMPLATES.get(strategy, {}).get( "parameters", {} ) result = await engine.run_backtest( symbol=symbol, strategy_type=strategy, parameters=parameters, start_date="2023-01-01", end_date="2023-12-31", ) request_time = time.time() - request_start response_times.append(request_time) user_results.append( { "symbol": symbol, "strategy": strategy, "success": True, "response_time": request_time, "result_size": len(str(result)), } ) except Exception as e: request_time = time.time() - request_start response_times.append(request_time) user_results.append( { "symbol": symbol, "strategy": strategy, "success": False, "response_time": request_time, "error": str(e), } ) finally: self.active_requests -= 1 # Simulate think time between requests think_time = random.uniform(*think_time_range) await asyncio.sleep(think_time) session_time = time.time() - session_start return { "user_id": user_id, "session_time": session_time, "results": user_results, "response_times": response_times, "success_count": sum(1 for r in user_results if r["success"]), "failure_count": sum(1 for r in user_results if not r["success"]), } def calculate_percentiles(self, response_times: list[float]) -> dict[str, float]: """Calculate response time percentiles.""" if not response_times: return {"p50": 0, "p95": 0, "p99": 0} sorted_times = sorted(response_times) return { "p50": np.percentile(sorted_times, 50), "p95": np.percentile(sorted_times, 95), "p99": np.percentile(sorted_times, 99), } async def 
run_load_test( self, concurrent_users: int, session_config: dict[str, Any], duration_seconds: int = 60, ) -> LoadTestResult: """Run load test with specified concurrent users.""" logger.info( f"Starting load test: {concurrent_users} concurrent users for {duration_seconds}s" ) process = psutil.Process(os.getpid()) initial_memory = process.memory_info().rss / 1024 / 1024 # MB start_time = time.time() all_response_times = [] all_user_results = [] # Create semaphore to control concurrency semaphore = asyncio.Semaphore(concurrent_users) async def run_user_with_semaphore(user_id: int): async with semaphore: return await self.simulate_user_session(user_id, session_config) # Generate user sessions user_tasks = [] for user_id in range(concurrent_users): task = run_user_with_semaphore(user_id) user_tasks.append(task) # Execute all user sessions concurrently try: user_results = await asyncio.wait_for( asyncio.gather(*user_tasks, return_exceptions=True), timeout=duration_seconds + 30, # Add buffer to test timeout ) except TimeoutError: logger.warning(f"Load test timed out after {duration_seconds + 30}s") user_results = [] end_time = time.time() actual_duration = end_time - start_time # Process results successful_sessions = [] failed_sessions = [] for result in user_results: if isinstance(result, Exception): failed_sessions.append(str(result)) elif isinstance(result, dict): successful_sessions.append(result) all_response_times.extend(result.get("response_times", [])) all_user_results.extend(result.get("results", [])) # Calculate metrics total_requests = len(all_user_results) successful_requests = sum( 1 for r in all_user_results if r.get("success", False) ) failed_requests = total_requests - successful_requests # Response time statistics percentiles = self.calculate_percentiles(all_response_times) avg_response_time = ( statistics.mean(all_response_times) if all_response_times else 0 ) min_response_time = min(all_response_times) if all_response_times else 0 max_response_time = max(all_response_times) if all_response_times else 0 # Throughput metrics requests_per_second = ( total_requests / actual_duration if actual_duration > 0 else 0 ) errors_per_second = ( failed_requests / actual_duration if actual_duration > 0 else 0 ) # Resource usage final_memory = process.memory_info().rss / 1024 / 1024 memory_usage = final_memory - initial_memory cpu_usage = process.cpu_percent() result = LoadTestResult( concurrent_users=concurrent_users, total_requests=total_requests, successful_requests=successful_requests, failed_requests=failed_requests, total_duration=actual_duration, avg_response_time=avg_response_time, min_response_time=min_response_time, max_response_time=max_response_time, p50_response_time=percentiles["p50"], p95_response_time=percentiles["p95"], p99_response_time=percentiles["p99"], requests_per_second=requests_per_second, errors_per_second=errors_per_second, memory_usage_mb=memory_usage, cpu_usage_percent=cpu_usage, ) logger.info( f"Load Test Results ({concurrent_users} users):\n" f" • Total Requests: {total_requests}\n" f" • Success Rate: {successful_requests / total_requests * 100:.1f}%\n" f" • Avg Response Time: {avg_response_time:.3f}s\n" f" • 95th Percentile: {percentiles['p95']:.3f}s\n" f" • Throughput: {requests_per_second:.1f} req/s\n" f" • Memory Usage: {memory_usage:.1f}MB\n" f" • Duration: {actual_duration:.1f}s" ) return result class TestLoadTesting: """Load testing suite for concurrent users.""" @pytest.fixture async def optimized_data_provider(self): """Create optimized data provider for 
load testing.""" provider = Mock() # Pre-generate data for common symbols to reduce computation symbol_data_cache = {} def get_cached_data(symbol: str) -> pd.DataFrame: """Get or generate cached data for symbol.""" if symbol not in symbol_data_cache: # Generate deterministic data based on symbol hash seed = hash(symbol) % 1000 np.random.seed(seed) dates = pd.date_range(start="2023-01-01", end="2023-12-31", freq="D") returns = np.random.normal(0.001, 0.02, len(dates)) prices = 100 * np.cumprod(1 + returns) symbol_data_cache[symbol] = pd.DataFrame( { "Open": prices * np.random.uniform(0.99, 1.01, len(dates)), "High": prices * np.random.uniform(1.01, 1.03, len(dates)), "Low": prices * np.random.uniform(0.97, 0.99, len(dates)), "Close": prices, "Volume": np.random.randint(1000000, 10000000, len(dates)), "Adj Close": prices, }, index=dates, ) # Ensure OHLC constraints data = symbol_data_cache[symbol] data["High"] = np.maximum( data["High"], np.maximum(data["Open"], data["Close"]) ) data["Low"] = np.minimum( data["Low"], np.minimum(data["Open"], data["Close"]) ) return symbol_data_cache[symbol].copy() provider.get_stock_data.side_effect = get_cached_data return provider async def test_concurrent_users_10(self, optimized_data_provider, benchmark_timer): """Test load with 10 concurrent users.""" load_runner = LoadTestRunner(optimized_data_provider) session_config = { "symbols": ["AAPL", "GOOGL"], "strategies": ["sma_cross", "rsi"], "think_time": (0.1, 0.5), # Faster think time for testing } with benchmark_timer(): result = await load_runner.run_load_test( concurrent_users=10, session_config=session_config, duration_seconds=30 ) # Performance assertions for 10 users assert result.requests_per_second >= 2.0, ( f"Throughput too low: {result.requests_per_second:.1f} req/s" ) assert result.avg_response_time <= 5.0, ( f"Response time too high: {result.avg_response_time:.2f}s" ) assert result.p95_response_time <= 10.0, ( f"95th percentile too high: {result.p95_response_time:.2f}s" ) assert result.successful_requests / result.total_requests >= 0.9, ( "Success rate too low" ) assert result.memory_usage_mb <= 500, ( f"Memory usage too high: {result.memory_usage_mb:.1f}MB" ) return result async def test_concurrent_users_50(self, optimized_data_provider, benchmark_timer): """Test load with 50 concurrent users.""" load_runner = LoadTestRunner(optimized_data_provider) session_config = { "symbols": ["AAPL", "MSFT", "GOOGL"], "strategies": ["sma_cross", "rsi", "macd"], "think_time": (0.2, 1.0), } with benchmark_timer(): result = await load_runner.run_load_test( concurrent_users=50, session_config=session_config, duration_seconds=60 ) # Performance assertions for 50 users assert result.requests_per_second >= 5.0, ( f"Throughput too low: {result.requests_per_second:.1f} req/s" ) assert result.avg_response_time <= 8.0, ( f"Response time too high: {result.avg_response_time:.2f}s" ) assert result.p95_response_time <= 15.0, ( f"95th percentile too high: {result.p95_response_time:.2f}s" ) assert result.successful_requests / result.total_requests >= 0.85, ( "Success rate too low" ) assert result.memory_usage_mb <= 1000, ( f"Memory usage too high: {result.memory_usage_mb:.1f}MB" ) return result async def test_concurrent_users_100(self, optimized_data_provider, benchmark_timer): """Test load with 100 concurrent users.""" load_runner = LoadTestRunner(optimized_data_provider) session_config = { "symbols": ["AAPL", "MSFT", "GOOGL", "AMZN"], "strategies": ["sma_cross", "rsi"], # Reduced strategies for higher load "think_time": 
(0.5, 1.5), } with benchmark_timer(): result = await load_runner.run_load_test( concurrent_users=100, session_config=session_config, duration_seconds=90 ) # More relaxed performance assertions for 100 users assert result.requests_per_second >= 3.0, ( f"Throughput too low: {result.requests_per_second:.1f} req/s" ) assert result.avg_response_time <= 15.0, ( f"Response time too high: {result.avg_response_time:.2f}s" ) assert result.p95_response_time <= 30.0, ( f"95th percentile too high: {result.p95_response_time:.2f}s" ) assert result.successful_requests / result.total_requests >= 0.8, ( "Success rate too low" ) assert result.memory_usage_mb <= 2000, ( f"Memory usage too high: {result.memory_usage_mb:.1f}MB" ) return result async def test_load_scalability_analysis(self, optimized_data_provider): """Analyze how performance scales with user load.""" load_runner = LoadTestRunner(optimized_data_provider) session_config = { "symbols": ["AAPL", "GOOGL"], "strategies": ["sma_cross"], "think_time": (0.3, 0.7), } user_loads = [5, 10, 20, 40] scalability_results = [] for user_count in user_loads: logger.info(f"Testing scalability with {user_count} users") result = await load_runner.run_load_test( concurrent_users=user_count, session_config=session_config, duration_seconds=30, ) scalability_results.append(result) # Analyze scalability metrics throughput_efficiency = [] response_time_degradation = [] baseline_rps = scalability_results[0].requests_per_second baseline_response_time = scalability_results[0].avg_response_time for i, result in enumerate(scalability_results): expected_rps = baseline_rps * user_loads[i] / user_loads[0] actual_efficiency = ( result.requests_per_second / expected_rps if expected_rps > 0 else 0 ) throughput_efficiency.append(actual_efficiency) response_degradation = ( result.avg_response_time / baseline_response_time if baseline_response_time > 0 else 1 ) response_time_degradation.append(response_degradation) logger.info( f"Scalability Analysis ({user_loads[i]} users):\n" f" • RPS: {result.requests_per_second:.2f}\n" f" • RPS Efficiency: {actual_efficiency:.2%}\n" f" • Response Time: {result.avg_response_time:.3f}s\n" f" • Response Degradation: {response_degradation:.2f}x\n" f" • Memory: {result.memory_usage_mb:.1f}MB" ) # Scalability assertions avg_efficiency = statistics.mean(throughput_efficiency) max_response_degradation = max(response_time_degradation) assert avg_efficiency >= 0.5, ( f"Average throughput efficiency too low: {avg_efficiency:.2%}" ) assert max_response_degradation <= 5.0, ( f"Response time degradation too high: {max_response_degradation:.1f}x" ) return { "user_loads": user_loads, "results": scalability_results, "throughput_efficiency": throughput_efficiency, "response_time_degradation": response_time_degradation, "avg_efficiency": avg_efficiency, } async def test_sustained_load_stability(self, optimized_data_provider): """Test stability under sustained load.""" load_runner = LoadTestRunner(optimized_data_provider) session_config = { "symbols": ["AAPL", "MSFT"], "strategies": ["sma_cross", "rsi"], "think_time": (0.5, 1.0), } # Run sustained load for longer duration result = await load_runner.run_load_test( concurrent_users=25, session_config=session_config, duration_seconds=300, # 5 minutes ) # Stability assertions assert result.errors_per_second <= 0.1, ( f"Error rate too high: {result.errors_per_second:.3f} err/s" ) assert result.successful_requests / result.total_requests >= 0.95, ( "Success rate degraded over time" ) assert result.memory_usage_mb <= 800, ( 
f"Memory usage grew too much: {result.memory_usage_mb:.1f}MB" ) # Check for performance consistency (no significant degradation) assert result.p99_response_time / result.p50_response_time <= 5.0, ( "Response time variance too high" ) logger.info( f"Sustained Load Results (25 users, 5 minutes):\n" f" • Total Requests: {result.total_requests}\n" f" • Success Rate: {result.successful_requests / result.total_requests * 100:.2f}%\n" f" • Avg Throughput: {result.requests_per_second:.2f} req/s\n" f" • Response Time (50/95/99): {result.p50_response_time:.2f}s/" f"{result.p95_response_time:.2f}s/{result.p99_response_time:.2f}s\n" f" • Memory Growth: {result.memory_usage_mb:.1f}MB\n" f" • Error Rate: {result.errors_per_second:.4f} err/s" ) return result async def test_database_connection_pooling_under_load( self, optimized_data_provider, db_session ): """Test database connection pooling under concurrent load.""" # Generate backtest results to save to database engine = VectorBTEngine(data_provider=optimized_data_provider) test_symbols = ["DB_LOAD_1", "DB_LOAD_2", "DB_LOAD_3"] # Pre-generate results for database testing backtest_results = [] for symbol in test_symbols: result = await engine.run_backtest( symbol=symbol, strategy_type="sma_cross", parameters=STRATEGY_TEMPLATES["sma_cross"]["parameters"], start_date="2023-01-01", end_date="2023-12-31", ) backtest_results.append(result) # Test concurrent database operations async def concurrent_database_operations(operation_id: int) -> dict[str, Any]: """Simulate concurrent database save/retrieve operations.""" start_time = time.time() operations_completed = 0 errors = [] try: with BacktestPersistenceManager(session=db_session) as persistence: # Save operations for result in backtest_results: try: backtest_id = persistence.save_backtest_result( vectorbt_results=result, execution_time=2.0, notes=f"Load test operation {operation_id}", ) operations_completed += 1 # Retrieve operation retrieved = persistence.get_backtest_by_id(backtest_id) if retrieved: operations_completed += 1 except Exception as e: errors.append(str(e)) except Exception as e: errors.append(f"Session error: {str(e)}") operation_time = time.time() - start_time return { "operation_id": operation_id, "operations_completed": operations_completed, "errors": errors, "operation_time": operation_time, } # Run concurrent database operations concurrent_operations = 20 db_tasks = [ concurrent_database_operations(i) for i in range(concurrent_operations) ] start_time = time.time() db_results = await asyncio.gather(*db_tasks, return_exceptions=True) total_time = time.time() - start_time # Analyze database performance under load successful_operations = [r for r in db_results if isinstance(r, dict)] failed_operations = len(db_results) - len(successful_operations) total_operations = sum(r["operations_completed"] for r in successful_operations) total_errors = sum(len(r["errors"]) for r in successful_operations) avg_operation_time = statistics.mean( [r["operation_time"] for r in successful_operations] ) db_throughput = total_operations / total_time if total_time > 0 else 0 error_rate = total_errors / total_operations if total_operations > 0 else 0 logger.info( f"Database Load Test Results:\n" f" • Concurrent Operations: {concurrent_operations}\n" f" • Successful Sessions: {len(successful_operations)}\n" f" • Failed Sessions: {failed_operations}\n" f" • Total DB Operations: {total_operations}\n" f" • DB Throughput: {db_throughput:.2f} ops/s\n" f" • Error Rate: {error_rate:.3%}\n" f" • Avg Operation Time: 
{avg_operation_time:.3f}s" ) # Database performance assertions assert len(successful_operations) / len(db_results) >= 0.9, ( "DB session success rate too low" ) assert error_rate <= 0.05, f"DB error rate too high: {error_rate:.3%}" assert db_throughput >= 5.0, f"DB throughput too low: {db_throughput:.2f} ops/s" return { "concurrent_operations": concurrent_operations, "db_throughput": db_throughput, "error_rate": error_rate, "avg_operation_time": avg_operation_time, } if __name__ == "__main__": # Run load testing suite pytest.main( [ __file__, "-v", "--tb=short", "--asyncio-mode=auto", "--timeout=600", # 10 minute timeout for load tests "--durations=10", ] )
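The tests reference two fixtures that are not defined in this file, benchmark_timer and db_session, which presumably live in a shared conftest.py elsewhere in the repository. A minimal sketch of what they might look like, assuming the timer is a context-manager factory and the session is a SQLAlchemy session against in-memory SQLite; both are guesses at the repository's actual fixtures, not its API:

# Hypothetical conftest.py sketch; the repository's real fixtures may differ.
import time
from contextlib import contextmanager

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


@pytest.fixture
def benchmark_timer():
    """Factory returning a context manager that reports wall-clock time."""

    @contextmanager
    def _timer():
        start = time.perf_counter()
        try:
            yield
        finally:
            print(f"benchmark block took {time.perf_counter() - start:.2f}s")

    return _timer


@pytest.fixture
def db_session():
    """Session against an in-memory SQLite database (schema setup assumed)."""
    engine = create_engine("sqlite:///:memory:")
    # Base.metadata.create_all(engine)  # assumed: create persistence tables here
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        yield session
    finally:
        session.close()
        engine.dispose()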
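Note that the __main__ block's --timeout=600 flag assumes the pytest-timeout plugin is installed, and --asyncio-mode=auto assumes pytest-asyncio. To exercise a single load level rather than the full suite, standard pytest node-ID selection applies, e.g. pytest test_load.py::TestLoadTesting::test_concurrent_users_10 --asyncio-mode=auto.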
