Skip to main content
Glama
jmandel

Smart EHR MCP Server

by jmandel
a2aClient.unit.test.ts (52.9 kB)
// File: a4a/client/test/unit/a2aClient.unit.test.ts
import { describe, test, expect, jest, beforeEach, afterEach } from 'bun:test'; // Use bun:test imports
import { install, InstalledClock } from '@sinonjs/fake-timers'; // Import SinonJS fake timers
import {
  A2AClient,
  A2AClientConfig,
  ClientEventType,
  ClientManagedState,
  StatusUpdatePayload,
  ArtifactUpdatePayload,
  TaskUpdatePayload,
  ErrorPayload,
  ClosePayload,
  ClientCloseReason,
} from '../../src/A2AClient'; // Adjust path if needed
import * as A2ATypes from '../../src/types';
import { Task, TaskSendParams, Message, TaskState, TextPart } from '../../src/types';

// --- Test Utilities ---

/** Signature shared by the real `fetch` and every per-test replacement. */
type FetchHandler = (input: RequestInfo | URL, init?: RequestInit) => Promise<Response>;

// Store original fetch so restoreFetch() can put it back.
const originalFetch = globalThis.fetch;
let mockFetchImplementation: FetchHandler | null = null;

/**
 * Replaces `globalThis.fetch` with a `jest.fn()` spy that delegates to
 * `implementation`.
 *
 * The spy looks up `mockFetchImplementation` on every call, so after
 * `restoreFetch()` has cleared it a stale reference to the spy throws
 * instead of silently calling the previous test's handler.
 *
 * @param implementation Handler that produces the response for each request.
 */
const mockFetch = (implementation: FetchHandler) => {
  mockFetchImplementation = implementation;
  // The spy lacks extra properties of the runtime's fetch type (e.g. 'preconnect'),
  // hence the cast; `unknown` keeps the cast explicit without opting into `any`.
  globalThis.fetch = jest.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
    if (mockFetchImplementation) {
      return mockFetchImplementation(input, init);
    }
    throw new Error(`fetch mock not provided for ${input.toString()}`);
  }) as unknown as typeof fetch;
};

/** Restores the real `fetch` and drops the current mock handler (called from afterEach). */
const restoreFetch = () => {
  globalThis.fetch = originalFetch;
  mockFetchImplementation = null;
};

/**
 * Builds a byte stream that emits each string as one UTF-8 encoded chunk and
 * then closes. Used to feed SSE bodies to the client.
 *
 * @param chunks Strings to emit, in order; with no chunks the stream closes on first pull.
 */
function createReadableStream(...chunks: string[]): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  let chunkIndex = 0;
  return new ReadableStream({
    pull(controller) {
      if (chunkIndex < chunks.length) {
        controller.enqueue(encoder.encode(chunks[chunkIndex]));
        chunkIndex++;
      } else {
        controller.close();
      }
    },
  });
}

/**
 * Resolves with the payload of the next `eventName` emitted by `client`, or
 * rejects once `timeout` ms have elapsed.
 *
 * NOTE(review): the suite installs fake timers in `beforeEach`, so the timeout
 * presumably only fires when the test advances `clock` — confirm against the
 * @sinonjs/fake-timers defaults.
 *
 * @param client    Client to listen on.
 * @param eventName Event to wait for.
 * @param timeout   Milliseconds before giving up (default 1000).
 * @returns The payload passed to the listener.
 */
function waitForEvent<T = unknown>(client: A2AClient, eventName: ClientEventType, timeout = 1000): Promise<T> {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(() => {
      client.off(eventName, listener);
      console.log(`[waitForEvent TIMEOUT] Event: ${eventName}, Timeout: ${timeout}ms, Final State: ${client.getCurrentState()}`);
      reject(new Error(`Timeout waiting for event "${eventName}" after ${timeout}ms. Client state: ${client.getCurrentState()}`));
    }, timeout);

    // One-shot listener: detaches itself and cancels the timeout on first delivery.
    const listener = (payload: T) => {
      clearTimeout(timer);
      client.off(eventName, listener);
      console.log(`[waitForEvent RESOLVED] Event: ${eventName}, Payload:`, payload);
      resolve(payload);
    };
    client.on(eventName, listener as (...args: any[]) => void);
  });
}

/**
 * Polls `client.getCurrentState()` every 50 ms until `conditionFn` accepts it.
 *
 * Rejects when the client reaches a terminal state ('error' or 'closed') that
 * the condition does not accept, or when `timeout` ms elapse first.
 *
 * Both timers are created before the first check and are always cleared when
 * the promise settles, so a condition that already holds on entry does not
 * leave a polling interval running.
 *
 * @param client       Client whose state is observed.
 * @param conditionFn  Predicate over the managed state.
 * @param errorMessage Prefix/suffix used in rejection messages.
 * @param timeout      Milliseconds before giving up (default 1000).
 * @returns The first state accepted by `conditionFn`.
 */
async function waitForClientState(
  client: A2AClient,
  conditionFn: (state: ClientManagedState) => boolean,
  errorMessage = 'Client state condition not met',
  timeout = 1000
): Promise<ClientManagedState> {
  return new Promise((resolve, reject) => {
    let settled = false;
    let checkInterval: ReturnType<typeof setInterval> | undefined;
    let checkTimeout: ReturnType<typeof setTimeout> | undefined;

    // Single place that tears down both timers, whichever path settles the promise.
    const stopTimers = () => {
      settled = true;
      clearTimeout(checkTimeout);
      clearInterval(checkInterval);
    };

    const checkState = () => {
      if (settled) return;
      const currentState = client.getCurrentState();
      console.log(`[waitForClientState CHECKING] Current State: ${currentState}, Condition: ${conditionFn.toString()}`); // Log state checks
      if (conditionFn(currentState)) {
        stopTimers();
        console.log(`[waitForClientState RESOLVED] Condition Met. Final State: ${currentState}`);
        resolve(currentState);
      } else if (currentState === 'error' || currentState === 'closed') {
        // Terminal state that the condition rejected: the state will not change again.
        stopTimers();
        console.log(`[waitForClientState REJECTED] Terminal state ${currentState} did not meet condition.`);
        reject(new Error(`Client entered terminal state ${currentState} while waiting for condition. ${errorMessage}`));
      }
    };

    checkTimeout = setTimeout(() => {
      stopTimers();
      console.log(`[waitForClientState TIMEOUT] Condition: ${conditionFn.toString()}, Timeout: ${timeout}ms, Final State: ${client.getCurrentState()}`);
      reject(new Error(`${errorMessage} within ${timeout}ms. Final state: ${client.getCurrentState()}`));
    }, timeout);
    checkInterval = setInterval(checkState, 50); // Check frequently
    checkState(); // Check immediately (after the interval exists, so it can be cleared)
  });
}

