Gmail MCP Server
by jasonsum
- src
- gmail
from typing import Any
import argparse
import os
import asyncio
import logging
import base64
from email.message import EmailMessage
from email.header import decode_header
from base64 import urlsafe_b64decode
from email import message_from_bytes
import webbrowser
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# Module-level logger, named after this module. The root logger is configured
# to emit records at INFO severity and above.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# Prompt text returned for the "manage-email" prompt. The identifiers in
# parentheses are tool names the model is expected to call, so each one must
# match a tool name handled by the server exactly.
EMAIL_ADMIN_PROMPTS = """You are an email administrator.
You can draft, edit, read, trash, open, and send emails.
You've been given access to a specific gmail account.
You have the following tools available:
- Send an email (send-email)
- Retrieve unread emails (get-unread-emails)
- Read email content (read-email)
- Trash email (trash-email)
- Mark email as read (mark-email-as-read)
- Open email in browser (open-email)
Never send an email draft or trash an email unless the user confirms first.
Always ask for approval if not already given.
"""
# Prompts advertised to MCP clients, keyed by prompt name. Each key matches the
# `name` of its Prompt; the descriptions are shown to clients, so spelling
# matters ("administrator", "content").
PROMPTS = {
    "manage-email": types.Prompt(
        name="manage-email",
        description="Act like an email administrator",
        arguments=None,
    ),
    "draft-email": types.Prompt(
        name="draft-email",
        description="Draft an email with content and recipient",
        arguments=[
            types.PromptArgument(
                name="content",
                description="What the email is about",
                required=True
            ),
            types.PromptArgument(
                name="recipient",
                description="Who should the email be addressed to",
                required=True
            ),
            types.PromptArgument(
                name="recipient_email",
                description="Recipient's email address",
                required=True
            ),
        ],
    ),
    "edit-draft": types.Prompt(
        name="edit-draft",
        description="Edit the existing email draft",
        arguments=[
            types.PromptArgument(
                name="changes",
                description="What changes should be made to the draft",
                required=True
            ),
            types.PromptArgument(
                name="current_draft",
                description="The current draft to edit",
                required=True
            ),
        ],
    ),
}
def decode_mime_header(header: str) -> str:
    """Decode an RFC 2047 encoded email header into a plain string.

    Args:
        header: Raw header value, possibly containing encoded words such as
            ``=?utf-8?b?...?=``. An empty or missing value yields ``''``.

    Returns:
        The decoded header text. Bytes that cannot be decoded with the
        declared charset are replaced rather than raising, and an unknown
        charset name falls back to UTF-8.
    """
    if not header:
        return ''

    fragments = []
    for part, encoding in decode_header(header):
        if not isinstance(part, bytes):
            # decode_header returns the value unchanged as str when it
            # contains no encoded words.
            fragments.append(part)
            continue
        try:
            fragments.append(part.decode(encoding or 'utf-8', errors='replace'))
        except LookupError:
            # The declared charset is not a codec Python knows about
            # (e.g. "unknown-8bit"); decode leniently instead of failing.
            fragments.append(part.decode('utf-8', errors='replace'))
    return ''.join(fragments)
