#!/usr/bin/env python3
"""
Vercel Serverless Function for MCP Server
"""
import json
import subprocess
import sys
from pathlib import Path
from typing import Any
# Workspace directory - use /tmp for Vercel serverless
WORKSPACE_DIR = Path("/tmp/workspace")
# parents=True keeps the import from failing when the parent directory does
# not exist yet (e.g. when the module is run outside the serverless runtime).
WORKSPACE_DIR.mkdir(parents=True, exist_ok=True)
def get_safe_path(filepath: str) -> Path:
    """Resolve *filepath* against the workspace and reject anything outside it.

    Args:
        filepath: Caller-supplied path, interpreted relative to
            ``WORKSPACE_DIR``. An empty string resolves to the workspace
            directory itself.

    Returns:
        The resolved path (``Path.resolve`` collapses ``..`` segments and
        follows symlinks before the containment check is made).

    Raises:
        ValueError: If the resolved path is neither the workspace directory
            nor located beneath it.
    """
    workspace_root = WORKSPACE_DIR.resolve()
    resolved_path = (WORKSPACE_DIR / filepath).resolve()
    try:
        # Compare whole path components rather than string prefixes: a plain
        # ``startswith`` test would also accept sibling directories whose
        # name merely begins with the workspace directory's name.
        resolved_path.relative_to(workspace_root)
    except ValueError:
        raise ValueError("Path must be within workspace directory") from None
    return resolved_path
def get_tools() -> list[dict]:
    """Describe every tool this server can run.

    Returns:
        A newly built list with one entry per tool. Each entry has a ``name``,
        a ``description`` and an ``inputSchema`` (a JSON-Schema-style object
        listing the argument properties and, where a tool has mandatory
        arguments, which of them are required).
    """

    def string_property(description: str) -> dict:
        # Plain string argument with a human-readable description.
        return {"type": "string", "description": description}

    def describe(tool_name: str, summary: str, properties: dict, required=None) -> dict:
        # "required" is only emitted for tools that declare mandatory arguments.
        schema = {"type": "object", "properties": properties}
        if required is not None:
            schema["required"] = required
        return {"name": tool_name, "description": summary, "inputSchema": schema}

    file_path_text = "Path to the file relative to workspace"

    catalogue = []
    catalogue.append(describe(
        "create_file",
        "Create a new file with content in the workspace",
        {
            "filepath": string_property(file_path_text),
            "content": string_property("Content to write to the file"),
        },
        ["filepath", "content"],
    ))
    catalogue.append(describe(
        "read_file",
        "Read the contents of a file from the workspace",
        {"filepath": string_property(file_path_text)},
        ["filepath"],
    ))
    catalogue.append(describe(
        "update_file",
        "Update an existing file's content in the workspace",
        {
            "filepath": string_property(file_path_text),
            "content": string_property("New content for the file"),
        },
        ["filepath", "content"],
    ))
    catalogue.append(describe(
        "delete_file",
        "Delete a file from the workspace",
        {"filepath": string_property(file_path_text)},
        ["filepath"],
    ))
    catalogue.append(describe(
        "list_files",
        "List all files in the workspace or a specific directory",
        {
            "directory": {
                **string_property("Directory path relative to workspace (empty for root)"),
                "default": "",
            },
        },
    ))
    catalogue.append(describe(
        "execute_python",
        "Execute a Python script from the workspace",
        {
            "filepath": string_property("Path to the Python script relative to workspace"),
            "args": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Command line arguments to pass to the script",
                "default": [],
            },
        },
        ["filepath"],
    ))
    catalogue.append(describe(
        "create_directory",
        "Create a new directory in the workspace",
        {"dirpath": string_property("Path to the directory relative to workspace")},
        ["dirpath"],
    ))
    return catalogue
