Skip to main content
Glama

Multi Cluster Kubernetes MCP Server

by feibai406
rollouts.py (35.5 kB)
"""Rollout tools for Deployments, StatefulSets and DaemonSets.

Exposes coroutines to inspect rollout status and history and to undo,
restart, pause and resume rollouts in a named Kubernetes context.
"""
from typing import Dict, Any, List, Optional
import os
import json
import datetime

from kubernetes import client

from ..utils.k8s_client import KubernetesClient

# Initialize client with kubeconfig directory from environment or default
kubeconfig_dir = os.environ.get('KUBECONFIG_DIR', os.path.expanduser('~/.kube'))
k8s_client = KubernetesClient(kubeconfig_dir)

_SUPPORTED_TYPES = ("deployment", "statefulset", "daemonset")
_REVISION_ANNOTATION = "deployment.kubernetes.io/revision"
_CHANGE_CAUSE_ANNOTATION = "kubernetes.io/change-cause"
# Label the Deployment controller adds to each ReplicaSet's pod template; it
# must not be copied back into the Deployment template on rollback.
_POD_TEMPLATE_HASH_LABEL = "pod-template-hash"


def _normalize_resource_type(resource_type: str) -> str:
    """Lower-case *resource_type* and reject anything unsupported."""
    normalized = resource_type.lower()
    if normalized not in _SUPPORTED_TYPES:
        raise ValueError(
            f"Unsupported resource type: {normalized}. "
            "Supported types: deployment, statefulset, daemonset"
        )
    return normalized


def _as_runtime_error(exc: Exception, action: str, resource_type: Any,
                      name: str, namespace: str) -> RuntimeError:
    """Map any failure to the RuntimeError the public functions promise."""
    if isinstance(exc, client.rest.ApiException) and exc.status == 404:
        return RuntimeError(
            f"{str(resource_type).capitalize()} '{name}' not found in namespace '{namespace}'"
        )
    return RuntimeError(f"Failed to {action}: {str(exc)}")


def _utc_now_iso() -> str:
    """Return the current time as a timezone-aware ISO-8601 string (UTC)."""
    return datetime.datetime.now(datetime.timezone.utc).isoformat()


def _iso(timestamp: Any) -> Optional[str]:
    """Format a datetime as ISO-8601, passing a missing value through as None."""
    return timestamp.isoformat() if timestamp else None


def _annotation(metadata: Any, key: str, default: str) -> str:
    """Read an annotation; ``metadata.annotations`` may be None."""
    return (metadata.annotations or {}).get(key, default)


def _first_image(template: Any) -> Optional[str]:
    """Return the image of the first container of a pod template, if any."""
    containers = template.spec.containers
    return containers[0].image if containers else None


def _selector_string(selector: Any) -> Optional[str]:
    """Render ``selector.match_labels`` as ``k=v,k=v``; None when unavailable."""
    if selector is None or not selector.match_labels:
        return None
    return ",".join(f"{k}={v}" for k, v in selector.match_labels.items())


def _is_owned_by(metadata: Any, kind: str, owner_name: str) -> bool:
    """Tell whether *metadata* has an owner reference of *kind* named *owner_name*."""
    return any(
        ref.kind == kind and ref.name == owner_name
        for ref in metadata.owner_references or []
    )


def _serialize_conditions(conditions: Any, time_attr: str, time_key: str) -> List[Dict[str, Any]]:
    """Convert status conditions to plain dicts; *time_attr* is emitted as *time_key*."""
    return [
        {
            "type": condition.type,
            "status": condition.status,
            "reason": condition.reason,
            "message": condition.message,
            time_key: _iso(getattr(condition, time_attr)),
        }
        for condition in conditions or []
    ]


def _resource_ref(resource_type: str, name: str, namespace: str) -> Dict[str, str]:
    """Build the ``resource`` sub-dict shared by all results."""
    return {"type": resource_type, "name": name, "namespace": namespace}


def _revision_sort_key(entry: Dict[str, Any]) -> int:
    """Sort key for history entries: numeric revision, non-numeric sorts as 0."""
    revision = entry["revision"]
    return int(revision) if revision.isdigit() else 0


