# models/hierarchical_ensemble_model.py - Sequential ARIMA-GARCH+XGBoost Ensemble
import numpy as np
import pandas as pd
import yfinance as yf
from typing import Tuple, Optional, Dict, Any
import warnings
warnings.filterwarnings('ignore')
# Import existing models
try:
from models.arima_garch_model import get_arima_garch_forecast, check_dependencies as check_garch_deps
ARIMA_GARCH_AVAILABLE = True
except ImportError:
print("⚠️ ARIMA-GARCH model not available")
ARIMA_GARCH_AVAILABLE = False
try:
import xgboost as xgb
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_squared_error, mean_absolute_error
XGBOOST_AVAILABLE = True
except ImportError:
print("⚠️ XGBoost not available")
XGBOOST_AVAILABLE = False
class HierarchicalEnsembleForecaster:
    """
    Hierarchical ARIMA-GARCH+XGBoost ensemble model.
    Stage 1: ARIMA-GARCH models time series and volatility
    Stage 2: XGBoost enhances predictions using ARIMA-GARCH outputs + market features
    """
    def __init__(self, ticker: str, lookback_window: int = 20):
        """Initialize the forecaster.

        Args:
            ticker: Stock ticker symbol to model.
            lookback_window: Number of trailing rows per training window.
        """
        self.ticker = ticker
        self.lookback_window = lookback_window
        self.data = None  # OHLCV DataFrame; populated by download_data()
        self.arima_garch_model = None  # reserved; not assigned elsewhere in this file
        self.xgb_model = None  # best-fold XGBRegressor selected in fit()
        self.scaler = StandardScaler()  # fitted in fit(), reused in predict()
        self.is_fitted = False  # guards predict()
        # XGBoost parameters optimized for financial time series
        self.xgb_params = {
            'objective': 'reg:squarederror',
            'max_depth': 6,
            'learning_rate': 0.1,
            'n_estimators': 200,
            'subsample': 0.8,
            'colsample_bytree': 0.8,
            'random_state': 42,
            'verbosity': 0
        }
