Glama

MaverickMCP

by wshobson
MIT License
optimizer_agent.py (21.8 kB)
""" Optimizer Agent for intelligent parameter optimization. This agent performs regime-aware parameter optimization for selected strategies, using adaptive grid sizes and optimization metrics based on market conditions. """ import asyncio import logging from datetime import datetime, timedelta from typing import Any from maverick_mcp.backtesting import StrategyOptimizer, VectorBTEngine from maverick_mcp.backtesting.strategies.templates import get_strategy_info from maverick_mcp.workflows.state import BacktestingWorkflowState logger = logging.getLogger(__name__) class OptimizerAgent: """Intelligent parameter optimizer with regime-aware optimization.""" def __init__( self, vectorbt_engine: VectorBTEngine | None = None, strategy_optimizer: StrategyOptimizer | None = None, ): """Initialize optimizer agent. Args: vectorbt_engine: VectorBT backtesting engine strategy_optimizer: Strategy optimization engine """ self.engine = vectorbt_engine or VectorBTEngine() self.optimizer = strategy_optimizer or StrategyOptimizer(self.engine) # Optimization configurations for different regimes self.REGIME_OPTIMIZATION_CONFIG = { "trending": { "optimization_metric": "total_return", # Focus on capturing trends "grid_size": "medium", "min_trades": 10, "max_drawdown_limit": 0.25, }, "ranging": { "optimization_metric": "sharpe_ratio", # Focus on risk-adjusted returns "grid_size": "fine", # More precision needed for ranging markets "min_trades": 15, "max_drawdown_limit": 0.15, }, "volatile": { "optimization_metric": "calmar_ratio", # Risk-adjusted for volatility "grid_size": "coarse", # Avoid overfitting in volatile conditions "min_trades": 8, "max_drawdown_limit": 0.35, }, "volatile_trending": { "optimization_metric": "sortino_ratio", # Focus on downside risk "grid_size": "medium", "min_trades": 10, "max_drawdown_limit": 0.30, }, "low_volume": { "optimization_metric": "win_rate", # Consistency important in low volume "grid_size": "medium", "min_trades": 12, "max_drawdown_limit": 0.20, }, "unknown": { "optimization_metric": "sharpe_ratio", # Balanced approach "grid_size": "medium", "min_trades": 10, "max_drawdown_limit": 0.20, }, } # Strategy-specific optimization parameters self.STRATEGY_PARAM_RANGES = { "sma_cross": { "fast_period": { "coarse": [5, 10, 15], "medium": [5, 8, 10, 12, 15, 20], "fine": list(range(5, 21)), }, "slow_period": { "coarse": [20, 30, 50], "medium": [20, 25, 30, 40, 50], "fine": list(range(20, 51, 5)), }, }, "rsi": { "period": { "coarse": [10, 14, 21], "medium": [10, 12, 14, 16, 21], "fine": list(range(8, 25)), }, "oversold": { "coarse": [25, 30, 35], "medium": [20, 25, 30, 35], "fine": list(range(15, 36, 5)), }, "overbought": { "coarse": [65, 70, 75], "medium": [65, 70, 75, 80], "fine": list(range(65, 86, 5)), }, }, "macd": { "fast_period": { "coarse": [8, 12, 16], "medium": [8, 10, 12, 14, 16], "fine": list(range(8, 17)), }, "slow_period": { "coarse": [21, 26, 32], "medium": [21, 24, 26, 28, 32], "fine": list(range(21, 35)), }, "signal_period": { "coarse": [6, 9, 12], "medium": [6, 8, 9, 10, 12], "fine": list(range(6, 15)), }, }, "bollinger": { "period": { "coarse": [15, 20, 25], "medium": [15, 18, 20, 22, 25], "fine": list(range(12, 28)), }, "std_dev": { "coarse": [1.5, 2.0, 2.5], "medium": [1.5, 1.8, 2.0, 2.2, 2.5], "fine": [1.0, 1.5, 1.8, 2.0, 2.2, 2.5, 3.0], }, }, "momentum": { "lookback": { "coarse": [10, 20, 30], "medium": [10, 15, 20, 25, 30], "fine": list(range(5, 31, 5)), }, "threshold": { "coarse": [0.03, 0.05, 0.08], "medium": [0.02, 0.03, 0.05, 0.07, 0.10], "fine": [0.01, 0.02, 
0.03, 0.04, 0.05, 0.07, 0.10, 0.15], }, }, } logger.info("OptimizerAgent initialized") async def optimize_parameters( self, state: BacktestingWorkflowState ) -> BacktestingWorkflowState: """Optimize parameters for selected strategies. Args: state: Current workflow state with selected strategies Returns: Updated state with optimization results """ start_time = datetime.now() try: logger.info( f"Optimizing parameters for {len(state.selected_strategies)} strategies on {state.symbol}" ) # Get optimization configuration based on regime optimization_config = self._get_optimization_config( state.market_regime, state.regime_confidence ) # Generate parameter grids for each strategy parameter_grids = self._generate_parameter_grids( state.selected_strategies, optimization_config["grid_size"] ) # Optimize each strategy optimization_results = {} best_parameters = {} total_iterations = 0 # Use shorter timeframe for optimization to avoid overfitting opt_start_date = self._calculate_optimization_window( state.start_date, state.end_date ) for strategy in state.selected_strategies: try: logger.info(f"Optimizing {strategy} strategy...") param_grid = parameter_grids.get(strategy, {}) if not param_grid: logger.warning( f"No parameter grid for {strategy}, using defaults" ) continue # Run optimization result = await self.engine.optimize_parameters( symbol=state.symbol, strategy_type=strategy, param_grid=param_grid, start_date=opt_start_date, end_date=state.end_date, optimization_metric=optimization_config["optimization_metric"], initial_capital=state.initial_capital, top_n=min( 10, len(state.selected_strategies) * 2 ), # Adaptive top_n ) # Filter results by quality metrics filtered_result = self._filter_optimization_results( result, optimization_config ) optimization_results[strategy] = filtered_result best_parameters[strategy] = filtered_result.get( "best_parameters", {} ) total_iterations += filtered_result.get("valid_combinations", 0) logger.info( f"Optimized {strategy}: {filtered_result.get('best_metric_value', 0):.3f} {optimization_config['optimization_metric']}" ) except Exception as e: logger.error(f"Failed to optimize {strategy}: {e}") # Use default parameters as fallback strategy_info = get_strategy_info(strategy) best_parameters[strategy] = strategy_info.get("parameters", {}) state.fallback_strategies_used.append( f"{strategy}_optimization_fallback" ) # Update state state.optimization_config = optimization_config state.parameter_grids = parameter_grids state.optimization_results = optimization_results state.best_parameters = best_parameters state.optimization_iterations = total_iterations # Record execution time execution_time = (datetime.now() - start_time).total_seconds() * 1000 state.optimization_time_ms = execution_time # Update workflow status state.workflow_status = "validating" state.current_step = "optimization_completed" state.steps_completed.append("parameter_optimization") logger.info( f"Parameter optimization completed for {state.symbol}: " f"{total_iterations} combinations tested in {execution_time:.0f}ms" ) return state except Exception as e: error_info = { "step": "parameter_optimization", "error": str(e), "timestamp": datetime.now().isoformat(), "symbol": state.symbol, } state.errors_encountered.append(error_info) # Fallback to default parameters default_params = {} for strategy in state.selected_strategies: strategy_info = get_strategy_info(strategy) default_params[strategy] = strategy_info.get("parameters", {}) state.best_parameters = default_params 
state.fallback_strategies_used.append("optimization_fallback") logger.error(f"Parameter optimization failed for {state.symbol}: {e}") return state def _get_optimization_config( self, regime: str, regime_confidence: float ) -> dict[str, Any]: """Get optimization configuration based on market regime.""" base_config = self.REGIME_OPTIMIZATION_CONFIG.get( regime, self.REGIME_OPTIMIZATION_CONFIG["unknown"] ).copy() # Adjust grid size based on confidence if regime_confidence < 0.5: # Low confidence -> use coarser grid to avoid overfitting if base_config["grid_size"] == "fine": base_config["grid_size"] = "medium" elif base_config["grid_size"] == "medium": base_config["grid_size"] = "coarse" return base_config def _generate_parameter_grids( self, strategies: list[str], grid_size: str ) -> dict[str, dict[str, list]]: """Generate parameter grids for optimization.""" parameter_grids = {} for strategy in strategies: if strategy in self.STRATEGY_PARAM_RANGES: param_ranges = self.STRATEGY_PARAM_RANGES[strategy] grid = {} for param_name, size_ranges in param_ranges.items(): if grid_size in size_ranges: grid[param_name] = size_ranges[grid_size] else: # Fallback to medium if requested size not available grid[param_name] = size_ranges.get( "medium", size_ranges["coarse"] ) parameter_grids[strategy] = grid else: # For strategies not in our predefined ranges, use default minimal grid parameter_grids[strategy] = self._generate_default_grid( strategy, grid_size ) return parameter_grids def _generate_default_grid(self, strategy: str, grid_size: str) -> dict[str, list]: """Generate default parameter grid for unknown strategies.""" # Get strategy info to understand default parameters strategy_info = get_strategy_info(strategy) default_params = strategy_info.get("parameters", {}) grid = {} # Generate basic variations around default values for param_name, default_value in default_params.items(): if isinstance(default_value, int | float): if grid_size == "coarse": variations = [ default_value * 0.8, default_value, default_value * 1.2, ] elif grid_size == "fine": variations = [ default_value * 0.7, default_value * 0.8, default_value * 0.9, default_value, default_value * 1.1, default_value * 1.2, default_value * 1.3, ] else: # medium variations = [ default_value * 0.8, default_value * 0.9, default_value, default_value * 1.1, default_value * 1.2, ] # Convert back to appropriate type and filter valid values if isinstance(default_value, int): grid[param_name] = [max(1, int(v)) for v in variations] else: grid[param_name] = [max(0.001, v) for v in variations] else: # For non-numeric parameters, just use the default grid[param_name] = [default_value] return grid def _calculate_optimization_window(self, start_date: str, end_date: str) -> str: """Calculate optimization window to prevent overfitting.""" start_dt = datetime.strptime(start_date, "%Y-%m-%d") end_dt = datetime.strptime(end_date, "%Y-%m-%d") total_days = (end_dt - start_dt).days # Use 70% of data for optimization, leaving 30% for validation opt_days = int(total_days * 0.7) opt_start = end_dt - timedelta(days=opt_days) return opt_start.strftime("%Y-%m-%d") def _filter_optimization_results( self, result: dict[str, Any], optimization_config: dict[str, Any] ) -> dict[str, Any]: """Filter optimization results based on quality criteria.""" if "top_results" not in result or not result["top_results"]: return result # Quality filters min_trades = optimization_config.get("min_trades", 10) max_drawdown_limit = optimization_config.get("max_drawdown_limit", 0.25) # Filter 
results by quality criteria filtered_results = [] for res in result["top_results"]: # Check minimum trades if res.get("total_trades", 0) < min_trades: continue # Check maximum drawdown if abs(res.get("max_drawdown", 0)) > max_drawdown_limit: continue filtered_results.append(res) # If no results pass filters, relax criteria if not filtered_results and result["top_results"]: logger.warning("No results passed quality filters, relaxing criteria") # Take top results but with warning filtered_results = result["top_results"][:3] # Top 3 regardless of quality # Update result with filtered data filtered_result = result.copy() filtered_result["top_results"] = filtered_results if filtered_results: filtered_result["best_parameters"] = filtered_results[0]["parameters"] filtered_result["best_metric_value"] = filtered_results[0][ optimization_config["optimization_metric"] ] else: # Complete fallback filtered_result["best_parameters"] = {} filtered_result["best_metric_value"] = 0.0 return filtered_result def get_optimization_summary( self, state: BacktestingWorkflowState ) -> dict[str, Any]: """Get summary of optimization results.""" if not state.optimization_results: return {"summary": "No optimization results available"} summary = { "total_strategies": len(state.selected_strategies), "optimized_strategies": len(state.optimization_results), "total_iterations": state.optimization_iterations, "execution_time_ms": state.optimization_time_ms, "optimization_config": state.optimization_config, "strategy_results": {}, } for strategy, results in state.optimization_results.items(): if results: summary["strategy_results"][strategy] = { "best_metric": results.get("best_metric_value", 0), "metric_type": state.optimization_config.get( "optimization_metric", "unknown" ), "valid_combinations": results.get("valid_combinations", 0), "best_parameters": state.best_parameters.get(strategy, {}), } return summary async def parallel_optimization( self, state: BacktestingWorkflowState, max_concurrent: int = 3 ) -> BacktestingWorkflowState: """Run optimization for multiple strategies in parallel.""" if len(state.selected_strategies) <= 1: return await self.optimize_parameters(state) start_time = datetime.now() logger.info( f"Running parallel optimization for {len(state.selected_strategies)} strategies" ) # Create semaphore to limit concurrent optimizations semaphore = asyncio.Semaphore(max_concurrent) async def optimize_single_strategy(strategy: str) -> tuple[str, dict[str, Any]]: async with semaphore: try: optimization_config = self._get_optimization_config( state.market_regime, state.regime_confidence ) parameter_grids = self._generate_parameter_grids( [strategy], optimization_config["grid_size"] ) opt_start_date = self._calculate_optimization_window( state.start_date, state.end_date ) result = await self.engine.optimize_parameters( symbol=state.symbol, strategy_type=strategy, param_grid=parameter_grids.get(strategy, {}), start_date=opt_start_date, end_date=state.end_date, optimization_metric=optimization_config["optimization_metric"], initial_capital=state.initial_capital, top_n=10, ) filtered_result = self._filter_optimization_results( result, optimization_config ) return strategy, filtered_result except Exception as e: logger.error(f"Failed to optimize {strategy}: {e}") return strategy, {"error": str(e)} # Run optimizations in parallel tasks = [ optimize_single_strategy(strategy) for strategy in state.selected_strategies ] results = await asyncio.gather(*tasks, return_exceptions=True) # Process results optimization_results 
= {} best_parameters = {} total_iterations = 0 for result in results: if isinstance(result, Exception): logger.error(f"Parallel optimization failed: {result}") continue strategy, opt_result = result if "error" not in opt_result: optimization_results[strategy] = opt_result best_parameters[strategy] = opt_result.get("best_parameters", {}) total_iterations += opt_result.get("valid_combinations", 0) # Update state optimization_config = self._get_optimization_config( state.market_regime, state.regime_confidence ) state.optimization_config = optimization_config state.optimization_results = optimization_results state.best_parameters = best_parameters state.optimization_iterations = total_iterations # Record execution time execution_time = (datetime.now() - start_time).total_seconds() * 1000 state.optimization_time_ms = execution_time logger.info(f"Parallel optimization completed in {execution_time:.0f}ms") return state
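A minimal usage sketch of the agent follows, assuming a `BacktestingWorkflowState` populated with the fields the agent reads (symbol, date range, initial capital, selected strategies, and a regime estimate). The import path for `OptimizerAgent` and the exact `BacktestingWorkflowState` constructor are not shown in this file, so both are assumptions; the snippet only illustrates the intended call pattern.

import asyncio

# Assumed module path for OptimizerAgent; adjust to the package layout in the repo.
# from maverick_mcp.workflows.agents.optimizer_agent import OptimizerAgent
from maverick_mcp.workflows.state import BacktestingWorkflowState


async def main() -> None:
    # Hypothetical keyword constructor: field names mirror the attributes that
    # optimize_parameters() reads; the real state class may differ.
    state = BacktestingWorkflowState(
        symbol="AAPL",
        start_date="2022-01-01",
        end_date="2023-12-31",
        initial_capital=100_000,
        selected_strategies=["sma_cross", "rsi"],
        market_regime="trending",
        regime_confidence=0.7,
    )

    agent = OptimizerAgent()
    # Run regime-aware optimization over the selected strategies, then inspect
    # the per-strategy summary produced from the updated state.
    state = await agent.optimize_parameters(state)
    print(agent.get_optimization_summary(state))


asyncio.run(main())

Note that `optimize_parameters()` trains on roughly the most recent 70% of the requested window (via `_calculate_optimization_window`), leaving the remainder for the validation step the workflow moves to next.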

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'
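The same lookup can be done from Python. This is a minimal sketch using only the endpoint shown above; it prints the raw response rather than assuming a particular response schema.

import urllib.request

URL = "https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp"

# Fetch the MaverickMCP entry from the Glama MCP directory API and print the body as-is.
with urllib.request.urlopen(URL) as resp:
    print(resp.read().decode("utf-8"))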

If you have feedback or need assistance with the MCP directory API, please join our Discord server.