Skip to main content
Glama
manager.go7.63 kB
package kubernetes import ( "context" "errors" "fmt" "os" "strconv" "strings" "github.com/containers/kubernetes-mcp-server/pkg/api" authenticationv1api "k8s.io/api/authentication/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" "k8s.io/klog/v2" ) type Manager struct { accessControlClientset *AccessControlClientset config api.BaseConfig } var _ api.Openshift = (*Manager)(nil) var ( ErrorKubeconfigInClusterNotAllowed = errors.New("kubeconfig manager cannot be used in in-cluster deployments") ErrorInClusterNotInCluster = errors.New("in-cluster manager cannot be used outside of a cluster") ) func NewKubeconfigManager(config api.BaseConfig, kubeconfigContext string) (*Manager, error) { if IsInCluster(config) { return nil, ErrorKubeconfigInClusterNotAllowed } pathOptions := clientcmd.NewDefaultPathOptions() if config.GetKubeConfigPath() != "" { pathOptions.LoadingRules.ExplicitPath = config.GetKubeConfigPath() } clientCmdConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( pathOptions.LoadingRules, &clientcmd.ConfigOverrides{ ClusterInfo: clientcmdapi.Cluster{Server: ""}, CurrentContext: kubeconfigContext, }) restConfig, err := clientCmdConfig.ClientConfig() if err != nil { return nil, fmt.Errorf("failed to create kubernetes rest config from kubeconfig: %v", err) } return NewManager(config, restConfig, clientCmdConfig) } func NewInClusterManager(config api.BaseConfig) (*Manager, error) { if config.GetKubeConfigPath() != "" { return nil, fmt.Errorf("kubeconfig file %s cannot be used with the in-cluster deployments: %v", config.GetKubeConfigPath(), ErrorKubeconfigInClusterNotAllowed) } if !IsInCluster(config) { return nil, ErrorInClusterNotInCluster } restConfig, err := InClusterConfig() if err != nil { return nil, fmt.Errorf("failed to create in-cluster kubernetes rest config: %v", err) } // Create a dummy kubeconfig clientcmdapi.Config for in-cluster config to be used in places where clientcmd.ClientConfig is required clientCmdConfig := clientcmdapi.NewConfig() clientCmdConfig.Clusters["cluster"] = &clientcmdapi.Cluster{ Server: restConfig.Host, InsecureSkipTLSVerify: restConfig.Insecure, } clientCmdConfig.AuthInfos["user"] = &clientcmdapi.AuthInfo{ Token: restConfig.BearerToken, } clientCmdConfig.Contexts[inClusterKubeConfigDefaultContext] = &clientcmdapi.Context{ Cluster: "cluster", AuthInfo: "user", } clientCmdConfig.CurrentContext = inClusterKubeConfigDefaultContext return NewManager(config, restConfig, clientcmd.NewDefaultClientConfig(*clientCmdConfig, nil)) } func NewManager(config api.BaseConfig, restConfig *rest.Config, clientCmdConfig clientcmd.ClientConfig) (*Manager, error) { if config == nil { return nil, errors.New("config cannot be nil") } if restConfig == nil { return nil, errors.New("restConfig cannot be nil") } if clientCmdConfig == nil { return nil, errors.New("clientCmdConfig cannot be nil") } // Apply QPS and Burst from environment variables if set (primarily for testing) applyRateLimitFromEnv(restConfig) k8s := &Manager{ config: config, } var err error // TODO: Won't work because not all client-go clients use the shared context (e.g. discovery client uses context.TODO()) //k8s.restConfig.Wrap(func(original http.RoundTripper) http.RoundTripper { // return &impersonateRoundTripper{original} //}) k8s.accessControlClientset, err = NewAccessControlClientset(k8s.config, clientCmdConfig, restConfig) if err != nil { return nil, err } return k8s, nil } func (m *Manager) VerifyToken(ctx context.Context, token, audience string) (*authenticationv1api.UserInfo, []string, error) { tokenReviewClient := m.accessControlClientset.AuthenticationV1().TokenReviews() tokenReview := &authenticationv1api.TokenReview{ TypeMeta: metav1.TypeMeta{ APIVersion: "authentication.k8s.io/v1", Kind: "TokenReview", }, Spec: authenticationv1api.TokenReviewSpec{ Token: token, Audiences: []string{audience}, }, } result, err := tokenReviewClient.Create(ctx, tokenReview, metav1.CreateOptions{}) if err != nil { return nil, nil, fmt.Errorf("failed to create token review: %v", err) } if !result.Status.Authenticated { if result.Status.Error != "" { return nil, nil, fmt.Errorf("token authentication failed: %s", result.Status.Error) } return nil, nil, fmt.Errorf("token authentication failed") } return &result.Status.User, result.Status.Audiences, nil } func (m *Manager) Derived(ctx context.Context) (*Kubernetes, error) { authorization, ok := ctx.Value(OAuthAuthorizationHeader).(string) if !ok || !strings.HasPrefix(authorization, "Bearer ") { if m.config.IsRequireOAuth() { return nil, errors.New("oauth token required") } return &Kubernetes{m.accessControlClientset}, nil } klog.V(5).Infof("%s header found (Bearer), using provided bearer token", OAuthAuthorizationHeader) derivedCfg := &rest.Config{ Host: m.accessControlClientset.RESTConfig().Host, APIPath: m.accessControlClientset.RESTConfig().APIPath, WrapTransport: m.accessControlClientset.RESTConfig().WrapTransport, // Copy only server verification TLS settings (CA bundle and server name) TLSClientConfig: rest.TLSClientConfig{ Insecure: m.accessControlClientset.RESTConfig().Insecure, ServerName: m.accessControlClientset.RESTConfig().ServerName, CAFile: m.accessControlClientset.RESTConfig().CAFile, CAData: m.accessControlClientset.RESTConfig().CAData, }, BearerToken: strings.TrimPrefix(authorization, "Bearer "), // pass custom UserAgent to identify the client UserAgent: CustomUserAgent, QPS: m.accessControlClientset.RESTConfig().QPS, Burst: m.accessControlClientset.RESTConfig().Burst, Timeout: m.accessControlClientset.RESTConfig().Timeout, Impersonate: rest.ImpersonationConfig{}, } clientCmdApiConfig, err := m.accessControlClientset.clientCmdConfig.RawConfig() if err != nil { if m.config.IsRequireOAuth() { klog.Errorf("failed to get kubeconfig: %v", err) return nil, fmt.Errorf("failed to get kubeconfig: %w", err) } return &Kubernetes{m.accessControlClientset}, nil } clientCmdApiConfig.AuthInfos = make(map[string]*clientcmdapi.AuthInfo) derived, err := NewAccessControlClientset(m.config, clientcmd.NewDefaultClientConfig(clientCmdApiConfig, nil), derivedCfg) if err != nil { if m.config.IsRequireOAuth() { klog.Errorf("failed to create derived clientset: %v", err) return nil, fmt.Errorf("failed to create derived clientset: %w", err) } return &Kubernetes{m.accessControlClientset}, nil } return &Kubernetes{derived}, nil } // Invalidate invalidates the cached discovery information. func (m *Manager) Invalidate() { m.accessControlClientset.DiscoveryClient().Invalidate() } // applyRateLimitFromEnv applies QPS and Burst rate limits from environment variables if set. // This is primarily useful for tests to avoid client-side rate limiting. // Environment variables: // - KUBE_CLIENT_QPS: Sets the QPS (queries per second) limit // - KUBE_CLIENT_BURST: Sets the burst limit func applyRateLimitFromEnv(cfg *rest.Config) { if qpsStr := os.Getenv("KUBE_CLIENT_QPS"); qpsStr != "" { if qps, err := strconv.ParseFloat(qpsStr, 32); err == nil { cfg.QPS = float32(qps) } } if burstStr := os.Getenv("KUBE_CLIENT_BURST"); burstStr != "" { if burst, err := strconv.Atoi(burstStr); err == nil { cfg.Burst = burst } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/containers/kubernetes-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server