Skip to main content
Glama
cameronsjo

MCP Server Template

by cameronsjo
http-transport.ts (11.5 kB)
/**
 * Streamable HTTP Transport for MCP Server (MCP 2025-11-25 Spec)
 *
 * Uses the official SDK StreamableHTTPServerTransport with:
 * - Session management via Mcp-Session-Id header
 * - Server-Sent Events (SSE) for streaming
 * - Event store for resumability
 * - OAuth 2.1 protected resource metadata
 * - Health check endpoints (liveness/readiness)
 * - DNS rebinding protection via SDK middleware
 */

import express, { Express, Request, Response, NextFunction } from 'express';
import { randomUUID } from 'node:crypto';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { createMcpExpressApp } from '@modelcontextprotocol/sdk/server/express.js';
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js';
import type { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { createLogger } from '../shared/logger.js';
import { getConfig } from '../config/index.js';
import { mountHealthEndpoints, registerHealthCheck, CommonHealthChecks } from '../shared/health.js';
import { JwtValidator, extractBearerToken, createJwtValidatorFromEnv } from '../shared/jwt.js';
import { AuthenticationError } from '../shared/errors.js';

const logger = createLogger('http-transport');
const config = getConfig();

/**
 * In-memory event store for resumability.
 *
 * Events are grouped by stream ID and receive IDs of the form `evt_<n>`,
 * where `<n>` is a counter local to this instance. Nothing is ever evicted,
 * so a store grows for as long as it is referenced.
 */
class InMemoryEventStore {
  private readonly events: Map<string, Array<{ eventId: string; message: unknown }>> = new Map();
  private eventIdCounter = 0;

  /**
   * Append a message to a stream's event log.
   *
   * @param streamId - Stream the message belongs to.
   * @param message - Payload to remember for later replay.
   * @returns The ID assigned to the stored event.
   */
  async storeEvent(streamId: string, message: unknown): Promise<string> {
    const eventId = `evt_${++this.eventIdCounter}`;
    const streamEvents = this.events.get(streamId) ?? [];
    streamEvents.push({ eventId, message });
    this.events.set(streamId, streamEvents);
    return eventId;
  }

  /**
   * Re-send every event that was stored after `lastEventId` on the stream
   * that contains it.
   *
   * @param lastEventId - Last event ID the client reports having seen.
   * @param param1 - Object carrying the `send` callback, invoked once per
   *   replayed event in storage order.
   * @returns The ID of the stream that contained `lastEventId`, or an empty
   *   string when no stream contains it (nothing is replayed in that case).
   */
  async replayEventsAfter(
    lastEventId: string,
    { send }: { send: (eventId: string, message: unknown) => Promise<void> }
  ): Promise<string> {
    // Find the stream containing this event
    for (const [streamId, streamEvents] of this.events) {
      const lastIndex = streamEvents.findIndex((e) => e.eventId === lastEventId);
      if (lastIndex === -1) {
        continue;
      }

      // Replay events after the last seen event, awaiting each send in order
      for (const event of streamEvents.slice(lastIndex + 1)) {
        await send(event.eventId, event.message);
      }
      return streamId;
    }
    return '';
  }

  /**
   * Find which stream an event belongs to.
   *
   * @param eventId - Event ID to look up.
   * @returns The owning stream ID, or `undefined` when the event is unknown.
   */
  async getStreamIdForEventId(eventId: string): Promise<string | undefined> {
    for (const [streamId, streamEvents] of this.events) {
      if (streamEvents.some((e) => e.eventId === eventId)) {
        return streamId;
      }
    }
    return undefined;
  }
}

/**
 * Store for active transports by session ID
 */
const transports = new Map<string, StreamableHTTPServerTransport>();

/**
 * OAuth 2.1 Protected Resource Metadata (RFC 9728)
 */
interface ProtectedResourceMetadata {
  resource: string;
  authorization_servers: string[];
  scopes_supported: string[];
  bearer_methods_supported: string[];
}

/**
 * Create the protected resource metadata.
 *
 * @param baseUrl - URL reported as the `resource` identifier.
 * @returns Metadata whose authorization server comes from the
 *   `MCP_SERVER_AUTH_SERVER` environment variable, falling back to
 *   `https://auth.example.com` when the variable is unset.
 */
function getProtectedResourceMetadata(baseUrl: string): ProtectedResourceMetadata {
  return {
    resource: baseUrl,
    authorization_servers: [process.env['MCP_SERVER_AUTH_SERVER'] ?? 'https://auth.example.com'],
    scopes_supported: ['read', 'write', 'admin'],
    bearer_methods_supported: ['header'],
  };
}

/**
 * JWT authentication middleware.
 *
 * Decision order for each request:
 * 1. `config.debugMode` is on and no `Authorization` header was sent: pass through.
 * 2. `jwtValidator` is `null`: pass through (no validation is possible).
 * 3. No bearer token could be extracted: 401 with a `WWW-Authenticate`
 *    header pointing at the protected-resource metadata document.
 * 4. Otherwise validate the token; on success the claims are attached to the
 *    request as `req.auth`, on failure a 401 is returned.
 *
 * @param jwtValidator - Validator to use, or `null` to disable validation.
 * @returns An Express middleware implementing the rules above.
 */
function createJwtAuthMiddleware(
  jwtValidator: JwtValidator | null
): (req: Request, res: Response, next: NextFunction) => Promise<void> {
  return async (req: Request, res: Response, next: NextFunction): Promise<void> => {
    // Skip auth in development mode without token
    const authHeader = req.get('Authorization');
    if (config.debugMode && !authHeader) {
      next();
      return;
    }

    // Require auth if validator is configured
    if (!jwtValidator) {
      next();
      return;
    }

    const token = extractBearerToken(authHeader);
    if (!token) {
      const resourceMetadata = `${req.protocol}://${req.get('host') ?? 'localhost'}/.well-known/oauth-protected-resource`;
      res.set('WWW-Authenticate', `Bearer resource_metadata="${resourceMetadata}"`);
      res.status(401).json({ error: 'Authorization required' });
      return;
    }

    try {
      const claims = await jwtValidator.validate(token);
      // Attach claims to request for downstream use
      (req as Request & { auth?: unknown }).auth = claims;
      next();
    } catch (error: unknown) {
      if (error instanceof AuthenticationError) {
        res.status(401).json({ error: error.message });
        return;
      }
      logger.error('JWT validation error', { error: String(error) });
      res.status(401).json({ error: 'Invalid token' });
    }
  };
}

/**
 * Resolve the transport addressed by the request's `Mcp-Session-Id` header.
 *
 * When no transport can be resolved this function writes the error response
 * itself: 400 when the header is missing, 404 when the header names a session
 * that is not in the active-transport map. The 404 lets a client distinguish
 * "session is gone, re-initialize" from a malformed request.
 *
 * @param req - Incoming request.
 * @param res - Response used to report a missing or unknown session.
 * @returns The session ID and its transport, or `undefined` after an error
 *   response has been sent.
 */
function lookupSessionTransport(
  req: Request,
  res: Response
): { sessionId: string; transport: StreamableHTTPServerTransport } | undefined {
  const sessionId = req.get('mcp-session-id');
  if (!sessionId) {
    res.status(400).json({ error: 'Invalid or missing session ID' });
    return undefined;
  }

  const transport = transports.get(sessionId);
  if (!transport) {
    res.status(404).json({ error: 'Session not found' });
    return undefined;
  }

  return { sessionId, transport };
}

/**
 * Create Express app for Streamable HTTP transport.
 *
 * The returned app exposes:
 * - health endpoints mounted by `mountHealthEndpoints`
 * - `GET /.well-known/oauth-protected-resource` (RFC 9728 metadata)
 * - `GET /.well-known/mcp` (server name, version and capabilities)
 * - `POST /mcp`, `GET /mcp`, `DELETE /mcp` (JSON-RPC, SSE stream, session termination)
 *
 * @param options - Optional settings.
 * @param options.allowedHosts - Forwarded to `createMcpExpressApp` when provided.
 * @param options.requireAuth - When true, the JWT middleware guards the `/mcp` routes.
 * @param options.mcpServer - Server that each newly created transport is connected to.
 * @returns The configured Express application (not yet listening).
 */
export function createHttpTransport(options?: {
  allowedHosts?: string[];
  requireAuth?: boolean;
  mcpServer?: Server;
}): Express {
  // Use SDK's createMcpExpressApp for DNS rebinding protection
  const app = options?.allowedHosts
    ? createMcpExpressApp({ allowedHosts: options.allowedHosts })
    : createMcpExpressApp();

  // Parse JSON bodies
  app.use(express.json());

  // Mount health check endpoints
  mountHealthEndpoints(app);

  // Register default health checks
  registerHealthCheck('memory', CommonHealthChecks.memory(512));
  registerHealthCheck('eventLoop', CommonHealthChecks.eventLoop(100));

  // OAuth Protected Resource Metadata (RFC 9728)
  app.get('/.well-known/oauth-protected-resource', (req: Request, res: Response) => {
    const baseUrl = `${req.protocol}://${req.get('host') ?? 'localhost'}`;
    res.json(getProtectedResourceMetadata(baseUrl));
  });

  // MCP endpoint discovery (MCP 2025-11-25)
  app.get('/.well-known/mcp', (_req: Request, res: Response) => {
    res.json({
      name: config.serverName,
      version: config.serverVersion,
      protocol_version: '2025-11-25',
      capabilities: {
        tools: {},
        resources: {},
        prompts: {},
      },
    });
  });

  // Set up JWT validation if configured
  const jwtValidator = options?.requireAuth ? createJwtValidatorFromEnv() : null;
  if (options?.requireAuth && !jwtValidator) {
    // The middleware passes every request through when it has no validator,
    // so make that state visible instead of silently serving unauthenticated.
    logger.error('requireAuth is enabled but no JWT validator is available; /mcp routes will not validate tokens', {
      requireAuth: true,
    });
  }
  const authMiddleware = createJwtAuthMiddleware(jwtValidator);

  // MCP POST endpoint - handles JSON-RPC requests
  const mcpPostHandler = async (req: Request, res: Response): Promise<void> => {
    const sessionId = req.get('mcp-session-id');

    try {
      // Reuse existing transport for session
      const existing = sessionId ? transports.get(sessionId) : undefined;
      if (existing) {
        await existing.handleRequest(req, res, req.body);
        return;
      }

      if (!sessionId && isInitializeRequest(req.body)) {
        // New initialization request - create transport with resumability
        const eventStore = new InMemoryEventStore();
        // Explicit annotation: the callbacks below close over `transport`.
        const transport: StreamableHTTPServerTransport = new StreamableHTTPServerTransport({
          sessionIdGenerator: () => randomUUID(),
          eventStore,
          onsessioninitialized: (newSessionId: string) => {
            logger.info('Session initialized', { sessionId: newSessionId });
            transports.set(newSessionId, transport);
          },
          onsessionclosed: (closedSessionId: string) => {
            logger.info('Session closed', { sessionId: closedSessionId });
            transports.delete(closedSessionId);
          },
        });

        // Set up close handler
        transport.onclose = () => {
          const sid = transport.sessionId;
          if (sid && transports.has(sid)) {
            logger.debug('Transport closed, removing from map', { sessionId: sid });
            transports.delete(sid);
          }
        };

        // Connect to MCP server if provided
        if (options?.mcpServer) {
          await options.mcpServer.connect(transport);
        }

        await transport.handleRequest(req, res, req.body);
        return;
      }

      if (sessionId) {
        // A session ID was supplied but is not active: report 404 so the
        // client knows to start a new session rather than retry as-is.
        res.status(404).json({
          jsonrpc: '2.0',
          error: {
            code: -32001,
            message: 'Session not found',
          },
          id: null,
        });
        return;
      }

      // Invalid request: no session ID and not an initialize request
      res.status(400).json({
        jsonrpc: '2.0',
        error: {
          code: -32000,
          message: 'Bad Request: No valid session ID provided',
        },
        id: null,
      });
    } catch (error: unknown) {
      logger.error('Error handling MCP request', { error: String(error) });
      if (!res.headersSent) {
        res.status(500).json({
          jsonrpc: '2.0',
          error: {
            code: -32603,
            message: 'Internal server error',
          },
          id: null,
        });
      }
    }
  };

  // MCP GET endpoint - SSE stream
  const mcpGetHandler = async (req: Request, res: Response): Promise<void> => {
    const session = lookupSessionTransport(req, res);
    if (!session) {
      return;
    }

    const lastEventId = req.get('last-event-id');
    if (lastEventId) {
      logger.debug('Client reconnecting with Last-Event-ID', {
        sessionId: session.sessionId,
        lastEventId,
      });
    }

    try {
      await session.transport.handleRequest(req, res);
    } catch (error: unknown) {
      // Without this guard a rejection would escape the async handler and
      // the client would never receive a response.
      logger.error('Error handling SSE stream request', { error: String(error) });
      if (!res.headersSent) {
        res.status(500).json({ error: 'Error processing SSE stream request' });
      }
    }
  };

  // MCP DELETE endpoint - session termination
  const mcpDeleteHandler = async (req: Request, res: Response): Promise<void> => {
    const session = lookupSessionTransport(req, res);
    if (!session) {
      return;
    }

    logger.info('Session termination requested', { sessionId: session.sessionId });

    try {
      await session.transport.handleRequest(req, res);
    } catch (error: unknown) {
      logger.error('Error handling session termination', { error: String(error) });
      if (!res.headersSent) {
        res.status(500).json({ error: 'Error processing session termination' });
      }
    }
  };

  // Register routes with optional auth
  if (options?.requireAuth) {
    app.post('/mcp', authMiddleware, mcpPostHandler);
    app.get('/mcp', authMiddleware, mcpGetHandler);
    app.delete('/mcp', authMiddleware, mcpDeleteHandler);
  } else {
    app.post('/mcp', mcpPostHandler);
    app.get('/mcp', mcpGetHandler);
    app.delete('/mcp', mcpDeleteHandler);
  }

  return app;
}

/**
 * Get active session count.
 *
 * @returns Number of transports currently held in the session map.
 */
export function getSessionCount(): number {
  return transports.size;
}

/**
 * Close all active transports (for shutdown).
 *
 * Every transport is closed concurrently; a failure to close one transport is
 * logged and does not prevent the others from closing. The session map is
 * emptied once all close attempts have settled.
 */
export async function closeAllTransports(): Promise<void> {
  // Snapshot the entries: closing a transport may remove it from the map.
  const closePromises = Array.from(transports, ([sessionId, transport]) => {
    logger.debug('Closing transport', { sessionId });
    return transport.close().catch((error: unknown) => {
      logger.error('Error closing transport', { sessionId, error: String(error) });
    });
  });

  await Promise.all(closePromises);
  transports.clear();
  logger.info('All transports closed');
}

/**
 * Start session cleanup task (cleanup handled by transport callbacks).
 *
 * @returns The interval handle; pass it to `clearInterval` to stop the task.
 */
export function startSessionCleanup(): NodeJS.Timeout {
  // The SDK handles session cleanup via onsessionclosed callback
  // This is kept for API compatibility but does minimal work
  return setInterval(
    () => {
      logger.debug('Session count', { active: transports.size });
    },
    5 * 60 * 1000
  );
}

/**
 * Clear all sessions (for testing).
 *
 * Only empties the session map; the transports themselves are not closed.
 */
export function clearSessions(): void {
  transports.clear();
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cameronsjo/mcp-server-template'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.