MaverickMCP

by wshobson
MIT License
test_full_backtest_workflow.py (26.4 kB)
""" Comprehensive end-to-end integration tests for VectorBT backtesting workflow. Tests cover: - Full workflow integration from data fetching to result visualization - LangGraph workflow orchestration with real agents - Database persistence with real PostgreSQL operations - Chart generation and visualization pipeline - ML strategy integration with adaptive learning - Performance benchmarks for complete workflow - Error recovery and resilience testing - Concurrent workflow execution - Resource cleanup and memory management - Cache integration and optimization """ import asyncio import base64 import logging from datetime import datetime from unittest.mock import Mock, patch from uuid import UUID import numpy as np import pandas as pd import pytest from maverick_mcp.backtesting.persistence import ( BacktestPersistenceManager, ) from maverick_mcp.backtesting.vectorbt_engine import VectorBTEngine from maverick_mcp.backtesting.visualization import ( generate_equity_curve, generate_performance_dashboard, ) from maverick_mcp.providers.stock_data import EnhancedStockDataProvider from maverick_mcp.workflows.backtesting_workflow import BacktestingWorkflow logger = logging.getLogger(__name__) class TestFullBacktestWorkflowIntegration: """Integration tests for complete backtesting workflow.""" @pytest.fixture async def mock_stock_data_provider(self): """Create a mock stock data provider with realistic data.""" provider = Mock(spec=EnhancedStockDataProvider) # Generate realistic stock data dates = pd.date_range(start="2023-01-01", end="2023-12-31", freq="D") returns = np.random.normal(0.0008, 0.02, len(dates)) # ~20% annual volatility prices = 150 * np.cumprod(1 + returns) # Start at $150 volumes = np.random.randint(1000000, 10000000, len(dates)) stock_data = pd.DataFrame( { "Open": prices * np.random.uniform(0.99, 1.01, len(dates)), "High": prices * np.random.uniform(1.00, 1.03, len(dates)), "Low": prices * np.random.uniform(0.97, 1.00, len(dates)), "Close": prices, "Volume": volumes, "Adj Close": prices, }, index=dates, ) # Ensure OHLC constraints stock_data["High"] = np.maximum( stock_data["High"], np.maximum(stock_data["Open"], stock_data["Close"]) ) stock_data["Low"] = np.minimum( stock_data["Low"], np.minimum(stock_data["Open"], stock_data["Close"]) ) provider.get_stock_data.return_value = stock_data return provider @pytest.fixture async def vectorbt_engine(self, mock_stock_data_provider): """Create VectorBT engine with mocked data provider.""" engine = VectorBTEngine(data_provider=mock_stock_data_provider) return engine @pytest.fixture def workflow_with_real_agents(self): """Create workflow with real agents (not mocked).""" return BacktestingWorkflow() async def test_complete_workflow_execution( self, workflow_with_real_agents, db_session, benchmark_timer ): """Test complete workflow from start to finish with database persistence.""" with benchmark_timer() as timer: # Execute intelligent backtest result = await workflow_with_real_agents.run_intelligent_backtest( symbol="AAPL", start_date="2023-01-01", end_date="2023-12-31", initial_capital=10000.0, ) # Test basic result structure assert "symbol" in result assert result["symbol"] == "AAPL" assert "execution_metadata" in result assert "market_analysis" in result assert "strategy_selection" in result assert "recommendation" in result # Test execution metadata metadata = result["execution_metadata"] assert "total_execution_time_ms" in metadata assert "workflow_completed" in metadata assert "steps_completed" in metadata # Test performance requirements 
assert timer.elapsed < 60.0 # Should complete within 1 minute assert metadata["total_execution_time_ms"] > 0 # Test that meaningful analysis occurred market_analysis = result["market_analysis"] assert "regime" in market_analysis assert "regime_confidence" in market_analysis strategy_selection = result["strategy_selection"] assert "selected_strategies" in strategy_selection assert "selection_reasoning" in strategy_selection recommendation = result["recommendation"] assert "recommended_strategy" in recommendation assert "recommendation_confidence" in recommendation logger.info(f"Complete workflow executed in {timer.elapsed:.2f}s") async def test_workflow_with_persistence_integration( self, workflow_with_real_agents, db_session, sample_vectorbt_results ): """Test workflow integration with database persistence.""" # First run the workflow result = await workflow_with_real_agents.run_intelligent_backtest( symbol="TSLA", start_date="2023-01-01", end_date="2023-12-31" ) # Simulate saving backtest results to database with BacktestPersistenceManager(session=db_session) as persistence: # Modify sample results to match workflow output sample_vectorbt_results["symbol"] = "TSLA" sample_vectorbt_results["strategy"] = result["recommendation"][ "recommended_strategy" ] backtest_id = persistence.save_backtest_result( vectorbt_results=sample_vectorbt_results, execution_time=result["execution_metadata"]["total_execution_time_ms"] / 1000, notes=f"Intelligent backtest - {result['recommendation']['recommendation_confidence']:.2%} confidence", ) # Verify persistence assert backtest_id is not None assert UUID(backtest_id) # Retrieve and verify saved_result = persistence.get_backtest_by_id(backtest_id) assert saved_result is not None assert saved_result.symbol == "TSLA" assert ( saved_result.strategy_type == result["recommendation"]["recommended_strategy"] ) async def test_workflow_with_visualization_integration( self, workflow_with_real_agents, sample_vectorbt_results ): """Test workflow integration with visualization generation.""" # Run workflow result = await workflow_with_real_agents.run_intelligent_backtest( symbol="NVDA", start_date="2023-01-01", end_date="2023-12-31" ) # Generate visualizations based on workflow results equity_curve_data = pd.Series(sample_vectorbt_results["equity_curve"]) drawdown_data = pd.Series(sample_vectorbt_results["drawdown_series"]) # Test equity curve generation equity_chart = generate_equity_curve( equity_curve_data, drawdown=drawdown_data, title=f"NVDA - {result['recommendation']['recommended_strategy']} Strategy", ) assert isinstance(equity_chart, str) assert len(equity_chart) > 100 # Verify base64 image try: decoded_bytes = base64.b64decode(equity_chart) assert decoded_bytes.startswith(b"\x89PNG") except Exception as e: pytest.fail(f"Invalid chart generation: {e}") # Test performance dashboard dashboard_metrics = { "Strategy": result["recommendation"]["recommended_strategy"], "Confidence": f"{result['recommendation']['recommendation_confidence']:.1%}", "Market Regime": result["market_analysis"]["regime"], "Regime Confidence": f"{result['market_analysis']['regime_confidence']:.1%}", "Total Return": sample_vectorbt_results["metrics"]["total_return"], "Sharpe Ratio": sample_vectorbt_results["metrics"]["sharpe_ratio"], "Max Drawdown": sample_vectorbt_results["metrics"]["max_drawdown"], } dashboard_chart = generate_performance_dashboard( dashboard_metrics, title="Intelligent Backtest Results" ) assert isinstance(dashboard_chart, str) assert len(dashboard_chart) > 100 async def 
test_workflow_with_ml_strategy_integration( self, workflow_with_real_agents, mock_stock_data_provider ): """Test workflow integration with ML-enhanced strategies.""" # Mock the workflow to use ML strategies with patch.object( workflow_with_real_agents.strategy_selector, "select_strategies" ) as mock_selector: async def mock_select_with_ml(state): state.selected_strategies = ["adaptive_momentum", "online_learning"] state.strategy_selection_confidence = 0.85 state.strategy_selection_reasoning = ( "ML strategies selected for volatile market conditions" ) return state mock_selector.side_effect = mock_select_with_ml result = await workflow_with_real_agents.run_intelligent_backtest( symbol="AMZN", start_date="2023-01-01", end_date="2023-12-31" ) # Verify ML strategy integration assert ( "adaptive_momentum" in result["strategy_selection"]["selected_strategies"] or "online_learning" in result["strategy_selection"]["selected_strategies"] ) assert ( "ML" in result["strategy_selection"]["selection_reasoning"] or "adaptive" in result["strategy_selection"]["selection_reasoning"] ) async def test_vectorbt_engine_integration(self, vectorbt_engine): """Test VectorBT engine integration with workflow.""" # Test data fetching data = await vectorbt_engine.get_historical_data( symbol="MSFT", start_date="2023-01-01", end_date="2023-12-31" ) assert isinstance(data, pd.DataFrame) assert len(data) > 0 # Check if required columns exist (data should already have lowercase columns) required_cols = ["open", "high", "low", "close"] actual_cols = list(data.columns) missing_cols = [col for col in required_cols if col not in actual_cols] assert all(col in actual_cols for col in required_cols), ( f"Missing columns: {missing_cols}" ) # Test backtest execution backtest_result = await vectorbt_engine.run_backtest( symbol="MSFT", strategy_type="sma_crossover", parameters={"fast_window": 10, "slow_window": 20}, start_date="2023-01-01", end_date="2023-12-31", ) assert isinstance(backtest_result, dict) assert "symbol" in backtest_result assert "metrics" in backtest_result assert "equity_curve" in backtest_result async def test_error_recovery_integration(self, workflow_with_real_agents): """Test error recovery in integrated workflow.""" # Test with invalid symbol result = await workflow_with_real_agents.run_intelligent_backtest( symbol="INVALID_SYMBOL", start_date="2023-01-01", end_date="2023-12-31" ) # Should handle gracefully assert "error" in result or "execution_metadata" in result if "execution_metadata" in result: assert result["execution_metadata"]["workflow_completed"] is False # Test with invalid date range result = await workflow_with_real_agents.run_intelligent_backtest( symbol="AAPL", start_date="2025-01-01", # Future date end_date="2025-12-31", ) # Should handle gracefully assert isinstance(result, dict) async def test_concurrent_workflow_execution( self, workflow_with_real_agents, benchmark_timer ): """Test concurrent execution of multiple complete workflows.""" symbols = ["AAPL", "GOOGL", "MSFT", "TSLA"] async def run_workflow(symbol): return await workflow_with_real_agents.run_intelligent_backtest( symbol=symbol, start_date="2023-01-01", end_date="2023-12-31" ) with benchmark_timer() as timer: # Run workflows concurrently results = await asyncio.gather( *[run_workflow(symbol) for symbol in symbols], return_exceptions=True ) # Test all completed assert len(results) == len(symbols) # Test no exceptions successful_results = [] for i, result in enumerate(results): if not isinstance(result, Exception): 
successful_results.append(result) assert result["symbol"] == symbols[i] else: logger.warning(f"Workflow failed for {symbols[i]}: {result}") # At least half should succeed in concurrent execution assert len(successful_results) >= len(symbols) // 2 # Test reasonable execution time for concurrent runs assert timer.elapsed < 120.0 # Should complete within 2 minutes logger.info( f"Concurrent workflows completed: {len(successful_results)}/{len(symbols)} in {timer.elapsed:.2f}s" ) async def test_performance_benchmarks_integration( self, workflow_with_real_agents, benchmark_timer ): """Test performance benchmarks for integrated workflow.""" performance_results = {} # Test quick analysis performance with benchmark_timer() as timer: quick_result = await workflow_with_real_agents.run_quick_analysis( symbol="AAPL", start_date="2023-01-01", end_date="2023-12-31" ) assert isinstance(quick_result, dict) assert quick_result.get("symbol") == "AAPL" quick_time = timer.elapsed # Test full workflow performance with benchmark_timer() as timer: full_result = await workflow_with_real_agents.run_intelligent_backtest( symbol="AAPL", start_date="2023-01-01", end_date="2023-12-31" ) full_time = timer.elapsed # Performance requirements assert quick_time < 10.0 # Quick analysis < 10 seconds assert full_time < 60.0 # Full workflow < 1 minute assert quick_time < full_time # Quick should be faster than full performance_results["quick_analysis"] = quick_time performance_results["full_workflow"] = full_time # Test workflow status tracking performance if "workflow_completed" in full_result.get("execution_metadata", {}): workflow_status = workflow_with_real_agents.get_workflow_status( full_result.get( "_internal_state", Mock( workflow_status="completed", current_step="finalized", steps_completed=[ "initialization", "market_analysis", "strategy_selection", ], errors_encountered=[], validation_warnings=[], total_execution_time_ms=full_time * 1000, recommended_strategy=full_result.get("recommendation", {}).get( "recommended_strategy", "unknown" ), recommendation_confidence=full_result.get( "recommendation", {} ).get("recommendation_confidence", 0.0), ), ) ) assert workflow_status["progress_percentage"] >= 0 assert workflow_status["progress_percentage"] <= 100 logger.info(f"Performance benchmarks: {performance_results}") async def test_resource_cleanup_integration(self, workflow_with_real_agents): """Test resource cleanup after workflow completion.""" import os import psutil process = psutil.Process(os.getpid()) initial_memory = process.memory_info().rss initial_threads = process.num_threads() # Run multiple workflows for i in range(3): result = await workflow_with_real_agents.run_intelligent_backtest( symbol=f"TEST_{i}", # Use different symbols start_date="2023-01-01", end_date="2023-12-31", ) assert isinstance(result, dict) # Check resource usage after completion final_memory = process.memory_info().rss final_threads = process.num_threads() memory_growth = (final_memory - initial_memory) / 1024 / 1024 # MB thread_growth = final_threads - initial_threads # Memory growth should be reasonable assert memory_growth < 200 # < 200MB growth # Thread count should not grow excessively assert thread_growth <= 5 # Allow some thread growth logger.info( f"Resource usage: Memory +{memory_growth:.1f}MB, Threads +{thread_growth}" ) async def test_cache_optimization_integration(self, workflow_with_real_agents): """Test cache optimization in integrated workflow.""" # First run - should populate cache start_time1 = datetime.now() result1 = await 
workflow_with_real_agents.run_intelligent_backtest( symbol="CACHE_TEST", start_date="2023-01-01", end_date="2023-12-31" ) time1 = (datetime.now() - start_time1).total_seconds() # Second run - should use cache start_time2 = datetime.now() result2 = await workflow_with_real_agents.run_intelligent_backtest( symbol="CACHE_TEST", start_date="2023-01-01", end_date="2023-12-31" ) time2 = (datetime.now() - start_time2).total_seconds() # Both should complete successfully assert isinstance(result1, dict) assert isinstance(result2, dict) # Second run might be faster due to caching (though not guaranteed) # We mainly test that caching doesn't break functionality assert result1["symbol"] == result2["symbol"] == "CACHE_TEST" logger.info(f"Cache test: First run {time1:.2f}s, Second run {time2:.2f}s") class TestWorkflowErrorResilience: """Test workflow resilience under various error conditions.""" async def test_database_failure_resilience(self, workflow_with_real_agents): """Test workflow resilience when database operations fail.""" with patch( "maverick_mcp.backtesting.persistence.SessionLocal", side_effect=Exception("Database unavailable"), ): # Workflow should still complete even if persistence fails result = await workflow_with_real_agents.run_intelligent_backtest( symbol="DB_FAIL_TEST", start_date="2023-01-01", end_date="2023-12-31" ) # Should get a result even if database persistence failed assert isinstance(result, dict) assert "symbol" in result async def test_external_api_failure_resilience(self, workflow_with_real_agents): """Test workflow resilience when external APIs fail.""" # Mock external API failures with patch( "maverick_mcp.providers.stock_data.EnhancedStockDataProvider.get_stock_data", side_effect=Exception("API rate limit exceeded"), ): result = await workflow_with_real_agents.run_intelligent_backtest( symbol="API_FAIL_TEST", start_date="2023-01-01", end_date="2023-12-31" ) # Should handle API failure gracefully assert isinstance(result, dict) # Should either have an error field or fallback behavior assert "error" in result or "execution_metadata" in result async def test_memory_pressure_resilience(self, workflow_with_real_agents): """Test workflow resilience under memory pressure.""" # Simulate memory pressure by creating large objects memory_pressure = [] try: # Create memory pressure (but not too much to crash the test) for _ in range(10): large_array = np.random.random((1000, 1000)) # ~8MB each memory_pressure.append(large_array) # Run workflow under memory pressure result = await workflow_with_real_agents.run_intelligent_backtest( symbol="MEMORY_TEST", start_date="2023-01-01", end_date="2023-12-31" ) assert isinstance(result, dict) assert "symbol" in result finally: # Clean up memory pressure del memory_pressure async def test_timeout_handling(self, workflow_with_real_agents): """Test workflow timeout handling.""" # Create a workflow with very short timeout with patch.object(asyncio, "wait_for") as mock_wait_for: mock_wait_for.side_effect = TimeoutError("Workflow timed out") try: result = await workflow_with_real_agents.run_intelligent_backtest( symbol="TIMEOUT_TEST", start_date="2023-01-01", end_date="2023-12-31", ) # If we get here, timeout was handled assert isinstance(result, dict) except TimeoutError: # Timeout occurred - this is also acceptable behavior pass class TestWorkflowValidation: """Test workflow validation and data integrity.""" async def test_input_validation(self, workflow_with_real_agents): """Test input parameter validation.""" # Test invalid symbol result = 
await workflow_with_real_agents.run_intelligent_backtest( symbol="", # Empty symbol start_date="2023-01-01", end_date="2023-12-31", ) assert "error" in result or ( "execution_metadata" in result and not result["execution_metadata"]["workflow_completed"] ) # Test invalid date range result = await workflow_with_real_agents.run_intelligent_backtest( symbol="AAPL", start_date="2023-12-31", # Start after end end_date="2023-01-01", ) assert isinstance(result, dict) # Should handle gracefully async def test_output_validation(self, workflow_with_real_agents): """Test output structure validation.""" result = await workflow_with_real_agents.run_intelligent_backtest( symbol="VALIDATE_TEST", start_date="2023-01-01", end_date="2023-12-31" ) # Validate required fields required_fields = ["symbol", "execution_metadata"] for field in required_fields: assert field in result, f"Missing required field: {field}" # Validate execution metadata structure metadata = result["execution_metadata"] required_metadata = ["total_execution_time_ms", "workflow_completed"] for field in required_metadata: assert field in metadata, f"Missing metadata field: {field}" # Validate data types assert isinstance(metadata["total_execution_time_ms"], (int, float)) assert isinstance(metadata["workflow_completed"], bool) if "recommendation" in result: recommendation = result["recommendation"] assert "recommended_strategy" in recommendation assert "recommendation_confidence" in recommendation assert isinstance(recommendation["recommendation_confidence"], (int, float)) assert 0.0 <= recommendation["recommendation_confidence"] <= 1.0 async def test_data_consistency(self, workflow_with_real_agents, db_session): """Test data consistency across workflow components.""" symbol = "CONSISTENCY_TEST" result = await workflow_with_real_agents.run_intelligent_backtest( symbol=symbol, start_date="2023-01-01", end_date="2023-12-31" ) # Test symbol consistency assert result["symbol"] == symbol # If workflow completed successfully, all components should be consistent if result["execution_metadata"]["workflow_completed"]: # Market analysis should be consistent if "market_analysis" in result: market_analysis = result["market_analysis"] assert "regime" in market_analysis assert isinstance( market_analysis.get("regime_confidence", 0), (int, float) ) # Strategy selection should be consistent if "strategy_selection" in result: strategy_selection = result["strategy_selection"] selected_strategies = strategy_selection.get("selected_strategies", []) assert isinstance(selected_strategies, list) # Recommendation should be consistent with selection if "recommendation" in result and "strategy_selection" in result: recommended = result["recommendation"]["recommended_strategy"] if recommended and selected_strategies: # Recommended strategy should be from selected strategies # (though fallback behavior might select others) pass # Allow flexibility for fallback scenarios if __name__ == "__main__": # Run integration tests with extended timeout pytest.main( [ __file__, "-v", "--tb=short", "--asyncio-mode=auto", "--timeout=300", # 5 minute timeout for integration tests "-x", # Stop on first failure ] )
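Note that the tests above depend on fixtures that are not defined in this file (`db_session`, `benchmark_timer`, `sample_vectorbt_results`), which presumably live in the suite's conftest.py. As an illustration only, the following is a minimal sketch of what a `benchmark_timer` fixture matching the interface used above might look like (`with benchmark_timer() as timer:` followed by reads of `timer.elapsed` once the block exits); the project's actual fixture may differ.

# Hypothetical sketch of a benchmark_timer fixture; not taken from the repository.
import time

import pytest


@pytest.fixture
def benchmark_timer():
    """Return a factory for a simple wall-clock timing context manager."""

    class _Timer:
        def __enter__(self):
            self._start = time.perf_counter()
            self.elapsed = 0.0
            return self

        def __exit__(self, exc_type, exc, tb):
            # Record elapsed seconds; never suppress exceptions from the timed block.
            self.elapsed = time.perf_counter() - self._start
            return False

    return _Timer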

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'
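The same lookup can be made from Python. This is a minimal sketch using the `requests` library against the endpoint shown above; the response is printed as JSON with no assumptions about its schema.

# Sketch: fetch the MaverickMCP server record from the Glama MCP directory API.
import requests

response = requests.get(
    "https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp", timeout=30
)
response.raise_for_status()
print(response.json())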

If you have feedback or need assistance with the MCP directory API, please join our Discord server.