import base64
import json
import logging
from email.message import EmailMessage
from email.utils import parseaddr
from typing import Any

from mcp.server.fastmcp import FastMCP
from mcp.shared.exceptions import McpError
from mcp.types import INTERNAL_ERROR, ErrorData, TextContent

from .gmail_oauth import gmail_service
# Server instance for this module; the functions decorated with @mcp.tool() are registered on it.
mcp = FastMCP(name="mcp-server-gmail")
def get_email_data_helper(service: Any, message_id: str) -> dict:
    """Fetch one message and summarise its sender, subject and snippet.

    Args:
        service: Gmail API client (the object returned by ``gmail_service()``).
            Only ``users().messages().get(...).execute()`` is called on it.
        message_id: Id of the message to fetch for the authenticated user.

    Returns:
        A dict with the keys ``sender``, ``subject``, ``body_snippet`` and
        ``email_id``. A missing ``From`` header yields ``"Unknown"``, a missing
        ``Subject`` header yields ``"(No Subject)"`` and a missing snippet
        yields ``""``.
    """
    message = service.users().messages().get(userId="me", id=message_id).execute()

    # Header field names are case-insensitive and are stored with whatever
    # casing the sending system used ("From", "from", "FROM", ...), so index
    # them by their lower-cased name. A later duplicate overrides an earlier
    # one, exactly as a plain dict comprehension over the list would.
    raw_headers = message.get("payload", {}).get("headers", [])
    headers = {h["name"].lower(): h["value"] for h in raw_headers}

    return {
        "sender": headers.get("from", "Unknown"),
        "subject": headers.get("subject", "(No Subject)"),
        "body_snippet": message.get("snippet", ""),
        "email_id": message_id,
    }
@mcp.tool()
def get_unread_emails() -> list[TextContent]:
    """Get a list of unread emails"""
    # The docstring above is left untouched on purpose: FastMCP publishes a
    # tool's docstring as its description, so rewording it would change what
    # clients see.
    #
    # Returns a single TextContent whose text is a JSON array; each element is
    # the dict produced by get_email_data_helper() for one message carrying
    # both the INBOX and UNREAD labels. An empty inbox yields "[]".
    #
    # Raises McpError (code INTERNAL_ERROR) wrapping any failure while talking
    # to Gmail or serialising the result.
    try:
        service = gmail_service()
        # NOTE: only the first page of results is read; `nextPageToken` is not
        # followed, so a very large backlog is truncated to the API page size.
        results = (
            service.users()
            .messages()
            .list(userId="me", labelIds=["INBOX", "UNREAD"])
            .execute()
        )
        messages = results.get("messages", [])
        if not messages:
            logging.info("No unread messages found.")
            # Keep the output machine-readable: an empty JSON array, not prose.
            return [TextContent(type="text", text="[]")]

        email_data_list = [
            get_email_data_helper(service, message["id"]) for message in messages
        ]
        return [TextContent(type="text", text=json.dumps(email_data_list, indent=4))]
    except Exception as e:
        error_text = f"Error getting unread emails: {e}"
        logging.exception(error_text)
        # McpError wraps an ErrorData object; handing it a bare string fails
        # inside its constructor on current SDK releases and hides the real error.
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=error_text)) from e
@mcp.tool()
def create_draft_reply(message_content: str, message_id: str) -> list[TextContent]:
    """Create a draft reply to an email"""
    # The docstring above is left untouched on purpose: FastMCP publishes a
    # tool's docstring as its description, so rewording it would change what
    # clients see.
    #
    # message_content: plain-text body of the reply.
    # message_id:      Gmail id of the message being replied to.
    #
    # Returns a single TextContent reporting the id and message of the created
    # draft. Raises McpError (code INTERNAL_ERROR) wrapping any failure.
    try:
        service = gmail_service()

        original_message = (
            service.users()
            .messages()
            .get(userId="me", id=message_id, format="full")
            .execute()
        )

        # Header field names are case-insensitive ("Message-ID" vs "Message-Id"),
        # so index them by lower-cased name.
        headers = {
            h["name"].lower(): h["value"]
            for h in original_message["payload"]["headers"]
        }

        # The reply is sent from the authenticated account's own address.
        profile = service.users().getProfile(userId="me").execute()
        user_email = profile["emailAddress"]

        # A reply goes to Reply-To when the sender supplied a usable one,
        # otherwise to From.
        recipient = ""
        for candidate in (headers.get("reply-to", ""), headers.get("from", "")):
            _, recipient = parseaddr(candidate)
            if recipient:
                break

        # Prefix "Re: " unless some casing of it is already there ("RE:", "re:").
        reply_subject = headers.get("subject", "")
        if not reply_subject.lower().startswith("re:"):
            reply_subject = f"Re: {reply_subject}"

        reply_message = EmailMessage()
        reply_message.set_content(message_content)
        reply_message["To"] = recipient
        reply_message["From"] = user_email
        reply_message["Subject"] = reply_subject

        # Threading headers: In-Reply-To names the parent message, References
        # carries the parent's own chain followed by the parent's id.
        original_message_id = headers.get("message-id", "").strip()
        if original_message_id:
            reply_message["In-Reply-To"] = original_message_id
            # split()/join collapses any folding whitespace, which a header
            # value must not contain when it is assigned.
            reference_ids = [*headers.get("references", "").split(), original_message_id]
            reply_message["References"] = " ".join(reference_ids)

        # With the default 78-column limit a long message id in an
        # unstructured header gets wrapped as an RFC 2047 encoded word, which
        # would break thread matching. 998 is the hard RFC 5322 line limit.
        wide_policy = reply_message.policy.clone(max_line_length=998)
        encoded_message = base64.urlsafe_b64encode(
            reply_message.as_bytes(policy=wide_policy)
        ).decode()

        draft_message = {"raw": encoded_message}
        # Attach the draft to the original conversation when its id is known.
        thread_id = original_message.get("threadId")
        if thread_id:
            draft_message["threadId"] = thread_id

        # pylint: disable=E1101
        draft = (
            service.users()
            .drafts()
            .create(userId="me", body={"message": draft_message})
            .execute()
        )
        return [
            TextContent(
                type="text",
                text=f'Draft id: {draft["id"]}\nDraft message: {draft["message"]}',
            )
        ]
    except Exception as e:
        error_text = f"Error creating draft reply: {e}"
        logging.exception(error_text)
        # McpError wraps an ErrorData object; handing it a bare string fails
        # inside its constructor on current SDK releases and hides the real error.
        raise McpError(ErrorData(code=INTERNAL_ERROR, message=error_text)) from e