// SPDX-FileCopyrightText: Copyright Orangebot, Inc. and Medplum contributors
// SPDX-License-Identifier: Apache-2.0
import type { WithId } from '@medplum/core';
import {
ContentType,
createReference,
getReferenceString,
isOk,
OperationOutcomeError,
serverError,
} from '@medplum/core';
import type { FhirRequest } from '@medplum/fhir-router';
import { FhirRouter } from '@medplum/fhir-router';
import type { AsyncJob, Bundle } from '@medplum/fhirtypes';
import type { Job, QueueBaseOptions } from 'bullmq';
import { Queue, Worker } from 'bullmq';
import { getUserConfiguration } from '../auth/me';
import { getAuthenticatedContext, runInAsyncContext } from '../context';
import { getRepoForLogin } from '../fhir/accesspolicy';
import { uploadBinaryData } from '../fhir/binary';
import { AsyncJobExecutor } from '../fhir/operations/utils/asyncjobexecutor';
import { getSystemRepo } from '../fhir/repo';
import { getLogger } from '../logger';
import type { AuthState } from '../oauth/middleware';
import type { WorkerInitializer } from './utils';
import { queueRegistry } from './utils';
/*
* The batch worker runs a batch asynchronously,
* decoupled from an individual HTTP request.
*/
/**
 * Payload stored with each job on the batch queue.
 */
export interface BatchJobData {
/** AsyncJob resource that `execBatchJob` marks as completed or failed once the batch has run. */
readonly asyncJob: WithId<AsyncJob>;
/** Bundle submitted as the body of the batch request. */
readonly bundle: Bundle;
/** Auth state of the submitter; used to rebuild the async context and the submitter's repo. */
readonly authState: Readonly<AuthState>;
/** Optional request ID passed to `runInAsyncContext` when the job runs. */
readonly requestId?: string;
/** Optional trace ID passed to `runInAsyncContext` when the job runs. */
readonly traceId?: string;
}
// Name used to create the batch queue and worker, and to look the queue up in the queue registry.
const queueName = 'BatchQueue';
// Job name passed to `queue.add` for every batch job.
const jobName = 'BatchJobData';
/**
 * Creates the BullMQ queue and worker that process asynchronous batch jobs.
 *
 * The queue's default job options allow a single attempt per job. Each job is processed inside an
 * async context rebuilt from the auth state, request ID and trace ID stored in the job data.
 * @param config - Server config; `config.redis` is used as the connection and `config.bullmq` is
 * merged into the worker options.
 * @returns The queue, the worker, and the queue name.
 */
export const initBatchWorker: WorkerInitializer = (config) => {
  const baseOptions: QueueBaseOptions = { connection: config.redis };

  const batchQueue = new Queue<BatchJobData>(queueName, {
    ...baseOptions,
    defaultJobOptions: { attempts: 1 },
  });

  const batchWorker = new Worker<BatchJobData>(
    queueName,
    (job) => runInAsyncContext(job.data.authState, job.data.requestId, job.data.traceId, () => execBatchJob(job)),
    { ...baseOptions, ...config.bullmq }
  );

  return { queue: batchQueue, worker: batchWorker, name: queueName };
};
/**
 * Looks up the batch queue in the queue registry.
 * Intended for use by the unit tests.
 * @returns The batch queue, or `undefined` if the registry does not hold one.
 */
export function getBatchQueue(): Queue<BatchJobData> | undefined {
  const registeredQueue = queueRegistry.get<BatchJobData>(queueName);
  return registeredQueue;
}
/**
 * Enqueues a batch job on the batch queue.
 * @param job - The batch job details.
 * @returns The enqueued job.
 * @throws Error if the batch queue is not present in the queue registry.
 */
async function addBatchJobData(job: BatchJobData): Promise<Job<BatchJobData>> {
  const batchQueue = queueRegistry.get<BatchJobData>(queueName);
  if (batchQueue) {
    return batchQueue.add(jobName, job);
  }
  throw new Error(`Job queue ${queueName} not available`);
}
/**
 * Queues a bundle for asynchronous processing on behalf of the current authenticated context.
 *
 * The auth state, request ID and trace ID are captured from the authenticated context and stored
 * with the job so the worker can restore them when it runs.
 * @param bundle - The bundle to process.
 * @param asyncJob - The AsyncJob resource associated with this batch.
 * @returns The enqueued job.
 */
export async function queueBatchProcessing(bundle: Bundle, asyncJob: WithId<AsyncJob>): Promise<Job<BatchJobData>> {
  const ctx = getAuthenticatedContext();
  const jobData: BatchJobData = {
    bundle,
    asyncJob,
    authState: ctx.authentication,
    requestId: ctx.requestId,
    traceId: ctx.traceId,
  };
  return addBatchJobData(jobData);
}
/**
 * Executes a batch job.
 *
 * The bundle is posted to a FHIR router using a repo built for the original submitting user. On an OK
 * outcome, the result is uploaded as a Binary and the AsyncJob is completed with a reference to it;
 * otherwise the AsyncJob is marked as failed. AsyncJob updates are made with the system repo.
 *
 * Errors thrown during or after execution of the batch request are logged and swallowed, so the queue
 * never re-runs part or all of a batch.
 * @param job - The batch job details.
 */
export async function execBatchJob(job: Job<BatchJobData>): Promise<void> {
  const { bundle, asyncJob } = job.data;
  const { login, project, membership } = job.data.authState;
  const logger = getLogger();

  // Prepare the original submitting user's repo
  const userConfig = await getUserConfiguration(getSystemRepo(), project, membership);
  const repo = await getRepoForLogin({ login, project, membership, userConfig }, true);

  const router = new FhirRouter();
  const req: FhirRequest = {
    method: 'POST',
    url: '/',
    pathname: '',
    params: Object.create(null),
    query: Object.create(null),
    body: bundle,
  };

  const systemRepo = getSystemRepo();
  const exec = new AsyncJobExecutor(systemRepo, asyncJob);

  // Intentionally swallow all errors thrown during or after execution of the batch request, since we do NOT want to
  // execute part or all of the batch more than once.
  // If this function does not throw an error, the job will be considered "successful" and not requeued
  try {
    const [outcome, result] = await router.handleRequest(req, repo);

    if (!isOk(outcome)) {
      logger.warn('Async batch request failed', {
        jobId: job.id,
        asyncJob: asyncJob.id,
        outcome,
      });
      await exec.failJob(systemRepo, new OperationOutcomeError(outcome));
      return;
    }

    // Upload resulting Bundle JSON as Binary for async retrieval
    const binary = await uploadBinaryData(repo, JSON.stringify(result), { contentType: ContentType.FHIR_JSON });

    // A result Bundle without `entry` is treated as having zero entries. The AsyncJob must still be
    // completed; returning early here would leave it without a terminal status.
    const resultBundle = result as Bundle;
    const entries = resultBundle.entry ?? [];
    const errors = entries.filter((entry) => !entry.response?.outcome || !isOk(entry.response.outcome)).length;

    logger.info('Completed async batch request', {
      jobId: job.id,
      asyncJob: asyncJob.id,
      results: getReferenceString(binary),
      entries: entries.length,
      errors,
    });

    // Update the async job with system repo
    await exec.completeJob(systemRepo, {
      resourceType: 'Parameters',
      parameter: [{ name: 'results', valueReference: createReference(binary) }],
    });
  } catch (err: any) {
    logger.error(`Async batch unhandled exception`, err);
    // Try to mark AsyncJob as failed, best effort
    await exec.failJob(systemRepo, new OperationOutcomeError(serverError(err))).catch(() => {});
  }
}