Skip to main content
Glama

Supabase MCP HTTP Stream Server

by BIFROTEK-com
Apache 2.0
4
mcp-http-server.js (29 kB)
#!/usr/bin/env node
/**
 * HTTP bridge for the Supabase MCP stdio server.
 *
 * Spawns the stdio MCP server as a child process and exposes it over:
 *   - POST/DELETE /mcp      (MCP Streamable HTTP transport, batch JSON responses)
 *   - GET/POST    /sse      (legacy SSE transport, kept for older n8n versions)
 *   - POST        /messages (legacy SSE message endpoint, addressed by sessionId)
 *   - POST        /tools    (simple tools/list helper)
 *   - GET         /, /health, /ping (status endpoints, never authenticated)
 *
 * Environment variables read here: MCP_PORT, MCP_RESPONSE_MODE, MCP_BATCH_TIMEOUT,
 * MCP_MAX_MESSAGE_SIZE, MCP_SESSION_ENABLED, MCP_ALLOW_CLIENT_TERMINATION,
 * MCP_RESUMABILITY_ENABLED, MCP_HISTORY_DURATION, MCP_SESSION_TTL,
 * MCP_RATE_LIMIT_REQUESTS, MCP_RATE_LIMIT_GENERAL, MCP_API_KEYS,
 * MCP_ALLOWED_ORIGINS, MCP_FEATURES, MCP_READ_ONLY, SUPABASE_PROJECT_REF,
 * SUPABASE_ACCESS_TOKEN, EXPRESS_TRUST_PROXY, NODE_ENV.
 */

const express = require('express');
const { spawn } = require('child_process');
const cors = require('cors');
const rateLimit = require('express-rate-limit');
const helmet = require('helmet');
const crypto = require('crypto');

const app = express();
const port = process.env.MCP_PORT || 3333;

/**
 * Read an integer environment variable.
 *
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Value used when the variable is unset, empty or not a number.
 * @returns {number} The parsed base-10 integer, or `fallback`.
 */
function parseIntEnv(name, fallback) {
  const parsed = Number.parseInt(process.env[name] || '', 10);
  // A NaN here would silently turn timers into "fire immediately".
  return Number.isNaN(parsed) ? fallback : parsed;
}

// MCP HTTP Stream Transport Configuration
const mcpConfig = {
  responseMode: process.env.MCP_RESPONSE_MODE || 'batch', // 'batch' or 'stream'
  batchTimeout: parseIntEnv('MCP_BATCH_TIMEOUT', 30000), // 30 seconds; used as the backend request timeout
  maxMessageSize: process.env.MCP_MAX_MESSAGE_SIZE || '4mb',
  session: {
    enabled: process.env.MCP_SESSION_ENABLED !== 'false', // default true
    headerName: 'Mcp-Session-Id',
    allowClientTermination: process.env.MCP_ALLOW_CLIENT_TERMINATION !== 'false', // default true
  },
  resumability: {
    enabled: process.env.MCP_RESUMABILITY_ENABLED === 'true', // default false
    historyDuration: parseIntEnv('MCP_HISTORY_DURATION', 300000), // 5 minutes
  },
};

// Idle time after which a tracked /mcp session is forgotten (default: 1 hour).
// An expired id is simply re-created on its next request, so expiry is harmless.
const sessionIdleTtlMs = parseIntEnv('MCP_SESSION_TTL', 60 * 60 * 1000);

console.log('🔧 MCP Configuration:', {
  responseMode: mcpConfig.responseMode,
  sessionEnabled: mcpConfig.session.enabled,
  resumabilityEnabled: mcpConfig.resumability.enabled,
});

// Configure Express trust proxy for ngrok/proxy environments
if (process.env.EXPRESS_TRUST_PROXY === '1' || process.env.NODE_ENV === 'production') {
  app.set('trust proxy', 1);
  console.log('✅ Express trust proxy enabled for production/proxy environment');
}

// Security: basic hardening via helmet
app.use(
  helmet({
    contentSecurityPolicy: {
      directives: {
        defaultSrc: ["'self'"],
        scriptSrc: ["'self'"],
        styleSrc: ["'self'", "'unsafe-inline'"],
      },
    },
    crossOriginEmbedderPolicy: false, // Allow iframe embedding for testing
  })
);

/**
 * Build a rate limiter that answers with a JSON error body.
 *
 * @param {number} windowMs - Length of the counting window in milliseconds.
 * @param {number} max - Maximum number of requests per window.
 * @param {string} message - Human readable error text returned when limited.
 * @returns {Function} Express middleware.
 */
const createRateLimiter = (windowMs, max, message) =>
  rateLimit({
    windowMs,
    max,
    message: {
      error: message,
      retryAfter: `${Math.ceil(windowMs / 1000)} seconds`,
    },
    standardHeaders: true,
    legacyHeaders: false,
    // Health checks must never be rate limited.
    skip: (req) => req.path === '/health',
  });

