Skip to main content
Glama
event-bridge.tsβ€’10.1 kB
import { EventManager } from '@core/events/event-manager.js'; import { logger } from '@core/logger/index.js'; import { WebSocketConnectionManager } from './connection-manager.js'; import { WebSocketResponse, WebSocketEventType } from './types.js'; /** * WebSocket Event Bridge - Advanced event routing and transformation * * This class provides additional event transformation and routing capabilities * beyond the basic WebSocketEventSubscriber. It's useful for custom event * handling, filtering, and transformation logic. */ export class WebSocketEventBridge { private eventFilters = new Map<WebSocketEventType, (data: any) => boolean>(); private eventTransformers = new Map<WebSocketEventType, (data: any) => any>(); private routingRules = new Map<WebSocketEventType, 'session' | 'global' | 'subscribers'>(); constructor( private connectionManager: WebSocketConnectionManager, private eventManager: EventManager ) { this.setupDefaultRoutingRules(); } /** * Set up default routing rules for different event types */ private setupDefaultRoutingRules(): void { // Session-specific events this.routingRules.set('thinking', 'session'); this.routingRules.set('chunk', 'session'); this.routingRules.set('response', 'session'); this.routingRules.set('toolCall', 'session'); this.routingRules.set('toolResult', 'session'); this.routingRules.set('conversationReset', 'session'); this.routingRules.set('memoryOperation', 'session'); this.routingRules.set('sessionCreated', 'session'); this.routingRules.set('sessionEnded', 'session'); // Global events this.routingRules.set('systemMessage', 'global'); this.routingRules.set('mcpServerConnected', 'global'); this.routingRules.set('mcpServerDisconnected', 'global'); // Subscriber-based events (filtered by subscription) this.routingRules.set('error', 'subscribers'); this.routingRules.set('availableToolsUpdated', 'subscribers'); } /** * Register an event filter */ registerEventFilter(eventType: WebSocketEventType, filter: (data: any) => boolean): void { this.eventFilters.set(eventType, filter); logger.debug('WebSocket event filter registered', { eventType }); } /** * Register an event transformer */ registerEventTransformer(eventType: WebSocketEventType, transformer: (data: any) => any): void { this.eventTransformers.set(eventType, transformer); logger.debug('WebSocket event transformer registered', { eventType }); } /** * Set routing rule for an event type */ setRoutingRule(eventType: WebSocketEventType, rule: 'session' | 'global' | 'subscribers'): void { this.routingRules.set(eventType, rule); logger.debug('WebSocket routing rule set', { eventType, rule }); } /** * Process and route an event */ processEvent(eventType: WebSocketEventType, data: any): void { try { // Apply filter if exists const filter = this.eventFilters.get(eventType); if (filter && !filter(data)) { logger.debug('Event filtered out', { eventType }); return; } // Apply transformer if exists const transformer = this.eventTransformers.get(eventType); const transformedData = transformer ? transformer(data) : data; // Create WebSocket response const response: WebSocketResponse = { event: eventType, data: transformedData, timestamp: Date.now(), }; // Add sessionId if present if ( transformedData && typeof transformedData === 'object' && 'sessionId' in transformedData ) { response.sessionId = transformedData.sessionId; } // Route based on routing rule const routingRule = this.routingRules.get(eventType) || 'subscribers'; this.routeEvent(routingRule, eventType, response); } catch (error) { logger.error('Error processing WebSocket event', { eventType, error: error instanceof Error ? error.message : String(error), }); } } /** * Route event based on routing rule */ private routeEvent( rule: 'session' | 'global' | 'subscribers', eventType: WebSocketEventType, response: WebSocketResponse ): void { switch (rule) { case 'session': if (response.sessionId) { this.connectionManager.broadcastToSession(response.sessionId, response); } else { logger.warn('Session routing requested but no sessionId provided', { eventType }); } break; case 'global': this.connectionManager.broadcastToAll(response); break; case 'subscribers': this.connectionManager.broadcastToSubscribers(eventType, response); break; default: logger.warn('Unknown routing rule', { rule, eventType }); } } /** * Setup common event filters */ setupCommonFilters(): void { // Filter out noisy debug events in production if (process.env.NODE_ENV === 'production') { this.registerEventFilter('thinking', data => { // Only show thinking for sessions with active connections return data.sessionId && this.connectionManager.hasActiveConnections(data.sessionId); }); } // Filter memory operations by success status this.registerEventFilter('memoryOperation', data => { // Only broadcast successful memory operations unless specifically requested return data.success !== false; }); // Filter tool results by importance this.registerEventFilter('toolResult', data => { // Always show failed tool executions, but successful ones only for subscribed clients return !data.success || data.important; }); logger.info('Common WebSocket event filters setup completed'); } /** * Setup common event transformers */ setupCommonTransformers(): void { // Transform error events to include more context this.registerEventTransformer('error', data => ({ ...data, timestamp: Date.now(), severity: this.getErrorSeverity(data.code || data.message), suggestions: this.getErrorSuggestions(data.code || data.message), })); // Transform chunk events to include progress information this.registerEventTransformer('chunk', data => ({ ...data, timestamp: Date.now(), // Add progress estimation if we can determine it ...(data.messageId && { progress: this.estimateProgress(data) }), })); // Transform tool call events to include execution context this.registerEventTransformer('toolCall', data => ({ ...data, timestamp: Date.now(), category: this.getToolCategory(data.toolName), expectedDuration: this.getExpectedToolDuration(data.toolName), })); logger.info('Common WebSocket event transformers setup completed'); } /** * Get error severity level */ private getErrorSeverity(errorCode: string): 'low' | 'medium' | 'high' | 'critical' { const criticalErrors = ['SYSTEM_ERROR', 'DATABASE_ERROR', 'MEMORY_ERROR']; const highErrors = ['PROCESSING_ERROR', 'LLM_ERROR', 'TOOL_ERROR']; const mediumErrors = ['VALIDATION_ERROR', 'AUTHENTICATION_ERROR']; if (criticalErrors.some(code => errorCode.includes(code))) return 'critical'; if (highErrors.some(code => errorCode.includes(code))) return 'high'; if (mediumErrors.some(code => errorCode.includes(code))) return 'medium'; return 'low'; } /** * Get error suggestions */ private getErrorSuggestions(errorCode: string): string[] { const suggestions: Record<string, string[]> = { WEBSOCKET_ERROR: ['Check your internet connection', 'Try refreshing the page'], PROCESSING_ERROR: [ 'Try rephrasing your request', 'Check if all required fields are provided', ], VALIDATION_ERROR: ['Check your input format', 'Ensure all required fields are provided'], AUTHENTICATION_ERROR: ['Please log in again', 'Check your session is still valid'], }; for (const [code, hints] of Object.entries(suggestions)) { if (errorCode.includes(code)) { return hints; } } return ['Try again in a few moments', 'Contact support if the issue persists']; } /** * Estimate progress for chunk events */ private estimateProgress(data: any): number { // This is a simple estimation - in a real implementation, // you might track message length and estimate based on typical response sizes if (data.isComplete) return 100; if (data.text && data.text.length > 0) { // Simple heuristic: assume we're 10-90% done based on current text length const estimatedProgress = Math.min(90, Math.max(10, data.text.length / 10)); return Math.round(estimatedProgress); } return 10; } /** * Get tool category */ private getToolCategory(toolName: string): string { const categories: Record<string, string[]> = { memory: ['store_memory', 'search_memory', 'extract_knowledge'], knowledge: ['add_node', 'search_graph', 'extract_entities'], system: ['reset', 'config', 'health'], processing: ['run', 'execute', 'process'], }; for (const [category, tools] of Object.entries(categories)) { if (tools.some(tool => toolName.toLowerCase().includes(tool))) { return category; } } return 'general'; } /** * Get expected tool duration in milliseconds */ private getExpectedToolDuration(toolName: string): number { const durations: Record<string, number> = { store_memory: 2000, search_memory: 1500, extract_knowledge: 3000, add_node: 1000, search_graph: 2000, reset: 500, config: 300, }; const lowerToolName = toolName.toLowerCase(); for (const [tool, duration] of Object.entries(durations)) { if (lowerToolName.includes(tool)) { return duration; } } return 2000; // Default 2 seconds } /** * Get bridge statistics */ getStats(): { filtersRegistered: number; transformersRegistered: number; routingRules: Record<string, string>; eventsProcessed: number; } { return { filtersRegistered: this.eventFilters.size, transformersRegistered: this.eventTransformers.size, routingRules: Object.fromEntries(this.routingRules), eventsProcessed: 0, // TODO: Track this }; } /** * Clear all filters, transformers, and routing rules */ clear(): void { this.eventFilters.clear(); this.eventTransformers.clear(); this.routingRules.clear(); this.setupDefaultRoutingRules(); logger.info('WebSocket event bridge cleared and reset to defaults'); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/campfirein/cipher'

If you have feedback or need assistance with the MCP directory API, please join our Discord server