gmail_get_email_body_chunk
Retrieve 1k-character segments of Gmail email bodies at a specified character offset, for processing large messages in headless environments.
Instructions
Get a 1k character chunk of an email body starting from the specified offset
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| google_access_token | No | Google OAuth2 access token | |
| message_id | No | ID of the message to retrieve | |
| thread_id | No | ID of the thread to retrieve (will get the first message if multiple exist) | |
| offset | No | Offset in characters to start from (default: 0) | 0 |
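
For orientation, the sketch below shows what a call to this tool might look like from the Python MCP client SDK; the access token, message ID, and `session` object are placeholders, not values from this project.

```python
# Hypothetical invocation; all values are placeholders.
arguments = {
    "google_access_token": "ya29.a0Af...",  # placeholder OAuth2 access token
    "message_id": "18c2f3a1b4d5e6f7",       # placeholder Gmail message ID
    "offset": 0,                             # start at the beginning of the body
}

# Assuming `session` is an initialized mcp.ClientSession:
# result = await session.call_tool("gmail_get_email_body_chunk", arguments=arguments)
```

The tool returns a JSON string (see the handler below) containing the chunk plus offset, chunk_size, and contains_full_body metadata for paging.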
Implementation Reference
- Core handler function in the GmailClient class that implements the tool logic: it fetches the message by ID or thread, extracts the full plain-text body, applies the character offset to produce a 1k chunk, and returns JSON with metadata indicating whether the chunk reaches the end of the body (a usage sketch that pages through a long body with this method appears at the end of this section).

```python
def get_email_body_chunk(self, message_id: str = None, thread_id: str = None, offset: int = 0) -> str:
    """Get a chunk of the email body

    Args:
        message_id: ID of the message to retrieve
        thread_id: ID of the thread to retrieve (will get the first message if multiple exist)
        offset: Offset in characters to start from (default: 0)

    Returns:
        JSON string with the body chunk and metadata
    """
    try:
        # Check if service is initialized
        if not hasattr(self, 'service'):
            logger.error("Gmail service not initialized. No valid access token provided.")
            return json.dumps({
                "error": "No valid access token provided. Please refresh your token first.",
                "status": "error"
            })

        # Define the operation
        def _operation():
            logger.debug(f"Fetching email body chunk with offset {offset}")

            # Store message_id in local variable to make it accessible within _operation scope
            local_message_id = message_id
            local_thread_id = thread_id

            # Validate inputs
            if not local_message_id and not local_thread_id:
                return json.dumps({
                    "error": "Either message_id or thread_id must be provided",
                    "status": "error"
                })

            try:
                # If thread_id is provided but not message_id, get the first message in thread
                if local_thread_id and not local_message_id:
                    logger.debug(f"Getting messages in thread {local_thread_id}")
                    thread = self.service.users().threads().get(
                        userId='me',
                        id=local_thread_id
                    ).execute()

                    if not thread or 'messages' not in thread or not thread['messages']:
                        return json.dumps({
                            "error": f"No messages found in thread {local_thread_id}",
                            "status": "error"
                        })

                    # Use the first message in the thread
                    local_message_id = thread['messages'][0]['id']
                    logger.debug(f"Using first message {local_message_id} from thread {local_thread_id}")

                # Get the message
                logger.debug(f"Getting message {local_message_id}")
                msg = self.service.users().messages().get(
                    userId='me',
                    id=local_message_id,
                    format='full'
                ).execute()

                # Extract the full plain text body
                body_text = ""
                body_size_bytes = 0

                if 'payload' in msg:
                    body_text, body_size_bytes = self.extract_plain_text_body(msg['payload'])

                # Apply offset and get chunk
                if offset >= len(body_text):
                    chunk = ""
                else:
                    chunk = body_text[offset:offset+1000]

                # Determine if this contains the full remaining body
                contains_full_body = (offset + len(chunk) >= len(body_text))

                return json.dumps({
                    "message_id": local_message_id,
                    "thread_id": msg.get('threadId', ''),
                    "body": chunk,
                    "body_size_bytes": body_size_bytes,
                    "offset": offset,
                    "chunk_size": len(chunk),
                    "contains_full_body": contains_full_body,
                    "status": "success"
                })
            except Exception as e:
                logger.error(f"Error processing message: {str(e)}", exc_info=True)
                return json.dumps({
                    "error": f"Error processing message: {str(e)}",
                    "status": "error"
                })

        # Execute the operation with token refresh handling
        return self._handle_token_refresh(_operation)
    except HttpError as e:
        logger.error(f"Gmail API Exception: {str(e)}")
        return json.dumps({"error": str(e)})
    except Exception as e:
        logger.error(f"Exception in get_email_body_chunk: {str(e)}", exc_info=True)
        return json.dumps({"error": str(e)})
```
- Input schema definition for the tool, specifying the google_access_token, message_id, thread_id, and offset parameters; none are marked required in the JSON schema, and the handler validates the token and the message_id/thread_id pair at runtime.

```python
name="gmail_get_email_body_chunk",
description="Get a 1k character chunk of an email body starting from the specified offset",
inputSchema={
    "type": "object",
    "properties": {
        "google_access_token": {"type": "string", "description": "Google OAuth2 access token"},
        "message_id": {"type": "string", "description": "ID of the message to retrieve"},
        "thread_id": {"type": "string", "description": "ID of the thread to retrieve (will get the first message if multiple exist)"},
        "offset": {"type": "integer", "description": "Offset in characters to start from (default: 0)"}
    },
    "required": []
},
),
```
- src/mcp_server_headless_gmail/server.py:574-589 (registration): Registration and dispatch logic in the MCP @server.call_tool() handler; it initializes a GmailClient with just the access token and invokes the tool implementation.

```python
elif name == "gmail_get_email_body_chunk":
    # Initialize Gmail client with just access token
    gmail = GmailClient(
        access_token=access_token
    )
    message_id = arguments.get("message_id")
    thread_id = arguments.get("thread_id")
    offset = int(arguments.get("offset", 0))
    if not message_id and not thread_id:
        raise ValueError("Either message_id or thread_id must be provided")
    results = gmail.get_email_body_chunk(message_id=message_id, thread_id=thread_id, offset=offset)
    return [types.TextContent(type="text", text=results)]
```
- Helper method that recursively extracts the plain-text body from a Gmail message payload, decoding base64url-encoded parts and accumulating the text and its byte size (an illustrative payload example appears at the end of this section).

```python
def extract_plain_text_body(self, msg_payload):
    """Extract plain text body from message payload

    Args:
        msg_payload: Gmail API message payload

    Returns:
        tuple: (plain_text_body, body_size_in_bytes)
    """
    body_text = ""
    body_size = 0

    # Helper function to process message parts recursively
    def extract_from_parts(parts):
        nonlocal body_text, body_size
        if not parts:
            return

        for part in parts:
            mime_type = part.get('mimeType', '')

            # If this part is plain text
            if mime_type == 'text/plain':
                body_data = part.get('body', {}).get('data', '')
                if body_data:
                    # Decode base64url encoded data
                    decoded_bytes = base64.urlsafe_b64decode(body_data)
                    body_size += len(decoded_bytes)
                    body_part = decoded_bytes.decode('utf-8', errors='replace')
                    body_text += body_part

            # If this part has child parts, process them
            if 'parts' in part:
                extract_from_parts(part['parts'])

    # If body data is directly in the payload
    if 'body' in msg_payload and 'data' in msg_payload['body']:
        body_data = msg_payload['body']['data']
        if body_data:
            decoded_bytes = base64.urlsafe_b64decode(body_data)
            body_size += len(decoded_bytes)
            body_text = decoded_bytes.decode('utf-8', errors='replace')

    # If message has parts, process them
    if 'parts' in msg_payload:
        extract_from_parts(msg_payload['parts'])

    return body_text, body_size
```
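
As noted in the core-handler bullet above, a caller can reassemble a long body by walking the offset forward until contains_full_body is True. The sketch below illustrates that loop; read_full_body and the gmail argument (an initialized GmailClient) are assumptions made for this example, not part of the server.

```python
import json

def read_full_body(gmail, message_id):
    """Illustrative helper: reassemble a long email body by requesting
    successive 1k chunks until contains_full_body is True."""
    offset = 0
    parts = []
    while True:
        result = json.loads(gmail.get_email_body_chunk(message_id=message_id, offset=offset))
        if result.get("status") != "success":
            raise RuntimeError(result.get("error", "unknown error"))
        parts.append(result["body"])
        # contains_full_body is True once offset + chunk length reaches the end of the body
        if result["contains_full_body"]:
            break
        offset += result["chunk_size"]
    return "".join(parts)
```

An MCP client would drive the same loop by calling the tool repeatedly and increasing offset by chunk_size each time.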
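
The illustrative payload mentioned in the last bullet is sketched below: a hypothetical, stripped-down multipart/alternative structure (real Gmail API payloads carry many more fields), with gmail again standing in for an initialized GmailClient.

```python
import base64

def _b64url(text):
    # Gmail body data is base64url-encoded; encode a sample string the same way.
    return base64.urlsafe_b64encode(text.encode("utf-8")).decode("ascii")

payload = {
    "mimeType": "multipart/alternative",
    "parts": [
        {"mimeType": "text/plain", "body": {"data": _b64url("Hello, plain text!")}},
        {"mimeType": "text/html", "body": {"data": _b64url("<p>Hello, HTML!</p>")}},
    ],
}

# Only the text/plain part is accumulated; the text/html sibling is skipped.
# body_text, body_size = gmail.extract_plain_text_body(payload)
# body_text == "Hello, plain text!", body_size == 18
```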