Skip to main content
Glama
manager.go9.4 kB
package lease import ( "context" "crypto/sha1" "fmt" "strings" "time" "github.com/weibaohui/k8m/pkg/comm/utils" "github.com/weibaohui/k8m/pkg/flag" coordinationv1 "k8s.io/api/coordination/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" ) // Manager 中文函数注释:Lease 管理器接口,负责创建/续约/删除以及监听与清理。 type Manager interface { Init(ctx context.Context, opts Options) error EnsureOnConnect(ctx context.Context, clusterID string) error EnsureOnDisconnect(ctx context.Context, clusterID string) error StartWatcher(ctx context.Context, onConnect func(string), onDisconnect func(string)) error StartLeaderCleanup(ctx context.Context) error } type manager struct { clientset *kubernetes.Clientset namespace string instanceID string durationSec int renewSec int resyncPeriod time.Duration } // NewManager 中文函数注释:创建一个 Lease 管理器实例。 func NewManager() Manager { return &manager{} } // Init 中文函数注释:初始化 Lease 管理器,设置宿主 ClientSet、命名空间与续约参数。 func (m *manager) Init(ctx context.Context, opts Options) error { klog.V(6).Infof("Init lease") cs, hasCluster, err := utils.GetClientSet(opts.ClusterID) if err != nil { klog.V(6).Infof("GetClientSet %v", err.Error()) return fmt.Errorf("初始化宿主 GetClientSet 失败: %w", err) } // 没有可用的集群,那么就无法执行这个模式了 if !hasCluster { klog.V(2).Infof("[Lease] 无可用的 K8s 集群,退出初始化") return fmt.Errorf("no available k8s cluster") } m.clientset = cs if opts.Namespace == "" { m.namespace = utils.DetectNamespace() } else { m.namespace = opts.Namespace } m.durationSec = opts.LeaseDurationSeconds if m.durationSec <= 0 { m.durationSec = 60 } m.renewSec = opts.LeaseRenewIntervalSeconds if m.renewSec <= 0 || m.renewSec >= m.durationSec { m.renewSec = m.durationSec / 3 if m.renewSec <= 0 { m.renewSec = 20 } } if opts.ResyncPeriod <= 0 { m.resyncPeriod = 30 * time.Second } else { m.resyncPeriod = opts.ResyncPeriod } m.instanceID = utils.GenerateInstanceID() klog.V(6).Infof("Lease 管理器初始化完成,ns=%s, duration=%ds, renew=%ds, instance=%s", m.namespace, m.durationSec, m.renewSec, m.instanceID) return nil } // EnsureOnConnect 中文函数注释:在连接前确保租约占有;若有效 Lease 已存在则返回提示;若不存在或已过期则创建并占有。 func (m *manager) EnsureOnConnect(ctx context.Context, clusterID string) error { klog.V(6).Infof("EnsureOnConnect %s", clusterID) if m.clientset == nil { return nil } name := m.leaseName(clusterID) lc := m.clientset.CoordinationV1().Leases(m.namespace) l, err := lc.Get(ctx, name, metav1.GetOptions{}) if err == nil { if isLeaseValid(l, m.durationSec) { klog.V(6).Infof("集群[%s]已连接,责任者:%s,跳过创建 Lease", clusterID, deref(l.Spec.HolderIdentity)) return fmt.Errorf("cluster already connected by %s", deref(l.Spec.HolderIdentity)) } // 存在但已过期:由 Leader 清理,这里不更新,直接返回 klog.V(6).Infof("集群[%s] Lease 已过期但仍存在,等待 Leader 清理", clusterID) return fmt.Errorf("lease exists but expired") } now := metav1.MicroTime{Time: time.Now()} clusterIDBase64 := utils.UrlSafeBase64Encode(clusterID) lease := &coordinationv1.Lease{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: m.namespace, Labels: map[string]string{ "app": "k8m", "type": "cluster-sync", "clusterID": string(clusterIDBase64), }, }, Spec: coordinationv1.LeaseSpec{ HolderIdentity: ptrString(m.instanceID), LeaseDurationSeconds: ptrInt32(int32(m.durationSec)), RenewTime: &now, }, } if _, err := lc.Create(ctx, lease, metav1.CreateOptions{}); err != nil { klog.V(6).Infof("创建集群[%s] Lease 失败:%v", clusterID, err) return err } klog.V(6).Infof("创建集群[%s] Lease 成功,由当前实例负责续约", clusterID) go m.renewLoop(context.Background(), name) return nil } // EnsureOnDisconnect 中文函数注释:断开集群时,若当前实例是责任者则删除 Lease;否则跳过删除。 func (m *manager) EnsureOnDisconnect(ctx context.Context, clusterID string) error { if m.clientset == nil { return nil } name := m.leaseName(clusterID) lc := m.clientset.CoordinationV1().Leases(m.namespace) l, err := lc.Get(ctx, name, metav1.GetOptions{}) if err == nil && deref(l.Spec.HolderIdentity) == m.instanceID { if err := lc.Delete(ctx, name, metav1.DeleteOptions{}); err != nil { klog.V(6).Infof("删除 Lease 失败:%v", err) return err } klog.V(6).Infof("删除 Lease 成功:%s", name) } return nil } // StartWatcher 中文函数注释:启动 Lease 监听器,对有效 Lease 的新增/更新触发本地连接,对删除触发本地断开。 func (m *manager) StartWatcher(ctx context.Context, onConnect func(string), onDisconnect func(string)) error { if m.clientset == nil { return nil } // 仅监听指定命名空间和标签 selector := labels.SelectorFromSet(labels.Set{"app": "k8m", "type": "cluster-sync"}) factory := informers.NewSharedInformerFactoryWithOptions(m.clientset, m.resyncPeriod, informers.WithNamespace(m.namespace), informers.WithTweakListOptions(func(lo *metav1.ListOptions) { lo.LabelSelector = selector.String() })) informer := factory.Coordination().V1().Leases().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { l := obj.(*coordinationv1.Lease) if !isLeaseValid(l, m.durationSec) { return } cid := l.Labels["clusterID"] clusterID, err := utils.UrlSafeBase64Decode(cid) if err != nil { klog.V(6).Infof("解码 Lease 标签 clusterID 失败:%v", err) return } if deref(l.Spec.HolderIdentity) == m.instanceID { return } klog.V(6).Infof("有效 Lease 新增,外部连接集群:%s", clusterID) go onConnect(clusterID) }, DeleteFunc: func(obj any) { var l *coordinationv1.Lease if dfo, ok := obj.(cache.DeletedFinalStateUnknown); ok { l, _ = dfo.Obj.(*coordinationv1.Lease) } else { l, _ = obj.(*coordinationv1.Lease) } if l == nil { return } cid := l.Labels["clusterID"] clusterID, err := utils.UrlSafeBase64Decode(cid) if err != nil { klog.V(6).Infof("解码 Lease 标签 clusterID 失败:%v", err) return } klog.V(6).Infof("Lease 删除,断开本地集群:%s", clusterID) go onDisconnect(clusterID) }, }) factory.Start(ctx.Done()) klog.V(6).Infof("Lease 监听器已启动,命名空间:%s", m.namespace) return nil } // StartLeaderCleanup 仅由 Leader 调用,定期清理过期 Lease;通过 Delete 事件驱动所有实例断开。 func (m *manager) StartLeaderCleanup(ctx context.Context) error { if m.clientset == nil { return nil } ticker := time.NewTicker(30 * time.Second) go func() { for { select { case <-ctx.Done(): ticker.Stop() return case <-ticker.C: lc := m.clientset.CoordinationV1().Leases(m.namespace) ls, err := lc.List(ctx, metav1.ListOptions{LabelSelector: "app=k8m,type=cluster-sync"}) if err != nil { klog.V(6).Infof("清理过期 Lease 失败:%v", err) continue } for _, l := range ls.Items { if !isLeaseValid(&l, m.durationSec) { _ = lc.Delete(ctx, l.Name, metav1.DeleteOptions{}) klog.V(6).Infof("删除过期 Lease:%s", l.Name) } } } } }() klog.V(6).Infof("Lease 过期清理任务启动(Leader)") return nil } func (m *manager) leaseName(clusterID string) string { cfg := flag.Init() prefix := strings.ToLower(cfg.ProductName) sum := sha1.Sum([]byte(clusterID)) return fmt.Sprintf("%s-cluster-%x", prefix, sum[:4]) } func (m *manager) renewLoop(ctx context.Context, name string) { ticker := time.NewTicker(time.Duration(m.renewSec) * time.Second) defer ticker.Stop() lc := m.clientset.CoordinationV1().Leases(m.namespace) for { select { case <-ctx.Done(): return case <-ticker.C: l, err := lc.Get(ctx, name, metav1.GetOptions{}) if err != nil { klog.V(6).Infof("续约退出:Lease 不存在或网络错误:%v", err) return } if deref(l.Spec.HolderIdentity) != m.instanceID { klog.V(6).Infof("续约退出:责任已转移至[%s]", deref(l.Spec.HolderIdentity)) return } now := metav1.MicroTime{Time: time.Now()} l.Spec.RenewTime = &now if _, err := lc.Update(ctx, l, metav1.UpdateOptions{}); err != nil { klog.V(6).Infof("续约失败:%v", err) } else { klog.V(6).Infof("续约成功:%s", name) } } } } // isLeaseValid 中文函数注释:判断 Lease 是否仍在有效期内。 func isLeaseValid(l *coordinationv1.Lease, durationSec int) bool { if l == nil || l.Spec.RenewTime == nil || l.Spec.LeaseDurationSeconds == nil { return false } d := time.Duration(durationSec) * time.Second return time.Since(l.Spec.RenewTime.Time) < d } func ptrString(s string) *string { return &s } func ptrInt32(i int32) *int32 { return &i } func deref(p *string) string { if p == nil { return "" } return *p }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/weibaohui/k8m'

If you have feedback or need assistance with the MCP directory API, please join our Discord server