MCP Headless Gmail Server
by baryhuang
Verified
- mcp-headless-gmail
- src
- mcp_server_headless_gmail
import logging
from typing import Any, Dict, List, Optional
import os
from dotenv import load_dotenv
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
from pydantic import AnyUrl
import json
from datetime import datetime, timedelta
from dateutil.tz import tzlocal
import argparse
import base64
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import google.oauth2.credentials
import google.auth.exceptions
import email
import re
from google.auth.transport.requests import Request
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('mcp_server_headless_gmail')
logger.setLevel(logging.DEBUG)
def convert_datetime_fields(obj: Any) -> Any:
"""Convert any datetime or tzlocal objects to string in the given object"""
if isinstance(obj, dict):
return {k: convert_datetime_fields(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [convert_datetime_fields(item) for item in obj]
elif isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, tzlocal):
# Get the current timezone offset
offset = datetime.now(tzlocal()).strftime('%z')
return f"UTC{offset[:3]}:{offset[3:]}" # Format like "UTC+08:00" or "UTC-05:00"
return obj
class GmailClient:
def __init__(self, access_token: Optional[str] = None, refresh_token: Optional[str] = None,
client_id: Optional[str] = None, client_secret: Optional[str] = None):
if not access_token and not refresh_token:
raise ValueError("Either access_token or refresh_token must be provided")
# Create credentials from the provided tokens
self.credentials = google.oauth2.credentials.Credentials(
token=access_token,
refresh_token=refresh_token,
token_uri="https://oauth2.googleapis.com/token",
client_id=client_id,
client_secret=client_secret,
)
# Build the Gmail service if access token is provided
if access_token:
self.service = build('gmail', 'v1', credentials=self.credentials, cache_discovery=False)
def _handle_token_refresh(self, func):
"""Decorator to handle token refresh errors gracefully"""
try:
return func()
except google.auth.exceptions.RefreshError as e:
logger.error(f"Token refresh error: {str(e)}")
return json.dumps({
"error": "Token refresh failed. Please provide new access and refresh tokens.",
"details": str(e)
})
def refresh_token(self, client_id: str, client_secret: str) -> str:
"""Refresh the access token using the refresh token
Args:
client_id: Google OAuth2 client ID
client_secret: Google OAuth2 client secret
"""
if not self.credentials.refresh_token:
return json.dumps({
"error": "No refresh token provided",
"status": "error"
})
try:
# Set client_id and client_secret for refresh
self.credentials._client_id = client_id
self.credentials._client_secret = client_secret
# Force refresh
request = Request()
self.credentials.refresh(request)
# Get token expiration time
expiry = self.credentials.expiry
# Return the new access token and its expiration
return json.dumps({
"access_token": self.credentials.token,
"expires_at": expiry.isoformat() if expiry else None,
"expires_in": int((expiry - datetime.now(expiry.tzinfo)).total_seconds()) if expiry else None,
"status": "success"
})
except google.auth.exceptions.RefreshError as e:
logger.error(f"Token refresh error: {str(e)}")
return json.dumps({
"error": "Token refresh failed. Please provide valid client ID and client secret.",
"details": str(e),
"status": "error"
})
except Exception as e:
logger.error(f"Exception: {str(e)}")
return json.dumps({
"error": str(e),
"status": "error"
})
def extract_plain_text_body(self, msg_payload):
"""Extract plain text body from message payload
Args:
msg_payload: Gmail API message payload
Returns:
tuple: (plain_text_body, body_size_in_bytes)
"""
body_text = ""
body_size = 0
# Helper function to process message parts recursively
def extract_from_parts(parts):
nonlocal body_text, body_size
if not parts:
return
for part in parts:
mime_type = part.get('mimeType', '')
# If this part is plain text
if mime_type == 'text/plain':
body_data = part.get('body', {}).get('data', '')
if body_data:
# Decode base64url encoded data
decoded_bytes = base64.urlsafe_b64decode(body_data)
body_size += len(decoded_bytes)
body_part = decoded_bytes.decode('utf-8', errors='replace')
body_text += body_part
# If this part has child parts, process them
if 'parts' in part:
extract_from_parts(part['parts'])
# If body data is directly in the payload
if 'body' in msg_payload and 'data' in msg_payload['body']:
body_data = msg_payload['body']['data']
if body_data:
decoded_bytes = base64.urlsafe_b64decode(body_data)
body_size += len(decoded_bytes)
body_text = decoded_bytes.decode('utf-8', errors='replace')
# If message has parts, process them
if 'parts' in msg_payload:
extract_from_parts(msg_payload['parts'])
return body_text, body_size
def get_recent_emails(self, max_results: int = 10, unread_only: bool = False) -> str:
"""Get the most recent emails from Gmail
Args:
max_results: Maximum number of emails to return (default: 10)
unread_only: Whether to return only unread emails (default: False)
Returns:
JSON string with an array of emails containing metadata, snippets, and first 1k chars of body
"""
try:
# Check if service is initialized
if not hasattr(self, 'service'):
logger.error("Gmail service not initialized. No valid access token provided.")
return json.dumps({
"error": "No valid access token provided. Please refresh your token first.",
"status": "error"
})
# Define the operation
def _operation():
logger.debug(f"Fetching up to {max_results} recent emails from Gmail")
# Get list of recent messages
query = 'is:unread' if unread_only else ''
logger.debug(f"Calling Gmail API to list messages from INBOX with query: '{query}'")
try:
response = self.service.users().messages().list(
userId='me',
maxResults=max_results,
labelIds=['INBOX'],
q=query
).execute()
logger.debug(f"API Response received: {json.dumps(response)[:200]}...")
except Exception as e:
logger.error(f"Error calling Gmail API list: {str(e)}", exc_info=True)
return json.dumps({"error": f"Gmail API list error: {str(e)}"})
messages = response.get('messages', [])
if not messages:
logger.debug("No messages found in the response")
return json.dumps({"emails": []})
logger.debug(f"Found {len(messages)} messages, processing details")
# Fetch detailed information for each message
emails = []
for i, message in enumerate(messages):
logger.debug(f"Fetching details for message {i+1}/{len(messages)}, ID: {message['id']}")
msg = self.service.users().messages().get(
userId='me',
id=message['id'],
format='full'
).execute()
logger.debug(f"Message {message['id']} details received, extracting fields")
# Extract headers
headers = {}
if 'payload' in msg and 'headers' in msg['payload']:
for header in msg['payload']['headers']:
name = header.get('name', '').lower()
if name in ['from', 'to', 'subject', 'date']:
headers[name] = header.get('value', '')
else:
logger.debug(f"Message {message['id']} missing payload or headers fields: {json.dumps(msg)[:200]}...")
# Extract plain text body and size
body_text = ""
body_size_bytes = 0
contains_full_body = True
if 'payload' in msg:
body_text, body_size_bytes = self.extract_plain_text_body(msg['payload'])
# Check if we're returning the full body or truncating
if len(body_text) > 1000:
body_text = body_text[:1000]
contains_full_body = False
# Format the email
email_data = {
"id": msg['id'],
"threadId": msg['threadId'],
"labelIds": msg.get('labelIds', []),
"snippet": msg.get('snippet', ''),
"from": headers.get('from', ''),
"to": headers.get('to', ''),
"subject": headers.get('subject', ''),
"date": headers.get('date', ''),
"internalDate": msg.get('internalDate', ''),
"body": body_text,
"body_size_bytes": body_size_bytes,
"contains_full_body": contains_full_body
}
logger.debug(f"Successfully processed message {message['id']}")
emails.append(email_data)
logger.debug(f"Successfully processed {len(emails)} emails")
return json.dumps({"emails": convert_datetime_fields(emails)})
# Execute the operation with token refresh handling
return self._handle_token_refresh(_operation)
except HttpError as e:
logger.error(f"Gmail API Exception: {str(e)}")
return json.dumps({"error": str(e)})
except Exception as e:
logger.error(f"Exception in get_recent_emails: {str(e)}", exc_info=True)
return json.dumps({"error": str(e)})
def send_email(self, to: str, subject: str, body: str, html_body: Optional[str] = None) -> str:
"""Send an email via Gmail
Args:
to: Recipient email address
subject: Email subject
body: Plain text email body
html_body: Optional HTML email body
"""
try:
# Check if service is initialized
if not hasattr(self, 'service'):
return json.dumps({
"error": "No valid access token provided. Please refresh your token first.",
"status": "error"
})
# Define the operation
def _operation():
# Create message container
message = MIMEMultipart('alternative')
message['to'] = to
message['subject'] = subject
# Attach plain text and HTML parts
message.attach(MIMEText(body, 'plain'))
if html_body:
message.attach(MIMEText(html_body, 'html'))
# Encode the message
encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
# Create the message body
create_message = {
'raw': encoded_message
}
# Send the message
send_response = self.service.users().messages().send(
userId='me',
body=create_message
).execute()
return json.dumps({
"messageId": send_response['id'],
"threadId": send_response.get('threadId', ''),
"labelIds": send_response.get('labelIds', [])
})
# Execute the operation with token refresh handling
return self._handle_token_refresh(_operation)
except HttpError as e:
logger.error(f"API Exception: {str(e)}")
return json.dumps({"error": str(e)})
except Exception as e:
logger.error(f"Exception: {str(e)}")
return json.dumps({"error": str(e)})
def get_email_body_chunk(self, message_id: str = None, thread_id: str = None, offset: int = 0) -> str:
"""Get a chunk of the email body
Args:
message_id: ID of the message to retrieve
thread_id: ID of the thread to retrieve (will get the first message if multiple exist)
offset: Offset in characters to start from (default: 0)
Returns:
JSON string with the body chunk and metadata
"""
try:
# Check if service is initialized
if not hasattr(self, 'service'):
logger.error("Gmail service not initialized. No valid access token provided.")
return json.dumps({
"error": "No valid access token provided. Please refresh your token first.",
"status": "error"
})
# Define the operation
def _operation():
logger.debug(f"Fetching email body chunk with offset {offset}")
# Store message_id in local variable to make it accessible within _operation scope
local_message_id = message_id
local_thread_id = thread_id
# Validate inputs
if not local_message_id and not local_thread_id:
return json.dumps({
"error": "Either message_id or thread_id must be provided",
"status": "error"
})
try:
# If thread_id is provided but not message_id, get the first message in thread
if local_thread_id and not local_message_id:
logger.debug(f"Getting messages in thread {local_thread_id}")
thread = self.service.users().threads().get(
userId='me',
id=local_thread_id
).execute()
if not thread or 'messages' not in thread or not thread['messages']:
return json.dumps({
"error": f"No messages found in thread {local_thread_id}",
"status": "error"
})
# Use the first message in the thread
local_message_id = thread['messages'][0]['id']
logger.debug(f"Using first message {local_message_id} from thread {local_thread_id}")
# Get the message
logger.debug(f"Getting message {local_message_id}")
msg = self.service.users().messages().get(
userId='me',
id=local_message_id,
format='full'
).execute()
# Extract the full plain text body
body_text = ""
body_size_bytes = 0
if 'payload' in msg:
body_text, body_size_bytes = self.extract_plain_text_body(msg['payload'])
# Apply offset and get chunk
if offset >= len(body_text):
chunk = ""
else:
chunk = body_text[offset:offset+1000]
# Determine if this contains the full remaining body
contains_full_body = (offset + len(chunk) >= len(body_text))
return json.dumps({
"message_id": local_message_id,
"thread_id": msg.get('threadId', ''),
"body": chunk,
"body_size_bytes": body_size_bytes,
"offset": offset,
"chunk_size": len(chunk),
"contains_full_body": contains_full_body,
"status": "success"
})
except Exception as e:
logger.error(f"Error processing message: {str(e)}", exc_info=True)
return json.dumps({
"error": f"Error processing message: {str(e)}",
"status": "error"
})
# Execute the operation with token refresh handling
return self._handle_token_refresh(_operation)
except HttpError as e:
logger.error(f"Gmail API Exception: {str(e)}")
return json.dumps({"error": str(e)})
except Exception as e:
logger.error(f"Exception in get_email_body_chunk: {str(e)}", exc_info=True)
return json.dumps({"error": str(e)})
async def main():
"""Run the Gmail MCP server."""
logger.info("Gmail server starting")
server = Server("gmail-client")
@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
return []
@server.read_resource()
async def handle_read_resource(uri: AnyUrl) -> str:
if uri.scheme != "gmail":
raise ValueError(f"Unsupported URI scheme: {uri.scheme}")
path = str(uri).replace("gmail://", "")
return ""
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""List available tools"""
return [
types.Tool(
name="gmail_refresh_token",
description="Refresh the access token using the refresh token and client credentials",
inputSchema={
"type": "object",
"properties": {
"google_access_token": {"type": "string", "description": "Google OAuth2 access token (optional if expired)"},
"google_refresh_token": {"type": "string", "description": "Google OAuth2 refresh token"},
"google_client_id": {"type": "string", "description": "Google OAuth2 client ID for token refresh"},
"google_client_secret": {"type": "string", "description": "Google OAuth2 client secret for token refresh"}
},
"required": ["google_refresh_token", "google_client_id", "google_client_secret"]
},
),
types.Tool(
name="gmail_get_recent_emails",
description="Get the most recent emails from Gmail (returns metadata, snippets, and first 1k chars of body)",
inputSchema={
"type": "object",
"properties": {
"google_access_token": {"type": "string", "description": "Google OAuth2 access token"},
"max_results": {"type": "integer", "description": "Maximum number of emails to return (default: 10)"},
"unread_only": {"type": "boolean", "description": "Whether to return only unread emails (default: False)"}
},
"required": ["google_access_token"]
},
),
types.Tool(
name="gmail_get_email_body_chunk",
description="Get a 1k character chunk of an email body starting from the specified offset",
inputSchema={
"type": "object",
"properties": {
"google_access_token": {"type": "string", "description": "Google OAuth2 access token"},
"message_id": {"type": "string", "description": "ID of the message to retrieve"},
"thread_id": {"type": "string", "description": "ID of the thread to retrieve (will get the first message if multiple exist)"},
"offset": {"type": "integer", "description": "Offset in characters to start from (default: 0)"}
},
"required": ["google_access_token"]
},
),
types.Tool(
name="gmail_send_email",
description="Send an email via Gmail",
inputSchema={
"type": "object",
"properties": {
"google_access_token": {"type": "string", "description": "Google OAuth2 access token"},
"to": {"type": "string", "description": "Recipient email address"},
"subject": {"type": "string", "description": "Email subject"},
"body": {"type": "string", "description": "Email body content (plain text)"},
"html_body": {"type": "string", "description": "Email body content in HTML format (optional)"}
},
"required": ["google_access_token", "to", "subject", "body"]
},
),
]
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict[str, Any] | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
"""Handle tool execution requests"""
try:
if not arguments:
raise ValueError(f"Missing arguments for {name}")
if name == "gmail_refresh_token":
# For refresh token, we need refresh token, client ID and secret
refresh_token = arguments.get("google_refresh_token")
client_id = arguments.get("google_client_id")
client_secret = arguments.get("google_client_secret")
access_token = arguments.get("google_access_token") # Optional for refresh
if not refresh_token:
raise ValueError("google_refresh_token is required for token refresh")
if not client_id or not client_secret:
raise ValueError("Both google_client_id and google_client_secret are required for token refresh")
# Initialize Gmail client for token refresh
gmail = GmailClient(
access_token=access_token,
refresh_token=refresh_token
)
# Call the refresh_token method
results = gmail.refresh_token(client_id=client_id, client_secret=client_secret)
return [types.TextContent(type="text", text=results)]
else:
# For all other tools, we only need access token
access_token = arguments.get("google_access_token")
if not access_token:
raise ValueError("google_access_token is required")
if name == "gmail_get_recent_emails":
# Initialize Gmail client with just access token
logger.debug(f"Initializing Gmail client for get_recent_emails with access token: {access_token[:10]}...")
try:
gmail = GmailClient(
access_token=access_token
)
logger.debug("Gmail client initialized successfully")
max_results = int(arguments.get("max_results", 10))
unread_only = bool(arguments.get("unread_only", False))
logger.debug(f"Calling get_recent_emails with max_results={max_results} and unread_only={unread_only}")
results = gmail.get_recent_emails(max_results=max_results, unread_only=unread_only)
logger.debug(f"get_recent_emails result (first 200 chars): {results[:200]}...")
return [types.TextContent(type="text", text=results)]
except Exception as e:
logger.error(f"Exception in gmail_get_recent_emails handler: {str(e)}", exc_info=True)
return [types.TextContent(type="text", text=f"Error: {str(e)}")]
elif name == "gmail_get_email_body_chunk":
# Initialize Gmail client with just access token
gmail = GmailClient(
access_token=access_token
)
message_id = arguments.get("message_id")
thread_id = arguments.get("thread_id")
offset = int(arguments.get("offset", 0))
if not message_id and not thread_id:
raise ValueError("Either message_id or thread_id must be provided")
results = gmail.get_email_body_chunk(message_id=message_id, thread_id=thread_id, offset=offset)
return [types.TextContent(type="text", text=results)]
elif name == "gmail_send_email":
# Initialize Gmail client with just access token
gmail = GmailClient(
access_token=access_token
)
to = arguments.get("to")
subject = arguments.get("subject")
body = arguments.get("body")
html_body = arguments.get("html_body")
if not to or not subject or not body:
raise ValueError("Missing required parameters: to, subject, and body are required")
results = gmail.send_email(to=to, subject=subject, body=body, html_body=html_body)
return [types.TextContent(type="text", text=results)]
else:
raise ValueError(f"Unknown tool: {name}")
except Exception as e:
return [types.TextContent(type="text", text=f"Error: {str(e)}")]
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
logger.info("Server running with stdio transport")
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="gmail",
server_version="0.1.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
if __name__ == "__main__":
import asyncio
# Simplified command-line with no OAuth parameters
asyncio.run(main())