# preprocess_arima_params.py • 30.9 kB
# preprocess_arima_params.py - ARIMA Parameter Optimization for Portfolio Stocks
import ast
import json
import logging
import time
import warnings
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Tuple, Optional

import numpy as np
import pandas as pd
import yfinance as yf
from sklearn.metrics import mean_squared_error, mean_absolute_error
from statsmodels.tsa.arima.model import ARIMA

try:
    from pmdarima import auto_arima
    PMDARIMA_AVAILABLE = True
except ImportError:
    PMDARIMA_AVAILABLE = False
    print("⚠️ pmdarima not available, using grid search instead")

warnings.filterwarnings('ignore')
class ARIMAParameterOptimizer:
    """
    Optimize ARIMA parameters for each stock using rolling window validation.

    For every ticker the closing-price series is split into overlapping
    (train, test) windows of ``rolling_window`` and ``forecast_horizon``
    points.  Candidate ARIMA orders are fitted on each training window and
    scored by forecast error on the following test window; the order with
    the lowest average RMSE is reported as the best one.
    """

    # Result fields whose dict keys are ARIMA order tuples.  JSON objects only
    # allow string keys, so these fields are converted on save and on load.
    _ORDER_KEYED_FIELDS = ('order_frequency', 'avg_rmse_by_order', 'order_stats')

    def __init__(self,
                 rolling_window: int = 750,
                 forecast_horizon: int = 5,
                 n_iterations: int = 50,
                 data_period: str = "4y",
                 results_dir: str = "arima_optimization"):
        """
        Store the settings, create the results directory and set up logging.

        Args:
            rolling_window: Number of points in each training window.
            forecast_horizon: Number of points forecast and scored per window.
            n_iterations: Upper bound on the number of rolling windows used.
            data_period: Period string passed to ``yf.download``.
            results_dir: Directory for the log file and JSON result files.
        """
        self.rolling_window = rolling_window
        self.forecast_horizon = forecast_horizon
        self.n_iterations = n_iterations
        self.data_period = data_period
        self.results_dir = Path(results_dir)
        # parents=True so a nested results directory can be created in one go.
        self.results_dir.mkdir(parents=True, exist_ok=True)
        # Setup logging
        self.setup_logging()
        # ARIMA parameter search space
        self.p_range = range(0, 4)  # AR terms
        self.d_range = range(0, 3)  # Differencing
        self.q_range = range(0, 4)  # MA terms
        print(f"🔧 ARIMA Parameter Optimizer Initialized")
        print(f" Rolling Window: {self.rolling_window} days")
        print(f" Forecast Horizon: {self.forecast_horizon} days")
        print(f" Iterations: {self.n_iterations}")
        print(f" Data Period: {self.data_period}")
        print(f" pmdarima Available: {PMDARIMA_AVAILABLE}")

    def setup_logging(self):
        """Setup logging configuration (log file in ``results_dir`` plus console)."""
        log_file = self.results_dir / "arima_optimization.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                # Log messages contain emoji; an explicit UTF-8 encoding keeps
                # the file handler from failing on non-UTF-8 default locales.
                logging.FileHandler(log_file, encoding='utf-8'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def _max_iterations(self, n_points: int) -> int:
        """Return how many rolling windows to evaluate for a series of ``n_points``."""
        # A series of exactly rolling_window + forecast_horizon points still
        # holds one complete (train, test) pair, hence the "+ 1".
        available = n_points - self.rolling_window - self.forecast_horizon + 1
        return max(0, min(self.n_iterations, available))

    def download_stock_data(self, ticker: str) -> Optional[pd.Series]:
        """
        Download and prepare stock data.

        Returns:
            The non-null closing prices with a fresh 0..n-1 index, or ``None``
            when the download fails, is empty, or has fewer than
            ``rolling_window + forecast_horizon`` points.
        """
        try:
            self.logger.info(f"Downloading {self.data_period} data for {ticker}")
            data = yf.download(ticker, period=self.data_period, progress=False)
            if data.empty:
                self.logger.error(f"No data found for {ticker}")
                return None
            close_prices = data['Close']
            # NOTE(review): with MultiIndex columns (the layout newer yfinance
            # releases appear to use even for one ticker) data['Close'] is a
            # one-column DataFrame; reduce it to the Series this method promises.
            if isinstance(close_prices, pd.DataFrame):
                close_prices = close_prices.iloc[:, 0]
            close_prices = close_prices.dropna()
            if len(close_prices) < self.rolling_window + self.forecast_horizon:
                self.logger.error(f"Insufficient data for {ticker}: {len(close_prices)} points")
                return None
            self.logger.info(f"Downloaded {len(close_prices)} data points for {ticker}")
            return close_prices.reset_index(drop=True)
        except Exception as e:
            # Best effort: any download/parsing problem just skips this ticker.
            self.logger.error(f"Error downloading data for {ticker}: {e}")
            return None

    def optimize_with_pmdarima(self, close_prices: pd.Series, ticker: str) -> Dict:
        """
        Optimize ARIMA parameters using pmdarima auto_arima.

        Falls back to :meth:`optimize_with_grid_search` when pmdarima is not
        installed.  Each rolling window lets ``auto_arima`` pick an order,
        which is then scored on the following ``forecast_horizon`` points.
        """
        if not PMDARIMA_AVAILABLE:
            return self.optimize_with_grid_search(close_prices, ticker)
        self.logger.info(f"Starting pmdarima optimization for {ticker}")
        errors = []
        order_counts = defaultdict(int)
        max_iterations = self._max_iterations(len(close_prices))
        for i in range(max_iterations):
            try:
                # Rolling window data
                split = i + self.rolling_window
                train = close_prices[i:split]
                test = close_prices[split:split + self.forecast_horizon]
                if len(test) < self.forecast_horizon:
                    break
                # Auto ARIMA model selection
                model = auto_arima(
                    train,
                    start_p=0, start_q=0,
                    max_p=3, max_q=3,
                    d=None,  # Let auto_arima determine differencing
                    seasonal=False,
                    stepwise=True,
                    suppress_warnings=True,
                    error_action='ignore',
                    trace=False
                )
                # Forecast and evaluate
                forecast = model.predict(n_periods=self.forecast_horizon)
                mse = mean_squared_error(test, forecast)
                mae = mean_absolute_error(test, forecast)
                order = model.order
                errors.append({
                    'order': order,
                    'mse': mse,
                    'rmse': np.sqrt(mse),
                    'mae': mae,
                    'iteration': i
                })
                order_counts[order] += 1
                if i % 10 == 0:
                    self.logger.info(f"{ticker} - Iteration {i}: ARIMA{order}, RMSE={np.sqrt(mse):.4f}")
            except Exception as e:
                # A single failed window must not abort the whole ticker.
                self.logger.warning(f"{ticker} - Iteration {i} failed: {e}")
                continue
        return self._analyze_results(errors, order_counts, ticker, "pmdarima")

    def optimize_with_grid_search(self, close_prices: pd.Series, ticker: str) -> Dict:
        """
        Optimize ARIMA parameters using grid search.

        Every (p, d, q) combination from the configured ranges is fitted on a
        sample of rolling windows; the order with the lowest mean RMSE among
        those with enough successful fits wins, otherwise ARIMA(1,1,1) is used.
        """
        self.logger.info(f"Starting grid search optimization for {ticker}")
        # Generate parameter combinations
        param_combinations = [
            (p, d, q) for p in self.p_range for d in self.d_range for q in self.q_range
        ]
        order_performance = defaultdict(list)
        iteration_count = 0
        max_iterations = self._max_iterations(len(close_prices))
        # Sample iterations: the stride keeps the number of windows near 20.
        step = max(1, max_iterations // 20)
        for i in range(0, max_iterations, step):
            split = i + self.rolling_window
            train = close_prices[i:split]
            test = close_prices[split:split + self.forecast_horizon]
            if len(test) < self.forecast_horizon:
                break
            for order in param_combinations:
                try:
                    model = ARIMA(train, order=order)
                    fitted_model = model.fit()
                    forecast = fitted_model.forecast(steps=self.forecast_horizon)
                    mse = mean_squared_error(test, forecast)
                    order_performance[order].append({
                        'mse': mse,
                        'rmse': np.sqrt(mse),
                        'mae': mean_absolute_error(test, forecast),
                        'aic': fitted_model.aic,
                        'iteration': iteration_count
                    })
                except Exception as e:
                    # Individual orders are expected to fail now and then;
                    # skip them but leave a trace for debugging.
                    self.logger.debug(f"{ticker} - ARIMA{order} failed at window {i}: {e}")
                    continue
            iteration_count += 1
            if iteration_count % 5 == 0:
                self.logger.info(f"{ticker} - Completed {iteration_count} grid search iterations")
        # Analyze grid search results
        best_order = None
        best_avg_rmse = float('inf')
        order_stats = {}
        # Require at least 3 successful evaluations, but never more than the
        # number of windows that were actually sampled.
        min_evaluations = min(3, iteration_count)
        for order, performances in order_performance.items():
            if len(performances) < min_evaluations:
                continue
            rmses = [p['rmse'] for p in performances]
            avg_rmse = np.mean(rmses)
            order_stats[order] = {
                'avg_rmse': avg_rmse,
                'avg_aic': np.mean([p['aic'] for p in performances]),
                'count': len(performances),
                'std_rmse': np.std(rmses)
            }
            if avg_rmse < best_avg_rmse:
                best_avg_rmse = avg_rmse
                best_order = order
        if best_order is None:
            # Fallback to (1,1,1)
            best_order = (1, 1, 1)
            self.logger.warning(f"{ticker} - Grid search failed, using fallback ARIMA{best_order}")
        return {
            'ticker': ticker,
            'method': 'grid_search',
            'best_order': best_order,
            # 0 marks "no measured RMSE"; the summary printer skips such values.
            'best_avg_rmse': order_stats.get(best_order, {}).get('avg_rmse', 0),
            'total_iterations': iteration_count,
            'order_stats': order_stats,
            'optimization_date': datetime.now().isoformat()
        }

    def _analyze_results(self, errors: List[Dict], order_counts: Dict, ticker: str, method: str) -> Dict:
        """
        Analyze optimization results and determine best parameters.

        Args:
            errors: One dict per successful window with at least ``order`` and ``rmse``.
            order_counts: How many windows selected each order.
            ticker: Symbol used in log messages and in the result.
            method: Name of the optimization method recorded in the result.

        Returns:
            Result dict whose ``best_order`` is the order with the lowest mean
            RMSE, or the ARIMA(1,1,1) fallback when ``errors`` is empty.
        """
        if not errors:
            self.logger.warning(f"{ticker} - No successful iterations, using fallback ARIMA(1,1,1)")
            return {
                'ticker': ticker,
                'method': method,
                'best_order': (1, 1, 1),
                'best_avg_rmse': 0,
                'total_iterations': 0,
                'order_frequency': {},
                'optimization_date': datetime.now().isoformat()
            }
        # Group errors by order
        rmse_by_order = defaultdict(list)
        for error in errors:
            rmse_by_order[error['order']].append(error['rmse'])
        # Calculate average RMSE for each order
        avg_rmse_by_order = {
            order: np.mean(rmses) for order, rmses in rmse_by_order.items()
        }
        # Find best order
        best_order = min(avg_rmse_by_order, key=avg_rmse_by_order.get)
        best_avg_rmse = avg_rmse_by_order[best_order]
        # Calculate order frequency (how often each order was selected)
        total_iterations = len(errors)
        order_frequency = {order: count / total_iterations for order, count in order_counts.items()}
        self.logger.info(f"{ticker} - Best ARIMA{best_order}, Avg RMSE: {best_avg_rmse:.4f}")
        self.logger.info(f"{ticker} - Order frequency: {dict(sorted(order_frequency.items(), key=lambda x: x[1], reverse=True)[:3])}")
        return {
            'ticker': ticker,
            'method': method,
            'best_order': best_order,
            'best_avg_rmse': best_avg_rmse,
            'total_iterations': total_iterations,
            'order_frequency': order_frequency,
            'avg_rmse_by_order': avg_rmse_by_order,
            'optimization_date': datetime.now().isoformat()
        }

    def optimize_single_stock(self, ticker: str) -> Optional[Dict]:
        """Optimize ARIMA parameters for a single stock; ``None`` if no data is available."""
        start_time = time.time()
        self.logger.info(f"🔧 Starting optimization for {ticker}")
        # Download data
        close_prices = self.download_stock_data(ticker)
        if close_prices is None:
            return None
        # Optimize parameters
        if PMDARIMA_AVAILABLE:
            result = self.optimize_with_pmdarima(close_prices, ticker)
        else:
            result = self.optimize_with_grid_search(close_prices, ticker)
        # Add timing information
        optimization_time = time.time() - start_time
        result['optimization_time_seconds'] = round(optimization_time, 2)
        self.logger.info(f"✅ {ticker} optimization completed in {optimization_time:.1f}s - Best: ARIMA{result['best_order']}")
        return result

    def optimize_portfolio(self, tickers: List[str], max_workers: int = 4) -> Dict[str, Dict]:
        """
        Optimize ARIMA parameters for a list of stocks.

        Args:
            tickers: Symbols to optimize.
            max_workers: Thread count; values of 1 or less run sequentially.

        Returns:
            Mapping of ticker to result dict for every ticker that succeeded.
        """
        self.logger.info(f"🚀 Starting portfolio optimization for {len(tickers)} stocks")
        results = {}
        # ThreadPoolExecutor rejects max_workers <= 0, so anything below 2
        # takes the sequential path.
        if max_workers <= 1:
            # Sequential processing
            for i, ticker in enumerate(tickers, 1):
                print(f"[{i:3d}/{len(tickers)}] Optimizing {ticker}...")
                result = self.optimize_single_stock(ticker)
                if result:
                    results[ticker] = result
        else:
            # Parallel processing
            print(f"🔄 Using {max_workers} workers for parallel optimization")
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Submit all tasks
                future_to_ticker = {
                    executor.submit(self.optimize_single_stock, ticker): ticker
                    for ticker in tickers
                }
                # Collect results as they complete
                for completed, future in enumerate(as_completed(future_to_ticker), 1):
                    ticker = future_to_ticker[future]
                    try:
                        result = future.result()
                        if result:
                            results[ticker] = result
                            print(f"[{completed:3d}/{len(tickers)}] ✅ {ticker}: ARIMA{result['best_order']} (RMSE: {result['best_avg_rmse']:.4f})")
                        else:
                            print(f"[{completed:3d}/{len(tickers)}] ❌ {ticker}: Optimization failed")
                    except Exception as e:
                        print(f"[{completed:3d}/{len(tickers)}] ❌ {ticker}: {e}")
        self.logger.info(f"Portfolio optimization completed: {len(results)}/{len(tickers)} successful")
        return results

    @staticmethod
    def _stringify_order_keys(mapping: Dict) -> Dict:
        """Return a copy of ``mapping`` with tuple keys replaced by their ``str()`` form."""
        return {
            (str(key) if isinstance(key, tuple) else key): value
            for key, value in mapping.items()
        }

    @staticmethod
    def _parse_order_key(key):
        """
        Turn a key such as ``"(1, 1, 1)"`` back into a tuple.

        Only Python literals are accepted (``ast.literal_eval``), so text read
        from a results file is never executed.  Anything that is not a
        parenthesised, hashable tuple literal is returned unchanged.
        """
        if not (isinstance(key, str) and key.startswith('(') and key.endswith(')')):
            return key
        try:
            parsed = ast.literal_eval(key)
            hash(parsed)  # a tuple containing e.g. a list cannot be a dict key
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            return key
        return parsed if isinstance(parsed, tuple) else key

    def save_optimization_results(self, results: Dict[str, Dict], filename: Optional[str] = None) -> str:
        """
        Save optimization results to JSON file.

        Args:
            results: Mapping of ticker to result dict; it is not modified.
            filename: File name inside ``results_dir``; a timestamped name is
                generated when omitted.

        Returns:
            The path of the written file as a string.
        """
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"arima_optimization_results_{timestamp}.json"
        filepath = self.results_dir / filename
        # Convert tuples to lists / strings for JSON serialization
        json_safe_results = {}
        for ticker, result in results.items():
            json_safe_result = result.copy()
            if isinstance(json_safe_result.get('best_order'), tuple):
                json_safe_result['best_order'] = list(json_safe_result['best_order'])
            # order_frequency, avg_rmse_by_order and order_stats use tuple keys
            for field in self._ORDER_KEYED_FIELDS:
                if field in json_safe_result:
                    json_safe_result[field] = self._stringify_order_keys(json_safe_result[field])
            json_safe_results[ticker] = json_safe_result
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(json_safe_results, f, indent=2)
        self.logger.info(f"📁 Results saved to {filepath}")
        return str(filepath)

    def load_optimization_results(self, filename: str) -> Dict[str, Dict]:
        """
        Load optimization results from JSON file.

        Reverses the conversions done by :meth:`save_optimization_results`:
        ``best_order`` lists become tuples again and stringified order keys
        are parsed back into tuples.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        filepath = self.results_dir / filename
        with open(filepath, 'r', encoding='utf-8') as f:
            results = json.load(f)
        for result in results.values():
            if isinstance(result.get('best_order'), list):
                result['best_order'] = tuple(result['best_order'])
            # Convert string keys back to tuples for the order-keyed fields
            for field in self._ORDER_KEYED_FIELDS:
                mapping = result.get(field)
                if isinstance(mapping, dict):
                    result[field] = {
                        self._parse_order_key(key): value
                        for key, value in mapping.items()
                    }
        return results

    def print_optimization_summary(self, results: Dict[str, Dict]):
        """Print a summary of optimization results: per-ticker table, common orders, RMSE stats."""
        if not results:
            print("❌ No optimization results to display")
            return
        print(f"\n📊 ARIMA OPTIMIZATION SUMMARY")
        print("=" * 70)
        print(f"{'Ticker':<8} {'Best Order':<12} {'Avg RMSE':<12} {'Iterations':<12} {'Time (s)':<10}")
        print("-" * 70)
        total_time = 0
        order_counts = defaultdict(int)
        for ticker, result in sorted(results.items()):
            best_order = result['best_order']
            # Orders read straight from JSON are lists, which cannot be dict keys.
            if isinstance(best_order, list):
                best_order = tuple(best_order)
            avg_rmse = result.get('best_avg_rmse', 0)
            iterations = result.get('total_iterations', 0)
            opt_time = result.get('optimization_time_seconds', 0)
            print(f"{ticker:<8} {str(best_order):<12} {avg_rmse:<12.4f} {iterations:<12} {opt_time:<10.1f}")
            total_time += opt_time
            order_counts[best_order] += 1
        print("-" * 70)
        print(f"📈 Total Optimization Time: {total_time:.1f} seconds")
        print(f"📊 Successfully Optimized: {len(results)} stocks")
        print(f"\n🏆 Most Common ARIMA Orders:")
        for order, count in sorted(order_counts.items(), key=lambda x: x[1], reverse=True)[:5]:
            percentage = (count / len(results)) * 100
            print(f" ARIMA{order}: {count} stocks ({percentage:.1f}%)")
        # Performance insights; an RMSE of 0 means "not measured" and is skipped.
        rmse_values = [r.get('best_avg_rmse', 0) for r in results.values() if r.get('best_avg_rmse', 0) > 0]
        if rmse_values:
            avg_rmse = np.mean(rmse_values)
            median_rmse = np.median(rmse_values)
            print(f"\n📊 RMSE Statistics:")
            print(f" Average RMSE: {avg_rmse:.4f}")
            print(f" Median RMSE: {median_rmse:.4f}")
            print(f" Best RMSE: {min(rmse_values):.4f}")
            print(f" Worst RMSE: {max(rmse_values):.4f}")
def load_portfolio_config(config_path: str = "config/trading_config.json") -> Dict:
    """
    Load portfolio configuration.

    Args:
        config_path: Path of the JSON configuration file.

    Returns:
        The parsed configuration.  When the file cannot be read, is not valid
        JSON, or its top level is not a JSON object, an error is printed and a
        built-in default with a single ``default`` portfolio is returned.
    """
    fallback = {"tickers": {"default": ["AAPL", "MSFT", "GOOGL"]}}
    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except (OSError, ValueError, TypeError) as e:
        # OSError: missing/unreadable file; ValueError: bad JSON or encoding;
        # TypeError: config_path is not path-like.
        print(f"❌ Error loading config: {e}")
        return fallback
    if not isinstance(config, dict):
        # Callers use config.get(...), so any other JSON root is unusable.
        print(f"❌ Error loading config: expected a JSON object, got {type(config).__name__}")
        return fallback
    return config
def get_all_unique_tickers(config: Dict) -> List[str]:
    """
    Collect every distinct ticker listed under the config's ``tickers`` section.

    Portfolios whose value is not a list are ignored, entries whose text
    starts with ``//`` (metadata) are dropped, and the result is sorted.
    """
    collected = set()
    for members in config.get("tickers", {}).values():
        if not isinstance(members, list):
            continue
        collected |= set(members)
    # Strip metadata entries before sorting
    return sorted(
        symbol for symbol in collected
        if not str(symbol).startswith('//')
    )
def show_available_portfolios(config: Dict) -> None:
    """
    Display available portfolios from config.

    Prints one numbered entry per portfolio with its stock count and a preview
    of up to three tickers, then the number of unique tickers overall and a
    few usage examples.  Entries starting with ``//`` are treated as metadata
    and are neither counted nor previewed, matching how tickers are filtered
    elsewhere in this file.
    """
    print("📊 AVAILABLE PORTFOLIOS")
    print("=" * 50)
    portfolios = config.get("tickers", {})
    if not portfolios:
        print("❌ No portfolios found in config")
        return
    total_unique_stocks = len(get_all_unique_tickers(config))
    for i, (portfolio_name, tickers) in enumerate(portfolios.items(), 1):
        if isinstance(tickers, list):
            # str() so a stray non-string entry cannot break ', '.join below.
            symbols = [str(t) for t in tickers if not str(t).startswith('//')]
            print(f" {i:2d}. {portfolio_name:<20} ({len(symbols):2d} stocks)")
            # Show first few tickers as preview
            preview_str = ', '.join(symbols[:3])
            if len(symbols) > 3:
                preview_str = f"{preview_str}, ... (+{len(symbols)-3} more)"
            print(f" {preview_str}")
        else:
            print(f" {i:2d}. {portfolio_name:<20} (invalid format)")
    print(f"\n🌐 Total unique stocks across all portfolios: {total_unique_stocks}")
    print(f"\n💡 Usage examples:")
    print(f" python preprocess_arima_params.py --portfolio tech_giants")
    print(f" python preprocess_arima_params.py --all")
    print(f" python preprocess_arima_params.py --tickers AAPL MSFT GOOGL")
def main():
    """
    Main function for ARIMA parameter optimization.

    Parses the command line, then either lists portfolios, displays a saved
    results file, or optimizes the selected tickers and saves the results.
    Target precedence is ``--tickers``, then ``--all``, then ``--portfolio``;
    the output file name is labelled after the target that was actually used.
    """
    import argparse
    parser = argparse.ArgumentParser(
        description="ARIMA Parameter Optimization for Portfolio Stocks",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python preprocess_arima_params.py # Show available portfolios
python preprocess_arima_params.py --portfolio tech_giants # Optimize specific portfolio
python preprocess_arima_params.py --all # Optimize ALL stocks
python preprocess_arima_params.py --tickers AAPL MSFT GOOGL # Optimize specific stocks
python preprocess_arima_params.py --portfolio large_cap --workers 2 # Custom settings
python preprocess_arima_params.py --load results_file.json # Load previous results
"""
    )
    parser.add_argument('--portfolio', type=str,
                        help='Specific portfolio name from config (e.g., tech_giants, large_cap)')
    parser.add_argument('--all', action='store_true',
                        help='Optimize ALL stocks across all portfolios')
    parser.add_argument('--tickers', nargs='+',
                        help='Specific tickers to optimize (space-separated)')
    parser.add_argument('--window', type=int, default=750,
                        help='Rolling window size in days (default: 750)')
    parser.add_argument('--horizon', type=int, default=5,
                        help='Forecast horizon in days (default: 5)')
    parser.add_argument('--iterations', type=int, default=50,
                        help='Number of optimization iterations (default: 50)')
    parser.add_argument('--workers', type=int, default=4,
                        help='Number of parallel workers (default: 4)')
    parser.add_argument('--load', type=str,
                        help='Load and display previous results from JSON file')
    parser.add_argument('--list-portfolios', action='store_true',
                        help='List available portfolios and exit')
    args = parser.parse_args()
    # A worker count below 1 would divide by zero in the time estimate and is
    # rejected by the thread pool, so clamp it.
    workers = max(1, args.workers)
    # Load configuration
    config = load_portfolio_config()
    # Handle list portfolios option
    if args.list_portfolios:
        show_available_portfolios(config)
        return
    # Initialize optimizer
    optimizer = ARIMAParameterOptimizer(
        rolling_window=args.window,
        forecast_horizon=args.horizon,
        n_iterations=args.iterations
    )
    if args.load:
        # Load and display previous results
        try:
            results = optimizer.load_optimization_results(args.load)
            optimizer.print_optimization_summary(results)
        except Exception as e:
            print(f"❌ Error loading results: {e}")
        return
    # Determine tickers to optimize, plus the label used in the output file name
    tickers = []
    optimization_description = ""
    target_label = "custom"
    if args.tickers:
        # Specific tickers provided
        tickers = args.tickers
        optimization_description = f"specified tickers: {', '.join(tickers)}"
    elif args.all:
        # All stocks across all portfolios
        tickers = get_all_unique_tickers(config)
        optimization_description = f"ALL stocks across all portfolios ({len(tickers)} unique stocks)"
        target_label = "all_stocks"
    elif args.portfolio:
        # Specific portfolio
        portfolio_tickers = config.get("tickers", {}).get(args.portfolio, [])
        # A non-list value (e.g. a comment string) is not a usable portfolio.
        if not isinstance(portfolio_tickers, list) or not portfolio_tickers:
            print(f"❌ Portfolio '{args.portfolio}' not found in config")
            print(f"\n📊 Available portfolios:")
            for name in config.get("tickers", {}):
                print(f" - {name}")
            return
        tickers = portfolio_tickers
        optimization_description = f"'{args.portfolio}' portfolio ({len(tickers)} stocks)"
        target_label = args.portfolio
    else:
        # No arguments provided - show help
        print("🔧 ARIMA PARAMETER OPTIMIZATION")
        print("=" * 50)
        print("No optimization target specified.\n")
        show_available_portfolios(config)
        print(f"\n📖 Usage:")
        print(f" --portfolio PORTFOLIO_NAME : Optimize specific portfolio")
        print(f" --all : Optimize all stocks")
        print(f" --tickers TICKER1 TICKER2 : Optimize specific stocks")
        print(f" --list-portfolios : Show available portfolios")
        print(f" --load FILENAME.json : Load previous results")
        return
    if not tickers:
        print("❌ No tickers to optimize")
        return
    # Validate tickers are not empty and drop '//' metadata entries
    tickers = [t for t in tickers if t and not str(t).startswith('//')]
    if not tickers:
        print("❌ No valid tickers found after filtering")
        return
    # Show optimization plan
    print(f"🔧 ARIMA PARAMETER OPTIMIZATION")
    print("=" * 60)
    print(f"📊 Target: {optimization_description}")
    print(f"🔧 Settings:")
    print(f" Rolling Window: {args.window} days")
    print(f" Forecast Horizon: {args.horizon} days")
    print(f" Iterations: {args.iterations}")
    print(f" Parallel Workers: {workers}")
    print(f" Stocks to optimize: {len(tickers)}")
    if len(tickers) <= 10:
        print(f" Tickers: {', '.join(tickers)}")
    else:
        print(f" First 10 tickers: {', '.join(tickers[:10])}, ... (+{len(tickers)-10} more)")
    # Estimate time
    estimated_time_per_stock = 30  # seconds
    estimated_total_time = (len(tickers) * estimated_time_per_stock) / workers
    print(f"🕐 Estimated time: {estimated_total_time/60:.1f} minutes")
    # Confirm for large optimizations
    if len(tickers) > 20:
        try:
            response = input(f"\n⚠️ You're about to optimize {len(tickers)} stocks. Continue? (y/N): ")
        except EOFError:
            # No interactive stdin: treat as the default answer "N".
            response = ""
        if response.strip().lower() not in ['y', 'yes']:
            print("❌ Optimization cancelled")
            return
    print(f"\n🚀 Starting optimization...")
    # Run optimization
    start_time = time.time()
    results = optimizer.optimize_portfolio(tickers, max_workers=workers)
    total_time = time.time() - start_time
    print(f"\n🏁 Optimization completed in {total_time:.1f} seconds")
    # Save and display results
    if results:
        # Create descriptive filename from the target that was optimized
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename_base = f"arima_optimization_{target_label}_{timestamp}.json"
        filename = optimizer.save_optimization_results(results, filename_base)
        optimizer.print_optimization_summary(results)
        print(f"\n💾 Results saved to: {filename}")
        print(f"🔧 Your enhanced arima_model.py will automatically load these parameters")
        print(f"📊 Run your main trading system to use optimized parameters")
    else:
        print("❌ No successful optimizations")
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()