Skip to main content
Glama
watcher.go6.33 kB
// Package watcher 实现Kubernetes事件监听器 package watcher import ( "context" "fmt" "time" "github.com/robfig/cron/v3" utils2 "github.com/weibaohui/k8m/pkg/comm/utils" "github.com/weibaohui/k8m/pkg/eventhandler/config" "github.com/weibaohui/k8m/pkg/models" "github.com/weibaohui/k8m/pkg/service" "github.com/weibaohui/kom/kom" eventsv1 "k8s.io/api/events/v1" "k8s.io/apimachinery/pkg/watch" "k8s.io/klog/v2" ) // EventWatcher 事件监听器 type EventWatcher struct { cfg *config.EventHandlerConfig eventCh chan *models.K8sEvent ctx context.Context cancel context.CancelFunc } // NewEventWatcher 创建事件监听器 func NewEventWatcher() *EventWatcher { ctx, cancel := context.WithCancel(context.Background()) cfg := config.DefaultEventHandlerConfig() return &EventWatcher{ cfg: cfg, eventCh: make(chan *models.K8sEvent, cfg.Watcher.BufferSize), ctx: ctx, cancel: cancel, } } // Start 启动事件监听器 func (w *EventWatcher) Start() { if w.cfg.Enabled { klog.V(6).Infof("启动事件监听器") // 启动事件处理goroutine go w.processEvents() // 启动事件监听 go w.watchEvents() } else { klog.V(6).Infof("事件转发功能未开启") } } // Stop 停止事件监听器 func (w *EventWatcher) Stop() { if w.cfg.Enabled { klog.V(6).Infof("停止事件监听器") w.cancel() // close(w.eventCh) // 不主动关闭 eventCh,避免并发写入导致 panic; // 依赖 w.ctx.Done() 让 processEvents 与 HandleEvent 自然退出。 } } // watchEvents 监听Kubernetes事件 func (w *EventWatcher) watchEvents() { for { select { case <-w.ctx.Done(): return default: if err := w.doWatch(); err != nil { klog.Errorf("事件监听失败: %v", err) time.Sleep(5 * time.Second) // 失败后等待5秒重试 } } } } // doWatch 执行事件监听 func (w *EventWatcher) doWatch() error { // 中文函数注释:使用定时任务每分钟检查所有已连接集群,未开启事件Watch则为其启动,并将告警事件入队处理 klog.V(6).Infof("开始监听Kubernetes事件") inst := cron.New() _, err := inst.AddFunc("@every 1m", func() { clusters := service.ClusterService().ConnectedClusters() for _, cluster := range clusters { if !cluster.GetClusterWatchStatus("event") { selectedCluster := service.ClusterService().ClusterID(cluster) watcher := w.watchSingleCluster(selectedCluster) if watcher != nil { cluster.SetClusterWatchStarted("event", watcher) } } } }) if err != nil { klog.Errorf("新增Event状态定时更新任务报错: %v\n", err) } inst.Start() klog.V(6).Infof("新增Event状态定时更新任务【@every 1m】\n") <-w.ctx.Done() inst.Stop() return nil } // watchSingleCluster 启动单个集群的事件监听 func (w *EventWatcher) watchSingleCluster(selectedCluster string) watch.Interface { ctx := utils2.GetContextWithAdminFromCtx(w.ctx) var watcher watch.Interface var evt eventsv1.Event err := kom.Cluster(selectedCluster).WithContext(ctx).Resource(&evt).AllNamespace().Watch(&watcher).Error if err != nil { klog.Errorf("%s 创建Event监听器失败 %v", selectedCluster, err) return nil } go func() { klog.V(6).Infof("%s 开始事件监听", selectedCluster) defer watcher.Stop() for e := range watcher.ResultChan() { if err := kom.Cluster(selectedCluster).WithContext(ctx).Tools().ConvertRuntimeObjectToTypedObject(e.Object, &evt); err != nil { klog.V(6).Infof("%s 无法将对象转换为 *events.v1.Event 类型: %v", selectedCluster, err) return } m := &models.K8sEvent{ Type: evt.Type, Reason: evt.Reason, Cluster: selectedCluster, Level: evt.Type, Namespace: evt.Regarding.Namespace, Name: evt.Regarding.Name, Message: evt.Note, Timestamp: func() time.Time { // 事件时间优先使用 EventTime;若为零值则回退到对象创建时间;仍为空则使用当前时间 if !evt.EventTime.IsZero() { return evt.EventTime.Time } if !evt.ObjectMeta.CreationTimestamp.IsZero() { return evt.ObjectMeta.CreationTimestamp.Time } return time.Now() }(), Processed: false, Attempts: 0, EvtKey: string(evt.UID), } klog.V(6).Infof("") if err := w.HandleEvent(m); err != nil { klog.V(6).Infof("%s 事件处理失败: %v", selectedCluster, err) } else { klog.V(6).Infof("%s 入队事件 [ %s/%s ] 类型=%s 原因=%s", selectedCluster, m.Namespace, m.Name, m.Type, m.Reason) } } }() return watcher } // HandleEvent 处理单个事件(供外部调用) func (w *EventWatcher) HandleEvent(event *models.K8sEvent) error { if event == nil { return fmt.Errorf("事件不能为空") } if !w.shouldProcessEvent(event) { klog.V(6).Infof("事件 %s 不满足规则,跳过", event.EvtKey) return nil } timer := time.NewTimer(1 * time.Second) defer timer.Stop() // 中文函数注释:将事件发送到处理通道;若通道满则每1秒重试,直至发送成功或监听器停止 for { // 优先响应停止信号,避免在通道关闭后发送导致panic select { case <-w.ctx.Done(): return fmt.Errorf("监听器已停止,取消事件发送: %s", event.EvtKey) default: } select { case w.eventCh <- event: return nil default: // 通道满,等待一秒后重试 klog.V(6).Infof("事件通道繁忙,等待重试发送: %s", event.EvtKey) select { case <-timer.C: // 继续下一轮重试 case <-w.ctx.Done(): if !timer.Stop() { <-timer.C } return fmt.Errorf("监听器已停止,取消事件发送: %s", event.EvtKey) } } } } // processEvents 处理接收到的事件 func (w *EventWatcher) processEvents() { for { select { case <-w.ctx.Done(): return case event, ok := <-w.eventCh: if !ok { return } // 初步过滤事件 if err := event.SaveEvent(); err != nil { klog.Errorf("存储/更新事件失败: %v", err) } else { klog.V(6).Infof("事件存储成功: %s", event.EvtKey) } } } } // shouldProcessEvent 判断是否应该处理事件 func (w *EventWatcher) shouldProcessEvent(event *models.K8sEvent) bool { // 只处理警告类型事件 if !event.IsWarning() { return false } return true }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/weibaohui/k8m'

If you have feedback or need assistance with the MCP directory API, please join our Discord server