Skip to main content
Glama

Kubectl MCP Tool

kubectl_jsonrpc_server.py (12.3 kB)
#!/usr/bin/env python3
"""
Simple JSON-RPC based MCP server for kubectl operations.

This server uses standard JSON-RPC 2.0 to handle MCP protocol requests.
Requests are read from stdin, one JSON document per line; every response is
written to stdout as a single JSON line. Diagnostics go to stderr and to a
debug log file so that stdout carries protocol traffic only.
"""

import asyncio
import json
import os
import re
import shlex
import subprocess
import sys
import traceback
from datetime import datetime

# Create logs directory
LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs")
os.makedirs(LOG_DIR, exist_ok=True)

# Open debug log file. It is truncated on every start, stays open for the
# lifetime of the process and is closed by the __main__ guard at the bottom.
# UTF-8 is forced so logging a non-ASCII query cannot fail on platforms whose
# default encoding is narrower.
debug_file = open(
    os.path.join(LOG_DIR, "kubectl_jsonrpc_debug.log"), "w", encoding="utf-8"
)


def log(message):
    """Write a log message to the debug file and stderr."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    debug_msg = f"{timestamp} - {message}"
    print(f"DEBUG: {debug_msg}", file=sys.stderr)
    debug_file.write(f"{debug_msg}\n")
    debug_file.flush()


log("Starting kubectl JSON-RPC server")
log(f"Python version: {sys.version}")
log(f"Current directory: {os.getcwd()}")

# Server information
SERVER_NAME = "kubectl-mcp-tool"
SERVER_VERSION = "0.1.0"

# Tool descriptions advertised in the "initialize" response.
TOOL_DEFINITIONS = [
    {
        "name": "process_natural_language",
        "description": "Process natural language queries for kubectl operations",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Natural language query",
                }
            },
            "required": ["query"],
        },
    },
    {
        "name": "get_pods",
        "description": "Get all pods in the specified namespace",
        "parameters": {
            "type": "object",
            "properties": {
                "namespace": {
                    "type": "string",
                    "description": "Optional namespace (default: current namespace)",
                }
            },
        },
    },
    {
        "name": "get_namespaces",
        "description": "Get all namespaces",
        "parameters": {"type": "object", "properties": {}},
    },
    {
        "name": "run_kubectl_command",
        "description": "Run a raw kubectl command",
        "parameters": {
            "type": "object",
            "properties": {
                "command": {
                    "type": "string",
                    "description": "The kubectl command to run",
                }
            },
            "required": ["command"],
        },
    },
]

# The whole word "namespace" (not "namespaces"), an optional "=" / ":" and
# whatever follows it. The remainder is passed on after "-n".
_NAMESPACE_RE = re.compile(r"\bnamespace\b[\s=:]*(.*)", re.DOTALL)
# "all namespaces", "all-namespaces" and therefore also "--all-namespaces".
_ALL_NAMESPACES_RE = re.compile(r"\ball[\s-]+namespaces\b")


async def read_line():
    """Read a line from stdin asynchronously.

    The blocking ``readline`` call runs in the default executor so the event
    loop is not blocked. Returns an empty string at end of input.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, sys.stdin.readline)


def write_response(response):
    """Write a response to stdout as one flushed JSON line (and log it)."""
    json_str = json.dumps(response)
    log(f"Sending response: {json_str}")
    print(json_str, flush=True)


def _command_failure(command, message):
    """Build the failure result used when a command could not be run at all."""
    return {
        "command": command,
        "output": f"Error: {message}",
        "success": False,
        "error": message,
    }


