Skip to main content
Glama

MCP TypeScript SDK

taskResumability.test.ts (8.6 kB)
import { createServer, type Server } from 'node:http';
import { type AddressInfo } from 'node:net';
import { randomUUID } from 'node:crypto';
import { Client } from '../client/index.js';
import { StreamableHTTPClientTransport } from '../client/streamableHttp.js';
import { McpServer } from '../server/mcp.js';
import { StreamableHTTPServerTransport } from '../server/streamableHttp.js';
import { CallToolResultSchema, LoggingMessageNotificationSchema } from '../types.js';
import { z } from 'zod';
import { InMemoryEventStore } from '../examples/shared/inMemoryEventStore.js';

/**
 * Integration tests for resumability over the Streamable HTTP transport.
 *
 * Each test runs against a fresh local HTTP server (bound to a random port on
 * 127.0.0.1) that fronts an `McpServer` whose transport is configured with an
 * `InMemoryEventStore`. The tests check that a client obtains a session ID on
 * connect and that a second client can issue a request with the resumption
 * token captured by a first client that disconnected mid-stream.
 */
describe('Transport resumability', () => {
  let server: Server;
  let mcpServer: McpServer;
  let serverTransport: StreamableHTTPServerTransport;
  let baseUrl: URL;
  let eventStore: InMemoryEventStore;

  beforeEach(async () => {
    // Create event store for resumability
    eventStore = new InMemoryEventStore();

    // Create a simple MCP server
    mcpServer = new McpServer(
      { name: 'test-server', version: '1.0.0' },
      { capabilities: { logging: {} } }
    );

    // Add a simple notification tool that completes quickly
    mcpServer.tool(
      'send-notification',
      'Sends a single notification',
      {
        message: z.string().describe('Message to send').default('Test notification')
      },
      async ({ message }, { sendNotification }) => {
        // Send notification immediately
        await sendNotification({
          method: "notifications/message",
          params: {
            level: "info",
            data: message
          }
        });

        return {
          content: [{ type: 'text', text: 'Notification sent' }]
        };
      }
    );

    // Add a long-running tool that sends multiple notifications
    mcpServer.tool(
      'run-notifications',
      'Sends multiple notifications over time',
      {
        count: z.number().describe('Number of notifications to send').default(10),
        interval: z.number().describe('Interval between notifications in ms').default(50)
      },
      async ({ count, interval }, { sendNotification }) => {
        // Send notifications at specified intervals
        for (let i = 0; i < count; i++) {
          await sendNotification({
            method: "notifications/message",
            params: {
              level: "info",
              data: `Notification ${i + 1} of ${count}`
            }
          });

          // Wait for the specified interval before sending next notification
          // (no trailing wait after the final one)
          if (i < count - 1) {
            await new Promise(resolve => setTimeout(resolve, interval));
          }
        }

        return {
          content: [{ type: 'text', text: `Sent ${count} notifications` }]
        };
      }
    );

    // Create a transport with the event store
    serverTransport = new StreamableHTTPServerTransport({
      sessionIdGenerator: () => randomUUID(),
      eventStore
    });

    // Connect the transport to the MCP server
    await mcpServer.connect(serverTransport);

    // Create and start an HTTP server
    server = createServer(async (req, res) => {
      await serverTransport.handleRequest(req, res);
    });

    // Start the server on a random port (port 0 lets the OS pick a free one)
    baseUrl = await new Promise<URL>((resolve) => {
      server.listen(0, '127.0.0.1', () => {
        const addr = server.address() as AddressInfo;
        resolve(new URL(`http://127.0.0.1:${addr.port}`));
      });
    });
  });

  afterEach(async () => {
    // Clean up resources; close errors are deliberately ignored (best-effort teardown)
    await mcpServer.close().catch(() => { });
    await serverTransport.close().catch(() => { });
    server.close();
  });

  it('should store session ID when client connects', async () => {
    // Create and connect a client
    const client = new Client({
      name: 'test-client',
      version: '1.0.0'
    });

    const transport = new StreamableHTTPClientTransport(baseUrl);
    await client.connect(transport);

    // Verify session ID was generated
    expect(transport.sessionId).toBeDefined();

    // Clean up
    await transport.close();
  });

  it('should have session ID functionality', async () => {
    // The ability to store a session ID when connecting
    const client = new Client({
      name: 'test-client-reconnection',
      version: '1.0.0'
    });

    const transport = new StreamableHTTPClientTransport(baseUrl);

    // Make sure the client can connect and get a session ID
    await client.connect(transport);
    expect(transport.sessionId).toBeDefined();

    // Clean up
    await transport.close();
  });

  // This test demonstrates the capability to resume long-running tools
  // across client disconnection/reconnection
  it('should resume long-running notifications with lastEventId', async () => {
    // Create unique client ID for this test
    const clientId = 'test-client-long-running';
    // Only the number of collected notifications is asserted on, so the
    // element type is intentionally left as `unknown`.
    const notifications: unknown[] = [];
    let lastEventId: string | undefined;

    // Create first client
    const client1 = new Client({
      id: clientId,
      name: 'test-client',
      version: '1.0.0'
    });

    // Set up notification handler for first client
    client1.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => {
      if (notification.method === 'notifications/message') {
        notifications.push(notification.params);
      }
    });

    // Connect first client
    const transport1 = new StreamableHTTPClientTransport(baseUrl);
    await client1.connect(transport1);
    const sessionId = transport1.sessionId;
    expect(sessionId).toBeDefined();

    // Start a long-running notification stream with tracking of lastEventId
    const onLastEventIdUpdate = jest.fn((eventId: string) => {
      lastEventId = eventId;
    });
    expect(lastEventId).toBeUndefined();

    // Start the notification tool with event tracking using request
    const toolPromise = client1.request({
      method: 'tools/call',
      params: {
        name: 'run-notifications',
        arguments: {
          count: 3,
          interval: 10
        }
      }
    }, CallToolResultSchema, {
      resumptionToken: lastEventId,
      onresumptiontoken: onLastEventIdUpdate
    });

    // Wait for some notifications to arrive (not all) - shorter wait time
    await new Promise(resolve => setTimeout(resolve, 20));

    // Verify we received some notifications and lastEventId was updated
    expect(notifications.length).toBeGreaterThan(0);
    expect(notifications.length).toBeLessThan(4);
    expect(onLastEventIdUpdate).toHaveBeenCalled();
    expect(lastEventId).toBeDefined();

    // Attach the rejection handler BEFORE closing the transport so the
    // in-flight request can never reject without a handler in place
    // (which would surface as an unhandled promise rejection).
    const catchPromise = toolPromise.catch(err => {
      // This error is expected - the connection was intentionally closed
      if (err?.code !== -32000) { // ConnectionClosed error code
        console.error("Unexpected error type during transport close:", err);
      }
    });

    // Disconnect first client without waiting for completion
    // When we close the connection, it will cause a ConnectionClosed error for
    // any in-progress requests, which is expected behavior
    await transport1.close();

    // Add a short delay to ensure clean disconnect before reconnecting
    await new Promise(resolve => setTimeout(resolve, 10));

    // Wait for the rejection to be handled
    await catchPromise;

    // Create second client with same client ID
    const client2 = new Client({
      id: clientId,
      name: 'test-client',
      version: '1.0.0'
    });

    // Set up notification handler for second client
    client2.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => {
      if (notification.method === 'notifications/message') {
        notifications.push(notification.params);
      }
    });

    // Connect second client with same session ID
    const transport2 = new StreamableHTTPClientTransport(baseUrl, {
      sessionId
    });
    await client2.connect(transport2);

    // Resume the notification stream using lastEventId
    // This is the key part - we're resuming the same long-running tool using lastEventId
    await client2.request({
      method: 'tools/call',
      params: {
        name: 'run-notifications',
        arguments: {
          count: 1,
          interval: 5
        }
      }
    }, CallToolResultSchema, {
      resumptionToken: lastEventId, // Pass the lastEventId from the previous session
      onresumptiontoken: onLastEventIdUpdate
    });

    // Verify we eventually received at least a few notifications
    expect(notifications.length).toBeGreaterThan(1);

    // Clean up
    await transport2.close();
  });
});

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jeffwalters9597/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.