# models/arima_garch_model.py - FIXED VERSION
import pandas as pd
import numpy as np
import yfinance as yf
import json
import warnings
from datetime import datetime, timedelta
from pathlib import Path
from typing import Tuple, Dict, Any, Optional
import itertools
warnings.filterwarnings('ignore')
# Optional dependency: `arch` provides arch_model (volatility model fitting)
# and the ADF unit-root test. If it is missing, a hint is printed and
# ARCH_AVAILABLE is set to False instead of aborting the module import.
try:
    from arch import arch_model
    from arch.unitroot import ADF
    ARCH_AVAILABLE = True
except ImportError:
    print("⚠️ ARCH library not available. Install with: pip install arch")
    ARCH_AVAILABLE = False
# Optional dependency: `statsmodels` provides ARIMA and the Ljung-Box
# diagnostic. Same pattern: record availability in STATSMODELS_AVAILABLE
# rather than failing at import time.
try:
    from statsmodels.tsa.arima.model import ARIMA
    from statsmodels.stats.diagnostic import acorr_ljungbox
    STATSMODELS_AVAILABLE = True
except ImportError:
    print("⚠️ Statsmodels not available. Install with: pip install statsmodels")
    STATSMODELS_AVAILABLE = False
def check_dependencies():
    """Report whether both optional modelling libraries were imported.

    Returns:
        True only when the `arch` and `statsmodels` imports at the top of
        this module both succeeded; False otherwise.
    """
    if not ARCH_AVAILABLE:
        return False
    return STATSMODELS_AVAILABLE