def _owned_replica_sets(apps_v1: Any, deployment: Any, namespace: str) -> List[Any]:
    """List the ReplicaSets owned by *deployment*, found via its own selector."""
    selector = _selector_string(deployment.spec.selector)
    # Without match_labels, list the whole namespace; the owner filter below
    # still keeps only this deployment's ReplicaSets.
    query = {"label_selector": selector} if selector else {}
    listing = apps_v1.list_namespaced_replica_set(namespace=namespace, **query)
    return [
        rs for rs in listing.items
        if _is_owned_by(rs.metadata, "Deployment", deployment.metadata.name)
    ]


def _owned_controller_revisions(apps_v1: Any, namespace: str, owner: Any, owner_kind: str) -> List[Any]:
    """List ControllerRevisions owned by *owner*, highest revision first."""
    selector = _selector_string(owner.spec.selector)
    query = {"label_selector": selector} if selector else {}
    listing = apps_v1.list_namespaced_controller_revision(namespace=namespace, **query)
    owned = [
        rev for rev in listing.items
        if _is_owned_by(rev.metadata, owner_kind, owner.metadata.name)
    ]
    owned.sort(key=lambda rev: rev.revision or 0, reverse=True)
    return owned


def _controller_revision_entry(rev: Any, include_hash: bool = False) -> Dict[str, Any]:
    """Describe one ControllerRevision as a history entry."""
    entry = {
        "revision": str(rev.revision),
        "change_cause": _annotation(rev.metadata, _CHANGE_CAUSE_ANNOTATION, "<none>"),
        "created_at": _iso(rev.metadata.creation_timestamp),
    }
    if include_hash:
        entry["revision_hash"] = rev.metadata.name
    return entry


def _workload_snapshot_entry(workload: Any, revision: str, revision_hash: Optional[str] = None) -> Dict[str, Any]:
    """Describe the live workload object as a single pseudo-revision entry."""
    entry = {"revision": revision}
    if revision_hash is not None:
        entry["revision_hash"] = revision_hash
    entry["change_cause"] = _annotation(workload.metadata, _CHANGE_CAUSE_ANNOTATION, "<none>")
    entry["created_at"] = _iso(workload.metadata.creation_timestamp)
    entry["image"] = _first_image(workload.spec.template)
    return entry


def _deployment_history(apps_v1: Any, namespace: str, name: str) -> List[Dict[str, Any]]:
    """Build history entries from the ReplicaSets owned by a deployment."""
    deployment = apps_v1.read_namespaced_deployment(name=name, namespace=namespace)
    revisions = [
        {
            "revision": _annotation(rs.metadata, _REVISION_ANNOTATION, "unknown"),
            "change_cause": _annotation(rs.metadata, _CHANGE_CAUSE_ANNOTATION, "<none>"),
            "replica_set": rs.metadata.name,
            "created_at": _iso(rs.metadata.creation_timestamp),
            "image": _first_image(rs.spec.template),
            "ready_replicas": rs.status.ready_replicas,
        }
        for rs in _owned_replica_sets(apps_v1, deployment, namespace)
    ]
    revisions.sort(key=_revision_sort_key, reverse=True)
    return revisions


def _statefulset_history(apps_v1: Any, namespace: str, name: str) -> List[Dict[str, Any]]:
    """Build history entries for a StatefulSet.

    Numbered ControllerRevisions are preferred so that the revision values can
    be passed to ``k8s_rollout_undo``. If they cannot be listed, the status
    fields ``current_revision`` / ``update_revision`` are reported instead.
    """
    statefulset = apps_v1.read_namespaced_stateful_set(name=name, namespace=namespace)
    try:
        revisions = [
            _controller_revision_entry(rev, include_hash=True)
            for rev in _owned_controller_revisions(apps_v1, namespace, statefulset, "StatefulSet")
        ]
    except Exception:
        # Best effort: listing ControllerRevisions may be forbidden by RBAC.
        revisions = []
    if revisions:
        return revisions

    current_revision = statefulset.status.current_revision
    update_revision = statefulset.status.update_revision
    revisions = [_workload_snapshot_entry(statefulset, "current", current_revision)]
    if update_revision and update_revision != current_revision:
        revisions.append(_workload_snapshot_entry(statefulset, "update", update_revision))
    return revisions


