Skip to main content
Glama
client.test.ts23.7 kB
// SPDX-FileCopyrightText: Copyright Orangebot, Inc. and Medplum contributors // SPDX-License-Identifier: Apache-2.0 import type { AckCode } from '@medplum/core'; import { Hl7Message, isOperationOutcome, OperationOutcomeError, sleep } from '@medplum/core'; import type { Socket } from 'node:net'; import net, { createServer } from 'node:net'; import { Hl7Client } from './client'; import type { Hl7Connection } from './connection'; import { ReturnAckCategory } from './connection'; import type { Hl7ErrorEvent } from './events'; import { Hl7Server } from './server'; import { MockServer, MockSocket } from './test-utils'; describe('Hl7Client', () => { const usedPorts = [] as number[]; // Helper function to get a random port number // This helps avoid conflicts when running tests in parallel function getRandomPort(): number { let port = Math.floor(Math.random() * 10000) + 30000; while (usedPorts.includes(port)) { port = Math.floor(Math.random() * 10000) + 30000; } // Once we have an unused port, add it to used ports and return it usedPorts.push(port); return port; } describe('sendAndWait', () => { const port = getRandomPort(); const defaultResponseCb = (message: Hl7Message): Hl7Message => { return message.buildAck(); }; let hl7Server: Hl7Server; let hl7Client: Hl7Client; let nextResponseCb: ((message: Hl7Message) => Hl7Message) | undefined = undefined; beforeAll(async () => { hl7Server = new Hl7Server((connection) => { connection.addEventListener('message', ({ message }) => { // Check if a response cb has been set, otherwise use the default if (nextResponseCb) { connection.send(nextResponseCb(message)); } else { connection.send(defaultResponseCb(message)); } }); }); await hl7Server.start(port); }); beforeEach(async () => { nextResponseCb = undefined; hl7Client = new Hl7Client({ host: 'localhost', port, }); await hl7Client.connect(); }); afterEach(async () => { await hl7Client.close(); }); afterAll(async () => { await hl7Server.stop(); }); test('Resolves on when receiving ACK containing the message control ID', async () => { const ack = await hl7Client.sendAndWait( Hl7Message.parse( 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-' ) ); expect(ack).toBeDefined(); }); test('Does not resolve on the incorrect message control ID', async () => { // Set up a custom response callback that sends an ACK with a different message control ID nextResponseCb = (message: Hl7Message) => { // Create an ACK with a different message control ID than the original message const ack = message.buildAck(); // Change the message control ID in the ACK to something different ack.getSegment('MSA')?.setField(2, 'DIFFERENT_MSG_ID'); return ack; }; let timedOut = false; try { await hl7Client.sendAndWait( Hl7Message.parse( 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-' ), { timeoutMs: 100 } ); } catch (err) { if ( isOperationOutcome((err as OperationOutcomeError).outcome) && (err as OperationOutcomeError).outcome.issue?.[0].details?.text === 'Client timeout' ) { timedOut = true; } else { throw err; } } expect(timedOut).toStrictEqual(true); }); test('Emits warning when receiving an ACK for a message control ID not found in pending messages', async () => { // Set up a custom response callback that sends an ACK with a different message control ID nextResponseCb = (message: Hl7Message) => { // Create an ACK with a different message control ID than the original message const ack = message.buildAck(); // Change the message control ID in the ACK to something different ack.getSegment('MSA')?.setField(2, 'UNKNOWN_MSG_ID'); return ack; }; // Listen for warning events let warningEvent: any = null; hl7Client.addEventListener('error', (event) => { if ( event.error instanceof OperationOutcomeError && isOperationOutcome(event.error.outcome) && event.error.outcome.issue?.[0].severity === 'warning' && event.error.outcome.issue?.[0].details?.text === 'Response received for unknown message control ID' ) { warningEvent = event; } }); // Send a message and wait for the ACK with wrong message control ID await hl7Client .sendAndWait( Hl7Message.parse( 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-' ), { timeoutMs: 100 } ) .catch(() => { // Expected to timeout since ACK has wrong message control ID }); // Wait until next tick for event to process await sleep(0); expect(warningEvent).toBeDefined(); expect(warningEvent.error.outcome.issue[0].diagnostics).toContain('UNKNOWN_MSG_ID'); }); test('Rejects when message response times out', async () => { // Set up a custom response callback that doesn't respond (to trigger timeout) nextResponseCb = (_message: Hl7Message) => { // Don't send any response, which will cause the client to timeout return null as any; }; let timeoutError: any = null; try { await hl7Client.sendAndWait( Hl7Message.parse( 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-' ), { timeoutMs: 100 } ); } catch (err) { timeoutError = err; } expect(timeoutError).toBeDefined(); expect(timeoutError).toBeInstanceOf(OperationOutcomeError); expect(isOperationOutcome(timeoutError.outcome)).toBe(true); expect(timeoutError.outcome.issue?.[0].code).toBe('timeout'); expect(timeoutError.outcome.issue?.[0].details?.text).toBe('Client timeout'); expect(timeoutError.outcome.issue?.[0].diagnostics).toContain('Request timed out after waiting 100 milliseconds'); }); test('Rejects outstanding promises and emits error when close is called', async () => { // Set up a custom response callback that doesn't respond immediately nextResponseCb = (_message: Hl7Message) => { // Don't send any response, keeping the promise pending return null as any; }; // Start a sendAndWait that will remain pending const pendingPromise = hl7Client.sendAndWait( Hl7Message.parse( 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-' ), { timeoutMs: 1000 } // Long timeout to ensure it stays pending ); // Wait a bit to ensure the message is sent and pending await sleep(0); // Listen for error events let closeErrorEvent: Hl7ErrorEvent | undefined = undefined; hl7Client.addEventListener('error', (event) => { if ( event.error instanceof OperationOutcomeError && isOperationOutcome(event.error.outcome) && event.error.outcome.issue?.[0].details?.text === 'Messages were still pending when connection closed' ) { closeErrorEvent = event as Hl7ErrorEvent; } }); let errorFromClose: Error | undefined; // Close the client while message is pending const closePromise = hl7Client.close().catch((err) => { errorFromClose = err; }); // Verify the pending promise was rejected let promiseRejected = false; let rejectionError: OperationOutcomeError | undefined = undefined; try { await pendingPromise; } catch (err) { promiseRejected = true; rejectionError = err as OperationOutcomeError; } expect(promiseRejected).toBe(true); expect(rejectionError).toBeInstanceOf(OperationOutcomeError); expect(isOperationOutcome(rejectionError?.outcome)).toBe(true); expect(rejectionError?.outcome.issue?.[0].code).toBe('incomplete'); expect(rejectionError?.outcome.issue?.[0].details?.text).toBe('Message was still pending when connection closed'); // Close itself should not emit any errors await closePromise; expect(errorFromClose).toBeUndefined(); // Verify the error event was emitted expect(closeErrorEvent).toBeDefined(); expect( ((closeErrorEvent as unknown as Hl7ErrorEvent)?.error as OperationOutcomeError)?.outcome?.issue?.[0].details ?.text ).toBe('Messages were still pending when connection closed'); expect( ((closeErrorEvent as unknown as Hl7ErrorEvent)?.error as OperationOutcomeError)?.outcome?.issue?.[0].diagnostics ).toContain('Hl7Connection closed while 1 messages were pending'); }); test('Sending a message without a message control ID rejects', async () => { const message = Hl7Message.parse( 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-' ); message.getSegment('MSH')?.setField(10, ''); let threw = false; try { await hl7Client.sendAndWait(message); } catch (err) { threw = true; expect(err).toBeInstanceOf(OperationOutcomeError); expect(isOperationOutcome((err as OperationOutcomeError).outcome)).toBe(true); expect((err as OperationOutcomeError).outcome.issue?.[0].details?.text).toBe('Required field missing: MSH.10'); } expect(threw).toStrictEqual(true); }); test.each(['AA', 'AE', 'AR', 'CA', 'CE', 'CR'] as const satisfies AckCode[])( 'Returns on %s when returnAck is specified as ReturnAckCategory.FIRST', async (ackCode) => { nextResponseCb = (message: Hl7Message) => { return message.buildAck({ ackCode }); }; const response = await hl7Client.sendAndWait( Hl7Message.parse( 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-' ), { returnAck: ReturnAckCategory.FIRST } ); expect(response).toBeDefined(); // Should return on the first ACK expect(response.getSegment('MSA')?.getField(1)?.toString()).toStrictEqual(ackCode); } ); test.each(['AA', 'AE', 'AR', 'CE', 'CR'] as const)( 'Returns on %s when returnAck is specified as ReturnAckCategory.APPLICATION', async (ackCode) => { // Set up a custom response callback that sends the specific ACK code nextResponseCb = (message: Hl7Message) => { return message.buildAck({ ackCode }); }; const response = await hl7Client.sendAndWait( Hl7Message.parse( 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-' ), { returnAck: ReturnAckCategory.APPLICATION } ); expect(response).toBeDefined(); expect(response.getSegment('MSA')?.getField(1)?.toString()).toStrictEqual(ackCode); } ); test('Does not return on CA when returnAck is specified as ReturnAckCategory.APPLICATION', async () => { // Set up a custom response callback that sends a CA ACK nextResponseCb = (message: Hl7Message) => { return message.buildAck({ ackCode: 'CA' }); }; let timedOut = false; try { await hl7Client.sendAndWait( Hl7Message.parse( 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-' ), { returnAck: 'application', timeoutMs: 100 } ); } catch (err) { if ( isOperationOutcome((err as OperationOutcomeError).outcome) && (err as OperationOutcomeError).outcome.issue?.[0].details?.text === 'Client timeout' ) { timedOut = true; } else { throw err; } } expect(timedOut).toStrictEqual(true); }); }); // Test the basic connection and timeout functionality test('Connection timeout when server is unreachable', async () => { // Use a port where no server is running const unreachablePort = getRandomPort(); // Create client with a short timeout (500ms) const client = new Hl7Client({ host: 'localhost', port: unreachablePort, connectTimeout: 500, }); // Attempt to connect should fail with timeout error await expect(client.connect()).rejects.toThrow(); // Cleanup await client.close(); }); // Test sending to a non-responsive server test('Connection timeout when server does not respond', async () => { const port = getRandomPort(); // Create client with a short timeout const client = new Hl7Client({ host: '10.255.255.1', // Use an unreachable IP address port, connectTimeout: 500, }); // Attempt to connect should fail with timeout error await expect(client.connect()).rejects.toThrow('Connection timeout after 500ms'); // Close the connection await client.close(); }, 1000); // Test cancelling a connection attempt test('Cancel connection attempt', async () => { const port = getRandomPort(); // Create a server that delays accepting connections const state = { pendingSocket: undefined as Socket | undefined, }; const slowServer = createServer((socket) => { state.pendingSocket = socket; // We'll handle the socket manually later }); // Start the server await new Promise<void>((resolve) => { slowServer.listen(port, () => resolve()); }); // Create client with a long timeout const client = new Hl7Client({ host: 'localhost', port, connectTimeout: 1000, // Long enough that we can cancel before it times out }); let err: Error | undefined; // Start connection but don't await it const pendingConnectPromise = client.connect().catch((_err) => { err = _err; }); expect(pendingConnectPromise).toBeDefined(); // Close the client immediately to cancel the connection await client.close(); expect(err).toStrictEqual(new Error('Client closed while connecting')); // Stop the server await new Promise<void>((resolve) => { slowServer.close(() => resolve()); }); // Clean up any pending socket if (state.pendingSocket && !state.pendingSocket.destroyed) { state.pendingSocket.destroy(); } }, 2000); // Test making multiple connection attempts in succession test('Multiple connection attempts do not create parallel connections', async () => { const port = getRandomPort(); // Track connection count const state = { maxParallelConnections: 0, currentConnections: 0, }; // Create a server that tracks connection counts const server = createServer((socket) => { state.currentConnections++; state.maxParallelConnections = Math.max(state.maxParallelConnections, state.currentConnections); socket.on('close', () => { state.currentConnections--; }); }); // Start the server await new Promise<void>((resolve) => { server.listen(port, resolve); }); // Create client with a moderate timeout const client = new Hl7Client({ host: 'localhost', port, connectTimeout: 1000, }); const connectionPromise = client.connect(); // Make multiple connection attempts in rapid succession // Wait for all connection attempts to complete or fail const results = await Promise.allSettled([client.connect(), client.connect(), client.connect()]); // Get resolved connection from first promise for comparison with all the other results const connection = await connectionPromise; // All attempts to connect should resolve to the same connection expect(results).toMatchObject([ expect.objectContaining({ status: 'fulfilled', value: connection }), expect.objectContaining({ status: 'fulfilled', value: connection }), expect.objectContaining({ status: 'fulfilled', value: connection }), ]); // Give some time for the server side listener to be invoked await sleep(500); // Sleep so that events can fire await sleep(0); // Cleanup await client.close(); // Stop the server await new Promise<void>((resolve) => { server.close(() => resolve()); }); // The fix should ensure only one connection is active at a time expect(state.maxParallelConnections).toBe(1); }); // Test successful connection after timeout test('Can connect again after a timeout', async () => { const port = getRandomPort(); // Create a client with a very short timeout const client = new Hl7Client({ host: 'localhost', port, connectTimeout: 100, }); // First connection should fail (no server) await expect(client.connect()).rejects.toThrow(); // Now start a server const server = new Hl7Server((connection) => { connection.addEventListener('message', ({ message }) => { connection.send(message.buildAck()); }); }); await server.start(port); // Second connection attempt should succeed await client.connect(); // Test that the connection works by sending a message const response = await client.sendAndWait( Hl7Message.parse( 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-' ) ); expect(response).toBeDefined(); // Cleanup await client.close(); await server.stop(); }); // Test case for reusing connection test('Reuses connection if already connected', async () => { const port = getRandomPort(); // Track connection count let connectionCount = 0; // Create a server that tracks connection counts const server = new Hl7Server((connection) => { connectionCount++; connection.addEventListener('message', ({ message }) => { connection.send(message.buildAck()); }); }); await server.start(port); // Create client const client = new Hl7Client({ host: 'localhost', port, }); // Connect multiple times in sequence const conn1 = await client.connect(); const conn2 = await client.connect(); // Should reuse the same connection expect(conn1).toBe(conn2); // Send a message to verify the connection is working const response = await client.sendAndWait( Hl7Message.parse( 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-' ) ); expect(response).toBeDefined(); // Cleanup await client.close(); await server.stop(); // Should only have seen one connection expect(connectionCount).toBe(1); }); test('Creates new connection whenever connection is closed from other side and a new message is sent', async () => { const port = getRandomPort(); let serverSideConnection!: Hl7Connection; // Create a server that tracks connection counts const server = new Hl7Server((connection) => { serverSideConnection = connection; connection.addEventListener('message', ({ message }) => { connection.send(message.buildAck()); }); }); await server.start(port); // Create client with keepAlive = true const client = new Hl7Client({ host: 'localhost', port, keepAlive: true, }); let ack = await client.sendAndWait( Hl7Message.parse( 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-' ) ); expect(ack).toBeDefined(); await serverSideConnection.close(); // We need to wait until next tick for client-side to close their connection in response to server-side closing // Otherwise when we go to send, the first tick the connection won't show as closed await sleep(0); // Should succeed ack = await client.sendAndWait( Hl7Message.parse( 'MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG00001|P|2.2\r' + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-' ) ); expect(ack).toBeDefined(); await client.close(); await server.stop(); }); describe('Using MockSocket', () => { test('Does not fail many calls to sendAndWait in a row when connection is broken', async () => { // Scenario: Disconnected client gets a lot of calls to sendAndWait at once // Previously we would keep aborting connection attempts every time another call to `sendAndWait` is made since we always call connect // This test makes sure that if we call `sendAndWait` a lot that the one pending connection attempt is reused and all the calls resolve instead of all rejecting // Except for the last call which finally connects const clientMockSocket = new MockSocket(); const serverMockSocket = new MockSocket(); const mockServer = new MockServer(); // Mock connect and createServer so we can wait to connect with a delay jest.spyOn(net, 'connect').mockImplementation(() => clientMockSocket as unknown as net.Socket); jest.spyOn(net, 'createServer').mockImplementation((( connectionListener?: (socket: net.Socket) => void ): net.Server => { mockServer.connectionListener = connectionListener as any; return mockServer as unknown as net.Server; }) as any); const hl7Server = new Hl7Server((connection) => { connection.addEventListener('message', (event) => { connection.send(event.message.buildAck()); }); }); await hl7Server.start(9001); // Create client with keepAlive = true const client = new Hl7Client({ host: 'localhost', port: 9001, // Port doesn't matter since we are mocking socket keepAlive: true, }); const promises = []; for (let i = 0; i < 6; i++) { promises.push( client.sendAndWait( Hl7Message.parse( `MSH|^~\\&|ADT1|MCM|LABADT|MCM|198808181126|SECURITY|ADT^A01|MSG0000${i + 1}|P|2.2\r` + 'PID|||PATID1234^5^M11||JONES^WILLIAM^A^III||19610615|M-' ) ) ); await sleep(50); } // Delayed connect for client after all calls to sendAndWait mockServer.mockConnect(clientMockSocket, serverMockSocket); await Promise.all(promises); await client.close(); await hl7Server.stop(); }); }); });

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/medplum/medplum'

If you have feedback or need assistance with the MCP directory API, please join our Discord server