Skip to main content
Glama

mcp-server-kubernetes

by Flux159
sse.test.ts7.81 kB
import { expect, test, describe, beforeAll, afterAll } from "vitest"; import { Server } from "@modelcontextprotocol/sdk/server/index.js"; import { startSSEServer } from "../src/utils/sse.js"; import { CallToolRequestSchema, ListToolsRequestSchema, } from "@modelcontextprotocol/sdk/types.js"; import { KubernetesManager } from "../src/utils/kubernetes-manager.js"; import { kubectlGetSchema, kubectlGet } from "../src/tools/kubectl-get.js"; import express from "express"; import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; import { findAvailablePort } from "./port-helper.js"; // Modified version of startSSEServer that returns the Express app function startSSEServerWithReturn(server: Server): Promise<any> { return new Promise((resolve, reject) => { const app = express(); let transports: Array<SSEServerTransport> = []; app.get("/sse", async (req, res) => { const transport = new SSEServerTransport("/messages", res); transports.push(transport); await server.connect(transport); }); app.post("/messages", (req, res) => { const transport = transports.find( (t) => t.sessionId === req.query.sessionId ); if (transport) { transport.handlePostMessage(req, res); } else { res .status(404) .send("Not found. Must pass valid sessionId as query param."); } }); app.get("/health", async (req, res) => { res.json({ status: "ok" }); }); app.get("/ready", async (req, res) => { // For Kubernetes readiness probe try { res.json({ status: "ready", timestamp: new Date().toISOString(), service: "mcp-kubernetes-server" }); } catch (error) { console.error("Readiness check failed:", error); res.status(503).json({ status: "not ready", reason: "Server initialization incomplete", timestamp: new Date().toISOString() }); } }); const port = parseInt(process.env.PORT || "3000"); const serverInstance = app.listen(port, () => { console.log( `mcp-kubernetes-server is listening on port ${port}\nUse the following url to connect to the server:\nhttp://localhost:${port}/sse` ); resolve(serverInstance); }); serverInstance.on("error", reject); }); } describe("SSE transport", () => { let server: Server; let serverUrl: string; let actualPort: number; let expressApp: any; beforeAll(async () => { const k8sManager = new KubernetesManager(); // Create a minimal server with just the kubectl_get tool server = new Server( { name: "test-server", version: "1.0.0", }, { capabilities: { tools: {}, }, } ); // Set up the kubectl_list tool server.setRequestHandler(ListToolsRequestSchema, async () => { return { tools: [kubectlGetSchema], }; }); server.setRequestHandler(CallToolRequestSchema, async (request) => { const { name, arguments: input = {} } = request.params; switch (name) { case "kubectl_get": return await kubectlGet( k8sManager, input as { resourceType: string; name?: string; namespace?: string; output?: string; allNamespaces?: boolean; labelSelector?: string; fieldSelector?: string; sortBy?: string; } ); default: throw new Error(`Unknown tool: ${name}`); } }); // Find an available port instead of using a fixed one actualPort = await findAvailablePort(3001); process.env.PORT = actualPort.toString(); // Start the SSE server and get the Express app reference expressApp = await startSSEServerWithReturn(server); serverUrl = `http://localhost:${actualPort}`; // Wait a bit for server to fully start await new Promise((resolve) => setTimeout(resolve, 1000)); }); afterAll(async () => { try { if (expressApp && expressApp.close) { expressApp.close(); } await server.close(); } catch (error) { console.warn("Error during cleanup:", error); } }); test("should respond to readiness check", async () => { try { // Send a GET request to the /ready endpoint const readyUrl = `${serverUrl}/ready`; const response = await fetch(readyUrl, { method: "GET", headers: { accept: "application/json", }, }); expect(response.status).toBe(200); const responseJson = await response.json(); expect(responseJson.status).toBe("ready"); } catch (error) { console.error("Error during readiness check:", error); throw error; } }); test("SSE connection and tool call", async () => { // Step 1: Connect to SSE endpoint const sseResponse = await fetch(`${serverUrl}/sse`); expect(sseResponse.status).toBe(200); // Get the session ID from the endpoint event const reader = sseResponse.body?.getReader(); const decoder = new TextDecoder(); let sessionId: string | undefined; while (true) { const { done, value } = await reader!.read(); if (done) break; const chunk = decoder.decode(value); const lines = chunk.split("\n"); for (const line of lines) { if (line.startsWith("event: endpoint")) { const dataLine = lines[lines.indexOf(line) + 1]; const data = dataLine.replace("data: ", ""); sessionId = data.split("sessionId=")[1]; break; } } if (sessionId) break; } expect(sessionId).toBeDefined(); // Step 2: Make a tool call using the session ID const toolCallResponse = await fetch( `${serverUrl}/messages?sessionId=${sessionId}`, { method: "POST", headers: { "Content-Type": "application/json", }, body: JSON.stringify({ jsonrpc: "2.0", id: 1234, method: "tools/call", params: { name: "kubectl_get", arguments: { resourceType: "pods", namespace: "default", output: "json", }, }, }), } ); expect(toolCallResponse.status).toBe(202); expect(await toolCallResponse.text()).toBe("Accepted"); // Step 3: Read the SSE response for the tool call result let toolCallResult: any; while (true) { const { done, value } = await reader!.read(); if (done) break; const chunk = decoder.decode(value); const lines = chunk.split("\n"); for (const line of lines) { if (line.startsWith("event: message")) { const dataLine = lines[lines.indexOf(line) + 1]; toolCallResult = JSON.parse(dataLine.replace("data: ", "")); break; } } if (toolCallResult) break; } // Verify the tool call result expect(toolCallResult.jsonrpc).toBe("2.0"); expect(toolCallResult.id).toBe(1234); if (toolCallResult.result) { expect(toolCallResult.result.content[0].type).toBe("text"); const responseText = toolCallResult.result.content[0].text; // If it's JSON, parse it and check structure try { const parsedResponse = JSON.parse(responseText); expect(parsedResponse.items).toBeDefined(); expect(Array.isArray(parsedResponse.items)).toBe(true); } catch (e) { // If not JSON (formatted output), just check it contains pod data expect(responseText).toContain("NAME"); expect(responseText).toContain("NAMESPACE"); } } }); });

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Flux159/mcp-server-kubernetes'

If you have feedback or need assistance with the MCP directory API, please join our Discord server