Skip to main content
Glama

Multi Cluster Kubernetes MCP Server

by feibai406
scaling.py16.3 kB
from typing import Dict, Any, Optional import os from kubernetes import client from ..utils.k8s_client import KubernetesClient # Initialize client with kubeconfig directory from environment or default kubeconfig_dir = os.environ.get('KUBECONFIG_DIR', os.path.expanduser('~/.kube')) k8s_client = KubernetesClient(kubeconfig_dir) async def k8s_scale_resource(context: str, namespace: str, resource_type: str, name: str, replicas: int) -> Dict[str, Any]: """ Scale a deployment, statefulset, or replicaset to the specified number of replicas. Args: context (str): Name of the Kubernetes context to use namespace (str): Namespace where the resource is located resource_type (str): Type of resource (deployment, statefulset, replicaset) name (str): Name of the resource replicas (int): Desired number of replicas Returns: Dict[str, Any]: Information about the scaled resource Raises: RuntimeError: If there's an error accessing the Kubernetes API """ try: api_client = k8s_client.get_api_client(context) apps_v1 = client.AppsV1Api(api_client) resource_type = resource_type.lower() # Validate replica count if replicas < 0: raise ValueError("Replica count cannot be negative") # Handle different resource types if resource_type == "deployment": # Check if the deployment exists first try: deployment = apps_v1.read_namespaced_deployment(name=name, namespace=namespace) except client.rest.ApiException as e: if e.status == 404: raise RuntimeError(f"Deployment '{name}' not found in namespace '{namespace}'") raise # Create a deployment scale object scale = client.V1Scale( api_version="apps/v1", kind="Scale", metadata=client.V1ObjectMeta( name=name, namespace=namespace ), spec=client.V1ScaleSpec( replicas=replicas ) ) # Apply the scale result = apps_v1.patch_namespaced_deployment_scale( name=name, namespace=namespace, body=scale ) return { "resource_type": "deployment", "name": result.metadata.name, "namespace": result.metadata.namespace, "replicas": { "desired": result.spec.replicas, "current": result.status.replicas } } elif resource_type == "statefulset": # Check if the statefulset exists first try: statefulset = apps_v1.read_namespaced_stateful_set(name=name, namespace=namespace) except client.rest.ApiException as e: if e.status == 404: raise RuntimeError(f"StatefulSet '{name}' not found in namespace '{namespace}'") raise # Create a statefulset scale object scale = client.V1Scale( api_version="apps/v1", kind="Scale", metadata=client.V1ObjectMeta( name=name, namespace=namespace ), spec=client.V1ScaleSpec( replicas=replicas ) ) # Apply the scale result = apps_v1.patch_namespaced_stateful_set_scale( name=name, namespace=namespace, body=scale ) return { "resource_type": "statefulset", "name": result.metadata.name, "namespace": result.metadata.namespace, "replicas": { "desired": result.spec.replicas, "current": result.status.replicas } } elif resource_type == "replicaset": # Check if the replicaset exists first try: replicaset = apps_v1.read_namespaced_replica_set(name=name, namespace=namespace) except client.rest.ApiException as e: if e.status == 404: raise RuntimeError(f"ReplicaSet '{name}' not found in namespace '{namespace}'") raise # Create a replicaset scale object scale = client.V1Scale( api_version="apps/v1", kind="Scale", metadata=client.V1ObjectMeta( name=name, namespace=namespace ), spec=client.V1ScaleSpec( replicas=replicas ) ) # Apply the scale result = apps_v1.patch_namespaced_replica_set_scale( name=name, namespace=namespace, body=scale ) return { "resource_type": "replicaset", "name": result.metadata.name, "namespace": result.metadata.namespace, "replicas": { "desired": result.spec.replicas, "current": result.status.replicas } } else: raise ValueError(f"Unsupported resource type: {resource_type}. Supported types: deployment, statefulset, replicaset") except client.rest.ApiException as e: raise RuntimeError(f"Failed to scale {resource_type}: {str(e)}") except Exception as e: raise RuntimeError(f"Failed to scale {resource_type}: {str(e)}") async def k8s_autoscale_resource(context: str, namespace: str, resource_type: str, name: str, min_replicas: int, max_replicas: int, cpu_percent: Optional[int] = 80) -> Dict[str, Any]: """ Configure a Horizontal Pod Autoscaler (HPA) for a deployment, statefulset, or replicaset. Args: context (str): Name of the Kubernetes context to use namespace (str): Namespace where the resource is located resource_type (str): Type of resource (deployment, statefulset, replicaset) name (str): Name of the resource min_replicas (int): Minimum number of replicas max_replicas (int): Maximum number of replicas cpu_percent (int, optional): Target CPU utilization percentage. Defaults to 80. Returns: Dict[str, Any]: Information about the created or updated HPA Raises: RuntimeError: If there's an error accessing the Kubernetes API """ try: api_client = k8s_client.get_api_client(context) autoscaling_v1 = client.AutoscalingV1Api(api_client) resource_type = resource_type.lower() # Validate min and max replicas if min_replicas < 1: raise ValueError("Minimum replicas cannot be less than 1") if max_replicas < min_replicas: raise ValueError("Maximum replicas cannot be less than minimum replicas") if cpu_percent is not None and (cpu_percent < 1 or cpu_percent > 100): raise ValueError("CPU target percentage must be between 1 and 100") # Map resource type to API version and kind api_version = "apps/v1" kind = resource_type.capitalize() # Create an HPA object hpa_metadata = client.V1ObjectMeta( name=name, namespace=namespace ) # Create target reference based on resource type target_ref = client.V1CrossVersionObjectReference( api_version=api_version, kind=kind, name=name ) # Create HPA spec hpa_spec = client.V1HorizontalPodAutoscalerSpec( min_replicas=min_replicas, max_replicas=max_replicas, target_cpu_utilization_percentage=cpu_percent, scale_target_ref=target_ref ) # Create the full HPA object hpa = client.V1HorizontalPodAutoscaler( api_version="autoscaling/v1", kind="HorizontalPodAutoscaler", metadata=hpa_metadata, spec=hpa_spec ) # Try to update an existing HPA first try: # Check if HPA already exists existing_hpa = autoscaling_v1.read_namespaced_horizontal_pod_autoscaler( name=name, namespace=namespace ) # Update existing HPA result = autoscaling_v1.replace_namespaced_horizontal_pod_autoscaler( name=name, namespace=namespace, body=hpa ) update_type = "updated" except client.rest.ApiException as e: if e.status == 404: # Create new HPA if it doesn't exist result = autoscaling_v1.create_namespaced_horizontal_pod_autoscaler( namespace=namespace, body=hpa ) update_type = "created" else: raise # Format the response return { "action": update_type, "name": result.metadata.name, "namespace": result.metadata.namespace, "target": { "kind": result.spec.scale_target_ref.kind, "name": result.spec.scale_target_ref.name, "api_version": result.spec.scale_target_ref.api_version }, "min_replicas": result.spec.min_replicas, "max_replicas": result.spec.max_replicas, "current_replicas": result.status.current_replicas, "target_cpu_percentage": result.spec.target_cpu_utilization_percentage } except client.rest.ApiException as e: raise RuntimeError(f"Failed to configure autoscaler: {str(e)}") except Exception as e: raise RuntimeError(f"Failed to configure autoscaler: {str(e)}") async def k8s_update_resources(context: str, namespace: str, resource_type: str, name: str, container: str, memory_request: Optional[str] = None, memory_limit: Optional[str] = None, cpu_request: Optional[str] = None, cpu_limit: Optional[str] = None) -> Dict[str, Any]: """ Update resource requests and limits for a container in a deployment, statefulset, or daemonset. Args: context (str): Name of the Kubernetes context to use namespace (str): Namespace where the resource is located resource_type (str): Type of resource (deployment, statefulset, daemonset) name (str): Name of the resource container (str): Name of the container to update memory_request (str, optional): Memory request (e.g. "64Mi", "128Mi", "1Gi") memory_limit (str, optional): Memory limit (e.g. "128Mi", "256Mi", "2Gi") cpu_request (str, optional): CPU request (e.g. "100m", "0.5", "1") cpu_limit (str, optional): CPU limit (e.g. "200m", "1", "2") Returns: Dict[str, Any]: Information about the updated resource Raises: RuntimeError: If there's an error accessing the Kubernetes API """ try: api_client = k8s_client.get_api_client(context) apps_v1 = client.AppsV1Api(api_client) resource_type = resource_type.lower() # Validate at least one resource parameter is provided if not any([memory_request, memory_limit, cpu_request, cpu_limit]): raise ValueError("At least one resource limit or request must be specified") # Get the current resource based on type if resource_type == "deployment": current_resource = apps_v1.read_namespaced_deployment(name=name, namespace=namespace) elif resource_type == "statefulset": current_resource = apps_v1.read_namespaced_stateful_set(name=name, namespace=namespace) elif resource_type == "daemonset": current_resource = apps_v1.read_namespaced_daemon_set(name=name, namespace=namespace) else: raise ValueError(f"Unsupported resource type: {resource_type}. Supported types: deployment, statefulset, daemonset") # Find the specified container and update its resources container_found = False for c in current_resource.spec.template.spec.containers: if c.name == container: container_found = True # Create resources object if it doesn't exist if not hasattr(c, 'resources') or c.resources is None: c.resources = client.V1ResourceRequirements( requests={}, limits={} ) # Update resource requests if not hasattr(c.resources, 'requests') or c.resources.requests is None: c.resources.requests = {} if memory_request: c.resources.requests['memory'] = memory_request if cpu_request: c.resources.requests['cpu'] = cpu_request # Update resource limits if not hasattr(c.resources, 'limits') or c.resources.limits is None: c.resources.limits = {} if memory_limit: c.resources.limits['memory'] = memory_limit if cpu_limit: c.resources.limits['cpu'] = cpu_limit break if not container_found: raise ValueError(f"Container '{container}' not found in {resource_type} '{name}'") # Update the resource with the modified container if resource_type == "deployment": result = apps_v1.patch_namespaced_deployment( name=name, namespace=namespace, body=current_resource ) elif resource_type == "statefulset": result = apps_v1.patch_namespaced_stateful_set( name=name, namespace=namespace, body=current_resource ) elif resource_type == "daemonset": result = apps_v1.patch_namespaced_daemon_set( name=name, namespace=namespace, body=current_resource ) # Return information about the updated resource return { "resource_type": resource_type, "name": result.metadata.name, "namespace": result.metadata.namespace, "container": container, "resources": { "requests": { "memory": memory_request or "unchanged", "cpu": cpu_request or "unchanged" }, "limits": { "memory": memory_limit or "unchanged", "cpu": cpu_limit or "unchanged" } }, "message": f"Resource {resource_type}/{name} container {container} updated with new resource constraints" } except client.rest.ApiException as e: if e.status == 404: raise RuntimeError(f"{resource_type.capitalize()} '{name}' not found in namespace '{namespace}'") raise RuntimeError(f"Failed to update resources: {str(e)}") except Exception as e: raise RuntimeError(f"Failed to update resources: {str(e)}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/feibai406/k8s-multicluster-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server