def _daemonset_history(apps_v1: Any, namespace: str, name: str) -> List[Dict[str, Any]]:
    """Build history entries for a DaemonSet from its ControllerRevisions."""
    daemonset = apps_v1.read_namespaced_daemon_set(name=name, namespace=namespace)
    try:
        revisions = [
            _controller_revision_entry(rev)
            for rev in _owned_controller_revisions(apps_v1, namespace, daemonset, "DaemonSet")
        ]
    except Exception:
        # Best effort: fall back to describing the live object.
        revisions = []
    return revisions or [_workload_snapshot_entry(daemonset, "current")]


def _pick_controller_revision(revisions: List[Any], to_revision: Optional[int]) -> Any:
    """Select the rollback target from revisions sorted highest-first."""
    if to_revision is not None:
        for rev in revisions:
            if str(rev.revision) == str(to_revision):
                return rev
        raise RuntimeError(f"Could not find revision {to_revision}")
    if len(revisions) < 2:
        raise RuntimeError("No previous revision found")
    return revisions[1]  # index 0 is the newest revision, 1 the one before it


def _undo_controller_revision(apps_v1: Any, resource_type: str, namespace: str,
                              name: str, to_revision: Optional[int]) -> None:
    """Roll a StatefulSet or DaemonSet back by applying a ControllerRevision.

    The revision's ``data`` field holds the patch that restores the pod
    template it recorded; it is sent unchanged as the patch body.
    """
    if resource_type == "statefulset":
        owner = apps_v1.read_namespaced_stateful_set(name=name, namespace=namespace)
        owner_kind, apply_patch = "StatefulSet", apps_v1.patch_namespaced_stateful_set
    else:
        owner = apps_v1.read_namespaced_daemon_set(name=name, namespace=namespace)
        owner_kind, apply_patch = "DaemonSet", apps_v1.patch_namespaced_daemon_set

    revisions = _owned_controller_revisions(apps_v1, namespace, owner, owner_kind)
    target = _pick_controller_revision(revisions, to_revision)
    if not target.data:
        raise RuntimeError(
            f"ControllerRevision '{target.metadata.name}' carries no template data"
        )
    apply_patch(name=name, namespace=namespace, body=target.data)


def _undo_deployment(apps_v1: Any, namespace: str, name: str, to_revision: Optional[int]) -> None:
    """Roll a deployment back to the pod template of one of its ReplicaSets."""
    deployment = apps_v1.read_namespaced_deployment(name=name, namespace=namespace)

    numbered = []
    for rs in _owned_replica_sets(apps_v1, deployment, namespace):
        try:
            numbered.append((int(_annotation(rs.metadata, _REVISION_ANNOTATION, "")), rs))
        except ValueError:
            continue  # no (or malformed) revision annotation
    numbered.sort(key=lambda pair: pair[0], reverse=True)

    if to_revision is not None:
        target = next((pair for pair in numbered if str(pair[0]) == str(to_revision)), None)
        if target is None:
            raise RuntimeError(f"Could not find ReplicaSet with revision {to_revision}")
    else:
        if len(numbered) < 2:
            raise RuntimeError("Could not find previous revision")
        target = numbered[1]  # second-highest revision is the previous one
    revision_number, replica_set = target

    # Work on a plain-dict copy so the ReplicaSet object is left untouched.
    template = apps_v1.api_client.sanitize_for_serialization(replica_set.spec.template)
    metadata = template.get("metadata") or {}
    template["metadata"] = metadata

    labels = metadata.get("labels") or {}
    labels.pop(_POD_TEMPLATE_HASH_LABEL, None)
    if labels:
        metadata["labels"] = labels
    else:
        metadata.pop("labels", None)

    annotations = metadata.get("annotations") or {}
    annotations["kubernetes.io/rollback"] = f"to-revision-{revision_number}"
    annotations["kubernetes.io/rollback-timestamp"] = _utc_now_iso()
    metadata["annotations"] = annotations

    # A list body is sent as a JSON patch. "replace" swaps the whole template,
    # whereas a merge patch would keep containers/volumes added since.
    apps_v1.patch_namespaced_deployment(
        name=name,
        namespace=namespace,
        body=[{"op": "replace", "path": "/spec/template", "value": template}],
    )