def download_data(self, period: str = "2y") -> pd.DataFrame:
"""Download stock data for training."""
try:
self.data = yf.download(self.ticker, period=period, progress=False)
self.data = self.data.dropna()
if len(self.data) < 100:
raise ValueError(f"Insufficient data for {self.ticker}: {len(self.data)} points")
print(f"📊 Downloaded {len(self.data)} data points for {self.ticker}")
return self.data
except Exception as e:
print(f"❌ Error downloading data for {self.ticker}: {e}")
raise
def _create_arima_garch_features(self, window_data: pd.DataFrame) -> Dict[str, float]:
"""
Extract features from ARIMA-GARCH model for a given window.
"""
try:
# Get ARIMA-GARCH forecast for this window
forecast_price, forecast_details, _ = get_arima_garch_forecast(self.ticker)
# Extract ARIMA-GARCH specific features
forecast_result = forecast_details.get('forecast_result', {})
diagnostics = forecast_details.get('diagnostics', {})
# Core ARIMA-GARCH features
features = {
'arima_prediction': forecast_result.get('mean_forecast', 0.0),
'garch_volatility': forecast_result.get('volatility_forecast', 1.0),
'arima_aic': diagnostics.get('arima_aic', 0.0),
'garch_aic': diagnostics.get('garch_aic', 0.0),
'residual_mean': diagnostics.get('residual_mean', 0.0),
'residual_std': diagnostics.get('residual_std', 1.0)
}
return features
except Exception as e:
# Fallback features if ARIMA-GARCH fails
returns = window_data['Close'].pct_change().dropna()
return {
'arima_prediction': float(returns.mean() * 100),
'garch_volatility': float(returns.std() * 100),
'arima_aic': 0.0,
'garch_aic': 0.0,
'residual_mean': 0.0,
'residual_std': float(returns.std())
}
    def _create_market_features(self, window_data: pd.DataFrame) -> Dict[str, float]:
        """
        Create market-based features for XGBoost enhancement.

        Computes technical-indicator, volatility, return-distribution, trend,
        and volume features from the window's Close (and optional Volume)
        columns. Any NaN/inf feature is replaced with a neutral default, and
        a fully neutral dict is returned if the whole computation fails.
        """
        try:
            prices = window_data['Close']
            returns = prices.pct_change().dropna()
            # Fall back to a constant unit series when there is no Volume column
            volume = window_data.get('Volume', pd.Series([1] * len(prices), index=prices.index))
            features = {}
            # Technical indicators
            features['sma_5'] = float(prices.rolling(5).mean().iloc[-1])
            features['sma_20'] = float(prices.rolling(20).mean().iloc[-1])
            # 5-bar momentum in percent; 0.0 when the window is too short
            features['price_momentum'] = float((prices.iloc[-1] / prices.iloc[-5] - 1) * 100) if len(prices) >= 5 else 0.0
            # Volatility features: annualized (sqrt(252)) realized vol, in percent
            features['realized_vol_5d'] = float(returns.rolling(5).std().iloc[-1] * np.sqrt(252) * 100) if len(returns) >= 5 else 1.0
            features['realized_vol_20d'] = float(returns.rolling(20).std().iloc[-1] * np.sqrt(252) * 100) if len(returns) >= 20 else 1.0
            # Short/long vol ratio; denominator floored to avoid division by ~0
            features['vol_ratio'] = features['realized_vol_5d'] / max(features['realized_vol_20d'], 0.01)
            # Return distribution shape over the last 20 returns
            features['return_skew'] = float(returns.rolling(20).skew().iloc[-1]) if len(returns) >= 20 else 0.0
            features['return_kurt'] = float(returns.rolling(20).kurt().iloc[-1]) if len(returns) >= 20 else 0.0
            # Drawdown from the 20-bar rolling high, in percent (<= 0)
            features['max_drawdown'] = float((prices / prices.rolling(20).max()).iloc[-1] - 1) * 100 if len(prices) >= 20 else 0.0
            # Trend features
            features['rsi'] = self._calculate_rsi(prices)
            features['bollinger_position'] = self._calculate_bollinger_position(prices)
            # Volume features (only when enough non-NaN volume observations exist)
            if len(volume.dropna()) > 10:
                features['volume_sma_ratio'] = float(volume.iloc[-1] / max(volume.rolling(20).mean().iloc[-1], 1))
                features['price_volume_corr'] = float(np.corrcoef(prices[-10:], volume[-10:])[0,1]) if len(prices) >= 10 else 0.0
            else:
                features['volume_sma_ratio'] = 1.0
                features['price_volume_corr'] = 0.0
            # Fill NaN values with neutral defaults
            # (vol-like features default to 1.0, everything else to 0.0;
            # note 'price_volume_corr' contains 'vol' so a NaN corr becomes 1.0)
            for key, value in features.items():
                if pd.isna(value) or np.isinf(value):
                    if 'vol' in key or 'std' in key:
                        features[key] = 1.0
                    else:
                        features[key] = 0.0
            return features
        except Exception as e:
            print(f"⚠️ Market feature creation failed: {e}")
            # Return neutral features
            return {
                'sma_5': 0.0, 'sma_20': 0.0, 'price_momentum': 0.0,
                'realized_vol_5d': 1.0, 'realized_vol_20d': 1.0, 'vol_ratio': 1.0,
                'return_skew': 0.0, 'return_kurt': 0.0, 'max_drawdown': 0.0,
                'rsi': 50.0, 'bollinger_position': 0.5,
                'volume_sma_ratio': 1.0, 'price_volume_corr': 0.0
            }
