Skip to main content
Glama
jmandel

Smart EHR MCP Server

by jmandel
SseConnectionManager.ts11.5 kB
import type * as express from 'express'; import type * as A2ATypes from '../types'; // For event types import type { NotificationService, CoreTaskEvent } from '../interfaces'; // Import interface and event type import { EventEmitter } from 'events'; // Define the structure for storing SSE subscription info interface SseSubscriptionInfo { res: express.Response; requestId: string | number | null; intervalId: NodeJS.Timeout; } /** * Manages Server-Sent Event (SSE) connections for A2A tasks. * Implements NotificationService to integrate with A2AServerCore event emission. */ export class SseConnectionManager implements NotificationService { // <-- Implement interface private readonly subscriptions: Map<string, SseSubscriptionInfo[]> = new Map(); /** * Adds a new SSE subscription for a given task ID. * Sets up keep-alive messages and handles connection closure. */ addSubscription(taskId: string, requestId: string | number | null, res: express.Response & EventEmitter): void { let taskSubscriptions = this.subscriptions.get(taskId); if (!taskSubscriptions) { taskSubscriptions = []; this.subscriptions.set(taskId, taskSubscriptions); } if (taskSubscriptions.some(sub => sub.res === res)) { console.warn(`[SseManager] Attempted to add duplicate SSE subscription for task ${taskId}.`); return; } const keepAliveInterval = setInterval(() => { if (!res.closed) { this.sendKeepAliveComment(res); } else { console.warn(`[SseManager] Keep-alive detected closed connection for task ${taskId}. Clearing interval.`); clearInterval(keepAliveInterval); this.removeSubscription(taskId, res); // Attempt cleanup } }, 30000); // Send comment every 30s const newSubscription: SseSubscriptionInfo = { res, requestId, intervalId: keepAliveInterval }; taskSubscriptions.push(newSubscription); console.log(`[SseManager] Added SSE subscription for task ${taskId}. Total: ${taskSubscriptions.length}`); let closeListenerExecuted = false; // Flag to ensure listener runs only once const closeListener = () => { if (closeListenerExecuted) return; closeListenerExecuted = true; console.log(`[SseManager] SSE connection closed by client for task ${taskId}.`); clearInterval(keepAliveInterval); this.removeSubscription(taskId, res); // No need to explicitly remove the listener here if removeSubscription handles it, // but good practice if res might persist. // res.removeListener('close', closeListener); }; res.on('close', closeListener); } /** * Removes an SSE subscription for a given task ID and response object. * Clears the keep-alive interval associated with the subscription. */ removeSubscription(taskId: string, resToRemove: express.Response & EventEmitter): void { const taskSubscriptions = this.subscriptions.get(taskId); if (!taskSubscriptions) return; const index = taskSubscriptions.findIndex(sub => sub.res === resToRemove); if (index !== -1) { const [removedSubscription] = taskSubscriptions.splice(index, 1); clearInterval(removedSubscription.intervalId); // Try removing the 'close' listener we added, though the emitter might be gone try { resToRemove.removeListener('close', () => {}); // Pass a dummy or find the real one if stored } catch (e) { /* ignore error if listener removal fails */ } console.log(`[SseManager] Removed SSE subscription for task ${taskId}. Remaining: ${taskSubscriptions.length}`); if (taskSubscriptions.length === 0) { this.subscriptions.delete(taskId); console.log(`[SseManager] No more SSE subscriptions for task ${taskId}.`); } } else { // console.warn(`[SseManager] Attempted to remove non-existent subscription for task ${taskId}.`); } } /** * Sends a keep-alive comment to a specific response stream. */ private sendKeepAliveComment(res: express.Response): void { if (res.closed) return; // Check again just before writing try { res.write(':keep-alive\n\n'); } catch (error) { console.error(`[SseManager] Failed to write keep-alive comment:`, error); // Don't necessarily remove subscription for failed keep-alive, maybe transient issue } } /** * Sends an SSE data line to a specific response stream. * Assumes the data provided is a fully formatted string ready to be written. * Returns true if the event was written successfully, false otherwise. */ sendSseDataString(res: express.Response, dataLine: string): boolean { if (res.closed) { console.warn(`[SseManager] Attempted to send SSE data string to closed connection.`); return false; } try { // Only write the data line res.write(dataLine); // console.log(`[SseManager] Sent SSE data string.`); // Debug logging return true; } catch (error) { console.error(`[SseManager] Failed to write SSE data string:`, error); return false; } } /** * Broadcasts an SSE event data payload to all active subscribers for a specific task ID. * Formats the event according to JSON-RPC 2.0 spec before sending. * If the event marks the end of the stream (`final: true`), closes the connections after sending. */ broadcast(taskId: string, _eventType: string, data: any & { final?: boolean }): void { // Check for final flag in data const taskSubscriptions = this.subscriptions.get(taskId); if (!taskSubscriptions || taskSubscriptions.length === 0) { return; } const logEventType = 'status' in data ? 'TaskStatusUpdate' : 'artifact' in data ? 'TaskArtifactUpdate' : 'Unknown'; const isFinalEvent = data.final === true; console.log(`[SseManager] Broadcasting ${logEventType} data (final: ${isFinalEvent}) to ${taskSubscriptions.length} subscribers for task ${taskId}.`); const subsToClose: SseSubscriptionInfo[] = []; // Iterate over a copy const activeSubs = [...taskSubscriptions]; activeSubs.forEach(subInfo => { const jsonRpcPayload = { jsonrpc: "2.0", id: subInfo.requestId, // Use the stored request ID result: data // The actual event data (TaskStatusUpdateEvent or TaskArtifactUpdateEvent) }; const dataLine = `data: ${JSON.stringify(jsonRpcPayload)}\n\n`; const success = this.sendSseDataString(subInfo.res, dataLine); console.log(`[SseManager] Sent SSE data to subscriber for task ${taskId}. Data: ${dataLine}. Success: ${success}`); if (!success || isFinalEvent) { // If sending failed OR if this is the final event, prepare to close/remove. if (!success) { console.warn(`[SseManager] Failed sending SSE data to subscriber for task ${taskId}. Marking for removal.`); } // Add to list even if send was successful, if it's the final event. subsToClose.push(subInfo); } }); // Clean up connections that failed or received the final event subsToClose.forEach(subInfo => { try { // Ensure interval is cleared before potentially closing clearInterval(subInfo.intervalId); if (!subInfo.res.closed) { console.log(`[SseManager] Closing SSE connection for task ${taskId} (reqId: ${subInfo.requestId}) after final event or send failure.`); subInfo.res.end(); // Close the connection } } catch (closeError) { console.error(`[SseManager] Error ending response stream for task ${taskId}:`, closeError); } // Remove subscription regardless of close error this.removeSubscription(taskId, subInfo.res); }); } /** * Checks if there are any active subscriptions for a given task ID. */ hasSubscriptions(taskId: string): boolean { const taskSubscriptions = this.subscriptions.get(taskId); return !!taskSubscriptions && taskSubscriptions.length > 0; } /** * Handles task events emitted by the core and broadcasts them via SSE. */ async notify(event: CoreTaskEvent): Promise<void> { let eventType: string | null = null; let eventData: any = null; // Determine SSE event type based on CoreTaskEvent structure if ('status' in event) { // TaskStatusUpdateEvent eventType = 'TaskStatusUpdate'; eventData = event; // Pass the whole event object } else if ('artifact' in event) { // TaskArtifactUpdateEvent eventType = 'TaskArtifactUpdate'; eventData = event; // Pass the whole event object } if (eventType && eventData) { if (this.hasSubscriptions(event.id)) { console.log(`[SseManager] Received notify event for task ${event.id}, broadcasting '${eventType}'.`); console.log(`[SseManager] Event data:`, eventData); // Use the existing broadcast logic this.broadcast(event.id, eventType, eventData); } else { // console.log(`[SseManager] Received notify event for task ${event.id}, but no active SSE subscriptions.`); } } else { console.warn(`[SseManager] Received unknown event type in notify:`, event); } // Return void promise as notification is fire-and-forget from core's perspective return Promise.resolve(); } /** * Forcefully closes all active SSE connections managed by this manager. * Implements optional closeAll from NotificationService. */ // Add async/Promise<void> to match interface, though implementation is synchronous async closeAll(): Promise<void> { console.log(`[SseManager] Closing all active SSE connections (${this.subscriptions.size} tasks)...`); let closedCount = 0; this.subscriptions.forEach((subscriptions, taskId) => { console.log(`[SseManager] Closing ${subscriptions.length} SSE connection(s) for task ${taskId}`); subscriptions.forEach(subInfo => { try { clearInterval(subInfo.intervalId); if (!subInfo.res.closed) { subInfo.res.end(); closedCount++; } } catch (err) { console.error(`[SseManager] Error closing SSE connection for task ${taskId}:`, err); } }); }); this.subscriptions.clear(); console.log(`[SseManager] Finished closing SSE connections. ${closedCount} streams ended.`); return Promise.resolve(); // Fulfill the promise } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jmandel/health-record-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server