Skip to main content
Glama

OPNSense MCP Server

SSETransportServer.ts6.83 kB
import { createServer, Server, IncomingMessage, ServerResponse } from "http"; import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; import { TransportOptions } from "./types.js"; interface SSEConnection { transport: SSEServerTransport; sessionId: string; } export class SSETransportServer { private server: Server | null = null; private port: number; private host: string; private corsOrigin: string; private connections: Map<string, SSEConnection> = new Map(); private onConnectionCallback: | ((transport: Transport) => Promise<void>) | null = null; constructor(options: TransportOptions = {}) { this.port = options.port || 3000; this.host = options.host || "0.0.0.0"; this.corsOrigin = options.corsOrigin || "*"; } async start(): Promise<void> { return new Promise((resolve, reject) => { this.server = createServer(async (req, res) => { // Set CORS headers res.setHeader("Access-Control-Allow-Origin", this.corsOrigin); res.setHeader("Access-Control-Allow-Methods", "GET, POST, OPTIONS"); res.setHeader("Access-Control-Allow-Headers", "Content-Type"); // Handle preflight if (req.method === "OPTIONS") { res.writeHead(204); res.end(); return; } const url = new URL(req.url || "/", `http://${req.headers.host}`); // Handle SSE connection establishment if (req.method === "GET" && url.pathname === "/sse") { await this.handleSSEConnection(req, res); } // Handle message posts else if (req.method === "POST" && url.pathname === "/messages") { await this.handlePostMessage(req, res); } // Health check endpoint else if (req.method === "GET" && url.pathname === "/health") { res.writeHead(200, { "Content-Type": "application/json" }); res.end( JSON.stringify({ status: "healthy", transport: "sse", port: this.port, connections: this.connections.size, }), ); } else { res.writeHead(404); res.end("Not Found"); } }); this.server.on("error", reject); this.server.listen(this.port, this.host, () => { console.log( `SSE MCP Server listening on http://${this.host}:${this.port}`, ); console.log(`- SSE endpoint: http://${this.host}:${this.port}/sse`); console.log( `- Messages endpoint: http://${this.host}:${this.port}/messages`, ); console.log(`- Health check: http://${this.host}:${this.port}/health`); resolve(); }); }); } private async handleSSEConnection( req: IncomingMessage, res: ServerResponse, ): Promise<void> { try { // Create a new transport for this connection const transport = new SSEServerTransport("/messages", res); // Don't call transport.start() here - Server.connect() will do it automatically // Extract session ID from the transport (this is set by the SDK) // We'll need to track this for routing POST messages const sessionId = this.extractSessionId(transport); if (sessionId) { this.connections.set(sessionId, { transport, sessionId }); console.log(`New SSE connection established: ${sessionId}`); } // Notify that a new connection is ready // The callback should call Server.connect() which will start the transport if (this.onConnectionCallback) { await this.onConnectionCallback(transport); } // Clean up on connection close req.on("close", () => { if (sessionId) { this.connections.delete(sessionId); console.log(`SSE connection closed: ${sessionId}`); } }); } catch (error) { console.error("Error handling SSE connection:", error); // Don't write headers if SSE has already started if (!res.headersSent) { res.writeHead(500); res.end("Internal Server Error"); } } } private async handlePostMessage( req: IncomingMessage, res: ServerResponse, ): Promise<void> { try { const url = new URL(req.url || "/", `http://${req.headers.host}`); const sessionId = url.searchParams.get("sessionId"); if (!sessionId) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Missing sessionId parameter" })); return; } const connection = this.connections.get(sessionId); if (!connection) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Invalid sessionId" })); return; } // Let the transport handle the message await connection.transport.handlePostMessage(req, res); } catch (error) { console.error("Error handling POST message:", error); res.writeHead(500, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Internal server error" })); } } private extractSessionId(transport: SSEServerTransport): string | null { // The SSEServerTransport generates a session ID internally // We need to extract it from the transport instance // This is a bit of a hack, but necessary for routing try { // Access the private _sessionId property return (transport as any)._sessionId || null; } catch { return null; } } // This method allows the TransportManager to set up a callback // for when new connections are established onConnection(callback: (transport: Transport) => Promise<void>): void { this.onConnectionCallback = callback; } // Get the first available transport (for backward compatibility) getTransport(): Transport { const firstConnection = this.connections.values().next().value; if (!firstConnection) { throw new Error( "No SSE connections available. Waiting for client connection...", ); } return firstConnection.transport; } async stop(): Promise<void> { return new Promise((resolve) => { // Close all active connections for (const connection of this.connections.values()) { try { // The transport will close the SSE stream (connection.transport as any).close(); } catch (error) { console.error("Error closing connection:", error); } } this.connections.clear(); if (this.server) { this.server.close(() => { console.log("SSE MCP Server stopped"); resolve(); }); } else { resolve(); } }); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vespo92/OPNSenseMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server