Skip to main content
Glama
helm_cmd.go15.5 kB
package helm import ( "bytes" "encoding/base64" "encoding/json" "fmt" "os" "os/exec" "strings" "time" "github.com/weibaohui/k8m/pkg/flag" "github.com/weibaohui/k8m/pkg/service" "gorm.io/gorm" "k8s.io/klog/v2" "github.com/duke-git/lancet/v2/slice" "github.com/weibaohui/k8m/internal/dao" "github.com/weibaohui/k8m/pkg/models" "sigs.k8s.io/yaml" ) // HelmCmd 通过调用 helm 二进制命令实现 Helm 接口 // 注意:部分方法需解析 helm 命令输出,部分功能可能与 SDK 实现略有差异 type HelmCmd struct { HelmBin string // helm 二进制路径,默认 "helm" repoCacheDir string // clusterID string kubeconfigPath string token string caFile string apiServer string cluster *service.ClusterConfig } // NewBackgroundHelmCmd 独立后台执行Helm命令 func NewBackgroundHelmCmd(helmBin string) *HelmCmd { if helmBin == "" { helmBin = "helm" } cfg := flag.Init() h := &HelmCmd{ HelmBin: helmBin, repoCacheDir: cfg.HelmCachePath, } // 确保目录存在 if err := os.MkdirAll(h.repoCacheDir, 0755); err != nil { klog.V(6).Infof("[helm-cmd] warn: create repo cache dir failed: %v", err) } return h } func NewHelmCmd(helmBin string, clusterID string, cluster *service.ClusterConfig) *HelmCmd { if helmBin == "" { helmBin = "helm" } cfg := flag.Init() h := &HelmCmd{ HelmBin: helmBin, repoCacheDir: cfg.HelmCachePath, clusterID: clusterID, cluster: cluster, } // 确保目录存在 if err := os.MkdirAll(h.repoCacheDir, 0755); err != nil { klog.V(6).Infof("[helm-cmd] warn: create repo cache dir failed: %v", err) } // 将kubeconfig 字符串 存放到临时目录 // 每次都固定格式,<cluster_name>-kubeconfig.yaml encodedClusterID := base64.URLEncoding.EncodeToString([]byte(clusterID)) kubeconfigPath := fmt.Sprintf("%s/%s-kubeconfig.yaml", h.repoCacheDir, encodedClusterID) kubeconfig := cluster.GetKubeconfig() if err := os.WriteFile(kubeconfigPath, []byte(kubeconfig), 0644); err != nil { klog.V(6).Infof("[helm-cmd] warn: write kubeconfig to file failed: %v", err) } h.kubeconfigPath = kubeconfigPath if cluster.IsInCluster { h.fillK8sToken() } return h } // NewHelmCmdWithNoCluster 仅Helm命令 func NewHelmCmdWithNoCluster(helmBin string) *HelmCmd { if helmBin == "" { helmBin = "helm" } cfg := flag.Init() h := &HelmCmd{ HelmBin: helmBin, repoCacheDir: cfg.HelmCachePath, } // 确保目录存在 if err := os.MkdirAll(h.repoCacheDir, 0755); err != nil { klog.V(6).Infof("[helm-cmd] warn: create repo cache dir failed: %v", err) } return h } // runAndLog 执行 helm 命令并输出日志,支持 shell 特性和可选 stdin func (h *HelmCmd) runAndLog(args []string, stdin string) ([]byte, error) { args = slice.InsertAt(args, 0, "--debug") if h.cluster != nil && h.cluster.IsInCluster { accessArgs := []string{ "--kube-token", h.token, "--kube-apiserver", h.apiServer, "--kube-ca-file", h.caFile, } // 参数只有一个,直接追加访问参数 args = append(args, accessArgs...) } // 最后一个参数都加上 2>/dev/null args = append(args, "2>/dev/null") cmdStr := h.HelmBin + " " + strings.Join(args, " ") klog.V(6).Infof("[helm-cmd] exec: %s\n", cmdStr) cmd := exec.Command("sh", "-c", cmdStr) cmd.Env = append(os.Environ(), fmt.Sprintf("%s=%s", "HELM_CACHE_HOME", h.repoCacheDir), ) if h.cluster != nil && !h.cluster.IsInCluster { // 不在集群内 cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", "KUBECONFIG", h.kubeconfigPath), ) } if stdin != "" { cmd.Stdin = strings.NewReader(stdin) } var stdout, stderr bytes.Buffer cmd.Stdout = &stdout cmd.Stderr = &stderr err := cmd.Run() if stdout.Len() > 0 { klog.V(6).Infof("[helm-cmd] stdout: %s\n", stdout.String()) } if stderr.Len() > 0 { klog.V(6).Infof("[helm-cmd] stderr: %s\n", stderr.String()) err = fmt.Errorf("%s", stderr.String()) } return stdout.Bytes(), err } // AddOrUpdateRepo 添加或更新 Helm 仓库 func (h *HelmCmd) AddOrUpdateRepo(helmRepo *models.HelmRepository) error { // 1. 先执行数据库操作,保存 HelmRepository 信息 // 2. 再执行 helm repo add/update // 判断该名称、URL的仓库是否存在 if id, err := helmRepo.GetIDByNameAndURL(nil); err == nil && id > 0 { helmRepo.ID = id } else { // 第一次创建,先保存到数据库 if err = helmRepo.Save(nil); err != nil { return fmt.Errorf("save helm repository to database error: %v", err) } } // 3. helm repo add args := []string{"repo", "add", helmRepo.Name, helmRepo.URL} if helmRepo.Username != "" { args = append(args, "--username", helmRepo.Username) } if helmRepo.Password != "" { args = append(args, "--password", helmRepo.Password) } out, err := h.runAndLog(args, "") if err != nil && !strings.Contains(string(out), "already exists") { return fmt.Errorf("helm repo add failed: %v, output: %s", err, string(out)) } _, err = h.updateRepoByName(helmRepo) if err != nil { return err } return nil } func (h *HelmCmd) updateRepoByName(helmRepo *models.HelmRepository) (bool, error) { // 4. helm repo update out, err := h.runAndLog([]string{"repo", "update", helmRepo.Name}, "") if err != nil { return false, fmt.Errorf("helm repo update failed: %v, output: %s", err, string(out)) } // 5. helm repo index 文件分析,记录所有chart到数据库 cachePath := fmt.Sprintf("%s/repository/%s-index.yaml", h.repoCacheDir, helmRepo.Name) indexData, err := os.ReadFile(cachePath) if err != nil { klog.V(6).Infof("[helm-cmd] warn: read repo index file failed: %v\n", err) return true, nil // 不阻断主流程 } var index IndexFile if err := yaml.Unmarshal(indexData, &index); err != nil { klog.V(6).Infof("[helm-cmd] warn: unmarshal repo index file failed: %v\n", err) return true, nil } // 清空数据库中对应的chart repo dao.DB().Where("repository_id = ?", helmRepo.ID).Delete(models.HelmChart{}) for chartName, versionList := range index.Entries { if len(versionList) == 0 { continue } slice.SortBy(versionList, func(a *ChartVersion, b *ChartVersion) bool { return a.Created.After(b.Created) }) ct := versionList[0] if ct.Metadata != nil { m := models.HelmChart{ RepositoryID: helmRepo.ID, RepositoryName: helmRepo.Name, Name: chartName, } if ct.Version != "" { m.LatestVersion = ct.Version } if ct.Description != "" { m.Description = ct.Description } if ct.Home != "" { m.Home = ct.Home } if ct.Icon != "" { m.Icon = ct.Icon } if ct.KubeVersion != "" { m.KubeVersion = ct.KubeVersion } if ct.AppVersion != "" { m.AppVersion = ct.AppVersion } m.Deprecated = ct.Deprecated if len(ct.Keywords) > 0 { m.Keywords = strings.Join(ct.Keywords, ",") } if len(ct.Sources) > 0 { m.Sources = ct.Sources[0] } err = m.Save(nil) if err != nil { klog.V(6).Infof("[helm-cmd] warn: save helm chart to database error: %v\n", err) } } } if !index.Generated.IsZero() { // 更新索引时间 helmRepo.Generated = index.Generated.Format(time.DateTime) _ = helmRepo.Save(nil) } return false, nil } func (h *HelmCmd) GetReleaseHistory(namespace string, releaseName string) ([]*models.ReleaseHistory, error) { out, err := h.runAndLog([]string{"history", releaseName, "-n", namespace, "-o", "json"}, "") if err != nil { return nil, fmt.Errorf("helm history failed: %v, output: %s", err, string(out)) } var releases []*models.ReleaseHistory if err := json.Unmarshal(out, &releases); err != nil { return nil, fmt.Errorf("unmarshal helm history output failed: %v, output: %s", err, string(out)) } return releases, nil } func (h *HelmCmd) UninstallRelease(namespace string, releaseName string) error { out, err := h.runAndLog([]string{"uninstall", releaseName, "-n", namespace}, "") // 删除数据库HelmRelease if err == nil { _ = models.DeleteHelmReleaseByNsAndReleaseName(namespace, releaseName, h.clusterID) // 忽略错误 } if err != nil { return fmt.Errorf("helm uninstall failed: %v, output: %s", err, string(out)) } return nil } func (h *HelmCmd) InstallRelease(namespace, releaseName, repoName, chartName, version string, values ...string) error { chartRef := fmt.Sprintf("%s/%s", repoName, chartName) args := []string{"install", releaseName, chartRef, "--namespace", namespace, "--version", version, "--create-namespace"} stdin := "" if len(values) > 0 && values[0] != "" { args = append(args, "-f", "-") stdin = values[0] } ob, _ := h.runAndLog(args, stdin) // 安装成功后记录到数据库 release := &models.HelmRelease{ Cluster: h.clusterID, ReleaseName: releaseName, RepoName: repoName, Namespace: namespace, ChartName: chartName, ChartVersion: version, Values: stdin, Status: "installed", Result: string(ob), } _ = release.Save(nil) // 忽略错误,防止影响主流程 return nil } func (h *HelmCmd) UpgradeRelease(ns, name string, values ...string) error { // helm upgrade <release-name> <chart-path-or-name> hr, err := models.GetHelmReleaseByNsAndReleaseName(ns, name, h.clusterID) if err != nil { return fmt.Errorf("get repoName from db failed: %v", err) } args := []string{"upgrade", name, fmt.Sprintf("%s/%s", hr.RepoName, hr.ChartName), "--namespace", ns, "--version", hr.ChartVersion} stdin := "" if len(values) > 0 && values[0] != "" { args = append(args, "-f", "-") stdin = values[0] } out, err := h.runAndLog(args, stdin) // 更新values、result hr.Result = string(out) hr.Values = stdin _ = hr.Save(nil) // 忽略错误,防止影响主流程 if err != nil { return fmt.Errorf("helm upgrade failed: %v, output: %s", err, string(out)) } return nil } func (h *HelmCmd) GetChartValue(repoName, chartName, version string) (string, error) { chartRef := fmt.Sprintf("%s/%s", repoName, chartName) args := []string{"show", "values", chartRef, "--version", version} out, err := h.runAndLog(args, "") if err != nil { return "", fmt.Errorf("helm show values failed: %v, output: %s", err, string(out)) } return string(out), nil } func (h *HelmCmd) GetChartVersions(repoName string, chartName string) ([]string, error) { chartRef := fmt.Sprintf("%s/%s", repoName, chartName) args := []string{"search", "repo", chartRef, "-o", "json", "--versions"} out, err := h.runAndLog(args, "") if err != nil { return nil, fmt.Errorf("helm search repo failed: %v, output: %s", err, string(out)) } var result []struct { Version string `json:"version"` } if err := json.Unmarshal(out, &result); err != nil { return nil, fmt.Errorf("unmarshal helm search output failed: %v, output: %s", err, string(out)) } var versions []string for _, r := range result { versions = append(versions, r.Version) } klog.Infof("helm search result: %v", versions) return versions, nil } func (h *HelmCmd) UpdateReposIndex(ids string) { // 解析ids为数组 idsArray := strings.Split(ids, ",") m := models.HelmRepository{} list, _, err := m.List(nil, func(db *gorm.DB) *gorm.DB { return db.Where("id in ?", idsArray).Find(&m) }) if err != nil { klog.V(6).Infof("get helm repository list error: %v", err) return } // 遍历ids,更新每个repo的index for _, item := range list { _, _ = h.updateRepoByName(item) } } func (h *HelmCmd) UpdateAllReposIndex() { m := models.HelmRepository{} list, _, err := m.List(nil) if err != nil { klog.V(6).Infof("get helm repository list error: %v", err) return } for _, item := range list { _, _ = h.updateRepoByName(item) } } func (h *HelmCmd) GetReleaseList() ([]*models.Release, error) { out, err := h.runAndLog([]string{"list", "-A", "-o", "json"}, "") if err != nil { return nil, fmt.Errorf("helm list failed: %v, output: %s", err, string(out)) } var releases []*models.Release if err := json.Unmarshal(out, &releases); err != nil { return nil, fmt.Errorf("unmarshal helm list output failed: %v, output: %s", err, string(out)) } return releases, nil } func (h *HelmCmd) GetRepoList() ([]*RepoVO, error) { out, err := h.runAndLog([]string{"repo", "list", "-o", "json"}, "") if err != nil { return nil, fmt.Errorf("helm repo list failed: %v, output: %s", err, string(out)) } var list []*RepoVO if err := json.Unmarshal(out, &list); err != nil { return nil, fmt.Errorf("unmarshal helm repo list output failed: %v, output: %s", err, string(out)) } return list, nil } func (h *HelmCmd) GetReleaseNote(ns string, name string) (string, error) { out, err := h.runAndLog([]string{"get", "notes", name, "-n", ns}, "") if err != nil { return "", fmt.Errorf("helm get notes failed: %v, output: %s", err, string(out)) } return string(out), nil } func (h *HelmCmd) GetReleaseNoteWithRevision(ns string, name string, revision string) (string, error) { out, err := h.runAndLog([]string{"get", "notes", name, "-n", ns, "--revision", revision}, "") if err != nil { return "", fmt.Errorf("helm get notes failed: %v, output: %s", err, string(out)) } return string(out), nil } func (h *HelmCmd) GetReleaseValues(ns string, name string) (string, error) { out, err := h.runAndLog([]string{"get", "values", name, "-n", ns, "--all", "-o", "yaml"}, "") if err != nil { return "", fmt.Errorf("helm get values failed: %v, output: %s", err, string(out)) } return string(out), nil } func (h *HelmCmd) GetReleaseValuesWithRevision(ns string, name string, revision string) (string, error) { out, err := h.runAndLog([]string{"get", "values", name, "-n", ns, "--revision", revision, "-o", "yaml"}, "") if err != nil { return "", fmt.Errorf("helm get values failed: %v, output: %s", err, string(out)) } return string(out), nil } func (h *HelmCmd) RemoveRepo(repoName string) error { out, err := h.runAndLog([]string{"repo", "remove", repoName}, "") if err != nil { return fmt.Errorf("helm remove repo failed: %v, output: %s", err, string(out)) } return nil } // InCluster 模式填充参数 func (h *HelmCmd) fillK8sToken() { // 获取 ServiceAccount token tokenBytes, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token") if err != nil { klog.V(6).Infof("failed to read token: %v", err) } token := strings.TrimSpace(string(tokenBytes)) // 获取 CA 证书路径 caFile := "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" // 获取 API Server 地址 host := os.Getenv("KUBERNETES_SERVICE_HOST") port := os.Getenv("KUBERNETES_SERVICE_PORT") apiServer := fmt.Sprintf("https://%s:%s", host, port) // "helm", "list", // "--kube-token", token, // "--kube-apiserver", apiServer, // "--kube-ca-file", caFile, // "--namespace", "default", h.token = token h.caFile = caFile h.apiServer = apiServer } // ReAddMissingRepo 重新添加丢失的Repo,比如在容器环境中重启了。 func (h *HelmCmd) ReAddMissingRepo() { m := models.HelmRepository{} list, _, err := m.List(nil) if err != nil { klog.V(6).Infof("get helm repository list error: %v", err) return } repoList, err := h.GetRepoList() if err != nil { klog.V(6).Infof("get helm repository list error: %v", err) return } var repos []string for _, vo := range repoList { repos = append(repos, vo.Name) } for _, item := range list { if !slice.Contain(repos, item.Name) { klog.V(6).Infof("helm repository adding %s", item.Name) _ = h.AddOrUpdateRepo(item) } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/weibaohui/k8m'

If you have feedback or need assistance with the MCP directory API, please join our Discord server