Skip to main content
Glama
websocket-handler.ts14.1 kB
/** * WebSocket Handler for Live Activity * * Provides real-time cognitive activity updates via WebSocket connections. * Broadcasts cognitive activity events to connected clients including * active processes, memory operations, and reasoning events. * * Requirements: 7.1, 7.3 */ import type { Server as HttpServer } from "http"; import { WebSocket, WebSocketServer } from "ws"; import { Logger } from "../utils/logger.js"; /** * Activity event types for WebSocket broadcasts * Requirements: 7.1 */ export type ActivityEventType = | "memory_operation" | "reasoning_update" | "load_change" | "session_event" | "system_event" | "heartbeat" | "connection_established"; /** * Memory operation event data * Requirements: 7.1 */ export interface MemoryOperationData { operation: "store" | "recall" | "update" | "delete" | "search"; memoryId?: string; userId: string; sector?: string; success: boolean; duration?: number; } /** * Reasoning update event data * Requirements: 7.1 */ export interface ReasoningUpdateData { sessionId: string; stage: string; progress: number; activeStreams?: string[]; mode?: string; } /** * Cognitive load change event data * Requirements: 7.3 */ export interface LoadChangeData { /** Current cognitive load level (0-1) */ loadLevel: number; /** Active process count */ activeProcesses: number; /** Memory operations per second */ memoryOpsPerSecond: number; /** Reasoning sessions active */ reasoningSessions: number; /** Suggested visual feedback intensity */ visualIntensity: "low" | "medium" | "high"; } /** * Session event data * Requirements: 7.1 */ export interface SessionEventData { event: "created" | "updated" | "deleted"; sessionId: string; userId?: string; } /** * System event data * Requirements: 7.1 */ export interface SystemEventData { event: "startup" | "shutdown" | "health_change" | "config_change"; status?: string; message?: string; } /** * Activity event structure for WebSocket broadcasts * Requirements: 7.1 */ export interface ActivityEvent { /** Event type identifier */ type: ActivityEventType; /** ISO 8601 timestamp */ timestamp: string; /** Event-specific data */ data: | MemoryOperationData | ReasoningUpdateData | LoadChangeData | SessionEventData | SystemEventData | { message: string }; } /** * WebSocket client connection metadata */ interface WebSocketClient { /** WebSocket connection */ ws: WebSocket; /** Client identifier */ clientId: string; /** Connection timestamp */ connectedAt: Date; /** Last activity timestamp */ lastActivity: Date; /** Subscribed event types (empty = all) */ subscribedTypes: Set<ActivityEventType>; } /** * WebSocket handler configuration */ export interface WebSocketHandlerConfig { /** Path for WebSocket endpoint */ path: string; /** Heartbeat interval in milliseconds */ heartbeatIntervalMs: number; /** Client timeout in milliseconds (no activity) */ clientTimeoutMs: number; /** Maximum clients allowed */ maxClients: number; } /** * Default WebSocket handler configuration */ export const DEFAULT_WEBSOCKET_CONFIG: WebSocketHandlerConfig = { path: "/api/v1/activity/live", heartbeatIntervalMs: 30000, clientTimeoutMs: 120000, maxClients: 100, }; /** * Activity WebSocket Handler * * Manages WebSocket connections for real-time cognitive activity streaming. * Broadcasts activity events to all connected clients and handles * connection lifecycle including heartbeats and cleanup. * * Requirements: 7.1, 7.3 */ export class ActivityWebSocketHandler { private wss: WebSocketServer | null = null; private clients: Map<string, WebSocketClient> = new Map(); private config: WebSocketHandlerConfig; private heartbeatInterval: NodeJS.Timeout | null = null; private cleanupInterval: NodeJS.Timeout | null = null; private isRunning: boolean = false; constructor(config: Partial<WebSocketHandlerConfig> = {}) { this.config = { ...DEFAULT_WEBSOCKET_CONFIG, ...config }; } /** * Attach WebSocket server to HTTP server * Requirements: 7.1 * * @param server - HTTP server to attach to */ attach(server: HttpServer): void { if (this.isRunning) { throw new Error("WebSocket handler is already running"); } this.wss = new WebSocketServer({ server, path: this.config.path, }); this.setupEventHandlers(); this.startHeartbeat(); this.startCleanup(); this.isRunning = true; Logger.info(`WebSocket handler attached at ${this.config.path}`); } /** * Set up WebSocket server event handlers */ private setupEventHandlers(): void { if (!this.wss) return; this.wss.on("connection", (ws: WebSocket, req) => { this.handleConnection(ws, req); }); this.wss.on("error", (error: Error) => { Logger.error("WebSocket server error", error); }); this.wss.on("close", () => { Logger.info("WebSocket server closed"); }); } /** * Handle new WebSocket connection * Requirements: 7.1 */ private handleConnection(ws: WebSocket, _req: { url?: string }): void { // Check max clients limit if (this.clients.size >= this.config.maxClients) { Logger.warn("WebSocket connection rejected: max clients reached"); ws.close(1013, "Maximum clients reached"); return; } const clientId = this.generateClientId(); const client: WebSocketClient = { ws, clientId, connectedAt: new Date(), lastActivity: new Date(), subscribedTypes: new Set(), }; this.clients.set(clientId, client); Logger.debug(`WebSocket client connected: ${clientId}`); // Set up client event handlers ws.on("message", (data: Buffer | string) => { this.handleMessage(client, data); }); ws.on("close", (code: number, reason: Buffer) => { this.handleDisconnect(client, code, reason.toString()); }); ws.on("error", (error: Error) => { Logger.error(`WebSocket client error: ${clientId}`, error); this.removeClient(clientId); }); ws.on("pong", () => { client.lastActivity = new Date(); }); // Send connection established event this.sendToClient(client, { type: "connection_established", timestamp: new Date().toISOString(), data: { message: `Connected as ${clientId}`, }, }); } /** * Handle incoming message from client */ private handleMessage(client: WebSocketClient, data: Buffer | string): void { client.lastActivity = new Date(); try { const message = JSON.parse(data.toString()); // Handle subscription requests if (message.action === "subscribe" && Array.isArray(message.types)) { for (const type of message.types) { if (this.isValidEventType(type)) { client.subscribedTypes.add(type); } } Logger.debug( `Client ${client.clientId} subscribed to: ${[...client.subscribedTypes].join(", ")}` ); } // Handle unsubscribe requests if (message.action === "unsubscribe" && Array.isArray(message.types)) { for (const type of message.types) { client.subscribedTypes.delete(type); } } // Handle ping if (message.action === "ping") { this.sendToClient(client, { type: "heartbeat", timestamp: new Date().toISOString(), data: { message: "pong" }, }); } } catch { // Ignore invalid JSON messages Logger.debug(`Invalid message from client ${client.clientId}`); } } /** * Handle client disconnection */ private handleDisconnect(client: WebSocketClient, code: number, reason: string): void { Logger.debug( `WebSocket client disconnected: ${client.clientId} (code: ${code}, reason: ${reason})` ); this.removeClient(client.clientId); } /** * Remove a client from the connection pool */ private removeClient(clientId: string): void { const client = this.clients.get(clientId); if (client) { try { if (client.ws.readyState === WebSocket.OPEN) { client.ws.close(1000, "Connection closed"); } } catch { // Ignore errors during close } this.clients.delete(clientId); } } /** * Broadcast an activity event to all connected clients * Requirements: 7.1, 7.3 * * @param event - Activity event to broadcast */ broadcast(event: ActivityEvent): void { if (!this.isRunning || this.clients.size === 0) return; const eventData = JSON.stringify(event); for (const client of this.clients.values()) { // Check if client is subscribed to this event type if (client.subscribedTypes.size > 0 && !client.subscribedTypes.has(event.type)) { continue; } this.sendRawToClient(client, eventData); } } /** * Send an event to a specific client */ private sendToClient(client: WebSocketClient, event: ActivityEvent): void { this.sendRawToClient(client, JSON.stringify(event)); } /** * Send raw data to a specific client */ private sendRawToClient(client: WebSocketClient, data: string): void { try { if (client.ws.readyState === WebSocket.OPEN) { client.ws.send(data); } } catch (error) { Logger.error(`Failed to send to client ${client.clientId}`, error); this.removeClient(client.clientId); } } /** * Start heartbeat interval to keep connections alive */ private startHeartbeat(): void { this.heartbeatInterval = setInterval(() => { const heartbeatEvent: ActivityEvent = { type: "heartbeat", timestamp: new Date().toISOString(), data: { message: "keepalive" }, }; for (const client of this.clients.values()) { try { if (client.ws.readyState === WebSocket.OPEN) { client.ws.ping(); this.sendToClient(client, heartbeatEvent); } } catch { this.removeClient(client.clientId); } } }, this.config.heartbeatIntervalMs); } /** * Start cleanup interval to remove stale connections */ private startCleanup(): void { this.cleanupInterval = setInterval(() => { const now = Date.now(); const timeout = this.config.clientTimeoutMs; for (const [clientId, client] of this.clients) { if (now - client.lastActivity.getTime() > timeout) { Logger.debug(`Removing stale client: ${clientId}`); this.removeClient(clientId); } } }, this.config.clientTimeoutMs / 2); } /** * Generate a unique client ID */ private generateClientId(): string { return `ws-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`; } /** * Check if a string is a valid event type */ private isValidEventType(type: string): type is ActivityEventType { const validTypes: ActivityEventType[] = [ "memory_operation", "reasoning_update", "load_change", "session_event", "system_event", "heartbeat", "connection_established", ]; return validTypes.includes(type as ActivityEventType); } /** * Get the number of connected clients */ getClientCount(): number { return this.clients.size; } /** * Check if the handler is running */ getIsRunning(): boolean { return this.isRunning; } /** * Get client connection info */ getClientInfo(): Array<{ clientId: string; connectedAt: Date; subscribedTypes: string[] }> { return Array.from(this.clients.values()).map((client) => ({ clientId: client.clientId, connectedAt: client.connectedAt, subscribedTypes: [...client.subscribedTypes], })); } /** * Close all connections and stop the handler * Requirements: 7.1 */ close(): void { if (!this.isRunning) return; Logger.info("Closing WebSocket handler..."); // Stop intervals if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval); this.heartbeatInterval = null; } if (this.cleanupInterval) { clearInterval(this.cleanupInterval); this.cleanupInterval = null; } // Close all client connections for (const client of this.clients.values()) { try { if (client.ws.readyState === WebSocket.OPEN) { client.ws.close(1001, "Server shutting down"); } } catch { // Ignore errors during shutdown } } this.clients.clear(); // Close WebSocket server if (this.wss) { this.wss.close(); this.wss = null; } this.isRunning = false; Logger.info("WebSocket handler closed"); } } /** * Helper functions for creating activity events */ /** * Create a memory operation event * Requirements: 7.1 */ export function createMemoryOperationEvent(data: MemoryOperationData): ActivityEvent { return { type: "memory_operation", timestamp: new Date().toISOString(), data, }; } /** * Create a reasoning update event * Requirements: 7.1 */ export function createReasoningUpdateEvent(data: ReasoningUpdateData): ActivityEvent { return { type: "reasoning_update", timestamp: new Date().toISOString(), data, }; } /** * Create a cognitive load change event * Requirements: 7.3 */ export function createLoadChangeEvent(data: LoadChangeData): ActivityEvent { return { type: "load_change", timestamp: new Date().toISOString(), data, }; } /** * Create a session event * Requirements: 7.1 */ export function createSessionEvent(data: SessionEventData): ActivityEvent { return { type: "session_event", timestamp: new Date().toISOString(), data, }; } /** * Create a system event * Requirements: 7.1 */ export function createSystemEvent(data: SystemEventData): ActivityEvent { return { type: "system_event", timestamp: new Date().toISOString(), data, }; }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keyurgolani/ThoughtMcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server