Skip to main content
Glama
webhooks.ts • 12.9 kB
/**
 * Webhook Forwarding System
 *
 * Forwards events to external systems via HTTP webhooks for integrations and monitoring.
 */

import { EventEnvelope, EventFilter } from './event-types.js';
import { logger } from '../logger/logger.js';
// Import fetch and Response for Node.js compatibility
import { fetch, Response } from 'undici';

/**
 * Configuration of a single webhook endpoint. Only `url` is required;
 * `registerWebhook` fills in defaults for everything else.
 */
export interface WebhookConfig {
  /** Destination URL the event JSON is sent to. */
  url: string;
  /** HTTP method used for delivery (defaults to `POST`). */
  method?: 'POST' | 'PUT' | 'PATCH';
  /** Extra request headers. A JSON `Content-Type` is added when none is given. */
  headers?: Record<string, string>;
  /** Request timeout in milliseconds (passed to `AbortSignal.timeout`). */
  timeout?: number;
  /** Maximum number of delivery attempts before a delivery is marked failed. */
  retries?: number;
  /** Base retry delay in milliseconds; multiplied by the attempt number. */
  retryDelay?: number;
  /** When non-empty, requests carry an HMAC-SHA256 `X-Webhook-Signature` header. */
  secret?: string;
  /** Event filters; an event is forwarded when at least one filter accepts it. */
  filters?: EventFilter[];
  /** Disabled webhooks receive no events. */
  enabled?: boolean;
  /** Values greater than 1 enable batching of that many events per request. */
  batchSize?: number;
  /** Milliseconds after which a partially filled batch is flushed. */
  batchTimeout?: number;
}

/** Bookkeeping record for one event (or batch envelope) sent to one webhook. */
export interface WebhookDelivery {
  id: string;
  webhookId: string;
  event: EventEnvelope;
  url: string;
  /**
   * `pending`: waiting for the next queue pass; `retrying`: in flight or
   * waiting for its retry back-off; `delivered` / `failed`: terminal.
   */
  status: 'pending' | 'delivered' | 'failed' | 'retrying';
  attempts: number;
  createdAt: number;
  lastAttemptAt?: number;
  deliveredAt?: number;
  error?: string;
}

/** Per-webhook delivery counters. */
export interface WebhookStats {
  totalEvents: number;
  deliveredEvents: number;
  failedEvents: number;
  pendingEvents: number;
  averageDeliveryTime: number;
  errorRate: number;
}

/**
 * Webhook delivery manager
 *
 * Keeps a registry of webhook endpoints, queues one delivery per matching
 * event (or per batch), and drains the queue once per second.
 */
export class WebhookForwarder {
  private webhooks = new Map<string, WebhookConfig>();
  private deliveryQueue: WebhookDelivery[] = [];
  private deliveryHistory: WebhookDelivery[] = [];
  private processingInterval?: NodeJS.Timeout;
  /** True while a queue pass is running; prevents overlapping passes. */
  private isProcessingQueue = false;
  private batchQueues = new Map<string, EventEnvelope[]>();
  private batchTimeouts = new Map<string, NodeJS.Timeout>();
  /** Outstanding retry back-off timers, tracked so `dispose` can cancel them. */
  private retryTimers = new Set<NodeJS.Timeout>();
  private stats = new Map<string, WebhookStats>();

  constructor() {
    // Start processing queue
    this.processingInterval = setInterval(() => {
      this.processDeliveryQueue().catch((error: unknown) => {
        logger.warn('Webhook delivery queue processing failed', {
          error: error instanceof Error ? error.message : String(error),
        });
      });
    }, 1000);
  }

  /**
   * Register a webhook endpoint
   *
   * Registering an id that already exists replaces its configuration and
   * resets its statistics.
   *
   * @param id - Identifier used to refer to this webhook later.
   * @param config - Endpoint configuration; omitted fields get defaults.
   */
  registerWebhook(id: string, config: WebhookConfig): void {
    const fullConfig: Required<WebhookConfig> = {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      timeout: 5000,
      retries: 3,
      retryDelay: 1000,
      secret: '',
      filters: [],
      enabled: true,
      batchSize: 1,
      batchTimeout: 1000,
      ...config,
    };

    this.webhooks.set(id, fullConfig);
    this.stats.set(id, this.createWebhookStats());

    logger.info('Webhook registered', {
      webhookId: id,
      url: config.url,
      enabled: fullConfig.enabled,
    });
  }