class GmailService:
    """Wrapper around the Gmail v1 API for a single authorized account.

    Construction loads (or interactively obtains) OAuth credentials, builds
    the API client and looks up the account's email address. The async
    methods run the blocking API requests in a worker thread so the event
    loop stays responsive; requests are serialized with a lock because they
    all share one API client object.
    """

    # Scope used when the caller does not pass `scopes`.
    _DEFAULT_SCOPES = ('https://www.googleapis.com/auth/gmail.modify',)

    def __init__(self,
                 creds_file_path: str,
                 token_path: str,
                 scopes: list[str] | None = None):
        """Authorize and build the Gmail client.

        Args:
            creds_file_path: Path to the OAuth client secrets file.
            token_path: Path where the user token is read from and saved to.
            scopes: OAuth scopes to request; defaults to gmail.modify.
        """
        logger.info(f"Initializing GmailService with creds file: {creds_file_path}")
        self.creds_file_path = creds_file_path
        self.token_path = token_path
        # Copy into a fresh list so instances never share a mutable default.
        self.scopes = list(scopes) if scopes is not None else list(self._DEFAULT_SCOPES)
        # Serializes API requests issued from worker threads.
        self._api_lock = asyncio.Lock()
        self.token = self._get_token()
        logger.info("Token retrieved successfully")
        self.service = self._get_service()
        logger.info("Gmail service initialized")
        self.user_email = self._get_user_email()
        logger.info(f"User email retrieved: {self.user_email}")

    def _get_token(self) -> Credentials:
        """Get or refresh Google API token.

        Loads the token from `token_path` when the file exists. A valid token
        is returned as-is; an expired one with a refresh token is refreshed;
        otherwise the local-server OAuth flow is run. A refreshed or newly
        obtained token is written back to `token_path`.
        """
        token = None
        if os.path.exists(self.token_path):
            logger.info('Loading token from file')
            token = Credentials.from_authorized_user_file(self.token_path, self.scopes)

        if token and token.valid:
            return token

        if token and token.expired and token.refresh_token:
            logger.info('Refreshing token')
            token.refresh(Request())
        else:
            logger.info('Fetching new token')
            flow = InstalledAppFlow.from_client_secrets_file(self.creds_file_path, self.scopes)
            token = flow.run_local_server(port=0)

        # The file holds a refresh token, so create it readable by the owner
        # only (the mode applies when the file is newly created).
        fd = os.open(self.token_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as token_file:
            token_file.write(token.to_json())
        logger.info(f'Token saved to {self.token_path}')
        return token

    def _get_service(self) -> Any:
        """Initialize Gmail API service.

        Raises:
            ValueError: If building the client fails with an HttpError.
        """
        try:
            return build('gmail', 'v1', credentials=self.token)
        except HttpError as error:
            logger.error(f'An error occurred building Gmail service: {error}')
            raise ValueError(f'An error occurred: {error}') from error

    def _get_user_email(self) -> str:
        """Get user email address ('' if the profile has none)."""
        profile = self.service.users().getProfile(userId='me').execute()
        return profile.get('emailAddress', '')

    async def _execute(self, request: Any) -> Any:
        """Run a prepared API request's blocking `execute()` off the event loop."""
        async with self._api_lock:
            return await asyncio.to_thread(request.execute)

    @staticmethod
    def _decode_part(part: Any) -> str | None:
        """Return a message part's payload as text, or None if it has no payload.

        Uses the charset declared on the part (UTF-8 when absent or unknown)
        and replaces undecodable bytes instead of raising.
        """
        payload = part.get_payload(decode=True)
        if payload is None:
            return None
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='replace')
        except LookupError:
            return payload.decode('utf-8', errors='replace')

    async def send_email(self, recipient_id: str, subject: str, message: str,) -> dict:
        """Creates and sends an email message.

        Returns:
            {"status": "success", "message_id": ...} on success, or
            {"status": "error", "error_message": ...} on an HttpError.
        """
        try:
            message_obj = EmailMessage()
            message_obj.set_content(message)
            message_obj['To'] = recipient_id
            message_obj['From'] = self.user_email
            message_obj['Subject'] = subject
            # The API expects the whole RFC 2822 message base64url-encoded.
            encoded_message = base64.urlsafe_b64encode(message_obj.as_bytes()).decode()
            create_message = {'raw': encoded_message}
            send_message = await self._execute(
                self.service.users().messages().send(userId="me", body=create_message)
            )
            logger.info(f"Message sent: {send_message['id']}")
            return {"status": "success", "message_id": send_message["id"]}
        except HttpError as error:
            return {"status": "error", "error_message": str(error)}

    async def open_email(self, email_id: str) -> str:
        """Opens email in browser given ID.

        Returns a status message; failure to launch a browser is reported
        in the message rather than raised.
        """
        url = f"https://mail.google.com/#all/{email_id}"
        try:
            opened = webbrowser.open(url, new=0, autoraise=True)
        except webbrowser.Error as error:
            return f"Failed to open email in browser: {error}"
        if not opened:
            # webbrowser.open returns False when no browser could be launched.
            return "Failed to open email in browser: no usable browser found."
        return "Email opened in browser successfully."

    async def get_unread_emails(self) -> list[dict[str, str]] | str:
        """
        Retrieves unread messages from mailbox.

        Returns the message entries from every result page (each has the
        message ID in key 'id'), or an error string on an HttpError.
        """
        try:
            list_kwargs = {'userId': 'me', 'q': 'in:inbox is:unread category:primary'}
            messages = []
            while True:
                response = await self._execute(
                    self.service.users().messages().list(**list_kwargs)
                )
                # A page may carry no 'messages' key at all.
                messages.extend(response.get('messages', []))
                page_token = response.get('nextPageToken')
                if not page_token:
                    return messages
                list_kwargs['pageToken'] = page_token
        except HttpError as error:
            return f"An HttpError occurred: {str(error)}"

    async def read_email(self, email_id: str) -> dict[str, str] | str:
        """Retrieves email contents including to, from, subject, and contents.

        The message is marked as read afterwards. 'content' is the first
        text/plain part of a multipart message (None if there is none), or
        the decoded payload of a single-part message.

        Returns:
            Dict with keys 'content', 'subject', 'from', 'to', 'date', or an
            error string on an HttpError.
        """
        try:
            msg = await self._execute(
                self.service.users().messages().get(userId="me", id=email_id, format='raw')
            )
            # 'raw' is the base64url-encoded RFC 2822 message.
            mime_message = message_from_bytes(urlsafe_b64decode(msg['raw']))

            body = None
            if mime_message.is_multipart():
                for part in mime_message.walk():
                    if part.get_content_type() == "text/plain":
                        body = self._decode_part(part)
                        break
            else:
                body = self._decode_part(mime_message)

            email_metadata = {
                'content': body,
                'subject': decode_mime_header(mime_message.get('subject', '')),
                'from': mime_message.get('from', ''),
                'to': mime_message.get('to', ''),
                'date': mime_message.get('date', ''),
            }
            logger.info(f"Email read: {email_id}")
            # We want to mark email as read once we read it
            await self.mark_email_as_read(email_id)
            return email_metadata
        except HttpError as error:
            return f"An HttpError occurred: {str(error)}"

    async def trash_email(self, email_id: str) -> str:
        """Moves email to trash given ID; returns a status or error message."""
        try:
            await self._execute(
                self.service.users().messages().trash(userId="me", id=email_id)
            )
            logger.info(f"Email moved to trash: {email_id}")
            return "Email moved to trash successfully."
        except HttpError as error:
            return f"An HttpError occurred: {str(error)}"

    async def mark_email_as_read(self, email_id: str) -> str:
        """Marks email as read given ID by removing its UNREAD label."""
        try:
            await self._execute(
                self.service.users().messages().modify(
                    userId="me", id=email_id, body={'removeLabelIds': ['UNREAD']}
                )
            )
            logger.info(f"Email marked as read: {email_id}")
            return "Email marked as read."
        except HttpError as error:
            return f"An HttpError occurred: {str(error)}"
