Skip to main content
Glama

Financial Data MCP Server

by j1c4b
batch_fin_mcp_server.py (25.2 kB)
#!/usr/bin/env python3
"""
Batch Financial MCP Server
Analyzes all portfolios and generates comprehensive reports with charts
"""

import os
import json
import logging
from collections import Counter
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Set, Tuple
from dataclasses import dataclass

import pandas as pd
import numpy as np
import yfinance as yf

# Configure matplotlib for headless operation. The backend is selected
# *before* pyplot is imported so the choice also holds on matplotlib versions
# that cannot switch backends once pyplot has been loaded.
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@dataclass
class TrendAnalysis:
    """Data class for trend analysis results"""
    symbol: str
    trend_direction: str  # "up", "down" or "unknown" (see analyze_trend)
    price_current: float  # last 'Close' value of the fetched history
    ema_50: float  # last 50-span EMA value, 0 when the column is all-NaN
    ema_200: float  # last 200-span EMA value, 0 when the column is all-NaN
    scenario: str  # machine-readable scenario id, e.g. "scenario_a"
    scenario_description: str  # human-readable explanation of the scenario
    recommendation: str  # text starting with BUY / SELL / HOLD / AVOID
    confidence: float  # 0.0 - 1.0, rendered as a percentage in reports


