Skip to main content
Glama
leeguooooo
by leeguooooo
batch_fetch.py10.5 kB
"""Batch email fetching for improved performance."""

import email
import email.message
import email.utils
import imaplib
import logging
import re
from datetime import datetime
from email.errors import HeaderParseError
from email.header import decode_header
from typing import Any, Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)

# "<seq> (" opens every untagged FETCH response as imaplib hands it over.
_SEQ_START_RE = re.compile(rb'(\d+) \(')
# The parenthesised flag list of a FETCH response.
_FLAGS_RE = re.compile(rb'FLAGS \((.*?)\)')
# Response text that ends by announcing a header literal, e.g.
# "RFC822.HEADER {342}" or "BODY[HEADER] {342}".
_HEADER_LITERAL_RE = re.compile(rb'HEADER\]? \{\d+\}$')
# Crude HTML tag matcher used for body previews.
_TAG_RE = re.compile('<[^<]+?>')
_WHITESPACE_RE = re.compile(r'\s+')


class BatchEmailFetcher:
    """Fetch multiple emails in a single request for better performance."""

    def __init__(self, connection: imaplib.IMAP4_SSL):
        """Store the IMAP connection used by every fetch.

        Args:
            connection: An imaplib connection object; the methods below
                call its ``select``, ``search`` and ``fetch``.
        """
        self.connection = connection

    def fetch_batch(
        self,
        email_ids: List[str],
        include_body: bool = False,
        folder: str = "INBOX"
    ) -> List[Dict[str, Any]]:
        """Fetch multiple emails in a single IMAP command.

        Args:
            email_ids: List of email IDs (sequence numbers).
            include_body: Whether to also fetch the body text and add a
                ``preview`` entry to each result.
            folder: Folder to fetch from (selected read-only).

        Returns:
            List of email dictionaries sorted by timestamp, newest first.
            An empty list is returned when there is nothing to fetch or
            when any step fails (failures are logged, not raised).
        """
        if not email_ids:
            return []

        try:
            status, _ = self.connection.select(folder, readonly=True)
            if status != 'OK':
                logger.error("Batch fetch failed: Cannot select folder %s", folder)
                return []

            # Collapse consecutive numbers into ranges for a shorter command.
            sequence = self._optimize_sequence(email_ids)

            if include_body:
                fetch_parts = '(RFC822.HEADER BODY.PEEK[TEXT] FLAGS)'
            else:
                fetch_parts = '(RFC822.HEADER FLAGS)'

            logger.info(
                "Batch fetching %d emails with sequence: %s",
                len(email_ids), sequence,
            )

            status, data = self.connection.fetch(sequence, fetch_parts)
            if status != 'OK':
                logger.error("Batch fetch failed: %s", status)
                return []

            # One FETCH response may be spread over several list items
            # (one tuple per literal plus trailing text), so regroup the
            # items per message before parsing.
            emails = []
            for response_part in self._group_fetch_response(data):
                email_data = self._parse_email_response(response_part, include_body)
                if email_data:
                    emails.append(email_data)

            # Sort by date (newest first)
            emails.sort(key=lambda item: item.get('timestamp', 0), reverse=True)

            logger.info("Successfully fetched %d emails", len(emails))
            return emails

        except Exception as e:
            # Best-effort boundary: callers get an empty list, never an error.
            logger.exception("Batch fetch failed: %s", e)
            return []

    def fetch_unread_batch(
        self,
        limit: int = 50,
        folder: str = "INBOX"
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Efficiently fetch unread emails using batch operations.

        Args:
            limit: Maximum number of unread emails to fetch; the last
                ``limit`` IDs of the search result are used. A value of
                zero or less fetches nothing.
            folder: Folder to search (selected read-only).

        Returns:
            Tuple of (emails list, total unread count). ``([], 0)`` is
            returned when the folder cannot be selected, the search fails
            or an error occurs.
        """
        try:
            status, _ = self.connection.select(folder, readonly=True)
            if status != 'OK':
                logger.error(
                    "Failed to fetch unread batch: Cannot select folder %s", folder
                )
                return [], 0

            status, data = self.connection.search(None, 'UNSEEN')
            if status != 'OK':
                return [], 0

            unread_ids = data[0].split() if data and data[0] else []
            total_unread = len(unread_ids)
            if not unread_ids:
                return [], 0

            # A slice of [-0:] would return the whole list, so handle
            # non-positive limits explicitly.
            if limit <= 0:
                return [], total_unread

            # Keep the tail of the search result and convert bytes to str.
            selected = [raw.decode() for raw in unread_ids[-limit:]]

            emails = self.fetch_batch(selected, include_body=False, folder=folder)
            return emails, total_unread

        except Exception as e:
            # Best-effort boundary: failures are logged, not raised.
            logger.exception("Failed to fetch unread batch: %s", e)
            return [], 0

    def _optimize_sequence(self, email_ids: List[str]) -> str:
        """Optimize an email ID list into IMAP sequence-set format.

        Consecutive numbers are grouped into ranges and duplicate IDs are
        collapsed so that no message is requested twice.

        Example: [1,2,3,5,6,9] -> "1:3,5:6,9"

        Args:
            email_ids: IDs to encode. Values that cannot be converted to
                integers make the method fall back to a plain
                comma-separated list in the given order.

        Returns:
            The sequence-set string, or "" for an empty input.
        """
        if not email_ids:
            return ""

        try:
            ids = sorted({int(raw) for raw in email_ids})
        except (TypeError, ValueError):
            # Fallback to simple comma-separated
            return ','.join(str(raw) for raw in email_ids)

        ranges = []
        start = prev = ids[0]
        for current in ids[1:]:
            if current == prev + 1:
                prev = current
                continue
            ranges.append(str(start) if start == prev else f"{start}:{prev}")
            start = prev = current
        ranges.append(str(start) if start == prev else f"{start}:{prev}")

        return ','.join(ranges)

    def _group_fetch_response(self, data: List[Any]) -> List[Tuple[bytes, bytes]]:
        """Regroup raw ``fetch`` data into one ``(envelope, message)`` pair per email.

        imaplib returns a flat list in which each literal of a response is
        a ``(text, literal)`` tuple and the text after the last literal is
        a plain bytes item. This method concatenates all response text of
        one message into ``envelope`` (so the sequence number and FLAGS can
        be found wherever the server placed them) and joins the header
        literal with any other literal (the body text) into ``message``.

        Args:
            data: The second element of the ``fetch`` result.

        Returns:
            One ``(envelope, message)`` tuple per response that carried a
            header literal; responses without one are dropped.
        """
        # Each entry is [envelope text, header literal, body literal].
        messages: List[List[bytes]] = []

        for item in data or []:
            if isinstance(item, tuple) and len(item) >= 2:
                text, literal = item[0], item[1]
            elif isinstance(item, bytes):
                text, literal = item, None
            else:
                continue  # e.g. None when the server returned nothing
            if not isinstance(text, bytes):
                continue

            # Text starting with "<seq> (" opens a new response; anything
            # else continues the response currently being collected.
            if _SEQ_START_RE.match(text) or not messages:
                messages.append([b'', b'', b''])
            current = messages[-1]
            current[0] += text

            if isinstance(literal, bytes):
                if _HEADER_LITERAL_RE.search(text):
                    current[1] += literal
                else:
                    current[2] += literal

        grouped = []
        for envelope, header, body in messages:
            if not header:
                continue
            # Header and body must be separated by a blank line for the
            # email parser to split them correctly.
            if body and not header.endswith((b'\r\n\r\n', b'\n\n')):
                header = header.rstrip(b'\r\n') + b'\r\n\r\n'
            grouped.append((envelope, header + body))
        return grouped

    def _parse_email_response(
        self,
        response_part: Tuple,
        include_body: bool
    ) -> Optional[Dict[str, Any]]:
        """Parse email data from an IMAP response.

        Args:
            response_part: ``(response text, raw message bytes)``. The
                text is searched for the sequence number and FLAGS; the
                bytes are parsed as an RFC 822 message.
            include_body: Whether to add a ``preview`` entry.

        Returns:
            A dictionary describing the email, or None when the payload
            is not bytes or parsing fails (the failure is logged).
        """
        try:
            envelope, raw_message = response_part[0], response_part[1]

            seq_match = _SEQ_START_RE.search(envelope)
            email_id = seq_match.group(1).decode() if seq_match else None

            if not isinstance(raw_message, bytes):
                return None

            msg = email.message_from_bytes(raw_message)

            # IMAP system flags are case-insensitive, so compare lowered.
            flags_match = _FLAGS_RE.search(envelope)
            flags_text = flags_match.group(1).decode() if flags_match else ""
            flag_names = {name.lower() for name in flags_text.split()}
            is_unread = '\\seen' not in flag_names
            is_flagged = '\\flagged' in flag_names

            subject = self._decode_header(msg.get('Subject', ''))
            from_addr = self._decode_header(msg.get('From', ''))
            to_addr = self._decode_header(msg.get('To', ''))
            date_str = str(msg.get('Date', '') or '')
            message_id = str(msg.get('Message-ID', '') or '')

            # parsedate_tz returns None (it does not raise) for a missing
            # or unparseable date; keep the raw string in that case rather
            # than reporting the epoch.
            timestamp = 0
            date_formatted = date_str
            try:
                date_tuple = email.utils.parsedate_tz(date_str)
                if date_tuple:
                    timestamp = email.utils.mktime_tz(date_tuple)
                    date_formatted = datetime.fromtimestamp(timestamp).strftime(
                        "%Y-%m-%d %H:%M:%S"
                    )
            except (TypeError, ValueError, OverflowError, OSError):
                timestamp = 0
                date_formatted = date_str

            email_data = {
                'id': email_id,
                'subject': subject,
                'from': from_addr,
                'to': to_addr,
                'date': date_formatted,
                'timestamp': timestamp,
                'unread': is_unread,
                'flagged': is_flagged,
                'has_attachments': self._has_attachments(msg),
                'message_id': message_id
            }

            if include_body:
                email_data['preview'] = self._get_body_preview(msg)

            return email_data

        except Exception as e:
            # Per-message boundary: one bad message must not sink the batch.
            logger.error("Failed to parse email response: %s", e)
            return None

    def _decode_header(self, header_value: str) -> str:
        """Decode an email header, handling RFC 2047 encoded words.

        Args:
            header_value: Raw header value; falsy values yield "".

        Returns:
            The decoded text. Parts whose declared charset is unknown or
            does not match the bytes are decoded as UTF-8 with undecodable
            bytes dropped. If the header cannot be parsed at all, its
            string form is returned unchanged.
        """
        if not header_value:
            return ""

        try:
            decoded_parts = decode_header(header_value)
        except (HeaderParseError, LookupError, TypeError, ValueError):
            return str(header_value)

        pieces = []
        for part, charset in decoded_parts:
            if not isinstance(part, bytes):
                pieces.append(str(part))
                continue
            if charset:
                try:
                    pieces.append(part.decode(charset))
                    continue
                except (LookupError, UnicodeDecodeError):
                    pass  # fall through to the lenient UTF-8 decode
            pieces.append(part.decode('utf-8', errors='ignore'))

        # decode_header keeps the whitespace of unencoded runs, so the
        # parts are concatenated as-is; joining with a space would double
        # the spacing between encoded and plain text.
        return ''.join(pieces)

    def _has_attachments(self, msg: email.message.Message) -> bool:
        """Check whether any part of the message looks like an attachment.

        A part counts when its Content-Disposition is ``attachment`` or
        its content type starts with ``image/`` (inline images).

        NOTE(review): when only headers were fetched the parsed message
        presumably has no sub-parts, so this can only reflect the
        top-level headers - confirm whether BODYSTRUCTURE is needed.
        """
        return any(
            part.get_content_disposition() == 'attachment'
            or part.get_content_type().startswith('image/')
            for part in msg.walk()
        )

    def _first_text_payload(self, msg: email.message.Message, content_type: str) -> str:
        """Return the decoded text of the first non-empty part of ``content_type``.

        Returns "" when no such part exists. An unknown charset falls back
        to UTF-8; undecodable bytes are dropped.
        """
        for part in msg.walk():
            if part.get_content_type() != content_type:
                continue
            payload = part.get_payload(decode=True)
            if not payload:
                continue
            charset = part.get_content_charset() or 'utf-8'
            try:
                return payload.decode(charset, errors='ignore')
            except LookupError:
                return payload.decode('utf-8', errors='ignore')
        return ""

    def _get_body_preview(self, msg: email.message.Message, length: int = 200) -> str:
        """Get a short, whitespace-normalised preview of the email body.

        Plain text is preferred; if there is none, the first HTML part is
        used with its tags stripped.

        Args:
            msg: Parsed message.
            length: Maximum preview length before "..." is appended.

        Returns:
            The preview text, or "" when no text is available or the
            extraction fails.
        """
        try:
            body = self._first_text_payload(msg, "text/plain")
            if not body:
                # Simple HTML to text
                body = _TAG_RE.sub(' ', self._first_text_payload(msg, "text/html"))

            body = _WHITESPACE_RE.sub(' ', body).strip()
            if len(body) > length:
                body = body[:length] + "..."
            return body
        except Exception as e:
            # The preview is optional; never let it fail the whole message.
            logger.debug("Failed to build body preview: %s", e)
            return ""

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leeguooooo/email-mcp-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server