sqlite-explorer-fastmcp-mcp-server
by hannesrudolph
Verified
- src
- mcp_gsuite
from googleapiclient.discovery import build
from . import gauth
import logging
import base64
import traceback
from email.mime.text import MIMEText
from typing import Tuple
class GmailService():
def __init__(self, user_id: str):
credentials = gauth.get_stored_credentials(user_id=user_id)
if not credentials:
raise RuntimeError("No Oauth2 credentials stored")
self.service = build('gmail', 'v1', credentials=credentials)
def _parse_message(self, txt, parse_body=False) -> dict | None:
"""
Parse a Gmail message into a structured format.
Args:
txt (dict): Raw message from Gmail API
parse_body (bool): Whether to parse and include the message body (default: False)
Returns:
dict: Parsed message containing comprehensive metadata
None: If parsing fails
"""
try:
message_id = txt.get('id')
thread_id = txt.get('threadId')
payload = txt.get('payload', {})
headers = payload.get('headers', [])
metadata = {
'id': message_id,
'threadId': thread_id,
'historyId': txt.get('historyId'),
'internalDate': txt.get('internalDate'),
'sizeEstimate': txt.get('sizeEstimate'),
'labelIds': txt.get('labelIds', []),
'snippet': txt.get('snippet'),
}
for header in headers:
name = header.get('name', '').lower()
value = header.get('value', '')
if name == 'subject':
metadata['subject'] = value
elif name == 'from':
metadata['from'] = value
elif name == 'to':
metadata['to'] = value
elif name == 'date':
metadata['date'] = value
elif name == 'cc':
metadata['cc'] = value
elif name == 'bcc':
metadata['bcc'] = value
elif name == 'message-id':
metadata['message_id'] = value
elif name == 'in-reply-to':
metadata['in_reply_to'] = value
elif name == 'references':
metadata['references'] = value
elif name == 'delivered-to':
metadata['delivered_to'] = value
if parse_body:
body = self._extract_body(payload)
if body:
metadata['body'] = body
metadata['mimeType'] = payload.get('mimeType')
return metadata
except Exception as e:
logging.error(f"Error parsing message: {str(e)}")
logging.error(traceback.format_exc())
return None
def _extract_body(self, payload) -> str | None:
"""
Extract the email body from the payload.
Handles both multipart and single part messages, including nested multiparts.
"""
try:
# For single part text/plain messages
if payload.get('mimeType') == 'text/plain':
data = payload.get('body', {}).get('data')
if data:
return base64.urlsafe_b64decode(data).decode('utf-8')
# For multipart messages (both alternative and related)
if payload.get('mimeType', '').startswith('multipart/'):
parts = payload.get('parts', [])
# First try to find a direct text/plain part
for part in parts:
if part.get('mimeType') == 'text/plain':
data = part.get('body', {}).get('data')
if data:
return base64.urlsafe_b64decode(data).decode('utf-8')
# If no direct text/plain, recursively check nested multipart structures
for part in parts:
if part.get('mimeType', '').startswith('multipart/'):
nested_body = self._extract_body(part)
if nested_body:
return nested_body
# If still no body found, try the first part as fallback
if parts and 'body' in parts[0] and 'data' in parts[0]['body']:
data = parts[0]['body']['data']
return base64.urlsafe_b64decode(data).decode('utf-8')
return None
except Exception as e:
logging.error(f"Error extracting body: {str(e)}")
return None
def query_emails(self, query=None, max_results=100):
"""
Query emails from Gmail based on a search query.
Args:
query (str, optional): Gmail search query (e.g., 'is:unread', 'from:example@gmail.com')
If None, returns all emails
max_results (int): Maximum number of emails to retrieve (1-500, default: 100)
Returns:
list: List of parsed email messages, newest first
"""
try:
# Ensure max_results is within API limits
max_results = min(max(1, max_results), 500)
# Get the list of messages
result = self.service.users().messages().list(
userId='me',
maxResults=max_results,
q=query if query else ''
).execute()
messages = result.get('messages', [])
parsed = []
# Fetch full message details for each message
for msg in messages:
txt = self.service.users().messages().get(
userId='me',
id=msg['id']
).execute()
parsed_message = self._parse_message(txt=txt, parse_body=False)
if parsed_message:
parsed.append(parsed_message)
return parsed
except Exception as e:
logging.error(f"Error reading emails: {str(e)}")
logging.error(traceback.format_exc())
return []
def get_email_by_id_with_attachments(self, email_id: str) -> Tuple[dict, dict] | Tuple[None, dict]:
"""
Fetch and parse a complete email message by its ID including attachment IDs.
Args:
email_id (str): The Gmail message ID to retrieve
Returns:
Tuple[dict, list]: Complete parsed email message including body and list of attachment IDs
Tuple[None, list]: If retrieval or parsing fails, returns None for email and empty list for attachment IDs
"""
try:
# Fetch the complete message by ID
message = self.service.users().messages().get(
userId='me',
id=email_id
).execute()
# Parse the message with body included
parsed_email = self._parse_message(txt=message, parse_body=True)
if parsed_email is None:
return None, []
attachments = {}
for part in message["payload"]["parts"]:
if "attachmentId" in part["body"]:
attachment_id = part["body"]["attachmentId"]
part_id = part["partId"]
attachment = {
"filename": part["filename"],
"mimeType": part["mimeType"],
"attachmentId": attachment_id,
"partId": part_id
}
attachments[part_id] = attachment
return parsed_email, attachments
except Exception as e:
logging.error(f"Error retrieving email {email_id}: {str(e)}")
logging.error(traceback.format_exc())
return None, []
def create_draft(self, to: str, subject: str, body: str, cc: list[str] | None = None) -> dict | None:
"""
Create a draft email message.
Args:
to (str): Email address of the recipient
subject (str): Subject line of the email
body (str): Body content of the email
cc (list[str], optional): List of email addresses to CC
Returns:
dict: Draft message data including the draft ID if successful
None: If creation fails
"""
try:
# Create message body
message = {
'to': to,
'subject': subject,
'text': body,
}
if cc:
message['cc'] = ','.join(cc)
# Create the message in MIME format
mime_message = MIMEText(body)
mime_message['to'] = to
mime_message['subject'] = subject
if cc:
mime_message['cc'] = ','.join(cc)
# Encode the message
raw_message = base64.urlsafe_b64encode(mime_message.as_bytes()).decode('utf-8')
# Create the draft
draft = self.service.users().drafts().create(
userId='me',
body={
'message': {
'raw': raw_message
}
}
).execute()
return draft
except Exception as e:
logging.error(f"Error creating draft: {str(e)}")
logging.error(traceback.format_exc())
return None
def delete_draft(self, draft_id: str) -> bool:
"""
Delete a draft email message.
Args:
draft_id (str): The ID of the draft to delete
Returns:
bool: True if deletion was successful, False otherwise
"""
try:
self.service.users().drafts().delete(
userId='me',
id=draft_id
).execute()
return True
except Exception as e:
logging.error(f"Error deleting draft {draft_id}: {str(e)}")
logging.error(traceback.format_exc())
return False
def create_reply(self, original_message: dict, reply_body: str, send: bool = False, cc: list[str] | None = None) -> dict | None:
"""
Create a reply to an email message and either send it or save as draft.
Args:
original_message (dict): The original message data (as returned by get_email_by_id)
reply_body (str): Body content of the reply
send (bool): If True, sends the reply immediately. If False, saves as draft.
cc (list[str], optional): List of email addresses to CC
Returns:
dict: Sent message or draft data if successful
None: If operation fails
"""
try:
to_address = original_message.get('from')
if not to_address:
raise ValueError("Could not determine original sender's address")
subject = original_message.get('subject', '')
if not subject.lower().startswith('re:'):
subject = f"Re: {subject}"
original_date = original_message.get('date', '')
original_from = original_message.get('from', '')
original_body = original_message.get('body', '')
full_reply_body = (
f"{reply_body}\n\n"
f"On {original_date}, {original_from} wrote:\n"
f"> {original_body.replace('\n', '\n> ') if original_body else '[No message body]'}"
)
mime_message = MIMEText(full_reply_body)
mime_message['to'] = to_address
mime_message['subject'] = subject
if cc:
mime_message['cc'] = ','.join(cc)
mime_message['In-Reply-To'] = original_message.get('id', '')
mime_message['References'] = original_message.get('id', '')
raw_message = base64.urlsafe_b64encode(mime_message.as_bytes()).decode('utf-8')
message_body = {
'raw': raw_message,
'threadId': original_message.get('threadId') # Ensure it's added to the same thread
}
if send:
# Send the reply immediately
result = self.service.users().messages().send(
userId='me',
body=message_body
).execute()
else:
# Save as draft
result = self.service.users().drafts().create(
userId='me',
body={
'message': message_body
}
).execute()
return result
except Exception as e:
logging.error(f"Error {'sending' if send else 'drafting'} reply: {str(e)}")
logging.error(traceback.format_exc())
return None
def get_attachment(self, message_id: str, attachment_id: str) -> dict | None:
"""
Retrieves a Gmail attachment by its ID.
Args:
message_id (str): The ID of the Gmail message containing the attachment
attachment_id (str): The ID of the attachment to retrieve
Returns:
dict: Attachment data including filename and base64-encoded content
None: If retrieval fails
"""
try:
attachment = self.service.users().messages().attachments().get(
userId='me',
messageId=message_id,
id=attachment_id
).execute()
return {
"size": attachment.get("size"),
"data": attachment.get("data")
}
except Exception as e:
logging.error(f"Error retrieving attachment {attachment_id} from message {message_id}: {str(e)}")
logging.error(traceback.format_exc())
return None