Skip to main content
Glama
asyncjobexecutor.ts6.51 kB
// SPDX-FileCopyrightText: Copyright Orangebot, Inc. and Medplum contributors
// SPDX-License-Identifier: Apache-2.0
import type { WithId } from '@medplum/core';
import { OperationOutcomeError, accepted } from '@medplum/core';
import type { AsyncJob, Parameters } from '@medplum/fhirtypes';
import { DelayedError } from 'bullmq';
import type { Request, Response } from 'express';
import { AsyncLocalStorage } from 'node:async_hooks';
import { getConfig } from '../../../config/loader';
import { getAuthenticatedContext } from '../../../context';
import { DatabaseMode, getDatabasePool } from '../../../database';
import { getLogger } from '../../../logger';
import { markPostDeployMigrationCompleted } from '../../../migration-sql';
import { maybeAutoRunPendingPostDeployMigration } from '../../../migrations/migration-utils';
import { sendOutcome } from '../../outcomes';
import type { Repository } from '../../repo';
import { getSystemRepo } from '../../repo';

/**
 * Drives the lifecycle of a single `AsyncJob` resource: creation, execution of
 * the job callback, and the final status update (`completed` or `error`).
 */
export class AsyncJobExecutor {
  /** Private clone of the repository passed to the constructor; disposed when `startAsync` finishes. */
  readonly repo: Repository;
  private resource: WithId<AsyncJob> | undefined;

  /**
   * @param repo - Repository used to create the AsyncJob. It is cloned, so the caller's instance is not disposed.
   * @param resource - Optional pre-existing AsyncJob to execute instead of creating a new one in `init`.
   */
  constructor(repo: Repository, resource?: WithId<AsyncJob>) {
    this.repo = repo.clone();
    this.resource = resource;
  }

  /**
   * Ensures an AsyncJob resource exists for this executor.
   * If one was supplied to the constructor (or created by an earlier call) it is reused;
   * otherwise a new job with status `accepted` is created.
   * @param url - The request URL recorded on a newly created AsyncJob.
   * @returns The AsyncJob resource tracked by this executor.
   */
  async init(url: string): Promise<WithId<AsyncJob>> {
    this.resource ??= await this.repo.createResource<AsyncJob>({
      resourceType: 'AsyncJob',
      status: 'accepted',
      request: url,
      requestTime: new Date().toISOString(),
    });
    return this.resource;
  }

  /**
   * Begins execution of the async job and coordinates resource updates and logging throughout the job lifecycle.
   * Fire-and-forget: the returned promise of `startAsync` is intentionally not exposed.
   * @param callback - The callback to execute.
   */
  start(callback: (job: AsyncJob) => Promise<any>): void {
    // Rely on `startAsync` for error handling/logging
    this.startAsync(callback).catch(() => {});
  }

  /**
   * Executes the async job and coordinates resource updates and logging throughout the job lifecycle.
   * @param callback - The callback to execute.
   * @returns A promise that resolves with the updated AsyncJob when the job is completed or fails.
   * @throws Error if no AsyncJob has been initialized or the job is not in the `accepted` state.
   * @throws DelayedError re-thrown from `failJob` when the callback signalled that the job was re-queued.
   */
  async startAsync(callback: (job: AsyncJob) => Promise<any>): Promise<AsyncJob | undefined> {
    const log = getLogger();
    if (!this.resource) {
      throw new Error('AsyncJob missing');
    }
    if (this.resource.status !== 'accepted') {
      throw new Error('Job already completed');
    }

    const startTime = Date.now();
    const systemRepo = getSystemRepo();
    const logContext = { name: callback.name, asyncJobId: this.resource.id };
    log.info('AsyncJob execution starting', logContext);

    try {
      try {
        const output = await this.run(callback);
        log.info('AsyncJob execution completed', {
          ...logContext,
          duration: `${Date.now() - startTime} ms`,
        });
        // Awaited inside the inner `try` so that a failure while recording completion
        // is also routed to `failJob`.
        return await this.completeJob(systemRepo, output);
      } catch (err: unknown) {
        // Callbacks may reject with anything, including `undefined`. Normalize to an Error so
        // that neither logging nor `failJob` can throw while handling the failure, which would
        // otherwise leave the job in the `accepted` state.
        const wasError = err instanceof Error;
        const error = wasError ? err : new Error(String(err));
        log.error('AsyncJob execution failed', {
          ...logContext,
          error: error.message,
          stack: wasError ? error.stack : undefined,
        });
        return await this.failJob(systemRepo, error);
      }
    } finally {
      this.repo[Symbol.dispose]();
    }
  }

