Skip to main content
Glama

MCP Firebird

streamable-http.ts (15.1 kB)
/**
 * Streamable HTTP transport implementation for MCP Firebird
 * Implements the modern MCP protocol (2025-03-26) with session management
 * and backwards compatibility support
 */

import express from 'express';
import { randomUUID } from 'node:crypto';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js';
import { createLogger } from '../utils/logger.js';

const logger = createLogger('server:streamable-http');

/** Everything the router keeps for one stateful MCP session. */
interface SessionInfo {
    transport: StreamableHTTPServerTransport;
    server: McpServer;
    createdAt: Date;
    lastActivity: Date;
}

/** Anything that can be shut down; `close()` may be synchronous or return a promise. */
interface Closable {
    close(): unknown;
}

/** A session successfully looked up from an incoming request. */
interface ResolvedSession {
    sessionId: string;
    sessionInfo: SessionInfo;
}

/** Router type extended with the shutdown hook attached by createStreamableHttpRouter. */
type RouterWithCleanup = express.Router & { cleanup?: () => void };

/** Idle time after which a session is discarded when no valid override is configured (30 minutes). */
const DEFAULT_SESSION_TIMEOUT_MS = 30 * 60 * 1000;

/** Request header carrying the session ID (Node lower-cases incoming header names). */
const SESSION_ID_HEADER = 'mcp-session-id';

/**
 * Parses the configured session timeout.
 * Falls back to the default when the value is missing, not a number, or not
 * positive: a NaN timeout would make the expiry comparison always false, so
 * sessions would never be cleaned up.
 * @param raw Raw value of STREAMABLE_SESSION_TIMEOUT_MS
 * @returns Timeout in milliseconds
 */
function resolveSessionTimeoutMs(raw: string | undefined): number {
    if (!raw) {
        return DEFAULT_SESSION_TIMEOUT_MS;
    }
    const parsed = Number.parseInt(raw, 10);
    if (Number.isFinite(parsed) && parsed > 0) {
        return parsed;
    }
    logger.warn(
        `Invalid STREAMABLE_SESSION_TIMEOUT_MS value "${raw}", using default of ${DEFAULT_SESSION_TIMEOUT_MS} ms`
    );
    return DEFAULT_SESSION_TIMEOUT_MS;
}

/**
 * Writes a JSON-RPC 2.0 error response with a null request id.
 * @param res Express response to write to
 * @param status HTTP status code
 * @param code JSON-RPC error code
 * @param message Human-readable error message
 */
function sendJsonRpcError(res: express.Response, status: number, code: number, message: string): void {
    res.status(status).json({
        jsonrpc: '2.0',
        error: { code, message },
        id: null,
    });
}

/**
 * Reads the session ID header from a request.
 * @param req Incoming request
 * @returns The session ID, or undefined when the header is absent or empty
 */
function getSessionId(req: express.Request): string | undefined {
    const raw = req.headers[SESSION_ID_HEADER];
    const value = Array.isArray(raw) ? raw[0] : raw;
    return value ? value : undefined;
}

/**
 * Looks up a session by ID using an own-property check.
 * The ID comes from a client-controlled header, so a plain `store[id]` lookup
 * would resolve inherited keys such as `__proto__` or `constructor` to
 * prototype objects and treat them as sessions.
 * @param activeSessions Session store
 * @param sessionId ID supplied by the client
 * @returns The stored session, or undefined when there is none
 */
function findSession(activeSessions: Record<string, SessionInfo>, sessionId: string): SessionInfo | undefined {
    return Object.prototype.hasOwnProperty.call(activeSessions, sessionId)
        ? activeSessions[sessionId]
        : undefined;
}

/**
 * Responds to a request that names a session this server does not know.
 * The MCP Streamable HTTP transport specifies 404 here so that clients know
 * they must start a new session (for example after the session expired).
 * @param res Express response to write to
 * @param sessionId The unknown session ID, used for logging only
 */
function rejectUnknownSession(res: express.Response, sessionId: string): void {
    logger.warn(`Request for unknown or expired session: ${sessionId}`);
    sendJsonRpcError(res, 404, -32001, 'Session not found');
}

