Skip to main content
Glama
AbhinavBansal17

MCP Headless Gmail Server

gmail_get_recent_emails

Retrieve recent Gmail emails with metadata and content snippets for headless environments using OAuth tokens, supporting unread filtering and result limits.

Instructions

Get the most recent emails from Gmail (returns metadata, snippets, and first 1k chars of body)

Input Schema

Table | JSON Schema
Name | Required | Description | Default
google_access_token | No | Google OAuth2 access token |
max_results | No | Maximum number of emails to return (default: 10) |
unread_only | No | Whether to return only unread emails (default: False) |

Implementation Reference

  • Core handler implementing the Gmail API calls to fetch recent emails from INBOX, extract headers, snippets, and first 1000 characters of plain text body.
    def get_recent_emails(self, max_results: int = 10, unread_only: bool = False) -> str:
        """Fetch the newest INBOX messages and serialize them as JSON

        Args:
            max_results: Maximum number of emails to return (default: 10)
            unread_only: Restrict the listing to unread emails (default: False)

        Returns:
            JSON string: {"emails": [...]} on success, where every entry carries
            metadata, the snippet, and at most the first 1000 characters of the
            plain text body; an object with an "error" key on failure
        """
        wanted_headers = ['from', 'to', 'subject', 'date']
        body_preview_limit = 1000

        def _operation():
            logger.debug(f"Fetching up to {max_results} recent emails from Gmail")

            # Build the search query; an empty query lists everything in INBOX
            if unread_only:
                query = 'is:unread'
            else:
                query = ''
            logger.debug(f"Calling Gmail API to list messages from INBOX with query: '{query}'")

            # Listing failures are reported as a JSON error instead of propagating
            try:
                list_request = self.service.users().messages().list(
                    userId='me',
                    maxResults=max_results,
                    labelIds=['INBOX'],
                    q=query
                )
                response = list_request.execute()
                logger.debug(f"API Response received: {json.dumps(response)[:200]}...")
            except Exception as list_error:
                logger.error(f"Error calling Gmail API list: {str(list_error)}", exc_info=True)
                return json.dumps({"error": f"Gmail API list error: {str(list_error)}"})

            messages = response.get('messages', [])
            if not messages:
                logger.debug("No messages found in the response")
                return json.dumps({"emails": []})

            total = len(messages)
            logger.debug(f"Found {total} messages, processing details")

            # One 'get' call per listed message to obtain headers and body
            emails = []
            for position, message in enumerate(messages, start=1):
                message_id = message['id']
                logger.debug(f"Fetching details for message {position}/{total}, ID: {message_id}")
                get_request = self.service.users().messages().get(
                    userId='me',
                    id=message_id,
                    format='full'
                )
                msg = get_request.execute()

                logger.debug(f"Message {message_id} details received, extracting fields")

                # Keep only the headers we report; names are compared lower-cased
                headers = {}
                has_headers = 'payload' in msg and 'headers' in msg['payload']
                if not has_headers:
                    logger.debug(f"Message {message_id} missing payload or headers fields: {json.dumps(msg)[:200]}...")
                else:
                    for header in msg['payload']['headers']:
                        header_name = header.get('name', '').lower()
                        if header_name not in wanted_headers:
                            continue
                        headers[header_name] = header.get('value', '')

                # Plain text body, cut down to the preview limit
                body_text, body_size_bytes, contains_full_body = "", 0, True
                if 'payload' in msg:
                    body_text, body_size_bytes = self.extract_plain_text_body(msg['payload'])
                    contains_full_body = len(body_text) <= body_preview_limit
                    body_text = body_text[:body_preview_limit]

                # Key order is kept stable because it is visible in the JSON output
                email_data = {
                    "id": msg['id'],
                    "threadId": msg['threadId'],
                    "labelIds": msg.get('labelIds', []),
                    "snippet": msg.get('snippet', ''),
                    "from": headers.get('from', ''),
                    "to": headers.get('to', ''),
                    "subject": headers.get('subject', ''),
                    "date": headers.get('date', ''),
                    "internalDate": msg.get('internalDate', ''),
                    "body": body_text,
                    "body_size_bytes": body_size_bytes,
                    "contains_full_body": contains_full_body
                }

                logger.debug(f"Successfully processed message {message_id}")
                emails.append(email_data)

            logger.debug(f"Successfully processed {len(emails)} emails")
            return json.dumps({"emails": convert_datetime_fields(emails)})

        try:
            # Without a service object there is nothing to call
            if not hasattr(self, 'service'):
                logger.error("Gmail service not initialized. No valid access token provided.")
                return json.dumps({
                    "error": "No valid access token provided. Please refresh your token first.",
                    "status": "error"
                })

            # Run the operation through the token refresh wrapper
            return self._handle_token_refresh(_operation)

        except HttpError as api_error:
            logger.error(f"Gmail API Exception: {str(api_error)}")
            return json.dumps({"error": str(api_error)})
        except Exception as unexpected_error:
            logger.error(f"Exception in get_recent_emails: {str(unexpected_error)}", exc_info=True)
            return json.dumps({"error": str(unexpected_error)})
  • Tool registration in MCP server's list_tools() method, including input schema definition.
        name="gmail_get_recent_emails",
        description="Get the most recent emails from Gmail (returns metadata, snippets, and first 1k chars of body)",
        inputSchema={
            "type": "object",
            "properties": {
                "google_access_token": {"type": "string", "description": "Google OAuth2 access token"},
                "max_results": {"type": "integer", "description": "Maximum number of emails to return (default: 10)"},
                "unread_only": {"type": "boolean", "description": "Whether to return only unread emails (default: False)"}
            },
            "required": []
        },
    ),
  • MCP server tool dispatch handler that instantiates GmailClient and calls the get_recent_emails method with parsed arguments.
    if name == "gmail_get_recent_emails":
        # Initialize Gmail client with just access token
        logger.debug(f"Initializing Gmail client for get_recent_emails with access token: {access_token[:10]}...")
        try:
            gmail = GmailClient(
                access_token=access_token
            )
            logger.debug("Gmail client initialized successfully")
            
            max_results = int(arguments.get("max_results", 10))
            unread_only = bool(arguments.get("unread_only", False))
            logger.debug(f"Calling get_recent_emails with max_results={max_results} and unread_only={unread_only}")
            results = gmail.get_recent_emails(max_results=max_results, unread_only=unread_only)
            logger.debug(f"get_recent_emails result (first 200 chars): {results[:200]}...")
            return [types.TextContent(type="text", text=results)]
        except Exception as e:
            logger.error(f"Exception in gmail_get_recent_emails handler: {str(e)}", exc_info=True)
            return [types.TextContent(type="text", text=f"Error: {str(e)}")]
  • Helper utility to recursively extract plain text body from Gmail message payload parts, decoding base64url data.
    def extract_plain_text_body(self, msg_payload):
        """Extract plain text body from message payload

        Data found directly in the payload's own body is decoded first, then
        every 'text/plain' part is decoded in depth-first order and appended.
        Each chunk is decoded as UTF-8 with undecodable bytes replaced.

        Args:
            msg_payload: Gmail API message payload (a dict that may contain
                'body' and/or nested 'parts')

        Returns:
            tuple: (plain_text_body, body_size_in_bytes), where the size is the
            total number of decoded bytes before text decoding
        """
        def decode_base64url(encoded):
            # base64url data may arrive without its trailing '=' padding, which
            # urlsafe_b64decode rejects; restore it to a multiple of four first
            missing = -len(encoded) % 4
            if missing:
                pad_char = '=' if isinstance(encoded, str) else b'='
                encoded = encoded + pad_char * missing
            return base64.urlsafe_b64decode(encoded)

        # Decoded byte chunks in output order; joined once at the end
        decoded_chunks = []

        # Helper function to process message parts recursively
        def collect_from_parts(parts):
            if not parts:
                return

            for part in parts:
                # Only plain text parts contribute to the body
                if part.get('mimeType', '') == 'text/plain':
                    part_data = part.get('body', {}).get('data', '')
                    if part_data:
                        decoded_chunks.append(decode_base64url(part_data))

                # Containers (and any part with children) are searched as well
                if 'parts' in part:
                    collect_from_parts(part['parts'])

        # Body data placed directly in the payload is taken regardless of the
        # payload's MIME type
        if 'body' in msg_payload and 'data' in msg_payload['body']:
            direct_data = msg_payload['body']['data']
            if direct_data:
                decoded_chunks.append(decode_base64url(direct_data))

        # If message has parts, process them
        if 'parts' in msg_payload:
            collect_from_parts(msg_payload['parts'])

        body_size = sum(len(chunk) for chunk in decoded_chunks)
        body_text = "".join(
            chunk.decode('utf-8', errors='replace') for chunk in decoded_chunks
        )
        return body_text, body_size
  • JSON schema definition for the tool's input parameters.
    inputSchema={
        "type": "object",
        "properties": {
            "google_access_token": {"type": "string", "description": "Google OAuth2 access token"},
            "max_results": {"type": "integer", "description": "Maximum number of emails to return (default: 10)"},
            "unread_only": {"type": "boolean", "description": "Whether to return only unread emails (default: False)"}
        },
        "required": []
    },

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AbhinavBansal17/mcp-headless-gmail'

If you have feedback or need assistance with the MCP directory API, please join our Discord server