kubernetes-mcp-server
by manusa
Verified
- kubernetes-mcp-server
- pkg
- kubernetes
package kubernetes
import (
"github.com/fsnotify/fsnotify"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"sigs.k8s.io/yaml"
)
// InClusterConfig is a variable that holds the function to get the in-cluster config
// Exposed for testing
var InClusterConfig = rest.InClusterConfig
type CloseWatchKubeConfig func() error
type Kubernetes struct {
cfg *rest.Config
kubeConfigFiles []string
CloseWatchKubeConfig CloseWatchKubeConfig
clientSet *kubernetes.Clientset
discoveryClient *discovery.DiscoveryClient
deferredDiscoveryRESTMapper *restmapper.DeferredDiscoveryRESTMapper
dynamicClient *dynamic.DynamicClient
}
func NewKubernetes() (*Kubernetes, error) {
cfg, err := resolveClientConfig()
if err != nil {
return nil, err
}
clientSet, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, err
}
discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
if err != nil {
return nil, err
}
dynamicClient, err := dynamic.NewForConfig(cfg)
if err != nil {
return nil, err
}
return &Kubernetes{
cfg: cfg,
kubeConfigFiles: resolveConfig().ConfigAccess().GetLoadingPrecedence(),
clientSet: clientSet,
discoveryClient: discoveryClient,
deferredDiscoveryRESTMapper: restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(discoveryClient)),
dynamicClient: dynamicClient,
}, nil
}
func (k *Kubernetes) WatchKubeConfig(onKubeConfigChange func() error) {
if len(k.kubeConfigFiles) == 0 {
return
}
watcher, err := fsnotify.NewWatcher()
if err != nil {
return
}
for _, file := range k.kubeConfigFiles {
_ = watcher.Add(file)
}
go func() {
for {
select {
case _, ok := <-watcher.Events:
if !ok {
return
}
_ = onKubeConfigChange()
case _, ok := <-watcher.Errors:
if !ok {
return
}
}
}
}()
if k.CloseWatchKubeConfig != nil {
_ = k.CloseWatchKubeConfig()
}
k.CloseWatchKubeConfig = watcher.Close
}
func (k *Kubernetes) Close() {
if k.CloseWatchKubeConfig != nil {
_ = k.CloseWatchKubeConfig()
}
}
func marshal(v any) (string, error) {
switch t := v.(type) {
case []unstructured.Unstructured:
for i := range t {
t[i].SetManagedFields(nil)
}
case []*unstructured.Unstructured:
for i := range t {
t[i].SetManagedFields(nil)
}
case unstructured.Unstructured:
t.SetManagedFields(nil)
case *unstructured.Unstructured:
t.SetManagedFields(nil)
}
ret, err := yaml.Marshal(v)
if err != nil {
return "", err
}
return string(ret), nil
}
func resolveConfig() clientcmd.ClientConfig {
pathOptions := clientcmd.NewDefaultPathOptions()
return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: pathOptions.GetDefaultFilename()},
&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: ""}})
}
func resolveClientConfig() (*rest.Config, error) {
inClusterConfig, err := InClusterConfig()
if err == nil && inClusterConfig != nil {
return inClusterConfig, nil
}
return resolveConfig().ClientConfig()
}
func configuredNamespace() string {
if ns, _, nsErr := resolveConfig().Namespace(); nsErr == nil {
return ns
}
return ""
}
func namespaceOrDefault(namespace string) string {
if namespace == "" {
return configuredNamespace()
}
return namespace
}