Skip to main content
Glama
orneryd

M.I.M.I.R - Multi-agent Intelligent Memory & Insight Repository

by orneryd
sse.ts6.22 kB
/** * @fileoverview Server-Sent Events (SSE) management for real-time execution updates * * This module provides SSE functionality for streaming real-time execution progress * to connected frontend clients. Supports event broadcasting, client management, * and graceful error handling for disconnected clients. * * @module api/orchestration/sse * @since 1.0.0 */ /** * Map of execution IDs to arrays of SSE response streams * Each execution can have multiple connected clients receiving updates */ const sseClients = new Map<string, any[]>(); /** * Send Server-Sent Events (SSE) to all connected clients for a specific execution * * Broadcasts real-time execution progress events to connected frontend clients * using the SSE protocol. Handles client write failures gracefully to prevent * one failed client from affecting others. * * @param executionId - Unique identifier for the execution session * @param event - Event type name (e.g., 'task-start', 'task-complete', 'execution-complete') * @param data - Payload data to send to clients (will be JSON stringified) * * @example * // Example 1: Notify clients that task execution started * sendSSEEvent('exec-1234567890', 'task-start', { * taskId: 'task-1', * taskTitle: 'Validate Environment', * progress: 1, * total: 5 * }); * * @example * // Example 2: Send task completion with results * sendSSEEvent('exec-1234567890', 'task-complete', { * taskId: 'task-2', * status: 'success', * duration: 15000, * progress: 2, * total: 5 * }); * * @example * // Example 3: Notify all clients of execution completion * sendSSEEvent('exec-1234567890', 'execution-complete', { * executionId: 'exec-1234567890', * status: 'completed', * successful: 5, * failed: 0, * totalDuration: 120000 * }); * * @since 1.0.0 */ export function sendSSEEvent(executionId: string, event: string, data: any): void { const clients = sseClients.get(executionId) || []; const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`; clients.forEach((client) => { try { client.write(message); } catch (error) { console.error('Failed to send SSE event:', error); } }); } /** * Register a new SSE client for an execution * * Adds a response stream to the list of clients receiving real-time updates * for a specific execution. Multiple clients can be registered for the same * execution ID. * * @param executionId - Unique identifier for the execution session * @param responseStream - Express response object configured for SSE streaming * * @example * // Example: Register client when they connect to SSE endpoint * app.get('/api/executions/:executionId/events', (req, res) => { * res.setHeader('Content-Type', 'text/event-stream'); * res.setHeader('Cache-Control', 'no-cache'); * res.setHeader('Connection', 'keep-alive'); * * registerSSEClient(req.params.executionId, res); * * req.on('close', () => { * unregisterSSEClient(req.params.executionId, res); * }); * }); * * @since 1.0.0 */ export function registerSSEClient(executionId: string, responseStream: any): void { const clients = sseClients.get(executionId) || []; clients.push(responseStream); sseClients.set(executionId, clients); console.log(`📡 SSE client registered for execution: ${executionId} (${clients.length} total clients)`); } /** * Unregister an SSE client from an execution * * Removes a response stream from the list of clients for an execution. * Automatically cleans up empty client lists to prevent memory leaks. * * @param executionId - Unique identifier for the execution session * @param responseStream - Express response object to remove * * @example * // Example: Unregister client when they disconnect * req.on('close', () => { * unregisterSSEClient(executionId, res); * console.log('Client disconnected from SSE stream'); * }); * * @since 1.0.0 */ export function unregisterSSEClient(executionId: string, responseStream: any): void { const clients = sseClients.get(executionId) || []; const filteredClients = clients.filter(client => client !== responseStream); if (filteredClients.length === 0) { sseClients.delete(executionId); console.log(`📡 All SSE clients disconnected for execution: ${executionId}`); } else { sseClients.set(executionId, filteredClients); console.log(`📡 SSE client unregistered for execution: ${executionId} (${filteredClients.length} remaining)`); } } /** * Get count of connected SSE clients for an execution * * @param executionId - Unique identifier for the execution session * @returns Number of active SSE client connections * * @example * // Example: Check if any clients are listening * const clientCount = getSSEClientCount('exec-1234567890'); * if (clientCount > 0) { * sendSSEEvent('exec-1234567890', 'status-update', { message: 'Processing...' }); * } * * @since 1.0.0 */ export function getSSEClientCount(executionId: string): number { return (sseClients.get(executionId) || []).length; } /** * Close all SSE connections for an execution * * Ends all active SSE streams for an execution and removes them from the registry. * Use this when an execution completes to clean up resources and notify clients. * * @param executionId - Unique identifier for the execution session * * @example * // Example: Close all connections when execution completes * try { * sendSSEEvent(executionId, 'execution-complete', { status: 'completed' }); * await new Promise(resolve => setTimeout(resolve, 100)); // Let final events flush * closeSSEConnections(executionId); * } catch (error) { * console.error('Error closing SSE connections:', error); * } * * @since 1.0.0 */ export function closeSSEConnections(executionId: string): void { const clients = sseClients.get(executionId) || []; clients.forEach(client => { try { client.end(); } catch (error) { console.error(`Failed to close SSE client for ${executionId}:`, error); } }); sseClients.delete(executionId); console.log(`📡 Closed ${clients.length} SSE connections for execution: ${executionId}`); }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/orneryd/Mimir'

If you have feedback or need assistance with the MCP directory API, please join our Discord server