sse.test.ts•7.81 kB
import { expect, test, describe, beforeAll, afterAll } from "vitest";
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { startSSEServer } from "../src/utils/sse.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import { KubernetesManager } from "../src/utils/kubernetes-manager.js";
import { kubectlGetSchema, kubectlGet } from "../src/tools/kubectl-get.js";
import express from "express";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { findAvailablePort } from "./port-helper.js";
// Modified version of startSSEServer that returns the Express app
function startSSEServerWithReturn(server: Server): Promise<any> {
return new Promise((resolve, reject) => {
const app = express();
let transports: Array<SSEServerTransport> = [];
app.get("/sse", async (req, res) => {
const transport = new SSEServerTransport("/messages", res);
transports.push(transport);
await server.connect(transport);
});
app.post("/messages", (req, res) => {
const transport = transports.find(
(t) => t.sessionId === req.query.sessionId
);
if (transport) {
transport.handlePostMessage(req, res);
} else {
res
.status(404)
.send("Not found. Must pass valid sessionId as query param.");
}
});
app.get("/health", async (req, res) => {
res.json({ status: "ok" });
});
app.get("/ready", async (req, res) => {
// For Kubernetes readiness probe
try {
res.json({
status: "ready",
timestamp: new Date().toISOString(),
service: "mcp-kubernetes-server"
});
} catch (error) {
console.error("Readiness check failed:", error);
res.status(503).json({
status: "not ready",
reason: "Server initialization incomplete",
timestamp: new Date().toISOString()
});
}
});
const port = parseInt(process.env.PORT || "3000");
const serverInstance = app.listen(port, () => {
console.log(
`mcp-kubernetes-server is listening on port ${port}\nUse the following url to connect to the server:\nhttp://localhost:${port}/sse`
);
resolve(serverInstance);
});
serverInstance.on("error", reject);
});
}
describe("SSE transport", () => {
let server: Server;
let serverUrl: string;
let actualPort: number;
let expressApp: any;
beforeAll(async () => {
const k8sManager = new KubernetesManager();
// Create a minimal server with just the kubectl_get tool
server = new Server(
{
name: "test-server",
version: "1.0.0",
},
{
capabilities: {
tools: {},
},
}
);
// Set up the kubectl_list tool
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [kubectlGetSchema],
};
});
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: input = {} } = request.params;
switch (name) {
case "kubectl_get":
return await kubectlGet(
k8sManager,
input as {
resourceType: string;
name?: string;
namespace?: string;
output?: string;
allNamespaces?: boolean;
labelSelector?: string;
fieldSelector?: string;
sortBy?: string;
}
);
default:
throw new Error(`Unknown tool: ${name}`);
}
});
// Find an available port instead of using a fixed one
actualPort = await findAvailablePort(3001);
process.env.PORT = actualPort.toString();
// Start the SSE server and get the Express app reference
expressApp = await startSSEServerWithReturn(server);
serverUrl = `http://localhost:${actualPort}`;
// Wait a bit for server to fully start
await new Promise((resolve) => setTimeout(resolve, 1000));
});
afterAll(async () => {
try {
if (expressApp && expressApp.close) {
expressApp.close();
}
await server.close();
} catch (error) {
console.warn("Error during cleanup:", error);
}
});
test("should respond to readiness check", async () => {
try {
// Send a GET request to the /ready endpoint
const readyUrl = `${serverUrl}/ready`;
const response = await fetch(readyUrl, {
method: "GET",
headers: {
accept: "application/json",
},
});
expect(response.status).toBe(200);
const responseJson = await response.json();
expect(responseJson.status).toBe("ready");
} catch (error) {
console.error("Error during readiness check:", error);
throw error;
}
});
test("SSE connection and tool call", async () => {
// Step 1: Connect to SSE endpoint
const sseResponse = await fetch(`${serverUrl}/sse`);
expect(sseResponse.status).toBe(200);
// Get the session ID from the endpoint event
const reader = sseResponse.body?.getReader();
const decoder = new TextDecoder();
let sessionId: string | undefined;
while (true) {
const { done, value } = await reader!.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split("\n");
for (const line of lines) {
if (line.startsWith("event: endpoint")) {
const dataLine = lines[lines.indexOf(line) + 1];
const data = dataLine.replace("data: ", "");
sessionId = data.split("sessionId=")[1];
break;
}
}
if (sessionId) break;
}
expect(sessionId).toBeDefined();
// Step 2: Make a tool call using the session ID
const toolCallResponse = await fetch(
`${serverUrl}/messages?sessionId=${sessionId}`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
jsonrpc: "2.0",
id: 1234,
method: "tools/call",
params: {
name: "kubectl_get",
arguments: {
resourceType: "pods",
namespace: "default",
output: "json",
},
},
}),
}
);
expect(toolCallResponse.status).toBe(202);
expect(await toolCallResponse.text()).toBe("Accepted");
// Step 3: Read the SSE response for the tool call result
let toolCallResult: any;
while (true) {
const { done, value } = await reader!.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split("\n");
for (const line of lines) {
if (line.startsWith("event: message")) {
const dataLine = lines[lines.indexOf(line) + 1];
toolCallResult = JSON.parse(dataLine.replace("data: ", ""));
break;
}
}
if (toolCallResult) break;
}
// Verify the tool call result
expect(toolCallResult.jsonrpc).toBe("2.0");
expect(toolCallResult.id).toBe(1234);
if (toolCallResult.result) {
expect(toolCallResult.result.content[0].type).toBe("text");
const responseText = toolCallResult.result.content[0].text;
// If it's JSON, parse it and check structure
try {
const parsedResponse = JSON.parse(responseText);
expect(parsedResponse.items).toBeDefined();
expect(Array.isArray(parsedResponse.items)).toBe(true);
} catch (e) {
// If not JSON (formatted output), just check it contains pod data
expect(responseText).toContain("NAME");
expect(responseText).toContain("NAMESPACE");
}
}
});
});