
Multi Cluster Kubernetes MCP Server

by feibai406
diagnosis.py (16.4 kB)
from typing import Dict, Any, List, Optional
import os
import re

from kubernetes import client

from ..utils.k8s_client import KubernetesClient

# Initialize client with kubeconfig directory from environment or default
kubeconfig_dir = os.environ.get('KUBECONFIG_DIR', os.path.expanduser('~/.kube'))
k8s_client = KubernetesClient(kubeconfig_dir)


async def diagnose_k8s_application(context: str, namespace: str, app_name: str,
                                   resource_type: Optional[str] = "deployment") -> Dict[str, Any]:
    """
    Diagnose issues with a Kubernetes application by checking resource status, events, and logs.

    Args:
        context (str): Name of the Kubernetes context to use
        namespace (str): Namespace where the application is located
        app_name (str): Name of the application/deployment/statefulset
        resource_type (str, optional): Type of resource to diagnose (deployment, statefulset, etc.)

    Returns:
        Dict[str, Any]: Diagnostic information about the application

    Raises:
        RuntimeError: If there's an error accessing the Kubernetes API
    """
    try:
        api_client = k8s_client.get_api_client(context)
        core_v1 = client.CoreV1Api(api_client)
        apps_v1 = client.AppsV1Api(api_client)

        resource_type = resource_type.lower()

        result = {
            "application": app_name,
            "namespace": namespace,
            "resource_type": resource_type,
            "status": {},
            "issues": [],
            "pods": [],
            "events": []
        }

        # 1. Check resource status based on type
        try:
            if resource_type == "deployment":
                resource = apps_v1.read_namespaced_deployment(name=app_name, namespace=namespace)
                result["status"] = {
                    "replicas": {
                        "desired": resource.spec.replicas,
                        "ready": resource.status.ready_replicas or 0,
                        "available": resource.status.available_replicas or 0,
                        "unavailable": resource.status.unavailable_replicas or 0
                    },
                    "conditions": [
                        {
                            "type": condition.type,
                            "status": condition.status,
                            "reason": condition.reason,
                            "message": condition.message
                        }
                        for condition in resource.status.conditions or []
                    ]
                }

                # Check for deployment health
                if resource.status.unavailable_replicas:
                    result["issues"].append({
                        "severity": "warning",
                        "issue": f"Deployment has {resource.status.unavailable_replicas} unavailable replicas"
                    })

                if resource.status.ready_replicas != resource.spec.replicas:
                    result["issues"].append({
                        "severity": "warning",
                        "issue": f"Deployment has {resource.status.ready_replicas or 0}/{resource.spec.replicas} ready replicas"
                    })

                # Get pod selector
                pod_selector = resource.spec.selector.match_labels

            elif resource_type == "statefulset":
                resource = apps_v1.read_namespaced_stateful_set(name=app_name, namespace=namespace)
                result["status"] = {
                    "replicas": {
                        "desired": resource.spec.replicas,
                        "ready": resource.status.ready_replicas or 0,
                        "current": resource.status.current_replicas or 0,
                        "updated": resource.status.updated_replicas or 0
                    }
                }

                # Check for statefulset health
                if resource.status.ready_replicas != resource.spec.replicas:
                    result["issues"].append({
                        "severity": "warning",
                        "issue": f"StatefulSet has {resource.status.ready_replicas or 0}/{resource.spec.replicas} ready replicas"
                    })

                # Get pod selector
                pod_selector = resource.spec.selector.match_labels

            elif resource_type == "service":
                resource = core_v1.read_namespaced_service(name=app_name, namespace=namespace)
                result["status"] = {
                    "type": resource.spec.type,
                    "cluster_ip": resource.spec.cluster_ip,
                    "ports": [
                        {
                            "name": port.name,
                            "port": port.port,
                            "target_port": port.target_port,
                            "protocol": port.protocol
                        }
                        for port in resource.spec.ports or []
                    ]
                }

                # Get pod selector
                pod_selector = resource.spec.selector

            else:
                # For other resource types, try to get pods by label selector
                pod_selector = {"app": app_name}
                result["status"] = {"message": f"Using default label selector for {resource_type}: {pod_selector}"}

        except client.rest.ApiException as e:
            if e.status == 404:
                result["issues"].append({
                    "severity": "error",
                    "issue": f"{resource_type.capitalize()} '{app_name}' not found in namespace '{namespace}'"
                })
                return result
            else:
                raise

        # 2. Get pods associated with the application using selector
        selector_str = ",".join([f"{k}={v}" for k, v in pod_selector.items()])
        pods = core_v1.list_namespaced_pod(
            namespace=namespace,
            label_selector=selector_str
        )

        if not pods.items:
            result["issues"].append({
                "severity": "warning",
                "issue": f"No pods found for {resource_type}/{app_name} with selector {selector_str}"
            })

        # 3. Check pod status and look for common issues
        for pod in pods.items:
            pod_status = {
                "name": pod.metadata.name,
                "phase": pod.status.phase,
                "node": pod.spec.node_name,
                "restart_count": sum(container.restart_count for container in (pod.status.container_statuses or [])),
                "ready": all(container.ready for container in (pod.status.container_statuses or [])),
                "containers": []
            }

            # Check container status
            for container in (pod.status.container_statuses or []):
                container_status = {
                    "name": container.name,
                    "ready": container.ready,
                    "restart_count": container.restart_count,
                    "image": container.image,
                    "state": "unknown"
                }

                # Determine container state
                if container.state.running:
                    container_status["state"] = "running"
                elif container.state.waiting:
                    container_status["state"] = "waiting"
                    container_status["reason"] = container.state.waiting.reason
                    container_status["message"] = container.state.waiting.message

                    # Check for common issues
                    if container.state.waiting.reason == "CrashLoopBackOff":
                        result["issues"].append({
                            "severity": "error",
                            "issue": f"Container {container.name} in pod {pod.metadata.name} is in CrashLoopBackOff state",
                            "message": container.state.waiting.message
                        })
                    elif container.state.waiting.reason == "ImagePullBackOff":
                        result["issues"].append({
                            "severity": "error",
                            "issue": f"Container {container.name} in pod {pod.metadata.name} cannot pull image",
                            "message": container.state.waiting.message
                        })
                    elif container.state.waiting.reason == "ErrImagePull":
                        result["issues"].append({
                            "severity": "error",
                            "issue": f"Container {container.name} in pod {pod.metadata.name} has image pull error",
                            "message": container.state.waiting.message
                        })

                elif container.state.terminated:
                    container_status["state"] = "terminated"
                    container_status["reason"] = container.state.terminated.reason
                    container_status["exit_code"] = container.state.terminated.exit_code

                    # Check for OOMKilled
                    if container.state.terminated.reason == "OOMKilled":
                        result["issues"].append({
                            "severity": "error",
                            "issue": f"Container {container.name} in pod {pod.metadata.name} was terminated due to OOM (Out of Memory)",
                            "recommendation": "Consider increasing memory limits for this container"
                        })
                    # Check for other termination issues
                    elif container.state.terminated.exit_code != 0:
                        result["issues"].append({
                            "severity": "error",
                            "issue": f"Container {container.name} in pod {pod.metadata.name} terminated with exit code {container.state.terminated.exit_code}",
                            "reason": container.state.terminated.reason
                        })

                # Check for high restart count
                if container.restart_count > 5:
                    result["issues"].append({
                        "severity": "warning",
                        "issue": f"Container {container.name} in pod {pod.metadata.name} has restarted {container.restart_count} times",
                        "recommendation": "Check logs for errors"
                    })

                pod_status["containers"].append(container_status)

            # Add pod status to result
            result["pods"].append(pod_status)

        # 4. Get recent events related to the application
        events = core_v1.list_namespaced_event(
            namespace=namespace,
            field_selector=f"involvedObject.name={app_name}"
        )

        # Add additional pod events
        for pod in pods.items:
            pod_events = core_v1.list_namespaced_event(
                namespace=namespace,
                field_selector=f"involvedObject.name={pod.metadata.name}"
            )
            events.items.extend(pod_events.items)

        # Sort events by last timestamp
        sorted_events = sorted(
            events.items,
            key=lambda e: e.last_timestamp if e.last_timestamp else e.event_time,
            reverse=True
        )

        # Extract key event information
        for event in sorted_events[:10]:  # Limit to 10 most recent events
            event_info = {
                "type": event.type,
                "reason": event.reason,
                "message": event.message,
                "count": event.count,
                "object": f"{event.involved_object.kind}/{event.involved_object.name}",
                "timestamp": event.last_timestamp.isoformat() if event.last_timestamp else (
                    event.event_time.isoformat() if event.event_time else None
                )
            }

            # Flag warning/error events
            if event.type == "Warning":
                if any(error_reason in event.reason for error_reason in ["Failed", "Error", "BackOff"]):
                    result["issues"].append({
                        "severity": "error",
                        "issue": f"Event: {event.reason} for {event.involved_object.kind}/{event.involved_object.name}",
                        "message": event.message
                    })

            result["events"].append(event_info)

        # 5. Check logs for the first problematic pod if issues were found
        problematic_pods = [pod for pod in result["pods"] if not pod["ready"] or pod["restart_count"] > 0]

        if problematic_pods and len(result["issues"]) > 0:
            target_pod = problematic_pods[0]

            # Try to get logs for the first container in the problematic pod
            if target_pod["containers"]:
                target_container = target_pod["containers"][0]["name"]
                try:
                    logs = core_v1.read_namespaced_pod_log(
                        name=target_pod["name"],
                        namespace=namespace,
                        container=target_container,
                        tail_lines=50  # Get last 50 lines
                    )

                    # Look for common error patterns in logs
                    oom_pattern = re.compile(r"(Out of memory|Killed|oom-kill)")
                    permission_pattern = re.compile(r"(permission denied|unauthorized|forbidden)", re.IGNORECASE)
                    connection_pattern = re.compile(r"(connection refused|cannot connect|connect failed)", re.IGNORECASE)

                    if oom_pattern.search(logs):
                        result["issues"].append({
                            "severity": "error",
                            "issue": f"Out of Memory issue detected in logs for container {target_container}",
                            "recommendation": "Increase memory limits and/or optimize application memory usage"
                        })

                    if permission_pattern.search(logs):
                        result["issues"].append({
                            "severity": "error",
                            "issue": f"Permission or authorization issue detected in logs for container {target_container}",
                            "recommendation": "Check RBAC permissions or service account settings"
                        })

                    if connection_pattern.search(logs):
                        result["issues"].append({
                            "severity": "warning",
                            "issue": f"Connection issue detected in logs for container {target_container}",
                            "recommendation": "Check network policies, service connectivity, or firewall rules"
                        })

                    # Add logs to result
                    result["log_sample"] = {
                        "pod": target_pod["name"],
                        "container": target_container,
                        "content": logs[-2000:] if len(logs) > 2000 else logs  # Limit log size
                    }
                except Exception as log_error:
                    result["log_error"] = str(log_error)

        # 6. Add general recommendations based on issues found
        if any(issue["severity"] == "error" for issue in result["issues"]):
            result["status"]["health"] = "unhealthy"
        elif any(issue["severity"] == "warning" for issue in result["issues"]):
            result["status"]["health"] = "degraded"
        else:
            result["status"]["health"] = "healthy"

        return result

    except Exception as e:
        raise RuntimeError(f"Failed to diagnose application: {str(e)}")
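
As a rough illustration of how the tool can be driven outside the MCP server, the sketch below calls diagnose_k8s_application directly with asyncio. The import path, context, namespace, and application names are placeholders, not part of the project; only KUBECONFIG_DIR and the function signature come from the file above.

# Minimal sketch, assuming the module is importable as tools.diagnosis and
# that a kubeconfig for the named context exists under KUBECONFIG_DIR.
import asyncio
import json
import os

# k8s_client is created at import time from KUBECONFIG_DIR, so set it first.
os.environ.setdefault("KUBECONFIG_DIR", os.path.expanduser("~/.kube"))

from tools.diagnosis import diagnose_k8s_application  # import path is hypothetical


async def main():
    report = await diagnose_k8s_application(
        context="prod-cluster",    # hypothetical kubeconfig context name
        namespace="default",
        app_name="my-web-app",     # hypothetical deployment name
        resource_type="deployment",
    )
    # The result dict contains status, issues, pods, events, and optionally a log sample.
    print(json.dumps(report, indent=2, default=str))


if __name__ == "__main__":
    asyncio.run(main())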

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/feibai406/k8s-multicluster-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.