/**
* 事件总线实现
*/
import { EventEmitter } from 'events';
import { v4 as uuidv4 } from 'uuid';
import { logger } from '@/utils/logger';
import { IEventBus, EventType, EventHandler, EventMap } from '@/types';
/**
* 事件订阅信息(EventBus内部使用)
*/
interface EventBusSubscription {
id: string;
eventType: EventType;
handler: EventHandler<any>;
filter?: (event: any) => boolean;
once?: boolean;
}
/**
* 事件总线实现类
*/
export class EventBus extends EventEmitter implements IEventBus {
private subscriptions: Map<string, EventBusSubscription> = new Map();
private maxListeners: number = 100;
constructor() {
super();
this.setEventMaxListeners(this.maxListeners);
logger.debug('EventBus initialized');
}
/**
* 发布事件
*/
publish<T extends EventType>(type: T, event: EventMap[T]): void {
try {
// 为事件添加唯一ID和时间戳(如果没有)
if (!event.id) {
(event as any).id = uuidv4();
}
if (!event.timestamp) {
(event as any).timestamp = new Date().toISOString();
}
logger.debug(`Publishing event: ${type}`, { event });
// 发送事件
this.emit(type, event);
// 记录事件发布统计
this.recordEventMetrics(type);
} catch (error) {
logger.error(`Failed to publish event: ${type}`, error as Error);
throw error;
}
}
/**
* 订阅事件
*/
subscribe<T extends EventType>(type: T, handler: EventHandler<T>): string {
try {
const subscriptionId = uuidv4();
// 创建订阅记录
const subscription: EventBusSubscription = {
id: subscriptionId,
eventType: type,
handler: handler as EventHandler<any>,
once: false
};
// 保存订阅
this.subscriptions.set(subscriptionId, subscription);
// 注册事件监听器
this.on(type, handler);
logger.debug(`Event subscription created: ${type}`, { subscriptionId });
return subscriptionId;
} catch (error) {
logger.error(`Failed to subscribe to event: ${type}`, error as Error);
throw error;
}
}
/**
* 一次性订阅事件
*/
onceEvent<T extends EventType>(type: T, handler: EventHandler<T>): string {
try {
const subscriptionId = uuidv4();
// 创建订阅记录
const subscription: EventBusSubscription = {
id: subscriptionId,
eventType: type,
handler: handler as EventHandler<any>,
once: true
};
// 保存订阅
this.subscriptions.set(subscriptionId, subscription);
// 注册一次性事件监听器
EventEmitter.prototype.once.call(this, type, handler);
logger.debug(`Once event subscription created: ${type}`, { subscriptionId });
return subscriptionId;
} catch (error) {
logger.error(`Failed to create once subscription to event: ${type}`, error as Error);
throw error;
}
}
/**
* 取消订阅
*/
unsubscribe(subscriptionId: string): void {
try {
const subscription = this.subscriptions.get(subscriptionId);
if (!subscription) {
logger.warn(`Subscription not found: ${subscriptionId}`);
return;
}
// 移除事件监听器
this.off(subscription.eventType, subscription.handler);
// 删除订阅记录
this.subscriptions.delete(subscriptionId);
logger.debug(`Event subscription removed: ${subscription.eventType}`, { subscriptionId });
} catch (error) {
logger.error(`Failed to unsubscribe: ${subscriptionId}`, error as Error);
throw error;
}
}
/**
* 清除所有订阅
*/
clear(): void {
try {
// 移除所有监听器
this.removeAllListeners();
// 清除订阅记录
this.subscriptions.clear();
logger.info('All event subscriptions cleared');
} catch (error) {
logger.error('Failed to clear event subscriptions', error as Error);
throw error;
}
}
/**
* 获取订阅信息
*/
getSubscription(subscriptionId: string): EventBusSubscription | undefined {
return this.subscriptions.get(subscriptionId);
}
/**
* 获取所有订阅
*/
getAllSubscriptions(): EventBusSubscription[] {
return Array.from(this.subscriptions.values());
}
/**
* 按事件类型获取订阅
*/
getSubscriptionsByType(eventType: EventType): EventBusSubscription[] {
return Array.from(this.subscriptions.values()).filter(
subscription => subscription.eventType === eventType
);
}
/**
* 获取订阅统计
*/
getSubscriptionStats(): Record<string, number> {
const stats: Record<string, number> = {};
for (const subscription of this.subscriptions.values()) {
stats[subscription.eventType] = (stats[subscription.eventType] || 0) + 1;
}
return stats;
}
/**
* 设置最大监听器数量
*/
setEventMaxListeners(maxListeners: number): void {
this.maxListeners = maxListeners;
EventEmitter.prototype.setMaxListeners.call(this, maxListeners);
}
/**
* 获取最大监听器数量
*/
getEventMaxListeners(): number {
return this.maxListeners;
}
/**
* 记录事件指标
*/
private recordEventMetrics(eventType: EventType): void {
// TODO: 实现事件指标记录
// 可以记录事件频率、处理延迟等
}
/**
* 处理订阅者异常
*/
private handleSubscriberError(subscriptionId: string, error: Error): void {
logger.error(`Subscriber error: ${subscriptionId}`, error);
// TODO: 实现错误处理策略
// 可以选择移除有问题的订阅者或记录错误统计
}
/**
* 创建带过滤的订阅
*/
subscribeWithFilter<T extends EventType>(
type: T,
handler: EventHandler<T>,
filter: (event: EventMap[T]) => boolean
): string {
const filteredHandler = (event: EventMap[T]) => {
try {
if (filter(event)) {
handler(event);
}
} catch (error) {
this.handleSubscriberError('filtered', error as Error);
}
};
return this.subscribe(type, filteredHandler);
}
/**
* 创建带超时的订阅
*/
subscribeWithTimeout<T extends EventType>(
type: T,
handler: EventHandler<T>,
timeoutMs: number
): string {
const subscriptionId = uuidv4();
let timeoutId: NodeJS.Timeout;
const timeoutHandler = () => {
this.unsubscribe(subscriptionId);
logger.warn(`Subscription timed out: ${subscriptionId}`);
};
const wrappedHandler = (event: EventMap[T]) => {
clearTimeout(timeoutId);
handler(event);
};
// 订阅事件
this.subscribe(type, wrappedHandler);
// 设置超时
timeoutId = setTimeout(timeoutHandler, timeoutMs);
// 保存超时ID以便清理
const subscription = this.subscriptions.get(subscriptionId);
if (subscription) {
(subscription as any).timeoutId = timeoutId;
}
return subscriptionId;
}
/**
* 销毁事件总线
*/
dispose(): void {
try {
// 清除所有订阅
this.clear();
// 移除所有监听器
this.removeAllListeners();
logger.info('EventBus disposed');
} catch (error) {
logger.error('Failed to dispose EventBus', error as Error);
}
}
}
// 导出全局事件总线实例
export const globalEventBus = new EventBus();