def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> float:
"""Calculate RSI indicator."""
try:
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return float(rsi.iloc[-1]) if not pd.isna(rsi.iloc[-1]) else 50.0
except:
return 50.0
def _calculate_bollinger_position(self, prices: pd.Series, period: int = 20) -> float:
"""Calculate position relative to Bollinger Bands."""
try:
sma = prices.rolling(period).mean()
std = prices.rolling(period).std()
bb_upper = sma + (2 * std)
bb_lower = sma - (2 * std)
current_price = prices.iloc[-1]
bb_position = (current_price - bb_lower.iloc[-1]) / (bb_upper.iloc[-1] - bb_lower.iloc[-1])
return float(bb_position) if not pd.isna(bb_position) else 0.5
except:
return 0.5
def _prepare_training_data(self) -> Tuple[pd.DataFrame, pd.Series]:
"""
Prepare training data using sliding window approach.
"""
print("🔧 Preparing hierarchical ensemble training data...")
X_features = []
y_targets = []
# Use sliding window to create training samples
for i in range(self.lookback_window, len(self.data) - 1):
try:
# Get data window
window_data = self.data.iloc[i-self.lookback_window:i+1]
# IMPORTANT: For each window, we need to simulate ARIMA-GARCH
# This is computationally expensive, so we'll use a simplified approach
# Get ARIMA-GARCH features (simplified)
arima_garch_features = self._create_arima_garch_features(window_data)
# Get market features
market_features = self._create_market_features(window_data)
# Combine all features
combined_features = {**arima_garch_features, **market_features}
# Create feature vector
feature_vector = [
combined_features['arima_prediction'],
combined_features['garch_volatility'],
combined_features['price_momentum'],
combined_features['realized_vol_5d'],
combined_features['vol_ratio'],
combined_features['return_skew'],
combined_features['rsi'],
combined_features['bollinger_position'],
combined_features['volume_sma_ratio']
]
X_features.append(feature_vector)
# Target: next day return (%)
current_price = self.data['Close'].iloc[i]
next_price = self.data['Close'].iloc[i+1]
next_return = (next_price - current_price) / current_price * 100
y_targets.append(next_return)
except Exception as e:
# Skip this sample if feature creation fails
continue
# Create DataFrames
feature_names = [
'arima_prediction', 'garch_volatility', 'price_momentum',
'realized_vol_5d', 'vol_ratio', 'return_skew', 'rsi',
'bollinger_position', 'volume_sma_ratio'
]
X_df = pd.DataFrame(X_features, columns=feature_names)
y_series = pd.Series(y_targets)
print(f"✅ Created {len(X_df)} training samples with {len(feature_names)} features")
return X_df, y_series
    def fit(self, period: str = "2y") -> Dict[str, Any]:
        """
        Fit the hierarchical ensemble model.

        Downloads data if not already cached, builds the sliding-window
        training set, fits the scaler, then trains one XGBoost regressor per
        TimeSeriesSplit fold and keeps the fold model with the lowest
        validation MSE as the final stage-2 model.

        Args:
            period: yfinance period string, used only if data must be downloaded.

        Returns:
            Dict with 'validation_mse', 'feature_importance', and
            'training_samples'.

        Raises:
            ValueError: if fewer than 10 training samples could be built.
        """
        print(f"🧠 Fitting Hierarchical ARIMA-GARCH+XGBoost Ensemble for {self.ticker}")
        print("=" * 70)
        # Download data
        if self.data is None:
            self.download_data(period)
        # Prepare training data
        X_train, y_train = self._prepare_training_data()
        if len(X_train) < 10:
            raise ValueError("Insufficient training data")
        # Scale features; the fitted scaler is reused verbatim in predict()
        X_train_scaled = self.scaler.fit_transform(X_train)
        # Time series cross-validation (expanding-window folds, no shuffling)
        print("🔧 Training XGBoost ensemble with time series validation...")
        tscv = TimeSeriesSplit(n_splits=3)
        best_score = float('inf')
        for train_idx, val_idx in tscv.split(X_train_scaled):
            X_tr, X_val = X_train_scaled[train_idx], X_train_scaled[val_idx]
            y_tr, y_val = y_train.iloc[train_idx], y_train.iloc[val_idx]
            # Fit XGBoost
            xgb_model = xgb.XGBRegressor(**self.xgb_params)
            xgb_model.fit(X_tr, y_tr, eval_set=[(X_val, y_val)], verbose=False)
            # Evaluate
            val_pred = xgb_model.predict(X_val)
            val_score = mean_squared_error(y_val, val_pred)
            # Keep the best fold model; best_score starts at +inf so at least
            # one fold always assigns self.xgb_model
            if val_score < best_score:
                best_score = val_score
                self.xgb_model = xgb_model
        self.is_fitted = True
        # Calculate feature importance from the selected model
        feature_importance = dict(zip(X_train.columns, self.xgb_model.feature_importances_))
        print(f"✅ Hierarchical ensemble fitted with validation MSE: {best_score:.6f}")
        print("🎯 Top feature importance:")
        for feature, importance in sorted(feature_importance.items(), key=lambda x: x[1], reverse=True)[:5]:
            print(f" {feature}: {importance:.3f}")
        print("=" * 70)
        return {
            'validation_mse': best_score,
            'feature_importance': feature_importance,
            'training_samples': len(X_train)
        }
    def predict(self, forecast_horizon: int = 1) -> Dict[str, Any]:
        """
        Generate hierarchical ensemble prediction.

        Runs the ARIMA-GARCH base forecast, rebuilds the same 9-element
        feature vector used during training, scales it with the fitted
        scaler, and applies the XGBoost model to produce an enhanced
        next-day return/price forecast plus a BUY/SELL/HOLD signal.

        Args:
            forecast_horizon: currently unused; the model predicts one step.

        Returns:
            Dict with ensemble prediction, components, signal, confidence,
            and volatility-regime metadata.

        Raises:
            ValueError: if called before fit().
        """
        if not self.is_fitted:
            raise ValueError("Model must be fitted before prediction")
        try:
            # Stage 1: Get ARIMA-GARCH base prediction
            print(f"🔧 Stage 1: ARIMA-GARCH base forecast for {self.ticker}...")
            forecast_price, forecast_details, _ = get_arima_garch_forecast(self.ticker)
            # Extract ARIMA-GARCH features
            arima_garch_features = self._create_arima_garch_features(self.data)
            # Stage 2: Create market features
            market_features = self._create_market_features(self.data)
            # Stage 3: Combine features for XGBoost
            # (order below must match feature_names in _prepare_training_data)
            combined_features = {**arima_garch_features, **market_features}
            feature_vector = np.array([
                combined_features['arima_prediction'],
                combined_features['garch_volatility'],
                combined_features['price_momentum'],
                combined_features['realized_vol_5d'],
                combined_features['vol_ratio'],
                combined_features['return_skew'],
                combined_features['rsi'],
                combined_features['bollinger_position'],
                combined_features['volume_sma_ratio']
            ]).reshape(1, -1)
            # Scale features with the scaler fitted during training
            feature_vector_scaled = self.scaler.transform(feature_vector)
            # Stage 4: XGBoost ensemble enhancement
            print(f"📈 Stage 2: XGBoost enhancement...")
            ensemble_return_prediction = self.xgb_model.predict(feature_vector_scaled)[0]
            # Convert the predicted return (%) to a price prediction
            last_price = float(self.data['Close'].iloc[-1])
            ensemble_price_prediction = last_price * (1 + ensemble_return_prediction / 100)
            # Calculate ensemble metrics
            base_prediction = forecast_price
            enhancement = ensemble_price_prediction - base_prediction
            # Generate trading signal
            # NOTE(review): ensemble_return_prediction is in percent while the
            # threshold uses volatility / 100 (a fraction) — the units look
            # inconsistent; confirm the intended scale before relying on signals.
            volatility = combined_features['garch_volatility']
            if ensemble_return_prediction > 1.5 * volatility / 100:
                signal = "BUY"
                confidence = min(abs(ensemble_return_prediction) / (volatility / 100), 1.0)
            elif ensemble_return_prediction < -1.5 * volatility / 100:
                signal = "SELL"
                confidence = min(abs(ensemble_return_prediction) / (volatility / 100), 1.0)
            else:
                signal = "HOLD"
                confidence = 1.0 - min(abs(ensemble_return_prediction) / (volatility / 100), 1.0)
            # Volatility regime buckets (volatility treated as percent here)
            if volatility > 3.0:
                volatility_regime = 'EXTREME'
            elif volatility > 2.0:
                volatility_regime = 'HIGH'
            else:
                volatility_regime = 'MODERATE'
            result = {
                'ensemble_prediction': ensemble_price_prediction,
                'ensemble_return': ensemble_return_prediction / 100,
                'base_arima_garch': base_prediction,
                'xgboost_enhancement': enhancement,
                'garch_volatility': volatility / 100,
                'signal': signal,
                'confidence': confidence,
                'volatility_regime': volatility_regime,
                'last_price': last_price,
                'model_type': 'ARIMA-GARCH+XGBoost Hierarchical'
            }
            print(f"✅ Hierarchical ensemble prediction complete:")
            print(f" Base (ARIMA-GARCH): ${base_prediction:.2f}")
            print(f" Enhanced (Ensemble): ${ensemble_price_prediction:.2f}")
            print(f" XGBoost Enhancement: ${enhancement:+.2f}")
            print(f" Signal: {signal} (Confidence: {confidence:.1%})")
            return result
        except Exception as e:
            print(f"❌ Hierarchical ensemble prediction failed: {e}")
            raise
