MCP GitHub Issue Server
by sammcj
- src
- events
import { EventEmitter } from 'events';
import { Logger } from '../logging/index.js';
import { EventHealthMonitor } from './health-monitor.js';
import { EventBatchProcessor } from './batch-processor.js';
import {
AtlasEvent,
EventHandler,
EventSubscription,
EventTypes,
TaskEvent,
CacheEvent,
ErrorEvent,
BatchEvent,
TransactionEvent,
SystemEvent,
EventHandlerOptions,
SerializableError,
} from '../types/events.js';
interface EventStats {
emitted: number;
handled: number;
errors: number;
lastEmitted?: number;
avgHandleTime: number;
lastErrorTime?: number;
consecutiveErrors: number;
}
export class EventManager {
private static instance: EventManager | null = null;
private static initializationPromise: Promise<EventManager> | null = null;
private readonly emitter: EventEmitter;
private static logger: Logger | null = null;
private logger?: Logger;
private readonly maxListeners: number = 100;
private readonly debugMode: boolean = false; // Force debug mode off for MCP compatibility
private initialized = false;
private readonly activeSubscriptions = new Set<EventSubscription>();
private readonly maxSubscriptions = 1000; // Prevent unbounded growth
private readonly subscriptionTimeouts = new Map<string, NodeJS.Timeout>();
private isShuttingDown = false;
private readonly eventStats = new Map<EventTypes | '*', EventStats>();
private readonly healthMonitor: EventHealthMonitor;
private readonly batchProcessor: EventBatchProcessor;
private cleanupTimeout?: NodeJS.Timeout;
private readonly CLEANUP_INTERVAL = 60000; // 1 minute
private errorHandler = (error: Error) => {
if (EventManager.logger) {
EventManager.logger.error('EventEmitter error', { error });
}
};
private rejectionHandler = (reason: unknown, promise: Promise<unknown>) => {
if (EventManager.logger) {
EventManager.logger.error('Unhandled promise rejection in event handler', {
reason,
promise,
});
}
};
private setupErrorHandling(): void {
// Handle emitter errors
this.emitter.on('error', this.errorHandler);
// Handle uncaught promise rejections in handlers
process.on('unhandledRejection', this.rejectionHandler);
}
private cleanupErrorHandling(): void {
// Remove error handlers
this.emitter.off('error', this.errorHandler);
process.off('unhandledRejection', this.rejectionHandler);
}
setLogger(logger: Logger): void {
if (!EventManager.logger) {
EventManager.logger = logger.child({ component: 'EventManager' });
this.logger = EventManager.logger;
}
}
private constructor() {
// Don't initialize logger in constructor to avoid circular dependency
this.emitter = new EventEmitter();
this.emitter.setMaxListeners(this.maxListeners);
// Debug mode always off for MCP compatibility
this.debugMode = false;
this.healthMonitor = new EventHealthMonitor();
this.batchProcessor = new EventBatchProcessor({
maxBatchSize: 100,
maxWaitTime: 1000,
flushInterval: 5000,
});
this.setupErrorHandling();
this.startCleanupInterval();
}
private startCleanupInterval(): void {
// Clear any existing interval
if (this.cleanupTimeout) {
clearInterval(this.cleanupTimeout);
}
this.cleanupTimeout = setInterval(() => {
if (this.isShuttingDown) return;
try {
this.cleanupStaleStats();
this.cleanupStaleSubscriptions();
this.checkSubscriptionLimit();
this.monitorEventHealth();
} catch (error) {
this.logger?.error('Cleanup interval error', { error });
}
}, this.CLEANUP_INTERVAL);
// Ensure interval is cleaned up if process exits
process.on('beforeExit', () => {
if (this.cleanupTimeout) {
clearInterval(this.cleanupTimeout);
}
});
}
private cleanupStaleSubscriptions(): void {
const now = Date.now();
const SUBSCRIPTION_TIMEOUT = 24 * 60 * 60 * 1000; // 24 hours
for (const subscription of this.activeSubscriptions) {
if (now - subscription.createdAt > SUBSCRIPTION_TIMEOUT) {
subscription.unsubscribe();
this.logger?.debug('Removed stale subscription', {
type: subscription.type,
age: now - subscription.createdAt,
});
}
}
}
private checkSubscriptionLimit(): void {
if (this.activeSubscriptions.size > this.maxSubscriptions) {
this.logger?.warn('Subscription limit exceeded, removing oldest', {
current: this.activeSubscriptions.size,
limit: this.maxSubscriptions,
});
// Sort by creation time and remove oldest
const sortedSubs = Array.from(this.activeSubscriptions).sort(
(a, b) => a.createdAt - b.createdAt
);
const toRemove = sortedSubs.slice(0, sortedSubs.length - this.maxSubscriptions);
toRemove.forEach(sub => sub.unsubscribe());
}
}
private monitorEventHealth(): void {
const now = Date.now();
const ERROR_THRESHOLD = 5; // consecutive errors
const ERROR_WINDOW = 60000; // 1 minute
for (const [type, stats] of this.eventStats.entries()) {
if (
stats.consecutiveErrors >= ERROR_THRESHOLD &&
stats.lastErrorTime &&
now - stats.lastErrorTime < ERROR_WINDOW
) {
this.emitError('event_handler_degraded', new Error('Event handler health degraded'), {
eventType: type,
consecutiveErrors: stats.consecutiveErrors,
avgHandleTime: stats.avgHandleTime,
});
}
}
}
private cleanupStaleStats(): void {
const now = Date.now();
const STALE_THRESHOLD = 24 * 60 * 60 * 1000; // 24 hours
for (const [type, stats] of this.eventStats.entries()) {
if (stats.lastEmitted && now - stats.lastEmitted > STALE_THRESHOLD) {
this.eventStats.delete(type);
}
}
}
static async initialize(): Promise<EventManager> {
// Return existing instance if available
if (EventManager.instance) {
return EventManager.instance;
}
// If initialization is in progress, wait for it
if (EventManager.initializationPromise) {
return EventManager.initializationPromise;
}
// Start new initialization with mutex
EventManager.initializationPromise = (async () => {
try {
// Double-check instance hasn't been created while waiting
if (EventManager.instance) {
return EventManager.instance;
}
const instance = new EventManager();
instance.initialized = true;
EventManager.instance = instance;
return EventManager.instance;
} catch (error) {
throw new Error(
`Failed to initialize EventManager: ${error instanceof Error ? error.message : String(error)}`
);
} finally {
EventManager.initializationPromise = null;
}
})();
return EventManager.initializationPromise;
}
static getInstance(): EventManager {
if (!EventManager.instance || !EventManager.instance.initialized) {
throw new Error('EventManager not initialized. Call EventManager.initialize() first.');
}
return EventManager.instance;
}
async shutdown(): Promise<void> {
this.isShuttingDown = true;
// Clear cleanup interval
if (this.cleanupTimeout) {
clearInterval(this.cleanupTimeout);
this.cleanupTimeout = undefined;
}
// Clear all subscription timeouts
for (const timeout of this.subscriptionTimeouts.values()) {
clearTimeout(timeout);
}
this.subscriptionTimeouts.clear();
// Wait for active handlers to complete
await this.healthMonitor.waitForActiveHandlers();
// Cleanup resources
this.removeAllListeners();
this.eventStats.clear();
await this.batchProcessor.shutdown();
this.healthMonitor.cleanup();
this.cleanupErrorHandling();
this.isShuttingDown = false;
}
emit<T extends AtlasEvent>(
event: T,
options?: { batch?: boolean; priority?: 'high' | 'medium' | 'low' }
): boolean {
if (this.isShuttingDown) {
this.logger?.warn('Rejecting event during shutdown', {
type: event.type,
timestamp: event.timestamp,
});
return false;
}
try {
if (this.debugMode && EventManager.logger !== null) {
try {
const debugInfo: Record<string, unknown> = {
type: event.type,
timestamp: event.timestamp,
batch: options?.batch,
};
// Handle different event types' metadata/context
if ('metadata' in event) {
// Ensure metadata is serializable
debugInfo.metadata = JSON.parse(JSON.stringify(event.metadata));
} else if ('context' in event) {
// Ensure context is serializable
debugInfo.context = JSON.parse(JSON.stringify(event.context));
}
EventManager.logger.debug('Emitting event', debugInfo);
} catch {
// If debug logging fails, log a simpler message
const safeDebugInfo = {
type: event.type,
timestamp: event.timestamp,
error: 'Failed to stringify event details',
};
EventManager.logger.debug('Emitting event (simplified)', safeDebugInfo);
}
}
// Add timestamp and metadata
if (!event.timestamp) {
event.timestamp = Date.now();
}
// Update event stats
const stats = this.eventStats.get(event.type) || {
emitted: 0,
handled: 0,
errors: 0,
avgHandleTime: 0,
consecutiveErrors: 0,
};
stats.emitted++;
stats.lastEmitted = event.timestamp;
this.eventStats.set(event.type, stats);
// Check circuit breaker
if (
stats.consecutiveErrors >= 5 && // threshold
stats.lastErrorTime &&
Date.now() - stats.lastErrorTime < 60000
) {
// 1 minute window
this.logger?.warn('Circuit breaker active, rejecting event', {
type: event.type,
consecutiveErrors: stats.consecutiveErrors,
lastError: new Date(stats.lastErrorTime).toISOString(),
});
return false;
}
// Check if event should be batched
if (options?.batch) {
this.batchProcessor.addEvent(event, async events => {
const batchStartTime = Date.now();
let batchResults;
try {
batchResults = await Promise.all(
events.map(e => {
const typeResult = this.emitter.emit(e.type, e);
const wildcardResult = this.emitter.emit('*', e);
return typeResult || wildcardResult;
})
);
// Update success metrics
const successCount = batchResults.filter(Boolean).length;
if (successCount > 0) {
stats.handled += successCount;
stats.consecutiveErrors = 0;
stats.avgHandleTime = stats.avgHandleTime * 0.9 + (Date.now() - batchStartTime) * 0.1;
}
} catch (error) {
stats.errors++;
stats.consecutiveErrors++;
stats.lastErrorTime = Date.now();
throw error;
}
});
return true; // Batch queued successfully
}
// Emit event directly if not batched
const typeResult = this.emitter.emit(event.type, event);
const wildcardResult = this.emitter.emit('*', event);
// Update handled count if any listeners processed the event
if (typeResult || wildcardResult) {
stats.handled++;
}
return typeResult || wildcardResult;
} catch (error) {
if (EventManager.logger) {
EventManager.logger.error('Event emission failed', {
error,
event: {
type: event.type,
timestamp: event.timestamp,
batch: options?.batch,
},
});
// Emit error event
this.emitError('event_emission_failed', error as Error, {
eventType: event.type,
batch: options?.batch,
});
}
return false;
}
}
on<T extends AtlasEvent>(
type: EventTypes | '*',
handler: EventHandler<T>,
options: EventHandlerOptions = {}
): EventSubscription {
if (this.debugMode && EventManager.logger) {
EventManager.logger.debug('Adding event listener', { type });
}
// Create unique handler ID for health monitoring
const handlerId = `${type}_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
// Wrap handler with health monitoring
const monitoredHandler = this.healthMonitor.wrapHandler(type, handler, handlerId);
const { timeout = 5000, maxRetries = 3 } = options;
// Wrap handler with timeout and retry logic
const wrappedHandler = async (event: T) => {
let attempts = 0;
while (attempts < maxRetries) {
try {
const handlerPromise = monitoredHandler(event);
await Promise.race([
handlerPromise,
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Handler timeout')), timeout)
),
]);
break;
} catch (error) {
attempts++;
const stats = this.eventStats.get(type) || {
emitted: 0,
handled: 0,
errors: 0,
avgHandleTime: 0,
consecutiveErrors: 0,
};
stats.errors++;
this.eventStats.set(type, stats);
if (EventManager.logger) {
EventManager.logger.error('Event handler error', {
error,
eventType: type,
attempt: attempts,
handlerId,
});
}
if (attempts === maxRetries) {
this.emitError('event_handler_error', error as Error, {
eventType: type,
attempts,
handlerId,
});
}
}
}
};
this.emitter.on(type, wrappedHandler);
// Create subscription with enhanced cleanup
const subscription: EventSubscription = {
unsubscribe: () => {
this.emitter.off(type, wrappedHandler);
this.activeSubscriptions.delete(subscription);
if (this.debugMode && EventManager.logger) {
EventManager.logger.debug('Removed event listener', {
type,
handlerId,
remainingListeners: this.listenerCount(type),
totalSubscriptions: this.activeSubscriptions.size,
});
}
},
type,
createdAt: Date.now(),
};
this.activeSubscriptions.add(subscription);
return subscription;
}
once<T extends AtlasEvent>(
type: EventTypes | '*',
handler: EventHandler<T>,
options: EventHandlerOptions = {}
): EventSubscription {
if (this.debugMode && EventManager.logger) {
EventManager.logger.debug('Adding one-time event listener', { type });
}
// Create unique handler ID for health monitoring
const handlerId = `${type}_once_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
// Wrap handler with health monitoring
const monitoredHandler = this.healthMonitor.wrapHandler(type, handler, handlerId);
const { timeout = 5000, maxRetries = 1 } = options;
// Wrap handler with timeout and retry logic
const wrappedHandler = async (event: T) => {
let attempts = 0;
while (attempts < maxRetries) {
try {
const handlerPromise = monitoredHandler(event);
await Promise.race([
handlerPromise,
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Handler timeout')), timeout)
),
]);
break;
} catch (error) {
attempts++;
const stats = this.eventStats.get(type) || {
emitted: 0,
handled: 0,
errors: 0,
avgHandleTime: 0,
consecutiveErrors: 0,
};
stats.errors++;
this.eventStats.set(type, stats);
if (EventManager.logger) {
EventManager.logger.error('One-time event handler error', {
error,
eventType: type,
attempt: attempts,
handlerId,
});
}
if (attempts === maxRetries) {
this.emitError('event_handler_error', error as Error, {
eventType: type,
oneTime: true,
attempts,
handlerId,
});
}
}
}
};
this.emitter.once(type, wrappedHandler);
// Create subscription with enhanced cleanup
const subscription: EventSubscription = {
unsubscribe: () => {
this.emitter.off(type, wrappedHandler);
this.activeSubscriptions.delete(subscription);
if (this.debugMode && EventManager.logger) {
EventManager.logger.debug('Removed one-time event listener', {
type,
handlerId,
remainingListeners: this.listenerCount(type),
totalSubscriptions: this.activeSubscriptions.size,
});
}
},
type,
createdAt: Date.now(),
};
this.activeSubscriptions.add(subscription);
return subscription;
}
removeAllListeners(type?: EventTypes | '*'): void {
if (type) {
this.emitter.removeAllListeners(type);
// Remove matching subscriptions
for (const subscription of this.activeSubscriptions) {
if (subscription.type === type) {
this.activeSubscriptions.delete(subscription);
}
}
} else {
this.emitter.removeAllListeners();
this.activeSubscriptions.clear();
}
if (this.debugMode && EventManager.logger) {
EventManager.logger.debug('Removed listeners', {
type: type || 'all',
remainingSubscriptions: this.activeSubscriptions.size,
});
}
}
/**
* Gets event statistics for monitoring and debugging
*/
getEventStats(): Map<
EventTypes | '*',
{
emitted: number;
handled: number;
errors: number;
lastEmitted?: number;
}
> {
return new Map(this.eventStats);
}
/**
* Gets active subscription information for monitoring
*/
getActiveSubscriptions(): Array<{
type: EventTypes | '*';
createdAt: number;
age: number;
}> {
const now = Date.now();
return Array.from(this.activeSubscriptions).map(sub => ({
type: sub.type,
createdAt: sub.createdAt,
age: now - sub.createdAt,
}));
}
/**
* Gets health statistics for event handlers
*/
getHandlerHealthStats(): Map<
string,
{
successCount: number;
errorCount: number;
avgResponseTime: number;
lastExecuted?: number;
consecutiveFailures: number;
isCircuitOpen: boolean;
nextRetryTime?: number;
}
> {
return this.healthMonitor.getAllHandlerStats();
}
/**
* Manually reset circuit breaker for a handler
*/
resetHandlerCircuitBreaker(handlerId: string): void {
this.healthMonitor.resetCircuitBreaker(handlerId);
}
/**
* Cleanup resources and stop monitoring
*/
cleanup(): void {
if (this.cleanupTimeout) {
clearInterval(this.cleanupTimeout);
this.cleanupTimeout = undefined;
}
this.removeAllListeners();
this.eventStats.clear();
this.healthMonitor.cleanup();
this.batchProcessor.cleanup();
this.cleanupErrorHandling();
}
listenerCount(type: EventTypes | '*'): number {
return this.emitter.listenerCount(type);
}
private emitError(context: string, error: Error, metadata?: Record<string, unknown>): void {
try {
// Convert Error to SerializableError
const serializableError: SerializableError = {
name: error.name,
message: error.message,
stack: error.stack,
};
// Add any additional enumerable properties
for (const key of Object.keys(error)) {
try {
const value = (error as unknown as Record<string, unknown>)[key];
// Only include if JSON serializable
JSON.stringify(value);
serializableError[key] = value;
} catch {
// Skip non-serializable properties
continue;
}
}
// Ensure metadata is serializable
const safeMetadata = metadata ? JSON.parse(JSON.stringify(metadata)) : {};
const errorEvent: ErrorEvent = {
type: EventTypes.SYSTEM_ERROR,
timestamp: Date.now(),
error: serializableError,
context: {
component: 'EventManager',
operation: context,
...safeMetadata,
},
};
this.emitter.emit(EventTypes.SYSTEM_ERROR, errorEvent);
} catch (emitError) {
// Last resort error logging with minimal info to ensure it works
if (EventManager.logger) {
EventManager.logger.error('Failed to emit error event', {
errorMessage: error.message,
errorName: error.name,
context,
emitErrorMessage: emitError instanceof Error ? emitError.message : String(emitError),
});
}
}
}
// Typed event emission helpers
emitTaskEvent(event: TaskEvent): void {
this.emit(event);
}
emitCacheEvent(event: CacheEvent): void {
this.emit(event);
}
emitErrorEvent(event: ErrorEvent): void {
this.emit(event);
}
emitBatchEvent(event: BatchEvent): void {
this.emit(event, { batch: true });
}
emitTransactionEvent(event: TransactionEvent): void {
this.emit(event);
}
emitSystemEvent(event: SystemEvent): void {
this.emit(event);
}
}