gmail_get_recent_emails
Retrieve recent Gmail emails with metadata and content snippets for headless environments using OAuth tokens, supporting unread filtering and result limits.
Instructions
Get the most recent emails from Gmail (returns metadata, snippets, and first 1k chars of body)
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| google_access_token | No | Google OAuth2 access token | |
| max_results | No | Maximum number of emails to return (default: 10) | |
| unread_only | No | Whether to return only unread emails (default: False) | |
Implementation Reference
# Core handler implementing the Gmail API calls to fetch recent emails from INBOX, extract headers, snippets, and first 1000 characters of plain text body.
def get_recent_emails(self, max_results: int = 10, unread_only: bool = False) -> str:
    """Return the newest INBOX messages as a JSON string.

    Args:
        max_results: Maximum number of emails to return (default: 10)
        unread_only: Whether to return only unread emails (default: False)

    Returns:
        JSON string. On success it contains an "emails" array whose entries
        hold metadata, the snippet and at most the first 1000 characters of
        the plain text body. On failure it contains an "error" message.
    """
    wanted_headers = ('from', 'to', 'subject', 'date')

    def _summarize(detail, message_id):
        # Reduce one full-format message resource to the flat dict we return.
        picked = {}
        if 'payload' in detail and 'headers' in detail['payload']:
            pairs = (
                (entry.get('name', '').lower(), entry)
                for entry in detail['payload']['headers']
            )
            # Later duplicates of a header overwrite earlier ones.
            picked = {
                key: entry.get('value', '')
                for key, entry in pairs
                if key in wanted_headers
            }
        else:
            logger.debug(f"Message {message_id} missing payload or headers fields: {json.dumps(detail)[:200]}...")

        text, size_bytes = "", 0
        if 'payload' in detail:
            text, size_bytes = self.extract_plain_text_body(detail['payload'])

        # Only the first 1000 characters are returned; flag when we cut.
        is_complete = len(text) <= 1000
        if not is_complete:
            text = text[:1000]

        return {
            "id": detail['id'],
            "threadId": detail['threadId'],
            "labelIds": detail.get('labelIds', []),
            "snippet": detail.get('snippet', ''),
            "from": picked.get('from', ''),
            "to": picked.get('to', ''),
            "subject": picked.get('subject', ''),
            "date": picked.get('date', ''),
            "internalDate": detail.get('internalDate', ''),
            "body": text,
            "body_size_bytes": size_bytes,
            "contains_full_body": is_complete,
        }

    def _fetch_recent():
        logger.debug(f"Fetching up to {max_results} recent emails from Gmail")

        if unread_only:
            search = 'is:unread'
        else:
            search = ''
        logger.debug(f"Calling Gmail API to list messages from INBOX with query: '{search}'")

        # Step 1: list message ids. Failures here are reported as JSON
        # rather than propagated to the caller.
        try:
            list_request = self.service.users().messages().list(
                userId='me',
                maxResults=max_results,
                labelIds=['INBOX'],
                q=search,
            )
            listing = list_request.execute()
            logger.debug(f"API Response received: {json.dumps(listing)[:200]}...")
        except Exception as e:
            logger.error(f"Error calling Gmail API list: {str(e)}", exc_info=True)
            return json.dumps({"error": f"Gmail API list error: {str(e)}"})

        refs = listing.get('messages', [])
        if not refs:
            logger.debug("No messages found in the response")
            return json.dumps({"emails": []})

        total = len(refs)
        logger.debug(f"Found {total} messages, processing details")

        # Step 2: one "get" call per listed message.
        collected = []
        for position, ref in enumerate(refs, start=1):
            message_id = ref['id']
            logger.debug(f"Fetching details for message {position}/{total}, ID: {message_id}")
            detail = self.service.users().messages().get(
                userId='me',
                id=message_id,
                format='full',
            ).execute()
            logger.debug(f"Message {message_id} details received, extracting fields")

            summary = _summarize(detail, message_id)
            logger.debug(f"Successfully processed message {message_id}")
            collected.append(summary)

        logger.debug(f"Successfully processed {len(collected)} emails")
        return json.dumps({"emails": convert_datetime_fields(collected)})

    try:
        # Without a service object there is nothing to call.
        if not hasattr(self, 'service'):
            logger.error("Gmail service not initialized. No valid access token provided.")
            return json.dumps({
                "error": "No valid access token provided. Please refresh your token first.",
                "status": "error"
            })

        # Run the fetch through the client's token refresh wrapper.
        return self._handle_token_refresh(_fetch_recent)
    except HttpError as e:
        logger.error(f"Gmail API Exception: {str(e)}")
        return json.dumps({"error": str(e)})
    except Exception as e:
        logger.error(f"Exception in get_recent_emails: {str(e)}", exc_info=True)
        return json.dumps({"error": str(e)})
