Skip to main content
Glama

MCP Hybrid Forecasting

by j1c4b
enhanced_main.py50.5 kB
# enhanced_main.py - Smart trading system with dual model comparison for crypto stocks
import pandas as pd
import numpy as np
import warnings
import json
import argparse
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Any, Optional, Set, Tuple

# Plotting libraries are not used by anything in this module; keep them
# importable for downstream code but do not make them a hard requirement.
try:
    import matplotlib.pyplot as plt
except ImportError:
    plt = None
try:
    import seaborn as sns
except ImportError:
    sns = None

warnings.filterwarnings('ignore')

# Enhanced imports with graceful fallback
try:
    from models.arima_model import get_arima_forecast, get_enhanced_arima_forecast, clear_cache_for_ticker, clear_all_cache
    ARIMA_AVAILABLE = True
except ImportError as e:
    print(f"⚠️ ARIMA model not available: {e}")
    ARIMA_AVAILABLE = False

try:
    from models.arima_garch_model import get_arima_garch_forecast, get_enhanced_arima_garch_forecast, check_dependencies
    ARIMA_GARCH_AVAILABLE = True
except ImportError as e:
    print(f"⚠️ ARIMA-GARCH model not available: {e}")
    print(f"💡 Create models/arima_garch_model.py to enable ARIMA-GARCH functionality")
    ARIMA_GARCH_AVAILABLE = False

    # Fallback function
    def check_dependencies():
        return False

try:
    from models.hybrid_model import train_xgboost_on_residuals
    HYBRID_AVAILABLE = True
except ImportError as e:
    print(f"⚠️ Hybrid model not available: {e}")
    HYBRID_AVAILABLE = False

# Import volatility detection (same graceful-fallback pattern as the models)
try:
    import yfinance as yf
    YFINANCE_AVAILABLE = True
except ImportError as e:
    print(f"⚠️ yfinance not available: {e}")
    yf = None
    YFINANCE_AVAILABLE = False


class VolatilityClassifier:
    """Lightweight volatility classifier with two modes: config-based or pure volatility.

    In config mode a list of "crypto keywords" is loaded from the
    ``tickers.extreme`` entry of a JSON file and used to flag tickers as
    crypto-related. In pure volatility mode no keywords are loaded and
    ``is_crypto`` is always False.
    """

    def __init__(self, extreme_volatility_config: str = "config/extreme_volatility.json",
                 use_config: bool = True):
        self.extreme_volatility_config_path = extreme_volatility_config
        self.use_config = use_config
        self.crypto_keywords = self._load_crypto_keywords() if use_config else []
        self.high_volatility_threshold = 0.03  # 3% daily volatility
        self.extreme_volatility_threshold = 0.05  # 5% daily volatility

        # Show mode
        if use_config:
            print(f"🪙 CONFIG MODE: Using {len(self.crypto_keywords)} crypto keywords from config")
        else:
            print("📊 PURE VOLATILITY MODE: Classifying stocks by volatility only")

    def _load_crypto_keywords(self) -> List[str]:
        """Load crypto/extreme volatility keywords from JSON config file.

        Returns an empty list (after printing a diagnostic) when the file is
        missing, unparsable, or has no ``tickers.extreme`` entries.
        """
        try:
            with open(self.extreme_volatility_config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)

            # Extract tickers from the extreme category
            crypto_keywords = config.get("tickers", {}).get("extreme", [])
            if crypto_keywords:
                print(f"✅ Loaded {len(crypto_keywords)} crypto/extreme volatility tickers from {self.extreme_volatility_config_path}")
                return crypto_keywords
            print(f"⚠️ No extreme volatility tickers found in {self.extreme_volatility_config_path}")
            return []
        except FileNotFoundError:
            print(f"❌ Config file {self.extreme_volatility_config_path} not found!")
            return []
        except json.JSONDecodeError as e:
            print(f"❌ Error parsing {self.extreme_volatility_config_path}: {e}")
            return []
        except Exception as e:
            print(f"❌ Unexpected error loading {self.extreme_volatility_config_path}: {e}")
            return []

    def reload_crypto_keywords(self) -> bool:
        """Reload crypto keywords from config file. Only works in config mode."""
        if not self.use_config:
            print("⚠️ Not in config mode, cannot reload crypto keywords")
            return False
        try:
            self.crypto_keywords = self._load_crypto_keywords()
            return True
        except Exception as e:
            print(f"❌ Failed to reload crypto keywords: {e}")
            return False

    def add_crypto_ticker(self, ticker: str) -> bool:
        """Add a new ticker to the crypto keywords list and save to config.

        Returns True when the ticker is present afterwards (including when it
        was already listed), False when not in config mode or saving failed.
        """
        if not self.use_config:
            print("⚠️ Not in config mode, cannot add crypto ticker")
            return False

        wanted = ticker.upper()
        if wanted in (kw.upper() for kw in self.crypto_keywords):
            print(f"⚠️ {ticker} already in crypto keywords list")
            return True

        self.crypto_keywords.append(wanted)
        return self._save_crypto_keywords()

    def remove_crypto_ticker(self, ticker: str) -> bool:
        """Remove a ticker from crypto keywords list and save to config.

        Returns True when the ticker is absent afterwards (including when it
        was never listed), False when not in config mode or saving failed.
        """
        if not self.use_config:
            print("⚠️ Not in config mode, cannot remove crypto ticker")
            return False

        original_count = len(self.crypto_keywords)
        unwanted = ticker.upper()
        self.crypto_keywords = [kw for kw in self.crypto_keywords if kw.upper() != unwanted]

        if len(self.crypto_keywords) < original_count:
            return self._save_crypto_keywords()
        print(f"⚠️ {ticker} not found in crypto keywords list")
        return True

    def _save_crypto_keywords(self) -> bool:
        """Save current crypto keywords back to config file.

        Only ``tickers.extreme`` is rewritten; any other keys already present
        in the file are preserved so that editing the keyword list does not
        destroy unrelated configuration.
        """
        try:
            config_path = Path(self.extreme_volatility_config_path)

            # Ensure directory exists
            config_path.parent.mkdir(parents=True, exist_ok=True)

            # Start from the existing file content when it is a usable JSON object
            config: Dict[str, Any] = {}
            if config_path.exists():
                try:
                    with open(config_path, 'r', encoding='utf-8') as f:
                        loaded = json.load(f)
                    if isinstance(loaded, dict):
                        config = loaded
                except (OSError, json.JSONDecodeError):
                    config = {}

            tickers = config.get("tickers")
            if not isinstance(tickers, dict):
                tickers = {}
                config["tickers"] = tickers
            tickers["extreme"] = sorted(self.crypto_keywords)

            # Save to file
            with open(config_path, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=2)

            print(f"✅ Saved {len(self.crypto_keywords)} crypto keywords to {self.extreme_volatility_config_path}")
            return True
        except Exception as e:
            print(f"❌ Failed to save crypto keywords: {e}")
            return False

    def list_crypto_keywords(self) -> List[str]:
        """Return a copy of the current list of crypto keywords."""
        if not self.use_config:
            print("⚠️ Not in config mode, no crypto keywords loaded")
            return []
        return self.crypto_keywords.copy()

    def _is_crypto_ticker(self, ticker: str) -> bool:
        """Return True when any crypto keyword occurs in *ticker* (case-insensitive).

        Always False in pure volatility mode.
        """
        if not self.use_config:
            return False
        upper_ticker = ticker.upper()
        return any(str(keyword).upper() in upper_ticker for keyword in self.crypto_keywords)

    @staticmethod
    def _unknown_classification(ticker: str, error: str) -> Dict[str, Any]:
        """Build the result returned when a ticker cannot be classified."""
        return {
            "ticker": ticker,
            "volatility_class": "UNKNOWN",
            "recommended_model": "ARIMA",
            "daily_volatility": 0.0,
            "error": error
        }

    def classify_stock(self, ticker: str, period: str = "6mo") -> Dict[str, Any]:
        """Classify a stock's volatility and recommend modeling approach.

        Downloads *period* of price history, takes the standard deviation of
        daily close-to-close returns and compares it with the HIGH / EXTREME
        thresholds. On any failure the returned dict carries an ``"error"``
        key and class ``"UNKNOWN"``.
        """
        try:
            if yf is None:
                return self._unknown_classification(ticker, "yfinance is not installed")

            # Download recent data for classification
            data = yf.download(ticker, period=period, progress=False)
            if data.empty:
                return self._unknown_classification(ticker, "No data available")

            close = data['Close']
            # With MultiIndex columns, data['Close'] is a one-column DataFrame
            if isinstance(close, pd.DataFrame):
                close = close.iloc[:, 0]
            close = close.dropna()

            returns = close.pct_change().dropna()
            daily_vol = float(returns.std())
            if not np.isfinite(daily_vol):
                # Fewer than two returns: a NaN would otherwise pass as MODERATE
                return self._unknown_classification(ticker, "Not enough price history")

            # Classify volatility
            if daily_vol > self.extreme_volatility_threshold:
                volatility_class = "EXTREME"
                recommended_model = "ARIMA-GARCH"
            elif daily_vol > self.high_volatility_threshold:
                volatility_class = "HIGH"
                recommended_model = "ARIMA-GARCH"
            else:
                volatility_class = "MODERATE"
                recommended_model = "ARIMA"

            return {
                "ticker": ticker,
                "volatility_class": volatility_class,
                "recommended_model": recommended_model,
                "daily_volatility": daily_vol,
                # Crypto check only applies in config mode
                "is_crypto": self._is_crypto_ticker(ticker),
                "mode": "config" if self.use_config else "pure_volatility",
                "crypto_keywords_source": self.extreme_volatility_config_path if self.use_config else None,
                "data_points": len(close)
            }
        except Exception as e:
            return self._unknown_classification(ticker, str(e))


