Skip to main content
Glama

Kubectl MCP Tool

test_new_features.py (6.52 kB)
#!/usr/bin/env python3
"""
Test script for the new features added to kubectl-mcp-tool
"""

import json
import sys
import time
import logging
import subprocess
from typing import Any, Dict, List, Optional

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class MCP_Client:
    """Simple client to test MCP server features.

    Talks newline-delimited JSON-RPC 2.0 over the stdin/stdout pipes of an
    already-started server process.
    """

    def __init__(self, server_process):
        """Initialize the client with a server process.

        Args:
            server_process: Object exposing text-mode ``stdin`` and ``stdout``
                streams (e.g. a ``subprocess.Popen`` created with
                ``text=True``).
        """
        self.server_process = server_process
        self.request_id = 0

    def send_request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Send a request to the MCP server and get the response.

        Blank lines, lines that are not valid JSON, JSON values that are not
        objects, and messages whose ``method`` is ``"heartbeat"`` are skipped.

        Args:
            method: JSON-RPC method name.
            params: Request parameters; defaults to an empty dict.

        Returns:
            The first decoded JSON object that is not a heartbeat.

        Raises:
            EOFError: If the server's stdout reaches end-of-file before a
                response arrives (for example because the server exited).
        """
        if params is None:
            params = {}

        self.request_id += 1
        request = {
            "jsonrpc": "2.0",
            "id": self.request_id,
            "method": method,
            "params": params
        }

        # Send the request
        payload = json.dumps(request)
        logger.info(f"Sending request: {payload}")
        self.server_process.stdin.write(payload + "\n")
        self.server_process.stdin.flush()

        # Read the response
        while True:
            raw_line = self.server_process.stdout.readline()
            if not raw_line:
                # readline() returns "" only at EOF; a blank line is "\n".
                # Without this check the loop would spin forever once the
                # server has gone away.
                raise EOFError(
                    f"MCP server closed its output before answering "
                    f"request {self.request_id} ({method})"
                )

            response_line = raw_line.strip()
            if not response_line:
                continue

            try:
                response = json.loads(response_line)
            except json.JSONDecodeError:
                logger.error(f"Failed to decode response: {response_line}")
                continue

            if not isinstance(response, dict):
                # A bare JSON scalar/array cannot be a JSON-RPC message.
                logger.error(f"Failed to decode response: {response_line}")
                continue

            # Skip heartbeat messages
            if response.get("method") == "heartbeat":
                continue

            logger.info(f"Received response: {json.dumps(response)}")
            return response

    def initialize(self) -> Dict[str, Any]:
        """Initialize the MCP connection."""
        return self.send_request("initialize")

    def list_tools(self) -> Dict[str, Any]:
        """List available tools."""
        return self.send_request("tools/list")

    def call_tool(self, name: str, arguments: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Call a tool with the given name and arguments.

        Args:
            name: Tool name to invoke.
            arguments: Tool arguments; defaults to an empty dict.

        Returns:
            The server's response to the ``tools/call`` request.
        """
        if arguments is None:
            arguments = {}

        params = {
            "name": name,
            "arguments": arguments
        }
        return self.send_request("tools/call", params)


def _result_of(response: Dict[str, Any], label: str, failures: List[str]) -> Dict[str, Any]:
    """Return the ``result`` member of *response*, recording error replies.

    When the response carries an ``error`` member, the error is logged and
    *label* is appended to *failures* so the run can report a failing status.
    """
    if "error" in response:
        logger.error(f"{label} returned an error: {response['error']}")
        failures.append(label)
    return response.get("result", {})


def _report_count(
    client: MCP_Client,
    tool: str,
    arguments: Optional[Dict[str, Any]],
    description: str,
    failures: List[str],
) -> None:
    """Call a listing tool and log the ``count`` field of its result."""
    logger.info(f"\n--- Testing {tool} ---")
    response = client.call_tool(tool, arguments)
    result = _result_of(response, tool, failures)
    count = result.get("count", 0)
    logger.info(f"Found {count} {description}")


def main() -> int:
    """Run the tests for the new features.

    Returns:
        0 when every request completed without an error reply, 1 otherwise.
    """
    logger.info("Starting MCP server...")
    # stderr is inherited rather than piped: a piped stderr that nobody reads
    # can fill up and block the server, which would hang this script.
    server_process = subprocess.Popen(
        ["./simple_kubectl_mcp.py"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
        bufsize=1
    )

    # Give the server time to start
    time.sleep(1)

    client = MCP_Client(server_process)
    failures: List[str] = []
    exit_code = 0

    try:
        # Initialize connection
        response = client.initialize()
        logger.info(f"Initialization result: {_result_of(response, 'initialize', failures)}")

        # List available tools
        response = client.list_tools()
        tools = _result_of(response, "tools/list", failures).get("tools", [])
        tool_names = [tool.get("name") for tool in tools]
        logger.info(f"Available tools: {tool_names}")

        # Test each new feature

        # 1. Test set_namespace
        logger.info("\n--- Testing set_namespace ---")
        response = client.call_tool("set_namespace", {"namespace": "kube-system"})
        logger.info(f"set_namespace result: {_result_of(response, 'set_namespace', failures)}")

        # 2. Test resource listing tools
        _report_count(client, "list_pods", {"namespace": "kube-system"},
                      "pods in kube-system namespace", failures)
        _report_count(client, "list_services", {"namespace": "kube-system"},
                      "services in kube-system namespace", failures)
        _report_count(client, "list_deployments", {"namespace": "kube-system"},
                      "deployments in kube-system namespace", failures)
        _report_count(client, "list_nodes", None, "nodes in the cluster", failures)
        _report_count(client, "list_namespaces", None, "namespaces in the cluster", failures)

        # 3. Test kubectl utilities
        logger.info("\n--- Testing explain_resource ---")
        response = client.call_tool("explain_resource", {"resource": "pods"})
        result = _result_of(response, "explain_resource", failures)
        explanation = result.get("explanation", "")
        logger.info(f"Explanation for 'pods' resource: {explanation[:100]}...")

        _report_count(client, "list_api_resources", None, "API resources", failures)

        # 4. Test Helm chart operations (skipping actual installation to avoid side effects)
        # Instead, just verify that the commands are formed correctly
        logger.info("\n--- Testing Helm chart operations ---")
        logger.info("Note: Not actually installing charts to avoid side effects")

        if failures:
            logger.error(f"Test failed: error responses from {failures}")
            exit_code = 1
        else:
            # All tests passed
            logger.info("\n--- All tests completed successfully ---")

    except Exception as e:
        # Top-level boundary: log with traceback and report failure via the
        # exit status instead of exiting 0.
        logger.exception(f"Test failed: {str(e)}")
        exit_code = 1

    finally:
        # Clean up
        logger.info("Stopping MCP server...")
        server_process.terminate()
        try:
            server_process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # The server ignored the terminate request; force it down so no
            # child process is left behind.
            server_process.kill()
            server_process.wait()

    return exit_code


if __name__ == "__main__":
    sys.exit(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rohitg00/kubectl-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.