Skip to main content
Glama

Simple Streamable HTTP MCP Server

index.ts14.7 kB
import { z } from 'zod'; import { randomUUID } from 'node:crypto'; import { zodToJsonSchema } from "zod-to-json-schema"; import express, { Request, Response } from 'express'; import { Server } from "@modelcontextprotocol/sdk/server/index.js"; import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; import { Tool, ToolSchema, CallToolRequestSchema, ListToolsRequestSchema, } from "@modelcontextprotocol/sdk/types.js"; console.error('Starting Streamable HTTP server...'); const app = express(); app.use(express.json()); const { DEBUG = 'true' } = process.env; const debug = (...args: any[]) => { if (DEBUG?.toLowerCase() === 'true' || DEBUG === '1') { console.error('[DEBUG]', ...args); } }; // Store transports by session ID const transports: Map<string, StreamableHTTPServerTransport> = new Map(); // Tool schemas and types const ToolInputSchema = ToolSchema.shape.inputSchema; type ToolInput = z.infer<typeof ToolInputSchema>; // Define tool schemas const HelloWorldSchema = z.object({ name: z.string().describe("The name to greet") }); const GetServerInfoSchema = z.object({}); const LongRunningTestSchema = z.object({ duration: z.number().optional().default(30).describe("Duration in seconds (default: 30)"), steps: z.number().optional().default(10).describe("Number of progress steps (default: 10)"), message: z.string().optional().describe("Optional message to include in the response") }); const SlowTestSchema = z.object({ message: z.string().optional().describe("Optional message to include in the response"), steps: z.number().optional().default(20).describe("Number of progress steps (default: 20)") }); // Tool names enum enum ToolName { HELLO_WORLD = "hello_world", GET_SERVER_INFO = "get_server_info", LONG_RUNNING_TEST = "long_running_test", SLOW_TEST = "slow_test" } // Function to create a new MCP server instance function createServer() { const server = new Server( { name: "simple-streamable-http-mcp-server", version: "1.0.0", }, { instructions: "A simple test MCP server implemented with Streamable HTTP transport. Supports basic tools and long-running operations with progress updates.", capabilities: { tools: {} } } ); // Set up the list tools handler server.setRequestHandler(ListToolsRequestSchema, async () => { console.error('[TOOLS LIST] Listing available tools'); const tools: Tool[] = [ { name: ToolName.HELLO_WORLD, description: "A simple tool that returns a greeting", inputSchema: zodToJsonSchema(HelloWorldSchema) as ToolInput, }, { name: ToolName.GET_SERVER_INFO, description: "Get information about the server", inputSchema: zodToJsonSchema(GetServerInfoSchema) as ToolInput, }, { name: ToolName.LONG_RUNNING_TEST, description: "A test tool that demonstrates long-running operations with progress updates", inputSchema: zodToJsonSchema(LongRunningTestSchema) as ToolInput, }, { name: ToolName.SLOW_TEST, description: "A test tool that takes 10 minutes to complete and returns timing information", inputSchema: zodToJsonSchema(SlowTestSchema) as ToolInput, } ]; return { tools }; }); // Set up the call tool handler server.setRequestHandler(CallToolRequestSchema, async (request, extra) => { const { name, arguments: args } = request.params; console.error(`[TOOL CALL] Tool: ${name}, Args:`, JSON.stringify(args, null, 2)); debug(`Tool request details:`, JSON.stringify(request.params, null, 2)); if (name === ToolName.HELLO_WORLD) { const validatedArgs = HelloWorldSchema.parse(args); debug(`hello_world tool called with args:`, validatedArgs); await new Promise(resolve => setTimeout(resolve, 200)); return { content: [{ type: "text", text: `Hello, ${validatedArgs.name}! Welcome to the MCP server.` }] }; } if (name === ToolName.GET_SERVER_INFO) { debug(`get_server_info tool called`); return { content: [{ type: "text", text: JSON.stringify({ name: "Simple Streamable HTTP MCP Server", version: "1.0.0", features: ["tools"], timestamp: new Date().toISOString() }, null, 2) }] }; } if (name === ToolName.LONG_RUNNING_TEST) { const validatedArgs = LongRunningTestSchema.parse(args); const { duration = 30, steps = 10, message } = validatedArgs; const startTime = new Date(); const startTimestamp = startTime.toISOString(); debug(`long_running_test started at: ${startTimestamp}, duration: ${duration}s, steps: ${steps}`); // Get progress token if available const progressToken = request.params._meta?.progressToken; const stepDurationMs = (duration * 1000) / steps; // Send progress updates for (let i = 1; i <= steps; i++) { await new Promise(resolve => setTimeout(resolve, stepDurationMs)); if (progressToken !== undefined) { try { console.error(`[PROGRESS] Sending progress update: ${i}/${steps} for token: ${progressToken}`); await server.notification({ method: "notifications/progress", params: { progress: i, total: steps, progressToken, }, }, { relatedRequestId: extra.requestId }); console.error(`[PROGRESS] Successfully sent progress update: ${i}/${steps}`); } catch (error) { console.error(`[PROGRESS ERROR] Failed to send progress notification:`, error); } } else { debug(`No progress token provided, skipping progress update ${i}/${steps}`); } } const endTime = new Date(); const endTimestamp = endTime.toISOString(); const actualDurationMs = endTime.getTime() - startTime.getTime(); debug(`long_running_test completed at: ${endTimestamp}`); return { content: [{ type: "text", text: JSON.stringify({ message: message || "Long-running test completed successfully", start: startTimestamp, finish: endTimestamp, requestedDuration: duration, actualDuration: { milliseconds: actualDurationMs, seconds: actualDurationMs / 1000 }, steps: steps }, null, 2) }] }; } if (name === ToolName.SLOW_TEST) { const validatedArgs = SlowTestSchema.parse(args); const { message, steps = 20 } = validatedArgs; const startTime = new Date(); const startTimestamp = startTime.toISOString(); debug(`slow_test tool started at: ${startTimestamp}`); // Get progress token if available const progressToken = request.params._meta?.progressToken; // Wait for 10 minutes (600,000 milliseconds) const tenMinutesMs = 10 * 60 * 1000; const stepDurationMs = tenMinutesMs / steps; // Send progress updates for (let i = 1; i <= steps; i++) { await new Promise(resolve => setTimeout(resolve, stepDurationMs)); if (progressToken !== undefined) { try { console.error(`[PROGRESS] Sending progress update: ${i}/${steps} for token: ${progressToken}`); await server.notification({ method: "notifications/progress", params: { progress: i, total: steps, progressToken, }, }, { relatedRequestId: extra.requestId }); console.error(`[PROGRESS] Successfully sent progress update: ${i}/${steps}`); } catch (error) { console.error(`[PROGRESS ERROR] Failed to send progress notification:`, error); } } else { debug(`No progress token provided, skipping progress update ${i}/${steps}`); } // Log progress every 5 steps if (i % 5 === 0) { const elapsedMinutes = (i / steps) * 10; console.error(`[SLOW_TEST] Progress: ${i}/${steps} steps (${elapsedMinutes.toFixed(1)} minutes elapsed)`); } } const endTime = new Date(); const endTimestamp = endTime.toISOString(); const durationMs = endTime.getTime() - startTime.getTime(); const durationMinutes = durationMs / (60 * 1000); debug(`slow_test tool completed at: ${endTimestamp}`); return { content: [{ type: "text", text: JSON.stringify({ message: message || "Slow test completed successfully", start: startTimestamp, finish: endTimestamp, duration: { milliseconds: durationMs, seconds: durationMs / 1000, minutes: durationMinutes }, steps: steps }, null, 2) }] }; } throw new Error(`Unknown tool: ${name}`); }); return { server }; } // Handle POST requests app.post('/mcp', async (req: Request, res: Response) => { console.error('Received MCP POST request'); console.error('Request method:', req.body?.method); debug('Headers:', JSON.stringify(req.headers, null, 2)); debug('Body:', JSON.stringify(req.body, null, 2)); try { // Check for existing session ID const sessionId = req.headers['mcp-session-id'] as string | undefined; let transport: StreamableHTTPServerTransport; if (sessionId && transports.has(sessionId)) { // Reuse existing transport transport = transports.get(sessionId)!; console.error(`[SESSION] Reusing existing transport for session ${sessionId}`); } else if (!sessionId) { // New initialization request console.error('[SESSION] Creating new server for initialization request'); const { server } = createServer(); transport = new StreamableHTTPServerTransport({ sessionIdGenerator: () => randomUUID(), enableJsonResponse: true, onsessioninitialized: (sessionId: string) => { console.error(`Session initialized with ID: ${sessionId}`); transports.set(sessionId, transport); } }); // Set up onclose handler to clean up transport when closed server.onclose = async () => { const sid = transport.sessionId; if (sid && transports.has(sid)) { console.error(`Transport closed for session ${sid}, removing from transports map`); transports.delete(sid); } }; // Connect the transport to the MCP server BEFORE handling the request await server.connect(transport); await transport.handleRequest(req, res, req.body); return; // Already handled } else { // Invalid request - session ID provided but not found res.status(400).json({ jsonrpc: '2.0', error: { code: -32000, message: 'Bad Request: Invalid session ID', }, id: req?.body?.id, }); return; } // Handle the request with existing transport await transport.handleRequest(req, res, req.body); } catch (error) { console.error('Error handling MCP request:', error); if (!res.headersSent) { res.status(500).json({ jsonrpc: '2.0', error: { code: -32603, message: 'Internal server error', }, id: req?.body?.id, }); } } }); // Handle GET requests for SSE streams app.get('/mcp', async (req: Request, res: Response) => { console.error('Received MCP GET request'); const sessionId = req.headers['mcp-session-id'] as string | undefined; if (!sessionId || !transports.has(sessionId)) { res.status(400).json({ jsonrpc: '2.0', error: { code: -32000, message: 'Bad Request: No valid session ID provided', }, id: null, }); return; } const lastEventId = req.headers['last-event-id'] as string | undefined; if (lastEventId) { console.error(`Client reconnecting with Last-Event-ID: ${lastEventId}`); } else { console.error(`Establishing new SSE stream for session ${sessionId}`); } const transport = transports.get(sessionId)!; await transport.handleRequest(req, res); }); // Handle DELETE requests for session termination app.delete('/mcp', async (req: Request, res: Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; if (!sessionId || !transports.has(sessionId)) { res.status(400).json({ jsonrpc: '2.0', error: { code: -32000, message: 'Bad Request: No valid session ID provided', }, id: null, }); return; } console.error(`Received session termination request for session ${sessionId}`); try { const transport = transports.get(sessionId)!; await transport.handleRequest(req, res, req.body); } catch (error) { console.error('Error handling session termination:', error); if (!res.headersSent) { res.status(500).json({ jsonrpc: '2.0', error: { code: -32603, message: 'Error handling session termination', }, id: null, }); } } }); // Health check endpoint app.get('/health', (req, res) => { res.json({ status: 'ok', server: { name: "simple-streamable-http-mcp-server", version: "1.0.0" }, transport: 'streamable-http', activeSessions: transports.size }); }); // Start server const PORT = process.env.PORT || 3001; app.listen(PORT, () => { console.error(`MCP Streamable HTTP Server listening on port ${PORT}`); console.log(`📨 MCP endpoint: http://localhost:${PORT}/mcp`); console.log(`❤️ Health check: http://localhost:${PORT}/health`); console.log(`🛠️ Available tools: hello_world, get_server_info, long_running_test, slow_test`); }); // Handle server shutdown process.on('SIGINT', async () => { console.error('Shutting down server...'); // Close all active transports for (const [sessionId, transport] of transports) { try { console.error(`Closing transport for session ${sessionId}`); await transport.close(); transports.delete(sessionId); } catch (error) { console.error(`Error closing transport for session ${sessionId}:`, error); } } console.error('Server shutdown complete'); process.exit(0); });

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/danny-avila/Example-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server