# models/hybrid_model.py - Completely fixed version
import numpy as np
import pandas as pd
from xgboost import XGBRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from typing import Tuple, Dict, Any, Optional
import warnings
# Install a global "ignore" filter at import time: from here on every warning
# category is suppressed for the whole interpreter, not just for this module.
# NOTE(review): this also hides deprecation warnings raised by the libraries
# imported above -- confirm that blanket suppression is intended.
warnings.filterwarnings('ignore')
def train_xgboost_on_residuals(residuals: pd.Series, df: pd.DataFrame) -> Tuple[XGBRegressor, pd.DataFrame]:
    """
    Train an XGBoost regressor that predicts ARIMA residuals from price features.

    The enhanced path derives five lags of ``Close``, 5/10-period simple moving
    averages, the percentage price change, a 5-period rolling volatility and,
    when a usable ``Volume`` column exists, a volume-to-moving-average ratio.
    If anything in that path raises, a simpler model on three lags of ``Close``
    is trained instead.

    Rows are aligned with ``residuals`` positionally from the end: the last
    ``k`` feature rows are paired with the last ``k`` residuals, where ``k`` is
    the shorter of the two lengths.

    Args:
        residuals: ARIMA model residuals.
        df: Original DataFrame with price data; must contain a ``Close``
            column, ``Volume`` is optional.

    Returns:
        Tuple of (trained_model, last_features) where ``last_features`` is a
        one-row DataFrame holding the most recent feature row.

    Raises:
        ValueError: If fewer than 4 aligned rows are available even for the
            fallback model.
        KeyError: If ``df`` has no ``Close`` column (raised by the fallback
            after the enhanced path has failed for the same reason).
    """
    try:
        df_features = df.copy()

        # Lag features
        for i in range(1, 6):  # 5 lags
            df_features[f'Lag_{i}'] = df_features['Close'].shift(i)

        # Technical indicators
        df_features['SMA_5'] = df_features['Close'].rolling(5).mean()
        df_features['SMA_10'] = df_features['Close'].rolling(10).mean()
        df_features['Price_Change'] = df_features['Close'].pct_change()
        df_features['Volatility'] = df_features['Price_Change'].rolling(5).std()

        # Volume features are optional: any failure here only skips them.
        volume_features_added = False
        if 'Volume' in df_features.columns:
            try:
                volume_series = df_features['Volume']
                if len(volume_series.dropna()) > 10:  # need more than 10 non-null values
                    volume_ma = volume_series.rolling(5).mean()
                    # Replace zero / tiny / missing averages so the division is safe.
                    volume_ma_safe = volume_ma.where(volume_ma > 0.001, 1.0)
                    volume_ratio = volume_series / volume_ma_safe
                    volume_ratio = volume_ratio.replace([np.inf, -np.inf], 1.0)
                    volume_ratio = volume_ratio.fillna(1.0)
                    df_features['Volume_MA'] = volume_ma
                    df_features['Volume_Ratio'] = volume_ratio
                    volume_features_added = True
                    print(" ✅ Volume features added successfully")
            except Exception as e:
                print(f" ⚠️ Skipping volume features: {e}")

        # Be explicit about which columns feed the model.
        feature_cols = [f'Lag_{i}' for i in range(1, 6)]
        feature_cols += ['SMA_5', 'SMA_10', 'Price_Change', 'Volatility']
        required_cols = list(feature_cols)
        if volume_features_added:
            feature_cols.append('Volume_Ratio')
            # Volume_MA is NaN wherever the ratio had to use the 1.0 stand-in
            # divisor, so requiring it keeps those unreliable rows out.
            required_cols += ['Volume_Ratio', 'Volume_MA']

        # Only drop rows that are incomplete in columns we actually rely on.
        # A blanket dropna() would also discard rows because of NaNs in unused
        # columns (e.g. an empty Volume column) and could leave no data at all.
        df_features = df_features.dropna(subset=required_cols)

        # Align residuals with features (positional, from the end).
        min_len = min(len(df_features), len(residuals))
        df_features = df_features.tail(min_len)
        residuals_aligned = residuals.tail(min_len)

        X = df_features[feature_cols]
        y = residuals_aligned
        if len(X) < 10:
            raise ValueError("Insufficient data for training")
        print(f" ✅ Using {len(feature_cols)} features: {feature_cols}")

        # Chronological 80/20 split: no shuffling for time-series data.
        split_idx = int(len(X) * 0.8)
        X_train, X_test = X.iloc[:split_idx], X.iloc[split_idx:]
        y_train, y_test = y.iloc[:split_idx], y.iloc[split_idx:]

        model = XGBRegressor(
            n_estimators=150,
            max_depth=4,
            learning_rate=0.1,
            subsample=0.8,
            colsample_bytree=0.8,
            random_state=42,
            verbosity=0  # Suppress XGBoost warnings
        )
        model.fit(X_train, y_train)

        # Report hold-out metrics when there is a hold-out set.
        if len(X_test) > 0:
            y_pred = model.predict(X_test)
            mse = mean_squared_error(y_test, y_pred)
            mae = mean_absolute_error(y_test, y_pred)
            r2 = r2_score(y_test, y_pred)
            print("✅ Enhanced XGBoost Performance:")
            print(f" MSE: {mse:.6f}")
            print(f" MAE: {mae:.6f}")
            print(f" R²: {r2:.4f}")
            print(f" Features: {len(feature_cols)}")
        return model, X.iloc[[-1]]
    except Exception as e:
        print(f"❌ Enhanced XGBoost failed: {e}")
        print(" 🔄 Falling back to simple XGBoost...")

        # Fallback: three plain lags of Close.
        lag_cols = ['Lag1', 'Lag2', 'Lag3']
        df_simple = df.copy()
        df_simple['Lag1'] = df_simple['Close'].shift(1)
        df_simple['Lag2'] = df_simple['Close'].shift(2)
        df_simple['Lag3'] = df_simple['Close'].shift(3)
        # Same reasoning as above: ignore NaNs in columns the model never sees.
        df_simple = df_simple.dropna(subset=lag_cols)
        X = df_simple[lag_cols]

        # Trim both sides to the common tail so X and y always have equal
        # length, even when there are fewer residuals than feature rows.
        min_len = min(len(X), len(residuals))
        if min_len < 4:
            raise ValueError("Insufficient data even for fallback model")
        X = X.tail(min_len)
        y = residuals[-min_len:]

        X_train, X_test, y_train, y_test = train_test_split(X, y, shuffle=False, test_size=0.2)
        model = XGBRegressor(n_estimators=100, max_depth=3, random_state=42, verbosity=0)
        model.fit(X_train, y_train)
        if len(y_test) > 0:
            y_pred = model.predict(X_test)
            mse = mean_squared_error(y_test, y_pred)
            print(f"✅ Fallback XGBoost MSE: {mse:.4f}")
        return model, X.iloc[[-1]]
