clusters.go•47.2 kB
package service
import (
"context"
"encoding/base64"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/duke-git/lancet/v2/slice"
"github.com/robfig/cron/v3"
"github.com/weibaohui/k8m/internal/dao"
"github.com/weibaohui/k8m/pkg/comm/utils"
"github.com/weibaohui/k8m/pkg/constants"
"github.com/weibaohui/k8m/pkg/flag"
"github.com/weibaohui/k8m/pkg/k8sgpt/analysis"
"github.com/weibaohui/k8m/pkg/models"
"github.com/weibaohui/kom/kom"
komaws "github.com/weibaohui/kom/kom/aws"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
)
type clusterService struct {
clusterConfigs []*ClusterConfig // 文件名+context名称 -> 集群配置
AggregateDelaySeconds int // 聚合延迟时间
callbackRegisterFunc func(cluster *ClusterConfig) func() // 用来注册回调参数的回调方法
// 心跳管理
heartbeatCancel sync.Map // 心跳取消函数,改为sync.Map
HeartbeatIntervalSeconds int // 心跳间隔秒数,默认30
HeartbeatFailureThreshold int // 心跳失败阈值,默认3
// 自动重连管理
reconnectCancel sync.Map // 自动重连取消函数,改为sync.Map
ReconnectMaxIntervalSeconds int // 自动重连最大退避秒数,默认3600
MaxRetryAttempts int // 最大重试次数,默认100次
}
func newClusterService() *clusterService {
cfg := flag.Init()
// Service.ClusterService()使用了init启动,那么会优先于main函数中的执行逻辑(config update from db)
// 导致return 实例的时候,使用的是cfg中的默认值
// 因此我们在下面加载下数据库中的配置,确保在后台管理界面中设置的值,是生效的
_ = ConfigService().UpdateFlagFromDBConfig()
return &clusterService{
clusterConfigs: []*ClusterConfig{},
AggregateDelaySeconds: 61,
HeartbeatIntervalSeconds: cfg.HeartbeatIntervalSeconds,
HeartbeatFailureThreshold: cfg.HeartbeatFailureThreshold,
ReconnectMaxIntervalSeconds: cfg.ReconnectMaxIntervalSeconds,
MaxRetryAttempts: cfg.MaxRetryAttempts,
}
}
func (c *clusterService) UpdateHeartbeatSettings() {
cfg := flag.Init()
c.HeartbeatIntervalSeconds = cfg.HeartbeatIntervalSeconds
c.HeartbeatFailureThreshold = cfg.HeartbeatFailureThreshold
c.ReconnectMaxIntervalSeconds = cfg.ReconnectMaxIntervalSeconds
c.MaxRetryAttempts = cfg.MaxRetryAttempts
klog.V(4).Infof("更新集群心跳和重连配置:心跳间隔 %d 秒,心跳失败阈值 %d,重连最大间隔 %d 秒,最大重试次数 %d",
c.HeartbeatIntervalSeconds, c.HeartbeatFailureThreshold, c.ReconnectMaxIntervalSeconds, c.MaxRetryAttempts)
}
func (c *clusterService) SetRegisterCallbackFunc(callback func(cluster *ClusterConfig) func()) {
c.callbackRegisterFunc = callback
}
type ClusterConfig struct {
ClusterID string `json:"cluster_id,omitempty"` // 自动生成,不要赋值
ClusterIDBase64 string `json:"cluster_id_base64,omitempty"` // 自动生成,不要赋值
FileName string `json:"fileName,omitempty"` // kubeconfig 文件名称
ContextName string `json:"contextName,omitempty"` // context名称
ClusterName string `json:"clusterName,omitempty"` // 集群名称
Server string `json:"server,omitempty"` // 集群地址
ServerVersion string `json:"serverVersion,omitempty"` // 通过这个值来判断集群是否可用
HeartbeatHistory []HeartbeatRecord `json:"heartbeat_history,omitempty"`
UserName string `json:"userName,omitempty"` // 用户名
Namespace string `json:"namespace,omitempty"` // kubeconfig 限制Namespace
Err string `json:"err,omitempty"` // 连接错误信息
NodeStatusAggregated bool `json:"nodeStatusAggregated,omitempty"` // 是否已聚合节点状态
PodStatusAggregated bool `json:"podStatusAggregated,omitempty"` // 是否已聚合容器组状态
PVCStatusAggregated bool `json:"pvcStatusAggregated,omitempty"` // 是否已聚合pcv状态
PVStatusAggregated bool `json:"pvStatusAggregated,omitempty"` // 是否已聚合pv状态
IngressStatusAggregated bool `json:"ingressStatusAggregated,omitempty"` // 是否已聚合ingress状态
ClusterConnectStatus constants.ClusterConnectStatus `json:"clusterConnectStatus,omitempty"` // 集群连接状态
IsInCluster bool `json:"isInCluster,omitempty"` // 是否为集群内运行获取到的配置
watchStatus sync.Map // watch 类型为key,比如pod,deploy,node,pvc,sc
restConfig *rest.Config // 直连rest.Config
kubeConfig []byte // 集群配置.kubeconfig原始文件内容
Source ClusterConfigSource `json:"source,omitempty"` // 配置文件来源
K8sGPTProblemsCount int `json:"k8s_gpt_problems_count,omitempty"` // k8sGPT 扫描结果
K8sGPTProblemsResult *analysis.ResultWithStatus `json:"k8s_gpt_problems,omitempty"` // k8sGPT 扫描结果
NotAfter *time.Time `json:"not_after,omitempty"`
AWSConfig *komaws.EKSAuthConfig `json:"aws_config,omitempty"` // AWS EKS配置信息
IsAWSEKS bool `json:"is_aws_eks,omitempty"` // 标识是否为AWS EKS集群
// kom 集群注册配置项
DBID uint `json:"id,omitempty"` // 数据库ID
ProxyURL string `json:"proxy_url,omitempty"` // HTTP 代理,例如 http://127.0.0.1:7890
Timeout int `json:"timeout,omitempty"` // 请求超时时间,单位为秒,默认为 30 秒
QPS float32 `json:"qps,omitempty"` // 每秒查询数限制,默认为 200
Burst int `json:"burst,omitempty"` // 突发请求数限制,默认为 2000
}
type ClusterConfigSource string
var ClusterConfigSourceFile ClusterConfigSource = "File"
var ClusterConfigSourceDB ClusterConfigSource = "DB"
var ClusterConfigSourceInCluster ClusterConfigSource = "InCluster"
var ClusterConfigSourceAWS ClusterConfigSource = "AWS"
// 记录每个集群的watch 启动情况
// watch 有多种类型,需要记录
type clusterWatchStatus struct {
WatchType string `json:"watchType,omitempty"`
Started bool `json:"started,omitempty"`
StartedTime time.Time `json:"startedTime,omitempty"`
Watcher watch.Interface `json:"-"`
}
// HeartbeatRecord 心跳结果记录条目
// 中文说明:index 为在当前窗口中的位置(1..N),success 表示本次心跳是否成功,time 为发生时间(本地时区)
type HeartbeatRecord struct {
Index int `json:"index"`
Success bool `json:"success"`
Time string `json:"time"`
}
// appendHeartbeatRecord 追加一条心跳记录,并裁剪为阈值长度
// 中文函数注释:将成功/失败结果与本地时间写入 HeartbeatHistory,保持长度不超过阈值
func (c *clusterService) appendHeartbeatRecord(cluster *ClusterConfig, success bool, ts time.Time) {
if cluster == nil {
return
}
if cluster.HeartbeatHistory == nil {
cluster.HeartbeatHistory = make([]HeartbeatRecord, 0)
}
// 追加一条记录
rec := HeartbeatRecord{
Success: success,
Time: ts.Local().Format("2006-01-02 15:04:05"),
}
cluster.HeartbeatHistory = append(cluster.HeartbeatHistory, rec)
// 裁剪为最近阈值条目
threshold := c.HeartbeatFailureThreshold
if threshold <= 0 {
threshold = 3 // 兜底:默认 3 次
}
if len(cluster.HeartbeatHistory) > threshold {
cluster.HeartbeatHistory = cluster.HeartbeatHistory[len(cluster.HeartbeatHistory)-threshold:]
}
// 重新标注窗口内的序号为 1..N
for i := range cluster.HeartbeatHistory {
cluster.HeartbeatHistory[i].Index = i + 1
}
}
// SetClusterWatchStarted 设置集群Watch启动状态
func (c *ClusterConfig) SetClusterWatchStarted(watchType string, watcher watch.Interface) {
c.watchStatus.Store(watchType, &clusterWatchStatus{
WatchType: watchType,
Started: true,
StartedTime: time.Now(),
Watcher: watcher,
})
}
// GetClusterWatchStatus 获取集群Watch状态
func (c *ClusterConfig) GetClusterWatchStatus(watchType string) bool {
if value, ok := c.watchStatus.Load(watchType); ok {
if watcher, ok := value.(*clusterWatchStatus); ok {
return watcher.Started
}
}
return false
}
func (c *ClusterConfig) GetKubeconfig() string {
return string(c.kubeConfig)
}
// GetClusterID 根据ClusterConfig,按照 文件名+context名称 获取clusterID
func (c *ClusterConfig) GetClusterID() string {
// 原有逻辑
id := fmt.Sprintf("%s/%s", c.FileName, c.ContextName)
if c.IsInCluster {
id = "InCluster"
}
if id == "InCluster/InCluster" {
id = "InCluster"
}
c.ClusterID = id
c.ClusterIDBase64 = base64.StdEncoding.EncodeToString([]byte(id))
return id
}
func (c *ClusterConfig) GetRestConfig() *rest.Config {
return c.restConfig
}
// ClusterID 根据ClusterConfig,按照 文件名+context名称 获取clusterID
func (c *clusterService) ClusterID(clusterConfig *ClusterConfig) string {
return clusterConfig.GetClusterID()
}
// GetClusterByID 获取ClusterConfig
func (c *clusterService) GetClusterByID(id string) *ClusterConfig {
if id == "" {
return nil
}
if id == "InCluster" {
// InCluster 并没有使用ClusterConfig
predicate := func(index int, item *ClusterConfig) bool {
return item.IsInCluster
}
if v, ok := slice.FindBy(c.clusterConfigs, predicate); ok {
return v
}
}
// 解析selectedCluster
// 第一个/前面的字符是fileName。其他的都是contextName
// 有可能会出现多个/,如config/aws-x-x-x/demo
slashIndex := strings.Index(id, "/")
if slashIndex == -1 {
return nil
}
fileName := id[:slashIndex]
contextName := id[slashIndex+1:]
for _, clusterConfig := range c.clusterConfigs {
if clusterConfig.FileName == fileName && clusterConfig.ContextName == contextName {
return clusterConfig
}
}
return nil
}
// GetCertificateExpiry 获取集群证书的过期时间
func (c *ClusterConfig) GetCertificateExpiry() time.Time {
// 检查 kubeConfig 是否为空
if len(c.kubeConfig) == 0 {
klog.V(8).Infof("设置NotAfter, 集群[%s] kubeConfig为空", c.ClusterID)
return time.Time{}
}
config, err := clientcmd.Load(c.kubeConfig)
if err != nil {
klog.V(8).Infof("设置NotAfter, 解析文件[%s]失败: %v", c.ClusterID, err)
return time.Time{}
}
// 检查 config 是否为空
if config == nil {
klog.V(8).Infof("设置NotAfter, 集群[%s] config为空", c.ClusterID)
return time.Time{}
}
// 检查 CurrentContext 是否为空
if config.CurrentContext == "" {
klog.V(8).Infof("设置NotAfter, 集群[%s] CurrentContext为空", c.ClusterID)
return time.Time{}
}
// 检查 Contexts 是否为空
if config.Contexts == nil {
klog.V(8).Infof("设置NotAfter, 集群[%s] Contexts为空", c.ClusterID)
return time.Time{}
}
// 检查当前 context 是否存在
currentContext, contextExists := config.Contexts[config.CurrentContext]
if !contextExists || currentContext == nil {
klog.V(8).Infof("设置NotAfter, 集群[%s] 当前context[%s]不存在", c.ClusterID, config.CurrentContext)
return time.Time{}
}
// 检查 AuthInfos 是否为空
if config.AuthInfos == nil {
klog.V(8).Infof("设置NotAfter, 集群[%s] AuthInfos为空", c.ClusterID)
return time.Time{}
}
// 检查 AuthInfo 名称是否为空
if currentContext.AuthInfo == "" {
klog.V(8).Infof("设置NotAfter, 集群[%s] AuthInfo名称为空", c.ClusterID)
return time.Time{}
}
// 获取 authInfo
authInfo, exists := config.AuthInfos[currentContext.AuthInfo]
if !exists || authInfo == nil {
klog.V(8).Infof("设置NotAfter, 集群[%s] authInfo[%s]不存在", c.ClusterID, currentContext.AuthInfo)
return time.Time{}
}
// 检查证书数据是否为空
if len(authInfo.ClientCertificateData) == 0 {
klog.V(8).Infof("设置NotAfter, 集群[%s] ClientCertificateData为空", c.ClusterID)
return time.Time{}
}
// 解析证书
cert, err := utils.ParseCertificate(authInfo.ClientCertificateData)
if err != nil {
klog.V(8).Infof("设置NotAfter, 集群[%s]解析证书失败: %v", c.ClusterID, err)
return time.Time{}
}
// 检查证书是否为空
if cert == nil {
klog.V(8).Infof("设置NotAfter, 集群[%s]解析出的证书为空", c.ClusterID)
return time.Time{}
}
return cert.NotAfter.Local()
}
// IsConnected 判断集群是否连接
func (c *clusterService) IsConnected(selectedCluster string) bool {
cluster := c.GetClusterByID(selectedCluster)
if cluster == nil {
return false
}
if cluster.ClusterConnectStatus == "" {
return false
}
// 加强语义:必须已成功获取过 ServerVersion 才认为“已连接”
connected := cluster.ClusterConnectStatus == constants.ClusterConnectStatusConnected && cluster.ServerVersion != ""
return connected
}
func (c *clusterService) DelayStartFunc(f func()) {
// 延迟启动cron
// 设置一次性任务的执行时间,例如 5 秒后执行
schedule := utils.DelayStartSchedule(c.AggregateDelaySeconds)
cronInstance := cron.New()
_, err := cronInstance.AddFunc(schedule, f)
if err != nil {
klog.Errorf("延迟方法注册失败%v", err)
return
}
cronInstance.Start()
klog.V(6).Infof("延迟启动cron %ds: %s", c.AggregateDelaySeconds, schedule)
}
// Connect 重新连接集群
// 中文函数注释:尝试连接指定集群。仅在集群不在"已连接"或"连接中"状态时执行实际连接操作。
// 连接过程会先清理旧的连接资源,再尝试重新注册。注意此函数不负责重试逻辑,重试由上层的自动重连循环处理。
func (c *clusterService) Connect(clusterID string) {
klog.V(4).Infof("连接集群 %s 开始", clusterID)
cc := c.GetClusterByID(clusterID)
if cc == nil {
klog.V(4).Infof("集群[%s] 不存在,无法连接", clusterID)
return
}
// 只有当集群不是"已连接"或"连接中"状态时,才执行连接操作
if !(cc.ClusterConnectStatus == constants.ClusterConnectStatusConnected ||
cc.ClusterConnectStatus == constants.ClusterConnectStatusConnecting) {
klog.V(4).Infof("集群[%s] 当前状态为[%s],开始连接操作", clusterID, cc.ClusterConnectStatus)
// 清理连接资源,但不停止自动重连(第二个参数为 false)
c.disconnectWithOption(clusterID, false)
// 更新状态为"连接中"
cc.ClusterConnectStatus = constants.ClusterConnectStatusConnecting
// 尝试注册集群
_, err := c.RegisterCluster(cc)
if err != nil {
klog.V(4).Infof("集群[%s] 连接失败: %v,等待下一次重试", clusterID, err)
// 注意:这里不设置状态,让上层重试循环继续工作
if cc.ClusterConnectStatus == constants.ClusterConnectStatusConnecting {
cc.ClusterConnectStatus = constants.ClusterConnectStatusFailed
}
} else {
klog.V(4).Infof("集群[%s] 连接成功", clusterID)
}
} else {
klog.V(4).Infof("集群[%s] 当前状态为[%s],跳过连接操作", clusterID, cc.ClusterConnectStatus)
}
klog.V(4).Infof("连接集群 %s 完毕", clusterID)
}
// disconnectWithOption 断开连接(可选是否停止自动重连)
// 中文函数注释:幂等清理指定集群的连接状态与资源;当 stopReconnect 为 true 时,连同自动重连循环一并停止,
// 为 false 时仅做资源清理以便在自动重连循环内使用,避免自我取消导致循环中断。
func (c *clusterService) disconnectWithOption(clusterID string, stopReconnect bool) {
klog.V(6).Infof("Disconnect 开始清理集群 %s 原始信息(停止自动重连:%t)", clusterID, stopReconnect)
cc := c.GetClusterByID(clusterID)
if cc == nil {
return
}
// 停止心跳
c.StopHeartbeat(clusterID)
// 根据需要停止自动重连
if stopReconnect {
c.StopReconnect(clusterID)
}
// 清理本地状态
cc.ServerVersion = ""
cc.restConfig = nil
cc.Err = ""
cc.ClusterConnectStatus = constants.ClusterConnectStatusDisconnected
cc.watchStatus.Range(func(key, value interface{}) bool {
if v, ok := value.(*clusterWatchStatus); ok {
if v.Watcher != nil {
v.Watcher.Stop()
klog.V(6).Infof("%s 停止 Watch %s", cc.ClusterName, v.WatchType)
}
}
return true
})
// 从kom解除
kom.Clusters().RemoveClusterById(clusterID)
klog.V(6).Infof("Disconnect 完成清理集群 %s", clusterID)
}
// Disconnect 断开连接
// 中文函数注释:幂等清理指定集群的连接状态与资源,并停止自动重连;用于外部显式断开场景。
func (c *clusterService) Disconnect(clusterID string) {
c.disconnectWithOption(clusterID, true)
// 集成 Lease(断开后删除租约,仅责任者删除)
_ = LeaseManager().EnsureOnDisconnect(context.Background(), clusterID)
}
// Scan 扫描集群
func (c *clusterService) Scan() {
cfg := flag.Init()
c.ScanClustersInDir(cfg.KubeConfig)
c.ScanClustersInDB()
}
// AllClusters 获取所有集群
func (c *clusterService) AllClusters() []*ClusterConfig {
return c.clusterConfigs
}
// ConnectedClusters 获取已连接的集群
func (c *clusterService) ConnectedClusters() []*ClusterConfig {
connected := slice.Filter(c.AllClusters(), func(index int, item *ClusterConfig) bool {
return item.ClusterConnectStatus == constants.ClusterConnectStatusConnected
})
return connected
}
// FirstClusterID 获取第一个集群ID
func (c *clusterService) FirstClusterID() string {
clusters := c.ConnectedClusters()
var selectedCluster string
if len(clusters) > 0 {
cluster := clusters[0]
selectedCluster = c.ClusterID(cluster)
}
return selectedCluster
}
// RegisterClustersByPath 根据kubeconfig地址注册集群
func (c *clusterService) RegisterClustersByPath(filePath string) {
// 如果c.clusterConfigs为空,则返回
if len(c.clusterConfigs) == 0 {
klog.V(6).Infof("clusterConfigs为空,不进行注册")
return
}
// 处理路径中的 ~ 符号
expandedPath, err := utils.ExpandHomePath(filePath)
if err != nil {
klog.V(6).Infof("展开路径失败: %v", err)
return
}
filePath = expandedPath
content, err := os.ReadFile(filePath)
if err != nil {
klog.V(6).Infof("读取文件[%s]失败: %v", filePath, err)
return
}
config, err := clientcmd.Load(content)
if err != nil {
klog.V(6).Infof("解析文件[%s]失败: %v", filePath, err)
}
contextName := config.CurrentContext
fileName := filepath.Base(filePath)
c.Connect(fmt.Sprintf("%s/%s", fileName, contextName))
}
// ScanClustersInDir 扫描文件夹下的kubeconfig文件,仅扫描形成列表但是不注册集群
func (c *clusterService) ScanClustersInDir(path string) {
// 处理路径中的 ~ 符号
expandedPath, err := utils.ExpandHomePath(path)
if err != nil {
klog.V(6).Infof("展开路径失败: %v", err)
return
}
path = expandedPath
// 1. 通过kubeconfig文件,找到所在目录
dir := filepath.Dir(path)
// 2. 通过所在目录,找到同目录下的所有文件
files, err := os.ReadDir(dir)
if err != nil {
klog.V(6).Infof("读取文件夹[%s]失败: %v", dir, err)
return
}
// 3. 检查每个文件是否为有效的kubeconfig文件
for _, file := range files {
if file.IsDir() {
continue
}
filePath := filepath.Join(dir, file.Name())
content, err := os.ReadFile(filePath)
if err != nil {
klog.V(6).Infof("读取文件[%s]失败: %v", filePath, err)
continue
}
config, err := clientcmd.Load(content)
if err != nil {
klog.V(6).Infof("解析文件[%s]失败: %v", filePath, err)
continue // 解析失败,跳过该文件
}
for contextName := range config.Contexts {
context := config.Contexts[contextName]
cluster := config.Clusters[context.Cluster]
clusterConfig := &ClusterConfig{
FileName: file.Name(),
ContextName: contextName,
ClusterID: fmt.Sprintf("%s/%s", file.Name(), contextName),
UserName: context.AuthInfo,
ClusterName: context.Cluster,
Namespace: context.Namespace,
kubeConfig: content,
ClusterConnectStatus: constants.ClusterConnectStatusDisconnected,
Source: ClusterConfigSourceFile,
}
clusterConfig.Server = cluster.Server
c.AddToClusterList(clusterConfig)
}
}
}
func (c *clusterService) ScanClustersInDB() {
var list []*models.KubeConfig
err := dao.DB().Model(&models.KubeConfig{}).Find(&list).Error
if err != nil {
klog.Errorf("查询集群失败: %v", err)
return
}
for i, cc := range c.clusterConfigs {
if cc.Source == ClusterConfigSourceDB || cc.Source == ClusterConfigSourceAWS {
// 查一下list中是否存在
filter := slice.Filter(list, func(index int, item *models.KubeConfig) bool {
if item.Server == cc.Server && item.User == cc.UserName && item.Cluster == cc.ClusterName {
return true
}
return false
})
if len(filter) == 0 {
// 在数据库中也不存在
// 从list中删除
// 删除前先断开连接,避免watcher泄露
c.Disconnect(cc.ClusterID)
c.clusterConfigs = slice.DeleteAt(c.clusterConfigs, i)
}
}
}
// 2. 处理数据库中的配置
for _, item := range list {
config, err := clientcmd.Load([]byte(item.Content))
if err != nil {
klog.V(6).Infof("解析集群 [%s]失败: %v", item.Server, err)
continue
}
// 检查每个context
for contextName := range config.Contexts {
context := config.Contexts[contextName]
cluster := config.Clusters[context.Cluster]
if context.AuthInfo == item.User {
// 检查是否已存在该配置
exists := false
for _, cc := range c.clusterConfigs {
if (cc.FileName == string(ClusterConfigSourceAWS)) && cc.Server == cluster.Server && cc.ContextName == contextName {
exists = true
break
}
}
// 如果不存在,添加新配置
if !exists {
clusterConfig := &ClusterConfig{
ContextName: contextName,
UserName: context.AuthInfo,
ClusterName: context.Cluster,
Namespace: context.Namespace,
kubeConfig: []byte(item.Content),
ClusterConnectStatus: constants.ClusterConnectStatusDisconnected,
Server: cluster.Server,
Source: ClusterConfigSourceDB,
// 从数据库中读取 kom 配置项
ProxyURL: item.ProxyURL,
Timeout: item.Timeout,
QPS: item.QPS,
Burst: item.Burst,
DBID: item.ID,
}
if item.DisplayName != "" {
clusterConfig.FileName = item.DisplayName
} else {
clusterConfig.FileName = fmt.Sprintf("%d-%s", item.ID, contextName)
}
// aws 单独处理
if item.IsAWSEKS {
clusterConfig.Source = ClusterConfigSourceAWS
eksConfig := &komaws.EKSAuthConfig{
AccessKey: item.AccessKey,
SecretAccessKey: item.SecretAccessKey,
Region: item.Region,
ClusterName: item.ClusterName,
}
clusterConfig.AWSConfig = eksConfig
clusterConfig.IsAWSEKS = true
clusterConfig.FileName = string(ClusterConfigSourceAWS)
}
clusterConfig.Server = cluster.Server
c.AddToClusterList(clusterConfig)
}
}
}
}
}
func (c *clusterService) AddToClusterList(clusterConfig *ClusterConfig) {
// 判断是否已经存在
if c.GetClusterByID(clusterConfig.GetClusterID()) != nil {
return
}
c.clusterConfigs = append(c.clusterConfigs, clusterConfig)
}
// Deprecated
// RegisterClustersInDir 注册集群,扫描文件夹下的kubeconfig文件,注册集群
func (c *clusterService) RegisterClustersInDir(path string) {
// 1. 通过kubeconfig文件,找到所在目录
dir := filepath.Dir(path)
// 2. 通过所在目录,找到同目录下的所有文件
files, err := os.ReadDir(dir)
if err != nil {
klog.V(6).Infof("读取文件夹[%s]失败: %v", dir, err)
return
}
// 3. 检查每个文件是否为有效的kubeconfig文件
for _, file := range files {
if file.IsDir() {
continue
}
filePath := filepath.Join(dir, file.Name())
content, err := os.ReadFile(filePath)
if err != nil {
klog.V(6).Infof("读取文件[%s]失败: %v", filePath, err)
continue
}
config, err := clientcmd.Load(content)
if err != nil {
klog.V(6).Infof("解析文件[%s]失败: %v", filePath, err)
continue // 解析失败,跳过该文件
}
for contextName := range config.Contexts {
context := config.Contexts[contextName]
cluster := config.Clusters[context.Cluster]
clusterConfig := &ClusterConfig{
FileName: file.Name(),
ContextName: contextName,
UserName: context.AuthInfo,
ClusterName: context.Cluster,
Namespace: context.Namespace,
kubeConfig: content,
ClusterConnectStatus: constants.ClusterConnectStatusDisconnected,
Source: ClusterConfigSourceFile,
}
clusterConfig.Server = cluster.Server
c.AddToClusterList(clusterConfig)
}
}
// 注册
for _, clusterConfig := range c.clusterConfigs {
// 改为只注册CurrentContext的这个
_, _ = c.RegisterCluster(clusterConfig)
}
// 打印serverVersion
for _, clusterConfig := range c.clusterConfigs {
klog.V(6).Infof("ServerVersion: %s/%s: %s[%s] using user: %s", clusterConfig.FileName, clusterConfig.ContextName, clusterConfig.ServerVersion, clusterConfig.Server, clusterConfig.UserName)
}
}
// RegisterCluster 从已扫描的集群列表中注册指定的某个集群
// 负责单次注册尝试,不处理重试逻辑。连接失败时返回错误,
// 由调用方(Connect 方法)负责设置状态为 Failed 以配合上层的重试机制。
// 致命错误(如配置错误)会直接在本方法中设置为失败状态。
func (c *clusterService) RegisterCluster(clusterConfig *ClusterConfig) (bool, error) {
if clusterConfig == nil {
return false, fmt.Errorf("集群配置为空")
}
clusterID := clusterConfig.GetClusterID()
klog.V(6).Infof("开始注册集群 %s [来源:%s]", clusterID, clusterConfig.Source)
// AWS EKS 集群处理
if clusterConfig.IsAWSEKS {
if clusterConfig.AWSConfig == nil {
err := fmt.Errorf("AWS EKS 集群[%s]缺少 AWSConfig", clusterID)
clusterConfig.ClusterConnectStatus = constants.ClusterConnectStatusFailed // 配置错误属于致命错误
clusterConfig.Err = err.Error()
return false, err
}
// 构建注册选项
opts := c.buildRegisterOptions(clusterConfig)
// 使用带 ID 的注册确保幂等
if _, err := kom.Clusters().RegisterAWSClusterWithID(clusterConfig.AWSConfig, clusterID, opts...); err != nil {
klog.V(4).Infof("注册 AWS 集群[%s]失败: %v", clusterID, err)
clusterConfig.Err = err.Error()
return false, err // 保持"连接中"状态
}
// 注册成功后校验连通性
if err := c.LoadRestConfig(clusterConfig); err != nil {
clusterConfig.Err = err.Error()
return false, err // 保持"连接中"状态
}
} else {
// 非 AWS 集群处理
if err := c.LoadRestConfig(clusterConfig); err != nil {
clusterConfig.Err = err.Error()
return false, err // 保持"连接中"状态
}
if clusterConfig.IsInCluster {
// InCluster 模式
if _, err := kom.Clusters().RegisterInCluster(); err != nil {
klog.V(4).Infof("注册集群[%s]失败: %v", clusterID, err)
clusterConfig.Err = err.Error()
return false, err // 保持"连接中"状态
}
} else {
// 集群外模式
opts := c.buildRegisterOptions(clusterConfig)
if _, err := kom.Clusters().RegisterByConfigWithID(clusterConfig.restConfig, clusterID, opts...); err != nil {
klog.V(4).Infof("注册集群[%s]失败: %v", clusterID, err)
clusterConfig.Err = err.Error()
return false, err // 保持"连接中"状态
}
}
}
// 所有注册步骤成功完成,更新状态
klog.V(4).Infof("成功注册集群: %s [%s]", clusterID, clusterConfig.Server)
clusterConfig.ClusterConnectStatus = constants.ClusterConnectStatusConnected
clusterConfig.Err = "" // 清除错误信息
// 启动心跳监测
c.StartHeartbeat(clusterID)
// 执行回调注册
if c.callbackRegisterFunc != nil {
c.callbackRegisterFunc(clusterConfig)
}
// 集成 Lease(连接成功后创建租约并占有)
_ = LeaseManager().EnsureOnConnect(context.Background(), clusterID)
return true, nil
}
// LoadRestConfig 校验集群是否可连接,并更新状态
func (c *clusterService) LoadRestConfig(config *ClusterConfig) error {
var restConfig *rest.Config
var err error
if config.IsInCluster {
// 集群内模式(严格处理错误,避免误判为连通)
restConfig, err = rest.InClusterConfig()
if err != nil {
klog.V(6).Infof("加载 InCluster 配置失败 %s: %v", config.GetClusterID(), err)
config.Err = err.Error()
config.ClusterConnectStatus = constants.ClusterConnectStatusFailed
return err
}
} else {
// 集群外模式
lines := strings.Split(string(config.kubeConfig), "\n")
for i, line := range lines {
if strings.HasPrefix(line, "current-context:") {
lines[i] = "current-context: " + config.ContextName
}
}
bytes := []byte(strings.Join(lines, "\n"))
restConfig, err = clientcmd.RESTConfigFromKubeConfig(bytes)
}
config.restConfig = restConfig
if config.IsAWSEKS {
theaws := kom.Clusters().GetClusterById(config.ClusterID)
if theaws != nil && theaws.AWSAuthProvider != nil {
if token, _, errT := theaws.AWSAuthProvider.GetToken(context.Background()); errT == nil {
config.restConfig.BearerToken = token
klog.V(6).Infof("成功为 AWS EKS 集群[%s]设置 Bearer Token", config.ClusterID)
} else {
klog.V(4).Infof("获取 AWS EKS 集群[%s] Token 失败: %v", config.ClusterID, errT)
}
} else {
klog.V(4).Infof("无法为 AWS EKS 集群[%s]获取 Token:集群或 AWSAuthProvider 不可用", config.ClusterID)
}
}
// 校验集群是否可连接
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
klog.V(6).Infof("创建clientset失败 %s: %v", config.GetClusterID(), err)
config.Err = err.Error()
config.ClusterConnectStatus = constants.ClusterConnectStatusFailed
return err
}
// 尝试获取集群版本以验证连接
info, err := clientset.ServerVersion()
if err != nil {
klog.V(6).Infof("连接集群失败 %s: %v", config.GetClusterID(), err)
config.Err = err.Error()
config.ClusterConnectStatus = constants.ClusterConnectStatusFailed
return err
}
klog.V(6).Infof("LoadRestConfig 获取集群 版本成功 %s", config.GetClusterID())
config.ServerVersion = info.GitVersion
return err
}
// RegisterInCluster 将InCluster的配置注册到集群列表中
func (c *clusterService) RegisterInCluster() {
// 获取InCluster的配置
config, err := rest.InClusterConfig()
if err != nil {
cfg := flag.Init()
cfg.InCluster = false
klog.Errorf("获取InCluster的配置失败,InCluster模式关闭.错误:%v", err)
return
}
// 3. 生成 ClusterConfig
clusterConfig := &ClusterConfig{
ClusterName: "kubernetes", // InCluster 模式没有 context, 设定默认名称
FileName: "InCluster",
ContextName: "InCluster",
ClusterID: "InCluster",
Server: config.Host,
IsInCluster: true,
restConfig: config,
ClusterConnectStatus: constants.ClusterConnectStatusDisconnected,
Source: ClusterConfigSourceInCluster,
}
c.AddToClusterList(clusterConfig)
_, _ = c.RegisterCluster(clusterConfig)
}
func (c *ClusterConfig) SetClusterScanStatus(result *analysis.ResultWithStatus) {
c.K8sGPTProblemsCount = result.Problems
c.K8sGPTProblemsResult = result
}
func (c *ClusterConfig) GetClusterScanResult() *analysis.ResultWithStatus {
return c.K8sGPTProblemsResult
}
// =========================== 集群服务增强方法 ===========================
// validateAWSConfig 验证AWS配置
func (c *clusterService) validateAWSConfig(config *komaws.EKSAuthConfig) error {
if config == nil {
return fmt.Errorf("AWS配置不能为空")
}
if config.AccessKey == "" {
return fmt.Errorf("AWS Access Key不能为空")
}
if config.SecretAccessKey == "" {
return fmt.Errorf("AWS Secret Access Key不能为空")
}
if config.Region == "" {
return fmt.Errorf("AWS区域不能为空")
}
if config.ClusterName == "" {
return fmt.Errorf("EKS集群名称不能为空")
}
return nil
}
// RegisterAWSEKSCluster 注册AWS EKS集群
func (c *clusterService) RegisterAWSEKSCluster(config *komaws.EKSAuthConfig) (*ClusterConfig, error) {
// 参数验证
if err := c.validateAWSConfig(config); err != nil {
return nil, fmt.Errorf("AWS配置验证失败: %w", err)
}
// 将项目内部的AWSEKSConfig转换为kom库要求的kom.aws.EKSAuthConfig
eksAuthConfig := &komaws.EKSAuthConfig{
AccessKey: config.AccessKey,
SecretAccessKey: config.SecretAccessKey,
Region: config.Region,
ClusterName: config.ClusterName,
}
kg := komaws.NewKubeconfigGenerator()
content, err := kg.GenerateFromAWS(eksAuthConfig)
if err != nil {
return nil, fmt.Errorf("生成AWS EKS集群配置文件失败: %w", err)
}
kubeconfig, err := clientcmd.Load([]byte(content))
if err != nil {
klog.V(6).Infof("解析 AWS EKS集群kubeconfig配置失败: %v", err)
return nil, fmt.Errorf("生成AWS EKS集群配置文件失败: %w", err)
}
// 只取第一个context
var contextName string
var clusterConfig *ClusterConfig
for name := range kubeconfig.Contexts {
contextName = name
break
}
if contextName != "" {
context := kubeconfig.Contexts[contextName]
cluster := kubeconfig.Clusters[context.Cluster]
clusterConfig = &ClusterConfig{
FileName: string(ClusterConfigSourceAWS),
ContextName: contextName,
ClusterName: context.Cluster,
Namespace: context.Namespace,
Server: cluster.Server,
kubeConfig: []byte(content),
ClusterConnectStatus: constants.ClusterConnectStatusDisconnected,
Source: ClusterConfigSourceAWS,
IsAWSEKS: true,
AWSConfig: config,
}
clusterID := clusterConfig.GetClusterID()
// 构建注册选项(使用默认值,因为此方法没有传入 kom 配置项)
opts := c.buildRegisterOptions(clusterConfig)
// 使用kom统一的AWS EKS集群注册方法
_, err = kom.Clusters().RegisterAWSClusterWithID(eksAuthConfig, clusterID, opts...)
if err != nil {
clusterConfig.ClusterConnectStatus = constants.ClusterConnectStatusFailed
clusterConfig.Err = err.Error()
return nil, fmt.Errorf("注册AWS EKS集群失败: %w", err)
}
// 添加到集群列表
c.AddToClusterList(clusterConfig)
// 校验连通性并设置ServerVersion
if err := c.LoadRestConfig(clusterConfig); err != nil {
clusterConfig.ClusterConnectStatus = constants.ClusterConnectStatusFailed
clusterConfig.Err = err.Error()
return nil, fmt.Errorf("AWS EKS集群连通性校验失败: %w", err)
}
clusterConfig.ClusterConnectStatus = constants.ClusterConnectStatusConnected
clusterConfig.Err = ""
klog.V(4).Infof("成功注册AWS EKS集群: %s [%s]", config.ClusterName, clusterID)
}
return clusterConfig, nil
}
// buildRegisterOptions 根据 ClusterConfig 构建 kom 注册选项
func (c *clusterService) buildRegisterOptions(clusterConfig *ClusterConfig) []kom.RegisterOption {
klog.V(6).Infof("开始构建集群 %s 的注册选项配置", clusterConfig.ClusterID)
var opts []kom.RegisterOption
// 设置代理
if clusterConfig.ProxyURL != "" {
klog.V(6).Infof("设置集群 %s 代理URL: %s", clusterConfig.ClusterID, clusterConfig.ProxyURL)
opts = append(opts, kom.RegisterProxyURL(clusterConfig.ProxyURL))
} else {
klog.V(6).Infof("集群 %s 未设置代理URL", clusterConfig.ClusterID)
}
// 设置超时时间
if clusterConfig.Timeout > 0 {
klog.V(6).Infof("设置集群 %s 超时时间: %d 秒", clusterConfig.ClusterID, clusterConfig.Timeout)
opts = append(opts, kom.RegisterTimeout(time.Duration(clusterConfig.Timeout)*time.Second))
} else {
klog.V(6).Infof("集群 %s 使用默认超时时间", clusterConfig.ClusterID)
}
// 设置 QPS
if clusterConfig.QPS > 0 {
klog.V(6).Infof("设置集群 %s QPS 限制: %.2f", clusterConfig.ClusterID, clusterConfig.QPS)
opts = append(opts, kom.RegisterQPS(clusterConfig.QPS))
} else {
klog.V(6).Infof("集群 %s 使用默认 QPS 限制", clusterConfig.ClusterID)
}
// 设置 Burst
if clusterConfig.Burst > 0 {
klog.V(6).Infof("设置集群 %s Burst 限制: %d", clusterConfig.ClusterID, clusterConfig.Burst)
opts = append(opts, kom.RegisterBurst(clusterConfig.Burst))
} else {
klog.V(6).Infof("集群 %s 使用默认 Burst 限制", clusterConfig.ClusterID)
}
klog.V(6).Infof("集群 %s 注册选项配置完成,共配置 %d 个选项", clusterConfig.ClusterID, len(opts))
return opts
}
// UpdateClusterConfig 更新已加载集群的配置参数
// @Description 根据数据库ID更新已加载集群的ProxyURL、Timeout、QPS、Burst配置,并重新注册已连接的集群
// @Param dbID 数据库中的集群配置ID
// @Param proxyURL HTTP代理URL
// @Param timeout 请求超时时间(秒)
// @Param qps 每秒查询数限制
// @Param burst 突发请求数限制
func (c *clusterService) UpdateClusterConfig(dbID uint, proxyURL string, timeout int, qps float32, burst int) error {
klog.V(6).Infof("开始更新集群配置,数据库ID: %d", dbID)
// 查找对应的集群配置
var targetCluster *ClusterConfig
for _, cluster := range c.clusterConfigs {
if cluster.DBID == dbID {
targetCluster = cluster
break
}
}
if targetCluster == nil {
klog.V(4).Infof("未找到数据库ID为 %d 的集群配置", dbID)
return fmt.Errorf("未找到数据库ID为 %d 的集群配置", dbID)
}
klog.V(6).Infof("找到集群配置: %s [%s]", targetCluster.ClusterID, targetCluster.Server)
// 记录原始配置用于日志
oldProxyURL := targetCluster.ProxyURL
oldTimeout := targetCluster.Timeout
oldQPS := targetCluster.QPS
oldBurst := targetCluster.Burst
// 更新配置参数
targetCluster.ProxyURL = proxyURL
targetCluster.Timeout = timeout
targetCluster.QPS = qps
targetCluster.Burst = burst
klog.V(6).Infof("集群 %s 配置更新: ProxyURL [%s->%s], Timeout [%d->%d], QPS [%.2f->%.2f], Burst [%d->%d]",
targetCluster.ClusterID, oldProxyURL, proxyURL, oldTimeout, timeout, oldQPS, qps, oldBurst, burst)
// 如果集群已连接,需要重新注册以应用新配置
if targetCluster.ClusterConnectStatus == constants.ClusterConnectStatusConnected {
klog.V(6).Infof("集群 %s 已连接,开始重新注册以应用新配置", targetCluster.ClusterID)
// 重新连接,这会使用新的配置参数
go func() {
time.Sleep(200 * time.Millisecond) // 稍微延迟一下再重连
c.Connect(targetCluster.ClusterID)
}()
klog.V(4).Infof("集群 %s 配置更新完成,已启动重新连接", targetCluster.ClusterID)
} else {
klog.V(6).Infof("集群 %s 未连接,配置更新完成,下次连接时将使用新配置", targetCluster.ClusterID)
}
return nil
}
// StartHeartbeat 启动心跳任务
// @Description 周期性检测集群连通性并记录心跳历史;当心跳失败次数达到阈值时,自动取消当前心跳、清理历史并执行重连。
// @Param clusterID 集群ID
func (c *clusterService) StartHeartbeat(clusterID string) {
// 初始化心跳配置默认值
if c.HeartbeatIntervalSeconds <= 0 {
c.HeartbeatIntervalSeconds = 30
}
if c.HeartbeatFailureThreshold <= 0 {
c.HeartbeatFailureThreshold = 3
}
// 如果已有心跳,先停止
if cancelInterface, ok := c.heartbeatCancel.Load(clusterID); ok {
if cancel, ok := cancelInterface.(context.CancelFunc); ok && cancel != nil {
cancel()
}
c.heartbeatCancel.Delete(clusterID)
}
cluster := c.GetClusterByID(clusterID)
if cluster == nil {
klog.V(6).Infof("启动心跳失败:未找到集群 %s", clusterID)
return
}
// 仅在已连接时启动心跳
if cluster.ClusterConnectStatus != constants.ClusterConnectStatusConnected {
klog.V(6).Infof("集群 %s 非已连接状态,心跳不启动", clusterID)
return
}
ctx, cancel := context.WithCancel(context.Background())
c.heartbeatCancel.Store(clusterID, cancel)
interval := time.Duration(c.HeartbeatIntervalSeconds) * time.Second
ticker := time.NewTicker(interval)
klog.V(6).Infof("集群 %s 心跳启动,间隔 %ds,失败阈值 %d,自动重连最大退避秒数 %ds", clusterID, c.HeartbeatIntervalSeconds, c.HeartbeatFailureThreshold, c.ReconnectMaxIntervalSeconds)
go func() {
defer ticker.Stop()
failureCount := 0
for {
select {
case <-ctx.Done():
klog.V(6).Infof("集群 %s 心跳已停止", clusterID)
return
case <-ticker.C:
// 若集群不再是已连接状态,则停止心跳
if cluster.ClusterConnectStatus != constants.ClusterConnectStatusConnected {
klog.V(6).Infof("集群 %s 心跳检测:状态已非已连接,停止心跳", clusterID)
cancel()
return
}
// restConfig 必须存在
if cluster.restConfig == nil {
failureCount++
klog.V(6).Infof("集群 %s 心跳检测失败:restConfig 不存在(累计失败 %d)", clusterID, failureCount)
// 记录本次心跳失败
c.appendHeartbeatRecord(cluster, false, time.Now())
} else {
clientset, err := kubernetes.NewForConfig(cluster.restConfig)
if err != nil {
failureCount++
klog.V(6).Infof("集群 %s 创建 clientset 失败:%v(累计失败 %d)", clusterID, err, failureCount)
// 记录本次心跳失败
c.appendHeartbeatRecord(cluster, false, time.Now())
} else {
sv, err := clientset.Discovery().ServerVersion()
if err != nil {
failureCount++
klog.V(6).Infof("集群 %s 心跳检测读取版本失败:%v(累计失败 %d)", clusterID, err, failureCount)
cluster.Err = err.Error()
// 记录本次心跳失败
c.appendHeartbeatRecord(cluster, false, time.Now())
} else {
// 成功,重置失败计数并同步版本
failureCount = 0
if sv != nil {
cluster.ServerVersion = sv.GitVersion
}
klog.V(6).Infof("集群 %s 心跳检测成功,当前版本:%s", clusterID, cluster.ServerVersion)
// 记录本次心跳成功
c.appendHeartbeatRecord(cluster, true, time.Now())
}
}
}
if failureCount >= c.HeartbeatFailureThreshold {
// 达到失败阈值,切换为断开并停止心跳,并启动独立的自动重连循环
cluster.ClusterConnectStatus = constants.ClusterConnectStatusDisconnected
klog.V(6).Infof("集群 %s 心跳连续失败达到阈值,状态切换为未连接,启动自动重连循环", clusterID)
// 停止当前心跳循环
cancel()
// 启动自动重连循环(退避重试直到成功或被停止)
c.StartReconnect(clusterID)
return
}
}
}
}()
}
// StopHeartbeat 停止指定集群的心跳任务
// @Description 若心跳存在则停止并清理取消函数。
func (c *clusterService) StopHeartbeat(clusterID string) {
if cancelInterface, ok := c.heartbeatCancel.Load(clusterID); ok {
if cancel, ok := cancelInterface.(context.CancelFunc); ok && cancel != nil {
cancel()
}
c.heartbeatCancel.Delete(clusterID)
klog.V(6).Infof("集群 %s 心跳任务已停止", clusterID)
}
}
// StartReconnect 启动指定集群的自动重连循环
// 中文函数注释:当集群处于不可用状态时,周期性地执行“先断开清理、再尝试连接”,并采用指数退避策略(最大退避秒数可配置);
// 当检测到集群成功连接后,自动结束重连循环。日志均为中文,便于观察。
func (c *clusterService) StartReconnect(clusterID string) {
// 初始化重连配置默认值
if c.ReconnectMaxIntervalSeconds <= 0 {
c.ReconnectMaxIntervalSeconds = 3600
}
// 若已有自动重连任务,先停止
if cancelInterface, ok := c.reconnectCancel.Load(clusterID); ok {
if cancel, ok := cancelInterface.(context.CancelFunc); ok && cancel != nil {
cancel()
}
c.reconnectCancel.Delete(clusterID)
}
ctx, cancel := context.WithCancel(context.Background())
c.reconnectCancel.Store(clusterID, cancel)
klog.V(6).Infof("集群 %s 自动重连循环启动(最大退避 %ds,最大重试次数 %d)", clusterID, c.ReconnectMaxIntervalSeconds, c.MaxRetryAttempts)
go func(id string) {
attempt := 0
backoff := 1 // 初始退避秒数
maxIntervalSeconds := c.ReconnectMaxIntervalSeconds
maxRetryAttempts := c.MaxRetryAttempts
// 如果最大重试次数小于等于0,则设置默认值100
if maxRetryAttempts <= 0 {
maxRetryAttempts = 100
}
for {
select {
case <-ctx.Done():
klog.V(6).Infof("集群 %s 自动重连循环已停止", id)
return
default:
}
// 若已连接则结束重连
if c.IsConnected(id) {
klog.V(6).Infof("集群 %s 已连接,自动重连循环结束", id)
cancel()
c.reconnectCancel.Delete(id)
return
}
attempt++
// 检查是否超过最大重试次数
if attempt > maxRetryAttempts {
klog.V(6).Infof("集群 %s 自动重连已达到最大重试次数 %d,停止重连", id, maxRetryAttempts)
cancel()
c.reconnectCancel.Delete(id)
return
}
klog.V(6).Infof("集群 %s 自动重连第 %d 次尝试:先断开清理后重连", id, attempt)
// 尝试连接
c.Connect(id)
// 若连接成功,结束重连循环
if c.IsConnected(id) {
klog.V(6).Infof("集群 %s 自动重连成功", id)
cancel()
c.reconnectCancel.Delete(id)
return
}
// 指数退避,封顶
backoff = min(backoff*2, maxIntervalSeconds)
klog.V(6).Infof("集群 %s 自动重连失败,%ds 后重试", id, backoff)
time.Sleep(time.Duration(backoff) * time.Second)
}
}(clusterID)
}
// StopReconnect 停止指定集群的自动重连循环
// 中文函数注释:若自动重连循环存在则停止,并清理取消函数。
func (c *clusterService) StopReconnect(clusterID string) {
if cancelInterface, ok := c.reconnectCancel.Load(clusterID); ok {
if cancel, ok := cancelInterface.(context.CancelFunc); ok && cancel != nil {
cancel()
}
c.reconnectCancel.Delete(clusterID)
klog.V(6).Infof("集群 %s 自动重连循环已停止", clusterID)
}
}