Skip to main content
Glama
webhook-queue.ts10.6 kB
/** * Webhook Queue * Retry queue for webhook deliveries * Part of Jaxon Digital Optimizely DXP MCP Server - DXP-136 Phase 2 */ import WebhookSender from './webhook-sender'; import WebhookLogger from './webhook-logger'; import { SendResult } from './webhook-sender'; /** * Queue options */ export interface QueueOptions { maxRetries?: number; retryDelays?: number[]; maxQueueSize?: number; debug?: boolean; } /** * Queued webhook error */ export interface QueueError { attempt: number; error: string; statusCode?: number; timestamp: number; } /** * Queued webhook */ export interface QueuedWebhook { id: string; url: string; payload: any; headers: Record<string, string>; attempts: number; maxRetries: number; queuedAt: number; lastAttempt: number | null; nextRetry: number; errors: QueueError[]; } /** * Queue statistics */ export interface QueueStats { queued: number; sent: number; failed: number; retrying: number; queueSize: number; activeDeliveries: number; queuesCount: number; } /** * Enqueue options */ export interface EnqueueOptions { webhookId?: string; headers?: Record<string, string>; } /** * Webhook Queue Class * Manages queued webhook deliveries with retry logic */ class WebhookQueue { private options: Required<QueueOptions>; private queues: Map<string, QueuedWebhook[]>; private activeDeliveries: Set<string>; private stats: { queued: number; sent: number; failed: number; retrying: number; }; constructor(options: QueueOptions = {}) { this.options = { maxRetries: options.maxRetries || 3, retryDelays: options.retryDelays || [1000, 2000, 4000], // 1s, 2s, 4s maxQueueSize: options.maxQueueSize || 1000, debug: options.debug || process.env.DEBUG === 'true' }; // Queue storage: Map<url, Array<QueuedWebhook>> this.queues = new Map(); // Active deliveries: Set<webhookId> this.activeDeliveries = new Set(); // Stats this.stats = { queued: 0, sent: 0, failed: 0, retrying: 0 }; } /** * Queue a webhook for delivery * @param url - Webhook URL * @param payload - Event payload * @param options - Delivery options * @returns Webhook ID */ enqueue(url: string, payload: any, options: EnqueueOptions = {}): string | null { const webhookId = options.webhookId || `webhook-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; // Check queue size if (this.getTotalQueueSize() >= this.options.maxQueueSize) { WebhookLogger.logError(webhookId, url, 'Queue full', { currentSize: this.getTotalQueueSize(), maxSize: this.options.maxQueueSize }); this.stats.failed++; return null; } // Create queued webhook const queuedWebhook: QueuedWebhook = { id: webhookId, url: url, payload: payload, headers: options.headers || {}, attempts: 0, maxRetries: this.options.maxRetries, queuedAt: Date.now(), lastAttempt: null, nextRetry: Date.now(), // Immediate first attempt errors: [] }; // Get or create queue for this URL if (!this.queues.has(url)) { this.queues.set(url, []); } this.queues.get(url)!.push(queuedWebhook); this.stats.queued++; if (this.options.debug) { console.error(`[WEBHOOK QUEUE] Queued ${webhookId} for ${url}`); } // Schedule immediate delivery this.scheduleDelivery(queuedWebhook); return webhookId; } /** * Schedule delivery attempt * @param queuedWebhook - Queued webhook */ private scheduleDelivery(queuedWebhook: QueuedWebhook): void { const delay = Math.max(0, queuedWebhook.nextRetry - Date.now()); setTimeout(async () => { await this.attemptDelivery(queuedWebhook); }, delay); } /** * Attempt webhook delivery * @param queuedWebhook - Queued webhook */ private async attemptDelivery(queuedWebhook: QueuedWebhook): Promise<void> { // Check if already being delivered if (this.activeDeliveries.has(queuedWebhook.id)) { return; } this.activeDeliveries.add(queuedWebhook.id); queuedWebhook.attempts++; queuedWebhook.lastAttempt = Date.now(); if (this.options.debug) { console.error(`[WEBHOOK QUEUE] Attempting delivery ${queuedWebhook.id} (attempt ${queuedWebhook.attempts}/${queuedWebhook.maxRetries})`); } try { // Send webhook const result: SendResult = await WebhookSender.send( queuedWebhook.url, queuedWebhook.payload, { headers: queuedWebhook.headers, webhookId: queuedWebhook.id } ); if (result.success) { // Success - remove from queue this.removeFromQueue(queuedWebhook); this.stats.sent++; WebhookLogger.logSuccess(queuedWebhook.id, queuedWebhook.url, { attempts: queuedWebhook.attempts, responseTime: result.responseTime, statusCode: result.statusCode }); if (this.options.debug) { console.error(`[WEBHOOK QUEUE] Successfully delivered ${queuedWebhook.id}`); } } else { // Failure - check if retryable queuedWebhook.errors.push({ attempt: queuedWebhook.attempts, error: result.error || 'Unknown error', statusCode: result.statusCode, timestamp: Date.now() }); if (result.retryable && queuedWebhook.attempts < queuedWebhook.maxRetries) { // Schedule retry const retryDelay = this.getRetryDelay(queuedWebhook.attempts); queuedWebhook.nextRetry = Date.now() + retryDelay; this.stats.retrying++; WebhookLogger.logRetry(queuedWebhook.id, queuedWebhook.url, queuedWebhook.attempts, retryDelay, result.error || 'Unknown error'); if (this.options.debug) { console.error(`[WEBHOOK QUEUE] Retrying ${queuedWebhook.id} in ${retryDelay}ms (attempt ${queuedWebhook.attempts}/${queuedWebhook.maxRetries})`); } // Schedule next retry this.scheduleDelivery(queuedWebhook); } else { // Max retries reached or not retryable - remove from queue this.removeFromQueue(queuedWebhook); this.stats.failed++; WebhookLogger.logFailure(queuedWebhook.id, queuedWebhook.url, { attempts: queuedWebhook.attempts, errors: queuedWebhook.errors, finalError: result.error }); if (this.options.debug) { console.error(`[WEBHOOK QUEUE] Failed to deliver ${queuedWebhook.id} after ${queuedWebhook.attempts} attempts`); } } } } catch (error: any) { // Unexpected error this.removeFromQueue(queuedWebhook); this.stats.failed++; WebhookLogger.logError(queuedWebhook.id, queuedWebhook.url, error.message); if (this.options.debug) { console.error(`[WEBHOOK QUEUE] Unexpected error delivering ${queuedWebhook.id}:`, error); } } finally { this.activeDeliveries.delete(queuedWebhook.id); } } /** * Get retry delay for attempt number * @param attemptNumber - Current attempt number * @returns Delay in milliseconds */ private getRetryDelay(attemptNumber: number): number { const index = attemptNumber - 1; if (index < this.options.retryDelays.length) { return this.options.retryDelays[index]; } // Use last delay for any attempts beyond configured delays return this.options.retryDelays[this.options.retryDelays.length - 1]; } /** * Remove webhook from queue * @param queuedWebhook - Queued webhook */ private removeFromQueue(queuedWebhook: QueuedWebhook): void { const queue = this.queues.get(queuedWebhook.url); if (queue) { const index = queue.findIndex(w => w.id === queuedWebhook.id); if (index !== -1) { queue.splice(index, 1); } // Clean up empty queues if (queue.length === 0) { this.queues.delete(queuedWebhook.url); } } } /** * Get total queue size across all URLs * @returns Total queued webhooks */ getTotalQueueSize(): number { let total = 0; for (const queue of this.queues.values()) { total += queue.length; } return total; } /** * Get queue statistics * @returns Queue stats */ getStats(): QueueStats { return { ...this.stats, queueSize: this.getTotalQueueSize(), activeDeliveries: this.activeDeliveries.size, queuesCount: this.queues.size }; } /** * Clear all queues */ clear(): void { this.queues.clear(); this.activeDeliveries.clear(); this.stats = { queued: 0, sent: 0, failed: 0, retrying: 0 }; } } // Singleton instance let globalQueue: WebhookQueue | null = null; /** * Get the global webhook queue instance * @returns Global queue */ export function getGlobalWebhookQueue(): WebhookQueue { if (!globalQueue) { globalQueue = new WebhookQueue(); } return globalQueue; } /** * Reset the global queue (for testing) */ export function resetGlobalWebhookQueue(): void { if (globalQueue) { globalQueue.clear(); } globalQueue = null; } export { WebhookQueue };

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/JaxonDigital/optimizely-dxp-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server