# models/arima_model.py - FIXED VERSION WITH WALK-FORWARD VALIDATION
#
# MAJOR IMPROVEMENT: Now uses proper walk-forward validation instead of simple AIC optimization
#
# OLD METHOD (Inferior):
# - Used all 4 years of data for training
# - Selected parameters based on AIC (model fit, not prediction accuracy)
# - No validation on unseen data
# - Risk of overfitting
#
# NEW METHOD (Industry Standard):
# - Downloads 4 years of data
# - Uses last 250 days for out-of-sample validation
# - For each parameter combination: runs 250 forecasting tests
# - Selects parameters with best actual prediction accuracy
# - Prevents overfitting, optimizes for real trading performance
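#
# Illustrative sketch (comment only; exact day counts depend on how many
# trading days yfinance returns for the 4y window -- the figures below follow
# the worked example printed during optimization):
#
#   history:   [ ~753 training days ][ 250 validation days ]
#   test 1:    fit on days 1..753   -> forecast day 754  -> record |error|
#   test 2:    fit on days 1..754   -> forecast day 755  -> record |error|
#   ...
#   test 250:  fit on days 1..1002  -> forecast day 1003 -> record |error|
#
# The (p, d, q) order with the lowest mean absolute error across these
# one-step-ahead forecasts is selected and cached per ticker.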
import pandas as pd
import numpy as np
import yfinance as yf
import json
import warnings
from datetime import datetime, timedelta
from pathlib import Path
from typing import Tuple, Dict, Any, Optional
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
import itertools
warnings.filterwarnings('ignore')
class ARIMAModelManager:
"""Fixed ARIMA model with proper ticker-specific caching"""
def __init__(self, cache_dir: str = "cache/arima_params"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
# Debug: Print ARIMA implementation info
print(f"๐ ARIMA Debug Info:")
print(f" statsmodels version: {self._get_statsmodels_version()}")
print(f" ARIMA class: {ARIMA}")
def _get_statsmodels_version(self):
"""Get statsmodels version for debugging"""
try:
import statsmodels
return statsmodels.__version__
        except Exception:
            return "unknown"
def test_arima_basic(self, ticker: str) -> bool:
"""Test basic ARIMA functionality for debugging"""
try:
print(f"๐งช Testing basic ARIMA functionality for {ticker}...")
# Download small amount of data for testing
import yfinance as yf
data = yf.download(ticker, period="1mo", progress=False)
if data.empty:
print(f" โ No data available for {ticker}")
return False
close_prices = data['Close'].dropna()
print(f" โ
Downloaded {len(close_prices)} data points")
# Test simple ARIMA(1,1,1)
model = ARIMA(close_prices, order=(1, 1, 1))
print(f" โ
ARIMA model created: {type(model)}")
# Test fitting with no parameters
fitted_model = model.fit()
print(f" โ
Model fitted successfully: {type(fitted_model)}")
# Test forecasting
forecast = fitted_model.forecast(steps=1)
print(f" โ
Forecast generated: {forecast}")
return True
except Exception as e:
print(f" โ ARIMA test failed: {e}")
return False
def _get_cache_file_path(self, ticker: str) -> Path:
"""Get ticker-specific cache file path"""
return self.cache_dir / f"{ticker}_arima_params.json"
def _load_cached_parameters(self, ticker: str) -> Optional[Dict[str, Any]]:
"""Load cached parameters ONLY for the specific ticker"""
cache_file = self._get_cache_file_path(ticker)
try:
if cache_file.exists():
with open(cache_file, 'r') as f:
cache_data = json.load(f)
# Validate cache is recent (within 30 days)
cache_date = datetime.fromisoformat(cache_data['timestamp'])
if (datetime.now() - cache_date).days < 30:
validation_method = cache_data.get('validation_method', 'unknown')
if validation_method == 'walk_forward_250_day':
mae = cache_data.get('walk_forward_mae', 'N/A')
success_rate = cache_data.get('success_rate', 'N/A')
print(f"โ
{ticker}: Using cached ARIMA parameters {cache_data['optimal_params']}")
print(f" Walk-forward validated: MAE=${mae}, Success rate={success_rate:.1%}" if success_rate != 'N/A' else f" Walk-forward validated: MAE=${mae}")
else:
print(f"โ
{ticker}: Using cached ARIMA parameters {cache_data['optimal_params']} (AIC-based)")
return cache_data
else:
print(f"โฐ {ticker}: Cache expired ({(datetime.now() - cache_date).days} days old)")
return None
else:
print(f"๐ {ticker}: No cache file found")
return None
except Exception as e:
print(f"โ ๏ธ {ticker}: Cache read error: {e}")
return None
def _save_optimized_parameters(self, ticker: str, optimal_params: Tuple[int, int, int],
model_metrics: Dict[str, float]) -> None:
"""Save ticker-specific optimized parameters"""
cache_file = self._get_cache_file_path(ticker)
cache_data = {
'ticker': ticker,
'optimal_params': optimal_params,
'validation_method': model_metrics.get('validation_method', 'unknown'),
'walk_forward_mae': model_metrics.get('walk_forward_mae', None),
'walk_forward_rmse': model_metrics.get('walk_forward_rmse', None),
'success_rate': model_metrics.get('success_rate', None),
'total_forecasting_tests': model_metrics.get('total_forecasting_tests', None),
'aic': model_metrics.get('aic', None),
'bic': model_metrics.get('bic', None),
'timestamp': datetime.now().isoformat(),
'data_period': '4y',
'optimization_date': datetime.now().strftime('%Y-%m-%d')
}
try:
with open(cache_file, 'w') as f:
json.dump(cache_data, f, indent=2)
print(f"๐พ {ticker}: Saved optimized parameters {optimal_params}")
except Exception as e:
print(f"โ {ticker}: Failed to save cache: {e}")
def _download_stock_data(self, ticker: str, period: str = "4y") -> pd.DataFrame:
"""Download 4 years of stock data"""
print(f"๐ฅ {ticker}: Downloading {period} of data...")
try:
data = yf.download(ticker, period=period, progress=False)
if data.empty:
raise ValueError(f"No data available for {ticker}")
print(f"โ
{ticker}: Downloaded {len(data)} data points ({data.index[0].date()} to {data.index[-1].date()})")
return data
except Exception as e:
print(f"โ {ticker}: Data download failed: {e}")
raise
def _optimize_arima_parameters(self, data: pd.DataFrame, ticker: str) -> Tuple[Tuple[int, int, int], Dict[str, float]]:
"""Find optimal ARIMA parameters using walk-forward validation (250 forecasting tests)"""
print(f"๐ {ticker}: Optimizing ARIMA parameters using walk-forward validation...")
close_prices = data['Close'].dropna()
test_days = 250 # Last 250 days for out-of-sample testing
if len(close_prices) < test_days + 100:
print(f"โ ๏ธ {ticker}: Not enough data for walk-forward validation, using simple optimization")
return self._optimize_arima_parameters_simple(data, ticker)
# Split data: use last 250 days for validation
train_end_idx = len(close_prices) - test_days
print(f"๐ {ticker}: Training data: {train_end_idx} days, Validation period: {test_days} days")
print(f" Training: {close_prices.index[0].date()} to {close_prices.index[train_end_idx-1].date()}")
print(f" Testing: {close_prices.index[train_end_idx].date()} to {close_prices.index[-1].date()}")
# Parameter ranges for testing
p_values = range(0, 4) # AR terms: 0,1,2,3
d_values = range(0, 3) # Differencing: 0,1,2
q_values = range(0, 4) # MA terms: 0,1,2,3
best_mae = float('inf')
best_params = None
optimization_results = []
total_combinations = len(p_values) * len(d_values) * len(q_values)
current_combination = 0
print(f"๐ฏ {ticker}: Testing {total_combinations} parameter combinations with {test_days} forecasts each...")
print(f" Each parameter combination runs {test_days} walk-forward tests:")
print(f" ๐
Day 1: Train on 753 days โ Predict day 754 โ Measure error")
print(f" ๐
Day 2: Train on 754 days โ Predict day 755 โ Measure error")
print(f" ๐
...")
print(f" ๐
Day 250: Train on 1002 days โ Predict day 1003 โ Measure error")
print(f" ๐ Best parameters = Lowest total prediction error across all 250 tests")
print(f" Note: Skipping invalid models like ARIMA(0,0,0)")
print("")
for p, d, q in itertools.product(p_values, d_values, q_values):
current_combination += 1
# Skip problematic parameter combinations
if p == 0 and d == 0 and q == 0: # ARIMA(0,0,0) is just white noise
print(f" [{current_combination:2d}/{total_combinations}] Skipping ARIMA({p},{d},{q}) - invalid model")
continue
try:
print(f" [{current_combination:2d}/{total_combinations}] Testing ARIMA({p},{d},{q}) - Walk-forward validation:")
print(f" Running {test_days} forecasting tests...", end="")
# Walk-forward validation: 250 forecasting tests
forecast_errors = []
successful_forecasts = 0
failed_forecasts = 0
first_error_logged = False
first_error_msg = ""
for test_day in range(test_days):
# Show progress every 50 days
if test_day > 0 and test_day % 50 == 0:
print(f" {test_day}/{test_days}", end="")
try:
# Training data: from start up to current validation day
train_end = train_end_idx + test_day
train_data = close_prices.iloc[:train_end]
# Actual price for the day we're trying to predict
actual_price = float(close_prices.iloc[train_end])
# Skip if we don't have enough training data
if len(train_data) < 10:
failed_forecasts += 1
continue
# Show detailed progress for first few parameter combinations
if current_combination <= 3 and test_day < 5:
print(f"\n Day {test_day+1}: Training on {len(train_data)} days, predicting ${actual_price:.2f}", end="")
# Fit ARIMA model on training data only
# Use basic fit() with no method parameter
model = ARIMA(train_data, order=(p, d, q))
# Debug: Show what fit methods are available for first combination
if current_combination == 2 and test_day == 0:
print(f"\n ๐ Debug: ARIMA.fit signature: {model.fit.__doc__[:200] if model.fit.__doc__ else 'No docstring'}")
fitted_model = model.fit()
# Forecast next day (the validation day)
forecast_result = fitted_model.forecast(steps=1)
# Handle different forecast result types
if hasattr(forecast_result, 'iloc'):
forecast_price = float(forecast_result.iloc[0])
elif isinstance(forecast_result, (list, np.ndarray)):
forecast_price = float(forecast_result[0])
else:
forecast_price = float(forecast_result)
# Show prediction for first few tests
if current_combination <= 3 and test_day < 5:
error = abs(forecast_price - actual_price)
print(f" โ ${forecast_price:.2f} (error: ${error:.2f})")
# Calculate prediction error
error = abs(forecast_price - actual_price)
forecast_errors.append(error)
successful_forecasts += 1
except Exception as e:
# Model failed for this day - skip it
failed_forecasts += 1
# Log first error for debugging
if not first_error_logged:
first_error_msg = str(e)[:100]
first_error_logged = True
continue
# Completed all 250 forecasting tests for this parameter combination
print(f" โ {test_days}/{test_days} complete")
# Require at least 80% successful forecasts
success_rate = successful_forecasts / test_days
if success_rate < 0.8:
if first_error_logged:
print(f" โ ARIMA({p},{d},{q}): Only {successful_forecasts}/{test_days} successful forecasts ({success_rate:.1%})")
print(f" Sample error: {first_error_msg}")
else:
print(f" โ ARIMA({p},{d},{q}): Only {successful_forecasts}/{test_days} successful forecasts ({success_rate:.1%})")
continue
# Calculate performance metrics
mae = np.mean(forecast_errors)
rmse = np.sqrt(np.mean([e**2 for e in forecast_errors]))
# Skip MAPE calculation to avoid division issues
# mape = np.mean([abs(e)/actual for e, actual in zip(forecast_errors, close_prices.iloc[train_end_idx:train_end_idx+len(forecast_errors)])])
result = {
'params': (p, d, q),
'mae': mae,
'rmse': rmse,
'successful_forecasts': successful_forecasts,
'success_rate': success_rate,
'forecast_errors': forecast_errors
}
optimization_results.append(result)
print(f" โ
ARIMA({p},{d},{q}): MAE=${mae:.2f}, RMSE=${rmse:.2f}, Success={successful_forecasts}/{test_days} ({success_rate:.1%})")
# Check if this is the best model so far
if mae < best_mae:
best_mae = mae
best_params = (p, d, q)
print(f" ๐ New best model! MAE=${mae:.2f}")
except Exception as e:
print(f" โ ARIMA({p},{d},{q}): Complete failure - {str(e)[:80]}...")
continue
if best_params is None:
print(f"โ ๏ธ {ticker}: All walk-forward tests failed, using fallback approach")
return self._optimize_arima_parameters_simple(data, ticker)
# Find the best result details
best_result = next(r for r in optimization_results if r['params'] == best_params)
metrics = {
'walk_forward_mae': best_mae,
'walk_forward_rmse': best_result['rmse'],
'successful_forecasts': best_result['successful_forecasts'],
'success_rate': best_result['success_rate'],
'test_days': test_days,
'total_combinations_tested': len(optimization_results),
'total_forecasting_tests': sum(r['successful_forecasts'] for r in optimization_results),
'validation_method': 'walk_forward_250_day'
}
print(f"โ
{ticker}: Walk-forward optimization complete!")
print(f" ๐ Best model: ARIMA{best_params}")
print(f" ๐ Out-of-sample MAE: ${best_mae:.2f}")
print(f" ๐ Out-of-sample RMSE: ${best_result['rmse']:.2f}")
print(f" ๐ฏ Success rate: {best_result['success_rate']:.1%}")
print(f" ๐ฌ Total forecasting tests completed: {metrics['total_forecasting_tests']:,}")
print(f" โฑ๏ธ Successfully tested {len(optimization_results)} parameter combinations")
print(f" ๐ฏ This model was optimized using {test_days} out-of-sample forecasting tests")
return best_params, metrics
def _optimize_arima_parameters_simple(self, data: pd.DataFrame, ticker: str) -> Tuple[Tuple[int, int, int], Dict[str, float]]:
"""Fallback: Simple AIC-based optimization (for when walk-forward fails)"""
print(f"๐ {ticker}: Using simple AIC-based optimization...")
close_prices = data['Close'].dropna()
# Parameter ranges
p_values = range(0, 4)
d_values = range(0, 3)
q_values = range(0, 4)
best_aic = float('inf')
best_params = None
for p, d, q in itertools.product(p_values, d_values, q_values):
try:
model = ARIMA(close_prices, order=(p, d, q))
fitted_model = model.fit() # Use basic fit() with no parameters
if fitted_model.aic < best_aic:
best_aic = fitted_model.aic
best_params = (p, d, q)
except Exception:
continue
if best_params is None:
best_params = (1, 1, 1)
best_aic = 0
metrics = {
'aic': best_aic,
'validation_method': 'simple_aic'
}
print(f"โ
{ticker}: Simple optimization complete - ARIMA{best_params} (AIC: {best_aic:.2f})")
return best_params, metrics
def get_arima_forecast(self, ticker: str, use_optimized_params: bool = True,
force_recalculate: bool = False) -> Tuple[float, np.ndarray, pd.DataFrame]:
"""
Get ARIMA forecast with proper ticker-specific caching
Returns:
forecast_price: Next day's predicted price
residuals: Model residuals for XGBoost enhancement
data: Historical price data used
"""
print(f"\n๐ {ticker}: Starting ARIMA analysis...")
# Download fresh data (always download to ensure we have latest data)
data = self._download_stock_data(ticker)
close_prices = data['Close'].dropna()
# Determine optimal parameters
if use_optimized_params and not force_recalculate:
# Try to load cached parameters for THIS specific ticker
cached_data = self._load_cached_parameters(ticker)
if cached_data:
optimal_params = tuple(cached_data['optimal_params'])
metrics = cached_data # Use cached metrics
print(f"๐ {ticker}: Using cached parameters ARIMA{optimal_params}")
else:
# No cache for this ticker, optimize
print(f"๐ {ticker}: No cached parameters found, optimizing...")
optimal_params, metrics = self._optimize_arima_parameters(data, ticker)
self._save_optimized_parameters(ticker, optimal_params, metrics)
else:
# Force fresh optimization
if force_recalculate:
print(f"๐ {ticker}: Force recalculating parameters...")
else:
print(f"๐ {ticker}: Optimizing parameters...")
optimal_params, metrics = self._optimize_arima_parameters(data, ticker)
self._save_optimized_parameters(ticker, optimal_params, metrics)
# Fit final model with optimal parameters
print(f"๐ฏ {ticker}: Fitting final ARIMA{optimal_params} model...")
try:
model = ARIMA(close_prices, order=optimal_params)
fitted_model = model.fit() # Use basic fit() with no parameters
# Generate forecast
forecast_result = fitted_model.forecast(steps=1)
forecast_price = float(forecast_result.iloc[0])
# Get residuals for XGBoost enhancement
residuals = fitted_model.resid
# Model diagnostics
last_price = float(close_prices.iloc[-1])
expected_change = (forecast_price - last_price) / last_price
print(f"โ
{ticker}: ARIMA forecast complete")
print(f" Last price: ${last_price:.2f}")
print(f" Forecast: ${forecast_price:.2f} ({expected_change:+.2%})")
# Show validation method in results
            if metrics.get('walk_forward_mae') is not None:
print(f" Model: ARIMA{optimal_params} (Walk-forward validated, MAE=${metrics['walk_forward_mae']:.2f})")
print(f" Validation: {metrics['total_forecasting_tests']:,} forecasting tests, {metrics['success_rate']:.1%} success rate")
else:
print(f" Model: ARIMA{optimal_params}, AIC: {fitted_model.aic:.2f}")
return forecast_price, residuals, data
except Exception as e:
print(f"โ {ticker}: ARIMA model fitting failed: {e}")
raise
# Global instance for backward compatibility
arima_manager = ARIMAModelManager()
# Legacy class for backward compatibility
class ARIMAForecaster:
"""Legacy class for backward compatibility"""
def __init__(self):
self.manager = arima_manager
def forecast(self, ticker: str, use_optimized_params: bool = True,
force_recalculate: bool = False) -> Tuple[float, np.ndarray, pd.DataFrame]:
"""Legacy forecast method"""
return self.manager.get_arima_forecast(ticker, use_optimized_params, force_recalculate)
def get_forecast(self, ticker: str) -> Tuple[float, np.ndarray, pd.DataFrame]:
"""Legacy get_forecast method"""
return self.manager.get_arima_forecast(ticker)
def get_arima_forecast(ticker: str, use_optimized_params: bool = True,
force_recalculate: bool = False) -> Tuple[float, np.ndarray, pd.DataFrame]:
"""
FIXED: Get ARIMA forecast with proper ticker-specific caching
This function now ensures:
1. Each ticker has its own parameter cache
2. Parameters are never shared between tickers
3. Fresh 4-year data is always downloaded
4. Optimization results are ticker-specific
"""
return arima_manager.get_arima_forecast(ticker, use_optimized_params, force_recalculate)
def get_enhanced_arima_forecast(ticker: str) -> Tuple[float, np.ndarray, pd.DataFrame]:
"""Enhanced ARIMA forecast with forced parameter optimization"""
    return get_arima_forecast(ticker, use_optimized_params=True, force_recalculate=True)
def get_validation_statistics(ticker: str) -> Optional[Dict[str, Any]]:
"""Get walk-forward validation statistics for a ticker"""
cache_file = arima_manager._get_cache_file_path(ticker)
try:
if cache_file.exists():
with open(cache_file, 'r') as f:
cache_data = json.load(f)
if cache_data.get('validation_method') == 'walk_forward_250_day':
return {
'ticker': ticker,
'optimal_params': cache_data['optimal_params'],
'walk_forward_mae': cache_data.get('walk_forward_mae'),
'walk_forward_rmse': cache_data.get('walk_forward_rmse'),
'success_rate': cache_data.get('success_rate'),
'total_forecasting_tests': cache_data.get('total_forecasting_tests'),
'optimization_date': cache_data.get('optimization_date')
}
return None
except Exception:
return None
def compare_validation_methods(ticker: str) -> None:
"""Show the difference between old AIC method and new walk-forward method"""
stats = get_validation_statistics(ticker)
if stats:
print(f"\n๐ VALIDATION COMPARISON FOR {ticker}:")
print(f"๐ Walk-Forward Method (Current):")
print(f" Best Parameters: ARIMA{stats['optimal_params']}")
print(f" Out-of-sample MAE: ${stats['walk_forward_mae']:.2f}")
print(f" Success Rate: {stats['success_rate']:.1%}")
print(f" Total Forecasting Tests: {stats['total_forecasting_tests']:,}")
print(f" โ
Optimized for REAL prediction accuracy")
print(f"")
print(f"๐ฐ๏ธ Old AIC Method (Previous):")
print(f" Selected parameters based on model fit to historical data")
print(f" No validation on unseen data")
print(f" โ Risk of overfitting, not optimized for prediction")
else:
print(f"No walk-forward validation data available for {ticker}")
def test_arima_setup(ticker: str = "AAPL") -> bool:
"""Test ARIMA setup and configuration"""
return arima_manager.test_arima_basic(ticker)
def clear_cache_for_ticker(ticker: str) -> bool:
"""Clear cached parameters for specific ticker"""
cache_file = arima_manager._get_cache_file_path(ticker)
try:
if cache_file.exists():
cache_file.unlink()
print(f"๐๏ธ {ticker}: Cache cleared")
return True
else:
print(f"โน๏ธ {ticker}: No cache to clear")
return False
except Exception as e:
print(f"โ {ticker}: Failed to clear cache: {e}")
return False
def clear_all_cache() -> int:
"""Clear all cached parameters"""
cache_dir = arima_manager.cache_dir
cleared_count = 0
try:
for cache_file in cache_dir.glob("*_arima_params.json"):
cache_file.unlink()
cleared_count += 1
print(f"๐๏ธ Cleared {cleared_count} cache files")
return cleared_count
except Exception as e:
print(f"โ Failed to clear cache: {e}")
return 0
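# Minimal usage sketch (assumption: running this module directly as a script is
# not part of the original API; the ticker choice is arbitrary and a network
# connection to Yahoo Finance is required for the yfinance downloads).
if __name__ == "__main__":
    demo_ticker = "AAPL"
    # Sanity-check that statsmodels ARIMA can fit and forecast on a small sample
    if test_arima_setup(demo_ticker):
        # Full pipeline: download 4y of data, pick/cache (p, d, q), forecast one day ahead
        price, resid, history = get_arima_forecast(demo_ticker)
        print(f"{demo_ticker}: next-day forecast ${price:.2f} "
              f"({len(history)} rows of history, {len(resid)} residuals)")
        # Show cached walk-forward validation statistics, if any
        compare_validation_methods(demo_ticker)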