Skip to main content
Glama
subscription.test.ts77.7 kB
// SPDX-FileCopyrightText: Copyright Orangebot, Inc. and Medplum contributors // SPDX-License-Identifier: Apache-2.0 import { InvokeCommand, LambdaClient } from '@aws-sdk/client-lambda'; import { ContentType, LogLevel, Operator, createReference, generateId, getReferenceString, stringify, } from '@medplum/core'; import type { AccessPolicy, AuditEvent, Binary, Bot, DocumentReference, Observation, Patient, ProjectMembership, Resource, Subscription, SubscriptionChannel, } from '@medplum/fhirtypes'; import type { AwsClientStub } from 'aws-sdk-client-mock'; import { mockClient } from 'aws-sdk-client-mock'; import type { Job, Worker } from 'bullmq'; import * as bullmqModule from 'bullmq'; import type { Redis } from 'ioredis'; import fetch from 'node-fetch'; import { createHmac, randomUUID } from 'node:crypto'; import { initAppServices, shutdownApp } from '../app'; import { loadTestConfig } from '../config/loader'; import type { MedplumServerConfig } from '../config/types'; import { tryGetRequestContext } from '../context'; import { Repository, getSystemRepo } from '../fhir/repo'; import * as loggerModule from '../logger'; import { globalLogger } from '../logger'; import * as otelModule from '../otel/otel'; import { getRedisSubscriber } from '../redis'; import type { SubEventsOptions } from '../subscriptions/websockets'; import { createTestProject, withTestContext } from '../test.setup'; import { AuditEventOutcome } from '../util/auditevent'; import type { SubscriptionJobData } from './subscription'; import { execSubscriptionJob, getSubscriptionQueue, initSubscriptionWorker } from './subscription'; jest.mock('node-fetch'); const mockBullmq = jest.mocked(bullmqModule); describe('Subscription Worker', () => { const systemRepo = getSystemRepo(); let repo: Repository; let botRepo: Repository; let mockLambdaClient: AwsClientStub<LambdaClient>; let superAdminRepo: Repository; beforeAll(async () => { const config = await loadTestConfig(); await initAppServices(config); }); afterAll(async () => { await shutdownApp(); }); beforeEach(async () => { (fetch as unknown as jest.Mock).mockClear(); // Create one simple project with no advanced features enabled const { client, repo: _repo } = await withTestContext(() => createTestProject({ withClient: true, withRepo: true, project: { name: 'Test Project', features: [], }, }) ); repo = _repo; superAdminRepo = new Repository({ extendedMode: true, superAdmin: true, author: createReference(client) }); // Create another project, this one with bots enabled const botProjectDetails = await createTestProject({ withClient: true }); botRepo = new Repository({ extendedMode: true, projects: [botProjectDetails.project], author: createReference(botProjectDetails.client), currentProject: botProjectDetails.project, }); mockLambdaClient = mockClient(LambdaClient); mockLambdaClient.on(InvokeCommand).callsFake(({ Payload }) => { const decoder = new TextDecoder(); const event = JSON.parse(decoder.decode(Payload)); const output = typeof event.input === 'string' ? event.input : JSON.stringify(event.input); const encoder = new TextEncoder(); return { LogResult: `U1RBUlQgUmVxdWVzdElkOiAxNDZmY2ZjZi1jMzJiLTQzZjUtODJhNi1lZTBmMzEzMmQ4NzMgVmVyc2lvbjogJExBVEVTVAoyMDIyLTA1LTMwVDE2OjEyOjIyLjY4NVoJMTQ2ZmNmY2YtYzMyYi00M2Y1LTgyYTYtZWUwZjMxMzJkODczCUlORk8gdGVzdApFTkQgUmVxdWVzdElkOiAxNDZmY2ZjZi1jMzJiLTQzZjUtODJhNi1lZTBmMzEzMmQ4NzMKUkVQT1JUIFJlcXVlc3RJZDogMTQ2ZmNmY2YtYzMyYi00M2Y1LTgyYTYtZWUwZjMxMzJkODcz`, Payload: encoder.encode(output), }; }); }); afterEach(() => { mockLambdaClient.restore(); }); test('Send subscriptions', () => withTestContext(async () => { const url = 'https://example.com/subscription'; const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: url, }, }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => ({ status: 200 })); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); expect(fetch).toHaveBeenCalledWith( url, expect.objectContaining({ method: 'POST', body: stringify(patient), }) ); // Clear the queue queue.add.mockClear(); // Update the patient await repo.updateResource({ ...patient, active: true }); // Update should also trigger the subscription expect(queue.add).toHaveBeenCalled(); // Clear the queue queue.add.mockClear(); // Delete the patient await repo.deleteResource('Patient', patient.id); expect(queue.add).toHaveBeenCalled(); })); test('Status code 201', () => withTestContext(async () => { const url = 'https://example.com/subscription'; const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: url, }, }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => ({ status: 201 })); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); expect(fetch).toHaveBeenCalledWith( url, expect.objectContaining({ method: 'POST', body: stringify(patient), }) ); })); test('Send subscription with custom headers', () => withTestContext( async () => { const url = 'https://example.com/subscription'; const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: url, header: ['Authorization: Basic xyz'], }, }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => ({ status: 200 })); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); expect(fetch).toHaveBeenCalledWith( url, expect.objectContaining({ method: 'POST', body: stringify(patient), headers: { 'Content-Type': ContentType.FHIR_JSON, Authorization: 'Basic xyz', 'x-trace-id': '00-12345678901234567890123456789012-3456789012345678-01', traceparent: '00-12345678901234567890123456789012-3456789012345678-01', 'X-Medplum-Subscription': subscription.id, 'X-Medplum-Interaction': 'create', }, }) ); }, { traceId: '00-12345678901234567890123456789012-3456789012345678-01' } )); test('Create-only subscription', () => withTestContext(async () => { const url = 'https://example.com/subscription'; const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: url, }, extension: [ { url: 'https://medplum.com/fhir/StructureDefinition/subscription-supported-interaction', valueCode: 'create', }, ], }); expect(subscription).toBeDefined(); // Clear the queue const queue = getSubscriptionQueue() as any; queue.add.mockClear(); // Create the patient const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); // Create should trigger the subscription expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => ({ status: 200 })); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); expect(fetch).toHaveBeenCalledWith( url, expect.objectContaining({ method: 'POST', body: stringify(patient), }) ); // Clear the queue queue.add.mockClear(); // Update the patient await repo.updateResource({ ...patient, active: true }); // Update should not trigger the subscription expect(queue.add).not.toHaveBeenCalled(); // Delete the patient await repo.deleteResource('Patient', patient.id); expect(queue.add).not.toHaveBeenCalled(); })); test('Delete-only subscription', () => withTestContext( async () => { const url = 'https://example.com/subscription'; const secret = randomUUID(); const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: url, }, extension: [ { url: 'https://medplum.com/fhir/StructureDefinition/subscription-supported-interaction', valueCode: 'delete', }, { url: 'https://www.medplum.com/fhir/StructureDefinition/subscription-secret', valueString: secret, }, ], }); expect(subscription).toBeDefined(); // Clear the queue const queue = getSubscriptionQueue() as any; queue.add.mockClear(); // Create the patient const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); // Create should trigger the subscription expect(queue.add).not.toHaveBeenCalled(); // Update the patient await repo.updateResource({ ...patient, active: true }); // Update should not trigger the subscription expect(queue.add).not.toHaveBeenCalled(); // Delete the patient await repo.deleteResource('Patient', patient.id); expect(queue.add).toHaveBeenCalled(); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); expect(fetch).toHaveBeenCalledWith( url, expect.objectContaining({ method: 'POST', body: '{}', headers: { 'Content-Type': ContentType.FHIR_JSON, 'X-Medplum-Subscription': subscription.id, 'X-Medplum-Interaction': 'delete', 'X-Medplum-Deleted-Resource': `Patient/${patient.id}`, 'X-Signature': createHmac('sha256', secret).update('{}').digest('hex'), 'x-trace-id': '00-12345678901234567890123456789012-3456789012345678-01', traceparent: '00-12345678901234567890123456789012-3456789012345678-01', }, }) ); }, { traceId: '00-12345678901234567890123456789012-3456789012345678-01' } )); test('Send subscriptions with signature', () => withTestContext( async () => { const url = 'https://example.com/subscription'; const secret = '0123456789'; const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: url, }, extension: [ { url: 'https://www.medplum.com/fhir/StructureDefinition/subscription-secret', valueString: secret, }, ], }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => ({ status: 200 })); const body = stringify(patient); const signature = createHmac('sha256', secret).update(body).digest('hex'); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); expect(fetch).toHaveBeenCalledWith( url, expect.objectContaining({ method: 'POST', body, headers: { 'Content-Type': ContentType.FHIR_JSON, 'X-Signature': signature, 'x-trace-id': '00-12345678901234567890123456789012-3456789012345678-01', traceparent: '00-12345678901234567890123456789012-3456789012345678-01', 'X-Medplum-Subscription': subscription.id, 'X-Medplum-Interaction': 'create', }, }) ); }, { traceId: '00-12345678901234567890123456789012-3456789012345678-01' } )); test('Send subscriptions with legacy signature extension', () => withTestContext( async () => { const url = 'https://example.com/subscription'; const secret = '0123456789'; const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: url, }, extension: [ { url: 'https://www.medplum.com/fhir/StructureDefinition-subscriptionSecret', valueString: secret, }, ], }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => ({ status: 200 })); const body = stringify(patient); const signature = createHmac('sha256', secret).update(body).digest('hex'); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); expect(fetch).toHaveBeenCalledWith( url, expect.objectContaining({ method: 'POST', body, headers: { 'Content-Type': ContentType.FHIR_JSON, 'X-Signature': signature, 'x-trace-id': '00-12345678901234567890123456789012-3456789012345678-01', traceparent: '00-12345678901234567890123456789012-3456789012345678-01', 'X-Medplum-Subscription': subscription.id, 'X-Medplum-Interaction': 'create', }, }) ); }, { traceId: '00-12345678901234567890123456789012-3456789012345678-01' } )); test('Ignore non-subscription subscriptions', () => withTestContext(async () => { const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'email', // this is what causes the subscription to be ignored }, }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).not.toHaveBeenCalled(); })); test('Ignore subscriptions missing URL', () => withTestContext(async () => { const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: '', }, }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).not.toHaveBeenCalled(); })); test.skip('Ignore subscriptions with missing criteria', () => withTestContext(async () => { const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', channel: { type: 'rest-hook', endpoint: 'https://example.com/subscription', }, } as Subscription); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).not.toHaveBeenCalled(); })); test('Ignore subscriptions with different criteria resource type', () => withTestContext(async () => { const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Observation', channel: { type: 'rest-hook', endpoint: 'https://example.com/subscription', }, }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).not.toHaveBeenCalled(); })); test('Ignore subscriptions with different criteria parameter', () => withTestContext(async () => { const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Observation?status=final', channel: { type: 'rest-hook', endpoint: 'https://example.com/subscription', }, }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); await repo.createResource<Observation>({ resourceType: 'Observation', status: 'preliminary', code: { text: 'ok' }, }); expect(queue.add).not.toHaveBeenCalled(); await repo.createResource<Observation>({ resourceType: 'Observation', status: 'final', code: { text: 'ok' }, }); expect(queue.add).toHaveBeenCalled(); })); test('Ignore disabled subscriptions', () => withTestContext(async () => { const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'off', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: 'https://example.com/subscription', }, }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).not.toHaveBeenCalled(); })); test('Ignore resource changes in different project', () => withTestContext(async () => { // Create a subscription in project 1 const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: 'https://example.com/subscription', }, }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); // Create a patient in project 2 const patient = await botRepo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).not.toHaveBeenCalled(); })); test('Ignore resource changes in different account compartment', () => withTestContext(async () => { const project = randomUUID(); const account = 'Organization/' + randomUUID(); const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', meta: { project, account: { reference: account, }, }, status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: 'https://example.com/subscription', }, }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await repo.createResource<Patient>({ resourceType: 'Patient', meta: { project, }, name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).not.toHaveBeenCalled(); })); test('Retries in preamble errors', () => withTestContext(async () => { const url = 'https://example.com/subscription'; const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: url, }, extension: [ { url: 'https://medplum.com/fhir/StructureDefinition/subscription-max-attempts', valueInteger: 3, }, ], }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).toHaveBeenCalled(); // causes an error to be thrown const getLoggerSpy = jest.spyOn(loggerModule, 'getLogger').mockImplementation(() => { throw new Error('Logger not available for some weird reason'); }); const job = { id: 1, data: queue.add.mock.calls[0][1], attemptsMade: 0, changePriority: jest.fn(), } as unknown as Job; // On the first attempt, throws await expect(execSubscriptionJob(job)).rejects.toThrow('Logger not available for some weird reason'); expect(job.changePriority).not.toHaveBeenCalledWith(); // On a later attempt, should not throw job.attemptsMade = 100000; await execSubscriptionJob(job); expect(job.changePriority).not.toHaveBeenCalledWith(); getLoggerSpy.mockRestore(); })); test('Retry on 429', () => withTestContext(async () => { const url = 'https://example.com/subscription'; const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: url, }, extension: [ { url: 'https://medplum.com/fhir/StructureDefinition/subscription-max-attempts', valueInteger: 3, }, ], }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => ({ status: 429 })); const job = { id: 1, data: queue.add.mock.calls[0][1], attemptsMade: 2, changePriority: jest.fn(), } as unknown as Job; // If the job throws, then the QueueScheduler will retry await expect(execSubscriptionJob(job)).rejects.toThrow('Received status 429'); expect(job.changePriority).toHaveBeenCalledWith({ priority: 3 }); })); test('Retry on exception', () => withTestContext(async () => { const url = 'https://example.com/subscription'; const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: url, }, extension: [ { url: 'https://medplum.com/fhir/StructureDefinition/subscription-max-attempts', valueInteger: 3, }, ], }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => { throw new Error('foo'); }); const job = { id: 1, data: queue.add.mock.calls[0][1], attemptsMade: 2, changePriority: jest.fn(), } as unknown as Job; // If the job throws, then the QueueScheduler will retry await expect(execSubscriptionJob(job)).rejects.toThrow('foo'); expect(job.changePriority).toHaveBeenCalledWith({ priority: 3 }); })); test('Do not throw after max job attempts', () => withTestContext(async () => { const url = 'https://example.com/subscription'; const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: url, }, extension: [ { url: 'https://medplum.com/fhir/StructureDefinition/subscription-max-attempts', valueInteger: 1, }, ], }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => { throw new Error(); }); const job = { id: 1, data: queue.add.mock.calls[0][1], attemptsMade: 1 } as unknown as Job; // Job shouldn't throw after max attempts, which will cause it to not retry const result = await execSubscriptionJob(job); expect(result).toBeUndefined(); })); test('Ignore bots if feature not enabled', () => withTestContext(async () => { const nonce = randomUUID(); const bot = await repo.createResource<Bot>({ resourceType: 'Bot', name: 'Test Bot', description: 'Test Bot', runtimeVersion: 'awslambda', code: ` export async function handler(medplum, event) { console.log('${nonce}'); return event.input; } `, }); await systemRepo.createResource<ProjectMembership>({ resourceType: 'ProjectMembership', project: { reference: 'Project/' + bot.meta?.project }, user: createReference(bot), profile: createReference(bot), }); const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: getReferenceString(bot as Bot), }, }); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => ({ status: 200 })); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); expect(fetch).not.toHaveBeenCalled(); const bundle = await repo.search<AuditEvent>({ resourceType: 'AuditEvent', filters: [ { code: 'entity', operator: Operator.EQUALS, value: getReferenceString(subscription), }, ], }); expect(bundle.entry?.length).toStrictEqual(1); const auditEvent = bundle.entry?.[0]?.resource as AuditEvent; expect(auditEvent.outcomeDesc).toStrictEqual('Bots not enabled'); expect(auditEvent.period).toBeDefined(); expect(auditEvent.entity).toHaveLength(3); })); test('Execute bot subscriptions', () => withTestContext(async () => { const bot = await botRepo.createResource<Bot>({ resourceType: 'Bot', name: 'Test Bot', description: 'Test Bot', runtimeVersion: 'awslambda', code: ` export async function handler(medplum, event) { return event.input; } `, }); await systemRepo.createResource<ProjectMembership>({ resourceType: 'ProjectMembership', project: { reference: 'Project/' + bot.meta?.project }, user: createReference(bot), profile: createReference(bot), }); const subscription = await botRepo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: getReferenceString(bot as Bot), }, }); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await botRepo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => ({ status: 200 })); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); expect(fetch).not.toHaveBeenCalled(); const bundle = await botRepo.search<AuditEvent>({ resourceType: 'AuditEvent', filters: [ { code: 'entity', operator: Operator.EQUALS, value: getReferenceString(subscription), }, ], }); expect(bundle.entry?.length).toStrictEqual(1); expect(bundle.entry?.[0]?.resource?.outcome).toStrictEqual('0'); })); test('Execute bot subscriptions run as user', () => withTestContext(async () => { const bot = await botRepo.createResource<Bot>({ resourceType: 'Bot', name: 'Test Bot', description: 'Test Bot', runAsUser: true, runtimeVersion: 'awslambda', code: ` export async function handler(medplum, event) { return event.input; } `, }); await systemRepo.createResource<ProjectMembership>({ resourceType: 'ProjectMembership', project: { reference: 'Project/' + bot.meta?.project }, user: createReference(bot), profile: createReference(bot), }); const subscription = await botRepo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: getReferenceString(bot as Bot), }, }); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await botRepo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => ({ status: 200 })); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); expect(fetch).not.toHaveBeenCalled(); const bundle = await botRepo.search<AuditEvent>({ resourceType: 'AuditEvent', filters: [ { code: 'entity', operator: Operator.EQUALS, value: getReferenceString(subscription), }, ], }); expect(bundle.entry?.length).toStrictEqual(1); expect(bundle.entry?.[0]?.resource?.outcome).toStrictEqual('0'); })); test('Stop retries if Subscription status not active', () => withTestContext(async () => { const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: 'https://example.com/', }, }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).toHaveBeenCalled(); // At this point the job should be in the queue // But let's change the subscription status to something else await repo.updateResource<Subscription>({ ...subscription, status: 'off', }); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); // Fetch should not have been called expect(fetch).not.toHaveBeenCalled(); // No AuditEvent resources should have been created const bundle = await repo.search<AuditEvent>({ resourceType: 'AuditEvent', filters: [ { code: 'entity', operator: Operator.EQUALS, value: getReferenceString(subscription), }, ], }); expect(bundle.entry?.length).toStrictEqual(0); })); test('Stop retries if Subscription deleted', () => withTestContext(async () => { const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: 'https://example.com/', }, }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(queue.add).toHaveBeenCalled(); // At this point the job should be in the queue // But let's delete the subscription await repo.deleteResource('Subscription', subscription.id); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); // Fetch should not have been called expect(fetch).not.toHaveBeenCalled(); // No AuditEvent resources should have been created const bundle = await repo.search<AuditEvent>({ resourceType: 'AuditEvent', filters: [ { code: 'entity', operator: Operator.EQUALS, value: getReferenceString(subscription), }, ], }); expect(bundle.entry?.length).toStrictEqual(0); })); test('Stop retries if Resource deleted', () => withTestContext(async () => { const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: 'https://example.com/', }, }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(queue.add).toHaveBeenCalled(); // At this point the job should be in the queue // But let's delete the resource await repo.deleteResource('Patient', patient.id); const job = { id: 1, data: queue.add.mock.calls[0][1], attemptsMade: 2 } as unknown as Job; await execSubscriptionJob(job); // Fetch should not have been called expect(fetch).not.toHaveBeenCalled(); // No AuditEvent resources should have been created const bundle = await repo.search<AuditEvent>({ resourceType: 'AuditEvent', filters: [ { code: 'entity', operator: Operator.EQUALS, value: getReferenceString(subscription), }, ], }); expect(bundle.entry?.length).toStrictEqual(0); })); test('AuditEvent has Subscription account details', () => withTestContext(async () => { const project = (await createTestProject()).project.id; const account = { reference: 'Organization/' + randomUUID(), }; const subscription = await systemRepo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', meta: { project, account, }, status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: 'https://example.com/subscription', }, }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await systemRepo.createResource<Patient>({ resourceType: 'Patient', meta: { project, account, }, name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => ({ status: 200 })); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); const bundle = await systemRepo.search<AuditEvent>({ resourceType: 'AuditEvent', filters: [ { code: 'entity', operator: Operator.EQUALS, value: getReferenceString(subscription), }, ], }); expect(bundle.entry?.length).toStrictEqual(1); const auditEvent = bundle.entry?.[0]?.resource as AuditEvent; expect(auditEvent.meta?.account?.reference).toStrictEqual(account.reference); expect(auditEvent.meta?.accounts).toHaveLength(1); expect(auditEvent.meta?.accounts).toContainEqual({ reference: account.reference }); expect(auditEvent.entity).toHaveLength(2); })); test('AuditEvent outcome from custom codes', () => withTestContext(async () => { const project = (await createTestProject()).project.id; const account = { reference: 'Organization/' + randomUUID(), }; const subscription = await systemRepo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', meta: { project, account, }, status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: 'https://example.com/subscription', }, extension: [ { url: 'https://medplum.com/fhir/StructureDefinition/subscription-success-codes', valueString: '200,201,410-600', }, ], }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const patient = await systemRepo.createResource<Patient>({ resourceType: 'Patient', meta: { project, account, }, name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => ({ status: 515 })); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); const bundle = await systemRepo.search<AuditEvent>({ resourceType: 'AuditEvent', filters: [ { code: 'entity', operator: Operator.EQUALS, value: getReferenceString(subscription), }, ], }); expect(bundle.entry?.length).toStrictEqual(1); const auditEvent = bundle.entry?.[0].resource as AuditEvent; // Should return a successful AuditEventOutcome with a normally failing status expect(auditEvent.outcome).toStrictEqual(AuditEventOutcome.Success); })); test('FhirPathCriteria extension', () => withTestContext(async () => { const url = 'https://example.com/subscription'; const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: url, }, extension: [ { url: 'https://medplum.com/fhir/StructureDefinition/fhir-path-criteria-expression', valueString: '%previous.name!=%current.name', }, ], }); expect(subscription).toBeDefined(); // Create the patient const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); // Clear the queue const queue = getSubscriptionQueue() as any; queue.add.mockClear(); // Clear the queue queue.add.mockClear(); // Update the patient const patient2 = await repo.updateResource({ ...patient, name: [{ given: ['Bob'], family: 'Smith' }] }); expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => ({ status: 200 })); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); expect(fetch).toHaveBeenCalledWith( url, expect.objectContaining({ method: 'POST', body: stringify(patient2), }) ); })); test('FhirPathCriteria extension not met', () => withTestContext(async () => { const url = 'https://example.com/subscription'; const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: url, }, extension: [ { url: 'https://medplum.com/fhir/StructureDefinition/fhir-path-criteria-expression', valueString: '%previous.name=%current.name', }, ], }); expect(subscription).toBeDefined(); // Create the patient const patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); // Clear the queue const queue = getSubscriptionQueue() as any; queue.add.mockClear(); // Clear the queue queue.add.mockClear(); // Update the patient await repo.updateResource({ ...patient, name: [{ given: ['Bob'], family: 'Smith' }] }); expect(queue.add).not.toHaveBeenCalled(); })); test('Error during FhirPath evaluation should not result in other Subscriptions not firing', () => withTestContext(async () => { const url = 'https://example.com/subscription'; const subscription1 = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: url, }, extension: [ { url: 'https://medplum.com/fhir/StructureDefinition/fhir-path-criteria-expression', valueString: '%current.address.postalCode.all(%previous.address.postalCode.contains(%current.address.postalCode)).not()', }, ], }); // We create this subscription second so that if the loop to evaluate the subscription exits early, this subscription won't be evaluated const subscription2 = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: url, }, }); expect(subscription1).toBeDefined(); expect(subscription2).toBeDefined(); // Create the patient let patient = await repo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], // Add address with multiple values, this would cause the FHIRPath expression to throw // since you cannot call contains on an array (must be called on a singleton) address: [{ postalCode: '94134' }, { postalCode: '94135' }], }); expect(patient).toBeDefined(); const queue = getSubscriptionQueue() as any; // Clear mock because we only care about updates for this test, when %previous is not empty from the start queue.add.mockClear(); patient = await repo.updateResource<Patient>({ ...patient, address: [{ postalCode: '94134' }, { postalCode: '94136' }], }); expect(patient).toBeDefined(); expect(queue.add).toHaveBeenCalledTimes(1); expect(queue.add.mock.calls[0][1]?.id).toStrictEqual(patient.id); })); test('Subscription -- Unexpected throw inside of satisfiesAccessPolicy (regression in #3978, see #4003)', () => withTestContext(async () => { globalLogger.level = LogLevel.WARN; const originalConsoleLog = console.log; console.log = jest.fn(); const url = 'https://example.com/subscription'; // Create an access policy in different project // This should trigger an error when the subscription is executed const accessPolicy = await repo.createResource<AccessPolicy>({ resourceType: 'AccessPolicy', resource: [{ resourceType: 'Patient', readonly: false }], }); const { repo: apTestRepo } = await createTestProject({ withClient: true, withRepo: true, project: { name: 'AccessPolicy Throw Project', features: [], }, membership: { accessPolicy: createReference(accessPolicy), }, }); const subscription = await apTestRepo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: url, }, }); expect(subscription).toBeDefined(); expect(subscription.id).toBeDefined(); // Create the patient const patient = await apTestRepo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); // Clear the queue const queue = getSubscriptionQueue() as any; queue.add.mockClear(); await systemRepo.deleteResource('AccessPolicy', accessPolicy.id); // Update the patient const patient2 = await apTestRepo.updateResource({ ...patient, name: [{ given: ['Bob'], family: 'Smith' }] }); expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => ({ status: 200 })); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); expect(fetch).toHaveBeenCalledWith( url, expect.objectContaining({ method: 'POST', body: stringify(patient2), }) ); expect(console.log).toHaveBeenCalledWith(expect.stringContaining('Error occurred while checking access policy')); globalLogger.level = LogLevel.NONE; console.log = originalConsoleLog; })); // TODO: Remove this test when enforcing AccessPolicy will not break things test('Subscription -- Rest Hook Sub does not meet AccessPolicy', () => withTestContext(async () => { globalLogger.level = LogLevel.WARN; const originalConsoleLog = console.log; console.log = jest.fn(); const url = 'https://example.com/subscription'; // Create an access policy in different project // This should trigger an error when the subscription is executed const accessPolicy = await repo.createResource<AccessPolicy>({ resourceType: 'AccessPolicy', resource: [{ resourceType: 'Patient', criteria: `Patient?_id=${generateId()}` }], }); const { repo: apTestRepo } = await createTestProject({ withClient: true, withRepo: true, project: { name: 'AccessPolicy Not Met but Should Succeed Project', features: [], }, membership: { accessPolicy: createReference(accessPolicy), }, }); const subscription = await apTestRepo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'rest-hook', endpoint: url, }, }); expect(subscription).toBeDefined(); expect(subscription.id).toBeDefined(); // Create the patient const patient = await apTestRepo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); // Clear the queue const queue = getSubscriptionQueue() as any; queue.add.mockClear(); await systemRepo.deleteResource('AccessPolicy', accessPolicy.id); // Update the patient const patient2 = await apTestRepo.updateResource({ ...patient, name: [{ given: ['Bob'], family: 'Smith' }] }); expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => ({ status: 200 })); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); expect(fetch).toHaveBeenCalledWith( url, expect.objectContaining({ method: 'POST', body: stringify(patient2), }) ); expect(console.log).toHaveBeenCalledWith(expect.stringContaining('Error occurred while checking access policy')); globalLogger.level = LogLevel.NONE; console.log = originalConsoleLog; })); test('Rest Hook Subscription -- Attachments are Rewritten', () => withTestContext(async () => { const url = 'https://example.com/subscription'; const binary = await repo.createResource<Binary>({ resourceType: 'Binary', contentType: ContentType.TEXT, }); const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: `DocumentReference?location=Binary/${binary.id}`, channel: { type: 'rest-hook', endpoint: url, }, }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const documentRef = await repo.createResource<DocumentReference>({ resourceType: 'DocumentReference', status: 'current', content: [ { attachment: { url: `Binary/${binary.id}`, }, }, ], }); expect(documentRef).toBeDefined(); expect(queue.add).toHaveBeenCalled(); (fetch as unknown as jest.Mock).mockImplementation(() => ({ status: 200 })); const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job; await execSubscriptionJob(job); expect(fetch).toHaveBeenCalledWith( url, expect.objectContaining({ method: 'POST', body: expect.stringContaining('?Expires='), }) ); // Clear the queue queue.add.mockClear(); })); describe('WebSocket Subscriptions', () => { type EventNotificationArgs<T extends Resource> = [T, string, SubEventsOptions]; let subscriber: Redis; let resolveExpected: ((args: EventNotificationArgs<Resource>) => void) | undefined; let rejectNotExpected: ((err: Error) => void) | undefined; beforeAll(async () => { subscriber = getRedisSubscriber(); subscriber.on('message', (_channel, argsArr) => { const parsedArgsArr = JSON.parse(argsArr) as [Resource, string, SubEventsOptions][]; if (resolveExpected) { resolveExpected(parsedArgsArr[0]); } else if (rejectNotExpected) { rejectNotExpected(new Error('Received subscription notification when not expected')); rejectNotExpected = undefined; } }); await subscriber.subscribe('medplum:subscriptions:r4:websockets'); }); afterAll(async () => { await subscriber.quit(); }); async function assertNoWsNotifications(timeoutMs?: number): Promise<void> { let resolve!: () => void; let reject!: (err: Error) => void; const deferredPromise = new Promise<void>((_resolve, _reject) => { resolve = _resolve; reject = _reject; }); const notificationTimeout = setTimeout(resolve, timeoutMs ?? 1000); rejectNotExpected = (err: Error) => { clearTimeout(notificationTimeout); reject(err); }; await deferredPromise; } async function waitForNextSubNotification<T extends Resource>( timeoutMs?: number ): Promise<EventNotificationArgs<T>> { let resolve!: (args: EventNotificationArgs<T>) => void; let reject!: (err: Error) => void; const deferredPromise = new Promise<EventNotificationArgs<T>>((_resolve, _reject) => { resolve = _resolve; reject = _reject; }); const notificationTimeout = setTimeout( () => reject(new Error('Timeout while waiting for sub notification')), timeoutMs ?? 1000 ); resolveExpected = resolve as (args: EventNotificationArgs<Resource>) => void; const args = await deferredPromise; clearTimeout(notificationTimeout); return args; } test('Enabled', () => withTestContext(async () => { const { repo: wsSubRepo } = await createTestProject({ project: { name: 'WebSocket Subs Project', features: ['websocket-subscriptions'] }, withRepo: true, }); const subscription = await wsSubRepo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient?name=Alice', channel: { type: 'websocket', }, }); expect(subscription).toBeDefined(); expect(subscription.id).toBeDefined(); let nextArgsPromise = waitForNextSubNotification<Patient>(); const patient = await wsSubRepo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); let notificationArgs = await nextArgsPromise; expect(notificationArgs).toMatchObject<EventNotificationArgs<Patient>>([ expect.objectContaining(patient), subscription.id, { includeResource: true }, ]); // Update the patient nextArgsPromise = waitForNextSubNotification<Patient>(); const updatedPatient = await wsSubRepo.updateResource<Patient>({ ...patient, active: true, }); expect(updatedPatient).toBeDefined(); notificationArgs = await nextArgsPromise; expect(notificationArgs).toMatchObject<EventNotificationArgs<Patient>>([ expect.objectContaining(updatedPatient), subscription.id, { includeResource: true }, ]); // Delete the patient nextArgsPromise = waitForNextSubNotification<Patient>(); await wsSubRepo.deleteResource('Patient', updatedPatient.id); notificationArgs = await nextArgsPromise; expect(notificationArgs).toMatchObject<EventNotificationArgs<Patient>>([ expect.objectContaining(updatedPatient), subscription.id, { includeResource: true }, ]); })); test.each([ [{ type: 'websocket' }, false], // websocket subscriptions should not trigger subscriptions since they have a different persistence story [{ type: 'rest-hook' }, true], // even though endpoint is missing, this is still valid as a trigger of (valid) subscriptions [{ type: 'rest-hook', endpoint: 'https://example.com/subscription' }, true], [{ type: 'message' }, true], ] as [SubscriptionChannel, boolean][])( 'Ignore subscriptions on subscriptions with channel %j', (subChannel, expectedToFire) => withTestContext(async () => { const { repo: wsSubRepo } = await createTestProject({ project: { name: 'WebSocket Subs Project', features: ['websocket-subscriptions'] }, withRepo: true, }); const subscription = await wsSubRepo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Subscription', channel: { type: 'rest-hook', endpoint: 'https://example.com/subscription', }, }); expect(subscription).toBeDefined(); expect(subscription.id).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const sub = await wsSubRepo.createResource<Subscription>({ resourceType: 'Subscription', status: 'active', reason: "raison d'être", criteria: 'Patient?name=somethingrandom', channel: subChannel, }); expect(sub).toBeDefined(); if (expectedToFire) { expect(queue.add).toHaveBeenCalledTimes(1); } else { expect(queue.add).not.toHaveBeenCalled(); } }) ); test('execSubscriptionJob ignores resource versions that cannot be found', () => withTestContext(async () => { const subscription = await repo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Subscription', channel: { type: 'rest-hook', endpoint: 'https://example.com/', }, }); expect(subscription).toBeDefined(); const queue = getSubscriptionQueue() as any; queue.add.mockClear(); const resource = await repo.createResource<Subscription>({ resourceType: 'Subscription', status: 'active', reason: "raison d'être", criteria: 'Patient?name=somethingrandom', channel: { type: 'websocket' }, }); expect(queue.add).not.toHaveBeenCalled(); // No jobs were queued, but we still want to test that execSubscriptionJob handles this gracefully // if the job had made its way to the queue previously under different logic const ctx = tryGetRequestContext(); const jobData: SubscriptionJobData = { subscriptionId: subscription.id, resourceType: resource.resourceType, channelType: subscription.channel.type, id: resource.id, versionId: resource.meta?.versionId as string, interaction: 'create', requestTime: new Date().toISOString(), requestId: ctx?.requestId, traceId: ctx?.traceId, authState: ctx?.authState, }; // For a websocket subscription, this results in "not found" instead of "gone" await repo.deleteResource(resource.resourceType, resource.id); // Should not throw await execSubscriptionJob({ id: '1', data: jobData } as Job); // Fetch should not have been called expect(fetch).not.toHaveBeenCalled(); // No AuditEvent resources should have been created const bundle = await repo.search<AuditEvent>({ resourceType: 'AuditEvent', filters: [ { code: 'entity', operator: Operator.EQUALS, value: getReferenceString(subscription), }, ], }); expect(bundle.entry?.length).toStrictEqual(0); })); test('Feature Flag Not Enabled', () => withTestContext(async () => { globalLogger.level = LogLevel.DEBUG; const originalConsoleLog = console.log; console.log = jest.fn(); const { repo: noWsSubRepo } = await createTestProject({ withRepo: true, project: { name: 'No WebSocket Subs Project' }, }); const subscription = await noWsSubRepo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient?name=Alice', channel: { type: 'websocket', }, }); expect(subscription).toBeDefined(); expect(subscription.id).toBeDefined(); const assertPromise = assertNoWsNotifications(); const patient = await noWsSubRepo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); await assertPromise; expect(console.log).toHaveBeenLastCalledWith(expect.stringMatching(/WebSocket Subscriptions/)); console.log = originalConsoleLog; globalLogger.level = LogLevel.NONE; })); test('Access Policy Not Satisfied', () => withTestContext(async () => { globalLogger.level = LogLevel.WARN; const originalConsoleLog = console.log; console.log = jest.fn(); // Create an access policy in different project // This should trigger an error when the subscription is executed const accessPolicy = await repo.createResource<AccessPolicy>({ resourceType: 'AccessPolicy', resource: [{ resourceType: 'Patient', criteria: `Patient?_id=${generateId()}` }], }); // Create an access policy in different project // This should trigger an error when the subscription is executed const { repo: wsRepo } = await createTestProject({ withClient: true, withRepo: true, project: { name: 'WebSockets AccessPolicy Denied Project', features: ['websocket-subscriptions'], }, membership: { accessPolicy: createReference(accessPolicy), }, }); const subscription = await wsRepo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'websocket', }, }); expect(subscription).toBeDefined(); expect(subscription.id).toBeDefined(); const assertPromise = assertNoWsNotifications(); const patient = await wsRepo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); await assertPromise; expect(console.log).not.toHaveBeenCalledWith( expect.stringContaining('[Subscription Access Policy]: Access Policy not satisfied on') ); console.log = originalConsoleLog; globalLogger.level = LogLevel.NONE; })); test('Subscription Author Has No Membership', () => withTestContext(async () => { globalLogger.level = LogLevel.WARN; const originalConsoleLog = console.log; console.log = jest.fn(); const accessPolicy = await superAdminRepo.createResource<AccessPolicy>({ resourceType: 'AccessPolicy', resource: [{ resourceType: 'Patient', readonly: true }, { resourceType: 'Subscription' }], }); const { repo: wsRepo, membership } = await createTestProject({ withClient: true, withRepo: true, project: { name: 'WebSockets AccessPolicy No Membership Project', features: ['websocket-subscriptions'], }, membership: { accessPolicy: createReference(accessPolicy), }, }); const subscription = await wsRepo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'websocket', }, }); expect(membership.id).toBeDefined(); expect(membership.accessPolicy).toBeDefined(); expect(subscription).toBeDefined(); expect(subscription.id).toBeDefined(); await superAdminRepo.deleteResource('ProjectMembership', membership.id); const assertPromise = assertNoWsNotifications(); const patient = await wsRepo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); await assertPromise; expect(console.log).toHaveBeenCalledWith( expect.stringContaining('[Subscription Access Policy]: No membership for subscription author') ); console.log = originalConsoleLog; globalLogger.level = LogLevel.NONE; })); test('Error Occurred During Check', () => withTestContext(async () => { globalLogger.level = LogLevel.WARN; const originalConsoleLog = console.log; console.log = jest.fn(); const accessPolicy = await superAdminRepo.createResource<AccessPolicy>({ resourceType: 'AccessPolicy', resource: [{ resourceType: 'Patient', readonly: true }, { resourceType: 'Subscription' }], }); const { repo: wsRepo, membership } = await createTestProject({ withClient: true, withRepo: true, project: { name: 'WebSockets AccessPolicy Error Project', features: ['websocket-subscriptions'], }, membership: { accessPolicy: createReference(accessPolicy), }, }); const subscription = await wsRepo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'websocket', }, }); expect(membership.id).toBeDefined(); expect(membership.accessPolicy).toBeDefined(); expect(subscription).toBeDefined(); expect(subscription.id).toBeDefined(); await superAdminRepo.deleteResource('AccessPolicy', accessPolicy.id); const assertPromise = assertNoWsNotifications(); const patient = await wsRepo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); await assertPromise; expect(console.log).toHaveBeenCalledWith( expect.stringContaining( '[Subscription Access Policy]: Error occurred while checking access policy for resource' ) ); console.log = originalConsoleLog; globalLogger.level = LogLevel.NONE; })); test('Supported Interaction Extension', () => withTestContext(async () => { const { repo: wsSubRepo } = await createTestProject({ withClient: true, withRepo: true, project: { name: 'WebSocket Subs Project', features: ['websocket-subscriptions'], }, }); const subscription = await wsSubRepo.createResource<Subscription>({ resourceType: 'Subscription', reason: 'test', status: 'active', criteria: 'Patient', channel: { type: 'websocket', }, extension: [ { url: 'https://medplum.com/fhir/StructureDefinition/subscription-supported-interaction', valueCode: 'create', }, ], }); expect(subscription).toBeDefined(); expect(subscription.id).toBeDefined(); const nextArgsPromise = waitForNextSubNotification<Patient>(); const patient = await wsSubRepo.createResource<Patient>({ resourceType: 'Patient', name: [{ given: ['Alice'], family: 'Smith' }], }); expect(patient).toBeDefined(); const notificationArgs = await nextArgsPromise; expect(notificationArgs).toMatchObject<EventNotificationArgs<Patient>>([ expect.objectContaining(patient), subscription.id, { includeResource: true }, ]); // Update the patient // This shouldn't trigger a notification const assertPromise = assertNoWsNotifications(); const updatedPatient = await wsSubRepo.updateResource<Patient>({ ...patient, active: true, }); expect(updatedPatient).toBeDefined(); await assertPromise; })); }); }); describe('Subscription Worker Event Handling', () => { afterEach(() => { jest.restoreAllMocks(); jest.resetModules(); }); test('Worker event handlers work correctly', async () => { // Set up mocks for the bullmq module const handlers = new Map<string, ((job?: Job, error?: Error) => void)[] | undefined>(); const mockWorker = { name: 'MockSubscriptionWorker', on: jest.fn().mockImplementation((event: string, handler: () => void) => { const handlerList = handlers.get(event); if (handlerList) { handlerList.push(handler); } else { handlers.set(event, [handler]); } return mockWorker; }), }; mockBullmq.Worker.mockReturnValue(mockWorker as unknown as Worker); // Now import the subscription worker init function jest.spyOn(globalLogger, 'info').mockImplementation(() => {}); // Mock the logger and metrics functions const recordHistogramValueSpy = jest.spyOn(otelModule, 'recordHistogramValue').mockImplementation(); // Initialize the subscription worker with mock config initSubscriptionWorker({} as MedplumServerConfig); // Create test job objects with the structure expected by the handlers const createTestJob = (id: string, attemptsMade = 0): Job => ({ id, attemptsMade, timestamp: Date.now() - 5000, // 5 seconds ago processedOn: Date.now() - 2000, // 2 seconds ago finishedOn: Date.now(), // now data: { subscriptionId: 'subscription-456', resourceType: 'Patient', id: 'patient-789', versionId: '1', interaction: 'create', requestTime: new Date().toISOString(), }, }) as Job; // Test the 'active' event handler with a first attempt job const firstAttemptJob = createTestJob('job-123', 0); const activeHandlers = handlers.get('active'); expect(activeHandlers).toBeDefined(); if (activeHandlers) { activeHandlers.forEach((handler) => handler(firstAttemptJob)); // Verify recordHistogramValue was called for queuedDuration on first attempt expect(recordHistogramValueSpy).toHaveBeenCalledWith('medplum.subscription.queuedDuration', expect.any(Number)); recordHistogramValueSpy.mockClear(); } // Test 'active' handler with a subsequent attempt job const subsequentAttemptJob = createTestJob('job-123', 1); if (activeHandlers) { activeHandlers.forEach((handler) => handler(subsequentAttemptJob)); // Verify recordHistogramValue was NOT called for subsequent attempts expect(recordHistogramValueSpy).not.toHaveBeenCalled(); recordHistogramValueSpy.mockClear(); } // Test the 'completed' event handler const completedJob = createTestJob('job-456'); const completedHandlers = handlers.get('completed'); expect(completedHandlers).toBeDefined(); if (completedHandlers) { completedHandlers.forEach((handler) => handler(completedJob)); // Verify completed job logging and metrics expect(recordHistogramValueSpy).toHaveBeenCalledWith( 'medplum.subscription.executionDuration', expect.any(Number) ); expect(recordHistogramValueSpy).toHaveBeenCalledWith('medplum.subscription.totalDuration', expect.any(Number)); recordHistogramValueSpy.mockClear(); } // Test the 'failed' event handler with a job const failedJob = createTestJob('job-789'); const error = new Error('Test error'); const failedHandlers = handlers.get('failed'); expect(failedHandlers).toBeDefined(); if (failedHandlers) { failedHandlers.forEach((handler) => handler(failedJob, error)); // Verify failed job logging and metrics expect(recordHistogramValueSpy).toHaveBeenCalledWith( 'medplum.subscription.failedExecutionDuration', expect.any(Number) ); recordHistogramValueSpy.mockClear(); // Test 'failed' handler with undefined job failedHandlers.forEach((handler) => handler(undefined, error)); // Verify logging for undefined job (no metrics) expect(recordHistogramValueSpy).not.toHaveBeenCalled(); } }); });

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/medplum/medplum'

If you have feedback or need assistance with the MCP directory API, please join our Discord server