Skip to main content
Glama

Enhanced Gmail MCP Server

server.py75.9 kB
from typing import Any import argparse import os import asyncio import logging import base64 from email.message import EmailMessage from email.header import decode_header from base64 import urlsafe_b64decode from email import message_from_bytes import webbrowser from mcp.server.models import InitializationOptions import mcp.types as types from mcp.server import NotificationOptions, Server import mcp.server.stdio from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build from googleapiclient.errors import HttpError # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) EMAIL_ADMIN_PROMPTS = """You are an email administrator. You can draft, edit, read, trash, open, and send emails. You've been given access to a specific gmail account. You have the following tools available: - Send an email (send-email) - Create a draft email (create-draft) - List draft emails (list-drafts) - Retrieve unread emails (get-unread-emails) - Read email content (read-email) - Trash email (trash-email) - Open email in browser (open-email) - List all labels (list-labels) - Create a new label (create-label) - Apply a label to an email (apply-label) - Remove a label from an email (remove-label) - Rename a label (rename-label) - Delete a label (delete-label) - Search for emails with a specific label (search-by-label) - Search for emails using Gmail's search syntax (search-emails) - List all email filters (list-filters) - Get details of a specific filter (get-filter) - Create a new email filter (create-filter) - Delete a filter (delete-filter) - Create a new folder (create-folder) - Move an email to a folder (move-to-folder) - List all folders (list-folders) - Archive an email (archive-email) - Batch archive emails (batch-archive) - List archived emails (list-archived) - Restore an email to inbox (restore-to-inbox) Never send an email draft or trash an email unless the user confirms first. Always ask for approval if not already given. """ # Define available prompts PROMPTS = { "manage-email": types.Prompt( name="manage-email", description="Act like an email administator", arguments=None, ), "draft-email": types.Prompt( name="draft-email", description="Draft an email with cotent and recipient", arguments=[ types.PromptArgument( name="content", description="What the email is about", required=True ), types.PromptArgument( name="recipient", description="Who should the email be addressed to", required=True ), types.PromptArgument( name="recipient_email", description="Recipient's email address", required=True ), ], ), "edit-draft": types.Prompt( name="edit-draft", description="Edit the existing email draft", arguments=[ types.PromptArgument( name="changes", description="What changes should be made to the draft", required=True ), types.PromptArgument( name="current_draft", description="The current draft to edit", required=True ), ], ), "manage-labels": types.Prompt( name="manage-labels", description="Manage email labels for organization", arguments=[ types.PromptArgument( name="action", description="What action to take with labels (create, list, apply, remove, search)", required=True ), ], ), "manage-filters": types.Prompt( name="manage-filters", description="Manage email filters for automation", arguments=[ types.PromptArgument( name="action", description="What action to take with filters (create, list, view, delete)", required=True ), ], ), "search-emails": types.Prompt( name="search-emails", description="Search for emails using Gmail's search syntax", arguments=[ types.PromptArgument( name="query", description="What to search for in emails", required=True ), ], ), "manage-folders": types.Prompt( name="manage-folders", description="Manage email folders for organization", arguments=[ types.PromptArgument( name="action", description="What action to take with folders (create, list, move)", required=True ), ], ), "manage-archive": types.Prompt( name="manage-archive", description="Manage archived emails", arguments=[ types.PromptArgument( name="action", description="What action to take with archives (archive, batch-archive, list, restore)", required=True ), ], ), } def decode_mime_header(header: str) -> str: """Helper function to decode encoded email headers""" decoded_parts = decode_header(header) decoded_string = '' for part, encoding in decoded_parts: if isinstance(part, bytes): # Decode bytes to string using the specified encoding decoded_string += part.decode(encoding or 'utf-8') else: # Already a string decoded_string += part return decoded_string class GmailService: def __init__(self, creds_file_path: str, token_path: str, scopes: list[str] = ['https://www.googleapis.com/auth/gmail.modify']): logger.info(f"Initializing GmailService with creds file: {creds_file_path}") self.creds_file_path = creds_file_path self.token_path = token_path self.scopes = scopes self.token = self._get_token() logger.info("Token retrieved successfully") self.service = self._get_service() logger.info("Gmail service initialized") self.user_email = self._get_user_email() logger.info(f"User email retrieved: {self.user_email}") def _get_token(self) -> Credentials: """Get or refresh Google API token""" token = None if os.path.exists(self.token_path): logger.info('Loading token from file') token = Credentials.from_authorized_user_file(self.token_path, self.scopes) if not token or not token.valid: if token and token.expired and token.refresh_token: logger.info('Refreshing token') token.refresh(Request()) else: logger.info('Fetching new token') flow = InstalledAppFlow.from_client_secrets_file(self.creds_file_path, self.scopes) token = flow.run_local_server(port=0) with open(self.token_path, 'w') as token_file: token_file.write(token.to_json()) logger.info(f'Token saved to {self.token_path}') return token def _get_service(self) -> Any: """Initialize Gmail API service""" try: service = build('gmail', 'v1', credentials=self.token) return service except HttpError as error: logger.error(f'An error occurred building Gmail service: {error}') raise ValueError(f'An error occurred: {error}') def _get_user_email(self) -> str: """Get user email address""" profile = self.service.users().getProfile(userId='me').execute() user_email = profile.get('emailAddress', '') return user_email async def send_email(self, recipient_id: str, subject: str, message: str,) -> dict: """Creates and sends an email message""" try: message_obj = EmailMessage() message_obj.set_content(message) message_obj['To'] = recipient_id message_obj['From'] = self.user_email message_obj['Subject'] = subject encoded_message = base64.urlsafe_b64encode(message_obj.as_bytes()).decode() create_message = {'raw': encoded_message} send_message = await asyncio.to_thread( self.service.users().messages().send(userId="me", body=create_message).execute ) logger.info(f"Message sent: {send_message['id']}") return {"status": "success", "message_id": send_message["id"]} except HttpError as error: return {"status": "error", "error_message": str(error)} async def open_email(self, email_id: str) -> str: """Opens email in browser given ID.""" try: url = f"https://mail.google.com/#all/{email_id}" webbrowser.open(url, new=0, autoraise=True) return "Email opened in browser successfully." except HttpError as error: return f"An HttpError occurred: {str(error)}" async def get_unread_emails(self) -> list[dict[str, str]]| str: """ Retrieves unread messages from mailbox. Returns list of messsage IDs in key 'id'.""" try: user_id = 'me' query = 'in:inbox is:unread category:primary' response = self.service.users().messages().list(userId=user_id, q=query).execute() messages = [] if 'messages' in response: messages.extend(response['messages']) while 'nextPageToken' in response: page_token = response['nextPageToken'] response = self.service.users().messages().list(userId=user_id, q=query, pageToken=page_token).execute() messages.extend(response['messages']) return messages except HttpError as error: return f"An HttpError occurred: {str(error)}" async def read_email(self, email_id: str) -> dict[str, str]| str: """Retrieves email contents including to, from, subject, and contents.""" try: msg = self.service.users().messages().get(userId="me", id=email_id, format='raw').execute() email_metadata = {} # Decode the base64URL encoded raw content raw_data = msg['raw'] decoded_data = urlsafe_b64decode(raw_data) # Parse the RFC 2822 email mime_message = message_from_bytes(decoded_data) # Extract the email body body = None if mime_message.is_multipart(): for part in mime_message.walk(): # Extract the text/plain part if part.get_content_type() == "text/plain": body = part.get_payload(decode=True).decode() break else: # For non-multipart messages body = mime_message.get_payload(decode=True).decode() email_metadata['content'] = body # Extract metadata email_metadata['subject'] = decode_mime_header(mime_message.get('subject', '')) email_metadata['from'] = mime_message.get('from','') email_metadata['to'] = mime_message.get('to','') email_metadata['date'] = mime_message.get('date','') logger.info(f"Email read: {email_id}") # We want to mark email as read once we read it await self.mark_email_as_read(email_id) return email_metadata except HttpError as error: return f"An HttpError occurred: {str(error)}" async def trash_email(self, email_id: str) -> str: """Moves email to trash given ID.""" try: self.service.users().messages().trash(userId="me", id=email_id).execute() logger.info(f"Email moved to trash: {email_id}") return "Email moved to trash successfully." except HttpError as error: return f"An HttpError occurred: {str(error)}" async def mark_email_as_read(self, email_id: str) -> str: """Marks email as read given ID.""" try: self.service.users().messages().modify(userId="me", id=email_id, body={'removeLabelIds': ['UNREAD']}).execute() logger.info(f"Email marked as read: {email_id}") return "Email marked as read." except HttpError as error: return f"An HttpError occurred: {str(error)}" async def create_draft(self, recipient_id: str, subject: str, message: str) -> dict: """Creates a draft email message""" try: message_obj = EmailMessage() message_obj.set_content(message) message_obj['To'] = recipient_id message_obj['From'] = self.user_email message_obj['Subject'] = subject encoded_message = base64.urlsafe_b64encode(message_obj.as_bytes()).decode() create_message = {'raw': encoded_message} draft = await asyncio.to_thread( self.service.users().drafts().create(userId="me", body={'message': create_message}).execute ) logger.info(f"Draft created: {draft['id']}") return {"status": "success", "draft_id": draft["id"]} except HttpError as error: return {"status": "error", "error_message": str(error)} async def list_drafts(self) -> list[dict] | str: """Lists all draft emails""" try: results = await asyncio.to_thread( self.service.users().drafts().list(userId="me").execute ) drafts = results.get('drafts', []) draft_list = [] for draft in drafts: draft_id = draft['id'] # Get the draft details to extract subject and recipient draft_data = await asyncio.to_thread( self.service.users().drafts().get(userId="me", id=draft_id).execute ) message = draft_data.get('message', {}) headers = message.get('payload', {}).get('headers', []) subject = next((header['value'] for header in headers if header['name'].lower() == 'subject'), 'No Subject') to = next((header['value'] for header in headers if header['name'].lower() == 'to'), 'No Recipient') draft_list.append({ 'id': draft_id, 'subject': subject, 'to': to }) return draft_list except HttpError as error: return f"An HttpError occurred: {str(error)}" async def list_labels(self) -> list[dict] | str: """Lists all labels in the user's mailbox""" try: results = await asyncio.to_thread( self.service.users().labels().list(userId="me").execute ) labels = results.get('labels', []) label_list = [] for label in labels: label_list.append({ 'id': label['id'], 'name': label['name'], 'type': label['type'] # 'system' or 'user' }) return label_list except HttpError as error: return f"An HttpError occurred: {str(error)}" async def create_label(self, name: str) -> dict | str: """Creates a new label""" try: label_object = { 'name': name, 'labelListVisibility': 'labelShow', # Show in label list 'messageListVisibility': 'show' # Show in message list } created_label = await asyncio.to_thread( self.service.users().labels().create(userId="me", body=label_object).execute ) logger.info(f"Label created: {created_label['id']}") return { 'status': 'success', 'label_id': created_label['id'], 'name': created_label['name'] } except HttpError as error: return {"status": "error", "error_message": str(error)} async def apply_label(self, email_id: str, label_id: str) -> str: """Applies a label to an email""" try: await asyncio.to_thread( self.service.users().messages().modify( userId="me", id=email_id, body={'addLabelIds': [label_id]} ).execute ) logger.info(f"Label {label_id} applied to email {email_id}") return f"Label applied successfully to email." except HttpError as error: return f"An HttpError occurred: {str(error)}" async def remove_label(self, email_id: str, label_id: str) -> str: """Removes a label from an email""" try: await asyncio.to_thread( self.service.users().messages().modify( userId="me", id=email_id, body={'removeLabelIds': [label_id]} ).execute ) logger.info(f"Label {label_id} removed from email {email_id}") return f"Label removed successfully from email." except HttpError as error: return f"An HttpError occurred: {str(error)}" async def search_by_label(self, label_id: str) -> list[dict] | str: """Searches for emails with a specific label""" try: query = f"label:{label_id}" response = await asyncio.to_thread( self.service.users().messages().list(userId="me", q=query).execute ) messages = [] if 'messages' in response: messages.extend(response['messages']) while 'nextPageToken' in response: page_token = response['nextPageToken'] response = await asyncio.to_thread( self.service.users().messages().list( userId="me", q=query, pageToken=page_token ).execute ) messages.extend(response['messages']) return messages except HttpError as error: return f"An HttpError occurred: {str(error)}" async def list_filters(self) -> list[dict] | str: """Lists all filters in the user's mailbox""" try: results = await asyncio.to_thread( self.service.users().settings().filters().list(userId="me").execute ) filters = results.get('filter', []) return filters except HttpError as error: return f"An HttpError occurred: {str(error)}" async def get_filter(self, filter_id: str) -> dict | str: """Gets a specific filter by ID""" try: filter_data = await asyncio.to_thread( self.service.users().settings().filters().get(userId="me", id=filter_id).execute ) return filter_data except HttpError as error: return f"An HttpError occurred: {str(error)}" async def create_filter(self, from_email: str = None, to_email: str = None, subject: str = None, query: str = None, has_attachment: bool = None, exclude_chats: bool = None, size_comparison: str = None, size: int = None, add_label_ids: list[str] = None, remove_label_ids: list[str] = None, forward_to: str = None) -> dict | str: """Creates a new email filter Args: from_email: Email from a specific sender to_email: Email to a specific recipient subject: Email with a specific subject query: Email matching a custom query has_attachment: Email has an attachment exclude_chats: Exclude chats from filter size_comparison: 'larger' or 'smaller' size: Size in bytes for comparison add_label_ids: Labels to add to matching emails remove_label_ids: Labels to remove from matching emails forward_to: Email address to forward matching emails to """ try: # Build the filter criteria criteria = {} if from_email: criteria['from'] = from_email if to_email: criteria['to'] = to_email if subject: criteria['subject'] = subject if query: criteria['query'] = query if has_attachment is not None: criteria['hasAttachment'] = has_attachment if exclude_chats is not None: criteria['excludeChats'] = exclude_chats if size_comparison and size: if size_comparison.lower() == 'larger': criteria['sizeComparison'] = 'larger' criteria['size'] = size elif size_comparison.lower() == 'smaller': criteria['sizeComparison'] = 'smaller' criteria['size'] = size # Build the filter actions action = {} if add_label_ids: action['addLabelIds'] = add_label_ids if remove_label_ids: action['removeLabelIds'] = remove_label_ids if forward_to: action['forward'] = forward_to # Create the filter filter_object = { 'criteria': criteria, 'action': action } created_filter = await asyncio.to_thread( self.service.users().settings().filters().create( userId="me", body=filter_object ).execute ) logger.info(f"Filter created: {created_filter['id']}") return { 'status': 'success', 'filter_id': created_filter['id'], 'filter': created_filter } except HttpError as error: return {"status": "error", "error_message": str(error)} async def delete_filter(self, filter_id: str) -> str: """Deletes a filter by ID""" try: await asyncio.to_thread( self.service.users().settings().filters().delete( userId="me", id=filter_id ).execute ) logger.info(f"Filter deleted: {filter_id}") return f"Filter deleted successfully." except HttpError as error: return f"An HttpError occurred: {str(error)}" async def search_emails(self, query: str, max_results: int = 50) -> list[dict] | str: """ Searches for emails using Gmail's search syntax. Args: query: Gmail search query (e.g., 'from:example@gmail.com', 'subject:hello', etc.) max_results: Maximum number of results to return (default: 50) Returns: List of message objects or error message """ try: user_id = 'me' response = await asyncio.to_thread( self.service.users().messages().list( userId=user_id, q=query, maxResults=max_results ).execute ) messages = [] if 'messages' in response: messages.extend(response['messages']) # Get additional pages if available and needed while 'nextPageToken' in response and len(messages) < max_results: page_token = response['nextPageToken'] response = await asyncio.to_thread( self.service.users().messages().list( userId=user_id, q=query, pageToken=page_token, maxResults=max_results - len(messages) ).execute ) if 'messages' in response: messages.extend(response['messages']) # Get basic metadata for each message result_messages = [] for msg in messages: msg_data = await asyncio.to_thread( self.service.users().messages().get( userId=user_id, id=msg['id'], format='metadata', metadataHeaders=['Subject', 'From', 'Date'] ).execute ) headers = msg_data.get('payload', {}).get('headers', []) subject = next((header['value'] for header in headers if header['name'].lower() == 'subject'), 'No Subject') sender = next((header['value'] for header in headers if header['name'].lower() == 'from'), 'Unknown Sender') date = next((header['value'] for header in headers if header['name'].lower() == 'date'), '') result_messages.append({ 'id': msg['id'], 'threadId': msg['threadId'], 'subject': subject, 'from': sender, 'date': date, 'snippet': msg_data.get('snippet', '') }) return result_messages except HttpError as error: return f"An HttpError occurred: {str(error)}" async def create_folder(self, name: str) -> dict | str: """ Creates a new folder (implemented as a label with special handling). Args: name: Name of the folder to create Returns: Dictionary with status and folder information or error message """ try: # In Gmail, folders are just labels with special visibility settings label_object = { 'name': name, 'labelListVisibility': 'labelShow', 'messageListVisibility': 'show', 'type': 'user' # Ensure it's a user label } created_label = await asyncio.to_thread( self.service.users().labels().create(userId="me", body=label_object).execute ) logger.info(f"Folder created: {created_label['id']}") return { 'status': 'success', 'folder_id': created_label['id'], 'name': created_label['name'] } except HttpError as error: return {"status": "error", "error_message": str(error)} async def move_to_folder(self, email_id: str, folder_id: str) -> str: """ Moves an email to a folder by: 1. Applying the folder label 2. Removing the INBOX label (to remove from inbox) Args: email_id: ID of the email to move folder_id: ID of the folder (label) to move to Returns: Success or error message """ try: # First, apply the folder label await asyncio.to_thread( self.service.users().messages().modify( userId="me", id=email_id, body={'addLabelIds': [folder_id], 'removeLabelIds': ['INBOX']} ).execute ) logger.info(f"Email {email_id} moved to folder {folder_id}") return f"Email moved to folder successfully." except HttpError as error: return f"An HttpError occurred: {str(error)}" async def list_folders(self) -> list[dict] | str: """ Lists all user-created labels (folders) Returns: List of folder information or error message """ try: results = await asyncio.to_thread( self.service.users().labels().list(userId="me").execute ) labels = results.get('labels', []) # Filter to only include user-created labels (folders) folders = [ { 'id': label['id'], 'name': label['name'] } for label in labels if label['type'] == 'user' ] return folders except HttpError as error: return f"An HttpError occurred: {str(error)}" async def rename_label(self, label_id: str, new_name: str) -> dict | str: """ Renames an existing label Args: label_id: ID of the label to rename new_name: New name for the label Returns: Dictionary with status and updated label information or error message """ try: # First, get the current label to preserve its settings label = await asyncio.to_thread( self.service.users().labels().get(userId="me", id=label_id).execute ) # Update only the name field label['name'] = new_name # Update the label updated_label = await asyncio.to_thread( self.service.users().labels().update( userId="me", id=label_id, body=label ).execute ) logger.info(f"Label renamed: {label_id} to {new_name}") return { 'status': 'success', 'label_id': updated_label['id'], 'name': updated_label['name'] } except HttpError as error: return {"status": "error", "error_message": str(error)} async def delete_label(self, label_id: str) -> str: """ Deletes a label Args: label_id: ID of the label to delete Returns: Success or error message """ try: await asyncio.to_thread( self.service.users().labels().delete( userId="me", id=label_id ).execute ) logger.info(f"Label deleted: {label_id}") return f"Label deleted successfully." except HttpError as error: return f"An HttpError occurred: {str(error)}" async def archive_email(self, email_id: str) -> str: """ Archives an email by removing the INBOX label Args: email_id: ID of the email to archive Returns: Success or error message """ try: await asyncio.to_thread( self.service.users().messages().modify( userId="me", id=email_id, body={'removeLabelIds': ['INBOX']} ).execute ) logger.info(f"Email archived: {email_id}") return f"Email archived successfully." except HttpError as error: return f"An HttpError occurred: {str(error)}" async def batch_archive(self, query: str, max_emails: int = 100) -> dict: """ Archives multiple emails matching a search query Args: query: Gmail search query to find emails to archive max_emails: Maximum number of emails to archive in one batch Returns: Dictionary with status and count of archived emails """ try: # First, search for emails matching the query user_id = 'me' response = await asyncio.to_thread( self.service.users().messages().list( userId=user_id, q=query, maxResults=max_emails ).execute ) messages = [] if 'messages' in response: messages.extend(response['messages']) if not messages: return { 'status': 'success', 'archived_count': 0, 'message': 'No emails found matching the query.' } # Archive each email in the batch archived_count = 0 for msg in messages: try: await asyncio.to_thread( self.service.users().messages().modify( userId="me", id=msg['id'], body={'removeLabelIds': ['INBOX']} ).execute ) archived_count += 1 except Exception as e: logger.error(f"Error archiving email {msg['id']}: {str(e)}") logger.info(f"Batch archived {archived_count} emails") return { 'status': 'success', 'archived_count': archived_count, 'total_found': len(messages), 'message': f"Successfully archived {archived_count} out of {len(messages)} emails." } except HttpError as error: return { 'status': 'error', 'error_message': str(error) } async def list_archived(self, max_results: int = 50) -> list[dict] | str: """ Lists archived emails (emails not in inbox) Args: max_results: Maximum number of results to return Returns: List of archived email objects or error message """ try: # Search for emails that are in "All Mail" but not in "Inbox" query = "-in:inbox" # Use the existing search_emails method return await self.search_emails(query, max_results) except Exception as error: return f"An error occurred: {str(error)}" async def restore_to_inbox(self, email_id: str) -> str: """ Restores an archived email to the inbox Args: email_id: ID of the email to restore Returns: Success or error message """ try: await asyncio.to_thread( self.service.users().messages().modify( userId="me", id=email_id, body={'addLabelIds': ['INBOX']} ).execute ) logger.info(f"Email restored to inbox: {email_id}") return f"Email restored to inbox successfully." except HttpError as error: return f"An HttpError occurred: {str(error)}" async def main(creds_file_path: str, token_path: str): gmail_service = GmailService(creds_file_path, token_path) server = Server("gmail") @server.list_prompts() async def list_prompts() -> list[types.Prompt]: return list(PROMPTS.values()) @server.get_prompt() async def get_prompt( name: str, arguments: dict[str, str] | None = None ) -> types.GetPromptResult: if name not in PROMPTS: raise ValueError(f"Prompt not found: {name}") if name == "manage-email": return types.GetPromptResult( messages=[ types.PromptMessage( role="user", content=types.TextContent( type="text", text=EMAIL_ADMIN_PROMPTS, ) ) ] ) if name == "draft-email": content = arguments.get("content", "") recipient = arguments.get("recipient", "") recipient_email = arguments.get("recipient_email", "") # First message asks the LLM to create the draft return types.GetPromptResult( messages=[ types.PromptMessage( role="user", content=types.TextContent( type="text", text=f"""Please draft an email about {content} for {recipient} ({recipient_email}). Include a subject line starting with 'Subject:' on the first line. Do not send the email yet, just draft it and ask the user for their thoughts.""" ) ) ] ) elif name == "edit-draft": changes = arguments.get("changes", "") current_draft = arguments.get("current_draft", "") # Edit existing draft based on requested changes return types.GetPromptResult( messages=[ types.PromptMessage( role="user", content=types.TextContent( type="text", text=f"""Please revise the current email draft: {current_draft} Requested changes: {changes} Please provide the updated draft.""" ) ) ] ) elif name == "manage-labels": action = arguments.get("action", "") # Guide the LLM on how to manage labels return types.GetPromptResult( messages=[ types.PromptMessage( role="user", content=types.TextContent( type="text", text=f"""I need help with managing my email labels. Specifically, I want to {action}. Here are the tools you can use for label management: - list-labels: Lists all existing labels in my Gmail account - create-label: Creates a new label with a specified name - apply-label: Applies a label to a specific email - remove-label: Removes a label from a specific email - rename-label: Renames an existing label - delete-label: Permanently deletes a label - search-by-label: Finds all emails with a specific label Please help me {action} by using the appropriate tools. If you need to list labels first to get label IDs, please do so.""" ) ) ] ) elif name == "manage-filters": action = arguments.get("action", "") # Guide the LLM on how to manage filters return types.GetPromptResult( messages=[ types.PromptMessage( role="user", content=types.TextContent( type="text", text=f"""I need help with managing my email filters. Specifically, I want to {action}. Here are the tools you can use for filter management: - list-filters: Lists all existing filters in my Gmail account - get-filter: Gets details of a specific filter - create-filter: Creates a new filter - delete-filter: Deletes a specific filter Please help me {action} by using the appropriate tools. If you need to list filters first to get filter IDs, please do so.""" ) ) ] ) elif name == "search-emails": query = arguments.get("query", "") # Guide the LLM on how to search emails return types.GetPromptResult( messages=[ types.PromptMessage( role="user", content=types.TextContent( type="text", text=f"""I need to search through my emails for: {query} Here are the tools you can use for searching emails: - search-emails: Searches all emails using Gmail's search syntax - get-unread-emails: Gets only unread emails from the inbox Please help me find emails matching my search criteria. You can use Gmail's search syntax for advanced searches: - from:sender - Emails from a specific sender - to:recipient - Emails to a specific recipient - subject:text - Emails with specific text in the subject - has:attachment - Emails with attachments - after:YYYY/MM/DD - Emails after a specific date - before:YYYY/MM/DD - Emails before a specific date - is:important - Important emails - label:name - Emails with a specific label Please search for emails matching: {query}""" ) ) ] ) elif name == "manage-folders": action = arguments.get("action", "") # Guide the LLM on how to manage folders return types.GetPromptResult( messages=[ types.PromptMessage( role="user", content=types.TextContent( type="text", text=f"""I need help with managing my email folders. Specifically, I want to {action}. Here are the tools you can use for folder management: - list-folders: Lists all existing folders in my Gmail account - create-folder: Creates a new folder with a specified name - move-to-folder: Moves an email to a specific folder (removes it from inbox) Please help me {action} by using the appropriate tools. If you need to list folders first to get folder IDs, please do so. Note: In Gmail, folders are implemented as labels with special handling. When you move an email to a folder, it applies the folder's label and removes the email from the inbox.""" ) ) ] ) elif name == "manage-archive": action = arguments.get("action", "") # Guide the LLM on how to manage archives return types.GetPromptResult( messages=[ types.PromptMessage( role="user", content=types.TextContent( type="text", text=f"""I need help with managing my email archives. Specifically, I want to {action}. Here are the tools you can use for archive management: - archive-email: Archives a single email (removes from inbox without deleting) - batch-archive: Archives multiple emails matching a search query - list-archived: Lists emails that have been archived - restore-to-inbox: Restores an archived email back to the inbox Please help me {action} by using the appropriate tools. For batch archiving, you can use Gmail's search syntax to find emails to archive: - from:sender - Emails from a specific sender - older_than:30d - Emails older than 30 days - has:attachment - Emails with attachments - subject:text - Emails with specific text in the subject - before:YYYY/MM/DD - Emails before a specific date Note: Archiving in Gmail means removing the email from your inbox while keeping it accessible in "All Mail". It's a great way to declutter your inbox without losing any emails.""" ) ) ] ) raise ValueError("Prompt implementation not found") @server.list_tools() async def handle_list_tools() -> list[types.Tool]: return [ types.Tool( name="send-email", description="""Sends email to recipient. Do not use if user only asked to draft email. Drafts must be approved before sending.""", inputSchema={ "type": "object", "properties": { "recipient_id": { "type": "string", "description": "Recipient email address", }, "subject": { "type": "string", "description": "Email subject", }, "message": { "type": "string", "description": "Email content text", }, }, "required": ["recipient_id", "subject", "message"], }, ), types.Tool( name="trash-email", description="""Moves email to trash. Confirm before moving email to trash.""", inputSchema={ "type": "object", "properties": { "email_id": { "type": "string", "description": "Email ID", }, }, "required": ["email_id"], }, ), types.Tool( name="get-unread-emails", description="Retrieve unread emails", inputSchema={ "type": "object", "properties": {}, "required": [] }, ), types.Tool( name="read-email", description="Retrieves given email content", inputSchema={ "type": "object", "properties": { "email_id": { "type": "string", "description": "Email ID", }, }, "required": ["email_id"], }, ), types.Tool( name="mark-email-as-read", description="Marks given email as read", inputSchema={ "type": "object", "properties": { "email_id": { "type": "string", "description": "Email ID", }, }, "required": ["email_id"], }, ), types.Tool( name="open-email", description="Open email in browser", inputSchema={ "type": "object", "properties": { "email_id": { "type": "string", "description": "Email ID", }, }, "required": ["email_id"], }, ), types.Tool( name="create-draft", description="Creates a draft email without sending it", inputSchema={ "type": "object", "properties": { "recipient_id": { "type": "string", "description": "Recipient email address", }, "subject": { "type": "string", "description": "Email subject", }, "message": { "type": "string", "description": "Email content text", }, }, "required": ["recipient_id", "subject", "message"], }, ), types.Tool( name="list-drafts", description="Lists all draft emails", inputSchema={ "type": "object", "properties": {}, "required": [] }, ), types.Tool( name="list-labels", description="Lists all labels in the user's mailbox", inputSchema={ "type": "object", "properties": {}, "required": [] }, ), types.Tool( name="create-label", description="Creates a new label", inputSchema={ "type": "object", "properties": { "name": { "type": "string", "description": "Label name", }, }, "required": ["name"], }, ), types.Tool( name="apply-label", description="Applies a label to an email", inputSchema={ "type": "object", "properties": { "email_id": { "type": "string", "description": "Email ID", }, "label_id": { "type": "string", "description": "Label ID", }, }, "required": ["email_id", "label_id"], }, ), types.Tool( name="remove-label", description="Removes a label from an email", inputSchema={ "type": "object", "properties": { "email_id": { "type": "string", "description": "Email ID", }, "label_id": { "type": "string", "description": "Label ID", }, }, "required": ["email_id", "label_id"], }, ), types.Tool( name="rename-label", description="Renames an existing label", inputSchema={ "type": "object", "properties": { "label_id": { "type": "string", "description": "Label ID to rename", }, "new_name": { "type": "string", "description": "New name for the label", }, }, "required": ["label_id", "new_name"], }, ), types.Tool( name="delete-label", description="Permanently deletes a label", inputSchema={ "type": "object", "properties": { "label_id": { "type": "string", "description": "Label ID to delete", }, }, "required": ["label_id"], }, ), types.Tool( name="search-by-label", description="Searches for emails with a specific label", inputSchema={ "type": "object", "properties": { "label_id": { "type": "string", "description": "Label ID", }, }, "required": ["label_id"], }, ), types.Tool( name="list-filters", description="Lists all email filters in the user's mailbox", inputSchema={ "type": "object", "properties": {}, "required": [] }, ), types.Tool( name="get-filter", description="Gets details of a specific filter", inputSchema={ "type": "object", "properties": { "filter_id": { "type": "string", "description": "Filter ID", }, }, "required": ["filter_id"], }, ), types.Tool( name="create-filter", description="Creates a new email filter", inputSchema={ "type": "object", "properties": { "from_email": { "type": "string", "description": "Filter emails from this sender", }, "to_email": { "type": "string", "description": "Filter emails to this recipient", }, "subject": { "type": "string", "description": "Filter emails with this subject", }, "query": { "type": "string", "description": "Filter emails matching this query", }, "has_attachment": { "type": "boolean", "description": "Filter emails with attachments", }, "exclude_chats": { "type": "boolean", "description": "Exclude chats from filter", }, "size_comparison": { "type": "string", "description": "Size comparison ('larger' or 'smaller')", }, "size": { "type": "integer", "description": "Size in bytes for comparison", }, "add_label_ids": { "type": "array", "items": { "type": "string" }, "description": "Labels to add to matching emails", }, "remove_label_ids": { "type": "array", "items": { "type": "string" }, "description": "Labels to remove from matching emails", }, "forward_to": { "type": "string", "description": "Email address to forward matching emails to", }, }, }, ), types.Tool( name="delete-filter", description="Deletes a specific filter", inputSchema={ "type": "object", "properties": { "filter_id": { "type": "string", "description": "Filter ID", }, }, "required": ["filter_id"], }, ), types.Tool( name="search-emails", description="Searches for emails using Gmail's search syntax", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Gmail search query", }, "max_results": { "type": "integer", "description": "Maximum number of results to return", }, }, "required": ["query"], }, ), types.Tool( name="create-folder", description="Creates a new folder", inputSchema={ "type": "object", "properties": { "name": { "type": "string", "description": "Folder name", }, }, "required": ["name"], }, ), types.Tool( name="move-to-folder", description="Moves an email to a folder", inputSchema={ "type": "object", "properties": { "email_id": { "type": "string", "description": "Email ID", }, "folder_id": { "type": "string", "description": "Folder ID", }, }, "required": ["email_id", "folder_id"], }, ), types.Tool( name="list-folders", description="Lists all user-created folders", inputSchema={ "type": "object", "properties": {}, "required": [] }, ), types.Tool( name="archive-email", description="Archives an email (removes from inbox without deleting)", inputSchema={ "type": "object", "properties": { "email_id": { "type": "string", "description": "Email ID to archive", }, }, "required": ["email_id"], }, ), types.Tool( name="batch-archive", description="Archives multiple emails matching a search query", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Gmail search query to find emails to archive", }, "max_emails": { "type": "integer", "description": "Maximum number of emails to archive (default: 100)", }, }, "required": ["query"], }, ), types.Tool( name="list-archived", description="Lists archived emails (not in inbox)", inputSchema={ "type": "object", "properties": { "max_results": { "type": "integer", "description": "Maximum number of results to return", }, }, "required": [], }, ), types.Tool( name="restore-to-inbox", description="Restores an archived email back to the inbox", inputSchema={ "type": "object", "properties": { "email_id": { "type": "string", "description": "Email ID to restore to inbox", }, }, "required": ["email_id"], }, ), ] @server.call_tool() async def handle_call_tool( name: str, arguments: dict | None ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: if name == "send-email": recipient = arguments.get("recipient_id") if not recipient: raise ValueError("Missing recipient parameter") subject = arguments.get("subject") if not subject: raise ValueError("Missing subject parameter") message = arguments.get("message") if not message: raise ValueError("Missing message parameter") # Extract subject and message content email_lines = message.split('\n') if email_lines[0].startswith('Subject:'): subject = email_lines[0][8:].strip() message_content = '\n'.join(email_lines[1:]).strip() else: message_content = message send_response = await gmail_service.send_email(recipient, subject, message_content) if send_response["status"] == "success": response_text = f"Email sent successfully. Message ID: {send_response['message_id']}" else: response_text = f"Failed to send email: {send_response['error_message']}" return [types.TextContent(type="text", text=response_text)] if name == "get-unread-emails": unread_emails = await gmail_service.get_unread_emails() return [types.TextContent(type="text", text=str(unread_emails),artifact={"type": "json", "data": unread_emails} )] if name == "read-email": email_id = arguments.get("email_id") if not email_id: raise ValueError("Missing email ID parameter") retrieved_email = await gmail_service.read_email(email_id) return [types.TextContent(type="text", text=str(retrieved_email),artifact={"type": "dictionary", "data": retrieved_email} )] if name == "open-email": email_id = arguments.get("email_id") if not email_id: raise ValueError("Missing email ID parameter") msg = await gmail_service.open_email(email_id) return [types.TextContent(type="text", text=str(msg))] if name == "trash-email": email_id = arguments.get("email_id") if not email_id: raise ValueError("Missing email ID parameter") msg = await gmail_service.trash_email(email_id) return [types.TextContent(type="text", text=str(msg))] if name == "mark-email-as-read": email_id = arguments.get("email_id") if not email_id: raise ValueError("Missing email ID parameter") msg = await gmail_service.mark_email_as_read(email_id) return [types.TextContent(type="text", text=str(msg))] elif name == "create-draft": recipient_id = arguments.get("recipient_id") subject = arguments.get("subject") message = arguments.get("message") if not recipient_id or not subject or not message: raise ValueError("Missing required parameters for creating a draft") draft_response = await gmail_service.create_draft(recipient_id, subject, message) if draft_response["status"] == "success": response_text = f"Draft created successfully. Draft ID: {draft_response['draft_id']}" else: response_text = f"Failed to create draft: {draft_response['error_message']}" return [types.TextContent(type="text", text=response_text)] elif name == "list-drafts": drafts = await gmail_service.list_drafts() return [types.TextContent(type="text", text=str(drafts), artifact={"type": "json", "data": drafts})] elif name == "list-labels": labels = await gmail_service.list_labels() return [types.TextContent(type="text", text=str(labels), artifact={"type": "json", "data": labels})] elif name == "create-label": name = arguments.get("name") if not name: raise ValueError("Missing required parameter for creating a label") label_response = await gmail_service.create_label(name) if label_response["status"] == "success": response_text = f"Label created successfully. Label ID: {label_response['label_id']}, Name: {label_response['name']}" else: response_text = f"Failed to create label: {label_response['error_message']}" return [types.TextContent(type="text", text=response_text)] elif name == "apply-label": email_id = arguments.get("email_id") label_id = arguments.get("label_id") if not email_id or not label_id: raise ValueError("Missing required parameters for applying a label") msg = await gmail_service.apply_label(email_id, label_id) return [types.TextContent(type="text", text=str(msg))] elif name == "remove-label": email_id = arguments.get("email_id") label_id = arguments.get("label_id") if not email_id or not label_id: raise ValueError("Missing required parameters for removing a label") msg = await gmail_service.remove_label(email_id, label_id) return [types.TextContent(type="text", text=str(msg))] elif name == "search-by-label": label_id = arguments.get("label_id") if not label_id: raise ValueError("Missing required parameter for searching by label") messages = await gmail_service.search_by_label(label_id) return [types.TextContent(type="text", text=str(messages), artifact={"type": "json", "data": messages})] elif name == "list-filters": filters = await gmail_service.list_filters() return [types.TextContent(type="text", text=str(filters), artifact={"type": "json", "data": filters})] elif name == "get-filter": filter_id = arguments.get("filter_id") if not filter_id: raise ValueError("Missing required parameter for getting a filter") filter_data = await gmail_service.get_filter(filter_id) return [types.TextContent(type="text", text=str(filter_data), artifact={"type": "dictionary", "data": filter_data})] elif name == "create-filter": from_email = arguments.get("from_email") to_email = arguments.get("to_email") subject = arguments.get("subject") query = arguments.get("query") has_attachment = arguments.get("has_attachment") exclude_chats = arguments.get("exclude_chats") size_comparison = arguments.get("size_comparison") size = arguments.get("size") add_label_ids = arguments.get("add_label_ids") remove_label_ids = arguments.get("remove_label_ids") forward_to = arguments.get("forward_to") if not from_email and not to_email and not subject and not query and has_attachment is None and exclude_chats is None and size_comparison is None and size is None and add_label_ids is None and remove_label_ids is None and forward_to is None: raise ValueError("Missing required parameters for creating a filter") filter_response = await gmail_service.create_filter(from_email, to_email, subject, query, has_attachment, exclude_chats, size_comparison, size, add_label_ids, remove_label_ids, forward_to) if filter_response["status"] == "success": response_text = f"Filter created successfully. Filter ID: {filter_response['filter_id']}, Filter: {filter_response['filter']}" else: response_text = f"Failed to create filter: {filter_response['error_message']}" return [types.TextContent(type="text", text=response_text)] elif name == "delete-filter": filter_id = arguments.get("filter_id") if not filter_id: raise ValueError("Missing required parameter for deleting a filter") msg = await gmail_service.delete_filter(filter_id) return [types.TextContent(type="text", text=str(msg))] elif name == "search-emails": query = arguments.get("query") max_results = arguments.get("max_results", 50) if not query: raise ValueError("Missing required parameter for searching emails") messages = await gmail_service.search_emails(query, max_results) return [types.TextContent(type="text", text=str(messages), artifact={"type": "json", "data": messages})] elif name == "create-folder": name = arguments.get("name") if not name: raise ValueError("Missing required parameter for creating a folder") folder_response = await gmail_service.create_folder(name) if folder_response["status"] == "success": response_text = f"Folder created successfully. Folder ID: {folder_response['folder_id']}, Name: {folder_response['name']}" else: response_text = f"Failed to create folder: {folder_response['error_message']}" return [types.TextContent(type="text", text=response_text)] elif name == "move-to-folder": email_id = arguments.get("email_id") folder_id = arguments.get("folder_id") if not email_id or not folder_id: raise ValueError("Missing required parameters for moving an email to a folder") msg = await gmail_service.move_to_folder(email_id, folder_id) return [types.TextContent(type="text", text=str(msg))] elif name == "list-folders": folders = await gmail_service.list_folders() return [types.TextContent(type="text", text=str(folders), artifact={"type": "json", "data": folders})] elif name == "rename-label": label_id = arguments.get("label_id") new_name = arguments.get("new_name") if not label_id or not new_name: raise ValueError("Missing required parameters for renaming a label") rename_response = await gmail_service.rename_label(label_id, new_name) if rename_response["status"] == "success": response_text = f"Label renamed successfully. Label ID: {rename_response['label_id']}, New name: {rename_response['name']}" else: response_text = f"Failed to rename label: {rename_response['error_message']}" return [types.TextContent(type="text", text=response_text)] elif name == "delete-label": label_id = arguments.get("label_id") if not label_id: raise ValueError("Missing required parameter for deleting a label") msg = await gmail_service.delete_label(label_id) return [types.TextContent(type="text", text=str(msg))] elif name == "archive-email": email_id = arguments.get("email_id") if not email_id: raise ValueError("Missing required parameter for archiving an email") msg = await gmail_service.archive_email(email_id) return [types.TextContent(type="text", text=str(msg))] elif name == "batch-archive": query = arguments.get("query") max_emails = arguments.get("max_emails", 100) if not query: raise ValueError("Missing required parameter for batch archiving") archive_response = await gmail_service.batch_archive(query, max_emails) return [types.TextContent(type="text", text=str(archive_response))] elif name == "list-archived": max_results = arguments.get("max_results", 50) archived_emails = await gmail_service.list_archived(max_results) return [types.TextContent(type="text", text=str(archived_emails), artifact={"type": "json", "data": archived_emails})] elif name == "restore-to-inbox": email_id = arguments.get("email_id") if not email_id: raise ValueError("Missing required parameter for restoring an email to inbox") msg = await gmail_service.restore_to_inbox(email_id) return [types.TextContent(type="text", text=str(msg))] else: logger.error(f"Unknown tool: {name}") raise ValueError(f"Unknown tool: {name}") async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="gmail", server_version="0.1.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) if __name__ == "__main__": parser = argparse.ArgumentParser(description='Gmail API MCP Server') parser.add_argument('--creds-file-path', required=True, help='OAuth 2.0 credentials file path') parser.add_argument('--token-path', required=True, help='File location to store and retrieve access and refresh tokens for application') args = parser.parse_args() asyncio.run(main(args.creds_file_path, args.token_path))

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/theposch/gmail-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server