Skip to main content
Glama
reindex.test.ts (30 kB)
// SPDX-FileCopyrightText: Copyright Orangebot, Inc. and Medplum contributors
// SPDX-License-Identifier: Apache-2.0
import {
  createReference,
  LogLevel,
  OperationOutcomeError,
  parseSearchRequest,
  preconditionFailed,
} from '@medplum/core';
import type {
  AsyncJob,
  ImmunizationEvaluation,
  Parameters,
  Patient,
  Practitioner,
  Project,
  ResourceType,
  User,
} from '@medplum/fhirtypes';
import { DelayedError, Job } from 'bullmq';
import { randomUUID } from 'crypto';
import { initAppServices, shutdownApp } from '../app';
import { loadTestConfig } from '../config/loader';
import { DatabaseMode, getDatabasePool } from '../database';
import { getSystemRepo, Repository } from '../fhir/repo';
import { SelectQuery } from '../fhir/sql';
import { globalLogger, systemLogger } from '../logger';
import { createTestProject, withTestContext } from '../test.setup';
import type { ReindexJobData } from './reindex';
import {
  addReindexJob,
  getReindexQueue,
  jobProcessor,
  prepareReindexJobData,
  REINDEX_WORKER_VERSION,
  ReindexJob,
} from './reindex';
import { queueRegistry } from './utils';

/**
 * Exercises `ReindexJob.execute` end to end against a test project: queueing,
 * iteration over batches and resource types, error reporting on the AsyncJob,
 * search filters, `maxResourceVersion`, and the `User.projectId` column.
 */
