kube-mcp

by lochgeo
Verified
#!/usr/bin/env python3
"""FastMCP server that exposes a handful of Kubernetes operations as tools.

The module builds a single ``KubernetesManager`` (which loads the default
kubeconfig and tracks resources created through this server), a set of
ready-made container templates, and the MCP tools/resources that use them.
"""
import asyncio
import copy
import json
import signal
import sys
from datetime import date, datetime
from typing import Any, Dict, List, Optional

from kubernetes import client, config
from kubernetes.client import V1Container, V1ObjectMeta, V1Pod, V1PodSpec
from mcp.server.fastmcp import FastMCP
from mcp.shared.exceptions import McpError
from mcp.types import INTERNAL_ERROR, INVALID_PARAMS, ErrorData
from pydantic import BaseModel

# URI under which the namespace listing is published as an MCP resource.
NAMESPACES_URI = "k8s://namespaces"


def _json_default(value: Any) -> str:
    """Serialize the date/datetime values found in Kubernetes ``to_dict()`` output.

    ``json.dumps`` cannot encode ``datetime`` objects (for example a
    ``creation_timestamp``), so they are rendered as ISO-8601 strings. Any
    other unsupported type still raises ``TypeError`` rather than being
    silently stringified.
    """
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    raise TypeError(
        f"Object of type {type(value).__name__} is not JSON serializable"
    )


def _text_content(payload: Any) -> List[Dict[str, str]]:
    """Wrap *payload* as a one-element list holding a JSON ``text`` item."""
    return [
        {
            "type": "text",
            "text": json.dumps(payload, indent=2, default=_json_default),
        }
    ]


def _mcp_error(message: str, code: int = INTERNAL_ERROR) -> McpError:
    """Build an ``McpError`` carrying a structured ``ErrorData`` payload.

    Going through ``ErrorData`` (instead of passing a bare string) gives the
    exception both a message and a JSON-RPC error code.
    """
    return McpError(ErrorData(code=code, message=message))


# Resource tracking class
class ResourceTracker:
    """Record of one resource created through this server."""

    def __init__(self, kind: str, name: str, namespace: str):
        self.kind = kind
        self.name = name
        self.namespace = namespace
        self.created_at = datetime.now()