class BatchFinancialAnalyzer:
    """Analyze every symbol found in ``portfolio.json`` and write charts + a report.

    All output (PNG charts and the text report) goes to the
    ``batch_financial_charts`` directory next to this file.
    """

    # Symbols longer than this are discarded by the final validation filter.
    MAX_SYMBOL_LENGTH = 5

    # Keys of a portfolio dict that are never interpreted as ticker symbols
    # when falling back to "the keys themselves are symbols".
    _NON_SYMBOL_KEYS = frozenset({
        'portfolio', 'name', 'description', 'created_date',
        'stock_list', 'holdings', 'symbols',
    })

    def __init__(self):
        """Initialize the batch analyzer"""
        # Configuration
        self.project_dir = Path(__file__).parent
        self.portfolio_file = self.project_dir / "portfolio.json"
        self.batch_charts_dir = self.project_dir / "batch_financial_charts"
        self.batch_charts_dir.mkdir(exist_ok=True)

        # Analysis parameters
        self.analysis_period = "1y"  # 1 year of data
        self.trend_lookback_days = 252  # Trading days in a year
        self.ema_touch_days = 2  # Days to check for EMA touch
        self.price_decrease_days = 3  # Days to check for price decrease

        # Results storage
        self.analysis_results: List[TrendAnalysis] = []
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        logger.info("Batch Financial Analyzer initialized")
        logger.info(f"Portfolio file: {self.portfolio_file}")
        logger.info(f"Charts directory: {self.batch_charts_dir}")

    def load_portfolios(self) -> Dict[str, Any]:
        """Load portfolio data from JSON file.

        Returns:
            The parsed top-level JSON object, or ``{}`` when the file is
            missing, is not valid JSON, or its top level is not an object.
        """
        try:
            # JSON is UTF-8 by specification; do not depend on the locale.
            with open(self.portfolio_file, 'r', encoding='utf-8') as f:
                portfolio_data = json.load(f)
        except FileNotFoundError:
            logger.error(f"Portfolio file not found: {self.portfolio_file}")
            return {}
        except json.JSONDecodeError as e:
            logger.error(f"Error parsing portfolio JSON: {e}")
            return {}

        # Callers iterate with .items(); anything but a mapping would crash.
        if not isinstance(portfolio_data, dict):
            logger.error(
                "Portfolio JSON must be an object mapping portfolio names to "
                f"data, got {type(portfolio_data).__name__}"
            )
            return {}

        logger.info(f"Loaded {len(portfolio_data)} portfolios")
        return portfolio_data

    def get_stock_data(self, symbol: str) -> Optional[pd.DataFrame]:
        """Fetch stock data for analysis.

        Args:
            symbol: Ticker symbol passed to ``yfinance``.

        Returns:
            The price history for ``self.analysis_period`` with added
            ``EMA_50`` and ``EMA_200`` columns, or ``None`` when no data is
            returned or the download fails.
        """
        try:
            logger.info(f"Fetching data for {symbol}")
            ticker = yf.Ticker(symbol)
            data = ticker.history(period=self.analysis_period)

            if data.empty:
                logger.warning(f"No data available for {symbol}")
                return None

            # Calculate EMAs
            data['EMA_50'] = data['Close'].ewm(span=50).mean()
            data['EMA_200'] = data['Close'].ewm(span=200).mean()

            return data
        except Exception as e:
            # Boundary with a third-party network client: log and degrade to
            # "no data" so one bad symbol does not stop the batch.
            logger.error(f"Error fetching data for {symbol}: {e}")
            return None

    def analyze_trend(self, data: pd.DataFrame) -> str:
        """
        Analyze if the overall trend is up or down using proven technical analysis methods
        Based on moving average crossovers and price position relative to MAs

        Args:
            data: Frame with ``Close``, ``EMA_50`` and ``EMA_200`` columns.

        Returns:
            ``"up"``, ``"down"``, or ``"unknown"`` when fewer than 200 rows
            are available or either latest EMA is NaN.
        """
        if data.empty or len(data) < 200:
            return "unknown"

        current_price = data['Close'].iloc[-1]
        ema_50 = data['EMA_50'].iloc[-1]
        ema_200 = data['EMA_200'].iloc[-1]

        # Check if we have valid EMA values
        if pd.isna(ema_50) or pd.isna(ema_200):
            return "unknown"

        # Primary trend determination: Golden Cross / Death Cross
        # Golden Cross: 50 EMA above 200 EMA = Bullish
        # Death Cross: 50 EMA below 200 EMA = Bearish
        golden_cross = ema_50 > ema_200

        # Secondary confirmation: Price position relative to EMAs
        price_above_50 = current_price > ema_50
        price_above_200 = current_price > ema_200

        # Tertiary confirmation: Recent EMA slope (last 10 days).
        # At least 200 rows are guaranteed by the guard above, so iloc[-10]
        # is always available here.
        ema_50_rising = ema_50 - data['EMA_50'].iloc[-10] > 0
        ema_200_rising = ema_200 - data['EMA_200'].iloc[-10] > 0

        # Combine all factors for trend determination
        bullish_signals = sum([
            golden_cross,     # 50 EMA > 200 EMA
            price_above_50,   # Price > 50 EMA
            price_above_200,  # Price > 200 EMA
            ema_50_rising,    # 50 EMA rising
            ema_200_rising,   # 200 EMA rising
        ])

        # Strong uptrend: 4-5 bullish signals
        if bullish_signals >= 4:
            return "up"
        # Strong downtrend: 0-1 bullish signals
        if bullish_signals <= 1:
            return "down"
        # Mixed signals: 2-3 bullish signals - use Golden/Death Cross as tiebreaker
        return "up" if golden_cross else "down"

    def check_ema_touch(self, data: pd.DataFrame, ema_column: str, days: int = 2) -> bool:
        """Check if price touched EMA within last N days.

        A "touch" means the EMA value lies inside that day's Low-High range.

        Args:
            data: Frame with ``Low``, ``High`` and ``ema_column`` columns.
            ema_column: Name of the EMA column to test against.
            days: Number of most recent rows to inspect.

        Returns:
            ``True`` if any inspected day's range contains the EMA; ``False``
            otherwise, including when fewer than ``days`` rows exist.
        """
        if len(data) < days:
            return False

        recent_data = data.tail(days)
        ema_values = recent_data[ema_column]

        # NaN EMAs compare False on both sides, so they never count as a touch.
        touched = (recent_data['Low'] <= ema_values) & (ema_values <= recent_data['High'])
        return bool(touched.any())

    def check_price_decrease_days(self, data: pd.DataFrame, days: int = 3) -> bool:
        """Check if price decreased for at least N consecutive days.

        Args:
            data: Frame with a ``Close`` column.
            days: Number of consecutive day-over-day declines required,
                ending at the most recent row.

        Returns:
            ``True`` when each of the last ``days`` closes is strictly lower
            than the close before it; ``False`` otherwise or when there is
            not enough history.
        """
        if len(data) < days:
            return False

        # +1 row so that there are exactly `days` day-over-day comparisons.
        recent_closes = data['Close'].tail(days + 1)
        if len(recent_closes) < days + 1:
            return False

        # First diff is NaN (no predecessor) and is skipped.
        daily_changes = recent_closes.diff().iloc[1:]
        return bool((daily_changes < 0).all())

    def identify_scenario(self, data: pd.DataFrame) -> Tuple[str, str, str, float]:
        """Identify trading scenario and provide recommendation.

        Scenarios are tested in order A, B, C, D; the first match wins.

        Args:
            data: Frame with ``Close``, ``Low``, ``High``, ``EMA_50`` and
                ``EMA_200`` columns.

        Returns:
            ``(scenario_id, description, recommendation, confidence)``.
        """
        if data.empty:
            return "no_data", "No data available", "Hold", 0.0

        current_price = data['Close'].iloc[-1]
        ema_50 = data['EMA_50'].iloc[-1]
        ema_200 = data['EMA_200'].iloc[-1]

        # Scenario A: Price > 10% above 50 EMA
        if current_price > ema_50 * 1.10:
            return ("scenario_a",
                    f"Price ${current_price:.2f} is >10% above 50-day EMA ${ema_50:.2f}",
                    "SELL - Take profits, price may be overextended",
                    0.8)

        # Scenario B: Price above 50 EMA and touched it within last 2 days
        if (current_price > ema_50 and
                self.check_ema_touch(data, 'EMA_50', self.ema_touch_days)):
            return ("scenario_b",
                    f"Price ${current_price:.2f} above 50-day EMA ${ema_50:.2f} and touched it recently",
                    "BUY - Good entry point after pullback to support",
                    0.85)

        # Scenario C: Price > 5% below 50 EMA, decreasing 3+ days, above 200 EMA
        if (current_price < ema_50 * 0.95 and
                current_price > ema_200 and
                self.check_price_decrease_days(data, self.price_decrease_days)):
            return ("scenario_c",
                    f"Price ${current_price:.2f} >5% below 50-day EMA ${ema_50:.2f}, decreasing, above 200-day EMA ${ema_200:.2f}",
                    "BUY - Potential oversold condition in uptrend",
                    0.75)

        # Scenario D: Price below 50 EMA, touched 200 EMA recently, above 200 EMA
        if (current_price < ema_50 and
                current_price > ema_200 and
                self.check_ema_touch(data, 'EMA_200', self.ema_touch_days)):
            return ("scenario_d",
                    f"Price ${current_price:.2f} below 50-day EMA, touched 200-day EMA ${ema_200:.2f} recently",
                    "BUY - Potential bounce from major support",
                    0.70)

        # Default case - no clear scenario
        return ("no_clear_scenario",
                f"Price ${current_price:.2f} doesn't fit clear trading scenarios",
                "HOLD - Monitor for better entry/exit points",
                0.3)

    def create_analysis_chart(self, symbol: str, data: pd.DataFrame, analysis: TrendAnalysis) -> str:
        """Create comprehensive analysis chart.

        Draws price with both EMAs on the top axis and volume plus a summary
        text box on the bottom axis, then saves the figure as a PNG in
        ``self.batch_charts_dir``.

        Args:
            symbol: Ticker symbol, used in the title and file name.
            data: Frame with ``Close``, ``Volume``, ``EMA_50``, ``EMA_200``.
            analysis: Analysis result rendered into the summary box.

        Returns:
            Path of the saved chart as a string, or ``""`` on any failure.
        """
        fig = None
        try:
            fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 12))
            fig.suptitle(f'{symbol} - Batch Analysis Report', fontsize=16, fontweight='bold')

            # Chart 1: Price and EMAs
            ax1.plot(data.index, data['Close'], label='Price', linewidth=2, color='black')
            ax1.plot(data.index, data['EMA_50'], label='50-day EMA', linewidth=1.5, color='blue', alpha=0.7)
            ax1.plot(data.index, data['EMA_200'], label='200-day EMA', linewidth=1.5, color='red', alpha=0.7)

            # Highlight current price
            current_price = data['Close'].iloc[-1]
            ax1.scatter(data.index[-1], current_price, color='red', s=100, zorder=5)
            ax1.annotate(f'${current_price:.2f}',
                         xy=(data.index[-1], current_price),
                         xytext=(10, 10), textcoords='offset points',
                         bbox=dict(boxstyle='round,pad=0.3', facecolor='yellow', alpha=0.7))

            ax1.set_title(f'Price Action & EMAs - {analysis.scenario_description}')
            ax1.set_ylabel('Price ($)')
            ax1.legend()
            ax1.grid(True, alpha=0.3)

            # Format x-axis
            ax1.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
            ax1.xaxis.set_major_locator(mdates.MonthLocator(interval=2))

            # Chart 2: Volume and additional info
            ax2.bar(data.index, data['Volume'], alpha=0.6, color='gray')
            ax2.set_title('Volume')
            ax2.set_ylabel('Volume')
            ax2.set_xlabel('Date')

            # Add analysis text box
            analysis_text = "\n".join([
                "ANALYSIS SUMMARY:",
                f"• Symbol: {analysis.symbol}",
                f"• Trend: {analysis.trend_direction.upper()}",
                f"• Current Price: ${analysis.price_current:.2f}",
                f"• 50-day EMA: ${analysis.ema_50:.2f}",
                f"• 200-day EMA: ${analysis.ema_200:.2f}",
                f"• Scenario: {analysis.scenario}",
                f"• Recommendation: {analysis.recommendation}",
                f"• Confidence: {analysis.confidence:.0%}",
            ])

            ax2.text(0.02, 0.98, analysis_text, transform=ax2.transAxes,
                     verticalalignment='top', fontsize=10,
                     bbox=dict(boxstyle='round,pad=0.5', facecolor='lightblue', alpha=0.8))

            fig.tight_layout()

            # Save chart
            chart_filename = f"{symbol}_batch_analysis_{self.timestamp}.png"
            chart_path = self.batch_charts_dir / chart_filename
            fig.savefig(chart_path, dpi=300, bbox_inches='tight')

            logger.info(f"Chart saved: {chart_path}")
            return str(chart_path)

        except Exception as e:
            logger.error(f"Error creating chart for {symbol}: {e}")
            return ""
        finally:
            # Close this specific figure (not just "the current one") so a
            # batch run never accumulates open figures, even on failure.
            if fig is not None:
                plt.close(fig)

    @staticmethod
    def _latest_or_zero(series: pd.Series) -> float:
        """Return the last value of *series*, or 0 when every value is NaN."""
        return 0.0 if series.isna().all() else float(series.iloc[-1])

    def analyze_symbol(self, symbol: str) -> Optional[TrendAnalysis]:
        """Perform complete analysis on a single symbol.

        Downtrending symbols are reported with an AVOID recommendation and get
        no chart. Every other trend value (including ``"unknown"``) goes
        through scenario detection and chart generation.

        Args:
            symbol: Ticker symbol to analyze.

        Returns:
            The analysis result, or ``None`` when no price data is available.
        """
        logger.info(f"Analyzing {symbol}")

        # Get stock data
        data = self.get_stock_data(symbol)
        if data is None:
            return None

        # Analyze trend
        trend = self.analyze_trend(data)
        logger.info(f"{symbol}: Trend = {trend}")

        price_current = float(data['Close'].iloc[-1])
        ema_50 = self._latest_or_zero(data['EMA_50'])
        ema_200 = self._latest_or_zero(data['EMA_200'])

        # Skip if trend is down
        if trend == "down":
            logger.info(f"{symbol}: Skipping due to downtrend")
            return TrendAnalysis(
                symbol=symbol,
                trend_direction=trend,
                price_current=price_current,
                ema_50=ema_50,
                ema_200=ema_200,
                scenario="downtrend_skip",
                scenario_description="Skipped due to downtrend",
                recommendation="AVOID - Downtrend detected",
                confidence=0.9
            )

        # Identify scenario for uptrend stocks
        scenario, description, recommendation, confidence = self.identify_scenario(data)

        # Create analysis object
        analysis = TrendAnalysis(
            symbol=symbol,
            trend_direction=trend,
            price_current=price_current,
            ema_50=ema_50,
            ema_200=ema_200,
            scenario=scenario,
            scenario_description=description,
            recommendation=recommendation,
            confidence=confidence
        )

        # Create chart (failures are logged inside and do not affect the result)
        self.create_analysis_chart(symbol, data, analysis)

        return analysis

    def generate_summary_report(self) -> str:
        """Generate comprehensive summary report.

        Writes a text report covering ``self.analysis_results`` into
        ``self.batch_charts_dir``.

        Returns:
            Path of the written report as a string.
        """
        report_filename = f"batch_analysis_report_{self.timestamp}.txt"
        report_path = self.batch_charts_dir / report_filename
        results = self.analysis_results

        def by_confidence(candidates: List[TrendAnalysis]) -> List[TrendAnalysis]:
            return sorted(candidates, key=lambda x: x.confidence, reverse=True)

        # The report contains non-ASCII bullets; write UTF-8 explicitly so the
        # locale's default encoding cannot make this fail.
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write("="*80 + "\n")
            f.write("BATCH FINANCIAL ANALYSIS REPORT\n")
            f.write("="*80 + "\n")
            f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"Total symbols analyzed: {len(results)}\n\n")

            # Summary statistics
            uptrend_count = sum(1 for r in results if r.trend_direction == "up")
            downtrend_count = sum(1 for r in results if r.trend_direction == "down")

            f.write("TREND SUMMARY:\n")
            f.write(f"• Uptrend: {uptrend_count} symbols\n")
            f.write(f"• Downtrend: {downtrend_count} symbols\n\n")

            # Scenario breakdown (Counter keeps first-seen order)
            scenario_counts = Counter(r.scenario for r in results)

            f.write("SCENARIO BREAKDOWN:\n")
            for scenario, count in scenario_counts.items():
                f.write(f"• {scenario}: {count} symbols\n")
            f.write("\n")

            # Detailed results
            f.write("DETAILED ANALYSIS:\n")
            f.write("-"*80 + "\n")

            for result in results:
                f.write(f"Symbol: {result.symbol}\n")
                f.write(f"Trend: {result.trend_direction.upper()}\n")
                f.write(f"Current Price: ${result.price_current:.2f}\n")
                f.write(f"50-day EMA: ${result.ema_50:.2f}\n")
                f.write(f"200-day EMA: ${result.ema_200:.2f}\n")
                f.write(f"Scenario: {result.scenario}\n")
                f.write(f"Description: {result.scenario_description}\n")
                f.write(f"Recommendation: {result.recommendation}\n")
                f.write(f"Confidence: {result.confidence:.0%}\n")
                f.write("-"*40 + "\n")

            # Trading recommendations
            f.write("\nTRADING RECOMMENDATIONS:\n")
            f.write("="*50 + "\n")

            buy_candidates = [r for r in results if "BUY" in r.recommendation]
            sell_candidates = [r for r in results if "SELL" in r.recommendation]

            if buy_candidates:
                f.write("BUY CANDIDATES:\n")
                for candidate in by_confidence(buy_candidates):
                    f.write(f"• {candidate.symbol}: {candidate.recommendation} (Confidence: {candidate.confidence:.0%})\n")
                f.write("\n")

            if sell_candidates:
                f.write("SELL CANDIDATES:\n")
                for candidate in by_confidence(sell_candidates):
                    f.write(f"• {candidate.symbol}: {candidate.recommendation} (Confidence: {candidate.confidence:.0%})\n")

        logger.info(f"Summary report saved: {report_path}")
        return str(report_path)

    def _collect_symbols(self, portfolios: Dict[str, Any]) -> Set[str]:
        """Collect the unique, normalized ticker symbols from all portfolios.

        Supported shapes per portfolio (first matching key wins for dicts):
        ``{'stock_list': [...]}``, ``{'holdings': [...] | {...}}``,
        ``{'symbols': [...]}``, a dict whose keys are the symbols, or a plain
        list of symbol strings / ``{'symbol': ...}`` dicts.

        Non-string symbol values are ignored rather than raising, and symbols
        that are empty or longer than ``MAX_SYMBOL_LENGTH`` are dropped.
        """
        all_symbols: Set[str] = set()

        def add(raw: Any, source: str) -> None:
            # Only strings can be tickers; anything else is malformed input.
            if not isinstance(raw, str):
                return
            symbol = raw.upper().strip()
            all_symbols.add(symbol)
            logger.debug(f"Added symbol from {source}: {symbol}")

        for portfolio_name, portfolio_data in portfolios.items():
            logger.info(f"Processing portfolio: {portfolio_name}")

            # Handle different portfolio structures
            if isinstance(portfolio_data, dict):
                # Check for 'stock_list' key (your structure)
                if 'stock_list' in portfolio_data:
                    stock_list = portfolio_data['stock_list']
                    if isinstance(stock_list, list):
                        for symbol in stock_list:
                            add(symbol, "stock_list")

                # Check for 'holdings' key (alternative structure)
                elif 'holdings' in portfolio_data:
                    holdings = portfolio_data['holdings']
                    if isinstance(holdings, list):
                        for holding in holdings:
                            if isinstance(holding, dict) and 'symbol' in holding:
                                add(holding['symbol'], "holdings")
                            else:
                                add(holding, "holdings list")
                    elif isinstance(holdings, dict):
                        # Holdings might be a dict with symbol keys
                        for symbol in holdings:
                            add(symbol, "holdings dict")

                # Check for 'symbols' key (another alternative)
                elif 'symbols' in portfolio_data:
                    symbols = portfolio_data['symbols']
                    if isinstance(symbols, list):
                        for symbol in symbols:
                            add(symbol, "symbols list")

                # Check for direct symbol keys (in case portfolio structure is different)
                else:
                    for key, value in portfolio_data.items():
                        if key in self._NON_SYMBOL_KEYS or not isinstance(value, (dict, int, float)):
                            continue
                        # This might be a symbol
                        candidate = key.upper().strip()
                        # Basic symbol validation
                        if len(candidate) <= self.MAX_SYMBOL_LENGTH and candidate.isalpha():
                            add(candidate, "portfolio keys")

            elif isinstance(portfolio_data, list):
                # Portfolio data is directly a list of symbols or holdings
                for item in portfolio_data:
                    if isinstance(item, dict) and 'symbol' in item:
                        add(item['symbol'], "direct list dict")
                    else:
                        add(item, "direct list")

        # Remove any empty strings or invalid symbols
        valid_symbols = {
            symbol for symbol in all_symbols
            if symbol and len(symbol) <= self.MAX_SYMBOL_LENGTH
        }
        dropped = sorted(s for s in all_symbols - valid_symbols if s)
        if dropped:
            logger.warning(
                f"Ignoring symbols longer than {self.MAX_SYMBOL_LENGTH} characters: {dropped}"
            )
        return valid_symbols

    def run_batch_analysis(self) -> bool:
        """Run the complete batch analysis.

        Loads the portfolios, analyzes every unique symbol, writes the
        summary report and prints a short console summary. When the portfolio
        file yields no usable symbols, a built-in sample list is analyzed
        instead.

        Returns:
            ``True`` when at least one symbol was analyzed, ``False`` when no
            portfolios could be loaded or every symbol failed.
        """
        logger.info("Starting batch financial analysis")

        # Load portfolios
        portfolios = self.load_portfolios()
        if not portfolios:
            logger.error("No portfolios found. Exiting.")
            return False

        # Collect all unique symbols from all portfolios
        all_symbols = self._collect_symbols(portfolios)

        logger.info(f"Found {len(all_symbols)} unique symbols to analyze: {sorted(all_symbols)}")

        if not all_symbols:
            logger.warning("No valid symbols found in portfolio. Please check your portfolio.json structure.")
            logger.info("Expected structure examples:")
            logger.info("1. {'portfolio_name': {'stock_list': ['AAPL', 'GOOGL', ...]}}")
            logger.info("2. {'portfolio_name': {'holdings': [{'symbol': 'AAPL', 'quantity': 10}, ...]}}")
            logger.info("3. {'portfolio_name': [{'symbol': 'AAPL', 'quantity': 10}, ...]}")

            # Create a sample portfolio for testing
            logger.info("Creating sample analysis with popular stocks...")
            all_symbols = {'AAPL', 'MSFT', 'GOOGL', 'TSLA', 'NVDA'}

        # Analyze each symbol
        for symbol in sorted(all_symbols):
            try:
                analysis = self.analyze_symbol(symbol)
                if analysis:
                    self.analysis_results.append(analysis)
            except Exception as e:
                # One failing symbol must not abort the rest of the batch.
                logger.error(f"Error analyzing {symbol}: {e}")
                continue

        # Generate summary report
        report_path = self.generate_summary_report()

        logger.info("Batch analysis complete!")
        logger.info(f"Results: {len(self.analysis_results)} symbols analyzed")
        logger.info(f"Charts saved to: {self.batch_charts_dir}")
        logger.info(f"Summary report: {report_path}")

        # Print quick summary to console
        if self.analysis_results:
            print(f"\n{'='*60}")
            print("QUICK ANALYSIS SUMMARY")
            print(f"{'='*60}")

            uptrend_count = sum(1 for r in self.analysis_results if r.trend_direction == "up")
            downtrend_count = sum(1 for r in self.analysis_results if r.trend_direction == "down")

            print(f"Total symbols analyzed: {len(self.analysis_results)}")
            print(f"Uptrend stocks: {uptrend_count}")
            print(f"Downtrend stocks: {downtrend_count}")

            # Show top buy candidates
            buy_candidates = [r for r in self.analysis_results if "BUY" in r.recommendation]
            if buy_candidates:
                print("\nTOP BUY CANDIDATES:")
                for candidate in sorted(buy_candidates, key=lambda x: x.confidence, reverse=True)[:3]:
                    print(f"• {candidate.symbol}: {candidate.recommendation} (Confidence: {candidate.confidence:.0%})")

            print(f"\nDetailed results in: {report_path}")
            print(f"Charts saved in: {self.batch_charts_dir}")
        else:
            logger.warning("No symbols were successfully analyzed. Check your data sources and network connection.")

        return bool(self.analysis_results)


def main():
    """Main function"""
    print("="*60)
    print("BATCH FINANCIAL MCP SERVER")
    print("="*60)

    analyzer = BatchFinancialAnalyzer()
    succeeded = analyzer.run_batch_analysis()

    # Only claim success when something was actually analyzed.
    if succeeded:
        print("\nBatch analysis completed successfully!")
        print(f"Check {analyzer.batch_charts_dir} for results")
    else:
        print("\nBatch analysis produced no results. Check the log output above.")


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/j1c4b/finance_mcp_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.