  /**
   * Unregister a webhook endpoint
   *
   * Also drops its statistics and any batch that has not been flushed yet.
   */
  unregisterWebhook(id: string): void {
    this.webhooks.delete(id);
    this.stats.delete(id);

    // Clear any pending batches
    const timeout = this.batchTimeouts.get(id);
    if (timeout) {
      clearTimeout(timeout);
      this.batchTimeouts.delete(id);
    }
    this.batchQueues.delete(id);

    logger.info('Webhook unregistered', { webhookId: id });
  }

  /**
   * Forward an event to registered webhooks
   *
   * The event is queued (or batched) for every enabled webhook whose filter
   * list is empty or contains at least one filter that accepts the event.
   */
  async forwardEvent(event: EventEnvelope): Promise<void> {
    for (const [webhookId, config] of this.webhooks) {
      if (!config.enabled) continue;

      // Apply filters
      if (config.filters && config.filters.length > 0) {
        const passesFilters = config.filters.some(filter => filter(event));
        if (!passesFilters) continue;
      }

      // Handle batching
      if ((config.batchSize ?? 1) > 1) {
        await this.addToBatch(webhookId, event, config);
      } else {
        await this.queueDelivery(webhookId, event, config);
      }
    }
  }

  /**
   * Get webhook statistics
   *
   * @param webhookId - When given, returns the stats of that webhook (zeroed
   *   stats if it is unknown); otherwise returns a map of all webhooks.
   * @returns Snapshot copies; mutating them does not affect internal counters.
   */
  getWebhookStats(webhookId?: string): WebhookStats | Record<string, WebhookStats> {
    if (webhookId) {
      return this.snapshotStats(
        webhookId,
        this.stats.get(webhookId) ?? this.createWebhookStats()
      );
    }

    const allStats: Record<string, WebhookStats> = {};
    for (const [id, stats] of this.stats) {
      allStats[id] = this.snapshotStats(id, stats);
    }
    return allStats;
  }

  /**
   * Get delivery history
   *
   * @param webhookId - Restrict the result to one webhook.
   * @param limit - Maximum number of entries returned.
   * @returns Completed deliveries, newest first.
   */
  getDeliveryHistory(webhookId?: string, limit = 100): WebhookDelivery[] {
    const history = webhookId
      ? this.deliveryHistory.filter(d => d.webhookId === webhookId)
      : // Copy first: sort() works in place and must not reorder internal
        // state, because moveToHistory relies on insertion order when trimming.
        [...this.deliveryHistory];

    return history.sort((a, b) => b.createdAt - a.createdAt).slice(0, limit);
  }

  /**
   * Retry failed deliveries
   *
   * Moves failed deliveries from the history back into the delivery queue.
   * The attempt counter of each delivery is not reset.
   *
   * @param webhookId - Restrict the retry to one webhook.
   * @returns Number of deliveries that were re-queued.
   */
  async retryFailedDeliveries(webhookId?: string): Promise<number> {
    const failedDeliveries: WebhookDelivery[] = [];
    const remainingHistory: WebhookDelivery[] = [];

    for (const delivery of this.deliveryHistory) {
      const shouldRetry =
        delivery.status === 'failed' && (!webhookId || delivery.webhookId === webhookId);
      (shouldRetry ? failedDeliveries : remainingHistory).push(delivery);
    }

    // Take re-queued deliveries out of the history; they are appended again
    // once they reach a terminal state, which would otherwise duplicate them.
    this.deliveryHistory = remainingHistory;

    for (const delivery of failedDeliveries) {
      delivery.status = 'pending';
      delete delivery.error;
      this.deliveryQueue.push(delivery);
    }

    logger.info('Retrying failed deliveries', {
      count: failedDeliveries.length,
      webhookId,
    });

    return failedDeliveries.length;
  }