class ModelComparator:
    """Compare ARIMA vs ARIMA-GARCH models for crypto stocks with moderate volatility."""

    def __init__(self, force_recalculate: bool = False):
        self.comparison_results = {}
        self.force_recalculate = force_recalculate

    def compare_models_for_crypto_moderate(self, ticker: str) -> Dict[str, Any]:
        """
        Compare ARIMA vs ARIMA-GARCH for crypto stocks with moderate volatility.
        Returns the better performing model and comparison metrics.

        The result is also stored in ``self.comparison_results[ticker]``. When
        only one model succeeds it wins by default with confidence 0.6; when
        both fail, ``error`` is set and ``best_model`` stays None.
        """
        print(f"\n🔬 DUAL MODEL COMPARISON FOR {ticker}")
        print("=" * 50)

        comparison_result = {
            'ticker': ticker,
            'arima_results': None,
            'garch_results': None,
            'comparison_metrics': {},
            'best_model': None,
            'confidence_in_choice': 0.0,
            'error': None
        }

        try:
            # Test ARIMA model
            print(f"📊 Testing ARIMA model for {ticker}...")
            arima_results = self._test_arima_model(ticker)
            comparison_result['arima_results'] = arima_results

            # Test ARIMA-GARCH model
            print(f"🔧 Testing ARIMA-GARCH model for {ticker}...")
            garch_results = self._test_garch_model(ticker)
            comparison_result['garch_results'] = garch_results

            # Compare models
            if arima_results and garch_results:
                comparison_metrics = self._compare_model_performance(arima_results, garch_results)
                comparison_result['comparison_metrics'] = comparison_metrics
                comparison_result['best_model'] = comparison_metrics['winner']
                comparison_result['confidence_in_choice'] = comparison_metrics['confidence']
                self._print_comparison_results(ticker, comparison_metrics, arima_results, garch_results)
            elif arima_results:
                comparison_result['best_model'] = 'ARIMA'
                comparison_result['confidence_in_choice'] = 0.6
                print(f"⚠️ Only ARIMA succeeded, using ARIMA by default")
            elif garch_results:
                comparison_result['best_model'] = 'ARIMA-GARCH'
                comparison_result['confidence_in_choice'] = 0.6
                print(f"⚠️ Only ARIMA-GARCH succeeded, using ARIMA-GARCH by default")
            else:
                comparison_result['error'] = "Both models failed"
                print(f"❌ Both models failed for {ticker}")
        except Exception as e:
            comparison_result['error'] = str(e)
            print(f"❌ Model comparison failed for {ticker}: {e}")

        self.comparison_results[ticker] = comparison_result
        return comparison_result

    def _test_arima_model(self, ticker: str) -> Optional[Dict[str, Any]]:
        """Test ARIMA model and return performance metrics, or None on failure."""
        try:
            forecast_price, residuals, df = get_arima_forecast(
                ticker,
                use_optimized_params=not self.force_recalculate,
                force_recalculate=self.force_recalculate,
            )

            # ADD XGBOOST ENHANCEMENT IF AVAILABLE
            base_forecast = float(forecast_price)
            forecast_price = base_forecast
            model_type = 'ARIMA'
            if HYBRID_AVAILABLE:
                try:
                    xgb_model, latest_features = train_xgboost_on_residuals(residuals, df)
                    residual_correction = float(xgb_model.predict(latest_features)[0])
                    forecast_price = base_forecast + residual_correction
                    model_type = 'ARIMA+XGBoost'
                except Exception:
                    # Best effort: keep the plain ARIMA forecast
                    forecast_price = base_forecast
                    model_type = 'ARIMA'

            # Calculate performance metrics
            last_price = float(df['Close'].iloc[-1])
            returns = df['Close'].pct_change().dropna()

            # Model fit metrics
            residuals_array = np.asarray(residuals).flatten()
            residuals_clean = residuals_array[np.isfinite(residuals_array)]
            aic_proxy = len(residuals_clean) * np.log(np.var(residuals_clean)) + 2 * 3  # Rough AIC
            residual_std = float(np.std(residuals_clean))
            residual_mean = float(np.mean(residuals_clean))

            # Forecast metrics
            expected_change = (forecast_price - last_price) / last_price
            volatility = float(returns.rolling(window=20).std().iloc[-1])

            return {
                'model_type': model_type,
                'forecast_price': float(forecast_price),
                'last_price': last_price,
                'expected_change': expected_change,
                'volatility': volatility,
                'aic_proxy': aic_proxy,
                'residual_std': residual_std,
                'residual_mean': residual_mean,
                'residual_normality': self._test_normality(residuals_clean),
                'data_points': len(df),
                'success': True
            }
        except Exception as e:
            print(f" ❌ ARIMA failed: {e}")
            return None

    def _test_garch_model(self, ticker: str) -> Optional[Dict[str, Any]]:
        """Test ARIMA-GARCH model and return performance metrics, or None on failure."""
        try:
            # Note: get_arima_garch_forecast should also be updated to support force_recalculate
            # For now, we assume it uses its own caching mechanism
            forecast_price, forecast_details, df = get_arima_garch_forecast(ticker)

            # GET BASE FORECAST
            base_forecast = forecast_price
            model_type = 'ARIMA-GARCH'

            # ADD XGBOOST ENHANCEMENT IF AVAILABLE
            if HYBRID_AVAILABLE and 'residuals' in forecast_details:
                try:
                    residuals = forecast_details['residuals']
                    xgb_model, latest_features = train_xgboost_on_residuals(residuals, df)
                    residual_correction = float(xgb_model.predict(latest_features)[0])
                    forecast_price = base_forecast + residual_correction
                    model_type = 'ARIMA-GARCH+XGBoost'
                except Exception:
                    # Best effort: keep the plain ARIMA-GARCH forecast
                    forecast_price = base_forecast
                    model_type = 'ARIMA-GARCH'

            # Extract metrics
            last_price = float(df['Close'].iloc[-1])
            expected_change = (forecast_price - last_price) / last_price
            # NOTE(review): divided by 100, so volatility_forecast is presumably in percent - confirm
            volatility = forecast_details['forecast_result']['volatility_forecast'] / 100

            # Model diagnostics
            diagnostics = forecast_details.get('diagnostics', {})
            aic = diagnostics.get('arima_aic', None)
            garch_aic = diagnostics.get('garch_aic', None)
            residual_std = diagnostics.get('residual_std', 0.0)
            residual_mean = diagnostics.get('residual_mean', 0.0)

            # Get residuals for normality test
            residuals = forecast_details.get('residuals', None)
            if residuals is not None:
                residuals_array = np.asarray(residuals).flatten()
                residuals_clean = residuals_array[np.isfinite(residuals_array)]
                residual_normality = self._test_normality(residuals_clean)
            else:
                residual_normality = 0.5

            return {
                'model_type': model_type,
                'forecast_price': forecast_price,
                'last_price': last_price,
                'expected_change': expected_change,
                'volatility': volatility,
                'aic_proxy': aic or 0.0,  # 0.0 marks "not reported"
                'garch_aic': garch_aic or 0.0,
                'residual_std': residual_std,
                'residual_mean': residual_mean,
                'residual_normality': residual_normality,
                'data_points': len(df),
                'success': True
            }
        except Exception as e:
            print(f" ❌ ARIMA-GARCH failed: {e}")
            return None

    def _test_normality(self, residuals: np.ndarray) -> float:
        """Simple normality test for residuals (returns score 0-1).

        Scores half a point each for a mean within 0.1 standard deviations of
        zero and for absolute skewness below 1. Returns the neutral 0.5 for
        fewer than 10 residuals or on any computation error.
        """
        try:
            if len(residuals) < 10:
                return 0.5

            # Simple normality indicators
            mean_close_to_zero = abs(np.mean(residuals)) < 0.1 * np.std(residuals)
            low_skewness = abs(self._calculate_skewness(residuals)) < 1.0

            # Return normalized score
            return (int(mean_close_to_zero) + int(low_skewness)) / 2.0
        except Exception:
            return 0.5

    def _calculate_skewness(self, data: np.ndarray) -> float:
        """Calculate skewness of data (0 when the data has zero spread)."""
        mean = np.mean(data)
        std = np.std(data)
        if std == 0:
            return 0
        return np.mean(((data - mean) / std) ** 3)

    @staticmethod
    def _usable_aic(value: Any) -> bool:
        """Return True when *value* is a reported, finite AIC (0.0 means missing)."""
        try:
            return value is not None and bool(np.isfinite(value)) and value != 0
        except TypeError:
            return False

    def _compare_model_performance(self, arima_results: Dict, garch_results: Dict) -> Dict[str, Any]:
        """Compare ARIMA vs ARIMA-GARCH and determine winner.

        Each model collects points over five criteria (residual std, AIC,
        residual normality, a fixed 0.3 GARCH bonus, forecast reasonableness).
        Ties on a criterion, and on the total, go to ARIMA-GARCH. Confidence
        ranges from 0.5 (tie) to 1.0 (maximum possible score gap).
        """
        metrics = {
            'arima_score': 0.0,
            'garch_score': 0.0,
            'winner': None,
            'confidence': 0.0,
            'decision_factors': {}
        }
        factors = metrics['decision_factors']

        # Scoring criteria (each worth up to 1 point)

        # 1. Residual quality (lower std is better)
        arima_res_std = arima_results['residual_std']
        garch_res_std = garch_results['residual_std']
        if arima_res_std > 0 and garch_res_std > 0:
            if arima_res_std < garch_res_std:
                metrics['arima_score'] += 1.0
                factors['residual_quality'] = 'ARIMA better'
            else:
                metrics['garch_score'] += 1.0
                factors['residual_quality'] = 'GARCH better'

        # 2. AIC comparison (lower is better). AIC may legitimately be
        # negative, so only skip when a value is missing (0.0) or non-finite.
        arima_aic = arima_results['aic_proxy']
        garch_aic = garch_results['aic_proxy']
        if self._usable_aic(arima_aic) and self._usable_aic(garch_aic):
            if arima_aic < garch_aic:
                metrics['arima_score'] += 1.0
                factors['model_fit'] = 'ARIMA better AIC'
            else:
                metrics['garch_score'] += 1.0
                factors['model_fit'] = 'GARCH better AIC'

        # 3. Residual normality (closer to normal is better)
        arima_norm = arima_results['residual_normality']
        garch_norm = garch_results['residual_normality']
        if arima_norm > garch_norm:
            metrics['arima_score'] += 1.0
            factors['residual_normality'] = 'ARIMA more normal'
        else:
            metrics['garch_score'] += 1.0
            factors['residual_normality'] = 'GARCH more normal'

        # 4. Volatility modeling appropriateness
        # For moderate volatility crypto, slight preference for GARCH
        metrics['garch_score'] += 0.3
        factors['volatility_modeling'] = 'GARCH slight advantage for crypto'

        # 5. Forecast reasonableness (penalize extreme forecasts)
        arima_change = abs(arima_results['expected_change'])
        garch_change = abs(garch_results['expected_change'])

        # Prefer forecasts between 0.1% and 5%
        arima_reasonable = 0.001 <= arima_change <= 0.05
        garch_reasonable = 0.001 <= garch_change <= 0.05
        if arima_reasonable and not garch_reasonable:
            metrics['arima_score'] += 0.5
            factors['forecast_reasonableness'] = 'ARIMA more reasonable'
        elif garch_reasonable and not arima_reasonable:
            metrics['garch_score'] += 0.5
            factors['forecast_reasonableness'] = 'GARCH more reasonable'

        # Determine winner
        if metrics['arima_score'] > metrics['garch_score']:
            metrics['winner'] = 'ARIMA'
            score_diff = metrics['arima_score'] - metrics['garch_score']
        else:
            metrics['winner'] = 'ARIMA-GARCH'
            score_diff = metrics['garch_score'] - metrics['arima_score']

        # Calculate confidence (0.5 = tie, 1.0 = clear winner)
        max_possible_diff = 3.8  # Maximum possible score difference (1 + 1 + 1 + 0.3 + 0.5)
        metrics['confidence'] = 0.5 + (score_diff / max_possible_diff) * 0.5

        return metrics

    def _print_comparison_results(self, ticker: str, metrics: Dict, arima_results: Dict, garch_results: Dict):
        """Print detailed comparison results."""
        print(f"\n📋 COMPARISON RESULTS FOR {ticker}:")
        print("-" * 40)

        # Model scores
        print(f"🏆 Winner: {metrics['winner']} (Confidence: {metrics['confidence']:.1%})")
        print(f"📊 Scores: ARIMA={metrics['arima_score']:.1f}, GARCH={metrics['garch_score']:.1f}")

        # Key metrics comparison
        print(f"\n📈 Forecast Comparison:")
        print(f" ARIMA: {arima_results['expected_change']:+.2%}")
        print(f" GARCH: {garch_results['expected_change']:+.2%}")

        print(f"\n🎯 Model Quality:")
        print(f" ARIMA AIC: {arima_results['aic_proxy']:.1f}")
        print(f" GARCH AIC: {garch_results['aic_proxy']:.1f}")
        print(f" ARIMA Res: {arima_results['residual_std']:.4f}")
        print(f" GARCH Res: {garch_results['residual_std']:.4f}")

        # Decision factors
        print(f"\n🧠 Decision Factors:")
        for factor, reason in metrics['decision_factors'].items():
            print(f" {factor}: {reason}")


