server.py•34.5 kB
from typing import Any
import argparse
import os
import asyncio
import logging
import base64
from email.message import EmailMessage
from email.header import decode_header
from base64 import urlsafe_b64decode
from email import message_from_bytes
import webbrowser
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
EMAIL_ADMIN_PROMPTS = """You are an email administrator.
You can draft, edit, read, trash, open, and send emails.
You've been given access to a specific gmail account.
You have the following tools available:
- Send an email (send-email)
- Retrieve unread emails (get-unread-emails)
- Read email content (read-email)
- Trash email (tras-email)
- Open email in browser (open-email)
Never send an email draft or trash an email unless the user confirms first.
Always ask for approval if not already given.
"""
# Define available prompts
PROMPTS = {
"manage-email": types.Prompt(
name="manage-email",
description="Act like an email administator",
arguments=None,
),
"draft-email": types.Prompt(
name="draft-email",
description="Draft an email with cotent and recipient",
arguments=[
types.PromptArgument(
name="content",
description="What the email is about",
required=True
),
types.PromptArgument(
name="recipient",
description="Who should the email be addressed to",
required=True
),
types.PromptArgument(
name="recipient_email",
description="Recipient's email address",
required=True
),
],
),
"edit-draft": types.Prompt(
name="edit-draft",
description="Edit the existing email draft",
arguments=[
types.PromptArgument(
name="changes",
description="What changes should be made to the draft",
required=True
),
types.PromptArgument(
name="current_draft",
description="The current draft to edit",
required=True
),
],
),
}
def decode_mime_header(header: str) -> str:
"""Helper function to decode encoded email headers"""
decoded_parts = decode_header(header)
decoded_string = ''
for part, encoding in decoded_parts:
if isinstance(part, bytes):
# Decode bytes to string using the specified encoding
decoded_string += part.decode(encoding or 'utf-8')
else:
# Already a string
decoded_string += part
return decoded_string
class GmailService:
def __init__(self,
creds_file_path: str,
token_path: str ,
scopes: list[str] = ['https://www.googleapis.com/auth/gmail.modify']):
logger.info(f"Initializing GmailService with creds file: {creds_file_path}")
self.creds_file_path = creds_file_path
self.token_path = token_path
self.scopes = scopes
self.token = self._get_token()
logger.info("Token retrieved successfully")
self.service = self._get_service()
logger.info("Gmail service initialized")
self.user_email = self._get_user_email()
logger.info(f"User email retrieved: {self.user_email}")
def _get_token(self) -> Credentials:
"""Get or refresh Google API token"""
token = None
if os.path.exists(self.token_path):
logger.info('Loading token from file')
token = Credentials.from_authorized_user_file(self.token_path, self.scopes)
if not token or not token.valid:
if token and token.expired and token.refresh_token:
logger.info('Refreshing token')
token.refresh(Request())
else:
logger.info('Fetching new token')
flow = InstalledAppFlow.from_client_secrets_file(self.creds_file_path, self.scopes)
token = flow.run_local_server(port=0)
with open(self.token_path, 'w') as token_file:
token_file.write(token.to_json())
logger.info(f'Token saved to {self.token_path}')
return token
def _get_service(self) -> Any:
"""Initialize Gmail API service"""
try:
service = build('gmail', 'v1', credentials=self.token)
return service
except HttpError as error:
logger.error(f'An error occurred building Gmail service: {error}')
raise ValueError(f'An error occurred: {error}')
def _get_user_email(self) -> str:
"""Get user email address"""
profile = self.service.users().getProfile(userId='me').execute()
user_email = profile.get('emailAddress', '')
return user_email
async def send_email(self, recipient_id: str, subject: str, message: str,) -> dict:
"""Creates and sends an email message"""
try:
message_obj = EmailMessage()
message_obj.set_content(message)
message_obj['To'] = recipient_id
message_obj['From'] = self.user_email
message_obj['Subject'] = subject
encoded_message = base64.urlsafe_b64encode(message_obj.as_bytes()).decode()
create_message = {'raw': encoded_message}
send_message = await asyncio.to_thread(
self.service.users().messages().send(userId="me", body=create_message).execute
)
logger.info(f"Message sent: {send_message['id']}")
return {"status": "success", "message_id": send_message["id"]}
except HttpError as error:
return {"status": "error", "error_message": str(error)}
async def open_email(self, email_id: str) -> str:
"""Opens email in browser given ID."""
try:
url = f"https://mail.google.com/#all/{email_id}"
webbrowser.open(url, new=0, autoraise=True)
return "Email opened in browser successfully."
except HttpError as error:
return f"An HttpError occurred: {str(error)}"
async def get_unread_emails(self) -> list[dict]:
"""
Retrieves unread messages from mailbox.
Returns a list of messages with their details.
"""
try:
user_id = 'me'
query = 'in:inbox is:unread category:primary'
# Get message IDs first
response = self.service.users().messages().list(
userId=user_id,
q=query,
maxResults=10 # Limit to prevent timeout
).execute()
message_ids = []
if 'messages' in response:
message_ids.extend(response['messages'])
# Get actual message details
messages = []
for msg_id in message_ids:
msg = self.service.users().messages().get(
userId=user_id,
id=msg_id['id'],
format='metadata', # Use 'full' if you need the complete message
metadataHeaders=['Subject', 'From', 'Date']
).execute()
# Extract and format relevant information
headers = msg.get('payload', {}).get('headers', [])
email_data = {
'id': msg['id'],
'threadId': msg['threadId'],
'subject': next((h['value'] for h in headers if h['name'] == 'Subject'), 'No Subject'),
'from': next((h['value'] for h in headers if h['name'] == 'From'), 'Unknown'),
'date': next((h['value'] for h in headers if h['name'] == 'Date'), 'Unknown'),
'snippet': msg.get('snippet', '')
}
messages.append(email_data)
return messages
except HttpError as error:
print(f"An HttpError occurred: {str(error)}")
return []
except Exception as e:
print(f"An unexpected error occurred: {str(e)}")
return []
async def read_email(self, email_id: str) -> dict[str, str]| str:
"""Retrieves email contents including to, from, subject, and contents."""
try:
msg = self.service.users().messages().get(userId="me", id=email_id, format='raw').execute()
email_metadata = {}
# Decode the base64URL encoded raw content
raw_data = msg['raw']
decoded_data = urlsafe_b64decode(raw_data)
# Parse the RFC 2822 email
mime_message = message_from_bytes(decoded_data)
# Extract the email body
body = None
if mime_message.is_multipart():
for part in mime_message.walk():
# Extract the text/plain part
if part.get_content_type() == "text/plain":
body = part.get_payload(decode=True).decode()
break
else:
# For non-multipart messages
body = mime_message.get_payload(decode=True).decode()
email_metadata['content'] = body
# Extract metadata
email_metadata['subject'] = decode_mime_header(mime_message.get('subject', ''))
email_metadata['from'] = mime_message.get('from','')
email_metadata['to'] = mime_message.get('to','')
email_metadata['date'] = mime_message.get('date','')
logger.info(f"Email read: {email_id}")
# We want to mark email as read once we read it
await self.mark_email_as_read(email_id)
return email_metadata
except HttpError as error:
return f"An HttpError occurred: {str(error)}"
async def trash_email(self, email_id: str) -> str:
"""Moves email to trash given ID."""
try:
self.service.users().messages().trash(userId="me", id=email_id).execute()
logger.info(f"Email moved to trash: {email_id}")
return "Email moved to trash successfully."
except HttpError as error:
return f"An HttpError occurred: {str(error)}"
async def mark_email_as_read(self, email_id: str) -> str:
"""Marks email as read given ID."""
try:
self.service.users().messages().modify(userId="me", id=email_id, body={'removeLabelIds': ['UNREAD']}).execute()
logger.info(f"Email marked as read: {email_id}")
return "Email marked as read."
except HttpError as error:
return f"An HttpError occurred: {str(error)}"
async def main(creds_file_path: str,
token_path: str):
gmail_service = GmailService(creds_file_path, token_path)
server = Server("gmail")
@server.list_prompts()
async def list_prompts() -> list[types.Prompt]:
return list(PROMPTS.values())
@server.get_prompt()
async def get_prompt(
name: str, arguments: dict[str, str] | None = None
) -> types.GetPromptResult:
if name not in PROMPTS:
raise ValueError(f"Prompt not found: {name}")
if name == "manage-email":
return types.GetPromptResult(
messages=[
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=EMAIL_ADMIN_PROMPTS,
)
)
]
)
if name == "draft-email":
content = arguments.get("content", "")
recipient = arguments.get("recipient", "")
recipient_email = arguments.get("recipient_email", "")
# First message asks the LLM to create the draft
return types.GetPromptResult(
messages=[
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=f"""Please draft an email about {content} for {recipient} ({recipient_email}).
Include a subject line starting with 'Subject:' on the first line.
Do not send the email yet, just draft it and ask the user for their thoughts."""
)
)
]
)
elif name == "edit-draft":
changes = arguments.get("changes", "")
current_draft = arguments.get("current_draft", "")
# Edit existing draft based on requested changes
return types.GetPromptResult(
messages=[
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=f"""Please revise the current email draft:
{current_draft}
Requested changes:
{changes}
Please provide the updated draft."""
)
)
]
)
raise ValueError("Prompt implementation not found")
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
return [
types.Tool(
name="send-email",
description="""Sends email to recipient.
Do not use if user only asked to draft email.
Drafts must be approved before sending.""",
inputSchema={
"type": "object",
"properties": {
"recipient_id": {
"type": "string",
"description": "Recipient email address",
},
"subject": {
"type": "string",
"description": "Email subject",
},
"message": {
"type": "string",
"description": "Email content text",
},
},
"required": ["recipient_id", "subject", "message"],
},
),
types.Tool(
name="trash-email",
description="""Moves email to trash.
Confirm before moving email to trash.""",
inputSchema={
"type": "object",
"properties": {
"email_id": {
"type": "string",
"description": "Email ID",
},
},
"required": ["email_id"],
},
),
types.Tool(
name="get-unread-emails",
description="Retrieve unread emails",
inputSchema={
"type": "object",
"properties": {},
"required": []
},
),
types.Tool(
name="read-email",
description="Retrieves given email content",
inputSchema={
"type": "object",
"properties": {
"email_id": {
"type": "string",
"description": "Email ID",
},
},
"required": ["email_id"],
},
),
types.Tool(
name="mark-email-as-read",
description="Marks given email as read",
inputSchema={
"type": "object",
"properties": {
"email_id": {
"type": "string",
"description": "Email ID",
},
},
"required": ["email_id"],
},
),
types.Tool(
name="open-email",
description="Open email in browser",
inputSchema={
"type": "object",
"properties": {
"email_id": {
"type": "string",
"description": "Email ID",
},
},
"required": ["email_id"],
},
),
]
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
if name == "send-email":
recipient = arguments.get("recipient_id")
if not recipient:
raise ValueError("Missing recipient parameter")
subject = arguments.get("subject")
if not subject:
raise ValueError("Missing subject parameter")
message = arguments.get("message")
if not message:
raise ValueError("Missing message parameter")
# Extract subject and message content
email_lines = message.split('\n')
if email_lines[0].startswith('Subject:'):
subject = email_lines[0][8:].strip()
message_content = '\n'.join(email_lines[1:]).strip()
else:
message_content = message
send_response = await gmail_service.send_email(recipient, subject, message_content)
if send_response["status"] == "success":
response_text = f"Email sent successfully. Message ID: {send_response['message_id']}"
else:
response_text = f"Failed to send email: {send_response['error_message']}"
return [types.TextContent(type="text", text=response_text)]
if name == "get-unread-emails":
unread_emails = await gmail_service.get_unread_emails()
return [types.TextContent(type="text", text=str(unread_emails),artifact={"type": "json", "data": unread_emails} )]
if name == "read-email":
email_id = arguments.get("email_id")
if not email_id:
raise ValueError("Missing email ID parameter")
retrieved_email = await gmail_service.read_email(email_id)
return [types.TextContent(type="text", text=str(retrieved_email),artifact={"type": "dictionary", "data": retrieved_email} )]
if name == "open-email":
email_id = arguments.get("email_id")
if not email_id:
raise ValueError("Missing email ID parameter")
msg = await gmail_service.open_email(email_id)
return [types.TextContent(type="text", text=str(msg))]
if name == "trash-email":
email_id = arguments.get("email_id")
if not email_id:
raise ValueError("Missing email ID parameter")
msg = await gmail_service.trash_email(email_id)
return [types.TextContent(type="text", text=str(msg))]
if name == "mark-email-as-read":
email_id = arguments.get("email_id")
if not email_id:
raise ValueError("Missing email ID parameter")
msg = await gmail_service.mark_email_as_read(email_id)
return [types.TextContent(type="text", text=str(msg))]
else:
logger.error(f"Unknown tool: {name}")
raise ValueError(f"Unknown tool: {name}")
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="gmail",
server_version="0.1.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Gmail API MCP Server')
parser.add_argument('--creds-file-path',
required=True,
help='OAuth 2.0 credentials file path')
parser.add_argument('--token-path',
required=True,
help='File location to store and retrieve access and refresh tokens for application')
args = parser.parse_args()
asyncio.run(main(args.creds_file_path, args.token_path))
# # path = C:/Users/sanch/Desktop/gmail_plugin/gmail-plugin
#!/usr/bin/env python3
# import os
# import base64
# import json
# import time
# import email
# import asyncio
# import logging
# from aiohttp import web
# import socket
# from email.mime.text import MIMEText
# from google.oauth2.credentials import Credentials
# from google_auth_oauthlib.flow import InstalledAppFlow
# from google.auth.transport.requests import Request
# from googleapiclient.discovery import build
# # Configure logging
# logging.basicConfig(
# level=logging.INFO,
# format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
# )
# logger = logging.getLogger(__name__)
# # Gmail API configuration
# SCOPES = ['https://www.googleapis.com/auth/gmail.readonly',
# 'https://www.googleapis.com/auth/gmail.send']
# TOKEN_FILE = 'token.json'
# CREDENTIALS_FILE = 'credentials.json'
# # MCP Server configuration
# MCP_HOST = '127.0.0.1'
# MCP_PORT = 9999
# WEB_PORT = 8080
# class GmailClient:
# def __init__(self):
# self.service = None
# self.authenticated = False
# def authenticate(self):
# """Authenticate with Gmail API."""
# creds = None
# if os.path.exists(TOKEN_FILE):
# try:
# creds = Credentials.from_authorized_user_info(
# json.loads(open(TOKEN_FILE).read()), SCOPES)
# except Exception as e:
# logger.error(f"Error loading credentials: {e}")
# if not creds or not creds.valid:
# if creds and creds.expired and creds.refresh_token:
# try:
# creds.refresh(Request())
# except Exception as e:
# logger.error(f"Error refreshing credentials: {e}")
# creds = None
# if not creds:
# if not os.path.exists(CREDENTIALS_FILE):
# logger.error(f"Credentials file not found: {CREDENTIALS_FILE}")
# return False
# try:
# flow = InstalledAppFlow.from_client_secrets_file(
# CREDENTIALS_FILE, SCOPES)
# creds = flow.run_local_server(port=0)
# except Exception as e:
# logger.error(f"Error in authentication flow: {e}")
# return False
# # Save the credentials for the next run
# with open(TOKEN_FILE, 'w') as token:
# token.write(creds.to_json())
# try:
# self.service = build('gmail', 'v1', credentials=creds)
# self.authenticated = True
# logger.info("Successfully authenticated with Gmail API")
# return True
# except Exception as e:
# logger.error(f"Error building Gmail service: {e}")
# return False
# async def get_messages(self, query="", max_results=10):
# """Get messages from Gmail."""
# if not self.authenticated:
# if not self.authenticate():
# return []
# try:
# results = self.service.users().messages().list(
# userId='me', q=query, maxResults=max_results).execute()
# messages = results.get('messages', [])
# detailed_messages = []
# for msg in messages:
# message = self.service.users().messages().get(
# userId='me', id=msg['id']).execute()
# headers = {}
# for header in message['payload']['headers']:
# headers[header['name']] = header['value']
# subject = headers.get('Subject', '(No Subject)')
# sender = headers.get('From', '(Unknown Sender)')
# date = headers.get('Date', '')
# body = ""
# if 'parts' in message['payload']:
# for part in message['payload']['parts']:
# if part['mimeType'] == 'text/plain':
# body = base64.urlsafe_b64decode(
# part['body']['data']).decode('utf-8')
# break
# elif 'body' in message['payload'] and 'data' in message['payload']['body']:
# body = base64.urlsafe_b64decode(
# message['payload']['body']['data']).decode('utf-8')
# detailed_messages.append({
# 'id': msg['id'],
# 'threadId': message['threadId'],
# 'subject': subject,
# 'sender': sender,
# 'date': date,
# 'snippet': message['snippet'],
# 'body': body
# })
# return detailed_messages
# except Exception as e:
# logger.error(f"Error fetching messages: {e}")
# return []
# async def send_message(self, to, subject, body):
# """Send a message through Gmail."""
# if not self.authenticated:
# if not self.authenticate():
# return False
# try:
# msg = MIMEText(body)
# msg['to'] = to
# msg['subject'] = subject
# raw_message = base64.urlsafe_b64encode(msg.as_bytes()).decode('utf-8')
# message = self.service.users().messages().send(
# userId='me', body={'raw': raw_message}).execute()
# logger.info(f"Message sent. Message ID: {message['id']}")
# return True
# except Exception as e:
# logger.error(f"Error sending message: {e}")
# return False
# class MCPServer:
# def __init__(self, gmail_client):
# self.gmail_client = gmail_client
# self.clients = set()
# self.server = None
# async def start_server(self):
# """Start the MCP server."""
# self.server = await asyncio.start_server(
# self.handle_client, MCP_HOST, MCP_PORT)
# addr = self.server.sockets[0].getsockname()
# logger.info(f'MCP Server started on {addr}')
# async with self.server:
# await self.server.serve_forever()
# async def handle_client(self, reader, writer):
# """Handle MCP client connections."""
# addr = writer.get_extra_info('peername')
# logger.info(f'New client connection from {addr}')
# self.clients.add(writer)
# try:
# while True:
# data = await reader.readline()
# if not data:
# break
# message = data.decode('utf-8').strip()
# logger.info(f'Received: {message} from {addr}')
# try:
# cmd = json.loads(message)
# response = await self.process_command(cmd)
# writer.write(json.dumps(response).encode('utf-8') + b'\n')
# await writer.drain()
# except json.JSONDecodeError:
# writer.write(b'{"status": "error", "message": "Invalid JSON"}\n')
# await writer.drain()
# except Exception as e:
# logger.error(f"Error handling client: {e}")
# finally:
# writer.close()
# await writer.wait_closed()
# self.clients.remove(writer)
# logger.info(f'Connection closed for {addr}')
# async def process_command(self, cmd):
# """Process MCP commands."""
# try:
# command = cmd.get('command', '')
# if command == 'get_messages':
# query = cmd.get('query', '')
# max_results = cmd.get('max_results', 10)
# messages = await self.gmail_client.get_messages(query, max_results)
# return {'status': 'ok', 'messages': messages}
# elif command == 'send_message':
# to = cmd.get('to', '')
# subject = cmd.get('subject', '')
# body = cmd.get('body', '')
# if not to or not subject or not body:
# return {'status': 'error', 'message': 'Missing required fields'}
# success = await self.gmail_client.send_message(to, subject, body)
# if success:
# return {'status': 'ok', 'message': 'Message sent successfully'}
# else:
# return {'status': 'error', 'message': 'Failed to send message'}
# elif command == 'ping':
# return {'status': 'ok', 'pong': time.time()}
# else:
# return {'status': 'error', 'message': f'Unknown command: {command}'}
# except Exception as e:
# logger.error(f"Error processing command: {e}")
# return {'status': 'error', 'message': str(e)}
# class WebServer:
# def __init__(self, gmail_client):
# self.gmail_client = gmail_client
# self.app = web.Application()
# self.setup_routes()
# def setup_routes(self):
# """Set up web routes."""
# self.app.add_routes([
# web.get('/', self.handle_index),
# web.get('/api/messages', self.handle_get_messages),
# web.post('/api/send', self.handle_send_message),
# web.get('/auth', self.handle_auth),
# ])
# async def handle_index(self, request):
# """Handle index route."""
# return web.Response(text="Gmail-MCP Server API", content_type='text/plain')
# async def handle_get_messages(self, request):
# """Handle API request to get messages."""
# query = request.query.get('q', '')
# max_results = int(request.query.get('max', 10))
# messages = await self.gmail_client.get_messages(query, max_results)
# return web.json_response({'status': 'ok', 'messages': messages})
# async def handle_send_message(self, request):
# """Handle API request to send a message."""
# try:
# data = await request.json()
# to = data.get('to', '')
# subject = data.get('subject', '')
# body = data.get('body', '')
# if not to or not subject or not body:
# return web.json_response(
# {'status': 'error', 'message': 'Missing required fields'},
# status=400
# )
# success = await self.gmail_client.send_message(to, subject, body)
# if success:
# return web.json_response({'status': 'ok', 'message': 'Message sent successfully'})
# else:
# return web.json_response(
# {'status': 'error', 'message': 'Failed to send message'},
# status=500
# )
# except Exception as e:
# return web.json_response(
# {'status': 'error', 'message': str(e)},
# status=500
# )
# async def handle_auth(self, request):
# """Handle authentication."""
# if self.gmail_client.authenticate():
# return web.Response(text="Authentication successful")
# else:
# return web.Response(text="Authentication failed", status=500)
# async def start_server(self):
# """Start the web server."""
# runner = web.AppRunner(self.app)
# await runner.setup()
# site = web.TCPSite(runner, 'localhost', WEB_PORT)
# await site.start()
# logger.info(f'Web server started on http://localhost:{WEB_PORT}')
# async def main():
# """Main entry point."""
# gmail_client = GmailClient()
# # Initialize the MCP server
# mcp_server = MCPServer(gmail_client)
# # Initialize the web server
# web_server = WebServer(gmail_client)
# # Run both servers concurrently
# await asyncio.gather(
# mcp_server.start_server(),
# web_server.start_server()
# )
# if __name__ == '__main__':
# try:
# asyncio.run(main())
# except KeyboardInterrupt:
# logger.info("Server shutdown by user")