from datetime import datetime
from typing import Annotated, Literal
from mcp.server.fastmcp import FastMCP
from pydantic import Field
from mcp_email_server.config import (
AccountAttributes,
EmailSettings,
ProviderSettings,
get_settings,
)
from mcp_email_server.emails.dispatcher import dispatch_handler
from mcp_email_server.emails.models import (
AttachmentDownloadResponse,
EmailContentBatchResponse,
EmailMetadataPageResponse,
)
mcp = FastMCP("email")
@mcp.resource("email://{account_name}")
async def get_account(account_name: str) -> EmailSettings | ProviderSettings | None:
settings = get_settings()
return settings.get_account(account_name, masked=True)
@mcp.tool(description="List all configured email accounts with masked credentials.")
async def list_available_accounts() -> list[AccountAttributes]:
settings = get_settings()
return [account.masked() for account in settings.get_accounts()]
@mcp.tool(description="Add a new email account configuration to the settings.")
async def add_email_account(email: EmailSettings) -> str:
settings = get_settings()
settings.add_email(email)
settings.store()
return f"Successfully added email account '{email.account_name}'"
@mcp.tool(
description="List email metadata (email_id, subject, sender, recipients, date) without body content. Returns email_id for use with get_emails_content."
)
async def list_emails_metadata(
account_name: Annotated[str, Field(description="The name of the email account.")],
page: Annotated[
int,
Field(default=1, description="The page number to retrieve (starting from 1)."),
] = 1,
page_size: Annotated[int, Field(default=10, description="The number of emails to retrieve per page.")] = 10,
before: Annotated[
datetime | None,
Field(default=None, description="Retrieve emails before this datetime (UTC)."),
] = None,
since: Annotated[
datetime | None,
Field(default=None, description="Retrieve emails since this datetime (UTC)."),
] = None,
subject: Annotated[str | None, Field(default=None, description="Filter emails by subject.")] = None,
from_address: Annotated[str | None, Field(default=None, description="Filter emails by sender address.")] = None,
to_address: Annotated[
str | None,
Field(default=None, description="Filter emails by recipient address."),
] = None,
order: Annotated[
Literal["asc", "desc"],
Field(default=None, description="Order emails by field. `asc` or `desc`."),
] = "desc",
mailbox: Annotated[str, Field(default="INBOX", description="The mailbox to retrieve emails from.")] = "INBOX",
) -> EmailMetadataPageResponse:
handler = dispatch_handler(account_name)
return await handler.get_emails_metadata(
page=page,
page_size=page_size,
before=before,
since=since,
subject=subject,
from_address=from_address,
to_address=to_address,
order=order,
mailbox=mailbox,
)
@mcp.tool(
description="Get the full content (including body) of one or more emails by their email_id. Use list_emails_metadata first to get the email_id."
)
async def get_emails_content(
account_name: Annotated[str, Field(description="The name of the email account.")],
email_ids: Annotated[
list[str],
Field(
description="List of email_id to retrieve (obtained from list_emails_metadata). Can be a single email_id or multiple email_ids."
),
],
mailbox: Annotated[str, Field(default="INBOX", description="The mailbox to retrieve emails from.")] = "INBOX",
) -> EmailContentBatchResponse:
handler = dispatch_handler(account_name)
return await handler.get_emails_content(email_ids, mailbox)
@mcp.tool(
description="Send an email using the specified account. Recipient should be a list of email addresses. Optionally attach files by providing their absolute paths.",
)
async def send_email(
account_name: Annotated[str, Field(description="The name of the email account to send from.")],
recipients: Annotated[list[str], Field(description="A list of recipient email addresses.")],
subject: Annotated[str, Field(description="The subject of the email.")],
body: Annotated[str, Field(description="The body of the email.")],
cc: Annotated[
list[str] | None,
Field(default=None, description="A list of CC email addresses."),
] = None,
bcc: Annotated[
list[str] | None,
Field(default=None, description="A list of BCC email addresses."),
] = None,
html: Annotated[
bool,
Field(default=False, description="Whether to send the email as HTML (True) or plain text (False)."),
] = False,
attachments: Annotated[
list[str] | None,
Field(
default=None,
description="A list of absolute file paths to attach to the email. Supports common file types (documents, images, archives, etc.).",
),
] = None,
) -> str:
handler = dispatch_handler(account_name)
await handler.send_email(recipients, subject, body, cc, bcc, html, attachments)
recipient_str = ", ".join(recipients)
attachment_info = f" with {len(attachments)} attachment(s)" if attachments else ""
return f"Email sent successfully to {recipient_str}{attachment_info}"
@mcp.tool(
description="Delete one or more emails by their email_id. Use list_emails_metadata first to get the email_id."
)
async def delete_emails(
account_name: Annotated[str, Field(description="The name of the email account.")],
email_ids: Annotated[
list[str],
Field(description="List of email_id to delete (obtained from list_emails_metadata)."),
],
mailbox: Annotated[str, Field(default="INBOX", description="The mailbox to delete emails from.")] = "INBOX",
) -> str:
handler = dispatch_handler(account_name)
deleted_ids, failed_ids = await handler.delete_emails(email_ids, mailbox)
result = f"Successfully deleted {len(deleted_ids)} email(s)"
if failed_ids:
result += f", failed to delete {len(failed_ids)} email(s): {', '.join(failed_ids)}"
return result
@mcp.tool(
description="Download an email attachment and save it to the specified path. This feature must be explicitly enabled in settings (enable_attachment_download=true) due to security considerations.",
)
async def download_attachment(
account_name: Annotated[str, Field(description="The name of the email account.")],
email_id: Annotated[
str, Field(description="The email ID (obtained from list_emails_metadata or get_emails_content).")
],
attachment_name: Annotated[
str, Field(description="The name of the attachment to download (as shown in the attachments list).")
],
save_path: Annotated[str, Field(description="The absolute path where the attachment should be saved.")],
) -> AttachmentDownloadResponse:
settings = get_settings()
if not settings.enable_attachment_download:
msg = (
"Attachment download is disabled. Set 'enable_attachment_download=true' in settings to enable this feature."
)
raise PermissionError(msg)
handler = dispatch_handler(account_name)
return await handler.download_attachment(email_id, attachment_name, save_path)