# Kubernetes Manager
class KubernetesManager:
    """Owns the Kubernetes API clients and the list of tracked resources."""

    def __init__(self):
        self.resources: List[ResourceTracker] = []
        # Strong references to in-flight cleanup tasks so they are not
        # garbage-collected before they finish.
        self._cleanup_tasks: set = set()
        config.load_kube_config()  # Load default kubeconfig
        self.core_api = client.CoreV1Api()
        self.apps_api = client.AppsV1Api()
        # Register signal handlers
        signal.signal(signal.SIGINT, self._handle_signal)
        signal.signal(signal.SIGTERM, self._handle_signal)

    def _handle_signal(self, signum, frame):
        """Run ``cleanup`` in response to SIGINT/SIGTERM.

        With a running event loop the cleanup is scheduled on it. Without
        one, ``asyncio.create_task`` would raise ``RuntimeError``, so the
        cleanup is run to completion synchronously and the process exits.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            asyncio.run(self.cleanup())
            raise SystemExit(128 + signum)
        # NOTE(review): as before, the signal only triggers cleanup while the
        # loop is running; it does not stop the server. Confirm whether the
        # process should also shut down here.
        task = loop.create_task(self.cleanup())
        self._cleanup_tasks.add(task)
        task.add_done_callback(self._cleanup_tasks.discard)

    async def cleanup(self):
        """Clean up all tracked resources in reverse order."""
        # delete_resource rebinds self.resources, so iterate over a snapshot.
        for resource in list(reversed(self.resources)):
            try:
                await self.delete_resource(
                    resource.kind, resource.name, resource.namespace
                )
            except Exception as e:
                # Report on stderr: stdout may be carrying the MCP protocol
                # stream when the server runs over stdio.
                print(
                    f"Failed to delete {resource.kind} {resource.name}: {e}",
                    file=sys.stderr,
                )
        self.resources.clear()

    def track_resource(self, kind: str, name: str, namespace: str):
        """Remember a newly created resource so ``cleanup`` can delete it."""
        self.resources.append(ResourceTracker(kind, name, namespace))

    def untrack_resource(self, kind: str, name: str, namespace: str):
        """Forget a tracked resource; *kind* is matched case-insensitively."""
        kind = kind.lower()
        self.resources = [
            r
            for r in self.resources
            if not (
                r.kind.lower() == kind
                and r.name == name
                and r.namespace == namespace
            )
        ]

    async def delete_resource(self, kind: str, name: str, namespace: str):
        """Delete a pod, deployment or service and drop it from tracking.

        Kinds other than those three trigger no API call; the entry is still
        removed from the tracking list.
        """
        kind = kind.lower()
        if kind == "pod":
            await asyncio.to_thread(
                self.core_api.delete_namespaced_pod, name, namespace
            )
        elif kind == "deployment":
            await asyncio.to_thread(
                self.apps_api.delete_namespaced_deployment, name, namespace
            )
        elif kind == "service":
            await asyncio.to_thread(
                self.core_api.delete_namespaced_service, name, namespace
            )
        # Tracked kinds are stored as given (e.g. "Pod"), so the comparison
        # inside untrack_resource must ignore case to find the entry.
        self.untrack_resource(kind, name, namespace)

    def get_core_api(self):
        """Return the shared ``CoreV1Api`` client."""
        return self.core_api

    def get_apps_api(self):
        """Return the shared ``AppsV1Api`` client."""
        return self.apps_api


# Container templates
container_templates: Dict[str, V1Container] = {
    "ubuntu": V1Container(
        name="main",
        image="ubuntu:latest",
        command=["/bin/bash", "-c", "sleep infinity"],
        resources=client.V1ResourceRequirements(
            limits={"cpu": "200m", "memory": "256Mi"},
            requests={"cpu": "100m", "memory": "128Mi"},
        ),
        liveness_probe=client.V1Probe(
            _exec=client.V1ExecAction(command=["cat", "/proc/1/status"]),
            initial_delay_seconds=5,
            period_seconds=10,
        ),
    ),
    "nginx": V1Container(
        name="main",
        image="nginx:latest",
        ports=[client.V1ContainerPort(container_port=80)],
        resources=client.V1ResourceRequirements(
            limits={"cpu": "200m", "memory": "256Mi"},
            requests={"cpu": "100m", "memory": "128Mi"},
        ),
        liveness_probe=client.V1Probe(
            http_get=client.V1HTTPGetAction(path="/", port=80),
            initial_delay_seconds=5,
            period_seconds=10,
        ),
        readiness_probe=client.V1Probe(
            http_get=client.V1HTTPGetAction(path="/", port=80),
            initial_delay_seconds=2,
            period_seconds=5,
        ),
    ),
    "busybox": V1Container(
        name="main",
        image="busybox:latest",
        command=["sh", "-c", "sleep infinity"],
        resources=client.V1ResourceRequirements(
            limits={"cpu": "100m", "memory": "64Mi"},
            requests={"cpu": "50m", "memory": "32Mi"},
        ),
        liveness_probe=client.V1Probe(
            _exec=client.V1ExecAction(command=["true"]),
            period_seconds=10,
        ),
    ),
    "alpine": V1Container(
        name="main",
        image="alpine:latest",
        command=["sh", "-c", "sleep infinity"],
        resources=client.V1ResourceRequirements(
            limits={"cpu": "100m", "memory": "64Mi"},
            requests={"cpu": "50m", "memory": "32Mi"},
        ),
        liveness_probe=client.V1Probe(
            _exec=client.V1ExecAction(command=["true"]),
            period_seconds=10,
        ),
    ),
}

k8s_manager = KubernetesManager()

# FastMCP Server Setup
mcp = FastMCP(name="kube-mcp")


# Define Tool Input Schemas with Pydantic
class ListPodsInput(BaseModel):
    namespace: str = "default"


class ListDeploymentsInput(BaseModel):
    namespace: str = "default"


class ListServicesInput(BaseModel):
    namespace: str = "default"


class CreatePodInput(BaseModel):
    name: str
    namespace: str
    template: str  # Will validate against container_templates keys in the tool
    command: Optional[List[str]] = None


class DeletePodInput(BaseModel):
    name: str
    namespace: str
    ignoreNotFound: bool = False


class DescribePodInput(BaseModel):
    name: str
    namespace: str


class GetLogsInput(BaseModel):
    resourceType: str
    name: Optional[str] = None
    namespace: str = "default"
    tail: Optional[int] = 100


# Define Tools
@mcp.tool()
async def list_pods(input_data: ListPodsInput):
    """List the pods in ``input_data.namespace``."""
    pods = await asyncio.to_thread(
        k8s_manager.get_core_api().list_namespaced_pod, input_data.namespace
    )
    return _text_content({"pods": [pod.to_dict() for pod in pods.items]})


@mcp.tool()
async def list_deployments(input_data: ListDeploymentsInput):
    """List the deployments in ``input_data.namespace``."""
    deployments = await asyncio.to_thread(
        k8s_manager.get_apps_api().list_namespaced_deployment,
        input_data.namespace,
    )
    return _text_content(
        {"deployments": [d.to_dict() for d in deployments.items]}
    )


@mcp.tool()
async def list_services(input_data: ListServicesInput):
    """List the services in ``input_data.namespace``."""
    services = await asyncio.to_thread(
        k8s_manager.get_core_api().list_namespaced_service,
        input_data.namespace,
    )
    return _text_content({"services": [s.to_dict() for s in services.items]})


@mcp.tool()
async def list_namespaces():
    """List all namespaces."""
    namespaces = await asyncio.to_thread(
        k8s_manager.get_core_api().list_namespace
    )
    return _text_content(
        {"namespaces": [n.to_dict() for n in namespaces.items]}
    )


@mcp.tool()
async def create_pod(input_data: CreatePodInput):
    """Create a single-container pod from one of ``container_templates``.

    Raises:
        McpError: if the template name is unknown or the API call fails.
    """
    if input_data.template not in container_templates:
        raise _mcp_error(
            f"Invalid template: {input_data.template}", INVALID_PARAMS
        )
    # Work on a copy: assigning command/args on the template itself would
    # leak this request's override into every later pod built from it.
    container = copy.copy(container_templates[input_data.template])
    if input_data.command:
        container.command = input_data.command
        container.args = None
    pod = V1Pod(
        api_version="v1",
        kind="Pod",
        metadata=V1ObjectMeta(
            name=input_data.name,
            namespace=input_data.namespace,
            labels={"mcp-managed": "true", "app": input_data.name},
        ),
        spec=V1PodSpec(containers=[container]),
    )
    try:
        response = await asyncio.to_thread(
            k8s_manager.get_core_api().create_namespaced_pod,
            input_data.namespace,
            pod,
        )
    except client.exceptions.ApiException as e:
        raise _mcp_error(f"Failed to create pod: {e}") from e
    k8s_manager.track_resource("Pod", input_data.name, input_data.namespace)
    return _text_content(
        {"podName": response.metadata.name, "status": "created"}
    )


@mcp.tool()
async def delete_pod(input_data: DeletePodInput):
    """Delete a pod, optionally treating "not found" as success.

    Raises:
        McpError: if the API call fails (other than an ignored 404).
    """
    try:
        await asyncio.to_thread(
            k8s_manager.get_core_api().delete_namespaced_pod,
            input_data.name,
            input_data.namespace,
        )
    except client.exceptions.ApiException as e:
        if input_data.ignoreNotFound and e.status == 404:
            # The pod is gone either way, so stop tracking it.
            k8s_manager.untrack_resource(
                "pod", input_data.name, input_data.namespace
            )
            return _text_content({"success": True, "status": "not_found"})
        raise _mcp_error(f"Failed to delete pod: {e}") from e
    # Keep the tracking list in sync so a later cleanup does not try to
    # delete this pod a second time.
    k8s_manager.untrack_resource("pod", input_data.name, input_data.namespace)
    return _text_content({"success": True, "status": "deleted"})


@mcp.tool()
async def describe_pod(input_data: DescribePodInput):
    """Return the full description of one pod.

    Raises:
        McpError: if the pod does not exist or the API call fails.
    """
    try:
        pod = await asyncio.to_thread(
            k8s_manager.get_core_api().read_namespaced_pod,
            input_data.name,
            input_data.namespace,
        )
    except client.exceptions.ApiException as e:
        if e.status == 404:
            raise _mcp_error("Pod not found") from e
        raise _mcp_error(f"Failed to describe pod: {e}") from e
    return _text_content(pod.to_dict())


@mcp.tool()
async def cleanup():
    """Delete every resource tracked by the manager."""
    await k8s_manager.cleanup()
    return _text_content({"success": True})


@mcp.tool()
async def list_nodes():
    """List all cluster nodes."""
    nodes = await asyncio.to_thread(k8s_manager.get_core_api().list_node)
    return _text_content({"nodes": [n.to_dict() for n in nodes.items]})


@mcp.tool()
async def get_logs(input_data: GetLogsInput):
    """Return the last ``input_data.tail`` log lines of a named pod.

    Raises:
        McpError: if the resource type is not "pod", no name is given, or
            the API call fails.
    """
    if input_data.resourceType != "pod" or not input_data.name:
        raise _mcp_error("Only pod logs supported with a name", INVALID_PARAMS)
    try:
        logs = await asyncio.to_thread(
            k8s_manager.get_core_api().read_namespaced_pod_log,
            input_data.name,
            input_data.namespace,
            tail_lines=input_data.tail,
        )
    except client.exceptions.ApiException as e:
        raise _mcp_error(f"Failed to get logs: {e}") from e
    return _text_content({"logs": {input_data.name: logs}})


@mcp.resource(NAMESPACES_URI)
async def read_namespaces():
    """Expose the namespace list as a JSON resource.

    Raises:
        McpError: if the namespaces cannot be read or serialized.
    """
    try:
        api_call = k8s_manager.get_core_api().list_namespace
        result = await asyncio.to_thread(api_call)
        return [
            {
                # The original referenced an undefined name here, which made
                # every read fail; the resource's own URI is used instead.
                "uri": NAMESPACES_URI,
                "mimeType": "application/json",
                "text": json.dumps(
                    [item.to_dict() for item in result.items],
                    indent=2,
                    default=_json_default,
                ),
            }
        ]
    except McpError:
        raise
    except Exception as e:
        raise _mcp_error(f"Failed to read resource: {e}") from e