Skip to main content
Glama

Kubectl MCP Tool

minimal_mcp_server.py7.85 kB
#!/usr/bin/env python3 """ Absolutely minimal MCP server implementation. This implements only the essential parts of the protocol to work with Cursor. """ import asyncio import json import os import sys import subprocess from datetime import datetime # Create log directory LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs") os.makedirs(LOG_DIR, exist_ok=True) # Set up logging log_file = open(os.path.join(LOG_DIR, "minimal_mcp.log"), "w") def log(message): """Log a message to both the log file and stderr.""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] formatted = f"{timestamp} - {message}" print(f"LOG: {formatted}", file=sys.stderr) log_file.write(f"{formatted}\n") log_file.flush() # Basic server info SERVER_NAME = "kubectl-mcp-minimal" SERVER_VERSION = "0.1.0" # MCP protocol implementation async def stdio_transport(): """Create an asyncio transport over stdin/stdout.""" loop = asyncio.get_event_loop() reader = asyncio.StreamReader() protocol = asyncio.StreamReaderProtocol(reader) # Handle stdin await loop.connect_read_pipe(lambda: protocol, sys.stdin) # Handle stdout w_transport, w_protocol = await loop.connect_write_pipe( asyncio.streams.FlowControlMixin, sys.stdout ) writer = asyncio.StreamWriter(w_transport, w_protocol, None, loop) return reader, writer async def read_message(reader): """Read a JSON-RPC message from stdin.""" line = await reader.readline() if not line: return None line_str = line.decode('utf-8').strip() log(f"RECEIVED: {line_str}") try: return json.loads(line_str) except json.JSONDecodeError as e: log(f"JSON parse error: {e}") return None def write_message(writer, message): """Write a JSON-RPC message to stdout.""" json_str = json.dumps(message) log(f"SENDING: {json_str}") writer.write(f"{json_str}\n".encode('utf-8')) def run_kubectl(command): """Run a kubectl command and return the output.""" log(f"Running kubectl: {command}") try: result = subprocess.run( command, shell=True, capture_output=True, text=True, check=False ) success = result.returncode == 0 output = result.stdout if success else result.stderr return { "command": command, "output": output.strip(), "success": success } except Exception as e: log(f"Error running command: {e}") return { "command": command, "output": f"Error: {str(e)}", "success": False } async def run_server(): """Run the MCP server.""" log("Starting MCP server") reader, writer = await stdio_transport() # Wait for initialize request log("Waiting for initialization message") # Handle messages while True: message = await read_message(reader) if message is None: log("Received empty message, shutting down") break method = message.get("method") message_id = message.get("id") if method == "initialize": # Handle initialization log("Handling initialization request") response = { "jsonrpc": "2.0", "id": message_id, "result": { "name": SERVER_NAME, "version": SERVER_VERSION, "capabilities": { "tools": [ { "name": "run_kubectl", "description": "Run kubectl command", "parameters": { "type": "object", "properties": { "command": { "type": "string", "description": "The kubectl command to run" } }, "required": ["command"] } }, { "name": "get_pods", "description": "Get pods in the specified namespace", "parameters": { "type": "object", "properties": { "namespace": { "type": "string", "description": "Namespace to get pods from (optional)" } } } } ] } } } write_message(writer, response) elif method == "callTool": # Handle tool calls log("Handling tool call") params = message.get("params", {}) tool_name = params.get("name", "") tool_params = params.get("parameters", {}) log(f"Tool call: {tool_name}, params: {tool_params}") if tool_name == "run_kubectl": cmd = tool_params.get("command", "") result = run_kubectl(f"kubectl {cmd}") response = { "jsonrpc": "2.0", "id": message_id, "result": result } elif tool_name == "get_pods": namespace = tool_params.get("namespace", "") cmd = "kubectl get pods" + (f" -n {namespace}" if namespace else "") result = run_kubectl(cmd) response = { "jsonrpc": "2.0", "id": message_id, "result": result } else: # Unknown tool response = { "jsonrpc": "2.0", "id": message_id, "error": { "code": -32601, "message": f"Tool not found: {tool_name}" } } write_message(writer, response) elif method == "shutdown": # Handle shutdown log("Handling shutdown request") response = { "jsonrpc": "2.0", "id": message_id, "result": None } write_message(writer, response) break else: # Unknown method log(f"Unknown method: {method}") response = { "jsonrpc": "2.0", "id": message_id, "error": { "code": -32601, "message": f"Method not found: {method}" } } write_message(writer, response) log("Shutting down server") await writer.drain() writer.close() if __name__ == "__main__": try: # Unbuffered output os.environ["PYTHONUNBUFFERED"] = "1" # Run the server log(f"Starting server, Python version: {sys.version}") log(f"Current directory: {os.getcwd()}") asyncio.run(run_server()) except KeyboardInterrupt: log("Server stopped by keyboard interrupt") except Exception as e: log(f"Fatal error: {str(e)}") finally: log_file.close()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rohitg00/kubectl-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server