# Required string arguments per tool. The keys double as the set of tool
# names that execute_tool recognises.
_REQUIRED_TOOL_ARGUMENTS = {
    "create_file": ("filepath", "content"),
    "read_file": ("filepath",),
    "update_file": ("filepath", "content"),
    "delete_file": ("filepath",),
    "list_files": (),
    "execute_python": ("filepath",),
    "create_directory": ("dirpath",),
}


def _find_argument_problem(name: str, arguments: dict):
    """Return an error message for the first missing/non-string required argument, else None."""
    for key in _REQUIRED_TOOL_ARGUMENTS[name]:
        if key not in arguments:
            return f"Missing required argument: {key}"
        if not isinstance(arguments[key], str):
            return f"Argument '{key}' must be a string"
    return None


def execute_tool(name: str, arguments: dict) -> dict:
    """Execute a tool and return the result.

    Args:
        name: Tool identifier, one of the names listed in
            ``_REQUIRED_TOOL_ARGUMENTS``.
        arguments: Tool arguments keyed by parameter name. ``None`` is treated
            as an empty mapping.

    Returns:
        A dict that always contains ``"success"``. On failure it carries an
        ``"error"`` string; on success it carries tool-specific keys
        (``message``, ``content``/``filepath``, ``directories``/``files`` or
        ``exit_code``/``stdout``/``stderr``).

    This function does not raise for tool failures: every exception raised
    while running a tool is converted into a ``{"success": False, ...}`` result.
    """
    if not isinstance(name, str) or name not in _REQUIRED_TOOL_ARGUMENTS:
        return {"success": False, "error": f"Unknown tool: {name}"}

    if arguments is None:
        arguments = {}
    elif not isinstance(arguments, dict):
        return {"success": False, "error": "Error: arguments must be an object"}

    # Validate up front so a missing key yields a readable message instead of
    # the bare KeyError text, and so nothing touches the disk on bad input.
    problem = _find_argument_problem(name, arguments)
    if problem is not None:
        return {"success": False, "error": problem}

    try:
        if name == "create_file":
            filepath = get_safe_path(arguments["filepath"])
            filepath.parent.mkdir(parents=True, exist_ok=True)
            filepath.write_text(arguments["content"], encoding="utf-8")
            # Report the on-disk size: len(content) counts characters, which
            # differs from bytes for any non-ASCII text.
            size = filepath.stat().st_size
            return {
                "success": True,
                "message": f"Successfully created file: {arguments['filepath']}\nSize: {size} bytes",
            }

        if name == "read_file":
            filepath = get_safe_path(arguments["filepath"])
            if not filepath.exists():
                return {"success": False, "error": f"File not found: {arguments['filepath']}"}
            return {
                "success": True,
                "content": filepath.read_text(encoding="utf-8"),
                "filepath": arguments["filepath"],
            }

        if name == "update_file":
            filepath = get_safe_path(arguments["filepath"])
            if not filepath.exists():
                return {"success": False, "error": f"File not found: {arguments['filepath']}"}
            filepath.write_text(arguments["content"], encoding="utf-8")
            size = filepath.stat().st_size
            return {
                "success": True,
                "message": f"Successfully updated file: {arguments['filepath']}\nNew size: {size} bytes",
            }

        if name == "delete_file":
            filepath = get_safe_path(arguments["filepath"])
            if not filepath.exists():
                return {"success": False, "error": f"File not found: {arguments['filepath']}"}
            filepath.unlink()
            return {"success": True, "message": f"Successfully deleted file: {arguments['filepath']}"}

        if name == "list_files":
            directory = arguments.get("directory", "")
            # get_safe_path returns resolved paths, so entries must be made
            # relative to the *resolved* root; otherwise relative_to raises
            # whenever the workspace path passes through a symlink.
            workspace_root = WORKSPACE_DIR.resolve()
            dirpath = get_safe_path(directory) if directory else workspace_root
            if not dirpath.exists():
                return {"success": False, "error": f"Directory not found: {directory}"}
            if not dirpath.is_dir():
                return {"success": False, "error": f"Not a directory: {directory}"}
            files = []
            dirs = []
            for item in sorted(dirpath.iterdir()):
                rel_path = str(item.relative_to(workspace_root))
                if item.is_file():
                    files.append({"name": rel_path, "size": item.stat().st_size, "type": "file"})
                else:
                    dirs.append({"name": rel_path, "type": "directory"})
            return {"success": True, "directories": dirs, "files": files}

        if name == "execute_python":
            filepath = get_safe_path(arguments["filepath"])
            # An explicit null for "args" is treated like the documented
            # default of an empty list.
            args = arguments.get("args") or []
            if not isinstance(args, list) or not all(isinstance(a, str) for a in args):
                return {"success": False, "error": "Error: args must be a list of strings"}
            if not filepath.exists():
                return {"success": False, "error": f"Script not found: {arguments['filepath']}"}
            if filepath.suffix != ".py":
                return {"success": False, "error": "File must be a Python script (.py)"}
            try:
                # List form (no shell) so arguments are passed verbatim.
                result = subprocess.run(
                    [sys.executable, str(filepath), *args],
                    capture_output=True,
                    text=True,
                    timeout=30,
                    cwd=str(WORKSPACE_DIR),
                )
            except subprocess.TimeoutExpired:
                return {"success": False, "error": "Script execution timed out after 30 seconds"}
            except Exception as e:
                return {"success": False, "error": f"Error executing script: {str(e)}"}
            # "success" means the script was launched and finished in time;
            # callers inspect exit_code for the script's own outcome.
            return {
                "success": True,
                "exit_code": result.returncode,
                "stdout": result.stdout,
                "stderr": result.stderr,
            }

        # Only "create_directory" remains among the known tool names.
        dirpath = get_safe_path(arguments["dirpath"])
        dirpath.mkdir(parents=True, exist_ok=True)
        return {"success": True, "message": f"Successfully created directory: {arguments['dirpath']}"}
    except UnicodeError as e:
        # UnicodeDecodeError/UnicodeEncodeError subclass ValueError; handle
        # them first so an encoding problem is not reported as a security
        # violation.
        return {"success": False, "error": f"Error: {str(e)}"}
    except ValueError as e:
        return {"success": False, "error": f"Security error: {str(e)}"}
    except Exception as e:
        return {"success": False, "error": f"Error: {str(e)}"}
