Skip to main content
Glama

MCP TypeScript SDK

streamableHttp.test.ts46 kB
import { createServer, type Server, IncomingMessage, ServerResponse } from "node:http"; import { AddressInfo } from "node:net"; import { randomUUID } from "node:crypto"; import { EventStore, StreamableHTTPServerTransport, EventId, StreamId } from "./streamableHttp.js"; import { McpServer } from "./mcp.js"; import { CallToolResult, JSONRPCMessage } from "../types.js"; import { z } from "zod"; import { AuthInfo } from "./auth/types.js"; /** * Test server configuration for StreamableHTTPServerTransport tests */ interface TestServerConfig { sessionIdGenerator: (() => string) | undefined; enableJsonResponse?: boolean; customRequestHandler?: (req: IncomingMessage, res: ServerResponse, parsedBody?: unknown) => Promise<void>; eventStore?: EventStore; } /** * Helper to create and start test HTTP server with MCP setup */ async function createTestServer(config: TestServerConfig = { sessionIdGenerator: (() => randomUUID()) }): Promise<{ server: Server; transport: StreamableHTTPServerTransport; mcpServer: McpServer; baseUrl: URL; }> { const mcpServer = new McpServer( { name: "test-server", version: "1.0.0" }, { capabilities: { logging: {} } } ); mcpServer.tool( "greet", "A simple greeting tool", { name: z.string().describe("Name to greet") }, async ({ name }): Promise<CallToolResult> => { return { content: [{ type: "text", text: `Hello, ${name}!` }] }; } ); const transport = new StreamableHTTPServerTransport({ sessionIdGenerator: config.sessionIdGenerator, enableJsonResponse: config.enableJsonResponse ?? false, eventStore: config.eventStore }); await mcpServer.connect(transport); const server = createServer(async (req, res) => { try { if (config.customRequestHandler) { await config.customRequestHandler(req, res); } else { await transport.handleRequest(req, res); } } catch (error) { console.error("Error handling request:", error); if (!res.headersSent) res.writeHead(500).end(); } }); const baseUrl = await new Promise<URL>((resolve) => { server.listen(0, "127.0.0.1", () => { const addr = server.address() as AddressInfo; resolve(new URL(`http://127.0.0.1:${addr.port}`)); }); }); return { server, transport, mcpServer, baseUrl }; } /** * Helper to create and start authenticated test HTTP server with MCP setup */ async function createTestAuthServer(config: TestServerConfig = { sessionIdGenerator: (() => randomUUID()) }): Promise<{ server: Server; transport: StreamableHTTPServerTransport; mcpServer: McpServer; baseUrl: URL; }> { const mcpServer = new McpServer( { name: "test-server", version: "1.0.0" }, { capabilities: { logging: {} } } ); mcpServer.tool( "profile", "A user profile data tool", { active: z.boolean().describe("Profile status") }, async ({ active }, { authInfo }): Promise<CallToolResult> => { return { content: [{ type: "text", text: `${active ? 'Active' : 'Inactive'} profile from token: ${authInfo?.token}!` }] }; } ); const transport = new StreamableHTTPServerTransport({ sessionIdGenerator: config.sessionIdGenerator, enableJsonResponse: config.enableJsonResponse ?? false, eventStore: config.eventStore }); await mcpServer.connect(transport); const server = createServer(async (req: IncomingMessage & { auth?: AuthInfo }, res) => { try { if (config.customRequestHandler) { await config.customRequestHandler(req, res); } else { req.auth = { token: req.headers["authorization"]?.split(" ")[1] } as AuthInfo; await transport.handleRequest(req, res); } } catch (error) { console.error("Error handling request:", error); if (!res.headersSent) res.writeHead(500).end(); } }); const baseUrl = await new Promise<URL>((resolve) => { server.listen(0, "127.0.0.1", () => { const addr = server.address() as AddressInfo; resolve(new URL(`http://127.0.0.1:${addr.port}`)); }); }); return { server, transport, mcpServer, baseUrl }; } /** * Helper to stop test server */ async function stopTestServer({ server, transport }: { server: Server; transport: StreamableHTTPServerTransport }): Promise<void> { // First close the transport to ensure all SSE streams are closed await transport.close(); // Close the server without waiting indefinitely server.close(); } /** * Common test messages */ const TEST_MESSAGES = { initialize: { jsonrpc: "2.0", method: "initialize", params: { clientInfo: { name: "test-client", version: "1.0" }, protocolVersion: "2025-03-26", capabilities: { }, }, id: "init-1", } as JSONRPCMessage, toolsList: { jsonrpc: "2.0", method: "tools/list", params: {}, id: "tools-1", } as JSONRPCMessage }; /** * Helper to extract text from SSE response * Note: Can only be called once per response stream. For multiple reads, * get the reader manually and read multiple times. */ async function readSSEEvent(response: Response): Promise<string> { const reader = response.body?.getReader(); const { value } = await reader!.read(); return new TextDecoder().decode(value); } /** * Helper to send JSON-RPC request */ async function sendPostRequest(baseUrl: URL, message: JSONRPCMessage | JSONRPCMessage[], sessionId?: string, extraHeaders?: Record<string, string>): Promise<Response> { const headers: Record<string, string> = { "Content-Type": "application/json", Accept: "application/json, text/event-stream", ...extraHeaders }; if (sessionId) { headers["mcp-session-id"] = sessionId; // After initialization, include the protocol version header headers["mcp-protocol-version"] = "2025-03-26"; } return fetch(baseUrl, { method: "POST", headers, body: JSON.stringify(message), }); } function expectErrorResponse(data: unknown, expectedCode: number, expectedMessagePattern: RegExp): void { expect(data).toMatchObject({ jsonrpc: "2.0", error: expect.objectContaining({ code: expectedCode, message: expect.stringMatching(expectedMessagePattern), }), }); } describe("StreamableHTTPServerTransport", () => { let server: Server; let transport: StreamableHTTPServerTransport; let baseUrl: URL; let sessionId: string; beforeEach(async () => { const result = await createTestServer(); server = result.server; transport = result.transport; baseUrl = result.baseUrl; }); afterEach(async () => { await stopTestServer({ server, transport }); }); async function initializeServer(): Promise<string> { const response = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); expect(response.status).toBe(200); const newSessionId = response.headers.get("mcp-session-id"); expect(newSessionId).toBeDefined(); return newSessionId as string; } it("should initialize server and generate session ID", async () => { const response = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); expect(response.status).toBe(200); expect(response.headers.get("content-type")).toBe("text/event-stream"); expect(response.headers.get("mcp-session-id")).toBeDefined(); }); it("should reject second initialization request", async () => { // First initialize const sessionId = await initializeServer(); expect(sessionId).toBeDefined(); // Try second initialize const secondInitMessage = { ...TEST_MESSAGES.initialize, id: "second-init" }; const response = await sendPostRequest(baseUrl, secondInitMessage); expect(response.status).toBe(400); const errorData = await response.json(); expectErrorResponse(errorData, -32600, /Server already initialized/); }); it("should reject batch initialize request", async () => { const batchInitMessages: JSONRPCMessage[] = [ TEST_MESSAGES.initialize, { jsonrpc: "2.0", method: "initialize", params: { clientInfo: { name: "test-client-2", version: "1.0" }, protocolVersion: "2025-03-26", }, id: "init-2", } ]; const response = await sendPostRequest(baseUrl, batchInitMessages); expect(response.status).toBe(400); const errorData = await response.json(); expectErrorResponse(errorData, -32600, /Only one initialization request is allowed/); }); it("should handle post requests via sse response correctly", async () => { sessionId = await initializeServer(); const response = await sendPostRequest(baseUrl, TEST_MESSAGES.toolsList, sessionId); expect(response.status).toBe(200); // Read the SSE stream for the response const text = await readSSEEvent(response); // Parse the SSE event const eventLines = text.split("\n"); const dataLine = eventLines.find(line => line.startsWith("data:")); expect(dataLine).toBeDefined(); const eventData = JSON.parse(dataLine!.substring(5)); expect(eventData).toMatchObject({ jsonrpc: "2.0", result: expect.objectContaining({ tools: expect.arrayContaining([ expect.objectContaining({ name: "greet", description: "A simple greeting tool", }), ]), }), id: "tools-1", }); }); it("should call a tool and return the result", async () => { sessionId = await initializeServer(); const toolCallMessage: JSONRPCMessage = { jsonrpc: "2.0", method: "tools/call", params: { name: "greet", arguments: { name: "Test User", }, }, id: "call-1", }; const response = await sendPostRequest(baseUrl, toolCallMessage, sessionId); expect(response.status).toBe(200); const text = await readSSEEvent(response); const eventLines = text.split("\n"); const dataLine = eventLines.find(line => line.startsWith("data:")); expect(dataLine).toBeDefined(); const eventData = JSON.parse(dataLine!.substring(5)); expect(eventData).toMatchObject({ jsonrpc: "2.0", result: { content: [ { type: "text", text: "Hello, Test User!", }, ], }, id: "call-1", }); }); it("should reject requests without a valid session ID", async () => { const response = await sendPostRequest(baseUrl, TEST_MESSAGES.toolsList); expect(response.status).toBe(400); const errorData = await response.json(); expectErrorResponse(errorData, -32000, /Bad Request/); expect(errorData.id).toBeNull(); }); it("should reject invalid session ID", async () => { // First initialize to be in valid state await initializeServer(); // Now try with invalid session ID const response = await sendPostRequest(baseUrl, TEST_MESSAGES.toolsList, "invalid-session-id"); expect(response.status).toBe(404); const errorData = await response.json(); expectErrorResponse(errorData, -32001, /Session not found/); }); it("should establish standalone SSE stream and receive server-initiated messages", async () => { // First initialize to get a session ID sessionId = await initializeServer(); // Open a standalone SSE stream const sseResponse = await fetch(baseUrl, { method: "GET", headers: { Accept: "text/event-stream", "mcp-session-id": sessionId, "mcp-protocol-version": "2025-03-26", }, }); expect(sseResponse.status).toBe(200); expect(sseResponse.headers.get("content-type")).toBe("text/event-stream"); // Send a notification (server-initiated message) that should appear on SSE stream const notification: JSONRPCMessage = { jsonrpc: "2.0", method: "notifications/message", params: { level: "info", data: "Test notification" }, }; // Send the notification via transport await transport.send(notification); // Read from the stream and verify we got the notification const text = await readSSEEvent(sseResponse); const eventLines = text.split("\n"); const dataLine = eventLines.find(line => line.startsWith("data:")); expect(dataLine).toBeDefined(); const eventData = JSON.parse(dataLine!.substring(5)); expect(eventData).toMatchObject({ jsonrpc: "2.0", method: "notifications/message", params: { level: "info", data: "Test notification" }, }); }); it("should not close GET SSE stream after sending multiple server notifications", async () => { sessionId = await initializeServer(); // Open a standalone SSE stream const sseResponse = await fetch(baseUrl, { method: "GET", headers: { Accept: "text/event-stream", "mcp-session-id": sessionId, "mcp-protocol-version": "2025-03-26", }, }); expect(sseResponse.status).toBe(200); const reader = sseResponse.body?.getReader(); // Send multiple notifications const notification1: JSONRPCMessage = { jsonrpc: "2.0", method: "notifications/message", params: { level: "info", data: "First notification" } }; // Just send one and verify it comes through - then the stream should stay open await transport.send(notification1); const { value, done } = await reader!.read(); const text = new TextDecoder().decode(value); expect(text).toContain("First notification"); expect(done).toBe(false); // Stream should still be open }); it("should reject second SSE stream for the same session", async () => { sessionId = await initializeServer(); // Open first SSE stream const firstStream = await fetch(baseUrl, { method: "GET", headers: { Accept: "text/event-stream", "mcp-session-id": sessionId, "mcp-protocol-version": "2025-03-26", }, }); expect(firstStream.status).toBe(200); // Try to open a second SSE stream with the same session ID const secondStream = await fetch(baseUrl, { method: "GET", headers: { Accept: "text/event-stream", "mcp-session-id": sessionId, "mcp-protocol-version": "2025-03-26", }, }); // Should be rejected expect(secondStream.status).toBe(409); // Conflict const errorData = await secondStream.json(); expectErrorResponse(errorData, -32000, /Only one SSE stream is allowed per session/); }); it("should reject GET requests without Accept: text/event-stream header", async () => { sessionId = await initializeServer(); // Try GET without proper Accept header const response = await fetch(baseUrl, { method: "GET", headers: { Accept: "application/json", "mcp-session-id": sessionId, "mcp-protocol-version": "2025-03-26", }, }); expect(response.status).toBe(406); const errorData = await response.json(); expectErrorResponse(errorData, -32000, /Client must accept text\/event-stream/); }); it("should reject POST requests without proper Accept header", async () => { sessionId = await initializeServer(); // Try POST without Accept: text/event-stream const response = await fetch(baseUrl, { method: "POST", headers: { "Content-Type": "application/json", Accept: "application/json", // Missing text/event-stream "mcp-session-id": sessionId, }, body: JSON.stringify(TEST_MESSAGES.toolsList), }); expect(response.status).toBe(406); const errorData = await response.json(); expectErrorResponse(errorData, -32000, /Client must accept both application\/json and text\/event-stream/); }); it("should reject unsupported Content-Type", async () => { sessionId = await initializeServer(); // Try POST with text/plain Content-Type const response = await fetch(baseUrl, { method: "POST", headers: { "Content-Type": "text/plain", Accept: "application/json, text/event-stream", "mcp-session-id": sessionId, }, body: "This is plain text", }); expect(response.status).toBe(415); const errorData = await response.json(); expectErrorResponse(errorData, -32000, /Content-Type must be application\/json/); }); it("should handle JSON-RPC batch notification messages with 202 response", async () => { sessionId = await initializeServer(); // Send batch of notifications (no IDs) const batchNotifications: JSONRPCMessage[] = [ { jsonrpc: "2.0", method: "someNotification1", params: {} }, { jsonrpc: "2.0", method: "someNotification2", params: {} }, ]; const response = await sendPostRequest(baseUrl, batchNotifications, sessionId); expect(response.status).toBe(202); }); it("should handle batch request messages with SSE stream for responses", async () => { sessionId = await initializeServer(); // Send batch of requests const batchRequests: JSONRPCMessage[] = [ { jsonrpc: "2.0", method: "tools/list", params: {}, id: "req-1" }, { jsonrpc: "2.0", method: "tools/call", params: { name: "greet", arguments: { name: "BatchUser" } }, id: "req-2" }, ]; const response = await sendPostRequest(baseUrl, batchRequests, sessionId); expect(response.status).toBe(200); expect(response.headers.get("content-type")).toBe("text/event-stream"); const reader = response.body?.getReader(); // The responses may come in any order or together in one chunk const { value } = await reader!.read(); const text = new TextDecoder().decode(value); // Check that both responses were sent on the same stream expect(text).toContain('"id":"req-1"'); expect(text).toContain('"tools"'); // tools/list result expect(text).toContain('"id":"req-2"'); expect(text).toContain('Hello, BatchUser'); // tools/call result }); it("should properly handle invalid JSON data", async () => { sessionId = await initializeServer(); // Send invalid JSON const response = await fetch(baseUrl, { method: "POST", headers: { "Content-Type": "application/json", Accept: "application/json, text/event-stream", "mcp-session-id": sessionId, }, body: "This is not valid JSON", }); expect(response.status).toBe(400); const errorData = await response.json(); expectErrorResponse(errorData, -32700, /Parse error/); }); it("should return 400 error for invalid JSON-RPC messages", async () => { sessionId = await initializeServer(); // Invalid JSON-RPC (missing required jsonrpc version) const invalidMessage = { method: "tools/list", params: {}, id: 1 }; // missing jsonrpc version const response = await sendPostRequest(baseUrl, invalidMessage as JSONRPCMessage, sessionId); expect(response.status).toBe(400); const errorData = await response.json(); expect(errorData).toMatchObject({ jsonrpc: "2.0", error: expect.anything(), }); }); it("should reject requests to uninitialized server", async () => { // Create a new HTTP server and transport without initializing const { server: uninitializedServer, transport: uninitializedTransport, baseUrl: uninitializedUrl } = await createTestServer(); // Transport not used in test but needed for cleanup // No initialization, just send a request directly const uninitializedMessage: JSONRPCMessage = { jsonrpc: "2.0", method: "tools/list", params: {}, id: "uninitialized-test", }; // Send a request to uninitialized server const response = await sendPostRequest(uninitializedUrl, uninitializedMessage, "any-session-id"); expect(response.status).toBe(400); const errorData = await response.json(); expectErrorResponse(errorData, -32000, /Server not initialized/); // Cleanup await stopTestServer({ server: uninitializedServer, transport: uninitializedTransport }); }); it("should send response messages to the connection that sent the request", async () => { sessionId = await initializeServer(); const message1: JSONRPCMessage = { jsonrpc: "2.0", method: "tools/list", params: {}, id: "req-1" }; const message2: JSONRPCMessage = { jsonrpc: "2.0", method: "tools/call", params: { name: "greet", arguments: { name: "Connection2" } }, id: "req-2" }; // Make two concurrent fetch connections for different requests const req1 = sendPostRequest(baseUrl, message1, sessionId); const req2 = sendPostRequest(baseUrl, message2, sessionId); // Get both responses const [response1, response2] = await Promise.all([req1, req2]); const reader1 = response1.body?.getReader(); const reader2 = response2.body?.getReader(); // Read responses from each stream (requires each receives its specific response) const { value: value1 } = await reader1!.read(); const text1 = new TextDecoder().decode(value1); expect(text1).toContain('"id":"req-1"'); expect(text1).toContain('"tools"'); // tools/list result const { value: value2 } = await reader2!.read(); const text2 = new TextDecoder().decode(value2); expect(text2).toContain('"id":"req-2"'); expect(text2).toContain('Hello, Connection2'); // tools/call result }); it("should keep stream open after sending server notifications", async () => { sessionId = await initializeServer(); // Open a standalone SSE stream const sseResponse = await fetch(baseUrl, { method: "GET", headers: { Accept: "text/event-stream", "mcp-session-id": sessionId, "mcp-protocol-version": "2025-03-26", }, }); // Send several server-initiated notifications await transport.send({ jsonrpc: "2.0", method: "notifications/message", params: { level: "info", data: "First notification" }, }); await transport.send({ jsonrpc: "2.0", method: "notifications/message", params: { level: "info", data: "Second notification" }, }); // Stream should still be open - it should not close after sending notifications expect(sseResponse.bodyUsed).toBe(false); }); // The current implementation will close the entire transport for DELETE // Creating a temporary transport/server where we don't care if it gets closed it("should properly handle DELETE requests and close session", async () => { // Setup a temporary server for this test const tempResult = await createTestServer(); const tempServer = tempResult.server; const tempUrl = tempResult.baseUrl; // Initialize to get a session ID const initResponse = await sendPostRequest(tempUrl, TEST_MESSAGES.initialize); const tempSessionId = initResponse.headers.get("mcp-session-id"); // Now DELETE the session const deleteResponse = await fetch(tempUrl, { method: "DELETE", headers: { "mcp-session-id": tempSessionId || "", "mcp-protocol-version": "2025-03-26", }, }); expect(deleteResponse.status).toBe(200); // Clean up - don't wait indefinitely for server close tempServer.close(); }); it("should reject DELETE requests with invalid session ID", async () => { // Initialize the server first to activate it sessionId = await initializeServer(); // Try to delete with invalid session ID const response = await fetch(baseUrl, { method: "DELETE", headers: { "mcp-session-id": "invalid-session-id", "mcp-protocol-version": "2025-03-26", }, }); expect(response.status).toBe(404); const errorData = await response.json(); expectErrorResponse(errorData, -32001, /Session not found/); }); describe("protocol version header validation", () => { it("should accept requests with matching protocol version", async () => { sessionId = await initializeServer(); // Send request with matching protocol version const response = await sendPostRequest(baseUrl, TEST_MESSAGES.toolsList, sessionId); expect(response.status).toBe(200); }); it("should accept requests without protocol version header", async () => { sessionId = await initializeServer(); // Send request without protocol version header const response = await fetch(baseUrl, { method: "POST", headers: { "Content-Type": "application/json", Accept: "application/json, text/event-stream", "mcp-session-id": sessionId, // No mcp-protocol-version header }, body: JSON.stringify(TEST_MESSAGES.toolsList), }); expect(response.status).toBe(200); }); it("should reject requests with unsupported protocol version", async () => { sessionId = await initializeServer(); // Send request with unsupported protocol version const response = await fetch(baseUrl, { method: "POST", headers: { "Content-Type": "application/json", Accept: "application/json, text/event-stream", "mcp-session-id": sessionId, "mcp-protocol-version": "1999-01-01", // Unsupported version }, body: JSON.stringify(TEST_MESSAGES.toolsList), }); expect(response.status).toBe(400); const errorData = await response.json(); expectErrorResponse(errorData, -32000, /Bad Request: Unsupported protocol version \(supported versions: .+\)/); }); it("should accept when protocol version differs from negotiated version", async () => { sessionId = await initializeServer(); // Spy on console.warn to verify warning is logged const warnSpy = jest.spyOn(console, 'warn').mockImplementation(); // Send request with different but supported protocol version const response = await fetch(baseUrl, { method: "POST", headers: { "Content-Type": "application/json", Accept: "application/json, text/event-stream", "mcp-session-id": sessionId, "mcp-protocol-version": "2024-11-05", // Different but supported version }, body: JSON.stringify(TEST_MESSAGES.toolsList), }); // Request should still succeed expect(response.status).toBe(200); warnSpy.mockRestore(); }); it("should handle protocol version validation for GET requests", async () => { sessionId = await initializeServer(); // GET request with unsupported protocol version const response = await fetch(baseUrl, { method: "GET", headers: { Accept: "text/event-stream", "mcp-session-id": sessionId, "mcp-protocol-version": "invalid-version", }, }); expect(response.status).toBe(400); const errorData = await response.json(); expectErrorResponse(errorData, -32000, /Bad Request: Unsupported protocol version \(supported versions: .+\)/); }); it("should handle protocol version validation for DELETE requests", async () => { sessionId = await initializeServer(); // DELETE request with unsupported protocol version const response = await fetch(baseUrl, { method: "DELETE", headers: { "mcp-session-id": sessionId, "mcp-protocol-version": "invalid-version", }, }); expect(response.status).toBe(400); const errorData = await response.json(); expectErrorResponse(errorData, -32000, /Bad Request: Unsupported protocol version \(supported versions: .+\)/); }); }); }); describe("StreamableHTTPServerTransport with AuthInfo", () => { let server: Server; let transport: StreamableHTTPServerTransport; let baseUrl: URL; let sessionId: string; beforeEach(async () => { const result = await createTestAuthServer(); server = result.server; transport = result.transport; baseUrl = result.baseUrl; }); afterEach(async () => { await stopTestServer({ server, transport }); }); async function initializeServer(): Promise<string> { const response = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); expect(response.status).toBe(200); const newSessionId = response.headers.get("mcp-session-id"); expect(newSessionId).toBeDefined(); return newSessionId as string; } it("should call a tool with authInfo", async () => { sessionId = await initializeServer(); const toolCallMessage: JSONRPCMessage = { jsonrpc: "2.0", method: "tools/call", params: { name: "profile", arguments: {active: true}, }, id: "call-1", }; const response = await sendPostRequest(baseUrl, toolCallMessage, sessionId, {'authorization': 'Bearer test-token'}); expect(response.status).toBe(200); const text = await readSSEEvent(response); const eventLines = text.split("\n"); const dataLine = eventLines.find(line => line.startsWith("data:")); expect(dataLine).toBeDefined(); const eventData = JSON.parse(dataLine!.substring(5)); expect(eventData).toMatchObject({ jsonrpc: "2.0", result: { content: [ { type: "text", text: "Active profile from token: test-token!", }, ], }, id: "call-1", }); }); it("should calls tool without authInfo when it is optional", async () => { sessionId = await initializeServer(); const toolCallMessage: JSONRPCMessage = { jsonrpc: "2.0", method: "tools/call", params: { name: "profile", arguments: {active: false}, }, id: "call-1", }; const response = await sendPostRequest(baseUrl, toolCallMessage, sessionId); expect(response.status).toBe(200); const text = await readSSEEvent(response); const eventLines = text.split("\n"); const dataLine = eventLines.find(line => line.startsWith("data:")); expect(dataLine).toBeDefined(); const eventData = JSON.parse(dataLine!.substring(5)); expect(eventData).toMatchObject({ jsonrpc: "2.0", result: { content: [ { type: "text", text: "Inactive profile from token: undefined!", }, ], }, id: "call-1", }); }); }); // Test JSON Response Mode describe("StreamableHTTPServerTransport with JSON Response Mode", () => { let server: Server; let transport: StreamableHTTPServerTransport; let baseUrl: URL; let sessionId: string; beforeEach(async () => { const result = await createTestServer({ sessionIdGenerator: (() => randomUUID()), enableJsonResponse: true }); server = result.server; transport = result.transport; baseUrl = result.baseUrl; // Initialize and get session ID const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); sessionId = initResponse.headers.get("mcp-session-id") as string; }); afterEach(async () => { await stopTestServer({ server, transport }); }); it("should return JSON response for a single request", async () => { const toolsListMessage: JSONRPCMessage = { jsonrpc: "2.0", method: "tools/list", params: {}, id: "json-req-1", }; const response = await sendPostRequest(baseUrl, toolsListMessage, sessionId); expect(response.status).toBe(200); expect(response.headers.get("content-type")).toBe("application/json"); const result = await response.json(); expect(result).toMatchObject({ jsonrpc: "2.0", result: expect.objectContaining({ tools: expect.arrayContaining([ expect.objectContaining({ name: "greet" }) ]) }), id: "json-req-1" }); }); it("should return JSON response for batch requests", async () => { const batchMessages: JSONRPCMessage[] = [ { jsonrpc: "2.0", method: "tools/list", params: {}, id: "batch-1" }, { jsonrpc: "2.0", method: "tools/call", params: { name: "greet", arguments: { name: "JSON" } }, id: "batch-2" } ]; const response = await sendPostRequest(baseUrl, batchMessages, sessionId); expect(response.status).toBe(200); expect(response.headers.get("content-type")).toBe("application/json"); const results = await response.json(); expect(Array.isArray(results)).toBe(true); expect(results).toHaveLength(2); // Batch responses can come in any order const listResponse = results.find((r: { id?: string }) => r.id === "batch-1"); const callResponse = results.find((r: { id?: string }) => r.id === "batch-2"); expect(listResponse).toEqual(expect.objectContaining({ jsonrpc: "2.0", id: "batch-1", result: expect.objectContaining({ tools: expect.arrayContaining([ expect.objectContaining({ name: "greet" }) ]) }) })); expect(callResponse).toEqual(expect.objectContaining({ jsonrpc: "2.0", id: "batch-2", result: expect.objectContaining({ content: expect.arrayContaining([ expect.objectContaining({ type: "text", text: "Hello, JSON!" }) ]) }) })); }); }); // Test pre-parsed body handling describe("StreamableHTTPServerTransport with pre-parsed body", () => { let server: Server; let transport: StreamableHTTPServerTransport; let baseUrl: URL; let sessionId: string; let parsedBody: unknown = null; beforeEach(async () => { const result = await createTestServer({ customRequestHandler: async (req, res) => { try { if (parsedBody !== null) { await transport.handleRequest(req, res, parsedBody); parsedBody = null; // Reset after use } else { await transport.handleRequest(req, res); } } catch (error) { console.error("Error handling request:", error); if (!res.headersSent) res.writeHead(500).end(); } }, sessionIdGenerator: (() => randomUUID()) }); server = result.server; transport = result.transport; baseUrl = result.baseUrl; // Initialize and get session ID const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); sessionId = initResponse.headers.get("mcp-session-id") as string; }); afterEach(async () => { await stopTestServer({ server, transport }); }); it("should accept pre-parsed request body", async () => { // Set up the pre-parsed body parsedBody = { jsonrpc: "2.0", method: "tools/list", params: {}, id: "preparsed-1", }; // Send an empty body since we'll use pre-parsed body const response = await fetch(baseUrl, { method: "POST", headers: { "Content-Type": "application/json", Accept: "application/json, text/event-stream", "mcp-session-id": sessionId, }, // Empty body - we're testing pre-parsed body body: "" }); expect(response.status).toBe(200); expect(response.headers.get("content-type")).toBe("text/event-stream"); const reader = response.body?.getReader(); const { value } = await reader!.read(); const text = new TextDecoder().decode(value); // Verify the response used the pre-parsed body expect(text).toContain('"id":"preparsed-1"'); expect(text).toContain('"tools"'); }); it("should handle pre-parsed batch messages", async () => { parsedBody = [ { jsonrpc: "2.0", method: "tools/list", params: {}, id: "batch-1" }, { jsonrpc: "2.0", method: "tools/call", params: { name: "greet", arguments: { name: "PreParsed" } }, id: "batch-2" } ]; const response = await fetch(baseUrl, { method: "POST", headers: { "Content-Type": "application/json", Accept: "application/json, text/event-stream", "mcp-session-id": sessionId, }, body: "" // Empty as we're using pre-parsed }); expect(response.status).toBe(200); const reader = response.body?.getReader(); const { value } = await reader!.read(); const text = new TextDecoder().decode(value); expect(text).toContain('"id":"batch-1"'); expect(text).toContain('"tools"'); }); it("should prefer pre-parsed body over request body", async () => { // Set pre-parsed to tools/list parsedBody = { jsonrpc: "2.0", method: "tools/list", params: {}, id: "preparsed-wins", }; // Send actual body with tools/call - should be ignored const response = await fetch(baseUrl, { method: "POST", headers: { "Content-Type": "application/json", Accept: "application/json, text/event-stream", "mcp-session-id": sessionId, }, body: JSON.stringify({ jsonrpc: "2.0", method: "tools/call", params: { name: "greet", arguments: { name: "Ignored" } }, id: "ignored-id" }) }); expect(response.status).toBe(200); const reader = response.body?.getReader(); const { value } = await reader!.read(); const text = new TextDecoder().decode(value); // Should have processed the pre-parsed body expect(text).toContain('"id":"preparsed-wins"'); expect(text).toContain('"tools"'); expect(text).not.toContain('"ignored-id"'); }); }); // Test resumability support describe("StreamableHTTPServerTransport with resumability", () => { let server: Server; let transport: StreamableHTTPServerTransport; let baseUrl: URL; let sessionId: string; let mcpServer: McpServer; const storedEvents: Map<string, { eventId: string, message: JSONRPCMessage }> = new Map(); // Simple implementation of EventStore const eventStore: EventStore = { async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> { const eventId = `${streamId}_${randomUUID()}`; storedEvents.set(eventId, { eventId, message }); return eventId; }, async replayEventsAfter(lastEventId: EventId, { send }: { send: (eventId: EventId, message: JSONRPCMessage) => Promise<void> }): Promise<StreamId> { const streamId = lastEventId.split('_')[0]; // Extract stream ID from the event ID // For test simplicity, just return all events with matching streamId that aren't the lastEventId for (const [eventId, { message }] of storedEvents.entries()) { if (eventId.startsWith(streamId) && eventId !== lastEventId) { await send(eventId, message); } } return streamId; }, }; beforeEach(async () => { storedEvents.clear(); const result = await createTestServer({ sessionIdGenerator: () => randomUUID(), eventStore }); server = result.server; transport = result.transport; baseUrl = result.baseUrl; mcpServer = result.mcpServer; // Verify resumability is enabled on the transport expect((transport)['_eventStore']).toBeDefined(); // Initialize the server const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); sessionId = initResponse.headers.get("mcp-session-id") as string; expect(sessionId).toBeDefined(); }); afterEach(async () => { await stopTestServer({ server, transport }); storedEvents.clear(); }); it("should store and include event IDs in server SSE messages", async () => { // Open a standalone SSE stream const sseResponse = await fetch(baseUrl, { method: "GET", headers: { Accept: "text/event-stream", "mcp-session-id": sessionId, "mcp-protocol-version": "2025-03-26", }, }); expect(sseResponse.status).toBe(200); expect(sseResponse.headers.get("content-type")).toBe("text/event-stream"); // Send a notification that should be stored with an event ID const notification: JSONRPCMessage = { jsonrpc: "2.0", method: "notifications/message", params: { level: "info", data: "Test notification with event ID" }, }; // Send the notification via transport await transport.send(notification); // Read from the stream and verify we got the notification with an event ID const reader = sseResponse.body?.getReader(); const { value } = await reader!.read(); const text = new TextDecoder().decode(value); // The response should contain an event ID expect(text).toContain('id: '); expect(text).toContain('"method":"notifications/message"'); // Extract the event ID const idMatch = text.match(/id: ([^\n]+)/); expect(idMatch).toBeTruthy(); // Verify the event was stored const eventId = idMatch![1]; expect(storedEvents.has(eventId)).toBe(true); const storedEvent = storedEvents.get(eventId); expect(eventId.startsWith('_GET_stream')).toBe(true); expect(storedEvent?.message).toMatchObject(notification); }); it("should store and replay MCP server tool notifications", async () => { // Establish a standalone SSE stream const sseResponse = await fetch(baseUrl, { method: "GET", headers: { Accept: "text/event-stream", "mcp-session-id": sessionId, }, }); expect(sseResponse.status).toBe(200); // Send a server notification through the MCP server await mcpServer.server.sendLoggingMessage({ level: "info", data: "First notification from MCP server" }); // Read the notification from the SSE stream const reader = sseResponse.body?.getReader(); const { value } = await reader!.read(); const text = new TextDecoder().decode(value); // Verify the notification was sent with an event ID expect(text).toContain('id: '); expect(text).toContain('First notification from MCP server'); // Extract the event ID const idMatch = text.match(/id: ([^\n]+)/); expect(idMatch).toBeTruthy(); const firstEventId = idMatch![1]; // Send a second notification await mcpServer.server.sendLoggingMessage({ level: "info", data: "Second notification from MCP server" }); // Close the first SSE stream to simulate a disconnect await reader!.cancel(); // Reconnect with the Last-Event-ID to get missed messages const reconnectResponse = await fetch(baseUrl, { method: "GET", headers: { Accept: "text/event-stream", "mcp-session-id": sessionId, "mcp-protocol-version": "2025-03-26", "last-event-id": firstEventId }, }); expect(reconnectResponse.status).toBe(200); // Read the replayed notification const reconnectReader = reconnectResponse.body?.getReader(); const reconnectData = await reconnectReader!.read(); const reconnectText = new TextDecoder().decode(reconnectData.value); // Verify we received the second notification that was sent after our stored eventId expect(reconnectText).toContain('Second notification from MCP server'); expect(reconnectText).toContain('id: '); }); }); // Test stateless mode describe("StreamableHTTPServerTransport in stateless mode", () => { let server: Server; let transport: StreamableHTTPServerTransport; let baseUrl: URL; beforeEach(async () => { const result = await createTestServer({ sessionIdGenerator: undefined }); server = result.server; transport = result.transport; baseUrl = result.baseUrl; }); afterEach(async () => { await stopTestServer({ server, transport }); }); it("should operate without session ID validation", async () => { // Initialize the server first const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); expect(initResponse.status).toBe(200); // Should NOT have session ID header in stateless mode expect(initResponse.headers.get("mcp-session-id")).toBeNull(); // Try request without session ID - should work in stateless mode const toolsResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.toolsList); expect(toolsResponse.status).toBe(200); }); it("should handle POST requests with various session IDs in stateless mode", async () => { await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); // Try with a random session ID - should be accepted const response1 = await fetch(baseUrl, { method: "POST", headers: { "Content-Type": "application/json", Accept: "application/json, text/event-stream", "mcp-session-id": "random-id-1", }, body: JSON.stringify({ jsonrpc: "2.0", method: "tools/list", params: {}, id: "t1" }), }); expect(response1.status).toBe(200); // Try with another random session ID - should also be accepted const response2 = await fetch(baseUrl, { method: "POST", headers: { "Content-Type": "application/json", Accept: "application/json, text/event-stream", "mcp-session-id": "different-id-2", }, body: JSON.stringify({ jsonrpc: "2.0", method: "tools/list", params: {}, id: "t2" }), }); expect(response2.status).toBe(200); }); it("should reject second SSE stream even in stateless mode", async () => { // Despite no session ID requirement, the transport still only allows // one standalone SSE stream at a time // Initialize the server first await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); // Open first SSE stream const stream1 = await fetch(baseUrl, { method: "GET", headers: { Accept: "text/event-stream", "mcp-protocol-version": "2025-03-26" }, }); expect(stream1.status).toBe(200); // Open second SSE stream - should still be rejected, stateless mode still only allows one const stream2 = await fetch(baseUrl, { method: "GET", headers: { Accept: "text/event-stream", "mcp-protocol-version": "2025-03-26" }, }); expect(stream2.status).toBe(409); // Conflict - only one stream allowed }); });

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jeffwalters9597/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server