sqlite-explorer-fastmcp-mcp-server
by hannesrudolph
Verified
- src
- mcp_gsuite
from collections.abc import Sequence
from mcp.types import (
Tool,
TextContent,
ImageContent,
EmbeddedResource,
LoggingLevel,
)
from . import gmail
import json
from . import toolhandler
import base64
def decode_base64_data(file_data):
    """Decode base64 text written in the standard or URL-safe alphabet.

    URL-safe characters (``-`` and ``_``) are mapped to their standard
    counterparts (``+`` and ``/``), missing ``=`` padding is restored, and the
    result is decoded strictly.

    Args:
        file_data: Base64 text, possibly URL-safe and possibly unpadded.

    Returns:
        The decoded bytes.

    Raises:
        binascii.Error: If the normalized text is not valid base64.
    """
    normalized = file_data.replace("_", "/").replace("-", "+")
    remainder = len(normalized) % 4
    # Base64 text must be a multiple of four characters long.
    padding = "=" * (4 - remainder) if remainder else ""
    return base64.b64decode(normalized + padding, validate=True)
class QueryEmailsToolHandler(toolhandler.ToolHandler):
    """Tool handler exposing the ``query_gmail_emails`` tool."""

    def __init__(self):
        super().__init__("query_gmail_emails")

    def get_tool_description(self) -> Tool:
        """Return the tool definition, including its JSON input schema.

        ``query`` and ``max_results`` are optional: optionality is expressed by
        leaving them out of the object-level ``required`` list. JSON Schema
        does not allow a boolean ``required`` keyword on a property.
        """
        return Tool(
            name=self.name,
            description=(
                "Query Gmail emails based on an optional search query.\n"
                "Returns emails in reverse chronological order (newest first).\n"
                "Returns metadata such as subject and also a short summary of the content.\n"
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "__user_id__": self.get_user_id_arg_schema(),
                    "query": {
                        "type": "string",
                        "description": (
                            "Gmail search query (optional). Examples:\n"
                            "- a $string: Search email body, subject, and sender information for $string\n"
                            "- 'is:unread' for unread emails\n"
                            "- 'from:example@gmail.com' for emails from a specific sender\n"
                            "- 'newer_than:2d' for emails from last 2 days\n"
                            "- 'has:attachment' for emails with attachments\n"
                            "If not provided, returns recent emails without filtering."
                        ),
                    },
                    "max_results": {
                        "type": "integer",
                        "description": "Maximum number of emails to retrieve (1-500)",
                        "minimum": 1,
                        "maximum": 500,
                        "default": 100,
                    },
                },
                "required": [toolhandler.USER_ID_ARG],
            },
        )

    def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        """Run the query and return the result as pretty-printed JSON text.

        Args:
            args: Tool arguments. Must contain the user id argument; may
                contain ``query`` and ``max_results`` (defaults to 100).

        Returns:
            A single ``TextContent`` holding the JSON dump of whatever
            ``GmailService.query_emails`` returned.

        Raises:
            RuntimeError: If the user id argument is missing or empty.
        """
        user_id = args.get(toolhandler.USER_ID_ARG)
        if not user_id:
            raise RuntimeError(f"Missing required argument: {toolhandler.USER_ID_ARG}")

        gmail_service = gmail.GmailService(user_id=user_id)
        emails = gmail_service.query_emails(
            query=args.get("query"),
            max_results=args.get("max_results", 100),
        )
        return [TextContent(type="text", text=json.dumps(emails, indent=2))]