/**
 * Resolves the session addressed by a GET or DELETE request, sending the
 * error response itself when resolution fails.
 * @param req Incoming request
 * @param res Express response used for error replies
 * @param activeSessions Session store
 * @returns The resolved session, or undefined when an error response was sent
 */
function requireSession(
    req: express.Request,
    res: express.Response,
    activeSessions: Record<string, SessionInfo>
): ResolvedSession | undefined {
    const sessionId = getSessionId(req);
    if (!sessionId) {
        sendJsonRpcError(res, 400, -32602, 'Invalid or missing session ID');
        return undefined;
    }

    const sessionInfo = findSession(activeSessions, sessionId);
    if (!sessionInfo) {
        rejectUnknownSession(res, sessionId);
        return undefined;
    }

    return { sessionId, sessionInfo };
}

/**
 * Closes each resource on a best-effort basis, logging failures as warnings.
 * Handles both synchronous throws and rejected promises: a bare try/catch
 * around an un-awaited `close()` does not observe a rejection, which would
 * otherwise surface as an unhandled rejection.
 * NOTE(review): the SDK's transport/server `close()` methods appear to be
 * async; confirm against the installed SDK version.
 * @param failureMessage Log message used when closing a resource fails
 * @param resources Resources to close, in order
 */
function closeResources(failureMessage: string, ...resources: Closable[]): void {
    for (const resource of resources) {
        try {
            void Promise.resolve(resource.close()).catch((error: unknown) => {
                logger.warn(failureMessage, { error });
            });
        } catch (error) {
            logger.warn(failureMessage, { error });
        }
    }
}

/**
 * Creates a Streamable HTTP router for modern MCP clients.
 *
 * Routes: GET /health, and POST/GET/DELETE /mcp. In stateless mode
 * (STREAMABLE_STATELESS_MODE=true) only POST /mcp is served and every request
 * gets fresh server/transport instances. In stateful mode sessions are kept in
 * memory and discarded after STREAMABLE_SESSION_TIMEOUT_MS of inactivity.
 *
 * The returned router carries an extra `cleanup()` function that stops the
 * expiry timer and closes all sessions, for graceful shutdown.
 *
 * @param createServerInstance Function to create a new McpServer instance
 * @returns Express router configured for Streamable HTTP
 */