  /**
   * Update webhook configuration
   *
   * @throws Error when no webhook with the given id is registered.
   */
  updateWebhook(id: string, updates: Partial<WebhookConfig>): void {
    const existing = this.webhooks.get(id);
    if (!existing) {
      throw new Error(`Webhook ${id} not found`);
    }

    const updated = { ...existing, ...updates };
    this.webhooks.set(id, updated);

    logger.info('Webhook configuration updated', {
      webhookId: id,
      updates: Object.keys(updates),
    });
  }

  /**
   * Test webhook connectivity
   *
   * Sends a synthetic `cipher:test` event directly, bypassing the queue,
   * filters, statistics and history.
   *
   * @returns Whether the endpoint answered with a 2xx status, the elapsed
   *   time in milliseconds, and an error description on failure.
   * @throws Error when no webhook with the given id is registered.
   */
  async testWebhook(id: string): Promise<{ success: boolean; latency: number; error?: string }> {
    const config = this.webhooks.get(id);
    if (!config) {
      throw new Error(`Webhook ${id} not found`);
    }

    const testEvent: EventEnvelope = {
      id: `test-${Date.now()}`,
      type: 'cipher:test',
      data: { message: 'Webhook connectivity test', timestamp: Date.now() },
      metadata: { timestamp: Date.now(), source: 'webhook-test' },
    };

    const startTime = Date.now();

    try {
      const response = await this.deliverEvent(config, testEvent);
      const latency = Date.now() - startTime;

      return {
        success: response.ok,
        latency,
        ...(response.ok ? {} : { error: `HTTP ${response.status}: ${response.statusText}` }),
      };
    } catch (error) {
      const latency = Date.now() - startTime;
      return {
        success: false,
        latency,
        error: error instanceof Error ? error.message : String(error),
      };
    }
  }

  /**
   * Dispose of the webhook forwarder
   *
   * Stops queue processing and cancels all batch and retry timers. Batched
   * events that have not been flushed are discarded.
   */
  dispose(): void {
    if (this.processingInterval) {
      clearInterval(this.processingInterval);
      this.processingInterval = undefined;
    }

    // Clear all batch timeouts
    for (const timeout of this.batchTimeouts.values()) {
      clearTimeout(timeout);
    }
    this.batchTimeouts.clear();
    this.batchQueues.clear();

    // Cancel retry back-off timers so they do not fire after disposal
    for (const timer of this.retryTimers) {
      clearTimeout(timer);
    }
    this.retryTimers.clear();

    logger.info('Webhook forwarder disposed');
  }

  /**
   * Append an event to the webhook's batch; flush when the batch is full,
   * otherwise make sure a flush timer is running.
   */
  private async addToBatch(
    webhookId: string,
    event: EventEnvelope,
    config: WebhookConfig
  ): Promise<void> {
    let batch = this.batchQueues.get(webhookId);
    if (!batch) {
      batch = [];
      this.batchQueues.set(webhookId, batch);
    }
    batch.push(event);

    // Check if batch is full
    if (batch.length >= (config.batchSize || 1)) {
      await this.flushBatch(webhookId, config);
      return;
    }

    // Set timeout for batch if not already set
    if (!this.batchTimeouts.has(webhookId)) {
      const timeout = setTimeout(() => {
        void this.flushBatch(webhookId, config);
      }, config.batchTimeout);
      this.batchTimeouts.set(webhookId, timeout);
    }
  }

  /** Wrap the webhook's collected events in a `cipher:batch` envelope and queue it. */
  private async flushBatch(webhookId: string, config: WebhookConfig): Promise<void> {
    // Clear timeout
    const timeout = this.batchTimeouts.get(webhookId);
    if (timeout) {
      clearTimeout(timeout);
      this.batchTimeouts.delete(webhookId);
    }

    const batch = this.batchQueues.get(webhookId);
    if (!batch || batch.length === 0) return;

    // Detach the batch before awaiting, so events arriving in the meantime
    // start a new batch instead of being appended to the envelope built here.
    this.batchQueues.set(webhookId, []);

    // Create batch event
    const batchEvent: EventEnvelope = {
      id: `batch-${Date.now()}`,
      type: 'cipher:batch',
      data: { events: batch, count: batch.length },
      metadata: { timestamp: Date.now(), source: 'batch' },
    };

    await this.queueDelivery(webhookId, batchEvent, config);
  }