class GetEmailByIdToolHandler(toolhandler.ToolHandler):
    """Tool handler exposing the ``get_gmail_email`` tool."""

    def __init__(self):
        super().__init__("get_gmail_email")

    def get_tool_description(self) -> Tool:
        """Return the tool definition, including its JSON input schema."""
        properties = {
            "__user_id__": self.get_user_id_arg_schema(),
            "email_id": {
                "type": "string",
                "description": "The ID of the Gmail message to retrieve",
            },
        }
        schema = {
            "type": "object",
            "properties": properties,
            "required": ["email_id", toolhandler.USER_ID_ARG],
        }
        return Tool(
            name=self.name,
            description="Retrieves a complete Gmail email message by its ID, including the full message body and attachment IDs.",
            inputSchema=schema,
        )

    def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        """Fetch one message and return it, with its attachments, as JSON text.

        Args:
            args: Tool arguments; must contain ``email_id`` and the user id
                argument.

        Returns:
            A single ``TextContent``: the JSON dump of the message with an
            added ``attachments`` entry, or a failure notice when the service
            returns no message.

        Raises:
            RuntimeError: If ``email_id`` or the user id argument is missing.
        """
        if "email_id" not in args:
            raise RuntimeError("Missing required argument: email_id")
        user_id = args.get(toolhandler.USER_ID_ARG)
        if not user_id:
            raise RuntimeError(f"Missing required argument: {toolhandler.USER_ID_ARG}")

        service = gmail.GmailService(user_id=user_id)
        message, message_attachments = service.get_email_by_id_with_attachments(args["email_id"])

        if message is None:
            payload = f"Failed to retrieve email with ID: {args['email_id']}"
        else:
            message["attachments"] = message_attachments
            payload = json.dumps(message, indent=2)
        return [TextContent(type="text", text=payload)]
class BulkGetEmailsByIdsToolHandler(toolhandler.ToolHandler):
    """Tool handler exposing the ``bulk_get_gmail_emails`` tool."""

    def __init__(self):
        super().__init__("bulk_get_gmail_emails")

    def get_tool_description(self) -> Tool:
        """Return the tool definition, including its JSON input schema."""
        email_ids_schema = {
            "type": "array",
            "items": {"type": "string"},
            "description": "List of Gmail message IDs to retrieve",
        }
        return Tool(
            name=self.name,
            description="Retrieves multiple Gmail email messages by their IDs in a single request, including the full message bodies and attachment IDs.",
            inputSchema={
                "type": "object",
                "properties": {
                    "__user_id__": self.get_user_id_arg_schema(),
                    "email_ids": email_ids_schema,
                },
                "required": ["email_ids", toolhandler.USER_ID_ARG],
            },
        )

    def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        """Fetch several messages and return those that were found as JSON text.

        Messages the service could not return are skipped; they are not
        reported individually.

        Args:
            args: Tool arguments; must contain ``email_ids`` and the user id
                argument.

        Returns:
            A single ``TextContent``: the JSON dump of the list of retrieved
            messages (each with an added ``attachments`` entry), or a failure
            notice when none could be retrieved.

        Raises:
            RuntimeError: If ``email_ids`` or the user id argument is missing.
        """
        if "email_ids" not in args:
            raise RuntimeError("Missing required argument: email_ids")
        user_id = args.get(toolhandler.USER_ID_ARG)
        if not user_id:
            raise RuntimeError(f"Missing required argument: {toolhandler.USER_ID_ARG}")

        service = gmail.GmailService(user_id=user_id)
        retrieved = []
        for message_id in args["email_ids"]:
            message, message_attachments = service.get_email_by_id_with_attachments(message_id)
            if message is None:
                continue
            message["attachments"] = message_attachments
            retrieved.append(message)

        if retrieved:
            payload = json.dumps(retrieved, indent=2)
        else:
            payload = "Failed to retrieve any emails from the provided IDs"
        return [TextContent(type="text", text=payload)]
class CreateDraftToolHandler(toolhandler.ToolHandler):
    """Tool handler exposing the ``create_gmail_draft`` tool."""

    def __init__(self):
        super().__init__("create_gmail_draft")

    def get_tool_description(self) -> Tool:
        """Return the tool definition, including its JSON input schema."""
        return Tool(
            name=self.name,
            description=(
                "Creates a draft email message from scratch in Gmail with specified recipient, subject, body, and optional CC recipients.\n"
                "Do NOT use this tool when you want to draft or send a REPLY to an existing message. This tool does NOT include any previous message content. Use the reply_gmail_email tool\n"
                "with send=False instead.\n"
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "__user_id__": self.get_user_id_arg_schema(),
                    "to": {
                        "type": "string",
                        "description": "Email address of the recipient",
                    },
                    "subject": {
                        "type": "string",
                        "description": "Subject line of the email",
                    },
                    "body": {
                        "type": "string",
                        "description": "Body content of the email",
                    },
                    "cc": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Optional list of email addresses to CC",
                    },
                },
                "required": ["to", "subject", "body", toolhandler.USER_ID_ARG],
            },
        )

    def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        """Create a draft and return the service's response as JSON text.

        Args:
            args: Tool arguments; must contain ``to``, ``subject``, ``body``
                and the user id argument, and may contain ``cc``.

        Returns:
            A single ``TextContent``: the JSON dump of the created draft, or a
            failure notice when the service returns ``None``.

        Raises:
            RuntimeError: If any required argument is missing. The message
                names only the arguments that are actually absent.
        """
        missing = [key for key in ("to", "subject", "body") if key not in args]
        if missing:
            raise RuntimeError(f"Missing required arguments: {', '.join(missing)}")
        user_id = args.get(toolhandler.USER_ID_ARG)
        if not user_id:
            raise RuntimeError(f"Missing required argument: {toolhandler.USER_ID_ARG}")

        gmail_service = gmail.GmailService(user_id=user_id)
        draft = gmail_service.create_draft(
            to=args["to"],
            subject=args["subject"],
            body=args["body"],
            cc=args.get("cc"),
        )
        if draft is None:
            return [TextContent(type="text", text="Failed to create draft email")]
        return [TextContent(type="text", text=json.dumps(draft, indent=2))]
