Skip to main content
Glama

mcp-server-kubernetes

by Flux159
import { expect, test, describe, beforeEach, afterEach } from "vitest";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import { z } from "zod";

/** Shape of the text-only tool responses these tests assert on. */
const KubectlResponseSchema = z.object({
  content: z.array(
    z.object({
      type: z.literal("text"),
      text: z.string(),
    })
  ),
});

type KubectlResponse = z.infer<typeof KubectlResponseSchema>;

/**
 * The only part of a Deployment object these tests inspect.
 * Unknown keys are dropped by zod's default (non-strict) object parsing.
 */
const DeploymentSchema = z.object({
  spec: z.object({
    paused: z.boolean().optional(),
  }),
});

/** Resolves after `ms` milliseconds. */
async function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Runs `operation` until it succeeds or the attempts are exhausted.
 *
 * @param operation - Async action to attempt.
 * @param maxRetries - Total number of attempts (values below 1 are treated as 1).
 * @param delayMs - Pause between consecutive attempts, in milliseconds.
 * @returns The value of the first successful attempt.
 * @throws The error raised by the last failed attempt.
 */
async function retry<T>(
  operation: () => Promise<T>,
  maxRetries: number = 2,
  delayMs: number = 1000
): Promise<T> {
  const attempts = Math.max(1, maxRetries);
  let lastError: unknown;

  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (attempt < attempts) {
        console.warn(
          `Attempt ${attempt}/${attempts} failed. Retrying in ${delayMs}ms...`
        );
        await sleep(delayMs);
      } else {
        // Nothing follows the final attempt, so do not waste time sleeping.
        console.warn(`Attempt ${attempt}/${attempts} failed. No retries left.`);
      }
    }
  }

  throw lastError;
}

/** Builds the stdio transport that launches the server under test. */
function createTransport(): StdioClientTransport {
  return new StdioClientTransport({
    command: "bun",
    args: ["src/index.ts"],
    stderr: "pipe",
  });
}

/** Builds an MCP client with the identity used throughout these tests. */
function createClient(): Client {
  return new Client(
    {
      name: "test-client",
      version: "1.0.0",
    },
    {
      capabilities: {},
    }
  );
}

/** Best-effort transport shutdown; failures are logged, never thrown. */
async function closeTransport(transport: StdioClientTransport): Promise<void> {
  try {
    await transport.close();
    await sleep(1000);
  } catch (e) {
    console.error("Error during cleanup:", e);
  }
}

/** Calls a tool and validates the response against `KubectlResponseSchema`. */
function callTool(
  client: Client,
  name: string,
  args: Record<string, unknown>
): Promise<KubectlResponse> {
  return client.request(
    {
      method: "tools/call",
      params: { name, arguments: args },
    },
    KubectlResponseSchema
  );
}

/** Calls a tool whose response body is irrelevant to the test (setup/teardown). */
async function runTool(
  client: Client,
  name: string,
  args: Record<string, unknown>
): Promise<void> {
  await client.request(
    {
      method: "tools/call",
      params: { name, arguments: args },
    },
    z.any()
  );
}

/** Invokes `kubectl_rollout` for a deployment. */
function callRollout(
  client: Client,
  subCommand: string,
  name: string,
  namespace: string
): Promise<KubectlResponse> {
  return callTool(client, "kubectl_rollout", {
    subCommand,
    resourceType: "deployment",
    name,
    namespace,
  });
}

/** Returns the text of the first content item, failing loudly if there is none. */
function firstText(response: KubectlResponse): string {
  const [first] = response.content;
  if (first === undefined) {
    throw new Error("Tool response contained no content items");
  }
  return first.text;
}

