Skip to main content
Glama

Sumanshu Arora

kubernetes.pyβ€’35.7 kB
""" Kubernetes deployment backend for managing deployments on Kubernetes clusters. """ import logging import time import uuid from contextlib import suppress from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional from kubernetes import client, config from kubernetes.client.rest import ApiException from mcp_template.backends import BaseDeploymentBackend from mcp_template.utils.image_utils import normalize_image_name logger = logging.getLogger(__name__) class KubernetesDeploymentService(BaseDeploymentBackend): """Kubernetes deployment service for managing MCP server deployments. This service manages Kubernetes deployments using the official Kubernetes Python client. It supports dynamic pod creation, scaling, and service discovery. """ def __init__( self, namespace: str = "mcp-servers", kubeconfig_path: Optional[str] = None ): """Initialize Kubernetes service. Args: namespace: Kubernetes namespace for deployments kubeconfig_path: Path to kubeconfig file (optional) """ super().__init__() self.namespace = namespace self.kubeconfig_path = kubeconfig_path self._ensure_kubernetes_available() self._ensure_namespace_exists() @property def is_available(self): """ Ensure backend is available """ with suppress(RuntimeError): self._ensure_kubernetes_available() return True return False def _ensure_kubernetes_available(self): """Check if Kubernetes API is available and configure the client.""" try: # Try to load configuration if self.kubeconfig_path: config.load_kube_config(config_file=self.kubeconfig_path) else: try: # Try in-cluster config first (for running inside a pod) config.load_incluster_config() except config.config_exception.ConfigException: # Fall back to local kubeconfig config.load_kube_config() # Initialize API clients self.apps_v1 = client.AppsV1Api() self.core_v1 = client.CoreV1Api() self.autoscaling_v1 = client.AutoscalingV1Api() # Test connection self.core_v1.get_api_resources() logger.info("Connected to Kubernetes API") except Exception as e: logger.error(f"Failed to connect to Kubernetes API: {e}") raise RuntimeError(f"Kubernetes backend unavailable: {e}") def _ensure_namespace_exists(self, dry_run: bool = False): """Ensure the target namespace exists.""" try: if dry_run: logger.info(f"[DRY RUN] Would check/create namespace {self.namespace}") return self.core_v1.read_namespace(name=self.namespace) logger.debug(f"Namespace {self.namespace} exists") except ApiException as e: if e.status == 404: # Create namespace namespace_body = client.V1Namespace( metadata=client.V1ObjectMeta( name=self.namespace, labels={"app.kubernetes.io/managed-by": "mcp-templates"}, ) ) if dry_run: logger.info(f"[DRY RUN] Would create namespace {self.namespace}") else: self.core_v1.create_namespace(body=namespace_body) logger.info(f"Created namespace {self.namespace}") else: raise def _generate_deployment_name(self, template_id: str) -> str: """Generate a unique deployment name.""" # Use template_id as base name, add random suffix for uniqueness safe_name = template_id.lower().replace("_", "-").replace(" ", "-") # Kubernetes names must be DNS-1123 compliant safe_name = "".join(c for c in safe_name if c.isalnum() or c == "-") suffix = str(uuid.uuid4())[:8] return f"{safe_name}-{suffix}" def _create_helm_values( self, template_id: str, config: Dict[str, Any], template_data: Dict[str, Any], k8s_config: Dict[str, Any] = None, ) -> Dict[str, Any]: """Create Helm values from template configuration and Kubernetes configuration.""" # Extract image information if not k8s_config: k8s_config = self._config image_repo = template_data.get("docker_image", template_id) image_repo = normalize_image_name(image_repo) if image_repo and len(image_repo.split(":")) > 1: image_repo = ":".join(image_repo.split(":")[:-1]) image_tag = image_repo.split(":")[-1] else: image_tag = template_data.get("tag", "latest") # Determine MCP server type from transport configuration transport_config = template_data.get("transport", {}) if isinstance(transport_config, dict): # New format: {"default": "http", "supported": [...], "port": 7071} server_type = transport_config.get("default", "http") port = transport_config.get("port", 8080) else: # Legacy format: ["stdio", "http"] or just "http" server_type = "http" # Default to HTTP port = template_data.get("port", 8080) if "stdio" in transport_config: server_type = "stdio" elif "http" in transport_config: port = template_data.get("port", 8080) command = template_data.get("command", []) # Build environment variables for the deployment env_vars = config.get("env", {}).copy() # Add transport-specific environment variables if server_type == "http": env_vars["MCP_TRANSPORT"] = "http" env_vars["MCP_PORT"] = str(port) else: env_vars["MCP_TRANSPORT"] = "stdio" # Build Helm values with defaults and Kubernetes configuration overrides values = { "template_id": template_id, # Add template ID for labeling "image": { "repository": image_repo, "tag": image_tag, "pullPolicy": ( "IfNotPresent" if not template_data.get("pull_image", True) else "Always" ), }, "replicaCount": k8s_config.get("replicas", 1), "mcp": { "type": server_type, "port": port, "command": command, "env": env_vars, # Template environment variables with transport config "config": config, # Template configuration (will be passed as env vars) }, "service": { "type": k8s_config.get("service_type", "ClusterIP"), "port": port, }, "resources": k8s_config.get( "resources", { "requests": {"cpu": "100m", "memory": "128Mi"}, "limits": {"cpu": "500m", "memory": "512Mi"}, }, ), } return values def _render_helm_template( self, deployment_name: str, values: Dict[str, Any] ) -> List[Dict[str, Any]]: """Render Helm chart templates with values.""" # Get chart directory chart_dir = Path(__file__).parent.parent.parent / "charts" / "mcp-server" if not chart_dir.exists(): raise RuntimeError(f"Helm chart not found at {chart_dir}") # Load templates manifests = [] # Template context context = { "Values": values, "Chart": {"Name": "mcp-server", "Version": "0.1.0", "AppVersion": "1.0.0"}, "Release": { "Name": deployment_name, "Namespace": self.namespace, "Service": "mcp-templates", }, } # Simple template rendering (simplified for this implementation) # In a full implementation, you'd use a proper Helm template engine deployment_manifest = self._render_deployment(context) manifests.append(deployment_manifest) if values["mcp"]["type"] == "http": service_manifest = self._render_service(context) manifests.append(service_manifest) if values["mcp"]["config"]: configmap_manifest = self._render_configmap(context) manifests.append(configmap_manifest) return manifests def _render_deployment(self, context: Dict[str, Any]) -> Dict[str, Any]: """Render Deployment manifest.""" values = context["Values"] name = context["Release"]["Name"] namespace = context["Release"]["Namespace"] template_id = values.get("template_id", "unknown") # Create deployment specification deployment = { "apiVersion": "apps/v1", "kind": "Deployment", "metadata": { "name": name, "namespace": namespace, "labels": { "app.kubernetes.io/name": name, "app.kubernetes.io/instance": name, "app.kubernetes.io/managed-by": "mcp-templates", "mcp-template.io/template-name": template_id, }, }, "spec": { "replicas": values["replicaCount"], "selector": { "matchLabels": { "app.kubernetes.io/name": name, "app.kubernetes.io/instance": name, } }, "template": { "metadata": { "labels": { "app.kubernetes.io/name": name, "app.kubernetes.io/instance": name, } }, "spec": { "containers": [ { "name": "mcp-server", "image": f"{values['image']['repository']}:{values['image']['tag']}", "imagePullPolicy": values["image"]["pullPolicy"], "env": [ { "name": "MCP_SERVER_TYPE", "value": values["mcp"]["type"], }, ] + [ {"name": k, "value": str(v)} for k, v in values["mcp"]["env"].items() ], "resources": values["resources"], } ] }, }, }, } # Add HTTP-specific configuration if values["mcp"]["type"] == "http": container = deployment["spec"]["template"]["spec"]["containers"][0] container["ports"] = [ { "name": "http", "containerPort": values["mcp"]["port"], "protocol": "TCP", } ] container["env"].append( {"name": "MCP_PORT", "value": str(values["mcp"]["port"])} ) # Add stdio-specific configuration elif values["mcp"]["type"] == "stdio" and values["mcp"]["command"]: container = deployment["spec"]["template"]["spec"]["containers"][0] container["command"] = values["mcp"]["command"] return deployment def _render_service(self, context: Dict[str, Any]) -> Dict[str, Any]: """Render Service manifest.""" values = context["Values"] name = context["Release"]["Name"] namespace = context["Release"]["Namespace"] return { "apiVersion": "v1", "kind": "Service", "metadata": { "name": name, "namespace": namespace, "labels": { "app.kubernetes.io/name": name, "app.kubernetes.io/instance": name, "app.kubernetes.io/managed-by": "mcp-templates", }, }, "spec": { "type": values["service"]["type"], "ports": [ { "port": values["service"]["port"], "targetPort": "http", "protocol": "TCP", "name": "http", } ], "selector": { "app.kubernetes.io/name": name, "app.kubernetes.io/instance": name, }, }, } def _render_configmap(self, context: Dict[str, Any]) -> Dict[str, Any]: """Render ConfigMap manifest.""" values = context["Values"] name = context["Release"]["Name"] namespace = context["Release"]["Namespace"] return { "apiVersion": "v1", "kind": "ConfigMap", "metadata": { "name": f"{name}-config", "namespace": namespace, "labels": { "app.kubernetes.io/name": name, "app.kubernetes.io/instance": name, "app.kubernetes.io/managed-by": "mcp-templates", }, }, "data": {k: str(v) for k, v in values["mcp"]["config"].items()}, } def deploy_template( self, template_id: str, config: Dict[str, Any], template_data: Dict[str, Any], backend_config: Dict[str, Any], pull_image: bool = True, dry_run: bool = False, ) -> Dict[str, Any]: """Deploy a template to Kubernetes. Args: template_id: Unique identifier for the template config: Template configuration parameters (passed as env vars to container) template_data: Template metadata and configuration backend_config: Any banckend specific configuration pull_image: Whether to pull the container image before deployment dry_run: Whether to performm actual depolyment. False means yes, True means No Returns: Dict containing deployment information """ try: deployment_name = self._generate_deployment_name(template_id) logger.info(f"Deploying template {template_id} as {deployment_name}") # Create Helm values using both template config and Kubernetes config values = self._create_helm_values( template_id, config, template_data, self._config ) values["image"]["pullPolicy"] = "Always" if pull_image else "IfNotPresent" # Render manifests manifests = self._render_helm_template(deployment_name, values) # Apply manifests to cluster created_resources = [] for manifest in manifests: try: if dry_run: logger.info( f"[DRY RUN] Would create {manifest['kind']} {manifest['metadata']['name']} in namespace {manifest['metadata']['namespace']}" ) created_resources.append( (manifest["kind"], manifest["metadata"]["name"]) ) continue if manifest["kind"] == "Deployment": # Convert manifest dict to proper Kubernetes object deployment_obj = client.V1Deployment( api_version=manifest["apiVersion"], kind=manifest["kind"], metadata=client.V1ObjectMeta( name=manifest["metadata"]["name"], namespace=manifest["metadata"]["namespace"], labels=manifest["metadata"]["labels"], ), spec=client.V1DeploymentSpec( replicas=manifest["spec"]["replicas"], selector=client.V1LabelSelector( match_labels=manifest["spec"]["selector"][ "matchLabels" ] ), template=client.V1PodTemplateSpec( metadata=client.V1ObjectMeta( labels=manifest["spec"]["template"]["metadata"][ "labels" ] ), spec=client.V1PodSpec( containers=[ client.V1Container( name=container["name"], image=container["image"], image_pull_policy=container[ "imagePullPolicy" ], env=[ client.V1EnvVar( name=env["name"], value=env["value"], ) for env in container["env"] ], ports=[ client.V1ContainerPort( name=port["name"], container_port=port[ "containerPort" ], protocol=port["protocol"], ) for port in container.get( "ports", [] ) ], resources=client.V1ResourceRequirements( requests=container["resources"].get( "requests" ), limits=container["resources"].get( "limits" ), ), command=container.get("command"), ) for container in manifest["spec"][ "template" ]["spec"]["containers"] ] ), ), ), ) result = self.apps_v1.create_namespaced_deployment( namespace=self.namespace, body=deployment_obj ) created_resources.append(("Deployment", result.metadata.name)) elif manifest["kind"] == "Service": service_obj = client.V1Service( api_version=manifest["apiVersion"], kind=manifest["kind"], metadata=client.V1ObjectMeta( name=manifest["metadata"]["name"], namespace=manifest["metadata"]["namespace"], labels=manifest["metadata"]["labels"], ), spec=client.V1ServiceSpec( type=manifest["spec"]["type"], ports=[ client.V1ServicePort( name=port["name"], port=port["port"], target_port=port["targetPort"], protocol=port["protocol"], ) for port in manifest["spec"]["ports"] ], selector=manifest["spec"]["selector"], ), ) result = self.core_v1.create_namespaced_service( namespace=self.namespace, body=service_obj ) created_resources.append(("Service", result.metadata.name)) elif manifest["kind"] == "ConfigMap": configmap_obj = client.V1ConfigMap( api_version=manifest["apiVersion"], kind=manifest["kind"], metadata=client.V1ObjectMeta( name=manifest["metadata"]["name"], namespace=manifest["metadata"]["namespace"], labels=manifest["metadata"]["labels"], ), data=manifest["data"], ) result = self.core_v1.create_namespaced_config_map( namespace=self.namespace, body=configmap_obj ) created_resources.append(("ConfigMap", result.metadata.name)) except ApiException as e: logger.error(f"Failed to create {manifest['kind']}: {e}") # Cleanup any created resources self._cleanup_resources(created_resources) raise if not dry_run: # Wait for deployment to be ready self._wait_for_deployment_ready(deployment_name) # Get deployment info deployment_info = self._get_deployment_details(deployment_name) else: deployment_info = {"endpoint": None} return { "success": True, "template_id": template_id, "deployment_name": deployment_name, "deployment_id": deployment_name, "namespace": self.namespace, "status": "deployed" if not dry_run else "dry-run", "created_resources": created_resources, "endpoint": deployment_info.get("endpoint"), "replicas": values["replicaCount"], "image": f"{values['image']['repository']}:{values['image']['tag']}", "deployed_at": datetime.now().isoformat(), } except Exception as e: logger.error(f"Failed to deploy template {template_id}: {e}") return { "success": False, "template_id": template_id, "error": str(e), "deployed_at": datetime.now().isoformat(), } def _wait_for_deployment_ready(self, deployment_name: str, timeout: int = 300): """Wait for deployment to be ready.""" start_time = time.time() while time.time() - start_time < timeout: try: deployment = self.apps_v1.read_namespaced_deployment( name=deployment_name, namespace=self.namespace ) if ( deployment.status.ready_replicas and deployment.status.ready_replicas == deployment.spec.replicas ): logger.info(f"Deployment {deployment_name} is ready") return except ApiException: pass time.sleep(5) raise RuntimeError( f"Deployment {deployment_name} did not become ready within {timeout} seconds" ) def _cleanup_resources(self, resources: List[tuple]): """Cleanup created resources on failure.""" for resource_type, resource_name in resources: try: if resource_type == "Deployment": self.apps_v1.delete_namespaced_deployment( name=resource_name, namespace=self.namespace ) elif resource_type == "Service": self.core_v1.delete_namespaced_service( name=resource_name, namespace=self.namespace ) elif resource_type == "ConfigMap": self.core_v1.delete_namespaced_config_map( name=resource_name, namespace=self.namespace ) logger.info(f"Cleaned up {resource_type} {resource_name}") except ApiException as e: logger.warning( f"Failed to cleanup {resource_type} {resource_name}: {e}" ) def _get_deployment_details(self, deployment_name: str) -> Dict[str, Any]: """Get detailed deployment information.""" try: deployment = self.apps_v1.read_namespaced_deployment( name=deployment_name, namespace=self.namespace ) # Extract template name from labels template_name = deployment.metadata.labels.get( "mcp-template.io/template-name", "unknown" ) # Try to get service endpoint and ports endpoint = None ports_display = "unknown" try: service = self.core_v1.read_namespaced_service( name=deployment_name, namespace=self.namespace ) if service.spec.ports and len(service.spec.ports) > 0: svc_port = service.spec.ports[0].port if service.spec.type == "ClusterIP": endpoint = f"http://{service.metadata.name}.{self.namespace}.svc.cluster.local:{svc_port}" ports_display = str(svc_port) elif service.spec.type == "NodePort": node_port = service.spec.ports[0].node_port endpoint = f"http://localhost:{node_port}" ports_display = str(node_port) else: ports_display = str(svc_port) else: ports_display = "unknown" except ApiException: pass # Determine transport type from endpoint transport = "http" if endpoint else "stdio" return { "id": deployment.metadata.name, "name": deployment.metadata.name, "template": template_name, "namespace": deployment.metadata.namespace, "replicas": deployment.spec.replicas, "ready_replicas": deployment.status.ready_replicas or 0, "available_replicas": deployment.status.available_replicas or 0, "status": "running" if deployment.status.ready_replicas else "pending", "endpoint": endpoint, "ports": ports_display, "transport": transport, "created": ( deployment.metadata.creation_timestamp.isoformat() if deployment.metadata.creation_timestamp else None ), "backend_type": "kubernetes", } except ApiException as e: return {"error": str(e)} def list_deployments(self) -> List[Dict[str, Any]]: """List Kubernetes deployments.""" try: deployments = self.apps_v1.list_namespaced_deployment( namespace=self.namespace, label_selector="app.kubernetes.io/managed-by=mcp-templates", ) result = [] for deployment in deployments.items: details = self._get_deployment_details(deployment.metadata.name) result.append(details) return result except ApiException as e: logger.error(f"Failed to list deployments: {e}") return [] def delete_deployment(self, deployment_name: str) -> bool: """Delete a Kubernetes deployment.""" try: # Delete deployment try: self.apps_v1.delete_namespaced_deployment( name=deployment_name, namespace=self.namespace ) logger.info(f"Deleted deployment {deployment_name}") except ApiException as e: if e.status != 404: logger.warning( f"Failed to delete deployment {deployment_name}: {e}" ) # Delete service try: self.core_v1.delete_namespaced_service( name=deployment_name, namespace=self.namespace ) logger.info(f"Deleted service {deployment_name}") except ApiException as e: if e.status != 404: logger.warning(f"Failed to delete service {deployment_name}: {e}") # Delete configmap try: self.core_v1.delete_namespaced_config_map( name=f"{deployment_name}-config", namespace=self.namespace ) logger.info(f"Deleted configmap {deployment_name}-config") except ApiException as e: if e.status != 404: logger.warning( f"Failed to delete configmap {deployment_name}-config: {e}" ) return True except Exception as e: logger.error(f"Failed to delete deployment {deployment_name}: {e}") return False def stop_deployment(self, deployment_name: str, force: bool = False) -> bool: """Stop a Kubernetes deployment (scale to 0).""" try: # Scale deployment to 0 replicas deployment = self.apps_v1.read_namespaced_deployment( name=deployment_name, namespace=self.namespace ) deployment.spec.replicas = 0 self.apps_v1.patch_namespaced_deployment( name=deployment_name, namespace=self.namespace, body=deployment ) logger.info(f"Scaled deployment {deployment_name} to 0 replicas") return True except ApiException as e: logger.error(f"Failed to stop deployment {deployment_name}: {e}") return False def get_deployment_info( self, deployment_name: str, include_logs: bool = False, lines: int = 10 ) -> Dict[str, Any]: """Get detailed Kubernetes deployment information.""" try: details = self._get_deployment_details(deployment_name) if include_logs: logs_result = self.get_deployment_logs(deployment_name, lines) details["logs"] = ( logs_result.get("logs", "") if isinstance(logs_result, dict) else logs_result ) return details except Exception as e: return {"error": str(e)} def get_deployment_logs( self, deployment_name: str, lines: int = 10, follow: bool = False, since: str = None, until: str = None, ) -> str: """ Get logs from deployment pods, with support for follow, since, and until. Args: deployment_name: Name of deployment lines: Number of lines to tail follow: Stream logs if True since: Start time (RFC3339 or seconds) until: End time (RFC3339 or seconds) """ try: pods = self.core_v1.list_namespaced_pod( namespace=self.namespace, label_selector=f"app.kubernetes.io/name={deployment_name}", ) if not pods.items: return "No pods found" pod = pods.items[0] kwargs = { "name": pod.metadata.name, "namespace": self.namespace, "tail_lines": lines, } if follow: kwargs["follow"] = True if since: kwargs["since_time"] = since if until: kwargs["until_time"] = until logs = self.core_v1.read_namespaced_pod_log(**kwargs) return {"success": True, "logs": logs} except ApiException: return {"success": False, "logs": ""} def connect_to_deployment(self, deployment_id: str): """Connect to deployment shell (not implemented for Kubernetes).""" raise NotImplementedError( "Shell connection not supported for Kubernetes deployments" ) def cleanup_stopped_containers( self, template_name: Optional[str] = None ) -> Dict[str, Any]: """Clean up stopped containers (scale 0 deployments).""" try: label_selector = "app.kubernetes.io/managed-by=mcp-templates" if template_name: label_selector += f",app.kubernetes.io/name={template_name}" deployments = self.apps_v1.list_namespaced_deployment( namespace=self.namespace, label_selector=label_selector ) cleaned_up = [] total = len(deployments.items) for deployment in deployments.items: if deployment.spec.replicas == 0: if self.delete_deployment(deployment.metadata.name): cleaned_up.append(deployment.metadata.name) return { "success": total == len(cleaned_up), "cleaned_up": cleaned_up, "count": len(cleaned_up), } except Exception as e: return {"success": False, "error": str(e)} def cleanup_dangling_images(self) -> Dict[str, Any]: """Clean up dangling images (not applicable for Kubernetes).""" return {"message": "Image cleanup not applicable for Kubernetes deployments"}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Data-Everything/mcp-server-templates'

If you have feedback or need assistance with the MCP directory API, please join our Discord server