Skip to main content
Glama

Email MCP Server

by x-mahar
email_sender.py4 kB
"""Mailjet email sending functionality""" import httpx import base64 import json import re class EmailSender: def __init__(self, api_key: str, secret_key: str): self.api_key = api_key self.secret_key = secret_key self.base_url = "https://api.mailjet.com/v3.1" def _validate_email(self, email: str) -> bool: """Basic email format validation""" pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return re.match(pattern, email) is not None async def send_single_email(self, email_data: dict): """Send a single email via Mailjet API""" # Validate email addresses if not self._validate_email(email_data["from_email"]): raise Exception(f"Invalid sender email format: {email_data['from_email']}") if not self._validate_email(email_data["to_email"]): raise Exception(f"Invalid recipient email format: {email_data['to_email']}") # Prepare Mailjet payload payload = { "Messages": [ { "From": { "Email": email_data["from_email"], "Name": email_data.get("from_name", "") }, "To": [ { "Email": email_data["to_email"], "Name": email_data.get("to_name", "") } ], "Subject": email_data["subject"], "TextPart": email_data["text_content"] } ] } # Add HTML content if provided if email_data.get("html_content"): payload["Messages"][0]["HTMLPart"] = email_data["html_content"] # Prepare authentication auth_string = f"{self.api_key}:{self.secret_key}" auth_bytes = auth_string.encode('ascii') auth_b64 = base64.b64encode(auth_bytes).decode('ascii') headers = { "Authorization": f"Basic {auth_b64}", "Content-Type": "application/json" } # Send email try: async with httpx.AsyncClient() as client: response = await client.post( f"{self.base_url}/send", headers=headers, json=payload, timeout=30.0 ) if response.status_code == 401: raise Exception("Invalid Mailjet credentials") elif response.status_code == 400: error_detail = response.json().get("ErrorMessage", "Bad request") raise Exception(f"Mailjet API error: {error_detail}") elif response.status_code != 200: raise Exception(f"Mailjet API error ({response.status_code}): {response.text}") result = response.json() # Check if email was successfully queued if result.get("Messages") and len(result["Messages"]) > 0: message_result = result["Messages"][0] if message_result.get("Status") == "success": return { "success": True, "message_id": message_result.get("To", [{}])[0].get("MessageID"), "status": "queued" } else: errors = message_result.get("Errors", []) error_msg = errors[0].get("ErrorMessage", "Unknown error") if errors else "Unknown error" raise Exception(f"Email sending failed: {error_msg}") else: raise Exception("No message status returned from Mailjet") except httpx.TimeoutException: raise Exception("Email sending timed out") except httpx.RequestError as e: raise Exception(f"Network error: {str(e)}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/x-mahar/mail-mcpServer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server