Skip to main content
Glama
sse.ts3.49 kB
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; import express, { Router } from "express"; import { Logger } from "winston"; import { env } from "../env.js"; import { Services } from "../services/services.js"; import { getServer, respondNoValidSessionId, respondTransportMismatch, } from "./shared.js"; import { loggableError } from "@mcpx/toolkit-core/logging"; import { extractMetadata, logMetadataWarnings } from "./metadata.js"; import { scheduleProbeTransportTermination, setupPingMonitoring, } from "./liveness.js"; export function buildSSERouter( authGuard: express.RequestHandler, services: Services, logger: Logger, ): Router { const router = Router(); router.get("/sse", authGuard, async (req, res) => { const transport = new SSEServerTransport("/messages", res); const sessionId = transport.sessionId; const metadata = extractMetadata(req.headers, req.body); logMetadataWarnings(metadata, sessionId, logger); services.sessions.addSession(sessionId, { transport: { type: "sse", transport: transport }, metadata, consumerConfig: undefined, }); logger.debug("SSE connection established", { sessionId, sessionCount: Object.keys(services.sessions).length, }); const server = await getServer(services, logger, metadata.isProbe); await server.connect(transport); const stopPing = setupPingMonitoring( server, transport, sessionId, metadata, { pingIntervalMs: env.PING_INTERVAL_MS, maxMissedPings: env.MAX_MISSED_PINGS, }, logger, ); if (metadata.isProbe) { const opt = { probeClientsGraceLivenessPeriodMs: env.PROBE_CLIENTS_GRACE_LIVENESS_PERIOD_MS, }; scheduleProbeTransportTermination( services, server, transport, opt, stopPing, ); logger.info( "Initialized empty server for probe client transport, will be terminated shortly", { sessionId, metadata, ...opt }, ); } transport.onerror = (error: Error): void => { logger.error("Session transport error", { sessionId, error, metadata }); transport.close().catch(() => { // Ignore errors on close }); stopPing(); }; res.on("close", async () => { await server.close(); await transport.close(); stopPing(); services.sessions.removeSession(sessionId); logger.debug("SSE connection closed", { sessionId }); }); res.on("error", async (e) => { const error = loggableError(e); logger.error("SSE connection error, terminating", { sessionId, error }); await server.close(); await transport.close(); stopPing(); services.sessions.removeSession(sessionId); }); }); router.post("/messages", async (req, res) => { const sessionId = req.query["sessionId"] as string; const session = services.sessions.getSession(sessionId); if (session) { switch (session.transport.type) { case "sse": await session.transport.transport.handlePostMessage( req, res, req.body, ); break; case "streamableHttp": respondTransportMismatch(res); break; } } else { logger.warn("No session found for POST /messages", { sessionId }); respondNoValidSessionId(res); } }); return router; }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server