Skip to main content
Glama

Gmail Plugin MCP Server

server.py (34.5 kB)
"""Gmail MCP server.

Exposes a single Gmail account over the Model Context Protocol (stdio
transport).  The server offers prompts for drafting/editing emails and tools
for sending, listing unread, reading, trashing, marking-as-read and opening
emails in the local browser.

NOTE: the stdio transport uses stdout for protocol messages, so this module
must never ``print()``; all diagnostics go through ``logging`` (stderr).
"""

from typing import Any
import argparse
import os
import asyncio
import logging
import base64
from email.message import EmailMessage, Message
from email.header import decode_header
from base64 import urlsafe_b64decode
from email import message_from_bytes
import webbrowser

from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Configure logging (logging.basicConfig writes to stderr by default, which
# keeps stdout free for the MCP stdio transport).
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# OAuth scope used when the caller does not supply one.
DEFAULT_SCOPES = ('https://www.googleapis.com/auth/gmail.modify',)

EMAIL_ADMIN_PROMPTS = """You are an email administrator.
You can draft, edit, read, trash, open, and send emails.
You've been given access to a specific gmail account.
You have the following tools available:
- Send an email (send-email)
- Retrieve unread emails (get-unread-emails)
- Read email content (read-email)
- Trash email (trash-email)
- Mark email as read (mark-email-as-read)
- Open email in browser (open-email)
Never send an email draft or trash an email unless the user confirms first.
Always ask for approval if not already given.
"""

# Define available prompts
PROMPTS = {
    "manage-email": types.Prompt(
        name="manage-email",
        description="Act like an email administrator",
        arguments=None,
    ),
    "draft-email": types.Prompt(
        name="draft-email",
        description="Draft an email with content and recipient",
        arguments=[
            types.PromptArgument(
                name="content",
                description="What the email is about",
                required=True,
            ),
            types.PromptArgument(
                name="recipient",
                description="Who should the email be addressed to",
                required=True,
            ),
            types.PromptArgument(
                name="recipient_email",
                description="Recipient's email address",
                required=True,
            ),
        ],
    ),
    "edit-draft": types.Prompt(
        name="edit-draft",
        description="Edit the existing email draft",
        arguments=[
            types.PromptArgument(
                name="changes",
                description="What changes should be made to the draft",
                required=True,
            ),
            types.PromptArgument(
                name="current_draft",
                description="The current draft to edit",
                required=True,
            ),
        ],
    ),
}


def _decode_bytes(data: bytes, charset: str | None) -> str:
    """Decode *data* using *charset*, never raising on bad input.

    Falls back to UTF-8 when the charset label is missing or unknown to
    Python, and substitutes U+FFFD for undecodable bytes.
    """
    try:
        return data.decode(charset or 'utf-8', errors='replace')
    except LookupError:
        # Charset label not recognised by Python's codec registry.
        return data.decode('utf-8', errors='replace')


def decode_mime_header(header: str) -> str:
    """Decode an RFC 2047 encoded email header into a plain string.

    Args:
        header: Raw header value, possibly containing encoded words such as
            ``=?utf-8?b?...?=``.

    Returns:
        The decoded header text.  Unknown charsets and undecodable bytes are
        handled leniently instead of raising.
    """
    fragments = []
    for part, encoding in decode_header(header):
        if isinstance(part, bytes):
            fragments.append(_decode_bytes(part, encoding))
        else:
            # Already a string
            fragments.append(part)
    return ''.join(fragments)


def _first_header(headers: list[dict], name: str, default: str) -> str:
    """Return the first header value whose name matches *name*, ignoring case."""
    wanted = name.lower()
    for header in headers:
        if header.get('name', '').lower() == wanted:
            return header.get('value', default)
    return default


def _extract_text_body(mime_message: Message) -> str:
    """Return the message body text, or ``''`` when none is available.

    For multipart messages the first ``text/plain`` part that is not an
    attachment is used; for single-part messages the payload itself is used.
    """
    if not mime_message.is_multipart():
        payload = mime_message.get_payload(decode=True)
        if payload is None:
            return ''
        return _decode_bytes(payload, mime_message.get_content_charset())

    for part in mime_message.walk():
        if part.get_content_type() != 'text/plain':
            continue
        if part.get_content_disposition() == 'attachment':
            # A .txt attachment is not the message body.
            continue
        payload = part.get_payload(decode=True)
        if payload is None:
            continue
        return _decode_bytes(payload, part.get_content_charset())
    return ''


class GmailService:
    """Thin wrapper around the Gmail REST API for one authorised account.

    NOTE(review): apart from ``send_email`` the API calls below run
    synchronously inside ``async`` methods and therefore block the event loop
    for the duration of each HTTP request.
    """

    def __init__(self,
                 creds_file_path: str,
                 token_path: str,
                 scopes: list[str] | None = None):
        """Authenticate and build the Gmail client.

        Args:
            creds_file_path: Path to the OAuth 2.0 client secrets file.
            token_path: Path where the user token is stored and loaded from.
            scopes: OAuth scopes to request; defaults to ``gmail.modify``.
        """
        logger.info(f"Initializing GmailService with creds file: {creds_file_path}")
        self.creds_file_path = creds_file_path
        self.token_path = token_path
        # Copy so neither the default nor the caller's list is shared.
        self.scopes = list(scopes) if scopes is not None else list(DEFAULT_SCOPES)
        self.token = self._get_token()
        logger.info("Token retrieved successfully")
        self.service = self._get_service()
        logger.info("Gmail service initialized")
        self.user_email = self._get_user_email()
        logger.info(f"User email retrieved: {self.user_email}")

    def _get_token(self) -> Credentials:
        """Get or refresh Google API token.

        Loads the stored token if present; refreshes it when expired, or runs
        the local-server OAuth flow when no usable token exists.  A refreshed
        or newly fetched token is written back to ``token_path``.
        """
        token = None

        if os.path.exists(self.token_path):
            logger.info('Loading token from file')
            token = Credentials.from_authorized_user_file(self.token_path, self.scopes)

        if not token or not token.valid:
            if token and token.expired and token.refresh_token:
                logger.info('Refreshing token')
                token.refresh(Request())
            else:
                logger.info('Fetching new token')
                flow = InstalledAppFlow.from_client_secrets_file(
                    self.creds_file_path, self.scopes)
                token = flow.run_local_server(port=0)

            with open(self.token_path, 'w') as token_file:
                token_file.write(token.to_json())
                logger.info(f'Token saved to {self.token_path}')

        return token

    def _get_service(self) -> Any:
        """Initialize Gmail API service.

        Raises:
            ValueError: If the client could not be built (the original
                ``HttpError`` is chained as the cause).
        """
        try:
            return build('gmail', 'v1', credentials=self.token)
        except HttpError as error:
            logger.error(f'An error occurred building Gmail service: {error}')
            raise ValueError(f'An error occurred: {error}') from error

    def _get_user_email(self) -> str:
        """Get user email address (empty string if the profile lacks one)."""
        profile = self.service.users().getProfile(userId='me').execute()
        return profile.get('emailAddress', '')

    async def send_email(self, recipient_id: str, subject: str, message: str,) -> dict:
        """Creates and sends an email message.

        Returns:
            ``{"status": "success", "message_id": ...}`` on success, or
            ``{"status": "error", "error_message": ...}`` on an ``HttpError``.
        """
        try:
            message_obj = EmailMessage()
            message_obj.set_content(message)

            message_obj['To'] = recipient_id
            message_obj['From'] = self.user_email
            message_obj['Subject'] = subject

            # The Gmail API expects the RFC 2822 message base64url-encoded.
            encoded_message = base64.urlsafe_b64encode(message_obj.as_bytes()).decode()
            create_message = {'raw': encoded_message}

            send_message = await asyncio.to_thread(
                self.service.users().messages().send(userId="me", body=create_message).execute
            )
            logger.info(f"Message sent: {send_message['id']}")
            return {"status": "success", "message_id": send_message["id"]}
        except HttpError as error:
            return {"status": "error", "error_message": str(error)}

    async def open_email(self, email_id: str) -> str:
        """Opens email in browser given ID.

        Returns a human-readable status string; reports failure when no
        browser could be launched instead of always claiming success.
        """
        url = f"https://mail.google.com/#all/{email_id}"
        try:
            opened = webbrowser.open(url, new=0, autoraise=True)
        except webbrowser.Error as error:
            logger.error(f"Failed to open email in browser: {error}")
            return f"Failed to open email in browser: {str(error)}"
        if not opened:
            logger.error(f"No browser available to open email: {email_id}")
            return "Failed to open email in browser: no browser could be launched."
        return "Email opened in browser successfully."

    async def get_unread_emails(self) -> list[dict]:
        """Retrieves unread messages from the primary inbox.

        Returns:
            Up to 10 dicts with ``id``, ``threadId``, ``subject``, ``from``,
            ``date`` and ``snippet``; an empty list on any error (errors are
            logged, never printed, to keep stdout clean for MCP).
        """
        try:
            user_id = 'me'
            query = 'in:inbox is:unread category:primary'

            # Get message IDs first
            response = self.service.users().messages().list(
                userId=user_id,
                q=query,
                maxResults=10,  # Limit to prevent timeout
            ).execute()
            message_refs = response.get('messages', [])

            # Get actual message details
            messages = []
            for ref in message_refs:
                msg = self.service.users().messages().get(
                    userId=user_id,
                    id=ref['id'],
                    format='metadata',  # Use 'full' if you need the complete message
                    metadataHeaders=['Subject', 'From', 'Date'],
                ).execute()

                # Header names are matched case-insensitively.
                headers = msg.get('payload', {}).get('headers', [])
                messages.append({
                    'id': msg['id'],
                    'threadId': msg['threadId'],
                    'subject': _first_header(headers, 'Subject', 'No Subject'),
                    'from': _first_header(headers, 'From', 'Unknown'),
                    'date': _first_header(headers, 'Date', 'Unknown'),
                    'snippet': msg.get('snippet', ''),
                })

            return messages

        except HttpError as error:
            logger.error(f"An HttpError occurred: {str(error)}")
            return []
        except Exception as e:
            # Deliberate best-effort boundary: report "no messages" rather
            # than crash the tool call, but keep the traceback in the log.
            logger.exception(f"An unexpected error occurred: {str(e)}")
            return []

    async def read_email(self, email_id: str) -> dict[str, str] | str:
        """Retrieves email contents including to, from, subject, and contents.

        The message is marked as read after it has been fetched and parsed.

        Returns:
            A dict with ``content``, ``subject``, ``from``, ``to`` and
            ``date``, or an error string if the Gmail API call failed.
        """
        try:
            msg = self.service.users().messages().get(
                userId="me", id=email_id, format='raw').execute()

            # Decode the base64URL encoded raw content and parse the
            # RFC 2822 email.
            mime_message = message_from_bytes(urlsafe_b64decode(msg['raw']))

            email_metadata = {
                'content': _extract_text_body(mime_message),
                'subject': decode_mime_header(mime_message.get('subject', '')),
                'from': decode_mime_header(mime_message.get('from', '')),
                'to': decode_mime_header(mime_message.get('to', '')),
                'date': mime_message.get('date', ''),
            }

            logger.info(f"Email read: {email_id}")

            # We want to mark email as read once we read it
            await self.mark_email_as_read(email_id)

            return email_metadata
        except HttpError as error:
            return f"An HttpError occurred: {str(error)}"

    async def trash_email(self, email_id: str) -> str:
        """Moves email to trash given ID; returns a status string."""
        try:
            self.service.users().messages().trash(userId="me", id=email_id).execute()
            logger.info(f"Email moved to trash: {email_id}")
            return "Email moved to trash successfully."
        except HttpError as error:
            return f"An HttpError occurred: {str(error)}"

    async def mark_email_as_read(self, email_id: str) -> str:
        """Marks email as read given ID; returns a status string."""
        try:
            self.service.users().messages().modify(
                userId="me", id=email_id, body={'removeLabelIds': ['UNREAD']}).execute()
            logger.info(f"Email marked as read: {email_id}")
            return "Email marked as read."
        except HttpError as error:
            return f"An HttpError occurred: {str(error)}"


def _email_id_schema() -> dict:
    """Return the JSON schema shared by every tool that takes only an email ID."""
    return {
        "type": "object",
        "properties": {
            "email_id": {
                "type": "string",
                "description": "Email ID",
            },
        },
        "required": ["email_id"],
    }


def _require_email_id(arguments: dict) -> str:
    """Return the ``email_id`` argument or raise ``ValueError`` if absent."""
    email_id = arguments.get("email_id")
    if not email_id:
        raise ValueError("Missing email ID parameter")
    return email_id


async def main(creds_file_path: str, token_path: str):
    """Build the Gmail service, register MCP handlers and serve over stdio.

    Args:
        creds_file_path: Path to the OAuth 2.0 client secrets file.
        token_path: Path used to store and retrieve the user token.
    """
    gmail_service = GmailService(creds_file_path, token_path)
    server = Server("gmail")

    @server.list_prompts()
    async def list_prompts() -> list[types.Prompt]:
        return list(PROMPTS.values())

    @server.get_prompt()
    async def get_prompt(
        name: str, arguments: dict[str, str] | None = None
    ) -> types.GetPromptResult:
        if name not in PROMPTS:
            raise ValueError(f"Prompt not found: {name}")

        # Clients may omit arguments entirely; treat that as "none given".
        arguments = arguments or {}

        if name == "manage-email":
            text = EMAIL_ADMIN_PROMPTS
        elif name == "draft-email":
            content = arguments.get("content", "")
            recipient = arguments.get("recipient", "")
            recipient_email = arguments.get("recipient_email", "")
            # Asks the LLM to create the draft without sending it.
            text = (
                f"Please draft an email about {content} for {recipient} ({recipient_email}).\n"
                "Include a subject line starting with 'Subject:' on the first line.\n"
                "Do not send the email yet, just draft it and ask the user for their thoughts."
            )
        elif name == "edit-draft":
            changes = arguments.get("changes", "")
            current_draft = arguments.get("current_draft", "")
            # Edit existing draft based on requested changes
            text = (
                "Please revise the current email draft:\n"
                f"{current_draft}\n\n"
                "Requested changes:\n"
                f"{changes}\n\n"
                "Please provide the updated draft."
            )
        else:
            raise ValueError("Prompt implementation not found")

        return types.GetPromptResult(
            messages=[
                types.PromptMessage(
                    role="user",
                    content=types.TextContent(type="text", text=text),
                )
            ]
        )

    @server.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
        return [
            types.Tool(
                name="send-email",
                description="""Sends email to recipient.
                Do not use if user only asked to draft email.
                Drafts must be approved before sending.""",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "recipient_id": {
                            "type": "string",
                            "description": "Recipient email address",
                        },
                        "subject": {
                            "type": "string",
                            "description": "Email subject",
                        },
                        "message": {
                            "type": "string",
                            "description": "Email content text",
                        },
                    },
                    "required": ["recipient_id", "subject", "message"],
                },
            ),
            types.Tool(
                name="trash-email",
                description="""Moves email to trash.
                Confirm before moving email to trash.""",
                inputSchema=_email_id_schema(),
            ),
            types.Tool(
                name="get-unread-emails",
                description="Retrieve unread emails",
                inputSchema={
                    "type": "object",
                    "properties": {},
                    "required": [],
                },
            ),
            types.Tool(
                name="read-email",
                description="Retrieves given email content",
                inputSchema=_email_id_schema(),
            ),
            types.Tool(
                name="mark-email-as-read",
                description="Marks given email as read",
                inputSchema=_email_id_schema(),
            ),
            types.Tool(
                name="open-email",
                description="Open email in browser",
                inputSchema=_email_id_schema(),
            ),
        ]

    @server.call_tool()
    async def handle_call_tool(
        name: str, arguments: dict | None
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        # A missing arguments object is reported as the specific missing
        # parameter below rather than as an AttributeError.
        arguments = arguments or {}

        if name == "send-email":
            recipient = arguments.get("recipient_id")
            if not recipient:
                raise ValueError("Missing recipient parameter")
            subject = arguments.get("subject")
            if not subject:
                raise ValueError("Missing subject parameter")
            message = arguments.get("message")
            if not message:
                raise ValueError("Missing message parameter")

            # A draft may carry its own "Subject:" first line; if so, it wins
            # over the subject argument and is stripped from the body.
            first_line, _, remainder = message.partition('\n')
            if first_line.startswith('Subject:'):
                # Keep the supplied subject if the embedded one is blank.
                subject = first_line.removeprefix('Subject:').strip() or subject
                message_content = remainder.strip()
            else:
                message_content = message

            send_response = await gmail_service.send_email(recipient, subject, message_content)

            if send_response["status"] == "success":
                response_text = f"Email sent successfully. Message ID: {send_response['message_id']}"
            else:
                response_text = f"Failed to send email: {send_response['error_message']}"
            return [types.TextContent(type="text", text=response_text)]

        if name == "get-unread-emails":
            unread_emails = await gmail_service.get_unread_emails()
            return [types.TextContent(
                type="text",
                text=str(unread_emails),
                artifact={"type": "json", "data": unread_emails},
            )]

        if name == "read-email":
            email_id = _require_email_id(arguments)
            retrieved_email = await gmail_service.read_email(email_id)
            return [types.TextContent(
                type="text",
                text=str(retrieved_email),
                artifact={"type": "dictionary", "data": retrieved_email},
            )]

        if name == "open-email":
            msg = await gmail_service.open_email(_require_email_id(arguments))
            return [types.TextContent(type="text", text=str(msg))]

        if name == "trash-email":
            msg = await gmail_service.trash_email(_require_email_id(arguments))
            return [types.TextContent(type="text", text=str(msg))]

        if name == "mark-email-as-read":
            msg = await gmail_service.mark_email_as_read(_require_email_id(arguments))
            return [types.TextContent(type="text", text=str(msg))]

        logger.error(f"Unknown tool: {name}")
        raise ValueError(f"Unknown tool: {name}")

    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="gmail",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Gmail API MCP Server')
    parser.add_argument('--creds-file-path',
                        required=True,
                        help='OAuth 2.0 credentials file path')
    parser.add_argument('--token-path',
                        required=True,
                        help='File location to store and retrieve access and refresh tokens for application')

    args = parser.parse_args()
    asyncio.run(main(args.creds_file_path, args.token_path))

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sanchisingh01/MCP-Server---Gmail-Plugin-for-Claude-Desktop'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.