Skip to main content
Glama

Kubectl MCP Tool

test_utils.py (12.1 kB)
#!/usr/bin/env python3
"""
Test utilities for kubectl-mcp-tool MCP testing.

Provides functions for test setup, validation, and cleanup.

Every helper that shells out to ``kubectl`` is short-circuited when the
environment variable ``MCP_TEST_MOCK_MODE`` is set to ``"1"`` at import time.
"""

import os
import time
import json
import uuid
import logging
import subprocess
from typing import Dict, List, Any, Tuple, Optional, Union, Callable
from contextlib import contextmanager

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Check if we're running in mock mode
MOCK_MODE = os.environ.get("MCP_TEST_MOCK_MODE", "0") == "1"


def setup_test_namespace(prefix="mcp-test-", max_retries=5, wait_time=3) -> str:
    """
    Create a test namespace with retry logic.

    The namespace name is ``prefix`` followed by eight random hex characters.
    The same name is reused for every retry attempt.

    Args:
        prefix: Prefix for the namespace name
        max_retries: Maximum number of retry attempts
        wait_time: Time to wait between retries (seconds)

    Returns:
        The name of the created namespace

    Raises:
        RuntimeError: If namespace creation fails after all retries
    """
    namespace_name = f"{prefix}{uuid.uuid4().hex[:8]}"

    if MOCK_MODE:
        # In mock mode, just return a fake namespace name
        return namespace_name

    for attempt in range(max_retries):
        logger.info(f"Creating namespace {namespace_name} (attempt {attempt+1}/{max_retries})")
        try:
            subprocess.run(
                ["kubectl", "create", "namespace", namespace_name],
                capture_output=True,
                check=True,
                text=True
            )
        except subprocess.CalledProcessError as e:
            logger.warning(f"Failed to create namespace: {e.stderr}")
            if attempt < max_retries - 1:
                logger.info(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                # Chain the kubectl failure so the root cause stays in the traceback.
                raise RuntimeError(
                    f"Failed to create namespace after {max_retries} attempts"
                ) from e
        else:
            logger.info(f"Namespace {namespace_name} created successfully")
            return namespace_name

    # Only reachable when max_retries <= 0 (the loop body never runs).
    raise RuntimeError("Unexpected error in namespace creation")


def cleanup_test_namespace(namespace: str) -> bool:
    """
    Delete a test namespace.

    Deletion is requested with ``--wait=false``, so a True result means the
    delete command succeeded, not that the namespace is already gone.

    Args:
        namespace: The namespace to delete

    Returns:
        True if successful, False otherwise
    """
    if MOCK_MODE:
        # In mock mode, just pretend we deleted it
        logger.info(f"[MOCK] Deleted namespace {namespace}")
        return True

    logger.info(f"Deleting namespace {namespace}")
    try:
        subprocess.run(
            ["kubectl", "delete", "namespace", namespace, "--wait=false"],
            capture_output=True,
            check=True,
            text=True
        )
    except subprocess.CalledProcessError as e:
        logger.error(f"Failed to delete namespace: {e.stderr}")
        return False

    logger.info(f"Namespace {namespace} deletion initiated")
    return True


@contextmanager
def namespace_context(prefix="mcp-test-"):
    """
    Context manager for creating and cleaning up a test namespace.

    Cleanup runs even when the body of the ``with`` block raises.

    Args:
        prefix: Prefix for the namespace name

    Yields:
        The name of the created namespace
    """
    namespace = setup_test_namespace(prefix)
    try:
        yield namespace
    finally:
        cleanup_test_namespace(namespace)


def deploy_test_resources(namespace: str, yaml_file: str) -> bool:
    """
    Deploy test resources from a YAML file to a namespace.

    Runs ``kubectl apply -f <yaml_file> -n <namespace>``.

    Args:
        namespace: The namespace to deploy to
        yaml_file: Path to the YAML file containing resources

    Returns:
        True if successful, False otherwise
    """
    if MOCK_MODE:
        # In mock mode, just pretend we deployed it
        logger.info(f"[MOCK] Deployed resources from {yaml_file} to {namespace}")
        return True

    logger.info(f"Deploying resources from {yaml_file} to {namespace}")
    try:
        subprocess.run(
            ["kubectl", "apply", "-f", yaml_file, "-n", namespace],
            capture_output=True,
            check=True,
            text=True
        )
    except subprocess.CalledProcessError as e:
        logger.error(f"Failed to deploy resources: {e.stderr}")
        return False

    logger.info(f"Resources deployed successfully to {namespace}")
    return True


def wait_for_pod_ready(pod_name: str, namespace: str, timeout: int = 60) -> bool:
    """
    Wait for a pod to be ready.

    Polls ``kubectl get pod`` every three seconds. The pod counts as ready
    when its phase is ``Running`` and every reported container status has
    ``ready == true``. A pod that reports no container statuses at all is
    treated as not ready yet.

    Args:
        pod_name: The name of the pod to wait for
        namespace: The namespace containing the pod
        timeout: Maximum time to wait in seconds

    Returns:
        True if the pod is ready, False otherwise
    """
    if MOCK_MODE:
        # In mock mode, just pretend the pod is ready
        logger.info(f"[MOCK] Pod {pod_name} is ready in namespace {namespace}")
        return True

    logger.info(f"Waiting for pod {pod_name} in namespace {namespace} to be ready")

    base_cmd = ["kubectl", "get", "pod", pod_name, "-n", namespace, "-o"]
    start_time = time.time()
    while time.time() - start_time < timeout:
        try:
            result = subprocess.run(
                base_cmd + ["jsonpath={.status.phase}"],
                capture_output=True,
                check=True,
                text=True
            )
            status = result.stdout.strip()

            if status == "Running":
                # Check if all containers are ready
                ready_result = subprocess.run(
                    base_cmd + ["jsonpath={.status.containerStatuses[*].ready}"],
                    capture_output=True,
                    check=True,
                    text=True
                )
                ready_flags = ready_result.stdout.strip().split()
                # all() of an empty list is True, so require at least one
                # reported container before declaring the pod ready.
                if ready_flags and all(flag == "true" for flag in ready_flags):
                    logger.info(f"Pod {pod_name} is ready")
                    return True

            logger.debug(f"Pod status: {status}, waiting...")
        except subprocess.CalledProcessError:
            logger.debug("Error checking pod status, retrying...")

        time.sleep(3)

    logger.warning(f"Timed out waiting for pod {pod_name} to be ready")
    return False


def generate_test_pod(name: str, image: str = "nginx:latest") -> Dict[str, Any]:
    """
    Generate a simple test pod definition.

    The pod has a single container named ``main`` exposing container port 80
    and is labelled with ``app=<name>`` and ``created-by=mcp-test``.

    Args:
        name: The name for the pod
        image: The container image to use

    Returns:
        A pod resource definition as a dictionary
    """
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {
            "name": name,
            "labels": {
                "app": name,
                "created-by": "mcp-test"
            }
        },
        "spec": {
            "containers": [
                {
                    "name": "main",
                    "image": image,
                    "ports": [
                        {
                            "containerPort": 80
                        }
                    ]
                }
            ]
        }
    }


