Skip to main content
Glama

Sumanshu Arora

kubernetes_probe.py (25.2 kB)
"""
Kubernetes probe for discovering MCP server tools from Kubernetes pods.

Discovery runs the MCP server image in a short-lived pod. It first speaks the
MCP stdio protocol through ``kubectl attach``. If that yields nothing, it falls
back to HTTP through a temporary ClusterIP Service. Temporary pods and services
are deleted when discovery finishes.
"""

import asyncio
import json
import logging
import re
import subprocess
import time
from typing import Any, Dict, Iterable, List, Optional

from kubernetes import client, config
from kubernetes.client.rest import ApiException
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed

from .base_probe import (
    DISCOVERY_RETRIES,
    DISCOVERY_RETRY_SLEEP,
    DISCOVERY_TIMEOUT,
    BaseProbe,
)

logger = logging.getLogger(__name__)

# Kubernetes-specific constants
POD_READY_TIMEOUT = 60
SERVICE_PORT_RANGE = (8000, 9000)

# K8s name limit: Service names and label values are at most 63 characters.
K8S_NAME_MAX_LENGTH = 63
# Seconds allowed for the whole stdio handshake over `kubectl attach`.
KUBECTL_ATTACH_TIMEOUT = 30
# Label whose value is the pod name. It ties a discovery Service to exactly
# one discovery Pod.
PROBE_POD_LABEL = "mcp-probe-pod"
# JSON-RPC id used for the tools/list request; responses are matched on it.
_TOOLS_LIST_REQUEST_ID = 2
# Any run of characters outside [a-z0-9] (applied after lower-casing).
_NAME_INVALID_CHARS = re.compile(r"[^a-z0-9]+")


