Skip to main content
Glama
AbhinavBansal17

MCP Headless Gmail Server

gmail_get_email_body_chunk

Retrieve a 1k character segment of a Gmail email body from a specified starting point for processing in headless environments using OAuth tokens.

Instructions

Get a 1k character chunk of an email body starting from the specified offset

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
google_access_tokenNoGoogle OAuth2 access token
message_idNoID of the message to retrieve
thread_idNoID of the thread to retrieve (will get the first message if multiple exist)
offsetNoOffset in characters to start from (default: 0)

Implementation Reference

  • Core handler function in GmailClient class that fetches the email message, extracts plain text body, and returns a 1k character chunk starting from the given offset. Handles thread_id by getting first message, token refresh, and returns JSON with metadata.
    def get_email_body_chunk(self, message_id: str = None, thread_id: str = None, offset: int = 0) -> str: """Get a chunk of the email body Args: message_id: ID of the message to retrieve thread_id: ID of the thread to retrieve (will get the first message if multiple exist) offset: Offset in characters to start from (default: 0) Returns: JSON string with the body chunk and metadata """ try: # Check if service is initialized if not hasattr(self, 'service'): logger.error("Gmail service not initialized. No valid access token provided.") return json.dumps({ "error": "No valid access token provided. Please refresh your token first.", "status": "error" }) # Define the operation def _operation(): logger.debug(f"Fetching email body chunk with offset {offset}") # Store message_id in local variable to make it accessible within _operation scope local_message_id = message_id local_thread_id = thread_id # Validate inputs if not local_message_id and not local_thread_id: return json.dumps({ "error": "Either message_id or thread_id must be provided", "status": "error" }) try: # If thread_id is provided but not message_id, get the first message in thread if local_thread_id and not local_message_id: logger.debug(f"Getting messages in thread {local_thread_id}") thread = self.service.users().threads().get( userId='me', id=local_thread_id ).execute() if not thread or 'messages' not in thread or not thread['messages']: return json.dumps({ "error": f"No messages found in thread {local_thread_id}", "status": "error" }) # Use the first message in the thread local_message_id = thread['messages'][0]['id'] logger.debug(f"Using first message {local_message_id} from thread {local_thread_id}") # Get the message logger.debug(f"Getting message {local_message_id}") msg = self.service.users().messages().get( userId='me', id=local_message_id, format='full' ).execute() # Extract the full plain text body body_text = "" body_size_bytes = 0 if 'payload' in msg: body_text, body_size_bytes = self.extract_plain_text_body(msg['payload']) # Apply offset and get chunk if offset >= len(body_text): chunk = "" else: chunk = body_text[offset:offset+1000] # Determine if this contains the full remaining body contains_full_body = (offset + len(chunk) >= len(body_text)) return json.dumps({ "message_id": local_message_id, "thread_id": msg.get('threadId', ''), "body": chunk, "body_size_bytes": body_size_bytes, "offset": offset, "chunk_size": len(chunk), "contains_full_body": contains_full_body, "status": "success" }) except Exception as e: logger.error(f"Error processing message: {str(e)}", exc_info=True) return json.dumps({ "error": f"Error processing message: {str(e)}", "status": "error" }) # Execute the operation with token refresh handling return self._handle_token_refresh(_operation) except HttpError as e: logger.error(f"Gmail API Exception: {str(e)}") return json.dumps({"error": str(e)}) except Exception as e: logger.error(f"Exception in get_email_body_chunk: {str(e)}", exc_info=True) return json.dumps({"error": str(e)})
  • Tool registration in list_tools() with name, description, and input schema definition.
    types.Tool( name="gmail_get_email_body_chunk", description="Get a 1k character chunk of an email body starting from the specified offset", inputSchema={ "type": "object", "properties": { "google_access_token": {"type": "string", "description": "Google OAuth2 access token"}, "message_id": {"type": "string", "description": "ID of the message to retrieve"}, "thread_id": {"type": "string", "description": "ID of the thread to retrieve (will get the first message if multiple exist)"}, "offset": {"type": "integer", "description": "Offset in characters to start from (default: 0)"} }, "required": [] }, ),
  • MCP server call_tool dispatch handler that initializes GmailClient and calls the core get_email_body_chunk method.
    elif name == "gmail_get_email_body_chunk": # Initialize Gmail client with just access token gmail = GmailClient( access_token=access_token ) message_id = arguments.get("message_id") thread_id = arguments.get("thread_id") offset = int(arguments.get("offset", 0)) if not message_id and not thread_id: raise ValueError("Either message_id or thread_id must be provided") results = gmail.get_email_body_chunk(message_id=message_id, thread_id=thread_id, offset=offset) return [types.TextContent(type="text", text=results)]
  • Helper function to recursively extract plain text body from Gmail message payload, decoding base64 parts and accumulating text and size.
    """Extract plain text body from message payload Args: msg_payload: Gmail API message payload Returns: tuple: (plain_text_body, body_size_in_bytes) """ body_text = "" body_size = 0 # Helper function to process message parts recursively def extract_from_parts(parts): nonlocal body_text, body_size if not parts: return for part in parts: mime_type = part.get('mimeType', '') # If this part is plain text if mime_type == 'text/plain': body_data = part.get('body', {}).get('data', '') if body_data: # Decode base64url encoded data decoded_bytes = base64.urlsafe_b64decode(body_data) body_size += len(decoded_bytes) body_part = decoded_bytes.decode('utf-8', errors='replace') body_text += body_part # If this part has child parts, process them if 'parts' in part: extract_from_parts(part['parts']) # If body data is directly in the payload if 'body' in msg_payload and 'data' in msg_payload['body']: body_data = msg_payload['body']['data'] if body_data: decoded_bytes = base64.urlsafe_b64decode(body_data) body_size += len(decoded_bytes) body_text = decoded_bytes.decode('utf-8', errors='replace') # If message has parts, process them if 'parts' in msg_payload: extract_from_parts(msg_payload['parts']) return body_text, body_size
  • JavaScript equivalent core handler in GmailClient class for fetching email body chunk.
    async getEmailBodyChunk({ message_id, thread_id, offset = 0 }) { const operation = async () => { if (!this.gmail) { throw new Error('Gmail service not initialized. No valid access token provided.'); } let local_message_id = message_id; if (!local_message_id && thread_id) { const thread = await this.gmail.users.threads.get({ userId: 'me', id: thread_id }); if (!thread.data.messages || !thread.data.messages.length) { return JSON.stringify({ error: `No messages found in thread ${thread_id}`, status: 'error' }); } local_message_id = thread.data.messages[0].id; } if (!local_message_id) { return JSON.stringify({ error: 'Either message_id or thread_id must be provided', status: 'error' }); } const msg = await this.gmail.users.messages.get({ userId: 'me', id: local_message_id, format: 'full' }); const payload = msg.data.payload || {}; const { body, body_size_bytes } = this.extractPlainTextBody(payload); const chunk = offset >= body.length ? '' : body.slice(offset, offset + 1000); const contains_full_body = (offset + chunk.length >= body.length); return JSON.stringify({ message_id: local_message_id, thread_id: msg.data.threadId, body: chunk, body_size_bytes, offset, chunk_size: chunk.length, contains_full_body, status: 'success' }); }; return await this._handleTokenRefresh(operation); }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AbhinavBansal17/mcp-headless-gmail'

If you have feedback or need assistance with the MCP directory API, please join our Discord server