MaverickMCP

by wshobson
MIT License
feature_engineering.py (28.8 kB)
"""Feature engineering for ML trading strategies.""" import logging from typing import Any import numpy as np import pandas as pd import pandas_ta as ta from pandas import DataFrame, Series from sklearn.ensemble import RandomForestClassifier from sklearn.preprocessing import StandardScaler logger = logging.getLogger(__name__) class FeatureExtractor: """Extract technical and statistical features for ML models.""" def __init__(self, lookback_periods: list[int] = None): """Initialize feature extractor. Args: lookback_periods: Lookback periods for rolling features """ self.lookback_periods = lookback_periods or [5, 10, 20, 50] self.scaler = StandardScaler() def extract_price_features(self, data: DataFrame) -> DataFrame: """Extract price-based features. Args: data: OHLCV price data Returns: DataFrame with price features """ features = pd.DataFrame(index=data.index) # Normalize column names to handle both cases high = data.get("high", data.get("High")) low = data.get("low", data.get("Low")) close = data.get("close", data.get("Close")) open_ = data.get("open", data.get("Open")) # Safe division helper function def safe_divide(numerator, denominator, default=0.0): """Safely divide two values, handling None, NaN, and zero cases.""" if numerator is None or denominator is None: return default # Convert to numpy arrays to handle pandas Series num = np.asarray(numerator) den = np.asarray(denominator) # Use numpy divide with where condition for safety return np.divide( num, den, out=np.full_like(num, default, dtype=float), where=(den != 0) ) # Price ratios and spreads with safe division features["high_low_ratio"] = safe_divide(high, low, 1.0) features["close_open_ratio"] = safe_divide(close, open_, 1.0) features["hl_spread"] = ( safe_divide(high - low, close, 0.0) if high is not None and low is not None and close is not None else 0.0 ) features["co_spread"] = ( safe_divide(close - open_, open_, 0.0) if close is not None and open_ is not None else 0.0 ) # Returns with safe calculation if close is not None: features["returns"] = close.pct_change().fillna(0) # Safe log returns calculation price_ratio = safe_divide(close, close.shift(1), 1.0) features["log_returns"] = np.log( np.maximum(price_ratio, 1e-8) ) # Prevent log(0) else: features["returns"] = 0 features["log_returns"] = 0 # Volume features with safe calculations volume = data.get("volume", data.get("Volume")) if volume is not None and close is not None: volume_ma = volume.rolling(20).mean() features["volume_ma_ratio"] = safe_divide(volume, volume_ma, 1.0) features["price_volume"] = close * volume features["volume_returns"] = volume.pct_change().fillna(0) else: features["volume_ma_ratio"] = 1.0 features["price_volume"] = 0.0 features["volume_returns"] = 0.0 return features def extract_technical_features(self, data: DataFrame) -> DataFrame: """Extract technical indicator features. 
Args: data: OHLCV price data Returns: DataFrame with technical features """ features = pd.DataFrame(index=data.index) # Normalize column names close = data.get("close", data.get("Close")) high = data.get("high", data.get("High")) low = data.get("low", data.get("Low")) # Safe division helper (reused from price features) def safe_divide(numerator, denominator, default=0.0): """Safely divide two values, handling None, NaN, and zero cases.""" if numerator is None or denominator is None: return default # Convert to numpy arrays to handle pandas Series num = np.asarray(numerator) den = np.asarray(denominator) # Use numpy divide with where condition for safety return np.divide( num, den, out=np.full_like(num, default, dtype=float), where=(den != 0) ) # Moving averages with safe calculations for period in self.lookback_periods: if close is not None: sma = ta.sma(close, length=period) ema = ta.ema(close, length=period) features[f"sma_{period}_ratio"] = safe_divide(close, sma, 1.0) features[f"ema_{period}_ratio"] = safe_divide(close, ema, 1.0) features[f"sma_ema_diff_{period}"] = ( safe_divide(sma - ema, close, 0.0) if sma is not None and ema is not None else 0.0 ) else: features[f"sma_{period}_ratio"] = 1.0 features[f"ema_{period}_ratio"] = 1.0 features[f"sma_ema_diff_{period}"] = 0.0 # RSI rsi = ta.rsi(close, length=14) features["rsi"] = rsi features["rsi_oversold"] = (rsi < 30).astype(int) features["rsi_overbought"] = (rsi > 70).astype(int) # MACD macd = ta.macd(close) if macd is not None and not macd.empty: macd_cols = macd.columns macd_col = [ col for col in macd_cols if "MACD" in col and "h" not in col and "s" not in col.lower() ] signal_col = [ col for col in macd_cols if "signal" in col.lower() or "MACDs" in col ] hist_col = [ col for col in macd_cols if "hist" in col.lower() or "MACDh" in col ] if macd_col: features["macd"] = macd[macd_col[0]] else: features["macd"] = 0 if signal_col: features["macd_signal"] = macd[signal_col[0]] else: features["macd_signal"] = 0 if hist_col: features["macd_histogram"] = macd[hist_col[0]] else: features["macd_histogram"] = 0 features["macd_bullish"] = ( features["macd"] > features["macd_signal"] ).astype(int) else: features["macd"] = 0 features["macd_signal"] = 0 features["macd_histogram"] = 0 features["macd_bullish"] = 0 # Bollinger Bands bb = ta.bbands(close, length=20) if bb is not None and not bb.empty: # Handle different pandas_ta versions that may have different column names bb_cols = bb.columns upper_col = [ col for col in bb_cols if "BBU" in col or "upper" in col.lower() ] middle_col = [ col for col in bb_cols if "BBM" in col or "middle" in col.lower() ] lower_col = [ col for col in bb_cols if "BBL" in col or "lower" in col.lower() ] if upper_col and middle_col and lower_col: features["bb_upper"] = bb[upper_col[0]] features["bb_middle"] = bb[middle_col[0]] features["bb_lower"] = bb[lower_col[0]] # Safe BB position calculation bb_width = features["bb_upper"] - features["bb_lower"] features["bb_position"] = safe_divide( close - features["bb_lower"], bb_width, 0.5 ) features["bb_squeeze"] = safe_divide( bb_width, features["bb_middle"], 0.1 ) else: # Fallback to manual calculation with safe operations if close is not None: sma_20 = close.rolling(20).mean() std_20 = close.rolling(20).std() features["bb_upper"] = sma_20 + (std_20 * 2) features["bb_middle"] = sma_20 features["bb_lower"] = sma_20 - (std_20 * 2) # Safe BB calculations bb_width = features["bb_upper"] - features["bb_lower"] features["bb_position"] = safe_divide( close - features["bb_lower"], 
bb_width, 0.5 ) features["bb_squeeze"] = safe_divide( bb_width, features["bb_middle"], 0.1 ) else: features["bb_upper"] = 0 features["bb_middle"] = 0 features["bb_lower"] = 0 features["bb_position"] = 0.5 features["bb_squeeze"] = 0.1 else: # Manual calculation fallback with safe operations if close is not None: sma_20 = close.rolling(20).mean() std_20 = close.rolling(20).std() features["bb_upper"] = sma_20 + (std_20 * 2) features["bb_middle"] = sma_20 features["bb_lower"] = sma_20 - (std_20 * 2) # Safe BB calculations bb_width = features["bb_upper"] - features["bb_lower"] features["bb_position"] = safe_divide( close - features["bb_lower"], bb_width, 0.5 ) features["bb_squeeze"] = safe_divide( bb_width, features["bb_middle"], 0.1 ) else: features["bb_upper"] = 0 features["bb_middle"] = 0 features["bb_lower"] = 0 features["bb_position"] = 0.5 features["bb_squeeze"] = 0.1 # Stochastic stoch = ta.stoch(high, low, close) if stoch is not None and not stoch.empty: stoch_cols = stoch.columns k_col = [col for col in stoch_cols if "k" in col.lower()] d_col = [col for col in stoch_cols if "d" in col.lower()] if k_col: features["stoch_k"] = stoch[k_col[0]] else: features["stoch_k"] = 50 if d_col: features["stoch_d"] = stoch[d_col[0]] else: features["stoch_d"] = 50 else: features["stoch_k"] = 50 features["stoch_d"] = 50 # ATR (Average True Range) with safe calculation if high is not None and low is not None and close is not None: features["atr"] = ta.atr(high, low, close) features["atr_ratio"] = safe_divide( features["atr"], close, 0.02 ) # Default 2% ATR ratio else: features["atr"] = 0 features["atr_ratio"] = 0.02 return features def extract_statistical_features(self, data: DataFrame) -> DataFrame: """Extract statistical features. Args: data: OHLCV price data Returns: DataFrame with statistical features """ features = pd.DataFrame(index=data.index) # Safe division helper function def safe_divide(numerator, denominator, default=0.0): """Safely divide two values, handling None, NaN, and zero cases.""" if numerator is None or denominator is None: return default # Convert to numpy arrays to handle pandas Series num = np.asarray(numerator) den = np.asarray(denominator) # Use numpy divide with where condition for safety return np.divide( num, den, out=np.full_like(num, default, dtype=float), where=(den != 0) ) # Rolling statistics for period in self.lookback_periods: returns = data["close"].pct_change() # Volatility with safe calculations vol_short = returns.rolling(period).std() vol_long = returns.rolling(period * 2).std() features[f"volatility_{period}"] = vol_short features[f"volatility_ratio_{period}"] = safe_divide( vol_short, vol_long, 1.0 ) # Skewness and Kurtosis features[f"skewness_{period}"] = returns.rolling(period).skew() features[f"kurtosis_{period}"] = returns.rolling(period).kurt() # Min/Max ratios with safe division if "high" in data.columns and "low" in data.columns: rolling_high = data["high"].rolling(period).max() rolling_low = data["low"].rolling(period).min() features[f"high_ratio_{period}"] = safe_divide( data["close"], rolling_high, 1.0 ) features[f"low_ratio_{period}"] = safe_divide( data["close"], rolling_low, 1.0 ) else: features[f"high_ratio_{period}"] = 1.0 features[f"low_ratio_{period}"] = 1.0 # Momentum features with safe division features[f"momentum_{period}"] = safe_divide( data["close"], data["close"].shift(period), 1.0 ) features[f"roc_{period}"] = data["close"].pct_change(periods=period) return features def extract_microstructure_features(self, data: DataFrame) -> 
DataFrame: """Extract market microstructure features. Args: data: OHLCV price data Returns: DataFrame with microstructure features """ features = pd.DataFrame(index=data.index) # Safe division helper function def safe_divide(numerator, denominator, default=0.0): """Safely divide two values, handling None, NaN, and zero cases.""" if numerator is None or denominator is None: return default # Convert to numpy arrays to handle pandas Series num = np.asarray(numerator) den = np.asarray(denominator) # Use numpy divide with where condition for safety return np.divide( num, den, out=np.full_like(num, default, dtype=float), where=(den != 0) ) # Bid-ask spread proxy (high-low spread) with safe calculation if "high" in data.columns and "low" in data.columns: mid_price = (data["high"] + data["low"]) / 2 features["spread_proxy"] = safe_divide( data["high"] - data["low"], mid_price, 0.02 ) else: features["spread_proxy"] = 0.02 # Price impact measures with safe calculations if "volume" in data.columns: returns_abs = abs(data["close"].pct_change()) features["amihud_illiquidity"] = safe_divide( returns_abs, data["volume"], 0.0 ) if "high" in data.columns and "low" in data.columns: features["volume_weighted_price"] = ( data["high"] + data["low"] + data["close"] ) / 3 else: features["volume_weighted_price"] = data["close"] else: features["amihud_illiquidity"] = 0.0 features["volume_weighted_price"] = data.get("close", 0.0) # Intraday patterns with safe calculations if "open" in data.columns and "close" in data.columns: prev_close = data["close"].shift(1) features["open_gap"] = safe_divide( data["open"] - prev_close, prev_close, 0.0 ) else: features["open_gap"] = 0.0 if "high" in data.columns and "low" in data.columns and "close" in data.columns: features["close_to_high"] = safe_divide( data["high"] - data["close"], data["close"], 0.0 ) features["close_to_low"] = safe_divide( data["close"] - data["low"], data["close"], 0.0 ) else: features["close_to_high"] = 0.0 features["close_to_low"] = 0.0 return features def create_target_variable( self, data: DataFrame, forward_periods: int = 5, threshold: float = 0.02 ) -> Series: """Create target variable for classification. Args: data: Price data forward_periods: Number of periods to look forward threshold: Return threshold for classification Returns: Target variable (0: sell, 1: hold, 2: buy) """ close = data.get("close", data.get("Close")) forward_returns = close.pct_change(periods=forward_periods).shift( -forward_periods ) target = pd.Series(1, index=data.index) # Default to hold target[forward_returns > threshold] = 2 # Buy target[forward_returns < -threshold] = 0 # Sell return target def extract_all_features(self, data: DataFrame) -> DataFrame: """Extract all features for ML model. 
Args: data: OHLCV price data Returns: DataFrame with all features """ try: # Validate input data if data is None or data.empty: logger.warning("Empty or None data provided to extract_all_features") return pd.DataFrame() # Extract all feature types with individual error handling feature_dfs = [] try: price_features = self.extract_price_features(data) if not price_features.empty: feature_dfs.append(price_features) except Exception as e: logger.warning(f"Failed to extract price features: {e}") # Create empty DataFrame with same index as fallback price_features = pd.DataFrame(index=data.index) try: technical_features = self.extract_technical_features(data) if not technical_features.empty: feature_dfs.append(technical_features) except Exception as e: logger.warning(f"Failed to extract technical features: {e}") try: statistical_features = self.extract_statistical_features(data) if not statistical_features.empty: feature_dfs.append(statistical_features) except Exception as e: logger.warning(f"Failed to extract statistical features: {e}") try: microstructure_features = self.extract_microstructure_features(data) if not microstructure_features.empty: feature_dfs.append(microstructure_features) except Exception as e: logger.warning(f"Failed to extract microstructure features: {e}") # Combine all successfully extracted features if feature_dfs: all_features = pd.concat(feature_dfs, axis=1) else: # Fallback: create minimal feature set logger.warning( "No features extracted successfully, creating minimal fallback features" ) all_features = pd.DataFrame( { "returns": data.get("close", pd.Series(0, index=data.index)) .pct_change() .fillna(0), "close": data.get("close", pd.Series(0, index=data.index)), }, index=data.index, ) # Handle missing values with robust method if not all_features.empty: # Forward fill, then backward fill, then zero fill all_features = all_features.ffill().bfill().fillna(0) # Replace any infinite values all_features = all_features.replace([np.inf, -np.inf], 0) logger.info( f"Extracted {len(all_features.columns)} features for {len(all_features)} data points" ) else: logger.warning("No features could be extracted") return all_features except Exception as e: logger.error(f"Critical error extracting features: {e}") # Return minimal fallback instead of raising return pd.DataFrame( { "returns": pd.Series( 0, index=data.index if data is not None else [0] ), "close": pd.Series( 0, index=data.index if data is not None else [0] ), } ) class MLPredictor: """Machine learning predictor for trading signals.""" def __init__(self, model_type: str = "random_forest", **model_params): """Initialize ML predictor. 
Args: model_type: Type of ML model to use **model_params: Model parameters """ self.model_type = model_type self.model_params = model_params self.model = None self.scaler = StandardScaler() self.feature_extractor = FeatureExtractor() self.is_trained = False def _create_model(self): """Create ML model based on type.""" if self.model_type == "random_forest": self.model = RandomForestClassifier( n_estimators=self.model_params.get("n_estimators", 100), max_depth=self.model_params.get("max_depth", 10), random_state=self.model_params.get("random_state", 42), **{ k: v for k, v in self.model_params.items() if k not in ["n_estimators", "max_depth", "random_state"] }, ) else: raise ValueError(f"Unsupported model type: {self.model_type}") def prepare_data( self, data: DataFrame, target_periods: int = 5, return_threshold: float = 0.02 ) -> tuple[DataFrame, Series]: """Prepare features and target for training. Args: data: OHLCV price data target_periods: Periods to look forward for target return_threshold: Return threshold for classification Returns: Tuple of (features, target) """ # Extract features features = self.feature_extractor.extract_all_features(data) # Create target variable target = self.feature_extractor.create_target_variable( data, target_periods, return_threshold ) # Align features and target (remove NaN values) valid_idx = features.dropna().index.intersection(target.dropna().index) features = features.loc[valid_idx] target = target.loc[valid_idx] return features, target def train( self, data: DataFrame, target_periods: int = 5, return_threshold: float = 0.02 ) -> dict[str, Any]: """Train the ML model. Args: data: OHLCV price data target_periods: Periods to look forward for target return_threshold: Return threshold for classification Returns: Training metrics """ try: # Prepare data features, target = self.prepare_data(data, target_periods, return_threshold) if len(features) == 0: raise ValueError("No valid training data available") # Create and train model self._create_model() # Scale features features_scaled = self.scaler.fit_transform(features) # Train model self.model.fit(features_scaled, target) self.is_trained = True # Calculate training metrics train_score = self.model.score(features_scaled, target) # Convert numpy int64 to Python int for JSON serialization target_dist = target.value_counts().to_dict() target_dist = {int(k): int(v) for k, v in target_dist.items()} metrics = { "train_accuracy": float( train_score ), # Convert numpy float to Python float "n_samples": int(len(features)), "n_features": int(len(features.columns)), "target_distribution": target_dist, } # Feature importance (if available) if hasattr(self.model, "feature_importances_"): # Convert numpy floats to Python floats feature_importance = { str(col): float(imp) for col, imp in zip( features.columns, self.model.feature_importances_, strict=False ) } metrics["feature_importance"] = feature_importance logger.info(f"Model trained successfully: {metrics}") return metrics except Exception as e: logger.error(f"Error training model: {e}") raise def generate_signals(self, data: DataFrame) -> tuple[Series, Series]: """Generate trading signals using the trained model. Alias for predict() to match the expected interface. Args: data: OHLCV price data Returns: Tuple of (entry_signals, exit_signals) """ return self.predict(data) def predict(self, data: DataFrame) -> tuple[Series, Series]: """Generate trading signals using the trained model. 
Args: data: OHLCV price data Returns: Tuple of (entry_signals, exit_signals) """ if not self.is_trained: raise ValueError("Model must be trained before making predictions") try: # Extract features features = self.feature_extractor.extract_all_features(data) # Handle missing values features = features.ffill().fillna(0) # Scale features features_scaled = self.scaler.transform(features) # Make predictions predictions = self.model.predict(features_scaled) prediction_proba = self.model.predict_proba(features_scaled) # Convert to signals predictions_series = pd.Series(predictions, index=features.index) # Entry signals (buy predictions with high confidence) entry_signals = (predictions_series == 2) & ( pd.Series(prediction_proba[:, 2], index=features.index) > 0.6 ) # Exit signals (sell predictions or low confidence holds) exit_signals = (predictions_series == 0) | ( (predictions_series == 1) & (pd.Series(prediction_proba[:, 1], index=features.index) < 0.4) ) return entry_signals, exit_signals except Exception as e: logger.error(f"Error making predictions: {e}") raise def get_feature_importance(self) -> dict[str, float]: """Get feature importance from trained model. Returns: Dictionary of feature importance scores """ if not self.is_trained or not hasattr(self.model, "feature_importances_"): return {} feature_names = self.feature_extractor.extract_all_features( pd.DataFrame() # Empty DataFrame to get column names ).columns return dict(zip(feature_names, self.model.feature_importances_, strict=False)) def update_model( self, data: DataFrame, target_periods: int = 5, return_threshold: float = 0.02 ) -> dict[str, Any]: """Update model with new data (online learning simulation). Args: data: New OHLCV price data target_periods: Periods to look forward for target return_threshold: Return threshold for classification Returns: Update metrics """ try: # For now, retrain the model with all data # In production, this could use partial_fit for online learning return self.train(data, target_periods, return_threshold) except Exception as e: logger.error(f"Error updating model: {e}") raise
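
For orientation, here is a minimal usage sketch of the two classes above. It is illustrative rather than part of the module: the import path feature_engineering, the file ohlcv.csv, and the parameter values are all assumptions, and it presumes an OHLCV DataFrame with lowercase column names and a DatetimeIndex.

# Minimal usage sketch (illustrative; not part of feature_engineering.py).
import pandas as pd

from feature_engineering import FeatureExtractor, MLPredictor  # hypothetical import path

# Any OHLCV DataFrame with lowercase columns (open, high, low, close, volume) works.
data = pd.read_csv("ohlcv.csv", index_col=0, parse_dates=True)  # hypothetical data file

# Standalone feature extraction.
extractor = FeatureExtractor(lookback_periods=[5, 10, 20])
features = extractor.extract_all_features(data)
print(features.shape)

# Train a random forest and generate entry/exit signals.
predictor = MLPredictor(model_type="random_forest", n_estimators=200)
metrics = predictor.train(data, target_periods=5, return_threshold=0.02)
print(metrics["train_accuracy"], metrics["n_samples"])

entry_signals, exit_signals = predictor.predict(data)
print(int(entry_signals.sum()), "entries;", int(exit_signals.sum()), "exits")

Note that train() reports in-sample accuracy on the training window itself; a realistic evaluation would train on one period and call predict() on a held-out one.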

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.