Skip to main content
Glama
index.test.ts51.3 kB
// SPDX-FileCopyrightText: Copyright Orangebot, Inc. and Medplum contributors // SPDX-License-Identifier: Apache-2.0 import type { Bundle, Communication, Parameters, Subscription, SubscriptionChannel, SubscriptionStatus, } from '@medplum/fhirtypes'; import { WS } from 'jest-websocket-mock'; import type { SubscriptionEventMap } from '.'; import { resourceMatchesSubscriptionCriteria, SubscriptionEmitter, SubscriptionManager } from '.'; import { MockMedplumClient } from '../client-test-utils'; import { generateId } from '../crypto'; import { Logger, LogLevel } from '../logger'; import { OperationOutcomeError } from '../outcomes'; import { createReference, sleep } from '../utils'; import { ReconnectingWebSocket } from '../websockets/reconnecting-websocket'; import { sendHandshakeBundle, sendSubscriptionMessage } from './test-utils'; const ONE_HOUR = 60 * 60 * 1000; const MOCK_SUBSCRIPTION_ID = '7b081dd8-a2d2-40dd-9596-58a7305a73b0'; const SECOND_SUBSCRIPTION_ID = '0474fa07-b98a-4172-b430-5ae234c95222'; const medplum = new MockMedplumClient(); medplum.addNextResourceId(MOCK_SUBSCRIPTION_ID); describe('ReconnectingWebSocket', () => { let wsServer: WS; let reconnectingWebSocket: ReconnectingWebSocket; beforeEach(() => { wsServer = new WS('wss://example.com/ws/subscriptions-r4', { jsonProtocol: true }); reconnectingWebSocket = new ReconnectingWebSocket('wss://example.com/ws/subscriptions-r4'); }); afterEach(async () => { reconnectingWebSocket.close(); await sleep(0); WS.clean(); }); test('.close()', async () => { await wsServer.connected; expect(reconnectingWebSocket.readyState).toStrictEqual(WebSocket.OPEN); reconnectingWebSocket.close(); expect(reconnectingWebSocket.readyState).not.toStrictEqual(WebSocket.OPEN); }); test('Getting readyState of underlying WebSocket', async () => { expect(reconnectingWebSocket.readyState).toStrictEqual(WebSocket.CONNECTING); await wsServer.connected; expect(reconnectingWebSocket.readyState).toStrictEqual(WebSocket.OPEN); reconnectingWebSocket.close(); expect(reconnectingWebSocket.readyState).toStrictEqual(WebSocket.CLOSING); await wsServer.closed; expect(reconnectingWebSocket.readyState).toStrictEqual(WebSocket.CLOSED); }); test('Sending before WebSocket is connected', async () => { expect(() => reconnectingWebSocket.send(JSON.stringify({ hello: 'medplum' }))).not.toThrow(); expect(() => reconnectingWebSocket.send(JSON.stringify({ med: 'plum' }))).not.toThrow(); // Test that open is fired await new Promise<void>((resolve) => { reconnectingWebSocket.addEventListener('open', () => { resolve(); }); }); await expect(wsServer).toReceiveMessage({ hello: 'medplum' }); await expect(wsServer).toReceiveMessage({ med: 'plum' }); }); test('Wait for `open` before sending', async () => { // Test that open is fired await new Promise<void>((resolve) => { reconnectingWebSocket.addEventListener('open', () => { resolve(); }); }); expect(() => reconnectingWebSocket.send(JSON.stringify({ hello: 'medplum' }))).not.toThrow(); await expect(wsServer).toReceiveMessage({ hello: 'medplum' }); expect(() => reconnectingWebSocket.send(JSON.stringify({ med: 'plum' }))).not.toThrow(); await expect(wsServer).toReceiveMessage({ med: 'plum' }); }); test('Should emit `message` when message received from WebSocket', async () => { await wsServer.connected; const receivedEvent = await new Promise<MessageEvent>((resolve) => { reconnectingWebSocket.addEventListener('message', (event) => { resolve(event as MessageEvent); }); wsServer.send({ med: 'plum' }); }); expect(receivedEvent?.type).toStrictEqual('message'); expect(receivedEvent?.data).toBeDefined(); expect(JSON.parse(receivedEvent.data)).toStrictEqual({ med: 'plum' }); }); test('Should emit `error` when error received from WebSocket', async () => { await wsServer.connected; const receivedEvent = await new Promise<ErrorEvent>((resolve) => { reconnectingWebSocket.addEventListener('error', (event) => { resolve(event as ErrorEvent); }); wsServer.error(); }); expect(receivedEvent?.type).toStrictEqual('error'); }); test('Should emit `close` when server closes connection', async () => { await wsServer.connected; const receivedEvent = await new Promise<CloseEvent>((resolve) => { reconnectingWebSocket.addEventListener('close', (event) => { resolve(event as CloseEvent); }); wsServer.close(); }); expect(receivedEvent?.type).toStrictEqual('close'); }); }); describe('SubscriptionEmitter', () => { test('getCriteria()', () => { const emitter = new SubscriptionEmitter(); expect(emitter.getCriteria().size).toStrictEqual(0); emitter._addCriteria('Communication'); expect(emitter.getCriteria().size).toStrictEqual(1); // Should be able to add again without changing count emitter._addCriteria('Communication'); expect(emitter.getCriteria().size).toStrictEqual(1); emitter._addCriteria('DiagnosticReport'); expect(emitter.getCriteria().size).toStrictEqual(2); emitter._removeCriteria('DiagnosticReport'); expect(emitter.getCriteria().size).toStrictEqual(1); }); }); describe('SubscriptionManager', () => { describe('Constructor', () => { let wsServer: WS; let defaultManager: SubscriptionManager; beforeEach(() => { wsServer = new WS('wss://example.com/ws/subscriptions-r4', { jsonProtocol: true }); defaultManager = new SubscriptionManager(medplum, 'wss://example.com/ws/subscriptions-r4'); }); afterEach(async () => { defaultManager.closeWebSocket(); await sleep(0); WS.clean(); }); test('should throw if not passed a `MedplumClient`', () => { // @ts-expect-error Invalid value for `medplum` expect(() => new SubscriptionManager(undefined, 'wss://example.com/ws/subscriptions-r4')).toThrow( OperationOutcomeError ); }); test('should throw if `wsUrl` is not a URL or URL string', async () => { // @ts-expect-error Invalid value for `wsUrl` expect(() => new SubscriptionManager(medplum, undefined)).toThrow(OperationOutcomeError); // @ts-expect-error Invalid value for `wsUrl` expect(() => new SubscriptionManager(medplum, new WebSocket('wss://example.com/ws/subscriptions-r4'))).toThrow( OperationOutcomeError ); }); test('should NOT throw if `wsUrl` is a VALID URL or URL string', async () => { const manager1 = new SubscriptionManager(medplum, 'wss://example.com/ws/subscriptions-r4'); expect(manager1).toBeDefined(); await wsServer.connected; const manager2 = new SubscriptionManager(medplum, new URL('wss://example.com/ws/subscriptions-r4')); expect(manager2).toBeDefined(); await wsServer.connected; }); test('should throw if `wsUrl` is an INVALID URL string', () => { expect(() => new SubscriptionManager(medplum, 'abc123')).toThrow(OperationOutcomeError); }); }); describe('addCriteria()', () => { let wsServer: WS; let defaultManager: SubscriptionManager; beforeEach(() => { wsServer = new WS('wss://example.com/ws/subscriptions-r4', { jsonProtocol: true }); defaultManager = new SubscriptionManager(medplum, 'wss://example.com/ws/subscriptions-r4'); }); afterEach(async () => { defaultManager.closeWebSocket(); await sleep(0); WS.clean(); }); beforeAll(() => { medplum.router.addRoute('GET', `fhir/R4/Subscription/${MOCK_SUBSCRIPTION_ID}/$get-ws-binding-token`, () => { return { resourceType: 'Parameters', parameter: [ { name: 'token', valueString: 'token-123', }, { name: 'expiration', valueDateTime: new Date(Date.now() + ONE_HOUR).toISOString(), }, { name: 'websocket-url', valueUrl: 'wss://example.com/ws/subscriptions-r4', }, ], } as Parameters; }); }); test('should add a criteria and receive messages for that criteria', async () => { await wsServer.connected; const emitter = defaultManager.addCriteria('Communication'); expect(emitter).toBeInstanceOf(SubscriptionEmitter); const connectPromise = new Promise<string>((resolve) => { const handler = (event: SubscriptionEventMap['connect']): void => { emitter.removeEventListener('connect', handler); resolve(event.payload.subscriptionId); }; emitter.addEventListener('connect', handler); }); await expect(wsServer).toReceiveMessage({ type: 'bind-with-token', payload: { token: 'token-123' } }); sendHandshakeBundle(wsServer, MOCK_SUBSCRIPTION_ID); const subscriptionId = await connectPromise; expect(typeof subscriptionId).toStrictEqual('string'); const timestamp = new Date().toISOString(); const resource = { resourceType: 'Communication', id: generateId() } as Communication; const sentBundle = { resourceType: 'Bundle', timestamp, type: 'history', entry: [ { resource: { resourceType: 'SubscriptionStatus', type: 'event-notification', subscription: { reference: `Subscription/${subscriptionId}` }, notificationEvent: [{ eventNumber: '0', timestamp, focus: createReference(resource) }], } as SubscriptionStatus, }, { resource, fullUrl: `https://example.com/fhir/R4/Communication/${resource.id}`, }, ], } as Bundle; const receivedBundle = await new Promise<Bundle>((resolve) => { const handler = (event: SubscriptionEventMap['message']): void => { resolve(event.payload); emitter.removeEventListener('message', handler); }; emitter.addEventListener('message', handler); wsServer.send(sentBundle); }); expect(receivedBundle).toStrictEqual(sentBundle); }); test('should emit `error` when token or url missing from `Subscription/$get-ws-binding-token` operation', async () => { const originalError = console.error; console.error = jest.fn(); const manager1 = new SubscriptionManager(medplum, 'wss://example.com/ws/subscriptions-r4'); await wsServer.connected; medplum.router.addRoute('GET', `fhir/R4/Subscription/${MOCK_SUBSCRIPTION_ID}/$get-ws-binding-token`, () => { return { resourceType: 'Parameters', parameter: [ { name: 'expiration', valueDateTime: new Date(Date.now() + ONE_HOUR).toISOString(), }, { name: 'websocket-url', valueUrl: 'wss://example.com/ws/subscriptions-r4', }, ], } as Parameters; }); const criteriaEmitter1 = manager1.addCriteria('Communication'); const [masterEvent1, criteriaEvent1] = await new Promise<SubscriptionEventMap['error'][]>((resolve, reject) => { const promises = []; promises.push( new Promise<SubscriptionEventMap['error']>((resolve) => { manager1.getMasterEmitter().addEventListener('error', (event) => { resolve(event); }); }) ); promises.push( new Promise<SubscriptionEventMap['error']>((resolve) => { criteriaEmitter1.addEventListener('error', (event) => { resolve(event); }); }) ); Promise.all(promises).then(resolve).catch(reject); }); expect(masterEvent1?.type).toStrictEqual('error'); expect(masterEvent1?.payload).toBeInstanceOf(OperationOutcomeError); expect(criteriaEvent1?.type).toStrictEqual('error'); expect(criteriaEvent1?.payload).toBeInstanceOf(OperationOutcomeError); medplum.router.addRoute('GET', `fhir/R4/Subscription/${MOCK_SUBSCRIPTION_ID}/$get-ws-binding-token`, () => { return { resourceType: 'Parameters', parameter: [ { name: 'token', valueString: 'token-123', }, { name: 'expiration', valueDateTime: new Date(Date.now() + ONE_HOUR).toISOString(), }, ], } as Parameters; }); const manager2 = new SubscriptionManager(medplum, 'wss://example.com/ws/subscriptions-r4'); await wsServer.connected; const criteriaEmitter2 = manager2.addCriteria('Communication'); const [masterEvent2, criteriaEvent2] = await new Promise<SubscriptionEventMap['error'][]>((resolve, reject) => { const promises = []; promises.push( new Promise<SubscriptionEventMap['error']>((resolve) => { manager2.getMasterEmitter().addEventListener('error', (event) => { resolve(event); }); }) ); promises.push( new Promise<SubscriptionEventMap['error']>((resolve) => { criteriaEmitter2.addEventListener('error', (event) => { resolve(event); }); }) ); Promise.all(promises).then(resolve).catch(reject); }); expect(masterEvent2?.type).toStrictEqual('error'); expect(masterEvent2?.payload).toBeInstanceOf(OperationOutcomeError); expect(criteriaEvent2?.type).toStrictEqual('error'); expect(criteriaEvent2?.payload).toBeInstanceOf(OperationOutcomeError); expect(console.error).toHaveBeenCalledTimes(2); console.error = originalError; manager1.closeWebSocket(); manager2.closeWebSocket(); }); test('should track separate `Subscription` resources for same criteria with different `subscriptionProps`', async () => { const originalWarn = console.warn; console.warn = jest.fn(); await wsServer.connected; medplum.router.addRoute('GET', `fhir/R4/Subscription/${MOCK_SUBSCRIPTION_ID}/$get-ws-binding-token`, () => { return { resourceType: 'Parameters', parameter: [ { name: 'token', valueString: 'token-123', }, { name: 'expiration', valueDateTime: new Date(Date.now() + ONE_HOUR).toISOString(), }, { name: 'websocket-url', valueUrl: 'wss://example.com/ws/subscriptions-r4', }, ], } as Parameters; }); medplum.router.addRoute('GET', `fhir/R4/Subscription/${SECOND_SUBSCRIPTION_ID}/$get-ws-binding-token`, () => { return { resourceType: 'Parameters', parameter: [ { name: 'token', valueString: 'token-123', }, { name: 'expiration', valueDateTime: new Date(Date.now() + ONE_HOUR).toISOString(), }, { name: 'websocket-url', valueUrl: 'wss://example.com/ws/subscriptions-r4', }, ], } as Parameters; }); defaultManager.addCriteria('Communication'); medplum.addNextResourceId(SECOND_SUBSCRIPTION_ID); defaultManager.addCriteria('Communication', { extension: [ { url: 'https://medplum.com/fhir/StructureDefinition/subscription-supported-interaction', valueCode: 'create', }, ], }); expect(defaultManager.getCriteriaCount()).toStrictEqual(2); defaultManager.removeCriteria('Communication'); expect(defaultManager.getCriteriaCount()).toStrictEqual(1); defaultManager.removeCriteria('Communication', { extension: [ { url: 'https://medplum.com/fhir/StructureDefinition/subscription-supported-interaction', valueCode: 'create', }, ], }); expect(defaultManager.getCriteriaCount()).toStrictEqual(0); expect(console.warn).toHaveBeenCalledTimes(2); console.warn = originalWarn; medplum.addNextResourceId(MOCK_SUBSCRIPTION_ID); }); }); describe('removeCriteria()', () => { let wsServer: WS; let emitter: SubscriptionEmitter; let defaultManager: SubscriptionManager; beforeAll(async () => { medplum.router.addRoute('GET', `fhir/R4/Subscription/${MOCK_SUBSCRIPTION_ID}/$get-ws-binding-token`, () => { return { resourceType: 'Parameters', parameter: [ { name: 'token', valueString: 'token-123', }, { name: 'expiration', valueDateTime: new Date(Date.now() + ONE_HOUR).toISOString(), }, { name: 'websocket-url', valueUrl: 'wss://example.com/ws/subscriptions-r4', }, ], } as Parameters; }); wsServer = new WS('wss://example.com/ws/subscriptions-r4', { jsonProtocol: true }); defaultManager = new SubscriptionManager(medplum, 'wss://example.com/ws/subscriptions-r4'); await wsServer.connected; emitter = defaultManager.addCriteria('Communication'); expect(emitter).toBeInstanceOf(SubscriptionEmitter); const connectPromise = new Promise<string>((resolve) => { const handler = (event: SubscriptionEventMap['connect']): void => { emitter.removeEventListener('connect', handler); resolve(event.payload.subscriptionId); }; emitter.addEventListener('connect', handler); }); await expect(wsServer).toReceiveMessage({ type: 'bind-with-token', payload: { token: 'token-123' } }); sendHandshakeBundle(wsServer, MOCK_SUBSCRIPTION_ID); const subscriptionId = await connectPromise; expect(typeof subscriptionId).toStrictEqual('string'); }); afterAll(async () => { defaultManager.closeWebSocket(); wsServer.close(); await wsServer.closed; WS.clean(); }); test('should not throw when remove has been called on a criteria that is not known', () => { const originalWarn = console.warn; console.warn = jest.fn(); expect(() => defaultManager.removeCriteria('DiagnosticReport')).not.toThrow(); expect(console.warn).toHaveBeenCalledTimes(1); console.warn = originalWarn; }); test('should not clean up a criteria if there are outstanding listeners', async () => { let success = false; const emitter = defaultManager.addCriteria('Communication'); expect(emitter).toBeInstanceOf(SubscriptionEmitter); let receivedDisconnect = false; const handler = (): void => { emitter.removeEventListener('disconnect', handler); if (!success) { receivedDisconnect = true; } }; emitter.addEventListener('disconnect', handler); defaultManager.removeCriteria('Communication'); expect(wsServer).not.toHaveReceivedMessages([{ type: 'unbind-from-token', payload: { token: 'token-123' } }]); emitter.removeEventListener('disconnect', handler); success = true; if (receivedDisconnect) { throw new Error('Received `disconnect` when not expected'); } }); test('should clean up for a criteria if we are the last subscriber', async () => { let success = false; const handler = (): void => { emitter.removeEventListener('disconnect', handler); expect(true).toBeTruthy(); success = true; }; emitter.addEventListener('disconnect', handler); defaultManager.removeCriteria('Communication'); await expect(wsServer).toReceiveMessage({ type: 'unbind-from-token', payload: { token: 'token-123' } }); emitter.removeEventListener('disconnect', handler); if (!success) { throw new Error('Expected to receive `disconnect` message'); } }); }); describe('getCriteriaCount()', () => { let wsServer: WS; let defaultManager: SubscriptionManager; beforeAll(() => { medplum.router.addRoute('GET', `fhir/R4/Subscription/${MOCK_SUBSCRIPTION_ID}/$get-ws-binding-token`, () => { return { resourceType: 'Parameters', parameter: [ { name: 'token', valueString: 'token-123', }, { name: 'expiration', valueDateTime: new Date(Date.now() + ONE_HOUR).toISOString(), }, { name: 'websocket-url', valueUrl: 'wss://example.com/ws/subscriptions-r4', }, ], } as Parameters; }); wsServer = new WS('wss://example.com/ws/subscriptions-r4'); defaultManager = new SubscriptionManager(medplum, 'wss://example.com/ws/subscriptions-r4'); }); afterAll(async () => { defaultManager.closeWebSocket(); wsServer.close(); await wsServer.closed; WS.clean(); }); test('should return the correct amount of criteria', async () => { const originalWarn = console.warn; console.warn = jest.fn(); await wsServer.connected; expect(defaultManager.getCriteriaCount()).toStrictEqual(0); defaultManager.addCriteria('Communication'); expect(defaultManager.getCriteriaCount()).toStrictEqual(1); defaultManager.addCriteria('Communication', { extension: [ { url: 'https://medplum.com/fhir/StructureDefinition/subscription-supported-interaction', valueCode: 'create', }, ], }); expect(defaultManager.getCriteriaCount()).toStrictEqual(2); defaultManager.removeCriteria('Communication'); expect(defaultManager.getCriteriaCount()).toStrictEqual(1); defaultManager.removeCriteria('Communication', { extension: [ { url: 'https://medplum.com/fhir/StructureDefinition/subscription-supported-interaction', valueCode: 'create', }, ], }); expect(defaultManager.getCriteriaCount()).toStrictEqual(0); expect(console.warn).toHaveBeenCalledTimes(2); console.warn = originalWarn; }); }); describe('closeWebSocket()', () => { let wsServer: WS; let defaultManager: SubscriptionManager; beforeEach(() => { wsServer = new WS('wss://example.com/ws/subscriptions-r4', { jsonProtocol: true }); defaultManager = new SubscriptionManager(medplum, 'wss://example.com/ws/subscriptions-r4'); }); afterEach(async () => { defaultManager.closeWebSocket(); wsServer.close(); await wsServer.closed; WS.clean(); }); test('should close websocket and emit `close` when called', async () => { await wsServer.connected; const criteriaEmitter = defaultManager.addCriteria('Communication'); const [masterEvent, criteriaEvent] = await new Promise<SubscriptionEventMap['close'][]>((resolve, reject) => { const promises = []; promises.push( new Promise<SubscriptionEventMap['close']>((resolve) => { defaultManager.getMasterEmitter().addEventListener('close', (event) => { resolve(event); }); }) ); promises.push( new Promise<SubscriptionEventMap['close']>((resolve) => { criteriaEmitter.addEventListener('close', (event) => { resolve(event); }); }) ); expect(() => defaultManager.closeWebSocket()).not.toThrow(); Promise.all(promises).then(resolve).catch(reject); }); await wsServer.closed; expect(masterEvent?.type).toStrictEqual('close'); expect(criteriaEvent?.type).toStrictEqual('close'); }); test('should not emit close twice', async () => { await wsServer.connected; const event = await new Promise<SubscriptionEventMap['close']>((resolve) => { defaultManager.getMasterEmitter().addEventListener('close', (event) => { resolve(event); }); expect(() => defaultManager.closeWebSocket()).not.toThrow(); }); expect(event?.type).toStrictEqual('close'); await wsServer.closed; await new Promise<void>((resolve, reject) => { defaultManager.getMasterEmitter().addEventListener('close', () => { reject(new Error('Expected not to call')); }); expect(() => defaultManager.closeWebSocket()).not.toThrow(); setTimeout(() => resolve(), 250); }); await wsServer.closed; }); }); describe('getMasterEmitter()', () => { test('should always get the same emitter', async () => { const wsServer = new WS('wss://example.com/ws/subscriptions-r4'); const manager = new SubscriptionManager(medplum, 'wss://example.com/ws/subscriptions-r4'); await wsServer.connected; expect(manager.getMasterEmitter()).toStrictEqual(manager.getMasterEmitter()); wsServer.close(); await wsServer.closed; WS.clean(); }); }); describe('Scenarios', () => { let medplum: MockMedplumClient; let wsServer: WS; let defaultManager: SubscriptionManager; beforeEach(() => { medplum = new MockMedplumClient(); medplum.addNextResourceId(MOCK_SUBSCRIPTION_ID); medplum.router.addRoute('GET', `fhir/R4/Subscription/${MOCK_SUBSCRIPTION_ID}/$get-ws-binding-token`, () => { return { resourceType: 'Parameters', parameter: [ { name: 'token', valueString: 'token-123', }, { name: 'expiration', valueDateTime: new Date(Date.now() + ONE_HOUR).toISOString(), }, { name: 'websocket-url', valueUrl: 'wss://example.com/ws/subscriptions-r4', }, ], } as Parameters; }); medplum.router.addRoute('GET', `fhir/R4/Subscription/${SECOND_SUBSCRIPTION_ID}/$get-ws-binding-token`, () => { return { resourceType: 'Parameters', parameter: [ { name: 'token', valueString: 'token-123', }, { name: 'expiration', valueDateTime: new Date(Date.now() + ONE_HOUR).toISOString(), }, { name: 'websocket-url', valueUrl: 'wss://example.com/ws/subscriptions-r4', }, ], } as Parameters; }); defaultManager = new SubscriptionManager(medplum, 'wss://example.com/ws/subscriptions-r4'); wsServer = new WS('wss://example.com/ws/subscriptions-r4', { jsonProtocol: true }); }); afterEach(async () => { defaultManager.closeWebSocket(); wsServer.close(); await wsServer.closed; WS.clean(); }); test("should warn when receiving notification for subscription we aren't expecting", async () => { const originalWarn = console.warn; console.warn = jest.fn(); // @ts-expect-error We don't use defaultManager const _manager = new SubscriptionManager(medplum, 'wss://example.com/ws/subscriptions-r4'); await wsServer.connected; const timestamp = new Date().toISOString(); const resource = { resourceType: 'Communication', id: generateId() } as Communication; const sentBundle = { resourceType: 'Bundle', timestamp, type: 'history', entry: [ { resource: { resourceType: 'SubscriptionStatus', type: 'event-notification', subscription: { reference: `Subscription/${MOCK_SUBSCRIPTION_ID}` }, notificationEvent: [{ eventNumber: '0', timestamp, focus: createReference(resource) }], } as SubscriptionStatus, }, { resource, fullUrl: `https://example.com/fhir/R4/Communication/${resource.id}`, }, ], } as Bundle; wsServer.send(sentBundle); expect(console.warn).toHaveBeenCalled(); console.warn = originalWarn; }); test('should emit `heartbeat` event when heartbeat received', async () => { await wsServer.connected; const timestamp = new Date().toISOString(); const sentBundle = { resourceType: 'Bundle', timestamp, type: 'history', entry: [ { resource: { resourceType: 'SubscriptionStatus', status: 'active', type: 'heartbeat', subscription: { reference: `Subscription/${MOCK_SUBSCRIPTION_ID}` }, } as SubscriptionStatus, }, ], } as Bundle; const receivedBundle = await new Promise<Bundle>((resolve) => { const emitter = defaultManager.getMasterEmitter(); emitter.addEventListener('heartbeat', (event) => { resolve(event.payload); }); wsServer.send(sentBundle); }); expect(receivedBundle).toStrictEqual(sentBundle); }); test('should emit `error` event when invalid message comes in over WebSocket', async () => { const originalError = console.error; console.error = jest.fn(); await wsServer.connected; const emitter = defaultManager.addCriteria('Communication'); const [receivedEvent1, receivedEvent2] = await new Promise<SubscriptionEventMap['error'][]>((resolve, reject) => { const promises = []; promises.push( new Promise<SubscriptionEventMap['error']>((resolve) => { defaultManager.getMasterEmitter().addEventListener('error', (event) => { resolve(event); }); }) ); promises.push( new Promise<SubscriptionEventMap['error']>((resolve) => { emitter.addEventListener('error', (event) => { resolve(event); }); }) ); wsServer.send('invalid_json'); Promise.all(promises).then(resolve).catch(reject); }); expect(receivedEvent1?.type).toStrictEqual('error'); expect(receivedEvent1?.payload).toBeInstanceOf(TypeError); expect(receivedEvent1?.payload?.message).toMatch(/^Cannot read properties of undefined*/); expect(receivedEvent2?.type).toStrictEqual('error'); expect(receivedEvent2?.payload).toBeInstanceOf(TypeError); expect(receivedEvent2?.payload?.message).toMatch(/^Cannot read properties of undefined*/); expect(console.error).toHaveBeenCalledTimes(1); console.error = originalError; }); test('should reconnect after WebSocket disconnects and receive messages', async () => { // TODO: Figure out why we are getting so many warnings around receiving messages for unknown subscriptions // This seems to happen only when running the whole test suite // In isolation, this console.warn mock reports 0 calls :thinking-face: console.warn = jest.fn(); const receivedEvent1Promise = new Promise<SubscriptionEventMap['open']>((resolve) => { defaultManager.getMasterEmitter().addEventListener('open', (event) => { resolve(event); }); }); await wsServer.connected; const receivedEvent1 = await receivedEvent1Promise; expect(receivedEvent1?.type).toStrictEqual('open'); const receivedEvent2Promise = new Promise<SubscriptionEventMap['connect']>((resolve) => { defaultManager.getMasterEmitter().addEventListener('connect', (event) => { resolve(event); }); }); const emitter = defaultManager.addCriteria('Communication'); expect(defaultManager.getCriteriaCount()).toStrictEqual(1); await expect(wsServer).toReceiveMessage({ type: 'bind-with-token', payload: { token: 'token-123' } }); sendHandshakeBundle(wsServer, MOCK_SUBSCRIPTION_ID); const receivedEvent2 = await receivedEvent2Promise; expect(receivedEvent2.type).toStrictEqual('connect'); const receivedEvent3Promise = new Promise<SubscriptionEventMap['message']>((resolve) => { emitter.addEventListener('message', (event) => { resolve(event); }); }); await sendSubscriptionMessage(wsServer, medplum, MOCK_SUBSCRIPTION_ID, 'Hello, Medplum!'); const receivedEvent3 = await receivedEvent3Promise; expect(receivedEvent3.type).toStrictEqual('message'); const receivedEvent4Promise = new Promise<SubscriptionEventMap['close']>((resolve) => { emitter.addEventListener('close', (event) => { resolve(event); }); }); wsServer.close(); const receivedEvent4 = await receivedEvent4Promise; expect(receivedEvent4?.type).toStrictEqual('close'); await wsServer.closed; WS.clean(); // Make sure count is still 1 after closing WebSocket expect(defaultManager.getCriteriaCount()).toStrictEqual(1); const receivedEvent5Promise = new Promise<SubscriptionEventMap['open']>((resolve) => { emitter.addEventListener('open', (event) => { resolve(event); }); }); // Now we we reopen the server, the emitter should emit `open` wsServer = new WS('wss://example.com/ws/subscriptions-r4', { jsonProtocol: true }); const receivedEvent5 = await receivedEvent5Promise; expect(receivedEvent5?.type).toStrictEqual('open'); await wsServer.connected; const receivedEvent6Promise = new Promise<SubscriptionEventMap['connect']>((resolve) => { emitter.addEventListener('connect', (event) => { resolve(event); }); }); // Make sure we establish the subscription await expect(wsServer).toReceiveMessage({ type: 'bind-with-token', payload: { token: 'token-123' } }); // Give some time to add subscription to list on client await sleep(100); // Then send a handshake message sendHandshakeBundle(wsServer, MOCK_SUBSCRIPTION_ID); // Now check connect was emitted const receivedEvent6 = await receivedEvent6Promise; expect(receivedEvent6.type).toStrictEqual('connect'); expect(typeof receivedEvent6?.payload?.subscriptionId).toStrictEqual('string'); expect(receivedEvent6?.payload?.subscriptionId?.length).toBeGreaterThan(0); // Make sure we get notifications for subscription events const receivedEvent7Promise = new Promise<SubscriptionEventMap['message']>((resolve) => { emitter.addEventListener('message', (event) => { resolve(event); }); }); await sendSubscriptionMessage(wsServer, medplum, MOCK_SUBSCRIPTION_ID, 'Hello, again!'); const receivedEvent7 = await receivedEvent7Promise; expect(receivedEvent7.type).toStrictEqual('message'); }, 30000); test('should emit close when pong not received from server within time limit', async () => { const manager = new SubscriptionManager(medplum, 'wss://example.com/ws/subscriptions-r4', { pingIntervalMs: 500, }); const receivedEvent1Promise = new Promise<SubscriptionEventMap['open']>((resolve) => { manager.getMasterEmitter().addEventListener('open', (event) => { resolve(event); }); }); await wsServer.connected; const receivedEvent1 = await receivedEvent1Promise; expect(receivedEvent1?.type).toStrictEqual('open'); // Wait for ping to be received await expect(wsServer).toReceiveMessage({ type: 'ping' }); const receivedEvent3Promise = new Promise<SubscriptionEventMap['close']>((resolve) => { manager.getMasterEmitter().addEventListener('close', (event) => { resolve(event); }); }); const closeEvent = await receivedEvent3Promise; expect(closeEvent.type).toStrictEqual('close'); }); test('should NOT emit close if server responds with pong', async () => { wsServer.on('connection', (socket) => { socket.on('message', (msg) => { const data = JSON.parse(msg as string) as { type?: string }; if (data?.type === 'ping') { socket.send(JSON.stringify({ type: 'pong' })); } }); }); const manager = new SubscriptionManager(medplum, 'wss://example.com/ws/subscriptions-r4', { pingIntervalMs: 500, }); const receivedEvent1Promise = new Promise<SubscriptionEventMap['open']>((resolve) => { manager.getMasterEmitter().addEventListener('open', (event) => { resolve(event); }); }); await wsServer.connected; const receivedEvent1 = await receivedEvent1Promise; expect(receivedEvent1?.type).toStrictEqual('open'); // Wait for ping to be received await expect(wsServer).toReceiveMessage({ type: 'ping' }); let receivedClose = false; manager.getMasterEmitter().addEventListener('close', () => { receivedClose = true; }); // Then wait for potential close await sleep(1500); expect(receivedClose).toStrictEqual(false); }); test('should reconnect WebSocket and refresh subscriptions when profile changes', async () => { const receivedEvent1Promise = new Promise<SubscriptionEventMap['open']>((resolve) => { defaultManager.getMasterEmitter().addEventListener('open', (event) => { resolve(event); }); }); await wsServer.connected; const receivedEvent1 = await receivedEvent1Promise; expect(receivedEvent1?.type).toStrictEqual('open'); const receivedEvent2Promise = new Promise<SubscriptionEventMap['connect']>((resolve) => { defaultManager.getMasterEmitter().addEventListener('connect', (event) => { resolve(event); }); }); const emitter1 = defaultManager.addCriteria('Communication'); expect(defaultManager.getCriteriaCount()).toStrictEqual(1); await expect(wsServer).toReceiveMessage({ type: 'bind-with-token', payload: { token: 'token-123' } }); sendHandshakeBundle(wsServer, MOCK_SUBSCRIPTION_ID); const receivedEvent2 = await receivedEvent2Promise; expect(receivedEvent2.type).toStrictEqual('connect'); const receivedEvent3Promise = new Promise<SubscriptionEventMap['message']>((resolve) => { emitter1.addEventListener('message', (event) => { resolve(event); }); }); await sendSubscriptionMessage(wsServer, medplum, MOCK_SUBSCRIPTION_ID, 'Hello, Medplum!'); const receivedEvent3 = await receivedEvent3Promise; expect(receivedEvent3.type).toStrictEqual('message'); const receivedEvent4Promise = new Promise<SubscriptionEventMap['close']>((resolve) => { emitter1.addEventListener('close', (event) => { resolve(event); }); }); medplum.setProfile(undefined); const receivedEvent4 = await receivedEvent4Promise; expect(receivedEvent4.type).toStrictEqual('close'); expect(defaultManager.getCriteriaCount()).toStrictEqual(1); await wsServer.closed; WS.clean(); const receivedEvent5Promise = new Promise<SubscriptionEventMap['open']>((resolve) => { emitter1.addEventListener('open', (event) => { resolve(event); }); }); wsServer = new WS('wss://example.com/ws/subscriptions-r4', { jsonProtocol: true }); medplum.addNextResourceId(SECOND_SUBSCRIPTION_ID); // Set profile to a new profile medplum.setProfile({ resourceType: 'Practitioner', id: generateId() }); const receivedEvent5 = await receivedEvent5Promise; expect(receivedEvent5.type).toStrictEqual('open'); const receivedEvent6Promise = new Promise<SubscriptionEventMap['connect']>((resolve) => { defaultManager.getMasterEmitter().addEventListener('connect', (event) => { resolve(event); }); }); const emitter2 = defaultManager.addCriteria('Communication'); expect(defaultManager.getCriteriaCount()).toStrictEqual(1); await expect(wsServer).toReceiveMessage({ type: 'bind-with-token', payload: { token: 'token-123' } }); await sleep(100); sendHandshakeBundle(wsServer, SECOND_SUBSCRIPTION_ID); const receivedEvent6 = await receivedEvent6Promise; expect(receivedEvent6.type).toStrictEqual('connect'); const receivedEvent7Promise = new Promise<SubscriptionEventMap['message']>((resolve) => { emitter2.addEventListener('message', (event) => { resolve(event); }); }); await sendSubscriptionMessage(wsServer, medplum, SECOND_SUBSCRIPTION_ID, 'Hello, Medplum!'); const receivedEvent7 = await receivedEvent7Promise; expect(receivedEvent7.type).toStrictEqual('message'); }); }); }); describe('resourceMatchesSubscriptionCriteria', () => { test.each([ [ { type: 'rest-hook', endpoint: 'Bot/123', }, true, ], [{ type: 'websocket' }, true], [{ type: 'email' }, false], [{ type: 'message' }, false], [{ type: 'sms' }, false], ] as [SubscriptionChannel, boolean][])( 'should return true for a resource that matches the criteria with channel %j', async (subChannel, expectedMatch) => { const subscription: Subscription = { resourceType: 'Subscription', status: 'active', reason: 'test subscription', criteria: 'Communication', channel: subChannel, extension: [ { url: 'https://medplum.com/fhir/StructureDefinition/fhir-path-criteria-expression', valueString: '%previous.status = "in-progress" and %current.status = "completed"', }, { url: 'https://medplum.com/fhir/StructureDefinition/subscription-supported-interaction', valueCode: 'update', }, ], }; const result1 = await resourceMatchesSubscriptionCriteria({ resource: { resourceType: 'Communication', status: 'in-progress', }, subscription, context: { interaction: 'create' }, getPreviousResource: async () => undefined, }); expect(result1).toBe(false); const result2 = await resourceMatchesSubscriptionCriteria({ resource: { resourceType: 'Communication', status: 'completed', }, subscription, context: { interaction: 'update' }, getPreviousResource: async () => ({ resourceType: 'Communication', status: 'in-progress', }), }); expect(result2).toBe(expectedMatch); } ); describe('Account matching logic', () => { const ORGANIZATION_ONE = { reference: 'Organization/123' }; const ORGANIZATION_TWO = { reference: 'Organization/456' }; const ORGANIZATION_THREE = { reference: 'Organization/789' }; const ORGANIZATION_FOUR = { reference: 'Organization/999' }; test.each([ { description: 'should return true when subscription has meta.account and resource has matching account in meta.accounts even if meta.account differs', subscriptionMeta: { account: ORGANIZATION_ONE }, resourceMeta: { account: ORGANIZATION_TWO, accounts: [ORGANIZATION_TWO, ORGANIZATION_ONE], }, shouldMatch: true, }, { description: 'should return true when subscription has meta.accounts and resource has matching account in meta.accounts', subscriptionMeta: { accounts: [ORGANIZATION_ONE, ORGANIZATION_TWO] }, resourceMeta: { accounts: [ORGANIZATION_THREE, ORGANIZATION_ONE] }, shouldMatch: true, }, { description: 'should return false when subscription has meta.accounts but resource has no matching accounts in meta.accounts', subscriptionMeta: { accounts: [ORGANIZATION_ONE, ORGANIZATION_TWO] }, resourceMeta: { accounts: [ORGANIZATION_THREE, ORGANIZATION_FOUR] }, shouldMatch: false, }, { description: 'should return true when subscription has meta.accounts and resource has no meta.accounts but matching meta.account', subscriptionMeta: { accounts: [ORGANIZATION_ONE, ORGANIZATION_TWO] }, resourceMeta: { account: ORGANIZATION_TWO }, shouldMatch: true, }, { description: 'should return false when subscription has meta.accounts and resource has no meta.accounts but non-matching meta.account', subscriptionMeta: { accounts: [ORGANIZATION_ONE, ORGANIZATION_TWO] }, resourceMeta: { account: ORGANIZATION_THREE }, shouldMatch: false, }, { description: 'should return false when subscription has meta.accounts but resource has neither meta.accounts nor meta.account', subscriptionMeta: { accounts: [ORGANIZATION_ONE, ORGANIZATION_TWO] }, resourceMeta: {}, shouldMatch: false, }, { description: 'should return true when subscription has meta.accounts with single account and resource has matching meta.account', subscriptionMeta: { accounts: [ORGANIZATION_ONE] }, resourceMeta: { account: ORGANIZATION_ONE }, shouldMatch: true, }, { description: 'should return false when subscription has empty meta.accounts array', subscriptionMeta: { accounts: [], account: ORGANIZATION_ONE }, resourceMeta: { account: ORGANIZATION_TWO }, shouldMatch: false, }, { description: 'should return true when subscription has meta.account and resource has matching meta.account', subscriptionMeta: { account: ORGANIZATION_ONE }, resourceMeta: { account: ORGANIZATION_ONE }, shouldMatch: true, }, { description: 'should return false when subscription has meta.account and resource has non-matching meta.account', subscriptionMeta: { account: ORGANIZATION_ONE }, resourceMeta: { account: ORGANIZATION_TWO }, shouldMatch: false, }, { description: 'should return true when subscription has meta.account and resource has both meta.account and meta.accounts where meta.account NOT in accounts but subscription account matches one in the combined list', subscriptionMeta: { account: ORGANIZATION_ONE }, resourceMeta: { account: ORGANIZATION_TWO, accounts: [ORGANIZATION_THREE, ORGANIZATION_ONE], }, shouldMatch: true, }, { description: 'should return true when subscription has meta.account and resource has both meta.account and meta.accounts where meta.account NOT in accounts and subscription account matches the resource meta.account', subscriptionMeta: { account: ORGANIZATION_TWO }, resourceMeta: { account: ORGANIZATION_TWO, accounts: [ORGANIZATION_THREE, ORGANIZATION_ONE], }, shouldMatch: true, }, { description: 'should return false when subscription has meta.account and resource has both meta.account and meta.accounts where meta.account NOT in accounts and no match in combined list', subscriptionMeta: { account: ORGANIZATION_FOUR }, resourceMeta: { account: ORGANIZATION_TWO, accounts: [ORGANIZATION_THREE, ORGANIZATION_ONE], }, shouldMatch: false, }, { description: 'should return true when resource has meta.account and subscription has both meta.account and meta.accounts where meta.account NOT in accounts but resource account matches one in combined list', subscriptionMeta: { account: ORGANIZATION_TWO, accounts: [ORGANIZATION_THREE, ORGANIZATION_ONE], }, resourceMeta: { account: ORGANIZATION_ONE }, shouldMatch: true, }, { description: 'should return true when resource has meta.account and subscription has both meta.account and meta.accounts where meta.account NOT in accounts and resource account matches subscription meta.account', subscriptionMeta: { account: ORGANIZATION_TWO, accounts: [ORGANIZATION_THREE, ORGANIZATION_ONE], }, resourceMeta: { account: ORGANIZATION_TWO }, shouldMatch: true, }, { description: 'should return false when resource has meta.account and subscription has both meta.account and meta.accounts where meta.account NOT in accounts and no match', subscriptionMeta: { account: ORGANIZATION_TWO, accounts: [ORGANIZATION_THREE, ORGANIZATION_ONE], }, resourceMeta: { account: ORGANIZATION_FOUR }, shouldMatch: false, }, { description: 'should return true when neither subscription nor resource have account metadata', subscriptionMeta: {}, resourceMeta: {}, shouldMatch: true, }, { description: 'should return true when subscription has meta.account and resource has matching account in meta.accounts', subscriptionMeta: { account: ORGANIZATION_ONE }, resourceMeta: { accounts: [ORGANIZATION_TWO, ORGANIZATION_ONE] }, shouldMatch: true, }, { description: 'should return false when subscription has meta.account and resource has non-matching accounts in meta.accounts', subscriptionMeta: { account: ORGANIZATION_ONE }, resourceMeta: { accounts: [ORGANIZATION_TWO, ORGANIZATION_THREE] }, shouldMatch: false, }, ])('$description', async ({ subscriptionMeta, resourceMeta, shouldMatch }) => { const log = jest.fn(); const subscription: Subscription = { id: '123', resourceType: 'Subscription', status: 'active', reason: 'test subscription', criteria: 'Communication', channel: { type: 'rest-hook', endpoint: 'Bot/123', }, meta: subscriptionMeta, }; const matches = await resourceMatchesSubscriptionCriteria({ resource: { id: '123', resourceType: 'Communication', status: 'in-progress', meta: resourceMeta, }, subscription, context: { interaction: 'create' }, getPreviousResource: async () => undefined, logger: new Logger(log, undefined, LogLevel.DEBUG), }); expect(matches).toStrictEqual(shouldMatch); if (!shouldMatch) { expect(log).toHaveBeenCalledWith(expect.stringContaining('Subscription suppressed due to mismatched accounts')); } else { expect(log).not.toHaveBeenCalledWith(expect.stringContaining('Subscription suppressed')); } }); }); });

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/medplum/medplum'

If you have feedback or need assistance with the MCP directory API, please join our Discord server