Skip to main content
Glama
streamable-http.ts8.97 kB
/** * Streamable HTTP Server Transport for MCP Proxy * * MCP Protocol Compliance: * - Headers use Title-Case per MCP spec (Mcp-Session-Id, Mcp-Protocol-Version) * - CORS headers expose custom headers to clients * - Protocol version validation (2024-11-05) * - JSON-RPC 2.0 compliant error codes * * JSON-RPC Error Codes Used: * - -32600: Invalid Request (malformed request, unsupported protocol version) * - -32601: Method not found (HTTP method not allowed) * - -32603: Internal error (server-side exception) * - -32001: Server error - Unauthorized (auth failure) * - -32000: Server error - Generic application error (session not found, etc.) */ import express from 'express'; import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; import { Server } from '@modelcontextprotocol/sdk/server/index.js'; import { debugLog, debugError } from './debug-log.js'; import { MCP_SESSION_ID_HEADER, JSON_RPC_ERROR_CODES, SESSION_TTL_MS, SESSION_CLEANUP_INTERVAL_MS, MAX_SESSIONS, } from './constants.js'; import { corsMiddleware, versionMiddleware, acceptMiddleware, createAuthMiddleware, createWellKnownHandler, resolveTransport, } from './middleware.js'; // Session metadata interface interface SessionMetadata { transport: StreamableHTTPServerTransport; lastAccess: number; } // Map to store active sessions with metadata (for stateful mode) const sessions = new Map<string, SessionMetadata>(); /** * Clean up expired sessions based on TTL * @returns Number of sessions cleaned up */ function cleanupExpiredSessions(): number { const now = Date.now(); let cleanedCount = 0; for (const [sessionId, metadata] of sessions.entries()) { if (now - metadata.lastAccess > SESSION_TTL_MS) { try { metadata.transport.close().catch(error => { debugError(`Error closing expired session ${sessionId}:`, error); }); } catch (error) { debugError(`Error closing expired session ${sessionId}:`, error); } sessions.delete(sessionId); cleanedCount++; } } if (cleanedCount > 0) { debugLog(`Cleaned up ${cleanedCount} expired sessions`); } return cleanedCount; } /** * Evict oldest session when max sessions limit is reached (LRU eviction) */ function evictOldestSession(): void { if (sessions.size === 0) return; let oldestSessionId: string | null = null; let oldestAccessTime = Infinity; // Find session with oldest access time for (const [sessionId, metadata] of sessions.entries()) { if (metadata.lastAccess < oldestAccessTime) { oldestAccessTime = metadata.lastAccess; oldestSessionId = sessionId; } } if (oldestSessionId) { const metadata = sessions.get(oldestSessionId); if (metadata) { try { metadata.transport.close().catch(error => { debugError(`Error closing evicted session ${oldestSessionId}:`, error); }); } catch (error) { debugError(`Error closing evicted session ${oldestSessionId}:`, error); } sessions.delete(oldestSessionId); debugLog(`Evicted oldest session ${oldestSessionId} (LRU eviction)`); } } } export interface StreamableHTTPOptions { port: number; requireApiAuth?: boolean; stateless?: boolean; } /** * Start a Streamable HTTP server for the MCP proxy * @param server The MCP server instance * @param options Configuration options * @returns Cleanup function to stop the HTTP server */ export async function startStreamableHTTPServer( server: Server, options: StreamableHTTPOptions ): Promise<() => Promise<void>> { const app = express(); const { port, requireApiAuth = false, stateless = false } = options; // Apply middleware in order: CORS, version validation, accept normalization app.use(corsMiddleware); app.use(versionMiddleware); app.use(acceptMiddleware); // Serve static files from .well-known directory (for Smithery discovery) // This must come AFTER CORS but BEFORE authentication const wellKnownHandler = createWellKnownHandler(); app.use('/.well-known', wellKnownHandler); app.use('/mcp/.well-known', wellKnownHandler); // Middleware to parse JSON bodies app.use(express.json()); // Authentication middleware - only for MCP endpoint app.use(createAuthMiddleware(requireApiAuth)); // Shared MCP handler used for both /mcp and / routes const mcpHandler = async (req: any, res: any) => { try { const transport = await resolveTransport(req, res, server, stateless, sessions); const sessionId = stateless ? undefined : (req.headers['mcp-session-id'] as string); // Handle different HTTP methods switch (req.method) { case 'POST': // POST requests have req.body parsed by express.json() middleware // Pass the parsed body to avoid "stream is not readable" error await transport.handleRequest(req, res, req.body); break; case 'GET': // GET requests (SSE) don't have a request body // Pass undefined explicitly as body parameter is optional await transport.handleRequest(req, res, undefined); break; case 'DELETE': // Handle session termination if (stateless) { // In stateless mode, always return success res.status(200).json({ success: true, message: 'Stateless mode - no session to terminate' }); } else if (sessionId && sessions.has(sessionId)) { // Session exists, delete it const metadata = sessions.get(sessionId)!; await metadata.transport.close(); sessions.delete(sessionId); res.status(200).json({ success: true, message: 'Session terminated' }); } else { // Session ID not provided or doesn't exist - return success as nothing to delete res.status(200).json({ success: true, message: 'Session not found' }); } break; default: res.status(405).json({ jsonrpc: '2.0', error: { code: JSON_RPC_ERROR_CODES.METHOD_NOT_FOUND, message: `HTTP method ${req.method} not allowed` } }); } // Clean up transport in stateless mode if (stateless && req.method !== 'GET') { await transport.close(); } } catch (error) { debugError('Error handling request:', error); res.status(500).json({ jsonrpc: '2.0', error: { code: JSON_RPC_ERROR_CODES.INTERNAL_ERROR, message: 'Internal server error', // Only expose error details in development to prevent information disclosure ...(process.env.NODE_ENV === 'development' && { data: error instanceof Error ? error.message : String(error) }) } }); } }; // MCP endpoint handler (preferred path) app.all('/mcp', mcpHandler); // Fallback root path handler for clients that POST to base URL app.all('/', mcpHandler); // Health check endpoint app.get('/health', (_req: any, res: any) => { res.json({ status: 'ok', transport: 'streamable-http', sessions: stateless ? 0 : sessions.size, maxSessions: stateless ? 0 : MAX_SESSIONS }); }); // Set up periodic session cleanup (only in stateful mode) let cleanupInterval: NodeJS.Timeout | null = null; if (!stateless) { cleanupInterval = setInterval(() => { cleanupExpiredSessions(); }, SESSION_CLEANUP_INTERVAL_MS); debugLog(`Session cleanup interval started (every ${SESSION_CLEANUP_INTERVAL_MS / 1000}s)`); } // Start the Express server // Default to localhost for security, but allow override via BIND_HOST // In Docker/Cloud (Smithery), Dockerfile sets BIND_HOST=0.0.0.0 to accept external connections const host = process.env.BIND_HOST || 'localhost'; const httpServer = app.listen(port, host, () => { debugLog(`Streamable HTTP server listening on ${host}:${port}`); if (stateless) { debugLog('Running in stateless mode'); } else { debugLog('Running in stateful mode (session-based)'); } if (requireApiAuth) { debugLog('API authentication required'); } }); // Return cleanup function return async () => { // Clear cleanup interval if (cleanupInterval) { clearInterval(cleanupInterval); debugLog('Session cleanup interval stopped'); } // Close all active sessions for (const [sessionId, metadata] of sessions) { try { await metadata.transport.close(); } catch (error) { debugError(`Error closing transport for session ${sessionId}:`, error); } } sessions.clear(); // Close the HTTP server return new Promise((resolve) => { httpServer.close(() => { debugLog('Streamable HTTP server stopped'); resolve(); }); }); }; }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/VeriTeknik/pluggedin-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server