Skip to main content
Glama
pod.py12.1 kB
from typing import Optional, Dict, List from core.context import use_current_context from core.permissions import check_readonly_permission from server.server import mcp from kubernetes.client import CoreV1Api from core.kubeconfig import get_api_clients @mcp.tool() @use_current_context def pod_list(context_name: str, namespace: str): """ List all pods in a given namespace. Args: context_name: The Kubernetes context name namespace: The Kubernetes namespace Returns: List of pod basic information """ core_v1: CoreV1Api = get_api_clients(context_name)["core"] pods = core_v1.list_namespaced_pod(namespace) result = [{"name": pod.metadata.name} for pod in pods.items] return result @mcp.tool() @use_current_context def pod_detail(context_name: str, namespace: str, name: str): """ Get details of a specific pod. Args: context_name: The Kubernetes context name namespace: The Kubernetes namespace name: The pod name Returns: Detailed information about the pod """ core_v1: CoreV1Api = get_api_clients(context_name)["core"] pod = core_v1.read_namespaced_pod(name, namespace) containers = [] for c in pod.spec.containers: container_info = { "name": c.name, "image": c.image, "ports": [{"container_port": p.container_port, "protocol": p.protocol} for p in (c.ports or [])], "resources": { "requests": c.resources.requests if c.resources and hasattr(c.resources, "requests") else {}, "limits": c.resources.limits if c.resources and hasattr(c.resources, "limits") else {} }, "environment": [{"name": env.name, "value": env.value if hasattr(env, "value") else "from secret"} for env in (c.env or [])] } containers.append(container_info) volumes = [] if pod.spec.volumes: for vol in pod.spec.volumes: volume_info = {"name": vol.name} # 볼륨 타입 확인 및 정보 추가 if hasattr(vol, "config_map") and vol.config_map: volume_info["type"] = "configMap" volume_info["config_map_name"] = vol.config_map.name elif hasattr(vol, "secret") and vol.secret: volume_info["type"] = "secret" volume_info["secret_name"] = vol.secret.secret_name elif hasattr(vol, "persistent_volume_claim") and vol.persistent_volume_claim: volume_info["type"] = "pvc" volume_info["claim_name"] = vol.persistent_volume_claim.claim_name elif hasattr(vol, "host_path") and vol.host_path: volume_info["type"] = "hostPath" volume_info["path"] = vol.host_path.path elif hasattr(vol, "empty_dir") and vol.empty_dir: volume_info["type"] = "emptyDir" else: volume_info["type"] = "other" volumes.append(volume_info) conditions = [] if pod.status.conditions: for condition in pod.status.conditions: conditions.append({ "type": condition.type, "status": condition.status, "last_transition_time": condition.last_transition_time, "reason": condition.reason, "message": condition.message }) networking = { "pod_ip": pod.status.pod_ip, "host_ip": pod.status.host_ip, "node_name": pod.spec.node_name } metadata = { "creation_timestamp": pod.metadata.creation_timestamp, "labels": pod.metadata.labels or {}, "annotations": pod.metadata.annotations or {}, "owner_references": [{ "kind": ref.kind, "name": ref.name, "uid": ref.uid } for ref in (pod.metadata.owner_references or [])] } status_info = { "phase": pod.status.phase, "start_time": pod.status.start_time, "container_statuses": [] } if pod.status.container_statuses: for cs in pod.status.container_statuses: container_status = { "name": cs.name, "ready": cs.ready, "restart_count": cs.restart_count, "image": cs.image, "image_id": cs.image_id, "container_id": cs.container_id } if cs.state: state_info = {} if hasattr(cs.state, "running") and cs.state.running: state_info["current"] = "running" state_info["started_at"] = cs.state.running.started_at elif hasattr(cs.state, "waiting") and cs.state.waiting: state_info["current"] = "waiting" state_info["reason"] = cs.state.waiting.reason state_info["message"] = cs.state.waiting.message elif hasattr(cs.state, "terminated") and cs.state.terminated: state_info["current"] = "terminated" state_info["exit_code"] = cs.state.terminated.exit_code state_info["reason"] = cs.state.terminated.reason state_info["message"] = cs.state.terminated.message state_info["started_at"] = cs.state.terminated.started_at state_info["finished_at"] = cs.state.terminated.finished_at container_status["state"] = state_info status_info["container_statuses"].append(container_status) result = { "name": pod.metadata.name, "namespace": pod.metadata.namespace, "status": status_info, "spec": { "containers": containers, "volumes": volumes, "restart_policy": pod.spec.restart_policy, "service_account": pod.spec.service_account, "dns_policy": pod.spec.dns_policy, "node_selector": pod.spec.node_selector or {}, "tolerations": [{ "key": t.key, "operator": t.operator, "effect": t.effect, "toleration_seconds": t.toleration_seconds } for t in (pod.spec.tolerations or [])] }, "metadata": metadata, "networking": networking, "conditions": conditions } return result @mcp.tool() @use_current_context @check_readonly_permission def pod_create(context_name: str, namespace: str, name: str, image: str, labels: Optional[Dict[str, str]] = None, command: Optional[List[str]] = None, args: Optional[List[str]] = None, env_vars: Optional[Dict[str, str]] = None): """ Create a new pod in the specified namespace. Args: context_name: The Kubernetes context name namespace: The Kubernetes namespace name: The name for the new pod image: The container image to use labels: Optional dictionary of pod labels command: Optional command to run in the container args: Optional arguments for the command env_vars: Optional environment variables for the container Returns: Information about the created pod """ from kubernetes.client import V1Pod, V1ObjectMeta, V1PodSpec, V1Container, V1EnvVar core_v1: CoreV1Api = get_api_clients(context_name)["core"] # Prepare environment variables if provided container_env = None if env_vars: container_env = [V1EnvVar(name=k, value=v) for k, v in env_vars.items()] # Create container container = V1Container( name=name, image=image, command=command, args=args, env=container_env ) # Create pod spec pod_spec = V1PodSpec(containers=[container]) # Create pod metadata pod_metadata = V1ObjectMeta(name=name, namespace=namespace, labels=labels) # Create pod pod = V1Pod( api_version="v1", kind="Pod", metadata=pod_metadata, spec=pod_spec ) # Create the pod in Kubernetes created_pod = core_v1.create_namespaced_pod(namespace=namespace, body=pod) result = { "name": created_pod.metadata.name, "namespace": created_pod.metadata.namespace, "status": "Created", } return result @mcp.tool() @use_current_context @check_readonly_permission def pod_update(context_name: str, namespace: str, name: str, labels: Optional[Dict[str, str]] = None): """ Update an existing pod's metadata (only labels can be updated for an existing pod). Args: context_name: The Kubernetes context name namespace: The Kubernetes namespace name: The pod name labels: New labels to apply to the pod Returns: Information about the updated pod """ core_v1: CoreV1Api = get_api_clients(context_name)["core"] # Get the current pod pod = core_v1.read_namespaced_pod(name=name, namespace=namespace) # Update pod labels if provided if labels: pod.metadata.labels = labels # Update the pod in Kubernetes updated_pod = core_v1.patch_namespaced_pod( name=name, namespace=namespace, body={"metadata": {"labels": labels}} ) result = { "name": updated_pod.metadata.name, "namespace": updated_pod.metadata.namespace, "status": updated_pod.status.phase, "labels": updated_pod.metadata.labels, } return result @mcp.tool() @use_current_context @check_readonly_permission def pod_delete(context_name: str, namespace: str, name: str): """ Delete a pod from the specified namespace. Args: context_name: The Kubernetes context name namespace: The Kubernetes namespace name: The pod name to delete Returns: Status of the deletion operation """ core_v1: CoreV1Api = get_api_clients(context_name)["core"] try: # Delete the pod api_response = core_v1.delete_namespaced_pod( name=name, namespace=namespace, body={} # Default deletion options ) # Check if the response indicates success if api_response.status == "Success": return { "name": name, "namespace": namespace, "status": "Deleted", "message": f"Pod {name} deleted successfully" } else: return { "name": name, "namespace": namespace, "status": "Failed", "message": f"Failed to delete pod {name}: {api_response.status}" } except Exception as e: return { "name": name, "namespace": namespace, "status": "Error", "message": f"An error occurred while deleting pod {name}: {str(e)}" } @mcp.tool() @use_current_context def pod_logs(context_name: str, namespace: str, name: str, container: str = None, tail_lines: int = 100, previous: bool = False): """ Get logs from a pod or a specific container within the pod. Args: context_name: The Kubernetes context name namespace: The Kubernetes namespace name: The pod name container: Optional container name (if pod has multiple containers) tail_lines: Number of lines to retrieve from the end of the logs previous: Whether to get logs from a previous instance of the container Returns: Pod logs """ core_v1: CoreV1Api = get_api_clients(context_name)["core"] logs = core_v1.read_namespaced_pod_log( name=name, namespace=namespace, container=container, tail_lines=tail_lines, previous=previous ) result = { "name": name, "namespace": namespace, "container": container, "logs": logs } return result

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bourbonkk/k8s-pilot'

If you have feedback or need assistance with the MCP directory API, please join our Discord server