kubernetes-mcp-server
by manusa
Verified
- kubernetes-mcp-server
- pkg
- kubernetes
package kubernetes
import (
"context"
"github.com/manusa/kubernetes-mcp-server/pkg/version"
authv1 "k8s.io/api/authorization/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/yaml"
"regexp"
"strings"
)
const (
AppKubernetesComponent = "app.kubernetes.io/component"
AppKubernetesManagedBy = "app.kubernetes.io/managed-by"
AppKubernetesName = "app.kubernetes.io/name"
AppKubernetesPartOf = "app.kubernetes.io/part-of"
)
func (k *Kubernetes) ResourcesList(ctx context.Context, gvk *schema.GroupVersionKind, namespace string) (string, error) {
rl, err := k.resourcesList(ctx, gvk, namespace)
if err != nil {
return "", err
}
return marshal(rl.Items)
}
func (k *Kubernetes) ResourcesGet(ctx context.Context, gvk *schema.GroupVersionKind, namespace, name string) (string, error) {
gvr, err := k.resourceFor(gvk)
if err != nil {
return "", err
}
// If it's a namespaced resource and namespace wasn't provided, try to use the default configured one
if namespaced, nsErr := k.isNamespaced(gvk); nsErr == nil && namespaced {
namespace = namespaceOrDefault(namespace)
}
rg, err := k.dynamicClient.Resource(*gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return "", err
}
return marshal(rg)
}
func (k *Kubernetes) ResourcesCreateOrUpdate(ctx context.Context, resource string) (string, error) {
separator := regexp.MustCompile(`\r?\n---\r?\n`)
resources := separator.Split(resource, -1)
var parsedResources []*unstructured.Unstructured
for _, r := range resources {
var obj unstructured.Unstructured
if err := yaml.NewYAMLToJSONDecoder(strings.NewReader(r)).Decode(&obj); err != nil {
return "", err
}
parsedResources = append(parsedResources, &obj)
}
return k.resourcesCreateOrUpdate(ctx, parsedResources)
}
func (k *Kubernetes) ResourcesDelete(ctx context.Context, gvk *schema.GroupVersionKind, namespace, name string) error {
gvr, err := k.resourceFor(gvk)
if err != nil {
return err
}
// If it's a namespaced resource and namespace wasn't provided, try to use the default configured one
if namespaced, nsErr := k.isNamespaced(gvk); nsErr == nil && namespaced {
namespace = namespaceOrDefault(namespace)
}
return k.dynamicClient.Resource(*gvr).Namespace(namespace).Delete(ctx, name, metav1.DeleteOptions{})
}
func (k *Kubernetes) resourcesList(ctx context.Context, gvk *schema.GroupVersionKind, namespace string) (*unstructured.UnstructuredList, error) {
gvr, err := k.resourceFor(gvk)
if err != nil {
return nil, err
}
// Check if operation is allowed for all namespaces (applicable for namespaced resources)
isNamespaced, _ := k.isNamespaced(gvk)
if isNamespaced && !k.canIUse(ctx, gvr, namespace, "list") && namespace == "" {
namespace = configuredNamespace()
}
return k.dynamicClient.Resource(*gvr).Namespace(namespace).List(ctx, metav1.ListOptions{})
}
func (k *Kubernetes) resourcesCreateOrUpdate(ctx context.Context, resources []*unstructured.Unstructured) (string, error) {
for i, obj := range resources {
gvk := obj.GroupVersionKind()
gvr, rErr := k.resourceFor(&gvk)
if rErr != nil {
return "", rErr
}
namespace := obj.GetNamespace()
// If it's a namespaced resource and namespace wasn't provided, try to use the default configured one
if namespaced, nsErr := k.isNamespaced(&gvk); nsErr == nil && namespaced {
namespace = namespaceOrDefault(namespace)
}
resources[i], rErr = k.dynamicClient.Resource(*gvr).Namespace(namespace).Apply(ctx, obj.GetName(), obj, metav1.ApplyOptions{
FieldManager: version.BinaryName,
})
if rErr != nil {
return "", rErr
}
// Clear the cache to ensure the next operation is performed on the latest exposed APIs
if gvk.Kind == "CustomResourceDefinition" {
k.deferredDiscoveryRESTMapper.Reset()
}
}
marshalledYaml, err := marshal(resources)
if err != nil {
return "", err
}
return "# The following resources (YAML) have been created or updated successfully\n" + marshalledYaml, nil
}
func (k *Kubernetes) resourceFor(gvk *schema.GroupVersionKind) (*schema.GroupVersionResource, error) {
m, err := k.deferredDiscoveryRESTMapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
if err != nil {
return nil, err
}
return &m.Resource, nil
}
func (k *Kubernetes) isNamespaced(gvk *schema.GroupVersionKind) (bool, error) {
apiResourceList, err := k.discoveryClient.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
if err != nil {
return false, err
}
for _, apiResource := range apiResourceList.APIResources {
if apiResource.Kind == gvk.Kind {
return apiResource.Namespaced, nil
}
}
return false, nil
}
func (k *Kubernetes) supportsGroupVersion(groupVersion string) bool {
if _, err := k.discoveryClient.ServerResourcesForGroupVersion(groupVersion); err != nil {
return false
}
return true
}
func (k *Kubernetes) canIUse(ctx context.Context, gvr *schema.GroupVersionResource, namespace, verb string) bool {
response, err := k.clientSet.AuthorizationV1().SelfSubjectAccessReviews().Create(ctx, &authv1.SelfSubjectAccessReview{
Spec: authv1.SelfSubjectAccessReviewSpec{ResourceAttributes: &authv1.ResourceAttributes{
Namespace: namespace,
Verb: verb,
Group: gvr.Group,
Version: gvr.Version,
Resource: gvr.Resource,
}},
}, metav1.CreateOptions{})
if err != nil {
// TODO: maybe return the error too
return false
}
return response.Status.Allowed
}