Skip to main content
Glama
event-bus-factory.ts (4.07 kB)
/**
 * Event Bus Factory
 * Creates appropriate event bus implementation based on configuration
 * Handles fallback from Redis to in-memory on connection failures
 * Part of Jaxon Digital Optimizely DXP MCP Server - DXP-136 Phase 3
 */

import { InMemoryEventBus } from './in-memory-bus';
import { RedisEventBus } from './redis-event-bus';
import { EventBusInterface, EventBusConfig } from './event-bus-interface';

/**
 * Event bus types accepted by the factory.
 */
export const BUS_TYPES = {
  MEMORY: 'memory',
  REDIS: 'redis'
} as const;

/**
 * Bus type literal type (`'memory' | 'redis'`).
 */
export type BusType = typeof BUS_TYPES[keyof typeof BUS_TYPES];

/**
 * Factory configuration.
 */
export interface FactoryConfig extends EventBusConfig {
  /** Requested bus type; compared case-insensitively against BUS_TYPES. */
  type?: string;
  /** When true (the default), a failed Redis setup falls back to in-memory. */
  fallbackToMemory?: boolean;
}

/**
 * Singleton event bus instance.
 */
let eventBusInstance: EventBusInterface | null = null;

/**
 * Creation currently in flight for the singleton, if any. Sharing it keeps
 * overlapping getEventBus() calls from each building their own bus.
 */
let pendingEventBus: Promise<EventBusInterface> | null = null;

/**
 * Render a caught value for logging without assuming it is an Error.
 */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Construct and initialize an in-memory event bus for the given project.
 */
async function createInMemoryBus(projectId: string): Promise<EventBusInterface> {
  const memoryBus = new InMemoryEventBus();
  await memoryBus.initialize({ projectId });
  return memoryBus;
}

/**
 * Create the singleton bus and publish it once it is ready.
 * The in-flight marker is cleared on success and on failure, so a failed
 * creation can be retried by a later getEventBus() call.
 */
async function createAndCacheEventBus(config: FactoryConfig): Promise<EventBusInterface> {
  try {
    const bus = await createEventBus(config);
    eventBusInstance = bus;
    return bus;
  } finally {
    pendingEventBus = null;
  }
}

/**
 * Create event bus instance based on configuration.
 *
 * Values missing from `config` are read from the environment:
 * EVENT_BUS_TYPE (default 'memory'), REDIS_URL, PROJECT_NAME (default 'default').
 *
 * @param config - Configuration object
 * @returns Event bus instance
 * @throws Error when the bus type is unknown, or when Redis setup fails and
 *         `fallbackToMemory` is false. Failures of the in-memory bus are rethrown.
 */
export async function createEventBus(config: FactoryConfig = {}): Promise<EventBusInterface> {
  const {
    type = process.env.EVENT_BUS_TYPE || BUS_TYPES.MEMORY,
    redisUrl = process.env.REDIS_URL,
    projectId = process.env.PROJECT_NAME || 'default',
    fallbackToMemory = true
  } = config;

  // Normalize type; trimming tolerates stray whitespace in environment values.
  const busType = type.trim().toLowerCase();
  const debug = process.env.DEBUG === 'true';

  try {
    if (busType === BUS_TYPES.REDIS) {
      // Attempt Redis connection
      if (!redisUrl) {
        throw new Error('REDIS_URL is required when EVENT_BUS_TYPE=redis');
      }

      if (debug) {
        console.error('[EVENT BUS FACTORY] Creating Redis event bus...');
      }

      // NOTE(review): if initialize() rejects, this instance is dropped without
      // close(). Whether close() is safe on an uninitialized bus is not visible
      // from this file - confirm against RedisEventBus before adding cleanup.
      const redisBus = new RedisEventBus();
      await redisBus.initialize({ redisUrl, projectId });

      console.error('[EVENT BUS] Redis event bus initialized successfully');
      return redisBus;
    }

    if (busType === BUS_TYPES.MEMORY) {
      // In-memory event bus (default)
      if (debug) {
        console.error('[EVENT BUS FACTORY] Creating in-memory event bus...');
      }

      // `return await` keeps initialization failures inside this try block.
      return await createInMemoryBus(projectId);
    }

    throw new Error(`Unknown event bus type: ${busType}. Must be 'memory' or 'redis'`);
  } catch (error) {
    console.error('[EVENT BUS FACTORY] Failed to create event bus:', describeError(error));

    // Fallback to in-memory if Redis fails
    if (busType === BUS_TYPES.REDIS && fallbackToMemory) {
      console.error('[EVENT BUS FACTORY] Falling back to in-memory event bus...');
      const memoryBus = await createInMemoryBus(projectId);
      console.error('[EVENT BUS] In-memory event bus initialized (fallback mode)');
      return memoryBus;
    }

    throw error;
  }
}

/**
 * Get or create singleton event bus instance.
 *
 * Calls that overlap while the first creation is still pending share that one
 * creation instead of each constructing a bus.
 *
 * @param config - Configuration object (only used by the call that starts creation)
 * @returns Event bus instance
 */
export async function getEventBus(config: FactoryConfig = {}): Promise<EventBusInterface> {
  if (eventBusInstance) {
    return eventBusInstance;
  }

  if (!pendingEventBus) {
    pendingEventBus = createAndCacheEventBus(config);
  }

  return pendingEventBus;
}

/**
 * Reset event bus instance (for testing).
 *
 * The singleton is detached before it is closed, so it is cleared even when
 * close() rejects, and getEventBus() never returns a bus that is being closed.
 */
export async function resetEventBus(): Promise<void> {
  const bus = eventBusInstance;
  eventBusInstance = null;

  if (bus) {
    await bus.close();
  }
}

/**
 * Get event bus configuration from environment.
 *
 * Reads EVENT_BUS_TYPE, REDIS_URL, PROJECT_NAME and EVENT_BUS_FALLBACK.
 *
 * @returns Configuration object
 */
export function getConfigFromEnvironment(): FactoryConfig {
  return {
    type: process.env.EVENT_BUS_TYPE || BUS_TYPES.MEMORY,
    redisUrl: process.env.REDIS_URL,
    projectId: process.env.PROJECT_NAME || 'default',
    fallbackToMemory: process.env.EVENT_BUS_FALLBACK !== 'false' // Default true
  };
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/JaxonDigital/optimizely-dxp-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.