Skip to main content
Glama
connection-manager.tsβ€’12.2 kB
import { WebSocket } from 'ws'; import { v4 as uuidv4 } from 'uuid'; import { logger } from '@core/logger/index.js'; import { WebSocketConnection, WebSocketResponse, WebSocketConnectionStats, WebSocketEventType, } from './types.js'; export class WebSocketConnectionManager { private connections = new Map<string, WebSocketConnection>(); private sessionConnections = new Map<string, Set<string>>(); private connectionSessions = new Map<string, string>(); private stats = { totalMessagesReceived: 0, totalMessagesSent: 0, connectionsCreated: 0, }; private eventSubscriber?: any; // We'll set this after initialization constructor( private maxConnections: number = 1000, private connectionTimeout: number = 300000 // 5 minutes ) { // Start connection cleanup interval setInterval(() => { this.cleanupStaleConnections(); }, 60000); // Check every minute } /** * Set the event subscriber to notify when sessions are bound */ setEventSubscriber(eventSubscriber: any): void { this.eventSubscriber = eventSubscriber; } /** * Add a new WebSocket connection */ addConnection(ws: WebSocket, sessionId?: string): string { if (this.connections.size >= this.maxConnections) { logger.warn('WebSocket connection limit reached', { currentConnections: this.connections.size, maxConnections: this.maxConnections, }); ws.close(1013, 'Server overloaded'); throw new Error('Maximum connections exceeded'); } const connectionId = uuidv4(); const now = Date.now(); const connection: WebSocketConnection = { id: connectionId, ws, sessionId: sessionId || undefined, subscribedEvents: new Set(), connectedAt: now, lastActivity: now, }; this.connections.set(connectionId, connection); this.connectionSessions.set(connectionId, sessionId || ''); this.stats.connectionsCreated++; if (sessionId) { this.bindToSession(connectionId, sessionId); } // Set up WebSocket event handlers this.setupConnectionHandlers(connection); logger.info('WebSocket connection added', { connectionId, sessionId, totalConnections: this.connections.size, }); return connectionId; } /** * Remove a WebSocket connection */ removeConnection(connectionId: string): void { const connection = this.connections.get(connectionId); if (!connection) { return; } // Remove from session bindings if (connection.sessionId) { const sessionConnections = this.sessionConnections.get(connection.sessionId); if (sessionConnections) { sessionConnections.delete(connectionId); if (sessionConnections.size === 0) { this.sessionConnections.delete(connection.sessionId); } } } // Clean up mappings this.connections.delete(connectionId); this.connectionSessions.delete(connectionId); // Close WebSocket if still open if (connection.ws.readyState === WebSocket.OPEN) { connection.ws.close(); } logger.info('WebSocket connection removed', { connectionId, sessionId: connection.sessionId, totalConnections: this.connections.size, }); } /** * Bind a connection to a session */ bindToSession(connectionId: string, sessionId: string): void { const connection = this.connections.get(connectionId); if (!connection) { logger.warn('Attempted to bind non-existent connection to session', { connectionId, sessionId, }); return; } // Update connection connection.sessionId = sessionId; this.connectionSessions.set(connectionId, sessionId); // Add to session mapping if (!this.sessionConnections.has(sessionId)) { this.sessionConnections.set(sessionId, new Set()); } this.sessionConnections.get(sessionId)!.add(connectionId); // Send connection update to notify client of session binding this.sendConnectionUpdate(connectionId); // Notify event subscriber to start listening to this session's events if (this.eventSubscriber) { this.eventSubscriber.subscribeToSession(sessionId); } logger.debug('Connection bound to session', { connectionId, sessionId, }); } /** * Get the session ID for a connection */ getConnectionSessionId(connectionId: string): string | undefined { const connection = this.connections.get(connectionId); return connection?.sessionId; } /** * Send updated connection info to a specific connection */ sendConnectionUpdate(connectionId: string): void { const connection = this.connections.get(connectionId); if (!connection || connection.ws.readyState !== WebSocket.OPEN) { return; } const updateMessage = { event: 'connectionUpdated', data: { connectionId, sessionId: connection.sessionId, timestamp: Date.now(), }, timestamp: Date.now(), }; try { connection.ws.send(JSON.stringify(updateMessage)); logger.debug('Sent connection update', { connectionId, sessionId: connection.sessionId, }); } catch (error) { logger.error('Failed to send connection update', { connectionId, error: error instanceof Error ? error.message : String(error), }); } } /** * Subscribe a connection to specific event types */ subscribeToEvents(connectionId: string, eventTypes: WebSocketEventType[]): void { const connection = this.connections.get(connectionId); if (!connection) { logger.warn('Attempted to subscribe non-existent connection to events', { connectionId, eventTypes, }); return; } eventTypes.forEach(eventType => { connection.subscribedEvents!.add(eventType); }); logger.debug('Connection subscribed to events', { connectionId, eventTypes, totalSubscriptions: connection.subscribedEvents!.size, }); } /** * Unsubscribe a connection from specific event types */ unsubscribeFromEvents(connectionId: string, eventTypes: WebSocketEventType[]): void { const connection = this.connections.get(connectionId); if (!connection) { return; } eventTypes.forEach(eventType => { connection.subscribedEvents!.delete(eventType); }); logger.debug('Connection unsubscribed from events', { connectionId, eventTypes, totalSubscriptions: connection.subscribedEvents!.size, }); } /** * Broadcast message to all connections in a session */ broadcastToSession(sessionId: string, message: WebSocketResponse): void { const connectionIds = this.sessionConnections.get(sessionId); if (!connectionIds || connectionIds.size === 0) { return; } for (const connectionId of connectionIds) { this.sendToConnection(connectionId, message); } } /** * Broadcast message to all active connections */ broadcastToAll(message: WebSocketResponse): void { let sentCount = 0; for (const connectionId of this.connections.keys()) { if (this.sendToConnection(connectionId, message)) { sentCount++; } } logger.debug('Message broadcast to all connections', { totalConnections: this.connections.size, sentCount, event: message.event, }); } /** * Broadcast message to connections subscribed to specific event type */ broadcastToSubscribers(eventType: WebSocketEventType, message: WebSocketResponse): void { let sentCount = 0; for (const connection of this.connections.values()) { // If no subscriptions, send all events (default behavior) // If has subscriptions, only send subscribed events const shouldSend = connection.subscribedEvents!.size === 0 || connection.subscribedEvents!.has(eventType); if (shouldSend && this.sendToConnection(connection.id, message)) { sentCount++; } } logger.debug('Message broadcast to subscribers', { eventType, totalConnections: this.connections.size, sentCount, event: message.event, }); } /** * Send message to a specific connection */ private sendToConnection(connectionId: string, message: WebSocketResponse): boolean { const connection = this.connections.get(connectionId); if (!connection || connection.ws.readyState !== WebSocket.OPEN) { return false; } try { const messageStr = JSON.stringify(message); connection.ws.send(messageStr); connection.lastActivity = Date.now(); this.stats.totalMessagesSent++; return true; } catch (error: any) { logger.error('Failed to send WebSocket message', { connectionId, error: error.message, }); return false; } } /** * Get connection statistics */ getStats(): WebSocketConnectionStats { const now = Date.now(); let totalDuration = 0; let activeConnections = 0; const activeSessions = new Set<string>(); for (const connection of this.connections.values()) { if (connection.ws.readyState === WebSocket.OPEN) { activeConnections++; totalDuration += now - connection.connectedAt; if (connection.sessionId) { activeSessions.add(connection.sessionId); } } } const averageConnectionDuration = activeConnections > 0 ? totalDuration / activeConnections : 0; return { totalConnections: this.stats.connectionsCreated, activeConnections, totalSessions: this.sessionConnections.size, activeSessions: activeSessions.size, totalMessagesReceived: this.stats.totalMessagesReceived, totalMessagesSent: this.stats.totalMessagesSent, averageConnectionDuration, }; } /** * Get all active session IDs */ getActiveSessions(): string[] { return Array.from(this.sessionConnections.keys()).filter(sessionId => { const connections = this.sessionConnections.get(sessionId); return connections && connections.size > 0; }); } /** * Check if a session has active connections */ hasActiveConnections(sessionId: string): boolean { const connections = this.sessionConnections.get(sessionId); if (!connections) return false; for (const connectionId of connections) { const connection = this.connections.get(connectionId); if (connection && connection.ws.readyState === WebSocket.OPEN) { return true; } } return false; } /** * Set up WebSocket event handlers for a connection */ private setupConnectionHandlers(connection: WebSocketConnection): void { connection.ws.on('close', () => { this.removeConnection(connection.id); }); connection.ws.on('error', error => { logger.error('WebSocket connection error', { connectionId: connection.id, error: error.message, }); this.removeConnection(connection.id); }); connection.ws.on('pong', () => { connection.lastActivity = Date.now(); }); } /** * Clean up stale connections */ private cleanupStaleConnections(): void { const now = Date.now(); const staleConnections: string[] = []; for (const [connectionId, connection] of this.connections) { // Check if connection is stale const isStale = now - connection.lastActivity > this.connectionTimeout || connection.ws.readyState !== WebSocket.OPEN; if (isStale) { staleConnections.push(connectionId); } } if (staleConnections.length > 0) { logger.info('Cleaning up stale WebSocket connections', { staleCount: staleConnections.length, totalConnections: this.connections.size, }); staleConnections.forEach(connectionId => { this.removeConnection(connectionId); }); } } /** * Send heartbeat pings to all connections */ sendHeartbeat(): void { for (const connection of this.connections.values()) { if (connection.ws.readyState === WebSocket.OPEN) { try { connection.ws.ping(); } catch (error) { logger.warn('Failed to send heartbeat ping', { connectionId: connection.id, error: error instanceof Error ? error.message : String(error), }); this.removeConnection(connection.id); } } } } /** * Record incoming message for stats */ recordIncomingMessage(): void { this.stats.totalMessagesReceived++; } /** * Dispose of the connection manager */ dispose(): void { // Close all connections for (const connection of this.connections.values()) { if (connection.ws.readyState === WebSocket.OPEN) { connection.ws.close(); } } // Clear all maps this.connections.clear(); this.sessionConnections.clear(); this.connectionSessions.clear(); logger.info('WebSocket connection manager disposed'); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/campfirein/cipher'

If you have feedback or need assistance with the MCP directory API, please join our Discord server