def get_hierarchical_ensemble_forecast(ticker: str, period: str = "2y") -> Tuple[float, Dict[str, Any], pd.DataFrame]:
    """
    Simplified interface for hierarchical ensemble forecasting.

    Args:
        ticker: Stock ticker symbol
        period: Data period to fetch

    Returns:
        Tuple of (forecast_price, forecast_details, dataframe)
    """
    try:
        # Fail fast when any stage of the pipeline is missing
        if not ARIMA_GARCH_AVAILABLE:
            raise ImportError("ARIMA-GARCH model not available")
        if not XGBOOST_AVAILABLE:
            raise ImportError("XGBoost not available")
        if not check_garch_deps():
            raise ImportError("ARIMA-GARCH dependencies not available")
        # Build, train, and query the ensemble end to end
        ensemble = HierarchicalEnsembleForecaster(ticker)
        ensemble.download_data(period)
        training_results = ensemble.fit(period)
        prediction_results = ensemble.predict()
        details = {
            'model_type': 'Hierarchical ARIMA-GARCH+XGBoost',
            'training_results': training_results,
            'prediction_results': prediction_results,
            'ensemble_components': {
                'arima_garch_base': prediction_results['base_arima_garch'],
                'xgboost_enhancement': prediction_results['xgboost_enhancement']
            }
        }
        return prediction_results['ensemble_prediction'], details, ensemble.data
    except Exception as e:
        print(f"❌ Hierarchical ensemble forecast failed for {ticker}: {e}")
        raise