def _patch_workload(apps_v1: Any, resource_type: str, name: str, namespace: str, body: Any) -> None:
    """Send *body* as a patch to the workload of the given (validated) type."""
    patchers = {
        "deployment": apps_v1.patch_namespaced_deployment,
        "statefulset": apps_v1.patch_namespaced_stateful_set,
        "daemonset": apps_v1.patch_namespaced_daemon_set,
    }
    patchers[resource_type](name=name, namespace=namespace, body=body)


async def get_k8s_rollout_status(context: str, namespace: str, resource_type: str, name: str) -> Dict[str, Any]:
    """
    Get the status of a rollout for a deployment, daemonset, or statefulset.

    Args:
        context (str): Name of the Kubernetes context to use
        namespace (str): Namespace where the resource is located
        resource_type (str): Type of resource (deployment, daemonset, statefulset)
        name (str): Name of the resource

    Returns:
        Dict[str, Any]: Name, namespace, generation, observed generation,
            replica counters and conditions of the resource

    Raises:
        RuntimeError: If the resource type is unsupported, the resource is not
            found, or the Kubernetes API cannot be accessed
    """
    try:
        resource_type = _normalize_resource_type(resource_type)
        apps_v1 = client.AppsV1Api(k8s_client.get_api_client(context))

        if resource_type == "deployment":
            deployment = apps_v1.read_namespaced_deployment(name=name, namespace=namespace)
            return {
                "name": deployment.metadata.name,
                "namespace": deployment.metadata.namespace,
                "generation": deployment.metadata.generation,
                "observed_generation": deployment.status.observed_generation,
                "replicas": {
                    "desired": deployment.spec.replicas,
                    "updated": deployment.status.updated_replicas,
                    "ready": deployment.status.ready_replicas,
                    "available": deployment.status.available_replicas,
                    "unavailable": deployment.status.unavailable_replicas,
                },
                "conditions": _serialize_conditions(
                    deployment.status.conditions, "last_update_time", "last_update"
                ),
            }

        if resource_type == "statefulset":
            statefulset = apps_v1.read_namespaced_stateful_set(name=name, namespace=namespace)
            return {
                "name": statefulset.metadata.name,
                "namespace": statefulset.metadata.namespace,
                "generation": statefulset.metadata.generation,
                "observed_generation": statefulset.status.observed_generation,
                "replicas": {
                    "desired": statefulset.spec.replicas,
                    "updated": statefulset.status.updated_replicas,
                    "ready": statefulset.status.ready_replicas,
                    "available": getattr(statefulset.status, "available_replicas", None),
                },
                "current_revision": statefulset.status.current_revision,
                "update_revision": statefulset.status.update_revision,
                "conditions": _serialize_conditions(
                    statefulset.status.conditions, "last_transition_time", "last_transition"
                ),
            }

        daemonset = apps_v1.read_namespaced_daemon_set(name=name, namespace=namespace)
        return {
            "name": daemonset.metadata.name,
            "namespace": daemonset.metadata.namespace,
            "generation": daemonset.metadata.generation,
            "observed_generation": daemonset.status.observed_generation,
            "replicas": {
                "desired": daemonset.status.desired_number_scheduled,
                "current": daemonset.status.current_number_scheduled,
                "ready": daemonset.status.number_ready,
                "available": daemonset.status.number_available,
                "unavailable": daemonset.status.number_unavailable,
                "misscheduled": daemonset.status.number_misscheduled,
            },
            "conditions": _serialize_conditions(
                daemonset.status.conditions, "last_transition_time", "last_transition"
            ),
        }
    except Exception as exc:
        raise _as_runtime_error(exc, "get rollout status", resource_type, name, namespace) from exc


