Skip to main content
Glama
AbhinavBansal17

MCP Headless Gmail Server

gmail_get_recent_emails

Retrieve recent Gmail emails with metadata, snippets, and message previews for headless environments using OAuth tokens.

Instructions

Get the most recent emails from Gmail (returns metadata, snippets, and first 1k chars of body)

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
google_access_tokenNoGoogle OAuth2 access token
max_resultsNoMaximum number of emails to return (default: 10)
unread_onlyNoWhether to return only unread emails (default: False)

Implementation Reference

  • Core handler function in GmailClient class that fetches recent emails from Gmail INBOX using Google API, extracts headers and plain text body (truncated to 1000 chars), handles token refresh, and returns JSON array of email data.
    def get_recent_emails(self, max_results: int = 10, unread_only: bool = False) -> str: """Get the most recent emails from Gmail Args: max_results: Maximum number of emails to return (default: 10) unread_only: Whether to return only unread emails (default: False) Returns: JSON string with an array of emails containing metadata, snippets, and first 1k chars of body """ try: # Check if service is initialized if not hasattr(self, 'service'): logger.error("Gmail service not initialized. No valid access token provided.") return json.dumps({ "error": "No valid access token provided. Please refresh your token first.", "status": "error" }) # Define the operation def _operation(): logger.debug(f"Fetching up to {max_results} recent emails from Gmail") # Get list of recent messages query = 'is:unread' if unread_only else '' logger.debug(f"Calling Gmail API to list messages from INBOX with query: '{query}'") try: response = self.service.users().messages().list( userId='me', maxResults=max_results, labelIds=['INBOX'], q=query ).execute() logger.debug(f"API Response received: {json.dumps(response)[:200]}...") except Exception as e: logger.error(f"Error calling Gmail API list: {str(e)}", exc_info=True) return json.dumps({"error": f"Gmail API list error: {str(e)}"}) messages = response.get('messages', []) if not messages: logger.debug("No messages found in the response") return json.dumps({"emails": []}) logger.debug(f"Found {len(messages)} messages, processing details") # Fetch detailed information for each message emails = [] for i, message in enumerate(messages): logger.debug(f"Fetching details for message {i+1}/{len(messages)}, ID: {message['id']}") msg = self.service.users().messages().get( userId='me', id=message['id'], format='full' ).execute() logger.debug(f"Message {message['id']} details received, extracting fields") # Extract headers headers = {} if 'payload' in msg and 'headers' in msg['payload']: for header in msg['payload']['headers']: name = header.get('name', '').lower() if name in ['from', 'to', 'subject', 'date']: headers[name] = header.get('value', '') else: logger.debug(f"Message {message['id']} missing payload or headers fields: {json.dumps(msg)[:200]}...") # Extract plain text body and size body_text = "" body_size_bytes = 0 contains_full_body = True if 'payload' in msg: body_text, body_size_bytes = self.extract_plain_text_body(msg['payload']) # Check if we're returning the full body or truncating if len(body_text) > 1000: body_text = body_text[:1000] contains_full_body = False # Format the email email_data = { "id": msg['id'], "threadId": msg['threadId'], "labelIds": msg.get('labelIds', []), "snippet": msg.get('snippet', ''), "from": headers.get('from', ''), "to": headers.get('to', ''), "subject": headers.get('subject', ''), "date": headers.get('date', ''), "internalDate": msg.get('internalDate', ''), "body": body_text, "body_size_bytes": body_size_bytes, "contains_full_body": contains_full_body } logger.debug(f"Successfully processed message {message['id']}") emails.append(email_data) logger.debug(f"Successfully processed {len(emails)} emails") return json.dumps({"emails": convert_datetime_fields(emails)}) # Execute the operation with token refresh handling return self._handle_token_refresh(_operation) except HttpError as e: logger.error(f"Gmail API Exception: {str(e)}") return json.dumps({"error": str(e)}) except Exception as e: logger.error(f"Exception in get_recent_emails: {str(e)}", exc_info=True) return json.dumps({"error": str(e)})
  • Tool registration in the MCP server's list_tools handler, defining the tool name, description, and input schema.
    name="gmail_get_recent_emails", description="Get the most recent emails from Gmail (returns metadata, snippets, and first 1k chars of body)", inputSchema={ "type": "object", "properties": { "google_access_token": {"type": "string", "description": "Google OAuth2 access token"}, "max_results": {"type": "integer", "description": "Maximum number of emails to return (default: 10)"}, "unread_only": {"type": "boolean", "description": "Whether to return only unread emails (default: False)"} }, "required": [] }, ),
  • Core handler function in JS GmailClient class that fetches recent emails from Gmail INBOX using Google API, extracts headers and plain text body (truncated to 1000 chars), and returns JSON.
    async getRecentEmails({ maxResults = 10, unreadOnly = false } = {}) { const operation = async () => { if (!this.gmail) { throw new Error('Gmail service not initialized. No valid access token provided.'); } const query = unreadOnly ? 'is:unread' : ''; const res = await this.gmail.users.messages.list({ userId: 'me', maxResults, labelIds: ['INBOX'], q: query }); const messages = res.data.messages || []; const emails = []; for (const message of messages) { const msg = await this.gmail.users.messages.get({ userId: 'me', id: message.id, format: 'full' }); const payload = msg.data.payload || {}; const headers = (payload.headers || []).reduce((acc, h) => { const name = h.name.toLowerCase(); if (['from', 'to', 'subject', 'date'].includes(name)) { acc[name] = h.value; } return acc; }, {}); const { body, body_size_bytes, contains_full_body } = this.extractPlainTextBody(payload); emails.push({ id: msg.data.id, threadId: msg.data.threadId, labelIds: msg.data.labelIds, snippet: msg.data.snippet, from: headers.from || '', to: headers.to || '', subject: headers.subject || '', date: headers.date || '', internalDate: msg.data.internalDate, body, body_size_bytes, contains_full_body }); } return JSON.stringify({ emails }); }; return await this._handleTokenRefresh(operation); }
  • server.js:315-333 (registration)
    Tool registration using server.tool in the JS MCP server, including schema with Zod and the dispatch handler that instantiates GmailClient and calls getRecentEmails.
    server.tool( 'gmail_get_recent_emails', 'Get the most recent emails from Gmail (returns metadata, snippets, and first 1k chars of body)', { google_access_token: z.string().describe('Google OAuth2 access token'), max_results: z.number().optional().describe('Maximum number of emails to return (default: 10)'), unread_only: z.boolean().optional().describe('Whether to return only unread emails (default: False)') }, async ({ google_access_token, max_results = 10, unread_only = false }) => { try { const gmail = new GmailClient({ accessToken: google_access_token }); const result = await gmail.getRecentEmails({ maxResults: max_results, unreadOnly: unread_only }); return { content: [{ type: 'text', text: result }] }; } catch (error) { return { content: [{ type: 'text', text: `Error: ${error.message}` }] }; } }
  • Helper method to recursively extract plain text body from Gmail message payload, decoding base64url data and accumulating size.
    def extract_plain_text_body(self, msg_payload): """Extract plain text body from message payload Args: msg_payload: Gmail API message payload Returns: tuple: (plain_text_body, body_size_in_bytes) """ body_text = "" body_size = 0 # Helper function to process message parts recursively def extract_from_parts(parts): nonlocal body_text, body_size if not parts: return for part in parts: mime_type = part.get('mimeType', '') # If this part is plain text if mime_type == 'text/plain': body_data = part.get('body', {}).get('data', '') if body_data: # Decode base64url encoded data decoded_bytes = base64.urlsafe_b64decode(body_data) body_size += len(decoded_bytes) body_part = decoded_bytes.decode('utf-8', errors='replace') body_text += body_part # If this part has child parts, process them if 'parts' in part: extract_from_parts(part['parts']) # If body data is directly in the payload if 'body' in msg_payload and 'data' in msg_payload['body']: body_data = msg_payload['body']['data'] if body_data: decoded_bytes = base64.urlsafe_b64decode(body_data) body_size += len(decoded_bytes) body_text = decoded_bytes.decode('utf-8', errors='replace') # If message has parts, process them if 'parts' in msg_payload: extract_from_parts(msg_payload['parts']) return body_text, body_size

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AbhinavBansal17/mcp-headless-gmail'

If you have feedback or need assistance with the MCP directory API, please join our Discord server