Skip to main content
Glama

Multi Cluster Kubernetes MCP Server

by feibai406
describe.py54.5 kB
from typing import Dict, Any, List, Optional import os import json from kubernetes import client from ..utils.k8s_client import KubernetesClient # Initialize client with kubeconfig directory from environment or default kubeconfig_dir = os.environ.get('KUBECONFIG_DIR', os.path.expanduser('~/.kube')) k8s_client = KubernetesClient(kubeconfig_dir) async def describe_k8s_resource(context: str, resource_type: str, name: Optional[str] = None, namespace: Optional[str] = None, selector: Optional[str] = None, all_namespaces: bool = False) -> Dict[str, Any]: """ Show detailed information about a specific resource or group of resources using the Kubernetes Python client instead of kubectl. Supported resource types: - Core resources: pod, service, namespace, node, secret, configmap, persistentvolume, persistentvolumeclaim, serviceaccount, endpoint - Apps resources: deployment, statefulset, daemonset, replicaset - Networking resources: ingress - Batch resources: job, cronjob Args: context (str): Name of the Kubernetes context to use resource_type (str): Type of resource to describe (e.g., pod, deployment, service) name (str, optional): Name of the resource to describe namespace (str, optional): Namespace of the resource(s) selector (str, optional): Label selector to filter resources all_namespaces (bool, optional): If True, include resources across all namespaces Returns: Dict[str, Any]: Detailed information about the resource(s) Raises: RuntimeError: If there's an error accessing the Kubernetes API """ try: # Get API client for the context api_client = k8s_client.get_api_client(context) # Create result structure result = { "resource_type": resource_type, "context": context } # Add optional parameters to result if provided if namespace and not all_namespaces: result["namespace"] = namespace elif all_namespaces: result["namespace"] = "all" if name: result["name"] = name if selector: result["selector"] = selector # Get structured information using the K8s API structured_info = await _get_structured_resource_info( api_client, resource_type, name, namespace, selector, all_namespaces ) # Add structured information to result result.update(structured_info) # Generate a human-readable description similar to kubectl describe result["description"] = await _generate_description( structured_info, resource_type, name, namespace ) return result except Exception as e: raise RuntimeError(f"Failed to describe {resource_type}: {str(e)}") async def _generate_description(info: Dict[str, Any], resource_type: str, name: Optional[str] = None, namespace: Optional[str] = None) -> Dict[str, str]: """ Generate a human-readable description from structured information. Args: info (Dict[str, Any]): Structured resource information resource_type (str): Type of resource name (str, optional): Name of the resource namespace (str, optional): Namespace of the resource Returns: Dict[str, str]: Sections of description text """ sections = {} # Process each item in the structured info if "items" in info and info["items"]: for i, item in enumerate(info["items"]): # Generate a title for this item if len(info["items"]) > 1: item_title = f"{resource_type.capitalize()} {i+1}: {item.get('name', 'unknown')}" else: item_title = f"{resource_type.capitalize()}: {item.get('name', 'unknown')}" # Extract basic information sections[item_title] = "" # Add namespace if available if "namespace" in item: sections[item_title] += f"Namespace: {item['namespace']}\n" # Add common metadata sections[item_title] += _format_basic_info(item) # Add resource-specific sections if resource_type.lower() == "pod": sections[f"Status for {item.get('name', 'unknown')}"] = _format_pod_status(item) sections[f"Containers for {item.get('name', 'unknown')}"] = _format_containers(item) sections[f"Volumes for {item.get('name', 'unknown')}"] = _format_volumes(item) elif resource_type.lower() == "service": sections[f"Service Details for {item.get('name', 'unknown')}"] = _format_service_info(item) elif resource_type.lower() == "deployment": sections[f"Deployment Status for {item.get('name', 'unknown')}"] = _format_deployment_status(item) elif resource_type.lower() == "node": sections[f"Node Status for {item.get('name', 'unknown')}"] = _format_node_status(item) sections[f"Node Resources for {item.get('name', 'unknown')}"] = _format_node_resources(item) elif resource_type.lower() == "daemonset": sections[f"DaemonSet Status for {item.get('name', 'unknown')}"] = _format_daemonset_status(item) elif resource_type.lower() == "replicaset": sections[f"ReplicaSet Status for {item.get('name', 'unknown')}"] = _format_replicaset_status(item) elif resource_type.lower() == "configmap": sections[f"ConfigMap Data for {item.get('name', 'unknown')}"] = _format_configmap_data(item) elif resource_type.lower() == "secret": sections[f"Secret Data for {item.get('name', 'unknown')}"] = _format_secret_data(item) elif resource_type.lower() == "namespace": sections[f"Namespace Status for {item.get('name', 'unknown')}"] = _format_namespace_status(item) elif resource_type.lower() == "persistentvolume": sections[f"PersistentVolume Details for {item.get('name', 'unknown')}"] = _format_pv_info(item) elif resource_type.lower() == "persistentvolumeclaim": sections[f"PersistentVolumeClaim Details for {item.get('name', 'unknown')}"] = _format_pvc_info(item) elif resource_type.lower() == "ingress": sections[f"Ingress Details for {item.get('name', 'unknown')}"] = _format_ingress_info(item) elif resource_type.lower() == "job": sections[f"Job Details for {item.get('name', 'unknown')}"] = _format_job_info(item) elif resource_type.lower() == "cronjob": sections[f"CronJob Details for {item.get('name', 'unknown')}"] = _format_cronjob_info(item) # Add conditions if available if "conditions" in item or ("status" in item and "conditions" in item["status"]): conditions = item.get("conditions", item.get("status", {}).get("conditions", [])) if conditions: sections[f"Conditions for {item.get('name', 'unknown')}"] = _format_conditions(conditions) # If no items were found if not sections and "error" in info: sections["Error"] = info["error"] elif not sections and "message" in info: sections["Message"] = info["message"] elif not sections: sections["Info"] = "No resources found" return sections def _format_basic_info(item: Dict[str, Any]) -> str: """Format basic information about a resource.""" result = "" # Add creation timestamp if available if "creation_timestamp" in item: result += f"Creation Timestamp: {item['creation_timestamp']}\n" # Add labels if available if "labels" in item and item["labels"]: result += "Labels: " for i, (key, value) in enumerate(item["labels"].items()): if i > 0: result += " " result += f"{key}={value}\n" # Add annotations if available if "annotations" in item and item["annotations"]: result += "Annotations: " for i, (key, value) in enumerate(item["annotations"].items()): if i > 0: result += " " result += f"{key}={value}\n" return result def _format_pod_status(pod: Dict[str, Any]) -> str: """Format pod status information.""" result = "" status = pod.get("status_phase", "Unknown") result += f"Status: {status}\n" if "node" in pod: result += f"Node: {pod['node']}\n" if "ip" in pod: result += f"IP: {pod['ip']}\n" if "start_time" in pod: result += f"Start Time: {pod['start_time']}\n" return result def _format_containers(pod: Dict[str, Any]) -> str: """Format container information for a pod.""" result = "" for container in pod.get("containers", []): result += f"Container: {container.get('name', 'unknown')}\n" result += f" Image: {container.get('image', 'unknown')}\n" # Add ports if available if "ports" in container and container["ports"]: result += " Ports:\n" for port in container["ports"]: result += f" {port.get('container_port', 'unknown')}/{port.get('protocol', 'TCP')}\n" # Add resources if available if "resources" in container and container["resources"]: result += " Resources:\n" if "requests" in container["resources"] and container["resources"]["requests"]: result += " Requests:\n" for resource, value in container["resources"]["requests"].items(): result += f" {resource}: {value}\n" if "limits" in container["resources"] and container["resources"]["limits"]: result += " Limits:\n" for resource, value in container["resources"]["limits"].items(): result += f" {resource}: {value}\n" result += "\n" return result def _format_volumes(pod: Dict[str, Any]) -> str: """Format volume information for a pod.""" result = "" for volume in pod.get("volumes", []): result += f"Volume: {volume.get('name', 'unknown')}\n" result += f" Type: {volume.get('type', 'unknown')}\n" result += "\n" return result def _format_service_info(service: Dict[str, Any]) -> str: """Format service information.""" result = "" result += f"Type: {service.get('type', 'unknown')}\n" result += f"ClusterIP: {service.get('cluster_ip', 'unknown')}\n" if "external_ips" in service and service["external_ips"]: result += f"External IPs: {', '.join(service['external_ips'])}\n" if "ports" in service and service["ports"]: result += "Ports:\n" for port in service["ports"]: result += f" {port.get('name', '')} {port.get('port', 'unknown')}/{port.get('protocol', 'TCP')}" if "target_port" in port: result += f" -> {port['target_port']}" if "node_port" in port and port["node_port"]: result += f" (NodePort: {port['node_port']})" result += "\n" if "selector" in service and service["selector"]: result += "Selector: " for i, (key, value) in enumerate(service["selector"].items()): if i > 0: result += " " result += f"{key}={value}\n" return result def _format_deployment_status(deployment: Dict[str, Any]) -> str: """Format deployment status information.""" result = "" result += f"Replicas: {deployment.get('replicas', 'unknown')}\n" result += f"Strategy: {deployment.get('strategy', 'unknown')}\n" if "status" in deployment: status = deployment["status"] result += "Status:\n" result += f" Ready Replicas: {status.get('ready_replicas', 0)}\n" result += f" Updated Replicas: {status.get('updated_replicas', 0)}\n" result += f" Available Replicas: {status.get('available_replicas', 0)}\n" if "unavailable_replicas" in status and status["unavailable_replicas"]: result += f" Unavailable: {status['unavailable_replicas']}\n" if "selector" in deployment and deployment["selector"]: result += "Selector: " for i, (key, value) in enumerate(deployment["selector"].items()): if i > 0: result += " " result += f"{key}={value}\n" return result def _format_node_status(node: Dict[str, Any]) -> str: """Format node status information.""" result = "" if "status" in node and "addresses" in node["status"]: result += "Addresses:\n" for address in node["status"]["addresses"]: result += f" {address.get('type', 'unknown')}: {address.get('address', 'unknown')}\n" if "spec" in node and "taints" in node["spec"] and node["spec"]["taints"]: result += "Taints:\n" for taint in node["spec"]["taints"]: result += f" {taint.get('key', 'unknown')}={taint.get('value', 'unknown')}:{taint.get('effect', 'unknown')}\n" return result def _format_node_resources(node: Dict[str, Any]) -> str: """Format node resource information.""" result = "" if "status" in node: if "capacity" in node["status"]: result += "Capacity:\n" for resource, value in node["status"]["capacity"].items(): result += f" {resource}: {value}\n" if "allocatable" in node["status"]: result += "Allocatable:\n" for resource, value in node["status"]["allocatable"].items(): result += f" {resource}: {value}\n" return result def _format_daemonset_status(daemonset: Dict[str, Any]) -> str: """Format daemonset status information.""" result = "" result += f"Desired Number: {daemonset.get('desired_number', 'unknown')}\n" result += f"Update Strategy: {daemonset.get('update_strategy', 'unknown')}\n" if "status" in daemonset: status = daemonset["status"] result += "Status:\n" result += f" Current Number: {status.get('current_number', 0)}\n" result += f" Ready Number: {status.get('ready_number', 0)}\n" result += f" Updated Number: {status.get('updated_number', 0)}\n" result += f" Available Number: {status.get('available_number', 0)}\n" if "selector" in daemonset and daemonset["selector"]: result += "Selector: " for i, (key, value) in enumerate(daemonset["selector"].items()): if i > 0: result += " " result += f"{key}={value}\n" return result def _format_replicaset_status(replicaset: Dict[str, Any]) -> str: """Format replicaset status information.""" result = "" result += f"Replicas: {replicaset.get('replicas', 'unknown')}\n" if "status" in replicaset: status = replicaset["status"] result += "Status:\n" result += f" Ready Replicas: {status.get('ready_replicas', 0)}\n" result += f" Available Replicas: {status.get('available_replicas', 0)}\n" if "fully_labeled_replicas" in status: result += f" Fully Labeled: {status['fully_labeled_replicas']}\n" if "selector" in replicaset and replicaset["selector"]: result += "Selector: " for i, (key, value) in enumerate(replicaset["selector"].items()): if i > 0: result += " " result += f"{key}={value}\n" if "pod_template_hash" in replicaset: result += f"Pod Template Hash: {replicaset['pod_template_hash']}\n" return result def _format_configmap_data(configmap: Dict[str, Any]) -> str: """Format configmap data.""" result = "" if "data" in configmap and configmap["data"]: result += "Data:\n" for key, value in configmap["data"].items(): result += f" {key}: " # Limit output for large values if len(str(value)) > 100: result += f"{str(value)[:100]}...\n" else: result += f"{value}\n" if "binary_data" in configmap and configmap["binary_data"]: result += "Binary Data:\n" for key in configmap["binary_data"].keys(): result += f" {key}: <binary data>\n" return result def _format_secret_data(secret: Dict[str, Any]) -> str: """Format secret data (without revealing it).""" result = "" result += f"Type: {secret.get('type', 'unknown')}\n" if "data" in secret and secret["data"]: result += "Data:\n" for key in secret["data"].keys(): result += f" {key}: <sensitive data>\n" return result def _format_namespace_status(namespace: Dict[str, Any]) -> str: """Format namespace status information.""" result = "" result += f"Status: {namespace.get('status', 'unknown')}\n" if "resource_quota" in namespace and namespace["resource_quota"]: result += "Resource Quotas:\n" for quota in namespace["resource_quota"]: result += f" {quota.get('name', 'unknown')}:\n" if "hard" in quota: result += " Hard:\n" for resource, value in quota["hard"].items(): result += f" {resource}: {value}\n" if "limit_range" in namespace and namespace["limit_range"]: result += "Limit Ranges:\n" for limit in namespace["limit_range"]: result += f" {limit.get('name', 'unknown')}\n" return result def _format_pv_info(pv: Dict[str, Any]) -> str: """Format persistent volume information.""" result = "" result += f"Status: {pv.get('status', 'unknown')}\n" result += f"Capacity: {pv.get('capacity', {}).get('storage', 'unknown')}\n" result += f"Access Modes: {', '.join(pv.get('access_modes', []))}\n" result += f"Storage Class: {pv.get('storage_class', 'unknown')}\n" result += f"Reclaim Policy: {pv.get('reclaim_policy', 'unknown')}\n" if "claim_ref" in pv and pv["claim_ref"]: claim = pv["claim_ref"] result += "Claim:\n" result += f" Namespace: {claim.get('namespace', 'unknown')}\n" result += f" Name: {claim.get('name', 'unknown')}\n" if "source" in pv and pv["source"]: result += "Source:\n" for source_type, source_value in pv["source"].items(): if isinstance(source_value, dict): result += f" {source_type}:\n" for key, value in source_value.items(): result += f" {key}: {value}\n" else: result += f" {source_type}: {source_value}\n" return result def _format_pvc_info(pvc: Dict[str, Any]) -> str: """Format persistent volume claim information.""" result = "" result += f"Status: {pvc.get('status', 'unknown')}\n" result += f"Volume: {pvc.get('volume_name', 'unknown')}\n" result += f"Capacity: {pvc.get('capacity', {}).get('storage', 'unknown')}\n" result += f"Access Modes: {', '.join(pvc.get('access_modes', []))}\n" result += f"Storage Class: {pvc.get('storage_class', 'unknown')}\n" if "volume_mode" in pvc: result += f"Volume Mode: {pvc['volume_mode']}\n" return result def _format_ingress_info(ingress: Dict[str, Any]) -> str: """Format ingress information.""" result = "" result += f"Class: {ingress.get('ingress_class', 'unknown')}\n" if "rules" in ingress and ingress["rules"]: result += "Rules:\n" for rule in ingress["rules"]: if "host" in rule: result += f" Host: {rule['host']}\n" if "http" in rule and "paths" in rule["http"]: for path in rule["http"]["paths"]: result += f" Path: {path.get('path', '/')}\n" result += f" Path Type: {path.get('path_type', 'Prefix')}\n" if "backend" in path: backend = path["backend"] if "service" in backend: service = backend["service"] result += f" Backend: service={service.get('name', 'unknown')}, " if "port" in service: port = service["port"] if "number" in port: result += f"port={port['number']}\n" elif "name" in port: result += f"port={port['name']}\n" else: result += "\n" else: result += "\n" if "tls" in ingress and ingress["tls"]: result += "TLS:\n" for tls in ingress["tls"]: result += f" Hosts: {', '.join(tls.get('hosts', []))}\n" result += f" Secret Name: {tls.get('secret_name', 'unknown')}\n" return result def _format_job_info(job: Dict[str, Any]) -> str: """Format job information.""" result = "" result += f"Completions: {job.get('completions', 'unknown')}\n" result += f"Parallelism: {job.get('parallelism', 'unknown')}\n" if "status" in job: status = job["status"] result += "Status:\n" result += f" Active: {status.get('active', 0)}\n" result += f" Succeeded: {status.get('succeeded', 0)}\n" result += f" Failed: {status.get('failed', 0)}\n" if "start_time" in status: result += f"Start Time: {status['start_time']}\n" if "completion_time" in status: result += f"Completion Time: {status['completion_time']}\n" return result def _format_cronjob_info(cronjob: Dict[str, Any]) -> str: """Format cronjob information.""" result = "" result += f"Schedule: {cronjob.get('schedule', 'unknown')}\n" result += f"Concurrency Policy: {cronjob.get('concurrency_policy', 'unknown')}\n" result += f"Suspend: {cronjob.get('suspend', False)}\n" if "last_schedule_time" in cronjob: result += f"Last Schedule Time: {cronjob['last_schedule_time']}\n" if "active_jobs" in cronjob and cronjob["active_jobs"]: result += "Active Jobs:\n" for job in cronjob["active_jobs"]: result += f" {job.get('name', 'unknown')}\n" return result def _format_conditions(conditions: List[Dict[str, Any]]) -> str: """Format conditions information.""" result = "" for condition in conditions: result += f"Type: {condition.get('type', 'unknown')}\n" result += f"Status: {condition.get('status', 'unknown')}\n" if "reason" in condition: result += f"Reason: {condition['reason']}\n" if "message" in condition: result += f"Message: {condition['message']}\n" result += "\n" return result async def _get_structured_resource_info(api_client, resource_type: str, name: Optional[str] = None, namespace: Optional[str] = None, selector: Optional[str] = None, all_namespaces: bool = False) -> Dict[str, Any]: """ Helper function to get structured information about resources using the K8s API. Returns: Dict[str, Any]: Structured resource information """ # Normalize resource type (remove plural forms if present) resource_type = resource_type.lower() if resource_type.endswith('s') and resource_type not in ['access', 'endpoints', 'status']: resource_type = resource_type[:-1] # Initialize appropriate API client based on resource type result = {"items": []} try: # Handle core resources if resource_type in ['pod', 'service', 'namespace', 'node', 'secret', 'configmap', 'persistentvolume', 'persistentvolumeclaim', 'serviceaccount', 'endpoint']: core_v1 = client.CoreV1Api(api_client) # Handle different resource types if resource_type == 'pod': if name and namespace: item = core_v1.read_namespaced_pod(name, namespace) result["items"] = [_extract_pod_info(item)] elif namespace and not all_namespaces: pods = core_v1.list_namespaced_pod(namespace, label_selector=selector) result["items"] = [_extract_pod_info(pod) for pod in pods.items] else: pods = core_v1.list_pod_for_all_namespaces(label_selector=selector) result["items"] = [_extract_pod_info(pod) for pod in pods.items] elif resource_type == 'service': if name and namespace: item = core_v1.read_namespaced_service(name, namespace) result["items"] = [_extract_service_info(item)] elif namespace and not all_namespaces: services = core_v1.list_namespaced_service(namespace, label_selector=selector) result["items"] = [_extract_service_info(svc) for svc in services.items] else: services = core_v1.list_service_for_all_namespaces(label_selector=selector) result["items"] = [_extract_service_info(svc) for svc in services.items] elif resource_type == 'node': if name: item = core_v1.read_node(name) result["items"] = [_extract_node_info(item)] else: nodes = core_v1.list_node(label_selector=selector) result["items"] = [_extract_node_info(node) for node in nodes.items] elif resource_type == 'configmap': if name and namespace: item = core_v1.read_namespaced_config_map(name, namespace) result["items"] = [_extract_configmap_info(item)] elif namespace and not all_namespaces: configmaps = core_v1.list_namespaced_config_map(namespace, label_selector=selector) result["items"] = [_extract_configmap_info(cm) for cm in configmaps.items] else: configmaps = core_v1.list_config_map_for_all_namespaces(label_selector=selector) result["items"] = [_extract_configmap_info(cm) for cm in configmaps.items] elif resource_type == 'secret': if name and namespace: item = core_v1.read_namespaced_secret(name, namespace) result["items"] = [_extract_secret_info(item)] elif namespace and not all_namespaces: secrets = core_v1.list_namespaced_secret(namespace, label_selector=selector) result["items"] = [_extract_secret_info(secret) for secret in secrets.items] else: secrets = core_v1.list_secret_for_all_namespaces(label_selector=selector) result["items"] = [_extract_secret_info(secret) for secret in secrets.items] elif resource_type == 'namespace': if name: item = core_v1.read_namespace(name) result["items"] = [_extract_namespace_info(item)] else: namespaces = core_v1.list_namespace(label_selector=selector) result["items"] = [_extract_namespace_info(ns) for ns in namespaces.items] elif resource_type == 'persistentvolume': if name: item = core_v1.read_persistent_volume(name) result["items"] = [_extract_pv_info(item)] else: pvs = core_v1.list_persistent_volume(label_selector=selector) result["items"] = [_extract_pv_info(pv) for pv in pvs.items] elif resource_type == 'persistentvolumeclaim': if name and namespace: item = core_v1.read_namespaced_persistent_volume_claim(name, namespace) result["items"] = [_extract_pvc_info(item)] elif namespace and not all_namespaces: pvcs = core_v1.list_namespaced_persistent_volume_claim(namespace, label_selector=selector) result["items"] = [_extract_pvc_info(pvc) for pvc in pvcs.items] else: pvcs = core_v1.list_persistent_volume_claim_for_all_namespaces(label_selector=selector) result["items"] = [_extract_pvc_info(pvc) for pvc in pvcs.items] # Handle apps resources elif resource_type in ['deployment', 'statefulset', 'daemonset', 'replicaset']: apps_v1 = client.AppsV1Api(api_client) if resource_type == 'deployment': if name and namespace: item = apps_v1.read_namespaced_deployment(name, namespace) result["items"] = [_extract_deployment_info(item)] elif namespace and not all_namespaces: deployments = apps_v1.list_namespaced_deployment(namespace, label_selector=selector) result["items"] = [_extract_deployment_info(dep) for dep in deployments.items] else: deployments = apps_v1.list_deployment_for_all_namespaces(label_selector=selector) result["items"] = [_extract_deployment_info(dep) for dep in deployments.items] elif resource_type == 'statefulset': if name and namespace: item = apps_v1.read_namespaced_stateful_set(name, namespace) result["items"] = [_extract_statefulset_info(item)] elif namespace and not all_namespaces: statefulsets = apps_v1.list_namespaced_stateful_set(namespace, label_selector=selector) result["items"] = [_extract_statefulset_info(sts) for sts in statefulsets.items] else: statefulsets = apps_v1.list_stateful_set_for_all_namespaces(label_selector=selector) result["items"] = [_extract_statefulset_info(sts) for sts in statefulsets.items] elif resource_type == 'daemonset': if name and namespace: item = apps_v1.read_namespaced_daemon_set(name, namespace) result["items"] = [_extract_daemonset_info(item)] elif namespace and not all_namespaces: daemonsets = apps_v1.list_namespaced_daemon_set(namespace, label_selector=selector) result["items"] = [_extract_daemonset_info(ds) for ds in daemonsets.items] else: daemonsets = apps_v1.list_daemon_set_for_all_namespaces(label_selector=selector) result["items"] = [_extract_daemonset_info(ds) for ds in daemonsets.items] elif resource_type == 'replicaset': if name and namespace: item = apps_v1.read_namespaced_replica_set(name, namespace) result["items"] = [_extract_replicaset_info(item)] elif namespace and not all_namespaces: replicasets = apps_v1.list_namespaced_replica_set(namespace, label_selector=selector) result["items"] = [_extract_replicaset_info(rs) for rs in replicasets.items] else: replicasets = apps_v1.list_replica_set_for_all_namespaces(label_selector=selector) result["items"] = [_extract_replicaset_info(rs) for rs in replicasets.items] # Handle networking resources elif resource_type == 'ingress': networking_v1 = client.NetworkingV1Api(api_client) if name and namespace: item = networking_v1.read_namespaced_ingress(name, namespace) result["items"] = [_extract_ingress_info(item)] elif namespace and not all_namespaces: ingresses = networking_v1.list_namespaced_ingress(namespace, label_selector=selector) result["items"] = [_extract_ingress_info(ing) for ing in ingresses.items] else: ingresses = networking_v1.list_ingress_for_all_namespaces(label_selector=selector) result["items"] = [_extract_ingress_info(ing) for ing in ingresses.items] # Handle batch resources elif resource_type in ['job', 'cronjob']: batch_v1 = client.BatchV1Api(api_client) if resource_type == 'job': if name and namespace: item = batch_v1.read_namespaced_job(name, namespace) result["items"] = [_extract_job_info(item)] elif namespace and not all_namespaces: jobs = batch_v1.list_namespaced_job(namespace, label_selector=selector) result["items"] = [_extract_job_info(job) for job in jobs.items] else: jobs = batch_v1.list_job_for_all_namespaces(label_selector=selector) result["items"] = [_extract_job_info(job) for job in jobs.items] elif resource_type == 'cronjob': if name and namespace: item = batch_v1.read_namespaced_cron_job(name, namespace) result["items"] = [_extract_cronjob_info(item)] elif namespace and not all_namespaces: cronjobs = batch_v1.list_namespaced_cron_job(namespace, label_selector=selector) result["items"] = [_extract_cronjob_info(cj) for cj in cronjobs.items] else: cronjobs = batch_v1.list_cron_job_for_all_namespaces(label_selector=selector) result["items"] = [_extract_cronjob_info(cj) for cj in cronjobs.items] # For other resources, use the custom objects API if possible else: result["message"] = f"Structured data not available for resource type '{resource_type}'" except Exception as e: result["error"] = str(e) return result def _extract_pod_info(pod) -> Dict[str, Any]: """Extract key information from a pod object.""" return { "name": pod.metadata.name, "namespace": pod.metadata.namespace, "status_phase": pod.status.phase, "node": pod.spec.node_name, "ip": pod.status.pod_ip, "start_time": pod.status.start_time.isoformat() if pod.status.start_time else None, "containers": [ { "name": container.name, "image": container.image, "ports": [ {"container_port": port.container_port, "protocol": port.protocol} for port in (container.ports or []) ], "resources": { "requests": container.resources.requests if hasattr(container.resources, "requests") else {}, "limits": container.resources.limits if hasattr(container.resources, "limits") else {} } if container.resources else {} } for container in pod.spec.containers ], "conditions": [ { "type": condition.type, "status": condition.status, "reason": condition.reason } for condition in (pod.status.conditions or []) ], "volumes": [ { "name": volume.name, "type": _determine_volume_type(volume) } for volume in (pod.spec.volumes or []) ] } def _extract_service_info(svc) -> Dict[str, Any]: """Extract key information from a service object.""" return { "name": svc.metadata.name, "namespace": svc.metadata.namespace, "type": svc.spec.type, "cluster_ip": svc.spec.cluster_ip, "external_ips": svc.spec.external_i_ps if hasattr(svc.spec, "external_i_ps") else [], "ports": [ { "name": port.name, "protocol": port.protocol, "port": port.port, "target_port": port.target_port, "node_port": port.node_port if hasattr(port, "node_port") else None } for port in (svc.spec.ports or []) ], "selector": svc.spec.selector or {} } def _extract_node_info(node) -> Dict[str, Any]: """Extract key information from a node object.""" return { "name": node.metadata.name, "labels": node.metadata.labels or {}, "status": { "capacity": node.status.capacity, "allocatable": node.status.allocatable, "conditions": [ { "type": condition.type, "status": condition.status, "reason": condition.reason, "message": condition.message } for condition in (node.status.conditions or []) ], "addresses": [ {"type": addr.type, "address": addr.address} for addr in (node.status.addresses or []) ] }, "spec": { "pod_cidr": node.spec.pod_cidr if hasattr(node.spec, "pod_cidr") else None, "taints": [ { "key": taint.key, "value": taint.value, "effect": taint.effect } for taint in (node.spec.taints or []) ] if hasattr(node.spec, "taints") else [] } } def _extract_deployment_info(deployment) -> Dict[str, Any]: """Extract key information from a deployment object.""" return { "name": deployment.metadata.name, "namespace": deployment.metadata.namespace, "replicas": deployment.spec.replicas, "strategy": deployment.spec.strategy.type if deployment.spec.strategy else None, "selector": deployment.spec.selector.match_labels if deployment.spec.selector else {}, "status": { "replicas": deployment.status.replicas, "ready_replicas": deployment.status.ready_replicas, "updated_replicas": deployment.status.updated_replicas, "available_replicas": deployment.status.available_replicas, "unavailable_replicas": deployment.status.unavailable_replicas, "conditions": [ { "type": condition.type, "status": condition.status, "reason": condition.reason, "message": condition.message } for condition in (deployment.status.conditions or []) ] if deployment.status.conditions else [] } } def _extract_statefulset_info(statefulset) -> Dict[str, Any]: """Extract key information from a statefulset object.""" return { "name": statefulset.metadata.name, "namespace": statefulset.metadata.namespace, "replicas": statefulset.spec.replicas, "service_name": statefulset.spec.service_name, "selector": statefulset.spec.selector.match_labels if statefulset.spec.selector else {}, "update_strategy": statefulset.spec.update_strategy.type if hasattr(statefulset.spec, "update_strategy") else None, "status": { "replicas": statefulset.status.replicas, "ready_replicas": statefulset.status.ready_replicas, "current_replicas": statefulset.status.current_replicas, "updated_replicas": statefulset.status.updated_replicas, "conditions": [ { "type": condition.type, "status": condition.status, "reason": condition.reason, "message": condition.message } for condition in (statefulset.status.conditions or []) ] if hasattr(statefulset.status, "conditions") and statefulset.status.conditions else [] } } def _extract_daemonset_info(daemonset) -> Dict[str, Any]: """Extract key information from a daemonset object.""" return { "name": daemonset.metadata.name, "namespace": daemonset.metadata.namespace, "desired_number": daemonset.status.desired_number_scheduled, "update_strategy": daemonset.spec.update_strategy.type if hasattr(daemonset.spec, "update_strategy") else None, "selector": daemonset.spec.selector.match_labels if daemonset.spec.selector else {}, "status": { "current_number": daemonset.status.current_number_scheduled, "ready_number": daemonset.status.number_ready, "updated_number": daemonset.status.updated_number_scheduled, "available_number": daemonset.status.number_available, "conditions": [ { "type": condition.type, "status": condition.status, "reason": condition.reason, "message": condition.message } for condition in (daemonset.status.conditions or []) ] if hasattr(daemonset.status, "conditions") and daemonset.status.conditions else [] } } def _extract_replicaset_info(replicaset) -> Dict[str, Any]: """Extract key information from a replicaset object.""" pod_template_hash = None if replicaset.metadata.labels and "pod-template-hash" in replicaset.metadata.labels: pod_template_hash = replicaset.metadata.labels["pod-template-hash"] return { "name": replicaset.metadata.name, "namespace": replicaset.metadata.namespace, "replicas": replicaset.spec.replicas, "selector": replicaset.spec.selector.match_labels if replicaset.spec.selector else {}, "pod_template_hash": pod_template_hash, "status": { "replicas": replicaset.status.replicas, "ready_replicas": replicaset.status.ready_replicas, "available_replicas": replicaset.status.available_replicas, "fully_labeled_replicas": replicaset.status.fully_labeled_replicas, "conditions": [ { "type": condition.type, "status": condition.status, "reason": condition.reason, "message": condition.message } for condition in (replicaset.status.conditions or []) ] if hasattr(replicaset.status, "conditions") and replicaset.status.conditions else [] } } def _extract_configmap_info(configmap) -> Dict[str, Any]: """Extract key information from a configmap object.""" return { "name": configmap.metadata.name, "namespace": configmap.metadata.namespace, "data": configmap.data or {}, "binary_data": configmap.binary_data or {}, "labels": configmap.metadata.labels or {}, "annotations": configmap.metadata.annotations or {}, "creation_timestamp": configmap.metadata.creation_timestamp.isoformat() if configmap.metadata.creation_timestamp else None } def _extract_secret_info(secret) -> Dict[str, Any]: """Extract key information from a secret object (without revealing data).""" return { "name": secret.metadata.name, "namespace": secret.metadata.namespace, "type": secret.type, "data": {key: "<sensitive data>" for key in (secret.data or {}).keys()}, "labels": secret.metadata.labels or {}, "annotations": secret.metadata.annotations or {}, "creation_timestamp": secret.metadata.creation_timestamp.isoformat() if secret.metadata.creation_timestamp else None } def _extract_namespace_info(namespace) -> Dict[str, Any]: """Extract key information from a namespace object.""" return { "name": namespace.metadata.name, "status": namespace.status.phase if hasattr(namespace.status, "phase") else "Active", "labels": namespace.metadata.labels or {}, "annotations": namespace.metadata.annotations or {}, "creation_timestamp": namespace.metadata.creation_timestamp.isoformat() if namespace.metadata.creation_timestamp else None, # Resource quota and limit range would need to be fetched separately "resource_quota": [], "limit_range": [] } def _extract_pv_info(pv) -> Dict[str, Any]: """Extract key information from a persistent volume object.""" source = {} # Extract volume source type and details volume_source_attributes = [ "host_path", "nfs", "gce_persistent_disk", "aws_elastic_block_store", "iscsi", "cinder", "cephfs", "fc", "flex_volume", "vsphere_volume", "quobyte", "azure_disk", "azure_file", "csi" ] for source_type in volume_source_attributes: if hasattr(pv.spec, source_type) and getattr(pv.spec, source_type) is not None: source_obj = getattr(pv.spec, source_type) if source_type == "host_path": source[source_type] = {"path": source_obj.path} elif source_type == "nfs": source[source_type] = {"server": source_obj.server, "path": source_obj.path} elif source_type == "csi": source[source_type] = { "driver": source_obj.driver, "volume_handle": source_obj.volume_handle } else: source[source_type] = "<volume details available>" claim_ref = None if hasattr(pv.spec, "claim_ref") and pv.spec.claim_ref: claim_ref = { "namespace": pv.spec.claim_ref.namespace, "name": pv.spec.claim_ref.name } return { "name": pv.metadata.name, "status": pv.status.phase, "capacity": pv.spec.capacity, "access_modes": pv.spec.access_modes, "reclaim_policy": pv.spec.persistent_volume_reclaim_policy, "storage_class": pv.spec.storage_class_name, "source": source, "claim_ref": claim_ref, "labels": pv.metadata.labels or {}, "annotations": pv.metadata.annotations or {}, "creation_timestamp": pv.metadata.creation_timestamp.isoformat() if pv.metadata.creation_timestamp else None } def _extract_pvc_info(pvc) -> Dict[str, Any]: """Extract key information from a persistent volume claim object.""" return { "name": pvc.metadata.name, "namespace": pvc.metadata.namespace, "status": pvc.status.phase, "volume_name": pvc.spec.volume_name, "capacity": pvc.status.capacity if hasattr(pvc.status, "capacity") else {}, "access_modes": pvc.spec.access_modes, "storage_class": pvc.spec.storage_class_name, "volume_mode": pvc.spec.volume_mode, "labels": pvc.metadata.labels or {}, "annotations": pvc.metadata.annotations or {}, "creation_timestamp": pvc.metadata.creation_timestamp.isoformat() if pvc.metadata.creation_timestamp else None } def _extract_ingress_info(ingress) -> Dict[str, Any]: """Extract key information from an ingress object.""" rules = [] if ingress.spec.rules: for rule in ingress.spec.rules: rule_info = {"host": rule.host} if rule.http and rule.http.paths: paths = [] for path in rule.http.paths: path_info = { "path": path.path, "path_type": path.path_type } if path.backend and path.backend.service: service_info = { "name": path.backend.service.name } if path.backend.service.port: if hasattr(path.backend.service.port, "number"): service_info["port"] = {"number": path.backend.service.port.number} elif hasattr(path.backend.service.port, "name"): service_info["port"] = {"name": path.backend.service.port.name} path_info["backend"] = {"service": service_info} paths.append(path_info) rule_info["http"] = {"paths": paths} rules.append(rule_info) tls = [] if ingress.spec.tls: for tls_item in ingress.spec.tls: tls.append({ "hosts": tls_item.hosts, "secret_name": tls_item.secret_name }) return { "name": ingress.metadata.name, "namespace": ingress.metadata.namespace, "ingress_class": ingress.spec.ingress_class_name, "rules": rules, "tls": tls, "labels": ingress.metadata.labels or {}, "annotations": ingress.metadata.annotations or {}, "creation_timestamp": ingress.metadata.creation_timestamp.isoformat() if ingress.metadata.creation_timestamp else None } def _extract_job_info(job) -> Dict[str, Any]: """Extract key information from a job object.""" return { "name": job.metadata.name, "namespace": job.metadata.namespace, "completions": job.spec.completions, "parallelism": job.spec.parallelism, "status": { "active": job.status.active, "succeeded": job.status.succeeded, "failed": job.status.failed, "start_time": job.status.start_time.isoformat() if job.status.start_time else None, "completion_time": job.status.completion_time.isoformat() if job.status.completion_time else None, "conditions": [ { "type": condition.type, "status": condition.status, "reason": condition.reason, "message": condition.message } for condition in (job.status.conditions or []) ] if hasattr(job.status, "conditions") and job.status.conditions else [] }, "labels": job.metadata.labels or {}, "annotations": job.metadata.annotations or {}, "creation_timestamp": job.metadata.creation_timestamp.isoformat() if job.metadata.creation_timestamp else None } def _extract_cronjob_info(cronjob) -> Dict[str, Any]: """Extract key information from a cronjob object.""" active_jobs = [] if hasattr(cronjob.status, "active") and cronjob.status.active: for job_ref in cronjob.status.active: active_jobs.append({ "name": job_ref.name, "namespace": job_ref.namespace }) return { "name": cronjob.metadata.name, "namespace": cronjob.metadata.namespace, "schedule": cronjob.spec.schedule, "concurrency_policy": cronjob.spec.concurrency_policy, "suspend": cronjob.spec.suspend, "last_schedule_time": cronjob.status.last_schedule_time.isoformat() if hasattr(cronjob.status, "last_schedule_time") and cronjob.status.last_schedule_time else None, "active_jobs": active_jobs, "labels": cronjob.metadata.labels or {}, "annotations": cronjob.metadata.annotations or {}, "creation_timestamp": cronjob.metadata.creation_timestamp.isoformat() if cronjob.metadata.creation_timestamp else None } def _determine_volume_type(volume) -> str: """Determine the type of a volume based on its attributes.""" volume_types = [ "host_path", "empty_dir", "gce_persistent_disk", "aws_elastic_block_store", "secret", "config_map", "persistent_volume_claim", "projected" ] for vol_type in volume_types: if hasattr(volume, vol_type): return vol_type return "unknown"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/feibai406/k8s-multicluster-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server