// Different rate limits for different endpoints
const mcpLimiter = createRateLimiter(
  15 * 60 * 1000, // 15 minutes
  parseIntEnv('MCP_RATE_LIMIT_REQUESTS', 100), // 100 requests per 15min
  'Too many MCP requests. This server is rate limited to prevent abuse.'
);

const generalLimiter = createRateLimiter(
  60 * 1000, // 1 minute
  parseIntEnv('MCP_RATE_LIMIT_GENERAL', 60), // 60 requests per minute
  'Too many requests. Please slow down.'
);

// Apply rate limiting
app.use(generalLimiter);
app.use('/mcp', mcpLimiter);

// Paths that are reachable without an API key.
const PUBLIC_PATHS = new Set(['/health', '/', '/ping']);

// The "server is open" warning is printed once instead of on every request.
let openAccessWarningShown = false;

/**
 * Pull the API key out of the request headers.
 *
 * Accepts `X-API-Key`, `Api-Key` or `Authorization: Bearer <key>`.
 *
 * @param {object} req - Express request.
 * @returns {string|undefined} The presented key, if any.
 */
function extractApiKey(req) {
  const headerKey = req.headers['x-api-key'] || req.headers['api-key'];
  if (headerKey) {
    return headerKey;
  }
  const { authorization } = req.headers;
  if (!authorization) {
    return undefined;
  }
  // The auth scheme is case-insensitive; strip it only at the start of the value.
  return authorization.replace(/^Bearer\s+/i, '').trim();
}

/**
 * Compare a presented key against the configured keys without leaking
 * timing information. Both sides are hashed so the buffers have equal length,
 * and every configured key is always checked.
 *
 * @param {string} providedKey - Key sent by the client.
 * @param {string[]} validApiKeys - Keys configured via MCP_API_KEYS.
 * @returns {boolean} True when the key matches one of the configured keys.
 */
function isValidApiKey(providedKey, validApiKeys) {
  const providedDigest = crypto.createHash('sha256').update(String(providedKey)).digest();
  let matched = false;
  for (const candidate of validApiKeys) {
    const candidateDigest = crypto.createHash('sha256').update(candidate).digest();
    if (crypto.timingSafeEqual(providedDigest, candidateDigest)) {
      matched = true;
    }
  }
  return matched;
}

/**
 * API key authentication middleware (optional but recommended).
 *
 * Public paths are always allowed. When MCP_API_KEYS is not configured the
 * server stays open and a warning is logged. Otherwise a missing key yields
 * 401 and a wrong key yields 403.
 */
function authenticateAPIKey(req, res, next) {
  // Skip auth for health and status checks and landing page
  if (PUBLIC_PATHS.has(req.path)) {
    return next();
  }

  // Check if API keys are configured
  const validApiKeys =
    process.env.MCP_API_KEYS?.split(',')
      .map((key) => key.trim())
      .filter(Boolean) || [];

  if (validApiKeys.length === 0) {
    // No API keys configured - log warning but allow access
    if (!openAccessWarningShown) {
      openAccessWarningShown = true;
      console.warn('⚠️ WARNING: MCP_API_KEYS not configured. Server is open to public access!');
      console.warn(' For production use, set MCP_API_KEYS="your-secret-key1,your-secret-key2"');
    }
    return next();
  }

  // API keys are configured - require authentication
  const providedKey = extractApiKey(req);
  if (!providedKey) {
    return res.status(401).json({
      error: 'Authentication required',
      message: 'Include X-API-Key header or Authorization: Bearer <key>',
      hint: 'Contact server administrator for API key',
    });
  }

  if (!isValidApiKey(providedKey, validApiKeys)) {
    // Never log any part of the presented key: a near-miss is secret material.
    console.warn(`🚨 Unauthorized access attempt from ${req.ip}`);
    return res.status(403).json({
      error: 'Invalid API key',
      message: 'The provided API key is not valid',
    });
  }

  console.log(`✅ Authenticated request from ${req.ip}`);
  return next();
}

// Substrings that cause a method name to be rejected on /mcp.
const BLOCKED_METHOD_FRAGMENTS = ['eval', 'exec', 'system', 'file/write', 'file/delete'];

/**
 * Basic JSON-RPC 2.0 validation for POST requests on /mcp.
 *
 * Accepts requests (string or number id, including 0) and notifications
 * (no id member). Batches, null ids and non-string methods are rejected.
 */