def generate_test_deployment(name: str, replicas: int = 1, image: str = "nginx:latest") -> Dict[str, Any]:
    """
    Generate a simple test deployment definition.

    The selector matches on ``app=<name>``; the pod template carries the same
    label plus ``created-by=mcp-test`` and a single ``main`` container
    exposing container port 80.

    Args:
        name: The name for the deployment
        replicas: The number of replicas
        image: The container image to use

    Returns:
        A deployment resource definition as a dictionary
    """
    return {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {
            "name": name,
            "labels": {
                "app": name,
                "created-by": "mcp-test"
            }
        },
        "spec": {
            "replicas": replicas,
            "selector": {
                "matchLabels": {
                    "app": name
                }
            },
            "template": {
                "metadata": {
                    "labels": {
                        "app": name,
                        "created-by": "mcp-test"
                    }
                },
                "spec": {
                    "containers": [
                        {
                            "name": "main",
                            "image": image,
                            "ports": [
                                {
                                    "containerPort": 80
                                }
                            ]
                        }
                    ]
                }
            }
        }
    }


def validate_mcp_response(response: Dict[str, Any], schema: Optional[Dict[str, Any]] = None) -> Tuple[bool, str]:
    """
    Validate that an MCP response meets the expected format.

    A valid response is a dictionary with a ``type`` of ``"tool_call"`` or
    ``"tools"`` and a ``result`` field. For ``"tools"`` responses the result
    must be a dictionary containing a ``tools`` key. When ``schema`` is given
    (``tool_call`` responses only) the result must be a dictionary in which
    every schema key is present and is an instance of the mapped type.

    Args:
        response: The MCP response to validate
        schema: Optional mapping of result field name to expected type
            (anything accepted by ``isinstance``)

    Returns:
        Tuple of (is_valid, error_message); error_message is "" when valid
    """
    # Basic structure validation
    if not isinstance(response, dict):
        return False, "Response is not a dictionary"

    if "type" not in response:
        return False, "Response missing 'type' field"

    # Accept both tool_call and tools response types
    response_type = response["type"]
    valid_types = ["tool_call", "tools"]
    if response_type not in valid_types:
        return False, f"Expected response type to be one of {valid_types}, got '{response['type']}'"

    if "result" not in response:
        return False, f"Response of type '{response_type}' missing 'result' field"

    result = response["result"]

    if response_type == "tools":
        # A non-dict result (None, list, string, ...) cannot carry a 'tools'
        # field; report it instead of raising or matching by accident.
        if not isinstance(result, dict) or "tools" not in result:
            return False, "Response of type 'tools' missing 'tools' field in result"
        return True, ""

    # Schema validation if provided (only for tool_call responses)
    if schema:
        if not isinstance(result, dict):
            return False, "Result is not a dictionary; cannot validate against schema"

        # Simplified schema validation - in a real implementation, you might use jsonschema
        for key, expected_type in schema.items():
            if key not in result:
                return False, f"Result missing required field '{key}'"

            if not isinstance(result[key], expected_type):
                return False, f"Field '{key}' has wrong type: expected {expected_type}, got {type(result[key])}"

    return True, ""