def check_hierarchical_dependencies() -> bool:
    """Check if all dependencies for hierarchical ensemble are available.

    Reports each missing component on stdout and returns True only when
    ARIMA-GARCH, XGBoost, and the ARIMA-GARCH runtime deps are all present.
    """
    ok = True
    if not ARIMA_GARCH_AVAILABLE:
        print("❌ ARIMA-GARCH model not available")
        ok = False
    if not XGBOOST_AVAILABLE:
        print("❌ XGBoost not available")
        ok = False
    # Only probe the deeper deps when the model module itself imported
    if ARIMA_GARCH_AVAILABLE and not check_garch_deps():
        print("❌ ARIMA-GARCH dependencies (arch, statsmodels) not available")
        ok = False
    if ok:
        print("✅ All hierarchical ensemble dependencies available")
    return ok
if __name__ == "__main__":
    # Test the hierarchical ensemble model end to end:
    # dependency check -> download -> fit -> predict -> report.
    check_hierarchical_dependencies()
    test_ticker = "AAPL"
    try:
        print(f"\n🧪 Testing Hierarchical Ensemble with {test_ticker}")
        ensemble = HierarchicalEnsembleForecaster(test_ticker)
        ensemble.download_data()
        training_results = ensemble.fit()
        prediction_results = ensemble.predict()
        print(f"\n📊 Results for {test_ticker}:")
        print(f" Model: {prediction_results['model_type']}")
        print(f" Current Price: ${prediction_results['last_price']:.2f}")
        print(f" Ensemble Forecast: ${prediction_results['ensemble_prediction']:.2f}")
        print(f" Expected Return: {prediction_results['ensemble_return']:+.2%}")
        print(f" Signal: {prediction_results['signal']}")
        print(f" Confidence: {prediction_results['confidence']:.1%}")
        print(f" Volatility Regime: {prediction_results['volatility_regime']}")
    except Exception as e:
        # Any failure (network, deps, insufficient data) is reported, not raised
        print(f"❌ Test failed: {e}")