function validateMCPRequest(req, res, next) {
  if (req.method !== 'POST') {
    return next();
  }

  const { body } = req;
  if (!body || typeof body !== 'object') {
    return res.status(400).json({
      error: 'Invalid request',
      message: 'Request body must be valid JSON',
    });
  }

  const hasValidMethod = typeof body.method === 'string' && body.method.length > 0;
  // `!body.id` would reject the valid id 0 and every notification.
  const hasValidId =
    body.id === undefined || typeof body.id === 'string' || typeof body.id === 'number';

  if (body.jsonrpc !== '2.0' || !hasValidMethod || !hasValidId) {
    return res.status(400).json({
      error: 'Invalid JSON-RPC request',
      message: 'Request must follow JSON-RPC 2.0 specification',
    });
  }

  // Prevent potentially dangerous methods (if any)
  const loweredMethod = body.method.toLowerCase();
  if (BLOCKED_METHOD_FRAGMENTS.some((fragment) => loweredMethod.includes(fragment))) {
    console.warn(`🚨 Blocked potentially dangerous method: ${body.method} from ${req.ip}`);
    return res.status(403).json({
      error: 'Method not allowed',
      message: 'This method is not permitted for security reasons',
    });
  }

  return next();
}

// CORS with security considerations
app.use(
  cors({
    origin(origin, callback) {
      // Allow requests with no origin (mobile apps, curl, postman, etc.)
      if (!origin) {
        return callback(null, true);
      }

      // Check if specific origins are configured
      const allowedOrigins = process.env.MCP_ALLOWED_ORIGINS?.split(',').map((o) => o.trim()) || ['*'];

      if (allowedOrigins.includes('*') || allowedOrigins.includes(origin)) {
        return callback(null, true);
      }

      console.warn(`🚨 Blocked CORS request from unauthorized origin: ${origin}`);
      return callback(new Error('Not allowed by CORS'));
    },
    credentials: true,
    optionsSuccessStatus: 200,
  })
);

// Middleware order matters - auth and validation after CORS.
// NOTE(review): the body limit is fixed at 1mb while mcpConfig.maxMessageSize
// defaults to 4mb and is unused; left as-is so the effective limit does not change.
app.use(express.json({ limit: '1mb' })); // Limit payload size
app.use(authenticateAPIKey);
app.use('/mcp', validateMCPRequest);

// Security headers for responses
app.use((req, res, next) => {
  res.setHeader('X-Content-Type-Options', 'nosniff');
  res.setHeader('X-Frame-Options', 'DENY');
  res.setHeader('X-XSS-Protection', '1; mode=block');
  res.setHeader('Referrer-Policy', 'strict-origin-when-cross-origin');
  next();
});

// MCP Server Process
let mcpProcess = null;
let mcpReady = false;

// SSE connections for MCP (backward compatibility): sessionId -> response stream
const sseClients = new Map();

// Session management for Streamable HTTP: sessionId -> session record
const sessions = new Map();

// In-flight backend requests: internal id -> { resolve, reject, sessionId, originalId, timer }
const pendingRequests = new Map();

// Source of bridge-unique ids used on the stdio side (see sendMCPRequest).
let nextInternalRequestId = 1;

// Buffer holding the trailing, not yet newline-terminated part of the child's stdout
let responseBuffer = '';

/**
 * Generate a random, unguessable session id.
 *
 * @returns {string} 32 hex characters (16 random bytes).
 */
function generateSecureSessionId() {
  return crypto.randomBytes(16).toString('hex');
}

// Drop sessions that have been idle for longer than the TTL so the map cannot
// grow without bound (every /mcp call without a session header creates one).
const sessionSweeper = setInterval(() => {
  const cutoff = Date.now() - sessionIdleTtlMs;
  for (const [id, session] of sessions) {
    if (Date.parse(session.lastActivity) < cutoff) {
      sessions.delete(id);
    }
  }
}, Math.max(1000, Math.min(sessionIdleTtlMs, 60 * 1000)));
sessionSweeper.unref(); // never keep the process alive just for housekeeping

/**
 * Fail every in-flight backend request, e.g. because the child exited.
 *
 * @param {string} reason - Error message passed to each caller.
 */
function rejectAllPending(reason) {
  for (const pending of pendingRequests.values()) {
    clearTimeout(pending.timer);
    pending.reject(new Error(reason));
  }
  pendingRequests.clear();
}

/**
 * Spawn the stdio MCP server, wire up its streams and send `initialize`
 * one second after start.
 */