def validate_kubernetes_state(resource_type: str, name: str, namespace: str) -> Tuple[bool, Union[Dict[str, Any], str]]:
    """
    Fetch the current state of a Kubernetes resource.

    Runs ``kubectl get <resource_type> <name> -n <namespace> -o json`` and
    parses the output. No comparison against an expected state is performed
    here; callers inspect the returned dictionary themselves.

    Args:
        resource_type: The type of resource to check (pod, deployment, etc.)
        name: The name of the resource
        namespace: The namespace containing the resource

    Returns:
        Tuple of (is_valid, actual_state_or_error): the parsed resource on
        success, or kubectl's stderr / a JSON error message on failure
    """
    if MOCK_MODE:
        # In mock mode, return a fake successful state
        logger.info(f"[MOCK] Validating {resource_type}/{name} in namespace {namespace}")
        return True, {
            "kind": resource_type.capitalize(),
            "metadata": {
                "name": name,
                "namespace": namespace
            },
            "status": {
                "phase": "Running" if resource_type == "pod" else None,
                "availableReplicas": 1 if resource_type == "deployment" else None
            }
        }

    try:
        result = subprocess.run(
            ["kubectl", "get", resource_type, name, "-n", namespace, "-o", "json"],
            capture_output=True,
            check=True,
            text=True
        )
        state = json.loads(result.stdout)
    except subprocess.CalledProcessError as e:
        return False, e.stderr
    except json.JSONDecodeError as e:
        return False, f"Invalid JSON: {e}"

    return True, state

# Add more test utilities as needed

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rohitg00/kubectl-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.