Skip to main content
Glama

MCP Email Server

by ai-zerolab
BSD 3-Clause
93
  • Apple
  • Linux
classic.py20.6 kB
import email.utils from collections.abc import AsyncGenerator from datetime import datetime from email.header import Header from email.mime.text import MIMEText from email.parser import BytesParser from email.policy import default from typing import Any import aioimaplib import aiosmtplib from mcp_email_server.config import EmailServer, EmailSettings from mcp_email_server.emails import EmailHandler from mcp_email_server.emails.models import ( EmailBodyResponse, EmailContentBatchResponse, EmailMetadata, EmailMetadataPageResponse, ) from mcp_email_server.log import logger class EmailClient: def __init__(self, email_server: EmailServer, sender: str | None = None): self.email_server = email_server self.sender = sender or email_server.user_name self.imap_class = aioimaplib.IMAP4_SSL if self.email_server.use_ssl else aioimaplib.IMAP4 self.smtp_use_tls = self.email_server.use_ssl self.smtp_start_tls = self.email_server.start_ssl def _parse_email_data(self, raw_email: bytes, email_id: str | None = None) -> dict[str, Any]: # noqa: C901 """Parse raw email data into a structured dictionary.""" parser = BytesParser(policy=default) email_message = parser.parsebytes(raw_email) # Extract email parts subject = email_message.get("Subject", "") sender = email_message.get("From", "") date_str = email_message.get("Date", "") # Extract recipients to_addresses = [] to_header = email_message.get("To", "") if to_header: # Simple parsing - split by comma and strip whitespace to_addresses = [addr.strip() for addr in to_header.split(",")] # Also check CC recipients cc_header = email_message.get("Cc", "") if cc_header: to_addresses.extend([addr.strip() for addr in cc_header.split(",")]) # Parse date try: date_tuple = email.utils.parsedate_tz(date_str) date = datetime.fromtimestamp(email.utils.mktime_tz(date_tuple)) if date_tuple else datetime.now() except Exception: date = datetime.now() # Get body content body = "" attachments = [] if email_message.is_multipart(): for part in email_message.walk(): content_type = part.get_content_type() content_disposition = str(part.get("Content-Disposition", "")) # Handle attachments if "attachment" in content_disposition: filename = part.get_filename() if filename: attachments.append(filename) # Handle text parts elif content_type == "text/plain": body_part = part.get_payload(decode=True) if body_part: charset = part.get_content_charset("utf-8") try: body += body_part.decode(charset) except UnicodeDecodeError: body += body_part.decode("utf-8", errors="replace") else: # Handle plain text emails payload = email_message.get_payload(decode=True) if payload: charset = email_message.get_content_charset("utf-8") try: body = payload.decode(charset) except UnicodeDecodeError: body = payload.decode("utf-8", errors="replace") # TODO: Allow retrieving full email body if body and len(body) > 20000: body = body[:20000] + "...[TRUNCATED]" return { "email_id": email_id or "", "subject": subject, "from": sender, "to": to_addresses, "body": body, "date": date, "attachments": attachments, } @staticmethod def _build_search_criteria( before: datetime | None = None, since: datetime | None = None, subject: str | None = None, body: str | None = None, text: str | None = None, from_address: str | None = None, to_address: str | None = None, ): search_criteria = [] if before: search_criteria.extend(["BEFORE", before.strftime("%d-%b-%Y").upper()]) if since: search_criteria.extend(["SINCE", since.strftime("%d-%b-%Y").upper()]) if subject: search_criteria.extend(["SUBJECT", subject]) if body: search_criteria.extend(["BODY", body]) if text: search_criteria.extend(["TEXT", text]) if from_address: search_criteria.extend(["FROM", from_address]) if to_address: search_criteria.extend(["TO", to_address]) # If no specific criteria, search for ALL if not search_criteria: search_criteria = ["ALL"] return search_criteria async def get_email_count( self, before: datetime | None = None, since: datetime | None = None, subject: str | None = None, from_address: str | None = None, to_address: str | None = None, ) -> int: imap = self.imap_class(self.email_server.host, self.email_server.port) try: # Wait for the connection to be established await imap._client_task await imap.wait_hello_from_server() # Login and select inbox await imap.login(self.email_server.user_name, self.email_server.password) await imap.select("INBOX") search_criteria = self._build_search_criteria( before, since, subject, from_address=from_address, to_address=to_address ) logger.info(f"Count: Search criteria: {search_criteria}") # Search for messages and count them - use UID SEARCH for consistency _, messages = await imap.uid_search(*search_criteria) return len(messages[0].split()) finally: # Ensure we logout properly try: await imap.logout() except Exception as e: logger.info(f"Error during logout: {e}") async def get_emails_metadata_stream( # noqa: C901 self, page: int = 1, page_size: int = 10, before: datetime | None = None, since: datetime | None = None, subject: str | None = None, from_address: str | None = None, to_address: str | None = None, order: str = "desc", ) -> AsyncGenerator[dict[str, Any], None]: imap = self.imap_class(self.email_server.host, self.email_server.port) try: # Wait for the connection to be established await imap._client_task await imap.wait_hello_from_server() # Login and select inbox await imap.login(self.email_server.user_name, self.email_server.password) try: await imap.id(name="mcp-email-server", version="1.0.0") except Exception as e: logger.warning(f"IMAP ID command failed: {e!s}") await imap.select("INBOX") search_criteria = self._build_search_criteria( before, since, subject, from_address=from_address, to_address=to_address ) logger.info(f"Get metadata: Search criteria: {search_criteria}") # Search for messages - use UID SEARCH for better compatibility _, messages = await imap.uid_search(*search_criteria) # Handle empty or None responses if not messages or not messages[0]: logger.warning("No messages returned from search") email_ids = [] else: email_ids = messages[0].split() logger.info(f"Found {len(email_ids)} email IDs") start = (page - 1) * page_size end = start + page_size if order == "desc": email_ids.reverse() # Fetch each message's metadata only for _, email_id in enumerate(email_ids[start:end]): try: # Convert email_id from bytes to string email_id_str = email_id.decode("utf-8") # Fetch only headers to get metadata without body _, data = await imap.uid("fetch", email_id_str, "BODY.PEEK[HEADER]") if not data: logger.error(f"Failed to fetch headers for UID {email_id_str}") continue # Find the email headers in the response raw_headers = None if len(data) > 1 and isinstance(data[1], bytearray): raw_headers = bytes(data[1]) else: # Search through all items for header content for item in data: if isinstance(item, bytes | bytearray) and len(item) > 10: # Skip IMAP protocol responses if isinstance(item, bytes) and b"FETCH" in item: continue # This is likely the header content raw_headers = bytes(item) if isinstance(item, bytearray) else item break if raw_headers: try: # Parse headers only parser = BytesParser(policy=default) email_message = parser.parsebytes(raw_headers) # Extract metadata subject = email_message.get("Subject", "") sender = email_message.get("From", "") date_str = email_message.get("Date", "") # Extract recipients to_addresses = [] to_header = email_message.get("To", "") if to_header: to_addresses = [addr.strip() for addr in to_header.split(",")] cc_header = email_message.get("Cc", "") if cc_header: to_addresses.extend([addr.strip() for addr in cc_header.split(",")]) # Parse date try: date_tuple = email.utils.parsedate_tz(date_str) date = ( datetime.fromtimestamp(email.utils.mktime_tz(date_tuple)) if date_tuple else datetime.now() ) except Exception: date = datetime.now() # For metadata, we don't fetch attachments to save bandwidth # We'll mark it as unknown for now metadata = { "email_id": email_id_str, "subject": subject, "from": sender, "to": to_addresses, "date": date, "attachments": [], # We don't fetch attachment info for metadata } yield metadata except Exception as e: # Log error but continue with other emails logger.error(f"Error parsing email metadata: {e!s}") else: logger.error(f"Could not find header data in response for email ID: {email_id_str}") except Exception as e: logger.error(f"Error fetching email metadata {email_id}: {e!s}") finally: # Ensure we logout properly try: await imap.logout() except Exception as e: logger.info(f"Error during logout: {e}") def _check_email_content(self, data: list) -> bool: """Check if the fetched data contains actual email content.""" for item in data: if isinstance(item, bytes) and b"FETCH (" in item and b"RFC822" not in item and b"BODY" not in item: # This is just metadata, not actual content continue elif isinstance(item, bytes | bytearray) and len(item) > 100: # This looks like email content return True return False def _extract_raw_email(self, data: list) -> bytes | None: """Extract raw email bytes from IMAP response data.""" # The email content is typically at index 1 as a bytearray if len(data) > 1 and isinstance(data[1], bytearray): return bytes(data[1]) # Search through all items for email content for item in data: if isinstance(item, bytes | bytearray) and len(item) > 100: # Skip IMAP protocol responses if isinstance(item, bytes) and b"FETCH" in item: continue # This is likely the email content return bytes(item) if isinstance(item, bytearray) else item return None async def _fetch_email_with_formats(self, imap, email_id: str) -> list | None: """Try different fetch formats to get email data.""" fetch_formats = ["RFC822", "BODY[]", "BODY.PEEK[]", "(BODY.PEEK[])"] for fetch_format in fetch_formats: try: _, data = await imap.uid("fetch", email_id, fetch_format) if data and len(data) > 0 and self._check_email_content(data): return data except Exception as e: logger.debug(f"Fetch format {fetch_format} failed: {e}") return None async def get_email_body_by_id(self, email_id: str) -> dict[str, Any] | None: imap = self.imap_class(self.email_server.host, self.email_server.port) try: # Wait for the connection to be established await imap._client_task await imap.wait_hello_from_server() # Login and select inbox await imap.login(self.email_server.user_name, self.email_server.password) try: await imap.id(name="mcp-email-server", version="1.0.0") except Exception as e: logger.warning(f"IMAP ID command failed: {e!s}") await imap.select("INBOX") # Fetch the specific email by UID data = await self._fetch_email_with_formats(imap, email_id) if not data: logger.error(f"Failed to fetch UID {email_id} with any format") return None # Extract raw email data raw_email = self._extract_raw_email(data) if not raw_email: logger.error(f"Could not find email data in response for email ID: {email_id}") return None # Parse the email try: return self._parse_email_data(raw_email, email_id) except Exception as e: logger.error(f"Error parsing email: {e!s}") return None finally: # Ensure we logout properly try: await imap.logout() except Exception as e: logger.info(f"Error during logout: {e}") async def send_email( self, recipients: list[str], subject: str, body: str, cc: list[str] | None = None, bcc: list[str] | None = None, html: bool = False, ): # Create message with UTF-8 encoding to support special characters content_type = "html" if html else "plain" msg = MIMEText(body, content_type, "utf-8") # Handle subject with special characters if any(ord(c) > 127 for c in subject): msg["Subject"] = Header(subject, "utf-8") else: msg["Subject"] = subject # Handle sender name with special characters if any(ord(c) > 127 for c in self.sender): msg["From"] = Header(self.sender, "utf-8") else: msg["From"] = self.sender msg["To"] = ", ".join(recipients) # Add CC header if provided (visible to recipients) if cc: msg["Cc"] = ", ".join(cc) # Note: BCC recipients are not added to headers (they remain hidden) # but will be included in the actual recipients for SMTP delivery async with aiosmtplib.SMTP( hostname=self.email_server.host, port=self.email_server.port, start_tls=self.smtp_start_tls, use_tls=self.smtp_use_tls, ) as smtp: await smtp.login(self.email_server.user_name, self.email_server.password) # Create a combined list of all recipients for delivery all_recipients = recipients.copy() if cc: all_recipients.extend(cc) if bcc: all_recipients.extend(bcc) await smtp.send_message(msg, recipients=all_recipients) class ClassicEmailHandler(EmailHandler): def __init__(self, email_settings: EmailSettings): self.email_settings = email_settings self.incoming_client = EmailClient(email_settings.incoming) self.outgoing_client = EmailClient( email_settings.outgoing, sender=f"{email_settings.full_name} <{email_settings.email_address}>", ) async def get_emails_metadata( self, page: int = 1, page_size: int = 10, before: datetime | None = None, since: datetime | None = None, subject: str | None = None, from_address: str | None = None, to_address: str | None = None, order: str = "desc", ) -> EmailMetadataPageResponse: emails = [] async for email_data in self.incoming_client.get_emails_metadata_stream( page, page_size, before, since, subject, from_address, to_address, order ): emails.append(EmailMetadata.from_email(email_data)) total = await self.incoming_client.get_email_count( before, since, subject, from_address=from_address, to_address=to_address ) return EmailMetadataPageResponse( page=page, page_size=page_size, before=before, since=since, subject=subject, emails=emails, total=total, ) async def get_emails_content(self, email_ids: list[str]) -> EmailContentBatchResponse: """Batch retrieve email body content""" emails = [] failed_ids = [] for email_id in email_ids: try: email_data = await self.incoming_client.get_email_body_by_id(email_id) if email_data: emails.append( EmailBodyResponse( email_id=email_data["email_id"], subject=email_data["subject"], sender=email_data["from"], recipients=email_data["to"], date=email_data["date"], body=email_data["body"], attachments=email_data["attachments"], ) ) else: failed_ids.append(email_id) except Exception as e: logger.error(f"Failed to retrieve email {email_id}: {e}") failed_ids.append(email_id) return EmailContentBatchResponse( emails=emails, requested_count=len(email_ids), retrieved_count=len(emails), failed_ids=failed_ids, ) async def send_email( self, recipients: list[str], subject: str, body: str, cc: list[str] | None = None, bcc: list[str] | None = None, html: bool = False, ) -> None: await self.outgoing_client.send_email(recipients, subject, body, cc, bcc, html)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ai-zerolab/mcp-email-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server