# batch_fin_mcp_server.py (25.2 kB)
#!/usr/bin/env python3
"""
Batch Financial MCP Server
Analyzes all portfolios and generates comprehensive reports with charts
"""
import os
import json
import logging
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
import pandas as pd
import numpy as np
import yfinance as yf
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from dataclasses import dataclass
# Configure matplotlib for headless operation
import matplotlib
matplotlib.use('Agg')
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@dataclass
class TrendAnalysis:
    """Outcome of analysing a single ticker symbol.

    One instance is produced per analysed symbol and later consumed by the
    chart renderer and the summary report.
    """

    # Ticker the analysis refers to.
    symbol: str
    # Trend label: "up", "down", or "unknown" when it cannot be classified.
    trend_direction: str
    # Most recent closing price.
    price_current: float
    # Latest value of the 50-day exponential moving average.
    ema_50: float
    # Latest value of the 200-day exponential moving average.
    ema_200: float
    # Machine-readable scenario key (e.g. "scenario_a", "downtrend_skip").
    scenario: str
    # Human-readable explanation of the matched scenario.
    scenario_description: str
    # Action text; begins with BUY / SELL / HOLD / AVOID in practice.
    recommendation: str
    # Confidence attached to the recommendation, as a fraction (0.0 - 1.0).
    confidence: float