// --- Test Suite Setup ---
const TEST_AGENT_URL = 'http://test-agent.com/a2a';
const TEST_CARD_URL = 'http://test-agent.com/.well-known/agent.json';
const BASE_CONFIG: Partial<A2AClientConfig> = {
  getAuthHeaders: async () => ({ Authorization: 'Bearer test-token' }),
  pollIntervalMs: 100, // Fast polling for tests
  sseInitialReconnectDelayMs: 50,
  sseMaxReconnectDelayMs: 200,
};

// Global clock variable for SinonJS timers
let clock: InstalledClock;

describe('A2AClient (Unit Tests with Mock Fetch)', () => {
  beforeEach(() => {
    // Reset mocks before each test
    // Use SinonJS fake timers
    clock = install();
  });

  afterEach(() => {
    restoreFetch();
    // Use SinonJS fake timers
    clock.uninstall();
  });

  // --- Fixtures ---
  const createAgentCardFixture = (supportsSse = true): A2ATypes.AgentCard => ({
    name: 'Test Agent',
    description: 'Agent for testing',
    url: TEST_AGENT_URL,
    version: '1.0',
    authentication: { schemes: ['Bearer'] },
    capabilities: { streaming: supportsSse, pushNotifications: false, stateTransitionHistory: false },
    defaultInputModes: ['text/plain'],
    defaultOutputModes: ['text/plain'],
    skills: [{ id: 'test-skill', name: 'Test Skill', description: 'Does testing', tags: ['test'] }]
  });

  const createJsonRpcResponse = <TResult>(id: string
| number, result: TResult): object => ({ jsonrpc: '2.0', id, result, }); const createJsonRpcErrorResponse = (id: string | number, code: number, message: string): object => ({ jsonrpc: '2.0', id, error: { code, message }, }); const createTaskFixture = (id: string, state: TaskState = 'working', artifacts: A2ATypes.Artifact[] = [], message?: A2ATypes.Message): Task => ({ id, sessionId: `session-${id}`, status: { state, timestamp: new Date().toISOString(), message }, history: [], // History not usually returned unless requested artifacts, createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); const createSseEvent = (eventType: string, result: any): string => { const data = JSON.stringify({ jsonrpc: '2.0', id: 'sse-event', result }); // Using a fixed ID for SSE events for simplicity return `event: ${eventType}\ndata: ${data}\n\n`; }; // --- Basic Creation Tests --- test('should create client, fetch agent card, and determine strategy (SSE)', async () => { const agentCard = createAgentCardFixture(true); mockFetch(async (input) => { if (input.toString() === TEST_CARD_URL) { return new Response(JSON.stringify(agentCard), { status: 200, headers: { 'Content-Type': 'application/json' } }); } // Initial SSE connect will happen - let it timeout or fail for this test return new Response(null, { status: 404 }); }); const initialParams: TaskSendParams = { message: { role: 'user', parts: [] } }; const client = await A2AClient.create(TEST_AGENT_URL, initialParams, BASE_CONFIG); expect(client).toBeDefined(); expect(client.taskId).toBeDefined(); expect(globalThis.fetch).toHaveBeenCalledWith(TEST_CARD_URL, expect.anything()); // Allow time for async initialization (card fetch + first connect attempt) // Use SinonJS timer control await clock.nextAsync(); // Advance past any initial delays // It should attempt SSE connection expect((client as any)._strategy).toBe('sse'); // State depends on how the initial connect fails, likely connecting-sse or reconnecting-sse 
expect(['connecting-sse', 'reconnecting-sse', 'error']).toContain(client.getCurrentState()); client.close(); }); test('should create client, fetch agent card, and determine strategy (Poll)', async () => { const agentCard = createAgentCardFixture(false); // No SSE support mockFetch(async (input, init) => { console.log(`[Mock Fetch Poll Test] URL: ${input.toString()}, Method: ${init?.method}`); // DEBUG const url = input.toString(); if (url === TEST_CARD_URL) { return new Response(JSON.stringify(agentCard), { status: 200, headers: { 'Content-Type': 'application/json' } }); } // Initial poll 'send' will happen OR subsequent 'get' if (url === TEST_AGENT_URL && init?.method === 'POST') { const body = JSON.parse(init.body as string); if (body.method === 'tasks/send') { console.log(`[Mock Fetch Poll Test] Handling tasks/send`); // DEBUG const task = createTaskFixture(body.params.id, 'working'); return new Response(JSON.stringify(createJsonRpcResponse(body.id, task)), { status: 200, headers: { 'Content-Type': 'application/json' } }); } // ADDED: Handle tasks/get for subsequent polls in this test if (body.method === 'tasks/get') { console.log(`[Mock Fetch Poll Test] Handling tasks/get`); // DEBUG const task = createTaskFixture(body.params.id, 'working'); // Keep it working return new Response(JSON.stringify(createJsonRpcResponse(body.id, task)), { status: 200, headers: { 'Content-Type': 'application/json' } }); } } console.log(`[Mock Fetch Poll Test] No match, returning 404`); // DEBUG return new Response(null, { status: 404 }); }); const initialParams: TaskSendParams = { message: { role: 'user', parts: [] } }; const client = await A2AClient.create(TEST_AGENT_URL, initialParams, { ...BASE_CONFIG, forcePoll: false }); expect(client).toBeDefined(); expect(client.taskId).toBeDefined(); expect(globalThis.fetch).toHaveBeenCalledWith(TEST_CARD_URL, expect.anything()); // Allow time for async initialization (card fetch + first send) await clock.nextAsync(); // Allow create/send to 
complete expect((client as any)._strategy).toBe('poll'); // Should have moved to polling state after initial send expect(client.getCurrentState()).toBe('polling'); // Assert state after allowing init client.close(); }); test('should handle agent card fetch failure', async () => { mockFetch(async (input) => { if (input.toString() === TEST_CARD_URL) { return new Response('Not Found', { status: 404 }); } return new Response(null, { status: 404 }); }); const initialParams: TaskSendParams = { message: { role: 'user', parts: [] } }; const client = await A2AClient.create(TEST_AGENT_URL, initialParams, BASE_CONFIG); // Attach listeners BEFORE the async operation completes const errorPromise = waitForEvent<ErrorPayload>(client, 'error'); const closePromise = waitForEvent<ClosePayload>(client, 'close'); // Now wait for the events const errorPayload = await errorPromise; expect(errorPayload.context).toBe('agent-card-fetch'); expect(errorPayload.error.message).toContain('404'); const closePayload = await closePromise; expect(closePayload.reason).toBe('error-fatal'); expect(client.getCurrentState()).toBe('error'); }); // --- Polling Tests --- describe('Polling Strategy Tests', () => { const pollAgentCard = createAgentCardFixture(false); test('should complete task via polling', async () => { const taskId = 'poll-complete-task'; let pollCount = 0; const finalTask = createTaskFixture(taskId, 'completed', [{ index: 0, name: 'result', parts: [{ type: 'text', text: 'Poll Done' }] }]); const workingTask1 = createTaskFixture(taskId, 'working', [{ index: 0, name: 'result', parts: [{ type: 'text', text: 'Step 1...' }] }]); const workingTask2 = createTaskFixture(taskId, 'working', [{ index: 0, name: 'result', parts: [{ type: 'text', text: 'Step 1... Step 2...' }] }]); // Append text mockFetch(async (input, init) => { const url = input.toString(); const body = init?.body ? 
JSON.parse(init.body as string) : null; console.log(`[Mock Fetch Poll Test] URL: ${url}, Body: ${JSON.stringify(body)}`); if (url === TEST_CARD_URL) return new Response(JSON.stringify(pollAgentCard)); if (url === TEST_AGENT_URL) { if (body?.method === 'tasks/send') { return new Response(JSON.stringify(createJsonRpcResponse(body.id, createTaskFixture(taskId, 'working')))); } if (body?.method === 'tasks/get') { pollCount++; let taskToReturn: Task; if (pollCount === 1) { taskToReturn = workingTask1; } else if (pollCount === 2) { taskToReturn = workingTask2; } else { taskToReturn = finalTask; } console.log(`[Mock Fetch Poll Test] Poll count: ${pollCount}, Returning state: ${taskToReturn.status.state}`); return new Response(JSON.stringify(createJsonRpcResponse(body.id, taskToReturn))); } } return new Response(null, { status: 404 }); }); const initialParams: TaskSendParams = { id: taskId, message: { role: 'user', parts: [{ type: 'text', text: 'poll test' }] } }; const client = await A2AClient.create(TEST_AGENT_URL, initialParams, BASE_CONFIG); // Collect events - Attach listeners EARLY const updates: any[] = []; client.on('status-update', (p) => updates.push({ type: 'status', ...p })); client.on('artifact-update', (p) => updates.push({ type: 'artifact', ...p })); client.on('task-update', (p) => updates.push({ type: 'task', ...p })); console.log("[TEST] Letting client run its polling cycle..."); // Let all timers run until completion await clock.runAllAsync(); console.log("[TEST] Client run complete."); // Now check the final state directly // Since we've advanced the clock enough times and processed microtasks, // the client should have transitioned to the closed state. expect(client.getCurrentState()).toBe('closed'); const finalPolledTask = client.getCurrentTask(); expect(finalPolledTask?.status.state).toBe('completed'); const finalPolledArtifactPart = finalPolledTask?.artifacts?.[0]?.parts?.[0]; expect(finalPolledArtifactPart?.type === 'text' ? 
finalPolledArtifactPart.text : undefined).toBe('Poll Done'); // Verify events emitted expect(updates.some(u => u.type === 'status' && u.status.state === 'working')).toBe(true); expect(updates.some(u => u.type === 'status' && u.status.state === 'completed')).toBe(true); // Check specific artifact updates based on diffs const artifactUpdates = updates.filter(u => u.type === 'artifact'); expect(artifactUpdates.length).toBe(3); // workingTask1, workingTask2, finalTask expect(artifactUpdates[0].artifact.parts[0].text).toBe('Step 1...'); expect(artifactUpdates[1].artifact.parts[0].text).toBe('Step 1... Step 2...'); expect(artifactUpdates[2].artifact.parts[0].text).toBe('Poll Done'); // Check task updates const taskUpdates = updates.filter(u => u.type === 'task'); expect(taskUpdates.length).toBeGreaterThanOrEqual(3); // Initial send + poll1 + poll2 + final poll expect(updates[updates.length - 1].type).toBe('task'); // Last event should be task update expect(updates[updates.length - 1].task.status.state).toBe('completed'); expect((updates[updates.length - 1].task.artifacts?.[0]?.parts?.[0] as A2ATypes.TextPart)?.text).toBe('Poll Done'); }); test('should handle input-required and resume via polling', async () => { const taskId = 'poll-input-task'; let state: TaskState = 'working'; let pollGetCount = 0; const promptMessage: Message = { role: 'agent', parts: [{ type: 'text', text: 'Need topic' }] }; const finalArtifact: A2ATypes.Artifact = { index: 0, name: 'result', parts: [{ type: 'text', text: 'Joke about input' }]}; mockFetch(async (input, init) => { const url = input.toString(); const body = init?.body ? 
JSON.parse(init.body as string) : null; if (url === TEST_CARD_URL) return new Response(JSON.stringify(pollAgentCard)); if (url === TEST_AGENT_URL) { if (body?.method === 'tasks/send') { // Initial send OR send after input if (state === 'working') { // Initial create state = 'input-required'; // Move to input required after first send return new Response(JSON.stringify(createJsonRpcResponse(body.id, createTaskFixture(taskId, state, [], promptMessage)))); } else if (state === 'input-required') { // Sending the input state = 'completed'; // Assume it completes after input return new Response(JSON.stringify(createJsonRpcResponse(body.id, createTaskFixture(taskId, state, [finalArtifact])))); } } if (body?.method === 'tasks/get') { pollGetCount++; // Return current state during polling before input is provided return new Response(JSON.stringify(createJsonRpcResponse(body.id, createTaskFixture(taskId, state, [], state === 'input-required' ? promptMessage : undefined)))); } } return new Response(null, { status: 404 }); }); const initialParams: TaskSendParams = { id: taskId, message: { role: 'user', parts: [{ type: 'text', text: 'input poll test' }] } }; const client = await A2AClient.create(TEST_AGENT_URL, initialParams, BASE_CONFIG); // Allow initialization to complete await clock.nextAsync(); expect(client.getCurrentState()).toBe('input-required'); // Type guard before accessing .text const promptPart = client.getCurrentTask()?.status.message?.parts[0]; expect(promptPart?.type === 'text' ? 
promptPart.text : undefined).toBe('Need topic'); // Send input const inputMessage: Message = { role: 'user', parts: [{ type: 'text', text: 'the topic' }] }; // Start listening for close BEFORE sending const closePromiseInput = waitForEvent<ClosePayload>(client, 'close', 5000); await client.send(inputMessage); // This restarts polling internally // Allow the promise chain within send() and the subsequent event emission to run await clock.nextAsync(); // Add extra tick for event listener await clock.nextAsync(); // Second tick needed for event listener const closePayload = await closePromiseInput; // Now wait expect(closePayload.reason).toBe('task-completed'); expect(client.getCurrentState()).toBe('closed'); expect(client.getCurrentTask()?.status.state).toBe('completed'); // Type guard before accessing .text const finalArtifactPartInput = client.getCurrentTask()?.artifacts?.[0]?.parts?.[0]; expect(finalArtifactPartInput?.type === 'text' ? finalArtifactPartInput.text : undefined).toBe('Joke about input'); }); test('should handle poll errors and retry', async () => { const taskId = 'poll-retry-task'; let pollCount = 0; mockFetch(async (input, init) => { const url = input.toString(); const body = init?.body ? 
JSON.parse(init.body as string) : null; if (url === TEST_CARD_URL) return new Response(JSON.stringify(pollAgentCard)); if (url === TEST_AGENT_URL) { if (body?.method === 'tasks/send') { return new Response(JSON.stringify(createJsonRpcResponse(body.id, createTaskFixture(taskId, 'working')))); } if (body?.method === 'tasks/get') { pollCount++; if (pollCount === 1) { // Fail first poll return new Response('Server Error', { status: 500 }); } else { // Succeed second poll return new Response(JSON.stringify(createJsonRpcResponse(body.id, createTaskFixture(taskId, 'completed', [{index: 0, parts: [{type: 'text', text: 'Retry ok'}]}])))); } } } return new Response(null, { status: 404 }); }); const initialParams: TaskSendParams = { id: taskId, message: { role: 'user', parts: [] } }; const client = await A2AClient.create(TEST_AGENT_URL, initialParams, BASE_CONFIG); // Attach listeners early const errors: ErrorPayload[] = []; client.on('error', (p) => errors.push(p)); const closePromiseRetry = waitForEvent<ClosePayload>(client, 'close', 2000); console.log("[TEST] Running all timers for poll retry test..."); await clock.runAllAsync(); console.log("[TEST] All timers run for poll retry test."); // Assert that an error occurred expect(errors.length).toBe(1); const errorPayload = errors[0]; expect(errorPayload.context).toBe('poll-get'); // Assert final state is closed (task completed on retry) expect(client.getCurrentState()).toBe('closed'); // Wait for completion close after retry const closePayload = await closePromiseRetry; // Wait for the close console.log("Received close event"); expect(closePayload.reason).toBe('task-completed'); }); test('should fail polling after max retries', async () => { const taskId = 'poll-fail-task'; let pollCount = 0; mockFetch(async (input, init) => { const url = input.toString(); const body = init?.body ? 
JSON.parse(init.body as string) : null; if (url === TEST_CARD_URL) return new Response(JSON.stringify(pollAgentCard)); if (url === TEST_AGENT_URL) { if (body?.method === 'tasks/send') { return new Response(JSON.stringify(createJsonRpcResponse(body.id, createTaskFixture(taskId, 'working')))); } if (body?.method === 'tasks/get') { pollCount++; console.log(`Mock Fetch: tasks/get attempt ${pollCount}`); return new Response('Server Error Always', { status: 500 }); // Always fail } } return new Response(null, { status: 404 }); }); const initialParams: TaskSendParams = { id: taskId, message: { role: 'user', parts: [] } }; const client = await A2AClient.create(TEST_AGENT_URL, initialParams, { ...BASE_CONFIG, pollMaxErrorAttempts: 2 }); // Lower retry limit // Attach listeners early const errors: ErrorPayload[] = []; client.on('error', (p) => errors.push(p)); const closePromiseFail = waitForEvent<ClosePayload>(client, 'close', 1500); console.log("[TEST] Running all timers for poll fail test..."); await clock.runAllAsync(); console.log("[TEST] All timers run for poll fail test."); // Allow final error/close events to process const closePayload = await closePromiseFail; // Assert errors occurred expect(errors.length).toBe(3); // poll-get, poll-get, poll-retry-failed expect(errors[0].context).toBe('poll-get'); expect(errors[1].context).toBe('poll-get'); expect(errors[2].context).toBe('poll-retry-failed'); expect(closePayload.reason).toBe('poll-retry-failed'); expect(client.getCurrentState()).toBe('error'); // Final state is error }); // Add tests for resume (polling), cancel (polling) similarly... 
}); // --- SSE Tests --- describe('SSE Strategy Tests', () => { const sseAgentCard = createAgentCardFixture(true); test('should complete task via SSE', async () => { const taskId = 'sse-complete-task'; const finalTask = createTaskFixture(taskId, 'completed', [{ index: 0, name: 'result', parts: [{ type: 'text', text: 'SSE Done' }] }]); const artifactChunk1: Partial<A2ATypes.Artifact> = { index: 0, name: 'result', parts: [{ type: 'text', text: 'SSE part 1...' }], append: false, lastChunk: false }; const artifactChunk2: Partial<A2ATypes.Artifact> = { index: 0, parts: [{ type: 'text', text: 'part 2... ' }], append: true, lastChunk: false }; // Append const artifactChunk3: Partial<A2ATypes.Artifact> = { index: 0, parts: [{ type: 'text', text: 'part 3 FINAL.' }], append: true, lastChunk: true }; // Append + final const sseStream = createReadableStream( createSseEvent('TaskStatusUpdate', { status: { state: 'working' }, final: false }), createSseEvent('TaskArtifactUpdate', { artifact: artifactChunk1 }), createSseEvent('TaskArtifactUpdate', { artifact: artifactChunk2 }), createSseEvent('TaskArtifactUpdate', { artifact: artifactChunk3 }), createSseEvent('TaskStatusUpdate', { status: { state: 'completed' }, final: true }) ); mockFetch(async (input, init) => { const url = input.toString(); const body = init?.body ? 
JSON.parse(init.body as string) : null; if (url === TEST_CARD_URL) return new Response(JSON.stringify(sseAgentCard)); if (url === TEST_AGENT_URL && body?.method === 'tasks/sendSubscribe') { return new Response(sseStream, { status: 200, headers: { 'Content-Type': 'text/event-stream' } }); } return new Response(null, { status: 404 }); }); const initialParams: TaskSendParams = { id: taskId, message: { role: 'user', parts: [{ type: 'text', text: 'sse test' }] } }; const client = await A2AClient.create(TEST_AGENT_URL, initialParams, BASE_CONFIG); // Collect events const updates: any[] = []; client.on('status-update', (p) => updates.push({ type: 'status', ...p })); client.on('artifact-update', (p) => updates.push({ type: 'artifact', ...p })); client.on('task-update', (p) => updates.push({ type: 'task', ...p })); const closePromiseSSE = waitForEvent<ClosePayload>(client, 'close', 2000); // Give time for async init AND try flushing timer queue await clock.nextAsync(); // Allow create()/stream processing to start const closePayload = await closePromiseSSE; console.log(`[TEST] Received close event: ${closePayload.reason}`, closePayload); expect(closePayload.reason).toBe('task-completed'); expect(client.getCurrentState()).toBe('closed'); const currentTask = client.getCurrentTask(); expect(currentTask?.status.state).toBe('completed'); expect(currentTask?.artifacts?.length).toBe(1); expect(currentTask?.artifacts?.[0]?.name).toBe('result'); expect(currentTask?.artifacts?.[0]?.index).toBe(0); expect(currentTask?.artifacts?.[0]?.parts?.length).toBe(3); // All parts accumulated // Type guard before accessing .text const ssePart1 = currentTask?.artifacts?.[0]?.parts?.[0]; const ssePart2 = currentTask?.artifacts?.[0]?.parts?.[1]; const ssePart3 = currentTask?.artifacts?.[0]?.parts?.[2]; expect(ssePart1?.type === 'text' ? ssePart1.text : undefined).toBe('SSE part 1...'); expect(ssePart2?.type === 'text' ? ssePart2.text : undefined).toBe('part 2... 
'); expect(ssePart3?.type === 'text' ? ssePart3.text : undefined).toBe('part 3 FINAL.'); // Verify events emitted match synthesized state expect(updates.some(u => u.type === 'status' && u.status.state === 'working')).toBe(true); expect(updates.some(u => u.type === 'status' && u.status.state === 'completed')).toBe(true); expect(updates.some(u => u.type === 'artifact' && u.artifact.name === 'result')).toBe(true); expect(updates.filter(u => u.type === 'task').length).toBeGreaterThanOrEqual(2); // working + completed // Last task update should reflect the final state const lastTaskUpdate = updates.filter(u => u.type === 'task').pop(); expect(lastTaskUpdate.task.status.state).toBe('completed'); // Type guard before accessing .text const lastUpdatePart1 = lastTaskUpdate?.task?.artifacts?.[0]?.parts?.[0]; const lastUpdatePart2 = lastTaskUpdate?.task?.artifacts?.[0]?.parts?.[1]; const lastUpdatePart3 = lastTaskUpdate?.task?.artifacts?.[0]?.parts?.[2]; expect(lastUpdatePart1?.type === 'text' ? lastUpdatePart1.text : undefined).toBe('SSE part 1...'); expect(lastUpdatePart2?.type === 'text' ? lastUpdatePart2.text : undefined).toBe('part 2... '); expect(lastUpdatePart3?.type === 'text' ? lastUpdatePart3.text : undefined).toBe('part 3 FINAL.'); }); test('should handle SSE connection error and reconnect', async () => { const taskId = 'sse-reconnect-task'; let connectAttempt = 0; const finalStream = createReadableStream( createSseEvent('TaskStatusUpdate', { status: { state: 'completed' }, final: true }) ); mockFetch(async (input, init) => { const url = input.toString(); const body = init?.body ? 
JSON.parse(init.body as string) : null; if (url === TEST_CARD_URL) return new Response(JSON.stringify(sseAgentCard)); if (url === TEST_AGENT_URL && (body?.method === 'tasks/sendSubscribe' || body?.method === 'tasks/resubscribe')) { connectAttempt++; console.log(`Mock Fetch: SSE connect attempt ${connectAttempt}`); if (connectAttempt === 1) { // Fail first attempt return new Response('Gateway Timeout', { status: 504 }); } else { // Succeed second attempt (resubscribe) expect(body.method).toBe('tasks/resubscribe'); // Ensure it's using resubscribe return new Response(finalStream, { status: 200, headers: { 'Content-Type': 'text/event-stream' } }); } } return new Response(null, { status: 404 }); }); const initialParams: TaskSendParams = { id: taskId, message: { role: 'user', parts: [] } }; const client = await A2AClient.create(TEST_AGENT_URL, initialParams, BASE_CONFIG); // Attach listeners early const errorPromiseSSEConnect = waitForEvent<ErrorPayload>(client, 'error', 500); const closePromiseSSEReconnect = waitForEvent<ClosePayload>(client, 'close', 2000); // Let create() fail and emit the first error // await clock.nextAsync(); // Might not be needed if create promise resolves after error let errorPayload = await errorPromiseSSEConnect; expect(errorPayload.context).toBe('sse-connect'); expect(client.getCurrentState()).toBe('reconnecting-sse'); // Should be trying to reconnect // Advance clock to trigger the reconnect timer await clock.tickAsync(BASE_CONFIG.sseInitialReconnectDelayMs! 
+ 10);

      // Wait for successful close after reconnect
      const closePayload = await closePromiseSSEReconnect;
      expect(closePayload.reason).toBe('task-completed');
      expect(client.getCurrentState()).toBe('closed');
      expect(connectAttempt).toBe(2); // Ensure reconnect happened
    });

    // Every SSE connect attempt gets a 504; with sseMaxReconnectAttempts = 1 the
    // client is expected to try once more and then close with 'sse-reconnect-failed'.
    test('should fail SSE after max reconnect attempts', async () => {
      const taskId = 'sse-fail-reconnect-task';
      let connectAttempt = 0;

      mockFetch(async (input, init) => {
        const url = input.toString();
        const body = init?.body ? JSON.parse(init.body as string) : null;
        if (url === TEST_CARD_URL) return new Response(JSON.stringify(sseAgentCard));
        if (url === TEST_AGENT_URL && (body?.method === 'tasks/sendSubscribe' || body?.method === 'tasks/resubscribe')) {
          connectAttempt++;
          console.log(`Mock Fetch: SSE connect attempt ${connectAttempt} (failing)`);
          return new Response('Gateway Timeout Always', { status: 504 }); // Always fail
        }
        return new Response(null, { status: 404 });
      });

      const initialParams: TaskSendParams = { id: taskId, message: { role: 'user', parts: [] } };
      // Lower reconnect attempts for test
      const client = await A2AClient.create(TEST_AGENT_URL, initialParams, { ...BASE_CONFIG, sseMaxReconnectAttempts: 1 });

      // 1. Listen for the first error
      const firstErrorPromise = waitForEvent<ErrorPayload>(client, 'error', 500); // Short timeout

      // 2. Await the first error (allow initialization and first connect failure)
      const firstErrorPayload = await firstErrorPromise;

      // 3. Assert intermediate state
      expect(firstErrorPayload.context).toBe('sse-connect');
      expect(client.getCurrentState()).toBe('reconnecting-sse');

      // 4. Listen for final events AFTER the first error
      const closePromiseSSEFail = waitForEvent<ClosePayload>(client, 'close', 1500);

      // 5. Advance time (trigger reconnect attempt + failure).
      // Uses the SSE reconnect delay (not the poll interval) so the tick tracks
      // the setting this test actually exercises, matching the reconnect test.
      await clock.tickAsync(BASE_CONFIG.sseInitialReconnectDelayMs! + 10);

      // 6. Await final events
      const closePayload = await closePromiseSSEFail;

      // 7. Assert final state
      expect(closePayload.reason).toBe('sse-reconnect-failed');
      expect(client.getCurrentState()).toBe('error');
      expect(connectAttempt).toBe(2); // Initial attempt + 1 reconnect attempt
    });

    // Add tests for resume (SSE), cancel (SSE), input-required (SSE) similarly...
  });

  // --- General Tests (Cancel, Send Errors) ---
  test('should cancel task successfully while polling', async () => {
    const taskId = 'cancel-poll-task';
    const agentCard = createAgentCardFixture(false); // Use polling

    mockFetch(async (input, init) => {
      const url = input.toString();
      const body = init?.body ? JSON.parse(init.body as string) : null;
      if (url === TEST_CARD_URL) return new Response(JSON.stringify(agentCard));
      if (url === TEST_AGENT_URL) {
        if (body?.method === 'tasks/send') { // Initial create
          return new Response(JSON.stringify(createJsonRpcResponse(body.id, createTaskFixture(taskId, 'working'))));
        }
        if (body?.method === 'tasks/get') { // Polling - keep working
          return new Response(JSON.stringify(createJsonRpcResponse(body.id, createTaskFixture(taskId, 'working'))));
        }
        if (body?.method === 'tasks/cancel') { // Cancel request
          return new Response(JSON.stringify(createJsonRpcResponse(body.id, createTaskFixture(taskId, 'canceled'))));
        }
      }
      return new Response(null, { status: 404 });
    });

    const initialParams: TaskSendParams = { id: taskId, message: { role: 'user', parts: [] } };
    const client = await A2AClient.create(TEST_AGENT_URL, initialParams, BASE_CONFIG);
    await clock.nextAsync();

    // Listen for close event before canceling
    const closePromise = waitForEvent<ClosePayload>(client, 'close');
    await client.cancel();
    const closePayload = await closePromise;

    expect(closePayload.reason).toBe('task-canceled-by-client');
    expect(client.getCurrentState()).toBe('closed');
    expect(client.getCurrentTask()?.status.state).toBe('canceled');
  });

  test('should throw error when sending message in terminal state', async () => {
    const taskId = 'send-terminal-task';
    const agentCard =
createAgentCardFixture(false); mockFetch(async (input, init) => { const url = input.toString(); const body = init?.body ? JSON.parse(init.body as string) : null; if (url === TEST_CARD_URL) return new Response(JSON.stringify(agentCard)); if (url === TEST_AGENT_URL && body?.method === 'tasks/send') { // Complete immediately return new Response(JSON.stringify(createJsonRpcResponse(body.id, createTaskFixture(taskId, 'completed')))); } return new Response(null, { status: 404 }); }); const initialParams: TaskSendParams = { id: taskId, message: { role: 'user', parts: [] } }; const client = await A2AClient.create(TEST_AGENT_URL, initialParams, BASE_CONFIG); // Wait for client to close console.log("[TEST] Running all timers for send terminal test..."); await clock.runAllAsync(); console.log("[TEST] All timers run for send terminal test."); // Ensure client is closed as expected by the mock expect(client.getCurrentState()).toBe('closed'); const inputMessage: Message = { role: 'user', parts: [{ type: 'text', text: 'too late' }] }; await expect(client.send(inputMessage)).rejects.toThrow(`Cannot send message in state: closed`); }); // --- Resume Tests --- test('should resume an input-required task via polling and complete', async () => { const taskId = 'resume-input-task'; const agentCard = createAgentCardFixture(false); // Use polling const promptMessage: Message = { role: 'agent', parts: [{ type: 'text', text: 'Need resume topic' }] }; const finalArtifact: A2ATypes.Artifact = { index: 0, name: 'result', parts: [{ type: 'text', text: 'Joke about resumed topic' }]}; let currentState: TaskState = 'working'; let fetchCount = 0; mockFetch(async (input, init) => { fetchCount++; const url = input.toString(); const body = init?.body ? 
JSON.parse(init.body as string) : null; console.log(`[Mock Fetch Resume Test #${fetchCount}] URL: ${url}, Method: ${init?.method}, State: ${currentState}, Body: ${JSON.stringify(body)}`); if (url === TEST_CARD_URL) return new Response(JSON.stringify(agentCard)); if (url === TEST_AGENT_URL) { // Phase 1: Initial create/poll to input-required if (body?.method === 'tasks/send' && currentState === 'working') { // Initial create currentState = 'input-required'; console.log(` -> State change: ${currentState}`); return new Response(JSON.stringify(createJsonRpcResponse(body.id, createTaskFixture(taskId, currentState, [], promptMessage)))); } // Phase 2: Resume (uses tasks/get) if (body?.method === 'tasks/get' && currentState === 'input-required') { // Return the input-required state for the resume poll return new Response(JSON.stringify(createJsonRpcResponse(body.id, createTaskFixture(taskId, currentState, [], promptMessage)))); } // Phase 3: Send input via resumed client if (body?.method === 'tasks/send' && currentState === 'input-required') { currentState = 'completed'; // Completes after input console.log(` -> State change: ${currentState}`); return new Response(JSON.stringify(createJsonRpcResponse(body.id, createTaskFixture(taskId, currentState, [finalArtifact])))); } // Phase 3: Final polls after sending input if (body?.method === 'tasks/get' && currentState === 'completed') { return new Response(JSON.stringify(createJsonRpcResponse(body.id, createTaskFixture(taskId, currentState, [finalArtifact])))); } } console.error(`[Mock Fetch Resume Test #${fetchCount}] Unhandled request!`); return new Response('Unhandled Mock Request', { status: 500 }); }); // --- Phase 1: Create and run to input-required --- console.log("[Resume Test Phase 1] Creating initial client..."); const initialParams: TaskSendParams = { id: taskId, message: { role: 'user', parts: [{ type: 'text', text: 'resume test initial' }] } }; const client1 = await A2AClient.create(TEST_AGENT_URL, initialParams, 
BASE_CONFIG); // Let it run to input-required (should only involve the initial send) await clock.runAllAsync(); expect(client1.getCurrentState()).toBe('input-required'); const taskAfterPhase1 = client1.getCurrentTask(); expect(taskAfterPhase1?.status.state).toBe('input-required'); expect((taskAfterPhase1?.status.message?.parts?.[0] as TextPart)?.text).toBe('Need resume topic'); console.log("[Resume Test Phase 1] Closing initial client..."); client1.close(); expect(client1.getCurrentState()).toBe('closed'); // --- Phase 2: Resume the task --- console.log("[Resume Test Phase 2] Resuming client..."); const client2 = await A2AClient.resume(TEST_AGENT_URL, taskId, BASE_CONFIG); const resumeUpdates: any[] = []; client2.on('task-update', (p) => resumeUpdates.push({type:'task',...p})); // Let resume run its initial poll await clock.runAllAsync(); expect(client2.getCurrentState()).toBe('input-required'); // Should be back to input-required const taskAfterResume = client2.getCurrentTask(); expect(taskAfterResume?.status.state).toBe('input-required'); expect((taskAfterResume?.status.message?.parts?.[0] as TextPart)?.text).toBe('Need resume topic'); expect(resumeUpdates.length).toBeGreaterThanOrEqual(1); // Should get at least one update from resume poll // --- Phase 3: Provide input and complete --- console.log("[Resume Test Phase 3] Sending input via resumed client..."); const inputMessage: Message = { role: 'user', parts: [{ type: 'text', text: 'the resumed topic' }] }; await client2.send(inputMessage); // Let the send and subsequent polling complete await clock.runAllAsync(); console.log("[Resume Test Phase 3] Checking final state..."); expect(client2.getCurrentState()).toBe('closed'); const finalTask = client2.getCurrentTask(); expect(finalTask?.status.state).toBe('completed'); const finalArtifactPart = finalTask?.artifacts?.[0]?.parts?.[0]; expect(finalArtifactPart?.type === 'text' ? 
finalArtifactPart.text : undefined).toBe('Joke about resumed topic'); // Check updates from the resumed client const finalTaskUpdate = resumeUpdates.pop(); // Get the last task update expect(finalTaskUpdate.task.status.state).toBe('completed'); }); test('should resume an input-required task via SSE and complete', async () => { const taskId = 'resume-sse-task'; const agentCard = createAgentCardFixture(true); // Use SSE const promptMessage: Message = { role: 'agent', parts: [{ type: 'text', text: 'Need SSE resume topic' }] }; const finalArtifact: A2ATypes.Artifact = { index: 0, name: 'result', parts: [{ type: 'text', text: 'Joke about SSE resumed topic' }] }; let currentState: TaskState = 'working'; let fetchCount = 0; // SSE streams for different phases const initialSseStream = createReadableStream( createSseEvent('TaskStatusUpdate', { status: { state: 'working' }, final: false }), createSseEvent('TaskStatusUpdate', { status: { state: 'input-required', message: promptMessage }, final: true }) ); const resumeSseStream = createReadableStream( // Resubscribe might just confirm current state or be empty before closing createSseEvent('TaskStatusUpdate', { status: { state: 'input-required', message: promptMessage }, final: true }) ); mockFetch(async (input, init) => { fetchCount++; const url = input.toString(); const body = init?.body ? 
JSON.parse(init.body as string) : null; console.log(`[Mock Fetch SSE Resume Test #${fetchCount}] URL: ${url}, Method: ${init?.method}, State: ${currentState}, Body: ${JSON.stringify(body)}`); if (url === TEST_CARD_URL) return new Response(JSON.stringify(agentCard)); if (url === TEST_AGENT_URL) { // Phase 1: Initial create with SSE if (body?.method === 'tasks/sendSubscribe' && currentState === 'working') { currentState = 'input-required'; // State changes after stream ends console.log(` -> Providing initial SSE stream (will end as ${currentState})`); return new Response(initialSseStream, { status: 200, headers: { 'Content-Type': 'text/event-stream' } }); } // Phase 2: Resume with SSE if (body?.method === 'tasks/resubscribe' && currentState === 'input-required') { console.log(` -> Providing resume SSE stream`); return new Response(resumeSseStream, { status: 200, headers: { 'Content-Type': 'text/event-stream' } }); } // Phase 3: Send input (forces polling) if (body?.method === 'tasks/send' && currentState === 'input-required') { currentState = 'completed'; // Completes after input console.log(` -> State change: ${currentState}`); return new Response(JSON.stringify(createJsonRpcResponse(body.id, createTaskFixture(taskId, currentState, [finalArtifact])))); } // Phase 3: Polls after sending input if (body?.method === 'tasks/get' && currentState === 'completed') { return new Response(JSON.stringify(createJsonRpcResponse(body.id, createTaskFixture(taskId, currentState, [finalArtifact])))); } } console.error(`[Mock Fetch SSE Resume Test #${fetchCount}] Unhandled request!`); return new Response('Unhandled Mock Request', { status: 500 }); }); // --- Phase 1: Create and run SSE to input-required --- console.log("[SSE Resume Test Phase 1] Creating initial client..."); const initialParams: TaskSendParams = { id: taskId, message: { role: 'user', parts: [{ type: 'text', text: 'sse resume test initial' }] } }; const client1 = await A2AClient.create(TEST_AGENT_URL, initialParams, 
BASE_CONFIG); // Let the initial SSE stream run to completion await clock.runAllAsync(); // The stream ended with final:true, but the task is input-required, // so the client should remain input-required expect(client1.getCurrentState()).toBe('input-required'); const taskAfterPhase1 = client1.getCurrentTask(); expect(taskAfterPhase1?.status.state).toBe('input-required'); expect((taskAfterPhase1?.status.message?.parts?.[0] as TextPart)?.text).toBe('Need SSE resume topic'); // No need to explicitly close client1 here, as we are simulating abandoning it // --- Phase 2: Resume the task --- console.log("[SSE Resume Test Phase 2] Resuming client..."); const client2 = await A2AClient.resume(TEST_AGENT_URL, taskId, BASE_CONFIG); const resumeUpdates: any[] = []; client2.on('task-update', (p) => resumeUpdates.push({ type: 'task', ...p })); // Let the resume SSE stream run (it might close immediately) await clock.runAllAsync(); // Even if the resubscribe stream closed, the *task* state dictates the client state expect(client2.getCurrentState()).toBe('input-required'); const taskAfterResume = client2.getCurrentTask(); expect(taskAfterResume?.status.state).toBe('input-required'); expect((taskAfterResume?.status.message?.parts?.[0] as TextPart)?.text).toBe('Need SSE resume topic'); // --- Phase 3: Provide input and complete --- console.log("[SSE Resume Test Phase 3] Sending input via resumed client (will force polling)..."); const inputMessage: Message = { role: 'user', parts: [{ type: 'text', text: 'the sse resumed topic' }] }; await client2.send(inputMessage); // Let the send (which starts polling) and subsequent polling complete await clock.runAllAsync(); console.log("[SSE Resume Test Phase 3] Checking final state..."); expect(client2.getCurrentState()).toBe('closed'); const finalTask = client2.getCurrentTask(); expect(finalTask?.status.state).toBe('completed'); const finalArtifactPart = finalTask?.artifacts?.[0]?.parts?.[0]; expect(finalArtifactPart?.type === 'text' ? 
finalArtifactPart.text : undefined).toBe('Joke about SSE resumed topic'); // Check updates from the resumed client (post-resume) const finalTaskUpdate = resumeUpdates.find(u => u.task.status.state === 'completed'); expect(finalTaskUpdate).toBeDefined(); // Should have received the completed state update }); });

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jmandel/health-record-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.