  /** Create a pending delivery record for the event and count it in the stats. */
  private async queueDelivery(
    webhookId: string,
    event: EventEnvelope,
    config: WebhookConfig
  ): Promise<void> {
    const delivery: WebhookDelivery = {
      id: `delivery-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`,
      webhookId,
      event,
      url: config.url,
      status: 'pending',
      attempts: 0,
      createdAt: Date.now(),
    };

    this.deliveryQueue.push(delivery);
    this.updateStats(webhookId, stats => stats.totalEvents++);
  }

  /**
   * Run one queue pass: attempt up to 10 pending deliveries sequentially,
   * then drop deliveries that reached a terminal state from the queue.
   */
  private async processDeliveryQueue(): Promise<void> {
    // A pass can outlast the 1s tick (e.g. several timeouts in a row). Running
    // two passes at once would pick up and send the same delivery twice.
    if (this.isProcessingQueue) return;
    this.isProcessingQueue = true;

    try {
      const pendingDeliveries = this.deliveryQueue.filter(d => d.status === 'pending');

      for (const delivery of pendingDeliveries.slice(0, 10)) {
        // Process max 10 at a time
        await this.processDelivery(delivery);
      }

      // Clean up old completed deliveries
      this.deliveryQueue = this.deliveryQueue.filter(
        d => d.status === 'pending' || d.status === 'retrying'
      );
    } finally {
      this.isProcessingQueue = false;
    }
  }

  /**
   * Perform one delivery attempt and record its outcome.
   *
   * On failure the delivery is either marked failed (attempt budget used up)
   * or parked as `retrying` until its back-off delay has elapsed.
   */
  private async processDelivery(delivery: WebhookDelivery): Promise<void> {
    const config = this.webhooks.get(delivery.webhookId);
    if (!config || !config.enabled) {
      delivery.status = 'failed';
      delivery.error = 'Webhook disabled or not found';
      this.moveToHistory(delivery);
      return;
    }

    delivery.attempts++;
    delivery.lastAttemptAt = Date.now();
    delivery.status = 'retrying';

    try {
      const response = await this.deliverEvent(config, delivery.event);
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }

      const deliveredAt = Date.now();
      delivery.status = 'delivered';
      delivery.deliveredAt = deliveredAt;

      this.updateStats(delivery.webhookId, stats => {
        stats.deliveredEvents++;
        // Incremental arithmetic mean over all delivered events
        const deliveryTime = deliveredAt - delivery.createdAt;
        stats.averageDeliveryTime +=
          (deliveryTime - stats.averageDeliveryTime) / stats.deliveredEvents;
      });
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);

      if (delivery.attempts >= (config.retries ?? 0)) {
        delivery.status = 'failed';
        delivery.error = errorMessage;

        this.updateStats(delivery.webhookId, stats => {
          stats.failedEvents++;
          stats.errorRate = stats.totalEvents > 0 ? stats.failedEvents / stats.totalEvents : 0;
        });
      } else {
        // Stay in 'retrying' (skipped by the queue pass) until the back-off
        // has elapsed; only then become eligible for the next attempt.
        delivery.status = 'retrying';
        const timer = setTimeout(
          () => {
            this.retryTimers.delete(timer);
            if (delivery.status === 'retrying') {
              delivery.status = 'pending';
            }
          },
          (config.retryDelay ?? 0) * delivery.attempts
        );
        this.retryTimers.add(timer);
      }