class SmartTradingSystem:
    """Enhanced trading system with dual model comparison for crypto stocks."""

    def __init__(self, config_path: str = "config/trading_config.json",
                 extreme_volatility_config: str = "config/extreme_volatility.json",
                 use_volatility_config: bool = True,
                 force_recalculate: bool = False):
        self.config = self._load_config(config_path)
        self.volatility_classifier = VolatilityClassifier(extreme_volatility_config, use_volatility_config)
        self.model_comparator = ModelComparator(force_recalculate)
        self.analyzed_stocks = {}  # ticker -> cached volatility classification
        self.use_volatility_config = use_volatility_config
        self.force_recalculate = force_recalculate

        # Trading parameters (set before the availability checks so the
        # instance is fully initialised even when ARIMA is missing)
        trading_params = self.config.get("trading_parameters", {})
        self.buy_threshold = trading_params.get('buy_threshold', 0.02)
        self.sell_threshold = trading_params.get('sell_threshold', -0.02)
        self.confidence_threshold = trading_params.get('confidence_threshold', 0.6)

        print("🧠 DUAL MODEL COMPARISON MODE: Testing both models for crypto stocks with moderate volatility")

        # Show mode-specific info
        if use_volatility_config:
            crypto_keywords = self.volatility_classifier.list_crypto_keywords()
            if crypto_keywords:
                print(f"🪙 Crypto keywords loaded: {', '.join(crypto_keywords[:10])}")
                if len(crypto_keywords) > 10:
                    print(f" ... and {len(crypto_keywords) - 10} more")
        else:
            print("📊 Pure volatility mode: All stocks classified by volatility thresholds only")

        # Check model availability
        self.arima_available = ARIMA_AVAILABLE
        self.garch_available = ARIMA_GARCH_AVAILABLE and check_dependencies()
        self.hybrid_available = HYBRID_AVAILABLE

        if not self.arima_available:
            print("❌ ARIMA model not available - system cannot function")
            return
        if not self.garch_available:
            print("⚠️ ARIMA-GARCH not available - cannot perform dual comparison")
        if not self.hybrid_available:
            print("⚠️ XGBoost hybrid model not available")

    def _load_config(self, config_path: str) -> Dict[str, Any]:
        """Load configuration from file, falling back to a two-ticker default."""
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            print(f"❌ Error loading config: {e}")
            return {"tickers": {"default": ["AAPL", "MSFT"]}}

    def _needs_dual_comparison(self, classification: Dict[str, Any]) -> bool:
        """Return True for crypto + MODERATE tickers when both models are usable.

        Dual comparison is a config-mode feature; pure volatility mode never
        triggers it because the crypto override is disabled there.
        """
        if not self.use_volatility_config:
            return False
        return bool(
            classification["is_crypto"]
            and classification["volatility_class"] == "MODERATE"
            and self.arima_available
            and self.garch_available
        )

    def _signal_for_change(self, expected_change: float) -> Tuple[str, str]:
        """Map an expected relative price change to a (signal, emoji) pair."""
        if expected_change > self.buy_threshold:
            return "BUY", "🟢"
        if expected_change < self.sell_threshold:
            return "SELL", "🔴"
        return "HOLD", "🟡"

    @staticmethod
    def _forecast_confidence(expected_change: float, volatility: float) -> float:
        """Ratio of expected move to volatility, clamped to the range 0.1-0.9."""
        return min(0.9, max(0.1, abs(expected_change) / (volatility + 0.01)))

    def classify_portfolio_volatility(self, tickers: List[str]) -> Dict[str, Dict]:
        """Classify volatility and identify stocks needing dual comparison.

        Classifications are cached in ``self.analyzed_stocks`` so the
        per-ticker analysis that follows does not download the same price
        history a second time.
        """
        mode_str = "CONFIG" if self.use_volatility_config else "PURE VOLATILITY"
        print(f"🔍 Classifying volatility for {len(tickers)} stocks ({mode_str} mode)...")

        classifications = {}
        arima_stocks = []
        garch_stocks = []
        dual_comparison_stocks = []

        for i, ticker in enumerate(tickers, 1):
            print(f"[{i:2d}/{len(tickers)}] Analyzing {ticker}...", end=" ")

            classification = self.analyzed_stocks.get(ticker)
            if classification is None:
                classification = self.volatility_classifier.classify_stock(ticker)
                self.analyzed_stocks[ticker] = classification
            classifications[ticker] = classification

            if "error" in classification:
                print(f"❌ Error")
                continue

            vol_class = classification["volatility_class"]
            daily_vol = classification["daily_volatility"]
            is_crypto = classification["is_crypto"]

            # Determine which modelling route this ticker takes
            if self._needs_dual_comparison(classification):
                dual_comparison_stocks.append(ticker)
                icon = "⚖️"
                model_str = "DUAL-COMPARISON"
            elif vol_class in ["HIGH", "EXTREME"]:
                garch_stocks.append(ticker)
                icon = "🔴" if vol_class == "EXTREME" else "🟡"
                model_str = "ARIMA-GARCH"
            else:
                arima_stocks.append(ticker)
                icon = "🟢"
                model_str = "ARIMA"

            # Show crypto status only in config mode
            crypto_status = " (CRYPTO)" if is_crypto and self.use_volatility_config else ""
            print(f"{icon} {vol_class} ({daily_vol:.1%}){crypto_status} → {model_str}")

        # Summary
        print(f"\n📊 VOLATILITY CLASSIFICATION SUMMARY ({mode_str} MODE):")
        print(f" 🟢 ARIMA suitable: {len(arima_stocks)} stocks")
        if arima_stocks:
            print(f" {', '.join(arima_stocks)}")
        print(f" 🟡🔴 ARIMA-GARCH needed: {len(garch_stocks)} stocks")
        if garch_stocks:
            print(f" {', '.join(garch_stocks)}")
        if dual_comparison_stocks:
            print(f" ⚖️ Dual comparison needed: {len(dual_comparison_stocks)} stocks")
            print(f" {', '.join(dual_comparison_stocks)}")

        return {
            "classifications": classifications,
            "arima_stocks": arima_stocks,
            "garch_stocks": garch_stocks,
            "dual_comparison_stocks": dual_comparison_stocks
        }

    def analyze_stock_with_smart_model(self, ticker: str) -> Optional[Dict[str, Any]]:
        """Analyze stock using smart model selection with dual comparison for crypto moderate.

        Returns the analysis result dict, or None when ARIMA is unavailable,
        classification failed, or the chosen model failed.
        """
        if not self.arima_available:
            print(f"❌ {ticker}: ARIMA model not available")
            return None

        # Get volatility classification (cached per ticker)
        if ticker not in self.analyzed_stocks:
            self.analyzed_stocks[ticker] = self.volatility_classifier.classify_stock(ticker)
        classification = self.analyzed_stocks[ticker]

        if "error" in classification:
            print(f"❌ {ticker}: Classification failed - {classification['error']}")
            return None

        # Check if this stock needs dual comparison (only in config mode)
        if self._needs_dual_comparison(classification):
            return self._analyze_with_dual_comparison(ticker, classification)
        return self._analyze_with_single_model(ticker, classification)

    def _analyze_with_dual_comparison(self, ticker: str, classification: Dict) -> Optional[Dict[str, Any]]:
        """Analyze crypto stock with moderate volatility using dual model comparison."""
        # Run dual comparison
        comparison_result = self.model_comparator.compare_models_for_crypto_moderate(ticker)
        if comparison_result['error']:
            print(f"❌ {ticker}: Dual comparison failed - {comparison_result['error']}")
            return None

        best_model = comparison_result['best_model']
        confidence_in_choice = comparison_result['confidence_in_choice']

        # Use the best model for final forecast
        try:
            if best_model == 'ARIMA-GARCH':
                model_results = comparison_result['garch_results']
            else:
                model_results = comparison_result['arima_results']

            # Use the actual model type that was tested
            winning_model_type = model_results['model_type']
            model_type = f"{winning_model_type} (won dual test, {confidence_in_choice:.1%} confidence)"

            forecast_price = model_results['forecast_price']
            last_price = model_results['last_price']
            expected_change = model_results['expected_change']
            volatility = model_results['volatility']

            # Generate trading signal
            signal, emoji = self._signal_for_change(expected_change)

            # Calculate confidence (combine forecast confidence with model choice confidence)
            forecast_confidence = self._forecast_confidence(expected_change, volatility)
            combined_confidence = (forecast_confidence + confidence_in_choice) / 2.0

            now = datetime.now()
            result = {
                'ticker': ticker,
                'last_price': last_price,
                'forecast_price': forecast_price,
                'expected_change': expected_change,
                'volatility': volatility,
                'confidence': combined_confidence,
                'signal': signal,
                'emoji': emoji,
                'model_used': model_type,
                'volatility_class': classification["volatility_class"],
                'dual_comparison_performed': True,
                'winning_model': best_model,
                'model_choice_confidence': confidence_in_choice,
                'comparison_details': comparison_result,
                'timestamp': now,
                'analysis_time': now.isoformat()
            }

            print(f"✅ ⚖️ {ticker}: {signal} | Change: {expected_change:+.2%} | {model_type}")
            print(f" Comparison winner: {best_model} (confidence: {confidence_in_choice:.1%})")
            return result
        except Exception as e:
            print(f"❌ {ticker} final analysis failed: {e}")
            return None

    def _analyze_with_single_model(self, ticker: str, classification: Dict) -> Optional[Dict[str, Any]]:
        """Analyze stock using standard single model approach with ARIMA-GARCH+XGBoost enhancement.

        ARIMA-GARCH is chosen for HIGH/EXTREME volatility and, in config mode,
        for any crypto ticker; everything else (or everything, when
        ARIMA-GARCH is unavailable) uses ARIMA. XGBoost residual correction is
        applied on a best-effort basis on both paths.
        """
        vol_class = classification["volatility_class"]
        is_crypto = classification["is_crypto"]
        crypto_override = self.use_volatility_config and is_crypto

        # Determine model based on mode (crypto override applies in config mode only)
        wants_garch = vol_class in ["HIGH", "EXTREME"] or crypto_override
        model_type = "ARIMA-GARCH" if (wants_garch and self.garch_available) else "ARIMA"

        try:
            if model_type == "ARIMA-GARCH":
                if crypto_override:
                    icon = "🪙"
                    status = f"CRYPTO {vol_class} volatility"
                else:
                    icon = "🔧"
                    status = f"{vol_class} volatility"
                print(f"{icon} {ticker}: Using ARIMA-GARCH model ({status})")

                # Get ARIMA-GARCH forecast
                forecast_price, forecast_details, df = get_arima_garch_forecast(ticker)
                base_forecast = forecast_price
                model_used = "ARIMA-GARCH"

                # APPLY XGBOOST ENHANCEMENT TO ARIMA-GARCH
                if self.hybrid_available:
                    try:
                        # Get residuals from ARIMA-GARCH
                        if 'residuals' in forecast_details:
                            residuals = forecast_details['residuals']
                        else:
                            # Fallback: use returns as residuals proxy
                            residuals = df['Close'].pct_change().dropna()

                        # Apply XGBoost enhancement
                        xgb_model, latest_features = train_xgboost_on_residuals(residuals, df)
                        residual_correction = float(xgb_model.predict(latest_features)[0])
                        forecast_price = base_forecast + residual_correction
                        model_used = "ARIMA-GARCH+XGBoost"
                    except Exception:
                        # If XGBoost enhancement fails, use base ARIMA-GARCH
                        forecast_price = base_forecast
                        model_used = "ARIMA-GARCH"

                last_price = float(df['Close'].iloc[-1])
                expected_change = (forecast_price - last_price) / last_price
                volatility = forecast_details['forecast_result']['volatility_forecast'] / 100
            else:
                # ARIMA model path
                if vol_class in ["HIGH", "EXTREME"]:
                    print(f"⚠️ {ticker}: ARIMA-GARCH not available, using ARIMA fallback")
                else:
                    print(f"📊 {ticker}: Using ARIMA model ({vol_class} volatility)")

                forecast_price, residuals, df = get_arima_forecast(
                    ticker,
                    use_optimized_params=not self.force_recalculate,
                    force_recalculate=self.force_recalculate,
                )
                base_forecast = float(forecast_price)
                forecast_price = base_forecast
                model_used = "ARIMA"

                # Apply XGBoost enhancement to ARIMA
                if self.hybrid_available:
                    try:
                        xgb_model, latest_features = train_xgboost_on_residuals(residuals, df)
                        residual_correction = float(xgb_model.predict(latest_features)[0])
                        forecast_price = base_forecast + residual_correction
                        model_used = "ARIMA+XGBoost"
                    except Exception:
                        # If XGBoost enhancement fails, use base ARIMA
                        # (matches the ARIMA-GARCH path instead of aborting the ticker)
                        forecast_price = base_forecast
                        model_used = "ARIMA"

                last_price = float(df['Close'].iloc[-1])
                expected_change = (forecast_price - last_price) / last_price
                returns = df['Close'].pct_change().dropna()
                volatility = float(returns.rolling(window=20).std().iloc[-1])

            # Generate trading signal
            signal, emoji = self._signal_for_change(expected_change)
            confidence = self._forecast_confidence(expected_change, volatility)

            now = datetime.now()
            result = {
                'ticker': ticker,
                'last_price': last_price,
                'forecast_price': forecast_price,
                'expected_change': expected_change,
                'volatility': volatility,
                'confidence': confidence,
                'signal': signal,
                'emoji': emoji,
                'model_used': model_used,
                'volatility_class': vol_class,
                'dual_comparison_performed': False,
                'mode': classification.get('mode', 'unknown'),
                'timestamp': now,
                'analysis_time': now.isoformat()
            }

            print(f"✅ {emoji} {ticker}: {signal} | Change: {expected_change:+.2%} | Model: {model_used}")
            return result
        except Exception as e:
            print(f"❌ {ticker} analysis failed: {e}")
            return None

    def analyze_portfolio_with_smart_models(self, portfolio_name: str) -> List[Dict[str, Any]]:
        """Analyze portfolio using smart model selection with dual comparison.

        ``portfolio_name`` is a key of the config's ``tickers`` mapping, or
        ``"all"`` for the union of every portfolio. Returns the list of
        per-ticker results (tickers whose analysis failed are omitted).
        """
        if portfolio_name == "all":
            tickers = self.get_all_unique_stocks()
        else:
            tickers = self.config.get("tickers", {}).get(portfolio_name, [])

        if not tickers:
            print(f"❌ Portfolio '{portfolio_name}' not found in config")
            return []

        mode_str = "CONFIG" if self.use_volatility_config else "PURE VOLATILITY"
        print(f"\n🧠 DUAL-MODEL SMART PORTFOLIO ANALYSIS: {portfolio_name} ({mode_str} MODE)")
        print("=" * 70)

        # Step 1: Classify all stocks
        volatility_analysis = self.classify_portfolio_volatility(tickers)

        print(f"\n🔬 ANALYZING STOCKS WITH APPROPRIATE MODELS:")
        print("=" * 70)

        # Step 2: Analyze each stock
        results = []
        for ticker in tickers:
            result = self.analyze_stock_with_smart_model(ticker)
            if result:
                results.append(result)

        # Step 3: Enhanced portfolio summary
        if results:
            self._print_enhanced_portfolio_summary(results, portfolio_name, volatility_analysis)

        return results

    def _print_enhanced_portfolio_summary(self, results: List[Dict], portfolio_name: str, volatility_analysis: Dict):
        """Print enhanced portfolio summary including dual comparison results."""
        mode_str = "CONFIG" if self.use_volatility_config else "PURE VOLATILITY"
        print(f"\n📊 {portfolio_name.upper()} PORTFOLIO SUMMARY ({mode_str} MODE)")
        print("=" * 80)

        def base_model(result: Dict) -> str:
            # Strip the " (won dual test, ...)" suffix added by dual comparison
            return result['model_used'].split(' (')[0]

        # Dual comparison summary (only in config mode)
        if self.use_volatility_config:
            dual_comparison_results = [r for r in results if r.get('dual_comparison_performed', False)]
            if dual_comparison_results:
                print(f"\n⚖️ DUAL COMPARISON RESULTS:")
                for result in dual_comparison_results:
                    ticker = result['ticker']
                    winner = result['winning_model']
                    confidence = result['model_choice_confidence']
                    print(f" {ticker}: {winner} won (confidence: {confidence:.1%})")

        # Model usage summary
        model_counts = {}
        for result in results:
            model = base_model(result)
            model_counts[model] = model_counts.get(model, 0) + 1

        print(f"\n📈 Model Usage:")
        for model, count in model_counts.items():
            print(f" {model}: {count} stocks")

        # Performance by model type. Group by exact model name: a prefix test
        # would fold "ARIMA-GARCH" and "ARIMA+XGBoost" into the "ARIMA" group.
        for model in model_counts:
            model_results = [r for r in results if base_model(r) == model]
            if model_results:
                avg_change = np.mean([r['expected_change'] for r in model_results])
                signals = [r['signal'] for r in model_results]
                buy_count = signals.count('BUY')
                sell_count = signals.count('SELL')
                hold_count = signals.count('HOLD')
                print(f" 📊 {model} average expected change: {avg_change:+.2%}")
                print(f" Signals: {buy_count} BUY, {sell_count} SELL, {hold_count} HOLD")

        # Volatility distribution
        vol_classes = [r['volatility_class'] for r in results]
        print(f"\n🎯 Volatility Distribution:")
        for vol_class in ['MODERATE', 'HIGH', 'EXTREME']:
            count = vol_classes.count(vol_class)
            if count > 0:
                percentage = count / len(results) * 100
                print(f" {vol_class}: {count} stocks ({percentage:.1f}%)")

        # Trading signals
        signals = [r['signal'] for r in results]
        print(f"\n📈 Trading Signals:")
        print(f" 🟢 BUY: {signals.count('BUY')} stocks")
        print(f" 🔴 SELL: {signals.count('SELL')} stocks")
        print(f" 🟡 HOLD: {signals.count('HOLD')} stocks")

        # Top recommendations
        buy_signals = [r for r in results if r['signal'] == 'BUY']
        if buy_signals:
            buy_signals.sort(key=lambda x: x['confidence'], reverse=True)
            print(f"\n🏆 TOP BUY RECOMMENDATIONS:")
            for i, rec in enumerate(buy_signals[:5], 1):
                model_info = base_model(rec)
                dual_info = " [DUAL-TESTED]" if rec.get('dual_comparison_performed', False) else ""
                print(f" {i}. {rec['ticker']}: {rec['expected_change']:+.2%} ({rec['confidence']:.1%}) [{model_info}]{dual_info}")

        sell_signals = [r for r in results if r['signal'] == 'SELL']
        if sell_signals:
            sell_signals.sort(key=lambda x: x['confidence'], reverse=True)
            print(f"\n⚠️ TOP SELL RECOMMENDATIONS:")
            for i, rec in enumerate(sell_signals[:3], 1):
                model_info = base_model(rec)
                dual_info = " [DUAL-TESTED]" if rec.get('dual_comparison_performed', False) else ""
                print(f" {i}. {rec['ticker']}: {rec['expected_change']:+.2%} ({rec['confidence']:.1%}) [{model_info}]{dual_info}")

    def get_all_unique_stocks(self) -> List[str]:
        """Get all unique stocks from all portfolios, sorted.

        Non-list portfolio values and entries starting with ``//`` (metadata
        comments inside the JSON) are ignored.
        """
        all_stocks = set()
        for tickers in self.config.get("tickers", {}).values():
            if isinstance(tickers, list):
                all_stocks.update(tickers)

        # Remove metadata entries
        return sorted(stock for stock in all_stocks if not str(stock).startswith('//'))


