Skip to main content
Glama
node_affinity.go12.5 kB
package dynamic import ( "fmt" "strings" "github.com/duke-git/lancet/v2/slice" "github.com/gin-gonic/gin" "github.com/weibaohui/k8m/pkg/comm/utils" "github.com/weibaohui/k8m/pkg/comm/utils/amis" "github.com/weibaohui/kom/kom" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" ) type NodeAffinityController struct{} func RegisterNodeAffinityRoutes(api *gin.RouterGroup) { ctrl := &NodeAffinityController{} api.POST("/:kind/group/:group/version/:version/update_node_affinity/ns/:ns/name/:name", ctrl.UpdateNodeAffinity) api.POST("/:kind/group/:group/version/:version/delete_node_affinity/ns/:ns/name/:name", ctrl.DeleteNodeAffinity) api.POST("/:kind/group/:group/version/:version/add_node_affinity/ns/:ns/name/:name", ctrl.AddNodeAffinity) api.GET("/:kind/group/:group/version/:version/list_node_affinity/ns/:ns/name/:name", ctrl.ListNodeAffinity) } type nodeAffinity struct { Operator string `json:"operator"` Key string `json:"key"` Values []string `json:"values"` } // @Summary 获取节点亲和性配置 // @Security BearerAuth // @Param cluster query string true "集群名称" // @Param kind path string true "资源类型" // @Param group path string true "资源组" // @Param version path string true "资源版本" // @Param ns path string true "命名空间" // @Param name path string true "资源名称" // @Success 200 {array} any // @Router /k8s/cluster/{cluster}/{kind}/group/{group}/version/{version}/list_node_affinity/ns/{ns}/name/{name} [get] func (ac *NodeAffinityController) ListNodeAffinity(c *gin.Context) { name := c.Param("name") ns := c.Param("ns") group := c.Param("group") kind := c.Param("kind") version := c.Param("version") ctx := amis.GetContextWithUser(c) selectedCluster, err := amis.GetSelectedCluster(c) if err != nil { amis.WriteJsonError(c, err) return } // 先获取资源中的定义 var item *unstructured.Unstructured err = kom.Cluster(selectedCluster). WithContext(ctx). CRD(group, version, kind). Namespace(ns).Name(name). Get(&item).Error if err != nil { amis.WriteJsonError(c, err) return } // 获取资源路径 paths, err := getResourcePaths(kind) if err != nil { amis.WriteJsonError(c, err) return } requiredAffinityPath := append(paths, "affinity", "nodeAffinity", "requiredDuringSchedulingIgnoredDuringExecution", "nodeSelectorTerms") // 获取 Affinity 配置并解析 affinity, found, err := unstructured.NestedFieldNoCopy(item.Object, requiredAffinityPath...) if err != nil { amis.WriteJsonError(c, err) return } if !found { // 没有的话,返回一个空列表 amis.WriteJsonList(c, []any{}) return } // 强制转换为数组 nodeSelectorTerms, ok := affinity.([]any) if !ok { amis.WriteJsonError(c, fmt.Errorf("nodeSelectorTerms is not an array")) return } var matchExpressionsList []any // 遍历 nodeSelectorTerms 来提取 matchExpressions for _, term := range nodeSelectorTerms { termMap, ok := term.(map[string]any) if !ok { continue } // 获取 matchExpressions matchExpressions, found, err := unstructured.NestedFieldNoCopy(termMap, "matchExpressions") if err != nil { amis.WriteJsonError(c, err) return } if !found { continue } // 强制转换为数组,并将其添加到 matchExpressionsList matchExpressionsArray, ok := matchExpressions.([]any) if ok { matchExpressionsList = append(matchExpressionsList, matchExpressionsArray...) } } amis.WriteJsonList(c, matchExpressionsList) } // @Summary 添加节点亲和性配置 // @Security BearerAuth // @Param cluster query string true "集群名称" // @Param kind path string true "资源类型" // @Param group path string true "资源组" // @Param version path string true "资源版本" // @Param ns path string true "命名空间" // @Param name path string true "资源名称" // @Param nodeAffinity body nodeAffinity true "节点亲和性配置" // @Success 200 {object} string // @Router /k8s/cluster/{cluster}/{kind}/group/{group}/version/{version}/add_node_affinity/ns/{ns}/name/{name} [post] func (ac *NodeAffinityController) AddNodeAffinity(c *gin.Context) { processNodeAffinity(c, "add") } // @Summary 更新节点亲和性配置 // @Security BearerAuth // @Param cluster query string true "集群名称" // @Param kind path string true "资源类型" // @Param group path string true "资源组" // @Param version path string true "资源版本" // @Param ns path string true "命名空间" // @Param name path string true "资源名称" // @Param nodeAffinity body nodeAffinity true "节点亲和性配置" // @Success 200 {object} string // @Router /k8s/cluster/{cluster}/{kind}/group/{group}/version/{version}/update_node_affinity/ns/{ns}/name/{name} [post] func (ac *NodeAffinityController) UpdateNodeAffinity(c *gin.Context) { processNodeAffinity(c, "modify") } // @Summary 删除节点亲和性配置 // @Security BearerAuth // @Param cluster query string true "集群名称" // @Param kind path string true "资源类型" // @Param group path string true "资源组" // @Param version path string true "资源版本" // @Param ns path string true "命名空间" // @Param name path string true "资源名称" // @Param nodeAffinity body nodeAffinity true "节点亲和性配置" // @Success 200 {object} string // @Router /k8s/cluster/{cluster}/{kind}/group/{group}/version/{version}/delete_node_affinity/ns/{ns}/name/{name} [post] func (ac *NodeAffinityController) DeleteNodeAffinity(c *gin.Context) { processNodeAffinity(c, "delete") } func processNodeAffinity(c *gin.Context, action string) { name := c.Param("name") ns := c.Param("ns") group := c.Param("group") kind := c.Param("kind") version := c.Param("version") ctx := amis.GetContextWithUser(c) selectedCluster, err := amis.GetSelectedCluster(c) if err != nil { amis.WriteJsonError(c, err) return } var info nodeAffinity if err = c.ShouldBindJSON(&info); err != nil { amis.WriteJsonError(c, err) return } // 强制性规则处理。 // operator: In 时,values必须有值。 // operator: Exists DoesNotExist 时,values必须没有值。 if info.Operator == "Exists" || info.Operator == "DoesNotExist" { info.Values = make([]string, 0) } // 先获取资源中的定义 var item *unstructured.Unstructured err = kom.Cluster(selectedCluster). WithContext(ctx). CRD(group, version, kind). Namespace(ns).Name(name). Get(&item).Error if err != nil { amis.WriteJsonError(c, err) return } originalNodeAffinity, err := getNodeSelectorTerms(kind, item, action, info) if err != nil { amis.WriteJsonError(c, err) return } patchData, err := generateRequiredNodeAffinityDynamicPatch(kind, originalNodeAffinity) if err != nil { amis.WriteJsonError(c, err) return } patchJSON := utils.ToJSON(patchData) // 将{}替换为null,为null才会删除 patchJSON = strings.ReplaceAll(patchJSON, "{}", "null") klog.V(6).Infof("UpdateNodeAffinity Patch JSON :\n%s\n", patchJSON) var obj any err = kom.Cluster(selectedCluster). WithContext(ctx). CRD(group, version, kind). Namespace(ns).Name(name). Patch(&obj, types.StrategicMergePatchType, patchJSON).Error amis.WriteJsonErrorOrOK(c, err) } // getNodeSelectorTerms 获取 NodeAffinity 的 NodeSelectorTerms // action : modify\update\add func getNodeSelectorTerms(kind string, item *unstructured.Unstructured, action string, rule nodeAffinity) ([]any, error) { // 获取资源路径 paths, err := getResourcePaths(kind) if err != nil { return nil, err } requiredAffinityPath := append(paths, "affinity", "nodeAffinity", "requiredDuringSchedulingIgnoredDuringExecution", "nodeSelectorTerms") // 获取 Affinity 配置并解析 affinity, found, err := unstructured.NestedFieldNoCopy(item.Object, requiredAffinityPath...) if err != nil { return nil, err } if !found { // 如果没有,那么如果是 add 操作,那么创建一个空的 NodeSelectorTerms affinity = make([]any, 0) } // 强制转换为数组 nodeSelectorTerms, ok := affinity.([]any) if !ok { return nil, fmt.Errorf("nodeSelectorTerms is not an array") } x := utils.ToJSON(affinity) // 如果nodeSelectorTerms被设置为-{},那么json输出为 // [ // {} // ] // 其长度为8 if (len(nodeSelectorTerms) == 0 || len(x) == 8) && action == "add" { nodeSelectorTerms = []any{ map[string]any{ "matchExpressions": []map[string]any{{ "key": rule.Key, "operator": rule.Operator, "values": rule.Values, }, }, }, } return nodeSelectorTerms, nil } // 进行操作:删除或新增 var newNodeSelectorTerms []any for _, term := range nodeSelectorTerms { termMap, ok := term.(map[string]any) if !ok { continue } // 获取 matchExpressions matchExpressions, found, err := unstructured.NestedFieldNoCopy(termMap, "matchExpressions") if err != nil { return nil, err } if !found { continue } // 强制转换为数组 matchExpressionsArray, ok := matchExpressions.([]any) if !ok { continue } // 处理每一个 matchExpression var newMatchExpressions []any for _, expr := range matchExpressionsArray { exprMap, ok := expr.(map[string]any) if !ok { continue } // 比对删除的 key if action == "delete" && exprMap["key"] == rule.Key { // 如果是删除,并且 key 匹配,跳过这个 matchExpression continue } // 如果是修改操作,并且 key 匹配,修改该 matchExpression 的 values if action == "modify" && exprMap["key"] == rule.Key { // 修改新值,不修改原值,直接添加到新列表 newMatchExpressions = append(newMatchExpressions, map[string]any{ "key": rule.Key, "operator": rule.Operator, "values": rule.Values, }) continue } // 否则,保留原有的 matchExpression newMatchExpressions = append(newMatchExpressions, map[string]any{ "key": exprMap["key"], "operator": exprMap["operator"], "values": exprMap["values"], }) } // 如果是新增操作,增加新的 matchExpression if action == "add" { if !slice.ContainBy(newMatchExpressions, func(item any) bool { m := item.(map[string]any) return m["key"] == rule.Key }) { newMatchExpressions = append(newMatchExpressions, map[string]any{ "key": rule.Key, "operator": rule.Operator, "values": rule.Values, }) } } // 将修改后的 matchExpressions 赋值回 termMap // 这里直接将新的 matchExpressions 更新到 termMap 中,不使用 SetNestedField termMap["matchExpressions"] = newMatchExpressions // 将修改后的 termMap 添加到新的 nodeSelectorTerms 列表中 newNodeSelectorTerms = append(newNodeSelectorTerms, termMap) } return newNodeSelectorTerms, nil } // 生成动态的 patch 数据 func generateRequiredNodeAffinityDynamicPatch(kind string, rules []any) (map[string]any, error) { // 打印rules klog.V(6).Infof("generateRequiredNodeAffinityDynamicPatch rules:\n%+v len=%d\n", rules, len(rules)) // 删除时[map[matchExpressions:[]]] len=1 // 先判断是不是要删除,删除层级不一样,单独处理 if utils.ToJSON(rules) == `[ { "matchExpressions": null } ]` { return makeDeleteNodeAffinityDynamicPatch(kind, rules) } // 获取资源路径 paths, err := getResourcePaths(kind) if err != nil { return nil, err } requiredAffinityPath := append(paths, "affinity", "nodeAffinity", "requiredDuringSchedulingIgnoredDuringExecution") // 动态构造 patch 数据 patch := make(map[string]any) current := patch // 按层级动态生成嵌套结构 for _, path := range requiredAffinityPath { if _, exists := current[path]; !exists { current[path] = make(map[string]any) } current = current[path].(map[string]any) } current["nodeSelectorTerms"] = rules return patch, nil } func makeDeleteNodeAffinityDynamicPatch(kind string, rules []any) (map[string]any, error) { // 获取资源路径 paths, err := getResourcePaths(kind) if err != nil { return nil, err } requiredAffinityPath := append(paths, "affinity", "nodeAffinity") // 动态构造 patch 数据 patch := make(map[string]any) current := patch // 按层级动态生成嵌套结构 for _, path := range requiredAffinityPath { if _, exists := current[path]; !exists { current[path] = make(map[string]any) } current = current[path].(map[string]any) } current = nil return patch, nil }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/weibaohui/k8m'

If you have feedback or need assistance with the MCP directory API, please join our Discord server