      logger.warn('Webhook delivery failed', {
        deliveryId: delivery.id,
        webhookId: delivery.webhookId,
        attempt: delivery.attempts,
        error: errorMessage,
      });
    }

    if (delivery.status === 'delivered' || delivery.status === 'failed') {
      this.moveToHistory(delivery);
    }
  }

  /**
   * Send the event as JSON to the webhook URL.
   *
   * @returns The HTTP response; non-2xx responses are returned, not thrown.
   *   Network errors and timeouts reject the promise.
   */
  private async deliverEvent(config: WebhookConfig, event: EventEnvelope): Promise<Response> {
    const body = JSON.stringify(event);
    const headers: Record<string, string> = { ...config.headers };

    // The body is always JSON. Custom headers replace the registration default
    // wholesale, so restore the content type unless the caller set one.
    const hasContentType = Object.keys(headers).some(
      name => name.toLowerCase() === 'content-type'
    );
    if (!hasContentType) {
      headers['Content-Type'] = 'application/json';
    }

    // Add signature if secret is provided
    if (config.secret) {
      const crypto = await import('crypto');
      const signature = crypto.createHmac('sha256', config.secret).update(body).digest('hex');
      headers['X-Webhook-Signature'] = `sha256=${signature}`;
    }

    return fetch(config.url, {
      method: config.method || 'POST',
      headers,
      body,
      signal: AbortSignal.timeout(config.timeout || 30000),
    });
  }

  /** Append a finished delivery to the history, keeping only the newest 1000. */
  private moveToHistory(delivery: WebhookDelivery): void {
    this.deliveryHistory.push(delivery);

    // Maintain history size (keep last 1000)
    if (this.deliveryHistory.length > 1000) {
      this.deliveryHistory = this.deliveryHistory.slice(-1000);
    }
  }

  /** Apply `updater` to the webhook's stats; no-op for unknown webhooks. */
  private updateStats(webhookId: string, updater: (stats: WebhookStats) => void): void {
    const stats = this.stats.get(webhookId);
    if (stats) {
      updater(stats);
    }
  }

  /**
   * Copy the stats and fill in the values derived at read time:
   * `pendingEvents` from the current queue and `errorRate` from the counters.
   */
  private snapshotStats(webhookId: string, stats: WebhookStats): WebhookStats {
    const pendingEvents = this.deliveryQueue.filter(
      d => d.webhookId === webhookId && (d.status === 'pending' || d.status === 'retrying')
    ).length;

    return {
      ...stats,
      pendingEvents,
      errorRate: stats.totalEvents > 0 ? stats.failedEvents / stats.totalEvents : 0,
    };
  }

  /** Create a zeroed stats record. */
  private createWebhookStats(): WebhookStats {
    return {
      totalEvents: 0,
      deliveredEvents: 0,
      failedEvents: 0,
      pendingEvents: 0,
      averageDeliveryTime: 0,
      errorRate: 0,
    };
  }
}

/**
 * Webhook filter builders for common use cases
 */
export class WebhookFilters {
  /**
   * Filter by event type
   */
  static byEventType(...eventTypes: string[]): EventFilter {
    return (event: EventEnvelope) => eventTypes.includes(event.type);
  }

  /**
   * Filter by session ID
   */
  static bySessionId(sessionId: string): EventFilter {
    return (event: EventEnvelope) => event.metadata.sessionId === sessionId;
  }

  /**
   * Filter by event source
   */
  static bySource(source: string): EventFilter {
    return (event: EventEnvelope) => event.metadata.source === source;
  }

  /**
   * Filter by priority
   */
  static byPriority(priority: 'high' | 'normal' | 'low'): EventFilter {
    return (event: EventEnvelope) => event.metadata.priority === priority;
  }

  /**
   * Filter by time range (both bounds inclusive)
   */
  static byTimeRange(start: number, end: number): EventFilter {
    return (event: EventEnvelope) =>
      event.metadata.timestamp >= start && event.metadata.timestamp <= end;
  }

  /**
   * Combine filters with AND logic
   */
  static and(...filters: EventFilter[]): EventFilter {
    return (event: EventEnvelope) => filters.every(filter => filter(event));
  }

  /**
   * Combine filters with OR logic
   */
  static or(...filters: EventFilter[]): EventFilter {
    return (event: EventEnvelope) => filters.some(filter => filter(event));
  }

  /**
   * Negate a filter
   */
  static not(filter: EventFilter): EventFilter {
    return (event: EventEnvelope) => !filter(event);
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/campfirein/cipher'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.