function startMCPServer() {
  const args = ['packages/mcp-server-supabase/dist/transports/stdio.js'];

  // Add command line arguments from environment
  if (process.env.SUPABASE_PROJECT_REF) {
    args.push(`--project-ref=${process.env.SUPABASE_PROJECT_REF}`);
  }
  if (process.env.SUPABASE_ACCESS_TOKEN) {
    args.push(`--access-token=${process.env.SUPABASE_ACCESS_TOKEN}`);
  }
  if (process.env.MCP_FEATURES) {
    args.push(`--features=${process.env.MCP_FEATURES}`);
  }
  if (process.env.MCP_READ_ONLY === 'true') {
    args.push('--read-only');
  }

  // The access token is a secret; never print it.
  const loggableArgs = args.map((arg) =>
    arg.startsWith('--access-token=') ? '--access-token=[REDACTED]' : arg
  );
  console.log('Starting MCP server with args:', loggableArgs);

  const child = spawn('node', args, {
    stdio: ['pipe', 'pipe', 'pipe'],
    env: { ...process.env },
  });
  mcpProcess = child;
  let spawnFailed = false;

  // Decode as a stream so multi-byte characters split across chunks survive.
  child.stdout.setEncoding('utf8');
  child.stdout.on('data', (text) => {
    console.log('MCP stdout:', text);

    // Messages are newline-delimited JSON; keep the unterminated tail for later.
    responseBuffer += text;
    const lines = responseBuffer.split('\n');
    responseBuffer = lines.pop() || '';

    for (const line of lines) {
      const trimmed = line.trim();
      if (!trimmed) {
        continue;
      }
      let message;
      try {
        message = JSON.parse(trimmed);
      } catch (err) {
        // A complete line that is not JSON will never become JSON. Re-queuing
        // it would make the buffer grow and be re-parsed forever.
        console.warn('Ignoring non-JSON line on MCP stdout:', err.message);
        continue;
      }
      handleMCPResponse(message);
    }
  });

  child.stderr.on('data', (data) => {
    console.error('MCP stderr:', data.toString());
  });

  // Without a listener an EPIPE on stdin would crash the whole HTTP server.
  child.stdin.on('error', (err) => {
    console.error('MCP stdin error:', err.message);
  });

  child.on('close', (code) => {
    console.log('MCP process exited with code:', code);
    mcpReady = false;
    if (mcpProcess === child) {
      mcpProcess = null;
    }
    // Callers should not have to wait for the timeout once the backend is gone.
    rejectAllPending('MCP server exited');
  });

  child.on('error', (err) => {
    console.error('MCP process error:', err);
    spawnFailed = true;
    mcpReady = false;
  });

  // Initialize the MCP server
  setTimeout(() => {
    const childIsUsable =
      !spawnFailed && mcpProcess === child && child.exitCode === null && child.stdin.writable;
    if (!childIsUsable) {
      console.error('MCP process is not running; skipping initialization');
      return;
    }

    const initMessage = {
      jsonrpc: '2.0',
      id: 'init',
      method: 'initialize',
      params: {
        protocolVersion: '2024-11-05',
        capabilities: {},
        clientInfo: {
          name: 'supabase-mcp-http-server',
          version: '1.0.0',
        },
      },
    };

    child.stdin.write(`${JSON.stringify(initMessage)}\n`);
    mcpReady = true;
    console.log('MCP server initialized');
  }, 1000);
}

/**
 * Route one parsed message from the child's stdout to the waiting caller.
 * Messages without a matching pending request (the reply to `init`,
 * server-side notifications) are ignored.
 *
 * @param {object} response - Parsed JSON-RPC message.
 */
function handleMCPResponse(response) {
  if (!response || typeof response !== 'object') {
    return;
  }

  const pending = pendingRequests.get(response.id);
  if (!pending) {
    return;
  }

  pendingRequests.delete(response.id);
  clearTimeout(pending.timer);
  // Give the caller back the id it originally sent.
  pending.resolve({ ...response, id: pending.originalId });
}

/**
 * Send a JSON-RPC request to the MCP child process and wait for its reply.
 *
 * The client-supplied id is swapped for a bridge-unique id on the stdio side
 * and restored on the reply, so concurrent clients that all use e.g. `id: 1`
 * cannot receive each other's responses.
 *
 * @param {object} request - JSON-RPC request as received from the client.
 * @param {string|null} [sessionId=null] - Originating session, kept with the
 *   pending entry for diagnostics.
 * @returns {Promise<object>} The backend's JSON-RPC response carrying the
 *   caller's original id. Rejects with 'MCP server not ready',
 *   'Request timeout' or 'MCP server exited'.
 */
function sendMCPRequest(request, sessionId = null) {
  return new Promise((resolve, reject) => {
    if (!mcpProcess || !mcpReady) {
      reject(new Error('MCP server not ready'));
      return;
    }

    const internalId = `http-${nextInternalRequestId}`;
    nextInternalRequestId += 1;

    const timer = setTimeout(() => {
      if (pendingRequests.delete(internalId)) {
        console.warn(`MCP request ${internalId} timed out (session: ${sessionId})`);
        reject(new Error('Request timeout'));
      }
    }, mcpConfig.batchTimeout);

    pendingRequests.set(internalId, {
      resolve,
      reject,
      sessionId,
      originalId: request.id,
      timer,
    });

    mcpProcess.stdin.write(`${JSON.stringify({ ...request, id: internalId })}\n`);
  });
}

