classic.py•20.6 kB
import email.utils
from collections.abc import AsyncGenerator
from datetime import datetime
from email.header import Header
from email.mime.text import MIMEText
from email.parser import BytesParser
from email.policy import default
from typing import Any
import aioimaplib
import aiosmtplib
from mcp_email_server.config import EmailServer, EmailSettings
from mcp_email_server.emails import EmailHandler
from mcp_email_server.emails.models import (
EmailBodyResponse,
EmailContentBatchResponse,
EmailMetadata,
EmailMetadataPageResponse,
)
from mcp_email_server.log import logger
class EmailClient:
def __init__(self, email_server: EmailServer, sender: str | None = None):
self.email_server = email_server
self.sender = sender or email_server.user_name
self.imap_class = aioimaplib.IMAP4_SSL if self.email_server.use_ssl else aioimaplib.IMAP4
self.smtp_use_tls = self.email_server.use_ssl
self.smtp_start_tls = self.email_server.start_ssl
def _parse_email_data(self, raw_email: bytes, email_id: str | None = None) -> dict[str, Any]: # noqa: C901
"""Parse raw email data into a structured dictionary."""
parser = BytesParser(policy=default)
email_message = parser.parsebytes(raw_email)
# Extract email parts
subject = email_message.get("Subject", "")
sender = email_message.get("From", "")
date_str = email_message.get("Date", "")
# Extract recipients
to_addresses = []
to_header = email_message.get("To", "")
if to_header:
# Simple parsing - split by comma and strip whitespace
to_addresses = [addr.strip() for addr in to_header.split(",")]
# Also check CC recipients
cc_header = email_message.get("Cc", "")
if cc_header:
to_addresses.extend([addr.strip() for addr in cc_header.split(",")])
# Parse date
try:
date_tuple = email.utils.parsedate_tz(date_str)
date = datetime.fromtimestamp(email.utils.mktime_tz(date_tuple)) if date_tuple else datetime.now()
except Exception:
date = datetime.now()
# Get body content
body = ""
attachments = []
if email_message.is_multipart():
for part in email_message.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition", ""))
# Handle attachments
if "attachment" in content_disposition:
filename = part.get_filename()
if filename:
attachments.append(filename)
# Handle text parts
elif content_type == "text/plain":
body_part = part.get_payload(decode=True)
if body_part:
charset = part.get_content_charset("utf-8")
try:
body += body_part.decode(charset)
except UnicodeDecodeError:
body += body_part.decode("utf-8", errors="replace")
else:
# Handle plain text emails
payload = email_message.get_payload(decode=True)
if payload:
charset = email_message.get_content_charset("utf-8")
try:
body = payload.decode(charset)
except UnicodeDecodeError:
body = payload.decode("utf-8", errors="replace")
# TODO: Allow retrieving full email body
if body and len(body) > 20000:
body = body[:20000] + "...[TRUNCATED]"
return {
"email_id": email_id or "",
"subject": subject,
"from": sender,
"to": to_addresses,
"body": body,
"date": date,
"attachments": attachments,
}
@staticmethod
def _build_search_criteria(
before: datetime | None = None,
since: datetime | None = None,
subject: str | None = None,
body: str | None = None,
text: str | None = None,
from_address: str | None = None,
to_address: str | None = None,
):
search_criteria = []
if before:
search_criteria.extend(["BEFORE", before.strftime("%d-%b-%Y").upper()])
if since:
search_criteria.extend(["SINCE", since.strftime("%d-%b-%Y").upper()])
if subject:
search_criteria.extend(["SUBJECT", subject])
if body:
search_criteria.extend(["BODY", body])
if text:
search_criteria.extend(["TEXT", text])
if from_address:
search_criteria.extend(["FROM", from_address])
if to_address:
search_criteria.extend(["TO", to_address])
# If no specific criteria, search for ALL
if not search_criteria:
search_criteria = ["ALL"]
return search_criteria
async def get_email_count(
self,
before: datetime | None = None,
since: datetime | None = None,
subject: str | None = None,
from_address: str | None = None,
to_address: str | None = None,
) -> int:
imap = self.imap_class(self.email_server.host, self.email_server.port)
try:
# Wait for the connection to be established
await imap._client_task
await imap.wait_hello_from_server()
# Login and select inbox
await imap.login(self.email_server.user_name, self.email_server.password)
await imap.select("INBOX")
search_criteria = self._build_search_criteria(
before, since, subject, from_address=from_address, to_address=to_address
)
logger.info(f"Count: Search criteria: {search_criteria}")
# Search for messages and count them - use UID SEARCH for consistency
_, messages = await imap.uid_search(*search_criteria)
return len(messages[0].split())
finally:
# Ensure we logout properly
try:
await imap.logout()
except Exception as e:
logger.info(f"Error during logout: {e}")
async def get_emails_metadata_stream( # noqa: C901
self,
page: int = 1,
page_size: int = 10,
before: datetime | None = None,
since: datetime | None = None,
subject: str | None = None,
from_address: str | None = None,
to_address: str | None = None,
order: str = "desc",
) -> AsyncGenerator[dict[str, Any], None]:
imap = self.imap_class(self.email_server.host, self.email_server.port)
try:
# Wait for the connection to be established
await imap._client_task
await imap.wait_hello_from_server()
# Login and select inbox
await imap.login(self.email_server.user_name, self.email_server.password)
try:
await imap.id(name="mcp-email-server", version="1.0.0")
except Exception as e:
logger.warning(f"IMAP ID command failed: {e!s}")
await imap.select("INBOX")
search_criteria = self._build_search_criteria(
before, since, subject, from_address=from_address, to_address=to_address
)
logger.info(f"Get metadata: Search criteria: {search_criteria}")
# Search for messages - use UID SEARCH for better compatibility
_, messages = await imap.uid_search(*search_criteria)
# Handle empty or None responses
if not messages or not messages[0]:
logger.warning("No messages returned from search")
email_ids = []
else:
email_ids = messages[0].split()
logger.info(f"Found {len(email_ids)} email IDs")
start = (page - 1) * page_size
end = start + page_size
if order == "desc":
email_ids.reverse()
# Fetch each message's metadata only
for _, email_id in enumerate(email_ids[start:end]):
try:
# Convert email_id from bytes to string
email_id_str = email_id.decode("utf-8")
# Fetch only headers to get metadata without body
_, data = await imap.uid("fetch", email_id_str, "BODY.PEEK[HEADER]")
if not data:
logger.error(f"Failed to fetch headers for UID {email_id_str}")
continue
# Find the email headers in the response
raw_headers = None
if len(data) > 1 and isinstance(data[1], bytearray):
raw_headers = bytes(data[1])
else:
# Search through all items for header content
for item in data:
if isinstance(item, bytes | bytearray) and len(item) > 10:
# Skip IMAP protocol responses
if isinstance(item, bytes) and b"FETCH" in item:
continue
# This is likely the header content
raw_headers = bytes(item) if isinstance(item, bytearray) else item
break
if raw_headers:
try:
# Parse headers only
parser = BytesParser(policy=default)
email_message = parser.parsebytes(raw_headers)
# Extract metadata
subject = email_message.get("Subject", "")
sender = email_message.get("From", "")
date_str = email_message.get("Date", "")
# Extract recipients
to_addresses = []
to_header = email_message.get("To", "")
if to_header:
to_addresses = [addr.strip() for addr in to_header.split(",")]
cc_header = email_message.get("Cc", "")
if cc_header:
to_addresses.extend([addr.strip() for addr in cc_header.split(",")])
# Parse date
try:
date_tuple = email.utils.parsedate_tz(date_str)
date = (
datetime.fromtimestamp(email.utils.mktime_tz(date_tuple))
if date_tuple
else datetime.now()
)
except Exception:
date = datetime.now()
# For metadata, we don't fetch attachments to save bandwidth
# We'll mark it as unknown for now
metadata = {
"email_id": email_id_str,
"subject": subject,
"from": sender,
"to": to_addresses,
"date": date,
"attachments": [], # We don't fetch attachment info for metadata
}
yield metadata
except Exception as e:
# Log error but continue with other emails
logger.error(f"Error parsing email metadata: {e!s}")
else:
logger.error(f"Could not find header data in response for email ID: {email_id_str}")
except Exception as e:
logger.error(f"Error fetching email metadata {email_id}: {e!s}")
finally:
# Ensure we logout properly
try:
await imap.logout()
except Exception as e:
logger.info(f"Error during logout: {e}")
def _check_email_content(self, data: list) -> bool:
"""Check if the fetched data contains actual email content."""
for item in data:
if isinstance(item, bytes) and b"FETCH (" in item and b"RFC822" not in item and b"BODY" not in item:
# This is just metadata, not actual content
continue
elif isinstance(item, bytes | bytearray) and len(item) > 100:
# This looks like email content
return True
return False
def _extract_raw_email(self, data: list) -> bytes | None:
"""Extract raw email bytes from IMAP response data."""
# The email content is typically at index 1 as a bytearray
if len(data) > 1 and isinstance(data[1], bytearray):
return bytes(data[1])
# Search through all items for email content
for item in data:
if isinstance(item, bytes | bytearray) and len(item) > 100:
# Skip IMAP protocol responses
if isinstance(item, bytes) and b"FETCH" in item:
continue
# This is likely the email content
return bytes(item) if isinstance(item, bytearray) else item
return None
async def _fetch_email_with_formats(self, imap, email_id: str) -> list | None:
"""Try different fetch formats to get email data."""
fetch_formats = ["RFC822", "BODY[]", "BODY.PEEK[]", "(BODY.PEEK[])"]
for fetch_format in fetch_formats:
try:
_, data = await imap.uid("fetch", email_id, fetch_format)
if data and len(data) > 0 and self._check_email_content(data):
return data
except Exception as e:
logger.debug(f"Fetch format {fetch_format} failed: {e}")
return None
async def get_email_body_by_id(self, email_id: str) -> dict[str, Any] | None:
imap = self.imap_class(self.email_server.host, self.email_server.port)
try:
# Wait for the connection to be established
await imap._client_task
await imap.wait_hello_from_server()
# Login and select inbox
await imap.login(self.email_server.user_name, self.email_server.password)
try:
await imap.id(name="mcp-email-server", version="1.0.0")
except Exception as e:
logger.warning(f"IMAP ID command failed: {e!s}")
await imap.select("INBOX")
# Fetch the specific email by UID
data = await self._fetch_email_with_formats(imap, email_id)
if not data:
logger.error(f"Failed to fetch UID {email_id} with any format")
return None
# Extract raw email data
raw_email = self._extract_raw_email(data)
if not raw_email:
logger.error(f"Could not find email data in response for email ID: {email_id}")
return None
# Parse the email
try:
return self._parse_email_data(raw_email, email_id)
except Exception as e:
logger.error(f"Error parsing email: {e!s}")
return None
finally:
# Ensure we logout properly
try:
await imap.logout()
except Exception as e:
logger.info(f"Error during logout: {e}")
async def send_email(
self,
recipients: list[str],
subject: str,
body: str,
cc: list[str] | None = None,
bcc: list[str] | None = None,
html: bool = False,
):
# Create message with UTF-8 encoding to support special characters
content_type = "html" if html else "plain"
msg = MIMEText(body, content_type, "utf-8")
# Handle subject with special characters
if any(ord(c) > 127 for c in subject):
msg["Subject"] = Header(subject, "utf-8")
else:
msg["Subject"] = subject
# Handle sender name with special characters
if any(ord(c) > 127 for c in self.sender):
msg["From"] = Header(self.sender, "utf-8")
else:
msg["From"] = self.sender
msg["To"] = ", ".join(recipients)
# Add CC header if provided (visible to recipients)
if cc:
msg["Cc"] = ", ".join(cc)
# Note: BCC recipients are not added to headers (they remain hidden)
# but will be included in the actual recipients for SMTP delivery
async with aiosmtplib.SMTP(
hostname=self.email_server.host,
port=self.email_server.port,
start_tls=self.smtp_start_tls,
use_tls=self.smtp_use_tls,
) as smtp:
await smtp.login(self.email_server.user_name, self.email_server.password)
# Create a combined list of all recipients for delivery
all_recipients = recipients.copy()
if cc:
all_recipients.extend(cc)
if bcc:
all_recipients.extend(bcc)
await smtp.send_message(msg, recipients=all_recipients)
class ClassicEmailHandler(EmailHandler):
def __init__(self, email_settings: EmailSettings):
self.email_settings = email_settings
self.incoming_client = EmailClient(email_settings.incoming)
self.outgoing_client = EmailClient(
email_settings.outgoing,
sender=f"{email_settings.full_name} <{email_settings.email_address}>",
)
async def get_emails_metadata(
self,
page: int = 1,
page_size: int = 10,
before: datetime | None = None,
since: datetime | None = None,
subject: str | None = None,
from_address: str | None = None,
to_address: str | None = None,
order: str = "desc",
) -> EmailMetadataPageResponse:
emails = []
async for email_data in self.incoming_client.get_emails_metadata_stream(
page, page_size, before, since, subject, from_address, to_address, order
):
emails.append(EmailMetadata.from_email(email_data))
total = await self.incoming_client.get_email_count(
before, since, subject, from_address=from_address, to_address=to_address
)
return EmailMetadataPageResponse(
page=page,
page_size=page_size,
before=before,
since=since,
subject=subject,
emails=emails,
total=total,
)
async def get_emails_content(self, email_ids: list[str]) -> EmailContentBatchResponse:
"""Batch retrieve email body content"""
emails = []
failed_ids = []
for email_id in email_ids:
try:
email_data = await self.incoming_client.get_email_body_by_id(email_id)
if email_data:
emails.append(
EmailBodyResponse(
email_id=email_data["email_id"],
subject=email_data["subject"],
sender=email_data["from"],
recipients=email_data["to"],
date=email_data["date"],
body=email_data["body"],
attachments=email_data["attachments"],
)
)
else:
failed_ids.append(email_id)
except Exception as e:
logger.error(f"Failed to retrieve email {email_id}: {e}")
failed_ids.append(email_id)
return EmailContentBatchResponse(
emails=emails,
requested_count=len(email_ids),
retrieved_count=len(emails),
failed_ids=failed_ids,
)
async def send_email(
self,
recipients: list[str],
subject: str,
body: str,
cc: list[str] | None = None,
bcc: list[str] | None = None,
html: bool = False,
) -> None:
await self.outgoing_client.send_email(recipients, subject, body, cc, bcc, html)