class HybridARIMAXGBoost:
    """
    XGBoost residual model for a hybrid ARIMA + XGBoost forecast.

    The instance derives price/volume features from a DataFrame with a
    ``Close`` column, fits an ``XGBRegressor`` on ARIMA residuals and keeps the
    fitted model in ``self.model`` and the training column order in
    ``self.feature_names``.
    """

    # Raw input columns that are never passed to the model directly.
    _RAW_COLUMNS = frozenset({'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close'})
    # Non-lag features, in the order they are appended after the lag columns.
    _OPTIONAL_FEATURES = ('SMA_5', 'SMA_10', 'SMA_20', 'Price_Change',
                          'Volatility_5', 'Volatility_10', 'RSI', 'Volume_Ratio')

    def __init__(self, n_lags: int = 5):
        """Store the number of ``Close`` lags to build; no model is trained yet."""
        self.n_lags = n_lags
        self.model = None
        self.feature_names = []

    def create_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Return a copy of ``df`` with lag, trend, volatility, RSI and volume columns added.

        Rows are not dropped here, so leading rows contain NaN wherever a
        rolling window or shift is not yet filled. RSI and volume features are
        best-effort: if their computation raises, they are simply left out.

        Args:
            df: Price data with a ``Close`` column; ``Volume`` is optional.

        Returns:
            New DataFrame with the original columns plus the derived ones.
        """
        df_features = df.copy()

        # Lag features
        for i in range(1, self.n_lags + 1):
            df_features[f'Lag_{i}'] = df_features['Close'].shift(i)

        # Simple moving averages
        df_features['SMA_5'] = df_features['Close'].rolling(5).mean()
        df_features['SMA_10'] = df_features['Close'].rolling(10).mean()
        df_features['SMA_20'] = df_features['Close'].rolling(20).mean()

        # Price change and volatility
        df_features['Price_Change'] = df_features['Close'].pct_change()
        df_features['Volatility_5'] = df_features['Price_Change'].rolling(5).std()
        df_features['Volatility_10'] = df_features['Price_Change'].rolling(10).std()

        # RSI-like indicator over a 14-period window
        try:
            delta = df_features['Close'].diff()
            gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
            # A zero average loss is replaced by a small constant to avoid dividing by zero.
            rs = gain / loss.where(loss != 0, 0.0001)
            df_features['RSI'] = 100 - (100 / (1 + rs))
            df_features['RSI'] = df_features['RSI'].fillna(50)  # Neutral RSI for NaN
        except Exception:
            # Deliberate best-effort: the model is trained without RSI.
            pass

        # Volume features, only when enough volume observations exist
        if 'Volume' in df_features.columns:
            try:
                volume_col = df_features['Volume']
                if volume_col.notna().sum() > 10:  # more than 10 non-null values
                    volume_ma = volume_col.rolling(5).mean()
                    # Non-positive or missing averages are replaced by 1.0 before dividing.
                    volume_ma_safe = volume_ma.where(volume_ma > 0, 1.0)
                    volume_ratio = volume_col / volume_ma_safe
                    volume_ratio = volume_ratio.replace([np.inf, -np.inf], 1.0)
                    volume_ratio = volume_ratio.fillna(1.0)
                    df_features['Volume_MA'] = volume_ma
                    df_features['Volume_Ratio'] = volume_ratio
            except Exception:
                # Deliberate best-effort: the model is trained without volume features.
                pass
        return df_features

    def _build_training_data(self, residuals: pd.Series, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.Series, list]:
        """Build the feature matrix, the aligned residual target and the feature column list."""
        df_features = self.create_features(df)

        available_cols = [col for col in df_features.columns if col not in self._RAW_COLUMNS]
        # Lag features first, then whichever optional features were created.
        feature_cols = [col for col in available_cols if col.startswith('Lag_')]
        feature_cols += [name for name in self._OPTIONAL_FEATURES if name in available_cols]

        # Drop incomplete rows based only on columns the model depends on.
        # A blanket dropna() would also react to NaNs in unused raw columns
        # (for example an empty Volume column) and could discard every row.
        required_cols = list(feature_cols)
        if 'Volume_Ratio' in feature_cols and 'Volume_MA' in df_features.columns:
            # Volume_MA is NaN exactly where the ratio used the 1.0 stand-in
            # divisor; requiring it keeps those rows out of the training set.
            required_cols.append('Volume_MA')
        df_features = df_features.dropna(subset=required_cols)

        # Positional alignment from the end of both inputs.
        min_len = min(len(df_features), len(residuals))
        df_features = df_features.tail(min_len)
        residuals_aligned = residuals.tail(min_len)
        return df_features[feature_cols], residuals_aligned, feature_cols

    def train_model(self, residuals: pd.Series, df: pd.DataFrame, verbose: bool = True) -> Dict[str, Any]:
        """
        Fit the XGBoost model on ARIMA residuals and report its metrics.

        The data is split chronologically: the first 80% of aligned rows are
        used for training and the rest for evaluation.

        Args:
            residuals: ARIMA residuals; paired with the feature rows from the end.
            df: Price data with a ``Close`` column.
            verbose: Print progress and metrics when True.

        Returns:
            Dict with keys ``model``, ``last_features`` (one-row DataFrame of
            the most recent features), ``train_metrics``, ``test_metrics``
            (empty dict when there is no hold-out row) and
            ``feature_importance`` (DataFrame sorted by descending importance).

        Raises:
            ValueError: If no row is left for training after cleaning and alignment.
            Exception: Any other error from feature creation or fitting is
                re-raised after being reported (when ``verbose``).
        """
        try:
            X, y, feature_cols = self._build_training_data(residuals, df)
            self.feature_names = feature_cols
            if verbose:
                print(f" ✅ Selected {len(feature_cols)} features for training")

            # Chronological split
            split_idx = int(len(X) * 0.8)
            if split_idx == 0:
                # Fail with a clear message instead of fitting on an empty frame.
                raise ValueError("Insufficient data for training")
            X_train, X_test = X.iloc[:split_idx], X.iloc[split_idx:]
            y_train, y_test = y.iloc[:split_idx], y.iloc[split_idx:]

            self.model = XGBRegressor(
                n_estimators=200,
                max_depth=4,
                learning_rate=0.1,
                subsample=0.8,
                colsample_bytree=0.8,
                random_state=42,
                verbosity=0
            )
            self.model.fit(X_train, y_train)

            # In-sample metrics
            y_train_pred = self.model.predict(X_train)
            train_metrics = {
                'mse': mean_squared_error(y_train, y_train_pred),
                'mae': mean_absolute_error(y_train, y_train_pred),
                'r2': r2_score(y_train, y_train_pred)
            }

            # Hold-out metrics, when a hold-out set exists
            test_metrics = {}
            if len(X_test) > 0:
                y_test_pred = self.model.predict(X_test)
                test_metrics = {
                    'mse': mean_squared_error(y_test, y_test_pred),
                    'mae': mean_absolute_error(y_test, y_test_pred),
                    'r2': r2_score(y_test, y_test_pred)
                }

            if verbose:
                print("✅ Enhanced XGBoost Training Complete:")
                print(f" Features: {len(feature_cols)}")
                print(f" Train R²: {train_metrics['r2']:.4f}")
                if test_metrics:
                    print(f" Test R²: {test_metrics['r2']:.4f}")

            feature_importance = pd.DataFrame({
                'feature': feature_cols,
                'importance': self.model.feature_importances_
            }).sort_values('importance', ascending=False)

            return {
                'model': self.model,
                'last_features': X.iloc[[-1]],
                'train_metrics': train_metrics,
                'test_metrics': test_metrics,
                'feature_importance': feature_importance
            }
        except Exception as e:
            if verbose:
                print(f"❌ Enhanced training failed: {e}")
            raise

    def predict_residual(self, last_features: pd.DataFrame) -> float:
        """
        Predict the residual for the first row of ``last_features``.

        Raises:
            ValueError: If no model has been trained yet.
        """
        if self.model is None:
            raise ValueError("Model not trained")
        return float(self.model.predict(last_features)[0])
def train_xgboost_on_residuals_enhanced(residuals: pd.Series,
                                        df: pd.DataFrame,
                                        n_lags: int = 5,
                                        **kwargs) -> Tuple[HybridARIMAXGBoost, pd.DataFrame, Dict[str, Any]]:
    """
    Train a ``HybridARIMAXGBoost`` residual model, falling back to the basic trainer on failure.

    Args:
        residuals: ARIMA model residuals.
        df: Original DataFrame with price data.
        n_lags: Number of ``Close`` lags the hybrid model builds.
        **kwargs: Optional settings. ``verbose`` (default True) is forwarded to
            ``HybridARIMAXGBoost.train_model``; any other key is ignored.

    Returns:
        Tuple of (model, last_features, results). On success ``model`` is the
        trained ``HybridARIMAXGBoost`` and ``results`` is the dict returned by
        its ``train_model``. If that training raises, ``model`` is the object
        returned by ``train_xgboost_on_residuals`` (a bare ``XGBRegressor``,
        not a ``HybridARIMAXGBoost``) and ``results`` is
        ``{'method': 'fallback'}`` -- callers can use that key to tell the two
        cases apart.

    Raises:
        Exception: Whatever ``train_xgboost_on_residuals`` raises when the
            fallback fails as well.
    """
    # Previously every keyword argument was silently dropped, so callers had no
    # way to quiet the training output. The default keeps the old behaviour.
    verbose = bool(kwargs.get('verbose', True))
    try:
        hybrid_model = HybridARIMAXGBoost(n_lags=n_lags)
        results = hybrid_model.train_model(residuals, df, verbose=verbose)
        return hybrid_model, results['last_features'], results
    except Exception as e:
        print(f"❌ Enhanced XGBoost failed: {e}")
        print(" 🔄 Falling back to basic version...")
        # Fallback to basic version
        model, last_features = train_xgboost_on_residuals(residuals, df)
        return model, last_features, {'method': 'fallback'}