describe('Reindex Worker', () => {
  // Identifier system shared by every test that tags resources with a random MRN
  const idSystem = 'http://example.com/mrn';

  let repo: Repository;

  beforeAll(async () => {
    const config = await loadTestConfig();
    await initAppServices(config);
    repo = (await createTestProject({ withRepo: true })).repo;
  });

  afterEach(() => {
    jest.clearAllMocks();
  });

  afterAll(async () => {
    await shutdownApp();
  });

  test('Reindex resource type with empty page', () =>
    withTestContext(async () => {
      const queue = getReindexQueue() as any;
      queue.add.mockClear();

      let asyncJob = await repo.createResource<AsyncJob>({
        resourceType: 'AsyncJob',
        status: 'accepted',
        requestTime: new Date().toISOString(),
        request: '/admin/super/reindex',
      });

      await addReindexJob(['MedicinalProductManufactured'], asyncJob);
      expect(queue.add).toHaveBeenCalledWith(
        'ReindexJobData',
        expect.objectContaining<Partial<ReindexJobData>>({
          resourceTypes: ['MedicinalProductManufactured'],
          asyncJobId: asyncJob.id,
        })
      );

      const jobData = queue.add.mock.calls[0][1] as ReindexJobData;
      const reindexJob = new ReindexJob();
      const processIterationSpy = jest.spyOn(reindexJob, 'processIteration');
      await reindexJob.execute(undefined, jobData);

      asyncJob = await repo.readResource('AsyncJob', asyncJob.id);
      expect(asyncJob.status).toStrictEqual('completed');
      expect(asyncJob.output).toMatchObject<Partial<Parameters>>({
        parameter: expect.arrayContaining([
          {
            name: 'result',
            part: expect.arrayContaining([
              { name: 'count', valueInteger: 0 },
              { name: 'resourceType', valueCode: 'MedicinalProductManufactured' },
            ]),
          },
        ]),
      });

      // Just one iteration since it's an empty resource type
      expect(processIterationSpy).toHaveBeenCalledTimes(1);
    }));

  test('prepare job', async () => {
    // without request context
    const jobData1 = prepareReindexJobData(['Patient'], 'asyncJobId1');
    expect(jobData1).toMatchObject<Partial<ReindexJobData>>({
      resourceTypes: ['Patient'],
      asyncJobId: 'asyncJobId1',
      requestId: undefined,
      traceId: undefined,
    });

    // with request context
    await withTestContext(() => {
      const jobData2 = prepareReindexJobData(['Patient'], 'asyncJobId1');
      expect(jobData2).toMatchObject<Partial<ReindexJobData>>({
        resourceTypes: ['Patient'],
        asyncJobId: 'asyncJobId1',
        requestId: expect.any(String),
        traceId: expect.any(String),
      });
    });
  });

  test('Multiple iterations when more than one batchSize exist', () =>
    withTestContext(async () => {
      let asyncJob = await repo.createResource<AsyncJob>({
        resourceType: 'AsyncJob',
        status: 'accepted',
        requestTime: new Date().toISOString(),
        request: '/admin/super/reindex',
      });

      const mrn = randomUUID();
      const batchSize = 20; // this is based on the minimum _count that allows canUseCursorLinks to be true

      // Create one more resource than the iteration count so two iterations are expected
      for (let i = 0; i < batchSize + 1; i++) {
        await repo.createResource<ImmunizationEvaluation>({
          resourceType: 'ImmunizationEvaluation',
          identifier: [{ system: idSystem, value: mrn }],
          status: 'completed',
          patient: { reference: 'Patient/123' },
          targetDisease: { text: '1234567890' },
          immunizationEvent: { reference: 'Immunization/123' },
          doseStatus: { text: '1234567890' },
        });
      }

      const jobData = prepareReindexJobData(
        ['ImmunizationEvaluation'],
        asyncJob.id,
        parseSearchRequest(`ImmunizationEvaluation?identifier=${idSystem}|${mrn}`)
      );

      const systemRepo = getSystemRepo();
      const reindexJob = new ReindexJob(systemRepo, batchSize);
      jest.spyOn(reindexJob, 'processIteration');
      await reindexJob.execute(undefined, jobData);

      asyncJob = await repo.readResource('AsyncJob', asyncJob.id);
      expect(asyncJob.status).toStrictEqual('completed');
      expect(asyncJob.output).toMatchObject<Partial<Parameters>>({
        parameter: expect.arrayContaining([
          {
            name: 'result',
            part: expect.arrayContaining([
              { name: 'resourceType', valueCode: 'ImmunizationEvaluation' },
              { name: 'count', valueInteger: batchSize + 1 },
            ]),
          },
        ]),
      });

      expect(reindexJob.processIteration).toHaveBeenCalledTimes(2);
      expect(reindexJob.processIteration).toHaveBeenCalledWith(
        systemRepo,
        expect.objectContaining<Partial<ReindexJobData>>({
          resourceTypes: ['ImmunizationEvaluation'],
          cursor: expect.stringContaining('-'),
        })
      );
    }));

  test('Proceeds to next resource type after exhausting initial one', () =>
    withTestContext(async () => {
      let asyncJob = await repo.createResource<AsyncJob>({
        resourceType: 'AsyncJob',
        status: 'accepted',
        requestTime: new Date().toISOString(),
        request: '/admin/super/reindex',
      });

      const resourceTypes = ['PaymentNotice', 'MedicinalProductManufactured'] as ResourceType[];
      const jobData = prepareReindexJobData(resourceTypes, asyncJob.id);

      const systemRepo = getSystemRepo();
      const reindexJob = new ReindexJob(systemRepo);
      jest.spyOn(reindexJob, 'processIteration');
      await reindexJob.execute(undefined, jobData);

      expect(reindexJob.processIteration).toHaveBeenCalledTimes(2);
      expect(reindexJob.processIteration).toHaveBeenCalledWith(
        systemRepo,
        expect.objectContaining<Partial<ReindexJobData>>({
          resourceTypes: ['MedicinalProductManufactured'],
          count: 0,
        })
      );

      asyncJob = await repo.readResource('AsyncJob', asyncJob.id);
      expect(asyncJob.status).toStrictEqual('completed');

      // Expect 4 history entries (newest first):
      //   completed (same output)
      //   MedicinalProductManufactured finished
      //   PaymentNotice finished
      //   creation (no output)
      const history = await repo.readHistory<AsyncJob>('AsyncJob', asyncJob.id);
      const outputs = history.entry?.map((entry) => entry.resource?.output?.parameter) ?? [];
      expect(outputs).toHaveLength(4);
      expect(outputs[0]).toStrictEqual(outputs[1]);
      expect(outputs[1]).toEqual([
        {
          name: 'result',
          part: expect.arrayContaining([{ name: 'resourceType', valueCode: 'PaymentNotice' }]),
        },
        {
          name: 'result',
          part: expect.arrayContaining([{ name: 'resourceType', valueCode: 'MedicinalProductManufactured' }]),
        },
      ]);
      expect(outputs[2]).toEqual([
        {
          name: 'result',
          part: expect.arrayContaining([{ name: 'resourceType', valueCode: 'PaymentNotice' }]),
        },
      ]);
      expect(outputs[3]).toBeUndefined();
    }));

  test('Fails job on error', () =>
    withTestContext(async () => {
      let asyncJob = await repo.createResource<AsyncJob>({
        resourceType: 'AsyncJob',
        status: 'accepted',
        requestTime: new Date().toISOString(),
        request: '/admin/super/reindex',
      });

      const jobData = prepareReindexJobData(['ValueSet'], asyncJob.id);

      const systemRepo = getSystemRepo();
      const reindexJob = new ReindexJob(systemRepo);
      jest.spyOn(systemRepo, 'search').mockRejectedValueOnce(new Error('Failed to search systemRepo'));

      // Silence the system logger while the job runs
      const originalLevel = systemLogger.level;
      systemLogger.level = LogLevel.NONE;
      try {
        await expect(reindexJob.execute(undefined, jobData)).resolves.toBe('finished');
      } finally {
        // Restore the level even when the assertion fails so later tests are not left silenced
        systemLogger.level = originalLevel;
      }

      asyncJob = await repo.readResource('AsyncJob', asyncJob.id);
      expect(asyncJob.status).toStrictEqual('error');
      expect(asyncJob.output?.parameter).toEqual([
        {
          name: 'result',
          part: expect.arrayContaining([
            {
              name: 'resourceType',
              valueCode: 'ValueSet',
            },
            {
              name: 'error',
              valueString: 'Failed to search systemRepo',
            },
            { name: 'stack', valueString: expect.stringContaining('Failed to search systemRepo') },
            { name: 'errSearchRequest', valueString: expect.stringContaining('"resourceType":"ValueSet"') },
          ]),
        },
      ]);
    }));

  test('Continues when one resource type fails and reports error', () =>
    withTestContext(async () => {
      let asyncJob = await repo.createResource<AsyncJob>({
        resourceType: 'AsyncJob',
        status: 'accepted',
        requestTime: new Date().toISOString(),
        request: '/admin/super/reindex',
      });

      const resourceTypes: ResourceType[] = [
        'MedicinalProductManufactured',
        'Binary',
        'MedicinalProductContraindication',
      ];
      const jobData = prepareReindexJobData(resourceTypes, asyncJob.id);

      await expect(new ReindexJob().execute(undefined, jobData)).resolves.toBe('finished');

      asyncJob = await repo.readResource('AsyncJob', asyncJob.id);
      expect(asyncJob.status).toStrictEqual('error');
      expect(asyncJob.output?.parameter).toEqual<Parameters['parameter']>([
        {
          name: 'result',
          part: expect.arrayContaining([
            { name: 'resourceType', valueCode: 'MedicinalProductManufactured' },
            expect.objectContaining({ name: 'count' }),
            expect.objectContaining({ name: 'elapsedTime' }),
          ]),
        },
        {
          name: 'result',
          part: expect.arrayContaining([
            { name: 'resourceType', valueCode: 'Binary' },
            { name: 'error', valueString: 'Cannot search on Binary resource type' },
            { name: 'nextTimestamp', valueDateTime: '1970-01-01T00:00:00.000Z' },
          ]),
        },
        {
          name: 'result',
          part: expect.arrayContaining([
            { name: 'resourceType', valueCode: 'MedicinalProductContraindication' },
            expect.objectContaining({ name: 'count' }),
            expect.objectContaining({ name: 'elapsedTime' }),
          ]),
        },
      ]);
    }));

  test('Reindex with search filter', () =>
    withTestContext(async () => {
      const mrn = randomUUID();

      let asyncJob = await repo.createResource<AsyncJob>({
        resourceType: 'AsyncJob',
        status: 'accepted',
        requestTime: new Date().toISOString(),
        request: '/admin/super/reindex',
      });

      await repo.createResource<Patient>({
        resourceType: 'Patient',
        gender: 'unknown',
        identifier: [{ system: idSystem, value: mrn }],
      });
      await repo.createResource<Patient>({
        resourceType: 'Patient',
        gender: 'female', // is not 'unknown', so it won't be reindexed
        identifier: [{ system: idSystem, value: mrn }],
      });
      await repo.createResource<Practitioner>({
        resourceType: 'Practitioner',
        gender: 'unknown',
        identifier: [{ system: idSystem, value: mrn }],
      });

      const resourceTypes = ['Patient', 'Practitioner'] as ResourceType[];
      const searchFilter = parseSearchRequest(`Person?identifier=${idSystem}|${mrn}&gender=unknown`);
      const jobData = prepareReindexJobData(resourceTypes, asyncJob.id, searchFilter);

      await new ReindexJob().execute(undefined, jobData);

      asyncJob = await repo.readResource('AsyncJob', asyncJob.id);
      expect(asyncJob.status).toStrictEqual('completed');
      expect(asyncJob.output?.parameter).toEqual([
        {
          name: 'result',
          part: expect.arrayContaining([
            expect.objectContaining({ name: 'resourceType', valueCode: 'Patient' }),
            expect.objectContaining({ name: 'count', valueInteger: 1 }),
          ]),
        },
        {
          name: 'result',
          part: expect.arrayContaining([
            expect.objectContaining({ name: 'resourceType', valueCode: 'Practitioner' }),
            expect.objectContaining({ name: 'count', valueInteger: 1 }),
          ]),
        },
      ]);
    }));

  test('Reindex with _lastUpdated filter', () =>
    withTestContext(async () => {
      const mrn = randomUUID();
      const systemRepo = getSystemRepo();

      let asyncJob = await systemRepo.createResource<AsyncJob>({
        resourceType: 'AsyncJob',
        status: 'accepted',
        requestTime: new Date().toISOString(),
        request: '/admin/super/reindex',
      });

      // lastUpdated before the greater than or equal filter should NOT be reindexed
      await systemRepo.createResource<Patient>({
        resourceType: 'Patient',
        meta: { lastUpdated: '1999-12-31T00:00:00Z' },
        identifier: [{ system: idSystem, value: mrn }],
      });
      // lastUpdated matching the greater than or equal filter should be reindexed
      await systemRepo.createResource<Patient>({
        resourceType: 'Patient',
        meta: { lastUpdated: '2000-01-01T00:00:00Z' },
        identifier: [{ system: idSystem, value: mrn }],
      });
      // lastUpdated equal to the less than bound does not satisfy that filter, so it should NOT be reindexed
      await systemRepo.createResource<Patient>({
        resourceType: 'Patient',
        meta: { lastUpdated: '2001-01-01T00:00:00Z' },
        identifier: [{ system: idSystem, value: mrn }],
      });

      const resourceTypes = ['Patient'] as ResourceType[];
      const searchFilter = parseSearchRequest(
        `Patient?identifier=${idSystem}|${mrn}&_lastUpdated=ge2000-01-01T00:00:00Z&_lastUpdated=lt2001-01-01T00:00:00Z`
      );
      const jobData = prepareReindexJobData(resourceTypes, asyncJob.id, searchFilter);

      await new ReindexJob().execute(undefined, jobData);

      asyncJob = await systemRepo.readResource('AsyncJob', asyncJob.id);
      expect(asyncJob.status).toStrictEqual('completed');
      expect(asyncJob.output).toMatchObject<Parameters>({
        resourceType: 'Parameters',
        parameter: [
          {
            name: 'result',
            part: expect.arrayContaining([
              expect.objectContaining({ name: 'resourceType', valueCode: 'Patient' }),
              expect.objectContaining({ name: 'count', valueInteger: 1 }),
            ]),
          },
        ],
      });
    }));

  const CURRENT_VERSION = Repository.VERSION;
  const OLDER_VERSION = CURRENT_VERSION - 1;

  test.each([
    [OLDER_VERSION, 1],
    [CURRENT_VERSION, 2],
    [undefined, 2],
  ])('Reindex with maxResourceVersion %s', (maxResourceVersion, expectedCount) =>
    withTestContext(async () => {
      const systemRepo = getSystemRepo();

      let asyncJob = await systemRepo.createResource<AsyncJob>({
        resourceType: 'AsyncJob',
        status: 'accepted',
        requestTime: new Date().toISOString(),
        request: '/admin/super/reindex',
      });

      const mrn = randomUUID();

      const outdatedPatient = await systemRepo.createResource<Patient>({
        resourceType: 'Patient',
        identifier: [{ system: idSystem, value: mrn }],
      });
      const currentPatient = await systemRepo.createResource<Patient>({
        resourceType: 'Patient',
        identifier: [{ system: idSystem, value: mrn }],
      });

      const client = repo.getDatabaseClient(DatabaseMode.WRITER);
      const getVersionQuery = (id: string[]): SelectQuery =>
        new SelectQuery('Patient').column('id').column('__version').where('id', 'IN', id);

      // Downgrade one row directly in the database so the two patients differ in __version
      await client.query('UPDATE "Patient" SET __version = $1 WHERE id = $2', [OLDER_VERSION, outdatedPatient.id]);

      const beforeResults = await getVersionQuery([outdatedPatient.id, currentPatient.id]).execute(client);
      expect(beforeResults).toHaveLength(2);
      expect(beforeResults).toEqual(
        expect.arrayContaining([
          { id: outdatedPatient.id, __version: OLDER_VERSION },
          { id: currentPatient.id, __version: Repository.VERSION },
        ])
      );

      const jobData = prepareReindexJobData(
        ['Patient'],
        asyncJob.id,
        parseSearchRequest(`Patient?identifier=${idSystem}|${mrn}`),
        maxResourceVersion
      );

      await new ReindexJob().execute(undefined, jobData);

      const afterResults = await getVersionQuery([outdatedPatient.id, currentPatient.id]).execute(client);
      expect(afterResults).toHaveLength(2);
      expect(afterResults).toEqual(
        expect.arrayContaining([
          { id: outdatedPatient.id, __version: CURRENT_VERSION },
          { id: currentPatient.id, __version: CURRENT_VERSION },
        ])
      );

      asyncJob = await systemRepo.readResource('AsyncJob', asyncJob.id);
      expect(asyncJob.status).toStrictEqual('completed');
      expect(asyncJob.output).toMatchObject<Parameters>({
        resourceType: 'Parameters',
        parameter: [
          {
            name: 'result',
            part: expect.arrayContaining([
              expect.objectContaining({ name: 'resourceType', valueCode: 'Patient' }),
              expect.objectContaining({ name: 'count', valueInteger: expectedCount }),
            ]),
          },
        ],
      });
    })
  );

  test('Populates User.projectId column in database', () =>
    withTestContext(async () => {
      const queue = getReindexQueue() as any;
      queue.add.mockClear();

      const mrn = randomUUID();

      let asyncJob = await repo.createResource<AsyncJob>({
        resourceType: 'AsyncJob',
        status: 'accepted',
        requestTime: new Date().toISOString(),
        request: '/admin/super/reindex',
      });

      const systemRepo = getSystemRepo();
      const project = repo.currentProject() as Project;
      let user = await systemRepo.createResource<User>({
        resourceType: 'User',
        identifier: [{ system: idSystem, value: mrn }],
        firstName: 'Project',
        lastName: 'User',
        project: createReference(project),
      });

      const resourceTypes = ['User'] as ResourceType[];
      const searchFilter = parseSearchRequest(`User?identifier=${idSystem}|${mrn}`);
      const jobData = prepareReindexJobData(resourceTypes, asyncJob.id, searchFilter);

      await new ReindexJob().execute(undefined, jobData);

      asyncJob = await systemRepo.readResource('AsyncJob', asyncJob.id);
      expect(asyncJob.status).toStrictEqual('completed');
      expect(asyncJob.output).toMatchObject<Parameters>({
        resourceType: 'Parameters',
        parameter: [
          {
            name: 'result',
            part: expect.arrayContaining([
              expect.objectContaining({ name: 'resourceType', valueCode: 'User' }),
              expect.objectContaining({ name: 'count', valueInteger: 1 }),
            ]),
          },
        ],
      });

      user = await systemRepo.readResource<User>('User', user.id);
      expect(user.meta?.project).toBeUndefined();

      const rows = await new SelectQuery('User')
        .column('projectId')
        .where('id', '=', user.id)
        .execute(getDatabasePool(DatabaseMode.READER));
      expect(rows[0].projectId).toStrictEqual(project.id);
    }));
});

