Skip to main content
Glama
email_receiver.py8.78 kB
""" Email receiving service using IMAP and POP3. """ import aioimaplib import asyncio import email import email.message from email.header import decode_header from typing import List, Dict, Any, Optional import poplib from datetime import datetime from ..config import get_settings class EmailReceiver: """Service for receiving emails via IMAP or POP3.""" def __init__(self): """Initialize the email receiver with configuration.""" self.settings = get_settings() async def receive_emails_imap( self, mailbox: str = "INBOX", limit: int = 10, unread_only: bool = False ) -> Dict[str, Any]: """ Receive emails using IMAP. Args: mailbox: Mailbox to read from (default: INBOX) limit: Maximum number of emails to retrieve unread_only: Only retrieve unread emails Returns: Dictionary with status and email list """ try: # Connect to IMAP server imap = aioimaplib.IMAP4_SSL( host=self.settings.IMAP_SERVER, port=self.settings.IMAP_PORT ) await imap.wait_hello_from_server() # Login await imap.login( self.settings.IMAP_USERNAME, self.settings.IMAP_PASSWORD ) # Select mailbox await imap.select(mailbox) # Search for emails search_criteria = "UNSEEN" if unread_only else "ALL" response = await imap.search(search_criteria) if response[0] != "OK": return { "status": "error", "message": "Failed to search emails" } email_ids = response[1][0].split() # Limit the number of emails email_ids = email_ids[-limit:] if len(email_ids) > limit else email_ids emails = [] for email_id in email_ids: # Convert bytes to string for aioimaplib email_id_str = email_id.decode() if isinstance(email_id, bytes) else str(email_id) email_data = await self._fetch_email_imap(imap, email_id_str) if email_data: emails.append(email_data) # Logout await imap.logout() return { "status": "success", "count": len(emails), "emails": emails } except Exception as e: return { "status": "error", "message": f"Failed to receive emails via IMAP: {str(e)}" } async def _fetch_email_imap( self, imap: aioimaplib.IMAP4_SSL, email_id: str ) -> Optional[Dict[str, Any]]: """ Fetch a single email via IMAP. Args: imap: IMAP connection email_id: Email ID to fetch Returns: Dictionary with email data or None """ try: # Use RFC822 to fetch the complete message response = await imap.fetch(email_id, "RFC822") if response[0] != "OK": return None # Extract email body from response # Based on our testing, the message is in response[1][1] if len(response[1]) >= 2: email_body = response[1][1] elif len(response[1]) >= 1: email_body = response[1][0] else: return None # Convert to bytes if needed if isinstance(email_body, str): email_body = email_body.encode() # Parse the email message email_message = email.message_from_bytes(email_body) return self._parse_email(email_message, email_id) except Exception as e: return None def receive_emails_pop3( self, limit: int = 10 ) -> Dict[str, Any]: """ Receive emails using POP3 (synchronous operation). Args: limit: Maximum number of emails to retrieve Returns: Dictionary with status and email list """ try: # Connect to POP3 server if self.settings.POP3_USE_SSL: pop = poplib.POP3_SSL( self.settings.POP3_SERVER, self.settings.POP3_PORT ) else: pop = poplib.POP3( self.settings.POP3_SERVER, self.settings.POP3_PORT ) # Login pop.user(self.settings.POP3_USERNAME) pop.pass_(self.settings.POP3_PASSWORD) # Get message count num_messages = len(pop.list()[1]) # Limit the number of emails start_index = max(1, num_messages - limit + 1) emails = [] for i in range(start_index, num_messages + 1): try: response, lines, octets = pop.retr(i) email_content = b"\r\n".join(lines) email_message = email.message_from_bytes(email_content) email_data = self._parse_email(email_message, str(i)) if email_data: emails.append(email_data) except Exception as e: continue # Quit pop.quit() return { "status": "success", "count": len(emails), "emails": emails } except Exception as e: return { "status": "error", "message": f"Failed to receive emails via POP3: {str(e)}" } def _parse_email( self, email_message: email.message.Message, email_id: str ) -> Dict[str, Any]: """ Parse an email message. Args: email_message: Email message object email_id: Email ID Returns: Dictionary with parsed email data """ # Decode subject subject = "" if email_message["Subject"]: subject_parts = decode_header(email_message["Subject"]) subject = "" for content, encoding in subject_parts: if isinstance(content, bytes): subject += content.decode(encoding or "utf-8", errors="ignore") else: subject += content # Get sender from_header = email_message.get("From", "") # Get recipient to_header = email_message.get("To", "") # Get date date_header = email_message.get("Date", "") # Get body body = "" if email_message.is_multipart(): for part in email_message.walk(): content_type = part.get_content_type() if content_type == "text/plain": try: body = part.get_payload(decode=True).decode(errors="ignore") break except: pass else: try: body = email_message.get_payload(decode=True).decode(errors="ignore") except: body = str(email_message.get_payload()) # Get attachments info attachments = [] if email_message.is_multipart(): for part in email_message.walk(): if part.get_content_disposition() == "attachment": filename = part.get_filename() if filename: attachments.append({ "filename": filename, "content_type": part.get_content_type() }) return { "id": email_id, "subject": subject, "from": from_header, "to": to_header, "date": date_header, "body": body[:1000] if len(body) > 1000 else body, # Limit body length "body_length": len(body), "attachments": attachments, "has_attachments": len(attachments) > 0 }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bedro96/email-send-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server