# email_report_script.py (16.8 kB)
#!/usr/bin/env python3
"""
Email Financial Analysis Report Script
Sends batch analysis results via email with chart attachments
"""
import glob
import html
import json
import logging
import os
import smtplib
from datetime import datetime
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from typing import List, Dict, Any
# Logging setup: INFO and above, each record prefixed with its timestamp
# and severity. All code in this module logs through ``logger``.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class FinancialReportEmailer:
    """Email the newest batch analysis report together with its chart images.

    Workflow (see ``run_email_report``): load SMTP settings from a JSON file
    next to this script, locate the most recently modified
    ``batch_analysis_report_*.txt`` in ``batch_financial_charts/``, parse its
    headline numbers and candidate lists, persist the buy candidates to
    ``buy_recommendations.json`` and send an HTML email carrying the report
    plus as many charts as fit under the configured size limit.
    """

    # Character that introduces each candidate line in the report.
    _BULLET = "•"

    # (label, summary key, line must also contain the word "symbols").
    # Order matters: the first matching label wins for a given line.
    _COUNT_FIELDS = (
        ("Total symbols analyzed:", "total_symbols", False),
        ("Uptrend:", "uptrend_stocks", True),
        ("Downtrend:", "downtrend_stocks", True),
    )

    def __init__(self, config_file: str = "email_config.json"):
        """Resolve all paths relative to this script and load the configuration.

        Args:
            config_file: Name of the JSON configuration file, looked up in the
                directory that contains this script.
        """
        self.project_dir = Path(__file__).parent
        self.config_file = self.project_dir / config_file
        self.batch_charts_dir = self.project_dir / "batch_financial_charts"
        self.recommendations_file = self.project_dir / "buy_recommendations.json"
        # Load email configuration
        self.config = self.load_email_config()
        logger.info("Financial Report Emailer initialized")

    def load_email_config(self) -> Dict[str, Any]:
        """Load the email configuration, creating a template file if none exists.

        Returns:
            The parsed configuration dict. When the file is missing, a default
            template (with placeholder credentials) is written and returned.
            An empty dict is returned when the file cannot be read, cannot be
            written, or does not contain a JSON object.
        """
        try:
            if self.config_file.exists():
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                if not isinstance(config, dict):
                    # Callers use ``self.config.get(...)``; anything but an
                    # object would fail later with a confusing error.
                    logger.error(f"Error loading email config: {self.config_file} must contain a JSON object")
                    return {}
                return config

            # Create default config file
            default_config = {
                "smtp_server": "smtp.gmail.com",
                "smtp_port": 587,
                "sender_email": "your_email@gmail.com",
                "sender_password": "your_app_password",
                "recipient_emails": [
                    "recipient1@example.com",
                    "recipient2@example.com"
                ],
                "subject_prefix": "📊 Financial Analysis Report",
                "max_attachment_size_mb": 25
            }
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(default_config, f, indent=2)
            logger.warning(f"Created default config file: {self.config_file}")
            logger.warning("Please update email credentials and recipients!")
            return default_config
        except (OSError, ValueError) as e:
            # ValueError covers both malformed JSON and undecodable bytes.
            logger.error(f"Error loading email config: {e}")
            return {}

    def find_latest_report(self) -> Dict[str, Any]:
        """Find the most recently modified report and the charts that share its timestamp.

        Returns:
            A dict with ``report_file`` (Path), ``chart_files`` (list of Path,
            sorted by path) and ``timestamp`` (the part of the report file
            name after ``batch_analysis_report_``), or an empty dict when no
            report exists or the directory cannot be inspected.
        """
        prefix = "batch_analysis_report_"
        try:
            report_files = list(self.batch_charts_dir.glob(f"{prefix}*.txt"))
            if not report_files:
                logger.error("No analysis reports found")
                return {}
            latest_report = max(report_files, key=lambda x: x.stat().st_mtime)
            # The glob guarantees the stem starts with the prefix, so slicing
            # it off leaves exactly the timestamp.
            timestamp = latest_report.stem[len(prefix):]
            # Sorted so the attachment order does not depend on directory order.
            chart_files = sorted(
                self.batch_charts_dir.glob(f"*_batch_analysis_{timestamp}.png")
            )
            logger.info(f"Found latest report: {latest_report}")
            logger.info(f"Found {len(chart_files)} chart files")
            return {
                "report_file": latest_report,
                "chart_files": chart_files,
                "timestamp": timestamp
            }
        except OSError as e:
            logger.error(f"Error finding latest report: {e}")
            return {}

    @staticmethod
    def _leading_int(text: str):
        """Return the first whitespace-separated token of *text* as an int, or None."""
        tokens = text.split()
        if not tokens:
            return None
        try:
            return int(tokens[0])
        except ValueError:
            return None

    def parse_report_summary(self, report_file: Path) -> Dict[str, Any]:
        """Extract headline counts, generation time and candidate lists from a report.

        Bulleted lines following a ``BUY CANDIDATES:`` header are collected
        into ``buy_candidates`` and those following ``SELL CANDIDATES:`` into
        ``sell_candidates``; a section lasts until the other header appears.
        NOTE(review): this assumes no other bulleted section follows the sell
        candidates in the report - confirm against the report generator.

        A count line whose number cannot be parsed is logged and skipped (the
        field keeps its default of 0) instead of discarding the whole report.

        Args:
            report_file: Path of the text report to parse.

        Returns:
            The summary dict, or an empty dict when the file cannot be read.
        """
        summary = {
            "total_symbols": 0,
            "uptrend_stocks": 0,
            "downtrend_stocks": 0,
            "buy_candidates": [],
            "sell_candidates": [],
            "generation_time": "Unknown"
        }
        try:
            # NOTE(review): read with the platform default encoding, as the
            # report writer is not visible here - confirm which encoding it uses.
            with open(report_file, 'r') as f:
                lines = [raw.strip() for raw in f]
        except (OSError, ValueError) as e:
            logger.error(f"Error parsing report: {e}")
            return {}

        section = None  # summary key of the candidate list being filled
        for line in lines:
            if "BUY CANDIDATES:" in line:
                section = "buy_candidates"
                continue
            if "SELL CANDIDATES:" in line:
                section = "sell_candidates"
                continue
            if section is not None and line.startswith(self._BULLET):
                summary[section].append(line[len(self._BULLET):].strip())

            for label, key, needs_symbols in self._COUNT_FIELDS:
                if label in line and (not needs_symbols or "symbols" in line):
                    value = self._leading_int(line.split(label, 1)[1])
                    if value is None:
                        logger.warning(f"Could not parse count from report line: {line}")
                    else:
                        summary[key] = value
                    break
            else:
                if "Generated:" in line:
                    summary["generation_time"] = line.split("Generated:", 1)[1].strip()
        return summary

    def save_buy_recommendations(self, buy_candidates: List[str], timestamp: str):
        """Write the buy candidates to the recommendations file for later tracking.

        The file is overwritten on every call. Failures are logged, not raised.

        Args:
            buy_candidates: Strings shaped like
                ``"SYMBOL: recommendation (Confidence: XX%)"``; entries
                without a colon are skipped.
            timestamp: Timestamp of the report the candidates came from.
        """
        today = datetime.now().strftime("%Y-%m-%d")
        recommendations = []
        for candidate in buy_candidates:
            symbol, separator, recommendation_part = candidate.partition(':')
            if not separator:
                continue
            recommendation_part = recommendation_part.strip()
            # Extract confidence if present
            confidence = "Unknown"
            if "Confidence:" in recommendation_part:
                confidence_part = recommendation_part.split("Confidence:")[1]
                # Drop the closing parenthesis first so no stray whitespace
                # in front of it survives the strip.
                confidence = confidence_part.replace(')', '').strip()
            recommendations.append({
                "symbol": symbol.strip(),
                "recommendation": recommendation_part,
                "confidence": confidence,
                "recommendation_date": today,
                "recommendation_timestamp": timestamp,
                "initial_price": None,  # Will be filled by tracking script
                "current_price": None,
                "performance": None,
                "status": "active"
            })
        try:
            # Save to file for tracking script
            with open(self.recommendations_file, 'w', encoding='utf-8') as f:
                json.dump(recommendations, f, indent=2)
            logger.info(f"Saved {len(recommendations)} buy recommendations for tracking")
        except (OSError, TypeError, ValueError) as e:
            logger.error(f"Error saving buy recommendations: {e}")

    def create_email_content(self, summary: Dict[str, Any], chart_files: List[Path]) -> str:
        """Render the HTML body of the report email.

        Args:
            summary: Parsed report summary (see ``parse_report_summary``).
            chart_files: Charts found for the report; only their count is used.

        Returns:
            A complete HTML document as a string. Text taken from the report
            (candidates, generation time) is HTML-escaped.
        """
        html_content = f"""
<html>
<head>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.header {{ background-color: #2c3e50; color: white; padding: 20px; border-radius: 8px; text-align: center; }}
.summary {{ background-color: #f8f9fa; padding: 20px; border-radius: 8px; margin: 20px 0; }}
.candidates {{ background-color: #e8f5e9; padding: 15px; border-radius: 8px; margin: 10px 0; }}
.metrics {{ display: flex; justify-content: space-around; margin: 20px 0; }}
.metric {{ text-align: center; padding: 10px; }}
.metric-value {{ font-size: 2em; font-weight: bold; color: #2c3e50; }}
.metric-label {{ color: #666; }}
.footer {{ color: #666; font-size: 0.9em; text-align: center; margin-top: 30px; }}
ul {{ padding-left: 20px; }}
li {{ margin: 5px 0; }}
</style>
</head>
<body>
<div class="header">
<h1>📊 Financial Analysis Report</h1>
<p>Automated Portfolio Analysis & Trading Recommendations</p>
</div>
<div class="summary">
<h2>📈 Analysis Summary</h2>
<div class="metrics">
<div class="metric">
<div class="metric-value">{summary.get('total_symbols', 0)}</div>
<div class="metric-label">Total Symbols</div>
</div>
<div class="metric">
<div class="metric-value" style="color: #27ae60;">{summary.get('uptrend_stocks', 0)}</div>
<div class="metric-label">Uptrend Stocks</div>
</div>
<div class="metric">
<div class="metric-value" style="color: #e74c3c;">{summary.get('downtrend_stocks', 0)}</div>
<div class="metric-label">Downtrend Stocks</div>
</div>
</div>
</div>
"""
        buy_candidates = summary.get('buy_candidates')
        if buy_candidates:
            # Candidate text comes from the parsed report file; escape it so
            # characters such as "&" or "<" cannot break the markup.
            items = "".join(
                f"<li>{html.escape(str(candidate))}</li>" for candidate in buy_candidates
            )
            html_content += f"""
<div class="candidates">
<h3>🎯 Top Buy Candidates</h3>
<ul>
{items}
</ul>
<p><strong>Note:</strong> These recommendations have been saved for daily performance tracking.</p>
</div>
"""
        generated = html.escape(str(summary.get('generation_time', 'Unknown')))
        html_content += f"""
<div class="summary">
<h3>📊 Charts & Analysis</h3>
<p>This email includes {len(chart_files)} individual stock analysis charts.</p>
<p>Each chart contains:</p>
<ul>
<li>Price action with 50-day and 200-day EMAs</li>
<li>Volume analysis</li>
<li>Trading scenario identification</li>
<li>Detailed recommendations with confidence levels</li>
</ul>
</div>
<div class="footer">
<p>Generated: {generated}</p>
<p>⚠️ This is automated analysis for informational purposes only. Always do your own research before making investment decisions.</p>
</div>
</body>
</html>
"""
        return html_content

    @staticmethod
    def _encoded_size(raw_size: int) -> int:
        """Estimate the transmitted size in bytes of *raw_size* bytes after base64 encoding.

        Base64 emits 4 characters per 3 input bytes, and the encoded text is
        wrapped into 76-character lines; two bytes (CRLF) are budgeted per line.
        """
        encoded = 4 * ((raw_size + 2) // 3)
        line_breaks = (encoded + 75) // 76
        return encoded + 2 * line_breaks

    @staticmethod
    def _build_attachment(path: Path) -> MIMEBase:
        """Return *path* wrapped as a base64-encoded ``application/octet-stream`` attachment."""
        part = MIMEBase('application', 'octet-stream')
        part.set_payload(path.read_bytes())
        encoders.encode_base64(part)
        # Passing the name as a header parameter lets the email package quote
        # and encode it correctly (spaces, non-ASCII characters).
        part.add_header('Content-Disposition', 'attachment', filename=path.name)
        return part

    def send_email(self, summary: Dict[str, Any], report_file: Path, chart_files: List[Path]):
        """Build the report email and send it through the configured SMTP server.

        The report is always attached. Charts are attached while the estimated
        encoded message size stays below ``max_attachment_size_mb`` (default
        25); charts whose file name starts with a buy-candidate symbol are
        considered first. The optional config key ``smtp_timeout_seconds``
        (default 60) bounds each blocking SMTP operation.

        Args:
            summary: Parsed report summary.
            report_file: The text report to attach.
            chart_files: Candidate chart images to attach.

        Raises:
            Exception: Any failure while building or sending the message is
                logged and re-raised unchanged.
        """
        try:
            # 'mixed' is the container type for a body plus attachments;
            # 'alternative' would declare every part to be a different
            # rendering of the same content, so clients may hide the files.
            msg = MIMEMultipart('mixed')
            msg['From'] = self.config['sender_email']
            msg['To'] = ', '.join(self.config['recipient_emails'])

            # Create subject with key metrics
            buy_candidates = summary.get('buy_candidates', [])
            uptrend_count = summary.get('uptrend_stocks', 0)
            msg['Subject'] = (
                f"{self.config['subject_prefix']} - {uptrend_count} Uptrends, "
                f"{len(buy_candidates)} Buy Signals"
            )

            html_content = self.create_email_content(summary, chart_files)
            msg.attach(MIMEText(html_content, 'html'))
            msg.attach(self._build_attachment(report_file))

            # Budget against encoded sizes: the limit applies to what is
            # actually transmitted, and base64 inflates files by about a third.
            max_size_bytes = self.config.get('max_attachment_size_mb', 25) * 1024 * 1024
            current_size = (
                self._encoded_size(len(html_content.encode('utf-8')))
                + self._encoded_size(os.path.getsize(report_file))
            )

            # Stable sort: buy-candidate charts first, original order otherwise.
            buy_symbols = {candidate.split(':')[0].strip() for candidate in buy_candidates}
            ordered_charts = sorted(
                chart_files,
                key=lambda chart: chart.name.split('_')[0] not in buy_symbols,
            )

            attached_charts = 0
            for chart_file in ordered_charts:
                chart_size = self._encoded_size(os.path.getsize(chart_file))
                if current_size + chart_size < max_size_bytes:
                    msg.attach(self._build_attachment(chart_file))
                    current_size += chart_size
                    attached_charts += 1
                else:
                    logger.warning(f"Skipping {chart_file.name} - would exceed size limit")

            # The context manager closes the connection even when login or
            # sending fails.
            timeout = self.config.get('smtp_timeout_seconds', 60)
            with smtplib.SMTP(
                self.config['smtp_server'], self.config['smtp_port'], timeout=timeout
            ) as server:
                server.starttls()
                server.login(self.config['sender_email'], self.config['sender_password'])
                server.send_message(msg)

            logger.info(f"Email sent successfully to {len(self.config['recipient_emails'])} recipients")
            logger.info(f"Attached: 1 report + {attached_charts} charts")
        except Exception as e:
            logger.error(f"Error sending email: {e}")
            raise

    def run_email_report(self):
        """Run the whole workflow: validate config, find, parse, save and send.

        Every failure is logged and ends the run early; nothing is raised.
        Buy recommendations are only written when the report lists at least
        one buy candidate.
        """
        logger.info("Starting email report process")

        # Validate configuration
        sender = self.config.get('sender_email')
        if not sender or sender == 'your_email@gmail.com':
            logger.error("Please update email_config.json with your email credentials")
            return

        # Find latest report
        report_data = self.find_latest_report()
        if not report_data:
            logger.error("No report data found")
            return

        # Parse report summary
        summary = self.parse_report_summary(report_data['report_file'])
        if not summary:
            logger.error("Could not parse report summary")
            return

        # Save buy recommendations for tracking
        if summary.get('buy_candidates'):
            self.save_buy_recommendations(summary['buy_candidates'], report_data['timestamp'])

        # Send email
        try:
            self.send_email(summary, report_data['report_file'], report_data['chart_files'])
            logger.info("Email report sent successfully!")
        except Exception as e:
            logger.error(f"Failed to send email: {e}")
def main():
    """Script entry point.

    Prints a banner, builds a ``FinancialReportEmailer`` with its default
    configuration file and runs the report workflow. The closing message is
    printed whether or not an email was actually sent.
    """
    banner = "=" * 60
    print(banner)
    print("FINANCIAL REPORT EMAILER")
    print(banner)

    FinancialReportEmailer().run_email_report()

    print("\nEmail report process completed!")


# Run the workflow only when executed as a script, not when imported.
if __name__ == "__main__":
    main()