Skip to main content
Glama

mcp-nextcloud-calendar

mcp-transport.ts12.6 kB
import express from 'express'; type Request = express.Request; type Response = express.Response; type RequestHandler = express.RequestHandler; import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js'; import { v4 as uuidv4 } from 'uuid'; // Import Node.js globals explicitly for ESLint import { setInterval, clearInterval } from 'node:timers'; // Import config for keep-alive settings import { loadConfig } from '../config/config.js'; /** * Model Context Protocol Transport Implementation * * This file implements both: * 1. Latest MCP Streamable HTTP transport specification (March 2025) * 2. Legacy HTTP+SSE transport for backward compatibility * * The Streamable HTTP transport: * - Uses a single endpoint (/mcp) for both SSE streams and client messages * - Provides proper session management with Mcp-Session-Id headers * - Supports GET, POST, and DELETE methods at a single endpoint * * Legacy HTTP+SSE transport (for backward compatibility): * - Uses separate endpoints for SSE streams (/sse) and messages (/messages) * - Maintains the older protocol for clients that don't support Streamable HTTP * * Implementation: * Modern Streamable HTTP: * 1. GET /mcp - Initializes an SSE stream for server-to-client communication * 2. POST /mcp - Handles client-to-server messages with session management * 3. DELETE /mcp - Terminates a session when a client is done * * Legacy HTTP+SSE: * 1. GET /sse - Establishes SSE connection * 2. POST /messages?sessionId=X - Sends messages to server */ // Store transports by session ID const transports: { [sessionId: string]: SSEServerTransport } = {}; // Global map to track keep-alive interval timers const keepAliveTimers: { [sessionId: string]: ReturnType<typeof setInterval> } = {}; // Function to clean up all resources (for testing) export function cleanupAllResources() { // Stop all keep-alive timers Object.keys(keepAliveTimers).forEach((sessionId) => { clearInterval(keepAliveTimers[sessionId]); delete keepAliveTimers[sessionId]; }); // Clear all transports Object.keys(transports).forEach((sessionId) => { delete transports[sessionId]; }); } // Get keep-alive interval from config const config = loadConfig(); const KEEP_ALIVE_INTERVAL = config.server.keepAliveInterval; /** * Starts a keep-alive pinger for a specific session * @param sessionId The session ID to track * @param res The Express response object to write keep-alive events to */ function startKeepAlivePinger(sessionId: string, res: Response): void { // Clear existing timer if there is one if (keepAliveTimers[sessionId]) { clearInterval(keepAliveTimers[sessionId]); console.log(`Cleared existing keep-alive timer for session ${sessionId}`); } // Create a new keep-alive interval keepAliveTimers[sessionId] = setInterval(() => { try { if (transports[sessionId]) { res.write('event: ping\ndata: keep-alive\n\n'); console.log(`Sent keep-alive ping for session ${sessionId}`); } else { stopKeepAlivePinger(sessionId); } } catch (error) { console.error(`Error sending keep-alive ping for session ${sessionId}:`, error); stopKeepAlivePinger(sessionId); } }, KEEP_ALIVE_INTERVAL); console.log( `Started keep-alive pinger for session ${sessionId} (interval: ${KEEP_ALIVE_INTERVAL}ms)`, ); } /** * Stops a keep-alive pinger for a specific session * @param sessionId The session ID to stop pinging */ function stopKeepAlivePinger(sessionId: string): void { if (keepAliveTimers[sessionId]) { clearInterval(keepAliveTimers[sessionId]); delete keepAliveTimers[sessionId]; console.log(`Stopped keep-alive pinger for session ${sessionId}`); } } /** * Cleans up resources when a session is closed * @param sessionId The session ID to clean up */ function cleanupSession(sessionId: string): void { console.log(`Cleaning up session ${sessionId}`); // Stop keep-alive pinger stopKeepAlivePinger(sessionId); // Remove transport if (transports[sessionId]) { delete transports[sessionId]; console.log(`Removed transport for session ${sessionId}`); } console.log(`Active sessions after cleanup: ${Object.keys(transports).length}`); } // Streamable HTTP transport handlers export function setupMcpTransport(server: McpServer) { // Unified MCP endpoint - handles both GET for SSE and POST for messages // Implements the Streamable HTTP transport (latest spec) const mcpHandler: RequestHandler = async (req: Request, res: Response): Promise<void> => { console.log( `MCP ${req.method} request received, session ID: ${req.headers['mcp-session-id'] || 'none'}`, ); console.log( `Active sessions: ${Object.keys(transports).length} [${Object.keys(transports).join(', ')}]`, ); // For DELETE requests - terminate session if it exists if (req.method === 'DELETE') { const sessionId = req.headers['mcp-session-id'] as string; if (sessionId && transports[sessionId]) { cleanupSession(sessionId); res.status(202).end(); return; } res.status(404).end(); return; } // For GET requests - initialize SSE stream if (req.method === 'GET') { console.log('Streamable HTTP GET request received (SSE stream)'); // Set appropriate headers for SSE res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache, no-transform'); res.setHeader('Connection', 'keep-alive'); try { // Create a new SSE transport for the Streamable HTTP endpoint const transport = new SSEServerTransport('/mcp', res); // Use provided session ID if exists, otherwise use transport's generated one const sessionId = (req.headers['mcp-session-id'] as string) || transport.sessionId; console.log(`Streamable HTTP SSE transport created with sessionId: ${sessionId}`); // Store transport with session ID transports[sessionId] = transport; console.log( `Active sessions: ${Object.keys(transports).length} [${Object.keys(transports).join(', ')}]`, ); // Add session ID header if not provided by client if (!req.headers['mcp-session-id']) { res.setHeader('Mcp-Session-Id', sessionId); console.log(`Added Mcp-Session-Id header: ${sessionId}`); } // Add a close event handler res.on('close', () => { console.log(`Streamable HTTP SSE connection closed for sessionId: ${sessionId}`); cleanupSession(sessionId); }); // Connect to the MCP server - this will start the transport automatically await server.connect(transport); console.log(`Streamable HTTP SSE transport connected to MCP server`); // Start keep-alive pinger startKeepAlivePinger(sessionId, res); } catch (error) { console.error('Error establishing Streamable HTTP SSE connection:', error); if (!res.headersSent) { res.status(500).end('Error establishing SSE connection'); } else { res.end(); } } return; } // For POST requests - handle client messages if (req.method === 'POST') { const sessionId = req.headers['mcp-session-id'] as string; // If we have a session ID and an associated transport, handle the message if (sessionId && transports[sessionId]) { const transport = transports[sessionId]; try { console.log(`Processing message for session ${sessionId}`); // Pass parsed body to handlePostMessage to avoid it trying to parse the body again await transport.handlePostMessage(req, res, req.body); return; } catch (error) { console.error(`Error handling message for session ${sessionId}:`, error); // Only set status if headers haven't been sent yet if (!res.headersSent) { res.status(500).json({ error: 'Error processing message', message: error instanceof Error ? error.message : String(error), }); } return; } } // If no session ID or no transport found with that ID, // this might be an initialization request if (!sessionId || !transports[sessionId]) { // Generate a new session ID if not provided const newSessionId = sessionId || uuidv4(); // If this is an initialization request, create a new transport // Note: We're handling this differently than regular messages // as there's no existing transport yet // Set the session ID header for the response res.setHeader('Mcp-Session-Id', newSessionId); // Process the initialization request // For now, we'll just accept it with a 202 status res.status(202).end(); return; } // If we reach here, session ID was provided but no transport exists res.status(404).json({ error: 'Session not found', code: -32001, message: 'No session found with the provided ID', }); return; } // Method not allowed for other HTTP methods res.status(405).end(); return; }; // Legacy SSE endpoint handler (for backward compatibility) const sseHandler: RequestHandler = async (req: Request, res: Response): Promise<void> => { console.log('Legacy SSE connection request received'); // Set appropriate headers for SSE res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache, no-transform'); res.setHeader('Connection', 'keep-alive'); try { // Create a new SSE transport with the legacy endpoint const transport = new SSEServerTransport('/messages', res); const sessionId = transport.sessionId; console.log(`Legacy SSE transport created with sessionId: ${sessionId}`); // Store the transport by session ID transports[sessionId] = transport; console.log( `Active sessions: ${Object.keys(transports).length} [${Object.keys(transports).join(', ')}]`, ); // Add a close event handler to clean up resources when the connection closes res.on('close', () => { console.log(`Legacy SSE connection closed for sessionId: ${sessionId}`); cleanupSession(sessionId); }); // Connect to the MCP server - this will start the transport automatically await server.connect(transport); console.log(`Legacy SSE transport connected to MCP server`); // Start keep-alive pinger startKeepAlivePinger(sessionId, res); } catch (error) { console.error('Error establishing SSE connection:', error); res.end(); } }; // Legacy message handling endpoint (for backward compatibility) const messageHandler: RequestHandler = async (req: Request, res: Response): Promise<void> => { const sessionId = req.query.sessionId as string; console.log(`Legacy message POST received, query sessionId: ${sessionId || 'none'}`); console.log( `Active sessions: ${Object.keys(transports).length} [${Object.keys(transports).join(', ')}]`, ); if (!sessionId) { console.error('No session ID provided in request URL'); res.status(400).json({ error: 'Missing sessionId parameter' }); return; } const transport = transports[sessionId]; if (!transport) { console.warn(`No transport found for sessionId ${sessionId}`); res.status(404).json({ error: 'No transport found for sessionId' }); return; } try { console.log(`Processing legacy message for session ${sessionId}`); // Pass parsed body to handlePostMessage to avoid it trying to parse the body again await transport.handlePostMessage(req, res, req.body); } catch (error) { console.error(`Error handling legacy message for session ${sessionId}:`, error); // Only set status if headers haven't been sent yet if (!res.headersSent) { res.status(500).json({ error: 'Error processing message', message: error instanceof Error ? error.message : String(error), }); } } }; return { mcpHandler, // Streamable HTTP handler sseHandler, // Legacy SSE handler messageHandler, // Legacy message handler transports, keepAliveTimers, // Export timers for testing and debugging }; }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Cheffromspace/mcp-nextcloud-calendar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server