class DeleteDraftToolHandler(toolhandler.ToolHandler):
    """Tool handler exposing the ``delete_gmail_draft`` tool."""

    def __init__(self):
        super().__init__("delete_gmail_draft")

    def get_tool_description(self) -> Tool:
        """Return the tool definition, including its JSON input schema."""
        schema = {
            "type": "object",
            "properties": {
                "__user_id__": self.get_user_id_arg_schema(),
                "draft_id": {
                    "type": "string",
                    "description": "The ID of the draft to delete",
                },
            },
            "required": ["draft_id", toolhandler.USER_ID_ARG],
        }
        return Tool(
            name=self.name,
            description="Deletes a Gmail draft message by its ID. This action cannot be undone.",
            inputSchema=schema,
        )

    def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        """Delete a draft and report the outcome as text.

        Args:
            args: Tool arguments; must contain ``draft_id`` and the user id
                argument.

        Returns:
            A single ``TextContent`` with a success or failure message,
            chosen by the truthiness of ``GmailService.delete_draft``'s result.

        Raises:
            RuntimeError: If ``draft_id`` or the user id argument is missing.
        """
        if "draft_id" not in args:
            raise RuntimeError("Missing required argument: draft_id")
        user_id = args.get(toolhandler.USER_ID_ARG)
        if not user_id:
            raise RuntimeError(f"Missing required argument: {toolhandler.USER_ID_ARG}")

        service = gmail.GmailService(user_id=user_id)
        if service.delete_draft(args["draft_id"]):
            outcome = "Successfully deleted draft"
        else:
            outcome = f"Failed to delete draft with ID: {args['draft_id']}"
        return [TextContent(type="text", text=outcome)]
class ReplyEmailToolHandler(toolhandler.ToolHandler):
    """Tool handler exposing the ``reply_gmail_email`` tool."""

    def __init__(self):
        super().__init__("reply_gmail_email")

    def get_tool_description(self) -> Tool:
        """Return the tool definition, including its JSON input schema."""
        return Tool(
            name=self.name,
            description=(
                "Creates a reply to an existing Gmail email message and either sends it or saves as draft.\n"
                "Use this tool if you want to draft a reply. Use the 'cc' argument if you want to perform a \"reply all\".\n"
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "__user_id__": self.get_user_id_arg_schema(),
                    "original_message_id": {
                        "type": "string",
                        "description": "The ID of the Gmail message to reply to",
                    },
                    "reply_body": {
                        "type": "string",
                        "description": "The body content of your reply message",
                    },
                    "send": {
                        "type": "boolean",
                        "description": "If true, sends the reply immediately. If false, saves as draft.",
                        "default": False,
                    },
                    "cc": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Optional list of email addresses to CC on the reply",
                    },
                },
                "required": ["original_message_id", "reply_body", toolhandler.USER_ID_ARG],
            },
        )

    def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        """Reply to an existing message, either sending it or saving a draft.

        Args:
            args: Tool arguments; must contain ``original_message_id``,
                ``reply_body`` and the user id argument, and may contain
                ``send`` (defaults to ``False``, i.e. save as draft) and ``cc``.

        Returns:
            A single ``TextContent``: the JSON dump of the service's result, or
            a failure notice when the original message cannot be fetched or
            the reply cannot be created.

        Raises:
            RuntimeError: If a required argument is missing.
        """
        if not all(key in args for key in ("original_message_id", "reply_body")):
            raise RuntimeError("Missing required arguments: original_message_id and reply_body")
        user_id = args.get(toolhandler.USER_ID_ARG)
        if not user_id:
            raise RuntimeError(f"Missing required argument: {toolhandler.USER_ID_ARG}")

        gmail_service = gmail.GmailService(user_id=user_id)

        # The original message is fetched first and handed to create_reply.
        original_message = gmail_service.get_email_by_id(args["original_message_id"])
        if original_message is None:
            return [
                TextContent(
                    type="text",
                    text=f"Failed to retrieve original message with ID: {args['original_message_id']}",
                )
            ]

        # Resolve `send` once so the action taken and the failure message can
        # never disagree about the default (draft, matching the schema).
        send = args.get("send", False)
        result = gmail_service.create_reply(
            original_message=original_message,
            reply_body=args["reply_body"],
            send=send,
            cc=args.get("cc"),
        )
        if result is None:
            action = "send" if send else "draft"
            return [TextContent(type="text", text=f"Failed to {action} reply email")]
        return [TextContent(type="text", text=json.dumps(result, indent=2))]
