Skip to main content
Glama
waldzellai

Exa Websets MCP Server

by waldzellai
EventProcessor.ts11.8 kB
/** * Event Processor Implementation * * Processes events from the event queue using registered handlers. * Supports concurrent processing, timeout handling, and dead letter queue. */ import { EventEmitter } from 'events'; import { WebsetEvent, EventType } from '../types/websets.js'; import { EventHandler, EventHandlerRegistry, EventProcessorConfig, EventProcessingResult, EventProcessorStats, DEFAULT_EVENT_SYSTEM_CONFIG } from './EventTypes.js'; /** * Registry for managing event handlers */ export class DefaultEventHandlerRegistry implements EventHandlerRegistry { public handlers = new Map<EventType, EventHandler[]>(); register(handler: EventHandler): void { // Register handler for all event types it can handle const allEventTypes: EventType[] = [ 'webset.created', 'webset.deleted', 'webset.idle', 'webset.paused', 'webset.item.created', 'webset.item.enriched', 'webset.search.created', 'webset.search.updated', 'webset.search.canceled', 'webset.search.completed', ]; for (const eventType of allEventTypes) { // Create a mock event to test canHandle const mockEvent = { id: 'test', type: eventType, data: {} as any, createdAt: new Date().toISOString() }; if (handler.canHandle(mockEvent)) { if (!this.handlers.has(eventType)) { this.handlers.set(eventType, []); } const typeHandlers = this.handlers.get(eventType)!; typeHandlers.push(handler); // Sort by priority (higher priority first) typeHandlers.sort((a, b) => b.priority - a.priority); } } } unregister(handler: EventHandler): void { for (const [eventType, typeHandlers] of this.handlers.entries()) { const index = typeHandlers.indexOf(handler); if (index !== -1) { typeHandlers.splice(index, 1); // Remove empty arrays if (typeHandlers.length === 0) { this.handlers.delete(eventType); } } } } getHandlers(eventType: EventType): EventHandler[] { return this.handlers.get(eventType) || []; } getAllHandlers(): EventHandler[] { const allHandlers: EventHandler[] = []; for (const handlers of this.handlers.values()) { allHandlers.push(...handlers); } return allHandlers; } clear(): void { this.handlers.clear(); } } /** * Event processor for handling events with registered handlers */ export class EventProcessor extends EventEmitter { private readonly config: EventProcessorConfig; private readonly handlerRegistry: EventHandlerRegistry; private readonly stats: EventProcessorStats = { totalProcessed: 0, totalFailed: 0, successful: 0, failed: 0, averageProcessingTime: 0, activeHandlers: 0, activeWorkers: 0, processingRate: 0, errorRate: 0, handlerStats: {}, }; private readonly processingTimes: number[] = []; private readonly activeWorkers = new Set<string>(); private readonly deadLetterQueue: WebsetEvent[] = []; private isShuttingDown = false; private rateCalculationInterval?: NodeJS.Timeout; private lastProcessedCount = 0; constructor( config: Partial<EventProcessorConfig> = {}, handlerRegistry?: EventHandlerRegistry ) { super(); this.config = { ...DEFAULT_EVENT_SYSTEM_CONFIG.processor, ...config }; this.handlerRegistry = handlerRegistry || new DefaultEventHandlerRegistry(); this.startRateCalculation(); } /** * Process a single event * @param event The event to process * @returns Promise that resolves to processing result */ async processEvent(event: WebsetEvent): Promise<EventProcessingResult> { if (this.isShuttingDown) { throw new Error('Processor is shutting down'); } const workerId = `worker-${Date.now()}-${Math.random()}`; const startTime = Date.now(); this.activeWorkers.add(workerId); this.stats.activeWorkers = this.activeWorkers.size; this.stats.totalProcessed++; try { // Get handlers for this event type const handlers = this.handlerRegistry.getHandlers(event.type); if (handlers.length === 0) { // No handlers registered for this event type this.emit('noHandlers', event); return { duration: Date.now() - startTime, metadata: { reason: 'no_handlers' }, }; } // Process with timeout await this.processWithTimeout(event, handlers); const duration = Date.now() - startTime; this.stats.successful++; this.recordProcessingTime(duration); this.emit('processed', event, { duration }); return { duration, }; } catch (error) { const duration = Date.now() - startTime; this.stats.failed++; // Add to dead letter queue if enabled if (this.config.enableDeadLetterQueue) { this.addToDeadLetterQueue(event); } this.emit('failed', event, error); return { duration, error: error as Error, }; } finally { this.activeWorkers.delete(workerId); this.stats.activeWorkers = this.activeWorkers.size; } } /** * Process event with timeout * @param event The event to process * @param handlers Array of handlers to use */ private async processWithTimeout(event: WebsetEvent, handlers: EventHandler[]): Promise<void> { const timeoutPromise = new Promise<never>((_, reject) => { setTimeout(() => { reject(new Error(`Event processing timeout after ${this.config.timeout}ms`)); }, this.config.timeout); }); const processingPromise = this.executeHandlers(event, handlers); await Promise.race([processingPromise, timeoutPromise]); } /** * Execute all handlers for an event * @param event The event to process * @param handlers Array of handlers to execute */ private async executeHandlers(event: WebsetEvent, handlers: EventHandler[]): Promise<void> { const errors: Error[] = []; // Execute handlers in priority order for (const handler of handlers) { try { await handler.handle(event); } catch (error) { errors.push(error as Error); this.emit('handlerError', event, handler, error); } } // If all handlers failed, throw combined error if (errors.length === handlers.length && errors.length > 0) { const combinedError = new Error( `All handlers failed: ${errors.map(e => e.message).join('; ')}` ); throw combinedError; } } /** * Add event to dead letter queue * @param event The event to add */ private addToDeadLetterQueue(event: WebsetEvent): void { if (!this.config.deadLetterQueue) { return; } // Check if dead letter queue is full if (this.deadLetterQueue.length >= this.config.deadLetterQueue.maxSize) { // Remove oldest event this.deadLetterQueue.shift(); } this.deadLetterQueue.push(event); this.emit('deadLetter', event); } /** * Register an event handler * @param handler The handler to register */ registerHandler(handler: EventHandler): void { this.handlerRegistry.register(handler); this.emit('handlerRegistered', handler); } /** * Unregister an event handler * @param handler The handler to unregister */ unregisterHandler(handler: EventHandler): void { this.handlerRegistry.unregister(handler); this.emit('handlerUnregistered', handler); } /** * Get handlers for a specific event type * @param eventType The event type * @returns Array of handlers */ getHandlers(eventType: EventType): EventHandler[] { return this.handlerRegistry.getHandlers(eventType); } /** * Clear all registered handlers */ clearHandlers(): void { this.handlerRegistry.clear(); this.emit('handlersCleared'); } /** * Record processing time for statistics * @param duration Processing duration in milliseconds */ private recordProcessingTime(duration: number): void { this.processingTimes.push(duration); // Keep only last 1000 processing times if (this.processingTimes.length > 1000) { this.processingTimes.shift(); } this.stats.averageProcessingTime = this.processingTimes.reduce((sum, time) => sum + time, 0) / this.processingTimes.length; } /** * Start processing rate calculation */ private startRateCalculation(): void { this.rateCalculationInterval = setInterval(() => { const currentProcessed = this.stats.totalProcessed; const processedSinceLastCheck = currentProcessed - this.lastProcessedCount; // Calculate rate per second (interval is 1 second) this.stats.processingRate = processedSinceLastCheck; this.lastProcessedCount = currentProcessed; }, 1000); } /** * Stop processing rate calculation */ private stopRateCalculation(): void { if (this.rateCalculationInterval) { clearInterval(this.rateCalculationInterval); this.rateCalculationInterval = undefined; } } /** * Get current processor statistics * @returns Current statistics */ getStats(): EventProcessorStats { return { ...this.stats }; } /** * Get dead letter queue events * @returns Array of events in dead letter queue */ getDeadLetterQueue(): WebsetEvent[] { return [...this.deadLetterQueue]; } /** * Clear dead letter queue */ clearDeadLetterQueue(): void { const clearedCount = this.deadLetterQueue.length; this.deadLetterQueue.length = 0; this.emit('deadLetterCleared', clearedCount); } /** * Reset processor statistics */ resetStats(): void { this.stats.totalProcessed = 0; this.stats.successful = 0; this.stats.failed = 0; this.stats.processingRate = 0; this.stats.averageProcessingTime = 0; this.processingTimes.length = 0; this.lastProcessedCount = 0; } /** * Gracefully shutdown the processor * @param timeout Maximum time to wait for shutdown in milliseconds * @returns Promise that resolves when shutdown is complete */ async shutdown(timeout: number = 30000): Promise<void> { this.isShuttingDown = true; this.stopRateCalculation(); // Wait for all active workers to complete const startTime = Date.now(); while (this.activeWorkers.size > 0 && (Date.now() - startTime) < timeout) { await new Promise(resolve => setTimeout(resolve, 100)); } if (this.activeWorkers.size > 0) { console.warn(`Processor shutdown with ${this.activeWorkers.size} workers still active`); } // Clean up arrays to prevent memory leaks this.processingTimes.length = 0; this.deadLetterQueue.length = 0; this.activeWorkers.clear(); // Clear handler registry this.handlerRegistry.clear(); // Remove all event listeners this.removeAllListeners(); this.emit('shutdown'); } /** * Health check for the processor * @returns Health status information */ healthCheck(): { healthy: boolean; activeWorkers: number; deadLetterQueueSize: number; processingRate: number; errorRate: number; } { const errorRate = this.stats.totalProcessed > 0 ? this.stats.failed / this.stats.totalProcessed : 0; return { healthy: !this.isShuttingDown && this.activeWorkers.size < this.config.concurrency, activeWorkers: this.activeWorkers.size, deadLetterQueueSize: this.deadLetterQueue.length, processingRate: this.stats.processingRate, errorRate, }; } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/waldzellai/exa-mcp-server-websets'

If you have feedback or need assistance with the MCP directory API, please join our Discord server