Skip to main content
Glama

Puzzlebox

by cliffhall
sse-client-utils.test.ts16.2 kB
import http from "http"; import { AddressInfo } from "net"; import { Readable } from "stream"; import { JsonRpcResponse, ToolsListJsonResponse } from "../types.ts"; import { establishSseSession, sendJsonRpcMessage, waitForSseResponse, ActiveSseConnection, } from "../sse-client-utils.ts"; // Helper to delay execution const delay = (ms: number) => new Promise((res) => setTimeout(res, ms)); describe("SSE Client Utilities", () => { let server: http.Server; let serverAddress: AddressInfo; let activeSseConnections: Map<string, ActiveSseConnection>; // To hold onto client connections for manipulation in tests let sseResponseSocket: http.ServerResponse | null = null; // --- Setup and Teardown --- beforeEach((done) => { activeSseConnections = new Map(); server = http.createServer((req, res) => { // SSE Endpoint if (req.url === "/sse" && req.method === "GET") { res.writeHead(200, { "Content-Type": "text/event-stream", Connection: "keep-alive", "Cache-Control": "no-cache", }); // Store the response socket to send events later sseResponseSocket = res; // Send the initial endpoint event with a session ID const sessionId = "test-session-id-12345"; res.write(`event: endpoint\n`); res.write(`data: /sse?sessionId=${sessionId}\n\n`); // Keep the connection open. Don't call res.end(). req.on("close", () => { sseResponseSocket = null; }); return; } // Message Endpoint if (req.url?.startsWith("/message") && req.method === "POST") { // Acknowledge the message immediately res.writeHead(202, { "Content-Type": "application/json" }); res.end(JSON.stringify({ status: "accepted" })); return; } // Default handler for unexpected requests res.writeHead(404); res.end("Not Found"); }); server.listen(0, "127.0.0.1", () => { serverAddress = server.address() as AddressInfo; done(); }); }); afterEach((done) => { // Clean up all active connections activeSseConnections.forEach((conn) => { conn.request.destroy(); }); activeSseConnections.clear(); sseResponseSocket = null; server.close(() => { done(); }); }); // --- Test Cases --- describe("establishSseSession", () => { it("should establish an SSE session and return a session ID", async () => { const { sessionId, sseResponseStream } = await establishSseSession( serverAddress, activeSseConnections, ); expect(sessionId).toBe("test-session-id-12345"); expect(sseResponseStream).toBeInstanceOf(http.IncomingMessage); expect(activeSseConnections.has(sessionId)).toBe(true); expect(activeSseConnections.get(sessionId)?.response).toBe( sseResponseStream, ); }); it("should reject if the server returns a non-200 status", async () => { // Stop the real server and start a faulty one await new Promise<void>((res) => server.close(() => res())); server = http.createServer((req, res) => { res.writeHead(500); res.end(); }); await new Promise<void>((res) => server.listen(serverAddress.port, serverAddress.address, res), ); await expect( establishSseSession(serverAddress, activeSseConnections), ).rejects.toThrow("SSE connection failed with status 500"); }); it("should reject on timeout if no session ID is received", async () => { // Stop the real server and start one that never sends the endpoint event await new Promise<void>((res) => server.close(() => res())); server = http.createServer((req, res) => { res.writeHead(200, { "Content-Type": "text/event-stream" }); // Never sends the event }); await new Promise<void>((res) => server.listen(serverAddress.port, serverAddress.address, res), ); // Use jest fake timers to avoid waiting the full 5s jest.useFakeTimers(); const promise = establishSseSession(serverAddress, activeSseConnections); jest.advanceTimersByTime(5001); await expect(promise).rejects.toThrow( "Timeout waiting for SSE endpoint event (session ID)", ); jest.useRealTimers(); }); it("should reject on a connection error", async () => { // This covers the clientRequest.on('error') handler await new Promise<void>((res) => server.close(() => res())); await expect( establishSseSession(serverAddress, activeSseConnections), ).rejects.toThrow(/ECONNREFUSED/); }); it("should ignore other SSE events before the endpoint event", async () => { // This covers the `else` block in the data handler when an event is not the 'endpoint' await new Promise<void>((res) => server.close(() => res())); server = http.createServer((req, res) => { if (req.url === "/sse") { res.writeHead(200, { "Content-Type": "text/event-stream" }); // Send a different event first res.write("event: ping\n"); res.write("data: keepalive\n\n"); // Then send the real one const sessionId = "session-after-ping"; res.write(`event: endpoint\n`); res.write(`data: /sse?sessionId=${sessionId}\n\n`); } }); await new Promise<void>((res) => server.listen(serverAddress.port, serverAddress.address, res), ); const { sessionId } = await establishSseSession( serverAddress, activeSseConnections, ); expect(sessionId).toBe("session-after-ping"); }); // NEW! This test covers line 178 it("should ignore request errors after the session is established", async () => { const { sessionId } = await establishSseSession( serverAddress, activeSseConnections, ); const connection = activeSseConnections.get(sessionId); expect(connection).toBeDefined(); // This error occurs *after* the promise has settled. The code is // designed to ignore it, preventing an unhandled rejection. connection!.request.emit("error", new Error("late error")); // We just need to ensure no crash happens. await delay(20); expect(activeSseConnections.has(sessionId)).toBe(true); }); }); describe("sendJsonRpcMessage", () => { it("should send a POST request and resolve on 202 status", async () => { const payload = { jsonrpc: "2.0" as const, id: 1, method: "test" }; await expect( sendJsonRpcMessage(serverAddress, "test-session", payload), ).resolves.toBeUndefined(); }); it("should reject if the server returns a non-202 status", async () => { await new Promise<void>((res) => server.close(() => res())); server = http.createServer((req, res) => { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "bad request" })); }); await new Promise<void>((res) => server.listen(serverAddress.port, serverAddress.address, res), ); const payload = { jsonrpc: "2.0" as const, id: 2, method: "test" }; await expect( sendJsonRpcMessage(serverAddress, "test-session", payload), ).rejects.toThrow( 'POST /message expected status 202 but got 400. Body: {"error":"bad request"}', ); }); it("should reject on a connection error", async () => { // This covers the clientRequest.on('error') handler await new Promise<void>((res) => server.close(() => res())); const payload = { jsonrpc: "2.0" as const, id: 3, method: "test" }; await expect( sendJsonRpcMessage(serverAddress, "test-session", payload), ).rejects.toThrow(/ECONNREFUSED/); }); it("should reject on a response stream error", async () => { // This covers the res.on('error') handler for the POST response await new Promise<void>((res) => server.close(() => res())); server = http.createServer((req, res) => { if (req.method === "POST") { res.writeHead(202); // Send headers res.socket?.destroy(new Error("Socket error during response")); } }); await new Promise<void>((res) => server.listen(serverAddress.port, serverAddress.address, res), ); const payload = { jsonrpc: "2.0" as const, id: 4, method: "test" }; // FIX: The client receives a 'socket hang up' error, not the server-side error message. await expect( sendJsonRpcMessage(serverAddress, "test-session", payload), ).rejects.toThrow("socket hang up"); }); // NEW! This test covers lines 226-229 it("should reject if the response stream errors while reading a non-202 response", async () => { await new Promise<void>((res) => server.close(() => res())); server = http.createServer((req, res) => { if (req.method === "POST") { res.writeHead(500); // Send a non-202 status // Destroy the socket before the body can be read. This triggers // the `res.on('error')` handler in the client. res.destroy(); } }); await new Promise<void>((res) => server.listen(serverAddress.port, serverAddress.address, res), ); const payload = { jsonrpc: "2.0" as const, id: 5, method: "test" }; // The error is a generic 'socket hang up' because the request is aborted. await expect( sendJsonRpcMessage(serverAddress, "test-session", payload), ).rejects.toThrow("socket hang up"); }); }); describe("waitForSseResponse", () => { let mockStream: Readable; beforeEach(() => { // Create a mock stream that we can push data to mockStream = new Readable({ read() {} }); }); it("should resolve with the correct parsed response when its ID matches", async () => { const expectedId = 123; const responsePayload: JsonRpcResponse = { jsonrpc: "2.0", id: expectedId, result: { success: true }, }; const promise = waitForSseResponse<JsonRpcResponse>( mockStream as http.IncomingMessage, expectedId, ); mockStream.push(`event: message\n`); mockStream.push(`data: ${JSON.stringify(responsePayload)}\n\n`); await expect(promise).resolves.toEqual(responsePayload); }); it("should ignore other messages and wait for the correct one", async () => { const expectedId = 456; const responsePayload: JsonRpcResponse = { jsonrpc: "2.0", id: expectedId, result: "final answer", }; const ignoredPayload = { jsonrpc: "2.0", id: 999, result: "wrong" }; const notification = { jsonrpc: "2.0", method: "notifications/progress", params: { percent: 50 }, }; const promise = waitForSseResponse<JsonRpcResponse>( mockStream as http.IncomingMessage, expectedId, ); mockStream.push(`data: ${JSON.stringify(ignoredPayload)}\n\n`); mockStream.push(`data: ${JSON.stringify(notification)}\n\n`); await delay(10); mockStream.push(`data: ${JSON.stringify(responsePayload)}\n\n`); await expect(promise).resolves.toEqual(responsePayload); }); it("should reject on timeout if the response never arrives", async () => { jest.useFakeTimers(); const promise = waitForSseResponse( mockStream as http.IncomingMessage, 789, 5000, ); jest.advanceTimersByTime(5001); await expect(promise).rejects.toThrow( "Timeout waiting for response id 789 on SSE stream", ); jest.useRealTimers(); }); it("should reject if the stream closes prematurely", async () => { const promise = waitForSseResponse( mockStream as http.IncomingMessage, 101, ); await delay(10); mockStream.emit("close"); await expect(promise).rejects.toThrow( "SSE stream closed/ended unexpectedly while waiting for response id 101", ); }); it("should reject if the stream emits an error", async () => { // This covers the errorHandler const promise = waitForSseResponse( mockStream as http.IncomingMessage, 102, ); const testError = new Error("Test stream error"); await delay(10); mockStream.emit("error", testError); await expect(promise).rejects.toThrow(testError); }); it("should ignore messages with invalid JSON and continue waiting", async () => { // This covers the JSON.parse catch block const expectedId = 789; const responsePayload: JsonRpcResponse = { jsonrpc: "2.0", id: expectedId, result: "finally", }; const promise = waitForSseResponse<JsonRpcResponse>( mockStream as http.IncomingMessage, expectedId, ); mockStream.push("data: {this is not json}\n\n"); await delay(10); mockStream.push(`data: ${JSON.stringify(responsePayload)}\n\n`); await expect(promise).resolves.toEqual(responsePayload); }); it("should ignore SSE messages without a data field", async () => { // This covers the `else` block for when sseData is null const expectedId = 890; const responsePayload: JsonRpcResponse = { jsonrpc: "2.0", id: expectedId, result: "got it", }; const promise = waitForSseResponse<JsonRpcResponse>( mockStream as http.IncomingMessage, expectedId, ); mockStream.push("event: ping\n\n"); await delay(10); mockStream.push(`data: ${JSON.stringify(responsePayload)}\n\n`); await expect(promise).resolves.toEqual(responsePayload); }); // NEW! This test covers line 398 it("should ignore SSE comments and continue waiting", async () => { const expectedId = 999; const responsePayload: JsonRpcResponse = { jsonrpc: "2.0", id: expectedId, result: "after comment", }; const promise = waitForSseResponse<JsonRpcResponse>( mockStream as http.IncomingMessage, expectedId, ); // SSE comments start with a colon. They should be ignored. mockStream.push(": this is a server comment\n\n"); await delay(10); mockStream.push(`data: ${JSON.stringify(responsePayload)}\n\n`); await expect(promise).resolves.toEqual(responsePayload); }); }); // --- Integration Test --- describe("Full Request-Response Cycle", () => { it("should successfully get a response for a request", async () => { // 1. Establish SSE connection const { sessionId, sseResponseStream } = await establishSseSession( serverAddress, activeSseConnections, ); expect(sessionId).toBe("test-session-id-12345"); // 2. Concurrently wait for the response on the SSE stream const requestId = 1; const waitPromise = waitForSseResponse<ToolsListJsonResponse>( sseResponseStream, requestId, 5000, ); // 3. Send the JSON-RPC message via POST const requestPayload = { jsonrpc: "2.0" as const, id: requestId, method: "tools/list", }; await sendJsonRpcMessage(serverAddress, sessionId, requestPayload); // 4. Simulate the server processing the POST and sending the response await delay(50); expect(sseResponseSocket).not.toBeNull(); const responsePayload: ToolsListJsonResponse = { jsonrpc: "2.0", id: requestId, result: { tools: [ { name: "calculator", description: "A calculator", inputSchema: {}, }, ], }, }; sseResponseSocket?.write(`data: ${JSON.stringify(responsePayload)}\n\n`); // 5. Await the response and verify it const response = await waitPromise; expect(response.id).toBe(requestId); expect(response.result.tools[0].name).toBe("calculator"); }); }); });

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cliffhall/puzzlebox'

If you have feedback or need assistance with the MCP directory API, please join our Discord server