/**
 * Forward a JSON-RPC notification (a message without an id) to the backend.
 * Notifications never get a reply, so nothing is awaited.
 *
 * @param {object} notification - JSON-RPC notification.
 * @throws {Error} 'MCP server not ready' when the child is not available.
 */
function sendMCPNotification(notification) {
  if (!mcpProcess || !mcpReady) {
    throw new Error('MCP server not ready');
  }
  mcpProcess.stdin.write(`${JSON.stringify(notification)}\n`);
}

/**
 * Tell whether a body is a JSON-RPC notification (has a method, has no id).
 *
 * @param {*} message - Parsed request body.
 * @returns {boolean}
 */
function isNotification(message) {
  return (
    Boolean(message) &&
    typeof message === 'object' &&
    typeof message.method === 'string' &&
    message.id === undefined
  );
}

/**
 * Write one SSE `data:` event to every connected SSE client, dropping
 * clients whose stream can no longer be written.
 *
 * @param {object} payload - Value serialised as the event data.
 */
function broadcastToSseClients(payload) {
  const frame = `data: ${JSON.stringify(payload)}\n\n`;
  sseClients.forEach((client, clientId) => {
    try {
      client.write(frame);
    } catch (error) {
      sseClients.delete(clientId);
    }
  });
}

/**
 * Write one SSE `data:` event to a single client if it is still connected.
 *
 * @param {string} sessionId - Id handed out by GET /sse.
 * @param {object} payload - Value serialised as the event data.
 */
function writeToSseClient(sessionId, payload) {
  const client = sseClients.get(sessionId);
  if (client && !client.writableEnded) {
    client.write(`data: ${JSON.stringify(payload)}\n\n`);
  }
}

// Routes

// Landing page with live status dashboard
app.get('/', (req, res) => {
  const hasApiKeys = !!process.env.MCP_API_KEYS?.split(',').filter(Boolean).length;
  const rateLimitMcp = process.env.MCP_RATE_LIMIT_REQUESTS || '100';
  const rateLimitGeneral = process.env.MCP_RATE_LIMIT_GENERAL || '60';
  const allowedOrigins = process.env.MCP_ALLOWED_ORIGINS || '*';
  const features = process.env.MCP_FEATURES || 'database,docs,development,functions';
  const readOnly = process.env.MCP_READ_ONLY === 'true';
  const corsRestricted = allowedOrigins !== '*';

  // Security status calculation
  const securityFeatures = {
    api_keys: hasApiKeys,
    rate_limiting: true,
    cors_protection: corsRestricted,
    security_headers: true,
    request_validation: true,
  };
  const securityScore = Object.values(securityFeatures).filter(Boolean).length;
  const maxSecurityScore = Object.keys(securityFeatures).length;

  let overallSecurityStatus = '🔴 vulnerable';
  if (securityScore >= 4) {
    overallSecurityStatus = '🔒 secure';
  } else if (securityScore >= 2) {
    overallSecurityStatus = '🟡 basic protection';
  }

  const jsonHeaders = { 'Content-Type': 'application/json', Accept: 'application/json' };

  res.setHeader('Content-Type', 'application/json');
  res.json({
    service: '🐳 Supabase MCP Server',
    description: 'Multi-transport Model Context Protocol server for Supabase',
    version: '1.0.0',

    // 🚦 Live status indicators
    status: {
      http_server: '🟢 online',
      mcp_backend: mcpReady ? '🟢 ready' : '🟡 initializing',
      supabase_connection: mcpReady ? '🟢 connected' : '🔴 disconnected',
      overall: mcpReady ? '🟢 fully operational' : '🟡 starting up',
      timestamp: new Date().toISOString(),
    },

    // 🔄 Multi-transport support
    transports: {
      http_streamable: '🚀 POST /mcp (MCP HTTP Streamable Transport - RECOMMENDED)',
      server_sent_events: '🌊 GET/POST /sse (Legacy n8n compatibility)',
    },

    // 📍 Available endpoints
    endpoints: {
      api_dashboard: '📖 GET / (this page)',
      health_check: '❤️ GET /health',
      mcp_streamable: '🚀 POST /mcp (HTTP Streamable)',
      sse_legacy: '🌊 GET/POST /sse (Legacy SSE)',
    },

    // 🔒 Security dashboard
    security: {
      overall_status: overallSecurityStatus,
      security_score: `${securityScore}/${maxSecurityScore}`,
      features: {
        '🔑 API Authentication': hasApiKeys ? '🟢 enabled' : '🔴 DISABLED (open access!)',
        '⏱️ Rate Limiting': `🟢 enabled (${rateLimitMcp}/15min, ${rateLimitGeneral}/1min)`,
        '🌐 CORS Protection': corsRestricted ? '🟢 restricted origins' : '🟡 permissive (*)',
        '🛡️ Security Headers': '🟢 helmet.js active',
        '✅ Input Validation': '🟢 JSON-RPC validation',
      },
    },

    // ⚙️ Configuration info
    configuration: {
      supabase_features: features.split(','),
      read_only_mode: readOnly ? '🔒 enabled (safe)' : '🔓 disabled (full access)',
      environment: process.env.NODE_ENV || 'development',
      port,
    },

    // 📝 Quick start examples
    examples: {
      list_available_tools: {
        method: 'POST',
        url: '/mcp',
        headers: hasApiKeys ? { 'X-API-Key': 'your-api-key', ...jsonHeaders } : { ...jsonHeaders },
        body: {
          jsonrpc: '2.0',
          id: 1,
          method: 'tools/list',
        },
      },
      http_streamable_session: {
        method: 'POST',
        url: '/mcp',
        headers: {
          ...jsonHeaders,
          'Mcp-Session-Id': 'your-session-id', // Optional, server will create one if not provided
        },
        body: {
          jsonrpc: '2.0',
          id: 1,
          method: 'tools/list',
        },
        description: 'Server responds with Mcp-Session-Id header for session tracking',
      },
      health_check: {
        method: 'GET',
        url: '/health',
        auth_required: false,
        expected_response: { status: 'ok', mcpReady: true },
      },
      sse_connection_legacy: {
        method: 'GET',
        url: '/sse',
        headers: {
          Accept: 'text/event-stream',
          'Cache-Control': 'no-cache',
        },
        auth_required: hasApiKeys,
        description: 'Legacy SSE connection for older n8n versions',
      },
    },

    // 🔗 Useful links
    links: {
      documentation: 'https://github.com/Silverstar187/supabase-mcp-docker',
      mcp_specification: 'https://modelcontextprotocol.io',
      supabase_platform: 'https://supabase.com',
    },

    // ⚠️ Warnings and recommendations
    warnings: [
      ...(hasApiKeys
        ? []
        : [
            '⚠️ SECURITY WARNING: No API keys configured!',
            '💡 Set MCP_API_KEYS environment variable for production',
          ]),
      ...(corsRestricted
        ? []
        : ['⚠️ CORS is permissive - consider restricting origins for production']),
      ...(readOnly
        ? []
        : ['⚠️ Read-only mode disabled - server has full write access to Supabase']),
    ].filter(Boolean),
  });
});