class GetAttachmentToolHandler(toolhandler.ToolHandler):
    """Tool handler exposing the ``get_gmail_attachment`` tool."""

    def __init__(self):
        super().__init__("get_gmail_attachment")

    def get_tool_description(self) -> Tool:
        """Return the tool definition, including its JSON input schema."""
        return Tool(
            name=self.name,
            description="Retrieves a Gmail attachment by its ID.",
            inputSchema={
                "type": "object",
                "properties": {
                    "__user_id__": self.get_user_id_arg_schema(),
                    "message_id": {
                        "type": "string",
                        "description": "The ID of the Gmail message containing the attachment",
                    },
                    "attachment_id": {
                        "type": "string",
                        "description": "The ID of the attachment to retrieve",
                    },
                    "mime_type": {
                        "type": "string",
                        "description": "The MIME type of the attachment",
                    },
                    "filename": {
                        "type": "string",
                        "description": "The filename of the attachment",
                    },
                    "save_to_disk": {
                        "type": "string",
                        "description": "The fullpath to save the attachment to disk. If not provided, the attachment is returned as a resource.",
                    },
                },
                "required": ["message_id", "attachment_id", "mime_type", "filename", toolhandler.USER_ID_ARG],
            },
        )

    def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        """Fetch an attachment and either save it to disk or return it inline.

        Args:
            args: Tool arguments; must contain ``message_id``,
                ``attachment_id``, ``mime_type``, ``filename`` and the user id
                argument. If ``save_to_disk`` is set, the decoded attachment
                is written to that path.

        Returns:
            A single-element list: a ``TextContent`` confirming the save (or
            reporting that the attachment could not be retrieved), or an
            ``EmbeddedResource`` whose ``blob`` is standard, padded base64.

        Raises:
            RuntimeError: If a required argument is missing.
            binascii.Error: If the attachment data is not valid base64.
            OSError: If writing to ``save_to_disk`` fails.
        """
        for required_arg in ("message_id", "attachment_id", "mime_type", "filename"):
            if required_arg not in args:
                raise RuntimeError(f"Missing required argument: {required_arg}")
        filename = args["filename"]
        mime_type = args["mime_type"]
        user_id = args.get(toolhandler.USER_ID_ARG)
        if not user_id:
            raise RuntimeError(f"Missing required argument: {toolhandler.USER_ID_ARG}")

        gmail_service = gmail.GmailService(user_id=user_id)
        attachment_data = gmail_service.get_attachment(args["message_id"], args["attachment_id"])
        if attachment_data is None:
            return [
                TextContent(
                    type="text",
                    text=f"Failed to retrieve attachment with ID: {args['attachment_id']} from message: {args['message_id']}",
                )
            ]

        # The payload may use the URL-safe alphabet and omit padding, so it is
        # decoded once here and reused by both output paths.
        decoded_data = decode_base64_data(attachment_data["data"])

        save_path = args.get("save_to_disk")
        if save_path:
            with open(save_path, "wb") as f:
                f.write(decoded_data)
            return [TextContent(type="text", text=f"Attachment saved to disk: {save_path}")]

        attachment_url = f"attachment://gmail/{args['message_id']}/{args['attachment_id']}/{filename}"
        return [
            EmbeddedResource(
                type="resource",
                resource={
                    # Re-encode as standard base64: passing the URL-safe form
                    # through would break clients that decode the blob with
                    # the standard alphabet.
                    "blob": base64.b64encode(decoded_data).decode("ascii"),
                    "uri": attachment_url,
                    "mimeType": mime_type,
                },
            )
        ]
