Glama
AbhinavBansal17

MCP Headless Gmail Server

gmail_get_email_body_chunk

Retrieve a 1,000-character segment of a Gmail message body, starting at a given character offset, so that large messages can be processed piece by piece in headless environments.

Instructions

Get a 1k character chunk of an email body starting from the specified offset
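
The chunking rule can be sketched in a few lines. This is a minimal illustration, assuming "1k" means 1,000 characters (the slice width used in the implementation reference); `CHUNK_SIZE` and `chunk_at` are hypothetical names and are not part of the server.

```python
CHUNK_SIZE = 1000  # assumption: matches the 1,000-character slice in the handler


def chunk_at(body: str, offset: int = 0) -> str:
    """Return up to CHUNK_SIZE characters of body, starting at offset."""
    if offset >= len(body):
        return ""  # an offset at or past the end yields an empty chunk
    return body[offset:offset + CHUNK_SIZE]


body = "x" * 2500
print(len(chunk_at(body, 0)))     # 1000
print(len(chunk_at(body, 2000)))  # 500
print(len(chunk_at(body, 3000)))  # 0
```

Offsets count characters of the decoded text, not bytes, so a caller advances by the length of each chunk it receives.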

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| google_access_token | No | Google OAuth2 access token | |
| message_id | No | ID of the message to retrieve | |
| thread_id | No | ID of the thread to retrieve (will get the first message if multiple exist) | |
| offset | No | Offset in characters to start from (default: 0) | |
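
Although every field is listed as optional, the dispatcher shown in the implementation reference rejects a call that supplies neither message_id nor thread_id. The sketch below assembles an arguments object under that rule. `build_chunk_arguments` is a hypothetical helper, the token and ID values are placeholders, and the non-negative offset check is an added assumption rather than something the server enforces.

```python
import json
from typing import Optional


def build_chunk_arguments(
    access_token: str,
    message_id: Optional[str] = None,
    thread_id: Optional[str] = None,
    offset: int = 0,
) -> dict:
    """Assemble the arguments object for gmail_get_email_body_chunk."""
    # Mirrors the dispatcher's check: at least one identifier is needed.
    if not message_id and not thread_id:
        raise ValueError("Either message_id or thread_id must be provided")
    # Assumption: negative offsets are not meaningful for chunking.
    if offset < 0:
        raise ValueError("offset must be non-negative")
    args = {"google_access_token": access_token, "offset": offset}
    if message_id:
        args["message_id"] = message_id
    if thread_id:
        args["thread_id"] = thread_id
    return args


# Placeholder values for illustration only.
example = build_chunk_arguments("ACCESS_TOKEN", message_id="MSG_ID", offset=1000)
print(json.dumps(example, indent=2))
```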

Implementation Reference

  • Core handler in the GmailClient class that implements the tool logic: it fetches the message by ID (or the first message of a thread), extracts the full plain-text body, slices up to 1,000 characters starting at the offset, and returns a JSON string with metadata, including contains_full_body, which is true when the chunk reaches the end of the body.
    def get_email_body_chunk(self, message_id: str = None, thread_id: str = None, offset: int = 0) -> str:
        """Get a chunk of the email body

        Args:
            message_id: ID of the message to retrieve
            thread_id: ID of the thread to retrieve (will get the first message if multiple exist)
            offset: Offset in characters to start from (default: 0)

        Returns:
            JSON string with the body chunk and metadata
        """
        try:
            # Check if service is initialized
            if not hasattr(self, 'service'):
                logger.error("Gmail service not initialized. No valid access token provided.")
                return json.dumps({
                    "error": "No valid access token provided. Please refresh your token first.",
                    "status": "error"
                })

            # Define the operation
            def _operation():
                logger.debug(f"Fetching email body chunk with offset {offset}")

                # Store message_id in local variable to make it accessible within _operation scope
                local_message_id = message_id
                local_thread_id = thread_id

                # Validate inputs
                if not local_message_id and not local_thread_id:
                    return json.dumps({
                        "error": "Either message_id or thread_id must be provided",
                        "status": "error"
                    })

                try:
                    # If thread_id is provided but not message_id, get the first message in thread
                    if local_thread_id and not local_message_id:
                        logger.debug(f"Getting messages in thread {local_thread_id}")
                        thread = self.service.users().threads().get(
                            userId='me', id=local_thread_id
                        ).execute()

                        if not thread or 'messages' not in thread or not thread['messages']:
                            return json.dumps({
                                "error": f"No messages found in thread {local_thread_id}",
                                "status": "error"
                            })

                        # Use the first message in the thread
                        local_message_id = thread['messages'][0]['id']
                        logger.debug(f"Using first message {local_message_id} from thread {local_thread_id}")

                    # Get the message
                    logger.debug(f"Getting message {local_message_id}")
                    msg = self.service.users().messages().get(
                        userId='me', id=local_message_id, format='full'
                    ).execute()

                    # Extract the full plain text body
                    body_text = ""
                    body_size_bytes = 0
                    if 'payload' in msg:
                        body_text, body_size_bytes = self.extract_plain_text_body(msg['payload'])

                    # Apply offset and get chunk
                    if offset >= len(body_text):
                        chunk = ""
                    else:
                        chunk = body_text[offset:offset+1000]

                    # Determine if this contains the full remaining body
                    contains_full_body = (offset + len(chunk) >= len(body_text))

                    return json.dumps({
                        "message_id": local_message_id,
                        "thread_id": msg.get('threadId', ''),
                        "body": chunk,
                        "body_size_bytes": body_size_bytes,
                        "offset": offset,
                        "chunk_size": len(chunk),
                        "contains_full_body": contains_full_body,
                        "status": "success"
                    })
                except Exception as e:
                    logger.error(f"Error processing message: {str(e)}", exc_info=True)
                    return json.dumps({
                        "error": f"Error processing message: {str(e)}",
                        "status": "error"
                    })

            # Execute the operation with token refresh handling
            return self._handle_token_refresh(_operation)
        except HttpError as e:
            logger.error(f"Gmail API Exception: {str(e)}")
            return json.dumps({"error": str(e)})
        except Exception as e:
            logger.error(f"Exception in get_email_body_chunk: {str(e)}", exc_info=True)
            return json.dumps({"error": str(e)})
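  • Input schema definition for the tool. It declares four parameters (google_access_token, message_id, thread_id, offset) and leaves the schema's required list empty, so the check that message_id or thread_id is present happens in the dispatcher and handler rather than in the schema.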
    name="gmail_get_email_body_chunk",
    description="Get a 1k character chunk of an email body starting from the specified offset",
    inputSchema={
        "type": "object",
        "properties": {
            "google_access_token": {"type": "string", "description": "Google OAuth2 access token"},
            "message_id": {"type": "string", "description": "ID of the message to retrieve"},
            "thread_id": {"type": "string", "description": "ID of the thread to retrieve (will get the first message if multiple exist)"},
            "offset": {"type": "integer", "description": "Offset in characters to start from (default: 0)"}
        },
        "required": []
    },
    ),
  • Registration and dispatching logic in the MCP @server.call_tool() handler that initializes GmailClient and invokes the tool implementation.
    elif name == "gmail_get_email_body_chunk":
        # Initialize Gmail client with just access token
        gmail = GmailClient(
            access_token=access_token
        )

        message_id = arguments.get("message_id")
        thread_id = arguments.get("thread_id")
        offset = int(arguments.get("offset", 0))

        if not message_id and not thread_id:
            raise ValueError("Either message_id or thread_id must be provided")

        results = gmail.get_email_body_chunk(message_id=message_id, thread_id=thread_id, offset=offset)
        return [types.TextContent(type="text", text=results)]
  • Helper method to recursively extract plain text body from Gmail message payload, decoding base64url parts and accumulating text and byte size.
    def extract_plain_text_body(self, msg_payload):
        """Extract plain text body from message payload

        Args:
            msg_payload: Gmail API message payload

        Returns:
            tuple: (plain_text_body, body_size_in_bytes)
        """
        body_text = ""
        body_size = 0

        # Helper function to process message parts recursively
        def extract_from_parts(parts):
            nonlocal body_text, body_size

            if not parts:
                return

            for part in parts:
                mime_type = part.get('mimeType', '')

                # If this part is plain text
                if mime_type == 'text/plain':
                    body_data = part.get('body', {}).get('data', '')
                    if body_data:
                        # Decode base64url encoded data
                        decoded_bytes = base64.urlsafe_b64decode(body_data)
                        body_size += len(decoded_bytes)
                        body_part = decoded_bytes.decode('utf-8', errors='replace')
                        body_text += body_part

                # If this part has child parts, process them
                if 'parts' in part:
                    extract_from_parts(part['parts'])

        # If body data is directly in the payload
        if 'body' in msg_payload and 'data' in msg_payload['body']:
            body_data = msg_payload['body']['data']
            if body_data:
                decoded_bytes = base64.urlsafe_b64decode(body_data)
                body_size += len(decoded_bytes)
                body_text = decoded_bytes.decode('utf-8', errors='replace')

        # If message has parts, process them
        if 'parts' in msg_payload:
            extract_from_parts(msg_payload['parts'])

        return body_text, body_size
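
A caller that wants the whole body can page through it using the offset, chunk_size, and contains_full_body fields of the response. The following is a minimal sketch under stated assumptions: `read_full_body` is a hypothetical client-side helper, and `fake_call_tool` is a stand-in that only imitates the response shape of the handler above. A real client would invoke the tool through its MCP session instead.

```python
import json
from typing import Callable


def read_full_body(call_tool: Callable[[dict], str], message_id: str) -> str:
    """Collect chunks until the server reports the end of the body."""
    parts = []
    offset = 0
    while True:
        reply = json.loads(call_tool({"message_id": message_id, "offset": offset}))
        if reply.get("status") != "success":
            raise RuntimeError(reply.get("error", "unknown error"))
        parts.append(reply["body"])
        offset += reply["chunk_size"]  # advance by characters actually received
        # Stop at the end of the body; the empty-chunk test guards against looping forever.
        if reply["contains_full_body"] or reply["chunk_size"] == 0:
            break
    return "".join(parts)


# Stand-in for the real tool call (assumption: same response fields as the handler).
SAMPLE_BODY = "line of text\n" * 200


def fake_call_tool(arguments: dict) -> str:
    offset = arguments.get("offset", 0)
    chunk = SAMPLE_BODY[offset:offset + 1000]
    return json.dumps({
        "body": chunk,
        "offset": offset,
        "chunk_size": len(chunk),
        "contains_full_body": offset + len(chunk) >= len(SAMPLE_BODY),
        "status": "success",
    })


full = read_full_body(fake_call_tool, "MSG_ID")
print(len(full))  # 2600
```

Advancing by chunk_size rather than a fixed 1,000 keeps the loop correct for the final, shorter chunk. Note that body_size_bytes counts decoded bytes while offset counts characters, so the two cannot be compared directly for non-ASCII text.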

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AbhinavBansal17/mcp-headless-gmail'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.