async def get_k8s_rollout_history(context: str, namespace: str, resource_type: str, name: str) -> Dict[str, Any]:
    """
    Get the revision history of a rollout for a deployment, daemonset, or statefulset
    using the Kubernetes Python client.

    Deployment revisions come from the ReplicaSets the deployment owns;
    StatefulSet and DaemonSet revisions come from their ControllerRevisions,
    falling back to a description of the live object when none can be listed.

    Args:
        context (str): Name of the Kubernetes context to use
        namespace (str): Namespace where the resource is located
        resource_type (str): Type of resource (deployment, daemonset, statefulset)
        name (str): Name of the resource

    Returns:
        Dict[str, Any]: ``{"resource": {...}, "revisions": [...]}`` with
            numbered revisions sorted highest first

    Raises:
        RuntimeError: If the resource type is unsupported, the resource is not
            found, or the Kubernetes API cannot be accessed
    """
    try:
        resource_type = _normalize_resource_type(resource_type)
        apps_v1 = client.AppsV1Api(k8s_client.get_api_client(context))

        if resource_type == "deployment":
            revisions = _deployment_history(apps_v1, namespace, name)
        elif resource_type == "statefulset":
            revisions = _statefulset_history(apps_v1, namespace, name)
        else:
            revisions = _daemonset_history(apps_v1, namespace, name)

        return {
            "resource": _resource_ref(resource_type, name, namespace),
            "revisions": revisions,
        }
    except Exception as exc:
        raise _as_runtime_error(exc, "get rollout history", resource_type, name, namespace) from exc


async def k8s_rollout_undo(context: str, namespace: str, resource_type: str, name: str, to_revision: Optional[int] = None) -> Dict[str, Any]:
    """
    Undo a rollout to a previous revision for a deployment, daemonset, or statefulset.

    Deployments get the pod template of the target ReplicaSet (plus rollback
    annotations) written back via a JSON-patch replace. StatefulSets and
    DaemonSets get the patch stored in the target ControllerRevision applied.

    Args:
        context (str): Name of the Kubernetes context to use
        namespace (str): Namespace where the resource is located
        resource_type (str): Type of resource (deployment, daemonset, statefulset)
        name (str): Name of the resource
        to_revision (int, optional): The revision to roll back to. If None, rolls back
                                     to the previous revision.

    Returns:
        Dict[str, Any]: Result of the rollout undo operation

    Raises:
        RuntimeError: If the resource type is unsupported, the resource or the
            requested revision is not found, or the Kubernetes API call fails
    """
    try:
        resource_type = _normalize_resource_type(resource_type)
        apps_v1 = client.AppsV1Api(k8s_client.get_api_client(context))

        if resource_type == "deployment":
            _undo_deployment(apps_v1, namespace, name, to_revision)
        else:
            _undo_controller_revision(apps_v1, resource_type, namespace, name, to_revision)

        return {
            "success": True,
            "message": f"Rollout undo initiated for {resource_type}/{name}",
            "resource": _resource_ref(resource_type, name, namespace),
            "revision": to_revision if to_revision is not None else "previous",
        }
    except Exception as exc:
        raise _as_runtime_error(exc, "undo rollout", resource_type, name, namespace) from exc


async def k8s_rollout_restart(context: str, namespace: str, resource_type: str, name: str) -> Dict[str, Any]:
    """
    Restart a rollout for a deployment, daemonset, or statefulset using the Kubernetes Python client.

    The restart is triggered by stamping the pod template with a
    ``kubectl.kubernetes.io/restartedAt`` annotation holding the current UTC time.

    Args:
        context (str): Name of the Kubernetes context to use
        namespace (str): Namespace where the resource is located
        resource_type (str): Type of resource (deployment, daemonset, statefulset)
        name (str): Name of the resource

    Returns:
        Dict[str, Any]: Result of the rollout restart operation

    Raises:
        RuntimeError: If the resource type is unsupported, the resource is not
            found, or the Kubernetes API call fails
    """
    try:
        resource_type = _normalize_resource_type(resource_type)
        apps_v1 = client.AppsV1Api(k8s_client.get_api_client(context))

        restart_annotation = {
            "spec": {
                "template": {
                    "metadata": {
                        "annotations": {
                            "kubectl.kubernetes.io/restartedAt": _utc_now_iso()
                        }
                    }
                }
            }
        }
        _patch_workload(apps_v1, resource_type, name, namespace, restart_annotation)

        return {
            "success": True,
            "message": f"Rollout restart initiated for {resource_type}/{name}",
            "resource": _resource_ref(resource_type, name, namespace),
        }
    except Exception as exc:
        raise _as_runtime_error(exc, "restart rollout", resource_type, name, namespace) from exc