class KubernetesProbe(BaseProbe):
    """Probe Kubernetes pods to discover MCP server tools.

    Probe resources are created in ``namespace`` and labelled
    ``app=mcp-tool-discovery``.
    """

    def __init__(self, namespace: str = "mcp-servers"):
        """Initialize Kubernetes probe.

        Args:
            namespace: Kubernetes namespace to use for probe operations

        Raises:
            config.ConfigException: If neither the in-cluster config nor a
                kubeconfig could be loaded.
        """
        super().__init__()
        self.namespace = namespace
        self._init_kubernetes_client()

    def _init_kubernetes_client(self) -> None:
        """Load Kubernetes configuration and create the API clients.

        Tries the in-cluster service-account config first, then the local
        kubeconfig. Re-raises the ``ConfigException`` if both fail.
        """
        try:
            # Try to load in-cluster config first
            config.load_incluster_config()
            logger.debug("Loaded in-cluster Kubernetes config")
        except config.ConfigException:
            try:
                # Fall back to kubeconfig
                config.load_kube_config()
                logger.debug("Loaded kubeconfig")
            except config.ConfigException as e:
                logger.error("Failed to load Kubernetes config: %s", e)
                raise

        self.k8s_apps_v1 = client.AppsV1Api()
        self.k8s_core_v1 = client.CoreV1Api()
        # Jobs belong to the batch/v1 API group; AppsV1Api has no Job methods.
        self.k8s_batch_v1 = client.BatchV1Api()

    def discover_tools_from_image(
        self,
        image_name: str,
        server_args: Optional[List[str]] = None,
        env_vars: Optional[Dict[str, str]] = None,
        timeout: int = DISCOVERY_TIMEOUT,
    ) -> Optional[Dict[str, Any]]:
        """
        Discover tools from MCP server Kubernetes image.

        Args:
            image_name: Container image name to probe
            server_args: Arguments to pass to the MCP server
            env_vars: Environment variables to pass to the pod
            timeout: Timeout (seconds) for the HTTP fallback (pod readiness
                and MCP connection)

        Returns:
            Dictionary containing discovered tools and metadata, or None if failed
        """
        logger.info("Discovering tools from MCP Kubernetes image: %s", image_name)

        try:
            # Try MCP stdio first
            result = self._try_mcp_stdio_discovery(image_name, server_args, env_vars)
            if result:
                return result

            # Fallback to HTTP probe (for non-standard MCP servers)
            return self._try_http_discovery(image_name, timeout, env_vars)

        except ApiException as e:
            # An ApiException is an error reported by the Kubernetes API,
            # not a timeout.
            logger.warning(
                "Kubernetes API error while discovering tools from image %s: %s",
                image_name,
                e,
            )
        except Exception as e:  # public boundary: discovery is best-effort
            logger.error("Failed to discover tools from image %s: %s", image_name, e)
        return None

    @retry(
        stop=stop_after_attempt(DISCOVERY_RETRIES),
        wait=wait_fixed(DISCOVERY_RETRY_SLEEP),
        retry=retry_if_exception_type(Exception),
        reraise=True,
    )
    def _try_mcp_stdio_discovery(
        self,
        image_name: str,
        server_args: Optional[List[str]],
        env_vars: Optional[Dict[str, str]],
    ) -> Optional[Dict[str, Any]]:
        """Try to discover tools using MCP stdio protocol via Kubernetes Pod.

        NOTE(review): every exception is caught below and turned into
        ``None``, so the ``@retry`` decorator never actually retries this
        method. Switch to ``retry_if_result`` if retry-on-failure is wanted.
        """
        try:
            args = server_args or []
            # Same MCP handshake as the Docker probe, but through kubectl attach
            result = self._discover_tools_via_kubernetes_mcp(image_name, args, env_vars)
            if result:
                logger.info(
                    "Successfully discovered tools via MCP stdio from %s", image_name
                )
                result["discovery_method"] = "kubernetes_mcp_stdio"
                return result
        except Exception as e:
            logger.debug("MCP stdio discovery failed for %s: %s", image_name, e)
        return None

    def _discover_tools_via_kubernetes_mcp(
        self,
        image_name: str,
        args: Optional[List[str]] = None,
        env_vars: Optional[Dict[str, str]] = None,
    ) -> Optional[Dict[str, Any]]:
        """
        Discover tools from MCP server running in Kubernetes pod using stdio.

        Creates a temporary pod with stdin kept open. It then runs
        ``kubectl attach -i`` to send the MCP handshake on stdin and read the
        replies from stdout. The pod is always deleted afterwards.

        Args:
            image_name: Container image name
            args: Additional arguments for the MCP server
            env_vars: Environment variables to pass to the pod

        Returns:
            Dictionary containing discovered tools and metadata, or None if failed
        """
        pod_name = self._build_resource_name("mcp-discovery", image_name)

        try:
            pod_manifest = self._create_stdio_pod_manifest(
                pod_name, image_name, args, env_vars
            )
            self.k8s_core_v1.create_namespaced_pod(
                namespace=self.namespace, body=pod_manifest
            )

            if not self._wait_for_pod_ready(pod_name, timeout=POD_READY_TIMEOUT):
                logger.error("Pod %s did not become ready", pod_name)
                return None

            return self._execute_mcp_handshake_via_kubectl(pod_name, args)

        except Exception as e:
            logger.debug("Kubernetes MCP discovery failed for %s: %s", image_name, e)
            return None
        finally:
            # Best-effort cleanup. Attempted even if creation failed (the pod
            # may exist anyway); never let a cleanup error mask the result.
            try:
                self.k8s_core_v1.delete_namespaced_pod(
                    name=pod_name, namespace=self.namespace
                )
            except Exception as e:
                logger.debug("Failed to cleanup pod %s: %s", pod_name, e)

    @retry(
        stop=stop_after_attempt(DISCOVERY_RETRIES),
        wait=wait_fixed(DISCOVERY_RETRY_SLEEP),
        retry=retry_if_exception_type(Exception),
        reraise=True,
    )
    def _try_http_discovery(
        self, image_name: str, timeout: int, env_vars: Optional[Dict[str, str]] = None
    ) -> Optional[Dict[str, Any]]:
        """Try to discover tools using HTTP endpoints with proper MCP protocol.

        Creates a pod plus a ClusterIP Service selecting only that pod. It
        waits for readiness and lists tools through ``MCPConnection``. Both
        resources are deleted in all cases.

        NOTE(review): the body catches all exceptions. The ``@retry`` therefore
        only fires when cleanup raises a non-``ApiException`` error.
        """
        pod_name = None
        service_name = None

        try:
            pod_name = self._generate_pod_name(image_name)
            service_name = self._generate_service_name(image_name)

            port = self._find_available_port()
            if not port:
                logger.error("No available ports found for service")
                return None

            if not self._create_http_pod(pod_name, image_name, port, env_vars):
                return None
            if not self._create_service(service_name, pod_name, port):
                return None
            if not self._wait_for_pod_ready(pod_name, timeout):
                return None

            # Use MCPConnection for unified HTTP discovery with FastMCP support
            service_url = self._get_service_url(service_name, port)

            async def _discover_via_mcp_connection():
                from mcp_template.core.mcp_connection import MCPConnection

                connection = MCPConnection(timeout=timeout)
                try:
                    # Use smart endpoint discovery
                    if await connection.connect_http_smart(service_url):
                        tools = await connection.list_tools()
                        if tools:
                            return tools
                finally:
                    await connection.disconnect()
                return None

            # NOTE(review): asyncio.run fails if an event loop is already
            # running in this thread.
            tools = asyncio.run(_discover_via_mcp_connection())
            if not tools:
                return None

            return {
                "tools": self._normalize_mcp_tools(tools),
                "discovery_method": "kubernetes_http_mcp",
                "timestamp": time.time(),
                "source_image": image_name,
                "pod_name": pod_name,
                "service_name": service_name,
                "port": port,
            }

        except Exception as e:
            logger.debug("HTTP discovery failed for %s: %s", image_name, e)
            return None
        finally:
            # Always cleanup resources
            if pod_name:
                self._cleanup_pod(pod_name)
            if service_name:
                self._cleanup_service(service_name)

    @staticmethod
    def _build_resource_name(prefix: str, image_name: str) -> str:
        """Build a unique, DNS-1035-compliant resource name for an image.

        The image reference is lower-cased. Every run of characters outside
        ``[a-z0-9]`` (``/``, ``:``, ``.``, ``_``, ``@``, ``-`` ...) becomes a
        single ``-``. Only the image part is truncated, so the result never
        exceeds ``K8S_NAME_MAX_LENGTH``, always keeps its ``-<unix timestamp>``
        suffix, and never ends in ``-``.

        Args:
            prefix: Lower-case name prefix starting with a letter.
            image_name: Container image reference.

        Returns:
            ``"<prefix>-<clean image>-<timestamp>"``, or
            ``"<prefix>-<timestamp>"`` when nothing of the image survives.
        """
        suffix = f"-{int(time.time())}"
        clean_name = _NAME_INVALID_CHARS.sub("-", image_name.lower()).strip("-")
        budget = K8S_NAME_MAX_LENGTH - len(prefix) - 1 - len(suffix)
        clean_name = clean_name[: max(budget, 0)].rstrip("-")
        if clean_name:
            return f"{prefix}-{clean_name}{suffix}"
        return f"{prefix}{suffix}"

    def _generate_job_name(self, image_name: str) -> str:
        """Generate unique job name."""
        return self._build_resource_name("mcp-tool-discovery-job", image_name)

    def _generate_pod_name(self, image_name: str) -> str:
        """Generate unique pod name."""
        return self._build_resource_name("mcp-tool-discovery", image_name)

    def _generate_service_name(self, image_name: str) -> str:
        """Generate unique service name (must be a DNS-1035 label)."""
        return self._build_resource_name("mcp-discovery-svc", image_name)

    def _find_available_port(self) -> Optional[int]:
        """Return the port used for the discovery service.

        The ClusterIP Service and pod are created per probe, so a fixed port
        from ``SERVICE_PORT_RANGE`` is used rather than a free-port search.
        """
        return SERVICE_PORT_RANGE[0]

    @staticmethod
    def _build_env_list(
        env_vars: Optional[Dict[str, str]], exclude: Iterable[str] = ()
    ) -> List[Dict[str, str]]:
        """Convert an env mapping into a list of Kubernetes ``EnvVar`` dicts.

        Args:
            env_vars: Variables to pass; ``None`` or empty yields ``[]``.
            exclude: Variable names to drop (e.g. ones the caller overrides).

        Returns:
            ``[{"name": ..., "value": ...}, ...]`` in mapping order. Values are
            coerced to ``str`` (``None`` becomes ``""``) because the API only
            accepts string values.
        """
        if not env_vars:
            return []
        skipped = frozenset(exclude)
        return [
            {"name": name, "value": "" if value is None else str(value)}
            for name, value in env_vars.items()
            if name not in skipped
        ]

    def _create_discovery_job_manifest(
        self,
        job_name: str,
        image_name: str,
        args: List[str],
        env_vars: Optional[Dict[str, str]],
    ) -> Dict[str, Any]:
        """Create a Kubernetes Job manifest for MCP stdio discovery."""
        return {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {
                "name": job_name,
                "namespace": self.namespace,
                "labels": {"app": "mcp-tool-discovery", "type": "stdio-probe"},
            },
            "spec": {
                "ttlSecondsAfterFinished": 300,  # Cleanup after 5 minutes
                "backoffLimit": 1,
                "template": {
                    "spec": {
                        "restartPolicy": "Never",
                        "containers": [
                            {
                                "name": "mcp-stdio-probe",
                                "image": image_name,
                                "args": args,
                                "env": self._build_env_list(env_vars),
                                "resources": {
                                    "requests": {"memory": "64Mi", "cpu": "100m"},
                                    "limits": {"memory": "256Mi", "cpu": "500m"},
                                },
                            }
                        ],
                    }
                },
            },
        }

    def _create_http_pod(
        self,
        pod_name: str,
        image_name: str,
        port: int,
        env_vars: Optional[Dict[str, str]],
    ) -> bool:
        """Create pod with HTTP server (fallback method).

        Returns:
            True if the API accepted the pod, False on ``ApiException``.
        """
        pod_manifest = {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": pod_name,
                "namespace": self.namespace,
                "labels": {
                    "app": "mcp-tool-discovery",
                    "type": "http-probe",
                    # Unique per pod so this probe's Service selects only it
                    # and never a concurrent probe's pod.
                    PROBE_POD_LABEL: pod_name,
                },
            },
            "spec": {
                "restartPolicy": "Never",
                "containers": [
                    {
                        "name": "mcp-http-probe",
                        "image": image_name,
                        "ports": [{"containerPort": port}],
                        "env": self._build_env_list(env_vars),
                        "resources": {
                            "requests": {"memory": "64Mi", "cpu": "100m"},
                            "limits": {"memory": "256Mi", "cpu": "500m"},
                        },
                    }
                ],
            },
        }

        try:
            self.k8s_core_v1.create_namespaced_pod(
                namespace=self.namespace, body=pod_manifest
            )
        except ApiException as e:
            logger.error("Failed to create pod %s: %s", pod_name, e)
            return False
        logger.debug("Pod %s created successfully", pod_name)
        return True

    def _create_service(self, service_name: str, pod_name: str, port: int) -> bool:
        """Create a ClusterIP service exposing exactly the pod ``pod_name``.

        The selector includes the per-pod ``PROBE_POD_LABEL`` set by
        ``_create_http_pod``. Without it the service would load-balance across
        every HTTP discovery pod in the namespace.

        Returns:
            True if the API accepted the service, False on ``ApiException``.
        """
        service_manifest = {
            "apiVersion": "v1",
            "kind": "Service",
            "metadata": {
                "name": service_name,
                "namespace": self.namespace,
                "labels": {"app": "mcp-tool-discovery"},
            },
            "spec": {
                "selector": {
                    "app": "mcp-tool-discovery",
                    "type": "http-probe",
                    PROBE_POD_LABEL: pod_name,
                },
                "ports": [{"protocol": "TCP", "port": port, "targetPort": port}],
                "type": "ClusterIP",
            },
        }

        try:
            self.k8s_core_v1.create_namespaced_service(
                namespace=self.namespace, body=service_manifest
            )
        except ApiException as e:
            logger.error("Failed to create service %s: %s", service_name, e)
            return False
        logger.debug("Service %s created successfully", service_name)
        return True

    def _wait_for_job_completion(
        self, job_name: str, timeout: int
    ) -> Optional[Dict[str, Any]]:
        """Poll a Job once per second until it succeeds, fails, or times out.

        Returns:
            Tools parsed from the job's logs on success, otherwise None.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                # Jobs are served by BatchV1Api, not AppsV1Api.
                job = self.k8s_batch_v1.read_namespaced_job_status(
                    name=job_name, namespace=self.namespace
                )
            except ApiException as e:
                logger.debug("Error checking job status: %s", e)
                return None

            status = job.status
            if status is not None and status.succeeded:
                # Job completed successfully, get logs
                return self._extract_mcp_tools_from_job_logs(job_name)
            if status is not None and status.failed:
                logger.debug("Job %s failed", job_name)
                return None
            time.sleep(1)

        logger.warning("Job %s did not complete within %d seconds", job_name, timeout)
        return None

    def _wait_for_pod_ready(self, pod_name: str, timeout: int) -> bool:
        """Poll a pod once per second until all its containers are ready.

        Returns:
            True once the pod is Running with every container ready. False if
            the pod reaches Failed/Succeeded, the API errors, or ``timeout``
            seconds elapse.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                pod = self.k8s_core_v1.read_namespaced_pod_status(
                    name=pod_name, namespace=self.namespace
                )
            except ApiException as e:
                logger.debug("Error checking pod status: %s", e)
                return False

            status = pod.status
            phase = status.phase if status is not None else None
            if phase == "Running":
                container_statuses = status.container_statuses
                if container_statuses and all(c.ready for c in container_statuses):
                    logger.debug("Pod %s is ready", pod_name)
                    return True
            elif phase in ("Failed", "Succeeded"):
                logger.debug("Pod %s finished with phase %s", pod_name, phase)
                return False
            time.sleep(1)

        logger.warning(
            "Pod %s did not become ready within %d seconds",
            pod_name,
            timeout,
        )
        return False

    def _extract_mcp_tools_from_job_logs(
        self, job_name: str
    ) -> Optional[Dict[str, Any]]:
        """Fetch the logs of the job's first pod and parse tools from them."""
        try:
            # Pods created by a Job carry the `job-name` label.
            pods = self.k8s_core_v1.list_namespaced_pod(
                namespace=self.namespace, label_selector=f"job-name={job_name}"
            )
            if not pods.items:
                logger.debug("No pods found for job %s", job_name)
                return None

            pod_name = pods.items[0].metadata.name
            logs = self.k8s_core_v1.read_namespaced_pod_log(
                name=pod_name, namespace=self.namespace
            )
        except ApiException as e:
            logger.debug("Failed to get job logs: %s", e)
            return None

        # Simplified: expects the tool list printed as a JSON line.
        return self._parse_mcp_tools_from_logs(logs)

    def _parse_mcp_tools_from_logs(self, logs: str) -> Optional[Dict[str, Any]]:
        """Return the first JSON object line in ``logs`` that has a "tools" key.

        Lines that are not valid JSON are skipped rather than aborting the
        scan. Returns None when no such line exists.
        """
        if not logs:
            return None

        for raw_line in logs.splitlines():
            line = raw_line.strip()
            if not line.startswith("{") or "tools" not in line:
                continue
            try:
                parsed = json.loads(line)
            except json.JSONDecodeError as e:
                logger.debug("Failed to parse tools from logs: %s", e)
                continue
            if isinstance(parsed, dict) and "tools" in parsed:
                return parsed
        return None

    def _get_service_url(self, service_name: str, port: int) -> str:
        """Get the in-cluster DNS URL for accessing the service."""
        return f"http://{service_name}.{self.namespace}.svc.cluster.local:{port}"

    def _cleanup_job(self, job_name: str):
        """Delete the discovery job (and, in the background, its pods)."""
        try:
            # Jobs are served by BatchV1Api, not AppsV1Api.
            self.k8s_batch_v1.delete_namespaced_job(
                name=job_name, namespace=self.namespace, propagation_policy="Background"
            )
            logger.debug("Cleaned up job %s", job_name)
        except ApiException as e:
            logger.debug("Failed to cleanup job %s: %s", job_name, e)

    def _cleanup_pod(self, pod_name: str):
        """Clean up the discovery pod; API errors are logged and ignored."""
        try:
            self.k8s_core_v1.delete_namespaced_pod(
                name=pod_name, namespace=self.namespace
            )
            logger.debug("Cleaned up pod %s", pod_name)
        except ApiException as e:
            logger.debug("Failed to cleanup pod %s: %s", pod_name, e)

    def _cleanup_service(self, service_name: str):
        """Clean up the discovery service; API errors are logged and ignored."""
        try:
            self.k8s_core_v1.delete_namespaced_service(
                name=service_name, namespace=self.namespace
            )
            logger.debug("Cleaned up service %s", service_name)
        except ApiException as e:
            logger.debug("Failed to cleanup service %s: %s", service_name, e)

    def _create_stdio_pod_manifest(
        self,
        pod_name: str,
        image_name: str,
        args: Optional[List[str]],
        env_vars: Optional[Dict[str, str]],
    ) -> Dict[str, Any]:
        """Create a Kubernetes Pod manifest for MCP stdio discovery.

        ``MCP_TRANSPORT=stdio`` is always set, replacing any caller value. The
        container keeps stdin open (``stdin``/``stdinOnce``, no TTY) so
        ``kubectl attach -i`` can feed it the MCP handshake.
        """
        env_list = self._build_env_list(env_vars, exclude=("MCP_TRANSPORT",))
        # Force stdio mode; a caller-supplied value was dropped above so the
        # container does not get a duplicate definition.
        env_list.append({"name": "MCP_TRANSPORT", "value": "stdio"})

        container_spec: Dict[str, Any] = {
            "name": "mcp-stdio-probe",
            "image": image_name,
            "env": env_list,
            "stdin": True,
            "stdinOnce": True,
            "tty": False,
            "resources": {
                "requests": {"memory": "64Mi", "cpu": "100m"},
                "limits": {"memory": "256Mi", "cpu": "500m"},
            },
        }
        if args:
            container_spec["args"] = args

        return {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": pod_name,
                "namespace": self.namespace,
                "labels": {"app": "mcp-tool-discovery", "type": "stdio-probe"},
            },
            "spec": {
                "restartPolicy": "Never",
                "containers": [container_spec],
            },
        }

    def _execute_mcp_handshake_via_kubectl(
        self, pod_name: str, args: Optional[List[str]]
    ) -> Optional[Dict[str, Any]]:
        """Execute MCP protocol handshake via kubectl attach to pod's stdin.

        Sends ``initialize``, ``notifications/initialized`` and ``tools/list``
        as newline-delimited JSON-RPC. It then parses the pod's stdout.

        Args:
            pod_name: Running stdio discovery pod.
            args: Unused; server arguments are already in the pod spec.

        Returns:
            Parsed tools dictionary, or None on any failure.
        """
        kubectl_cmd = ["kubectl", "attach", "-i", pod_name, "-n", self.namespace]
        logger.debug("kubectl command: %s", " ".join(kubectl_cmd))

        init_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2025-03-26",
                "capabilities": {"roots": {"listChanged": True}, "sampling": {}},
                "clientInfo": {"name": "ExampleClient", "version": "1.0.0"},
            },
        }
        initialized_notification = {
            "jsonrpc": "2.0",
            "method": "notifications/initialized",
        }
        tools_request = {
            "jsonrpc": "2.0",
            "id": _TOOLS_LIST_REQUEST_ID,
            "method": "tools/list",
        }
        mcp_input = "".join(
            json.dumps(message) + "\n"
            for message in (init_request, initialized_notification, tools_request)
        )
        logger.debug("MCP input: %s", mcp_input)

        try:
            result = subprocess.run(
                kubectl_cmd,
                input=mcp_input,
                capture_output=True,
                text=True,
                timeout=KUBECTL_ATTACH_TIMEOUT,
            )
        except subprocess.TimeoutExpired:
            logger.error(
                "kubectl attach to pod %s timed out after %d seconds",
                pod_name,
                KUBECTL_ATTACH_TIMEOUT,
            )
            return None
        except Exception as e:  # e.g. kubectl binary not found (OSError)
            logger.error("Failed to execute MCP handshake via kubectl: %s", e)
            return None

        logger.debug("kubectl exit code: %s", result.returncode)
        logger.debug("kubectl stdout: %s", result.stdout)
        logger.debug("kubectl stderr: %s", result.stderr)

        if result.returncode != 0:
            logger.error("kubectl attach failed: %s", result.stderr)
            return None

        return self._parse_mcp_responses(result.stdout)

    def _parse_mcp_responses(self, output: str) -> Optional[Dict[str, Any]]:
        """Extract the tools/list result from newline-delimited JSON-RPC output.

        Non-JSON lines (e.g. kubectl banners) are skipped. Returns None when
        there is no ``tools/list`` response with a non-empty tool list.
        """
        tools: List[Any] = []
        for raw_line in (output or "").splitlines():
            line = raw_line.strip()
            if not line.startswith("{"):
                continue
            try:
                response = json.loads(line)
            except json.JSONDecodeError:
                continue
            if (
                not isinstance(response, dict)
                or response.get("id") != _TOOLS_LIST_REQUEST_ID
            ):
                continue
            payload = response.get("result")
            if isinstance(payload, dict) and "tools" in payload:
                tools = payload["tools"]
                break

        if not tools:
            return None

        try:
            normalized = self._normalize_mcp_tools(tools)
        except Exception as e:
            logger.error("Failed to parse MCP responses: %s", e)
            return None

        return {
            "discovery_method": "kubernetes_mcp_stdio",
            "timestamp": time.time(),
            "tools": normalized,
        }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Data-Everything/mcp-server-templates'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.