/**
 * Covers the paths where a reindex job must stop or be postponed: a cancelled
 * AsyncJob, a closing queue, a worker version below `minReindexWorkerVersion`,
 * and a cancellation that lands while the job is running.
 */
describe('Job cancellation', () => {
  let repo: Repository;
  const systemRepo = getSystemRepo();

  beforeAll(async () => {
    const config = await loadTestConfig();
    await initAppServices(config);
    repo = (await createTestProject({ withRepo: true })).repo;
  });

  afterAll(async () => {
    await shutdownApp();
  });

  test('Detect cancelled AsyncJob when iteration begins', () =>
    withTestContext(async () => {
      let asyncJob = await repo.createResource<AsyncJob>({
        resourceType: 'AsyncJob',
        status: 'cancelled',
        requestTime: new Date().toISOString(),
        request: '/admin/super/reindex',
      });

      const jobData = prepareReindexJobData(['MedicinalProductContraindication'], asyncJob.id);
      const result = await new ReindexJob().execute(undefined, jobData);
      expect(result).toStrictEqual('interrupted');

      asyncJob = await repo.readResource('AsyncJob', asyncJob.id);
      expect(asyncJob.status).toStrictEqual('cancelled');
      expect(asyncJob.output).toBeUndefined();
    }));

  test('Ensure job reads up-to-date cancellation status from DB', () =>
    withTestContext(async () => {
      const originalJob = await repo.createResource<AsyncJob>({
        resourceType: 'AsyncJob',
        status: 'accepted',
        requestTime: new Date().toISOString(),
        request: '/admin/super/reindex',
      });
      const cancelledJob = await repo.updateResource<AsyncJob>({
        ...originalJob,
        status: 'cancelled',
      });

      // Job will start up with the uncancelled version of the resource
      const jobData = prepareReindexJobData(['MedicinalProductContraindication'], originalJob.id);

      // Should be a no-op due to cancellation
      const result = await new ReindexJob().execute(undefined, jobData);
      expect(result).toStrictEqual('interrupted');

      const finalJob = await repo.readResource<AsyncJob>('AsyncJob', cancelledJob.id);
      expect(finalJob.status).toStrictEqual('cancelled');
      expect(finalJob.output).toBeUndefined();
    }));

  test.each(['some-token', undefined])('Job handles queue closing with job.token %s', async (jobToken) =>
    withTestContext(async () => {
      const originalJob = await repo.createResource<AsyncJob>({
        resourceType: 'AsyncJob',
        status: 'accepted',
        requestTime: new Date().toISOString(),
        request: '/admin/super/reindex',
      });

      const queueName = 'ReindexQueue';
      const queue = queueRegistry.get(queueName);
      if (!queue) {
        throw new Error('Could not find queue');
      }

      const isClosingSpy = jest.spyOn(queueRegistry, 'isClosing').mockReturnValue(true);
      const globalErrorSpy = jest.spyOn(globalLogger, 'error').mockImplementation(() => {});

      try {
        const jobData = prepareReindexJobData(['MedicinalProductContraindication'], originalJob.id);
        const job = new Job(queue, 'ReindexJob', jobData, { attempts: 55 });
        // job.token generally gets set deep in the internals of bullmq, but we mock the module
        job.token = jobToken;

        // DelayedError is part of the mocked bullmq module. Something about that causes
        // the usual `expect(...).rejects.toThrow(...)` to not work; it seems to be because
        // DelayedError is no longer an `Error`. So instead, we manually check that it threw
        let threw: unknown;
        let manuallyThrownError: Error | undefined;
        try {
          await new ReindexJob().execute(job, jobData);
          manuallyThrownError = new Error(
            jobToken ? 'Expected job to throw DelayedError' : 'Expected job to throw Error'
          );
          throw manuallyThrownError;
        } catch (err) {
          threw = err;
        }

        expect(threw).toBeDefined();
        expect(threw).not.toBe(manuallyThrownError);
        expect(threw).toBeInstanceOf(jobToken ? DelayedError : Error);
        expect(isClosingSpy).toHaveBeenCalledTimes(1);

        if (jobToken) {
          expect(job.moveToDelayed).toHaveBeenCalledTimes(1);
          expect(job.moveToDelayed).toHaveBeenCalledWith(expect.any(Number), jobToken);
          expect(globalErrorSpy).not.toHaveBeenCalled();
        } else {
          expect(job.moveToDelayed).not.toHaveBeenCalled();
          expect(globalErrorSpy).toHaveBeenCalledTimes(1);
        }
      } finally {
        // Always undo the spies; a leaked `isClosing => true` would affect every later test
        isClosingSpy.mockRestore();
        globalErrorSpy.mockRestore();
      }
    })
  );

  test.each([
    [undefined, 'some-token'], // can process job
    [REINDEX_WORKER_VERSION, 'some-token'], // can process job
    [REINDEX_WORKER_VERSION + 1, 'some-token'], // cannot process job
    [REINDEX_WORKER_VERSION + 1, undefined], // cannot process job
  ])(
    'Handles insufficient reindex worker version with minReindexWorkerVersion %s and job.token %s',
    async (minReindexWorkerVersion, jobToken) => {
      const originalJob = await repo.createResource<AsyncJob>({
        resourceType: 'AsyncJob',
        status: 'accepted',
        requestTime: new Date().toISOString(),
        request: '/admin/super/reindex',
      });

      const queueName = 'ReindexQueue';
      const queue = queueRegistry.get(queueName);
      if (!queue) {
        throw new Error('Could not find queue');
      }

      // temporarily set to {} to appease typescript since it gets set within the withTestContext callback
      let jobData: ReindexJobData = {} as unknown as ReindexJobData;
      await withTestContext(async () => {
        jobData = prepareReindexJobData(['MedicinalProductContraindication'], originalJob.id);
      });
      // `as any` since it's a readonly property
      (jobData as any).minReindexWorkerVersion = minReindexWorkerVersion;

      const job = new Job(queue, 'ReindexJob', jobData, { attempts: 55 });
      // Since the Job class is fully mocked, we need to set the data property manually
      job.data = jobData;
      job.token = jobToken;

      const isIneligible = minReindexWorkerVersion && REINDEX_WORKER_VERSION < minReindexWorkerVersion;

      const globalErrorSpy = jest.spyOn(globalLogger, 'error').mockImplementation(() => {});

      try {
        const result = await new ReindexJob().execute(job, jobData);
        expect(result).toBe(isIneligible ? 'ineligible' : 'finished');

        // DelayedError is part of the mocked bullmq module. Something about that causes
        // the usual `expect(...).rejects.toThrow(...)` to not work; it seems to be because
        // DelayedError is no longer an `Error`. So instead, we manually check that it threw
        let threw: unknown;
        let manuallyThrownError: Error | undefined;
        try {
          await jobProcessor(job);
          if (isIneligible) {
            manuallyThrownError = new Error(
              jobToken ? 'Expected job to throw DelayedError' : 'Expected job to throw Error'
            );
            throw manuallyThrownError;
          }
        } catch (err) {
          threw = err;
        }

        if (isIneligible) {
          expect(threw).toBeDefined();
          expect(threw).not.toBe(manuallyThrownError);
          expect(threw).toBeInstanceOf(jobToken ? DelayedError : Error);
          if (jobToken) {
            expect(job.moveToDelayed).toHaveBeenCalledTimes(1);
            expect(job.moveToDelayed).toHaveBeenCalledWith(expect.any(Number), jobToken);
            expect(globalErrorSpy).not.toHaveBeenCalled();
          } else {
            expect(job.moveToDelayed).not.toHaveBeenCalled();
            expect(globalErrorSpy).toHaveBeenCalledTimes(1);
          }
        } else {
          expect(threw).toBeUndefined();
          expect(job.moveToDelayed).not.toHaveBeenCalled();
          expect(globalErrorSpy).not.toHaveBeenCalled();
        }
      } finally {
        // Always undo the logger spy so a failed assertion does not leave `error` muted
        globalErrorSpy.mockRestore();
      }
    }
  );

  test('Ensure updates from job do not clobber cancellation status', () =>
    withTestContext(async () => {
      const originalJob = await repo.createResource<AsyncJob>({
        resourceType: 'AsyncJob',
        status: 'accepted',
        requestTime: new Date().toISOString(),
        request: '/admin/super/reindex',
      });
      const cancelledJob = await repo.updateResource<AsyncJob>({
        ...originalJob,
        status: 'cancelled',
      });

      // Mock repo for the job to return error for version-conditional update
      const error = Promise.reject(new OperationOutcomeError(preconditionFailed));
      await expect(error).rejects.toBeDefined(); // Await promise to ensure it's settled to rejection state
      jest.spyOn(systemRepo, 'updateResource').mockReturnValueOnce(error);

      // Simulate job being cancelled in the middle of the worker execution, after the initial status check
      // but before the job would update the resource itself
      jest
        .spyOn(systemRepo, 'readResource')
        .mockReturnValueOnce(Promise.resolve(originalJob))
        .mockReturnValueOnce(Promise.resolve(cancelledJob));

      // Job will start up with the uncancelled version of the resource
      const jobData = prepareReindexJobData(['MedicinalProductContraindication'], originalJob.id);

      // Should not override the cancellation status
      const result = await new ReindexJob().execute(undefined, jobData);
      expect(result).toStrictEqual('interrupted');

      const finalJob = await repo.readResource<AsyncJob>('AsyncJob', originalJob.id);
      expect(finalJob.status).toStrictEqual('cancelled');
      expect(finalJob.output).toBeUndefined();
    }));
});

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/medplum/medplum'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.