ako_clusters
List all Kubernetes clusters that have AKO deployed, including their version and operational status.
Instructions
[READ] List all K8s clusters that have AKO deployed, with version and status.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | Rich console output of the AKO cluster table, returned as a string | |
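Per the handler described under Implementation Reference, `result` is a single string of captured console output. As a rough illustration of that pattern only, the sketch below captures whatever a zero-argument function prints and returns it as a string. It uses the standard library's `redirect_stdout` as a stand-in. The project's own `_capture_output` helper is not shown on this page and may work differently (for example, through Rich itself). The names `capture_printed_output` and `fake_list_clusters` are hypothetical.

```python
import io
from contextlib import redirect_stdout
from typing import Callable


def capture_printed_output(func: Callable[[], None]) -> str:
    """Run func and return everything it wrote to stdout as one string.

    Hypothetical stand-in for the server's _capture_output helper.
    """
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        func()
    return buffer.getvalue()


def fake_list_clusters() -> None:
    """Hypothetical placeholder that prints one row instead of calling kubectl."""
    print("ctx-a  Running  1.2.3")


if __name__ == "__main__":
    text = capture_printed_output(fake_list_clusters)
    print(repr(text))
```

A helper of this shape lets the same printing function serve both the CLI, which prints directly, and the MCP tool, which must return text.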
Implementation Reference
- mcp_server/server.py:471-474 (handler) MCP tool handler for 'ako_clusters'. Decorated with @mcp.tool and @vmware_tool, it imports list_clusters from vmware_avi.ops.ako_multi_cluster and runs it via _capture_output to return formatted Rich console output as a string.

      def ako_clusters() -> str:
          """[READ] List all K8s clusters that have AKO deployed, with version and status."""
          from vmware_avi.ops.ako_multi_cluster import list_clusters
          return _capture_output(list_clusters)

- Core implementation of list_clusters() invoked by the ako_clusters tool. Runs 'kubectl config get-contexts' to discover all K8s contexts, then checks each for AKO pods in the avi-system namespace to report status and version via a Rich table.

      def list_clusters() -> None:
          """List all K8s contexts and their AKO deployment status."""
          result = subprocess.run(
              ["kubectl", "config", "get-contexts", "-o", "name"],
              capture_output=True, text=True, timeout=30,
          )
          if result.returncode != 0:
              console.print(f"[red]Failed to list contexts: {result.stderr.strip()}[/red]")
              raise SystemExit(1)

          contexts = [c.strip() for c in result.stdout.strip().split("\n") if c.strip()]

          table = Table(title="AKO Cluster Overview")
          table.add_column("Context")
          table.add_column("AKO Status")
          table.add_column("AKO Version")

          for ctx in contexts:
              pod_check = subprocess.run(
                  [
                      "kubectl", "--context", ctx,
                      "get", "pods", "-n", "avi-system",
                      "-l", "app.kubernetes.io/name=ako",
                      "-o", "jsonpath={.items[0].status.phase}:{.items[0].spec.containers[0].image}",
                  ],
                  capture_output=True, text=True, timeout=30,
              )
              if pod_check.returncode == 0 and pod_check.stdout:
                  parts = pod_check.stdout.split(":", 1)
                  phase = parts[0] if parts else "Unknown"
                  image = parts[1] if len(parts) > 1 else "N/A"
                  version = image.split(":")[-1] if ":" in image else "latest"
                  status_color = "green" if phase == "Running" else "red"
                  table.add_row(ctx, f"[{status_color}]{phase}[/{status_color}]", version)
              else:
                  table.add_row(ctx, "[dim]Not deployed[/dim]", "-")

          console.print(table)

- tests/test_mcp_tools.py:44-47 (registration) Test registration entry confirming 'ako_clusters' is part of the expected 29-tool set exposed by the MCP server.

          "ako_clusters",
          "ako_cluster_overview",
          "ako_amko_status",
      }

- vmware_avi/cli.py:391-396 (registration) CLI command registration for the 'ako clusters' subcommand, which also calls list_clusters(), mirroring the MCP tool functionality.

      @ako_app.command("clusters")
      def ako_clusters_cmd() -> None:
          """List all clusters with AKO deployed."""
          from vmware_avi.ops.ako_multi_cluster import list_clusters

          list_clusters()
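To make the status and version derivation easier to follow, the sketch below isolates the string handling that list_clusters() applies to the `<phase>:<image>` text returned by the jsonpath query. The function name `parse_pod_check` and the sample image names are hypothetical and used only for illustration. The splitting logic itself is copied from the listing above.

```python
def parse_pod_check(stdout: str) -> tuple[str, str]:
    """Return (phase, version) from '<phase>:<image>' kubectl jsonpath output.

    Mirrors the parsing steps shown in list_clusters(); the function name
    is an illustrative assumption, not part of the project.
    """
    # Split only on the first colon so the image reference stays intact.
    parts = stdout.split(":", 1)
    phase = parts[0] if parts else "Unknown"
    image = parts[1] if len(parts) > 1 else "N/A"
    # Treat the text after the last colon in the image as its tag.
    version = image.split(":")[-1] if ":" in image else "latest"
    return phase, version


if __name__ == "__main__":
    # Sample inputs are made up; they are not real registry paths.
    print(parse_pod_check("Running:example.registry/ako/ako:1.2.3"))
    print(parse_pod_check("Pending:example.registry/ako/ako"))
    print(parse_pod_check("Running:registry.local:5000/ako"))
```

The third sample shows an edge case of this approach: an untagged image on a registry with an explicit port yields `5000/ako` as the version, because the last colon belongs to the port and not to a tag.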