export function createStreamableHttpRouter(createServerInstance: () => Promise<McpServer>): express.Router {
    const router = express.Router();

    // Session storage for stateful mode. Null prototype so that client-supplied
    // IDs can never resolve to inherited Object.prototype members.
    const activeSessions = Object.create(null) as Record<string, SessionInfo>;

    // Configuration
    const SESSION_TIMEOUT_MS = resolveSessionTimeoutMs(process.env.STREAMABLE_SESSION_TIMEOUT_MS);
    const CLEANUP_INTERVAL_MS = 60000; // 1 minute
    const STATELESS_MODE = process.env.STREAMABLE_STATELESS_MODE === 'true';

    logger.info(`Streamable HTTP router initialized in ${STATELESS_MODE ? 'stateless' : 'stateful'} mode`);

    // Periodic cleanup of expired sessions (only in stateful mode)
    let cleanupInterval: NodeJS.Timeout | null = null;
    if (!STATELESS_MODE) {
        cleanupInterval = setInterval(() => {
            const now = Date.now();
            const expiredSessions = Object.entries(activeSessions)
                .filter(([, info]) => now - info.lastActivity.getTime() > SESSION_TIMEOUT_MS);

            for (const [sessionId, info] of expiredSessions) {
                logger.info(`Cleaning up expired session: ${sessionId}`);
                closeResources(`Error closing expired session ${sessionId}:`, info.transport, info.server);
                delete activeSessions[sessionId];
            }

            if (expiredSessions.length > 0) {
                logger.info(`Cleaned up ${expiredSessions.length} expired sessions`);
            }
        }, CLEANUP_INTERVAL_MS);
    }

    // Health check endpoint
    router.get('/health', (_req, res) => {
        res.json({
            status: 'healthy',
            mode: STATELESS_MODE ? 'stateless' : 'stateful',
            activeSessions: STATELESS_MODE ? 'N/A' : Object.keys(activeSessions).length,
            uptime: process.uptime()
        });
    });

    // Main MCP endpoint - handles POST requests for client-to-server communication
    router.post('/mcp', async (req, res) => {
        logger.debug('Received POST request to /mcp');

        try {
            if (STATELESS_MODE) {
                // Stateless mode: create new instances for each request
                await handleStatelessRequest(req, res, createServerInstance);
            } else {
                // Stateful mode: manage sessions
                await handleStatefulRequest(req, res, createServerInstance, activeSessions);
            }
        } catch (error) {
            logger.error('Error handling MCP request:', { error });
            // The transport may already have started the response; only reply if it has not.
            if (!res.headersSent) {
                sendJsonRpcError(res, 500, -32603, 'Internal server error');
            }
        }
    });

    // Handle GET requests for server-to-client notifications via SSE (stateful mode only)
    router.get('/mcp', async (req, res) => {
        if (STATELESS_MODE) {
            logger.warn('GET request to /mcp in stateless mode');
            sendJsonRpcError(res, 405, -32000, 'Method not allowed in stateless mode.');
            return;
        }

        const resolved = requireSession(req, res, activeSessions);
        if (!resolved) {
            return;
        }
        const { sessionId, sessionInfo } = resolved;
        sessionInfo.lastActivity = new Date();

        try {
            await sessionInfo.transport.handleRequest(req, res);
        } catch (error) {
            logger.error(`Error handling GET request for session ${sessionId}:`, { error });
            if (!res.headersSent) {
                sendJsonRpcError(res, 500, -32603, 'Internal server error');
            }
        }
    });

    // Handle DELETE requests for session termination (stateful mode only)
    router.delete('/mcp', async (req, res) => {
        if (STATELESS_MODE) {
            logger.warn('DELETE request to /mcp in stateless mode');
            sendJsonRpcError(res, 405, -32000, 'Method not allowed in stateless mode.');
            return;
        }

        const resolved = requireSession(req, res, activeSessions);
        if (!resolved) {
            return;
        }
        const { sessionId, sessionInfo } = resolved;

        try {
            await sessionInfo.transport.handleRequest(req, res);

            // Clean up session after handling the DELETE request. The transport's
            // onclose hook may already have removed it, hence the re-check.
            setTimeout(() => {
                if (findSession(activeSessions, sessionId)) {
                    logger.info(`Terminating session: ${sessionId}`);
                    closeResources(`Error closing session ${sessionId}:`, sessionInfo.transport, sessionInfo.server);
                    delete activeSessions[sessionId];
                }
            }, 100); // Small delay to ensure response is sent
        } catch (error) {
            logger.error(`Error handling DELETE request for session ${sessionId}:`, { error });
            if (!res.headersSent) {
                sendJsonRpcError(res, 500, -32603, 'Internal server error');
            }
        }
    });

    // Cleanup function for graceful shutdown
    (router as RouterWithCleanup).cleanup = () => {
        logger.info('Cleaning up Streamable HTTP router...');

        if (cleanupInterval) {
            clearInterval(cleanupInterval);
        }

        // Close all active sessions. Object.entries takes a snapshot, so close
        // hooks that delete entries do not disturb the iteration.
        for (const [sessionId, info] of Object.entries(activeSessions)) {
            closeResources(`Error closing session ${sessionId} during cleanup:`, info.transport, info.server);
        }

        // Clear sessions
        for (const key of Object.keys(activeSessions)) {
            delete activeSessions[key];
        }

        logger.info('Streamable HTTP router cleanup completed');
    };

    return router;
}

/**
 * Handles requests in stateless mode - creates new instances for each request.
 * The server and transport are closed when the response closes.
 * @param req Incoming request; `req.body` is passed to the transport as the parsed message
 * @param res Express response the transport writes to
 * @param createServerInstance Function to create a new McpServer instance
 * @throws Re-throws any error from connecting or handling the request, after logging it
 */
async function handleStatelessRequest(
    req: express.Request,
    res: express.Response,
    createServerInstance: () => Promise<McpServer>
): Promise<void> {
    logger.debug('Handling request in stateless mode');

    const server = await createServerInstance();
    const transport = new StreamableHTTPServerTransport({
        sessionIdGenerator: undefined, // Disable session management
    });

    // Clean up on response close
    res.on('close', () => {
        logger.debug('Request closed, cleaning up stateless resources');
        closeResources('Error cleaning up stateless resources:', transport, server);
    });

    try {
        await server.connect(transport);
        await transport.handleRequest(req, res, req.body);
    } catch (error) {
        logger.error('Error in stateless request handling:', { error });
        throw error;
    }
}