class BulkSaveAttachmentsToolHandler(toolhandler.ToolHandler):
    """Tool handler exposing the ``bulk_save_gmail_attachments`` tool."""

    def __init__(self):
        super().__init__("bulk_save_gmail_attachments")

    def get_tool_description(self) -> Tool:
        """Return the tool definition, including its JSON input schema."""
        return Tool(
            name=self.name,
            description="Saves multiple Gmail attachments to disk by their message IDs and attachment IDs in a single request.",
            inputSchema={
                "type": "object",
                "properties": {
                    "__user_id__": self.get_user_id_arg_schema(),
                    "attachments": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "message_id": {
                                    "type": "string",
                                    "description": "ID of the Gmail message containing the attachment",
                                },
                                "part_id": {
                                    "type": "string",
                                    "description": "ID of the part containing the attachment",
                                },
                                "save_path": {
                                    "type": "string",
                                    "description": "Path where the attachment should be saved",
                                },
                            },
                            "required": ["message_id", "part_id", "save_path"],
                        },
                    },
                },
                "required": ["attachments", toolhandler.USER_ID_ARG],
            },
        )

    def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        """Save each requested attachment to disk, reporting one result per item.

        A failure on one item (message not found, unknown part id, attachment
        not retrievable, decode or write error) is recorded as a text result
        and processing continues with the next item.

        Args:
            args: Tool arguments; must contain the user id argument and
                ``attachments``, a list of objects each holding
                ``message_id``, ``part_id`` and ``save_path``.

        Returns:
            One ``TextContent`` per requested attachment, in request order.

        Raises:
            RuntimeError: If ``attachments`` or the user id argument is missing.
            KeyError: If an attachment entry lacks one of its required keys.
        """
        if "attachments" not in args:
            raise RuntimeError("Missing required argument: attachments")
        user_id = args.get(toolhandler.USER_ID_ARG)
        if not user_id:
            raise RuntimeError(f"Missing required argument: {toolhandler.USER_ID_ARG}")

        gmail_service = gmail.GmailService(user_id=user_id)
        results = []
        for attachment_info in args["attachments"]:
            message_id = attachment_info["message_id"]
            part_id = attachment_info["part_id"]
            save_path = attachment_info["save_path"]

            # The message is fetched to map the part id to its attachment id.
            message, attachments = gmail_service.get_email_by_id_with_attachments(message_id)
            if message is None:
                results.append(
                    TextContent(
                        type="text",
                        text=f"Failed to retrieve message with ID: {message_id}",
                    )
                )
                continue

            try:
                attachment_id = attachments[part_id]["attachmentId"]
            except (KeyError, TypeError):
                # An unknown part id must not abort the rest of the batch.
                results.append(
                    TextContent(
                        type="text",
                        text=f"Failed to find attachment part with ID: {part_id} in message: {message_id}",
                    )
                )
                continue

            attachment_data = gmail_service.get_attachment(message_id, attachment_id)
            if attachment_data is None:
                results.append(
                    TextContent(
                        type="text",
                        text=f"Failed to retrieve attachment with ID: {attachment_id} from message: {message_id}",
                    )
                )
                continue

            file_data = attachment_data["data"]
            try:
                decoded_data = decode_base64_data(file_data)
                with open(save_path, "wb") as f:
                    f.write(decoded_data)
            except Exception as e:
                # Deliberately broad: this is a best-effort batch, so any
                # decode or write failure is reported for this item only.
                results.append(
                    TextContent(
                        type="text",
                        text=f"Failed to save attachment to {save_path}: {str(e)}",
                    )
                )
            else:
                results.append(
                    TextContent(
                        type="text",
                        text=f"Attachment saved to: {save_path}",
                    )
                )
        return results