Skip to main content
Glama
AbhinavBansal17

MCP Headless Gmail Server

gmail_get_email_body_chunk

Retrieve a 1k character segment of a Gmail email body from a specified starting point for processing in headless environments using OAuth tokens.

Instructions

Get a 1k character chunk of an email body starting from the specified offset

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
google_access_tokenNoGoogle OAuth2 access token
message_idNoID of the message to retrieve
thread_idNoID of the thread to retrieve (will get the first message if multiple exist)
offsetNoOffset in characters to start from (default: 0)

Implementation Reference

  • Core handler function in GmailClient class that fetches the email message, extracts plain text body, and returns a 1k character chunk starting from the given offset. Handles thread_id by getting first message, token refresh, and returns JSON with metadata.
    def get_email_body_chunk(self, message_id: str = None, thread_id: str = None, offset: int = 0) -> str:
        """Get a chunk of the email body
        
        Args:
            message_id: ID of the message to retrieve
            thread_id: ID of the thread to retrieve (will get the first message if multiple exist)
            offset: Offset in characters to start from (default: 0)
            
        Returns:
            JSON string with the body chunk and metadata
        """
        try:
            # Check if service is initialized
            if not hasattr(self, 'service'):
                logger.error("Gmail service not initialized. No valid access token provided.")
                return json.dumps({
                    "error": "No valid access token provided. Please refresh your token first.",
                    "status": "error"
                })
                
            # Define the operation
            def _operation():
                logger.debug(f"Fetching email body chunk with offset {offset}")
                
                # Store message_id in local variable to make it accessible within _operation scope
                local_message_id = message_id
                local_thread_id = thread_id
                
                # Validate inputs
                if not local_message_id and not local_thread_id:
                    return json.dumps({
                        "error": "Either message_id or thread_id must be provided",
                        "status": "error"
                    })
                
                try:
                    # If thread_id is provided but not message_id, get the first message in thread
                    if local_thread_id and not local_message_id:
                        logger.debug(f"Getting messages in thread {local_thread_id}")
                        thread = self.service.users().threads().get(
                            userId='me',
                            id=local_thread_id
                        ).execute()
                        
                        if not thread or 'messages' not in thread or not thread['messages']:
                            return json.dumps({
                                "error": f"No messages found in thread {local_thread_id}",
                                "status": "error"
                            })
                            
                        # Use the first message in the thread
                        local_message_id = thread['messages'][0]['id']
                        logger.debug(f"Using first message {local_message_id} from thread {local_thread_id}")
                    
                    # Get the message
                    logger.debug(f"Getting message {local_message_id}")
                    msg = self.service.users().messages().get(
                        userId='me',
                        id=local_message_id,
                        format='full'
                    ).execute()
                    
                    # Extract the full plain text body
                    body_text = ""
                    body_size_bytes = 0
                    
                    if 'payload' in msg:
                        body_text, body_size_bytes = self.extract_plain_text_body(msg['payload'])
                    
                    # Apply offset and get chunk
                    if offset >= len(body_text):
                        chunk = ""
                    else:
                        chunk = body_text[offset:offset+1000]
                    
                    # Determine if this contains the full remaining body
                    contains_full_body = (offset + len(chunk) >= len(body_text))
                    
                    return json.dumps({
                        "message_id": local_message_id,
                        "thread_id": msg.get('threadId', ''),
                        "body": chunk,
                        "body_size_bytes": body_size_bytes,
                        "offset": offset,
                        "chunk_size": len(chunk),
                        "contains_full_body": contains_full_body,
                        "status": "success"
                    })
                    
                except Exception as e:
                    logger.error(f"Error processing message: {str(e)}", exc_info=True)
                    return json.dumps({
                        "error": f"Error processing message: {str(e)}",
                        "status": "error"
                    })
            
            # Execute the operation with token refresh handling
            return self._handle_token_refresh(_operation)
            
        except HttpError as e:
            logger.error(f"Gmail API Exception: {str(e)}")
            return json.dumps({"error": str(e)})
        except Exception as e:
            logger.error(f"Exception in get_email_body_chunk: {str(e)}", exc_info=True)
            return json.dumps({"error": str(e)})
  • Tool registration in list_tools() with name, description, and input schema definition.
    types.Tool(
        name="gmail_get_email_body_chunk",
        description="Get a 1k character chunk of an email body starting from the specified offset",
        inputSchema={
            "type": "object",
            "properties": {
                "google_access_token": {"type": "string", "description": "Google OAuth2 access token"},
                "message_id": {"type": "string", "description": "ID of the message to retrieve"},
                "thread_id": {"type": "string", "description": "ID of the thread to retrieve (will get the first message if multiple exist)"},
                "offset": {"type": "integer", "description": "Offset in characters to start from (default: 0)"}
            },
            "required": []
        },
    ),
  • MCP server call_tool dispatch handler that initializes GmailClient and calls the core get_email_body_chunk method.
    elif name == "gmail_get_email_body_chunk":
        # Initialize Gmail client with just access token
        gmail = GmailClient(
            access_token=access_token
        )
        
        message_id = arguments.get("message_id")
        thread_id = arguments.get("thread_id")
        offset = int(arguments.get("offset", 0))
        
        if not message_id and not thread_id:
            raise ValueError("Either message_id or thread_id must be provided")
        
        results = gmail.get_email_body_chunk(message_id=message_id, thread_id=thread_id, offset=offset)
        return [types.TextContent(type="text", text=results)]
  • Helper function to recursively extract plain text body from Gmail message payload, decoding base64 parts and accumulating text and size.
    """Extract plain text body from message payload
    
    Args:
        msg_payload: Gmail API message payload
        
    Returns:
        tuple: (plain_text_body, body_size_in_bytes)
    """
    body_text = ""
    body_size = 0
    
    # Helper function to process message parts recursively
    def extract_from_parts(parts):
        nonlocal body_text, body_size
        
        if not parts:
            return
            
        for part in parts:
            mime_type = part.get('mimeType', '')
            
            # If this part is plain text
            if mime_type == 'text/plain':
                body_data = part.get('body', {}).get('data', '')
                if body_data:
                    # Decode base64url encoded data
                    decoded_bytes = base64.urlsafe_b64decode(body_data)
                    body_size += len(decoded_bytes)
                    body_part = decoded_bytes.decode('utf-8', errors='replace')
                    body_text += body_part
            
            # If this part has child parts, process them
            if 'parts' in part:
                extract_from_parts(part['parts'])
    
    # If body data is directly in the payload
    if 'body' in msg_payload and 'data' in msg_payload['body']:
        body_data = msg_payload['body']['data']
        if body_data:
            decoded_bytes = base64.urlsafe_b64decode(body_data)
            body_size += len(decoded_bytes)
            body_text = decoded_bytes.decode('utf-8', errors='replace')
    
    # If message has parts, process them
    if 'parts' in msg_payload:
        extract_from_parts(msg_payload['parts'])
        
    return body_text, body_size
  • JavaScript equivalent core handler in GmailClient class for fetching email body chunk.
    async getEmailBodyChunk({ message_id, thread_id, offset = 0 }) {
      const operation = async () => {
        if (!this.gmail) {
          throw new Error('Gmail service not initialized. No valid access token provided.');
        }
        let local_message_id = message_id;
        if (!local_message_id && thread_id) {
          const thread = await this.gmail.users.threads.get({ userId: 'me', id: thread_id });
          if (!thread.data.messages || !thread.data.messages.length) {
            return JSON.stringify({
              error: `No messages found in thread ${thread_id}`,
              status: 'error'
            });
          }
          local_message_id = thread.data.messages[0].id;
        }
        if (!local_message_id) {
          return JSON.stringify({
            error: 'Either message_id or thread_id must be provided',
            status: 'error'
          });
        }
        const msg = await this.gmail.users.messages.get({
          userId: 'me',
          id: local_message_id,
          format: 'full'
        });
        const payload = msg.data.payload || {};
        const { body, body_size_bytes } = this.extractPlainTextBody(payload);
        const chunk = offset >= body.length ? '' : body.slice(offset, offset + 1000);
        const contains_full_body = (offset + chunk.length >= body.length);
        return JSON.stringify({
          message_id: local_message_id,
          thread_id: msg.data.threadId,
          body: chunk,
          body_size_bytes,
          offset,
          chunk_size: chunk.length,
          contains_full_body,
          status: 'success'
        });
      };
      return await this._handleTokenRefresh(operation);
    }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AbhinavBansal17/mcp-headless-gmail'

If you have feedback or need assistance with the MCP directory API, please join our Discord server