def run_kubectl_command(command):
    """Run a kubectl command and return the result.

    Args:
        command: The full command line as a string, starting with
            ``kubectl`` (for example ``"kubectl get pods -n default"``).

    Returns:
        A dict with the keys ``command`` (the string that was passed in),
        ``output`` (stripped stdout on success, stripped stderr or an
        ``"Error: ..."`` text on failure) and ``success``; failures also
        carry an ``error`` string. This function does not raise.

    The command is split into arguments with ``shlex`` and executed without
    a shell. Shell metacharacters (``;``, ``|``, ``&&``, ``$(...)``) in
    client-supplied text are therefore handed to kubectl as literal
    arguments and never interpreted. Anything whose executable is not
    ``kubectl`` is rejected.
    """
    log(f"Running kubectl command: {command}")

    # shlex.split(None) would read from stdin, so reject non-strings first.
    if not isinstance(command, str):
        log(f"Rejected non-string command: {command!r}")
        return _command_failure(command, "command must be a string")

    try:
        argv = shlex.split(command)
    except ValueError as e:
        # Raised for unbalanced quotes or a dangling escape character.
        log(f"Could not parse kubectl command: {e}")
        return _command_failure(command, str(e))

    if not argv:
        log("Rejected empty command")
        return _command_failure(command, "empty command")
    if argv[0] != "kubectl":
        log(f"Rejected non-kubectl command: {argv[0]}")
        return _command_failure(command, "only kubectl commands are allowed")

    try:
        result = subprocess.run(
            argv,
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        log(f"kubectl command failed: {e}")
        return {
            "command": command,
            "output": (e.stderr or "").strip(),
            "success": False,
            "error": str(e),
        }
    except Exception as e:
        # e.g. FileNotFoundError when kubectl is not installed.
        log(f"Error running kubectl command: {e}")
        return _command_failure(command, str(e))

    return {
        "command": command,
        "output": result.stdout.strip(),
        "success": True,
    }


def process_natural_language(query):
    """Process a natural language query for kubectl operations.

    Recognised phrases (matched case-insensitively, first match wins):

    * ``get pods`` -> ``kubectl get pods``; ``namespace <rest>`` adds
      ``-n <rest>`` and ``all namespaces`` adds ``--all-namespaces``
    * ``namespaces`` -> ``kubectl get namespaces``
    * ``current context`` -> ``kubectl config current-context``

    Anything else is passed to kubectl as its argument string.

    Args:
        query: The query text.

    Returns:
        The result dict produced by ``run_kubectl_command``.
    """
    log(f"Processing natural language query: {query}")

    query_lower = query.lower()

    if "get pods" in query_lower:
        command = "kubectl get pods"
        # Match the whole word only: a plain substring test would turn
        # "all namespaces" into the bogus namespace "s".
        explicit = _NAMESPACE_RE.search(query_lower)
        namespace = explicit.group(1).strip() if explicit else ""
        if namespace:
            command = f"kubectl get pods -n {namespace}"
        elif _ALL_NAMESPACES_RE.search(query_lower):
            command = "kubectl get pods --all-namespaces"
    elif "namespaces" in query_lower:
        command = "kubectl get namespaces"
    elif "current context" in query_lower:
        command = "kubectl config current-context"
    else:
        # Try as direct kubectl command
        command = f"kubectl {query}"

    return run_kubectl_command(command)


def _result_response(request_id, result):
    """Build a JSON-RPC 2.0 success response."""
    return {"jsonrpc": "2.0", "id": request_id, "result": result}


def _error_response(request_id, code, message):
    """Build a JSON-RPC 2.0 error response."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": message},
    }


def _handle_call_tool(request_id, params):
    """Execute the tool named in a "callTool" request and build the response."""
    tool_name = params.get("name")
    tool_params = params.get("parameters", {})
    log(f"Tool call: {tool_name} with params {tool_params}")

    if tool_name == "process_natural_language":
        result = process_natural_language(tool_params.get("query", ""))
    elif tool_name == "get_pods":
        namespace = tool_params.get("namespace", "")
        cmd = "kubectl get pods"
        if namespace:
            # Quote so the namespace always reaches kubectl as ONE argument
            # and cannot smuggle in additional flags.
            cmd += f" -n {shlex.quote(str(namespace))}"
        result = run_kubectl_command(cmd)
    elif tool_name == "get_namespaces":
        result = run_kubectl_command("kubectl get namespaces")
    elif tool_name == "run_kubectl_command":
        result = run_kubectl_command(tool_params.get("command", ""))
    else:
        return _error_response(request_id, -32601, f"Unknown tool: {tool_name}")

    return _result_response(request_id, result)


def _handle_request(request):
    """Dispatch one decoded message.

    Returns:
        ``(response, stop)`` where ``stop`` is True when the server loop
        should end after the response has been sent.
    """
    if not isinstance(request, dict):
        # Valid JSON that is not an object (array, number, string, ...).
        # There is no id to echo, so the response id is null.
        log(f"Invalid request (not a JSON object): {request!r}")
        return (
            _error_response(
                None, -32600, "Invalid Request: expected a JSON object"
            ),
            False,
        )

    request_id = request.get("id", 0)
    method = request.get("method")

    if method == "initialize":
        # MCP initialization
        log("Handling initialize method")
        result = {
            "name": SERVER_NAME,
            "version": SERVER_VERSION,
            "capabilities": {"tools": TOOL_DEFINITIONS},
        }
        return _result_response(request_id, result), False

    if method == "callTool":
        # Tool call handling
        log("Handling callTool method")
        return _handle_call_tool(request_id, request.get("params", {})), False

    if method == "shutdown":
        # Handle shutdown request
        log("Received shutdown request")
        return _result_response(request_id, None), True

    # Unknown method
    log(f"Unknown method: {method}")
    return (
        _error_response(request_id, -32601, f"Method not found: {method}"),
        False,
    )


async def run_server():
    """Run the JSON-RPC server for MCP.

    Reads one request per line until stdin reaches end of input or a
    "shutdown" request has been answered. A malformed or failing request
    produces an error response and the loop continues; only an error outside
    request handling ends the loop, and it is logged as fatal.
    """
    log("Starting server loop")

    try:
        while True:
            log("Waiting for message")
            line = await read_line()

            if not line:
                log("Empty line received, exiting")
                break

            log(f"Received: {line.strip()}")

            try:
                request = json.loads(line)
            except json.JSONDecodeError as e:
                log(f"Failed to parse JSON: {e}")
                write_response(
                    _error_response(None, -32700, f"Parse error: {str(e)}")
                )
                continue

            try:
                response, stop = _handle_request(request)
                write_response(response)
            except Exception as e:
                log(f"Error processing request: {e}\n{traceback.format_exc()}")
                failed_id = request.get("id") if isinstance(request, dict) else None
                write_response(
                    _error_response(failed_id, -32603, f"Internal error: {str(e)}")
                )
            else:
                if stop:
                    break

    except Exception as e:
        log(f"Fatal error: {e}\n{traceback.format_exc()}")
    finally:
        log("Server shutting down")


if __name__ == "__main__":
    try:
        # Set env var for unbuffered output.
        # NOTE(review): Python reads this variable at interpreter start-up,
        # so here it presumably only affects child processes; responses are
        # flushed explicitly in write_response either way.
        os.environ["PYTHONUNBUFFERED"] = "1"

        # Run the server
        asyncio.run(run_server())
    except KeyboardInterrupt:
        log("Server stopped by keyboard interrupt")
    except Exception as e:
        log(f"Fatal error: {e}\n{traceback.format_exc()}")
    finally:
        debug_file.close()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rohitg00/kubectl-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.