Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
backtesting_workflow.py20.4 kB
""" Intelligent Backtesting Workflow using LangGraph. This workflow orchestrates market regime analysis, strategy selection, parameter optimization, and validation to provide intelligent, confidence-scored backtesting recommendations. """ import logging from datetime import datetime, timedelta from typing import Any from langchain_core.messages import HumanMessage from langgraph.graph import END, StateGraph from maverick_mcp.workflows.agents import ( MarketAnalyzerAgent, OptimizerAgent, StrategySelectorAgent, ValidatorAgent, ) from maverick_mcp.workflows.state import BacktestingWorkflowState logger = logging.getLogger(__name__) class BacktestingWorkflow: """Intelligent backtesting workflow orchestrator.""" def __init__( self, market_analyzer: MarketAnalyzerAgent | None = None, strategy_selector: StrategySelectorAgent | None = None, optimizer: OptimizerAgent | None = None, validator: ValidatorAgent | None = None, ): """Initialize backtesting workflow. Args: market_analyzer: Market regime analysis agent strategy_selector: Strategy selection agent optimizer: Parameter optimization agent validator: Results validation agent """ self.market_analyzer = market_analyzer or MarketAnalyzerAgent() self.strategy_selector = strategy_selector or StrategySelectorAgent() self.optimizer = optimizer or OptimizerAgent() self.validator = validator or ValidatorAgent() # Build the workflow graph self.workflow = self._build_workflow_graph() logger.info("BacktestingWorkflow initialized") def _build_workflow_graph(self) -> StateGraph: """Build the LangGraph workflow.""" # Define the workflow graph workflow = StateGraph(BacktestingWorkflowState) # Add nodes for each step workflow.add_node("initialize", self._initialize_workflow) workflow.add_node("analyze_market_regime", self._analyze_market_regime_node) workflow.add_node("select_strategies", self._select_strategies_node) workflow.add_node("optimize_parameters", self._optimize_parameters_node) workflow.add_node("validate_results", self._validate_results_node) workflow.add_node("finalize_workflow", self._finalize_workflow) # Define the workflow flow workflow.set_entry_point("initialize") # Sequential workflow with conditional routing workflow.add_edge("initialize", "analyze_market_regime") workflow.add_conditional_edges( "analyze_market_regime", self._should_proceed_after_market_analysis, { "continue": "select_strategies", "fallback": "finalize_workflow", }, ) workflow.add_conditional_edges( "select_strategies", self._should_proceed_after_strategy_selection, { "continue": "optimize_parameters", "fallback": "finalize_workflow", }, ) workflow.add_conditional_edges( "optimize_parameters", self._should_proceed_after_optimization, { "continue": "validate_results", "fallback": "finalize_workflow", }, ) workflow.add_edge("validate_results", "finalize_workflow") workflow.add_edge("finalize_workflow", END) return workflow.compile() async def run_intelligent_backtest( self, symbol: str, start_date: str | None = None, end_date: str | None = None, initial_capital: float = 10000.0, requested_strategy: str | None = None, ) -> dict[str, Any]: """Run intelligent backtesting workflow. Args: symbol: Stock symbol to analyze start_date: Start date (YYYY-MM-DD), defaults to 1 year ago end_date: End date (YYYY-MM-DD), defaults to today initial_capital: Starting capital for backtest requested_strategy: User-requested strategy (optional) Returns: Comprehensive backtesting results with recommendations """ start_time = datetime.now() try: logger.info(f"Starting intelligent backtest workflow for {symbol}") # Set default date range if not provided if not end_date: end_date = datetime.now().strftime("%Y-%m-%d") if not start_date: start_date = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d") # Initialize workflow state initial_state = self._create_initial_state( symbol=symbol, start_date=start_date, end_date=end_date, initial_capital=initial_capital, requested_strategy=requested_strategy, ) # Run the workflow final_state = await self.workflow.ainvoke(initial_state) # Convert state to results dictionary results = self._format_results(final_state) # Add execution metadata total_execution_time = (datetime.now() - start_time).total_seconds() * 1000 results["execution_metadata"] = { "total_execution_time_ms": total_execution_time, "workflow_completed": final_state.workflow_status == "completed", "steps_completed": final_state.steps_completed, "errors_encountered": final_state.errors_encountered, "fallback_strategies_used": final_state.fallback_strategies_used, } logger.info( f"Intelligent backtest completed for {symbol} in {total_execution_time:.0f}ms: " f"{final_state.recommended_strategy} recommended with {final_state.recommendation_confidence:.1%} confidence" ) return results except Exception as e: logger.error(f"Intelligent backtest failed for {symbol}: {e}") return { "symbol": symbol, "error": str(e), "execution_metadata": { "total_execution_time_ms": ( datetime.now() - start_time ).total_seconds() * 1000, "workflow_completed": False, }, } def _create_initial_state( self, symbol: str, start_date: str, end_date: str, initial_capital: float, requested_strategy: str | None, ) -> BacktestingWorkflowState: """Create initial workflow state.""" return BacktestingWorkflowState( # Base agent state messages=[ HumanMessage(content=f"Analyze backtesting opportunities for {symbol}") ], session_id=f"backtest_{symbol}_{datetime.now().isoformat()}", persona="intelligent_backtesting_agent", timestamp=datetime.now(), token_count=0, error=None, analyzed_stocks={}, key_price_levels={}, last_analysis_time={}, conversation_context={}, execution_time_ms=None, api_calls_made=0, cache_hits=0, cache_misses=0, # Input parameters symbol=symbol, start_date=start_date, end_date=end_date, initial_capital=initial_capital, requested_strategy=requested_strategy, # Market regime analysis (initialized) market_regime="unknown", regime_confidence=0.0, regime_indicators={}, regime_analysis_time_ms=0.0, volatility_percentile=0.0, trend_strength=0.0, market_conditions={}, sector_performance={}, correlation_to_market=0.0, volume_profile={}, support_resistance_levels=[], # Strategy selection (initialized) candidate_strategies=[], strategy_rankings={}, selected_strategies=[], strategy_selection_reasoning="", strategy_selection_confidence=0.0, # Parameter optimization (initialized) optimization_config={}, parameter_grids={}, optimization_results={}, best_parameters={}, optimization_time_ms=0.0, optimization_iterations=0, # Validation (initialized) walk_forward_results={}, monte_carlo_results={}, out_of_sample_performance={}, robustness_score={}, validation_warnings=[], # Final recommendations (initialized) final_strategy_ranking=[], recommended_strategy="", recommended_parameters={}, recommendation_confidence=0.0, risk_assessment={}, # Performance metrics (initialized) comparative_metrics={}, benchmark_comparison={}, risk_adjusted_performance={}, drawdown_analysis={}, # Workflow control (initialized) workflow_status="initializing", current_step="initialization", steps_completed=[], total_execution_time_ms=0.0, # Error handling (initialized) errors_encountered=[], fallback_strategies_used=[], data_quality_issues=[], # Caching (initialized) cached_results={}, cache_hit_rate=0.0, # Advanced analysis (initialized) regime_transition_analysis={}, multi_timeframe_analysis={}, correlation_analysis={}, macroeconomic_context={}, ) async def _initialize_workflow( self, state: BacktestingWorkflowState ) -> BacktestingWorkflowState: """Initialize the workflow and validate inputs.""" logger.info(f"Initializing backtesting workflow for {state.symbol}") # Validate inputs if not state.symbol: state.errors_encountered.append( { "step": "initialization", "error": "Symbol is required", "timestamp": datetime.now().isoformat(), } ) state.workflow_status = "failed" return state # Update workflow state state.workflow_status = "analyzing_regime" state.current_step = "initialization_completed" state.steps_completed.append("initialization") logger.info(f"Workflow initialized for {state.symbol}") return state async def _analyze_market_regime_node( self, state: BacktestingWorkflowState ) -> BacktestingWorkflowState: """Market regime analysis node.""" return await self.market_analyzer.analyze_market_regime(state) async def _select_strategies_node( self, state: BacktestingWorkflowState ) -> BacktestingWorkflowState: """Strategy selection node.""" return await self.strategy_selector.select_strategies(state) async def _optimize_parameters_node( self, state: BacktestingWorkflowState ) -> BacktestingWorkflowState: """Parameter optimization node.""" return await self.optimizer.optimize_parameters(state) async def _validate_results_node( self, state: BacktestingWorkflowState ) -> BacktestingWorkflowState: """Results validation node.""" return await self.validator.validate_strategies(state) async def _finalize_workflow( self, state: BacktestingWorkflowState ) -> BacktestingWorkflowState: """Finalize the workflow and prepare results.""" if state.workflow_status != "completed": # Handle incomplete workflow if not state.recommended_strategy and state.best_parameters: # Select first available strategy as fallback state.recommended_strategy = list(state.best_parameters.keys())[0] state.recommended_parameters = state.best_parameters[ state.recommended_strategy ] state.recommendation_confidence = 0.3 state.fallback_strategies_used.append("incomplete_workflow_fallback") state.current_step = "workflow_finalized" logger.info(f"Workflow finalized for {state.symbol}") return state def _should_proceed_after_market_analysis( self, state: BacktestingWorkflowState ) -> str: """Decide whether to proceed after market analysis.""" if state.errors_encountered and any( "market_regime_analysis" in err.get("step", "") for err in state.errors_encountered ): return "fallback" if state.market_regime == "unknown" and state.regime_confidence < 0.1: return "fallback" return "continue" def _should_proceed_after_strategy_selection( self, state: BacktestingWorkflowState ) -> str: """Decide whether to proceed after strategy selection.""" if not state.selected_strategies: return "fallback" if state.strategy_selection_confidence < 0.2: return "fallback" return "continue" def _should_proceed_after_optimization( self, state: BacktestingWorkflowState ) -> str: """Decide whether to proceed after optimization.""" if not state.best_parameters: return "fallback" return "continue" def _format_results(self, state: BacktestingWorkflowState) -> dict[str, Any]: """Format final results for output.""" return { "symbol": state.symbol, "period": { "start_date": state.start_date, "end_date": state.end_date, "initial_capital": state.initial_capital, }, "market_analysis": { "regime": state.market_regime, "regime_confidence": state.regime_confidence, "regime_indicators": state.regime_indicators, "volatility_percentile": state.volatility_percentile, "trend_strength": state.trend_strength, "market_conditions": state.market_conditions, "support_resistance_levels": state.support_resistance_levels, }, "strategy_selection": { "selected_strategies": state.selected_strategies, "strategy_rankings": state.strategy_rankings, "selection_reasoning": state.strategy_selection_reasoning, "selection_confidence": state.strategy_selection_confidence, "candidate_strategies": state.candidate_strategies, }, "optimization": { "optimization_config": state.optimization_config, "best_parameters": state.best_parameters, "optimization_iterations": state.optimization_iterations, "optimization_time_ms": state.optimization_time_ms, }, "validation": { "robustness_scores": state.robustness_score, "validation_warnings": state.validation_warnings, "out_of_sample_performance": state.out_of_sample_performance, }, "recommendation": { "recommended_strategy": state.recommended_strategy, "recommended_parameters": state.recommended_parameters, "recommendation_confidence": state.recommendation_confidence, "final_strategy_ranking": state.final_strategy_ranking, "risk_assessment": state.risk_assessment, }, "performance_analysis": { "comparative_metrics": state.comparative_metrics, "benchmark_comparison": state.benchmark_comparison, "risk_adjusted_performance": state.risk_adjusted_performance, }, } async def run_quick_analysis( self, symbol: str, start_date: str | None = None, end_date: str | None = None, initial_capital: float = 10000.0, ) -> dict[str, Any]: """Run quick analysis with market regime detection and basic strategy recommendations. This is a faster alternative that skips parameter optimization and validation for rapid insights. Args: symbol: Stock symbol to analyze start_date: Start date (YYYY-MM-DD) end_date: End date (YYYY-MM-DD) initial_capital: Starting capital Returns: Quick analysis results with strategy recommendations """ start_time = datetime.now() try: logger.info(f"Running quick analysis for {symbol}") # Set default dates if not end_date: end_date = datetime.now().strftime("%Y-%m-%d") if not start_date: start_date = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d") # Create initial state state = self._create_initial_state( symbol=symbol, start_date=start_date, end_date=end_date, initial_capital=initial_capital, requested_strategy=None, ) # Run market analysis state = await self.market_analyzer.analyze_market_regime(state) # Run strategy selection if state["market_regime"] != "unknown" or state["regime_confidence"] > 0.3: state = await self.strategy_selector.select_strategies(state) # Format quick results execution_time = (datetime.now() - start_time).total_seconds() * 1000 return { "symbol": symbol, "analysis_type": "quick_analysis", "market_regime": { "regime": state["market_regime"], "confidence": state["regime_confidence"], "trend_strength": state["trend_strength"], "volatility_percentile": state["volatility_percentile"], }, "recommended_strategies": state["selected_strategies"][:3], # Top 3 "strategy_fitness": { strategy: state["strategy_rankings"].get(strategy, 0) for strategy in state["selected_strategies"][:3] }, "market_conditions": state["market_conditions"], "selection_reasoning": state["strategy_selection_reasoning"], "execution_time_ms": execution_time, "data_quality": { "errors": len(state["errors_encountered"]), "warnings": state["data_quality_issues"], }, } except Exception as e: logger.error(f"Quick analysis failed for {symbol}: {e}") return { "symbol": symbol, "analysis_type": "quick_analysis", "error": str(e), "execution_time_ms": (datetime.now() - start_time).total_seconds() * 1000, } def get_workflow_status(self, state: BacktestingWorkflowState) -> dict[str, Any]: """Get current workflow status and progress.""" total_steps = 5 # initialize, analyze, select, optimize, validate completed_steps = len(state.steps_completed) return { "workflow_status": state.workflow_status, "current_step": state.current_step, "progress_percentage": (completed_steps / total_steps) * 100, "steps_completed": state.steps_completed, "errors_count": len(state.errors_encountered), "warnings_count": len(state.validation_warnings), "execution_time_ms": state.total_execution_time_ms, "recommended_strategy": state.recommended_strategy or "TBD", "recommendation_confidence": state.recommendation_confidence, }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server