Skip to main content
Glama
taylorwilsdon

Google Workspace MCP Server - Control Gmail, Calendar, Docs, Sheets, Slides, Chat, Forms & Drive

get_gmail_message_content

Retrieve the complete content of a specific Gmail message, including subject, sender, and body text, using the message ID and user email.

Instructions

Retrieves the full content (subject, sender, plain text body) of a specific Gmail message.

Args:
    message_id (str): The unique ID of the Gmail message to retrieve.
    user_google_email (str): The user's Google email address. Required.

Returns:
    str: The message details including subject, sender, and body content.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
message_idYes
serviceYes
user_google_emailYes

Implementation Reference

  • The primary handler function for the 'get_gmail_message_content' tool. It fetches message metadata and full payload using Gmail API, extracts headers, bodies (text/HTML), and attachments, then formats and returns the message content.
    @server.tool()
    @handle_http_errors("get_gmail_message_content", is_read_only=True, service_type="gmail")
    @require_google_service("gmail", "gmail_read")
    async def get_gmail_message_content(
        service, message_id: str, user_google_email: str
    ) -> str:
        """
        Retrieves the full content (subject, sender, recipients, plain text body) of a specific Gmail message.
    
        Args:
            message_id (str): The unique ID of the Gmail message to retrieve.
            user_google_email (str): The user's Google email address. Required.
    
        Returns:
            str: The message details including subject, sender, recipients (To, Cc), and body content.
        """
        logger.info(
            f"[get_gmail_message_content] Invoked. Message ID: '{message_id}', Email: '{user_google_email}'"
        )
    
        logger.info(f"[get_gmail_message_content] Using service for: {user_google_email}")
    
        # Fetch message metadata first to get headers
        message_metadata = await asyncio.to_thread(
            service.users()
            .messages()
            .get(
                userId="me",
                id=message_id,
                format="metadata",
                metadataHeaders=["Subject", "From", "To", "Cc"],
            )
            .execute
        )
    
        headers = {
            h["name"]: h["value"]
            for h in message_metadata.get("payload", {}).get("headers", [])
        }
        subject = headers.get("Subject", "(no subject)")
        sender = headers.get("From", "(unknown sender)")
        to = headers.get("To", "")
        cc = headers.get("Cc", "")
    
        # Now fetch the full message to get the body parts
        message_full = await asyncio.to_thread(
            service.users()
            .messages()
            .get(
                userId="me",
                id=message_id,
                format="full",  # Request full payload for body
            )
            .execute
        )
    
        # Extract both text and HTML bodies using enhanced helper function
        payload = message_full.get("payload", {})
        bodies = _extract_message_bodies(payload)
        text_body = bodies.get("text", "")
        html_body = bodies.get("html", "")
    
        # Format body content with HTML fallback
        body_data = _format_body_content(text_body, html_body)
    
        # Extract attachment metadata
        attachments = _extract_attachments(payload)
    
        content_lines = [
            f"Subject: {subject}",
            f"From:    {sender}",
        ]
    
        if to:
            content_lines.append(f"To:      {to}")
        if cc:
            content_lines.append(f"Cc:      {cc}")
    
        content_lines.append(f"\n--- BODY ---\n{body_data or '[No text/plain body found]'}")
    
        # Add attachment information if present
        if attachments:
            content_lines.append("\n--- ATTACHMENTS ---")
            for i, att in enumerate(attachments, 1):
                size_kb = att['size'] / 1024
                content_lines.append(
                    f"{i}. {att['filename']} ({att['mimeType']}, {size_kb:.1f} KB)\n"
                    f"   Attachment ID: {att['attachmentId']}\n"
                    f"   Use get_gmail_attachment_content(message_id='{message_id}', attachment_id='{att['attachmentId']}') to download"
                )
    
        return "\n".join(content_lines)
  • Key helper function used by the handler to extract plain text and HTML bodies from the Gmail message payload by traversing MIME parts.
    def _extract_message_bodies(payload):
        """
        Helper function to extract both plain text and HTML bodies from a Gmail message payload.
    
        Args:
            payload (dict): The message payload from Gmail API
    
        Returns:
            dict: Dictionary with 'text' and 'html' keys containing body content
        """
        text_body = ""
        html_body = ""
        parts = [payload] if "parts" not in payload else payload.get("parts", [])
    
        part_queue = list(parts)  # Use a queue for BFS traversal of parts
        while part_queue:
            part = part_queue.pop(0)
            mime_type = part.get("mimeType", "")
            body_data = part.get("body", {}).get("data")
    
            if body_data:
                try:
                    decoded_data = base64.urlsafe_b64decode(body_data).decode("utf-8", errors="ignore")
                    if mime_type == "text/plain" and not text_body:
                        text_body = decoded_data
                    elif mime_type == "text/html" and not html_body:
                        html_body = decoded_data
                except Exception as e:
                    logger.warning(f"Failed to decode body part: {e}")
    
            # Add sub-parts to queue for multipart messages
            if mime_type.startswith("multipart/") and "parts" in part:
                part_queue.extend(part.get("parts", []))
    
        # Check the main payload if it has body data directly
        if payload.get("body", {}).get("data"):
            try:
                decoded_data = base64.urlsafe_b64decode(payload["body"]["data"]).decode("utf-8", errors="ignore")
                mime_type = payload.get("mimeType", "")
                if mime_type == "text/plain" and not text_body:
                    text_body = decoded_data
                elif mime_type == "text/html" and not html_body:
                    html_body = decoded_data
            except Exception as e:
                logger.warning(f"Failed to decode main payload body: {e}")
    
        return {
            "text": text_body,
            "html": html_body
        }
  • Helper function used to format the extracted body content, preferring plain text, falling back to HTML (truncated if too long).
    def _format_body_content(text_body: str, html_body: str) -> str:
        """
        Helper function to format message body content with HTML fallback and truncation.
    
        Args:
            text_body: Plain text body content
            html_body: HTML body content
    
        Returns:
            Formatted body content string
        """
        if text_body.strip():
            return text_body
        elif html_body.strip():
            # Truncate very large HTML to keep responses manageable
            if len(html_body) > HTML_BODY_TRUNCATE_LIMIT:
                html_body = html_body[:HTML_BODY_TRUNCATE_LIMIT] + "\n\n[HTML content truncated...]"
            return f"[HTML Content Converted]\n{html_body}"
        else:
            return "[No readable content found]"
  • Helper function to recursively extract attachment metadata (filename, MIME type, size, attachment ID) from the message payload.
    def _extract_attachments(payload: dict) -> List[Dict[str, Any]]:
        """
        Extract attachment metadata from a Gmail message payload.
    
        Args:
            payload: The message payload from Gmail API
    
        Returns:
            List of attachment dictionaries with filename, mimeType, size, and attachmentId
        """
        attachments = []
    
        def search_parts(part):
            """Recursively search for attachments in message parts"""
            # Check if this part is an attachment
            if part.get("filename") and part.get("body", {}).get("attachmentId"):
                attachments.append({
                    "filename": part["filename"],
                    "mimeType": part.get("mimeType", "application/octet-stream"),
                    "size": part.get("body", {}).get("size", 0),
                    "attachmentId": part["body"]["attachmentId"]
                })
    
            # Recursively search sub-parts
            if "parts" in part:
                for subpart in part["parts"]:
                    search_parts(subpart)
    
        # Start searching from the root payload
        search_parts(payload)
        return attachments
  • The @server.tool() decorator registers this function as an MCP tool named 'get_gmail_message_content', with additional decorators for error handling and auth.
    @server.tool()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/taylorwilsdon/google_workspace_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server