def _initialize_result() -> dict:
    """Build the MCP ``initialize`` result advertised by this server."""
    return {
        "protocolVersion": "2024-11-05",
        "capabilities": {
            "tools": {}
        },
        "serverInfo": {
            "name": "ai-workspace",
            "version": "1.0.0"
        }
    }


def _jsonrpc_error(msg_id, code: int, message: str) -> dict:
    """Build a JSON-RPC 2.0 error response object."""
    return {
        "jsonrpc": "2.0",
        "id": msg_id,
        "error": {
            "code": code,
            "message": message
        }
    }


def _sse_event(payload: dict) -> str:
    """Serialise *payload* as a single server-sent ``message`` event."""
    return f"event: message\ndata: {json.dumps(payload)}\n\n"


def _header_value(headers: dict, name: str, default: str) -> str:
    """Look up a header case-insensitively, falling back to *default*."""
    wanted = name.lower()
    for key, value in headers.items():
        if isinstance(key, str) and key.lower() == wanted:
            return value
    return default


def _handle_mcp_message(body):
    """Process one MCP JSON-RPC message.

    Args:
        body: Raw request body (``str``/``bytes`` JSON text) or an already
            decoded object.

    Returns:
        The JSON-RPC response dict, or ``None`` when the message is a
        notification and therefore must not be answered.
    """
    try:
        message = json.loads(body) if isinstance(body, (str, bytes, bytearray)) else body
    except ValueError as e:
        # JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses. The request id is unknown here, so it is reported as null.
        return _jsonrpc_error(None, -32700, f"Parse error: {e}")

    if not isinstance(message, dict):
        return _jsonrpc_error(None, -32600, "Invalid Request: expected a JSON object")

    method_name = message.get('method', '')
    if (
        'id' not in message
        and isinstance(method_name, str)
        and method_name.startswith('notifications/')
    ):
        # JSON-RPC notifications (e.g. "notifications/initialized") never get
        # a reply; answering them with an error confuses clients.
        return None

    # Requests that omit an id keep the historical default of 1.
    msg_id = message.get('id', 1)

    try:
        if method_name == 'tools/list':
            return {
                "jsonrpc": "2.0",
                "id": msg_id,
                "result": {
                    "tools": get_tools()
                }
            }

        if method_name == 'tools/call':
            # "params"/"arguments" may be absent or an explicit null.
            params = message.get('params') or {}
            if not isinstance(params, dict):
                return _jsonrpc_error(msg_id, -32602, "Invalid params: expected an object")
            result = execute_tool(params.get('name'), params.get('arguments') or {})

            succeeded = bool(result.get('success'))
            if not succeeded:
                content_text = f"Error: {result.get('error', 'Unknown error')}"
            elif result.get('message') is not None:
                content_text = result['message']
            elif result.get('content') is not None:
                # Checked with "is not None" so an empty file is returned as
                # empty text rather than falling through to the JSON dump.
                content_text = result['content']
            else:
                content_text = json.dumps(result)

            return {
                "jsonrpc": "2.0",
                "id": msg_id,
                "result": {
                    "content": [
                        {
                            "type": "text",
                            "text": content_text
                        }
                    ],
                    # Lets MCP clients tell a failed tool call from a
                    # successful one without parsing the text.
                    "isError": not succeeded
                }
            }

        if method_name == 'initialize':
            return {
                "jsonrpc": "2.0",
                "id": msg_id,
                "result": _initialize_result()
            }

        return _jsonrpc_error(msg_id, -32601, f"Method not found: {method_name}")
    except Exception as e:
        return _jsonrpc_error(msg_id, -32603, str(e))


