Skip to main content
Glama
sse.ts7.48 kB
import express, { Request, Response } from 'express'; import { Server } from '@modelcontextprotocol/sdk/server/index.js'; import { ServerTransport } from '@modelcontextprotocol/sdk/server/types.js'; import { JSONRPCRequest, JSONRPCResponse } from '@modelcontextprotocol/sdk/types.js'; import { Config } from '../config/index'; import pino from 'pino'; import { v4 as uuidv4 } from 'uuid'; const logger = pino({ level: process.env.LOG_LEVEL || 'info' }); interface SSEClient { id: string; response: Response; lastActivity: Date; } export class SSEServerTransport implements ServerTransport { private server: Server; private app: express.Application; private config: Config; private clients: Map<string, SSEClient> = new Map(); private cleanupInterval?: NodeJS.Timer; constructor(server: Server, config: Config) { this.server = server; this.config = config; this.app = express(); this.setupMiddleware(); this.setupRoutes(); this.startCleanup(); } private setupMiddleware() { this.app.use(express.json({ limit: '10mb' })); // CORS this.app.use((req, res, next) => { res.header('Access-Control-Allow-Origin', '*'); res.header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS'); res.header('Access-Control-Allow-Headers', 'Content-Type, Authorization, X-Client-ID'); if (req.method === 'OPTIONS') { return res.sendStatus(200); } next(); }); // Request logging this.app.use((req, res, next) => { logger.info({ method: req.method, path: req.path }, 'SSE request'); next(); }); } private setupRoutes() { // Health check this.app.get('/health', (req, res) => { res.json({ status: 'ok', service: 'tak-server-mcp-sse', version: '0.1.0', clients: this.clients.size }); }); // SSE endpoint this.app.get('/sse', (req, res) => { const clientId = req.headers['x-client-id'] as string || uuidv4(); // Set SSE headers res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'X-Client-ID': clientId }); // Register client const client: SSEClient = { id: clientId, response: res, lastActivity: new Date() }; this.clients.set(clientId, client); // Send initial connection event this.sendEvent(client, 'connected', { clientId }); // Keep connection alive const keepAlive = setInterval(() => { res.write(':keepalive\n\n'); }, 30000); // Handle client disconnect req.on('close', () => { clearInterval(keepAlive); this.clients.delete(clientId); logger.info(`SSE client disconnected: ${clientId}`); }); logger.info(`SSE client connected: ${clientId}`); }); // Request endpoint this.app.post('/sse/request', async (req, res) => { const clientId = req.headers['x-client-id'] as string; if (!clientId || !this.clients.has(clientId)) { return res.status(400).json({ error: 'Invalid or missing client ID' }); } try { const request = req.body as JSONRPCRequest; const client = this.clients.get(clientId)!; client.lastActivity = new Date(); logger.debug({ method: request.method, clientId }, 'Processing SSE request'); // Process request const response = await this.handleRequest(request); // Send response via SSE this.sendEvent(client, 'response', response); // Also return in HTTP response for acknowledgment res.json({ success: true, id: request.id }); } catch (error) { logger.error('Error handling SSE request:', error); res.status(500).json({ error: error instanceof Error ? error.message : 'Internal error' }); } }); // Subscribe to specific events (for real-time TAK updates) this.app.post('/sse/subscribe', (req, res) => { const clientId = req.headers['x-client-id'] as string; if (!clientId || !this.clients.has(clientId)) { return res.status(400).json({ error: 'Invalid or missing client ID' }); } const { events } = req.body; const client = this.clients.get(clientId)!; // Store subscription preferences (would be used by TAK event handlers) (client as any).subscriptions = events; res.json({ success: true, subscribed: events }); }); } private sendEvent(client: SSEClient, event: string, data: any) { try { const message = [ `event: ${event}`, `data: ${JSON.stringify(data)}`, `id: ${Date.now()}`, '', '' ].join('\n'); client.response.write(message); } catch (error) { logger.error(`Failed to send SSE event to client ${client.id}:`, error); this.clients.delete(client.id); } } private async handleRequest(request: JSONRPCRequest): Promise<JSONRPCResponse> { return new Promise((resolve) => { const mockConnection = { send: (response: JSONRPCResponse) => { resolve(response); } }; this.server['handleRequest'](request, mockConnection); }); } // Broadcast to all clients (useful for TAK events) public broadcast(event: string, data: any) { for (const client of this.clients.values()) { this.sendEvent(client, event, data); } } // Broadcast to specific clients based on subscriptions public broadcastToSubscribers(event: string, data: any, filter?: (client: SSEClient) => boolean) { for (const client of this.clients.values()) { if (!filter || filter(client)) { this.sendEvent(client, event, data); } } } private startCleanup() { // Clean up inactive clients every 5 minutes this.cleanupInterval = setInterval(() => { const now = new Date(); const timeout = 5 * 60 * 1000; // 5 minutes for (const [id, client] of this.clients.entries()) { if (now.getTime() - client.lastActivity.getTime() > timeout) { logger.info(`Cleaning up inactive SSE client: ${id}`); try { client.response.end(); } catch (error) { // Client might already be disconnected } this.clients.delete(id); } } }, 60000); // Check every minute } async start(): Promise<void> { const port = this.config.mcp.port || 3001; return new Promise((resolve, reject) => { this.app.listen(port, () => { logger.info(`SSE transport listening on port ${port}`); resolve(); }).on('error', reject); }); } async close(): Promise<void> { if (this.cleanupInterval) { clearInterval(this.cleanupInterval); } // Close all client connections for (const client of this.clients.values()) { try { this.sendEvent(client, 'shutdown', { message: 'Server shutting down' }); client.response.end(); } catch (error) { // Ignore errors during shutdown } } this.clients.clear(); logger.info('SSE transport closed'); } } export async function createSSETransport(server: Server, config: Config): Promise<SSEServerTransport> { const transport = new SSEServerTransport(server, config); await transport.start(); return transport; }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jfuginay/tak-server-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server