#!/usr/bin/env python3
"""
Gmail MCP Server - Model Context Protocol server for Gmail API integration.
Provides tools for email management, labels, threads, attachments, and drafts
using the Gmail API with OAuth 2.0 authentication via OpenBao.
OAuth Setup:
1. Create OAuth 2.0 Client ID in Google Cloud Console
2. Download credentials.json
3. Run initial OAuth flow to get refresh token
4. Store credentials in OpenBao at secret/{client}/{env}-mcp-gmail-{user}
Secret Structure in OpenBao:
client_id: OAuth client ID
client_secret: OAuth client secret
refresh_token: OAuth refresh token from initial flow
"""
import os
import sys
import json
import base64
import warnings
from typing import Optional, List, Dict, Any
from enum import Enum
from email.mime.text import MIMEText
from datetime import datetime
from contextlib import asynccontextmanager
import httpx
from pydantic import BaseModel, Field, field_validator, ConfigDict
from mcp.server.fastmcp import FastMCP, Context
# Import OpenBao secrets utility
from openbao_secrets import get_mcp_config, DeferredCredentialLoader, DEV_MODE
# Constants
CHARACTER_LIMIT = 25000 # Maximum response size
GMAIL_API_BASE = "https://gmail.googleapis.com/gmail/v1"
GMAIL_OAUTH_TOKEN_URL = "https://oauth2.googleapis.com/token"
# Initialize MCP server
mcp = FastMCP("gmail_mcp")
# Deferred credential loading
_gmail_loader = DeferredCredentialLoader(
service="gmail",
key="refresh_token",
dev_fallback="GMAIL_REFRESH_TOKEN"
)
# Global Gmail service configuration (loaded in lifespan)
_gmail_config: Dict[str, Any] = {}
# =============================================================================
# Enums and Pydantic Models
# =============================================================================
class ResponseFormat(str, Enum):
"""Output format for tool responses."""
MARKDOWN = "markdown"
JSON = "json"
class SearchMessagesInput(BaseModel):
"""Input for searching Gmail messages."""
model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True)
query: str = Field(
...,
description="Gmail search query (e.g., 'from:user@example.com subject:invoice is:unread')",
min_length=1,
max_length=500
)
max_results: int = Field(
default=20,
description="Maximum number of results to return",
ge=1,
le=100
)
page_token: Optional[str] = Field(
default=None,
description="Pagination token from previous search"
)
response_format: ResponseFormat = Field(
default=ResponseFormat.MARKDOWN,
description="Output format: 'markdown' or 'json'"
)
class GetMessageInput(BaseModel):
"""Input for getting a specific message."""
model_config = ConfigDict(str_strip_whitespace=True)
message_id: str = Field(
...,
description="Gmail message ID",
min_length=1
)
response_format: ResponseFormat = Field(
default=ResponseFormat.MARKDOWN,
description="Output format: 'markdown' or 'json'"
)
class SendMessageInput(BaseModel):
"""Input for sending an email."""
model_config = ConfigDict(str_strip_whitespace=True)
to: List[str] = Field(
...,
description="Recipient email addresses",
min_items=1,
max_items=50
)
subject: str = Field(
...,
description="Email subject line",
max_length=500
)
body: str = Field(
...,
description="Email body (plain text)",
max_length=100000
)
cc: Optional[List[str]] = Field(
default=None,
description="CC recipients",
max_items=50
)
bcc: Optional[List[str]] = Field(
default=None,
description="BCC recipients",
max_items=50
)
@field_validator('to', 'cc', 'bcc')
@classmethod
def validate_emails(cls, v: Optional[List[str]]) -> Optional[List[str]]:
if v is None:
return v
for email in v:
if '@' not in email or len(email) < 3:
raise ValueError(f"Invalid email address: {email}")
return v
class MessageIdInput(BaseModel):
"""Input for operations requiring only a message ID."""
model_config = ConfigDict(str_strip_whitespace=True)
message_id: str = Field(
...,
description="Gmail message ID",
min_length=1
)
class ModifyMessageLabelsInput(BaseModel):
"""Input for modifying message labels."""
model_config = ConfigDict(str_strip_whitespace=True)
message_id: str = Field(..., description="Gmail message ID")
add_label_ids: Optional[List[str]] = Field(
default=None,
description="Label IDs to add",
max_items=100
)
remove_label_ids: Optional[List[str]] = Field(
default=None,
description="Label IDs to remove",
max_items=100
)
class ListLabelsInput(BaseModel):
"""Input for listing labels."""
model_config = ConfigDict(str_strip_whitespace=True)
response_format: ResponseFormat = Field(
default=ResponseFormat.MARKDOWN,
description="Output format: 'markdown' or 'json'"
)
class CreateLabelInput(BaseModel):
"""Input for creating a label."""
model_config = ConfigDict(str_strip_whitespace=True)
name: str = Field(
...,
description="Label name",
min_length=1,
max_length=100
)
label_list_visibility: str = Field(
default="labelShow",
description="Visibility in label list: 'labelShow', 'labelShowIfUnread', 'labelHide'"
)
message_list_visibility: str = Field(
default="show",
description="Visibility in message list: 'show', 'hide'"
)
class LabelIdInput(BaseModel):
"""Input for operations requiring a label ID."""
model_config = ConfigDict(str_strip_whitespace=True)
label_id: str = Field(..., description="Gmail label ID")
class ListThreadsInput(BaseModel):
"""Input for listing threads."""
model_config = ConfigDict(str_strip_whitespace=True)
query: Optional[str] = Field(
default=None,
description="Gmail search query to filter threads",
max_length=500
)
max_results: int = Field(
default=20,
description="Maximum number of results",
ge=1,
le=100
)
page_token: Optional[str] = Field(
default=None,
description="Pagination token"
)
response_format: ResponseFormat = Field(
default=ResponseFormat.MARKDOWN,
description="Output format: 'markdown' or 'json'"
)
class GetThreadInput(BaseModel):
"""Input for getting a specific thread."""
model_config = ConfigDict(str_strip_whitespace=True)
thread_id: str = Field(..., description="Gmail thread ID")
response_format: ResponseFormat = Field(
default=ResponseFormat.MARKDOWN,
description="Output format: 'markdown' or 'json'"
)
class ModifyThreadLabelsInput(BaseModel):
"""Input for modifying thread labels."""
model_config = ConfigDict(str_strip_whitespace=True)
thread_id: str = Field(..., description="Gmail thread ID")
add_label_ids: Optional[List[str]] = Field(default=None, max_items=100)
remove_label_ids: Optional[List[str]] = Field(default=None, max_items=100)
class GetAttachmentInput(BaseModel):
"""Input for getting an attachment."""
model_config = ConfigDict(str_strip_whitespace=True)
message_id: str = Field(..., description="Gmail message ID")
attachment_id: str = Field(..., description="Attachment ID")
save_path: Optional[str] = Field(
default=None,
description="Optional file path to save attachment"
)
class ListDraftsInput(BaseModel):
"""Input for listing drafts."""
model_config = ConfigDict(str_strip_whitespace=True)
max_results: int = Field(default=20, ge=1, le=100)
page_token: Optional[str] = Field(default=None)
response_format: ResponseFormat = Field(default=ResponseFormat.MARKDOWN)
class CreateDraftInput(BaseModel):
"""Input for creating a draft."""
model_config = ConfigDict(str_strip_whitespace=True)
to: List[str] = Field(..., min_items=1, max_items=50)
subject: str = Field(..., max_length=500)
body: str = Field(..., max_length=100000)
cc: Optional[List[str]] = Field(default=None, max_items=50)
bcc: Optional[List[str]] = Field(default=None, max_items=50)
class SendDraftInput(BaseModel):
"""Input for sending a draft."""
model_config = ConfigDict(str_strip_whitespace=True)
draft_id: str = Field(..., description="Gmail draft ID")
# =============================================================================
# OAuth & Gmail API Utilities
# =============================================================================
async def _get_access_token() -> str:
"""
Get a fresh access token using the refresh token from OpenBao.
Returns:
Access token string.
Raises:
Exception: If token refresh fails.
"""
async with httpx.AsyncClient() as client:
response = await client.post(
GMAIL_OAUTH_TOKEN_URL,
data={
"client_id": _gmail_config["client_id"],
"client_secret": _gmail_config["client_secret"],
"refresh_token": _gmail_config["refresh_token"],
"grant_type": "refresh_token"
},
timeout=10.0
)
if response.status_code != 200:
raise Exception(f"Failed to refresh OAuth token: {response.status_code} - {response.text}")
data = response.json()
return data["access_token"]
async def _make_gmail_request(
endpoint: str,
method: str = "GET",
params: Optional[Dict[str, Any]] = None,
json_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Make an authenticated Gmail API request.
Args:
endpoint: API endpoint (e.g., "users/me/messages")
method: HTTP method
params: Query parameters
json_data: Request body for POST/PUT
Returns:
Response JSON data.
Raises:
httpx.HTTPStatusError: For HTTP errors.
"""
access_token = await _get_access_token()
url = f"{GMAIL_API_BASE}/{endpoint}"
headers = {"Authorization": f"Bearer {access_token}"}
async with httpx.AsyncClient() as client:
response = await client.request(
method,
url,
headers=headers,
params=params,
json=json_data,
timeout=30.0
)
response.raise_for_status()
return response.json()
def _handle_gmail_error(e: Exception) -> str:
"""
Convert Gmail API errors to actionable messages.
Args:
e: Exception from Gmail API call.
Returns:
User-friendly error message.
"""
if isinstance(e, httpx.HTTPStatusError):
status = e.response.status_code
if status == 401:
return "Error: Authentication failed. Please re-authorize the Gmail MCP server."
elif status == 403:
return "Error: Permission denied. This operation requires gmail.modify scope."
elif status == 404:
return "Error: Resource not found. The ID may be invalid."
elif status == 429:
return "Error: Rate limit exceeded. Please wait before retrying."
elif status >= 500:
return "Error: Gmail API unavailable. Please try again later."
return f"Error: Gmail API request failed with status {status}"
elif isinstance(e, httpx.TimeoutException):
return "Error: Request timed out. Please try again."
return f"Error: Unexpected error occurred: {type(e).__name__}"
def _build_message_mime(
to: List[str],
subject: str,
body: str,
cc: Optional[List[str]] = None,
bcc: Optional[List[str]] = None
) -> str:
"""
Build RFC 2822 email message.
Args:
to: Recipient addresses.
subject: Email subject.
body: Plain text body.
cc: CC addresses.
bcc: BCC addresses.
Returns:
Base64url-encoded message string.
"""
message = MIMEText(body)
message['To'] = ', '.join(to)
message['Subject'] = subject
if cc:
message['Cc'] = ', '.join(cc)
if bcc:
message['Bcc'] = ', '.join(bcc)
raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode('utf-8')
return raw_message
def _decode_base64url(data: str) -> str:
"""Decode Gmail's base64url encoding."""
try:
# Add padding if needed
padding = 4 - len(data) % 4
if padding != 4:
data += '=' * padding
return base64.urlsafe_b64decode(data).decode('utf-8', errors='replace')
except Exception:
return data
def _parse_message_body(payload: Dict[str, Any]) -> str:
"""
Extract plain text body from message payload.
Args:
payload: Message payload dict.
Returns:
Plain text body content.
"""
if 'body' in payload and 'data' in payload['body']:
return _decode_base64url(payload['body']['data'])
if 'parts' in payload:
for part in payload['parts']:
if part.get('mimeType') == 'text/plain':
if 'data' in part.get('body', {}):
return _decode_base64url(part['body']['data'])
# Recursive for nested parts
if 'parts' in part:
body = _parse_message_body(part)
if body:
return body
return ""
def _get_header(headers: List[Dict[str, str]], name: str) -> str:
"""Get header value by name."""
for header in headers:
if header['name'].lower() == name.lower():
return header['value']
return ""
def _format_message_markdown(message: Dict[str, Any]) -> str:
"""Format message as Markdown."""
headers = message.get('payload', {}).get('headers', [])
from_addr = _get_header(headers, 'From')
to_addr = _get_header(headers, 'To')
subject = _get_header(headers, 'Subject')
date = _get_header(headers, 'Date')
body = _parse_message_body(message.get('payload', {}))
# Truncate long bodies
if len(body) > 1000:
body = body[:1000] + "\n\n[Body truncated - use get_message for full content]"
labels = message.get('labelIds', [])
output = f"# Message: {subject}\n\n"
output += f"**ID**: {message['id']}\n"
output += f"**From**: {from_addr}\n"
output += f"**To**: {to_addr}\n"
output += f"**Date**: {date}\n"
output += f"**Labels**: {', '.join(labels)}\n\n"
output += f"## Body\n\n{body}\n"
return output
def _format_message_json(message: Dict[str, Any]) -> str:
"""Format message as JSON."""
headers = message.get('payload', {}).get('headers', [])
result = {
"id": message.get('id'),
"threadId": message.get('threadId'),
"labelIds": message.get('labelIds', []),
"snippet": message.get('snippet'),
"from": _get_header(headers, 'From'),
"to": _get_header(headers, 'To'),
"subject": _get_header(headers, 'Subject'),
"date": _get_header(headers, 'Date'),
"body": _parse_message_body(message.get('payload', {}))
}
return json.dumps(result, indent=2)
def _format_thread_markdown(thread: Dict[str, Any]) -> str:
"""Format thread as Markdown."""
messages = thread.get('messages', [])
output = f"# Thread {thread['id']}\n\n"
output += f"**Messages**: {len(messages)}\n\n"
for i, msg in enumerate(messages, 1):
headers = msg.get('payload', {}).get('headers', [])
from_addr = _get_header(headers, 'From')
subject = _get_header(headers, 'Subject')
date = _get_header(headers, 'Date')
output += f"## Message {i}/{len(messages)}\n\n"
output += f"**From**: {from_addr}\n"
output += f"**Subject**: {subject}\n"
output += f"**Date**: {date}\n"
output += f"**ID**: {msg['id']}\n"
output += f"**Snippet**: {msg.get('snippet', '')}\n\n"
return output
# =============================================================================
# MCP Tools - Messages
# =============================================================================
@mcp.tool(
name="gmail_search_messages",
annotations={
"title": "Search Gmail Messages",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False
}
)
async def gmail_search_messages(params: SearchMessagesInput) -> str:
"""Search messages using Gmail query syntax (from:, to:, subject:, is:unread).
Searches across all messages in the mailbox using Gmail's powerful query syntax.
Supports pagination for large result sets.
Args:
params: SearchMessagesInput containing:
- query (str): Gmail search query (e.g., "from:user@example.com subject:invoice is:unread")
- max_results (int): Maximum results to return (1-100, default: 20)
- page_token (Optional[str]): Pagination token from previous search
- response_format (ResponseFormat): Output format ('markdown' or 'json')
Returns:
str: Formatted search results with pagination info.
Examples:
- query="from:boss@company.com" - Find all messages from specific sender
- query="subject:invoice is:unread" - Find unread messages about invoices
- query="has:attachment after:2025/01/01" - Find messages with attachments from 2025
"""
try:
request_params = {
"q": params.query,
"maxResults": params.max_results
}
if params.page_token:
request_params["pageToken"] = params.page_token
result = await _make_gmail_request(
"users/me/messages",
params=request_params
)
messages = result.get('messages', [])
if not messages:
return f"No messages found matching query: {params.query}"
# Get full message details
full_messages = []
for msg in messages[:params.max_results]:
full_msg = await _make_gmail_request(f"users/me/messages/{msg['id']}")
full_messages.append(full_msg)
if params.response_format == ResponseFormat.MARKDOWN:
output = f"# Search Results: {params.query}\n\n"
output += f"Found {result.get('resultSizeEstimate', len(messages))} messages (showing {len(full_messages)})\n\n"
for msg in full_messages:
headers = msg.get('payload', {}).get('headers', [])
from_addr = _get_header(headers, 'From')
subject = _get_header(headers, 'Subject')
date = _get_header(headers, 'Date')
output += f"## {subject}\n"
output += f"- **From**: {from_addr}\n"
output += f"- **Date**: {date}\n"
output += f"- **ID**: {msg['id']}\n"
output += f"- **Snippet**: {msg.get('snippet', '')}\n\n"
if 'nextPageToken' in result:
output += f"\n**More results available** - Use page_token: `{result['nextPageToken']}`\n"
# Check character limit
if len(output) > CHARACTER_LIMIT:
output = output[:CHARACTER_LIMIT] + "\n\n[Results truncated - use pagination or more specific query]"
return output
else:
return json.dumps({
"total": result.get('resultSizeEstimate'),
"count": len(full_messages),
"messages": full_messages,
"nextPageToken": result.get('nextPageToken')
}, indent=2)
except Exception as e:
return _handle_gmail_error(e)
@mcp.tool(
name="gmail_get_message",
annotations={
"title": "Get Full Message Details",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False
}
)
async def gmail_get_message(params: GetMessageInput) -> str:
"""Get full message content including headers, body, and attachments list.
Retrieves complete message details by ID. Use gmail_search_messages to find message IDs.
Args:
params: GetMessageInput containing:
- message_id (str): Gmail message ID
- response_format (ResponseFormat): Output format ('markdown' or 'json')
Returns:
str: Complete message details.
"""
try:
message = await _make_gmail_request(f"users/me/messages/{params.message_id}")
if params.response_format == ResponseFormat.MARKDOWN:
return _format_message_markdown(message)
else:
return _format_message_json(message)
except Exception as e:
return _handle_gmail_error(e)
@mcp.tool(
name="gmail_send_message",
annotations={
"title": "Send Email",
"readOnlyHint": False,
"destructiveHint": False,
"idempotentHint": False,
"openWorldHint": True
}
)
async def gmail_send_message(params: SendMessageInput) -> str:
"""Send email with to/cc/bcc recipients and plain text body.
Composes and sends a new email message. The message is sent immediately.
Args:
params: SendMessageInput containing:
- to (List[str]): Recipient email addresses (1-50)
- subject (str): Email subject line (max 500 chars)
- body (str): Plain text email body (max 100,000 chars)
- cc (Optional[List[str]]): CC recipients (max 50)
- bcc (Optional[List[str]]): BCC recipients (max 50)
Returns:
str: Confirmation with sent message ID.
"""
try:
raw_message = _build_message_mime(
to=params.to,
subject=params.subject,
body=params.body,
cc=params.cc,
bcc=params.bcc
)
result = await _make_gmail_request(
"users/me/messages/send",
method="POST",
json_data={"raw": raw_message}
)
return f"Email sent successfully!\n\nMessage ID: {result['id']}\nThread ID: {result['threadId']}\nTo: {', '.join(params.to)}\nSubject: {params.subject}"
except Exception as e:
return _handle_gmail_error(e)
@mcp.tool(
name="gmail_delete_message",
annotations={
"title": "Delete Message Permanently",
"readOnlyHint": False,
"destructiveHint": True,
"idempotentHint": True,
"openWorldHint": False
}
)
async def gmail_delete_message(params: MessageIdInput) -> str:
"""Permanently delete message (bypass trash).
Immediately and permanently deletes the specified message. This cannot be undone.
Use gmail_trash_message for reversible deletion.
Args:
params: MessageIdInput containing message_id.
Returns:
str: Deletion confirmation.
"""
try:
await _make_gmail_request(
f"users/me/messages/{params.message_id}",
method="DELETE"
)
return f"Message {params.message_id} permanently deleted."
except Exception as e:
return _handle_gmail_error(e)
@mcp.tool(
name="gmail_trash_message",
annotations={
"title": "Move Message to Trash",
"readOnlyHint": False,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False
}
)
async def gmail_trash_message(params: MessageIdInput) -> str:
"""Move message to trash (reversible with gmail_untrash_message).
Moves the message to trash. Can be restored with gmail_untrash_message.
Args:
params: MessageIdInput containing message_id.
Returns:
str: Confirmation message.
"""
try:
result = await _make_gmail_request(
f"users/me/messages/{params.message_id}/trash",
method="POST"
)
return f"Message {params.message_id} moved to trash. Use gmail_untrash_message to restore."
except Exception as e:
return _handle_gmail_error(e)
@mcp.tool(
name="gmail_untrash_message",
annotations={
"title": "Restore Message from Trash",
"readOnlyHint": False,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False
}
)
async def gmail_untrash_message(params: MessageIdInput) -> str:
"""Restore message from trash to inbox.
Removes the message from trash and restores it to the inbox.
Args:
params: MessageIdInput containing message_id.
Returns:
str: Confirmation message.
"""
try:
result = await _make_gmail_request(
f"users/me/messages/{params.message_id}/untrash",
method="POST"
)
return f"Message {params.message_id} restored from trash."
except Exception as e:
return _handle_gmail_error(e)
@mcp.tool(
name="gmail_modify_message_labels",
annotations={
"title": "Modify Message Labels",
"readOnlyHint": False,
"destructiveHint": False,
"idempotentHint": False,
"openWorldHint": False
}
)
async def gmail_modify_message_labels(params: ModifyMessageLabelsInput) -> str:
"""Add or remove labels from message (e.g., mark as read, add custom labels).
Modifies the labels applied to a message. Use gmail_list_labels to find label IDs.
System label IDs: INBOX, SPAM, TRASH, UNREAD, STARRED, IMPORTANT, SENT, DRAFT
Args:
params: ModifyMessageLabelsInput containing:
- message_id (str): Gmail message ID
- add_label_ids (Optional[List[str]]): Label IDs to add
- remove_label_ids (Optional[List[str]]): Label IDs to remove
Returns:
str: Confirmation with updated labels.
Examples:
- add_label_ids=["STARRED"] - Star the message
- remove_label_ids=["UNREAD"] - Mark as read
"""
try:
body: Dict[str, Any] = {}
if params.add_label_ids:
body["addLabelIds"] = params.add_label_ids
if params.remove_label_ids:
body["removeLabelIds"] = params.remove_label_ids
result = await _make_gmail_request(
f"users/me/messages/{params.message_id}/modify",
method="POST",
json_data=body
)
return f"Labels modified for message {params.message_id}.\n\nCurrent labels: {', '.join(result.get('labelIds', []))}"
except Exception as e:
return _handle_gmail_error(e)
# =============================================================================
# MCP Tools - Labels
# =============================================================================
@mcp.tool(
name="gmail_list_labels",
annotations={
"title": "List All Labels",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False
}
)
async def gmail_list_labels(params: ListLabelsInput) -> str:
"""List labels with IDs, names, types (user/system).
Returns all labels in the mailbox, including both system labels (INBOX, SENT, etc.)
and custom user-created labels.
Args:
params: ListLabelsInput containing response_format.
Returns:
str: List of all labels with details.
"""
try:
result = await _make_gmail_request("users/me/labels")
labels = result.get('labels', [])
if params.response_format == ResponseFormat.MARKDOWN:
output = "# Gmail Labels\n\n"
system_labels = [l for l in labels if l.get('type') == 'system']
user_labels = [l for l in labels if l.get('type') == 'user']
output += f"## System Labels ({len(system_labels)})\n\n"
for label in sorted(system_labels, key=lambda x: x['name']):
output += f"- **{label['name']}** (ID: `{label['id']}`)\n"
output += f"\n## User Labels ({len(user_labels)})\n\n"
for label in sorted(user_labels, key=lambda x: x['name']):
output += f"- **{label['name']}** (ID: `{label['id']}`)\n"
return output
else:
return json.dumps({"labels": labels}, indent=2)
except Exception as e:
return _handle_gmail_error(e)
@mcp.tool(
name="gmail_create_label",
annotations={
"title": "Create Custom Label",
"readOnlyHint": False,
"destructiveHint": False,
"idempotentHint": False,
"openWorldHint": False
}
)
async def gmail_create_label(params: CreateLabelInput) -> str:
"""Create new custom label for organizing emails.
Creates a new user label that can be applied to messages for organization.
Args:
params: CreateLabelInput containing:
- name (str): Label name (1-100 chars)
- label_list_visibility (str): 'labelShow', 'labelShowIfUnread', 'labelHide'
- message_list_visibility (str): 'show', 'hide'
Returns:
str: Confirmation with new label ID.
"""
try:
result = await _make_gmail_request(
"users/me/labels",
method="POST",
json_data={
"name": params.name,
"labelListVisibility": params.label_list_visibility,
"messageListVisibility": params.message_list_visibility
}
)
return f"Label '{params.name}' created successfully!\n\nLabel ID: {result['id']}\nUse this ID to apply the label to messages."
except Exception as e:
return _handle_gmail_error(e)
@mcp.tool(
name="gmail_delete_label",
annotations={
"title": "Delete Custom Label",
"readOnlyHint": False,
"destructiveHint": True,
"idempotentHint": True,
"openWorldHint": False
}
)
async def gmail_delete_label(params: LabelIdInput) -> str:
"""Remove label permanently (only custom labels, not system labels).
Deletes a user-created label. Cannot delete system labels like INBOX, SENT, etc.
Args:
params: LabelIdInput containing label_id.
Returns:
str: Deletion confirmation.
"""
try:
await _make_gmail_request(
f"users/me/labels/{params.label_id}",
method="DELETE"
)
return f"Label {params.label_id} deleted successfully."
except Exception as e:
return _handle_gmail_error(e)
# =============================================================================
# MCP Tools - Threads
# =============================================================================
@mcp.tool(
name="gmail_list_threads",
annotations={
"title": "List Conversation Threads",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False
}
)
async def gmail_list_threads(params: ListThreadsInput) -> str:
"""List conversation threads with message count and participants.
Lists conversation threads, optionally filtered by query. Threads group
related messages together (email conversations).
Args:
params: ListThreadsInput containing:
- query (Optional[str]): Gmail search query to filter threads
- max_results (int): Maximum results (1-100, default: 20)
- page_token (Optional[str]): Pagination token
- response_format (ResponseFormat): Output format
Returns:
str: List of threads with details.
"""
try:
request_params = {"maxResults": params.max_results}
if params.query:
request_params["q"] = params.query
if params.page_token:
request_params["pageToken"] = params.page_token
result = await _make_gmail_request(
"users/me/threads",
params=request_params
)
threads = result.get('threads', [])
if not threads:
return "No threads found."
if params.response_format == ResponseFormat.MARKDOWN:
output = "# Gmail Threads\n\n"
output += f"Found {result.get('resultSizeEstimate', len(threads))} threads (showing {len(threads)})\n\n"
for thread in threads:
# Get thread details
full_thread = await _make_gmail_request(f"users/me/threads/{thread['id']}")
messages = full_thread.get('messages', [])
if messages:
first_msg = messages[0]
headers = first_msg.get('payload', {}).get('headers', [])
subject = _get_header(headers, 'Subject')
participants = set()
for msg in messages:
msg_headers = msg.get('payload', {}).get('headers', [])
from_addr = _get_header(msg_headers, 'From')
if from_addr:
participants.add(from_addr)
output += f"## {subject or '(No subject)'}\n"
output += f"- **Thread ID**: {thread['id']}\n"
output += f"- **Messages**: {len(messages)}\n"
output += f"- **Participants**: {', '.join(list(participants)[:3])}\n"
output += f"- **Snippet**: {full_thread.get('snippet', '')}\n\n"
if 'nextPageToken' in result:
output += f"\n**More results available** - Use page_token: `{result['nextPageToken']}`\n"
return output
else:
return json.dumps({
"total": result.get('resultSizeEstimate'),
"count": len(threads),
"threads": threads,
"nextPageToken": result.get('nextPageToken')
}, indent=2)
except Exception as e:
return _handle_gmail_error(e)
@mcp.tool(
name="gmail_get_thread",
annotations={
"title": "Get Full Thread Details",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False
}
)
async def gmail_get_thread(params: GetThreadInput) -> str:
"""Get full thread with all messages in conversation.
Retrieves complete thread details including all messages in the conversation.
Args:
params: GetThreadInput containing:
- thread_id (str): Gmail thread ID
- response_format (ResponseFormat): Output format
Returns:
str: Complete thread with all messages.
"""
try:
thread = await _make_gmail_request(f"users/me/threads/{params.thread_id}")
if params.response_format == ResponseFormat.MARKDOWN:
return _format_thread_markdown(thread)
else:
return json.dumps(thread, indent=2)
except Exception as e:
return _handle_gmail_error(e)
@mcp.tool(
name="gmail_modify_thread_labels",
annotations={
"title": "Modify Thread Labels",
"readOnlyHint": False,
"destructiveHint": False,
"idempotentHint": False,
"openWorldHint": False
}
)
async def gmail_modify_thread_labels(params: ModifyThreadLabelsInput) -> str:
"""Apply or remove labels from entire thread (all messages).
Modifies labels for all messages in a thread. Use gmail_list_labels to find label IDs.
Args:
params: ModifyThreadLabelsInput containing:
- thread_id (str): Gmail thread ID
- add_label_ids (Optional[List[str]]): Label IDs to add
- remove_label_ids (Optional[List[str]]): Label IDs to remove
Returns:
str: Confirmation with updated labels.
"""
try:
body: Dict[str, Any] = {}
if params.add_label_ids:
body["addLabelIds"] = params.add_label_ids
if params.remove_label_ids:
body["removeLabelIds"] = params.remove_label_ids
result = await _make_gmail_request(
f"users/me/threads/{params.thread_id}/modify",
method="POST",
json_data=body
)
return f"Labels modified for thread {params.thread_id}."
except Exception as e:
return _handle_gmail_error(e)
# =============================================================================
# MCP Tools - Attachments
# =============================================================================
@mcp.tool(
name="gmail_get_attachment",
annotations={
"title": "Download Attachment",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False
}
)
async def gmail_get_attachment(params: GetAttachmentInput) -> str:
"""Download attachment data by ID (optionally save to file).
Retrieves attachment data from a message. Use gmail_get_message to find attachment IDs.
Args:
params: GetAttachmentInput containing:
- message_id (str): Gmail message ID
- attachment_id (str): Attachment ID from message
- save_path (Optional[str]): File path to save attachment
Returns:
str: Attachment info and base64 data (or save confirmation).
"""
try:
result = await _make_gmail_request(
f"users/me/messages/{params.message_id}/attachments/{params.attachment_id}"
)
data = result.get('data', '')
size = result.get('size', 0)
if params.save_path:
# Decode and save to file
decoded_data = base64.urlsafe_b64decode(data)
with open(params.save_path, 'wb') as f:
f.write(decoded_data)
return f"Attachment saved to {params.save_path}\n\nSize: {size} bytes"
else:
# Return base64 data (truncated if too large)
if len(data) > 10000:
data_preview = data[:1000] + "...[truncated]"
else:
data_preview = data
return f"Attachment ID: {params.attachment_id}\nSize: {size} bytes\n\nBase64 Data:\n{data_preview}\n\nUse save_path parameter to save to file."
except Exception as e:
return _handle_gmail_error(e)
# =============================================================================
# MCP Tools - Drafts
# =============================================================================
@mcp.tool(
name="gmail_list_drafts",
annotations={
"title": "List Draft Messages",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False
}
)
async def gmail_list_drafts(params: ListDraftsInput) -> str:
"""List draft messages with subject and recipient info.
Returns all draft messages in the mailbox.
Args:
params: ListDraftsInput containing max_results, page_token, response_format.
Returns:
str: List of drafts.
"""
try:
request_params = {"maxResults": params.max_results}
if params.page_token:
request_params["pageToken"] = params.page_token
result = await _make_gmail_request(
"users/me/drafts",
params=request_params
)
drafts = result.get('drafts', [])
if not drafts:
return "No drafts found."
if params.response_format == ResponseFormat.MARKDOWN:
output = f"# Draft Messages ({len(drafts)})\n\n"
for draft in drafts:
msg = draft.get('message', {})
headers = msg.get('payload', {}).get('headers', [])
to_addr = _get_header(headers, 'To')
subject = _get_header(headers, 'Subject')
output += f"## {subject or '(No subject)'}\n"
output += f"- **Draft ID**: {draft['id']}\n"
output += f"- **To**: {to_addr}\n"
output += f"- **Snippet**: {msg.get('snippet', '')}\n\n"
if 'nextPageToken' in result:
output += f"\n**More results** - Use page_token: `{result['nextPageToken']}`\n"
return output
else:
return json.dumps({"drafts": drafts, "nextPageToken": result.get('nextPageToken')}, indent=2)
except Exception as e:
return _handle_gmail_error(e)
@mcp.tool(
name="gmail_create_draft",
annotations={
"title": "Create Draft Message",
"readOnlyHint": False,
"destructiveHint": False,
"idempotentHint": False,
"openWorldHint": False
}
)
async def gmail_create_draft(params: CreateDraftInput) -> str:
"""Create new draft (saved but not sent).
Creates a draft message that can be edited or sent later with gmail_send_draft.
Args:
params: CreateDraftInput containing to, subject, body, cc, bcc.
Returns:
str: Confirmation with draft ID.
"""
try:
raw_message = _build_message_mime(
to=params.to,
subject=params.subject,
body=params.body,
cc=params.cc,
bcc=params.bcc
)
result = await _make_gmail_request(
"users/me/drafts",
method="POST",
json_data={
"message": {"raw": raw_message}
}
)
return f"Draft created successfully!\n\nDraft ID: {result['id']}\nTo: {', '.join(params.to)}\nSubject: {params.subject}\n\nUse gmail_send_draft to send this draft."
except Exception as e:
return _handle_gmail_error(e)
@mcp.tool(
name="gmail_send_draft",
annotations={
"title": "Send Existing Draft",
"readOnlyHint": False,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True
}
)
async def gmail_send_draft(params: SendDraftInput) -> str:
"""Send existing draft message immediately.
Sends a previously created draft. The draft is deleted after sending.
Args:
params: SendDraftInput containing draft_id.
Returns:
str: Confirmation with sent message ID.
"""
try:
result = await _make_gmail_request(
"users/me/drafts/send",
method="POST",
json_data={"id": params.draft_id}
)
return f"Draft sent successfully!\n\nMessage ID: {result['id']}\nThread ID: {result['threadId']}"
except Exception as e:
return _handle_gmail_error(e)
# =============================================================================
# Server Lifespan Management
# =============================================================================
@asynccontextmanager
async def app_lifespan():
"""
Server lifespan manager - loads Gmail OAuth credentials at startup.
This prevents import-time failures when OpenBao agent isn't running.
"""
global _gmail_config
try:
# Load Gmail OAuth credentials from OpenBao
_gmail_config = get_mcp_config(
"gmail",
dev_fallbacks={
"client_id": "GMAIL_CLIENT_ID",
"client_secret": "GMAIL_CLIENT_SECRET",
"refresh_token": "GMAIL_REFRESH_TOKEN"
}
)
# Validate required fields
required_fields = ["client_id", "client_secret", "refresh_token"]
missing = [f for f in required_fields if f not in _gmail_config]
if missing:
raise ValueError(
f"Missing required Gmail OAuth credentials in OpenBao: {', '.join(missing)}\n\n"
f"Required secret fields at secret/{{client}}/{{env}}-mcp-gmail-{{user}}:\n"
f" - client_id: OAuth client ID from Google Cloud Console\n"
f" - client_secret: OAuth client secret\n"
f" - refresh_token: OAuth refresh token from initial flow\n\n"
f"See README.md for OAuth setup instructions."
)
print("MCP_SERVER_READY", file=sys.stderr)
yield
except Exception as e:
print(f"OPENBAO_ERROR: {str(e)}", file=sys.stderr)
raise
# Configure lifespan
mcp._mcp.lifespan = app_lifespan
# =============================================================================
# Entry Point
# =============================================================================
if __name__ == "__main__":
mcp.run()