
MCP Hybrid Forecasting

by j1c4b
preprocess_arima_params.py (30.9 kB)
# preprocess_arima_params.py - ARIMA Parameter Optimization for Portfolio Stocks

import yfinance as yf
import pandas as pd
import numpy as np
import json
import warnings
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from collections import defaultdict
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

try:
    from pmdarima import auto_arima
    PMDARIMA_AVAILABLE = True
except ImportError:
    PMDARIMA_AVAILABLE = False
    print("⚠️ pmdarima not available, using grid search instead")

from sklearn.metrics import mean_squared_error, mean_absolute_error
from statsmodels.tsa.arima.model import ARIMA

warnings.filterwarnings('ignore')


class ARIMAParameterOptimizer:
    """
    Optimize ARIMA parameters for each stock using rolling window validation.
    """

    def __init__(self, rolling_window: int = 750, forecast_horizon: int = 5,
                 n_iterations: int = 50, data_period: str = "4y",
                 results_dir: str = "arima_optimization"):
        self.rolling_window = rolling_window
        self.forecast_horizon = forecast_horizon
        self.n_iterations = n_iterations
        self.data_period = data_period
        self.results_dir = Path(results_dir)
        self.results_dir.mkdir(exist_ok=True)

        # Setup logging
        self.setup_logging()

        # ARIMA parameter search space
        self.p_range = range(0, 4)  # AR terms
        self.d_range = range(0, 3)  # Differencing
        self.q_range = range(0, 4)  # MA terms

        print(f"🔧 ARIMA Parameter Optimizer Initialized")
        print(f"   Rolling Window: {self.rolling_window} days")
        print(f"   Forecast Horizon: {self.forecast_horizon} days")
        print(f"   Iterations: {self.n_iterations}")
        print(f"   Data Period: {self.data_period}")
        print(f"   pmdarima Available: {PMDARIMA_AVAILABLE}")

    def setup_logging(self):
        """Setup logging configuration."""
        log_file = self.results_dir / "arima_optimization.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def download_stock_data(self, ticker: str) -> Optional[pd.Series]:
        """Download and prepare stock data."""
        try:
            self.logger.info(f"Downloading {self.data_period} data for {ticker}")
            data = yf.download(ticker, period=self.data_period, progress=False)

            if data.empty:
                self.logger.error(f"No data found for {ticker}")
                return None

            close_prices = data['Close'].dropna()

            if len(close_prices) < self.rolling_window + self.forecast_horizon:
                self.logger.error(f"Insufficient data for {ticker}: {len(close_prices)} points")
                return None

            self.logger.info(f"Downloaded {len(close_prices)} data points for {ticker}")
            return close_prices.reset_index(drop=True)

        except Exception as e:
            self.logger.error(f"Error downloading data for {ticker}: {e}")
            return None

    def optimize_with_pmdarima(self, close_prices: pd.Series, ticker: str) -> Dict:
        """Optimize ARIMA parameters using pmdarima auto_arima."""
        if not PMDARIMA_AVAILABLE:
            return self.optimize_with_grid_search(close_prices, ticker)

        self.logger.info(f"Starting pmdarima optimization for {ticker}")

        errors = []
        order_counts = defaultdict(int)

        max_iterations = min(self.n_iterations,
                             len(close_prices) - self.rolling_window - self.forecast_horizon)

        for i in range(max_iterations):
            try:
                # Rolling window data
                train = close_prices[i:i + self.rolling_window]
                test = close_prices[i + self.rolling_window:i + self.rolling_window + self.forecast_horizon]

                if len(test) < self.forecast_horizon:
                    break

                # Auto ARIMA model selection
                model = auto_arima(
                    train,
                    start_p=0, start_q=0,
                    max_p=3, max_q=3,
                    d=None,  # Let auto_arima determine differencing
                    seasonal=False,
                    stepwise=True,
                    suppress_warnings=True,
                    error_action='ignore',
                    trace=False
                )

                # Forecast and evaluate
                forecast = model.predict(n_periods=self.forecast_horizon)
                mse = mean_squared_error(test, forecast)
                mae = mean_absolute_error(test, forecast)
                order = model.order

                errors.append({
                    'order': order,
                    'mse': mse,
                    'rmse': np.sqrt(mse),
                    'mae': mae,
                    'iteration': i
                })
                order_counts[order] += 1

                if i % 10 == 0:
                    self.logger.info(f"{ticker} - Iteration {i}: ARIMA{order}, RMSE={np.sqrt(mse):.4f}")

            except Exception as e:
                self.logger.warning(f"{ticker} - Iteration {i} failed: {e}")
                continue

        return self._analyze_results(errors, order_counts, ticker, "pmdarima")

    def optimize_with_grid_search(self, close_prices: pd.Series, ticker: str) -> Dict:
        """Optimize ARIMA parameters using grid search."""
        self.logger.info(f"Starting grid search optimization for {ticker}")

        # Generate parameter combinations
        param_combinations = [
            (p, d, q)
            for p in self.p_range
            for d in self.d_range
            for q in self.q_range
        ]

        order_performance = defaultdict(list)
        iteration_count = 0

        max_iterations = min(self.n_iterations,
                             len(close_prices) - self.rolling_window - self.forecast_horizon)

        for i in range(0, max_iterations, max(1, max_iterations // 20)):  # Sample iterations
            train = close_prices[i:i + self.rolling_window]
            test = close_prices[i + self.rolling_window:i + self.rolling_window + self.forecast_horizon]

            if len(test) < self.forecast_horizon:
                break

            for order in param_combinations:
                try:
                    model = ARIMA(train, order=order)
                    fitted_model = model.fit()
                    forecast = fitted_model.forecast(steps=self.forecast_horizon)
                    mse = mean_squared_error(test, forecast)

                    order_performance[order].append({
                        'mse': mse,
                        'rmse': np.sqrt(mse),
                        'mae': mean_absolute_error(test, forecast),
                        'aic': fitted_model.aic,
                        'iteration': iteration_count
                    })

                except Exception:
                    continue

            iteration_count += 1
            if iteration_count % 5 == 0:
                self.logger.info(f"{ticker} - Completed {iteration_count} grid search iterations")

        # Analyze grid search results
        best_order = None
        best_avg_rmse = float('inf')
        order_stats = {}

        for order, performances in order_performance.items():
            if len(performances) >= 3:  # Require at least 3 successful evaluations
                avg_rmse = np.mean([p['rmse'] for p in performances])
                avg_aic = np.mean([p['aic'] for p in performances])

                order_stats[order] = {
                    'avg_rmse': avg_rmse,
                    'avg_aic': avg_aic,
                    'count': len(performances),
                    'std_rmse': np.std([p['rmse'] for p in performances])
                }

                if avg_rmse < best_avg_rmse:
                    best_avg_rmse = avg_rmse
                    best_order = order

        if best_order is None:
            # Fallback to (1,1,1)
            best_order = (1, 1, 1)
            self.logger.warning(f"{ticker} - Grid search failed, using fallback ARIMA{best_order}")

        return {
            'ticker': ticker,
            'method': 'grid_search',
            'best_order': best_order,
            'best_avg_rmse': order_stats.get(best_order, {}).get('avg_rmse', 0),
            'total_iterations': iteration_count,
            'order_stats': order_stats,
            'optimization_date': datetime.now().isoformat()
        }

    def _analyze_results(self, errors: List[Dict], order_counts: Dict,
                         ticker: str, method: str) -> Dict:
        """Analyze optimization results and determine best parameters."""
        if not errors:
            self.logger.warning(f"{ticker} - No successful iterations, using fallback ARIMA(1,1,1)")
            return {
                'ticker': ticker,
                'method': method,
                'best_order': (1, 1, 1),
                'best_avg_rmse': 0,
                'total_iterations': 0,
                'order_frequency': {},
                'optimization_date': datetime.now().isoformat()
            }

        # Group errors by order
        rmse_by_order = defaultdict(list)
        for error in errors:
            rmse_by_order[error['order']].append(error['rmse'])

        # Calculate average RMSE for each order
        avg_rmse_by_order = {
            order: np.mean(rmses)
            for order, rmses in rmse_by_order.items()
        }

        # Find best order
        best_order = min(avg_rmse_by_order, key=avg_rmse_by_order.get)
        best_avg_rmse = avg_rmse_by_order[best_order]

        # Calculate order frequency (how often each order was selected)
        total_iterations = len(errors)
        order_frequency = {order: count / total_iterations for order, count in order_counts.items()}

        self.logger.info(f"{ticker} - Best ARIMA{best_order}, Avg RMSE: {best_avg_rmse:.4f}")
        self.logger.info(f"{ticker} - Order frequency: {dict(sorted(order_frequency.items(), key=lambda x: x[1], reverse=True)[:3])}")

        return {
            'ticker': ticker,
            'method': method,
            'best_order': best_order,
            'best_avg_rmse': best_avg_rmse,
            'total_iterations': total_iterations,
            'order_frequency': order_frequency,
            'avg_rmse_by_order': avg_rmse_by_order,
            'optimization_date': datetime.now().isoformat()
        }

    def optimize_single_stock(self, ticker: str) -> Optional[Dict]:
        """Optimize ARIMA parameters for a single stock."""
        start_time = time.time()
        self.logger.info(f"🔧 Starting optimization for {ticker}")

        # Download data
        close_prices = self.download_stock_data(ticker)
        if close_prices is None:
            return None

        # Optimize parameters
        if PMDARIMA_AVAILABLE:
            result = self.optimize_with_pmdarima(close_prices, ticker)
        else:
            result = self.optimize_with_grid_search(close_prices, ticker)

        # Add timing information
        optimization_time = time.time() - start_time
        result['optimization_time_seconds'] = round(optimization_time, 2)

        self.logger.info(f"✅ {ticker} optimization completed in {optimization_time:.1f}s - Best: ARIMA{result['best_order']}")
        return result

    def optimize_portfolio(self, tickers: List[str], max_workers: int = 4) -> Dict[str, Dict]:
        """Optimize ARIMA parameters for a list of stocks."""
        self.logger.info(f"🚀 Starting portfolio optimization for {len(tickers)} stocks")

        results = {}

        if max_workers == 1:
            # Sequential processing
            for i, ticker in enumerate(tickers, 1):
                print(f"[{i:3d}/{len(tickers)}] Optimizing {ticker}...")
                result = self.optimize_single_stock(ticker)
                if result:
                    results[ticker] = result
        else:
            # Parallel processing
            print(f"🔄 Using {max_workers} workers for parallel optimization")

            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Submit all tasks
                future_to_ticker = {
                    executor.submit(self.optimize_single_stock, ticker): ticker
                    for ticker in tickers
                }

                # Collect results as they complete
                completed = 0
                for future in as_completed(future_to_ticker):
                    ticker = future_to_ticker[future]
                    completed += 1

                    try:
                        result = future.result()
                        if result:
                            results[ticker] = result
                            print(f"[{completed:3d}/{len(tickers)}] ✅ {ticker}: ARIMA{result['best_order']} (RMSE: {result['best_avg_rmse']:.4f})")
                        else:
                            print(f"[{completed:3d}/{len(tickers)}] ❌ {ticker}: Optimization failed")
                    except Exception as e:
                        print(f"[{completed:3d}/{len(tickers)}] ❌ {ticker}: {e}")

        self.logger.info(f"Portfolio optimization completed: {len(results)}/{len(tickers)} successful")
        return results

    def save_optimization_results(self, results: Dict[str, Dict], filename: str = None) -> str:
        """Save optimization results to JSON file."""
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"arima_optimization_results_{timestamp}.json"

        filepath = self.results_dir / filename

        # Convert tuples to lists for JSON serialization
        json_safe_results = {}
        for ticker, result in results.items():
            json_safe_result = result.copy()

            # Convert tuple keys and values to JSON-safe format
            if 'best_order' in json_safe_result and isinstance(json_safe_result['best_order'], tuple):
                json_safe_result['best_order'] = list(json_safe_result['best_order'])

            # Handle order_frequency dict with tuple keys
            if 'order_frequency' in json_safe_result:
                order_freq_safe = {}
                for order, freq in json_safe_result['order_frequency'].items():
                    if isinstance(order, tuple):
                        order_freq_safe[str(order)] = freq
                    else:
                        order_freq_safe[order] = freq
                json_safe_result['order_frequency'] = order_freq_safe

            # Handle avg_rmse_by_order dict with tuple keys
            if 'avg_rmse_by_order' in json_safe_result:
                rmse_safe = {}
                for order, rmse in json_safe_result['avg_rmse_by_order'].items():
                    if isinstance(order, tuple):
                        rmse_safe[str(order)] = rmse
                    else:
                        rmse_safe[order] = rmse
                json_safe_result['avg_rmse_by_order'] = rmse_safe

            # Handle order_stats dict with tuple keys (from grid search)
            if 'order_stats' in json_safe_result:
                stats_safe = {}
                for order, stats in json_safe_result['order_stats'].items():
                    if isinstance(order, tuple):
                        stats_safe[str(order)] = stats
                    else:
                        stats_safe[order] = stats
                json_safe_result['order_stats'] = stats_safe

            json_safe_results[ticker] = json_safe_result

        with open(filepath, 'w') as f:
            json.dump(json_safe_results, f, indent=2)

        self.logger.info(f"📁 Results saved to {filepath}")
        return str(filepath)

    def load_optimization_results(self, filename: str) -> Dict[str, Dict]:
        """Load optimization results from JSON file."""
        filepath = self.results_dir / filename

        with open(filepath, 'r') as f:
            results = json.load(f)

        # Convert string representations back to tuples for best_order
        for ticker, result in results.items():
            if 'best_order' in result and isinstance(result['best_order'], list):
                result['best_order'] = tuple(result['best_order'])

            # Convert string keys back to tuples for order_frequency
            if 'order_frequency' in result:
                order_freq_restored = {}
                for order_str, freq in result['order_frequency'].items():
                    try:
                        # Try to evaluate string representation of tuple
                        if order_str.startswith('(') and order_str.endswith(')'):
                            order_tuple = eval(order_str)
                            order_freq_restored[order_tuple] = freq
                        else:
                            order_freq_restored[order_str] = freq
                    except:
                        order_freq_restored[order_str] = freq
                result['order_frequency'] = order_freq_restored

            # Convert string keys back to tuples for avg_rmse_by_order
            if 'avg_rmse_by_order' in result:
                rmse_restored = {}
                for order_str, rmse in result['avg_rmse_by_order'].items():
                    try:
                        if order_str.startswith('(') and order_str.endswith(')'):
                            order_tuple = eval(order_str)
                            rmse_restored[order_tuple] = rmse
                        else:
                            rmse_restored[order_str] = rmse
                    except:
                        rmse_restored[order_str] = rmse
                result['avg_rmse_by_order'] = rmse_restored

            # Convert string keys back to tuples for order_stats
            if 'order_stats' in result:
                stats_restored = {}
                for order_str, stats in result['order_stats'].items():
                    try:
                        if order_str.startswith('(') and order_str.endswith(')'):
                            order_tuple = eval(order_str)
                            stats_restored[order_tuple] = stats
                        else:
                            stats_restored[order_str] = stats
                    except:
                        stats_restored[order_str] = stats
                result['order_stats'] = stats_restored

        return results

    def print_optimization_summary(self, results: Dict[str, Dict]):
        """Print a summary of optimization results."""
        if not results:
            print("❌ No optimization results to display")
            return

        print(f"\n📊 ARIMA OPTIMIZATION SUMMARY")
        print("=" * 70)
        print(f"{'Ticker':<8} {'Best Order':<12} {'Avg RMSE':<12} {'Iterations':<12} {'Time (s)':<10}")
        print("-" * 70)

        total_time = 0
        order_counts = defaultdict(int)

        for ticker, result in sorted(results.items()):
            best_order = result['best_order']
            avg_rmse = result.get('best_avg_rmse', 0)
            iterations = result.get('total_iterations', 0)
            opt_time = result.get('optimization_time_seconds', 0)

            print(f"{ticker:<8} {str(best_order):<12} {avg_rmse:<12.4f} {iterations:<12} {opt_time:<10.1f}")

            total_time += opt_time
            order_counts[best_order] += 1

        print("-" * 70)
        print(f"📈 Total Optimization Time: {total_time:.1f} seconds")
        print(f"📊 Successfully Optimized: {len(results)} stocks")

        print(f"\n🏆 Most Common ARIMA Orders:")
        for order, count in sorted(order_counts.items(), key=lambda x: x[1], reverse=True)[:5]:
            percentage = (count / len(results)) * 100
            print(f"   ARIMA{order}: {count} stocks ({percentage:.1f}%)")

        # Performance insights
        rmse_values = [r.get('best_avg_rmse', 0) for r in results.values() if r.get('best_avg_rmse', 0) > 0]
        if rmse_values:
            avg_rmse = np.mean(rmse_values)
            median_rmse = np.median(rmse_values)
            print(f"\n📊 RMSE Statistics:")
            print(f"   Average RMSE: {avg_rmse:.4f}")
            print(f"   Median RMSE: {median_rmse:.4f}")
            print(f"   Best RMSE: {min(rmse_values):.4f}")
            print(f"   Worst RMSE: {max(rmse_values):.4f}")


def load_portfolio_config(config_path: str = "config/trading_config.json") -> Dict:
    """Load portfolio configuration."""
    try:
        with open(config_path, 'r') as f:
            return json.load(f)
    except Exception as e:
        print(f"❌ Error loading config: {e}")
        return {"tickers": {"default": ["AAPL", "MSFT", "GOOGL"]}}


def get_all_unique_tickers(config: Dict) -> List[str]:
    """Get all unique tickers from portfolio configuration."""
    all_tickers = set()

    for portfolio_name, tickers in config.get("tickers", {}).items():
        if isinstance(tickers, list):
            all_tickers.update(tickers)

    # Remove metadata entries
    all_tickers = {ticker for ticker in all_tickers if not str(ticker).startswith('//')}

    return sorted(list(all_tickers))


def show_available_portfolios(config: Dict) -> None:
    """Display available portfolios from config."""
    print("📊 AVAILABLE PORTFOLIOS")
    print("=" * 50)

    portfolios = config.get("tickers", {})
    if not portfolios:
        print("❌ No portfolios found in config")
        return

    total_unique_stocks = len(get_all_unique_tickers(config))

    for i, (portfolio_name, tickers) in enumerate(portfolios.items(), 1):
        if isinstance(tickers, list):
            print(f"  {i:2d}. {portfolio_name:<20} ({len(tickers):2d} stocks)")
            # Show first few tickers as preview
            preview = tickers[:3]
            if len(tickers) > 3:
                preview_str = f"{', '.join(preview)}, ... (+{len(tickers)-3} more)"
            else:
                preview_str = ', '.join(preview)
            print(f"      {preview_str}")
        else:
            print(f"  {i:2d}. {portfolio_name:<20} (invalid format)")

    print(f"\n🌐 Total unique stocks across all portfolios: {total_unique_stocks}")
    print(f"\n💡 Usage examples:")
    print(f"   python preprocess_arima_params.py --portfolio tech_giants")
    print(f"   python preprocess_arima_params.py --all")
    print(f"   python preprocess_arima_params.py --tickers AAPL MSFT GOOGL")


def main():
    """Main function for ARIMA parameter optimization."""
    import argparse

    parser = argparse.ArgumentParser(
        description="ARIMA Parameter Optimization for Portfolio Stocks",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python preprocess_arima_params.py                                    # Show available portfolios
  python preprocess_arima_params.py --portfolio tech_giants            # Optimize specific portfolio
  python preprocess_arima_params.py --all                              # Optimize ALL stocks
  python preprocess_arima_params.py --tickers AAPL MSFT GOOGL          # Optimize specific stocks
  python preprocess_arima_params.py --portfolio large_cap --workers 2  # Custom settings
  python preprocess_arima_params.py --load results_file.json           # Load previous results
        """
    )

    parser.add_argument('--portfolio', type=str,
                        help='Specific portfolio name from config (e.g., tech_giants, large_cap)')
    parser.add_argument('--all', action='store_true',
                        help='Optimize ALL stocks across all portfolios')
    parser.add_argument('--tickers', nargs='+',
                        help='Specific tickers to optimize (space-separated)')
    parser.add_argument('--window', type=int, default=750,
                        help='Rolling window size in days (default: 750)')
    parser.add_argument('--horizon', type=int, default=5,
                        help='Forecast horizon in days (default: 5)')
    parser.add_argument('--iterations', type=int, default=50,
                        help='Number of optimization iterations (default: 50)')
    parser.add_argument('--workers', type=int, default=4,
                        help='Number of parallel workers (default: 4)')
    parser.add_argument('--load', type=str,
                        help='Load and display previous results from JSON file')
    parser.add_argument('--list-portfolios', action='store_true',
                        help='List available portfolios and exit')

    args = parser.parse_args()

    # Load configuration
    config = load_portfolio_config()

    # Handle list portfolios option
    if args.list_portfolios:
        show_available_portfolios(config)
        return

    # Initialize optimizer
    optimizer = ARIMAParameterOptimizer(
        rolling_window=args.window,
        forecast_horizon=args.horizon,
        n_iterations=args.iterations
    )

    if args.load:
        # Load and display previous results
        try:
            results = optimizer.load_optimization_results(args.load)
            optimizer.print_optimization_summary(results)
        except Exception as e:
            print(f"❌ Error loading results: {e}")
        return

    # Determine tickers to optimize
    tickers = []
    optimization_description = ""

    if args.tickers:
        # Specific tickers provided
        tickers = args.tickers
        optimization_description = f"specified tickers: {', '.join(tickers)}"
    elif args.all:
        # All stocks across all portfolios
        tickers = get_all_unique_tickers(config)
        optimization_description = f"ALL stocks across all portfolios ({len(tickers)} unique stocks)"
    elif args.portfolio:
        # Specific portfolio
        portfolio_tickers = config.get("tickers", {}).get(args.portfolio, [])
        if not portfolio_tickers:
            print(f"❌ Portfolio '{args.portfolio}' not found in config")
            print(f"\n📊 Available portfolios:")
            for name in config.get("tickers", {}).keys():
                print(f"   - {name}")
            return
        tickers = portfolio_tickers
        optimization_description = f"'{args.portfolio}' portfolio ({len(tickers)} stocks)"
    else:
        # No arguments provided - show help
        print("🔧 ARIMA PARAMETER OPTIMIZATION")
        print("=" * 50)
        print("No optimization target specified.\n")
        show_available_portfolios(config)
        print(f"\n📖 Usage:")
        print(f"   --portfolio PORTFOLIO_NAME  : Optimize specific portfolio")
        print(f"   --all                       : Optimize all stocks")
        print(f"   --tickers TICKER1 TICKER2   : Optimize specific stocks")
        print(f"   --list-portfolios           : Show available portfolios")
        print(f"   --load FILENAME.json        : Load previous results")
        return

    if not tickers:
        print("❌ No tickers to optimize")
        return

    # Validate tickers are not empty
    tickers = [t for t in tickers if t and not str(t).startswith('//')]

    if not tickers:
        print("❌ No valid tickers found after filtering")
        return

    # Show optimization plan
    print(f"🔧 ARIMA PARAMETER OPTIMIZATION")
    print("=" * 60)
    print(f"📊 Target: {optimization_description}")
    print(f"🔧 Settings:")
    print(f"   Rolling Window: {args.window} days")
    print(f"   Forecast Horizon: {args.horizon} days")
    print(f"   Iterations: {args.iterations}")
    print(f"   Parallel Workers: {args.workers}")
    print(f"   Stocks to optimize: {len(tickers)}")

    if len(tickers) <= 10:
        print(f"   Tickers: {', '.join(tickers)}")
    else:
        print(f"   First 10 tickers: {', '.join(tickers[:10])}, ... (+{len(tickers)-10} more)")

    # Estimate time
    estimated_time_per_stock = 30  # seconds
    estimated_total_time = (len(tickers) * estimated_time_per_stock) / args.workers
    print(f"🕐 Estimated time: {estimated_total_time/60:.1f} minutes")

    # Confirm for large optimizations
    if len(tickers) > 20:
        response = input(f"\n⚠️ You're about to optimize {len(tickers)} stocks. Continue? (y/N): ")
        if response.lower() not in ['y', 'yes']:
            print("❌ Optimization cancelled")
            return

    print(f"\n🚀 Starting optimization...")

    # Run optimization
    start_time = time.time()
    results = optimizer.optimize_portfolio(tickers, max_workers=args.workers)
    total_time = time.time() - start_time

    print(f"\n🏁 Optimization completed in {total_time:.1f} seconds")

    # Save and display results
    if results:
        # Create descriptive filename
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        if args.portfolio:
            filename_base = f"arima_optimization_{args.portfolio}_{timestamp}.json"
        elif args.all:
            filename_base = f"arima_optimization_all_stocks_{timestamp}.json"
        else:
            filename_base = f"arima_optimization_custom_{timestamp}.json"

        filename = optimizer.save_optimization_results(results, filename_base)
        optimizer.print_optimization_summary(results)

        print(f"\n💾 Results saved to: {filename}")
        print(f"🔧 Your enhanced arima_model.py will automatically load these parameters")
        print(f"📊 Run your main trading system to use optimized parameters")
    else:
        print("❌ No successful optimizations")


if __name__ == "__main__":
    main()
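The script's closing messages refer to an enhanced arima_model.py that automatically picks up the saved parameters. That module is not shown on this page, so the following is only a minimal sketch of how a downstream consumer might read the exported JSON and fit a model with the stored order; the fit_with_optimized_order helper and the results_path argument are illustrative assumptions, not part of this repository.

# Hypothetical consumer sketch (not from the repository): read a saved
# optimization file and fit a statsmodels ARIMA with the stored best order.
import json
from statsmodels.tsa.arima.model import ARIMA

def fit_with_optimized_order(close_prices, ticker, results_path):
    """Fit an ARIMA model using the order saved by preprocess_arima_params.py."""
    with open(results_path, "r") as f:
        params = json.load(f)
    # best_order is serialized as a list, e.g. [2, 1, 1]; fall back to (1, 1, 1)
    order = tuple(params.get(ticker, {}).get("best_order", (1, 1, 1)))
    return ARIMA(close_prices, order=order).fit()

In practice, results_path would point at one of the JSON files that save_optimization_results() writes into the arima_optimization/ results directory.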

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/j1c4b/mcp-hybrid-forecasting'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.