  /**
   * Conditionally runs the job callback if the AsyncJob resource is in the correct state.
   * @param callback - The callback to execute.
   * @returns (optional) Output encoded as a Parameters resource.
   * @throws Error if no AsyncJob has been initialized or the job is not in the `accepted` state.
   */
  async run(
    callback: ((job: WithId<AsyncJob>) => Promise<Parameters>) | ((job: WithId<AsyncJob>) => Promise<void>)
  ): Promise<Parameters | undefined> {
    // Bind the callback to the async context that is current at the time `run` is called.
    const boundCallback = AsyncLocalStorage.bind(callback);
    if (!this.resource) {
      throw new Error('AsyncJob missing');
    }
    if (this.resource.status !== 'accepted') {
      throw new Error('Job already completed');
    }

    const output = await boundCallback(this.resource);
    return output ?? undefined;
  }

  /**
   * Marks the AsyncJob as `completed` and persists it.
   * For `data-migration` jobs with a `dataVersion`, the post-deploy migration is first recorded
   * as completed, and after the job is saved any pending post-deploy migration may be auto-run.
   * @param repo - Repository used to persist the updated AsyncJob.
   * @param output - Optional job output to store on the AsyncJob.
   * @returns The updated AsyncJob.
   * @throws Error if no AsyncJob has been initialized.
   */
  async completeJob(repo: Repository, output?: Parameters): Promise<AsyncJob> {
    if (!this.resource) {
      throw new Error('Cannot completeJob since AsyncJob is not specified');
    }

    const completedJob: AsyncJob = {
      ...this.resource,
      status: 'completed',
      transactionTime: new Date().toISOString(),
      output,
    };

    if (completedJob.type === 'data-migration' && completedJob.dataVersion) {
      await markPostDeployMigrationCompleted(getDatabasePool(DatabaseMode.WRITER), completedJob.dataVersion);
      const savedJob = await repo.updateResource<AsyncJob>(completedJob);
      await maybeAutoRunPendingPostDeployMigration();
      return savedJob;
    }

    return repo.updateResource<AsyncJob>(completedJob);
  }

  /**
   * Marks the AsyncJob as `error` and persists it, appending details of `err` to the job output.
   * @param repo - Repository used to persist the updated AsyncJob.
   * @param err - The error that caused the failure, if any.
   * @param output - Optional output to store; defaults to the output already on the AsyncJob.
   * @returns The updated AsyncJob.
   * @throws Error if no AsyncJob has been initialized.
   * @throws DelayedError re-thrown unchanged so that BullMQ can handle the re-queued job.
   */
  async failJob(repo: Repository, err?: Error, output?: Parameters): Promise<AsyncJob> {
    if (!this.resource) {
      throw new Error('Cannot failJob since AsyncJob is not specified');
    }

    // A job throwing `DelayedError` means the job has been delayed/re-queued,
    // so the job should not fail. Instead re-throw the error for BullMQ
    // to handle.
    if (err instanceof DelayedError) {
      throw err;
    }

    const failedJob: AsyncJob = {
      ...this.resource,
      status: 'error',
      transactionTime: new Date().toISOString(),
      output: output ?? this.resource.output,
    };

    if (err) {
      // Build a fresh parameter list and output object rather than mutating the caller's
      // `output` argument or the output object held by `this.resource`.
      const parameter: NonNullable<Parameters['parameter']> = [...(failedJob.output?.parameter ?? [])];
      if (err instanceof OperationOutcomeError) {
        parameter.push({ name: 'outcome', resource: err.outcome });
      } else {
        parameter.push({ name: 'error', valueString: err.message });
        // `Error.stack` is optional; skip it rather than emit a parameter that carries no value.
        if (err.stack) {
          parameter.push({ name: 'stack', valueString: err.stack });
        }
      }
      failedJob.output = { ...failedJob.output, resourceType: 'Parameters', parameter };
    }

    return repo.updateResource<AsyncJob>(failedJob);
  }

  /**
   * Builds the status-polling URL for the tracked AsyncJob.
   * @param baseUrl - Server base URL; the path is appended directly, so it is expected to end with `/`.
   * @returns The URL of the job status endpoint.
   * @throws Error if no AsyncJob has been initialized.
   */
  getContentLocation(baseUrl: string): string {
    if (!this.resource) {
      throw new Error('AsyncJob missing');
    }
    return `${baseUrl}fhir/R4/job/${this.resource.id}/status`;
  }
}

/**
 * Creates an AsyncJob for the current request, starts `callback` in the background,
 * and immediately responds with an `accepted` outcome pointing at the job status URL.
 * @param req - The incoming Express request; its protocol, host and original URL are recorded on the AsyncJob.
 * @param res - The Express response to send the outcome on.
 * @param callback - The job body to execute asynchronously.
 */
export async function sendAsyncResponse(
  req: Request,
  res: Response,
  callback: ((job: AsyncJob) => Promise<Parameters>) | ((job: AsyncJob) => Promise<void>)
): Promise<void> {
  const ctx = getAuthenticatedContext();
  const { baseUrl } = getConfig();
  const exec = new AsyncJobExecutor(ctx.repo);
  await exec.init(req.protocol + '://' + req.get('host') + req.originalUrl);
  exec.start(callback);
  sendOutcome(res, accepted(exec.getContentLocation(baseUrl)));
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/medplum/medplum'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.