- src/mcp_server_headless_gmail/server.py:484-495 (registration)Tool registration in MCP server's list_tools() method, including input schema definition.name="gmail_get_recent_emails", description="Get the most recent emails from Gmail (returns metadata, snippets, and first 1k chars of body)", inputSchema={ "type": "object", "properties": { "google_access_token": {"type": "string", "description": "Google OAuth2 access token"}, "max_results": {"type": "integer", "description": "Maximum number of emails to return (default: 10)"}, "unread_only": {"type": "boolean", "description": "Whether to return only unread emails (default: False)"} }, "required": [] }, ),
- MCP server tool dispatch handler that instantiates GmailClient and calls the get_recent_emails method with parsed arguments.if name == "gmail_get_recent_emails": # Initialize Gmail client with just access token logger.debug(f"Initializing Gmail client for get_recent_emails with access token: {access_token[:10]}...") try: gmail = GmailClient( access_token=access_token ) logger.debug("Gmail client initialized successfully") max_results = int(arguments.get("max_results", 10)) unread_only = bool(arguments.get("unread_only", False)) logger.debug(f"Calling get_recent_emails with max_results={max_results} and unread_only={unread_only}") results = gmail.get_recent_emails(max_results=max_results, unread_only=unread_only) logger.debug(f"get_recent_emails result (first 200 chars): {results[:200]}...") return [types.TextContent(type="text", text=results)] except Exception as e: logger.error(f"Exception in gmail_get_recent_emails handler: {str(e)}", exc_info=True) return [types.TextContent(type="text", text=f"Error: {str(e)}")]
# Helper utility to recursively extract plain text body from Gmail message payload parts, decoding base64url data.
def extract_plain_text_body(self, msg_payload):
    """Extract plain text body from message payload

    Data found directly on the payload's own body is taken first, whatever
    its MIME type. The nested parts are then walked depth-first and every
    ``text/plain`` part with data is appended in document order.

    Args:
        msg_payload: Gmail API message payload

    Returns:
        tuple: (plain_text_body, body_size_in_bytes), where the size is the
        number of decoded bytes, not the number of characters

    Raises:
        binascii.Error: if a body's data is not valid base64url even after
            padding has been restored
    """
    decoded_chunks = []

    def decode_base64url(data):
        # base64url may legally omit the trailing "=" padding (RFC 4648 §5),
        # but urlsafe_b64decode rejects unpadded input, so restore it first.
        raw = data.encode('ascii') if isinstance(data, str) else bytes(data)
        return base64.urlsafe_b64decode(raw + b'=' * (-len(raw) % 4))

    def collect_from_parts(parts):
        for part in parts or ():
            if part.get('mimeType', '') == 'text/plain':
                data = (part.get('body') or {}).get('data', '')
                if data:
                    decoded_chunks.append(decode_base64url(data))

            # Multipart containers keep their children under 'parts'.
            if 'parts' in part:
                collect_from_parts(part['parts'])

    # Single-part messages carry their content directly on the payload.
    direct_data = (msg_payload.get('body') or {}).get('data')
    if direct_data:
        decoded_chunks.append(decode_base64url(direct_data))

    if 'parts' in msg_payload:
        collect_from_parts(msg_payload['parts'])

    # Decode each chunk on its own so a broken byte sequence at the end of
    # one part cannot merge with the start of the next.
    body_text = "".join(
        chunk.decode('utf-8', errors='replace') for chunk in decoded_chunks
    )
    body_size = sum(len(chunk) for chunk in decoded_chunks)
    return body_text, body_size
- JSON schema definition for the tool's input parameters.inputSchema={ "type": "object", "properties": { "google_access_token": {"type": "string", "description": "Google OAuth2 access token"}, "max_results": {"type": "integer", "description": "Maximum number of emails to return (default: 10)"}, "unread_only": {"type": "boolean", "description": "Whether to return only unread emails (default: False)"} }, "required": [] },