class BatchFinancialAnalyzer:
    """Analyse every symbol found in the portfolio file and report on it.

    For each unique ticker the analyzer downloads price history through
    yfinance, classifies the trend from the 50/200-day EMAs, matches the most
    recent bar against a small set of entry/exit scenarios, renders a PNG
    chart and finally writes a plain-text summary report.
    """

    # Symbols longer than this are discarded as invalid tickers.
    MAX_SYMBOL_LENGTH = 5
    # Bars required before the trend is classified at all.
    MIN_TREND_BARS = 200
    # Portfolio keys that describe the portfolio rather than name a ticker.
    _NON_SYMBOL_KEYS = frozenset({
        'portfolio', 'name', 'description', 'created_date',
        'stock_list', 'holdings', 'symbols',
    })
    # Analysed instead when the portfolio file yields no usable symbol.
    _SAMPLE_SYMBOLS = frozenset({'AAPL', 'MSFT', 'GOOGL', 'TSLA', 'NVDA'})

    def __init__(self, portfolio_file: Optional[Path] = None,
                 charts_dir: Optional[Path] = None):
        """Initialize the batch analyzer.

        Args:
            portfolio_file: JSON file holding the portfolios. Defaults to
                ``portfolio.json`` next to this module.
            charts_dir: Directory receiving charts and the report. Defaults to
                ``batch_financial_charts`` next to this module. It is created
                when missing.
        """
        # Configuration
        self.project_dir = Path(__file__).parent
        self.portfolio_file = (
            Path(portfolio_file) if portfolio_file is not None
            else self.project_dir / "portfolio.json"
        )
        self.batch_charts_dir = (
            Path(charts_dir) if charts_dir is not None
            else self.project_dir / "batch_financial_charts"
        )
        self.batch_charts_dir.mkdir(parents=True, exist_ok=True)

        # Analysis parameters
        self.analysis_period = "1y"  # 1 year of data
        self.trend_lookback_days = 252  # Trading days in a year
        self.ema_touch_days = 2  # Days to check for EMA touch
        self.price_decrease_days = 3  # Days to check for price decrease

        # Results storage
        self.analysis_results: List["TrendAnalysis"] = []
        # Shared by every file written during this run so they sort together.
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        logger.info("Batch Financial Analyzer initialized")
        logger.info(f"Portfolio file: {self.portfolio_file}")
        logger.info(f"Charts directory: {self.batch_charts_dir}")

    def load_portfolios(self) -> Dict[str, Any]:
        """Load portfolio data from the JSON file.

        Returns:
            The parsed mapping of portfolio name to portfolio data, or an
            empty dict when the file is missing, is not valid JSON, or its
            top-level value is not a JSON object.
        """
        try:
            # JSON is UTF-8 by specification; do not depend on the locale.
            with open(self.portfolio_file, 'r', encoding='utf-8') as f:
                portfolio_data = json.load(f)
        except FileNotFoundError:
            logger.error(f"Portfolio file not found: {self.portfolio_file}")
            return {}
        except json.JSONDecodeError as e:
            logger.error(f"Error parsing portfolio JSON: {e}")
            return {}

        # Callers iterate with .items(); anything but an object would crash.
        if not isinstance(portfolio_data, dict):
            logger.error(
                "Portfolio JSON must be an object mapping portfolio names to data, "
                f"got {type(portfolio_data).__name__}"
            )
            return {}

        logger.info(f"Loaded {len(portfolio_data)} portfolios")
        return portfolio_data

    def get_stock_data(self, symbol: str) -> Optional[pd.DataFrame]:
        """Fetch price history for *symbol* and attach EMA columns.

        Returns:
            The history frame with ``EMA_50`` and ``EMA_200`` columns added,
            or ``None`` when no data is available or the download fails.
        """
        try:
            logger.info(f"Fetching data for {symbol}")
            ticker = yf.Ticker(symbol)
            data = ticker.history(period=self.analysis_period)
            if data.empty:
                logger.warning(f"No data available for {symbol}")
                return None
            # Calculate EMAs
            data['EMA_50'] = data['Close'].ewm(span=50).mean()
            data['EMA_200'] = data['Close'].ewm(span=200).mean()
            return data
        except Exception as e:
            # Best effort: one failing symbol must not abort the whole batch.
            logger.error(f"Error fetching data for {symbol}: {e}")
            return None

    def analyze_trend(self, data: pd.DataFrame) -> str:
        """Classify the overall trend as "up", "down" or "unknown".

        Five boolean signals are counted: 50 EMA above 200 EMA, price above
        each EMA, and each EMA higher than it was nine bars earlier. Four or
        more signals mean "up", one or fewer mean "down"; otherwise the EMA
        crossover decides.

        Returns:
            "unknown" when fewer than ``MIN_TREND_BARS`` rows are available or
            either latest EMA is NaN, else "up" or "down".
        """
        if data.empty or len(data) < self.MIN_TREND_BARS:
            return "unknown"

        current_price = data['Close'].iloc[-1]
        ema_50 = data['EMA_50'].iloc[-1]
        ema_200 = data['EMA_200'].iloc[-1]

        # Check if we have valid EMA values
        if pd.isna(ema_50) or pd.isna(ema_200):
            return "unknown"

        # Primary signal: 50 EMA above 200 EMA is bullish, below is bearish.
        golden_cross = ema_50 > ema_200

        # The length guard above guarantees index -10 exists.
        signals = (
            golden_cross,                          # 50 EMA > 200 EMA
            current_price > ema_50,                # Price > 50 EMA
            current_price > ema_200,               # Price > 200 EMA
            ema_50 > data['EMA_50'].iloc[-10],     # 50 EMA rising
            ema_200 > data['EMA_200'].iloc[-10],   # 200 EMA rising
        )
        bullish_signals = sum(bool(signal) for signal in signals)

        # Strong uptrend: 4-5 bullish signals
        if bullish_signals >= 4:
            return "up"
        # Strong downtrend: 0-1 bullish signals
        if bullish_signals <= 1:
            return "down"
        # Mixed signals: the EMA crossover is the tiebreaker.
        return "up" if golden_cross else "down"

    def check_ema_touch(self, data: pd.DataFrame, ema_column: str, days: int = 2) -> bool:
        """Check if price touched the EMA within the last *days* bars.

        A touch means the EMA value lies inside that bar's Low-High range.
        Returns False when fewer than *days* rows are available.
        """
        if len(data) < days:
            return False
        recent = data.tail(days)
        ema_values = recent[ema_column]
        touched = (recent['Low'] <= ema_values) & (ema_values <= recent['High'])
        return bool(touched.any())

    def check_price_decrease_days(self, data: pd.DataFrame, days: int = 3) -> bool:
        """Check if the close fell on each of the last *days* bars.

        Needs ``days + 1`` closes, since every bar is compared with the one
        before it; returns False when that much history is not available.
        """
        if len(data) < days:
            return False
        recent_closes = data['Close'].tail(days + 1)  # +1 to compare with previous
        # The first diff has no predecessor inside the window, so drop it.
        drops = recent_closes.diff().iloc[1:] < 0
        return len(drops) >= days and bool(drops.all())

    def identify_scenario(self, data: pd.DataFrame) -> Tuple[str, str, str, float]:
        """Identify the trading scenario of the latest bar.

        Returns:
            ``(scenario, description, recommendation, confidence)``. The first
            matching rule wins, checked in order A, B, C, D; when none
            matches a HOLD result with low confidence is returned.
        """
        if data.empty:
            return "no_data", "No data available", "Hold", 0.0

        current_price = data['Close'].iloc[-1]
        ema_50 = data['EMA_50'].iloc[-1]
        ema_200 = data['EMA_200'].iloc[-1]

        # Scenario A: Price > 10% above 50 EMA
        if current_price > ema_50 * 1.10:
            return ("scenario_a",
                    f"Price ${current_price:.2f} is >10% above 50-day EMA ${ema_50:.2f}",
                    "SELL - Take profits, price may be overextended",
                    0.8)

        # Scenario B: Price above 50 EMA and touched it within last 2 days
        if (current_price > ema_50 and
                self.check_ema_touch(data, 'EMA_50', self.ema_touch_days)):
            return ("scenario_b",
                    f"Price ${current_price:.2f} above 50-day EMA ${ema_50:.2f} and touched it recently",
                    "BUY - Good entry point after pullback to support",
                    0.85)

        # Scenario C: Price > 5% below 50 EMA, decreasing 3+ days, above 200 EMA
        if (current_price < ema_50 * 0.95 and
                current_price > ema_200 and
                self.check_price_decrease_days(data, self.price_decrease_days)):
            return ("scenario_c",
                    f"Price ${current_price:.2f} >5% below 50-day EMA ${ema_50:.2f}, decreasing, above 200-day EMA ${ema_200:.2f}",
                    "BUY - Potential oversold condition in uptrend",
                    0.75)

        # Scenario D: Price below 50 EMA, touched 200 EMA recently, above 200 EMA
        if (current_price < ema_50 and
                current_price > ema_200 and
                self.check_ema_touch(data, 'EMA_200', self.ema_touch_days)):
            return ("scenario_d",
                    f"Price ${current_price:.2f} below 50-day EMA, touched 200-day EMA ${ema_200:.2f} recently",
                    "BUY - Potential bounce from major support",
                    0.70)

        # Default case - no clear scenario
        return ("no_clear_scenario",
                f"Price ${current_price:.2f} doesn't fit clear trading scenarios",
                "HOLD - Monitor for better entry/exit points",
                0.3)

    def create_analysis_chart(self, symbol: str, data: pd.DataFrame,
                              analysis: "TrendAnalysis") -> str:
        """Render the price/EMA and volume chart for *symbol* as a PNG.

        Returns:
            The path of the saved chart, or an empty string when rendering or
            saving failed (the error is logged).
        """
        fig = None
        try:
            fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 12))
            fig.suptitle(f'{symbol} - Batch Analysis Report', fontsize=16, fontweight='bold')

            # Chart 1: Price and EMAs
            ax1.plot(data.index, data['Close'], label='Price', linewidth=2, color='black')
            ax1.plot(data.index, data['EMA_50'], label='50-day EMA', linewidth=1.5, color='blue', alpha=0.7)
            ax1.plot(data.index, data['EMA_200'], label='200-day EMA', linewidth=1.5, color='red', alpha=0.7)

            # Highlight current price
            current_price = data['Close'].iloc[-1]
            ax1.scatter(data.index[-1], current_price, color='red', s=100, zorder=5)
            ax1.annotate(f'${current_price:.2f}',
                         xy=(data.index[-1], current_price),
                         xytext=(10, 10), textcoords='offset points',
                         bbox=dict(boxstyle='round,pad=0.3', facecolor='yellow', alpha=0.7))
            ax1.set_title(f'Price Action & EMAs - {analysis.scenario_description}')
            ax1.set_ylabel('Price ($)')
            ax1.legend()
            ax1.grid(True, alpha=0.3)

            # Format x-axis
            ax1.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
            ax1.xaxis.set_major_locator(mdates.MonthLocator(interval=2))

            # Chart 2: Volume and additional info
            ax2.bar(data.index, data['Volume'], alpha=0.6, color='gray')
            ax2.set_title('Volume')
            ax2.set_ylabel('Volume')
            ax2.set_xlabel('Date')

            # Add analysis text box
            analysis_text = "\n".join([
                "ANALYSIS SUMMARY:",
                f"• Symbol: {analysis.symbol}",
                f"• Trend: {analysis.trend_direction.upper()}",
                f"• Current Price: ${analysis.price_current:.2f}",
                f"• 50-day EMA: ${analysis.ema_50:.2f}",
                f"• 200-day EMA: ${analysis.ema_200:.2f}",
                f"• Scenario: {analysis.scenario}",
                f"• Recommendation: {analysis.recommendation}",
                f"• Confidence: {analysis.confidence:.0%}",
            ])
            ax2.text(0.02, 0.98, analysis_text, transform=ax2.transAxes,
                     verticalalignment='top', fontsize=10,
                     bbox=dict(boxstyle='round,pad=0.5', facecolor='lightblue', alpha=0.8))
            fig.tight_layout()

            # Save chart
            chart_filename = f"{symbol}_batch_analysis_{self.timestamp}.png"
            chart_path = self.batch_charts_dir / chart_filename
            fig.savefig(chart_path, dpi=300, bbox_inches='tight')
            logger.info(f"Chart saved: {chart_path}")
            return str(chart_path)
        except Exception as e:
            # A chart is a nice-to-have; never let it fail the analysis.
            logger.error(f"Error creating chart for {symbol}: {e}")
            return ""
        finally:
            # Close exactly the figure created here, on success and failure,
            # so figures do not pile up over a long batch.
            if fig is not None:
                plt.close(fig)

    @staticmethod
    def _latest_value(series: pd.Series) -> float:
        """Return the last value of *series*, or 0.0 when it is all-NaN."""
        if series.isna().all():
            return 0.0
        return float(series.iloc[-1])

    def analyze_symbol(self, symbol: str) -> Optional["TrendAnalysis"]:
        """Perform the complete analysis of a single symbol.

        Returns:
            ``None`` when no price data could be fetched. For a downtrend a
            "downtrend_skip" result is returned without scenario matching or
            chart. Otherwise the scenario result, after a chart was rendered.
        """
        logger.info(f"Analyzing {symbol}")

        # Get stock data
        data = self.get_stock_data(symbol)
        if data is None:
            return None

        # Analyze trend
        trend = self.analyze_trend(data)
        logger.info(f"{symbol}: Trend = {trend}")

        if trend == "down":
            logger.info(f"{symbol}: Skipping due to downtrend")
            scenario = "downtrend_skip"
            description = "Skipped due to downtrend"
            recommendation = "AVOID - Downtrend detected"
            confidence = 0.9
        else:
            # Note: an "unknown" trend is also matched against the scenarios.
            scenario, description, recommendation, confidence = self.identify_scenario(data)

        analysis = TrendAnalysis(
            symbol=symbol,
            trend_direction=trend,
            price_current=float(data['Close'].iloc[-1]),
            ema_50=self._latest_value(data['EMA_50']),
            ema_200=self._latest_value(data['EMA_200']),
            scenario=scenario,
            scenario_description=description,
            recommendation=recommendation,
            confidence=confidence,
        )

        # Downtrend symbols are reported but not charted.
        if trend != "down":
            self.create_analysis_chart(symbol, data, analysis)
        return analysis

    def generate_summary_report(self) -> str:
        """Write the plain-text summary report and return its path.

        The report is written as UTF-8 because it contains bullet characters
        that not every platform default encoding can represent.
        """
        report_filename = f"batch_analysis_report_{self.timestamp}.txt"
        report_path = self.batch_charts_dir / report_filename
        results = self.analysis_results

        uptrend_count = sum(1 for r in results if r.trend_direction == "up")
        downtrend_count = sum(1 for r in results if r.trend_direction == "down")
        unknown_count = len(results) - uptrend_count - downtrend_count

        lines = [
            "=" * 80,
            "BATCH FINANCIAL ANALYSIS REPORT",
            "=" * 80,
            f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"Total symbols analyzed: {len(results)}",
            "",
            # Summary statistics
            "TREND SUMMARY:",
            f"• Uptrend: {uptrend_count} symbols",
            f"• Downtrend: {downtrend_count} symbols",
        ]
        if unknown_count:
            # Only shown when present so the counts add up to the total.
            lines.append(f"• Unknown: {unknown_count} symbols")
        lines.append("")

        # Scenario breakdown
        scenario_counts: Dict[str, int] = {}
        for result in results:
            scenario_counts[result.scenario] = scenario_counts.get(result.scenario, 0) + 1
        lines.append("SCENARIO BREAKDOWN:")
        lines.extend(f"• {scenario}: {count} symbols"
                     for scenario, count in scenario_counts.items())
        lines.append("")

        # Detailed results
        lines.append("DETAILED ANALYSIS:")
        lines.append("-" * 80)
        for result in results:
            lines.extend([
                f"Symbol: {result.symbol}",
                f"Trend: {result.trend_direction.upper()}",
                f"Current Price: ${result.price_current:.2f}",
                f"50-day EMA: ${result.ema_50:.2f}",
                f"200-day EMA: ${result.ema_200:.2f}",
                f"Scenario: {result.scenario}",
                f"Description: {result.scenario_description}",
                f"Recommendation: {result.recommendation}",
                f"Confidence: {result.confidence:.0%}",
                "-" * 40,
            ])

        # Trading recommendations
        lines.extend(["", "TRADING RECOMMENDATIONS:", "=" * 50])
        buy_candidates = [r for r in results if "BUY" in r.recommendation]
        sell_candidates = [r for r in results if "SELL" in r.recommendation]
        if buy_candidates:
            lines.append("BUY CANDIDATES:")
            for candidate in sorted(buy_candidates, key=lambda x: x.confidence, reverse=True):
                lines.append(f"• {candidate.symbol}: {candidate.recommendation} (Confidence: {candidate.confidence:.0%})")
            lines.append("")
        if sell_candidates:
            lines.append("SELL CANDIDATES:")
            for candidate in sorted(sell_candidates, key=lambda x: x.confidence, reverse=True):
                lines.append(f"• {candidate.symbol}: {candidate.recommendation} (Confidence: {candidate.confidence:.0%})")

        with open(report_path, 'w', encoding='utf-8') as f:
            f.write("\n".join(lines) + "\n")

        logger.info(f"Summary report saved: {report_path}")
        return str(report_path)

    @classmethod
    def _portfolio_symbols(cls, portfolio_data: Any) -> List[Tuple[str, str]]:
        """Extract ``(symbol, source)`` pairs from one portfolio entry.

        Supported shapes: a dict with a ``stock_list``, ``holdings`` or
        ``symbols`` key (checked in that order, first match only), a dict
        whose own keys are tickers, or a plain list of tickers / holding
        dicts. Symbols are upper-cased and stripped; values that are not
        strings are ignored instead of raising.
        """
        found: List[Tuple[str, str]] = []

        def add(raw: Any, source: str) -> None:
            # JSON may hold null/numbers where a ticker is expected.
            if isinstance(raw, str):
                found.append((raw.upper().strip(), source))

        if isinstance(portfolio_data, dict):
            if 'stock_list' in portfolio_data:
                stock_list = portfolio_data['stock_list']
                if isinstance(stock_list, list):
                    for entry in stock_list:
                        add(entry, "stock_list")
            elif 'holdings' in portfolio_data:
                holdings = portfolio_data['holdings']
                if isinstance(holdings, list):
                    for holding in holdings:
                        if isinstance(holding, dict) and 'symbol' in holding:
                            add(holding['symbol'], "holdings")
                        elif isinstance(holding, str):
                            add(holding, "holdings list")
                elif isinstance(holdings, dict):
                    # Holdings might be a dict with symbol keys
                    for key in holdings:
                        add(key, "holdings dict")
            elif 'symbols' in portfolio_data:
                symbols = portfolio_data['symbols']
                if isinstance(symbols, list):
                    for entry in symbols:
                        add(entry, "symbols list")
            else:
                # No known container key: the keys themselves may be tickers.
                for key, value in portfolio_data.items():
                    if key in cls._NON_SYMBOL_KEYS or not isinstance(value, (dict, int, float)):
                        continue
                    symbol = key.upper().strip()
                    # Basic symbol validation
                    if len(symbol) <= cls.MAX_SYMBOL_LENGTH and symbol.isalpha():
                        found.append((symbol, "portfolio keys"))
        elif isinstance(portfolio_data, list):
            # Portfolio data is directly a list of symbols or holdings
            for item in portfolio_data:
                if isinstance(item, str):
                    add(item, "direct list")
                elif isinstance(item, dict) and 'symbol' in item:
                    add(item['symbol'], "direct list dict")
        return found

    def _collect_symbols(self, portfolios: Dict[str, Any]) -> set:
        """Gather the unique, plausibly valid symbols of all portfolios."""
        all_symbols = set()
        for portfolio_name, portfolio_data in portfolios.items():
            logger.info(f"Processing portfolio: {portfolio_name}")
            for symbol, source in self._portfolio_symbols(portfolio_data):
                all_symbols.add(symbol)
                logger.debug(f"Added symbol from {source}: {symbol}")
        # Remove any empty strings or invalid symbols
        return {symbol for symbol in all_symbols
                if symbol and len(symbol) <= self.MAX_SYMBOL_LENGTH}

    def _print_quick_summary(self, report_path: str) -> None:
        """Print the short console summary of the collected results."""
        results = self.analysis_results
        separator = '=' * 60
        print(f"\n{separator}")
        print("QUICK ANALYSIS SUMMARY")
        print(separator)

        uptrend_count = sum(1 for r in results if r.trend_direction == "up")
        downtrend_count = sum(1 for r in results if r.trend_direction == "down")
        unknown_count = len(results) - uptrend_count - downtrend_count
        print(f"Total symbols analyzed: {len(results)}")
        print(f"Uptrend stocks: {uptrend_count}")
        print(f"Downtrend stocks: {downtrend_count}")
        if unknown_count:
            print(f"Unknown trend stocks: {unknown_count}")

        # Show top buy candidates
        buy_candidates = [r for r in results if "BUY" in r.recommendation]
        if buy_candidates:
            print("\nTOP BUY CANDIDATES:")
            for candidate in sorted(buy_candidates, key=lambda x: x.confidence, reverse=True)[:3]:
                print(f"• {candidate.symbol}: {candidate.recommendation} (Confidence: {candidate.confidence:.0%})")
        print(f"\nDetailed results in: {report_path}")
        print(f"Charts saved in: {self.batch_charts_dir}")

    def run_batch_analysis(self) -> None:
        """Run the complete batch analysis.

        Loads the portfolios, analyses every unique symbol (falling back to a
        small sample list when none is found), writes the summary report and
        prints a short console summary. Returns early, after logging an
        error, when no portfolios could be loaded.
        """
        logger.info("Starting batch financial analysis")

        # Load portfolios
        portfolios = self.load_portfolios()
        if not portfolios:
            logger.error("No portfolios found. Exiting.")
            return

        # Collect all unique symbols from all portfolios
        all_symbols = self._collect_symbols(portfolios)
        logger.info(f"Found {len(all_symbols)} unique symbols to analyze: {sorted(all_symbols)}")

        if not all_symbols:
            logger.warning("No valid symbols found in portfolio. Please check your portfolio.json structure.")
            logger.info("Expected structure examples:")
            logger.info("1. {'portfolio_name': {'stock_list': ['AAPL', 'GOOGL', ...]}}")
            logger.info("2. {'portfolio_name': {'holdings': [{'symbol': 'AAPL', 'quantity': 10}, ...]}}")
            logger.info("3. {'portfolio_name': [{'symbol': 'AAPL', 'quantity': 10}, ...]}")
            # Create a sample portfolio for testing
            logger.info("Creating sample analysis with popular stocks...")
            all_symbols = set(self._SAMPLE_SYMBOLS)

        # Analyze each symbol
        for symbol in sorted(all_symbols):
            try:
                analysis = self.analyze_symbol(symbol)
            except Exception as e:
                # Keep going: one bad symbol must not sink the batch.
                logger.error(f"Error analyzing {symbol}: {e}")
                continue
            if analysis:
                self.analysis_results.append(analysis)

        # Generate summary report
        report_path = self.generate_summary_report()
        logger.info("Batch analysis complete!")
        logger.info(f"Results: {len(self.analysis_results)} symbols analyzed")
        logger.info(f"Charts saved to: {self.batch_charts_dir}")
        logger.info(f"Summary report: {report_path}")

        # Print quick summary to console
        if self.analysis_results:
            self._print_quick_summary(report_path)
        else:
            logger.warning("No symbols were successfully analyzed. Check your data sources and network connection.")
def main():
    """Run the batch analysis from the command line.

    Prints a banner, runs the analyzer and reports the outcome. Success is
    only announced when at least one symbol was actually analysed; otherwise
    the user is pointed at the log output, which explains what went wrong.
    """
    print("="*60)
    print("BATCH FINANCIAL MCP SERVER")
    print("="*60)
    analyzer = BatchFinancialAnalyzer()
    analyzer.run_batch_analysis()
    # run_batch_analysis returns None on every path, so inspect the results.
    if analyzer.analysis_results:
        print("\nBatch analysis completed successfully!")
        print(f"Check {analyzer.batch_charts_dir} for results")
    else:
        print("\nBatch analysis finished without results - see the log output above")


if __name__ == "__main__":
    main()