MCP Personal Assistant Agent

  • modules
from typing import List, Dict, Any, Optional import os import logging import base64 import re from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from google.oauth2.credentials import Credentials from googleapiclient.discovery import build from googleapiclient.errors import HttpError # Import the MCP server instance from the main file from mcp_server import mcp, Context logger = logging.getLogger("mcp-pa-agent.email") # Helper functions async def get_gmail_service(): """Get an authenticated Gmail service if credentials are available.""" try: # In a real implementation, you would handle OAuth2 credentials properly # For demo purposes, we'll just check if the required env vars exist if not all([ os.getenv("GOOGLE_CLIENT_ID"), os.getenv("GOOGLE_CLIENT_SECRET"), os.getenv("GOOGLE_REFRESH_TOKEN") ]): return None credentials = Credentials( token=os.getenv("GOOGLE_ACCESS_TOKEN"), refresh_token=os.getenv("GOOGLE_REFRESH_TOKEN"), client_id=os.getenv("GOOGLE_CLIENT_ID"), client_secret=os.getenv("GOOGLE_CLIENT_SECRET"), token_uri="https://oauth2.googleapis.com/token", scopes=["https://www.googleapis.com/auth/gmail.readonly", "https://www.googleapis.com/auth/gmail.send"] ) return build("gmail", "v1", credentials=credentials) except Exception as e: logger.error(f"Failed to get Gmail service: {str(e)}") return None # Prompts @mcp.prompt() def compose_email_prompt(to: str, subject: str = "") -> str: """Create a prompt for composing an email""" return f"Please write a professional email to {to}" + (f" with the subject '{subject}'" if subject else "") + "." @mcp.prompt() def reply_to_email_prompt(message_id: str) -> str: """Create a prompt for replying to an email""" return f"Please help me draft a reply to the email with message ID {message_id}." # Resources @mcp.resource("email://inbox/{max_results}") async def inbox_resource(max_results: str = "10") -> str: """Resource providing emails from the inbox""" service = await get_gmail_service() if not service: return "Gmail service is not available. Please check your Google API credentials." try: # Convert max_results to int with validation try: num_results = int(max_results) if num_results < 1 or num_results > 50: return "Invalid max_results parameter. Must be between 1 and 50." except ValueError: return "Invalid max_results parameter. Must be an integer." # Call the Gmail API results = service.users().messages().list( userId='me', labelIds=["INBOX"], maxResults=num_results ).execute() # Return JSON representation import json return json.dumps(results, indent=2) except Exception as e: logger.error(f"Error fetching inbox: {str(e)}") return f"Error fetching inbox: {str(e)}" # Tool functions @mcp.tool() async def get_emails(max_results: int = 10, label: str = "INBOX", ctx: Context = None) -> str: """Get recent emails from a specific label. Args: max_results: Maximum number of emails to retrieve (default 10, max 50) label: Email label to fetch from (default 'INBOX') """ if ctx: ctx.info(f"Getting up to {max_results} emails from label: {label}") if max_results < 1 or max_results > 50: error_msg = "max_results must be between 1 and 50." if ctx: ctx.error(error_msg) return error_msg service = await get_gmail_service() if not service: error_msg = "Gmail service is not available. Please check your Google API credentials." if ctx: ctx.error(error_msg) return error_msg try: # Call the Gmail API results = service.users().messages().list( userId='me', labelIds=[label], maxResults=max_results ).execute() messages = results.get('messages', []) if not messages: return f"No messages found in {label}." if ctx: ctx.info(f"Found {len(messages)} emails. Fetching details...") # Format the emails nicely formatted_emails = [] for i, message in enumerate(messages): if ctx: await ctx.report_progress(i, len(messages)) msg = service.users().messages().get(userId='me', id=message['id']).execute() # Extract headers headers = msg['payload']['headers'] subject = next((h['value'] for h in headers if h['name'].lower() == 'subject'), 'No subject') sender = next((h['value'] for h in headers if h['name'].lower() == 'from'), 'Unknown sender') date = next((h['value'] for h in headers if h['name'].lower() == 'date'), 'Unknown date') # Extract snippet snippet = msg.get('snippet', 'No preview available') formatted_emails.append(f""" From: {sender} Date: {date} Subject: {subject} Preview: {snippet} Message ID: {message['id']} """) return "\n---\n".join(formatted_emails) except HttpError as error: error_msg = f"An error occurred while fetching emails: {error}" if ctx: ctx.error(error_msg) return error_msg except Exception as e: error_msg = f"Unexpected error fetching emails: {str(e)}" if ctx: ctx.error(error_msg) return error_msg @mcp.tool() async def read_email(message_id: str, ctx: Context = None) -> str: """Read the full content of a specific email. Args: message_id: The ID of the email to read """ if ctx: ctx.info(f"Reading email with ID: {message_id}") if not message_id or message_id.strip() == "": error_msg = "Message ID cannot be empty." if ctx: ctx.error(error_msg) return error_msg service = await get_gmail_service() if not service: error_msg = "Gmail service is not available. Please check your Google API credentials." if ctx: ctx.error(error_msg) return error_msg try: # Get the full message msg = service.users().messages().get(userId='me', id=message_id, format='full').execute() # Extract headers headers = msg['payload']['headers'] subject = next((h['value'] for h in headers if h['name'].lower() == 'subject'), 'No subject') sender = next((h['value'] for h in headers if h['name'].lower() == 'from'), 'Unknown sender') date = next((h['value'] for h in headers if h['name'].lower() == 'date'), 'Unknown date') to = next((h['value'] for h in headers if h['name'].lower() == 'to'), 'Unknown recipient') if ctx: ctx.info(f"Retrieved email: '{subject}' from {sender}") # Extract body body = "" if 'parts' in msg['payload']: for part in msg['payload']['parts']: if part['mimeType'] == 'text/plain': if 'data' in part['body']: body = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8') break elif 'body' in msg['payload'] and 'data' in msg['payload']['body']: body = base64.urlsafe_b64decode(msg['payload']['body']['data']).decode('utf-8') if not body: body = "Email body could not be extracted (possibly HTML-only content)" return f""" From: {sender} To: {to} Date: {date} Subject: {subject} {body} """ except HttpError as error: error_msg = f"An error occurred while reading the email: {error}" if ctx: ctx.error(error_msg) return error_msg except Exception as e: error_msg = f"Unexpected error reading email: {str(e)}" if ctx: ctx.error(error_msg) return error_msg @mcp.tool() async def send_email(to: str, subject: str, body: str, ctx: Context = None) -> str: """Send an email. Args: to: Recipient email address subject: Email subject body: Email body (plain text) """ if ctx: ctx.info(f"Preparing to send email to: {to}") # Validate inputs if not to or len(to.strip()) == 0: error_msg = "Recipient email address cannot be empty." if ctx: ctx.error(error_msg) return error_msg # Validate email format with regex email_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$') if not email_pattern.match(to): error_msg = f"Invalid email address format: {to}" if ctx: ctx.error(error_msg) return error_msg if not subject or len(subject.strip()) == 0: error_msg = "Email subject cannot be empty." if ctx: ctx.error(error_msg) return error_msg if not body or len(body.strip()) == 0: error_msg = "Email body cannot be empty." if ctx: ctx.error(error_msg) return error_msg service = await get_gmail_service() if not service: error_msg = "Gmail service is not available. Please check your Google API credentials." if ctx: ctx.error(error_msg) return error_msg try: if ctx: ctx.info("Creating email message...") # Create the email message message = MIMEMultipart() message['to'] = to message['subject'] = subject # Add text part msg = MIMEText(body) message.attach(msg) # Encode the message raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode('utf-8') if ctx: ctx.info("Sending email...") # Send the message sent_message = service.users().messages().send( userId='me', body={'raw': raw_message} ).execute() success_msg = f"Email sent successfully. Message ID: {sent_message['id']}" if ctx: ctx.info(success_msg) return success_msg except HttpError as error: error_msg = f"An error occurred while sending the email: {error}" if ctx: ctx.error(error_msg) return error_msg except Exception as e: error_msg = f"Unexpected error sending email: {str(e)}" if ctx: ctx.error(error_msg) return error_msg @mcp.tool() async def search_emails(query: str, max_results: int = 10, ctx: Context = None) -> str: """Search emails using Gmail search syntax. Args: query: Search query using Gmail search operators max_results: Maximum number of results to return (default 10, max 50) """ if ctx: ctx.info(f"Searching emails with query: {query}") if not query or len(query.strip()) == 0: error_msg = "Search query cannot be empty." if ctx: ctx.error(error_msg) return error_msg if max_results < 1 or max_results > 50: error_msg = "max_results must be between 1 and 50." if ctx: ctx.error(error_msg) return error_msg service = await get_gmail_service() if not service: error_msg = "Gmail service is not available. Please check your Google API credentials." if ctx: ctx.error(error_msg) return error_msg try: # Search emails results = service.users().messages().list( userId='me', q=query, maxResults=max_results ).execute() messages = results.get('messages', []) if not messages: return f"No emails matching '{query}' found." if ctx: ctx.info(f"Found {len(messages)} matching emails. Fetching details...") # Format the emails nicely formatted_emails = [] for i, message in enumerate(messages): if ctx: await ctx.report_progress(i, len(messages)) msg = service.users().messages().get(userId='me', id=message['id']).execute() # Extract headers headers = msg['payload']['headers'] subject = next((h['value'] for h in headers if h['name'].lower() == 'subject'), 'No subject') sender = next((h['value'] for h in headers if h['name'].lower() == 'from'), 'Unknown sender') date = next((h['value'] for h in headers if h['name'].lower() == 'date'), 'Unknown date') # Extract snippet snippet = msg.get('snippet', 'No preview available') formatted_emails.append(f""" From: {sender} Date: {date} Subject: {subject} Preview: {snippet} Message ID: {message['id']} """) return f"Search results for '{query}':\n\n" + "\n---\n".join(formatted_emails) except HttpError as error: error_msg = f"An error occurred while searching emails: {error}" if ctx: ctx.error(error_msg) return error_msg except Exception as e: error_msg = f"Unexpected error searching emails: {str(e)}" if ctx: ctx.error(error_msg) return error_msg