Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
adaptive.py30.1 kB
"""Adaptive trading strategies with online learning and parameter adjustment.""" import logging from typing import Any import numpy as np import pandas as pd from pandas import DataFrame, Series from sklearn.linear_model import SGDClassifier from sklearn.preprocessing import StandardScaler from maverick_mcp.backtesting.strategies.base import Strategy logger = logging.getLogger(__name__) class AdaptiveStrategy(Strategy): """Base class for adaptive strategies that adjust parameters based on performance.""" def __init__( self, base_strategy: Strategy, adaptation_method: str = "gradient", learning_rate: float = 0.01, lookback_period: int = 50, adaptation_frequency: int = 10, parameters: dict[str, Any] | None = None, ): """Initialize adaptive strategy. Args: base_strategy: Base strategy to adapt adaptation_method: Method for parameter adaptation learning_rate: Learning rate for parameter updates lookback_period: Period for performance evaluation adaptation_frequency: How often to adapt parameters parameters: Additional parameters """ super().__init__(parameters) self.base_strategy = base_strategy self.adaptation_method = adaptation_method self.learning_rate = learning_rate self.lookback_period = lookback_period self.adaptation_frequency = adaptation_frequency # Performance tracking self.performance_history = [] self.parameter_history = [] self.last_adaptation = 0 # Store original parameters for reference self.original_parameters = base_strategy.parameters.copy() @property def name(self) -> str: """Get strategy name.""" return f"Adaptive({self.base_strategy.name})" @property def description(self) -> str: """Get strategy description.""" return f"Adaptive version of {self.base_strategy.name} using {self.adaptation_method} method" def calculate_performance_metric(self, returns: Series) -> float: """Calculate performance metric for parameter adaptation. Args: returns: Strategy returns Returns: Performance score """ if len(returns) == 0: return 0.0 # Use Sharpe ratio as default performance metric if returns.std() == 0: return 0.0 sharpe = returns.mean() / returns.std() * np.sqrt(252) # Alternative metrics could be: # - Calmar ratio: returns.mean() / abs(max_drawdown) # - Sortino ratio: returns.mean() / downside_deviation # - Information ratio: excess_returns.mean() / tracking_error return sharpe def adapt_parameters_gradient(self, performance_gradient: float) -> None: """Adapt parameters using gradient-based method. Args: performance_gradient: Gradient of performance with respect to parameters """ adaptable_params = self.get_adaptable_parameters() for param_name, param_info in adaptable_params.items(): if param_name in self.base_strategy.parameters: current_value = self.base_strategy.parameters[param_name] # Calculate parameter update param_gradient = performance_gradient * self.learning_rate # Apply bounds and constraints min_val = param_info.get("min", current_value * 0.5) max_val = param_info.get("max", current_value * 2.0) step_size = param_info.get("step", abs(current_value) * 0.01) # Update parameter new_value = current_value + param_gradient * step_size new_value = max(min_val, min(max_val, new_value)) self.base_strategy.parameters[param_name] = new_value logger.debug( f"Adapted {param_name}: {current_value:.4f} -> {new_value:.4f}" ) def adapt_parameters_random_search(self) -> None: """Adapt parameters using random search with performance feedback.""" adaptable_params = self.get_adaptable_parameters() # Try random perturbations and keep improvements for param_name, param_info in adaptable_params.items(): if param_name in self.base_strategy.parameters: current_value = self.base_strategy.parameters[param_name] # Generate random perturbation min_val = param_info.get("min", current_value * 0.5) max_val = param_info.get("max", current_value * 2.0) # Small random step perturbation = np.random.normal(0, abs(current_value) * 0.1) new_value = current_value + perturbation new_value = max(min_val, min(max_val, new_value)) # Store new value for trial self.base_strategy.parameters[param_name] = new_value # Note: Performance evaluation would happen in next cycle # For now, keep the change and let performance tracking decide def get_adaptable_parameters(self) -> dict[str, dict]: """Get parameters that can be adapted. Returns: Dictionary of adaptable parameters with their constraints """ # Default adaptable parameters - can be overridden by subclasses return { "lookback_period": {"min": 5, "max": 200, "step": 1}, "threshold": {"min": 0.001, "max": 0.1, "step": 0.001}, "window": {"min": 5, "max": 100, "step": 1}, "period": {"min": 5, "max": 200, "step": 1}, } def adapt_parameters(self, recent_performance: float) -> None: """Adapt strategy parameters based on recent performance. Args: recent_performance: Recent performance metric """ try: if self.adaptation_method == "gradient": # Approximate gradient based on performance change if len(self.performance_history) > 1: performance_gradient = ( recent_performance - self.performance_history[-2] ) self.adapt_parameters_gradient(performance_gradient) elif self.adaptation_method == "random_search": # Use random search with performance feedback self.adapt_parameters_random_search() else: logger.warning(f"Unknown adaptation method: {self.adaptation_method}") # Store adapted parameters self.parameter_history.append(self.base_strategy.parameters.copy()) except Exception as e: logger.error(f"Error adapting parameters: {e}") def generate_signals(self, data: DataFrame) -> tuple[Series, Series]: """Generate adaptive trading signals. Args: data: Price data with OHLCV columns Returns: Tuple of (entry_signals, exit_signals) as boolean Series """ try: # Generate signals from base strategy entry_signals, exit_signals = self.base_strategy.generate_signals(data) # Calculate strategy performance for adaptation positions = entry_signals.astype(int) - exit_signals.astype(int) returns = positions.shift(1) * data["close"].pct_change() # Track performance over time for adaptation for idx in range( self.adaptation_frequency, len(data), self.adaptation_frequency ): if idx > self.last_adaptation + self.adaptation_frequency: # Calculate recent performance recent_returns = returns.iloc[ max(0, idx - self.lookback_period) : idx ] if len(recent_returns) > 0: recent_performance = self.calculate_performance_metric( recent_returns ) self.performance_history.append(recent_performance) # Adapt parameters based on performance self.adapt_parameters(recent_performance) # Re-generate signals with adapted parameters entry_signals, exit_signals = ( self.base_strategy.generate_signals(data) ) self.last_adaptation = idx return entry_signals, exit_signals except Exception as e: logger.error(f"Error generating adaptive signals: {e}") return pd.Series(False, index=data.index), pd.Series( False, index=data.index ) def get_adaptation_history(self) -> dict[str, Any]: """Get history of parameter adaptations. Returns: Dictionary with adaptation history """ return { "performance_history": self.performance_history, "parameter_history": self.parameter_history, "current_parameters": self.base_strategy.parameters, "original_parameters": self.original_parameters, } def reset_to_original(self) -> None: """Reset strategy parameters to original values.""" self.base_strategy.parameters = self.original_parameters.copy() self.performance_history = [] self.parameter_history = [] self.last_adaptation = 0 class OnlineLearningStrategy(Strategy): """Strategy with online learning capabilities using streaming ML algorithms.""" def __init__( self, model_type: str = "sgd", update_frequency: int = 10, feature_window: int = 20, confidence_threshold: float = 0.6, min_training_samples: int = 100, initial_training_period: int = 200, parameters: dict[str, Any] | None = None, ): """Initialize online learning strategy. Args: model_type: Type of online learning model update_frequency: How often to update the model feature_window: Window for feature calculation confidence_threshold: Minimum confidence for signals min_training_samples: Minimum samples before enabling predictions initial_training_period: Period for initial batch training parameters: Additional parameters """ super().__init__(parameters) self.model_type = model_type self.update_frequency = update_frequency self.feature_window = feature_window self.confidence_threshold = confidence_threshold self.min_training_samples = min_training_samples self.initial_training_period = initial_training_period # Initialize online learning components self.model = None self.scaler = None self.is_trained = False self.is_initial_trained = False self.training_buffer = [] self.last_update = 0 self.training_samples_count = 0 # Feature consistency tracking self.expected_feature_count = None self.feature_stats_buffer = [] self._initialize_model() def _initialize_model(self): """Initialize online learning model with proper configuration.""" if self.model_type == "sgd": self.model = SGDClassifier( loss="log_loss", learning_rate="adaptive", eta0=0.01, random_state=42, max_iter=1000, tol=1e-4, warm_start=True, # Enable incremental learning alpha=0.01, # Regularization fit_intercept=True, ) else: raise ValueError(f"Unsupported model type: {self.model_type}") # Initialize scaler as None - will be created during first fit self.scaler = None @property def name(self) -> str: """Get strategy name.""" return f"OnlineLearning({self.model_type.upper()})" @property def description(self) -> str: """Get strategy description.""" return ( f"Online learning strategy using {self.model_type} with streaming updates" ) def extract_features(self, data: DataFrame, end_idx: int) -> np.ndarray: """Extract features for online learning with enhanced stability. Args: data: Price data end_idx: End index for feature calculation Returns: Feature array with consistent dimensionality """ try: start_idx = max(0, end_idx - self.feature_window) window_data = data.iloc[start_idx : end_idx + 1] # Need minimum data for meaningful features if len(window_data) < max(5, self.feature_window // 4): return np.array([]) features = [] # Price features with error handling returns = window_data["close"].pct_change().dropna() if len(returns) == 0: return np.array([]) # Basic return statistics (robust to small samples) mean_return = returns.mean() if len(returns) > 0 else 0.0 std_return = returns.std() if len(returns) > 1 else 0.01 # Small default skew_return = returns.skew() if len(returns) > 3 else 0.0 kurt_return = returns.kurtosis() if len(returns) > 3 else 0.0 # Replace NaN/inf values features.extend( [ mean_return if np.isfinite(mean_return) else 0.0, std_return if np.isfinite(std_return) else 0.01, skew_return if np.isfinite(skew_return) else 0.0, kurt_return if np.isfinite(kurt_return) else 0.0, ] ) # Technical indicators with fallbacks current_price = window_data["close"].iloc[-1] # Short-term moving average ratio if len(window_data) >= 5: sma_5 = window_data["close"].rolling(5).mean().iloc[-1] features.append(current_price / sma_5 if sma_5 > 0 else 1.0) else: features.append(1.0) # Medium-term moving average ratio if len(window_data) >= 10: sma_10 = window_data["close"].rolling(10).mean().iloc[-1] features.append(current_price / sma_10 if sma_10 > 0 else 1.0) else: features.append(1.0) # Long-term moving average ratio (if enough data) if len(window_data) >= 20: sma_20 = window_data["close"].rolling(20).mean().iloc[-1] features.append(current_price / sma_20 if sma_20 > 0 else 1.0) else: features.append(1.0) # Volatility feature if len(returns) > 10: vol_ratio = std_return / returns.rolling(10).std().mean() features.append(vol_ratio if np.isfinite(vol_ratio) else 1.0) else: features.append(1.0) # Volume features (if available) if "volume" in window_data.columns and len(window_data) >= 5: current_volume = window_data["volume"].iloc[-1] volume_ma = window_data["volume"].rolling(5).mean().iloc[-1] volume_ratio = current_volume / volume_ma if volume_ma > 0 else 1.0 features.append(volume_ratio if np.isfinite(volume_ratio) else 1.0) # Volume trend if len(window_data) >= 10: volume_ma_long = window_data["volume"].rolling(10).mean().iloc[-1] volume_trend = ( volume_ma / volume_ma_long if volume_ma_long > 0 else 1.0 ) features.append(volume_trend if np.isfinite(volume_trend) else 1.0) else: features.append(1.0) else: features.extend([1.0, 1.0]) feature_array = np.array(features) # Validate feature consistency if self.expected_feature_count is None: self.expected_feature_count = len(feature_array) elif len(feature_array) != self.expected_feature_count: logger.warning( f"Feature count mismatch: expected {self.expected_feature_count}, got {len(feature_array)}" ) return np.array([]) # Check for any remaining NaN or inf values if not np.all(np.isfinite(feature_array)): logger.warning("Non-finite features detected, replacing with defaults") feature_array = np.nan_to_num( feature_array, nan=0.0, posinf=1.0, neginf=-1.0 ) return feature_array except Exception as e: logger.error(f"Error extracting features: {e}") return np.array([]) def create_target(self, data: DataFrame, idx: int, forward_periods: int = 3) -> int: """Create target variable for online learning. Args: data: Price data idx: Current index forward_periods: Periods to look forward Returns: Target class (0: sell, 1: hold, 2: buy) """ if idx + forward_periods >= len(data): return 1 # Hold as default current_price = data["close"].iloc[idx] future_price = data["close"].iloc[idx + forward_periods] return_threshold = 0.02 # 2% threshold forward_return = (future_price - current_price) / current_price if forward_return > return_threshold: return 2 # Buy elif forward_return < -return_threshold: return 0 # Sell else: return 1 # Hold def _initial_training(self, data: DataFrame, current_idx: int) -> bool: """Perform initial batch training on historical data. Args: data: Price data current_idx: Current index Returns: True if initial training successful """ try: if current_idx < self.initial_training_period: return False # Collect initial training data training_examples = [] training_targets = [] # Use a substantial portion of historical data for initial training start_idx = max( self.feature_window, current_idx - self.initial_training_period ) for idx in range( start_idx, current_idx - 10 ): # Leave some data for validation features = self.extract_features(data, idx) if len(features) > 0: target = self.create_target(data, idx) training_examples.append(features) training_targets.append(target) if len(training_examples) < self.min_training_samples: logger.debug( f"Insufficient training samples: {len(training_examples)} < {self.min_training_samples}" ) return False X = np.array(training_examples) y = np.array(training_targets) # Check for class balance unique_classes, class_counts = np.unique(y, return_counts=True) if len(unique_classes) < 2: logger.warning( f"Insufficient class diversity for training: {unique_classes}" ) return False # Initialize scaler with training data self.scaler = StandardScaler() X_scaled = self.scaler.fit_transform(X) # Train initial model self.model.fit(X_scaled, y) self.is_initial_trained = True self.is_trained = True self.training_samples_count = len(X) logger.info( f"Initial training completed with {len(X)} samples, classes: {dict(zip(unique_classes, class_counts, strict=False))}" ) return True except Exception as e: logger.error(f"Error in initial training: {e}") return False def update_model(self, data: DataFrame, current_idx: int) -> None: """Update online learning model with new data. Args: data: Price data current_idx: Current index """ # Perform initial training if not done yet if not self.is_initial_trained: if self._initial_training(data, current_idx): self.last_update = current_idx return # Check if update is needed if current_idx - self.last_update < self.update_frequency: return try: # Collect recent training examples for incremental update recent_examples = [] recent_targets = [] # Look back for recent training examples (smaller window for incremental updates) lookback_start = max(0, current_idx - self.update_frequency) for idx in range(lookback_start, current_idx): features = self.extract_features(data, idx) if len(features) > 0: target = self.create_target(data, idx) recent_examples.append(features) recent_targets.append(target) if len(recent_examples) < 2: # Need minimum samples for incremental update return X = np.array(recent_examples) y = np.array(recent_targets) # Scale features using existing scaler X_scaled = self.scaler.transform(X) # Incremental update using partial_fit # Ensure we have all classes that the model has seen before existing_classes = self.model.classes_ self.model.partial_fit(X_scaled, y, classes=existing_classes) self.training_samples_count += len(X) self.last_update = current_idx logger.debug( f"Updated online model at index {current_idx} with {len(X)} samples (total: {self.training_samples_count})" ) except Exception as e: logger.error(f"Error updating online model: {e}") # Reset training flag to attempt re-initialization if "partial_fit" in str(e).lower(): logger.warning("Partial fit failed, will attempt re-initialization") self.is_trained = False self.is_initial_trained = False def generate_signals(self, data: DataFrame) -> tuple[Series, Series]: """Generate signals using online learning. Args: data: Price data with OHLCV columns Returns: Tuple of (entry_signals, exit_signals) as boolean Series """ entry_signals = pd.Series(False, index=data.index) exit_signals = pd.Series(False, index=data.index) try: # Need minimum data for features start_idx = max(self.feature_window, self.initial_training_period + 10) if len(data) < start_idx: logger.warning( f"Insufficient data for online learning: {len(data)} < {start_idx}" ) return entry_signals, exit_signals for idx in range(start_idx, len(data)): # Update model periodically self.update_model(data, idx) if not self.is_trained or self.scaler is None: continue # Extract features for current point features = self.extract_features(data, idx) if len(features) == 0: continue try: # Make prediction with error handling X = self.scaler.transform([features]) prediction = self.model.predict(X)[0] # Get confidence score if hasattr(self.model, "predict_proba"): probabilities = self.model.predict_proba(X)[0] confidence = max(probabilities) else: # For models without predict_proba, use decision function if hasattr(self.model, "decision_function"): decision_values = self.model.decision_function(X)[0] # Convert to pseudo-probability (sigmoid-like) confidence = 1.0 / (1.0 + np.exp(-abs(decision_values))) else: confidence = 0.6 # Default confidence # Generate signals based on prediction and confidence if confidence >= self.confidence_threshold: if prediction == 2: # Buy signal entry_signals.iloc[idx] = True elif prediction == 0: # Sell signal exit_signals.iloc[idx] = True except Exception as pred_error: logger.debug(f"Prediction error at index {idx}: {pred_error}") continue # Log summary statistics total_entry_signals = entry_signals.sum() total_exit_signals = exit_signals.sum() logger.info( f"Generated {total_entry_signals} entry and {total_exit_signals} exit signals using online learning" ) except Exception as e: logger.error(f"Error generating online learning signals: {e}") return entry_signals, exit_signals def get_model_info(self) -> dict[str, Any]: """Get information about the online learning model. Returns: Dictionary with model information """ info = { "model_type": self.model_type, "is_trained": self.is_trained, "is_initial_trained": self.is_initial_trained, "feature_window": self.feature_window, "update_frequency": self.update_frequency, "confidence_threshold": self.confidence_threshold, "min_training_samples": self.min_training_samples, "initial_training_period": self.initial_training_period, "training_samples_count": self.training_samples_count, "expected_feature_count": self.expected_feature_count, } if hasattr(self.model, "coef_") and self.model.coef_ is not None: info["model_coefficients"] = self.model.coef_.tolist() if hasattr(self.model, "classes_") and self.model.classes_ is not None: info["model_classes"] = self.model.classes_.tolist() if self.scaler is not None: info["feature_scaling"] = { "mean": self.scaler.mean_.tolist() if hasattr(self.scaler, "mean_") else None, "scale": self.scaler.scale_.tolist() if hasattr(self.scaler, "scale_") else None, } return info class HybridAdaptiveStrategy(AdaptiveStrategy): """Hybrid strategy combining parameter adaptation with online learning.""" def __init__( self, base_strategy: Strategy, online_learning_weight: float = 0.3, **kwargs ): """Initialize hybrid adaptive strategy. Args: base_strategy: Base strategy to adapt online_learning_weight: Weight for online learning component **kwargs: Additional parameters for AdaptiveStrategy """ super().__init__(base_strategy, **kwargs) self.online_learning_weight = online_learning_weight self.online_strategy = OnlineLearningStrategy() @property def name(self) -> str: """Get strategy name.""" return f"HybridAdaptive({self.base_strategy.name})" @property def description(self) -> str: """Get strategy description.""" return "Hybrid adaptive strategy combining parameter adaptation with online learning" def generate_signals(self, data: DataFrame) -> tuple[Series, Series]: """Generate hybrid adaptive signals. Args: data: Price data with OHLCV columns Returns: Tuple of (entry_signals, exit_signals) as boolean Series """ # Get signals from adaptive base strategy adaptive_entry, adaptive_exit = super().generate_signals(data) # Get signals from online learning component online_entry, online_exit = self.online_strategy.generate_signals(data) # Combine signals with weighting base_weight = 1.0 - self.online_learning_weight # Weighted combination for entry signals combined_entry = ( adaptive_entry.astype(float) * base_weight + online_entry.astype(float) * self.online_learning_weight ) > 0.5 # Weighted combination for exit signals combined_exit = ( adaptive_exit.astype(float) * base_weight + online_exit.astype(float) * self.online_learning_weight ) > 0.5 return combined_entry, combined_exit def get_hybrid_info(self) -> dict[str, Any]: """Get information about hybrid strategy components. Returns: Dictionary with hybrid strategy information """ return { "adaptation_history": self.get_adaptation_history(), "online_learning_info": self.online_strategy.get_model_info(), "online_learning_weight": self.online_learning_weight, "base_weight": 1.0 - self.online_learning_weight, }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server