Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
regime_aware.py34.5 kB
"""Market regime-aware trading strategies with automatic strategy switching.""" import logging from typing import Any import numpy as np import pandas as pd from pandas import DataFrame, Series from sklearn.cluster import KMeans from sklearn.mixture import GaussianMixture from sklearn.preprocessing import StandardScaler from maverick_mcp.backtesting.strategies.base import Strategy logger = logging.getLogger(__name__) class MarketRegimeDetector: """Detect market regimes using various statistical methods.""" def __init__( self, method: str = "hmm", n_regimes: int = 3, lookback_period: int = 50 ): """Initialize regime detector. Args: method: Detection method ('hmm', 'kmeans', 'threshold') n_regimes: Number of market regimes to detect lookback_period: Period for regime detection """ self.method = method self.n_regimes = n_regimes self.lookback_period = lookback_period self.scaler = StandardScaler() # Initialize detection model self.model = None self.is_fitted = False self._initialize_model() def _initialize_model(self): """Initialize regime detection model with better configurations.""" if self.method == "hmm": # Use GaussianMixture with more stable configuration self.model = GaussianMixture( n_components=self.n_regimes, covariance_type="diag", # Use diagonal covariance for stability random_state=42, max_iter=200, tol=1e-6, reg_covar=1e-6, # Regularization for numerical stability init_params="kmeans", # Better initialization warm_start=False, ) elif self.method == "kmeans": self.model = KMeans( n_clusters=self.n_regimes, random_state=42, n_init=10, max_iter=500, tol=1e-6, algorithm="lloyd", # More stable algorithm ) elif self.method == "threshold": # Threshold-based regime detection self.model = None else: raise ValueError(f"Unsupported regime detection method: {self.method}") def extract_regime_features(self, data: DataFrame) -> np.ndarray: """Extract robust features for regime detection. Args: data: Price data Returns: Feature array with consistent dimensionality and stability """ try: # Validate input data if data is None or data.empty or len(data) < 10: logger.debug("Insufficient data for regime feature extraction") return np.array([]) if "close" not in data.columns: logger.warning("Close price data not available for regime features") return np.array([]) features = [] returns = data["close"].pct_change().dropna() if len(returns) == 0: logger.debug("No valid returns data for regime features") return np.array([]) # Rolling statistics with robust error handling for window in [5, 10, 20]: if len(returns) >= window: window_returns = returns.rolling(window) mean_return = window_returns.mean().iloc[-1] std_return = window_returns.std().iloc[-1] # Robust skewness and kurtosis if window >= 5: skew_return = window_returns.skew().iloc[-1] kurt_return = window_returns.kurt().iloc[-1] else: skew_return = 0.0 kurt_return = 0.0 # Replace NaN/inf values with sensible defaults features.extend( [ mean_return if np.isfinite(mean_return) else 0.0, std_return if np.isfinite(std_return) else 0.01, skew_return if np.isfinite(skew_return) else 0.0, kurt_return if np.isfinite(kurt_return) else 0.0, ] ) else: # Default values for insufficient data features.extend([0.0, 0.01, 0.0, 0.0]) # Enhanced technical indicators for regime detection current_price = data["close"].iloc[-1] # Multiple timeframe trend strength if len(data) >= 20: # Short-term trend (20-day) sma_20 = data["close"].rolling(20).mean() sma_20_value = ( float(sma_20.iloc[-1]) if not pd.isna(sma_20.iloc[-1]) else 0.0 ) if sma_20_value != 0.0: trend_strength_20 = (current_price - sma_20_value) / sma_20_value else: trend_strength_20 = 0.0 features.append( trend_strength_20 if np.isfinite(trend_strength_20) else 0.0 ) # Price momentum (rate of change) prev_price = ( float(data["close"].iloc[-20]) if not pd.isna(data["close"].iloc[-20]) else current_price ) if prev_price != 0.0: momentum_20 = (current_price - prev_price) / prev_price else: momentum_20 = 0.0 features.append(momentum_20 if np.isfinite(momentum_20) else 0.0) else: features.extend([0.0, 0.0]) # Multi-timeframe volatility regime detection if len(returns) >= 20: vol_short = returns.rolling(20).std().iloc[-1] * np.sqrt( 252 ) # Annualized vol_medium = ( returns.rolling(60).std().iloc[-1] * np.sqrt(252) if len(returns) >= 60 else vol_short ) # Volatility regime indicator vol_regime = vol_short / vol_medium if vol_medium > 0 else 1.0 features.append(vol_regime if np.isfinite(vol_regime) else 1.0) # Absolute volatility level (normalized) vol_level = min(vol_short / 0.3, 3.0) # Cap at 3x of 30% volatility features.append(vol_level if np.isfinite(vol_level) else 1.0) else: features.extend([1.0, 1.0]) # Market structure and volume features (if available) if "volume" in data.columns and len(data) >= 10: current_volume = data["volume"].iloc[-1] # Volume trend if len(data) >= 20: volume_ma_short = data["volume"].rolling(10).mean().iloc[-1] volume_ma_long = data["volume"].rolling(20).mean().iloc[-1] volume_trend = ( volume_ma_short / volume_ma_long if volume_ma_long > 0 else 1.0 ) features.append(volume_trend if np.isfinite(volume_trend) else 1.0) # Volume surge indicator volume_surge = ( current_volume / volume_ma_long if volume_ma_long > 0 else 1.0 ) features.append( min(volume_surge, 10.0) if np.isfinite(volume_surge) else 1.0 ) else: features.extend([1.0, 1.0]) else: features.extend([1.0, 1.0]) # Price dispersion (high-low range analysis) if "high" in data.columns and "low" in data.columns and len(data) >= 10: hl_range = (data["high"] - data["low"]) / data["close"] avg_range = ( hl_range.rolling(20).mean().iloc[-1] if len(data) >= 20 else hl_range.mean() ) current_range = hl_range.iloc[-1] range_regime = current_range / avg_range if avg_range > 0 else 1.0 features.append(range_regime if np.isfinite(range_regime) else 1.0) else: features.append(1.0) feature_array = np.array(features) # Final validation and cleaning if len(feature_array) == 0: return np.array([]) # Replace any remaining NaN/inf values feature_array = np.nan_to_num( feature_array, nan=0.0, posinf=1.0, neginf=-1.0 ) return feature_array except Exception as e: logger.error(f"Error extracting regime features: {e}") return np.array([]) def detect_regime_threshold(self, data: DataFrame) -> int: """Detect regime using threshold-based method. Args: data: Price data Returns: Regime label (0: bear/declining, 1: sideways, 2: bull/trending) """ if len(data) < 20: return 1 # Default to sideways # Calculate trend and volatility measures returns = data["close"].pct_change() # Trend measure (20-day slope) x = np.arange(20) y = data["close"].iloc[-20:].values trend_slope = np.polyfit(x, y, 1)[0] / y[-1] # Normalized slope # Volatility measure vol_20 = returns.rolling(20).std().iloc[-1] * np.sqrt(252) # Define regime thresholds trend_threshold = 0.001 # 0.1% daily trend threshold vol_threshold = 0.25 # 25% annual volatility threshold # Classify regime if trend_slope > trend_threshold and vol_20 < vol_threshold: return 2 # Bull/trending market elif trend_slope < -trend_threshold and vol_20 > vol_threshold: return 0 # Bear/declining market else: return 1 # Sideways/uncertain market def fit_regimes(self, data: DataFrame) -> None: """Fit regime detection model to historical data with enhanced robustness. Args: data: Historical price data """ if self.method == "threshold": self.is_fitted = True return try: # Need sufficient data for stable regime detection min_required_samples = max(50, self.n_regimes * 20) if len(data) < min_required_samples + self.lookback_period: logger.warning( f"Insufficient data for regime fitting: {len(data)} < {min_required_samples + self.lookback_period}" ) self.is_fitted = True return # Extract features for regime detection with temporal consistency feature_list = [] feature_consistency_count = None # Use overlapping windows for more stable regime detection step_size = max(1, self.lookback_period // 10) for i in range(self.lookback_period, len(data), step_size): window_data = data.iloc[max(0, i - self.lookback_period) : i + 1] features = self.extract_regime_features(window_data) if len(features) > 0 and np.all(np.isfinite(features)): # Check feature consistency if feature_consistency_count is None: feature_consistency_count = len(features) elif len(features) != feature_consistency_count: logger.warning( f"Feature dimension mismatch: expected {feature_consistency_count}, got {len(features)}" ) continue feature_list.append(features) if len(feature_list) < min_required_samples: logger.warning( f"Insufficient valid samples for regime fitting: {len(feature_list)} < {min_required_samples}" ) self.is_fitted = True return # Ensure we have valid feature_list before creating array if len(feature_list) == 0: logger.warning( "Empty feature list after filtering, cannot create feature matrix" ) self.is_fitted = True return X = np.array(feature_list) # Additional data quality checks if X.size == 0: logger.warning("Empty feature matrix, cannot fit regime detector") self.is_fitted = True return elif np.any(np.isnan(X)) or np.any(np.isinf(X)): logger.warning("Found NaN or inf values in feature matrix, cleaning...") X = np.nan_to_num(X, nan=0.0, posinf=1.0, neginf=-1.0) # Check for zero variance features feature_std = np.std(X, axis=0) zero_variance_features = np.where(feature_std < 1e-8)[0] if len(zero_variance_features) > 0: logger.debug( f"Found {len(zero_variance_features)} zero-variance features" ) # Add small noise to zero-variance features for idx in zero_variance_features: X[:, idx] += np.random.normal(0, 1e-6, X.shape[0]) # Scale features with robust scaler X_scaled = self.scaler.fit_transform(X) # Fit model with better error handling try: if self.method == "hmm": # For GaussianMixture, ensure numerical stability self.model.fit(X_scaled) # Validate fitted model if ( not hasattr(self.model, "weights_") or len(self.model.weights_) != self.n_regimes ): raise ValueError("Model fitting failed - invalid weights") # Check convergence if not self.model.converged_: logger.warning( "GaussianMixture did not converge, but will proceed" ) elif self.method == "kmeans": self.model.fit(X_scaled) # Validate fitted model if ( not hasattr(self.model, "cluster_centers_") or len(self.model.cluster_centers_) != self.n_regimes ): raise ValueError( "KMeans fitting failed - invalid cluster centers" ) self.is_fitted = True # Log fitting success with model diagnostics if self.method == "hmm": avg_log_likelihood = self.model.score(X_scaled) / len(X_scaled) logger.info( f"Fitted {self.method} regime detector with {len(X)} samples, avg log-likelihood: {avg_log_likelihood:.4f}" ) else: inertia = ( self.model.inertia_ if hasattr(self.model, "inertia_") else "N/A" ) logger.info( f"Fitted {self.method} regime detector with {len(X)} samples, inertia: {inertia}" ) except Exception as model_error: logger.error(f"Model fitting failed: {model_error}") logger.info("Falling back to threshold method") self.method = "threshold" # Fallback to threshold method self.is_fitted = True except Exception as e: logger.error(f"Error fitting regime detector: {e}") self.is_fitted = True # Allow fallback to threshold method def detect_current_regime(self, data: DataFrame) -> int: """Detect current market regime with enhanced error handling. Args: data: Recent price data Returns: Regime label (0: bear, 1: sideways, 2: bull) """ if not self.is_fitted: logger.debug("Regime detector not fitted, using threshold method") return self.detect_regime_threshold(data) try: if self.method == "threshold": return self.detect_regime_threshold(data) # Extract features for current regime features = self.extract_regime_features(data) if len(features) == 0: logger.debug("No features extracted, falling back to threshold method") return self.detect_regime_threshold(data) # Check for non-finite features only if features array is not empty if features.size > 0 and np.any(~np.isfinite(features)): logger.debug("Non-finite features detected, cleaning and proceeding") features = np.nan_to_num(features, nan=0.0, posinf=1.0, neginf=-1.0) # Validate feature consistency with training expected_features = ( self.scaler.n_features_in_ if hasattr(self.scaler, "n_features_in_") else None ) if expected_features is not None and len(features) != expected_features: logger.warning( f"Feature count mismatch in prediction: expected {expected_features}, got {len(features)}" ) return self.detect_regime_threshold(data) # Scale features and predict regime try: X = self.scaler.transform([features]) regime = self.model.predict(X)[0] # Validate regime prediction if regime < 0 or regime >= self.n_regimes: logger.warning( f"Invalid regime prediction: {regime}, using threshold method" ) return self.detect_regime_threshold(data) return int(regime) except Exception as pred_error: logger.debug( f"Prediction error: {pred_error}, falling back to threshold method" ) return self.detect_regime_threshold(data) except Exception as e: logger.error(f"Error detecting current regime: {e}") return self.detect_regime_threshold(data) # Always fallback to threshold def get_regime_probabilities(self, data: DataFrame) -> np.ndarray: """Get probabilities for each regime. Args: data: Recent price data Returns: Array of regime probabilities """ if not self.is_fitted or self.method == "threshold": # For threshold method, return deterministic probabilities regime = self.detect_current_regime(data) probs = np.zeros(self.n_regimes) probs[regime] = 1.0 return probs try: features = self.extract_regime_features(data) if len(features) == 0: return np.ones(self.n_regimes) / self.n_regimes elif features.size > 0 and np.any(np.isnan(features)): return np.ones(self.n_regimes) / self.n_regimes X = self.scaler.transform([features]) if hasattr(self.model, "predict_proba"): return self.model.predict_proba(X)[0] else: # For methods without probabilities, return one-hot encoding regime = self.model.predict(X)[0] probs = np.zeros(self.n_regimes) probs[regime] = 1.0 return probs except Exception as e: logger.error(f"Error getting regime probabilities: {e}") return np.ones(self.n_regimes) / self.n_regimes class RegimeAwareStrategy(Strategy): """Strategy that switches between different strategies based on market regime.""" def __init__( self, regime_strategies: dict[int, Strategy], regime_detector: MarketRegimeDetector = None, regime_names: dict[int, str] = None, switch_threshold: float = 0.7, min_regime_duration: int = 5, parameters: dict[str, Any] = None, ): """Initialize regime-aware strategy. Args: regime_strategies: Dictionary mapping regime labels to strategies regime_detector: Market regime detector instance regime_names: Names for each regime switch_threshold: Probability threshold for regime switching min_regime_duration: Minimum duration before switching regimes parameters: Additional parameters """ super().__init__(parameters) self.regime_strategies = regime_strategies self.regime_detector = regime_detector or MarketRegimeDetector() self.regime_names = regime_names or {0: "Bear", 1: "Sideways", 2: "Bull"} self.switch_threshold = switch_threshold self.min_regime_duration = min_regime_duration # Regime tracking self.current_regime = 1 # Start with sideways self.regime_history = [] self.regime_duration = 0 self.regime_switches = 0 @property def name(self) -> str: """Get strategy name.""" strategy_names = [s.name for s in self.regime_strategies.values()] return f"RegimeAware({','.join(strategy_names)})" @property def description(self) -> str: """Get strategy description.""" return f"Regime-aware strategy switching between {len(self.regime_strategies)} strategies based on market conditions" def fit_regime_detector(self, data: DataFrame) -> None: """Fit regime detector to historical data. Args: data: Historical price data """ self.regime_detector.fit_regimes(data) def update_current_regime(self, data: DataFrame, current_idx: int) -> bool: """Update current market regime. Args: data: Price data current_idx: Current index in data Returns: True if regime changed, False otherwise """ # Get regime probabilities window_data = data.iloc[ max(0, current_idx - self.regime_detector.lookback_period) : current_idx + 1 ] regime_probs = self.regime_detector.get_regime_probabilities(window_data) # Find most likely regime most_likely_regime = np.argmax(regime_probs) max_prob = regime_probs[most_likely_regime] # Check if we should switch regimes regime_changed = False if ( most_likely_regime != self.current_regime and max_prob >= self.switch_threshold and self.regime_duration >= self.min_regime_duration ): old_regime = self.current_regime self.current_regime = most_likely_regime self.regime_duration = 0 self.regime_switches += 1 regime_changed = True logger.info( f"Regime switch: {self.regime_names.get(old_regime, old_regime)} -> " f"{self.regime_names.get(self.current_regime, self.current_regime)} " f"(prob: {max_prob:.3f})" ) else: self.regime_duration += 1 # Track regime history self.regime_history.append( { "index": current_idx, "regime": self.current_regime, "probabilities": regime_probs.tolist(), "duration": self.regime_duration, "switched": regime_changed, } ) return regime_changed def get_active_strategy(self) -> Strategy: """Get currently active strategy based on regime. Returns: Active strategy for current regime """ if self.current_regime in self.regime_strategies: return self.regime_strategies[self.current_regime] else: # Fallback to first available strategy return next(iter(self.regime_strategies.values())) def generate_signals(self, data: DataFrame) -> tuple[Series, Series]: """Generate regime-aware trading signals. Args: data: Price data with OHLCV columns Returns: Tuple of (entry_signals, exit_signals) as boolean Series """ try: # Validate input data if data is None or len(data) == 0: logger.warning("Empty or invalid data provided to generate_signals") # Create empty Series with a dummy index to avoid empty array issues dummy_index = pd.DatetimeIndex([pd.Timestamp.now()]) return pd.Series(False, index=dummy_index), pd.Series( False, index=dummy_index ) # Ensure minimum data requirements min_required_data = max(50, self.regime_detector.lookback_period) if len(data) < min_required_data: logger.warning( f"Insufficient data for regime-aware strategy: {len(data)} < {min_required_data}" ) # Return all False signals but with valid data index return pd.Series(False, index=data.index), pd.Series( False, index=data.index ) # Fit regime detector if not already done if not self.regime_detector.is_fitted: try: self.fit_regime_detector(data) except Exception as e: logger.error( f"Failed to fit regime detector: {e}, falling back to single strategy" ) # Fallback to using first available strategy without regime switching fallback_strategy = next(iter(self.regime_strategies.values())) return fallback_strategy.generate_signals(data) entry_signals = pd.Series(False, index=data.index) exit_signals = pd.Series(False, index=data.index) # Generate signals with regime awareness current_strategy = None for idx in range(len(data)): # Update regime regime_changed = self.update_current_regime(data, idx) # Get active strategy active_strategy = self.get_active_strategy() # If regime changed, regenerate signals from new strategy if regime_changed or current_strategy != active_strategy: current_strategy = active_strategy # Generate signals for remaining data remaining_data = data.iloc[idx:] if len(remaining_data) > 0: strategy_entry, strategy_exit = ( current_strategy.generate_signals(remaining_data) ) # Update signals for remaining period end_idx = min(idx + len(strategy_entry), len(data)) entry_signals.iloc[idx:end_idx] = strategy_entry.iloc[ : end_idx - idx ] exit_signals.iloc[idx:end_idx] = strategy_exit.iloc[ : end_idx - idx ] logger.info( f"Generated regime-aware signals with {self.regime_switches} regime switches" ) return entry_signals, exit_signals except Exception as e: logger.error(f"Error generating regime-aware signals: {e}") # Ensure we always return valid series even on error if data is not None and len(data) > 0: return pd.Series(False, index=data.index), pd.Series( False, index=data.index ) else: # Create dummy index to avoid empty array issues dummy_index = pd.DatetimeIndex([pd.Timestamp.now()]) return pd.Series(False, index=dummy_index), pd.Series( False, index=dummy_index ) def get_regime_analysis(self) -> dict[str, Any]: """Get analysis of regime detection and switching. Returns: Dictionary with regime analysis """ if not self.regime_history: return {} regime_counts = {} regime_durations = {} for record in self.regime_history: regime = record["regime"] regime_name = self.regime_names.get(regime, f"Regime_{regime}") if regime_name not in regime_counts: regime_counts[regime_name] = 0 regime_durations[regime_name] = [] regime_counts[regime_name] += 1 # Track regime durations if record["switched"] and len(self.regime_history) > 1: # Find duration of previous regime prev_regime_start = 0 for i in range(len(self.regime_history) - 2, -1, -1): if ( self.regime_history[i]["regime"] != self.regime_history[-1]["regime"] ): prev_regime_start = i + 1 break duration = len(self.regime_history) - prev_regime_start - 1 prev_regime = self.regime_history[prev_regime_start]["regime"] prev_regime_name = self.regime_names.get( prev_regime, f"Regime_{prev_regime}" ) if prev_regime_name in regime_durations: regime_durations[prev_regime_name].append(duration) # Calculate average durations avg_durations = {} for regime_name, durations in regime_durations.items(): if durations: avg_durations[regime_name] = np.mean(durations) else: avg_durations[regime_name] = 0 return { "current_regime": self.regime_names.get( self.current_regime, self.current_regime ), "total_switches": self.regime_switches, "regime_counts": regime_counts, "average_regime_durations": avg_durations, "regime_history": self.regime_history[-50:], # Last 50 records "active_strategy": self.get_active_strategy().name, } def validate_parameters(self) -> bool: """Validate regime-aware strategy parameters. Returns: True if parameters are valid """ if not self.regime_strategies: return False if self.switch_threshold < 0 or self.switch_threshold > 1: return False if self.min_regime_duration < 0: return False # Validate individual strategies for strategy in self.regime_strategies.values(): if not strategy.validate_parameters(): return False return True def get_default_parameters(self) -> dict[str, Any]: """Get default parameters for regime-aware strategy. Returns: Dictionary of default parameters """ return { "switch_threshold": 0.7, "min_regime_duration": 5, "regime_detection_method": "hmm", "n_regimes": 3, "lookback_period": 50, } class AdaptiveRegimeStrategy(RegimeAwareStrategy): """Advanced regime-aware strategy with adaptive regime detection.""" def __init__( self, regime_strategies: dict[int, Strategy], adaptation_frequency: int = 100, regime_confidence_threshold: float = 0.6, **kwargs, ): """Initialize adaptive regime strategy. Args: regime_strategies: Dictionary mapping regime labels to strategies adaptation_frequency: How often to re-fit regime detector regime_confidence_threshold: Minimum confidence for regime detection **kwargs: Additional parameters for RegimeAwareStrategy """ super().__init__(regime_strategies, **kwargs) self.adaptation_frequency = adaptation_frequency self.regime_confidence_threshold = regime_confidence_threshold self.last_adaptation = 0 @property def name(self) -> str: """Get strategy name.""" return f"Adaptive{super().name}" def adapt_regime_detector(self, data: DataFrame, current_idx: int) -> None: """Re-fit regime detector with recent data. Args: data: Price data current_idx: Current index """ if current_idx - self.last_adaptation < self.adaptation_frequency: return try: # Use recent data for adaptation adaptation_data = data.iloc[max(0, current_idx - 500) : current_idx] if len(adaptation_data) >= self.regime_detector.lookback_period: logger.info(f"Adapting regime detector at index {current_idx}") self.regime_detector.fit_regimes(adaptation_data) self.last_adaptation = current_idx except Exception as e: logger.error(f"Error adapting regime detector: {e}") def generate_signals(self, data: DataFrame) -> tuple[Series, Series]: """Generate adaptive regime-aware signals. Args: data: Price data with OHLCV columns Returns: Tuple of (entry_signals, exit_signals) as boolean Series """ # Periodically adapt regime detector for idx in range( self.adaptation_frequency, len(data), self.adaptation_frequency ): self.adapt_regime_detector(data, idx) # Generate signals using parent method return super().generate_signals(data)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server