/**
 * Handles requests in stateful mode - manages sessions.
 *
 * - Known session ID: the request is routed to that session's transport.
 * - Unknown session ID: 404, so the client knows to re-initialize.
 * - No session ID and an initialize request: a new server/transport pair is
 *   created; it is stored in `activeSessions` once the transport reports the
 *   session as initialized.
 * - Anything else: 400.
 *
 * @param req Incoming request; `req.body` is passed to the transport as the parsed message
 * @param res Express response the transport writes to
 * @param createServerInstance Function to create a new McpServer instance
 * @param activeSessions Session store, mutated as sessions are created and closed
 * @throws Re-throws any error from connecting or handling the request, after logging it
 */
async function handleStatefulRequest(
    req: express.Request,
    res: express.Response,
    createServerInstance: () => Promise<McpServer>,
    activeSessions: Record<string, SessionInfo>
): Promise<void> {
    const sessionId = getSessionId(req);
    const existingSession = sessionId ? findSession(activeSessions, sessionId) : undefined;

    let sessionInfo: SessionInfo;
    let isNewSession = false;

    if (sessionId && existingSession) {
        // Reuse existing session
        logger.debug(`Reusing existing session: ${sessionId}`);
        sessionInfo = existingSession;
        sessionInfo.lastActivity = new Date();
    } else if (sessionId) {
        // A session ID was supplied but it is not (or no longer) known
        rejectUnknownSession(res, sessionId);
        return;
    } else if (isInitializeRequest(req.body)) {
        // New initialization request
        logger.debug('Creating new session for initialize request');

        const server = await createServerInstance();
        const transport = new StreamableHTTPServerTransport({
            sessionIdGenerator: () => randomUUID(),
            onsessioninitialized: (newSessionId) => {
                logger.info(`Session initialized: ${newSessionId}`);
                // Store the session info
                activeSessions[newSessionId] = {
                    transport,
                    server,
                    createdAt: new Date(),
                    lastActivity: new Date()
                };
            }
        });

        // Clean up transport when closed. Assigned before server.connect(),
        // as in the original ordering.
        transport.onclose = () => {
            const closedId = transport.sessionId;
            if (closedId && findSession(activeSessions, closedId)) {
                logger.info(`Transport closed for session: ${closedId}`);
                closeResources(`Error closing server for session ${closedId}:`, server);
                delete activeSessions[closedId];
            }
        };

        sessionInfo = {
            transport,
            server,
            createdAt: new Date(),
            lastActivity: new Date()
        };
        isNewSession = true;

        try {
            await server.connect(transport);
        } catch (error) {
            logger.error('Error connecting server to transport:', { error });
            // Nothing references this pair yet, so release it before propagating.
            closeResources('Error releasing resources after failed connect:', transport, server);
            throw error;
        }
    } else {
        // Invalid request
        logger.warn('Invalid request: no valid session ID or initialize request');
        sendJsonRpcError(
            res,
            400,
            -32602,
            'Bad Request: No valid session ID provided or not an initialize request'
        );
        return;
    }

    // Handle the request
    try {
        await sessionInfo.transport.handleRequest(req, res, req.body);
    } catch (error) {
        logger.error('Error handling stateful request:', { error });
        throw error;
    } finally {
        if (isNewSession) {
            // If the initialize request did not end with a registered session
            // (it was rejected or threw), no later request can reach this pair,
            // so release it instead of leaking it.
            // NOTE(review): assumes the transport invokes onsessioninitialized
            // before handleRequest settles; confirm against the SDK version in use.
            const initializedId = sessionInfo.transport.sessionId;
            const registered = initializedId !== undefined
                && findSession(activeSessions, initializedId) !== undefined;
            if (!registered) {
                closeResources(
                    'Error releasing resources for uninitialized session:',
                    sessionInfo.transport,
                    sessionInfo.server
                );
            }
        }
    }
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/PuroDelphi/mcpFirebird'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.