Skip to main content
Glama
vitalune

Personal Knowledge Assistant

by vitalune
gmail_client.py (36.9 kB)
"""
Gmail API Client Implementation

This module provides Gmail API integration with:
- OAuth2 authentication using Google APIs
- Configurable Gmail scopes (readonly, labels and metadata by default)
- Message listing, search and retrieval
- Attachment handling and processing
- Thread and conversation management
- Label listing/lookup and filter creation
- Push notifications (watch/stop) and mailbox history
- Export of messages to mbox or JSON
- Rate limiting according to Gmail API limits
- Secure credential storage integration
"""

import asyncio
import base64
import email
import email.message
import email.utils
import json
import mimetypes
from datetime import datetime, timezone, timedelta
from typing import (
    Dict, Any, List, Optional, Union, AsyncIterator, Tuple, Callable, TypeVar
)
from dataclasses import dataclass, field
from pathlib import Path

import httpx
import structlog
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import Flow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from .base_client import BaseAPIClient, RateLimitConfig, CircuitBreakerConfig
from ..config.auth import AuthProvider, TokenType
from ..utils.rate_limiter import get_rate_limit_manager, RateLimitRule, RateLimitAlgorithm

logger = structlog.get_logger(__name__)

_T = TypeVar("_T")

# Google OAuth2 endpoints shared by the authorization and refresh flows.
_GOOGLE_AUTH_URI = "https://accounts.google.com/o/oauth2/auth"
_GOOGLE_TOKEN_URI = "https://oauth2.googleapis.com/token"


def _b64url_decode(data: str) -> bytes:
    """Decode a base64url string as returned by the Gmail API.

    Missing ``=`` padding is restored first: ``base64.urlsafe_b64decode``
    raises ``binascii.Error`` on unpadded input.
    """
    return base64.urlsafe_b64decode(data + "=" * (-len(data) % 4))


def _split_addresses(value: str) -> List[str]:
    """Split an address-list header value (To/Cc/Bcc) on top-level commas.

    Commas inside double-quoted display names (``"Doe, Jane" <jane@x.org>``),
    inside ``<...>`` or inside ``(...)`` comments are not treated as
    separators. Each entry is stripped and empty entries are dropped.
    """
    parts: List[str] = []
    current: List[str] = []
    in_quotes = False
    depth = 0  # nesting level inside <...> or (...)
    escaped = False
    for ch in value:
        if escaped:
            escaped = False
        elif ch == "\\" and in_quotes:
            escaped = True
        elif ch == '"':
            in_quotes = not in_quotes
        elif not in_quotes and ch in "<(":
            depth += 1
        elif not in_quotes and ch in ">)" and depth:
            depth -= 1
        elif ch == "," and not in_quotes and depth == 0:
            parts.append("".join(current).strip())
            current = []
            continue
        current.append(ch)
    parts.append("".join(current).strip())
    return [part for part in parts if part]


