Skip to main content
Glama
in-memory-bus.ts10.5 kB
/** * In-Memory Event Bus * Default/fallback event bus implementation using Node.js EventEmitter * Part of Jaxon Digital Optimizely DXP MCP Server - DXP-136 Phase 3 */ import { EventEmitter } from 'events'; import { EventBusInterface, EventBusConfig, HistoryOptions, HealthStatus, BusStats } from './event-bus-interface'; import { DXPEvent, validateEvent, isTerminalEvent } from './event-types'; /** * Active operation tracking */ interface ActiveOperation { type: string; startTime: number; lastEvent: DXPEvent; } /** * Subscription tracking */ interface Subscription { pattern: string; callback: (event: DXPEvent) => void; listener: (event: DXPEvent) => void; } /** * Event with internal timestamp */ interface HistoricalEvent extends DXPEvent { _timestamp: number; } /** * In-Memory Event Bus Implementation * Uses Node.js EventEmitter for local pub/sub * Provides event history for replay capability */ export class InMemoryEventBus extends EventBusInterface { private emitter: EventEmitter; private activeOperations: Map<string, ActiveOperation>; private eventHistory: HistoricalEvent[]; private maxHistoryAge: number; private maxHistorySize: number; private subscriptions: Map<string, Subscription>; private nextSubscriptionId: number; private stats: { totalEvents: number; eventsByType: Record<string, number>; activeOperationsCount: number; subscriptionCount: number; }; private cleanupInterval: NodeJS.Timeout | null; constructor() { super(); this.emitter = new EventEmitter(); this.emitter.setMaxListeners(100); // Allow many listeners for monitoring // Track active operations this.activeOperations = new Map<string, ActiveOperation>(); // Event history for replay (last 24 hours) this.eventHistory = []; this.maxHistoryAge = 24 * 60 * 60 * 1000; // 24 hours in milliseconds this.maxHistorySize = 10000; // Max events to keep // Subscription tracking this.subscriptions = new Map<string, Subscription>(); this.nextSubscriptionId = 1; // Stats for monitoring this.stats = { totalEvents: 0, eventsByType: {}, activeOperationsCount: 0, subscriptionCount: 0 }; // Start cleanup interval this.cleanupInterval = setInterval(() => this.cleanupHistory(), 60 * 60 * 1000); // Every hour } /** * Initialize the event bus */ async initialize(_config: EventBusConfig = {}): Promise<void> { // In-memory bus needs no initialization if (process.env.DEBUG === 'true') { console.error('[EVENT BUS] In-memory event bus initialized'); } return Promise.resolve(); } /** * Publish an event to the bus */ async publish(event: DXPEvent): Promise<boolean> { try { // Validate event structure validateEvent(event); // Add to history this.addToHistory(event); // Update stats this.stats.totalEvents++; this.stats.eventsByType[event.eventType] = (this.stats.eventsByType[event.eventType] || 0) + 1; // Track active operation this.trackOperation(event); // Emit on both specific event type and wildcard this.emitter.emit(event.eventType, event); this.emitter.emit('*', event); // Wildcard listener for all events // Debug logging if (process.env.DEBUG === 'true') { console.error(`[EVENT BUS] Published ${event.eventType} for ${event.operationId}`); } return true; } catch (error) { console.error('[EVENT BUS] Failed to publish event:', (error as Error).message); console.error('[EVENT BUS] Event:', event); return false; } } /** * Subscribe to events matching a pattern */ async subscribe(pattern: string, callback: (event: DXPEvent) => void): Promise<string> { const subscriptionId = `sub-${this.nextSubscriptionId++}`; // Create listener that matches pattern const listener = (event: DXPEvent) => { if (this.matchesPattern(event.eventType, pattern)) { try { callback(event); } catch (error) { console.error(`[EVENT BUS] Error in subscription callback (${subscriptionId}):`, error); } } }; // Subscribe to wildcard to get all events this.emitter.on('*', listener); // Track subscription this.subscriptions.set(subscriptionId, { pattern, callback, listener }); this.stats.subscriptionCount++; if (process.env.DEBUG === 'true') { console.error(`[EVENT BUS] Subscribed to pattern: ${pattern} (${subscriptionId})`); } return subscriptionId; } /** * Unsubscribe from events */ async unsubscribe(subscriptionId: string): Promise<void> { const subscription = this.subscriptions.get(subscriptionId); if (!subscription) { throw new Error(`Subscription not found: ${subscriptionId}`); } // Remove listener this.emitter.off('*', subscription.listener); // Remove from tracking this.subscriptions.delete(subscriptionId); this.stats.subscriptionCount--; if (process.env.DEBUG === 'true') { console.error(`[EVENT BUS] Unsubscribed: ${subscriptionId}`); } } /** * Get event history (for replay) */ async getHistory(options: HistoryOptions = {}): Promise<DXPEvent[]> { const { pattern = '*', limit = 100, since = null } = options; let events: HistoricalEvent[] = this.eventHistory; // Filter by timestamp if (since) { const sinceTime = since instanceof Date ? since.getTime() : new Date(since).getTime(); events = events.filter(e => new Date(e.timestamp).getTime() >= sinceTime); } // Filter by pattern if (pattern !== '*') { events = events.filter(e => this.matchesPattern(e.eventType, pattern)); } // Limit results return events.slice(-limit); } /** * Get bus health status */ async getHealth(): Promise<HealthStatus> { return { healthy: true, type: 'in-memory', details: { activeOperations: this.activeOperations.size, subscriptions: this.subscriptions.size, historySize: this.eventHistory.length } }; } /** * Get bus statistics */ async getStats(): Promise<BusStats> { return { ...this.stats, activeOperations: Array.from(this.activeOperations.keys()), historySize: this.eventHistory.length }; } /** * Close/cleanup the event bus */ async close(): Promise<void> { if (this.cleanupInterval) { clearInterval(this.cleanupInterval); } this.emitter.removeAllListeners(); this.subscriptions.clear(); this.activeOperations.clear(); this.eventHistory = []; if (process.env.DEBUG === 'true') { console.error('[EVENT BUS] In-memory event bus closed'); } } // Internal helper methods /** * Track operation lifecycle */ private trackOperation(event: DXPEvent): void { const { operationId, eventType } = event; if (!this.activeOperations.has(operationId)) { // New operation this.activeOperations.set(operationId, { type: event.eventType.split('.')[0], // deployment, export, download startTime: Date.now(), lastEvent: event }); this.stats.activeOperationsCount++; } else { // Update existing operation const op = this.activeOperations.get(operationId)!; op.lastEvent = event; // Remove if terminal state if (isTerminalEvent(eventType)) { this.activeOperations.delete(operationId); this.stats.activeOperationsCount--; } } } /** * Add event to history */ private addToHistory(event: DXPEvent): void { this.eventHistory.push({ ...event, _timestamp: Date.now() // Internal tracking }); // Limit history size if (this.eventHistory.length > this.maxHistorySize) { this.eventHistory.shift(); } } /** * Clean up old events from history */ private cleanupHistory(): void { const cutoff = Date.now() - this.maxHistoryAge; this.eventHistory = this.eventHistory.filter(e => e._timestamp >= cutoff); if (process.env.DEBUG === 'true') { console.error(`[EVENT BUS] Cleaned history, ${this.eventHistory.length} events remaining`); } } /** * Match event type against pattern * Supports wildcards: deployment.* matches deployment.started, deployment.failed, etc. */ private matchesPattern(eventType: string, pattern: string): boolean { // Exact match if (pattern === eventType || pattern === '*') { return true; } // Wildcard match (e.g., deployment.*) if (pattern.endsWith('.*')) { const prefix = pattern.slice(0, -2); // Remove .* return eventType.startsWith(prefix + '.'); } // Prefix match (e.g., deployment) if (pattern.endsWith('*')) { const prefix = pattern.slice(0, -1); // Remove * return eventType.startsWith(prefix); } return false; } /** * Reset the event bus (for testing) */ reset(): void { this.emitter.removeAllListeners(); this.activeOperations.clear(); this.eventHistory = []; this.subscriptions.clear(); this.stats = { totalEvents: 0, eventsByType: {}, activeOperationsCount: 0, subscriptionCount: 0 }; } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/JaxonDigital/optimizely-dxp-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server