Skip to main content
Glama
waldzellai

Exa Websets MCP Server

by waldzellai
EventQueue.ts9.25 kB
/** * Event Queue Implementation * * Thread-safe event queue for managing and processing Websets events * with retry logic, backoff strategies, and comprehensive statistics. */ import { EventEmitter } from 'events'; import { WebsetEvent } from '../types/websets.js'; import { QueuedEvent, EventQueueConfig, ExtendedEventQueueStats, EventQueueStats, DEFAULT_EVENT_SYSTEM_CONFIG, getEventPriority, EventType } from './EventTypes.js'; /** * Thread-safe event queue with retry logic and statistics */ export class EventQueue extends EventEmitter { private readonly config: EventQueueConfig; private readonly queue: QueuedEvent[] = []; private readonly processing = new Set<string>(); private readonly stats: ExtendedEventQueueStats = { totalEvents: 0, pendingEvents: 0, processingEvents: 0, completedEvents: 0, failedEvents: 0, totalQueued: 0, currentSize: 0, processing: 0, processed: 0, failed: 0, averageProcessingTime: 0, eventsByType: {} as Record<EventType, number>, queueHealth: 'healthy', }; private readonly processingTimes: number[] = []; private processingInterval?: NodeJS.Timeout; private isShuttingDown = false; constructor(config: Partial<EventQueueConfig> = {}) { super(); this.config = { ...DEFAULT_EVENT_SYSTEM_CONFIG.queue, ...config }; this.startProcessing(); } /** * Add an event to the queue * @param event The event to queue * @returns Promise that resolves when event is queued */ async enqueue(event: WebsetEvent): Promise<void> { if (this.isShuttingDown) { throw new Error('Queue is shutting down'); } if (this.queue.length >= this.config.maxSize) { throw new Error(`Queue is full (max size: ${this.config.maxSize})`); } const queuedEvent: QueuedEvent = { event, priority: getEventPriority(event.type), attempts: 0, maxAttempts: this.config.maxAttempts, nextAttemptAt: new Date(), addedAt: new Date(), queuedAt: new Date(), }; this.queue.push(queuedEvent); this.stats.totalQueued++; this.stats.currentSize = this.queue.length; this.emit('enqueued', queuedEvent); } /** * Get the next batch of events ready for processing * @returns Array of events ready for processing */ private getNextBatch(): QueuedEvent[] { const now = new Date(); const batch: QueuedEvent[] = []; for (let i = 0; i < this.queue.length && batch.length < this.config.batchSize; i++) { const queuedEvent = this.queue[i]; // Skip if already processing if (this.processing.has(queuedEvent.event.id)) { continue; } // Skip if waiting for retry if (queuedEvent.nextRetryAt && queuedEvent.nextRetryAt > now) { continue; } // Skip if max attempts exceeded if (queuedEvent.attempts >= queuedEvent.maxAttempts) { continue; } batch.push(queuedEvent); } return batch; } /** * Process a single event * @param queuedEvent The queued event to process * @returns Promise that resolves when processing is complete */ private async processEvent(queuedEvent: QueuedEvent): Promise<void> { const startTime = Date.now(); const eventId = queuedEvent.event.id; this.processing.add(eventId); this.stats.processing = this.processing.size; queuedEvent.attempts++; try { this.emit('processing', queuedEvent); // Emit the event for handlers to process this.emit('event', queuedEvent.event); // Remove from queue on successful processing const index = this.queue.indexOf(queuedEvent); if (index !== -1) { this.queue.splice(index, 1); this.stats.currentSize = this.queue.length; } this.stats.processed++; this.recordProcessingTime(Date.now() - startTime); this.emit('processed', queuedEvent); } catch (error) { queuedEvent.lastError = error as Error; if (queuedEvent.attempts < queuedEvent.maxAttempts) { // Schedule retry queuedEvent.nextRetryAt = this.calculateNextRetryTime(queuedEvent.attempts); this.emit('retrying', queuedEvent, error); } else { // Max attempts exceeded, move to failed const index = this.queue.indexOf(queuedEvent); if (index !== -1) { this.queue.splice(index, 1); this.stats.currentSize = this.queue.length; } this.stats.failed++; this.emit('failed', queuedEvent, error); } } finally { this.processing.delete(eventId); this.stats.processing = this.processing.size; } } /** * Calculate the next retry time based on attempt number * @param attempt The current attempt number * @returns Date when the next retry should occur */ private calculateNextRetryTime(attempt: number): Date { let delay = this.config.retryDelay; if (this.config.exponentialBackoff) { delay = Math.min( this.config.retryDelay * Math.pow(2, attempt - 1), this.config.maxRetryDelay ); } // Add jitter to prevent thundering herd const jitter = Math.random() * 0.1 * delay; delay += jitter; return new Date(Date.now() + delay); } /** * Record processing time for statistics * @param duration Processing duration in milliseconds */ private recordProcessingTime(duration: number): void { this.processingTimes.push(duration); // Keep only last 1000 processing times for average calculation if (this.processingTimes.length > 1000) { this.processingTimes.shift(); } this.stats.averageProcessingTime = this.processingTimes.reduce((sum, time) => sum + time, 0) / this.processingTimes.length; } /** * Start the processing loop */ private startProcessing(): void { if (this.processingInterval) { return; } this.processingInterval = setInterval(async () => { if (this.isShuttingDown) { return; } const batch = this.getNextBatch(); if (batch.length === 0) { return; } // Process events concurrently const promises = batch.map(queuedEvent => this.processEvent(queuedEvent).catch(error => { // Error handling is done in processEvent console.error('Unexpected error in event processing:', error); }) ); await Promise.allSettled(promises); }, this.config.processingInterval); } /** * Stop the processing loop */ private stopProcessing(): void { if (this.processingInterval) { clearInterval(this.processingInterval); this.processingInterval = undefined; } } /** * Get current queue statistics * @returns Current statistics */ getStats(): EventQueueStats { return { ...this.stats }; } /** * Get current queue size * @returns Number of events in queue */ size(): number { return this.queue.length; } /** * Check if queue is empty * @returns True if queue is empty */ isEmpty(): boolean { return this.queue.length === 0; } /** * Clear all events from the queue */ clear(): void { const clearedCount = this.queue.length; this.queue.length = 0; this.stats.currentSize = 0; this.emit('cleared', clearedCount); } /** * Get events that are waiting for retry * @returns Array of events waiting for retry */ getRetryingEvents(): QueuedEvent[] { const now = new Date(); return this.queue.filter(event => event.nextRetryAt && event.nextRetryAt > now ); } /** * Get events that have failed all retry attempts * @returns Array of failed events still in queue */ getFailedEvents(): QueuedEvent[] { return this.queue.filter(event => event.attempts >= event.maxAttempts ); } /** * Gracefully shutdown the queue * @param timeout Maximum time to wait for shutdown in milliseconds * @returns Promise that resolves when shutdown is complete */ async shutdown(timeout: number = 30000): Promise<void> { this.isShuttingDown = true; this.stopProcessing(); // Wait for all processing to complete const startTime = Date.now(); while (this.processing.size > 0 && (Date.now() - startTime) < timeout) { await new Promise(resolve => setTimeout(resolve, 100)); } if (this.processing.size > 0) { console.warn(`Queue shutdown with ${this.processing.size} events still processing`); } this.emit('shutdown'); } /** * Health check for the queue * @returns Health status information */ healthCheck(): { healthy: boolean; queueSize: number; processing: number; failedEvents: number; retryingEvents: number; } { const failedEvents = this.getFailedEvents().length; const retryingEvents = this.getRetryingEvents().length; return { healthy: !this.isShuttingDown && this.queue.length < this.config.maxSize, queueSize: this.queue.length, processing: this.processing.size, failedEvents, retryingEvents, }; } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/waldzellai/exa-mcp-server-websets'

If you have feedback or need assistance with the MCP directory API, please join our Discord server