Skip to main content
Glama

Gmail MCP Server

by nvdpsingh
gmail_mcp_server.py (21.8 kB)
#!/usr/bin/env python3
"""
Gmail MCP Server

This server provides MCP tools for interacting with Gmail API.
It supports reading emails, sending emails, and searching through Gmail.
"""

import asyncio
import base64
import json
import os
import pickle
from typing import Any, Dict, List, Optional
from datetime import datetime
from email.mime.text import MIMEText

import mcp.types as types
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Gmail API scopes
SCOPES = [
    'https://www.googleapis.com/auth/gmail.readonly',
    'https://www.googleapis.com/auth/gmail.send',
    'https://www.googleapis.com/auth/gmail.modify'
]


class GmailMCPServer:
    """MCP server that exposes Gmail operations as tools over stdio.

    The Gmail client (``self.service``) is only created by the
    ``authenticate_gmail`` tool; every other tool replies with a reminder to
    authenticate until that has happened.
    """

    # Reply sent by every tool that needs an authenticated Gmail client.
    _AUTH_REQUIRED_TEXT = "Please authenticate first using authenticate_gmail tool"
    # Visual separator between entries in list/search output.
    _SEPARATOR = "-" * 50 + "\n"
    # Headers requested from the API for list/search summaries.
    _SUMMARY_HEADERS = ['From', 'To', 'Subject', 'Date']

    def __init__(self):
        self.server = Server("gmail-mcp-server")
        self.service = None
        # Both paths are relative, so they resolve against the process CWD.
        self.credentials_file = "gcp-oauth.keys.json"
        self.token_file = "token.pickle"

        # Register tools
        self._register_tools()

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _text(message: str) -> List[types.TextContent]:
        """Wrap *message* in the single-item TextContent list tools return."""
        return [types.TextContent(type="text", text=message)]

    @staticmethod
    def _collect_headers(payload: Dict[str, Any], wanted) -> Dict[str, str]:
        """Return ``{lowercased name: value}`` for the headers named in *wanted*.

        If a header occurs more than once, the last occurrence wins.
        """
        return {
            header['name'].lower(): header['value']
            for header in payload.get('headers', [])
            if header['name'].lower() in wanted
        }

    def _fetch_summary(self, message_id: str) -> Dict[str, str]:
        """Fetch metadata for one message and flatten it into a summary dict.

        Returns a dict with keys ``id``, ``from``, ``to``, ``subject``,
        ``date`` and ``snippet``; missing headers get placeholder values.
        """
        msg = self.service.users().messages().get(
            userId='me',
            id=message_id,
            format='metadata',
            metadataHeaders=self._SUMMARY_HEADERS
        ).execute()
        found = self._collect_headers(
            msg.get('payload', {}), ('from', 'to', 'subject', 'date')
        )
        return {
            'id': message_id,
            'from': found.get('from', 'Unknown'),
            'to': found.get('to', 'Unknown'),
            'subject': found.get('subject', 'No Subject'),
            'date': found.get('date', 'Unknown'),
            'snippet': msg.get('snippet', ''),
        }

    # ------------------------------------------------------------------
    # Tool registration
    # ------------------------------------------------------------------

    def _register_tools(self):
        """Register all MCP tools"""

        @self.server.list_tools()
        async def handle_list_tools() -> List[types.Tool]:
            return [
                types.Tool(
                    name="authenticate_gmail",
                    description="Authenticate with Gmail API using OAuth2",
                    inputSchema={
                        "type": "object",
                        "properties": {},
                        "required": []
                    }
                ),
                types.Tool(
                    name="list_emails",
                    description="List emails from Gmail inbox",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "max_results": {
                                "type": "integer",
                                "description": "Maximum number of emails to return (default: 10)",
                                "default": 10
                            },
                            "query": {
                                "type": "string",
                                "description": "Gmail search query (e.g., 'is:unread', 'from:example@gmail.com')"
                            }
                        },
                        "required": []
                    }
                ),
                types.Tool(
                    name="read_email",
                    description="Read a specific email by ID",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "email_id": {
                                "type": "string",
                                "description": "The ID of the email to read"
                            }
                        },
                        "required": ["email_id"]
                    }
                ),
                types.Tool(
                    name="send_email",
                    description="Send an email through Gmail",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "to": {
                                "type": "string",
                                "description": "Recipient email address"
                            },
                            "subject": {
                                "type": "string",
                                "description": "Email subject"
                            },
                            "body": {
                                "type": "string",
                                "description": "Email body (plain text)"
                            },
                            "cc": {
                                "type": "string",
                                "description": "CC email addresses (comma-separated)"
                            },
                            "bcc": {
                                "type": "string",
                                "description": "BCC email addresses (comma-separated)"
                            }
                        },
                        "required": ["to", "subject", "body"]
                    }
                ),
                types.Tool(
                    name="search_emails",
                    description="Search emails using Gmail search syntax",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "Gmail search query (e.g., 'is:unread', 'from:example@gmail.com', 'subject:important')"
                            },
                            "max_results": {
                                "type": "integer",
                                "description": "Maximum number of results to return (default: 10)",
                                "default": 10
                            }
                        },
                        "required": ["query"]
                    }
                ),
                types.Tool(
                    name="mark_as_read",
                    description="Mark an email as read",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "email_id": {
                                "type": "string",
                                "description": "The ID of the email to mark as read"
                            }
                        },
                        "required": ["email_id"]
                    }
                ),
                types.Tool(
                    name="mark_as_unread",
                    description="Mark an email as unread",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "email_id": {
                                "type": "string",
                                "description": "The ID of the email to mark as unread"
                            }
                        },
                        "required": ["email_id"]
                    }
                )
            ]

        # Tools that take the raw arguments dict; authenticate_gmail takes
        # none and is handled separately below.
        handlers = {
            "list_emails": self._list_emails,
            "read_email": self._read_email,
            "send_email": self._send_email,
            "search_emails": self._search_emails,
            "mark_as_read": self._mark_as_read,
            "mark_as_unread": self._mark_as_unread,
        }

        @self.server.call_tool()
        async def handle_call_tool(
            name: str, arguments: Optional[Dict[str, Any]]
        ) -> List[types.TextContent]:
            # Top-level boundary: any failure (including a missing required
            # argument) is reported back to the client as text.
            try:
                if name == "authenticate_gmail":
                    return await self._authenticate_gmail()
                handler = handlers.get(name)
                if handler is None:
                    return self._text(f"Unknown tool: {name}")
                # Tolerate a call that supplies no arguments object at all.
                return await handler(arguments or {})
            except Exception as e:
                return self._text(f"Error executing {name}: {str(e)}")

    # ------------------------------------------------------------------
    # Tool implementations
    # ------------------------------------------------------------------

    async def _authenticate_gmail(self) -> List[types.TextContent]:
        """Authenticate with Gmail API

        Loads cached credentials from ``self.token_file`` when present,
        refreshes them if expired, otherwise runs the installed-app OAuth
        flow using ``self.credentials_file``. On success the credentials are
        written back to the token file and ``self.service`` is built.

        Returns:
            A single text item reporting success or the failure reason.
        """
        # NOTE(review): the calls below are synchronous, so the event loop is
        # blocked while the OAuth flow / token refresh is in progress.
        try:
            creds = None

            # Load existing token
            if os.path.exists(self.token_file):
                # NOTE(security): pickle.load executes arbitrary code from the
                # file it reads. This is only acceptable while token.pickle is
                # written exclusively by this program and not writable by
                # others; consider migrating to a JSON token file.
                with open(self.token_file, 'rb') as token:
                    creds = pickle.load(token)

            # If there are no valid credentials, request authorization
            if not creds or not creds.valid:
                if creds and creds.expired and creds.refresh_token:
                    creds.refresh(Request())
                else:
                    flow = InstalledAppFlow.from_client_secrets_file(
                        self.credentials_file, SCOPES)
                    creds = flow.run_local_server(port=0)

                # Save credentials for next run
                with open(self.token_file, 'wb') as token:
                    pickle.dump(creds, token)

            # Build the Gmail service
            self.service = build('gmail', 'v1', credentials=creds)

            return self._text("Successfully authenticated with Gmail API!")

        except Exception as e:
            return self._text(f"Authentication failed: {str(e)}")

    async def _list_emails(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """List emails from Gmail inbox

        Args:
            arguments: Optional ``max_results`` (default 10) and ``query``
                (default empty string) passed through to the messages list
                call.

        Returns:
            One text item with ID/From/To/Subject/Date/Snippet per message,
            or a "No emails found." notice.
        """
        if not self.service:
            return self._text(self._AUTH_REQUIRED_TEXT)

        try:
            max_results = arguments.get('max_results', 10)
            query = arguments.get('query', '')

            # Get list of messages
            results = self.service.users().messages().list(
                userId='me',
                maxResults=max_results,
                q=query
            ).execute()

            messages = results.get('messages', [])
            if not messages:
                return self._text("No emails found.")

            chunks = ["Gmail Inbox:\n\n"]
            for message in messages:
                email = self._fetch_summary(message['id'])
                chunks.append(
                    f"ID: {email['id']}\n"
                    f"From: {email['from']}\n"
                    f"To: {email['to']}\n"
                    f"Subject: {email['subject']}\n"
                    f"Date: {email['date']}\n"
                    f"Snippet: {email['snippet']}\n"
                )
                chunks.append(self._SEPARATOR)

            return self._text("".join(chunks))

        except HttpError as error:
            return self._text(f"Gmail API error: {error}")

    async def _read_email(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Read a specific email by ID

        Args:
            arguments: Must contain ``email_id``.

        Returns:
            One text item with the From/To/CC/BCC/Subject/Date headers and
            the decoded body (plain text preferred, HTML as a fallback).

        Raises:
            KeyError: If ``email_id`` is missing (reported to the client by
                the tool dispatcher).
        """
        if not self.service:
            return self._text(self._AUTH_REQUIRED_TEXT)

        try:
            email_id = arguments['email_id']

            # Get the full message
            message = self.service.users().messages().get(
                userId='me',
                id=email_id,
                format='full'
            ).execute()

            payload = message.get('payload', {})
            found = self._collect_headers(
                payload, ('from', 'to', 'subject', 'date', 'cc', 'bcc')
            )
            body = self._extract_email_body(payload)

            chunks = [
                "Email Details:\n\n",
                f"From: {found.get('from', 'Unknown')}\n",
                f"To: {found.get('to', 'Unknown')}\n",
            ]
            if found.get('cc'):
                chunks.append(f"CC: {found['cc']}\n")
            if found.get('bcc'):
                chunks.append(f"BCC: {found['bcc']}\n")
            chunks.append(f"Subject: {found.get('subject', 'No Subject')}\n")
            chunks.append(f"Date: {found.get('date', 'Unknown')}\n")
            chunks.append(f"\nBody:\n{body}\n")

            return self._text("".join(chunks))

        except HttpError as error:
            return self._text(f"Gmail API error: {error}")

    async def _send_email(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Send an email through Gmail

        Args:
            arguments: Must contain ``to``, ``subject`` and ``body``; may
                contain ``cc`` and ``bcc`` (comma-separated address strings).

        Returns:
            One text item carrying the ID of the sent message.

        Raises:
            KeyError: If a required argument is missing (reported to the
                client by the tool dispatcher).
        """
        if not self.service:
            return self._text(self._AUTH_REQUIRED_TEXT)

        try:
            to = arguments['to']
            subject = arguments['subject']
            body = arguments['body']
            cc = arguments.get('cc', '')
            bcc = arguments.get('bcc', '')

            # Create email message
            message = self._create_message(to, subject, body, cc, bcc)

            # Send the message
            sent_message = self.service.users().messages().send(
                userId='me',
                body=message
            ).execute()

            return self._text(
                f"Email sent successfully! Message ID: {sent_message['id']}"
            )

        except HttpError as error:
            return self._text(f"Gmail API error: {error}")

    async def _search_emails(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Search emails using Gmail search syntax

        Args:
            arguments: Must contain ``query``; may contain ``max_results``
                (default 10).

        Returns:
            One text item with ID/From/Subject/Date/Snippet per match, or a
            notice that nothing matched.

        Raises:
            KeyError: If ``query`` is missing (reported to the client by the
                tool dispatcher).
        """
        if not self.service:
            return self._text(self._AUTH_REQUIRED_TEXT)

        try:
            query = arguments['query']
            max_results = arguments.get('max_results', 10)

            # Search for messages
            results = self.service.users().messages().list(
                userId='me',
                q=query,
                maxResults=max_results
            ).execute()

            messages = results.get('messages', [])
            if not messages:
                return self._text(f"No emails found matching query: {query}")

            chunks = [f"Search Results for '{query}':\n\n"]
            for message in messages:
                email = self._fetch_summary(message['id'])
                # Search output intentionally omits the "To" line.
                chunks.append(
                    f"ID: {email['id']}\n"
                    f"From: {email['from']}\n"
                    f"Subject: {email['subject']}\n"
                    f"Date: {email['date']}\n"
                    f"Snippet: {email['snippet']}\n"
                )
                chunks.append(self._SEPARATOR)

            return self._text("".join(chunks))

        except HttpError as error:
            return self._text(f"Gmail API error: {error}")

    async def _mark_as_read(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Mark an email as read

        Args:
            arguments: Must contain ``email_id``.

        Returns:
            One text item confirming the change.
        """
        if not self.service:
            return self._text(self._AUTH_REQUIRED_TEXT)

        try:
            email_id = arguments['email_id']

            # Remove UNREAD label
            self.service.users().messages().modify(
                userId='me',
                id=email_id,
                body={'removeLabelIds': ['UNREAD']}
            ).execute()

            return self._text(f"Email {email_id} marked as read")

        except HttpError as error:
            return self._text(f"Gmail API error: {error}")

    async def _mark_as_unread(self, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Mark an email as unread

        Args:
            arguments: Must contain ``email_id``.

        Returns:
            One text item confirming the change.
        """
        if not self.service:
            return self._text(self._AUTH_REQUIRED_TEXT)

        try:
            email_id = arguments['email_id']

            # Add UNREAD label
            self.service.users().messages().modify(
                userId='me',
                id=email_id,
                body={'addLabelIds': ['UNREAD']}
            ).execute()

            return self._text(f"Email {email_id} marked as unread")

        except HttpError as error:
            return self._text(f"Gmail API error: {error}")

    # ------------------------------------------------------------------
    # Message encoding / decoding
    # ------------------------------------------------------------------

    @staticmethod
    def _decode_body_data(data: str) -> str:
        """Decode a base64url body string into text.

        Padding is restored first because ``urlsafe_b64decode`` rejects
        input whose length is not a multiple of four. Undecodable bytes are
        replaced rather than raised so one odd part cannot fail the call.
        """
        padded = data + "=" * (-len(data) % 4)
        return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')

    def _find_body_text(self, payload: Dict[str, Any], mime_type: str) -> Optional[str]:
        """Depth-first search of *payload* for the first inline *mime_type* part.

        Returns the decoded text, or ``None`` when no such part has data.
        """
        # A part carrying a filename is an attachment per the Gmail API
        # MessagePart reference, so it is never treated as the body.
        if payload.get('mimeType') == mime_type and not payload.get('filename'):
            data = payload.get('body', {}).get('data')
            if data:
                return self._decode_body_data(data)

        for part in payload.get('parts') or []:
            text = self._find_body_text(part, mime_type)
            if text is not None:
                return text
        return None

    def _extract_email_body(self, payload: Dict[str, Any]) -> str:
        """Extract email body from payload

        Walks the whole MIME tree (including nested multipart containers) and
        prefers a ``text/plain`` part; ``text/html`` is used only when no
        plain-text part exists. Single-part messages are handled the same way.

        Args:
            payload: The ``payload`` dict of a message fetched with
                ``format='full'``.

        Returns:
            The decoded body text, or an empty string if none was found.
        """
        for mime_type in ('text/plain', 'text/html'):
            text = self._find_body_text(payload, mime_type)
            if text is not None:
                return text
        return ""

    def _create_message(self, to: str, subject: str, body: str, cc: str = '', bcc: str = '') -> Dict[str, Any]:
        """Create email message

        Args:
            to: Recipient address.
            subject: Subject line.
            body: Plain-text body.
            cc: Optional CC addresses (comma-separated); omitted when empty.
            bcc: Optional BCC addresses (comma-separated); omitted when empty.

        Returns:
            ``{'raw': <base64url-encoded message>}``, the request body
            passed to the messages send call.
        """
        message = MIMEText(body)
        message['to'] = to
        message['subject'] = subject

        if cc:
            message['cc'] = cc
        if bcc:
            message['bcc'] = bcc

        raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode('utf-8')
        return {'raw': raw_message}

    async def run(self):
        """Run the MCP server"""
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="gmail-mcp-server",
                    server_version="1.0.0",
                    capabilities=self.server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities=None
                    )
                )
            )


if __name__ == "__main__":
    server = GmailMCPServer()
    asyncio.run(server.run())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nvdpsingh/GmailMCPServer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.