Skip to main content
Glama

Lemon Email MCP Server

by manojk0303
simple_mcp_server.py13.6 kB
#!/usr/bin/env python3 """ Lemon Email MCP Server - Direct API Integration No intermediate servers - connects directly to Lemon Email API """ import asyncio import json import os import sys from typing import Any, Dict, List, Optional import httpx # MCP imports with error handling try: from mcp.server.models import InitializationOptions from mcp.server import NotificationOptions, Server from mcp.server.stdio import stdio_server from mcp.types import ( Tool, ) MCP_AVAILABLE = True except ImportError as e: print(f"MCP not available: {e}") MCP_AVAILABLE = False # Server configuration SERVER_NAME = "lemon-email" SERVER_VERSION = "2.0.0" LEMON_API_URL = "https://app.xn--lemn-sqa.com/api/transactional/send" class LemonEmailClient: """Direct client for Lemon Email API - no intermediates""" def __init__(self, api_key: str): if not api_key: raise ValueError("Lemon Email API key is required") self.api_key = api_key self.api_url = LEMON_API_URL async def send_email( self, to: str, subject: str, body: str, fromname: str = "Email Assistant", fromemail: str = "mail@member-notification.com", toname: str = "", tag: str = "mcp-agent", variables: Optional[Dict[str, Any]] = None, replyto: Optional[str] = None ) -> Dict[str, Any]: """ Send email directly to Lemon Email API Args: to: Recipient email address subject: Email subject body: Email body (plain text or HTML) fromname: Sender display name fromemail: Sender email address toname: Recipient display name tag: Tag for categorization/tracking variables: Template variables (dict) replyto: Reply-to address Returns: Dict with success status and response details """ if not replyto: replyto = fromemail # Prepare payload for Lemon Email API payload = { "fromname": fromname, "fromemail": fromemail, "to": to, "toname": toname, "subject": subject, "body": body, "tag": tag, "variables": variables or {}, "replyto": replyto } # Set up headers with API key headers = { "Content-Type": "application/json", "X-Auth-APIKey": self.api_key } # Make direct API call to Lemon Email async with httpx.AsyncClient() as client: try: response = await client.post( self.api_url, headers=headers, json=payload, timeout=30.0 ) response_data = { "status_code": response.status_code, "response": response.text, "success": response.is_success } if not response.is_success: response_data["error"] = f"Lemon API error {response.status_code}: {response.text}" return response_data except httpx.TimeoutException: return { "success": False, "error": "Request to Lemon API timed out after 30 seconds" } except Exception as e: return { "success": False, "error": f"Network error connecting to Lemon API: {str(e)}" } def create_mcp_server(api_key: str): """Create and configure the MCP server with direct Lemon API access""" if not MCP_AVAILABLE: raise ImportError("MCP library not available. Install with: pip install mcp") server = Server(SERVER_NAME) email_client = LemonEmailClient(api_key) @server.list_tools() async def list_tools() -> List[Tool]: """List available email tools""" return [ Tool( name="send_email", description=( "Send an email directly via Lemon Email API. " "Perfect for AI agents to send transactional emails, notifications, and messages." ), inputSchema={ "type": "object", "properties": { "to": { "type": "string", "description": "Recipient email address (required)" }, "subject": { "type": "string", "description": "Email subject line (required)" }, "body": { "type": "string", "description": "Email body content - supports plain text or HTML (required)" }, "fromname": { "type": "string", "description": "Sender display name (default: 'Email Assistant')", "default": "Email Assistant" }, "fromemail": { "type": "string", "description": "Sender email address (default: 'mail@member-notification.com')", "default": "mail@member-notification.com" }, "toname": { "type": "string", "description": "Recipient display name (optional)", "default": "" }, "tag": { "type": "string", "description": "Tag for email categorization/tracking (default: 'mcp-agent')", "default": "mcp-agent" }, "variables": { "type": "object", "description": "Template variables as key-value pairs (optional)", "additionalProperties": True }, "replyto": { "type": "string", "description": "Reply-to email address (optional, defaults to fromemail)" } }, "required": ["to", "subject", "body"] } ) ] @server.call_tool() async def call_tool(name: str, arguments: dict): """Handle tool execution - send emails via Lemon API""" if name != "send_email": return { "content": [ { "type": "text", "text": f"❌ Unknown tool: {name}" } ] } try: # Validate required fields required = ["to", "subject", "body"] missing = [field for field in required if field not in arguments or not arguments[field]] if missing: return { "content": [ { "type": "text", "text": f"❌ Missing required fields: {', '.join(missing)}" } ] } # Send email directly to Lemon API result = await email_client.send_email(**arguments) if result["success"]: return { "content": [ { "type": "text", "text": ( f"✅ Email sent successfully via Lemon API!\n" f"📧 To: {arguments['to']}\n" f"📝 Subject: {arguments['subject']}\n" f"🔖 Tag: {arguments.get('tag', 'mcp-agent')}\n" f"📊 Status: {result['status_code']}\n" f"🎯 Response: {result['response']}" ) } ] } else: return { "content": [ { "type": "text", "text": f"❌ Email failed: {result.get('error', 'Unknown error from Lemon API')}" } ] } except Exception as e: return { "content": [ { "type": "text", "text": f"❌ Error sending email: {type(e).__name__}: {str(e)}" } ] } return server async def run_mcp_server(): """Run the MCP server with direct Lemon API connection""" if not MCP_AVAILABLE: print("❌ MCP library not available. Install with: pip install mcp") return api_key = os.getenv("LEMON_EMAIL_API_KEY") if not api_key: print("❌ LEMON_EMAIL_API_KEY environment variable required") print("💡 Set it with: export LEMON_EMAIL_API_KEY='your-key-here'") return print(f"🍋 Starting {SERVER_NAME} MCP Server v{SERVER_VERSION}") print(f"🔗 Direct connection to: {LEMON_API_URL}") print("📡 Waiting for MCP client connection...") print("💡 Press Ctrl+C to stop") try: server = create_mcp_server(api_key) async with stdio_server() as (read_stream, write_stream): initialization_options = InitializationOptions( server_name=SERVER_NAME, server_version=SERVER_VERSION, capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={} ) ) await server.run( read_stream, write_stream, initialization_options ) except KeyboardInterrupt: print("\n🛑 Server stopped by user") except Exception as e: print(f"❌ Server error: {type(e).__name__}: {e}") import traceback traceback.print_exc() async def run_standalone_test(): """Test direct connection to Lemon API""" print("🧪 Testing direct connection to Lemon Email API...") print("=" * 50) api_key = os.getenv("LEMON_EMAIL_API_KEY") if not api_key: print("❌ LEMON_EMAIL_API_KEY environment variable required") return try: email_client = LemonEmailClient(api_key) print("📧 Sending test email...") result = await email_client.send_email( to="test@example.com", # Change to your email subject="🧪 Direct API Test - Lemon Email MCP", body=( "This is a test email sent directly to Lemon Email API!\n\n" "✅ No intermediate servers\n" "✅ Direct API connection\n" "✅ Fast and reliable\n\n" "Your MCP server is working perfectly!" ), fromname="Lemon MCP Test", fromemail="mail@member-notification.com", tag="direct-api-test" ) print("\n" + "=" * 50) if result["success"]: print("✅ SUCCESS! Email sent directly to Lemon API") print(f"📊 Status Code: {result['status_code']}") print(f"📝 Response: {result['response']}") print("\n🎉 Your direct API connection is working!") else: print("❌ FAILED to send email") print(f"⚠️ Error: {result.get('error', 'Unknown error')}") except Exception as e: print(f"❌ Test error: {type(e).__name__}: {e}") def print_usage(): """Print usage instructions""" print("🍋 Lemon Email MCP Server - Direct API") print("=" * 50) print("\nUsage:") print(" python simple_mcp_server.py # Start MCP server") print(" python simple_mcp_server.py test # Test direct API connection") print(" python simple_mcp_server.py help # Show this help") print("\nEnvironment Variables:") print(" LEMON_EMAIL_API_KEY (required) Your Lemon Email API key") print("\nGet API Key:") print(" DM @Norman_Szobotka on Twitter") print(" Email: manojk030303@gmail.com") print("\nDirect API Endpoint:") print(f" {LEMON_API_URL}") async def main(): """Main entry point""" if len(sys.argv) > 1: command = sys.argv[1].lower() if command == "test": await run_standalone_test() elif command in ["help", "-h", "--help"]: print_usage() else: print(f"❌ Unknown command: {command}") print_usage() else: await run_mcp_server() if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/manojk0303/lemon-email-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server