Skip to main content
Glama
post-deploy-migration.test.ts21.3 kB
// SPDX-FileCopyrightText: Copyright Orangebot, Inc. and Medplum contributors // SPDX-License-Identifier: Apache-2.0 import { getReferenceString } from '@medplum/core'; import type { AsyncJob, Parameters } from '@medplum/fhirtypes'; import { DelayedError, Job, Queue } from 'bullmq'; import { closeWorkers, initWorkers } from '.'; import { initAppServices, shutdownApp } from '../app'; import { loadTestConfig } from '../config/loader'; import type { MedplumServerConfig } from '../config/types'; import { getSystemRepo } from '../fhir/repo'; import { globalLogger } from '../logger'; import type { CustomPostDeployMigration, CustomPostDeployMigrationJobData, PostDeployJobData, } from '../migrations/data/types'; import * as migrateModule from '../migrations/migrate'; import * as migrationUtils from '../migrations/migration-utils'; import type { MigrationAction } from '../migrations/types'; import type { ServerRegistryInfo } from '../server-registry'; import { getRegisteredServers } from '../server-registry'; import { withTestContext } from '../test.setup'; import * as versionModule from '../util/version'; import { getServerVersion } from '../util/version'; import { addPostDeployMigrationJobData, jobProcessor, PostDeployMigrationQueueName, prepareCustomMigrationJobData, prepareDynamicMigrationJobData, runCustomMigration, } from './post-deploy-migration'; import { queueRegistry } from './utils'; jest.mock('../server-registry'); describe('Post-Deploy Migration Worker', () => { let config: MedplumServerConfig; let mockRegisteredServers: ServerRegistryInfo[]; beforeAll(async () => { config = await loadTestConfig(); // initialize everything but workers await initAppServices(config); await closeWorkers(); }); beforeEach(() => { jest.resetModules(); jest.clearAllMocks(); mockRegisteredServers = [ { id: 'test-id', firstSeen: '2022-12-29T15:00:00Z', lastSeen: '2025-06-27T17:26:00Z', version: getServerVersion(), fullVersion: getServerVersion() + '-test', }, ]; // suppress error log output during testing jest.spyOn(globalLogger, 'error').mockImplementation(() => {}); (getRegisteredServers as jest.Mock).mockImplementation(() => mockRegisteredServers); }); afterEach(async () => { await closeWorkers(); }); afterAll(async () => { await shutdownApp(); }); test('Initialize and close worker', async () => { let queue = queueRegistry.get(PostDeployMigrationQueueName); expect(queue).toBeUndefined(); await initWorkers(config); queue = queueRegistry.get(PostDeployMigrationQueueName); expect(queue).toBeDefined(); expect(queue).toBeInstanceOf(Queue); }); function getQueueFromRegistryOrThrow(): Queue<PostDeployJobData> { const queue = queueRegistry.get<PostDeployJobData>(PostDeployMigrationQueueName); if (!queue) { throw new Error(`Job queue ${PostDeployMigrationQueueName} not available`); } return queue; } test('prepareCustomMigrationJobData and addPostDeployMigrationJobData', async () => { await initWorkers(config); const queue = getQueueFromRegistryOrThrow(); const addSpy = jest.mocked(queue.add).mockImplementation(async (jobName, jobData, options) => { return { id: '123', name: jobName, data: jobData, opts: options, } as unknown as Job<PostDeployJobData>; }); const asyncJob = await getSystemRepo().createResource<AsyncJob>({ resourceType: 'AsyncJob', status: 'accepted', dataVersion: 123, requestTime: new Date().toISOString(), request: '/admin/super/migrate', }); // inside of withTestContext, requestId and traceId are set await withTestContext(async () => { const data1 = prepareCustomMigrationJobData(asyncJob); expect(data1).toEqual({ type: 'custom', asyncJobId: asyncJob.id, requestId: expect.any(String), traceId: expect.any(String), }); const result1 = await addPostDeployMigrationJobData(data1); expect(result1).toEqual( expect.objectContaining({ data: data1, }) ); expect(addSpy).toHaveBeenCalledWith('PostDeployMigrationJobData', data1, { deduplication: { id: expect.any(String) }, }); }); // outside of withTestContext, requestId and traceId are undefined const data2 = prepareCustomMigrationJobData(asyncJob); expect(data2).toEqual({ type: 'custom', asyncJobId: asyncJob.id, requestId: undefined, traceId: undefined, }); const result2 = await addPostDeployMigrationJobData(data2); expect(result2).toEqual( expect.objectContaining({ data: data2, }) ); expect(addSpy).toHaveBeenCalledWith('PostDeployMigrationJobData', data2, { deduplication: { id: expect.any(String) }, }); }); test.each<[string, Partial<AsyncJob>, boolean]>([ ['is not active', { status: 'cancelled' }, false], ['has no dataVersion', { dataVersion: undefined }, true], ])('Job processor skips job if AsyncJob %s', async (_, jobProps, shouldThrow) => { const getPostDeployMigrationSpy = jest.spyOn(migrationUtils, 'getPostDeployMigration'); const mockAsyncJob = await getSystemRepo().createResource<AsyncJob>({ resourceType: 'AsyncJob', status: 'accepted', dataVersion: 456, requestTime: new Date().toISOString(), request: '/admin/super/migrate', ...jobProps, }); // temporarily set to {} to appease typescript since it gets set within withTestContext let job: Job<PostDeployJobData> = {} as unknown as Job<PostDeployJobData>; await withTestContext(async () => { const jobData: PostDeployJobData = prepareCustomMigrationJobData(mockAsyncJob); job = { id: '1', data: jobData, queueName: 'PostDeployMigrationQueue', } as unknown as Job<PostDeployJobData>; }); expect(job.data).toBeDefined(); if (shouldThrow) { await expect(jobProcessor(job)).rejects.toThrow( `Post-deploy migration number (AsyncJob.dataVersion) not found in ${getReferenceString(mockAsyncJob)}` ); } else { await jobProcessor(job); } // getting the post-deploy migration is a reasonable proxy for running the job // since the migration is what contains the run function. expect(getPostDeployMigrationSpy).not.toHaveBeenCalled(); }); test('Job processor runs dynamic migration when AsyncJob is active', async () => { const getPostDeployMigrationSpy = jest.spyOn(migrationUtils, 'getPostDeployMigration').mockImplementation(() => { throw new Error('Should not be called'); }); const executeMigrationActionsSpy = jest .spyOn(migrateModule, 'executeMigrationActions') .mockImplementation(async (_client, results) => { results.push({ name: 'some-action', durationMs: 10 }); }); const systemRepo = getSystemRepo(); const mockAsyncJob = await systemRepo.createResource<AsyncJob>({ resourceType: 'AsyncJob', status: 'accepted', requestTime: new Date().toISOString(), request: '/admin/super/reconcile-schema-drift', }); const migrationActions: MigrationAction[] = [ { type: 'CREATE_INDEX', indexName: 'some_necessary_test_index', createIndexSql: 'CREATE INDEX some_necessary_test_index ON Observation (id)', }, ]; // temporarily set to {} to appease typescript since it gets set within withTestContext let job: Job<PostDeployJobData> = {} as unknown as Job<PostDeployJobData>; await withTestContext(async () => { const jobData: PostDeployJobData = prepareDynamicMigrationJobData(mockAsyncJob, migrationActions); job = { id: '1', data: jobData, queueName: 'PostDeployMigrationQueue', } as unknown as Job<PostDeployJobData>; }); expect(job.data).toBeDefined(); await jobProcessor(job); expect(getPostDeployMigrationSpy).not.toHaveBeenCalled(); expect(executeMigrationActionsSpy).toHaveBeenCalledTimes(1); expect(executeMigrationActionsSpy).toHaveBeenCalledWith(expect.any(Object), expect.any(Array), migrationActions); const updatedAsyncJob = await systemRepo.readResource<AsyncJob>('AsyncJob', mockAsyncJob.id); expect(updatedAsyncJob.status).toBe('completed'); expect(updatedAsyncJob.output?.parameter).toEqual([ { name: 'some-action', part: [{ name: 'durationMs', valueInteger: 10 }] }, ]); getPostDeployMigrationSpy.mockRestore(); executeMigrationActionsSpy.mockRestore(); }); test('Job processor runs migration when AsyncJob is active', async () => { const mockCustomMigration: CustomPostDeployMigration = { type: 'custom', prepareJobData: jest.fn(), run: jest.fn().mockImplementation(async (repo, job, jobData) => { return runCustomMigration(repo, job, jobData, async (_client, results) => { results.push({ name: 'first', durationMs: 111 }); results.push({ name: 'second', durationMs: 222 }); }); }), }; const getPostDeployMigrationSpy = jest .spyOn(migrationUtils, 'getPostDeployMigration') .mockReturnValue(mockCustomMigration); const systemRepo = getSystemRepo(); const mockAsyncJob = await systemRepo.createResource<AsyncJob>({ resourceType: 'AsyncJob', status: 'accepted', dataVersion: 456, requestTime: new Date().toISOString(), request: '/admin/super/migrate', }); // temporarily set to {} to appease typescript since it gets set within withTestContext let job: Job<PostDeployJobData> = {} as unknown as Job<PostDeployJobData>; await withTestContext(async () => { const jobData: PostDeployJobData = prepareCustomMigrationJobData(mockAsyncJob); job = { id: '1', data: jobData, queueName: 'PostDeployMigrationQueue', } as unknown as Job<PostDeployJobData>; }); expect(job.data).toBeDefined(); await jobProcessor(job); expect(getPostDeployMigrationSpy).toHaveBeenCalledWith(456); expect(mockCustomMigration.run).toHaveBeenCalledWith(expect.any(Object), job, job.data); const updatedAsyncJob = await systemRepo.readResource<AsyncJob>('AsyncJob', mockAsyncJob.id); expect(updatedAsyncJob.status).toBe('completed'); expect(updatedAsyncJob.output?.parameter).toEqual([ { name: 'first', part: [{ name: 'durationMs', valueInteger: 111 }] }, { name: 'second', part: [{ name: 'durationMs', valueInteger: 222 }] }, ]); }); test.each(['some-token', undefined])( 'Job processor re-queues ineligible jobs with job.token %s', async (jobToken) => { await initWorkers(config); const queue = getQueueFromRegistryOrThrow(); const mockAsyncJob = await getSystemRepo().createResource<AsyncJob>({ resourceType: 'AsyncJob', status: 'accepted', dataVersion: 456, requestTime: new Date().toISOString(), request: '/admin/super/migrate', minServerVersion: '10.2.1', }); const mockCustomMigration: CustomPostDeployMigration = { type: 'custom', prepareJobData: jest.fn(), run: jest.fn().mockImplementation(async (repo, job, jobData) => { return runCustomMigration(repo, job, jobData, async (_client, results) => { results.push({ name: 'first', durationMs: 111 }); results.push({ name: 'second', durationMs: 222 }); }); }), }; const getPostDeployMigrationSpy = jest .spyOn(migrationUtils, 'getPostDeployMigration') .mockReturnValue(mockCustomMigration); // temporarily set to {} to appease typescript since it gets set within withTestContext let job: Job<PostDeployJobData> = {} as unknown as Job<PostDeployJobData>; await withTestContext(async () => { const jobData: PostDeployJobData = prepareCustomMigrationJobData(mockAsyncJob); job = new Job(queue, 'PostDeployMigrationJobData', jobData); // Since the Job class is fully mocked, we need to set the data property manually job.data = jobData; // job.token generally gets set deep in the internals of bullmq, but we mock the module job.token = jobToken; }); // DelayedError is part of the mocked bullmq module. Something about that causes // the usual `expect(...).rejects.toThrow(...)`; it seems to be because DelayedError // is no longer an `Error`. So instead, we manually check that it threw let threw = undefined; let manuallyThrownError = undefined; try { await jobProcessor(job); manuallyThrownError = new Error( jobToken ? 'Expected job to throw DelayedError' : 'Expected job to throw Error' ); throw manuallyThrownError; } catch (err) { threw = err; } expect(threw).toBeDefined(); expect(threw).not.toBe(manuallyThrownError); expect(threw).toBeInstanceOf(jobToken ? DelayedError : Error); expect(getPostDeployMigrationSpy).not.toHaveBeenCalled(); expect(mockCustomMigration.run).not.toHaveBeenCalled(); if (jobToken) { expect(job.moveToDelayed).toHaveBeenCalledTimes(1); expect(job.moveToDelayed).toHaveBeenCalledWith(expect.any(Number), jobToken); } else { expect(job.moveToDelayed).not.toHaveBeenCalled(); } getPostDeployMigrationSpy.mockRestore(); } ); test('Job processor delays job when migration definition is not found', async () => { await initWorkers(config); const mockAsyncJob = await getSystemRepo().createResource<AsyncJob>({ resourceType: 'AsyncJob', status: 'accepted', dataVersion: 456, requestTime: new Date().toISOString(), request: '/admin/super/migrate', minServerVersion: '1.0.0', }); const mockCustomMigration: CustomPostDeployMigration = { type: 'custom', prepareJobData: jest.fn(), run: jest.fn().mockImplementation(async (repo, job, jobData) => { return runCustomMigration(repo, job, jobData, async (_client, results) => { results.push({ name: 'first', durationMs: 111 }); results.push({ name: 'second', durationMs: 222 }); }); }), }; const queue = getQueueFromRegistryOrThrow(); // temporarily set to {} to appease typescript since it gets set within withTestContext let job: Job<PostDeployJobData> = {} as unknown as Job<PostDeployJobData>; await withTestContext(async () => { const jobData: PostDeployJobData = prepareCustomMigrationJobData(mockAsyncJob); job = new Job(queue, 'PostDeployMigrationJobData', jobData); // Since the Job class is fully mocked, we need to set the data property manually job.data = jobData; // job.token generally gets set deep in the internals of bullmq, but we mock the module job.token = 'some-token'; }); await expect(jobProcessor(job)).rejects.toBeInstanceOf(DelayedError); expect(mockCustomMigration.run).not.toHaveBeenCalled(); expect(job.moveToDelayed).toHaveBeenCalledTimes(1); expect(job.moveToDelayed).toHaveBeenCalledWith(expect.any(Number), 'some-token'); }); test.each([ ['delays job when server cluster is not compatible', true], ['runs job when server cluster is compatible', false], ])('Job process %s ', async (_msg, includeOldServer) => { const mockServerVersion = '4.3.0'; const oldServerVersion = '4.2.2'; jest.spyOn(versionModule, 'getServerVersion').mockImplementation(() => mockServerVersion); await initWorkers(config); const mockAsyncJob = await getSystemRepo().createResource<AsyncJob>({ resourceType: 'AsyncJob', status: 'accepted', dataVersion: 13, requestTime: new Date().toISOString(), request: '/admin/super/migrate', minServerVersion: mockServerVersion, }); const mockCustomMigration: CustomPostDeployMigration = { type: 'custom', prepareJobData: jest.fn(), run: jest.fn().mockImplementation(async (repo, job, jobData) => { return runCustomMigration(repo, job, jobData, async (_client, results) => { results.push({ name: 'first', durationMs: 111 }); results.push({ name: 'second', durationMs: 222 }); }); }), }; const getPostDeployMigrationSpy = jest .spyOn(migrationUtils, 'getPostDeployMigration') .mockReturnValue(mockCustomMigration); mockRegisteredServers = [ { id: 'current-server-id', firstSeen: '2022-12-29T15:00:00Z', lastSeen: '2025-06-27T17:26:00Z', version: mockServerVersion, fullVersion: mockServerVersion + '-test', }, ]; if (includeOldServer) { mockRegisteredServers.push({ id: 'old-server-id', firstSeen: '2022-12-29T15:00:00Z', lastSeen: '2025-06-27T17:26:00Z', version: oldServerVersion, fullVersion: oldServerVersion + '-test', }); } const queue = getQueueFromRegistryOrThrow(); // temporarily set to {} to appease typescript since it gets set within withTestContext let job: Job<PostDeployJobData> = {} as unknown as Job<PostDeployJobData>; await withTestContext(async () => { const jobData: PostDeployJobData = prepareCustomMigrationJobData(mockAsyncJob); job = new Job(queue, 'PostDeployMigrationJobData', jobData); // Since the Job class is fully mocked, we need to set the data property manually job.data = jobData; // job.token generally gets set deep in the internals of bullmq, but we mock the module job.token = 'some-token'; }); if (includeOldServer) { process.env.MEDPLUM_ENABLE_STRICT_MIGRATION_VERSION_CHECKS = 'true'; await expect(jobProcessor(job)).rejects.toBeInstanceOf(DelayedError); delete process.env.MEDPLUM_ENABLE_STRICT_MIGRATION_VERSION_CHECKS; expect(mockCustomMigration.run).not.toHaveBeenCalled(); expect(job.moveToDelayed).toHaveBeenCalledTimes(1); expect(job.moveToDelayed).toHaveBeenCalledWith(expect.any(Number), 'some-token'); } else { await expect(jobProcessor(job)).resolves.toBeUndefined(); expect(mockCustomMigration.run).toHaveBeenCalled(); expect(job.moveToDelayed).not.toHaveBeenCalled(); } expect(getPostDeployMigrationSpy).toHaveBeenCalledWith(13); }); test('Run custom migration success', async () => { const systemRepo = getSystemRepo(); const asyncJob = await systemRepo.createResource<AsyncJob>({ resourceType: 'AsyncJob', status: 'accepted', dataVersion: 123, requestTime: new Date().toISOString(), request: '/admin/super/migrate', }); const mockCallback = jest.fn().mockImplementation(async (_client, results) => { results.push({ name: 'testAction', durationMs: 100 }); }); const jobData: CustomPostDeployMigrationJobData = { type: 'custom', asyncJobId: asyncJob.id, requestId: '123', traceId: '456', }; const result = await runCustomMigration(systemRepo, undefined, jobData, mockCallback); expect(result).toBe('finished'); expect(mockCallback).toHaveBeenCalledWith( expect.objectContaining({ query: expect.any(Function), release: expect.any(Function) }), expect.any(Array), undefined, jobData ); const updatedJob = await systemRepo.readResource<AsyncJob>('AsyncJob', asyncJob.id); expect(updatedJob.status).toBe('completed'); expect(updatedJob.output).toMatchObject<Partial<Parameters>>({ resourceType: 'Parameters', parameter: [ { name: 'testAction', part: [{ name: 'durationMs', valueInteger: 100 }], }, ], }); }); test('Run custom migration with error', async () => { const systemRepo = getSystemRepo(); const asyncJob = await systemRepo.createResource<AsyncJob>({ resourceType: 'AsyncJob', status: 'accepted', dataVersion: 123, requestTime: new Date().toISOString(), request: '/admin/super/migrate', }); const jobData: CustomPostDeployMigrationJobData = { type: 'custom', asyncJobId: asyncJob.id, requestId: '123', traceId: '456', }; const result = await runCustomMigration(systemRepo, undefined, jobData, async (_client, results) => { results.push({ name: 'first action', durationMs: 100 }); throw new Error('Some random error'); }); expect(result).toBe('finished'); const updatedJob = await systemRepo.readResource<AsyncJob>('AsyncJob', asyncJob.id); expect(updatedJob.status).toBe('error'); expect(updatedJob.output).toStrictEqual({ resourceType: 'Parameters', parameter: [ { name: 'first action', part: [{ name: 'durationMs', valueInteger: 100 }], }, { name: 'error', valueString: 'Some random error', }, { name: 'stack', valueString: expect.stringContaining('Error: Some random error'), }, ], }); }); });

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/medplum/medplum'

If you have feedback or need assistance with the MCP directory API, please join our Discord server