// Health check
app.get('/health', (req, res) => {
  res.status(200).json({
    status: 'ok',
    mcpReady,
    timestamp: new Date().toISOString(),
  });
});

// Simple ping endpoint for load balancers
app.get('/ping', (req, res) => {
  res.status(200).send('pong');
});

// SSE endpoint, specifically for the n8n MCP Client Tool node
app.get('/sse', (req, res) => {
  console.log('🌊 SSE connection request from:', req.ip);

  // N8n-compatible SSE headers with additional specifications
  res.writeHead(200, {
    'Content-Type': 'text/event-stream; charset=utf-8',
    'Cache-Control': 'no-cache, no-store, must-revalidate',
    Connection: 'keep-alive',
    Pragma: 'no-cache',
    Expires: '0',
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Headers':
      'Content-Type, Authorization, Cache-Control, X-API-Key, Accept, Last-Event-ID',
    'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
    'Access-Control-Expose-Headers': 'Content-Type, Cache-Control',
    'X-Accel-Buffering': 'no', // Disable nginx buffering
    'X-Content-Type-Options': 'nosniff',
  });

  const clientId = generateSecureSessionId();
  sseClients.set(clientId, res);

  // Send N8n-compatible initial event (simple format)
  res.write('event: endpoint\n');
  res.write(`data: /messages?sessionId=${clientId}\n\n`);

  // Send simplified connection confirmation
  res.write(
    `data: ${JSON.stringify({ type: 'ready', status: 'connected', session: clientId })}\n\n`
  );

  // Keep-alive ping every 15 seconds
  const keepAlive = setInterval(() => {
    try {
      res.write(
        `data: ${JSON.stringify({ type: 'ping', timestamp: new Date().toISOString() })}\n\n`
      );
    } catch (error) {
      clearInterval(keepAlive);
      sseClients.delete(clientId);
    }
  }, 15000);

  // Handle client disconnect
  req.on('close', () => {
    clearInterval(keepAlive);
    sseClients.delete(clientId);
    console.log(`🌊 SSE client ${clientId} disconnected`);
  });

  req.on('error', () => {
    clearInterval(keepAlive);
    sseClients.delete(clientId);
  });
});

