Gmail MCP

  • gmail_mcp
import base64 import os from email.header import decode_header from typing import Any, Dict, List, Optional from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build from googleapiclient.errors import HttpError # If modifying these scopes, delete the file token.json. SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"] def decode_mime_header(header: str) -> str: """Helper function to decode encoded email headers""" decoded_parts = decode_header(header) decoded_string = "" for part, encoding in decoded_parts: if isinstance(part, bytes): # Decode bytes to string using the specified encoding decoded_string += part.decode(encoding or "utf-8") else: # Already a string decoded_string += part return decoded_string class GmailClient: def __init__(self, creds_file_path: str, token_path: str): self.creds_file_path = creds_file_path self.token_path = token_path self.token = None self.service = None self.user_email = None @classmethod def create(cls, creds_file_path: str, token_path: str) -> "GmailClient": """Factory method to create and initialize a GmailClient instance""" client = cls(creds_file_path, token_path) client.initialize() return client @classmethod def authorize(cls, creds_file_path: str, token_path: str) -> None: """Run the authorization flow and save the token""" if not os.path.exists(creds_file_path): raise FileNotFoundError(f"Credentials file not found at: {creds_file_path}") try: flow = InstalledAppFlow.from_client_secrets_file(creds_file_path, SCOPES) token = flow.run_local_server(port=0) # Save the token with open(token_path, "w") as token_file: token_file.write(token.to_json()) print(f"Successfully saved token to {token_path}") except Exception as e: raise Exception(f"Failed to authorize: {str(e)}") def initialize(self) -> None: """Initialize the Gmail client with credentials""" self.token = self._get_token() self.service = self._get_service() self.user_email = self._get_user_email() def _get_token(self) -> Credentials: """Get or refresh Google API token""" token = None if os.path.exists(self.token_path): token = Credentials.from_authorized_user_file(self.token_path, SCOPES) if not token or not token.valid: if token and token.expired and token.refresh_token: token.refresh(Request()) else: raise Exception( "No valid token found. Please run 'gmail-mcp auth' first to authorize." ) return token def _get_service(self) -> Any: """Initialize Gmail API service""" try: service = build("gmail", "v1", credentials=self.token) return service except HttpError as error: raise ValueError(f"An error occurred: {error}") def _get_user_email(self) -> str: """Get user email address""" profile = self.service.users().getProfile(userId="me").execute() user_email = profile.get("emailAddress", "") return user_email def _get_message_body(self, parts: List[Dict[str, Any]]) -> str: """Get the body of the email message.""" body = "" if not parts: return body for part in parts: if part.get("parts"): # If this part has subparts, recursively get their content body += self._get_message_body(part.get("parts", [])) if part.get("body") and part.get("body").get("data"): data = part["body"]["data"] # Decode the base64url encoded data decoded_bytes = base64.urlsafe_b64decode(data) # If it's text, decode to a string if part.get("mimeType", "").startswith("text/"): body += decoded_bytes.decode("utf-8") return body def list_messages( self, user_id: str = "me", query: str = "", max_results: Optional[int] = None ) -> List[Dict[str, Any]]: """List all Messages of the user's mailbox matching the query.""" try: response = ( self.service.users().messages().list(userId=user_id, q=query).execute() ) messages = [] if "messages" in response: messages.extend(response["messages"]) while "nextPageToken" in response: if max_results and len(messages) >= max_results: break page_token = response["nextPageToken"] response = ( self.service.users() .messages() .list(userId=user_id, q=query, pageToken=page_token) .execute() ) if "messages" in response: messages.extend(response["messages"]) if max_results: return messages[:max_results] return messages except Exception: return [] def get_message( self, user_id: str = "me", msg_id: str = "" ) -> Optional[Dict[str, Any]]: """Get a Message with given ID.""" try: message = ( self.service.users().messages().get(userId=user_id, id=msg_id).execute() ) # Get email parts payload = message["payload"] headers = payload["headers"] # Look for Subject and Sender subject = "" sender = "" for header in headers: if header["name"] == "Subject": subject = header["value"] if header["name"] == "From": sender = header["value"] # The Body of the message parts = payload.get("parts", []) body = self._get_message_body(parts) return { "id": msg_id, "subject": subject, "sender": sender, "body": body, "snippet": message["snippet"], } except Exception: return None