@dataclass
class GmailMessage:
    """Gmail message data structure.

    Usually built with :meth:`from_api_response`, which also fills in the
    parsed header, body and attachment fields below.
    """

    id: str
    thread_id: str
    label_ids: List[str]
    snippet: str
    history_id: str
    internal_date: datetime
    payload: Dict[str, Any]
    size_estimate: int

    # Parsed fields
    subject: Optional[str] = None
    sender: Optional[str] = None
    recipients: List[str] = field(default_factory=list)
    cc: List[str] = field(default_factory=list)
    bcc: List[str] = field(default_factory=list)
    body_text: Optional[str] = None
    body_html: Optional[str] = None
    attachments: List[Dict[str, Any]] = field(default_factory=list)

    @classmethod
    def from_api_response(cls, data: Dict[str, Any]) -> 'GmailMessage':
        """Create a GmailMessage from a Gmail API message resource.

        ``id`` and ``threadId`` are required. ``internalDate`` (epoch
        milliseconds) becomes an aware UTC datetime; when it is missing the
        current UTC time is used instead.
        """
        if 'internalDate' in data:
            internal_date = datetime.fromtimestamp(
                int(data['internalDate']) / 1000, timezone.utc
            )
        else:
            internal_date = datetime.now(timezone.utc)

        message = cls(
            id=data['id'],
            thread_id=data['threadId'],
            label_ids=data.get('labelIds', []),
            snippet=data.get('snippet', ''),
            history_id=data.get('historyId', ''),
            internal_date=internal_date,
            payload=data.get('payload', {}),
            size_estimate=data.get('sizeEstimate', 0),
        )

        # Parse message details
        message._parse_headers()
        message._parse_body()
        message._parse_attachments()
        return message

    def _parse_headers(self):
        """Populate subject, sender and recipient lists from payload headers.

        Header names are matched case-insensitively; when a header repeats,
        its last occurrence wins.
        """
        headers = self.payload.get('headers', [])
        header_dict = {h['name'].lower(): h['value'] for h in headers}

        self.subject = header_dict.get('subject')
        self.sender = header_dict.get('from')

        # Display names may contain commas ("Doe, Jane" <jane@x.org>), so a
        # plain str.split(',') would cut one address into two.
        if 'to' in header_dict:
            self.recipients = _split_addresses(header_dict['to'])
        if 'cc' in header_dict:
            self.cc = _split_addresses(header_dict['cc'])
        if 'bcc' in header_dict:
            self.bcc = _split_addresses(header_dict['bcc'])

    def _parse_body(self):
        """Parse the plain-text and HTML bodies out of the payload."""
        self._extract_body_from_payload(self.payload)

    def _extract_body_from_payload(self, payload: Dict[str, Any]):
        """Recursively find the text/plain and text/html bodies.

        Parts that carry a filename are attachments and are skipped, so an
        attached .txt/.html file cannot be mistaken for the message body.
        The first matching part of each type is kept; later parts of the same
        type (e.g. from nested multiparts) do not overwrite it. Bodies are
        decoded as UTF-8 with undecodable bytes ignored.
        """
        mime_type = payload.get('mimeType', '')
        body = payload.get('body', {})

        if 'data' in body and not payload.get('filename'):
            if mime_type == 'text/plain' and self.body_text is None:
                self.body_text = _b64url_decode(body['data']).decode('utf-8', errors='ignore')
            elif mime_type == 'text/html' and self.body_html is None:
                self.body_html = _b64url_decode(body['data']).decode('utf-8', errors='ignore')

        # Process multipart messages
        for part in payload.get('parts', []):
            self._extract_body_from_payload(part)

    def _parse_attachments(self):
        """Collect attachment descriptors from the payload."""
        self._extract_attachments_from_payload(self.payload)

    def _extract_attachments_from_payload(self, payload: Dict[str, Any]):
        """Recursively collect every part that has a filename.

        Per the Gmail API, a part body either inlines its content as base64url
        ``data`` or, for external attachments, only carries an
        ``attachmentId`` to be fetched with ``get_attachment``. Both kinds are
        recorded; ``data`` is None for the latter.
        """
        filename = payload.get('filename')
        body = payload.get('body', {})

        if filename and ('attachmentId' in body or 'data' in body):
            self.attachments.append({
                'filename': filename,
                'mime_type': payload.get('mimeType', ''),
                'size': body.get('size', 0),
                'attachment_id': body.get('attachmentId'),
                'data': body.get('data'),  # base64url-encoded, or None
            })

        # Process multipart messages
        for part in payload.get('parts', []):
            self._extract_attachments_from_payload(part)


@dataclass
class GmailThread:
    """Gmail thread data structure."""

    id: str
    history_id: str
    messages: List[GmailMessage] = field(default_factory=list)

    @classmethod
    def from_api_response(cls, data: Dict[str, Any]) -> 'GmailThread':
        """Create a GmailThread from a Gmail API thread resource.

        ``id`` is required. ``historyId`` defaults to '' like it does for
        messages. ``messages`` is parsed only when the response includes it.
        """
        thread = cls(id=data['id'], history_id=data.get('historyId', ''))
        thread.messages = [
            GmailMessage.from_api_response(msg) for msg in data.get('messages', [])
        ]
        return thread