// SSE message endpoint for n8n POST requests.
// NOTE(review): this legacy endpoint has no session association, so replies
// are broadcast to every connected SSE client, as before.
app.post('/sse', async (req, res) => {
  console.log('🌊 SSE POST request from:', req.ip, 'Body:', req.body);

  try {
    // Notifications have no reply; waiting for one would only hit the timeout.
    if (isNotification(req.body)) {
      sendMCPNotification(req.body);
      res.status(202).end();
      return;
    }

    const response = await sendMCPRequest(req.body);

    // Send response to all SSE clients
    broadcastToSseClients(response);

    // Also send regular HTTP response
    res.json(response);
  } catch (error) {
    const errorResponse = {
      jsonrpc: '2.0',
      id: req.body?.id || null,
      error: {
        code: -32603,
        message: error.message,
      },
    };

    // Send error to SSE clients
    broadcastToSseClients(errorResponse);

    res.status(500).json(errorResponse);
  }
});

// Enhanced SSE messages endpoint with sessionId support (n8n compatibility)
app.post('/messages', async (req, res) => {
  const sessionId = req.query.sessionId;

  if (!sessionId || !sseClients.has(sessionId)) {
    return res.status(400).json({
      jsonrpc: '2.0',
      id: req.body?.id || null,
      error: {
        code: -32000,
        message: 'Invalid or missing session ID',
      },
    });
  }

  console.log('📤 SSE Message received for session:', sessionId, req.body);

  try {
    const { jsonrpc, id, method, params } = req.body || {};

    if (jsonrpc !== '2.0') {
      throw new Error('Invalid JSON-RPC version');
    }

    let response;
    if (method === 'initialize') {
      // Answered locally; the backend was already initialised at startup.
      response = {
        jsonrpc: '2.0',
        id,
        result: {
          protocolVersion: '2024-11-05',
          capabilities: {
            tools: true,
            resources: false,
            prompts: false,
          },
          serverInfo: {
            name: 'supabase-mcp-http-server',
            version: '1.0.0',
          },
        },
      };
    } else if (method === 'ping') {
      response = {
        jsonrpc: '2.0',
        id,
        result: { message: 'pong', timestamp: new Date().toISOString() },
      };
    } else {
      // `arguments` is optional: tools without parameters are valid calls.
      if (method === 'tools/call' && !params?.name) {
        throw new Error('Missing tool name');
      }

      // Notifications (e.g. notifications/initialized) get no reply.
      if (isNotification(req.body)) {
        sendMCPNotification(req.body);
        return res.status(202).end();
      }

      // tools/list, tools/call and unknown methods are forwarded to the backend.
      const backendResponse = await sendMCPRequest(req.body, sessionId);
      // Pass backend errors through instead of turning them into an empty result.
      response =
        backendResponse.error !== undefined
          ? { jsonrpc: '2.0', id, error: backendResponse.error }
          : { jsonrpc: '2.0', id, result: backendResponse.result };
    }

    // Send response via HTTP (n8n expects this!)
    res.json(response);

    // Also send via SSE to the connected client (exactly once)
    writeToSseClient(sessionId, response);
    return undefined;
  } catch (error) {
    console.error('❌ SSE Message error:', error);

    const errorResponse = {
      jsonrpc: '2.0',
      id: req.body?.id || null,
      error: {
        code: -32000,
        message: error instanceof Error ? error.message : 'Unknown error',
        data: {
          method: req.body?.method,
          sessionId,
          connectionActive: sseClients.has(sessionId),
        },
      },
    };

    res.status(500).json(errorResponse);

    // Also send error via SSE
    writeToSseClient(sessionId, errorResponse);
    return undefined;
  }
});

// Main MCP endpoint - Streamable HTTP Transport
app.post('/mcp', async (req, res) => {
  // Validate Accept header according to MCP specification
  const acceptHeader = req.headers.accept || '';
  const acceptsJson = acceptHeader.includes('application/json');
  const acceptsEventStream = acceptHeader.includes('text/event-stream');
  if (!acceptsJson && !acceptsEventStream) {
    return res.status(406).json({
      jsonrpc: '2.0',
      id: req.body?.id || null,
      error: {
        code: -32602,
        message: "Invalid Accept header. Must include 'application/json' or 'text/event-stream'",
      },
    });
  }

  // Get or create secure session ID
  const sessionId = req.headers['mcp-session-id'] || generateSecureSessionId();
  const now = new Date().toISOString();

  // Track session (create if new)
  let session = sessions.get(sessionId);
  if (!session) {
    session = {
      id: sessionId,
      created: now,
      lastActivity: now,
      messageHistory: [], // For resumability
    };
    sessions.set(sessionId, session);
    console.log(`🆕 New session created: ${sessionId}`);
  } else {
    // Update last activity
    session.lastActivity = now;
  }

  // Set response headers for Streamable HTTP
  res.setHeader('Content-Type', 'application/json');
  res.setHeader('Mcp-Session-Id', sessionId);

  try {
    // Notifications are accepted without a body; nothing comes back from the backend.
    if (isNotification(req.body)) {
      sendMCPNotification(req.body);
      return res.status(202).end();
    }

    // Process the request
    const response = await sendMCPRequest(req.body, sessionId);

    // History is only useful for resumability; skipping it otherwise keeps
    // every request/response pair from being retained in memory.
    if (mcpConfig.resumability.enabled) {
      session.messageHistory.push({
        id: `event-${Date.now()}`,
        timestamp: new Date().toISOString(),
        request: req.body,
        response,
      });

      // Limit history size (keep last 100 messages)
      if (session.messageHistory.length > 100) {
        session.messageHistory = session.messageHistory.slice(-100);
      }
    }

    // Send JSON response (HTTP Streamable batch mode)
    return res.json(response);
  } catch (error) {
    // Send error response in JSON-RPC format
    return res.status(500).json({
      jsonrpc: '2.0',
      id: req.body?.id || null,
      error: {
        code: -32603,
        message: error.message,
      },
    });
  }
});

