Skip to main content
Glama
pool.go8.72 kB
package k8s import ( "context" "fmt" "strings" "sync" "time" "github.com/strowk/mcp-k8s-go/internal/k8s/list_mapping" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/restmapper" "k8s.io/client-go/tools/cache" ) // ClientPool is a pool of Kubernetes clientsets and informers // that can be used to interact with Kubernetes resources. // // It is thread-safe and can be used from multiple goroutines. // It caches the clientsets and informers for each context // to avoid creating them multiple times. type ClientPool interface { GetClientset(k8sContext string) (kubernetes.Interface, error) GetDynamicClient(k8sContext string) (dynamic.Interface, error) GetInformer( k8sCtx string, kind string, group string, version string, ) (informers.GenericInformer, error) GetListMapping(k8sCtx, kind, group, version string) list_mapping.ListMapping } type resolvedResource struct { gvk *schema.GroupVersionKind mapping *meta.RESTMapping informer informers.GenericInformer listMapping list_mapping.ListMapping } type pool struct { clients map[string]kubernetes.Interface dynamicClients map[string]dynamic.Interface getClientsetMutex *sync.Mutex getDynamicClientMutex *sync.Mutex keyToResource map[string]*resolvedResource gvkToResource map[schema.GroupVersionKind]*resolvedResource getInformerMutex *sync.Mutex listMappingResolvers []list_mapping.ListMappingResolver } func NewClientPool(listMappingResolvers []list_mapping.ListMappingResolver) ClientPool { return &pool{ clients: make(map[string]kubernetes.Interface), getClientsetMutex: &sync.Mutex{}, dynamicClients: make(map[string]dynamic.Interface), getDynamicClientMutex: &sync.Mutex{}, keyToResource: make(map[string]*resolvedResource), gvkToResource: 
make(map[schema.GroupVersionKind]*resolvedResource), getInformerMutex: &sync.Mutex{}, listMappingResolvers: listMappingResolvers, } } func (p *pool) GetInformer( k8sCtx string, kind string, group string, version string, ) (informers.GenericInformer, error) { // creating informer needs to be thread-safe to avoid creating // multiple informers for the same resource p.getInformerMutex.Lock() defer p.getInformerMutex.Unlock() // this looks up if we have a resource with informer already // for exactly the same requested context and "lookup" gvk key := fmt.Sprintf("%s/%s/%s/%s", k8sCtx, kind, group, version) if res, ok := p.keyToResource[key]; ok { return res.informer, nil } // if not, then we resolve gvk and mapping from what server has res, err := p.resolve(k8sCtx, kind, group, version) if err != nil { return nil, err } // it is still possible for this resource to be known already // just with different key, so we check if we have it already, // now by canonical resolved gvk alreadySetupResource, ok := p.gvkToResource[*res.gvk] if ok { // rememeber that this key is resolved to already known resource p.keyToResource[key] = alreadySetupResource // and we can return the informer for it return alreadySetupResource.informer, nil } // if not, then we setup informer and cache it err = res.setupInformer(k8sCtx) if err != nil { return nil, err } p.keyToResource[key] = res p.gvkToResource[*res.gvk] = res return res.informer, nil } func (p *pool) resolve( k8sCtx string, kind string, group string, version string, ) (*resolvedResource, error) { clientset, err := p.GetClientset(k8sCtx) if err != nil { return nil, err } mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(clientset.Discovery())) serverPreferredResources, err := clientset.Discovery().ServerPreferredResources() if serverPreferredResources == nil && err != nil { return nil, err } lookupGvk := schema.GroupVersionKind{ Group: strings.ToLower(group), Version: strings.ToLower(version), Kind: 
strings.ToLower(kind), } var resolvedGvk *schema.GroupVersionKind var resolvedMapping *meta.RESTMapping lookingForResource: for _, r := range serverPreferredResources { for _, apiResource := range r.APIResources { resourceKind := apiResource.Kind resourceGroup := apiResource.Group resourceVersion := apiResource.Version if resourceGroup == "" || resourceVersion == "" { // some resources have empty group or version, which is then present in the containing resource list // for example: apps/v1 // we need to set the group and version to the one from the containing resource list split := strings.SplitN(r.GroupVersion, "/", 2) if len(split) == 2 { resourceGroup = split[0] resourceVersion = split[1] } else { resourceVersion = r.GroupVersion } } if strings.EqualFold(apiResource.Kind, lookupGvk.Kind) { // some resources cannot have correct RESTMapping, but we need to create it // to check if the requested resource matches what we have found in the server // , but we can at first at least look if the kind matches what we are looking for gvk := schema.GroupVersionKind{ Group: resourceGroup, Version: resourceVersion, Kind: resourceKind, } mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { return nil, fmt.Errorf("failed to get rest mapping for %s: %w", gvk.String(), err) } if strings.EqualFold(mapping.GroupVersionKind.Kind, lookupGvk.Kind) && // if group or version were not specified, we would ignore them when matching // and this would simply match the first resource with matching kind (lookupGvk.Group == "" || strings.EqualFold(mapping.GroupVersionKind.Group, lookupGvk.Group)) && (lookupGvk.Version == "" || strings.EqualFold(mapping.GroupVersionKind.Version, lookupGvk.Version)) { resolvedGvk = &gvk resolvedMapping = mapping break lookingForResource } } } } if resolvedGvk == nil { return nil, fmt.Errorf("resource %s/%s/%s not found", group, version, kind) } return &resolvedResource{ gvk: resolvedGvk, mapping: resolvedMapping, }, nil } func (res 
*resolvedResource) setupInformer( k8sCtx string, ) error { cfg := GetKubeConfigForContext(k8sCtx) restConfig, err := cfg.ClientConfig() if err != nil { return err } dynClient, err := dynamic.NewForConfig(restConfig) if err != nil { return err } factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 10*time.Minute, metav1.NamespaceAll, nil) informer := factory.ForResource(res.mapping.Resource) go informer.Informer().Run(context.Background().Done()) isSynced := cache.WaitForCacheSync(context.Background().Done(), informer.Informer().HasSynced) if !isSynced { return fmt.Errorf("informer for resource %s/%s/%s is not synced", res.mapping.GroupVersionKind.Group, res.mapping.GroupVersionKind.Version, res.mapping.GroupVersionKind.Kind) } res.informer = informer return nil } func (p *pool) GetClientset(k8sContext string) (kubernetes.Interface, error) { p.getClientsetMutex.Lock() defer p.getClientsetMutex.Unlock() var effectiveContext string if k8sContext == "" { var err error effectiveContext, err = GetCurrentContext() if err != nil { return nil, err } } else { effectiveContext = k8sContext } if !IsContextAllowed(effectiveContext) { return nil, fmt.Errorf("context %s is not allowed", effectiveContext) } key := effectiveContext if client, ok := p.clients[key]; ok { return client, nil } client, err := getClientset(k8sContext) if err != nil { return nil, err } p.clients[key] = client return client, nil } func getClientset(k8sContext string) (kubernetes.Interface, error) { kubeConfig := GetKubeConfigForContext(k8sContext) config, err := kubeConfig.ClientConfig() if err != nil { return nil, err } clientset, err := kubernetes.NewForConfig(config) if err != nil { return nil, err } return clientset, nil } func (p *pool) GetDynamicClient(k8sContext string) (dynamic.Interface, error) { p.getDynamicClientMutex.Lock() defer p.getDynamicClientMutex.Unlock() if k8sContext == "" { k8sContext = "default" } if client, ok := p.dynamicClients[k8sContext]; ok { return 
client, nil } kubeConfig := GetKubeConfigForContext(k8sContext) config, err := kubeConfig.ClientConfig() if err != nil { return nil, err } client, err := dynamic.NewForConfig(config) if err != nil { return nil, err } p.dynamicClients[k8sContext] = client return client, nil }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/strowk/mcp-k8s-go'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.