# File listing header: volatile_stock_detection.py (19.6 kB)
# volatile_stock_detection.py - Detect and handle volatile stocks in portfolios
import yfinance as yf
import pandas as pd
import numpy as np
import json
from pathlib import Path
from typing import Dict, List, Tuple
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')
class VolatileStockDetector:
    """Detect volatile stocks and suggest appropriate modeling approaches.

    The detector downloads price history through ``yfinance``, derives a few
    volatility statistics from daily close-to-close returns, classifies each
    ticker as MODERATE / HIGH / EXTREME, and rates how well a plain ARIMA
    model is expected to cope with it.
    """

    def __init__(self):
        # Symbols treated as crypto assets or crypto-exposed equities.
        self.crypto_keywords = [
            'BTC', 'ETH', 'COIN', 'GBTC', 'MSTR', 'HOOD', 'SQ', 'PYPL',
            'DOGE', 'ADA', 'DOT', 'LINK', 'UNI', 'AAVE', 'SUSHI'
        ]
        # Daily-return standard deviation cut-offs used for classification.
        self.high_volatility_threshold = 0.03  # 3% daily volatility
        self.extreme_volatility_threshold = 0.05  # 5% daily volatility

    def _is_crypto_related(self, ticker: str) -> bool:
        """Return True when a component of ``ticker`` is a known crypto keyword.

        The ticker is upper-cased and split on every non-alphanumeric
        character, so ``"BTC-USD"`` yields ``BTC`` and ``USD``. Components are
        compared for equality; a substring test would flag unrelated symbols
        such as ``SQQQ`` (contains ``SQ``) or ``UNIT`` (contains ``UNI``).
        """
        keywords = set(self.crypto_keywords)
        normalized = ''.join(ch if ch.isalnum() else ' ' for ch in ticker.upper())
        return any(part in keywords for part in normalized.split())

    @staticmethod
    def _close_series(data: pd.DataFrame) -> pd.Series:
        """Return the closing prices in ``data`` as a NaN-free Series.

        When the frame has multi-level columns (for example
        ``('Close', 'AAPL')``), ``data['Close']`` is itself a DataFrame; the
        first column is taken so that later reductions produce scalars.
        """
        close = data['Close']
        if isinstance(close, pd.DataFrame):
            close = close.iloc[:, 0]
        return close.dropna()

    def _classify_volatility(self, daily_vol: float) -> str:
        """Map a daily volatility figure onto EXTREME / HIGH / MODERATE."""
        if daily_vol > self.extreme_volatility_threshold:
            return "EXTREME"
        if daily_vol > self.high_volatility_threshold:
            return "HIGH"
        return "MODERATE"

    def analyze_stock_volatility(self, ticker: str, period: str = "1y") -> Dict:
        """Analyze a single stock's volatility characteristics.

        Args:
            ticker: Symbol passed to ``yf.download``.
            period: History window passed to ``yf.download``.

        Returns:
            On success, a dict with the keys ``ticker``, ``daily_volatility``,
            ``annualized_volatility``, ``volatility_class``,
            ``max_1day_change``, ``avg_abs_return``, ``vol_of_vol``,
            ``price_change_total``, ``is_crypto_related``, ``data_points``,
            ``recommendations`` and ``arima_suitability``.
            On any failure, ``{"ticker": ..., "error": <message>}``; this
            method never raises.

        Note:
            ``vol_of_vol`` is the standard deviation of a 30-day rolling
            standard deviation and is NaN when the history is too short to
            fill that window more than once.
        """
        try:
            data = yf.download(ticker, period=period, progress=False)
            if data is None or data.empty:
                return {"ticker": ticker, "error": "No data available"}

            close = self._close_series(data)
            returns = close.pct_change().dropna()
            # A standard deviation needs at least two observations; fewer
            # would silently turn every metric below into NaN.
            if len(returns) < 2:
                return {
                    "ticker": ticker,
                    "error": "Not enough price history to measure volatility",
                }

            # Volatility metrics
            daily_vol = float(returns.std())
            annualized_vol = float(daily_vol * np.sqrt(252))
            rolling_vol = returns.rolling(window=30).std()
            vol_of_vol = float(rolling_vol.std())  # Volatility of volatility

            # Price movement metrics
            abs_returns = returns.abs()
            max_1day_change = float(abs_returns.max())
            avg_abs_return = float(abs_returns.mean())

            # Relative price change between the first and last close
            first_close = close.iloc[0]
            price_change_total = float((close.iloc[-1] - first_close) / first_close)

            volatility_class = self._classify_volatility(daily_vol)
            is_crypto_related = self._is_crypto_related(ticker)
            recommendations = self._get_modeling_recommendations(
                daily_vol, volatility_class, is_crypto_related, ticker
            )

            return {
                "ticker": ticker,
                "daily_volatility": daily_vol,
                "annualized_volatility": annualized_vol,
                "volatility_class": volatility_class,
                "max_1day_change": max_1day_change,
                "avg_abs_return": avg_abs_return,
                "vol_of_vol": vol_of_vol,
                "price_change_total": price_change_total,
                "is_crypto_related": is_crypto_related,
                "data_points": len(close),
                "recommendations": recommendations,
                "arima_suitability": self._assess_arima_suitability(daily_vol, is_crypto_related)
            }
        except Exception as e:
            # Deliberate catch-all: one failing ticker must not abort a whole
            # portfolio run, so the failure is reported in the result instead.
            return {"ticker": ticker, "error": str(e)}

    def _get_modeling_recommendations(self, daily_vol: float, volatility_class: str,
                                      is_crypto: bool, ticker: str) -> List[str]:
        """Get modeling recommendations based on volatility analysis.

        Args:
            daily_vol: Standard deviation of daily returns.
            volatility_class: ``"EXTREME"``, ``"HIGH"`` or anything else
                (treated as moderate).
            is_crypto: Whether the ticker was flagged as crypto-related.
            ticker: Unused; kept so existing callers keep working.

        Returns:
            At most six recommendation strings, most important first.
        """
        if volatility_class == "EXTREME":
            recommendations = [
                "❌ ARIMA alone is NOT suitable for this level of volatility",
                "✅ Use ARIMA-GARCH hybrid models",
                "✅ Consider LSTM or other ML approaches",
                "⚠️ Expect high forecast uncertainty"
            ]
        elif volatility_class == "HIGH":
            recommendations = [
                "⚠️ ARIMA may struggle with this volatility",
                "✅ Consider ARIMA-GARCH models",
                "✅ Use shorter forecast horizons",
                "✅ Validate results carefully"
            ]
        else:
            recommendations = [
                "✅ ARIMA may be suitable",
                "💡 Standard ARIMA optimization should work",
                "📊 Monitor for regime changes"
            ]

        # The severity-specific advice goes before the crypto hints so the
        # six-item cap below drops a generic hint rather than this warning.
        if daily_vol > 0.08:  # Very extreme
            recommendations.append("🚨 Consider this stock unsuitable for traditional forecasting")
        elif daily_vol > 0.05:  # Extreme
            recommendations.append("🔄 Use ensemble methods combining multiple models")

        if is_crypto:
            recommendations.extend([
                "🪙 Cryptocurrency detected",
                "📈 Consider external factors (sentiment, news)",
                "⏰ Use intraday data if available"
            ])

        return recommendations[:6]  # Limit to top 6

    def _assess_arima_suitability(self, daily_vol: float, is_crypto: bool) -> Dict:
        """Assess how suitable ARIMA modeling is for this stock.

        Returns:
            A dict with ``suitability`` (GOOD / MARGINAL / POOR),
            ``confidence`` (HIGH / MEDIUM / LOW) and a human-readable
            ``reason``. A crypto flag lowers GOOD to MARGINAL and always
            forces the confidence to LOW.
        """
        if daily_vol > self.extreme_volatility_threshold:
            suitability = "POOR"
            confidence = "LOW"
            reason = f"Daily volatility {daily_vol:.1%} exceeds extreme threshold"
        elif daily_vol > self.high_volatility_threshold:
            suitability = "MARGINAL"
            confidence = "MEDIUM"
            reason = f"Daily volatility {daily_vol:.1%} is high for ARIMA"
        else:
            suitability = "GOOD"
            confidence = "HIGH"
            reason = f"Daily volatility {daily_vol:.1%} is manageable for ARIMA"

        if is_crypto:
            if suitability == "GOOD":
                suitability = "MARGINAL"
            confidence = "LOW"
            reason += " (cryptocurrency detected)"

        return {
            "suitability": suitability,
            "confidence": confidence,
            "reason": reason
        }

    def analyze_portfolio(self, tickers: List[str], period: str = "1y") -> Dict:
        """Analyze volatility for an entire portfolio.

        Args:
            tickers: Symbols to analyze, one download per symbol.
            period: History window forwarded to ``analyze_stock_volatility``.

        Returns:
            A dict with ``individual_results`` (ticker -> per-stock result),
            ``portfolio_summary`` and ``analysis_date`` (ISO timestamp).
        """
        print(f"🔍 Analyzing volatility for {len(tickers)} stocks...")
        total = len(tickers)
        results = {}
        for position, ticker in enumerate(tickers, 1):
            print(f"[{position:2d}/{total}] Analyzing {ticker}...")
            results[ticker] = self.analyze_stock_volatility(ticker, period=period)

        return {
            "individual_results": results,
            "portfolio_summary": self._create_portfolio_summary(results),
            "analysis_date": datetime.now().isoformat()
        }

    def _create_portfolio_summary(self, results: Dict) -> Dict:
        """Create summary statistics for the portfolio.

        Results carrying an ``"error"`` key are excluded from every statistic
        except ``total_stocks``. Returns ``{"error": ...}`` when nothing
        usable remains.
        """
        valid_results = {k: v for k, v in results.items() if "error" not in v}
        if not valid_results:
            return {"error": "No valid results to summarize"}

        entries = list(valid_results.values())
        volatilities = [r["daily_volatility"] for r in entries]
        vol_classes = [r["volatility_class"] for r in entries]
        suitability_by_ticker = {
            ticker: result["arima_suitability"]["suitability"]
            for ticker, result in valid_results.items()
        }
        suitabilities = list(suitability_by_ticker.values())
        crypto_count = sum(1 for r in entries if r["is_crypto_related"])

        return {
            "total_stocks": len(results),
            "valid_analyses": len(valid_results),
            # Plain floats keep the summary free of numpy scalar types.
            "avg_daily_volatility": float(np.mean(volatilities)),
            "median_daily_volatility": float(np.median(volatilities)),
            "max_daily_volatility": max(volatilities),
            "volatility_class_distribution": {
                cls: vol_classes.count(cls) for cls in ["MODERATE", "HIGH", "EXTREME"]
            },
            "arima_suitability_distribution": {
                suit: suitabilities.count(suit) for suit in ["GOOD", "MARGINAL", "POOR"]
            },
            "crypto_related_count": crypto_count,
            "crypto_percentage": (crypto_count / len(valid_results)) * 100,
            "problematic_stocks": [
                ticker for ticker, suit in suitability_by_ticker.items()
                if suit in ["POOR", "MARGINAL"]
            ],
            "arima_suitable_stocks": [
                ticker for ticker, suit in suitability_by_ticker.items()
                if suit == "GOOD"
            ]
        }

    def print_portfolio_analysis(self, analysis_results: Dict):
        """Print formatted portfolio volatility analysis.

        Args:
            analysis_results: The dict returned by ``analyze_portfolio``.
        """
        individual_results = analysis_results["individual_results"]
        summary = analysis_results["portfolio_summary"]

        if "error" in summary:
            print(f"❌ {summary['error']}")
            # Show why each ticker failed; otherwise the reasons are lost.
            for ticker, result in sorted(individual_results.items()):
                if "error" in result:
                    print(f" • {ticker}: {result['error']}")
            return

        self._print_stock_table(individual_results)
        self._print_summary_statistics(summary)
        self._print_recommendations(summary, individual_results)

    def _print_stock_table(self, individual_results: Dict) -> None:
        """Print one table row per ticker, sorted by symbol."""
        print("\n📊 PORTFOLIO VOLATILITY ANALYSIS")
        print("=" * 80)
        print(f"{'Ticker':<8} {'Daily Vol':<10} {'Class':<10} {'ARIMA Fit':<12} {'Crypto':<8} {'Recommendation'}")
        print("-" * 80)

        class_icons = {"EXTREME": "🔴", "HIGH": "🟡"}
        arima_icons = {"POOR": "❌", "MARGINAL": "⚠️ "}

        for ticker, result in sorted(individual_results.items()):
            if "error" in result:
                print(f"{ticker:<8} {'ERROR':<10} {'N/A':<10} {'N/A':<12} {'N/A':<8} {result['error']}")
                continue

            daily_vol = result["daily_volatility"]
            vol_class = result["volatility_class"]
            arima_suit = result["arima_suitability"]["suitability"]
            is_crypto = "YES" if result["is_crypto_related"] else "NO"
            class_icon = class_icons.get(vol_class, "🟢")
            arima_icon = arima_icons.get(arima_suit, "✅")

            # Only the first (highest-priority) recommendation fits the row.
            main_rec = result["recommendations"][0] if result["recommendations"] else "No recommendation"
            print(f"{ticker:<8} {daily_vol:<10.1%} {class_icon}{vol_class:<9} {arima_icon}{arima_suit:<10} {is_crypto:<8} {main_rec}")

        print("-" * 80)

    def _print_summary_statistics(self, summary: Dict) -> None:
        """Print headline figures plus the class and suitability distributions."""
        valid = summary['valid_analyses']

        print("\n📈 PORTFOLIO SUMMARY:")
        print(f" Total stocks analyzed: {valid}")
        print(f" Average daily volatility: {summary['avg_daily_volatility']:.1%}")
        print(f" Cryptocurrency-related: {summary['crypto_related_count']} ({summary['crypto_percentage']:.1f}%)")

        print("\n🎯 Volatility Distribution:")
        for vol_class, count in summary['volatility_class_distribution'].items():
            percentage = (count / valid) * 100
            print(f" {vol_class}: {count} stocks ({percentage:.1f}%)")

        print("\n🔍 ARIMA Suitability:")
        suitability_icons = {"GOOD": "✅", "MARGINAL": "⚠️ "}
        for suitability, count in summary['arima_suitability_distribution'].items():
            percentage = (count / valid) * 100
            icon = suitability_icons.get(suitability, "❌")
            print(f" {icon} {suitability}: {count} stocks ({percentage:.1f}%)")

    def _print_recommendations(self, summary: Dict, individual_results: Dict) -> None:
        """Print problematic tickers, portfolio-level advice and suitable tickers."""
        problematic = summary['problematic_stocks']
        if problematic:
            print("\n⚠️ PROBLEMATIC STOCKS FOR ARIMA:")
            for ticker in problematic[:10]:  # Show top 10
                reason = individual_results[ticker]["arima_suitability"]["reason"]
                print(f" • {ticker}: {reason}")
            if len(problematic) > 10:
                print(f" ... and {len(problematic) - 10} more")

        print("\n💡 PORTFOLIO RECOMMENDATIONS:")

        # Overall assessment: share of tickers rated GOOD for ARIMA.
        good_count = summary['arima_suitability_distribution'].get('GOOD', 0)
        good_percentage = (good_count / summary['valid_analyses']) * 100

        if good_percentage >= 70:
            print(f" ✅ Portfolio is suitable for ARIMA optimization ({good_percentage:.1f}% suitable)")
            print(" 📊 Proceed with standard ARIMA optimization")
        elif good_percentage >= 40:
            print(f" ⚠️ Portfolio is partially suitable for ARIMA ({good_percentage:.1f}% suitable)")
            print(" 🔧 Use mixed approach: ARIMA for suitable stocks, alternatives for others")
        else:
            print(f" ❌ Portfolio is poorly suited for ARIMA ({good_percentage:.1f}% suitable)")
            print(" 🚨 Consider alternative modeling approaches")

        # Advice that depends on portfolio composition
        crypto_percentage = summary['crypto_percentage']
        if crypto_percentage > 50:
            print(f" 🪙 Portfolio is {crypto_percentage:.1f}% cryptocurrency-related")
            print(" • Consider ARIMA-GARCH hybrid models")
            print(" • Use shorter forecast horizons")
            print(" • Include external sentiment data")

        extreme_count = summary['volatility_class_distribution'].get('EXTREME', 0)
        if extreme_count > 0:
            print(f" 🔴 {extreme_count} stocks have extreme volatility")
            print(" • Consider excluding from ARIMA optimization")
            print(" • Use volatility-specific models (GARCH family)")

        suitable_stocks = summary['arima_suitable_stocks']
        if suitable_stocks:
            print(f"\n✅ STOCKS SUITABLE FOR ARIMA OPTIMIZATION ({len(suitable_stocks)} stocks):")
            # Group by 8 per line
            for start in range(0, len(suitable_stocks), 8):
                print(f" {', '.join(suitable_stocks[start:start + 8])}")
        else:
            print("\n❌ NO STOCKS SUITABLE FOR ARIMA OPTIMIZATION")
            print(" Consider alternative modeling approaches for this portfolio")
