Skip to main content
Glama

gmail_get_recent_emails

Retrieve recent Gmail emails with metadata, snippets, and content previews for automated email monitoring and processing workflows.

Instructions

Get the most recent emails from Gmail (returns metadata, snippets, and first 1k chars of body)

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
google_access_tokenYesGoogle OAuth2 access token
max_resultsNoMaximum number of emails to return (default: 10)
unread_onlyNoWhether to return only unread emails (default: False)

Implementation Reference

  • Core handler implementation in GmailClient.get_recent_emails: fetches recent INBOX emails via Gmail API, extracts headers, snippet, truncates plain text body to 1000 chars, returns JSON.
    def get_recent_emails(self, max_results: int = 10, unread_only: bool = False) -> str: """Get the most recent emails from Gmail Args: max_results: Maximum number of emails to return (default: 10) unread_only: Whether to return only unread emails (default: False) Returns: JSON string with an array of emails containing metadata, snippets, and first 1k chars of body """ try: # Check if service is initialized if not hasattr(self, 'service'): logger.error("Gmail service not initialized. No valid access token provided.") return json.dumps({ "error": "No valid access token provided. Please refresh your token first.", "status": "error" }) # Define the operation def _operation(): logger.debug(f"Fetching up to {max_results} recent emails from Gmail") # Get list of recent messages query = 'is:unread' if unread_only else '' logger.debug(f"Calling Gmail API to list messages from INBOX with query: '{query}'") try: response = self.service.users().messages().list( userId='me', maxResults=max_results, labelIds=['INBOX'], q=query ).execute() logger.debug(f"API Response received: {json.dumps(response)[:200]}...") except Exception as e: logger.error(f"Error calling Gmail API list: {str(e)}", exc_info=True) return json.dumps({"error": f"Gmail API list error: {str(e)}"}) messages = response.get('messages', []) if not messages: logger.debug("No messages found in the response") return json.dumps({"emails": []}) logger.debug(f"Found {len(messages)} messages, processing details") # Fetch detailed information for each message emails = [] for i, message in enumerate(messages): logger.debug(f"Fetching details for message {i+1}/{len(messages)}, ID: {message['id']}") msg = self.service.users().messages().get( userId='me', id=message['id'], format='full' ).execute() logger.debug(f"Message {message['id']} details received, extracting fields") # Extract headers headers = {} if 'payload' in msg and 'headers' in msg['payload']: for header in msg['payload']['headers']: name = header.get('name', '').lower() if name in ['from', 'to', 'subject', 'date']: headers[name] = header.get('value', '') else: logger.debug(f"Message {message['id']} missing payload or headers fields: {json.dumps(msg)[:200]}...") # Extract plain text body and size body_text = "" body_size_bytes = 0 contains_full_body = True if 'payload' in msg: body_text, body_size_bytes = self.extract_plain_text_body(msg['payload']) # Check if we're returning the full body or truncating if len(body_text) > 1000: body_text = body_text[:1000] contains_full_body = False # Format the email email_data = { "id": msg['id'], "threadId": msg['threadId'], "labelIds": msg.get('labelIds', []), "snippet": msg.get('snippet', ''), "from": headers.get('from', ''), "to": headers.get('to', ''), "subject": headers.get('subject', ''), "date": headers.get('date', ''), "internalDate": msg.get('internalDate', ''), "body": body_text, "body_size_bytes": body_size_bytes, "contains_full_body": contains_full_body } logger.debug(f"Successfully processed message {message['id']}") emails.append(email_data) logger.debug(f"Successfully processed {len(emails)} emails") return json.dumps({"emails": convert_datetime_fields(emails)}) # Execute the operation with token refresh handling return self._handle_token_refresh(_operation) except HttpError as e: logger.error(f"Gmail API Exception: {str(e)}") return json.dumps({"error": str(e)}) except Exception as e: logger.error(f"Exception in get_recent_emails: {str(e)}", exc_info=True) return json.dumps({"error": str(e)})
  • MCP tool dispatch handler in @server.call_tool(): extracts params, initializes GmailClient, calls get_recent_emails, handles errors.
    if name == "gmail_get_recent_emails": # Initialize Gmail client with just access token logger.debug(f"Initializing Gmail client for get_recent_emails with access token: {access_token[:10]}...") try: gmail = GmailClient( access_token=access_token ) logger.debug("Gmail client initialized successfully") max_results = int(arguments.get("max_results", 10)) unread_only = bool(arguments.get("unread_only", False)) logger.debug(f"Calling get_recent_emails with max_results={max_results} and unread_only={unread_only}") results = gmail.get_recent_emails(max_results=max_results, unread_only=unread_only) logger.debug(f"get_recent_emails result (first 200 chars): {results[:200]}...") return [types.TextContent(type="text", text=results)] except Exception as e: logger.error(f"Exception in gmail_get_recent_emails handler: {str(e)}", exc_info=True) return [types.TextContent(type="text", text=f"Error: {str(e)}")]
  • Tool registration in @server.list_tools() with description and input schema definition.
    types.Tool( name="gmail_get_recent_emails", description="Get the most recent emails from Gmail (returns metadata, snippets, and first 1k chars of body)", inputSchema={ "type": "object", "properties": { "google_access_token": {"type": "string", "description": "Google OAuth2 access token"}, "max_results": {"type": "integer", "description": "Maximum number of emails to return (default: 10)"}, "unread_only": {"type": "boolean", "description": "Whether to return only unread emails (default: False)"} }, "required": ["google_access_token"] }, ),
  • Helper function to recursively extract plain text body from Gmail message payload MIME parts.
    def extract_plain_text_body(self, msg_payload): """Extract plain text body from message payload Args: msg_payload: Gmail API message payload Returns: tuple: (plain_text_body, body_size_in_bytes) """ body_text = "" body_size = 0 # Helper function to process message parts recursively def extract_from_parts(parts): nonlocal body_text, body_size if not parts: return for part in parts: mime_type = part.get('mimeType', '') # If this part is plain text if mime_type == 'text/plain': body_data = part.get('body', {}).get('data', '') if body_data: # Decode base64url encoded data decoded_bytes = base64.urlsafe_b64decode(body_data) body_size += len(decoded_bytes) body_part = decoded_bytes.decode('utf-8', errors='replace') body_text += body_part # If this part has child parts, process them if 'parts' in part: extract_from_parts(part['parts']) # If body data is directly in the payload if 'body' in msg_payload and 'data' in msg_payload['body']: body_data = msg_payload['body']['data'] if body_data: decoded_bytes = base64.urlsafe_b64decode(body_data) body_size += len(decoded_bytes) body_text = decoded_bytes.decode('utf-8', errors='replace') # If message has parts, process them if 'parts' in msg_payload: extract_from_parts(msg_payload['parts']) return body_text, body_size
  • JS version core handler in GmailClient.getRecentEmails: similar logic to Python, fetches recent emails, extracts data.
    async getRecentEmails({ maxResults = 10, unreadOnly = false } = {}) { const operation = async () => { if (!this.gmail) { throw new Error('Gmail service not initialized. No valid access token provided.'); } const query = unreadOnly ? 'is:unread' : ''; const res = await this.gmail.users.messages.list({ userId: 'me', maxResults, labelIds: ['INBOX'], q: query }); const messages = res.data.messages || []; const emails = []; for (const message of messages) { const msg = await this.gmail.users.messages.get({ userId: 'me', id: message.id, format: 'full' }); const payload = msg.data.payload || {}; const headers = (payload.headers || []).reduce((acc, h) => { const name = h.name.toLowerCase(); if (['from', 'to', 'subject', 'date'].includes(name)) { acc[name] = h.value; } return acc; }, {}); const { body, body_size_bytes, contains_full_body } = this.extractPlainTextBody(payload); emails.push({ id: msg.data.id, threadId: msg.data.threadId, labelIds: msg.data.labelIds, snippet: msg.data.snippet, from: headers.from || '', to: headers.to || '', subject: headers.subject || '', date: headers.date || '', internalDate: msg.data.internalDate, body, body_size_bytes, contains_full_body }); } return JSON.stringify({ emails }); }; return await this._handleTokenRefresh(operation); }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/baryhuang/mcp-headless-gmail'

If you have feedback or need assistance with the MCP directory API, please join our Discord server