def parse_arguments():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description='Smart Trading System with Dual Model Comparison',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python enhanced_main.py lenny_golub                      # Config mode (default)
  python enhanced_main.py lenny_golub --pure-volatility    # Pure volatility mode
  python enhanced_main.py lenny_golub --force-recalculate  # Force fresh parameter optimization
  python enhanced_main.py --list-crypto                    # Show current crypto keywords
"""
    )
    parser.add_argument('portfolios', nargs='*',
                        help='Portfolio names to analyze or compare (use "all" for all stocks)')
    parser.add_argument('--pure-volatility', action='store_true', default=False,
                        help='Use pure volatility classification (ignore crypto config)')
    parser.add_argument('--force-recalculate', action='store_true', default=False,
                        help='Force fresh parameter optimization for all models (ignore cache)')
    parser.add_argument('--clear-cache', action='store_true', default=False,
                        help='Clear all cached model parameters before analysis')
    parser.add_argument('--list-crypto', action='store_true',
                        help='List current crypto keywords')
    parser.add_argument('--save', action='store_true', default=True,
                        help='Save results to CSV (default: True)')
    parser.add_argument('--no-save', dest='save', action='store_false',
                        help='Do not save results to CSV')
    return parser.parse_args()


def main():
    """Enhanced main function with config management."""
    args = parse_arguments()

    if args.clear_cache:
        print("🗑️ Clearing all cached model parameters...")
        try:
            from models.arima_model import clear_all_cache
        except ImportError as e:
            print(f"❌ Cannot clear cache, ARIMA model not available: {e}")
            return
        cleared = clear_all_cache()
        print(f"✅ Cleared {cleared} cache files")
        if not args.portfolios:
            return  # Just clear cache and exit

    # Handle config management commands (only work in config mode)
    if args.list_crypto:
        if args.pure_volatility:
            print("📊 Pure volatility mode: No crypto keywords used")
            return
        system = SmartTradingSystem(use_volatility_config=True)
        keywords = system.volatility_classifier.list_crypto_keywords()
        print(f"🪙 Current crypto keywords ({len(keywords)}):")
        for i, keyword in enumerate(keywords, 1):
            print(f" {i:2d}. {keyword}")
        return

    # Regular analysis with mode selection
    use_config = not args.pure_volatility
    system = SmartTradingSystem(use_volatility_config=use_config,
                                force_recalculate=args.force_recalculate)

    if not args.portfolios:
        # Show help
        mode_desc = "CONFIG" if use_config else "PURE VOLATILITY"
        print(f"🧠 DUAL-MODEL SMART TRADING SYSTEM ({mode_desc} MODE)")
        print("=" * 70)
        if use_config:
            print("⚖️ Automatically tests both ARIMA and ARIMA-GARCH for crypto stocks with moderate volatility")
        else:
            print("📊 Classifies all stocks purely by volatility thresholds (no crypto overrides)")
        if args.force_recalculate:
            print("🔄 FORCE RECALCULATION MODE: All model parameters will be optimized fresh")
        print("🏆 Chooses the best performing model based on statistical comparison")

        portfolios = system.config.get("tickers", {})
        print(f"\n📊 Available portfolios ({len(portfolios)}):")
        for i, (portfolio, tickers) in enumerate(portfolios.items(), 1):
            print(f" {i:2d}. {portfolio:<20} ({len(tickers)} stocks)")

        print("\n📖 Usage Examples:")
        print(" python enhanced_main.py lenny_golub # Config mode")
        print(" python enhanced_main.py lenny_golub --pure-volatility # Pure volatility mode")
        print(" python enhanced_main.py lenny_golub --force-recalculate # Force fresh optimization")
        return

    # Single portfolio analysis
    portfolio_name = args.portfolios[0]
    if len(args.portfolios) > 1:
        # Extra names used to be dropped silently; say so instead
        print(f"⚠️ Only the first portfolio is analyzed; ignoring: {', '.join(args.portfolios[1:])}")

    results = system.analyze_portfolio_with_smart_models(portfolio_name)

    if args.save and results:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        mode_suffix = "_pure_vol" if args.pure_volatility else "_config"
        if args.force_recalculate:
            mode_suffix += "_fresh"
        filename = f"results/dual_model_trading_{portfolio_name}{mode_suffix}_{timestamp}.csv"

        df_results = pd.DataFrame(results)
        Path(filename).parent.mkdir(parents=True, exist_ok=True)
        df_results.to_csv(filename, index=False)
        print(f"\n📁 Results saved to: {filename}")


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/j1c4b/mcp-hybrid-forecasting'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.