@dataclass
class GmailLabel:
    """Gmail label data structure."""

    id: str
    name: str
    type: str
    message_list_visibility: str
    label_list_visibility: str
    color: Optional[Dict[str, str]] = None
    messages_total: Optional[int] = None
    messages_unread: Optional[int] = None
    threads_total: Optional[int] = None
    threads_unread: Optional[int] = None

    @classmethod
    def from_api_response(cls, data: Dict[str, Any]) -> 'GmailLabel':
        """Create a GmailLabel from a Gmail API label resource.

        ``id`` and ``name`` are required. A missing type or visibility falls
        back to 'user', 'show' and 'labelShow'; counters stay None when absent.
        """
        return cls(
            id=data["id"],
            name=data["name"],
            type=data.get("type", "user"),
            message_list_visibility=data.get("messageListVisibility", "show"),
            label_list_visibility=data.get("labelListVisibility", "labelShow"),
            color=data.get("color"),
            messages_total=data.get("messagesTotal"),
            messages_unread=data.get("messagesUnread"),
            threads_total=data.get("threadsTotal"),
            threads_unread=data.get("threadsUnread"),
        )


class GmailClient(BaseAPIClient):
    """Gmail API client with OAuth2, rate limiting and common mailbox operations.

    Every API call first asks the shared rate-limit manager for permission
    (bucket "gmail", one endpoint key per operation), then runs the blocking
    googleapiclient request in the default executor and records success or
    failure with the rate limiter.
    """

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        scopes: Optional[List[str]] = None,
        **kwargs
    ):
        # Gmail-specific rate limiting
        rate_limit_config = RateLimitConfig(
            requests_per_minute=250,
            requests_per_hour=15000,
            requests_per_day=1000000,
            burst_size=10
        )

        super().__init__(
            provider=AuthProvider.GOOGLE,
            base_url="https://gmail.googleapis.com/gmail/v1",
            rate_limit_config=rate_limit_config,
            **kwargs
        )

        self.client_id = client_id
        self.client_secret = client_secret
        # NOTE(review): Gmail restricts requests made with a token that holds
        # the gmail.metadata scope (e.g. no message bodies / search queries);
        # confirm these defaults work with search_messages and format="full".
        self.scopes = scopes or [
            'https://www.googleapis.com/auth/gmail.readonly',
            'https://www.googleapis.com/auth/gmail.labels',
            'https://www.googleapis.com/auth/gmail.metadata'
        ]

        self.rate_limiter = get_rate_limit_manager()

        # Google API client (built lazily after authentication)
        self._gmail_service = None

    def _format_auth_header(self, token: str) -> Dict[str, str]:
        """Format the bearer authentication header for Google APIs."""
        return {"Authorization": f"Bearer {token}"}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _create_flow(self, redirect_uri: str) -> Flow:
        """Build an OAuth2 web flow for this client's credentials and scopes."""
        flow = Flow.from_client_config(
            {
                "web": {
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "auth_uri": _GOOGLE_AUTH_URI,
                    "token_uri": _GOOGLE_TOKEN_URI,
                    "redirect_uris": [redirect_uri],
                }
            },
            scopes=self.scopes,
        )
        flow.redirect_uri = redirect_uri
        return flow

    @staticmethod
    def _build_service(credentials: Credentials):
        """Build a Gmail v1 service object bound to the given credentials."""
        return build('gmail', 'v1', credentials=credentials)

    @staticmethod
    async def _run_blocking(func: Callable[[], _T]) -> _T:
        """Run a blocking callable in the default executor.

        The callable is typically a googleapiclient ``.execute()`` or an OAuth
        HTTP call. Running it in the executor keeps it off the event loop.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func)

    async def _check_rate_limit(self, endpoint: str) -> None:
        """Raise if the rate limiter denies a request for this Gmail endpoint."""
        status = await self.rate_limiter.is_allowed("gmail", endpoint)
        if not status.allowed:
            raise Exception(f"Rate limited. Retry after {status.retry_after}s")

    async def _ensure_service(self) -> None:
        """Build the Gmail service from stored tokens if it does not exist yet."""
        if not self._gmail_service:
            await self._initialize_service()

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    async def authenticate(self, redirect_uri: str = "http://localhost:8080/oauth/callback") -> bool:
        """Start the OAuth2 authorization-code flow for Gmail.

        Builds the Google authorization URL with offline access and forced
        consent, registers OAuth session state with the auth manager, and logs
        the URL. No token is obtained here: the user must visit the URL, and
        the returned code is exchanged by ``handle_oauth_callback``.

        Returns:
            Always False, because authentication is not complete yet (also
            False when building the flow fails).
        """
        try:
            flow = self._create_flow(redirect_uri)

            # Get authorization URL
            auth_url, state = flow.authorization_url(
                access_type='offline',
                include_granted_scopes='true',
                prompt='consent'
            )

            # Store session state
            # NOTE(review): handle_oauth_callback validates the `state` returned
            # by Google, which is the Flow-generated `state` above; confirm that
            # the auth manager's session state is keyed on that same value.
            self.auth_manager.create_session_state(
                provider=self.provider,
                redirect_uri=redirect_uri,
                scopes=self.scopes
            )

            logger.info(
                "Gmail OAuth flow initiated",
                auth_url=auth_url,
                state=state[:8] + "...",
                scopes=self.scopes
            )

            # The caller must send the user to auth_url and then pass the
            # authorization code to handle_oauth_callback.
            logger.warning("Please visit this URL to authorize", auth_url=auth_url)
            return False

        except Exception as e:
            logger.error("Gmail authentication failed", error=str(e))
            return False

    async def handle_oauth_callback(self, authorization_code: str, state: str) -> bool:
        """Complete the OAuth flow with the authorization code from the callback.

        Validates and consumes the session state, exchanges the code for
        tokens, stores the access token (and the refresh token when Google
        returns one), and builds the Gmail service from the new credentials.

        Returns:
            True on success, False on any failure (the error is logged).
        """
        try:
            # Validate session state
            session_data = self.auth_manager.consume_session_state(state)
            if not session_data:
                raise ValueError("Invalid or expired OAuth state")

            flow = self._create_flow(session_data["redirect_uri"])

            # Exchange authorization code for tokens (blocking HTTP request)
            await self._run_blocking(lambda: flow.fetch_token(code=authorization_code))
            credentials = flow.credentials

            # Store tokens securely
            access_token_id = await self.auth_manager.store_token(
                provider=self.provider,
                token_type=TokenType.ACCESS_TOKEN,
                token_value=credentials.token,
                expires_in=3600,  # Google access tokens typically expire in 1 hour
                scopes=self.scopes,
                subject=None,  # Will be populated after getting user info
                client_id=self.client_id
            )

            if credentials.refresh_token:
                await self.auth_manager.store_token(
                    provider=self.provider,
                    token_type=TokenType.REFRESH_TOKEN,
                    token_value=credentials.refresh_token,
                    expires_in=None,  # Refresh tokens have no fixed expiry
                    scopes=self.scopes,
                    subject=None,
                    client_id=self.client_id
                )

            # Use the credentials just obtained rather than re-reading token
            # storage, which could return an older access token.
            self._gmail_service = self._build_service(credentials)

            logger.info(
                "Gmail authentication successful",
                access_token_id=access_token_id,
                scopes=self.scopes
            )

            return True

        except Exception as e:
            logger.error("Gmail OAuth callback failed", error=str(e))
            return False

    async def refresh_token(self) -> bool:
        """Refresh the Gmail access token using the stored refresh token.

        The new access token is stored, and the Gmail service is rebuilt so
        later requests use it.

        Returns:
            True on success, False when no refresh token is available or the
            refresh fails (the error is logged).
        """
        try:
            # Find refresh token
            tokens = await self.auth_manager.list_tokens(
                provider=self.provider,
                token_type=TokenType.REFRESH_TOKEN
            )

            if not tokens:
                logger.error("No refresh token found for Gmail")
                return False

            refresh_token_id = tokens[0]["token_id"]
            refresh_token = await self.auth_manager.retrieve_token(refresh_token_id)

            if not refresh_token:
                logger.error("Failed to retrieve Gmail refresh token")
                return False

            credentials = Credentials(
                token=None,
                refresh_token=refresh_token,
                token_uri=_GOOGLE_TOKEN_URI,
                client_id=self.client_id,
                client_secret=self.client_secret,
                scopes=self.scopes
            )

            # Credentials.refresh performs a blocking HTTP request.
            await self._run_blocking(lambda: credentials.refresh(Request()))

            # Store new access token
            access_token_id = await self.auth_manager.store_token(
                provider=self.provider,
                token_type=TokenType.ACCESS_TOKEN,
                token_value=credentials.token,
                expires_in=3600,
                scopes=self.scopes,
                subject=tokens[0].get("subject"),
                client_id=self.client_id
            )

            # Without this the existing service would keep using the old,
            # expired access token it was built with.
            self._gmail_service = self._build_service(credentials)

            logger.info("Gmail token refreshed successfully", token_id=access_token_id)
            return True

        except Exception as e:
            logger.error("Gmail token refresh failed", error=str(e))
            return False

    async def _initialize_service(self):
        """Build the Gmail service from the first stored access token.

        The resulting credentials hold only an access token and cannot refresh
        themselves; call ``refresh_token`` when it expires.

        Raises:
            RuntimeError: If no access token is stored or it cannot be read.
            Exception: Any error from token storage or service construction
                is logged and re-raised.
        """
        try:
            tokens = await self.auth_manager.list_tokens(
                provider=self.provider,
                token_type=TokenType.ACCESS_TOKEN
            )

            if not tokens:
                raise RuntimeError("No access token available")

            token_value = await self.auth_manager.retrieve_token(tokens[0]["token_id"])
            if not token_value:
                raise RuntimeError("Failed to retrieve access token")

            self._gmail_service = self._build_service(Credentials(token=token_value))

            logger.info("Gmail service initialized successfully")

        except Exception as e:
            logger.error("Failed to initialize Gmail service", error=str(e))
            raise

    async def get_rate_limits(self) -> Dict[str, Any]:
        """Return the locally tracked rate-limit status for Gmail.

        Gmail does not report rate limits in response headers, so this is the
        rate-limit manager's own bookkeeping.
        """
        status = await self.rate_limiter.get_all_status()
        return status.get("gmail", {})

    # ------------------------------------------------------------------
    # Profile and messages
    # ------------------------------------------------------------------

    async def get_profile(self) -> Dict[str, Any]:
        """Get the authenticated user's Gmail profile.

        Returns:
            Dict with email_address, messages_total, threads_total and
            history_id.
        """
        await self._check_rate_limit("profile")

        try:
            await self._ensure_service()

            response = await self._run_blocking(
                lambda: self._gmail_service.users().getProfile(userId='me').execute()
            )

            await self.rate_limiter.record_response("gmail", "profile", True)

            return {
                "email_address": response.get("emailAddress"),
                "messages_total": response.get("messagesTotal"),
                "threads_total": response.get("threadsTotal"),
                "history_id": response.get("historyId")
            }

        except Exception as e:
            await self.rate_limiter.record_response("gmail", "profile", False)
            logger.error("Failed to get Gmail profile", error=str(e))
            raise

    async def list_messages(
        self,
        query: Optional[str] = None,
        label_ids: Optional[List[str]] = None,
        max_results: int = 100,
        page_token: Optional[str] = None,
        include_spam_trash: bool = False
    ) -> Dict[str, Any]:
        """List message ids, optionally filtered by search query and labels.

        ``max_results`` is capped at 500, the Gmail page-size limit.

        Returns:
            Dict with "messages" (list of {id, threadId} stubs),
            "next_page_token" and "result_size_estimate".
        """
        await self._check_rate_limit("list_messages")

        try:
            await self._ensure_service()

            params = {
                'userId': 'me',
                'maxResults': min(max_results, 500),  # Gmail limit
                'includeSpamTrash': include_spam_trash
            }

            if query:
                params['q'] = query
            if label_ids:
                params['labelIds'] = label_ids
            if page_token:
                params['pageToken'] = page_token

            response = await self._run_blocking(
                lambda: self._gmail_service.users().messages().list(**params).execute()
            )

            await self.rate_limiter.record_response("gmail", "list_messages", True)

            return {
                "messages": response.get("messages", []),
                "next_page_token": response.get("nextPageToken"),
                "result_size_estimate": response.get("resultSizeEstimate", 0)
            }

        except Exception as e:
            await self.rate_limiter.record_response("gmail", "list_messages", False)
            logger.error("Failed to list Gmail messages", error=str(e))
            raise

    async def get_message(
        self,
        message_id: str,
        format: str = "full",
        metadata_headers: Optional[List[str]] = None
    ) -> GmailMessage:
        """Fetch and parse a single message.

        Args:
            message_id: Gmail message id.
            format: Gmail response format, passed through unchanged
                (e.g. "full" or "metadata").
            metadata_headers: Header names to include when format is "metadata".
        """
        await self._check_rate_limit("get_message")

        try:
            await self._ensure_service()

            params = {
                'userId': 'me',
                'id': message_id,
                'format': format
            }

            if metadata_headers:
                params['metadataHeaders'] = metadata_headers

            response = await self._run_blocking(
                lambda: self._gmail_service.users().messages().get(**params).execute()
            )

            await self.rate_limiter.record_response("gmail", "get_message", True)

            return GmailMessage.from_api_response(response)

        except Exception as e:
            await self.rate_limiter.record_response("gmail", "get_message", False)
            logger.error("Failed to get Gmail message", message_id=message_id, error=str(e))
            raise

    async def get_messages_batch(
        self,
        message_ids: List[str],
        format: str = "full",
        batch_size: int = 10
    ) -> List[GmailMessage]:
        """Fetch several messages, ``batch_size`` at a time concurrently.

        Failures are logged with the message id and skipped, so the result may
        contain fewer messages than requested. Successful results keep the
        order of ``message_ids``.

        Raises:
            ValueError: If batch_size is less than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        messages: List[GmailMessage] = []

        for start in range(0, len(message_ids), batch_size):
            batch_ids = message_ids[start:start + batch_size]

            # Each get_message call performs its own rate-limit check.
            results = await asyncio.gather(
                *(self.get_message(msg_id, format) for msg_id in batch_ids),
                return_exceptions=True
            )

            for msg_id, result in zip(batch_ids, results):
                if isinstance(result, GmailMessage):
                    messages.append(result)
                else:
                    logger.error(
                        "Failed to get message in batch",
                        message_id=msg_id,
                        error=str(result)
                    )

        return messages

    async def search_messages(
        self,
        query: str,
        max_results: int = 100,
        include_body: bool = True
    ) -> List[GmailMessage]:
        """Search with a Gmail query and return parsed messages.

        Only the first page of results (at most ``max_results``, capped at
        500) is fetched. With include_body=False messages are fetched in
        "metadata" format, so no bodies are parsed.
        """
        message_list = await self.list_messages(query=query, max_results=max_results)

        if not message_list["messages"]:
            return []

        message_ids = [msg["id"] for msg in message_list["messages"]]
        format_type = "full" if include_body else "metadata"

        return await self.get_messages_batch(message_ids, format_type)

    async def get_thread(self, thread_id: str, format: str = "full") -> GmailThread:
        """Fetch a thread together with all of its messages."""
        await self._check_rate_limit("get_thread")

        try:
            await self._ensure_service()

            response = await self._run_blocking(
                lambda: self._gmail_service.users().threads().get(
                    userId='me',
                    id=thread_id,
                    format=format
                ).execute()
            )

            await self.rate_limiter.record_response("gmail", "get_thread", True)

            return GmailThread.from_api_response(response)

        except Exception as e:
            await self.rate_limiter.record_response("gmail", "get_thread", False)
            logger.error("Failed to get Gmail thread", thread_id=thread_id, error=str(e))
            raise

    # ------------------------------------------------------------------
    # Labels and attachments
    # ------------------------------------------------------------------

    async def list_labels(self) -> List[GmailLabel]:
        """List all labels in the mailbox."""
        await self._check_rate_limit("list_labels")

        try:
            await self._ensure_service()

            response = await self._run_blocking(
                lambda: self._gmail_service.users().labels().list(userId='me').execute()
            )

            await self.rate_limiter.record_response("gmail", "list_labels", True)

            return [
                GmailLabel.from_api_response(label_data)
                for label_data in response.get("labels", [])
            ]

        except Exception as e:
            await self.rate_limiter.record_response("gmail", "list_labels", False)
            logger.error("Failed to list Gmail labels", error=str(e))
            raise

    async def get_label(self, label_id: str) -> GmailLabel:
        """Fetch a single label, including its message/thread counters."""
        await self._check_rate_limit("get_label")

        try:
            await self._ensure_service()

            response = await self._run_blocking(
                lambda: self._gmail_service.users().labels().get(
                    userId='me',
                    id=label_id
                ).execute()
            )

            await self.rate_limiter.record_response("gmail", "get_label", True)

            return GmailLabel.from_api_response(response)

        except Exception as e:
            await self.rate_limiter.record_response("gmail", "get_label", False)
            logger.error("Failed to get Gmail label", label_id=label_id, error=str(e))
            raise

    async def get_attachment(self, message_id: str, attachment_id: str) -> bytes:
        """Download an attachment and return its decoded bytes."""
        await self._check_rate_limit("get_attachment")

        try:
            await self._ensure_service()

            response = await self._run_blocking(
                lambda: self._gmail_service.users().messages().attachments().get(
                    userId='me',
                    messageId=message_id,
                    id=attachment_id
                ).execute()
            )

            await self.rate_limiter.record_response("gmail", "get_attachment", True)

            return _b64url_decode(response['data'])

        except Exception as e:
            await self.rate_limiter.record_response("gmail", "get_attachment", False)
            logger.error(
                "Failed to get Gmail attachment",
                message_id=message_id,
                attachment_id=attachment_id,
                error=str(e)
            )
            raise

    # ------------------------------------------------------------------
    # Push notifications and history
    # ------------------------------------------------------------------

    async def watch_mailbox(
        self,
        topic_name: str,
        label_ids: Optional[List[str]] = None,
        label_filter_action: str = "include"
    ) -> Dict[str, Any]:
        """Set up push notifications for mailbox changes to a Pub/Sub topic.

        Returns:
            Dict with the starting "history_id" and the watch "expiration".
        """
        await self._check_rate_limit("watch")

        try:
            await self._ensure_service()

            body = {
                'topicName': topic_name,
                'labelIds': label_ids or [],
                'labelFilterAction': label_filter_action
            }

            response = await self._run_blocking(
                lambda: self._gmail_service.users().watch(
                    userId='me',
                    body=body
                ).execute()
            )

            await self.rate_limiter.record_response("gmail", "watch", True)

            return {
                "history_id": response.get("historyId"),
                "expiration": response.get("expiration")
            }

        except Exception as e:
            await self.rate_limiter.record_response("gmail", "watch", False)
            logger.error("Failed to setup Gmail watch", error=str(e))
            raise

    async def stop_watch(self) -> bool:
        """Stop push notifications.

        Returns:
            True on success, False if the API call fails (the error is logged).

        Raises:
            Exception: If the rate limiter denies the request.
        """
        await self._check_rate_limit("stop")

        try:
            await self._ensure_service()

            await self._run_blocking(
                lambda: self._gmail_service.users().stop(userId='me').execute()
            )

            await self.rate_limiter.record_response("gmail", "stop", True)
            return True

        except Exception as e:
            await self.rate_limiter.record_response("gmail", "stop", False)
            logger.error("Failed to stop Gmail watch", error=str(e))
            return False

    async def get_history(
        self,
        start_history_id: str,
        max_results: int = 100,
        label_id: Optional[str] = None,
        page_token: Optional[str] = None
    ) -> Dict[str, Any]:
        """List mailbox changes since ``start_history_id``.

        Returns:
            Dict with "history" records, "next_page_token" and the mailbox's
            current "history_id".
        """
        await self._check_rate_limit("history")

        try:
            await self._ensure_service()

            params = {
                'userId': 'me',
                'startHistoryId': start_history_id,
                'maxResults': min(max_results, 500)
            }

            if label_id:
                params['labelId'] = label_id
            if page_token:
                params['pageToken'] = page_token

            response = await self._run_blocking(
                lambda: self._gmail_service.users().history().list(**params).execute()
            )

            await self.rate_limiter.record_response("gmail", "history", True)

            return {
                "history": response.get("history", []),
                "next_page_token": response.get("nextPageToken"),
                "history_id": response.get("historyId")
            }

        except Exception as e:
            await self.rate_limiter.record_response("gmail", "history", False)
            logger.error("Failed to get Gmail history", error=str(e))
            raise

    async def create_filter(
        self,
        criteria: Dict[str, Any],
        action: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Create a Gmail filter and return the API's filter resource.

        Requires a settings scope (gmail.settings.basic) that is not among the
        default scopes of this client.
        """
        await self._check_rate_limit("create_filter")

        try:
            await self._ensure_service()

            body = {
                'criteria': criteria,
                'action': action
            }

            response = await self._run_blocking(
                lambda: self._gmail_service.users().settings().filters().create(
                    userId='me',
                    body=body
                ).execute()
            )

            await self.rate_limiter.record_response("gmail", "create_filter", True)
            return response

        except Exception as e:
            await self.rate_limiter.record_response("gmail", "create_filter", False)
            logger.error("Failed to create Gmail filter", error=str(e))
            raise

    # ------------------------------------------------------------------
    # Export
    # ------------------------------------------------------------------

    async def export_messages(
        self,
        output_path: Path,
        query: Optional[str] = None,
        format: str = "mbox",
        max_messages: int = 1000
    ) -> int:
        """Export up to ``max_messages`` matching messages to a file.

        Args:
            output_path: Destination file; parent directories are created.
            query: Optional Gmail search query.
            format: "mbox" or "json" (case-insensitive).
            max_messages: Upper bound on exported messages.

        Returns:
            Number of messages written. Messages that failed to download are
            skipped.

        Raises:
            ValueError: For an unsupported format. This is checked before any
                API calls are made.
        """
        export_format = format.lower()
        if export_format not in ("mbox", "json"):
            raise ValueError(f"Unsupported export format: {format}")

        logger.info("Starting Gmail export", output_path=str(output_path), query=query, format=format)

        all_messages: List[GmailMessage] = []
        page_token = None

        while len(all_messages) < max_messages:
            batch_size = min(500, max_messages - len(all_messages))

            message_list = await self.list_messages(
                query=query,
                max_results=batch_size,
                page_token=page_token
            )

            if not message_list["messages"]:
                break

            message_ids = [msg["id"] for msg in message_list["messages"]]
            all_messages.extend(await self.get_messages_batch(message_ids, "full"))

            page_token = message_list.get("next_page_token")
            if not page_token:
                break

        output_path.parent.mkdir(parents=True, exist_ok=True)

        if export_format == "mbox":
            exported_count = await self._export_to_mbox(all_messages, output_path)
        else:
            exported_count = await self._export_to_json(all_messages, output_path)

        logger.info("Gmail export completed", exported_count=exported_count, output_path=str(output_path))
        return exported_count

    async def _export_to_mbox(self, messages: List[GmailMessage], output_path: Path) -> int:
        """Write messages to an mbox file and return how many were written.

        Each entry gets Subject/From/To/Cc/Date/Message-ID headers and a single
        body: the plain-text body if present, otherwise the HTML body. Bcc and
        attachments are not exported. An existing file at ``output_path`` is
        opened and added to, not truncated.
        """
        import mailbox

        mbox = mailbox.mbox(str(output_path))
        try:
            for message in messages:
                msg = email.message.EmailMessage()

                if message.subject:
                    msg['Subject'] = message.subject
                if message.sender:
                    msg['From'] = message.sender
                if message.recipients:
                    msg['To'] = ', '.join(message.recipients)
                if message.cc:
                    msg['Cc'] = ', '.join(message.cc)

                # RFC 5322 date; unlike strftime('%a'/'%b') this does not
                # depend on the process locale.
                msg['Date'] = email.utils.format_datetime(message.internal_date)
                msg['Message-ID'] = f"<{message.id}@gmail.com>"

                if message.body_text:
                    msg.set_content(message.body_text)
                elif message.body_html:
                    msg.set_content(message.body_html, subtype='html')

                mbox.add(msg)
        finally:
            # Flush and release the file even if a message fails to serialize.
            mbox.close()

        return len(messages)

    async def _export_to_json(self, messages: List[GmailMessage], output_path: Path) -> int:
        """Write messages as a JSON array (UTF-8, indented), overwriting the file.

        Attachment entries include only filename, mime type and size; their
        content is not exported.
        """
        export_data = [
            {
                "id": message.id,
                "thread_id": message.thread_id,
                "subject": message.subject,
                "sender": message.sender,
                "recipients": message.recipients,
                "cc": message.cc,
                "bcc": message.bcc,
                "date": message.internal_date.isoformat(),
                "body_text": message.body_text,
                "body_html": message.body_html,
                "snippet": message.snippet,
                "labels": message.label_ids,
                "attachments": [
                    {
                        "filename": att["filename"],
                        "mime_type": att["mime_type"],
                        "size": att["size"]
                    }
                    for att in message.attachments
                ]
            }
            for message in messages
        ]

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, indent=2, ensure_ascii=False)

        return len(messages)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vitalune/Nexus-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.