Skip to main content
Glama
event-emitter.ts7.2 kB
/** * Central Event Emitter * Core event bus for all async operations in the MCP server * Now uses pluggable event bus implementations (in-memory or Redis) * Part of Jaxon Digital Optimizely DXP MCP Server - DXP-136 Phase 3 */ import { getEventBus, getConfigFromEnvironment } from './event-bus-factory'; import { validateEvent } from './event-types'; import { EventBusInterface, HistoryOptions, HealthStatus, BusStats } from './event-bus-interface'; import { DXPEvent } from './event-types'; /** * Pending subscription for lazy initialization */ interface PendingSubscription { eventType: string; callback: (event: DXPEvent) => void; } /** * DXP Event Emitter Wrapper * Maintains backward compatibility with Phase 1/2 code * Delegates to pluggable event bus implementation */ export class DXPEventEmitter { private eventBus: EventBusInterface | null; private initialized: boolean; private _pendingSubscriptions: PendingSubscription[]; private initializationPromise: Promise<void> | null; constructor() { this.eventBus = null; // Will be initialized lazily this.initialized = false; this._pendingSubscriptions = []; this.initializationPromise = null; } /** * Initialize the event bus (lazy initialization with mutex pattern) * Prevents race condition when multiple calls happen concurrently * DXP-144: Fixed race condition with Promise-based mutex */ private async _ensureInitialized(): Promise<void> { // Fast path: already initialized if (this.initialized) { return; } // Initialization in progress - wait for it if (this.initializationPromise) { await this.initializationPromise; return; } // Start initialization (only first caller gets here) this.initializationPromise = this._initialize(); await this.initializationPromise; } /** * Internal initialization method (separated for mutex pattern) * DXP-144: Extracted from _ensureInitialized for thread-safe initialization */ private async _initialize(): Promise<void> { const config = getConfigFromEnvironment(); this.eventBus = await getEventBus(config); this.initialized = true; } /** * Emit a validated event (backward compatible API) * @param event - Event object */ async emitEvent(event: DXPEvent): Promise<void> { try { // Validate event structure validateEvent(event); // Ensure bus is initialized await this._ensureInitialized(); // Publish to event bus if (this.eventBus) { await this.eventBus.publish(event); } } catch (error) { console.error('[EVENT EMITTER] Failed to emit event:', (error as Error).message); console.error('[EVENT EMITTER] Event:', event); } } /** * Get active operations (backward compatible) */ async getActiveOperations(): Promise<string[]> { await this._ensureInitialized(); if (!this.eventBus) return []; const stats = await this.eventBus.getStats(); return stats.activeOperations || []; } /** * Get statistics (backward compatible) */ async getStats(): Promise<BusStats> { await this._ensureInitialized(); if (!this.eventBus) { return { totalEvents: 0, eventsByType: {}, activeOperations: [], subscriptionCount: 0 }; } return await this.eventBus.getStats(); } /** * Subscribe to events (new in Phase 3) * @param pattern - Event pattern (e.g., 'deployment.*') * @param callback - Callback function * @returns Subscription ID */ async subscribe(pattern: string, callback: (event: DXPEvent) => void): Promise<string> { await this._ensureInitialized(); if (!this.eventBus) { throw new Error('Event bus not initialized'); } return await this.eventBus.subscribe(pattern, callback); } /** * Unsubscribe from events (new in Phase 3) * @param subscriptionId - Subscription ID */ async unsubscribe(subscriptionId: string): Promise<void> { await this._ensureInitialized(); if (!this.eventBus) { throw new Error('Event bus not initialized'); } return await this.eventBus.unsubscribe(subscriptionId); } /** * Get event history (new in Phase 3) * @param options - Query options * @returns Array of events */ async getHistory(options: HistoryOptions = {}): Promise<DXPEvent[]> { await this._ensureInitialized(); if (!this.eventBus) return []; return await this.eventBus.getHistory(options); } /** * Get bus health status (new in Phase 3) */ async getHealth(): Promise<HealthStatus> { await this._ensureInitialized(); if (!this.eventBus) { return { healthy: false, details: { error: 'Event bus not initialized' } }; } return await this.eventBus.getHealth(); } /** * On (backward compatible with EventEmitter API) * Used by resource handlers that call emitter.on('event', callback) * @param eventType - Event type or pattern * @param callback - Callback function */ on(eventType: string, callback: (event: DXPEvent) => void): void { // Store callback for lazy initialization if (!this._pendingSubscriptions) { this._pendingSubscriptions = []; } this._pendingSubscriptions.push({ eventType, callback }); // Try to initialize immediately if not already done this._ensureInitialized().then(() => { // Subscribe to events if (this.eventBus) { this.eventBus.subscribe(eventType, callback).catch(err => { console.error('[EVENT EMITTER] Failed to subscribe:', err); }); } }).catch(err => { console.error('[EVENT EMITTER] Failed to initialize for subscription:', err); }); } /** * Reset the emitter (for testing) * DXP-144: Also clears initialization promise to prevent reuse */ async reset(): Promise<void> { if (this.eventBus) { await this.eventBus.close(); } this.eventBus = null; this.initialized = false; this._pendingSubscriptions = []; this.initializationPromise = null; } } // Singleton instance let globalEmitter: DXPEventEmitter | null = null; /** * Get the global event emitter instance * @returns Global emitter */ export function getGlobalEmitter(): DXPEventEmitter { if (!globalEmitter) { globalEmitter = new DXPEventEmitter(); } return globalEmitter; } /** * Reset the global emitter (for testing) */ export async function resetGlobalEmitter(): Promise<void> { if (globalEmitter) { await globalEmitter.reset(); } globalEmitter = null; }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/JaxonDigital/optimizely-dxp-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server