Skip to main content
Glama

vchart-mcp-server

by VisActor
streamable.ts7.09 kB
import { randomUUID } from 'node:crypto'; import type { IncomingMessage, ServerResponse } from 'node:http'; import type { Server } from '@modelcontextprotocol/sdk/server/index.js'; import { type EventStore, StreamableHTTPServerTransport, } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js'; import { InMemoryEventStore } from '../utils/InMemoryEventStore'; import { getBody } from '../utils/getBody'; import { createBaseHttpServer, type RequestHandlers, } from '../utils/httpServer'; export const startStreamableServer = async ( createServer: () => Server, endpoint = '/streamable', port = 3001, eventStore: EventStore = new InMemoryEventStore() ): Promise<void> => { const activeTransports: Record< string, { server: Server; transport: StreamableHTTPServerTransport; } > = {}; // Define the request handler for streamable-specific logic const handleRequest: RequestHandlers['handleRequest'] = async ( req: IncomingMessage, res: ServerResponse ) => { if (!req.url) { res.writeHead(400).end('No URL'); return; } const reqUrl = new URL(req.url, 'http://localhost'); // Handle POST requests to endpoint if (req.method === 'POST' && reqUrl.pathname === endpoint) { try { const sessionId = Array.isArray(req.headers['mcp-session-id']) ? req.headers['mcp-session-id'][0] : req.headers['mcp-session-id']; let transport: StreamableHTTPServerTransport; let server: Server; const body = await getBody(req); /** * diagram: https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#sequence-diagram. */ // 1. If the sessionId is provided and the server is already created, use the existing transport and server. if (sessionId && activeTransports[sessionId]) { transport = activeTransports[sessionId].transport; server = activeTransports[sessionId].server; // 2. If the sessionId is not provided and the request is an initialize request, create a new transport for the session. } else if (!sessionId && isInitializeRequest(body)) { transport = new StreamableHTTPServerTransport({ // use the event store to store the events to replay on reconnect. // more details: https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#resumability-and-redelivery. eventStore: eventStore || new InMemoryEventStore(), onsessioninitialized: (_sessionId: string) => { // add only when the id Sesison id is generated. activeTransports[_sessionId] = { server, transport, }; }, sessionIdGenerator: randomUUID, }); // Handle the server close event. transport.onclose = async () => { const sid = transport.sessionId; if (sid && activeTransports[sid]) { try { await server?.close(); } catch (error) { console.error('Error closing server:', error); } // delete used transport and server to avoid memory leak. delete activeTransports[sid]; } }; // Create the server try { server = createServer(); } catch (error) { if (error instanceof Response) { res.writeHead(error.status).end(error.statusText); return; } res.writeHead(500).end('Error creating server'); return; } server.connect(transport); await transport.handleRequest(req, res, body); return; } else { // Error if the server is not created but the request is not an initialize request. res.setHeader('Content-Type', 'application/json'); res.writeHead(400).end( JSON.stringify({ error: { code: -32000, message: 'Bad Request: No valid session ID provided', }, id: null, jsonrpc: '2.0', }) ); return; } // Handle the request if the server is already created. await transport.handleRequest(req, res, body); } catch (error) { console.error('Error handling request:', error); res.setHeader('Content-Type', 'application/json'); res.writeHead(500).end( JSON.stringify({ error: { code: -32603, message: 'Internal Server Error' }, id: null, jsonrpc: '2.0', }) ); } return; } // Handle GET requests to endpoint if (req.method === 'GET' && reqUrl.pathname === endpoint) { const sessionId = req.headers['mcp-session-id'] as string | undefined; const activeTransport: | { server: Server; transport: StreamableHTTPServerTransport; } | undefined = sessionId ? activeTransports[sessionId] : undefined; if (!sessionId) { res.writeHead(400).end('No sessionId'); return; } if (!activeTransport) { res.writeHead(400).end('No active transport'); return; } const lastEventId = req.headers['last-event-id'] as string | undefined; if (lastEventId) { console.log(`Client reconnecting with Last-Event-ID: ${lastEventId}`); } else { console.log(`Establishing new SSE stream for session ${sessionId}`); } await activeTransport.transport.handleRequest(req, res); return; } // Handle DELETE requests to endpoint if (req.method === 'DELETE' && reqUrl.pathname === endpoint) { console.log('received delete request'); const sessionId = req.headers['mcp-session-id'] as string | undefined; if (!sessionId) { res.writeHead(400).end('Invalid or missing sessionId'); return; } console.log('received delete request for session', sessionId); const transport = activeTransports[sessionId]?.transport; if (!transport) { res.writeHead(400).end('No active transport'); return; } try { await transport.handleRequest(req, res); } catch (error) { console.error('Error handling delete request:', error); res.writeHead(500).end('Error handling delete request'); } return; } // If we reach here, no handler matched res.writeHead(404).end('Not found'); }; // Custom cleanup for streamable server const cleanup = () => { for (const { server, transport } of Object.values(activeTransports)) { transport.close(); server.close(); } }; // Create the HTTP server using our factory createBaseHttpServer(port, endpoint, { handleRequest, cleanup, serverType: 'HTTP Streamable Server', }); };

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/VisActor/vchart-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server