Skip to main content
Glama

Financial Data MCP Server

by j1c4b
email_report_script.py16.8 kB
#!/usr/bin/env python3 """ Email Financial Analysis Report Script Sends batch analysis results via email with chart attachments """ import os import json import smtplib import logging from pathlib import Path from datetime import datetime from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.mime.base import MIMEBase from email import encoders from typing import List, Dict, Any import glob # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class FinancialReportEmailer: def __init__(self, config_file: str = "email_config.json"): """Initialize the emailer with configuration""" self.project_dir = Path(__file__).parent self.config_file = self.project_dir / config_file self.batch_charts_dir = self.project_dir / "batch_financial_charts" self.recommendations_file = self.project_dir / "buy_recommendations.json" # Load email configuration self.config = self.load_email_config() logger.info("Financial Report Emailer initialized") def load_email_config(self) -> Dict[str, Any]: """Load email configuration from JSON file""" try: if self.config_file.exists(): with open(self.config_file, 'r') as f: return json.load(f) else: # Create default config file default_config = { "smtp_server": "smtp.gmail.com", "smtp_port": 587, "sender_email": "your_email@gmail.com", "sender_password": "your_app_password", "recipient_emails": [ "recipient1@example.com", "recipient2@example.com" ], "subject_prefix": "📊 Financial Analysis Report", "max_attachment_size_mb": 25 } with open(self.config_file, 'w') as f: json.dump(default_config, f, indent=2) logger.warning(f"Created default config file: {self.config_file}") logger.warning("Please update email credentials and recipients!") return default_config except Exception as e: logger.error(f"Error loading email config: {e}") return {} def find_latest_report(self) -> Dict[str, Any]: """Find the most recent batch analysis report and related files""" try: # Find the latest report file report_files = list(self.batch_charts_dir.glob("batch_analysis_report_*.txt")) if not report_files: logger.error("No analysis reports found") return {} latest_report = max(report_files, key=lambda x: x.stat().st_mtime) # Extract timestamp from filename filename = latest_report.stem timestamp = filename.replace("batch_analysis_report_", "") # Find corresponding chart files chart_pattern = f"*_batch_analysis_{timestamp}.png" chart_files = list(self.batch_charts_dir.glob(chart_pattern)) logger.info(f"Found latest report: {latest_report}") logger.info(f"Found {len(chart_files)} chart files") return { "report_file": latest_report, "chart_files": chart_files, "timestamp": timestamp } except Exception as e: logger.error(f"Error finding latest report: {e}") return {} def parse_report_summary(self, report_file: Path) -> Dict[str, Any]: """Parse the report file to extract key information""" try: with open(report_file, 'r') as f: content = f.read() summary = { "total_symbols": 0, "uptrend_stocks": 0, "downtrend_stocks": 0, "buy_candidates": [], "sell_candidates": [], "generation_time": "Unknown" } lines = content.split('\n') for line in lines: line = line.strip() if "Total symbols analyzed:" in line: summary["total_symbols"] = int(line.split(':')[1].strip()) elif "Uptrend:" in line and "symbols" in line: summary["uptrend_stocks"] = int(line.split(':')[1].split()[0]) elif "Downtrend:" in line and "symbols" in line: summary["downtrend_stocks"] = int(line.split(':')[1].split()[0]) elif "Generated:" in line: summary["generation_time"] = line.split('Generated:')[1].strip() # Extract buy candidates from the trading recommendations section in_buy_section = False for line in lines: line = line.strip() if "BUY CANDIDATES:" in line: in_buy_section = True continue elif "SELL CANDIDATES:" in line: in_buy_section = False continue elif in_buy_section and line.startswith("•"): summary["buy_candidates"].append(line[1:].strip()) return summary except Exception as e: logger.error(f"Error parsing report: {e}") return {} def save_buy_recommendations(self, buy_candidates: List[str], timestamp: str): """Save buy recommendations for daily tracking""" try: recommendations = [] for candidate in buy_candidates: # Parse candidate string: "SYMBOL: recommendation (Confidence: XX%)" parts = candidate.split(':') if len(parts) >= 2: symbol = parts[0].strip() recommendation_part = ':'.join(parts[1:]).strip() # Extract confidence if present confidence = "Unknown" if "Confidence:" in recommendation_part: confidence_part = recommendation_part.split("Confidence:")[1] confidence = confidence_part.strip().replace(')', '') recommendations.append({ "symbol": symbol, "recommendation": recommendation_part, "confidence": confidence, "recommendation_date": datetime.now().strftime("%Y-%m-%d"), "recommendation_timestamp": timestamp, "initial_price": None, # Will be filled by tracking script "current_price": None, "performance": None, "status": "active" }) # Save to file for tracking script with open(self.recommendations_file, 'w') as f: json.dump(recommendations, f, indent=2) logger.info(f"Saved {len(recommendations)} buy recommendations for tracking") except Exception as e: logger.error(f"Error saving buy recommendations: {e}") def create_email_content(self, summary: Dict[str, Any], chart_files: List[Path]) -> str: """Create HTML email content""" html_content = f""" <html> <head> <style> body {{ font-family: Arial, sans-serif; margin: 20px; }} .header {{ background-color: #2c3e50; color: white; padding: 20px; border-radius: 8px; text-align: center; }} .summary {{ background-color: #f8f9fa; padding: 20px; border-radius: 8px; margin: 20px 0; }} .candidates {{ background-color: #e8f5e9; padding: 15px; border-radius: 8px; margin: 10px 0; }} .metrics {{ display: flex; justify-content: space-around; margin: 20px 0; }} .metric {{ text-align: center; padding: 10px; }} .metric-value {{ font-size: 2em; font-weight: bold; color: #2c3e50; }} .metric-label {{ color: #666; }} .footer {{ color: #666; font-size: 0.9em; text-align: center; margin-top: 30px; }} ul {{ padding-left: 20px; }} li {{ margin: 5px 0; }} </style> </head> <body> <div class="header"> <h1>📊 Financial Analysis Report</h1> <p>Automated Portfolio Analysis & Trading Recommendations</p> </div> <div class="summary"> <h2>📈 Analysis Summary</h2> <div class="metrics"> <div class="metric"> <div class="metric-value">{summary.get('total_symbols', 0)}</div> <div class="metric-label">Total Symbols</div> </div> <div class="metric"> <div class="metric-value" style="color: #27ae60;">{summary.get('uptrend_stocks', 0)}</div> <div class="metric-label">Uptrend Stocks</div> </div> <div class="metric"> <div class="metric-value" style="color: #e74c3c;">{summary.get('downtrend_stocks', 0)}</div> <div class="metric-label">Downtrend Stocks</div> </div> </div> </div> """ if summary.get('buy_candidates'): html_content += f""" <div class="candidates"> <h3>🎯 Top Buy Candidates</h3> <ul> """ for candidate in summary['buy_candidates']: html_content += f"<li>{candidate}</li>" html_content += """ </ul> <p><strong>Note:</strong> These recommendations have been saved for daily performance tracking.</p> </div> """ html_content += f""" <div class="summary"> <h3>📊 Charts & Analysis</h3> <p>This email includes {len(chart_files)} individual stock analysis charts.</p> <p>Each chart contains:</p> <ul> <li>Price action with 50-day and 200-day EMAs</li> <li>Volume analysis</li> <li>Trading scenario identification</li> <li>Detailed recommendations with confidence levels</li> </ul> </div> <div class="footer"> <p>Generated: {summary.get('generation_time', 'Unknown')}</p> <p>⚠️ This is automated analysis for informational purposes only. Always do your own research before making investment decisions.</p> </div> </body> </html> """ return html_content def send_email(self, summary: Dict[str, Any], report_file: Path, chart_files: List[Path]): """Send the email with attachments""" try: # Create message msg = MIMEMultipart('alternative') msg['From'] = self.config['sender_email'] msg['To'] = ', '.join(self.config['recipient_emails']) # Create subject with key metrics buy_count = len(summary.get('buy_candidates', [])) uptrend_count = summary.get('uptrend_stocks', 0) subject = f"{self.config['subject_prefix']} - {uptrend_count} Uptrends, {buy_count} Buy Signals" msg['Subject'] = subject # Create HTML content html_content = self.create_email_content(summary, chart_files) html_part = MIMEText(html_content, 'html') msg.attach(html_part) # Attach report file with open(report_file, 'rb') as attachment: part = MIMEBase('application', 'octet-stream') part.set_payload(attachment.read()) encoders.encode_base64(part) part.add_header( 'Content-Disposition', f'attachment; filename= {report_file.name}' ) msg.attach(part) # Attach chart files (limit to prevent email size issues) max_size_bytes = self.config.get('max_attachment_size_mb', 25) * 1024 * 1024 current_size = os.path.getsize(report_file) attached_charts = 0 # Prioritize buy candidate charts buy_symbols = [] for candidate in summary.get('buy_candidates', []): symbol = candidate.split(':')[0].strip() buy_symbols.append(symbol) # Sort chart files to prioritize buy candidates prioritized_charts = [] remaining_charts = [] for chart_file in chart_files: chart_symbol = chart_file.name.split('_')[0] if chart_symbol in buy_symbols: prioritized_charts.append(chart_file) else: remaining_charts.append(chart_file) # Attach prioritized charts first, then others if space allows for chart_file in prioritized_charts + remaining_charts: chart_size = os.path.getsize(chart_file) if current_size + chart_size < max_size_bytes: with open(chart_file, 'rb') as attachment: part = MIMEBase('application', 'octet-stream') part.set_payload(attachment.read()) encoders.encode_base64(part) part.add_header( 'Content-Disposition', f'attachment; filename= {chart_file.name}' ) msg.attach(part) current_size += chart_size attached_charts += 1 else: logger.warning(f"Skipping {chart_file.name} - would exceed size limit") # Send email server = smtplib.SMTP(self.config['smtp_server'], self.config['smtp_port']) server.starttls() server.login(self.config['sender_email'], self.config['sender_password']) server.send_message(msg) server.quit() logger.info(f"Email sent successfully to {len(self.config['recipient_emails'])} recipients") logger.info(f"Attached: 1 report + {attached_charts} charts") except Exception as e: logger.error(f"Error sending email: {e}") raise e def run_email_report(self): """Main function to send the email report""" logger.info("Starting email report process") # Validate configuration if not self.config.get('sender_email') or self.config['sender_email'] == 'your_email@gmail.com': logger.error("Please update email_config.json with your email credentials") return # Find latest report report_data = self.find_latest_report() if not report_data: logger.error("No report data found") return # Parse report summary summary = self.parse_report_summary(report_data['report_file']) if not summary: logger.error("Could not parse report summary") return # Save buy recommendations for tracking if summary.get('buy_candidates'): self.save_buy_recommendations(summary['buy_candidates'], report_data['timestamp']) # Send email try: self.send_email(summary, report_data['report_file'], report_data['chart_files']) logger.info("Email report sent successfully!") except Exception as e: logger.error(f"Failed to send email: {e}") def main(): """Main function""" print("="*60) print("FINANCIAL REPORT EMAILER") print("="*60) emailer = FinancialReportEmailer() emailer.run_email_report() print("\nEmail report process completed!") if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/j1c4b/finance_mcp_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server