Skip to main content
Glama
leeguooooo
by leeguooooo
email_operations.py12.5 kB
""" Enhanced email operations """ import imaplib import logging from typing import List, Dict, Any, Optional from email import message_from_bytes from email.header import decode_header import os import email logger = logging.getLogger(__name__) def _logout_safely(mail): """Close and logout IMAP connection without raising state errors.""" if not mail: return try: if getattr(mail, 'state', '').upper() == 'SELECTED': mail.close() except Exception: pass try: mail.logout() except Exception: pass class EmailOperations: """Handles enhanced email operations""" def __init__(self, connection_manager): self.connection_manager = connection_manager def mark_email_unread(self, email_id: str, folder: str = "INBOX") -> Dict[str, Any]: """ Mark an email as unread Args: email_id: Email ID to mark as unread folder: Email folder Returns: Dict with success status """ try: mail = self.connection_manager.connect_imap() try: # Select folder result, data = mail.select(folder) if result != 'OK': raise ValueError(f"Cannot select folder: {folder}") # Remove \\Seen flag result, data = mail.store(email_id, '-FLAGS', '\\Seen') if result == 'OK': logger.info(f"✅ Marked email {email_id} as unread") return { 'success': True, 'message': f'Email marked as unread', 'email_id': email_id, 'folder': folder, 'account': self.connection_manager.email } else: raise ValueError(f"Failed to mark as unread: {data}") finally: _logout_safely(mail) except Exception as e: logger.error(f"Failed to mark email as unread: {e}") return { 'success': False, 'error': str(e), 'account': self.connection_manager.email } def flag_email( self, email_id: str, flag_type: str = "flagged", # flagged, important, answered folder: str = "INBOX", set_flag: bool = True ) -> Dict[str, Any]: """ Flag or unflag an email Args: email_id: Email ID to flag flag_type: Type of flag (flagged/important/answered) folder: Email folder set_flag: True to set flag, False to remove Returns: Dict with success status """ try: mail = self.connection_manager.connect_imap() try: # Select folder result, data = mail.select(folder) if result != 'OK': raise ValueError(f"Cannot select folder: {folder}") # Map flag types to IMAP flags flag_map = { 'flagged': '\\Flagged', 'important': '\\Flagged', # Most providers use Flagged for important 'answered': '\\Answered' } imap_flag = flag_map.get(flag_type, '\\Flagged') operation = '+FLAGS' if set_flag else '-FLAGS' # Set or remove flag result, data = mail.store(email_id, operation, imap_flag) if result == 'OK': action = "set" if set_flag else "removed" logger.info(f"✅ Flag {flag_type} {action} for email {email_id}") return { 'success': True, 'message': f'Flag "{flag_type}" {action}', 'email_id': email_id, 'flag_type': flag_type, 'set_flag': set_flag, 'folder': folder, 'account': self.connection_manager.email } else: raise ValueError(f"Failed to update flag: {data}") finally: _logout_safely(mail) except Exception as e: logger.error(f"Failed to flag email: {e}") return { 'success': False, 'error': str(e), 'account': self.connection_manager.email } def get_email_attachments( self, email_id: str, save_path: Optional[str] = None, folder: str = "INBOX" ) -> Dict[str, Any]: """ Get and optionally download email attachments Args: email_id: Email ID to get attachments from save_path: Directory to save attachments (optional) folder: Email folder Returns: Dict with attachment information """ try: mail = self.connection_manager.connect_imap() try: # Select folder result, data = mail.select(folder, readonly=True) if result != 'OK': raise ValueError(f"Cannot select folder: {folder}") # Fetch email result, data = mail.fetch(email_id, '(RFC822)') if result != 'OK': raise ValueError(f"Failed to fetch email {email_id}") # Parse email raw_email = data[0][1] msg = message_from_bytes(raw_email) attachments = [] attachment_count = 0 # Walk through email parts for part in msg.walk(): # Skip non-attachment parts if part.get_content_disposition() is None: continue if 'attachment' not in part.get_content_disposition(): continue # Get filename filename = part.get_filename() if not filename: continue # Decode filename if needed decoded_filename = decode_header(filename) if decoded_filename: filename = "" for text, encoding in decoded_filename: if isinstance(text, bytes): filename += text.decode(encoding or 'utf-8', errors='ignore') else: filename += text # Get attachment data attachment_data = part.get_payload(decode=True) size = len(attachment_data) attachment_info = { 'filename': filename, 'size': size, 'size_formatted': self._format_size(size), 'content_type': part.get_content_type() } # Save if path provided if save_path and os.path.isdir(save_path): filepath = os.path.join(save_path, filename) # Handle duplicate filenames base, ext = os.path.splitext(filepath) counter = 1 while os.path.exists(filepath): filepath = f"{base}_{counter}{ext}" counter += 1 with open(filepath, 'wb') as f: f.write(attachment_data) attachment_info['saved_path'] = filepath logger.info(f"Saved attachment: {filepath}") attachments.append(attachment_info) attachment_count += 1 return { 'success': True, 'attachments': attachments, 'attachment_count': attachment_count, 'email_id': email_id, 'folder': folder, 'account': self.connection_manager.email } finally: _logout_safely(mail) except Exception as e: logger.error(f"Failed to get attachments: {e}") return { 'success': False, 'error': str(e), 'account': self.connection_manager.email } def batch_mark_unread(self, email_ids: List[str], folder: str = "INBOX") -> Dict[str, Any]: """ Mark multiple emails as unread Args: email_ids: List of email IDs to mark as unread folder: Email folder Returns: Dict with success status and count """ try: mail = self.connection_manager.connect_imap() try: # Select folder result, data = mail.select(folder) if result != 'OK': raise ValueError(f"Cannot select folder: {folder}") marked_count = 0 failed_ids = [] for email_id in email_ids: try: # Remove \\Seen flag result, data = mail.store(email_id, '-FLAGS', '\\Seen') if result == 'OK': marked_count += 1 else: failed_ids.append(email_id) except Exception as e: logger.warning(f"Failed to mark email {email_id} as unread: {e}") failed_ids.append(email_id) result_data = { 'success': True, 'message': f'Marked {marked_count}/{len(email_ids)} emails as unread', 'marked_count': marked_count, 'folder': folder, 'account': self.connection_manager.email } if failed_ids: result_data['failed_ids'] = failed_ids return result_data finally: mail.close() mail.logout() except Exception as e: logger.error(f"Failed to batch mark as unread: {e}") return { 'success': False, 'error': str(e), 'account': self.connection_manager.email } def archive_emails( self, email_ids: List[str], archive_folder: str = "Archive", source_folder: str = "INBOX" ) -> Dict[str, Any]: """ Archive emails to a specific folder Args: email_ids: List of email IDs to archive archive_folder: Archive folder name source_folder: Source folder Returns: Dict with success status """ # Use folder operations to move emails from .folder_operations import FolderOperations folder_ops = FolderOperations(self.connection_manager) return folder_ops.move_emails_to_folder( email_ids=email_ids, target_folder=archive_folder, source_folder=source_folder ) def _format_size(self, size_bytes: int) -> str: """Format file size in human-readable format""" for unit in ['B', 'KB', 'MB', 'GB']: if size_bytes < 1024.0: return f"{size_bytes:.1f} {unit}" size_bytes /= 1024.0 return f"{size_bytes:.1f} TB"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server