async def main(creds_file_path: str,
               token_path: str):
    """Run the Gmail MCP server over stdio.

    Builds a GmailService from the given credential and token paths,
    registers the prompt and tool handlers on an MCP server named "gmail",
    and serves until the stdio streams close.

    Args:
        creds_file_path: Path to the OAuth client secrets file.
        token_path: Path where the user token is stored and retrieved.
    """
    gmail_service = GmailService(creds_file_path, token_path)
    server = Server("gmail")

    def _user_prompt(text: str) -> types.GetPromptResult:
        """Wrap `text` as a single user-role prompt message."""
        return types.GetPromptResult(
            messages=[
                types.PromptMessage(
                    role="user",
                    content=types.TextContent(type="text", text=text),
                )
            ]
        )

    def _require_email_id(arguments: dict) -> str:
        """Return the 'email_id' argument or raise ValueError if it is missing."""
        email_id = arguments.get("email_id")
        if not email_id:
            raise ValueError("Missing email ID parameter")
        return email_id

    @server.list_prompts()
    async def list_prompts() -> list[types.Prompt]:
        return list(PROMPTS.values())

    @server.get_prompt()
    async def get_prompt(
        name: str, arguments: dict[str, str] | None = None
    ) -> types.GetPromptResult:
        if name not in PROMPTS:
            raise ValueError(f"Prompt not found: {name}")
        # Clients may omit arguments entirely; treat that as "none given".
        arguments = arguments or {}

        if name == "manage-email":
            return _user_prompt(EMAIL_ADMIN_PROMPTS)

        if name == "draft-email":
            content = arguments.get("content", "")
            recipient = arguments.get("recipient", "")
            recipient_email = arguments.get("recipient_email", "")
            # Asks the LLM to create the draft without sending it.
            return _user_prompt(
                f"""Please draft an email about {content} for {recipient} ({recipient_email}).
Include a subject line starting with 'Subject:' on the first line.
Do not send the email yet, just draft it and ask the user for their thoughts."""
            )

        if name == "edit-draft":
            changes = arguments.get("changes", "")
            current_draft = arguments.get("current_draft", "")
            # Edit existing draft based on requested changes
            return _user_prompt(
                f"""Please revise the current email draft:
{current_draft}
Requested changes:
{changes}
Please provide the updated draft."""
            )

        raise ValueError("Prompt implementation not found")

    @server.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
        # Schema shared by every tool that takes just a message ID.
        email_id_schema = {
            "type": "object",
            "properties": {
                "email_id": {
                    "type": "string",
                    "description": "Email ID",
                },
            },
            "required": ["email_id"],
        }
        return [
            types.Tool(
                name="send-email",
                description="""Sends email to recipient.
Do not use if user only asked to draft email.
Drafts must be approved before sending.""",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "recipient_id": {
                            "type": "string",
                            "description": "Recipient email address",
                        },
                        "subject": {
                            "type": "string",
                            "description": "Email subject",
                        },
                        "message": {
                            "type": "string",
                            "description": "Email content text",
                        },
                    },
                    "required": ["recipient_id", "subject", "message"],
                },
            ),
            types.Tool(
                name="trash-email",
                description="""Moves email to trash.
Confirm before moving email to trash.""",
                inputSchema=dict(email_id_schema),
            ),
            types.Tool(
                name="get-unread-emails",
                description="Retrieve unread emails",
                inputSchema={
                    "type": "object",
                    "properties": {},
                    "required": []
                },
            ),
            types.Tool(
                name="read-email",
                description="Retrieves given email content",
                inputSchema=dict(email_id_schema),
            ),
            types.Tool(
                name="mark-email-as-read",
                description="Marks given email as read",
                inputSchema=dict(email_id_schema),
            ),
            types.Tool(
                name="open-email",
                description="Open email in browser",
                inputSchema=dict(email_id_schema),
            ),
        ]

    @server.call_tool()
    async def handle_call_tool(
        name: str, arguments: dict | None
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        # `arguments` is allowed to be None; normalize so .get() is safe and
        # missing parameters surface as the ValueErrors below.
        arguments = arguments or {}

        if name == "send-email":
            recipient = arguments.get("recipient_id")
            if not recipient:
                raise ValueError("Missing recipient parameter")
            subject = arguments.get("subject")
            if not subject:
                raise ValueError("Missing subject parameter")
            message = arguments.get("message")
            if not message:
                raise ValueError("Missing message parameter")

            # A leading "Subject:" line in the message (the format requested
            # by the draft-email prompt) overrides the subject argument and
            # is removed from the body.
            email_lines = message.split('\n')
            if email_lines[0].startswith('Subject:'):
                subject = email_lines[0][8:].strip()
                message_content = '\n'.join(email_lines[1:]).strip()
            else:
                message_content = message

            send_response = await gmail_service.send_email(recipient, subject, message_content)
            if send_response["status"] == "success":
                response_text = f"Email sent successfully. Message ID: {send_response['message_id']}"
            else:
                response_text = f"Failed to send email: {send_response['error_message']}"
            return [types.TextContent(type="text", text=response_text)]

        if name == "get-unread-emails":
            unread_emails = await gmail_service.get_unread_emails()
            return [types.TextContent(type="text", text=str(unread_emails),
                                      artifact={"type": "json", "data": unread_emails})]

        if name == "read-email":
            email_id = _require_email_id(arguments)
            retrieved_email = await gmail_service.read_email(email_id)
            return [types.TextContent(type="text", text=str(retrieved_email),
                                      artifact={"type": "dictionary", "data": retrieved_email})]

        if name == "open-email":
            msg = await gmail_service.open_email(_require_email_id(arguments))
            return [types.TextContent(type="text", text=str(msg))]

        if name == "trash-email":
            msg = await gmail_service.trash_email(_require_email_id(arguments))
            return [types.TextContent(type="text", text=str(msg))]

        if name == "mark-email-as-read":
            msg = await gmail_service.mark_email_as_read(_require_email_id(arguments))
            return [types.TextContent(type="text", text=str(msg))]

        logger.error(f"Unknown tool: {name}")
        raise ValueError(f"Unknown tool: {name}")

    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="gmail",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )
# Script entry point: both paths are mandatory command-line options.
if __name__ == "__main__":
    cli = argparse.ArgumentParser(description='Gmail API MCP Server')
    cli.add_argument(
        '--creds-file-path',
        required=True,
        help='OAuth 2.0 credentials file path',
    )
    cli.add_argument(
        '--token-path',
        required=True,
        help='File location to store and retrieve access and refresh tokens for application',
    )
    options = cli.parse_args()
    server_coroutine = main(options.creds_file_path, options.token_path)
    asyncio.run(server_coroutine)