Skip to main content
Glama

gmail_get_email_body_chunk

Retrieve a 1,000-character segment of a Gmail email body from a specified starting point to handle large messages in manageable portions.

Instructions

Get a 1k character chunk of an email body starting from the specified offset

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
google_access_tokenYesGoogle OAuth2 access token
message_idNoID of the message to retrieve
thread_idNoID of the thread to retrieve (will get the first message if multiple exist)
offsetNoOffset in characters to start from (default: 0)

Implementation Reference

  • Core handler function in GmailClient class that retrieves a 1KB chunk of the email body starting from the specified offset. Handles thread_id resolution to message_id, extracts plain text body using helper, applies offset, and returns JSON with chunk and metadata.
    def get_email_body_chunk(self, message_id: str = None, thread_id: str = None, offset: int = 0) -> str:
        """Get a chunk of the email body
        
        Args:
            message_id: ID of the message to retrieve
            thread_id: ID of the thread to retrieve (will get the first message if multiple exist)
            offset: Offset in characters to start from (default: 0)
            
        Returns:
            JSON string with the body chunk and metadata
        """
        try:
            # Check if service is initialized
            if not hasattr(self, 'service'):
                logger.error("Gmail service not initialized. No valid access token provided.")
                return json.dumps({
                    "error": "No valid access token provided. Please refresh your token first.",
                    "status": "error"
                })
                
            # Define the operation
            def _operation():
                logger.debug(f"Fetching email body chunk with offset {offset}")
                
                # Store message_id in local variable to make it accessible within _operation scope
                local_message_id = message_id
                local_thread_id = thread_id
                
                # Validate inputs
                if not local_message_id and not local_thread_id:
                    return json.dumps({
                        "error": "Either message_id or thread_id must be provided",
                        "status": "error"
                    })
                
                try:
                    # If thread_id is provided but not message_id, get the first message in thread
                    if local_thread_id and not local_message_id:
                        logger.debug(f"Getting messages in thread {local_thread_id}")
                        thread = self.service.users().threads().get(
                            userId='me',
                            id=local_thread_id
                        ).execute()
                        
                        if not thread or 'messages' not in thread or not thread['messages']:
                            return json.dumps({
                                "error": f"No messages found in thread {local_thread_id}",
                                "status": "error"
                            })
                            
                        # Use the first message in the thread
                        local_message_id = thread['messages'][0]['id']
                        logger.debug(f"Using first message {local_message_id} from thread {local_thread_id}")
                    
                    # Get the message
                    logger.debug(f"Getting message {local_message_id}")
                    msg = self.service.users().messages().get(
                        userId='me',
                        id=local_message_id,
                        format='full'
                    ).execute()
                    
                    # Extract the full plain text body
                    body_text = ""
                    body_size_bytes = 0
                    
                    if 'payload' in msg:
                        body_text, body_size_bytes = self.extract_plain_text_body(msg['payload'])
                    
                    # Apply offset and get chunk
                    if offset >= len(body_text):
                        chunk = ""
                    else:
                        chunk = body_text[offset:offset+1000]
                    
                    # Determine if this contains the full remaining body
                    contains_full_body = (offset + len(chunk) >= len(body_text))
                    
                    return json.dumps({
                        "message_id": local_message_id,
                        "thread_id": msg.get('threadId', ''),
                        "body": chunk,
                        "body_size_bytes": body_size_bytes,
                        "offset": offset,
                        "chunk_size": len(chunk),
                        "contains_full_body": contains_full_body,
                        "status": "success"
                    })
                    
                except Exception as e:
                    logger.error(f"Error processing message: {str(e)}", exc_info=True)
                    return json.dumps({
                        "error": f"Error processing message: {str(e)}",
                        "status": "error"
                    })
            
            # Execute the operation with token refresh handling
            return self._handle_token_refresh(_operation)
            
        except HttpError as e:
            logger.error(f"Gmail API Exception: {str(e)}")
            return json.dumps({"error": str(e)})
        except Exception as e:
            logger.error(f"Exception in get_email_body_chunk: {str(e)}", exc_info=True)
            return json.dumps({"error": str(e)})
  • Tool registration in list_tools() handler, including name, description, and input schema definition.
    types.Tool(
        name="gmail_get_email_body_chunk",
        description="Get a 1k character chunk of an email body starting from the specified offset",
        inputSchema={
            "type": "object",
            "properties": {
                "google_access_token": {"type": "string", "description": "Google OAuth2 access token"},
                "message_id": {"type": "string", "description": "ID of the message to retrieve"},
                "thread_id": {"type": "string", "description": "ID of the thread to retrieve (will get the first message if multiple exist)"},
                "offset": {"type": "integer", "description": "Offset in characters to start from (default: 0)"}
            },
            "required": ["google_access_token"]
        },
    ),
  • Dispatch handler within @server.call_tool() that extracts arguments, initializes GmailClient, and invokes the core get_email_body_chunk method.
    elif name == "gmail_get_email_body_chunk":
        # Initialize Gmail client with just access token
        gmail = GmailClient(
            access_token=access_token
        )
        
        message_id = arguments.get("message_id")
        thread_id = arguments.get("thread_id")
        offset = int(arguments.get("offset", 0))
        
        if not message_id and not thread_id:
            raise ValueError("Either message_id or thread_id must be provided")
        
        results = gmail.get_email_body_chunk(message_id=message_id, thread_id=thread_id, offset=offset)
        return [types.TextContent(type="text", text=results)]
  • Helper method used by the handler to recursively extract plain text body from Gmail message payload parts, decoding base64url data and calculating size.
    def extract_plain_text_body(self, msg_payload):
        """Extract plain text body from message payload
        
        Args:
            msg_payload: Gmail API message payload
            
        Returns:
            tuple: (plain_text_body, body_size_in_bytes)
        """
        body_text = ""
        body_size = 0
        
        # Helper function to process message parts recursively
        def extract_from_parts(parts):
            nonlocal body_text, body_size
            
            if not parts:
                return
                
            for part in parts:
                mime_type = part.get('mimeType', '')
                
                # If this part is plain text
                if mime_type == 'text/plain':
                    body_data = part.get('body', {}).get('data', '')
                    if body_data:
                        # Decode base64url encoded data
                        decoded_bytes = base64.urlsafe_b64decode(body_data)
                        body_size += len(decoded_bytes)
                        body_part = decoded_bytes.decode('utf-8', errors='replace')
                        body_text += body_part
                
                # If this part has child parts, process them
                if 'parts' in part:
                    extract_from_parts(part['parts'])
        
        # If body data is directly in the payload
        if 'body' in msg_payload and 'data' in msg_payload['body']:
            body_data = msg_payload['body']['data']
            if body_data:
                decoded_bytes = base64.urlsafe_b64decode(body_data)
                body_size += len(decoded_bytes)
                body_text = decoded_bytes.decode('utf-8', errors='replace')
        
        # If message has parts, process them
        if 'parts' in msg_payload:
            extract_from_parts(msg_payload['parts'])
            
        return body_text, body_size

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/baryhuang/mcp-headless-gmail'

If you have feedback or need assistance with the MCP directory API, please join our Discord server