class ARIMAGARCHModelManager:
    """ARIMA-GARCH forecaster with ticker-specific caching of model orders.

    The selected ARIMA order and GARCH specification for each ticker are
    stored in their own JSON file inside ``cache_dir`` so that parameters are
    never shared between tickers. Only the *orders* are cached; the models
    themselves are always refitted on freshly downloaded data.
    """

    # Cache entries at least this many days old are ignored and re-optimized.
    _CACHE_MAX_AGE_DAYS = 30
    # ARIMA residuals are multiplied by this factor before GARCH fitting for
    # numerical stability; the volatility forecast is divided by it again.
    _RESIDUAL_SCALE = 100
    # Keys every cached/selected GARCH specification must provide.
    _GARCH_SPEC_KEYS = ('vol', 'p', 'q')

    def __init__(self, cache_dir: str = "cache/garch_params"):
        """Create the manager and make sure the cache directory exists."""
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _get_cache_file_path(self, ticker: str) -> Path:
        """Get ticker-specific cache file path.

        Path separators in the ticker are replaced with ``_`` so the cache
        file always lives directly inside ``self.cache_dir``.
        """
        safe_ticker = str(ticker).replace('/', '_').replace('\\', '_')
        return self.cache_dir / f"{safe_ticker}_garch_params.json"

    def _load_cached_parameters(self, ticker: str) -> Optional[Dict[str, Any]]:
        """Load cached parameters ONLY for the specific ticker.

        Returns:
            The cached dictionary when a cache file exists, is younger than
            30 days, and contains a three-element ``arima_params`` plus a
            ``garch_params`` mapping with ``vol``, ``p`` and ``q``.
            ``None`` in every other case (missing, expired, unreadable or
            malformed cache), which makes the caller re-optimize.
        """
        cache_file = self._get_cache_file_path(ticker)
        try:
            if not cache_file.exists():
                print(f"📝 {ticker}: No GARCH cache file found")
                return None

            with open(cache_file, 'r') as f:
                cache_data = json.load(f)

            # Validate cache is recent (within 30 days)
            cache_date = datetime.fromisoformat(cache_data['timestamp'])
            age_days = (datetime.now() - cache_date).days
            if age_days >= self._CACHE_MAX_AGE_DAYS:
                print(f"⏰ {ticker}: GARCH cache expired ({age_days} days old)")
                return None

            # Validate the structure up front: a truncated or hand-edited
            # file must trigger re-optimization, not a KeyError later on.
            arima_params = cache_data['arima_params']
            garch_params = cache_data['garch_params']
            if len(arima_params) != 3 or any(
                key not in garch_params for key in self._GARCH_SPEC_KEYS
            ):
                raise ValueError("cache entry is missing model orders")

            print(f"✅ {ticker}: Using cached ARIMA-GARCH parameters")
            print(f" ARIMA: {arima_params}, GARCH: {garch_params}")
            return cache_data
        except Exception as e:
            # Cache reads are best-effort: any problem means "no cache".
            print(f"⚠️ {ticker}: GARCH cache read error: {e}")
            return None

    def _save_optimized_parameters(self, ticker: str, arima_params: Tuple[int, int, int],
                                   garch_params: Dict[str, Any], model_metrics: Dict[str, float]) -> None:
        """Save ticker-specific optimized parameters.

        Failures are reported on stdout and otherwise ignored, because a
        missing cache only costs a re-optimization on the next call.
        """
        cache_file = self._get_cache_file_path(ticker)
        now = datetime.now()
        cache_data = {
            'ticker': ticker,
            'arima_params': arima_params,
            'garch_params': garch_params,
            'arima_aic': model_metrics.get('arima_aic', None),
            'garch_aic': model_metrics.get('garch_aic', None),
            'timestamp': now.isoformat(),
            'data_period': '4y',
            'optimization_date': now.strftime('%Y-%m-%d'),
        }
        try:
            with open(cache_file, 'w') as f:
                json.dump(cache_data, f, indent=2)
            print(f"💾 {ticker}: Saved optimized ARIMA-GARCH parameters")
        except Exception as e:
            print(f"❌ {ticker}: Failed to save GARCH cache: {e}")

    def _download_stock_data(self, ticker: str, period: str = "4y") -> pd.DataFrame:
        """Download price history for ``ticker`` (4 years by default).

        Raises:
            ValueError: if the download returns no rows.
            Exception: any error raised by the download is logged and re-raised.
        """
        print(f"📥 {ticker}: Downloading {period} of data for ARIMA-GARCH...")
        try:
            data = yf.download(ticker, period=period, progress=False)
            if data.empty:
                raise ValueError(f"No data available for {ticker}")
            print(f"✅ {ticker}: Downloaded {len(data)} data points ({data.index[0].date()} to {data.index[-1].date()})")
            return data
        except Exception as e:
            print(f"❌ {ticker}: Data download failed: {e}")
            raise

    @staticmethod
    def _extract_close_series(data: pd.DataFrame) -> pd.Series:
        """Return the ``Close`` column of ``data`` as a one-dimensional Series.

        When the frame has MultiIndex columns (as some yfinance versions
        return even for a single ticker), ``data['Close']`` is a DataFrame;
        its first column is used in that case.
        """
        close = data['Close']
        if isinstance(close, pd.DataFrame):
            close = close.iloc[:, 0]
        return close

    def _optimize_arima_parameters(self, returns: pd.Series, ticker: str) -> Tuple[Tuple[int, int, int], float]:
        """Find optimal ARIMA parameters for returns series.

        Grid-searches p in 0..2, d in {0, 1}, q in 0..2 and keeps the order
        with the lowest AIC. Orders that fail to fit are skipped; if every
        order fails, ARIMA(1,0,1) is fitted as a fallback (and any error from
        that fit propagates).

        Returns:
            ``(best_order, best_aic)``.
        """
        print(f"🔍 {ticker}: Optimizing ARIMA parameters for GARCH residuals...")
        # Parameter ranges - smaller for GARCH residuals
        p_values = range(0, 3)  # AR terms
        d_values = [0, 1]       # Usually don't need much differencing for returns
        q_values = range(0, 3)  # MA terms

        best_aic = float('inf')
        best_params = None
        for order in itertools.product(p_values, d_values, q_values):
            try:
                aic = ARIMA(returns, order=order).fit().aic
            except Exception:
                # Skip problematic parameter combinations
                continue
            if aic < best_aic:
                best_aic = aic
                best_params = order

        if best_params is None:
            # Fallback to simple ARIMA(1,0,1) for returns
            print(f"⚠️ {ticker}: ARIMA optimization failed, using fallback (1,0,1)")
            best_params = (1, 0, 1)
            best_aic = ARIMA(returns, order=best_params).fit().aic

        print(f"✅ {ticker}: Optimal ARIMA{best_params} for mean equation (AIC: {best_aic:.2f})")
        return best_params, best_aic

    def _optimize_garch_parameters(self, arima_residuals: pd.Series, ticker: str) -> Tuple[Dict[str, Any], float]:
        """Find optimal GARCH parameters.

        Fits GARCH(1,1), GARCH(1,2), GARCH(2,1) and EGARCH(1,1) to the scaled
        ARIMA residuals and keeps the specification with the lowest AIC.
        Falls back to GARCH(1,1) if every candidate fails to fit.

        Returns:
            ``(best_spec, best_aic)`` where ``best_spec`` has the keys
            ``vol``, ``p`` and ``q``.
        """
        print(f"🔍 {ticker}: Optimizing GARCH parameters...")
        # Scale up for numerical stability
        scaled_residuals = arima_residuals * self._RESIDUAL_SCALE
        # Test different GARCH specifications
        garch_specs = [
            {'vol': 'GARCH', 'p': 1, 'q': 1},   # Standard GARCH(1,1)
            {'vol': 'GARCH', 'p': 1, 'q': 2},   # GARCH(1,2)
            {'vol': 'GARCH', 'p': 2, 'q': 1},   # GARCH(2,1)
            {'vol': 'EGARCH', 'p': 1, 'q': 1},  # EGARCH(1,1) for asymmetry
        ]

        best_aic = float('inf')
        best_params = None
        for spec in garch_specs:
            try:
                fitted_garch = arch_model(
                    scaled_residuals, vol=spec['vol'], p=spec['p'], q=spec['q']
                ).fit(disp='off', show_warning=False)
                aic = fitted_garch.aic
            except Exception:
                # Skip problematic specifications
                continue
            if aic < best_aic:
                best_aic = aic
                best_params = spec

        if best_params is None:
            # Fallback to simple GARCH(1,1)
            print(f"⚠️ {ticker}: GARCH optimization failed, using fallback GARCH(1,1)")
            best_params = {'vol': 'GARCH', 'p': 1, 'q': 1}
            fallback_fit = arch_model(
                scaled_residuals, vol='GARCH', p=1, q=1
            ).fit(disp='off', show_warning=False)
            best_aic = fallback_fit.aic

        print(f"✅ {ticker}: Optimal {best_params['vol']}({best_params['p']},{best_params['q']}) (AIC: {best_aic:.2f})")
        return best_params, best_aic

    def _optimize_and_cache(self, returns: pd.Series,
                            ticker: str) -> Tuple[Tuple[int, int, int], Dict[str, Any]]:
        """Select ARIMA and GARCH orders for ``returns`` and cache them for ``ticker``."""
        arima_params, arima_aic = self._optimize_arima_parameters(returns, ticker)
        # Fit ARIMA to get residuals for GARCH
        arima_residuals = ARIMA(returns, order=arima_params).fit().resid
        garch_params, garch_aic = self._optimize_garch_parameters(arima_residuals, ticker)
        metrics = {'arima_aic': arima_aic, 'garch_aic': garch_aic}
        self._save_optimized_parameters(ticker, arima_params, garch_params, metrics)
        return arima_params, garch_params

    def get_arima_garch_forecast(self, ticker: str, use_optimized_params: bool = True,
                                 force_recalculate: bool = False) -> Tuple[float, Dict[str, Any], pd.DataFrame]:
        """
        Get ARIMA-GARCH forecast with proper ticker-specific caching

        Args:
            ticker: Symbol to download and model.
            use_optimized_params: When True (and ``force_recalculate`` is
                False) cached orders for this ticker are reused if valid.
            force_recalculate: When True the cache is ignored and the orders
                are re-optimized and re-saved.

        Returns:
            forecast_price: Next day's predicted price
            forecast_details: Dictionary with forecast details and diagnostics
                (including the ARIMA residual series under ``'residuals'``)
            data: Historical price data used

        Raises:
            ValueError: if no data is available or no returns can be computed.
            Exception: any model-fitting error is logged and re-raised.
        """
        print(f"\n🔧 {ticker}: Starting ARIMA-GARCH analysis...")
        # Download fresh data
        data = self._download_stock_data(ticker)
        # Coerce to a Series: 'Close' may be a one-column DataFrame when the
        # downloaded frame has MultiIndex columns.
        close_prices = self._extract_close_series(data).dropna()
        returns = close_prices.pct_change().dropna()
        if returns.empty:
            raise ValueError(f"Not enough price history for {ticker} to compute returns")

        # Determine optimal parameters
        cached_data = None
        if use_optimized_params and not force_recalculate:
            # Try to load cached parameters for THIS specific ticker
            cached_data = self._load_cached_parameters(ticker)

        if cached_data:
            arima_params = tuple(cached_data['arima_params'])
            garch_params = cached_data['garch_params']
            print(f"📈 {ticker}: Using cached ARIMA-GARCH parameters")
        else:
            if force_recalculate:
                print(f"🔄 {ticker}: Force recalculating ARIMA-GARCH parameters...")
            elif use_optimized_params:
                # No cache for this ticker, optimize
                print(f"🔄 {ticker}: No cached GARCH parameters found, optimizing...")
            else:
                print(f"🔄 {ticker}: Optimizing ARIMA-GARCH parameters...")
            arima_params, garch_params = self._optimize_and_cache(returns, ticker)

        # Fit final ARIMA-GARCH model with optimal parameters
        print(f"🎯 {ticker}: Fitting final ARIMA{arima_params}-{garch_params['vol']}({garch_params['p']},{garch_params['q']}) model...")
        try:
            # Step 1: Fit ARIMA model to returns
            fitted_arima = ARIMA(returns, order=arima_params).fit()

            # Step 2: Fit GARCH model to ARIMA residuals
            arima_residuals = fitted_arima.resid
            garch_model = arch_model(arima_residuals * self._RESIDUAL_SCALE,
                                     vol=garch_params['vol'],
                                     p=garch_params['p'],
                                     q=garch_params['q'])
            fitted_garch = garch_model.fit(disp='off', show_warning=False)

            # Step 3: Generate forecasts
            # ARIMA forecast for mean
            arima_forecast = fitted_arima.forecast(steps=1)
            mean_forecast = float(arima_forecast.iloc[0])
            # GARCH forecast for volatility (scaled back to return units)
            garch_forecast = fitted_garch.forecast(horizon=1)
            volatility_forecast = (
                float(np.sqrt(garch_forecast.variance.iloc[-1, 0])) / self._RESIDUAL_SCALE
            )

            # Convert return forecast to price forecast
            last_price = float(close_prices.iloc[-1])
            forecast_price = last_price * (1 + mean_forecast)

            # Model diagnostics
            expected_change = (forecast_price - last_price) / last_price

            # Prepare detailed results
            forecast_details = {
                'forecast_result': {
                    'price_forecast': forecast_price,
                    'return_forecast': mean_forecast,
                    'volatility_forecast': volatility_forecast * 100,  # As percentage
                    'confidence_interval_95': {
                        'lower': forecast_price * (1 - 1.96 * volatility_forecast),
                        'upper': forecast_price * (1 + 1.96 * volatility_forecast)
                    }
                },
                'model_specification': {
                    'arima_order': arima_params,
                    'garch_type': garch_params['vol'],
                    'garch_order': (garch_params['p'], garch_params['q'])
                },
                'diagnostics': {
                    'arima_aic': fitted_arima.aic,
                    'garch_aic': fitted_garch.aic,
                    'residual_std': float(arima_residuals.std()),
                    'residual_mean': float(arima_residuals.mean()),
                    'data_points': len(returns)
                },
                'residuals': arima_residuals  # For XGBoost enhancement
            }

            print(f"✅ {ticker}: ARIMA-GARCH forecast complete")
            print(f" Last price: ${last_price:.2f}")
            print(f" Forecast: ${forecast_price:.2f} ({expected_change:+.2%})")
            print(f" Volatility: {volatility_forecast*100:.2f}%")
            print(f" Model: ARIMA{arima_params}-{garch_params['vol']}({garch_params['p']},{garch_params['q']})")
            return forecast_price, forecast_details, data
        except Exception as e:
            print(f"❌ {ticker}: ARIMA-GARCH model fitting failed: {e}")
            raise