def load_portfolio_config(config_path: str = "config/trading_config.json") -> Dict:
    """Load portfolio configuration.

    Args:
        config_path: Path of the JSON configuration file.

    Returns:
        The parsed configuration when the file holds a JSON object.
        Otherwise a built-in fallback with a single ``"default"`` portfolio,
        after printing the reason; this function does not raise for a
        missing, unreadable or malformed file.
    """
    fallback = {"tickers": {"default": ["AAPL", "MSFT", "GOOGL"]}}
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except (OSError, ValueError, TypeError) as e:
        # OSError: missing/unreadable file; ValueError: invalid JSON or
        # undecodable bytes; TypeError: a path of an unsupported type.
        print(f"❌ Error loading config: {e}")
        return fallback

    # Callers use ``config.get(...)``, so anything but an object is unusable.
    if not isinstance(config, dict):
        print(f"❌ Error loading config: expected a JSON object, got {type(config).__name__}")
        return fallback
    return config
def main():
    """Main function for volatility analysis.

    Parses the command line, selects the tickers to analyze (``--tickers``
    takes precedence over ``--all``, which takes precedence over
    ``--portfolio``), prints the analysis and, with ``--save``, writes the
    results as JSON into the ``arima_optimization`` directory. Without any
    selection option it lists the portfolios found in the configuration.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Analyze portfolio volatility and ARIMA suitability",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "\nExamples:\n"
            "python volatile_stock_detection.py # Show available portfolios\n"
            "python volatile_stock_detection.py --portfolio large_cap # Analyze specific portfolio\n"
            "python volatile_stock_detection.py --tickers AAPL MSFT COIN # Analyze specific stocks\n"
            "python volatile_stock_detection.py --all # Analyze all stocks\n"
        )
    )
    parser.add_argument('--portfolio', type=str,
                        help='Specific portfolio name from config')
    parser.add_argument('--all', action='store_true',
                        help='Analyze all stocks across all portfolios')
    parser.add_argument('--tickers', nargs='+',
                        help='Specific tickers to analyze')
    parser.add_argument('--save', action='store_true',
                        help='Save results to JSON file')
    args = parser.parse_args()

    # Load configuration
    config = load_portfolio_config()
    detector = VolatileStockDetector()

    portfolios = config.get("tickers", {})
    if not isinstance(portfolios, dict):
        portfolios = {}

    # Determine tickers to analyze. ``file_label`` is chosen in the same
    # branch so a saved file is always named after what was really analyzed.
    if args.tickers:
        tickers = args.tickers
        analysis_description = f"specified tickers: {', '.join(tickers)}"
        file_label = "custom"
    elif args.all:
        # Unique tickers across every list-valued portfolio
        tickers = sorted({
            ticker
            for portfolio_tickers in portfolios.values()
            if isinstance(portfolio_tickers, list)
            for ticker in portfolio_tickers
        })
        analysis_description = f"all stocks across all portfolios ({len(tickers)} stocks)"
        file_label = "all_stocks"
    elif args.portfolio:
        portfolio_tickers = portfolios.get(args.portfolio, [])
        if not isinstance(portfolio_tickers, list) or not portfolio_tickers:
            print(f"❌ Portfolio '{args.portfolio}' not found in config")
            print("\n📊 Available portfolios:")
            for name in portfolios:
                print(f" - {name}")
            return
        tickers = portfolio_tickers
        analysis_description = f"'{args.portfolio}' portfolio ({len(tickers)} stocks)"
        file_label = args.portfolio
    else:
        # No selection given: show available portfolios and usage hints
        print("🔍 VOLATILITY ANALYSIS TOOL")
        print("=" * 50)
        print("Analyze portfolio volatility and ARIMA suitability\n")
        print("📊 Available portfolios:")
        for name, portfolio_tickers in portfolios.items():
            if isinstance(portfolio_tickers, list):
                print(f" • {name:<20} ({len(portfolio_tickers):2d} stocks)")
        print("\n💡 Usage examples:")
        print(" python volatile_stock_detection.py --portfolio large_cap")
        print(" python volatile_stock_detection.py --tickers COIN GBTC MSTR")
        print(" python volatile_stock_detection.py --all")
        return

    if not tickers:
        print("❌ No tickers to analyze")
        return

    # Run analysis
    print(f"🔍 Analyzing volatility for {analysis_description}")
    print("📊 This will help determine ARIMA optimization suitability\n")
    analysis_results = detector.analyze_portfolio(tickers)
    detector.print_portfolio_analysis(analysis_results)

    # Save results if requested
    if args.save:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"volatility_analysis_{file_label}_{timestamp}.json"
        output_dir = Path("arima_optimization")
        output_dir.mkdir(exist_ok=True)
        filepath = output_dir / filename
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(analysis_results, f, indent=2)
        print(f"\n📁 Analysis results saved to: {filepath}")
if __name__ == "__main__":
    # Run the command-line interface only when executed as a script.
    main()