# gmail_mcp_server.py (21.8 kB)
#!/usr/bin/env python3
"""
Gmail MCP Server
This server provides MCP tools for interacting with Gmail API.
It supports reading emails, sending emails, and searching through Gmail.
"""
import json
import os
import pickle
from typing import Any, Dict, List, Optional
from datetime import datetime
import mcp.types as types
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# OAuth scopes requested during the consent flow in _authenticate_gmail.
# Changing this list invalidates any previously stored token: delete the
# token file so the consent flow runs again with the new scopes.
SCOPES = [
    'https://www.googleapis.com/auth/gmail.readonly',
    'https://www.googleapis.com/auth/gmail.send',
    'https://www.googleapis.com/auth/gmail.modify',
]
class GmailMCPServer:
    """MCP server that exposes a small set of Gmail operations as tools.

    Registered tools: ``authenticate_gmail``, ``list_emails``, ``read_email``,
    ``send_email``, ``search_emails``, ``mark_as_read`` and ``mark_as_unread``.
    Every tool except ``authenticate_gmail`` needs ``self.service`` to have
    been set by a successful ``authenticate_gmail`` call; until then it
    replies with a "please authenticate" message instead of calling the API.
    """

    # Reply returned by any tool invoked before authentication succeeded.
    _NOT_AUTHENTICATED = "Please authenticate first using authenticate_gmail tool"
    # Headers requested from the API when building list/search summaries.
    _SUMMARY_HEADERS = ['From', 'To', 'Subject', 'Date']
    # Value used when the caller does not supply max_results.
    _DEFAULT_MAX_RESULTS = 10
    # Upper bound applied to max_results before calling users.messages.list
    # (the documented maximum for maxResults in the Gmail API reference).
    _MAX_RESULTS_LIMIT = 500

    def __init__(self):
        """Create the MCP server object and register the tool handlers."""
        self.server = Server("gmail-mcp-server")
        # Gmail API client; stays None until _authenticate_gmail succeeds.
        self.service = None
        # Both paths are relative, so they resolve against the process's
        # current working directory.
        self.credentials_file = "gcp-oauth.keys.json"
        self.token_file = "token.pickle"
        self._register_tools()

    def _register_tools(self):
        """Register the list-tools and call-tool handlers on ``self.server``."""

        @self.server.list_tools()
        async def handle_list_tools() -> List[types.Tool]:
            """Return the static catalogue of tools this server offers."""
            return [
                types.Tool(
                    name="authenticate_gmail",
                    description="Authenticate with Gmail API using OAuth2",
                    inputSchema={
                        "type": "object",
                        "properties": {},
                        "required": [],
                    },
                ),
                types.Tool(
                    name="list_emails",
                    description="List emails from Gmail inbox",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "max_results": {
                                "type": "integer",
                                "description": "Maximum number of emails to return (default: 10)",
                                "default": 10,
                            },
                            "query": {
                                "type": "string",
                                "description": "Gmail search query (e.g., 'is:unread', 'from:example@gmail.com')",
                            },
                        },
                        "required": [],
                    },
                ),
                types.Tool(
                    name="read_email",
                    description="Read a specific email by ID",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "email_id": {
                                "type": "string",
                                "description": "The ID of the email to read",
                            },
                        },
                        "required": ["email_id"],
                    },
                ),
                types.Tool(
                    name="send_email",
                    description="Send an email through Gmail",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "to": {
                                "type": "string",
                                "description": "Recipient email address",
                            },
                            "subject": {
                                "type": "string",
                                "description": "Email subject",
                            },
                            "body": {
                                "type": "string",
                                "description": "Email body (plain text)",
                            },
                            "cc": {
                                "type": "string",
                                "description": "CC email addresses (comma-separated)",
                            },
                            "bcc": {
                                "type": "string",
                                "description": "BCC email addresses (comma-separated)",
                            },
                        },
                        "required": ["to", "subject", "body"],
                    },
                ),
                types.Tool(
                    name="search_emails",
                    description="Search emails using Gmail search syntax",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "Gmail search query (e.g., 'is:unread', 'from:example@gmail.com', 'subject:important')",
                            },
                            "max_results": {
                                "type": "integer",
                                "description": "Maximum number of results to return (default: 10)",
                                "default": 10,
                            },
                        },
                        "required": ["query"],
                    },
                ),
                types.Tool(
                    name="mark_as_read",
                    description="Mark an email as read",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "email_id": {
                                "type": "string",
                                "description": "The ID of the email to mark as read",
                            },
                        },
                        "required": ["email_id"],
                    },
                ),
                types.Tool(
                    name="mark_as_unread",
                    description="Mark an email as unread",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "email_id": {
                                "type": "string",
                                "description": "The ID of the email to mark as unread",
                            },
                        },
                        "required": ["email_id"],
                    },
                ),
            ]

        @self.server.call_tool()
        async def handle_call_tool(
            name: str, arguments: Optional[Dict[str, Any]]
        ) -> List[types.TextContent]:
            """Dispatch a tool call by name and turn any failure into text.

            Exceptions are reported back to the caller as a text reply rather
            than propagated, so one failing tool call does not end the session.
            """
            handlers = {
                "list_emails": self._list_emails,
                "read_email": self._read_email,
                "send_email": self._send_email,
                "search_emails": self._search_emails,
                "mark_as_read": self._mark_as_read,
                "mark_as_unread": self._mark_as_unread,
            }
            try:
                if name == "authenticate_gmail":
                    return await self._authenticate_gmail()
                handler = handlers.get(name)
                if handler is None:
                    return self._text(f"Unknown tool: {name}")
                # A client may omit arguments entirely; handlers expect a dict.
                return await handler(arguments or {})
            except Exception as e:
                return self._text(f"Error executing {name}: {str(e)}")

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _text(message: str) -> List[types.TextContent]:
        """Wrap ``message`` in the single-item text reply every tool returns."""
        return [types.TextContent(type="text", text=message)]

    @staticmethod
    def _required(arguments: Dict[str, Any], key: str, allow_empty: bool = False) -> Any:
        """Return ``arguments[key]`` or raise a descriptive ``ValueError``.

        Args:
            arguments: Tool arguments supplied by the caller.
            key: Name of the argument that must be present.
            allow_empty: When False, a blank/whitespace-only string is rejected.

        Raises:
            ValueError: If the argument is missing, None, or (unless
                ``allow_empty``) a blank string.
        """
        value = arguments.get(key)
        if value is None:
            raise ValueError(f"Missing required argument: {key}")
        if not allow_empty and isinstance(value, str) and not value.strip():
            raise ValueError(f"Argument must not be empty: {key}")
        return value

    @classmethod
    def _clamp_max_results(cls, value: Any) -> int:
        """Coerce ``max_results`` to an int within ``[1, _MAX_RESULTS_LIMIT]``.

        ``None`` (argument omitted) yields ``_DEFAULT_MAX_RESULTS``.

        Raises:
            ValueError: If ``value`` cannot be converted to an integer.
        """
        if value is None:
            return cls._DEFAULT_MAX_RESULTS
        try:
            requested = int(value)
        except (TypeError, ValueError):
            raise ValueError(f"max_results must be an integer, got {value!r}") from None
        return max(1, min(requested, cls._MAX_RESULTS_LIMIT))

    @staticmethod
    def _header_map(payload: Dict[str, Any], wanted) -> Dict[str, str]:
        """Collect the headers named in ``wanted`` from a message payload.

        Keys in the result are lower-cased header names. If a header occurs
        more than once, the last occurrence wins.
        """
        found = {}
        for header in payload.get('headers', []):
            key = header['name'].lower()
            if key in wanted:
                found[key] = header['value']
        return found

    @staticmethod
    def _format_summary(summary: Dict[str, str], include_to: bool) -> str:
        """Render one message summary as the text block used in listings."""
        lines = [f"ID: {summary['id']}", f"From: {summary['from']}"]
        if include_to:
            lines.append(f"To: {summary['to']}")
        lines += [
            f"Subject: {summary['subject']}",
            f"Date: {summary['date']}",
            f"Snippet: {summary['snippet']}",
            "-" * 50,
        ]
        return "\n".join(lines) + "\n"

    @staticmethod
    def _format_email_details(headers: Dict[str, str], body: str) -> str:
        """Render the full-message view returned by ``read_email``."""
        lines = [
            "Email Details:",
            "",
            f"From: {headers.get('from', 'Unknown')}",
            f"To: {headers.get('to', 'Unknown')}",
        ]
        # CC/BCC lines are only shown when the header is present and non-empty.
        if headers.get('cc'):
            lines.append(f"CC: {headers['cc']}")
        if headers.get('bcc'):
            lines.append(f"BCC: {headers['bcc']}")
        lines += [
            f"Subject: {headers.get('subject', 'No Subject')}",
            f"Date: {headers.get('date', 'Unknown')}",
            "",
            "Body:",
            body,
        ]
        return "\n".join(lines) + "\n"

    def _fetch_summaries(self, query: str, max_results: int) -> List[Dict[str, str]]:
        """List messages matching ``query`` and fetch metadata for each one.

        Issues one ``messages.list`` call followed by one ``messages.get``
        (``format='metadata'``) call per returned message.

        Returns:
            One dict per message with keys ``id``, ``from``, ``to``,
            ``subject``, ``date`` and ``snippet``; empty list when nothing
            matches.
        """
        messages_api = self.service.users().messages()
        listing = messages_api.list(
            userId='me',
            maxResults=max_results,
            q=query,
        ).execute()

        summaries = []
        for ref in listing.get('messages', []):
            msg = messages_api.get(
                userId='me',
                id=ref['id'],
                format='metadata',
                metadataHeaders=self._SUMMARY_HEADERS,
            ).execute()
            headers = self._header_map(
                msg.get('payload', {}), ('from', 'to', 'subject', 'date')
            )
            summaries.append({
                'id': ref['id'],
                'from': headers.get('from', 'Unknown'),
                'to': headers.get('to', 'Unknown'),
                'subject': headers.get('subject', 'No Subject'),
                'date': headers.get('date', 'Unknown'),
                'snippet': msg.get('snippet', ''),
            })
        return summaries

    # ------------------------------------------------------------------
    # Tool implementations
    # ------------------------------------------------------------------

    async def _authenticate_gmail(self) -> List[types.TextContent]:
        """Obtain Gmail credentials and build ``self.service``.

        Reuses the stored token when it is still valid, refreshes it when it
        has expired and a refresh token is available, and otherwise runs the
        local-server OAuth consent flow. New or refreshed credentials are
        written back to ``self.token_file``.

        NOTE(review): ``creds.refresh`` and ``flow.run_local_server`` are
        called synchronously inside this coroutine, so other tool calls will
        presumably wait while they run.

        Returns:
            A text reply stating success, or "Authentication failed: ..." with
            the underlying error message.
        """
        try:
            creds = None
            if os.path.exists(self.token_file):
                # SECURITY: unpickling executes code embedded in the file.
                # Only safe while the token file is writable by this user
                # alone; consider a JSON token format instead.
                with open(self.token_file, 'rb') as token:
                    creds = pickle.load(token)

            if not creds or not creds.valid:
                if creds and creds.expired and creds.refresh_token:
                    creds.refresh(Request())
                else:
                    if not os.path.exists(self.credentials_file):
                        # Fail with an actionable message instead of a raw
                        # FileNotFoundError from the OAuth library.
                        return self._text(
                            "Authentication failed: OAuth client secrets file "
                            f"not found: {self.credentials_file}"
                        )
                    flow = InstalledAppFlow.from_client_secrets_file(
                        self.credentials_file, SCOPES)
                    creds = flow.run_local_server(port=0)
                # Persist so the next run can skip the interactive flow.
                with open(self.token_file, 'wb') as token:
                    pickle.dump(creds, token)

            self.service = build('gmail', 'v1', credentials=creds)
            return self._text("Successfully authenticated with Gmail API!")
        except Exception as e:
            return self._text(f"Authentication failed: {str(e)}")

    async def _list_emails(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """List message summaries, optionally filtered by a Gmail query.

        Args:
            arguments: May contain ``max_results`` (int, default 10, clamped
                to 1..500) and ``query`` (Gmail search string, default empty).

        NOTE(review): no label filter is passed to the API, so despite the
        "Gmail Inbox" heading the result is whatever ``messages.list`` returns
        for the query -- confirm whether an INBOX restriction is intended.
        """
        if not self.service:
            return self._text(self._NOT_AUTHENTICATED)

        max_results = self._clamp_max_results(arguments.get('max_results'))
        query = arguments.get('query') or ''
        try:
            summaries = self._fetch_summaries(query, max_results)
        except HttpError as error:
            return self._text(f"Gmail API error: {error}")

        if not summaries:
            return self._text("No emails found.")
        blocks = (self._format_summary(s, include_to=True) for s in summaries)
        return self._text("Gmail Inbox:\n\n" + "".join(blocks))

    async def _read_email(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Fetch one message in full and return its headers and text body.

        Args:
            arguments: Must contain ``email_id``.

        Raises:
            ValueError: If ``email_id`` is missing or blank.
        """
        if not self.service:
            return self._text(self._NOT_AUTHENTICATED)

        email_id = self._required(arguments, 'email_id')
        try:
            message = self.service.users().messages().get(
                userId='me',
                id=email_id,
                format='full',
            ).execute()
        except HttpError as error:
            return self._text(f"Gmail API error: {error}")

        payload = message.get('payload', {})
        headers = self._header_map(
            payload, ('from', 'to', 'subject', 'date', 'cc', 'bcc')
        )
        body = self._extract_email_body(payload)
        return self._text(self._format_email_details(headers, body))

    async def _send_email(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Send a plain-text email.

        Args:
            arguments: Must contain ``to``, ``subject`` and ``body``; may
                contain ``cc`` and ``bcc`` (comma-separated address strings).

        Raises:
            ValueError: If a required argument is missing, or ``to`` is blank.
        """
        if not self.service:
            return self._text(self._NOT_AUTHENTICATED)

        to = self._required(arguments, 'to')
        # An empty subject or body is a legitimate email, so only presence
        # is enforced for these two.
        subject = self._required(arguments, 'subject', allow_empty=True)
        body = self._required(arguments, 'body', allow_empty=True)
        cc = arguments.get('cc') or ''
        bcc = arguments.get('bcc') or ''

        message = self._create_message(to, subject, body, cc, bcc)
        try:
            sent_message = self.service.users().messages().send(
                userId='me',
                body=message,
            ).execute()
        except HttpError as error:
            return self._text(f"Gmail API error: {error}")
        return self._text(
            f"Email sent successfully! Message ID: {sent_message['id']}"
        )

    async def _search_emails(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Search messages with Gmail query syntax and return summaries.

        Args:
            arguments: Must contain ``query``; may contain ``max_results``
                (int, default 10, clamped to 1..500).

        Raises:
            ValueError: If ``query`` is missing.
        """
        if not self.service:
            return self._text(self._NOT_AUTHENTICATED)

        query = self._required(arguments, 'query', allow_empty=True)
        max_results = self._clamp_max_results(arguments.get('max_results'))
        try:
            summaries = self._fetch_summaries(query, max_results)
        except HttpError as error:
            return self._text(f"Gmail API error: {error}")

        if not summaries:
            return self._text(f"No emails found matching query: {query}")
        # Search output intentionally omits the "To:" line (unlike list_emails).
        blocks = (self._format_summary(s, include_to=False) for s in summaries)
        return self._text(f"Search Results for '{query}':\n\n" + "".join(blocks))

    async def _set_unread(self, arguments: Dict[str, Any], unread: bool) -> List[types.TextContent]:
        """Add or remove the ``UNREAD`` label on one message.

        Shared implementation behind ``mark_as_read`` / ``mark_as_unread``.
        """
        if not self.service:
            return self._text(self._NOT_AUTHENTICATED)

        email_id = self._required(arguments, 'email_id')
        if unread:
            change = {'addLabelIds': ['UNREAD']}
        else:
            change = {'removeLabelIds': ['UNREAD']}
        try:
            self.service.users().messages().modify(
                userId='me',
                id=email_id,
                body=change,
            ).execute()
        except HttpError as error:
            return self._text(f"Gmail API error: {error}")

        state = "unread" if unread else "read"
        return self._text(f"Email {email_id} marked as {state}")

    async def _mark_as_read(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Mark the message given by ``arguments['email_id']`` as read."""
        return await self._set_unread(arguments, unread=False)

    async def _mark_as_unread(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Mark the message given by ``arguments['email_id']`` as unread."""
        return await self._set_unread(arguments, unread=True)

    # ------------------------------------------------------------------
    # Message encoding / decoding
    # ------------------------------------------------------------------

    def _extract_email_body(self, payload: Dict[str, Any]) -> str:
        """Return the text body of a message payload, or ``""`` if none.

        Walks the payload and any nested ``parts`` depth-first. The first
        non-empty ``text/plain`` part is preferred; if there is none, the
        first non-empty ``text/html`` part is returned as-is (HTML markup is
        not stripped).

        Args:
            payload: The ``payload`` dict of a message fetched with
                ``format='full'``.
        """
        import base64

        def decode(part: Dict[str, Any]) -> str:
            data = (part.get('body') or {}).get('data')
            if not data:
                return ""
            # Tolerate base64url data whose '=' padding was stripped.
            padded = data + "=" * (-len(data) % 4)
            # NOTE(review): the part's declared charset is not consulted;
            # undecodable bytes are replaced rather than raising.
            return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')

        def find(part: Dict[str, Any], mime_type: str) -> str:
            if part.get('mimeType') == mime_type:
                text = decode(part)
                if text:
                    return text
            # Multipart containers nest their children under 'parts'.
            for child in part.get('parts') or []:
                text = find(child, mime_type)
                if text:
                    return text
            return ""

        return find(payload, 'text/plain') or find(payload, 'text/html')

    def _create_message(self, to: str, subject: str, body: str, cc: str = '', bcc: str = '') -> Dict[str, Any]:
        """Build the request body for ``users.messages.send``.

        Args:
            to: Recipient address.
            subject: Subject line.
            body: Plain-text message body.
            cc: Optional CC addresses; header omitted when empty.
            bcc: Optional BCC addresses; header omitted when empty.

        Returns:
            ``{'raw': <base64url-encoded RFC 822 message>}``.
        """
        import base64
        from email.mime.text import MIMEText

        message = MIMEText(body)
        message['to'] = to
        message['subject'] = subject
        if cc:
            message['cc'] = cc
        if bcc:
            message['bcc'] = bcc
        raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode('utf-8')
        return {'raw': raw_message}

    async def run(self):
        """Serve MCP requests over stdio until the streams close."""
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="gmail-mcp-server",
                    server_version="1.0.0",
                    capabilities=self.server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities=None,
                    ),
                ),
            )
def main() -> None:
    """Start the Gmail MCP server and serve over stdio until it exits."""
    # Imported here because the file's top-level imports do not include asyncio.
    import asyncio

    server = GmailMCPServer()
    asyncio.run(server.run())


if __name__ == "__main__":
    main()