Skip to main content
Glama
leader.go3.12 kB
package leader import ( "context" "fmt" "time" "github.com/weibaohui/k8m/pkg/comm/utils" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/klog/v2" ) // Config Leader 选举配置 // 包含命名空间、锁名称、选举时长、续租截止、重试周期以及领导开始/结束回调。 // LeaseDuration > RenewDeadline > RetryPeriod * 3 type Config struct { Namespace string LockName string LeaseDuration time.Duration // 租约持续时间,默认 15s RenewDeadline time.Duration // 续租截止时间,默认 10s RetryPeriod time.Duration // 重试周期,默认 2s ClusterID string // ClusterID 指定的集群唯一ID(文件名/Context),优先使用该集群的配置 OnStartedLeading func(ctx context.Context) OnStoppedLeading func() } // Run 启动 Leader 选举逻辑 // 支持集群优先级:指定 ClusterID -> InCluster -> 本地 kubeconfig。 // 仅当以上方式均不可用时,降级为本地 Leader(不进行选举)。 func Run(ctx context.Context, cfg Config) error { clientset, hasCluster, err := utils.GetClientSet(cfg.ClusterID) if err != nil { return fmt.Errorf("get clientset failed: %w", err) } if cfg.Namespace == "" { cfg.Namespace = utils.DetectNamespace() } // 非集群模式:既没有指定集群可用,也不是 InCluster,也没有本地 kubeconfig if !hasCluster { klog.V(2).Infof("[leader] 无可用的 K8s 集群,直接作为 Leader 运行(不进行选举)") if cfg.OnStartedLeading != nil { cfg.OnStartedLeading(ctx) } return nil } if cfg.LeaseDuration == 0 { cfg.LeaseDuration = 15 * time.Second } if cfg.RenewDeadline == 0 { cfg.RenewDeadline = 10 * time.Second } if cfg.RetryPeriod == 0 { cfg.RetryPeriod = 2 * time.Second } id := utils.GenerateInstanceID() klog.V(2).Infof("[leader] 我的选举 ID:%s", id) lock := &resourcelock.LeaseLock{ LeaseMeta: metav1.ObjectMeta{ Name: cfg.LockName, Namespace: cfg.Namespace, }, Client: clientset.CoordinationV1(), LockConfig: resourcelock.ResourceLockConfig{ Identity: id, }, } leaderelectionCfg := leaderelection.LeaderElectionConfig{ Lock: lock, ReleaseOnCancel: true, LeaseDuration: cfg.LeaseDuration, RenewDeadline: cfg.RenewDeadline, RetryPeriod: cfg.RetryPeriod, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(c context.Context) { if cfg.OnStartedLeading != nil { cfg.OnStartedLeading(c) } }, OnStoppedLeading: func() { if cfg.OnStoppedLeading != nil { cfg.OnStoppedLeading() } }, OnNewLeader: func(identity string) { if identity == id { klog.V(2).Infof("[leader] 我成为新的 Leader:%s", id) } else { klog.V(2).Infof("[leader] 选举产生新的 Leader:%s", identity) } }, }, } klog.V(2).Infof("[leader] 开始进行 Leader 选举(锁=%s/%s)", cfg.Namespace, cfg.LockName) leaderelection.RunOrDie(ctx, leaderelectionCfg) return nil }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/weibaohui/k8m'

If you have feedback or need assistance with the MCP directory API, please join our Discord server