Skip to main content
Glama
notification.service.ts17 kB
import { Injectable, Logger } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { EventEmitter2, OnEvent } from '@nestjs/event-emitter'; import { Alert, AlertSeverity } from './alert.service'; export interface NotificationChannel { id: string; name: string; type: NotificationChannelType; enabled: boolean; config: Record<string, any>; filters?: NotificationFilter[]; } export enum NotificationChannelType { EMAIL = 'email', WEBHOOK = 'webhook', SLACK = 'slack', TEAMS = 'teams', DISCORD = 'discord', SMS = 'sms', WEBSOCKET = 'websocket', } export interface NotificationFilter { field: string; operator: 'eq' | 'ne' | 'in' | 'nin' | 'gt' | 'lt' | 'gte' | 'lte'; value: any; } export interface Notification { id: string; channelId: string; alertId: string; title: string; message: string; severity: AlertSeverity; timestamp: Date; status: NotificationStatus; attempts: number; lastAttempt?: Date; error?: string; metadata?: Record<string, any>; } export enum NotificationStatus { PENDING = 'pending', SENT = 'sent', FAILED = 'failed', RETRYING = 'retrying', } @Injectable() export class NotificationService { private readonly logger = new Logger(NotificationService.name); private readonly channels = new Map<string, NotificationChannel>(); private readonly notifications = new Map<string, Notification>(); private readonly maxRetries = 3; private readonly retryDelay = 5000; // 5秒 constructor( private readonly configService: ConfigService, private readonly eventEmitter: EventEmitter2, ) { this.initializeDefaultChannels(); } /** * 添加通知渠道 */ addChannel(channel: NotificationChannel): void { this.channels.set(channel.id, channel); this.logger.log(`Notification channel added: ${channel.name} (${channel.type})`); } /** * 移除通知渠道 */ removeChannel(channelId: string): boolean { const removed = this.channels.delete(channelId); if (removed) { this.logger.log(`Notification channel removed: ${channelId}`); } return removed; } /** * 获取所有通知渠道 */ getChannels(): NotificationChannel[] { return Array.from(this.channels.values()); } /** * 更新通知渠道 */ updateChannel(channelId: string, updates: Partial<NotificationChannel>): boolean { const channel = this.channels.get(channelId); if (!channel) { return false; } Object.assign(channel, updates); this.logger.log(`Notification channel updated: ${channelId}`); return true; } /** * 发送通知 */ async sendNotification(alert: Alert, channelIds?: string[]): Promise<void> { const targetChannels = channelIds ? channelIds.map(id => this.channels.get(id)).filter(Boolean) as NotificationChannel[] : Array.from(this.channels.values()).filter(channel => channel.enabled); for (const channel of targetChannels) { // 检查过滤器 if (!this.shouldSendToChannel(alert, channel)) { continue; } const notification: Notification = { id: this.generateNotificationId(), channelId: channel.id, alertId: alert.id, title: alert.title, message: this.formatMessage(alert, channel), severity: alert.severity, timestamp: new Date(), status: NotificationStatus.PENDING, attempts: 0, }; this.notifications.set(notification.id, notification); await this.sendToChannel(notification, channel); } } /** * 获取通知历史 */ getNotifications(filters?: { channelId?: string; alertId?: string; status?: NotificationStatus; severity?: AlertSeverity; limit?: number; }): Notification[] { let notifications = Array.from(this.notifications.values()); if (filters) { if (filters.channelId) { notifications = notifications.filter(n => n.channelId === filters.channelId); } if (filters.alertId) { notifications = notifications.filter(n => n.alertId === filters.alertId); } if (filters.status) { notifications = notifications.filter(n => n.status === filters.status); } if (filters.severity) { notifications = notifications.filter(n => n.severity === filters.severity); } } // 按时间倒序排列 notifications.sort((a, b) => b.timestamp.getTime() - a.timestamp.getTime()); if (filters?.limit) { notifications = notifications.slice(0, filters.limit); } return notifications; } /** * 获取通知统计 */ getNotificationStats(): { total: number; byStatus: Record<NotificationStatus, number>; byChannel: Record<string, number>; bySeverity: Record<AlertSeverity, number>; successRate: number; } { const notifications = Array.from(this.notifications.values()); const stats = { total: notifications.length, byStatus: { [NotificationStatus.PENDING]: 0, [NotificationStatus.SENT]: 0, [NotificationStatus.FAILED]: 0, [NotificationStatus.RETRYING]: 0, }, byChannel: {} as Record<string, number>, bySeverity: { [AlertSeverity.INFO]: 0, [AlertSeverity.WARNING]: 0, [AlertSeverity.ERROR]: 0, [AlertSeverity.CRITICAL]: 0, }, successRate: 0, }; let successCount = 0; notifications.forEach(notification => { stats.byStatus[notification.status]++; stats.byChannel[notification.channelId] = (stats.byChannel[notification.channelId] || 0) + 1; stats.bySeverity[notification.severity]++; if (notification.status === NotificationStatus.SENT) { successCount++; } }); stats.successRate = notifications.length > 0 ? (successCount / notifications.length) * 100 : 0; return stats; } /** * 监听告警创建事件 */ @OnEvent('alert.created') async handleAlertCreated(alert: Alert): Promise<void> { await this.sendNotification(alert); } /** * 监听告警确认事件 */ @OnEvent('alert.acknowledged') async handleAlertAcknowledged(alert: Alert): Promise<void> { // 发送确认通知(可选) const acknowledgeChannels = Array.from(this.channels.values()) .filter(channel => channel.enabled && channel.config.notifyOnAcknowledge); if (acknowledgeChannels.length > 0) { const acknowledgeAlert = { ...alert, title: `Alert Acknowledged: ${alert.title}`, message: `Alert has been acknowledged by ${alert.acknowledgedBy}`, }; await this.sendNotification(acknowledgeAlert, acknowledgeChannels.map(c => c.id)); } } /** * 监听告警解决事件 */ @OnEvent('alert.resolved') async handleAlertResolved(alert: Alert): Promise<void> { // 发送解决通知(可选) const resolveChannels = Array.from(this.channels.values()) .filter(channel => channel.enabled && channel.config.notifyOnResolve); if (resolveChannels.length > 0) { const resolveAlert = { ...alert, title: `Alert Resolved: ${alert.title}`, message: `Alert has been resolved`, }; await this.sendNotification(resolveAlert, resolveChannels.map(c => c.id)); } } /** * 重试失败的通知 */ async retryFailedNotifications(): Promise<void> { const failedNotifications = Array.from(this.notifications.values()) .filter(n => n.status === NotificationStatus.FAILED && n.attempts < this.maxRetries); for (const notification of failedNotifications) { const channel = this.channels.get(notification.channelId); if (channel) { notification.status = NotificationStatus.RETRYING; await this.sendToChannel(notification, channel); } } } /** * 清理过期通知 */ cleanupExpiredNotifications(maxAge: number = 30 * 24 * 60 * 60 * 1000): number { // 默认30天 const cutoffTime = new Date(Date.now() - maxAge); let cleanedCount = 0; for (const [id, notification] of this.notifications.entries()) { if (notification.timestamp < cutoffTime) { this.notifications.delete(id); cleanedCount++; } } if (cleanedCount > 0) { this.logger.log(`Cleaned up ${cleanedCount} expired notifications`); } return cleanedCount; } /** * 发送到指定渠道 */ private async sendToChannel(notification: Notification, channel: NotificationChannel): Promise<void> { notification.attempts++; notification.lastAttempt = new Date(); try { switch (channel.type) { case NotificationChannelType.WEBSOCKET: await this.sendWebSocketNotification(notification, channel); break; case NotificationChannelType.WEBHOOK: await this.sendWebhookNotification(notification, channel); break; case NotificationChannelType.EMAIL: await this.sendEmailNotification(notification, channel); break; case NotificationChannelType.SLACK: await this.sendSlackNotification(notification, channel); break; default: throw new Error(`Unsupported notification channel type: ${channel.type}`); } notification.status = NotificationStatus.SENT; this.logger.log(`Notification sent successfully: ${notification.id} via ${channel.type}`); // 发送成功事件 this.eventEmitter.emit('notification.sent', notification); } catch (error) { notification.error = error.message; notification.status = notification.attempts >= this.maxRetries ? NotificationStatus.FAILED : NotificationStatus.RETRYING; this.logger.error(`Failed to send notification ${notification.id} via ${channel.type}:`, error); // 发送失败事件 this.eventEmitter.emit('notification.failed', notification); // 如果还有重试机会,安排重试 if (notification.attempts < this.maxRetries) { setTimeout(() => { this.sendToChannel(notification, channel); }, this.retryDelay * notification.attempts); } } } /** * 发送WebSocket通知 */ private async sendWebSocketNotification(notification: Notification, channel: NotificationChannel): Promise<void> { // 通过事件发送WebSocket通知 this.eventEmitter.emit('websocket.notification', { type: 'alert', data: { id: notification.id, alertId: notification.alertId, title: notification.title, message: notification.message, severity: notification.severity, timestamp: notification.timestamp, }, }); } /** * 发送Webhook通知 */ private async sendWebhookNotification(notification: Notification, channel: NotificationChannel): Promise<void> { const { url, method = 'POST', headers = {} } = channel.config; if (!url) { throw new Error('Webhook URL is required'); } const payload = { id: notification.id, alertId: notification.alertId, title: notification.title, message: notification.message, severity: notification.severity, timestamp: notification.timestamp, }; const response = await fetch(url, { method, headers: { 'Content-Type': 'application/json', ...headers, }, body: JSON.stringify(payload), }); if (!response.ok) { throw new Error(`Webhook request failed: ${response.status} ${response.statusText}`); } } /** * 发送邮件通知 */ private async sendEmailNotification(notification: Notification, channel: NotificationChannel): Promise<void> { // TODO: 实现邮件发送逻辑 // 这里可以集成邮件服务提供商(如SendGrid、AWS SES等) this.logger.warn('Email notification not implemented yet'); throw new Error('Email notification not implemented'); } /** * 发送Slack通知 */ private async sendSlackNotification(notification: Notification, channel: NotificationChannel): Promise<void> { const { webhookUrl } = channel.config; if (!webhookUrl) { throw new Error('Slack webhook URL is required'); } const color = this.getSeverityColor(notification.severity); const payload = { attachments: [{ color, title: notification.title, text: notification.message, fields: [ { title: 'Severity', value: notification.severity.toUpperCase(), short: true, }, { title: 'Time', value: notification.timestamp.toISOString(), short: true, }, ], footer: 'MCP Swagger Monitor', ts: Math.floor(notification.timestamp.getTime() / 1000), }], }; const response = await fetch(webhookUrl, { method: 'POST', headers: { 'Content-Type': 'application/json', }, body: JSON.stringify(payload), }); if (!response.ok) { throw new Error(`Slack webhook request failed: ${response.status} ${response.statusText}`); } } /** * 检查是否应该发送到指定渠道 */ private shouldSendToChannel(alert: Alert, channel: NotificationChannel): boolean { if (!channel.enabled) { return false; } if (!channel.filters || channel.filters.length === 0) { return true; } return channel.filters.every(filter => this.evaluateFilter(alert, filter)); } /** * 评估过滤器 */ private evaluateFilter(alert: Alert, filter: NotificationFilter): boolean { const fieldValue = this.getFieldValue(alert, filter.field); switch (filter.operator) { case 'eq': return fieldValue === filter.value; case 'ne': return fieldValue !== filter.value; case 'in': return Array.isArray(filter.value) && filter.value.includes(fieldValue); case 'nin': return Array.isArray(filter.value) && !filter.value.includes(fieldValue); case 'gt': return fieldValue > filter.value; case 'lt': return fieldValue < filter.value; case 'gte': return fieldValue >= filter.value; case 'lte': return fieldValue <= filter.value; default: return true; } } /** * 获取字段值 */ private getFieldValue(alert: Alert, field: string): any { const fields = field.split('.'); let value: any = alert; for (const f of fields) { value = value?.[f]; } return value; } /** * 格式化消息 */ private formatMessage(alert: Alert, channel: NotificationChannel): string { const template = channel.config.messageTemplate || '{message}'; return template .replace('{title}', alert.title) .replace('{message}', alert.message) .replace('{severity}', alert.severity) .replace('{type}', alert.type) .replace('{serverName}', alert.serverName || 'Unknown') .replace('{timestamp}', alert.timestamp.toISOString()); } /** * 获取严重程度颜色 */ private getSeverityColor(severity: AlertSeverity): string { switch (severity) { case AlertSeverity.INFO: return 'good'; case AlertSeverity.WARNING: return 'warning'; case AlertSeverity.ERROR: return 'danger'; case AlertSeverity.CRITICAL: return '#ff0000'; default: return '#cccccc'; } } /** * 生成通知ID */ private generateNotificationId(): string { return `notification-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; } /** * 初始化默认通知渠道 */ private initializeDefaultChannels(): void { // WebSocket 渠道(默认启用) this.addChannel({ id: 'websocket-default', name: 'WebSocket Notifications', type: NotificationChannelType.WEBSOCKET, enabled: true, config: { messageTemplate: '[{severity}] {title}: {message}', }, }); // 从配置中加载其他渠道 const webhookUrl = this.configService.get<string>('WEBHOOK_NOTIFICATION_URL'); if (webhookUrl) { this.addChannel({ id: 'webhook-default', name: 'Default Webhook', type: NotificationChannelType.WEBHOOK, enabled: true, config: { url: webhookUrl, method: 'POST', }, }); } const slackWebhookUrl = this.configService.get<string>('SLACK_WEBHOOK_URL'); if (slackWebhookUrl) { this.addChannel({ id: 'slack-default', name: 'Default Slack', type: NotificationChannelType.SLACK, enabled: true, config: { webhookUrl: slackWebhookUrl, }, filters: [ { field: 'severity', operator: 'in', value: [AlertSeverity.ERROR, AlertSeverity.CRITICAL], }, ], }); } this.logger.log('Default notification channels initialized'); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/zaizaizhao/mcp-swagger-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server