
k8s-mcp-server

client.go (22.8 kB)
// Package k8s provides a client for interacting with the Kubernetes API.
package k8s

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/yaml"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	metricsclientset "k8s.io/metrics/pkg/client/clientset/versioned"
)

// Client encapsulates Kubernetes client functionality including dynamic,
// discovery, and metrics clients.
// It also caches API resource information for performance.
type Client struct {
	clientset        *kubernetes.Clientset
	dynamicClient    dynamic.Interface
	discoveryClient  *discovery.DiscoveryClient
	metricsClientset *metricsclientset.Clientset // Metrics client
	restConfig       *rest.Config
	apiResourceCache map[string]*schema.GroupVersionResource
	cacheLock        sync.RWMutex
}

// NewClient creates a new Kubernetes client.
// It initializes the standard clientset, dynamic client, discovery client,
// and metrics client using the provided kubeconfig path or the default path.
// If kubeconfigPath is empty, it defaults to ~/.kube/config.
func NewClient(kubeconfigPath string) (*Client, error) {
	var kubeconfig string
	if kubeconfigPath != "" {
		kubeconfig = kubeconfigPath
	} else if home := homedir.HomeDir(); home != "" {
		kubeconfig = filepath.Join(home, ".kube", "config")
	}

	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create Kubernetes configuration: %w", err)
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create Kubernetes client: %w", err)
	}

	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create dynamic client: %w", err)
	}

	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create discovery client: %w", err)
	}

	// Initialize metrics client
	metricsClient, err := metricsclientset.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create metrics client: %w", err)
	}

	return &Client{
		clientset:        clientset,
		dynamicClient:    dynamicClient,
		discoveryClient:  discoveryClient,
		metricsClientset: metricsClient,
		restConfig:       config,
		apiResourceCache: make(map[string]*schema.GroupVersionResource),
	}, nil
}

// GetAPIResources retrieves all API resource types in the cluster.
// It uses the discovery client to fetch server-preferred resources.
// Filters resources based on includeNamespaceScoped and includeClusterScoped flags.
// Returns a slice of maps, each representing an API resource, or an error.
func (c *Client) GetAPIResources(ctx context.Context, includeNamespaceScoped, includeClusterScoped bool) ([]map[string]interface{}, error) {
	resourceLists, err := c.discoveryClient.ServerPreferredResources()
	if err != nil && !discovery.IsGroupDiscoveryFailedError(err) {
		return nil, fmt.Errorf("failed to retrieve API resources: %w", err)
	}

	var resources []map[string]interface{}
	for _, resourceList := range resourceLists {
		for _, resource := range resourceList.APIResources {
			if (resource.Namespaced && !includeNamespaceScoped) || (!resource.Namespaced && !includeClusterScoped) {
				continue
			}
			resources = append(resources, map[string]interface{}{
				"name":         resource.Name,
				"singularName": resource.SingularName,
				"namespaced":   resource.Namespaced,
				"kind":         resource.Kind,
				"group":        resource.Group,
				"version":      resource.Version,
				"verbs":        resource.Verbs,
			})
		}
	}

	return resources, nil
}

// GetResource retrieves detailed information about a specific resource.
// It uses the dynamic client to fetch the resource by kind, name, and namespace.
// It utilizes a cached GroupVersionResource (GVR) for efficiency.
// Returns the unstructured content of the resource as a map, or an error.
func (c *Client) GetResource(ctx context.Context, kind, name, namespace string) (map[string]interface{}, error) {
	gvr, err := c.getCachedGVR(kind)
	if err != nil {
		return nil, err
	}

	var obj *unstructured.Unstructured
	if namespace != "" {
		obj, err = c.dynamicClient.Resource(*gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
	} else {
		obj, err = c.dynamicClient.Resource(*gvr).Get(ctx, name, metav1.GetOptions{})
	}
	if err != nil {
		return nil, fmt.Errorf("failed to retrieve resource: %w", err)
	}

	return obj.UnstructuredContent(), nil
}

// ListResources lists all instances of a specific resource type.
// It uses the dynamic client and supports filtering by namespace, labelSelector,
// and fieldSelector.
// It utilizes a cached GroupVersionResource (GVR) for efficiency.
// Returns a slice of maps, each representing a resource instance, or an error.
func (c *Client) ListResources(ctx context.Context, kind, namespace, labelSelector, fieldSelector string) ([]map[string]interface{}, error) {
	gvr, err := c.getCachedGVR(kind)
	if err != nil {
		return nil, err
	}

	options := metav1.ListOptions{
		LabelSelector: labelSelector,
		FieldSelector: fieldSelector,
	}

	var list *unstructured.UnstructuredList
	if namespace != "" {
		list, err = c.dynamicClient.Resource(*gvr).Namespace(namespace).List(ctx, options)
	} else {
		list, err = c.dynamicClient.Resource(*gvr).List(ctx, options)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to list resources: %w", err)
	}

	var resources []map[string]interface{}
	for _, item := range list.Items {
		labels := item.GetLabels()
		resources = append(resources, map[string]interface{}{
			"name":      item.GetName(),
			"kind":      item.GetKind(),
			"namespace": item.GetNamespace(),
			"labels":    labels,
		})
	}

	return resources, nil
}

// CreateOrUpdateResourceJSON creates a new resource or updates an existing one from a JSON manifest.
// It parses the provided manifest string into an unstructured object and ensures the
// target namespace exists, creating it if necessary.
// It uses the dynamic client to first attempt a merge patch; if the resource is not found,
// it attempts to create it.
// Requires the resource manifest to include a name.
// Returns the unstructured content of the created/updated resource, or an error.
func (c *Client) CreateOrUpdateResourceJSON(ctx context.Context, namespace, manifestJSON, kind string) (map[string]interface{}, error) {
	// Decode JSON into an unstructured object directly (no YAML conversion)
	obj := &unstructured.Unstructured{}
	if err := json.Unmarshal([]byte(manifestJSON), &obj.Object); err != nil {
		return nil, fmt.Errorf("failed to parse resource manifest JSON: %w", err)
	}

	// Determine the resource GVR
	gvr, err := c.getCachedGVR(kind)
	if err != nil {
		return nil, err
	}

	// Ensure the target namespace exists, creating it if necessary
	_, err = c.clientset.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{})
	if err == nil {
		fmt.Printf("Namespace %s exists\n", namespace)
	}
	if errors.IsNotFound(err) {
		fmt.Printf("Namespace %s does not exist, creating one\n", namespace)
		_, err = c.clientset.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
			ObjectMeta: metav1.ObjectMeta{
				Labels: map[string]string{
					"kubernetes.io/metadata.name": namespace,
				},
				Name: namespace,
			},
			Spec: corev1.NamespaceSpec{
				Finalizers: []corev1.FinalizerName{corev1.FinalizerKubernetes},
			},
			Status: corev1.NamespaceStatus{
				Phase:      corev1.NamespaceActive,
				Conditions: nil,
			},
		}, metav1.CreateOptions{})
	}
	if err != nil {
		return nil, fmt.Errorf("failed to ensure namespace exists: %w", err)
	}

	obj.SetNamespace(namespace)
	if obj.GetName() == "" {
		return nil, fmt.Errorf("resource name is required")
	}

	resource := c.dynamicClient.Resource(*gvr).Namespace(obj.GetNamespace())

	// Try to patch; if not found, create
	rawJSON := []byte(manifestJSON) // manifestJSON is already JSON
	result, err := resource.Patch(
		ctx,
		obj.GetName(),
		types.MergePatchType,
		rawJSON,
		metav1.PatchOptions{},
	)
	if errors.IsNotFound(err) {
		result, err = resource.Create(ctx, obj, metav1.CreateOptions{})
	}
	if err != nil {
		return nil, fmt.Errorf("failed to create or patch resource: %w", err)
	}

	return result.UnstructuredContent(), nil
}

// CreateOrUpdateResourceYAML creates a new resource or updates an existing one from a YAML manifest.
// It converts the YAML manifest to JSON internally and then uses the dynamic client
// to first attempt a merge patch; if the resource is not found, it attempts to create it.
// Requires the resource manifest to include a name.
// Returns the unstructured content of the created/updated resource, or an error.
//
// Parameters:
//   - ctx: Context for the operation
//   - namespace: Target namespace for the resource (overrides manifest namespace if provided)
//   - yamlManifest: YAML manifest string of the Kubernetes resource
//   - kind: Resource kind used to resolve the GroupVersionResource
//
// Example YAML manifest:
//
//	apiVersion: v1
//	kind: Pod
//	metadata:
//	  name: my-pod
//	  namespace: default
//	spec:
//	  containers:
//	  - name: nginx
//	    image: nginx:latest
func (c *Client) CreateOrUpdateResourceYAML(ctx context.Context, namespace, yamlManifest, kind string) (map[string]interface{}, error) {
	// Convert YAML to JSON
	jsonData, err := yaml.YAMLToJSON([]byte(yamlManifest))
	if err != nil {
		return nil, fmt.Errorf("failed to parse YAML manifest: %w", err)
	}

	// Parse the converted JSON into an unstructured object
	obj := &unstructured.Unstructured{}
	if err := json.Unmarshal(jsonData, &obj.Object); err != nil {
		return nil, fmt.Errorf("failed to parse converted JSON from YAML manifest: %w", err)
	}

	// Determine the resource GVR
	gvr, err := c.getCachedGVR(kind)
	if err != nil {
		return nil, err
	}

	// Set namespace if provided (overrides manifest namespace)
	if namespace != "" {
		obj.SetNamespace(namespace)
	}

	if obj.GetName() == "" {
		return nil, fmt.Errorf("resource name is required in YAML manifest")
	}

	resource := c.dynamicClient.Resource(*gvr).Namespace(obj.GetNamespace())

	// Try to patch; if not found, create
	result, err := resource.Patch(
		ctx,
		obj.GetName(),
		types.MergePatchType,
		jsonData,
		metav1.PatchOptions{},
	)
	if errors.IsNotFound(err) {
		result, err = resource.Create(ctx, obj, metav1.CreateOptions{})
	}
	if err != nil {
		return nil, fmt.Errorf("failed to create or patch resource from YAML manifest: %w", err)
	}

	return result.UnstructuredContent(), nil
}

// DeleteResource deletes a specific resource.
// It uses the dynamic client to delete the resource by kind, name, and namespace.
// It utilizes a cached GroupVersionResource (GVR) for efficiency.
// Returns an error if the deletion fails.
func (c *Client) DeleteResource(ctx context.Context, kind, name, namespace string) error {
	gvr, err := c.getCachedGVR(kind)
	if err != nil {
		return err
	}

	var deleteErr error
	if namespace != "" {
		deleteErr = c.dynamicClient.Resource(*gvr).Namespace(namespace).Delete(ctx, name, metav1.DeleteOptions{})
	} else {
		deleteErr = c.dynamicClient.Resource(*gvr).Delete(ctx, name, metav1.DeleteOptions{})
	}
	if deleteErr != nil {
		return fmt.Errorf("failed to delete resource: %w", deleteErr)
	}

	return nil
}

// getCachedGVR retrieves the GroupVersionResource for a given kind, using a cache for performance.
func (c *Client) getCachedGVR(kind string) (*schema.GroupVersionResource, error) {
	c.cacheLock.RLock()
	if gvr, exists := c.apiResourceCache[kind]; exists {
		c.cacheLock.RUnlock()
		return gvr, nil
	}
	c.cacheLock.RUnlock()

	// Cache miss; fetch from the discovery client
	resourceLists, err := c.discoveryClient.ServerPreferredResources()
	if err != nil && !discovery.IsGroupDiscoveryFailedError(err) {
		return nil, fmt.Errorf("failed to retrieve API resources: %w", err)
	}

	for _, resourceList := range resourceLists {
		gv, err := schema.ParseGroupVersion(resourceList.GroupVersion)
		if err != nil {
			continue
		}
		for _, resource := range resourceList.APIResources {
			if resource.Kind == kind {
				gvr := &schema.GroupVersionResource{
					Group:    gv.Group,
					Version:  gv.Version,
					Resource: resource.Name,
				}
				c.cacheLock.Lock()
				c.apiResourceCache[kind] = gvr
				c.cacheLock.Unlock()
				return gvr, nil
			}
		}
	}

	return nil, fmt.Errorf("resource type %s not found", kind)
}

// DescribeResource retrieves detailed information about a specific resource, similar to GetResource.
// It uses the dynamic client to fetch the resource by kind, name, and namespace.
// It utilizes a cached GroupVersionResource (GVR) for efficiency.
// Returns the unstructured content of the resource as a map, or an error.
// Note: This function currently has the same implementation as GetResource.
func (c *Client) DescribeResource(ctx context.Context, kind, name, namespace string) (map[string]interface{}, error) {
	gvr, err := c.getCachedGVR(kind)
	if err != nil {
		return nil, err
	}

	var obj *unstructured.Unstructured
	if namespace != "" {
		obj, err = c.dynamicClient.Resource(*gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
	} else {
		obj, err = c.dynamicClient.Resource(*gvr).Get(ctx, name, metav1.GetOptions{})
	}
	if err != nil {
		return nil, fmt.Errorf("failed to retrieve resource: %w", err)
	}

	return obj.UnstructuredContent(), nil
}

// GetPodsLogs retrieves the logs for a specific pod.
// It uses the corev1 clientset to fetch logs, limiting to the last 100 lines by default.
// If containerName is provided, it gets logs for that specific container.
// If containerName is empty and the pod has multiple containers, it gets logs from all containers.
// Returns the logs as a string, or an error.
func (c *Client) GetPodsLogs(ctx context.Context, namespace, containerName, podName string) (string, error) {
	tailLines := int64(100)
	podLogOptions := &corev1.PodLogOptions{
		TailLines: &tailLines,
	}

	// If container name is provided, use it
	if containerName != "" {
		podLogOptions.Container = containerName
		req := c.clientset.CoreV1().Pods(namespace).GetLogs(podName, podLogOptions)
		logs, err := req.Stream(ctx)
		if err != nil {
			return "", fmt.Errorf("failed to get logs for container '%s': %w", containerName, err)
		}
		defer logs.Close()

		buf := new(bytes.Buffer)
		if _, err := io.Copy(buf, logs); err != nil {
			return "", fmt.Errorf("failed to read logs: %w", err)
		}
		return buf.String(), nil
	}

	// If no container name provided, first get the pod to check its containers
	pod, err := c.clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
	if err != nil {
		return "", fmt.Errorf("failed to get pod details: %w", err)
	}

	// If the pod has only one container, get logs from that container
	if len(pod.Spec.Containers) == 1 {
		req := c.clientset.CoreV1().Pods(namespace).GetLogs(podName, podLogOptions)
		logs, err := req.Stream(ctx)
		if err != nil {
			return "", fmt.Errorf("failed to get logs: %w", err)
		}
		defer logs.Close()

		buf := new(bytes.Buffer)
		if _, err := io.Copy(buf, logs); err != nil {
			return "", fmt.Errorf("failed to read logs: %w", err)
		}
		return buf.String(), nil
	}

	// If the pod has multiple containers, get logs from each container
	var allLogs strings.Builder
	for _, container := range pod.Spec.Containers {
		containerLogOptions := podLogOptions.DeepCopy()
		containerLogOptions.Container = container.Name

		req := c.clientset.CoreV1().Pods(namespace).GetLogs(podName, containerLogOptions)
		logs, err := req.Stream(ctx)
		if err != nil {
			allLogs.WriteString(fmt.Sprintf("\n--- Error getting logs for container %s: %v ---\n", container.Name, err))
			continue
		}

		allLogs.WriteString(fmt.Sprintf("\n--- Logs for container %s ---\n", container.Name))
		buf := new(bytes.Buffer)
		_, err = io.Copy(buf, logs)
		logs.Close()
		if err != nil {
			allLogs.WriteString(fmt.Sprintf("Error reading logs: %v\n", err))
		} else {
			allLogs.WriteString(buf.String())
		}
	}

	return allLogs.String(), nil
}

// GetPodMetrics retrieves CPU and Memory metrics for a specific pod.
// It uses the metrics clientset to fetch pod metrics.
// Returns a map containing pod metadata and container metrics, or an error.
func (c *Client) GetPodMetrics(ctx context.Context, namespace, podName string) (map[string]interface{}, error) {
	podMetrics, err := c.metricsClientset.MetricsV1beta1().PodMetricses(namespace).Get(ctx, podName, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to get metrics for pod '%s' in namespace '%s': %w", podName, namespace, err)
	}

	metricsResult := map[string]interface{}{
		"podName":    podName,
		"namespace":  namespace,
		"timestamp":  podMetrics.Timestamp.Time,
		"window":     podMetrics.Window.Duration.String(),
		"containers": []map[string]interface{}{},
	}

	containerMetricsList := []map[string]interface{}{}
	for _, container := range podMetrics.Containers {
		containerMetrics := map[string]interface{}{
			"name":   container.Name,
			"cpu":    container.Usage.Cpu().String(),    // Format Quantity
			"memory": container.Usage.Memory().String(), // Format Quantity
		}
		containerMetricsList = append(containerMetricsList, containerMetrics)
	}
	metricsResult["containers"] = containerMetricsList

	return metricsResult, nil
}

// GetNodeMetrics retrieves CPU and Memory metrics for a specific Node.
// It uses the metrics clientset to fetch node metrics.
// Returns a map containing node metadata and resource usage, or an error.
func (c *Client) GetNodeMetrics(ctx context.Context, nodeName string) (map[string]interface{}, error) {
	nodeMetrics, err := c.metricsClientset.MetricsV1beta1().NodeMetricses().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to get metrics for node '%s': %w", nodeName, err)
	}

	metricsResult := map[string]interface{}{
		"nodeName":  nodeName,
		"timestamp": nodeMetrics.Timestamp.Time,
		"window":    nodeMetrics.Window.Duration.String(),
		"usage": map[string]string{
			"cpu":    nodeMetrics.Usage.Cpu().String(),    // Format Quantity
			"memory": nodeMetrics.Usage.Memory().String(), // Format Quantity
		},
	}

	return metricsResult, nil
}

// GetEvents retrieves events for a specific namespace or all namespaces.
// It uses the corev1 clientset to fetch events.
// Returns a slice of maps, each representing an event, or an error.
func (c *Client) GetEvents(ctx context.Context, namespace string) ([]map[string]interface{}, error) {
	var eventList *corev1.EventList
	var err error

	if namespace != "" {
		eventList, err = c.clientset.CoreV1().Events(namespace).List(ctx, metav1.ListOptions{})
	} else {
		eventList, err = c.clientset.CoreV1().Events("").List(ctx, metav1.ListOptions{})
	}
	if err != nil {
		return nil, fmt.Errorf("failed to retrieve events: %w", err)
	}

	var events []map[string]interface{}
	for _, event := range eventList.Items {
		events = append(events, map[string]interface{}{
			"name":      event.Name,
			"namespace": event.Namespace,
			"reason":    event.Reason,
			"message":   event.Message,
			"source":    event.Source.Component,
			"type":      event.Type,
			"count":     event.Count,
			"firstTime": event.FirstTimestamp.Time,
			"lastTime":  event.LastTimestamp.Time,
		})
	}

	return events, nil
}

// GetIngresses retrieves ingresses and returns specific fields: name, namespace, paths, and backend services.
// It uses the networking.k8s.io/v1 clientset to fetch ingresses.
// Returns a slice of maps, each representing an ingress with the requested fields, or an error.
func (c *Client) GetIngresses(ctx context.Context, host string) ([]map[string]interface{}, error) {
	ingresses, err := c.clientset.NetworkingV1().Ingresses("").List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to retrieve ingresses: %w", err)
	}

	var ingressList []map[string]interface{}
	for _, ingress := range ingresses.Items {
		// Check if this ingress has any rules matching the given host
		hasMatchingHost := false
		var matchingPaths []string
		var matchingBackendServices []string

		for _, rule := range ingress.Spec.Rules {
			// If a host filter is specified, only process rules matching the host
			if host != "" && rule.Host != host {
				continue
			}

			// At this point, either no host filter was given or the host matches
			hasMatchingHost = true
			if rule.HTTP != nil {
				for _, path := range rule.HTTP.Paths {
					matchingPaths = append(matchingPaths, path.Path)
					// Extract backend service information
					if path.Backend.Service != nil {
						matchingBackendServices = append(matchingBackendServices, path.Backend.Service.Name)
					}
				}
			}
		}

		// Only add this ingress if it has matching rules
		if hasMatchingHost {
			ingressList = append(ingressList, map[string]interface{}{
				"name":            ingress.Name,
				"namespace":       ingress.Namespace,
				"paths":           matchingPaths,
				"backendServices": matchingBackendServices,
			})
		}
	}

	return ingressList, nil
}

// RolloutRestart restarts any Kubernetes workload with a pod template (Deployment, DaemonSet, StatefulSet, etc.).
// It patches the spec.template.metadata.annotations with the current timestamp.
// Returns the patched resource content or an error if the resource doesn't support rollout restart.
func (c *Client) RolloutRestart(ctx context.Context, kind, name, namespace string) (map[string]interface{}, error) {
	gvr, err := c.getCachedGVR(kind)
	if err != nil {
		return nil, fmt.Errorf("failed to get GVR for kind %s: %w", kind, err)
	}

	resource := c.dynamicClient.Resource(*gvr).Namespace(namespace)

	patch := []byte(fmt.Sprintf(
		`{"spec":{"template":{"metadata":{"annotations":{"kubectl.kubernetes.io/restartedAt":"%s"}}}}}`,
		time.Now().Format(time.RFC3339),
	))

	result, err := resource.Patch(ctx, name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to rollout restart %s %s/%s: %w", kind, namespace, name, err)
	}

	content := result.UnstructuredContent()
	spec, found, _ := unstructured.NestedMap(content, "spec", "template")
	if !found || spec == nil {
		return nil, fmt.Errorf("resource kind %s does not support rollout restart (no spec.template)", kind)
	}

	return content, nil
}
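
For context, here is a minimal usage sketch of the client above. It is not part of client.go: the import path github.com/reza-gholizade/k8s-mcp-server/pkg/k8s is assumed (the actual module layout may differ), and names such as the "default" namespace and "my-pod" are placeholders.

package main

import (
	"context"
	"fmt"
	"log"

	// Assumed import path for the package shown above; adjust to the real module layout.
	k8s "github.com/reza-gholizade/k8s-mcp-server/pkg/k8s"
)

func main() {
	// An empty path falls back to ~/.kube/config (see NewClient).
	client, err := k8s.NewClient("")
	if err != nil {
		log.Fatalf("creating client: %v", err)
	}

	ctx := context.Background()

	// List Deployments in the "default" namespace with no label or field selectors.
	deployments, err := client.ListResources(ctx, "Deployment", "default", "", "")
	if err != nil {
		log.Fatalf("listing deployments: %v", err)
	}
	for _, d := range deployments {
		fmt.Println(d["name"], d["labels"])
	}

	// Fetch the last 100 lines of logs from every container in a (placeholder) pod.
	logs, err := client.GetPodsLogs(ctx, "default", "", "my-pod")
	if err != nil {
		log.Fatalf("fetching logs: %v", err)
	}
	fmt.Println(logs)
}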
