Skip to main content
Glama
transportRegistry.ts (9.37 kB)
import { randomUUID } from "node:crypto";
import type { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
import type { ILogger } from "../core/logger.js";
import type { Result } from "../core/result.js";
import { createErrorResult, createSuccessResult } from "../core/result.js";
import type { StreamableHttpTransportConfig } from "./config.js";
import type { ISessionManager } from "./sessionManager.js";

/**
 * Session entry containing transport and server instances
 */
interface SessionEntry {
  /**
   * StreamableHTTPServerTransport instance for this session
   */
  transport: StreamableHTTPServerTransport;

  /**
   * MCP Server instance for this session
   */
  server: McpServer;

  /**
   * Timestamp when session was created (milliseconds since epoch)
   */
  createdAt: number;
}

/**
 * Normalize an arbitrary thrown/rejected value into an Error instance.
 */
function toError(value: unknown): Error {
  return value instanceof Error ? value : new Error(String(value));
}

/**
 * Transport Registry
 *
 * Manages per-session transport and server instances for Streamable HTTP transport.
 * Each session gets its own isolated transport and server to maintain independent
 * MCP protocol state.
 *
 * Key Responsibilities:
 * - Create new transport + server instances for new sessions
 * - Reuse existing transport for requests with valid session IDs
 * - Clean up sessions on close or expiration
 * - Track active session count for health checks
 *
 * @example
 * ```typescript
 * const registry = new TransportRegistry(config, sessionManager, logger);
 *
 * // In HTTP request handler
 * const transport = await registry.getOrCreate(
 *   sessionId,
 *   requestBody,
 *   () => createTouchDesignerServer()
 * );
 *
 * if (transport) {
 *   await transport.handleRequest(req, res, requestBody);
 * }
 *
 * // On shutdown
 * await registry.cleanup();
 * ```
 */
export class TransportRegistry {
  private readonly sessions: Map<string, SessionEntry> = new Map();
  private readonly config: StreamableHttpTransportConfig;
  private readonly sessionManager: ISessionManager | null;
  private readonly logger: ILogger;

  /**
   * @param config - Streamable HTTP transport configuration (retry interval, session settings)
   * @param sessionManager - Optional session manager; when provided, an expiration
   *   handler is installed that closes the transport and removes the session from this registry
   * @param logger - Logger used for all registry diagnostics
   */
  constructor(
    config: StreamableHttpTransportConfig,
    sessionManager: ISessionManager | null,
    logger: ILogger,
  ) {
    this.config = config;
    this.sessionManager = sessionManager;
    this.logger = logger;

    if (this.sessionManager) {
      this.sessionManager.setExpirationHandler(async (sessionId: string) => {
        const entry = this.sessions.get(sessionId);
        if (!entry) {
          return;
        }

        this.logger.sendLog({
          data: `Expiring session via TTL: ${sessionId}`,
          level: "info",
          logger: "TransportRegistry",
        });

        try {
          await entry.transport.close();
        } catch (error) {
          const err = toError(error);
          this.logger.sendLog({
            data: `Error closing expired session ${sessionId}: ${err.message}`,
            level: "error",
            logger: "TransportRegistry",
          });
        } finally {
          // Drop the entry even when close() failed so an expired session never lingers.
          this.remove(sessionId);
        }
      });
    }
  }

  /**
   * Get or create transport for a session
   *
   * Logic:
   * 1. If sessionId exists and valid → return existing transport
   * 2. If no sessionId and request is initialize → create new transport + server
   * 3. Otherwise → return null (invalid session)
   *
   * @param sessionId - Session ID from mcp-session-id header (undefined for new sessions)
   * @param requestBody - JSON-RPC request body
   * @param serverFactory - Factory function to create new Server instances
   * @returns Transport instance or null if session is invalid
   */
  async getOrCreate(
    sessionId: string | undefined,
    requestBody: JSONRPCMessage,
    serverFactory: () => McpServer,
  ): Promise<StreamableHTTPServerTransport | null> {
    // Case 1: Reuse existing session (single map lookup)
    const existing = sessionId ? this.sessions.get(sessionId) : undefined;
    if (sessionId && existing) {
      if (this.sessionManager) {
        this.sessionManager.touch(sessionId);
      }

      this.logger.sendLog({
        data: `Reusing existing session: ${sessionId}`,
        level: "debug",
        logger: "TransportRegistry",
      });

      return existing.transport;
    }

    const isInitialize = isInitializeRequest(requestBody);

    // Case 2: Create new session (only for initialize requests without session ID)
    if (!sessionId && isInitialize) {
      return await this.createSession(serverFactory);
    }

    // Case 3: Invalid session (session ID provided but not found, or non-initialize without session)
    this.logger.sendLog({
      data: `Invalid session request: sessionId=${sessionId}, isInitialize=${isInitialize}`,
      level: "warning",
      logger: "TransportRegistry",
    });

    return null;
  }

  /**
   * Create a new session with transport and server instances
   *
   * The session is only stored in the registry once the transport reports
   * `onsessioninitialized`; until then the returned transport has no registry entry.
   *
   * @param serverFactory - Factory function to create new Server instances
   * @returns Transport instance for the new session
   */
  private async createSession(
    serverFactory: () => McpServer,
  ): Promise<StreamableHTTPServerTransport> {
    // Declared up front (and nullable) because the callbacks below capture both
    // variables before they are assigned.
    let transport: StreamableHTTPServerTransport | null = null;
    let server: McpServer | null = null;

    // Create transport with session lifecycle callbacks
    transport = new StreamableHTTPServerTransport({
      // Disable JSON responses (use SSE for streaming)
      enableJsonResponse: false,

      // Session close callback
      onsessionclosed: (sessionId: string) => {
        this.logger.sendLog({
          data: `Session closed: ${sessionId}`,
          level: "info",
          logger: "TransportRegistry",
        });

        // Remove from registry
        this.remove(sessionId);

        // Cleanup from SessionManager
        if (this.sessionManager) {
          this.sessionManager.cleanup(sessionId);
        }
      },

      // Session initialization callback
      onsessioninitialized: (sessionId: string) => {
        this.logger.sendLog({
          data: `Session initialized: ${sessionId}`,
          level: "info",
          logger: "TransportRegistry",
        });

        // Store session in registry
        if (transport && server) {
          this.sessions.set(sessionId, {
            createdAt: Date.now(),
            server,
            transport,
          });

          this.logger.sendLog({
            data: `Session stored in registry: ${sessionId} (total: ${this.sessions.size})`,
            level: "debug",
            logger: "TransportRegistry",
          });
        }

        // Register with SessionManager for TTL tracking
        if (this.sessionManager) {
          this.sessionManager.register(sessionId);
        }
      },

      // Retry interval for SSE
      retryInterval: this.config.retryInterval,

      // Session ID generator
      sessionIdGenerator: this.config.sessionConfig?.enabled
        ? () => randomUUID()
        : undefined,
    });

    // Handle transport close event
    transport.onclose = () => {
      if (transport?.sessionId) {
        this.logger.sendLog({
          data: `Transport closed for session: ${transport.sessionId}`,
          level: "debug",
          logger: "TransportRegistry",
        });
        this.remove(transport.sessionId);
      }
    };

    // Create server instance
    server = serverFactory();

    // Connect server to transport
    await server.connect(transport);

    this.logger.sendLog({
      data: "Created new session (session ID will be assigned after initialize)",
      level: "info",
      logger: "TransportRegistry",
    });

    return transport;
  }

  /**
   * Remove session from registry
   *
   * No-op (and no log entry) when the session ID is not registered, so it is
   * safe to call from several close paths for the same session.
   *
   * @param sessionId - Session ID to remove
   */
  remove(sessionId: string): void {
    if (!this.sessions.has(sessionId)) {
      return;
    }

    this.sessions.delete(sessionId);
    this.logger.sendLog({
      data: `Session removed from registry: ${sessionId} (remaining: ${this.sessions.size})`,
      level: "info",
      logger: "TransportRegistry",
    });
  }

  /**
   * Get number of active sessions
   *
   * @returns Active session count
   */
  getCount(): number {
    return this.sessions.size;
  }

  /**
   * Get all session IDs
   *
   * @returns Array of session IDs
   */
  getSessionIds(): string[] {
    return Array.from(this.sessions.keys());
  }

  /**
   * Close one session's transport without ever rejecting.
   *
   * @param sessionId - Session ID, used for log context only
   * @param entry - Registry entry whose transport is closed
   * @returns The failure as an Error, or undefined when the transport closed cleanly
   */
  private async closeEntry(
    sessionId: string,
    entry: SessionEntry,
  ): Promise<Error | undefined> {
    try {
      await entry.transport.close();
      return undefined;
    } catch (error) {
      const err = toError(error);
      this.logger.sendLog({
        data: `Error closing session ${sessionId}: ${err.message}`,
        level: "error",
        logger: "TransportRegistry",
      });
      return err;
    }
  }

  /**
   * Cleanup all sessions
   *
   * Called during graceful shutdown to close all active sessions. Every
   * transport is given the chance to close even if another one fails, and the
   * registry is always emptied afterwards; a failure is reported through the
   * returned Result rather than by aborting the remaining closes.
   *
   * @returns Result indicating success or failure
   */
  async cleanup(): Promise<Result<void, Error>> {
    try {
      this.logger.sendLog({
        data: `Cleaning up ${this.sessions.size} active session(s)`,
        level: "info",
        logger: "TransportRegistry",
      });

      // Snapshot first: close callbacks call remove(), which mutates the map.
      const snapshot = Array.from(this.sessions.entries());

      // closeEntry never rejects, so Promise.all waits for every transport.
      const outcomes = await Promise.all(
        snapshot.map(([sessionId, entry]) => this.closeEntry(sessionId, entry)),
      );

      // Safety net: entries are normally removed by the transports' close
      // callbacks, but a failed close must not leave stale sessions behind.
      this.sessions.clear();

      const firstFailure = outcomes.find(
        (outcome): outcome is Error => outcome !== undefined,
      );
      if (firstFailure) {
        return createErrorResult(
          new Error(`Failed to cleanup sessions: ${firstFailure.message}`),
        );
      }

      this.logger.sendLog({
        data: "All sessions cleaned up",
        level: "info",
        logger: "TransportRegistry",
      });

      return createSuccessResult(undefined);
    } catch (error) {
      const err = toError(error);
      return createErrorResult(
        new Error(`Failed to cleanup sessions: ${err.message}`),
      );
    }
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/8beeeaaat/touchdesigner-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.