# main.py (4.72 kB)
import asyncio
import hmac
import json
import os

from dotenv import load_dotenv
from fastapi import FastAPI, Query, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse

from email_sender import EmailSender
from mcp_tools import get_mcp_tools, handle_tool_call
# Populate the process environment from a .env file (python-dotenv) before
# any of the os.getenv() lookups below run.
load_dotenv()

app = FastAPI(
    title="Email MCP Server",
    description="Send emails via MCP protocol over SSE",
    version="1.0.0",
)

# CORS for web clients. Allowed origins come from the comma-separated
# CORS_ALLOW_ORIGINS environment variable so a production deployment can
# restrict them without a code change. When the variable is unset or holds
# no usable entry, every origin is allowed ("*").
_cors_allow_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_allow_origins,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)
# Simple in-memory auth: accepted tokens come from the comma-separated
# VALID_API_TOKENS environment variable. Whitespace around each entry is
# stripped and empty entries are dropped, so "a, b," yields ["a", "b"].
VALID_TOKENS = [
    entry.strip()
    for entry in os.getenv("VALID_API_TOKENS", "").split(",")
    if entry.strip()
]


def validate_token(token: str) -> bool:
    """Return True if *token* is one of the configured API tokens.

    Args:
        token: The token supplied by the client.

    Returns:
        True when *token* is a non-empty string equal to an entry of
        ``VALID_TOKENS``; False otherwise (including for non-string input).
    """
    if not isinstance(token, str) or not token:
        return False

    # compare_digest only accepts non-ASCII text as bytes, so encode first.
    candidate = token.encode("utf-8")
    matched = False
    for known in VALID_TOKENS:
        # Constant-time comparison, and no early exit from the loop, so the
        # response time does not reveal how much of a token was correct.
        if hmac.compare_digest(candidate, known.encode("utf-8")):
            matched = True
    return matched
@app.get("/")
async def root():
    """Return a small index payload: server name, SSE route and status."""
    info = dict(
        message="Email MCP Server",
        sse_endpoint="/sse",
        status="running",
    )
    return info
@app.get("/sse")
async def mcp_sse_endpoint(
    api_token: str = Query(..., description="API token for authentication")
):
    """SSE endpoint for MCP protocol communication.

    Args:
        api_token: Token passed as a query parameter; checked with
            ``validate_token``.

    Returns:
        A ``StreamingResponse`` that emits one ``connected`` event and then a
        ``ping`` event every 30 seconds for as long as the stream is consumed.

    Raises:
        HTTPException: 401 when the token is not accepted.
    """
    if not validate_token(api_token):
        raise HTTPException(status_code=401, detail="Invalid API token")

    async def event_stream():
        # Initial event so the client can tell the stream is open.
        yield "event: connected\n"
        yield 'data: {"status": "connected", "server": "email-mcp"}\n\n'

        # Keep connection alive and handle MCP messages
        try:
            while True:
                # In a real implementation, you'd handle incoming JSON-RPC
                # messages here. For now, just keep the connection alive.
                await asyncio.sleep(30)
                yield "event: ping\n"
                yield 'data: {"type": "ping"}\n\n'
        except Exception as e:
            # Serialize with json.dumps so quotes, backslashes or newlines in
            # the exception text cannot produce invalid JSON or split the SSE
            # "data:" line.
            error_payload = json.dumps({"error": str(e)})
            yield "event: error\n"
            yield f"data: {error_payload}\n\n"

    return StreamingResponse(
        event_stream(),
        # EventSource clients only accept the text/event-stream media type.
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            # No explicit Access-Control-Allow-Origin here: the app-level
            # CORSMiddleware already adds it for cross-origin requests, and a
            # hard-coded "*" would bypass any origin restriction set there.
        },
    )
def _jsonrpc_result(request_id, result):
    """Build a JSON-RPC 2.0 success response echoing *request_id*."""
    return {"jsonrpc": "2.0", "id": request_id, "result": result}


def _jsonrpc_error(request_id, code, error_message):
    """Build a JSON-RPC 2.0 error response echoing *request_id*."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": error_message},
    }


@app.post("/mcp")
async def handle_mcp_message(
    message: dict,
    api_token: str = Query(..., description="API token")
):
    """Handle MCP JSON-RPC messages (for testing/debugging).

    Supported methods are ``initialize``, ``tools/list`` and ``tools/call``.

    Args:
        message: The decoded JSON-RPC request body.
        api_token: Token passed as a query parameter; checked with
            ``validate_token``.

    Returns:
        A JSON-RPC 2.0 response dict carrying the request's ``id`` and either
        a ``result`` or an ``error`` member. Error codes used: -32601 for an
        unknown method, -32602 for malformed params or missing Mailjet
        credentials, -32603 when the tool call raises.

    Raises:
        HTTPException: 401 when the token is not accepted.
    """
    if not validate_token(api_token):
        raise HTTPException(status_code=401, detail="Invalid API token")

    request_id = message.get("id")
    method = message.get("method")

    if method == "initialize":
        return _jsonrpc_result(
            request_id,
            {
                "protocolVersion": "2024-11-05",
                "capabilities": {
                    "tools": {}
                },
                "serverInfo": {
                    "name": "email-mcp-server",
                    "version": "1.0.0"
                },
            },
        )

    if method == "tools/list":
        return _jsonrpc_result(request_id, {"tools": get_mcp_tools()})

    if method != "tools/call":
        return _jsonrpc_error(request_id, -32601, "Method not found")

    # A missing or null "params"/"arguments" member is treated as empty; any
    # other non-object value is rejected instead of crashing on .get()/.pop().
    params = message.get("params")
    if params is None:
        params = {}
    if not isinstance(params, dict):
        return _jsonrpc_error(request_id, -32602, "Invalid params")

    raw_arguments = params.get("arguments")
    if raw_arguments is None:
        raw_arguments = {}
    if not isinstance(raw_arguments, dict):
        return _jsonrpc_error(request_id, -32602, "Invalid params")

    tool_name = params.get("name")

    # The caller's Mailjet credentials travel inside the tool arguments.
    # Work on a copy so the incoming message is not mutated, and remove the
    # credentials so they are passed to the tool separately.
    arguments = dict(raw_arguments)
    mailjet_key = arguments.pop("mailjet_api_key", None)
    mailjet_secret = arguments.pop("mailjet_secret_key", None)
    if not mailjet_key or not mailjet_secret:
        return _jsonrpc_error(request_id, -32602, "Missing Mailjet credentials")

    try:
        result = await handle_tool_call(
            tool_name, arguments, mailjet_key, mailjet_secret
        )
    except Exception as e:
        # Endpoint boundary: report tool failures as a JSON-RPC internal
        # error rather than letting them surface as an HTTP 500.
        return _jsonrpc_error(request_id, -32603, str(e))
    return _jsonrpc_result(request_id, result)
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn, taking the bind address
    # and port from HOST / PORT (defaults 0.0.0.0 and 8000).
    import uvicorn

    bind_host = os.getenv("HOST", "0.0.0.0")
    bind_port = int(os.getenv("PORT", 8000))
    uvicorn.run(app, host=bind_host, port=bind_port)