# Global instance for backward compatibility.
# Constructing it at import time runs ARIMAGARCHModelManager.__init__, which
# creates the default cache directory ("cache/garch_params") if it is missing.
garch_manager = ARIMAGARCHModelManager()
def get_arima_garch_forecast(ticker: str, use_optimized_params: bool = True,
                             force_recalculate: bool = False) -> Tuple[float, Dict[str, Any], pd.DataFrame]:
    """
    Module-level entry point for the ARIMA-GARCH forecast.

    Delegates to the shared ``garch_manager`` instance after verifying that
    the optional modelling libraries were imported. The manager keeps a
    separate parameter cache file per ticker and downloads fresh data
    (4 years by default) on every call.

    Raises:
        ImportError: if `arch` or `statsmodels` is not available.
    """
    dependencies_ready = check_dependencies()
    if dependencies_ready:
        return garch_manager.get_arima_garch_forecast(
            ticker,
            use_optimized_params=use_optimized_params,
            force_recalculate=force_recalculate,
        )
    raise ImportError("ARIMA-GARCH requires 'arch' and 'statsmodels' packages")
def get_enhanced_arima_garch_forecast(ticker: str) -> Tuple[float, Dict[str, Any], pd.DataFrame]:
    """Run the ARIMA-GARCH forecast while bypassing any cached parameters.

    Equivalent to calling ``get_arima_garch_forecast`` with
    ``use_optimized_params=False`` and ``force_recalculate=True``.
    """
    forced_options = {'use_optimized_params': False, 'force_recalculate': True}
    return get_arima_garch_forecast(ticker, **forced_options)