async def k8s_rollout_pause(context: str, namespace: str, resource_type: str, name: str) -> Dict[str, Any]:
    """
    Pause a rollout for a deployment, daemonset, or statefulset using the Kubernetes Python client.

    Deployments are paused via ``spec.paused``. StatefulSets have their rolling
    update partition set to the replica count; DaemonSets have
    ``maxUnavailable`` set to 0.

    Args:
        context (str): Name of the Kubernetes context to use
        namespace (str): Namespace where the resource is located
        resource_type (str): Type of resource (deployment, daemonset, statefulset)
        name (str): Name of the resource

    Returns:
        Dict[str, Any]: Result of the rollout pause operation

    Raises:
        RuntimeError: If the resource type is unsupported, the resource is not
            found, or the Kubernetes API call fails
    """
    try:
        resource_type = _normalize_resource_type(resource_type)
        apps_v1 = client.AppsV1Api(k8s_client.get_api_client(context))

        if resource_type == "deployment":
            patch = {"spec": {"paused": True}}
        elif resource_type == "statefulset":
            # StatefulSets have no pause flag; a partition equal to the replica
            # count leaves no pod ordinal eligible for update.
            statefulset = apps_v1.read_namespaced_stateful_set(name=name, namespace=namespace)
            patch = {
                "spec": {
                    "updateStrategy": {
                        "type": "RollingUpdate",
                        "rollingUpdate": {"partition": statefulset.spec.replicas},
                    }
                }
            }
        else:
            # NOTE(review): DaemonSets have no pause flag either. The API server
            # is expected to reject maxUnavailable=0 unless maxSurge is set —
            # confirm against the target cluster version.
            patch = {
                "spec": {
                    "updateStrategy": {
                        "type": "RollingUpdate",
                        "rollingUpdate": {"maxUnavailable": 0},
                    }
                }
            }
        _patch_workload(apps_v1, resource_type, name, namespace, patch)

        return {
            "success": True,
            "message": f"Rollout paused for {resource_type}/{name}",
            "resource": _resource_ref(resource_type, name, namespace),
        }
    except Exception as exc:
        raise _as_runtime_error(exc, "pause rollout", resource_type, name, namespace) from exc


async def k8s_rollout_resume(context: str, namespace: str, resource_type: str, name: str) -> Dict[str, Any]:
    """
    Resume a rollout for a deployment, daemonset, or statefulset using the Kubernetes Python client.

    Reverses ``k8s_rollout_pause``: clears ``spec.paused`` on deployments,
    resets the StatefulSet partition to 0 and the DaemonSet ``maxUnavailable``
    to 1.

    Args:
        context (str): Name of the Kubernetes context to use
        namespace (str): Namespace where the resource is located
        resource_type (str): Type of resource (deployment, daemonset, statefulset)
        name (str): Name of the resource

    Returns:
        Dict[str, Any]: Result of the rollout resume operation

    Raises:
        RuntimeError: If the resource type is unsupported, the resource is not
            found, or the Kubernetes API call fails
    """
    try:
        resource_type = _normalize_resource_type(resource_type)
        apps_v1 = client.AppsV1Api(k8s_client.get_api_client(context))

        if resource_type == "deployment":
            patch = {"spec": {"paused": False}}
        elif resource_type == "statefulset":
            patch = {
                "spec": {
                    "updateStrategy": {
                        "type": "RollingUpdate",
                        "rollingUpdate": {"partition": 0},
                    }
                }
            }
        else:
            # NOTE(review): this overwrites whatever maxUnavailable the
            # DaemonSet had before the pause; the previous value is not kept.
            patch = {
                "spec": {
                    "updateStrategy": {
                        "type": "RollingUpdate",
                        "rollingUpdate": {"maxUnavailable": 1},
                    }
                }
            }
        _patch_workload(apps_v1, resource_type, name, namespace, patch)

        return {
            "success": True,
            "message": f"Rollout resumed for {resource_type}/{name}",
            "resource": _resource_ref(resource_type, name, namespace),
        }
    except Exception as exc:
        raise _as_runtime_error(exc, "resume rollout", resource_type, name, namespace) from exc

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/feibai406/k8s-multicluster-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.