// DELETE method for session termination (MCP HTTP Stream Transport specification)
app.delete('/mcp', (req, res) => {
  // Honour MCP_ALLOW_CLIENT_TERMINATION=false.
  if (!mcpConfig.session.allowClientTermination) {
    res.setHeader('Allow', 'POST');
    return res.status(405).json({
      jsonrpc: '2.0',
      id: null,
      error: {
        code: -32600,
        message: 'Session termination by clients is disabled',
      },
    });
  }

  const sessionId = req.headers['mcp-session-id'];

  if (!sessionId) {
    return res.status(400).json({
      jsonrpc: '2.0',
      id: null,
      error: {
        code: -32602,
        message: 'Missing Mcp-Session-Id header',
      },
    });
  }

  // Remove session from our tracking
  if (sessions.delete(sessionId)) {
    console.log(`🗑️ Session ${sessionId} terminated by client`);
  }

  // Also remove from SSE clients if applicable
  const sseClient = sseClients.get(sessionId);
  if (sseClient) {
    sseClient.end();
    sseClients.delete(sessionId);
  }

  return res.status(204).end(); // No content response for successful deletion
});

// Legacy HTTP endpoint for simple testing
app.post('/tools', async (req, res) => {
  try {
    const response = await sendMCPRequest({
      jsonrpc: '2.0',
      id: `tools-${Date.now()}`,
      method: 'tools/list',
      params: {},
    });
    res.json(response.result);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Last-resort error handler: answer body-parse failures, CORS rejections and
// unexpected errors with a JSON-RPC shaped body instead of Express' HTML page.
app.use((err, req, res, next) => {
  if (res.headersSent) {
    return next(err);
  }

  const isCorsRejection = err.message === 'Not allowed by CORS';
  const status = err.status || err.statusCode || (isCorsRejection ? 403 : 500);

  let code = -32603; // internal error
  if (err.type === 'entity.parse.failed') {
    code = -32700; // parse error
  } else if (status < 500) {
    code = -32600; // invalid request
  }

  if (status >= 500) {
    console.error('Unhandled request error:', err);
  }

  return res.status(status).json({
    jsonrpc: '2.0',
    id: null,
    error: {
      code,
      // Internal details stay in the log; client errors are safe to echo.
      message: status >= 500 ? 'Internal server error' : err.message,
    },
  });
});

// Start server
console.log('🚀 Starting MCP HTTP Server...');
console.log(`Port: ${port}`);
console.log(`Node Environment: ${process.env.NODE_ENV || 'development'}`);
console.log(`Supabase Token: ${process.env.SUPABASE_ACCESS_TOKEN ? 'configured' : '❌ MISSING'}`);
console.log(`Supabase Project: ${process.env.SUPABASE_PROJECT_REF ? 'configured' : '❌ MISSING'}`);
console.log(`API Keys: ${process.env.MCP_API_KEYS ? 'configured' : '⚠️ not configured'}`);

startMCPServer();

app.listen(port, () => {
  console.log(`✅ MCP HTTP Server running on port ${port}`);
  console.log('🚀 Streamable HTTP: POST /mcp');
  console.log('🌊 SSE: GET/POST /sse');
  console.log('❤️ Health: GET /health');
  console.log('🏠 Landing: GET /');
  console.log('');
  console.log('🔗 Ready for connections!');
});

/**
 * Stop the child process and exit. Registered for SIGINT (Ctrl+C) and
 * SIGTERM (what container runtimes send on stop).
 */
function shutdown() {
  console.log('Shutting down...');
  if (mcpProcess) {
    mcpProcess.kill();
  }
  process.exit(0);
}

// Graceful shutdown
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BIFROTEK-com/supabase-mcp-http-stream-n8n'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.