def handler(request):
    """Main Vercel handler function with MCP SSE support.

    Args:
        request: Mapping describing the HTTP request. The keys read are
            ``method``, ``path``, ``headers`` and ``body``.

    Returns:
        A dict with ``statusCode``, ``headers`` and ``body`` (a string).

    Routes:
        * ``OPTIONS`` (any path): CORS preflight reply.
        * paths ending in ``/sse``: MCP endpoint. ``GET`` or an empty body
          yields the initialization event; otherwise the body is handled as a
          JSON-RPC message and the reply is sent as one SSE event.
          Notifications are acknowledged with ``202`` and no body.
        * ``/.well-known/mcp.json``: discovery document.
        * ``GET /``, ``/health``, ``/api/mcp``, ``/tools`` and
          ``POST /execute``, ``/api/mcp/execute``: plain JSON test endpoints.
    """
    # Get request details
    method = request.get('method', 'GET')
    path = request.get('path', '/')
    headers_in = request.get('headers') or {}

    # Common CORS headers for all responses
    # NOTE(review): browsers do not honour a wildcard origin on credentialed
    # requests, so Allow-Credentials probably has no effect alongside '*' -
    # confirm before relying on cookies or Authorization from a browser.
    cors_headers = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
        'Access-Control-Allow-Headers': 'Content-Type, Accept, Authorization, X-Requested-With',
        'Access-Control-Allow-Credentials': 'true',
        'Access-Control-Max-Age': '86400',
    }

    # Handle OPTIONS/preflight for CORS
    if method == 'OPTIONS':
        return {
            'statusCode': 200,
            'headers': cors_headers,
            'body': ''
        }

    # MCP SSE endpoint - this is what ChatGPT Apps connects to.
    # endswith('/sse') also covers the exact '/sse' and '/api/mcp/sse' paths.
    if path.endswith('/sse'):
        sse_headers = {
            **cors_headers,
            'Content-Type': 'text/event-stream; charset=utf-8',
            'Cache-Control': 'no-cache, no-transform',
            'Connection': 'keep-alive',
            'X-Accel-Buffering': 'no',
        }

        body = request.get('body', '')

        # If GET or no body, send initialization
        if method == 'GET' or not body:
            init_message = {
                "jsonrpc": "2.0",
                "id": 1,
                "result": _initialize_result()
            }
            return {
                'statusCode': 200,
                'headers': sse_headers,
                'body': _sse_event(init_message)
            }

        reply = _handle_mcp_message(body)
        if reply is None:
            # Notification: acknowledge without a JSON-RPC response.
            return {
                'statusCode': 202,
                'headers': cors_headers,
                'body': ''
            }
        return {
            'statusCode': 200,
            'headers': sse_headers,
            'body': _sse_event(reply)
        }

    # Regular HTTP endpoints (for testing) and discovery share these headers
    json_headers = {
        **cors_headers,
        'Content-Type': 'application/json',
    }

    # MCP discovery endpoint
    if path == '/.well-known/mcp.json':
        host = _header_value(headers_in, 'host', 'your-project.vercel.app')
        return {
            'statusCode': 200,
            'headers': json_headers,
            'body': json.dumps({
                "mcpServers": {
                    "ai-workspace": {
                        "url": f"https://{host}/sse",
                        "transport": "sse"
                    }
                }
            })
        }

    if method == 'GET':
        if path in ('/', '/health', '/api/mcp'):
            response_data = {
                "name": "AI Workspace MCP Server",
                "version": "1.0.0",
                "status": "running",
                "mcp_sse_endpoint": "/sse",
                "discovery": "/.well-known/mcp.json",
                "endpoints": {
                    "health": "/health",
                    "tools": "/tools",
                    "execute": "/execute (POST)",
                    "mcp_sse": "/sse (for ChatGPT Apps)"
                },
                "note": "This endpoint is public and requires no authentication"
            }
            return {
                'statusCode': 200,
                'headers': json_headers,
                'body': json.dumps(response_data)
            }
        if path == '/tools':
            return {
                'statusCode': 200,
                'headers': json_headers,
                'body': json.dumps({"tools": get_tools()})
            }
        return {
            'statusCode': 404,
            'headers': json_headers,
            'body': json.dumps({"error": "Not found"})
        }

    # Handle POST requests (legacy REST API)
    if method == 'POST':
        if path not in ('/execute', '/api/mcp/execute'):
            return {
                'statusCode': 404,
                'headers': json_headers,
                'body': json.dumps({"error": "Not found"})
            }
        try:
            body = request.get('body', '{}')
            if isinstance(body, (str, bytes, bytearray)):
                request_data = json.loads(body)
            else:
                request_data = body

            if not isinstance(request_data, dict):
                return {
                    'statusCode': 400,
                    'headers': json_headers,
                    'body': json.dumps({"error": "Request body must be a JSON object"})
                }

            tool_name = request_data.get("tool")
            # An explicit null for "arguments" is treated as no arguments.
            arguments = request_data.get("arguments") or {}

            if not tool_name:
                return {
                    'statusCode': 400,
                    'headers': json_headers,
                    'body': json.dumps({"error": "Missing 'tool' parameter"})
                }

            result = execute_tool(tool_name, arguments)
            return {
                'statusCode': 200,
                'headers': json_headers,
                'body': json.dumps(result)
            }
        except json.JSONDecodeError:
            return {
                'statusCode': 400,
                'headers': json_headers,
                'body': json.dumps({"error": "Invalid JSON"})
            }
        except Exception as e:
            return {
                'statusCode': 500,
                'headers': json_headers,
                'body': json.dumps({"error": str(e)})
            }

    return {
        'statusCode': 405,
        'headers': json_headers,
        'body': json.dumps({"error": "Method not allowed"})
    }