"""
Gmail API Client Implementation
This module provides comprehensive Gmail API integration with:
- OAuth2 authentication using Google APIs
- Support for all Gmail scopes (readonly, labels, metadata)
- Email search, read, send functionality
- Attachment handling and processing
- Thread and conversation management
- Label operations and organization
- Rate limiting according to Gmail API limits
- Secure credential storage integration
"""
import asyncio
import base64
import email
import json
import mimetypes
from datetime import datetime, timezone, timedelta
from typing import Dict, Any, List, Optional, Union, AsyncIterator, Tuple
from dataclasses import dataclass, field
from pathlib import Path
import httpx
import structlog
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import Flow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from .base_client import BaseAPIClient, RateLimitConfig, CircuitBreakerConfig
from ..config.auth import AuthProvider, TokenType
from ..utils.rate_limiter import get_rate_limit_manager, RateLimitRule, RateLimitAlgorithm
logger = structlog.get_logger(__name__)
@dataclass
class GmailMessage:
"""Gmail message data structure"""
id: str
thread_id: str
label_ids: List[str]
snippet: str
history_id: str
internal_date: datetime
payload: Dict[str, Any]
size_estimate: int
# Parsed fields
subject: Optional[str] = None
sender: Optional[str] = None
recipients: List[str] = field(default_factory=list)
cc: List[str] = field(default_factory=list)
bcc: List[str] = field(default_factory=list)
body_text: Optional[str] = None
body_html: Optional[str] = None
attachments: List[Dict[str, Any]] = field(default_factory=list)
@classmethod
def from_api_response(cls, data: Dict[str, Any]) -> 'GmailMessage':
"""Create GmailMessage from API response"""
# Parse internal date
internal_date = datetime.fromtimestamp(
int(data['internalDate']) / 1000, timezone.utc
) if 'internalDate' in data else datetime.now(timezone.utc)
# Create base message
message = cls(
id=data['id'],
thread_id=data['threadId'],
label_ids=data.get('labelIds', []),
snippet=data.get('snippet', ''),
history_id=data.get('historyId', ''),
internal_date=internal_date,
payload=data.get('payload', {}),
size_estimate=data.get('sizeEstimate', 0)
)
# Parse message details
message._parse_headers()
message._parse_body()
message._parse_attachments()
return message
def _parse_headers(self):
"""Parse email headers"""
headers = self.payload.get('headers', [])
header_dict = {h['name'].lower(): h['value'] for h in headers}
self.subject = header_dict.get('subject')
self.sender = header_dict.get('from')
# Parse recipients
if 'to' in header_dict:
self.recipients = [addr.strip() for addr in header_dict['to'].split(',')]
if 'cc' in header_dict:
self.cc = [addr.strip() for addr in header_dict['cc'].split(',')]
if 'bcc' in header_dict:
self.bcc = [addr.strip() for addr in header_dict['bcc'].split(',')]
def _parse_body(self):
"""Parse message body"""
self._extract_body_from_payload(self.payload)
def _extract_body_from_payload(self, payload: Dict[str, Any]):
"""Recursively extract body from payload"""
mime_type = payload.get('mimeType', '')
if mime_type == 'text/plain' and 'data' in payload.get('body', {}):
self.body_text = base64.urlsafe_b64decode(
payload['body']['data']
).decode('utf-8', errors='ignore')
elif mime_type == 'text/html' and 'data' in payload.get('body', {}):
self.body_html = base64.urlsafe_b64decode(
payload['body']['data']
).decode('utf-8', errors='ignore')
# Process multipart messages
if 'parts' in payload:
for part in payload['parts']:
self._extract_body_from_payload(part)
def _parse_attachments(self):
"""Parse message attachments"""
self._extract_attachments_from_payload(self.payload)
def _extract_attachments_from_payload(self, payload: Dict[str, Any]):
"""Recursively extract attachments from payload"""
filename = payload.get('filename')
mime_type = payload.get('mimeType', '')
if filename and 'data' in payload.get('body', {}):
attachment = {
'filename': filename,
'mime_type': mime_type,
'size': payload['body'].get('size', 0),
'attachment_id': payload['body'].get('attachmentId'),
'data': payload['body'].get('data') # Base64 encoded
}
self.attachments.append(attachment)
# Process multipart messages
if 'parts' in payload:
for part in payload['parts']:
self._extract_attachments_from_payload(part)
@dataclass
class GmailThread:
"""Gmail thread data structure"""
id: str
history_id: str
messages: List[GmailMessage] = field(default_factory=list)
@classmethod
def from_api_response(cls, data: Dict[str, Any]) -> 'GmailThread':
"""Create GmailThread from API response"""
thread = cls(
id=data['id'],
history_id=data['historyId']
)
# Parse messages if included
if 'messages' in data:
thread.messages = [
GmailMessage.from_api_response(msg) for msg in data['messages']
]
return thread
@dataclass
class GmailLabel:
"""Gmail label data structure"""
id: str
name: str
type: str
message_list_visibility: str
label_list_visibility: str
color: Optional[Dict[str, str]] = None
messages_total: Optional[int] = None
messages_unread: Optional[int] = None
threads_total: Optional[int] = None
threads_unread: Optional[int] = None
class GmailClient(BaseAPIClient):
"""Gmail API client with full functionality"""
def __init__(
self,
client_id: str,
client_secret: str,
scopes: Optional[List[str]] = None,
**kwargs
):
# Gmail-specific rate limiting
rate_limit_config = RateLimitConfig(
requests_per_minute=250,
requests_per_hour=15000,
requests_per_day=1000000,
burst_size=10
)
super().__init__(
provider=AuthProvider.GOOGLE,
base_url="https://gmail.googleapis.com/gmail/v1",
rate_limit_config=rate_limit_config,
**kwargs
)
self.client_id = client_id
self.client_secret = client_secret
self.scopes = scopes or [
'https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/gmail.labels',
'https://www.googleapis.com/auth/gmail.metadata'
]
self.rate_limiter = get_rate_limit_manager()
# Google API client (will be initialized after authentication)
self._gmail_service = None
def _format_auth_header(self, token: str) -> Dict[str, str]:
"""Format authentication header for Google APIs"""
return {"Authorization": f"Bearer {token}"}
async def authenticate(self, redirect_uri: str = "http://localhost:8080/oauth/callback") -> bool:
"""Authenticate with Gmail using OAuth2"""
try:
# Create OAuth2 flow
flow = Flow.from_client_config(
{
"web": {
"client_id": self.client_id,
"client_secret": self.client_secret,
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"redirect_uris": [redirect_uri]
}
},
scopes=self.scopes
)
flow.redirect_uri = redirect_uri
# Get authorization URL
auth_url, state = flow.authorization_url(
access_type='offline',
include_granted_scopes='true',
prompt='consent'
)
# Store session state
session_data = self.auth_manager.create_session_state(
provider=self.provider,
redirect_uri=redirect_uri,
scopes=self.scopes
)
logger.info(
"Gmail OAuth flow initiated",
auth_url=auth_url,
state=state[:8] + "...",
scopes=self.scopes
)
# In a real implementation, you would redirect user to auth_url
# and handle the callback with the authorization code
# For now, we'll raise an exception with the auth URL
raise Exception(f"Please visit this URL to authorize: {auth_url}")
except Exception as e:
logger.error(f"Gmail authentication failed", error=str(e))
return False
async def handle_oauth_callback(self, authorization_code: str, state: str) -> bool:
"""Handle OAuth callback with authorization code"""
try:
# Validate session state
session_data = self.auth_manager.consume_session_state(state)
if not session_data:
raise ValueError("Invalid or expired OAuth state")
# Create flow and fetch token
flow = Flow.from_client_config(
{
"web": {
"client_id": self.client_id,
"client_secret": self.client_secret,
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"redirect_uris": [session_data["redirect_uri"]]
}
},
scopes=self.scopes
)
flow.redirect_uri = session_data["redirect_uri"]
# Exchange authorization code for tokens
flow.fetch_token(code=authorization_code)
credentials = flow.credentials
# Store tokens securely
access_token_id = await self.auth_manager.store_token(
provider=self.provider,
token_type=TokenType.ACCESS_TOKEN,
token_value=credentials.token,
expires_in=3600, # Google tokens typically expire in 1 hour
scopes=self.scopes,
subject=None, # Will be populated after getting user info
client_id=self.client_id
)
if credentials.refresh_token:
refresh_token_id = await self.auth_manager.store_token(
provider=self.provider,
token_type=TokenType.REFRESH_TOKEN,
token_value=credentials.refresh_token,
expires_in=None, # Refresh tokens don't expire
scopes=self.scopes,
subject=None,
client_id=self.client_id
)
# Initialize Gmail service
await self._initialize_service()
logger.info(
"Gmail authentication successful",
access_token_id=access_token_id,
scopes=self.scopes
)
return True
except Exception as e:
logger.error(f"Gmail OAuth callback failed", error=str(e))
return False
async def refresh_token(self) -> bool:
"""Refresh Gmail access token"""
try:
# Find refresh token
tokens = await self.auth_manager.list_tokens(
provider=self.provider,
token_type=TokenType.REFRESH_TOKEN
)
if not tokens:
logger.error("No refresh token found for Gmail")
return False
refresh_token_id = tokens[0]["token_id"]
refresh_token = await self.auth_manager.retrieve_token(refresh_token_id)
if not refresh_token:
logger.error("Failed to retrieve Gmail refresh token")
return False
# Create credentials and refresh
credentials = Credentials(
token=None,
refresh_token=refresh_token,
token_uri="https://oauth2.googleapis.com/token",
client_id=self.client_id,
client_secret=self.client_secret,
scopes=self.scopes
)
# Refresh the token
credentials.refresh(Request())
# Store new access token
access_token_id = await self.auth_manager.store_token(
provider=self.provider,
token_type=TokenType.ACCESS_TOKEN,
token_value=credentials.token,
expires_in=3600,
scopes=self.scopes,
subject=tokens[0].get("subject"),
client_id=self.client_id
)
logger.info("Gmail token refreshed successfully", token_id=access_token_id)
return True
except Exception as e:
logger.error(f"Gmail token refresh failed", error=str(e))
return False
async def _initialize_service(self):
"""Initialize Gmail service client"""
try:
# Get access token
tokens = await self.auth_manager.list_tokens(
provider=self.provider,
token_type=TokenType.ACCESS_TOKEN
)
if not tokens:
raise Exception("No access token available")
token_value = await self.auth_manager.retrieve_token(tokens[0]["token_id"])
if not token_value:
raise Exception("Failed to retrieve access token")
# Create credentials
credentials = Credentials(token=token_value)
# Build Gmail service
self._gmail_service = build('gmail', 'v1', credentials=credentials)
logger.info("Gmail service initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Gmail service", error=str(e))
raise
async def get_rate_limits(self) -> Dict[str, Any]:
"""Get current rate limit status for Gmail"""
# Gmail doesn't provide rate limit info in headers, so we return our tracking
status = await self.rate_limiter.get_all_status()
return status.get("gmail", {})
async def get_profile(self) -> Dict[str, Any]:
"""Get Gmail profile information"""
status = await self.rate_limiter.is_allowed("gmail", "profile")
if not status.allowed:
raise Exception(f"Rate limited. Retry after {status.retry_after}s")
try:
if not self._gmail_service:
await self._initialize_service()
response = await asyncio.get_event_loop().run_in_executor(
None, self._gmail_service.users().getProfile(userId='me').execute
)
await self.rate_limiter.record_response("gmail", "profile", True)
return {
"email_address": response.get("emailAddress"),
"messages_total": response.get("messagesTotal"),
"threads_total": response.get("threadsTotal"),
"history_id": response.get("historyId")
}
except Exception as e:
await self.rate_limiter.record_response("gmail", "profile", False)
logger.error(f"Failed to get Gmail profile", error=str(e))
raise
async def list_messages(
self,
query: Optional[str] = None,
label_ids: Optional[List[str]] = None,
max_results: int = 100,
page_token: Optional[str] = None,
include_spam_trash: bool = False
) -> Dict[str, Any]:
"""List Gmail messages with optional filtering"""
status = await self.rate_limiter.is_allowed("gmail", "list_messages")
if not status.allowed:
raise Exception(f"Rate limited. Retry after {status.retry_after}s")
try:
if not self._gmail_service:
await self._initialize_service()
# Build request parameters
params = {
'userId': 'me',
'maxResults': min(max_results, 500), # Gmail limit
'includeSpamTrash': include_spam_trash
}
if query:
params['q'] = query
if label_ids:
params['labelIds'] = label_ids
if page_token:
params['pageToken'] = page_token
response = await asyncio.get_event_loop().run_in_executor(
None, lambda: self._gmail_service.users().messages().list(**params).execute()
)
await self.rate_limiter.record_response("gmail", "list_messages", True)
return {
"messages": response.get("messages", []),
"next_page_token": response.get("nextPageToken"),
"result_size_estimate": response.get("resultSizeEstimate", 0)
}
except Exception as e:
await self.rate_limiter.record_response("gmail", "list_messages", False)
logger.error(f"Failed to list Gmail messages", error=str(e))
raise
async def get_message(
self,
message_id: str,
format: str = "full",
metadata_headers: Optional[List[str]] = None
) -> GmailMessage:
"""Get a specific Gmail message"""
status = await self.rate_limiter.is_allowed("gmail", "get_message")
if not status.allowed:
raise Exception(f"Rate limited. Retry after {status.retry_after}s")
try:
if not self._gmail_service:
await self._initialize_service()
params = {
'userId': 'me',
'id': message_id,
'format': format
}
if metadata_headers:
params['metadataHeaders'] = metadata_headers
response = await asyncio.get_event_loop().run_in_executor(
None, lambda: self._gmail_service.users().messages().get(**params).execute()
)
await self.rate_limiter.record_response("gmail", "get_message", True)
return GmailMessage.from_api_response(response)
except Exception as e:
await self.rate_limiter.record_response("gmail", "get_message", False)
logger.error(f"Failed to get Gmail message", message_id=message_id, error=str(e))
raise
async def get_messages_batch(
self,
message_ids: List[str],
format: str = "full"
) -> List[GmailMessage]:
"""Get multiple Gmail messages in batch"""
messages = []
# Process in batches to respect rate limits
batch_size = 10
for i in range(0, len(message_ids), batch_size):
batch_ids = message_ids[i:i + batch_size]
# Process batch concurrently but with rate limiting
batch_tasks = []
for msg_id in batch_ids:
task = self.get_message(msg_id, format)
batch_tasks.append(task)
batch_messages = await asyncio.gather(*batch_tasks, return_exceptions=True)
for result in batch_messages:
if isinstance(result, GmailMessage):
messages.append(result)
else:
logger.error(f"Failed to get message in batch", error=str(result))
return messages
async def search_messages(
self,
query: str,
max_results: int = 100,
include_body: bool = True
) -> List[GmailMessage]:
"""Search Gmail messages with query"""
# List messages matching query
message_list = await self.list_messages(query=query, max_results=max_results)
if not message_list["messages"]:
return []
# Get full message details
message_ids = [msg["id"] for msg in message_list["messages"]]
format_type = "full" if include_body else "metadata"
return await self.get_messages_batch(message_ids, format_type)
async def get_thread(self, thread_id: str, format: str = "full") -> GmailThread:
"""Get a Gmail thread with all messages"""
status = await self.rate_limiter.is_allowed("gmail", "get_thread")
if not status.allowed:
raise Exception(f"Rate limited. Retry after {status.retry_after}s")
try:
if not self._gmail_service:
await self._initialize_service()
response = await asyncio.get_event_loop().run_in_executor(
None, lambda: self._gmail_service.users().threads().get(
userId='me', id=thread_id, format=format
).execute()
)
await self.rate_limiter.record_response("gmail", "get_thread", True)
return GmailThread.from_api_response(response)
except Exception as e:
await self.rate_limiter.record_response("gmail", "get_thread", False)
logger.error(f"Failed to get Gmail thread", thread_id=thread_id, error=str(e))
raise
async def list_labels(self) -> List[GmailLabel]:
"""List all Gmail labels"""
status = await self.rate_limiter.is_allowed("gmail", "list_labels")
if not status.allowed:
raise Exception(f"Rate limited. Retry after {status.retry_after}s")
try:
if not self._gmail_service:
await self._initialize_service()
response = await asyncio.get_event_loop().run_in_executor(
None, self._gmail_service.users().labels().list(userId='me').execute
)
await self.rate_limiter.record_response("gmail", "list_labels", True)
labels = []
for label_data in response.get("labels", []):
label = GmailLabel(
id=label_data["id"],
name=label_data["name"],
type=label_data.get("type", "user"),
message_list_visibility=label_data.get("messageListVisibility", "show"),
label_list_visibility=label_data.get("labelListVisibility", "labelShow"),
color=label_data.get("color"),
messages_total=label_data.get("messagesTotal"),
messages_unread=label_data.get("messagesUnread"),
threads_total=label_data.get("threadsTotal"),
threads_unread=label_data.get("threadsUnread")
)
labels.append(label)
return labels
except Exception as e:
await self.rate_limiter.record_response("gmail", "list_labels", False)
logger.error(f"Failed to list Gmail labels", error=str(e))
raise
async def get_label(self, label_id: str) -> GmailLabel:
"""Get a specific Gmail label"""
status = await self.rate_limiter.is_allowed("gmail", "get_label")
if not status.allowed:
raise Exception(f"Rate limited. Retry after {status.retry_after}s")
try:
if not self._gmail_service:
await self._initialize_service()
response = await asyncio.get_event_loop().run_in_executor(
None, lambda: self._gmail_service.users().labels().get(
userId='me', id=label_id
).execute()
)
await self.rate_limiter.record_response("gmail", "get_label", True)
return GmailLabel(
id=response["id"],
name=response["name"],
type=response.get("type", "user"),
message_list_visibility=response.get("messageListVisibility", "show"),
label_list_visibility=response.get("labelListVisibility", "labelShow"),
color=response.get("color"),
messages_total=response.get("messagesTotal"),
messages_unread=response.get("messagesUnread"),
threads_total=response.get("threadsTotal"),
threads_unread=response.get("threadsUnread")
)
except Exception as e:
await self.rate_limiter.record_response("gmail", "get_label", False)
logger.error(f"Failed to get Gmail label", label_id=label_id, error=str(e))
raise
async def get_attachment(self, message_id: str, attachment_id: str) -> bytes:
"""Get message attachment data"""
status = await self.rate_limiter.is_allowed("gmail", "get_attachment")
if not status.allowed:
raise Exception(f"Rate limited. Retry after {status.retry_after}s")
try:
if not self._gmail_service:
await self._initialize_service()
response = await asyncio.get_event_loop().run_in_executor(
None, lambda: self._gmail_service.users().messages().attachments().get(
userId='me', messageId=message_id, id=attachment_id
).execute()
)
await self.rate_limiter.record_response("gmail", "get_attachment", True)
# Decode base64 attachment data
attachment_data = base64.urlsafe_b64decode(response['data'])
return attachment_data
except Exception as e:
await self.rate_limiter.record_response("gmail", "get_attachment", False)
logger.error(
f"Failed to get Gmail attachment",
message_id=message_id,
attachment_id=attachment_id,
error=str(e)
)
raise
async def watch_mailbox(
self,
topic_name: str,
label_ids: Optional[List[str]] = None,
label_filter_action: str = "include"
) -> Dict[str, Any]:
"""Set up push notifications for mailbox changes"""
status = await self.rate_limiter.is_allowed("gmail", "watch")
if not status.allowed:
raise Exception(f"Rate limited. Retry after {status.retry_after}s")
try:
if not self._gmail_service:
await self._initialize_service()
body = {
'topicName': topic_name,
'labelIds': label_ids or [],
'labelFilterAction': label_filter_action
}
response = await asyncio.get_event_loop().run_in_executor(
None, lambda: self._gmail_service.users().watch(
userId='me', body=body
).execute()
)
await self.rate_limiter.record_response("gmail", "watch", True)
return {
"history_id": response.get("historyId"),
"expiration": response.get("expiration")
}
except Exception as e:
await self.rate_limiter.record_response("gmail", "watch", False)
logger.error(f"Failed to setup Gmail watch", error=str(e))
raise
async def stop_watch(self) -> bool:
"""Stop push notifications"""
status = await self.rate_limiter.is_allowed("gmail", "stop")
if not status.allowed:
raise Exception(f"Rate limited. Retry after {status.retry_after}s")
try:
if not self._gmail_service:
await self._initialize_service()
await asyncio.get_event_loop().run_in_executor(
None, self._gmail_service.users().stop(userId='me').execute
)
await self.rate_limiter.record_response("gmail", "stop", True)
return True
except Exception as e:
await self.rate_limiter.record_response("gmail", "stop", False)
logger.error(f"Failed to stop Gmail watch", error=str(e))
return False
async def get_history(
self,
start_history_id: str,
max_results: int = 100,
label_id: Optional[str] = None,
page_token: Optional[str] = None
) -> Dict[str, Any]:
"""Get mailbox history changes"""
status = await self.rate_limiter.is_allowed("gmail", "history")
if not status.allowed:
raise Exception(f"Rate limited. Retry after {status.retry_after}s")
try:
if not self._gmail_service:
await self._initialize_service()
params = {
'userId': 'me',
'startHistoryId': start_history_id,
'maxResults': min(max_results, 500)
}
if label_id:
params['labelId'] = label_id
if page_token:
params['pageToken'] = page_token
response = await asyncio.get_event_loop().run_in_executor(
None, lambda: self._gmail_service.users().history().list(**params).execute()
)
await self.rate_limiter.record_response("gmail", "history", True)
return {
"history": response.get("history", []),
"next_page_token": response.get("nextPageToken"),
"history_id": response.get("historyId")
}
except Exception as e:
await self.rate_limiter.record_response("gmail", "history", False)
logger.error(f"Failed to get Gmail history", error=str(e))
raise
async def create_filter(
self,
criteria: Dict[str, Any],
action: Dict[str, Any]
) -> Dict[str, Any]:
"""Create a Gmail filter"""
# Note: This requires additional Gmail scopes beyond readonly
status = await self.rate_limiter.is_allowed("gmail", "create_filter")
if not status.allowed:
raise Exception(f"Rate limited. Retry after {status.retry_after}s")
try:
if not self._gmail_service:
await self._initialize_service()
body = {
'criteria': criteria,
'action': action
}
response = await asyncio.get_event_loop().run_in_executor(
None, lambda: self._gmail_service.users().settings().filters().create(
userId='me', body=body
).execute()
)
await self.rate_limiter.record_response("gmail", "create_filter", True)
return response
except Exception as e:
await self.rate_limiter.record_response("gmail", "create_filter", False)
logger.error(f"Failed to create Gmail filter", error=str(e))
raise
async def export_messages(
self,
output_path: Path,
query: Optional[str] = None,
format: str = "mbox",
max_messages: int = 1000
) -> int:
"""Export Gmail messages to file"""
logger.info(f"Starting Gmail export", output_path=str(output_path), query=query, format=format)
# List messages to export
all_messages = []
page_token = None
exported_count = 0
while len(all_messages) < max_messages:
batch_size = min(500, max_messages - len(all_messages))
message_list = await self.list_messages(
query=query,
max_results=batch_size,
page_token=page_token
)
if not message_list["messages"]:
break
# Get full message details for this batch
message_ids = [msg["id"] for msg in message_list["messages"]]
messages = await self.get_messages_batch(message_ids, "full")
all_messages.extend(messages)
page_token = message_list.get("next_page_token")
if not page_token:
break
# Export to file
output_path.parent.mkdir(parents=True, exist_ok=True)
if format.lower() == "mbox":
exported_count = await self._export_to_mbox(all_messages, output_path)
elif format.lower() == "json":
exported_count = await self._export_to_json(all_messages, output_path)
else:
raise ValueError(f"Unsupported export format: {format}")
logger.info(f"Gmail export completed", exported_count=exported_count, output_path=str(output_path))
return exported_count
async def _export_to_mbox(self, messages: List[GmailMessage], output_path: Path) -> int:
"""Export messages to mbox format"""
import mailbox
mbox = mailbox.mbox(str(output_path))
for message in messages:
# Create email message
msg = email.message.EmailMessage()
# Set headers
if message.subject:
msg['Subject'] = message.subject
if message.sender:
msg['From'] = message.sender
if message.recipients:
msg['To'] = ', '.join(message.recipients)
if message.cc:
msg['Cc'] = ', '.join(message.cc)
msg['Date'] = message.internal_date.strftime('%a, %d %b %Y %H:%M:%S %z')
msg['Message-ID'] = f"<{message.id}@gmail.com>"
# Set body
if message.body_text:
msg.set_content(message.body_text)
elif message.body_html:
msg.set_content(message.body_html, subtype='html')
# Add to mbox
mbox.add(msg)
mbox.close()
return len(messages)
async def _export_to_json(self, messages: List[GmailMessage], output_path: Path) -> int:
"""Export messages to JSON format"""
export_data = []
for message in messages:
msg_data = {
"id": message.id,
"thread_id": message.thread_id,
"subject": message.subject,
"sender": message.sender,
"recipients": message.recipients,
"cc": message.cc,
"bcc": message.bcc,
"date": message.internal_date.isoformat(),
"body_text": message.body_text,
"body_html": message.body_html,
"snippet": message.snippet,
"labels": message.label_ids,
"attachments": [
{
"filename": att["filename"],
"mime_type": att["mime_type"],
"size": att["size"]
}
for att in message.attachments
]
}
export_data.append(msg_data)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(export_data, f, indent=2, ensure_ascii=False)
return len(messages)