def clear_cache_for_ticker(ticker: str) -> bool:
    """Delete the cached GARCH parameter file belonging to one ticker.

    Returns:
        True if a cache file existed and was removed; False if there was no
        file to remove or the removal failed (the error is printed).
    """
    target = garch_manager._get_cache_file_path(ticker)
    try:
        # Guard clause: nothing on disk means nothing to do.
        if not target.exists():
            print(f"ℹ️ {ticker}: No GARCH cache to clear")
            return False
        target.unlink()
        print(f"🗑️ {ticker}: GARCH cache cleared")
        return True
    except Exception as err:
        print(f"❌ {ticker}: Failed to clear GARCH cache: {err}")
        return False
def clear_all_cache() -> int:
    """Clear all cached GARCH parameters.

    Every ``*_garch_params.json`` file in the manager's cache directory is
    deleted. Failures are handled per file, so one undeletable file neither
    stops the sweep nor causes already-deleted files to be reported as 0.

    Returns:
        The number of cache files actually removed (0 if the directory could
        not be listed).
    """
    try:
        # Materialize the listing first so deletions don't interact with the
        # directory iteration.
        cache_files = list(garch_manager.cache_dir.glob("*_garch_params.json"))
    except Exception as e:
        print(f"❌ Failed to clear GARCH cache: {e}")
        return 0

    cleared_count = 0
    for cache_file in cache_files:
        try:
            cache_file.unlink()
        except Exception as e:
            print(f"❌ Failed to clear GARCH cache file {cache_file.name}: {e}")
        else:
            cleared_count += 1

    print(f"🗑️ Cleared {cleared_count} GARCH cache files")
    return cleared_count