/** Extracts a message from an arbitrary thrown value. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

describe("kubectl_rollout command", () => {
  let transport: StdioClientTransport;
  let client: Client;
  let testNamespace: string;
  const deploymentName = "rollout-test-app";

  const rollout = (
    subCommand: string,
    name: string = deploymentName
  ): Promise<KubectlResponse> =>
    callRollout(client, subCommand, name, testNamespace);

  /** Polls rollout status until the deployment reports a completed rollout. */
  const waitForRollout = (notReadyMessage: string): Promise<KubectlResponse> =>
    retry(
      async () => {
        const response = await rollout("status");
        if (!firstText(response).includes("successfully rolled out")) {
          throw new Error(notReadyMessage);
        }
        return response;
      },
      3,
      1500
    ); // Optimized from 5 retries with 3000ms delay

  /** Fetches the test deployment as JSON and validates the inspected fields. */
  const getDeployment = async (): Promise<z.infer<typeof DeploymentSchema>> => {
    const response = await callTool(client, "kubectl_get", {
      resourceType: "deployment",
      name: deploymentName,
      namespace: testNamespace,
      output: "json",
    });
    return DeploymentSchema.parse(JSON.parse(firstText(response)));
  };

  beforeEach(async () => {
    // A fresh namespace name per test: afterEach force-deletes the namespace
    // without waiting here, so reusing one name could collide with a
    // namespace that is still being torn down.
    testNamespace =
      "rollout-test-" + Math.random().toString(36).substring(2, 7);

    // Assign before connecting so afterEach can still close the transport
    // if the connection attempt fails.
    transport = createTransport();
    client = createClient();
    await client.connect(transport);
    await sleep(1000);

    // Create a test namespace
    await retry(() =>
      runTool(client, "kubectl_create", {
        resourceType: "namespace",
        name: testNamespace,
      })
    );

    // Create a test deployment
    const deploymentManifest = {
      apiVersion: "apps/v1",
      kind: "Deployment",
      metadata: {
        name: deploymentName,
        namespace: testNamespace,
      },
      spec: {
        replicas: 1,
        selector: {
          matchLabels: {
            app: "rollout-test-app",
          },
        },
        template: {
          metadata: {
            labels: {
              app: "rollout-test-app",
            },
          },
          spec: {
            containers: [
              {
                name: "nginx",
                image: "nginx:1.20.0", // Start with specific version
                ports: [
                  {
                    containerPort: 80,
                  },
                ],
              },
            ],
          },
        },
      },
    };

    await retry(() =>
      runTool(client, "kubectl_apply", {
        manifest: JSON.stringify(deploymentManifest),
        namespace: testNamespace,
      })
    );

    // Wait for deployment to be ready
    await waitForRollout("Deployment not ready yet");
  });

  afterEach(async () => {
    // Delete the test namespace and all resources in it
    try {
      await runTool(client, "kubectl_delete", {
        resourceType: "namespace",
        name: testNamespace,
        force: true,
      });
    } catch {
      // Ignore error if namespace doesn't exist
    }
    await closeTransport(transport);
  });

  test("kubectl_rollout can check status of a deployment", async () => {
    const result = await rollout("status");

    expect(result.content[0]?.type).toBe("text");
    expect(firstText(result)).toContain("successfully rolled out");
  });

  test("kubectl_rollout can restart a deployment", async () => {
    const result = await rollout("restart");

    expect(result.content[0]?.type).toBe("text");
    expect(firstText(result)).toContain("restarted");

    // Wait for the restart to complete
    await waitForRollout("Deployment restart not complete");
  });

  test("kubectl_rollout can pause and resume a deployment", async () => {
    // Pause the deployment
    const pauseResult = await rollout("pause");
    expect(pauseResult.content[0]?.type).toBe("text");
    expect(firstText(pauseResult)).toContain("paused");

    // Verify it's paused by checking the deployment
    const deployment = await getDeployment();
    expect(deployment.spec.paused).toBe(true);

    // Resume the deployment
    const resumeResult = await rollout("resume");
    expect(resumeResult.content[0]?.type).toBe("text");
    expect(firstText(resumeResult)).toContain("resumed");

    // Verify it's resumed
    const updatedDeployment = await getDeployment();
    // Check if paused is either false or undefined (both mean not paused)
    expect(updatedDeployment.spec.paused ?? false).toBe(false);
  });

  test("kubectl_rollout can show history of a deployment", async () => {
    // Get the rollout history
    const historyResult = await rollout("history");

    expect(historyResult.content[0]?.type).toBe("text");
    expect(firstText(historyResult)).toContain("REVISION");
    // There should be at least one revision
    expect(firstText(historyResult)).toMatch(/1\s+/);
  });

  test("kubectl_rollout handles errors gracefully", async () => {
    const nonExistentResource = "non-existent-deployment-" + Date.now();

    // `rejects` fails the test with a clear message if the call unexpectedly
    // succeeds, instead of routing a sentinel assertion through a catch block.
    await expect(rollout("status", nonExistentResource)).rejects.toThrow(
      "Failed to execute rollout command"
    );
  });
});

describe("kubectl_rollout command error handling", () => {
  let transport: StdioClientTransport;
  let client: Client;

  beforeEach(async () => {
    transport = createTransport();
    client = createClient();
    await client.connect(transport);
    await sleep(1000);
  });

  afterEach(async () => {
    await closeTransport(transport);
  });

  test("kubectl_rollout handles errors gracefully", async () => {
    const nonExistentResource = "non-existent-deployment-" + Date.now();

    await expect(
      callRollout(client, "status", nonExistentResource, "default")
    ).rejects.toThrow("Failed to execute rollout command");
  });

  test("kubectl_generic can execute rollout commands", async () => {
    try {
      // This should fail but in a controlled way so we can test the error handling
      await callTool(client, "kubectl_generic", {
        command: "rollout",
        subCommand: "status",
        resourceType: "deployment",
        name: "nginx", // Likely doesn't exist but gives predictable error
        namespace: "default",
      });

      // If we get here, it might be because the deployment actually exists
      // So we don't fail the test, just note it
    } catch (error: unknown) {
      // This is expected, just make sure it's a proper error
      expect(errorMessage(error)).toBeTruthy();
    }
  });
});

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Flux159/mcp-server-kubernetes'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.