Skip to main content
Glama

Kubectl MCP Tool

mcp_server.py40.6 kB
#!/usr/bin/env python3 """ MCP server implementation for kubectl-mcp-tool. """ import json import sys import logging import asyncio import os from typing import Dict, Any, List, Optional, Callable, Awaitable import warnings warnings.filterwarnings( "ignore", category=RuntimeWarning, message=r".*found in sys.modules after import of package.*" ) try: # Import the official MCP SDK with FastMCP from mcp.server.fastmcp import FastMCP except ImportError: logging.error("MCP SDK not found. Installing...") import subprocess try: subprocess.check_call([ sys.executable, "-m", "pip", "install", "mcp>=1.5.0" ]) from mcp.server.fastmcp import FastMCP except Exception as e: logging.error(f"Failed to install MCP SDK: {e}") raise from .natural_language import process_query # Configure logging logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") logger = logging.getLogger("mcp-server") class MCPServer: """MCP server implementation.""" def __init__(self, name: str): """Initialize the MCP server.""" self.name = name # Create a new server instance using the FastMCP API self.server = FastMCP(name=name) # Check for required dependencies self.dependencies_available = self._check_dependencies() if not self.dependencies_available: logger.warning("Some dependencies are missing. Certain operations may not work correctly.") # Register tools using the new FastMCP API self.setup_tools() def setup_tools(self): """Set up the tools for the MCP server.""" @self.server.tool() def get_pods(namespace: Optional[str] = None) -> Dict[str, Any]: """Get all pods in the specified namespace.""" try: from kubernetes import client, config config.load_kube_config() v1 = client.CoreV1Api() if namespace: pods = v1.list_namespaced_pod(namespace) else: pods = v1.list_pod_for_all_namespaces() return { "success": True, "pods": [ { "name": pod.metadata.name, "namespace": pod.metadata.namespace, "status": pod.status.phase, "ip": pod.status.pod_ip } for pod in pods.items ] } except Exception as e: logger.error(f"Error getting pods: {e}") return {"success": False, "error": str(e)} @self.server.tool() def get_namespaces() -> Dict[str, Any]: """Get all Kubernetes namespaces.""" try: from kubernetes import client, config config.load_kube_config() v1 = client.CoreV1Api() namespaces = v1.list_namespace() return { "success": True, "namespaces": [ns.metadata.name for ns in namespaces.items] } except Exception as e: logger.error(f"Error getting namespaces: {e}") return {"success": False, "error": str(e)} @self.server.tool() def get_services(namespace: Optional[str] = None) -> Dict[str, Any]: """Get all services in the specified namespace.""" try: from kubernetes import client, config config.load_kube_config() v1 = client.CoreV1Api() if namespace: services = v1.list_namespaced_service(namespace) else: services = v1.list_service_for_all_namespaces() return { "success": True, "services": [ { "name": svc.metadata.name, "namespace": svc.metadata.namespace, "type": svc.spec.type, "cluster_ip": svc.spec.cluster_ip } for svc in services.items ] } except Exception as e: logger.error(f"Error getting services: {e}") return {"success": False, "error": str(e)} @self.server.tool() def get_nodes() -> Dict[str, Any]: """Get all nodes in the cluster.""" try: from kubernetes import client, config config.load_kube_config() v1 = client.CoreV1Api() nodes = v1.list_node() return { "success": True, "nodes": [ { "name": node.metadata.name, "status": ( "Ready" if any( cond.type == "Ready" and cond.status == "True" for cond in node.status.conditions ) else "NotReady" ), "addresses": [ addr.address for addr in node.status.addresses ], } for node in nodes.items ], } except Exception as e: logger.error(f"Error getting nodes: {e}") return {"success": False, "error": str(e)} @self.server.tool() def get_configmaps(namespace: Optional[str] = None) -> Dict[str, Any]: """Get all ConfigMaps in the specified namespace.""" try: from kubernetes import client, config config.load_kube_config() v1 = client.CoreV1Api() if namespace: cms = v1.list_namespaced_config_map(namespace) else: cms = v1.list_config_map_for_all_namespaces() return { "success": True, "configmaps": [ { "name": cm.metadata.name, "namespace": cm.metadata.namespace, "data": cm.data } for cm in cms.items ] } except Exception as e: logger.error(f"Error getting ConfigMaps: {e}") return {"success": False, "error": str(e)} @self.server.tool() def get_secrets(namespace: Optional[str] = None) -> Dict[str, Any]: """Get all Secrets in the specified namespace.""" try: from kubernetes import client, config config.load_kube_config() v1 = client.CoreV1Api() if namespace: secrets = v1.list_namespaced_secret(namespace) else: secrets = v1.list_secret_for_all_namespaces() return { "success": True, "secrets": [ { "name": secret.metadata.name, "namespace": secret.metadata.namespace, "type": secret.type } for secret in secrets.items ] } except Exception as e: logger.error(f"Error getting Secrets: {e}") return {"success": False, "error": str(e)} @self.server.tool() def install_helm_chart(name: str, chart: str, namespace: str, repo: Optional[str] = None, values: Optional[dict] = None) -> Dict[str, Any]: """Install a Helm chart.""" if not self._check_helm_availability(): return {"success": False, "error": "Helm is not available on this system"} try: import subprocess, tempfile, yaml, os # Handle repo addition as a separate step if provided if repo: try: # Add the repository (assumed format: "repo_name=repo_url") repo_parts = repo.split('=') if len(repo_parts) != 2: return {"success": False, "error": "Repository format should be 'repo_name=repo_url'"} repo_name, repo_url = repo_parts repo_add_cmd = ["helm", "repo", "add", repo_name, repo_url] logger.debug(f"Running command: {' '.join(repo_add_cmd)}") subprocess.check_output(repo_add_cmd, stderr=subprocess.PIPE, text=True) # Update repositories repo_update_cmd = ["helm", "repo", "update"] logger.debug(f"Running command: {' '.join(repo_update_cmd)}") subprocess.check_output(repo_update_cmd, stderr=subprocess.PIPE, text=True) # Use the chart with repo prefix if needed if '/' not in chart: chart = f"{repo_name}/{chart}" except subprocess.CalledProcessError as e: logger.error(f"Error adding Helm repo: {e.stderr if hasattr(e, 'stderr') else str(e)}") return {"success": False, "error": f"Failed to add Helm repo: {e.stderr if hasattr(e, 'stderr') else str(e)}"} # Prepare the install command cmd = ["helm", "install", name, chart, "-n", namespace] # Create namespace if it doesn't exist try: ns_cmd = ["kubectl", "get", "namespace", namespace] subprocess.check_output(ns_cmd, stderr=subprocess.PIPE, text=True) except subprocess.CalledProcessError: logger.info(f"Namespace {namespace} not found, creating it") create_ns_cmd = ["kubectl", "create", "namespace", namespace] try: subprocess.check_output(create_ns_cmd, stderr=subprocess.PIPE, text=True) except subprocess.CalledProcessError as e: logger.error(f"Error creating namespace: {e.stderr if hasattr(e, 'stderr') else str(e)}") return {"success": False, "error": f"Failed to create namespace: {e.stderr if hasattr(e, 'stderr') else str(e)}"} # Handle values file if provided values_file = None try: if values: with tempfile.NamedTemporaryFile("w", delete=False) as f: yaml.dump(values, f) values_file = f.name cmd += ["-f", values_file] # Execute the install command logger.debug(f"Running command: {' '.join(cmd)}") result = subprocess.check_output(cmd, stderr=subprocess.PIPE, text=True) return { "success": True, "message": f"Helm chart {chart} installed as {name} in {namespace}", "details": result } except subprocess.CalledProcessError as e: error_msg = e.stderr if hasattr(e, 'stderr') else str(e) logger.error(f"Error installing Helm chart: {error_msg}") return {"success": False, "error": f"Failed to install Helm chart: {error_msg}"} finally: # Clean up the temporary values file if values_file and os.path.exists(values_file): os.unlink(values_file) except Exception as e: logger.error(f"Unexpected error installing Helm chart: {str(e)}") return {"success": False, "error": f"Unexpected error: {str(e)}"} @self.server.tool() def upgrade_helm_chart(name: str, chart: str, namespace: str, repo: Optional[str] = None, values: Optional[dict] = None) -> Dict[str, Any]: """Upgrade a Helm release.""" if not self._check_helm_availability(): return {"success": False, "error": "Helm is not available on this system"} try: import subprocess, tempfile, yaml, os # Handle repo addition as a separate step if provided if repo: try: # Add the repository (assumed format: "repo_name=repo_url") repo_parts = repo.split('=') if len(repo_parts) != 2: return {"success": False, "error": "Repository format should be 'repo_name=repo_url'"} repo_name, repo_url = repo_parts repo_add_cmd = ["helm", "repo", "add", repo_name, repo_url] logger.debug(f"Running command: {' '.join(repo_add_cmd)}") subprocess.check_output(repo_add_cmd, stderr=subprocess.PIPE, text=True) # Update repositories repo_update_cmd = ["helm", "repo", "update"] logger.debug(f"Running command: {' '.join(repo_update_cmd)}") subprocess.check_output(repo_update_cmd, stderr=subprocess.PIPE, text=True) # Use the chart with repo prefix if needed if '/' not in chart: chart = f"{repo_name}/{chart}" except subprocess.CalledProcessError as e: logger.error(f"Error adding Helm repo: {e.stderr if hasattr(e, 'stderr') else str(e)}") return {"success": False, "error": f"Failed to add Helm repo: {e.stderr if hasattr(e, 'stderr') else str(e)}"} # Prepare the upgrade command cmd = ["helm", "upgrade", name, chart, "-n", namespace] # Handle values file if provided values_file = None try: if values: with tempfile.NamedTemporaryFile("w", delete=False) as f: yaml.dump(values, f) values_file = f.name cmd += ["-f", values_file] # Execute the upgrade command logger.debug(f"Running command: {' '.join(cmd)}") result = subprocess.check_output(cmd, stderr=subprocess.PIPE, text=True) return { "success": True, "message": f"Helm release {name} upgraded with chart {chart} in {namespace}", "details": result } except subprocess.CalledProcessError as e: error_msg = e.stderr if hasattr(e, 'stderr') else str(e) logger.error(f"Error upgrading Helm chart: {error_msg}") return {"success": False, "error": f"Failed to upgrade Helm chart: {error_msg}"} finally: # Clean up the temporary values file if values_file and os.path.exists(values_file): os.unlink(values_file) except Exception as e: logger.error(f"Unexpected error upgrading Helm chart: {str(e)}") return {"success": False, "error": f"Unexpected error: {str(e)}"} @self.server.tool() def uninstall_helm_chart(name: str, namespace: str) -> Dict[str, Any]: """Uninstall a Helm release.""" if not self._check_helm_availability(): return {"success": False, "error": "Helm is not available on this system"} try: import subprocess cmd = ["helm", "uninstall", name, "-n", namespace] logger.debug(f"Running command: {' '.join(cmd)}") try: result = subprocess.check_output(cmd, stderr=subprocess.PIPE, text=True) return { "success": True, "message": f"Helm release {name} uninstalled from {namespace}", "details": result } except subprocess.CalledProcessError as e: error_msg = e.stderr if hasattr(e, 'stderr') else str(e) logger.error(f"Error uninstalling Helm chart: {error_msg}") return {"success": False, "error": f"Failed to uninstall Helm chart: {error_msg}"} except Exception as e: logger.error(f"Unexpected error uninstalling Helm chart: {str(e)}") return {"success": False, "error": f"Unexpected error: {str(e)}"} @self.server.tool() def get_rbac_roles(namespace: Optional[str] = None) -> Dict[str, Any]: """Get all RBAC roles in the specified namespace.""" try: from kubernetes import client, config config.load_kube_config() rbac = client.RbacAuthorizationV1Api() if namespace: roles = rbac.list_namespaced_role(namespace) else: roles = rbac.list_role_for_all_namespaces() return { "success": True, "roles": [role.metadata.name for role in roles.items] } except Exception as e: logger.error(f"Error getting RBAC roles: {e}") return {"success": False, "error": str(e)} @self.server.tool() def get_cluster_roles() -> Dict[str, Any]: """Get all cluster-wide RBAC roles.""" try: from kubernetes import client, config config.load_kube_config() rbac = client.RbacAuthorizationV1Api() roles = rbac.list_cluster_role() return { "success": True, "cluster_roles": [role.metadata.name for role in roles.items] } except Exception as e: logger.error(f"Error getting cluster roles: {e}") return {"success": False, "error": str(e)} @self.server.tool() def get_events(namespace: Optional[str] = None) -> Dict[str, Any]: """Get all events in the specified namespace.""" try: from kubernetes import client, config config.load_kube_config() v1 = client.CoreV1Api() if namespace: events = v1.list_namespaced_event(namespace) else: events = v1.list_event_for_all_namespaces() return { "success": True, "events": [ { "name": event.metadata.name, "namespace": event.metadata.namespace, "type": event.type, "reason": event.reason, "message": event.message } for event in events.items ] } except Exception as e: logger.error(f"Error getting events: {e}") return {"success": False, "error": str(e)} @self.server.tool() def get_resource_usage(namespace: Optional[str] = None) -> Dict[str, Any]: """Get resource usage statistics via kubectl top.""" if not self._check_kubectl_availability(): return {"success": False, "error": "kubectl is not available on this system"} try: import subprocess import json # Get pod resource usage pod_cmd = ["kubectl", "top", "pods", "--no-headers"] if namespace: pod_cmd += ["-n", namespace] else: pod_cmd += ["--all-namespaces"] pod_cmd += ["-o", "json"] # If the cluster doesn't support JSON output format for top command, # fall back to parsing text output try: pod_output = subprocess.check_output(pod_cmd, stderr=subprocess.PIPE, text=True) pod_data = json.loads(pod_output) except (subprocess.CalledProcessError, json.JSONDecodeError): # Fall back to text output and manual parsing pod_cmd = ["kubectl", "top", "pods"] if namespace: pod_cmd += ["-n", namespace] else: pod_cmd += ["--all-namespaces"] pod_output = subprocess.check_output(pod_cmd, stderr=subprocess.PIPE, text=True) pod_data = {"text_output": pod_output} # Get node resource usage try: node_cmd = ["kubectl", "top", "nodes", "--no-headers", "-o", "json"] node_output = subprocess.check_output(node_cmd, stderr=subprocess.PIPE, text=True) node_data = json.loads(node_output) except (subprocess.CalledProcessError, json.JSONDecodeError): # Fall back to text output node_cmd = ["kubectl", "top", "nodes"] node_output = subprocess.check_output(node_cmd, stderr=subprocess.PIPE, text=True) node_data = {"text_output": node_output} return { "success": True, "pod_usage": pod_data, "node_usage": node_data } except subprocess.CalledProcessError as e: error_msg = e.stderr if hasattr(e, 'stderr') else str(e) logger.error(f"Error getting resource usage: {error_msg}") return {"success": False, "error": f"Failed to get resource usage: {error_msg}"} except Exception as e: logger.error(f"Unexpected error getting resource usage: {str(e)}") return {"success": False, "error": f"Unexpected error: {str(e)}"} @self.server.tool() def switch_context(context_name: str) -> Dict[str, Any]: """Switch current kubeconfig context.""" try: import subprocess cmd = ["kubectl", "config", "use-context", context_name] subprocess.check_output(cmd) return {"success": True, "message": f"Switched context to {context_name}"} except Exception as e: logger.error(f"Error switching context: {e}") return {"success": False, "error": str(e)} @self.server.tool() def get_current_context() -> Dict[str, Any]: """Get current kubeconfig context.""" try: import subprocess cmd = ["kubectl", "config", "current-context"] output = subprocess.check_output(cmd, text=True).strip() return {"success": True, "context": output} except Exception as e: logger.error(f"Error getting current context: {e}") return {"success": False, "error": str(e)} @self.server.tool() def kubectl_explain(resource: str) -> Dict[str, Any]: """Explain a Kubernetes resource using kubectl explain.""" try: import subprocess cmd = ["kubectl", "explain", resource] output = subprocess.check_output(cmd, text=True) return {"success": True, "explanation": output} except Exception as e: logger.error(f"Error explaining resource: {e}") return {"success": False, "error": str(e)} @self.server.tool() def get_api_resources() -> Dict[str, Any]: """List Kubernetes API resources.""" try: import subprocess cmd = ["kubectl", "api-resources"] output = subprocess.check_output(cmd, text=True) return {"success": True, "resources": output} except Exception as e: logger.error(f"Error getting api-resources: {e}") return {"success": False, "error": str(e)} @self.server.tool() def health_check() -> Dict[str, Any]: """Check cluster health by pinging the API server.""" try: from kubernetes import client, config config.load_kube_config() v1 = client.CoreV1Api() v1.get_api_resources() return {"success": True, "message": "Cluster API is reachable"} except Exception as e: logger.error(f"Cluster health check failed: {e}") return {"success": False, "error": str(e)} @self.server.tool() def get_pod_events(pod_name: str, namespace: str = "default") -> Dict[str, Any]: """Get events for a specific pod.""" try: from kubernetes import client, config config.load_kube_config() v1 = client.CoreV1Api() field_selector = f"involvedObject.name={pod_name}" events = v1.list_namespaced_event(namespace, field_selector=field_selector) return { "success": True, "events": [ { "name": event.metadata.name, "type": event.type, "reason": event.reason, "message": event.message, "timestamp": event.last_timestamp.isoformat() if event.last_timestamp else None } for event in events.items ] } except Exception as e: logger.error(f"Error getting pod events: {e}") return {"success": False, "error": str(e)} @self.server.tool() def check_pod_health(pod_name: str, namespace: str = "default") -> Dict[str, Any]: """Check the health status of a pod.""" try: from kubernetes import client, config config.load_kube_config() v1 = client.CoreV1Api() pod = v1.read_namespaced_pod(pod_name, namespace) status = pod.status return { "success": True, "phase": status.phase, "conditions": [c.type for c in status.conditions] if status.conditions else [] } except Exception as e: logger.error(f"Error checking pod health: {e}") return {"success": False, "error": str(e)} @self.server.tool() def get_deployments(namespace: Optional[str] = None) -> Dict[str, Any]: """Get all deployments in the specified namespace.""" try: from kubernetes import client, config config.load_kube_config() apps_v1 = client.AppsV1Api() if namespace: deployments = apps_v1.list_namespaced_deployment(namespace) else: deployments = apps_v1.list_deployment_for_all_namespaces() return { "success": True, "deployments": [ { "name": d.metadata.name, "namespace": d.metadata.namespace, "replicas": d.status.replicas } for d in deployments.items ] } except Exception as e: logger.error(f"Error getting deployments: {e}") return {"success": False, "error": str(e)} @self.server.tool() def create_deployment(name: str, image: str, replicas: int, namespace: Optional[str] = "default") -> Dict[str, Any]: """Create a new deployment.""" try: from kubernetes import client, config config.load_kube_config() apps_v1 = client.AppsV1Api() deployment = client.V1Deployment( metadata=client.V1ObjectMeta(name=name), spec=client.V1DeploymentSpec( replicas=replicas, selector=client.V1LabelSelector( match_labels={"app": name} ), template=client.V1PodTemplateSpec( metadata=client.V1ObjectMeta( labels={"app": name} ), spec=client.V1PodSpec( containers=[ client.V1Container( name=name, image=image ) ] ) ) ) ) apps_v1.create_namespaced_deployment( body=deployment, namespace=namespace ) return { "success": True, "message": f"Deployment {name} created successfully" } except Exception as e: logger.error(f"Error creating deployment: {e}") return {"success": False, "error": str(e)} @self.server.tool() def delete_resource(resource_type: str, name: str, namespace: Optional[str] = "default") -> Dict[str, Any]: """Delete a Kubernetes resource.""" try: from kubernetes import client, config config.load_kube_config() if resource_type == "pod": v1 = client.CoreV1Api() v1.delete_namespaced_pod(name=name, namespace=namespace) elif resource_type == "deployment": apps_v1 = client.AppsV1Api() apps_v1.delete_namespaced_deployment(name=name, namespace=namespace) elif resource_type == "service": v1 = client.CoreV1Api() v1.delete_namespaced_service(name=name, namespace=namespace) else: return {"success": False, "error": f"Unsupported resource type: {resource_type}"} return { "success": True, "message": f"{resource_type} {name} deleted successfully" } except Exception as e: logger.error(f"Error deleting resource: {e}") return {"success": False, "error": str(e)} @self.server.tool() def get_logs(pod_name: str, namespace: Optional[str] = "default", container: Optional[str] = None, tail: Optional[int] = None) -> Dict[str, Any]: """Get logs from a pod.""" try: from kubernetes import client, config config.load_kube_config() v1 = client.CoreV1Api() logs = v1.read_namespaced_pod_log( name=pod_name, namespace=namespace, container=container, tail_lines=tail ) return { "success": True, "logs": logs } except Exception as e: logger.error(f"Error getting logs: {e}") return {"success": False, "error": str(e)} @self.server.tool() def port_forward(pod_name: str, local_port: int, pod_port: int, namespace: Optional[str] = "default") -> Dict[str, Any]: """Forward local port to pod port.""" try: import subprocess cmd = [ "kubectl", "port-forward", f"pod/{pod_name}", f"{local_port}:{pod_port}", "-n", namespace ] process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) return { "success": True, "message": f"Port forwarding started: localhost:{local_port} -> {pod_name}:{pod_port}", "process_pid": process.pid } except Exception as e: logger.error(f"Error setting up port forward: {e}") return {"success": False, "error": str(e)} @self.server.tool() def scale_deployment(name: str, replicas: int, namespace: Optional[str] = "default") -> Dict[str, Any]: """Scale a deployment.""" try: from kubernetes import client, config config.load_kube_config() apps_v1 = client.AppsV1Api() # Get the deployment deployment = apps_v1.read_namespaced_deployment( name=name, namespace=namespace ) # Update replicas deployment.spec.replicas = replicas # Apply the update apps_v1.patch_namespaced_deployment( name=name, namespace=namespace, body=deployment ) return { "success": True, "message": f"Deployment {name} scaled to {replicas} replicas" } except Exception as e: logger.error(f"Error scaling deployment: {e}") return {"success": False, "error": str(e)} def _check_dependencies(self) -> bool: """Check for required command-line tools.""" all_available = True for tool in ["kubectl", "helm"]: if not self._check_tool_availability(tool): logger.warning(f"{tool} not found in PATH. Operations requiring {tool} will not work.") all_available = False return all_available def _check_tool_availability(self, tool: str) -> bool: """Check if a specific tool is available.""" try: import subprocess, shutil # Use shutil.which for more reliable cross-platform checking if shutil.which(tool) is None: return False # Also verify it runs correctly if tool == "kubectl": subprocess.check_output([tool, "version", "--client"], stderr=subprocess.PIPE) elif tool == "helm": subprocess.check_output([tool, "version"], stderr=subprocess.PIPE) return True except (subprocess.SubprocessError, FileNotFoundError): return False def _check_kubectl_availability(self) -> bool: """Check if kubectl is available.""" return self._check_tool_availability("kubectl") def _check_helm_availability(self) -> bool: """Check if helm is available.""" return self._check_tool_availability("helm") async def serve_stdio(self): """Serve the MCP server over stdio transport.""" # Add detailed logging for debugging Cursor integration logger.info("Starting MCP server with stdio transport") logger.info(f"Working directory: {os.getcwd()}") logger.info(f"Python executable: {sys.executable}") logger.info(f"Python version: {sys.version}") # Log Kubernetes configuration kube_config = os.environ.get('KUBECONFIG', '~/.kube/config') expanded_path = os.path.expanduser(kube_config) logger.info(f"KUBECONFIG: {kube_config} (expanded: {expanded_path})") if os.path.exists(expanded_path): logger.info(f"Kubernetes config file exists at {expanded_path}") else: logger.warning(f"Kubernetes config file does not exist at {expanded_path}") # Log dependency check results logger.info(f"Dependencies check result: {'All available' if self.dependencies_available else 'Some missing'}") # Continue with normal server startup await self.server.run_stdio_async() async def serve_sse(self, port: int): """Serve the MCP server over SSE transport.""" logger.info(f"Starting MCP server with SSE transport on port {port}") try: # Newer versions of FastMCP expose a keyword argument for the port await self.server.run_sse_async(port=port) except TypeError: # Fall back to the legacy signature that takes no parameters logger.warning( "FastMCP.run_sse_async() does not accept a 'port' parameter in this version. " "Falling back to the default signature (using FastMCP's internal default port)." ) await self.server.run_sse_async() if __name__ == "__main__": import asyncio import argparse # Ensure that logging, os, sys, FastMCP, MCPServer, and the logger instance # are imported or defined earlier in the file as needed. # Based on our previous views, most of these should already be in place. parser = argparse.ArgumentParser(description="Run the Kubectl MCP Server.") parser.add_argument( "--transport", type=str, choices=["stdio", "sse"], default="stdio", help="Communication transport to use (stdio or sse). Default: stdio.", ) parser.add_argument( "--port", type=int, default=8080, help="Port to use for SSE transport. Default: 8080.", ) args = parser.parse_args() server_name = "kubectl_mcp_server" # Ensure MCPServer class is defined above this block mcp_server = MCPServer(name=server_name) # Ensure logger is defined at the module level # logger = logging.getLogger(__name__) # Or however it's set up loop = asyncio.get_event_loop() try: if args.transport == "stdio": logger.info(f"Starting {server_name} with stdio transport.") loop.run_until_complete(mcp_server.serve_stdio()) elif args.transport == "sse": logger.info(f"Starting {server_name} with SSE transport on port {args.port}.") loop.run_until_complete(mcp_server.serve_sse(port=args.port)) except KeyboardInterrupt: logger.info("Server shutdown requested by user.") except Exception as e: logger.error(f"Server exited with error: {e}", exc_info=True) finally: logger.info("Shutting down server.")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rohitg00/kubectl-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server