Skip to main content
Glama
leeguooooo
by leeguooooo
smtp_operations.py10.4 kB
""" SMTP operations for sending emails """ import logging from typing import List, Optional, Dict, Any from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email import encoders from email.utils import formataddr, formatdate import os import mimetypes from pathlib import Path logger = logging.getLogger(__name__) class SMTPOperations: """Handles email sending operations""" def __init__(self, connection_manager): self.connection_manager = connection_manager def send_email( self, to: List[str], subject: str, body: str, cc: Optional[List[str]] = None, bcc: Optional[List[str]] = None, attachments: Optional[List[str]] = None, is_html: bool = False, reply_to: Optional[str] = None ) -> Dict[str, Any]: """ Send an email Args: to: List of recipient email addresses subject: Email subject body: Email body (plain text or HTML) cc: List of CC recipients bcc: List of BCC recipients attachments: List of file paths to attach is_html: Whether the body is HTML reply_to: Reply-to address Returns: Dict with success status and message """ try: # Create message msg = MIMEMultipart('mixed') msg['From'] = formataddr((self.connection_manager.email.split('@')[0], self.connection_manager.email)) msg['To'] = ', '.join(to) msg['Subject'] = subject msg['Date'] = formatdate(localtime=True) if cc: msg['Cc'] = ', '.join(cc) if reply_to: msg['Reply-To'] = reply_to # Add body if is_html: # Create alternative container for plain text and HTML msg_alternative = MIMEMultipart('alternative') # Add plain text version (simple strip of HTML tags) text_body = body.replace('<br>', '\n').replace('</p>', '\n\n') import re text_body = re.sub('<[^<]+?>', '', text_body) msg_alternative.attach(MIMEText(text_body, 'plain', 'utf-8')) # Add HTML version msg_alternative.attach(MIMEText(body, 'html', 'utf-8')) msg.attach(msg_alternative) else: msg.attach(MIMEText(body, 'plain', 'utf-8')) # Add attachments if attachments: for file_path in attachments: if os.path.isfile(file_path): self._attach_file(msg, file_path) else: logger.warning(f"Attachment not found: {file_path}") # Connect and send server = self.connection_manager.connect_smtp() try: # Combine all recipients all_recipients = list(to) if cc: all_recipients.extend(cc) if bcc: all_recipients.extend(bcc) # Send email server.send_message(msg, from_addr=self.connection_manager.email, to_addrs=all_recipients) logger.info(f"✅ Email sent successfully to {', '.join(to)}") return { 'success': True, 'message': f'Email sent successfully to {len(all_recipients)} recipient(s)', 'recipients': all_recipients, 'from': self.connection_manager.email } finally: server.quit() except Exception as e: logger.error(f"Failed to send email: {e}") return { 'success': False, 'error': str(e), 'from': self.connection_manager.email } def _attach_file(self, msg: MIMEMultipart, file_path: str): """Attach a file to the email message""" try: # Guess the content type ctype, encoding = mimetypes.guess_type(file_path) if ctype is None or encoding is not None: ctype = 'application/octet-stream' maintype, subtype = ctype.split('/', 1) # Read file with open(file_path, 'rb') as f: file_data = f.read() # Create attachment attachment = MIMEBase(maintype, subtype) attachment.set_payload(file_data) encoders.encode_base64(attachment) # Add header filename = os.path.basename(file_path) attachment.add_header( 'Content-Disposition', f'attachment; filename="{filename}"' ) msg.attach(attachment) logger.info(f"Attached file: {filename}") except Exception as e: logger.error(f"Failed to attach file {file_path}: {e}") raise def reply_email( self, original_msg: Dict[str, Any], body: str, reply_all: bool = False, attachments: Optional[List[str]] = None, is_html: bool = False ) -> Dict[str, Any]: """ Reply to an email Args: original_msg: Original message data (from get_email_detail) body: Reply body reply_all: Whether to reply to all recipients attachments: List of file paths to attach is_html: Whether the body is HTML Returns: Dict with success status and message """ try: # Determine recipients to = [original_msg['from']] cc = [] if reply_all: # Add original recipients (excluding self) original_to = original_msg.get('to', '').split(', ') original_cc = original_msg.get('cc', '').split(', ') for addr in original_to + original_cc: addr = addr.strip() if addr and addr != self.connection_manager.email and addr not in to: cc.append(addr) # Prepare subject subject = original_msg['subject'] if not subject.lower().startswith('re:'): subject = f"Re: {subject}" # Prepare body with quote quoted_body = self._quote_original_message(original_msg) if is_html: full_body = f"{body}<br><br>{quoted_body}" else: full_body = f"{body}\n\n{quoted_body}" # Add reply headers reply_to = original_msg.get('message-id') return self.send_email( to=to, subject=subject, body=full_body, cc=cc if cc else None, attachments=attachments, is_html=is_html, reply_to=reply_to ) except Exception as e: logger.error(f"Failed to reply to email: {e}") return { 'success': False, 'error': str(e) } def forward_email( self, original_msg: Dict[str, Any], to: List[str], body: Optional[str] = None, include_attachments: bool = True ) -> Dict[str, Any]: """ Forward an email Args: original_msg: Original message data (from get_email_detail) to: List of recipient email addresses body: Optional forward message include_attachments: Whether to include original attachments Returns: Dict with success status and message """ try: # Prepare subject subject = original_msg['subject'] if not subject.lower().startswith('fwd:'): subject = f"Fwd: {subject}" # Prepare body forward_header = f"""---------- Forwarded message ---------- From: {original_msg['from']} Date: {original_msg['date']} Subject: {original_msg['subject']} To: {original_msg['to']}""" if original_msg.get('cc'): forward_header += f"\nCc: {original_msg['cc']}" forward_header += "\n\n" # Combine with original body original_body = original_msg.get('body', '') if body: full_body = f"{body}\n\n{forward_header}{original_body}" else: full_body = f"{forward_header}{original_body}" # Handle attachments if needed attachments = None if include_attachments and original_msg.get('attachments'): # Note: This would require downloading attachments first # For now, we'll skip actual attachment forwarding logger.info("Attachment forwarding not yet implemented") return self.send_email( to=to, subject=subject, body=full_body, attachments=attachments, is_html=False ) except Exception as e: logger.error(f"Failed to forward email: {e}") return { 'success': False, 'error': str(e) } def _quote_original_message(self, original_msg: Dict[str, Any]) -> str: """Format original message for quoting in reply""" quote = f"On {original_msg['date']}, {original_msg['from']} wrote:\n" # Quote each line of the original body original_body = original_msg.get('body', '') quoted_lines = [f"> {line}" for line in original_body.split('\n')] quote += '\n'.join(quoted_lines) return quote

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server