enhanced_main.py•50.5 kB
# enhanced_main.py - Smart trading system with dual model comparison for crypto stocks
import pandas as pd
import numpy as np
import warnings
import json
import argparse
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Any, Optional, Set, Tuple
import matplotlib.pyplot as plt
import seaborn as sns
warnings.filterwarnings('ignore')
# Enhanced imports with graceful fallback
try:
from models.arima_model import get_arima_forecast, get_enhanced_arima_forecast, clear_cache_for_ticker, clear_all_cache
ARIMA_AVAILABLE = True
except ImportError as e:
print(f"⚠️ ARIMA model not available: {e}")
ARIMA_AVAILABLE = False
try:
from models.arima_garch_model import get_arima_garch_forecast, get_enhanced_arima_garch_forecast, check_dependencies
ARIMA_GARCH_AVAILABLE = True
except ImportError as e:
print(f"⚠️ ARIMA-GARCH model not available: {e}")
print(f"💡 Create models/arima_garch_model.py to enable ARIMA-GARCH functionality")
ARIMA_GARCH_AVAILABLE = False
# Fallback function
def check_dependencies():
return False
try:
from models.hybrid_model import train_xgboost_on_residuals
HYBRID_AVAILABLE = True
except ImportError as e:
print(f"⚠️ Hybrid model not available: {e}")
HYBRID_AVAILABLE = False
# Import volatility detection
import yfinance as yf
class VolatilityClassifier:
"""Lightweight volatility classifier with two modes: config-based or pure volatility."""
def __init__(self, extreme_volatility_config: str = "config/extreme_volatility.json",
use_config: bool = True):
self.extreme_volatility_config_path = extreme_volatility_config
self.use_config = use_config
self.crypto_keywords = self._load_crypto_keywords() if use_config else []
self.high_volatility_threshold = 0.03 # 3% daily volatility
self.extreme_volatility_threshold = 0.05 # 5% daily volatility
# Show mode
if use_config:
print(f"🪙 CONFIG MODE: Using {len(self.crypto_keywords)} crypto keywords from config")
else:
print("📊 PURE VOLATILITY MODE: Classifying stocks by volatility only")
def _load_crypto_keywords(self) -> List[str]:
"""Load crypto/extreme volatility keywords from JSON config file."""
try:
with open(self.extreme_volatility_config_path, 'r') as f:
config = json.load(f)
# Extract tickers from the extreme category
crypto_keywords = config.get("tickers", {}).get("extreme", [])
if crypto_keywords:
print(f"✅ Loaded {len(crypto_keywords)} crypto/extreme volatility tickers from {self.extreme_volatility_config_path}")
return crypto_keywords
else:
print(f"⚠️ No extreme volatility tickers found in {self.extreme_volatility_config_path}")
return []
except FileNotFoundError:
print(f"❌ Config file {self.extreme_volatility_config_path} not found!")
return []
except json.JSONDecodeError as e:
print(f"❌ Error parsing {self.extreme_volatility_config_path}: {e}")
return []
except Exception as e:
print(f"❌ Unexpected error loading {self.extreme_volatility_config_path}: {e}")
return []
def reload_crypto_keywords(self) -> bool:
"""Reload crypto keywords from config file. Only works in config mode."""
if not self.use_config:
print("⚠️ Not in config mode, cannot reload crypto keywords")
return False
try:
new_keywords = self._load_crypto_keywords()
self.crypto_keywords = new_keywords
return True
except Exception as e:
print(f"❌ Failed to reload crypto keywords: {e}")
return False
def add_crypto_ticker(self, ticker: str) -> bool:
"""Add a new ticker to the crypto keywords list and save to config."""
if not self.use_config:
print("⚠️ Not in config mode, cannot add crypto ticker")
return False
if ticker.upper() not in [kw.upper() for kw in self.crypto_keywords]:
self.crypto_keywords.append(ticker.upper())
return self._save_crypto_keywords()
else:
print(f"⚠️ {ticker} already in crypto keywords list")
return True
def remove_crypto_ticker(self, ticker: str) -> bool:
"""Remove a ticker from crypto keywords list and save to config."""
if not self.use_config:
print("⚠️ Not in config mode, cannot remove crypto ticker")
return False
original_count = len(self.crypto_keywords)
self.crypto_keywords = [kw for kw in self.crypto_keywords if kw.upper() != ticker.upper()]
if len(self.crypto_keywords) < original_count:
return self._save_crypto_keywords()
else:
print(f"⚠️ {ticker} not found in crypto keywords list")
return True
def _save_crypto_keywords(self) -> bool:
"""Save current crypto keywords back to config file."""
try:
# Ensure directory exists
Path(self.extreme_volatility_config_path).parent.mkdir(parents=True, exist_ok=True)
# Create config structure
config = {
"tickers": {
"extreme": sorted(self.crypto_keywords)
}
}
# Save to file
with open(self.extreme_volatility_config_path, 'w') as f:
json.dump(config, f, indent=2)
print(f"✅ Saved {len(self.crypto_keywords)} crypto keywords to {self.extreme_volatility_config_path}")
return True
except Exception as e:
print(f"❌ Failed to save crypto keywords: {e}")
return False
def list_crypto_keywords(self) -> List[str]:
"""Return current list of crypto keywords."""
if not self.use_config:
print("⚠️ Not in config mode, no crypto keywords loaded")
return []
return self.crypto_keywords.copy()
def classify_stock(self, ticker: str, period: str = "6mo") -> Dict[str, Any]:
"""Classify a stock's volatility and recommend modeling approach."""
try:
# Download recent data for classification
data = yf.download(ticker, period=period, progress=False)
if data.empty:
return {
"ticker": ticker,
"volatility_class": "UNKNOWN",
"recommended_model": "ARIMA",
"daily_volatility": 0.0,
"error": "No data available"
}
close = data['Close'].dropna()
returns = close.pct_change().dropna()
daily_vol = float(returns.std())
# Classify volatility
if daily_vol > self.extreme_volatility_threshold:
volatility_class = "EXTREME"
recommended_model = "ARIMA-GARCH"
elif daily_vol > self.high_volatility_threshold:
volatility_class = "HIGH"
recommended_model = "ARIMA-GARCH"
else:
volatility_class = "MODERATE"
recommended_model = "ARIMA"
# Check if crypto (only in config mode)
if self.use_config:
is_crypto = any(keyword in ticker.upper() for keyword in self.crypto_keywords)
else:
is_crypto = False # In pure volatility mode, no crypto classification
return {
"ticker": ticker,
"volatility_class": volatility_class,
"recommended_model": recommended_model,
"daily_volatility": daily_vol,
"is_crypto": is_crypto,
"mode": "config" if self.use_config else "pure_volatility",
"crypto_keywords_source": self.extreme_volatility_config_path if self.use_config else None,
"data_points": len(close)
}
except Exception as e:
return {
"ticker": ticker,
"volatility_class": "UNKNOWN",
"recommended_model": "ARIMA",
"daily_volatility": 0.0,
"error": str(e)
}
class ModelComparator:
"""Compare ARIMA vs ARIMA-GARCH models for crypto stocks with moderate volatility."""
def __init__(self, force_recalculate: bool = False):
self.comparison_results = {}
self.force_recalculate = force_recalculate
def compare_models_for_crypto_moderate(self, ticker: str) -> Dict[str, Any]:
"""
Compare ARIMA vs ARIMA-GARCH for crypto stocks with moderate volatility.
Returns the better performing model and comparison metrics.
"""
print(f"\n🔬 DUAL MODEL COMPARISON FOR {ticker}")
print("=" * 50)
comparison_result = {
'ticker': ticker,
'arima_results': None,
'garch_results': None,
'comparison_metrics': {},
'best_model': None,
'confidence_in_choice': 0.0,
'error': None
}
try:
# Test ARIMA model
print(f"📊 Testing ARIMA model for {ticker}...")
arima_results = self._test_arima_model(ticker)
comparison_result['arima_results'] = arima_results
# Test ARIMA-GARCH model
print(f"🔧 Testing ARIMA-GARCH model for {ticker}...")
garch_results = self._test_garch_model(ticker)
comparison_result['garch_results'] = garch_results
# Compare models
if arima_results and garch_results:
comparison_metrics = self._compare_model_performance(arima_results, garch_results)
comparison_result['comparison_metrics'] = comparison_metrics
comparison_result['best_model'] = comparison_metrics['winner']
comparison_result['confidence_in_choice'] = comparison_metrics['confidence']
self._print_comparison_results(ticker, comparison_metrics, arima_results, garch_results)
elif arima_results and not garch_results:
comparison_result['best_model'] = 'ARIMA'
comparison_result['confidence_in_choice'] = 0.6
print(f"⚠️ Only ARIMA succeeded, using ARIMA by default")
elif garch_results and not arima_results:
comparison_result['best_model'] = 'ARIMA-GARCH'
comparison_result['confidence_in_choice'] = 0.6
print(f"⚠️ Only ARIMA-GARCH succeeded, using ARIMA-GARCH by default")
else:
comparison_result['error'] = "Both models failed"
print(f"❌ Both models failed for {ticker}")
except Exception as e:
comparison_result['error'] = str(e)
print(f"❌ Model comparison failed for {ticker}: {e}")
self.comparison_results[ticker] = comparison_result
return comparison_result
def _test_arima_model(self, ticker: str) -> Optional[Dict[str, Any]]:
"""Test ARIMA model and return performance metrics."""
try:
forecast_price, residuals, df = get_arima_forecast(ticker,
use_optimized_params=not self.force_recalculate,
force_recalculate=self.force_recalculate)
# ADD XGBOOST ENHANCEMENT IF AVAILABLE
base_forecast = float(forecast_price)
if HYBRID_AVAILABLE:
try:
xgb_model, latest_features = train_xgboost_on_residuals(residuals, df)
residual_correction = float(xgb_model.predict(latest_features)[0])
forecast_price = base_forecast + residual_correction
model_type = 'ARIMA+XGBoost'
except:
model_type = 'ARIMA'
else:
model_type = 'ARIMA'
# Calculate performance metrics
last_price = float(df['Close'].iloc[-1])
returns = df['Close'].pct_change().dropna()
# Model fit metrics
residuals_array = np.asarray(residuals).flatten()
residuals_clean = residuals_array[np.isfinite(residuals_array)]
aic_proxy = len(residuals_clean) * np.log(np.var(residuals_clean)) + 2 * 3 # Rough AIC
residual_std = float(np.std(residuals_clean))
residual_mean = float(np.mean(residuals_clean))
# Forecast metrics
expected_change = (forecast_price - last_price) / last_price
volatility = float(returns.rolling(window=20).std().iloc[-1])
return {
'model_type': model_type,
'forecast_price': float(forecast_price),
'last_price': last_price,
'expected_change': expected_change,
'volatility': volatility,
'aic_proxy': aic_proxy,
'residual_std': residual_std,
'residual_mean': residual_mean,
'residual_normality': self._test_normality(residuals_clean),
'data_points': len(df),
'success': True
}
except Exception as e:
print(f" ❌ ARIMA failed: {e}")
return None
def _test_garch_model(self, ticker: str) -> Optional[Dict[str, Any]]:
"""Test ARIMA-GARCH model and return performance metrics."""
try:
# Note: get_arima_garch_forecast should also be updated to support force_recalculate
# For now, we assume it uses its own caching mechanism
forecast_price, forecast_details, df = get_arima_garch_forecast(ticker)
# GET BASE FORECAST
base_forecast = forecast_price
# ADD XGBOOST ENHANCEMENT IF AVAILABLE
if HYBRID_AVAILABLE and 'residuals' in forecast_details:
try:
residuals = forecast_details['residuals']
xgb_model, latest_features = train_xgboost_on_residuals(residuals, df)
residual_correction = float(xgb_model.predict(latest_features)[0])
forecast_price = base_forecast + residual_correction
model_type = 'ARIMA-GARCH+XGBoost'
except:
model_type = 'ARIMA-GARCH'
else:
model_type = 'ARIMA-GARCH'
# Extract metrics
last_price = float(df['Close'].iloc[-1])
expected_change = (forecast_price - last_price) / last_price
volatility = forecast_details['forecast_result']['volatility_forecast'] / 100
# Model diagnostics
diagnostics = forecast_details.get('diagnostics', {})
aic = diagnostics.get('arima_aic', None)
garch_aic = diagnostics.get('garch_aic', None)
residual_std = diagnostics.get('residual_std', 0.0)
residual_mean = diagnostics.get('residual_mean', 0.0)
# Get residuals for normality test
residuals = forecast_details.get('residuals', None)
if residuals is not None:
residuals_array = np.asarray(residuals).flatten()
residuals_clean = residuals_array[np.isfinite(residuals_array)]
residual_normality = self._test_normality(residuals_clean)
else:
residual_normality = 0.5
return {
'model_type': model_type,
'forecast_price': forecast_price,
'last_price': last_price,
'expected_change': expected_change,
'volatility': volatility,
'aic_proxy': aic or 0.0,
'garch_aic': garch_aic or 0.0,
'residual_std': residual_std,
'residual_mean': residual_mean,
'residual_normality': residual_normality,
'data_points': len(df),
'success': True
}
except Exception as e:
print(f" ❌ ARIMA-GARCH failed: {e}")
return None
def _test_normality(self, residuals: np.ndarray) -> float:
"""Simple normality test for residuals (returns score 0-1)."""
try:
if len(residuals) < 10:
return 0.5
# Simple normality indicators
mean_close_to_zero = abs(np.mean(residuals)) < 0.1 * np.std(residuals)
skewness = abs(self._calculate_skewness(residuals)) < 1.0
# Return normalized score
score = (int(mean_close_to_zero) + int(skewness)) / 2.0
return score
except:
return 0.5
def _calculate_skewness(self, data: np.ndarray) -> float:
"""Calculate skewness of data."""
mean = np.mean(data)
std = np.std(data)
if std == 0:
return 0
return np.mean(((data - mean) / std) ** 3)
def _compare_model_performance(self, arima_results: Dict, garch_results: Dict) -> Dict[str, Any]:
"""Compare ARIMA vs ARIMA-GARCH and determine winner."""
metrics = {
'arima_score': 0.0,
'garch_score': 0.0,
'winner': None,
'confidence': 0.0,
'decision_factors': {}
}
# Scoring criteria (each worth up to 1 point)
# 1. Residual quality (lower std is better)
arima_res_std = arima_results['residual_std']
garch_res_std = garch_results['residual_std']
if arima_res_std > 0 and garch_res_std > 0:
if arima_res_std < garch_res_std:
metrics['arima_score'] += 1.0
metrics['decision_factors']['residual_quality'] = 'ARIMA better'
else:
metrics['garch_score'] += 1.0
metrics['decision_factors']['residual_quality'] = 'GARCH better'
# 2. AIC comparison (lower is better)
arima_aic = arima_results['aic_proxy']
garch_aic = garch_results['aic_proxy']
if arima_aic > 0 and garch_aic > 0:
if arima_aic < garch_aic:
metrics['arima_score'] += 1.0
metrics['decision_factors']['model_fit'] = 'ARIMA better AIC'
else:
metrics['garch_score'] += 1.0
metrics['decision_factors']['model_fit'] = 'GARCH better AIC'
# 3. Residual normality (closer to normal is better)
arima_norm = arima_results['residual_normality']
garch_norm = garch_results['residual_normality']
if arima_norm > garch_norm:
metrics['arima_score'] += 1.0
metrics['decision_factors']['residual_normality'] = 'ARIMA more normal'
else:
metrics['garch_score'] += 1.0
metrics['decision_factors']['residual_normality'] = 'GARCH more normal'
# 4. Volatility modeling appropriateness
# For moderate volatility crypto, slight preference for GARCH
metrics['garch_score'] += 0.3
metrics['decision_factors']['volatility_modeling'] = 'GARCH slight advantage for crypto'
# 5. Forecast reasonableness (penalize extreme forecasts)
arima_change = abs(arima_results['expected_change'])
garch_change = abs(garch_results['expected_change'])
# Prefer forecasts between 0.1% and 5%
arima_reasonable = 0.001 <= arima_change <= 0.05
garch_reasonable = 0.001 <= garch_change <= 0.05
if arima_reasonable and not garch_reasonable:
metrics['arima_score'] += 0.5
metrics['decision_factors']['forecast_reasonableness'] = 'ARIMA more reasonable'
elif garch_reasonable and not arima_reasonable:
metrics['garch_score'] += 0.5
metrics['decision_factors']['forecast_reasonableness'] = 'GARCH more reasonable'
# Determine winner
if metrics['arima_score'] > metrics['garch_score']:
metrics['winner'] = 'ARIMA'
score_diff = metrics['arima_score'] - metrics['garch_score']
else:
metrics['winner'] = 'ARIMA-GARCH'
score_diff = metrics['garch_score'] - metrics['arima_score']
# Calculate confidence (0.5 = tie, 1.0 = clear winner)
max_possible_diff = 3.8 # Maximum possible score difference
metrics['confidence'] = 0.5 + (score_diff / max_possible_diff) * 0.5
return metrics
def _print_comparison_results(self, ticker: str, metrics: Dict, arima_results: Dict, garch_results: Dict):
"""Print detailed comparison results."""
print(f"\n📋 COMPARISON RESULTS FOR {ticker}:")
print("-" * 40)
# Model scores
print(f"🏆 Winner: {metrics['winner']} (Confidence: {metrics['confidence']:.1%})")
print(f"📊 Scores: ARIMA={metrics['arima_score']:.1f}, GARCH={metrics['garch_score']:.1f}")
# Key metrics comparison
print(f"\n📈 Forecast Comparison:")
print(f" ARIMA: {arima_results['expected_change']:+.2%}")
print(f" GARCH: {garch_results['expected_change']:+.2%}")
print(f"\n🎯 Model Quality:")
print(f" ARIMA AIC: {arima_results['aic_proxy']:.1f}")
print(f" GARCH AIC: {garch_results['aic_proxy']:.1f}")
print(f" ARIMA Res: {arima_results['residual_std']:.4f}")
print(f" GARCH Res: {garch_results['residual_std']:.4f}")
# Decision factors
print(f"\n🧠 Decision Factors:")
for factor, reason in metrics['decision_factors'].items():
print(f" {factor}: {reason}")
class SmartTradingSystem:
"""Enhanced trading system with dual model comparison for crypto stocks."""
def __init__(self, config_path: str = "config/trading_config.json",
extreme_volatility_config: str = "config/extreme_volatility.json",
use_volatility_config: bool = True, force_recalculate: bool = False):
self.config = self._load_config(config_path)
self.volatility_classifier = VolatilityClassifier(extreme_volatility_config, use_volatility_config)
self.model_comparator = ModelComparator(force_recalculate)
self.analyzed_stocks = {}
self.use_volatility_config = use_volatility_config
self.force_recalculate = force_recalculate
print("🧠 DUAL MODEL COMPARISON MODE: Testing both models for crypto stocks with moderate volatility")
# Show mode-specific info
if use_volatility_config:
crypto_keywords = self.volatility_classifier.list_crypto_keywords()
if crypto_keywords:
print(f"🪙 Crypto keywords loaded: {', '.join(crypto_keywords[:10])}")
if len(crypto_keywords) > 10:
print(f" ... and {len(crypto_keywords) - 10} more")
else:
print("📊 Pure volatility mode: All stocks classified by volatility thresholds only")
# Check model availability
self.arima_available = ARIMA_AVAILABLE
self.garch_available = ARIMA_GARCH_AVAILABLE and check_dependencies()
self.hybrid_available = HYBRID_AVAILABLE
if not self.arima_available:
print("❌ ARIMA model not available - system cannot function")
return
if not self.garch_available:
print("⚠️ ARIMA-GARCH not available - cannot perform dual comparison")
if not self.hybrid_available:
print("⚠️ XGBoost hybrid model not available")
# Trading parameters
trading_params = self.config.get("trading_parameters", {})
self.buy_threshold = trading_params.get('buy_threshold', 0.02)
self.sell_threshold = trading_params.get('sell_threshold', -0.02)
self.confidence_threshold = trading_params.get('confidence_threshold', 0.6)
def _load_config(self, config_path: str) -> Dict[str, Any]:
"""Load configuration from file."""
try:
with open(config_path, 'r') as f:
return json.load(f)
except Exception as e:
print(f"❌ Error loading config: {e}")
return {"tickers": {"default": ["AAPL", "MSFT"]}}
def classify_portfolio_volatility(self, tickers: List[str]) -> Dict[str, Dict]:
"""Classify volatility and identify stocks needing dual comparison."""
mode_str = "CONFIG" if self.use_volatility_config else "PURE VOLATILITY"
print(f"🔍 Classifying volatility for {len(tickers)} stocks ({mode_str} mode)...")
classifications = {}
arima_stocks = []
garch_stocks = []
dual_comparison_stocks = []
for i, ticker in enumerate(tickers, 1):
print(f"[{i:2d}/{len(tickers)}] Analyzing {ticker}...", end=" ")
classification = self.volatility_classifier.classify_stock(ticker)
classifications[ticker] = classification
if "error" in classification:
print(f"❌ Error")
continue
vol_class = classification["volatility_class"]
daily_vol = classification["daily_volatility"]
is_crypto = classification["is_crypto"]
# Determine if needs dual comparison
if self.use_volatility_config:
# Config mode: dual comparison for crypto + moderate
needs_dual_comparison = (is_crypto and vol_class == "MODERATE" and
self.arima_available and self.garch_available)
else:
# Pure volatility mode: no dual comparison (crypto override disabled)
needs_dual_comparison = False
if needs_dual_comparison:
dual_comparison_stocks.append(ticker)
icon = "⚖️"
model_str = "DUAL-COMPARISON"
elif vol_class in ["HIGH", "EXTREME"]:
garch_stocks.append(ticker)
icon = "🔴" if vol_class == "EXTREME" else "🟡"
model_str = "ARIMA-GARCH"
else:
arima_stocks.append(ticker)
icon = "🟢"
model_str = "ARIMA"
# Show crypto status only in config mode
crypto_status = " (CRYPTO)" if is_crypto and self.use_volatility_config else ""
print(f"{icon} {vol_class} ({daily_vol:.1%}){crypto_status} → {model_str}")
# Summary
print(f"\n📊 VOLATILITY CLASSIFICATION SUMMARY ({mode_str} MODE):")
print(f" 🟢 ARIMA suitable: {len(arima_stocks)} stocks")
if arima_stocks:
print(f" {', '.join(arima_stocks)}")
print(f" 🟡🔴 ARIMA-GARCH needed: {len(garch_stocks)} stocks")
if garch_stocks:
print(f" {', '.join(garch_stocks)}")
if dual_comparison_stocks:
print(f" ⚖️ Dual comparison needed: {len(dual_comparison_stocks)} stocks")
print(f" {', '.join(dual_comparison_stocks)}")
return {
"classifications": classifications,
"arima_stocks": arima_stocks,
"garch_stocks": garch_stocks,
"dual_comparison_stocks": dual_comparison_stocks
}
def analyze_stock_with_smart_model(self, ticker: str) -> Optional[Dict[str, Any]]:
"""Analyze stock using smart model selection with dual comparison for crypto moderate."""
if not self.arima_available:
print(f"❌ {ticker}: ARIMA model not available")
return None
# Get volatility classification
if ticker not in self.analyzed_stocks:
classification = self.volatility_classifier.classify_stock(ticker)
self.analyzed_stocks[ticker] = classification
else:
classification = self.analyzed_stocks[ticker]
if "error" in classification:
print(f"❌ {ticker}: Classification failed - {classification['error']}")
return None
vol_class = classification["volatility_class"]
is_crypto = classification["is_crypto"]
# Check if this stock needs dual comparison (only in config mode)
if self.use_volatility_config:
needs_dual_comparison = (is_crypto and vol_class == "MODERATE" and
self.arima_available and self.garch_available)
else:
needs_dual_comparison = False
if needs_dual_comparison:
return self._analyze_with_dual_comparison(ticker, classification)
else:
return self._analyze_with_single_model(ticker, classification)
def _analyze_with_dual_comparison(self, ticker: str, classification: Dict) -> Optional[Dict[str, Any]]:
"""Analyze crypto stock with moderate volatility using dual model comparison."""
# Run dual comparison
comparison_result = self.model_comparator.compare_models_for_crypto_moderate(ticker)
if comparison_result['error']:
print(f"❌ {ticker}: Dual comparison failed - {comparison_result['error']}")
return None
best_model = comparison_result['best_model']
confidence_in_choice = comparison_result['confidence_in_choice']
# Use the best model for final forecast
try:
if best_model == 'ARIMA-GARCH':
model_results = comparison_result['garch_results']
# Use the actual model type that was tested
winning_model_type = model_results['model_type']
model_type = f"{winning_model_type} (won dual test, {confidence_in_choice:.1%} confidence)"
else:
model_results = comparison_result['arima_results']
# Use the actual model type that was tested
winning_model_type = model_results['model_type']
model_type = f"{winning_model_type} (won dual test, {confidence_in_choice:.1%} confidence)"
forecast_price = model_results['forecast_price']
last_price = model_results['last_price']
expected_change = model_results['expected_change']
volatility = model_results['volatility']
# Generate trading signal
if expected_change > self.buy_threshold:
signal = "BUY"
emoji = "🟢"
elif expected_change < self.sell_threshold:
signal = "SELL"
emoji = "🔴"
else:
signal = "HOLD"
emoji = "🟡"
# Calculate confidence (combine forecast confidence with model choice confidence)
forecast_confidence = min(0.9, max(0.1, abs(expected_change) / (volatility + 0.01)))
combined_confidence = (forecast_confidence + confidence_in_choice) / 2.0
result = {
'ticker': ticker,
'last_price': last_price,
'forecast_price': forecast_price,
'expected_change': expected_change,
'volatility': volatility,
'confidence': combined_confidence,
'signal': signal,
'emoji': emoji,
'model_used': model_type,
'volatility_class': classification["volatility_class"],
'dual_comparison_performed': True,
'winning_model': best_model,
'model_choice_confidence': confidence_in_choice,
'comparison_details': comparison_result,
'timestamp': datetime.now(),
'analysis_time': datetime.now().isoformat()
}
print(f"✅ ⚖️ {ticker}: {signal} | Change: {expected_change:+.2%} | {model_type}")
print(f" Comparison winner: {best_model} (confidence: {confidence_in_choice:.1%})")
return result
except Exception as e:
print(f"❌ {ticker} final analysis failed: {e}")
return None
def _analyze_with_single_model(self, ticker: str, classification: Dict) -> Optional[Dict[str, Any]]:
"""Analyze stock using standard single model approach with ARIMA-GARCH+XGBoost enhancement."""
vol_class = classification["volatility_class"]
is_crypto = classification["is_crypto"]
# Determine model based on mode
if self.use_volatility_config:
# Config mode: crypto override applies
if vol_class in ["HIGH", "EXTREME"] or is_crypto:
model_type = "ARIMA-GARCH" if self.garch_available else "ARIMA"
else:
model_type = "ARIMA"
else:
# Pure volatility mode: only volatility matters
if vol_class in ["HIGH", "EXTREME"]:
model_type = "ARIMA-GARCH" if self.garch_available else "ARIMA"
else:
model_type = "ARIMA"
try:
if model_type == "ARIMA-GARCH" and self.garch_available:
if self.use_volatility_config and is_crypto:
icon = "🪙"
status = f"CRYPTO {vol_class} volatility"
else:
icon = "🔧"
status = f"{vol_class} volatility"
print(f"{icon} {ticker}: Using ARIMA-GARCH model ({status})")
# Get ARIMA-GARCH forecast
forecast_price, forecast_details, df = get_arima_garch_forecast(ticker)
base_forecast = forecast_price
# APPLY XGBOOST ENHANCEMENT TO ARIMA-GARCH
if self.hybrid_available:
try:
# Get residuals from ARIMA-GARCH
if 'residuals' in forecast_details:
residuals = forecast_details['residuals']
else:
# Fallback: use returns as residuals proxy
returns = df['Close'].pct_change().dropna()
residuals = returns
# Apply XGBoost enhancement
xgb_model, latest_features = train_xgboost_on_residuals(residuals, df)
residual_correction = float(xgb_model.predict(latest_features)[0])
forecast_price = base_forecast + residual_correction
model_used = "ARIMA-GARCH+XGBoost"
except Exception as e:
# If XGBoost enhancement fails, use base ARIMA-GARCH
model_used = "ARIMA-GARCH"
else:
model_used = "ARIMA-GARCH"
last_price = float(df['Close'].iloc[-1])
expected_change = (forecast_price - last_price) / last_price
volatility = forecast_details['forecast_result']['volatility_forecast'] / 100
else:
# ARIMA model path
if model_type == "ARIMA" and vol_class in ["HIGH", "EXTREME"]:
print(f"⚠️ {ticker}: ARIMA-GARCH not available, using ARIMA fallback")
else:
print(f"📊 {ticker}: Using ARIMA model ({vol_class} volatility)")
forecast_price, residuals, df = get_arima_forecast(ticker,
use_optimized_params=not self.force_recalculate,
force_recalculate=self.force_recalculate)
# Apply XGBoost enhancement to ARIMA
if self.hybrid_available:
xgb_model, latest_features = train_xgboost_on_residuals(residuals, df)
residual_correction = float(xgb_model.predict(latest_features)[0])
forecast_price = float(forecast_price) + residual_correction
model_used = "ARIMA+XGBoost"
else:
forecast_price = float(forecast_price)
model_used = "ARIMA"
last_price = float(df['Close'].iloc[-1])
expected_change = (forecast_price - last_price) / last_price
returns = df['Close'].pct_change().dropna()
volatility = float(returns.rolling(window=20).std().iloc[-1])
# Generate trading signal
if expected_change > self.buy_threshold:
signal = "BUY"
emoji = "🟢"
elif expected_change < self.sell_threshold:
signal = "SELL"
emoji = "🔴"
else:
signal = "HOLD"
emoji = "🟡"
confidence = min(0.9, max(0.1, abs(expected_change) / (volatility + 0.01)))
result = {
'ticker': ticker,
'last_price': last_price,
'forecast_price': forecast_price,
'expected_change': expected_change,
'volatility': volatility,
'confidence': confidence,
'signal': signal,
'emoji': emoji,
'model_used': model_used,
'volatility_class': vol_class,
'dual_comparison_performed': False,
'mode': classification.get('mode', 'unknown'),
'timestamp': datetime.now(),
'analysis_time': datetime.now().isoformat()
}
print(f"✅ {emoji} {ticker}: {signal} | Change: {expected_change:+.2%} | Model: {model_used}")
return result
except Exception as e:
print(f"❌ {ticker} analysis failed: {e}")
return None
def analyze_portfolio_with_smart_models(self, portfolio_name: str) -> List[Dict[str, Any]]:
"""Analyze portfolio using smart model selection with dual comparison."""
if portfolio_name == "all":
tickers = self.get_all_unique_stocks()
else:
tickers = self.config["tickers"].get(portfolio_name, [])
if not tickers:
print(f"❌ Portfolio '{portfolio_name}' not found in config")
return []
mode_str = "CONFIG" if self.use_volatility_config else "PURE VOLATILITY"
print(f"\n🧠 DUAL-MODEL SMART PORTFOLIO ANALYSIS: {portfolio_name} ({mode_str} MODE)")
print("=" * 70)
# Step 1: Classify all stocks
volatility_analysis = self.classify_portfolio_volatility(tickers)
print(f"\n🔬 ANALYZING STOCKS WITH APPROPRIATE MODELS:")
print("=" * 70)
# Step 2: Analyze each stock
results = []
for ticker in tickers:
result = self.analyze_stock_with_smart_model(ticker)
if result:
results.append(result)
# Step 3: Enhanced portfolio summary
if results:
self._print_enhanced_portfolio_summary(results, portfolio_name, volatility_analysis)
return results
def _print_enhanced_portfolio_summary(self, results: List[Dict], portfolio_name: str, volatility_analysis: Dict):
"""Print enhanced portfolio summary including dual comparison results."""
mode_str = "CONFIG" if self.use_volatility_config else "PURE VOLATILITY"
print(f"\n📊 {portfolio_name.upper()} PORTFOLIO SUMMARY ({mode_str} MODE)")
print("=" * 80)
# Dual comparison summary (only in config mode)
if self.use_volatility_config:
dual_comparison_results = [r for r in results if r.get('dual_comparison_performed', False)]
if dual_comparison_results:
print(f"\n⚖️ DUAL COMPARISON RESULTS:")
for result in dual_comparison_results:
ticker = result['ticker']
winner = result['winning_model']
confidence = result['model_choice_confidence']
print(f" {ticker}: {winner} won (confidence: {confidence:.1%})")
# Model usage summary
model_counts = {}
for result in results:
model = result['model_used'].split(' (')[0] # Strip confidence info
model_counts[model] = model_counts.get(model, 0) + 1
print(f"\n📈 Model Usage:")
for model, count in model_counts.items():
print(f" {model}: {count} stocks")
# Performance by model type
for model in model_counts.keys():
model_results = [r for r in results if r['model_used'].startswith(model)]
if model_results:
avg_change = np.mean([r['expected_change'] for r in model_results])
signals = [r['signal'] for r in model_results]
buy_count = signals.count('BUY')
sell_count = signals.count('SELL')
hold_count = signals.count('HOLD')
print(f" 📊 {model} average expected change: {avg_change:+.2%}")
print(f" Signals: {buy_count} BUY, {sell_count} SELL, {hold_count} HOLD")
# Volatility distribution
vol_classes = [r['volatility_class'] for r in results]
print(f"\n🎯 Volatility Distribution:")
for vol_class in ['MODERATE', 'HIGH', 'EXTREME']:
count = vol_classes.count(vol_class)
if count > 0:
percentage = count / len(results) * 100
print(f" {vol_class}: {count} stocks ({percentage:.1f}%)")
# Trading signals
signals = [r['signal'] for r in results]
print(f"\n📈 Trading Signals:")
print(f" 🟢 BUY: {signals.count('BUY')} stocks")
print(f" 🔴 SELL: {signals.count('SELL')} stocks")
print(f" 🟡 HOLD: {signals.count('HOLD')} stocks")
# Top recommendations
buy_signals = [r for r in results if r['signal'] == 'BUY']
if buy_signals:
buy_signals.sort(key=lambda x: x['confidence'], reverse=True)
print(f"\n🏆 TOP BUY RECOMMENDATIONS:")
for i, rec in enumerate(buy_signals[:5], 1):
model_info = rec['model_used'].split(' (')[0]
dual_info = " [DUAL-TESTED]" if rec.get('dual_comparison_performed', False) else ""
print(f" {i}. {rec['ticker']}: {rec['expected_change']:+.2%} ({rec['confidence']:.1%}) [{model_info}]{dual_info}")
sell_signals = [r for r in results if r['signal'] == 'SELL']
if sell_signals:
sell_signals.sort(key=lambda x: x['confidence'], reverse=True)
print(f"\n⚠️ TOP SELL RECOMMENDATIONS:")
for i, rec in enumerate(sell_signals[:3], 1):
model_info = rec['model_used'].split(' (')[0]
dual_info = " [DUAL-TESTED]" if rec.get('dual_comparison_performed', False) else ""
print(f" {i}. {rec['ticker']}: {rec['expected_change']:+.2%} ({rec['confidence']:.1%}) [{model_info}]{dual_info}")
def get_all_unique_stocks(self) -> List[str]:
"""Get all unique stocks from all portfolios."""
all_stocks = set()
for portfolio_name, tickers in self.config.get("tickers", {}).items():
if isinstance(tickers, list):
all_stocks.update(tickers)
# Remove metadata entries
all_stocks = {stock for stock in all_stocks if not str(stock).startswith('//')}
return sorted(list(all_stocks))
def parse_arguments():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description='Smart Trading System with Dual Model Comparison',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python enhanced_main.py lenny_golub # Config mode (default)
python enhanced_main.py lenny_golub --pure-volatility # Pure volatility mode
python enhanced_main.py lenny_golub --force-recalculate # Force fresh parameter optimization
python enhanced_main.py --list-crypto # Show current crypto keywords
"""
)
parser.add_argument('portfolios', nargs='*',
help='Portfolio names to analyze or compare (use "all" for all stocks)')
parser.add_argument('--pure-volatility', action='store_true', default=False,
help='Use pure volatility classification (ignore crypto config)')
parser.add_argument('--force-recalculate', action='store_true', default=False,
help='Force fresh parameter optimization for all models (ignore cache)')
parser.add_argument('--clear-cache', action='store_true', default=False,
help='Clear all cached model parameters before analysis')
parser.add_argument('--list-crypto', action='store_true',
help='List current crypto keywords')
parser.add_argument('--save', action='store_true', default=True,
help='Save results to CSV (default: True)')
parser.add_argument('--no-save', dest='save', action='store_false',
help='Do not save results to CSV')
return parser.parse_args()
def main():
"""Enhanced main function with config management."""
args = parse_arguments()
if args.clear_cache:
print("🗑️ Clearing all cached model parameters...")
from models.arima_model import clear_all_cache
cleared = clear_all_cache()
print(f"✅ Cleared {cleared} cache files")
if not args.portfolios:
return # Just clear cache and exit
# Handle config management commands (only work in config mode)
if args.list_crypto:
if args.pure_volatility:
print("📊 Pure volatility mode: No crypto keywords used")
return
system = SmartTradingSystem(use_volatility_config=True)
keywords = system.volatility_classifier.list_crypto_keywords()
print(f"🪙 Current crypto keywords ({len(keywords)}):")
for i, keyword in enumerate(keywords, 1):
print(f" {i:2d}. {keyword}")
return
# Regular analysis with mode selection
use_config = not args.pure_volatility
system = SmartTradingSystem(use_volatility_config=use_config, force_recalculate=args.force_recalculate)
if not args.portfolios:
# Show help
mode_desc = "CONFIG" if use_config else "PURE VOLATILITY"
print(f"🧠 DUAL-MODEL SMART TRADING SYSTEM ({mode_desc} MODE)")
print("=" * 70)
if use_config:
print("⚖️ Automatically tests both ARIMA and ARIMA-GARCH for crypto stocks with moderate volatility")
else:
print("📊 Classifies all stocks purely by volatility thresholds (no crypto overrides)")
if args.force_recalculate:
print("🔄 FORCE RECALCULATION MODE: All model parameters will be optimized fresh")
print("🏆 Chooses the best performing model based on statistical comparison")
available_portfolios = list(system.config.get("tickers", {}).keys())
print(f"\n📊 Available portfolios ({len(available_portfolios)}):")
for i, portfolio in enumerate(available_portfolios, 1):
tickers = system.config["tickers"][portfolio]
print(f" {i:2d}. {portfolio:<20} ({len(tickers)} stocks)")
print("\n📖 Usage Examples:")
print(" python enhanced_main.py lenny_golub # Config mode")
print(" python enhanced_main.py lenny_golub --pure-volatility # Pure volatility mode")
print(" python enhanced_main.py lenny_golub --force-recalculate # Force fresh optimization")
return
# Single portfolio analysis
portfolio_name = args.portfolios[0]
results = system.analyze_portfolio_with_smart_models(portfolio_name)
if args.save and results:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
mode_suffix = "_pure_vol" if args.pure_volatility else "_config"
if args.force_recalculate:
mode_suffix += "_fresh"
filename = f"results/dual_model_trading_{portfolio_name}{mode_suffix}_{timestamp}.csv"
df_results = pd.DataFrame(results)
Path(filename).parent.mkdir(parents=True, exist_ok=True)
df_results.to_csv(filename, index=False)
print(f"\n📁 Results saved to: {filename}")
if __name__ == "__main__":
main()