Skip to main content
Glama

MCP Hybrid Forecasting

by j1c4b
volatile_stock_detection.py19.6 kB
# volatile_stock_detection.py - Detect and handle volatile stocks in portfolios import yfinance as yf import pandas as pd import numpy as np import json from pathlib import Path from typing import Dict, List, Tuple from datetime import datetime import warnings warnings.filterwarnings('ignore') class VolatileStockDetector: """Detect volatile stocks and suggest appropriate modeling approaches.""" def __init__(self): # Known volatile/crypto tickers self.crypto_keywords = [ 'BTC', 'ETH', 'COIN', 'GBTC', 'MSTR', 'HOOD', 'SQ', 'PYPL', 'DOGE', 'ADA', 'DOT', 'LINK', 'UNI', 'AAVE', 'SUSHI' ] # Volatility thresholds self.high_volatility_threshold = 0.03 # 3% daily volatility self.extreme_volatility_threshold = 0.05 # 5% daily volatility def analyze_stock_volatility(self, ticker: str, period: str = "1y") -> Dict: """Analyze a single stock's volatility characteristics.""" try: # Download data data = yf.download(ticker, period=period, progress=False) if data.empty: return {"ticker": ticker, "error": "No data available"} close = data['Close'].dropna() # Calculate returns and volatility metrics returns = close.pct_change().dropna() # Volatility metrics daily_vol = float(returns.std()) annualized_vol = daily_vol * np.sqrt(252) rolling_vol = returns.rolling(window=30).std() vol_of_vol = float(rolling_vol.std()) # Volatility of volatility # Price movement metrics max_1day_change = float(returns.abs().max()) avg_abs_return = float(returns.abs().mean()) # Trend analysis price_change_total = float((close.iloc[-1] - close.iloc[0]) / close.iloc[0]) # Classify volatility if daily_vol > self.extreme_volatility_threshold: volatility_class = "EXTREME" elif daily_vol > self.high_volatility_threshold: volatility_class = "HIGH" else: volatility_class = "MODERATE" # Check if crypto/volatile stock is_crypto_related = any(keyword in ticker.upper() for keyword in self.crypto_keywords) # Recommend modeling approach recommendations = self._get_modeling_recommendations( daily_vol, volatility_class, is_crypto_related, ticker ) return { "ticker": ticker, "daily_volatility": daily_vol, "annualized_volatility": annualized_vol, "volatility_class": volatility_class, "max_1day_change": max_1day_change, "avg_abs_return": avg_abs_return, "vol_of_vol": vol_of_vol, "price_change_total": price_change_total, "is_crypto_related": is_crypto_related, "data_points": len(close), "recommendations": recommendations, "arima_suitability": self._assess_arima_suitability(daily_vol, is_crypto_related) } except Exception as e: return {"ticker": ticker, "error": str(e)} def _get_modeling_recommendations(self, daily_vol: float, volatility_class: str, is_crypto: bool, ticker: str) -> List[str]: """Get modeling recommendations based on volatility analysis.""" recommendations = [] if volatility_class == "EXTREME": recommendations.extend([ "❌ ARIMA alone is NOT suitable for this level of volatility", "✅ Use ARIMA-GARCH hybrid models", "✅ Consider LSTM or other ML approaches", "⚠️ Expect high forecast uncertainty" ]) elif volatility_class == "HIGH": recommendations.extend([ "⚠️ ARIMA may struggle with this volatility", "✅ Consider ARIMA-GARCH models", "✅ Use shorter forecast horizons", "✅ Validate results carefully" ]) else: recommendations.extend([ "✅ ARIMA may be suitable", "💡 Standard ARIMA optimization should work", "📊 Monitor for regime changes" ]) if is_crypto: recommendations.extend([ "🪙 Cryptocurrency detected", "📈 Consider external factors (sentiment, news)", "⏰ Use intraday data if available" ]) # Specific recommendations based on daily volatility if daily_vol > 0.08: # Very extreme recommendations.append("🚨 Consider this stock unsuitable for traditional forecasting") elif daily_vol > 0.05: # Extreme recommendations.append("🔄 Use ensemble methods combining multiple models") return recommendations[:6] # Limit to top 6 def _assess_arima_suitability(self, daily_vol: float, is_crypto: bool) -> Dict: """Assess how suitable ARIMA modeling is for this stock.""" if daily_vol > self.extreme_volatility_threshold: suitability = "POOR" confidence = "LOW" reason = f"Daily volatility {daily_vol:.1%} exceeds extreme threshold" elif daily_vol > self.high_volatility_threshold: suitability = "MARGINAL" confidence = "MEDIUM" reason = f"Daily volatility {daily_vol:.1%} is high for ARIMA" else: suitability = "GOOD" confidence = "HIGH" reason = f"Daily volatility {daily_vol:.1%} is manageable for ARIMA" if is_crypto: if suitability == "GOOD": suitability = "MARGINAL" confidence = "LOW" reason += " (cryptocurrency detected)" return { "suitability": suitability, "confidence": confidence, "reason": reason } def analyze_portfolio(self, tickers: List[str]) -> Dict: """Analyze volatility for an entire portfolio.""" print(f"🔍 Analyzing volatility for {len(tickers)} stocks...") results = {} for i, ticker in enumerate(tickers, 1): print(f"[{i:2d}/{len(tickers)}] Analyzing {ticker}...") result = self.analyze_stock_volatility(ticker) results[ticker] = result # Portfolio summary summary = self._create_portfolio_summary(results) return { "individual_results": results, "portfolio_summary": summary, "analysis_date": datetime.now().isoformat() } def _create_portfolio_summary(self, results: Dict) -> Dict: """Create summary statistics for the portfolio.""" valid_results = {k: v for k, v in results.items() if "error" not in v} if not valid_results: return {"error": "No valid results to summarize"} # Volatility distribution volatilities = [r["daily_volatility"] for r in valid_results.values()] vol_classes = [r["volatility_class"] for r in valid_results.values()] arima_suitabilities = [r["arima_suitability"]["suitability"] for r in valid_results.values()] crypto_count = sum(1 for r in valid_results.values() if r["is_crypto_related"]) # Count by classification vol_class_counts = {cls: vol_classes.count(cls) for cls in ["MODERATE", "HIGH", "EXTREME"]} suitability_counts = {suit: arima_suitabilities.count(suit) for suit in ["GOOD", "MARGINAL", "POOR"]} # Identify problematic stocks problematic_stocks = [ ticker for ticker, result in valid_results.items() if result["arima_suitability"]["suitability"] in ["POOR", "MARGINAL"] ] return { "total_stocks": len(results), "valid_analyses": len(valid_results), "avg_daily_volatility": np.mean(volatilities), "median_daily_volatility": np.median(volatilities), "max_daily_volatility": max(volatilities), "volatility_class_distribution": vol_class_counts, "arima_suitability_distribution": suitability_counts, "crypto_related_count": crypto_count, "crypto_percentage": (crypto_count / len(valid_results)) * 100, "problematic_stocks": problematic_stocks, "arima_suitable_stocks": [ ticker for ticker, result in valid_results.items() if result["arima_suitability"]["suitability"] == "GOOD" ] } def print_portfolio_analysis(self, analysis_results: Dict): """Print formatted portfolio volatility analysis.""" individual_results = analysis_results["individual_results"] summary = analysis_results["portfolio_summary"] if "error" in summary: print(f"❌ {summary['error']}") return print(f"\n📊 PORTFOLIO VOLATILITY ANALYSIS") print("=" * 80) print(f"{'Ticker':<8} {'Daily Vol':<10} {'Class':<10} {'ARIMA Fit':<12} {'Crypto':<8} {'Recommendation'}") print("-" * 80) for ticker, result in sorted(individual_results.items()): if "error" in result: print(f"{ticker:<8} {'ERROR':<10} {'N/A':<10} {'N/A':<12} {'N/A':<8} {result['error']}") continue daily_vol = result["daily_volatility"] vol_class = result["volatility_class"] arima_suit = result["arima_suitability"]["suitability"] is_crypto = "YES" if result["is_crypto_related"] else "NO" # Color coding if vol_class == "EXTREME": class_icon = "🔴" elif vol_class == "HIGH": class_icon = "🟡" else: class_icon = "🟢" if arima_suit == "POOR": arima_icon = "❌" elif arima_suit == "MARGINAL": arima_icon = "⚠️ " else: arima_icon = "✅" # Main recommendation main_rec = result["recommendations"][0] if result["recommendations"] else "No recommendation" print(f"{ticker:<8} {daily_vol:<10.1%} {class_icon}{vol_class:<9} {arima_icon}{arima_suit:<10} {is_crypto:<8} {main_rec}") # Summary statistics print("-" * 80) print(f"\n📈 PORTFOLIO SUMMARY:") print(f" Total stocks analyzed: {summary['valid_analyses']}") print(f" Average daily volatility: {summary['avg_daily_volatility']:.1%}") print(f" Cryptocurrency-related: {summary['crypto_related_count']} ({summary['crypto_percentage']:.1f}%)") print(f"\n🎯 Volatility Distribution:") for vol_class, count in summary['volatility_class_distribution'].items(): percentage = (count / summary['valid_analyses']) * 100 print(f" {vol_class}: {count} stocks ({percentage:.1f}%)") print(f"\n🔍 ARIMA Suitability:") for suitability, count in summary['arima_suitability_distribution'].items(): percentage = (count / summary['valid_analyses']) * 100 if suitability == "GOOD": icon = "✅" elif suitability == "MARGINAL": icon = "⚠️ " else: icon = "❌" print(f" {icon} {suitability}: {count} stocks ({percentage:.1f}%)") # Recommendations if summary['problematic_stocks']: print(f"\n⚠️ PROBLEMATIC STOCKS FOR ARIMA:") for ticker in summary['problematic_stocks'][:10]: # Show top 10 result = individual_results[ticker] reason = result["arima_suitability"]["reason"] print(f" • {ticker}: {reason}") if len(summary['problematic_stocks']) > 10: print(f" ... and {len(summary['problematic_stocks']) - 10} more") print(f"\n💡 PORTFOLIO RECOMMENDATIONS:") # Overall portfolio assessment good_count = summary['arima_suitability_distribution'].get('GOOD', 0) total_count = summary['valid_analyses'] good_percentage = (good_count / total_count) * 100 if good_percentage >= 70: print(f" ✅ Portfolio is suitable for ARIMA optimization ({good_percentage:.1f}% suitable)") print(f" 📊 Proceed with standard ARIMA optimization") elif good_percentage >= 40: print(f" ⚠️ Portfolio is partially suitable for ARIMA ({good_percentage:.1f}% suitable)") print(f" 🔧 Use mixed approach: ARIMA for suitable stocks, alternatives for others") else: print(f" ❌ Portfolio is poorly suited for ARIMA ({good_percentage:.1f}% suitable)") print(f" 🚨 Consider alternative modeling approaches") # Specific recommendations based on portfolio composition crypto_percentage = summary['crypto_percentage'] if crypto_percentage > 50: print(f" 🪙 Portfolio is {crypto_percentage:.1f}% cryptocurrency-related") print(f" • Consider ARIMA-GARCH hybrid models") print(f" • Use shorter forecast horizons") print(f" • Include external sentiment data") extreme_count = summary['volatility_class_distribution'].get('EXTREME', 0) if extreme_count > 0: print(f" 🔴 {extreme_count} stocks have extreme volatility") print(f" • Consider excluding from ARIMA optimization") print(f" • Use volatility-specific models (GARCH family)") # List stocks suitable for ARIMA suitable_stocks = summary['arima_suitable_stocks'] if suitable_stocks: print(f"\n✅ STOCKS SUITABLE FOR ARIMA OPTIMIZATION ({len(suitable_stocks)} stocks):") # Group by 8 per line for i in range(0, len(suitable_stocks), 8): group = suitable_stocks[i:i+8] print(f" {', '.join(group)}") if not suitable_stocks: print(f"\n❌ NO STOCKS SUITABLE FOR ARIMA OPTIMIZATION") print(f" Consider alternative modeling approaches for this portfolio") def load_portfolio_config(config_path: str = "config/trading_config.json") -> Dict: """Load portfolio configuration.""" try: with open(config_path, 'r') as f: return json.load(f) except Exception as e: print(f"❌ Error loading config: {e}") return {"tickers": {"default": ["AAPL", "MSFT", "GOOGL"]}} def main(): """Main function for volatility analysis.""" import argparse parser = argparse.ArgumentParser( description="Analyze portfolio volatility and ARIMA suitability", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python volatile_stock_detection.py # Show available portfolios python volatile_stock_detection.py --portfolio large_cap # Analyze specific portfolio python volatile_stock_detection.py --tickers AAPL MSFT COIN # Analyze specific stocks python volatile_stock_detection.py --all # Analyze all stocks """ ) parser.add_argument('--portfolio', type=str, help='Specific portfolio name from config') parser.add_argument('--all', action='store_true', help='Analyze all stocks across all portfolios') parser.add_argument('--tickers', nargs='+', help='Specific tickers to analyze') parser.add_argument('--save', action='store_true', help='Save results to JSON file') args = parser.parse_args() # Load configuration config = load_portfolio_config() detector = VolatileStockDetector() # Determine tickers to analyze tickers = [] analysis_description = "" if args.tickers: tickers = args.tickers analysis_description = f"specified tickers: {', '.join(tickers)}" elif args.all: # Get all unique tickers all_tickers = set() for portfolio_name, portfolio_tickers in config.get("tickers", {}).items(): if isinstance(portfolio_tickers, list): all_tickers.update(portfolio_tickers) tickers = sorted(list(all_tickers)) analysis_description = f"all stocks across all portfolios ({len(tickers)} stocks)" elif args.portfolio: portfolio_tickers = config.get("tickers", {}).get(args.portfolio, []) if not portfolio_tickers: print(f"❌ Portfolio '{args.portfolio}' not found in config") print(f"\n📊 Available portfolios:") for name in config.get("tickers", {}).keys(): print(f" - {name}") return tickers = portfolio_tickers analysis_description = f"'{args.portfolio}' portfolio ({len(tickers)} stocks)" else: # Show available portfolios print("🔍 VOLATILITY ANALYSIS TOOL") print("=" * 50) print("Analyze portfolio volatility and ARIMA suitability\n") print("📊 Available portfolios:") for name, portfolio_tickers in config.get("tickers", {}).items(): if isinstance(portfolio_tickers, list): print(f" • {name:<20} ({len(portfolio_tickers):2d} stocks)") print(f"\n💡 Usage examples:") print(f" python volatile_stock_detection.py --portfolio large_cap") print(f" python volatile_stock_detection.py --tickers COIN GBTC MSTR") print(f" python volatile_stock_detection.py --all") return if not tickers: print("❌ No tickers to analyze") return # Run analysis print(f"🔍 Analyzing volatility for {analysis_description}") print(f"📊 This will help determine ARIMA optimization suitability\n") analysis_results = detector.analyze_portfolio(tickers) detector.print_portfolio_analysis(analysis_results) # Save results if requested if args.save: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") if args.portfolio: filename = f"volatility_analysis_{args.portfolio}_{timestamp}.json" elif args.all: filename = f"volatility_analysis_all_stocks_{timestamp}.json" else: filename = f"volatility_analysis_custom_{timestamp}.json" output_dir = Path("arima_optimization") output_dir.mkdir(exist_ok=True) filepath = output_dir / filename with open(filepath, 'w') as f: json.dump(analysis_results, f, indent=2) print(f"\n📁 Analysis results saved to: {filepath}") if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/j1c4b/mcp-hybrid-forecasting'

If you have feedback or need assistance with the MCP directory API, please join our Discord server