post-deploy-migration.ts (9.71 kB)
// SPDX-FileCopyrightText: Copyright Orangebot, Inc. and Medplum contributors
// SPDX-License-Identifier: Apache-2.0
import type { WithId } from '@medplum/core';
import { capitalize, getReferenceString, normalizeErrorString, PropertyType, toTypedValue } from '@medplum/core';
import type { AsyncJob, Parameters, ParametersParameter } from '@medplum/fhirtypes';
import type { Job, JobsOptions, QueueBaseOptions } from 'bullmq';
import { Queue, Worker } from 'bullmq';
import type { PoolClient } from 'pg';
import * as semver from 'semver';
import { tryGetRequestContext, tryRunInRequestContext } from '../context';
import { AsyncJobExecutor } from '../fhir/operations/utils/asyncjobexecutor';
import type { Repository } from '../fhir/repo';
import { getSystemRepo } from '../fhir/repo';
import { globalLogger } from '../logger';
import type {
  CustomPostDeployMigrationJobData,
  DynamicPostDeployJobData,
  PostDeployJobData,
  PostDeployJobRunResult,
  PostDeployMigration,
} from '../migrations/data/types';
import { executeMigrationActions } from '../migrations/migrate';
import {
  enforceStrictMigrationVersionChecks,
  getPostDeployManifestEntry,
  getPostDeployMigration,
  MigrationDefinitionNotFoundError,
  withLongRunningDatabaseClient,
} from '../migrations/migration-utils';
import type { MigrationAction, MigrationActionResult } from '../migrations/types';
import { getRegisteredServers } from '../server-registry';
import type { WorkerInitializer } from './utils';
import { addVerboseQueueLogging, isJobActive, isJobCompatible, moveToDelayedAndThrow, queueRegistry } from './utils';

export const PostDeployMigrationQueueName = 'PostDeployMigrationQueue';

function getJobDataLoggingFields(job: Job<PostDeployJobData>): Record<string, string> {
  return {
    asyncJob: 'AsyncJob/' + job.data.asyncJobId,
    jobType: job.data.type,
  };
}

export const initPostDeployMigrationWorker: WorkerInitializer = (config) => {
  const defaultOptions: QueueBaseOptions = {
    connection: config.redis,
  };

  const queue = new Queue<PostDeployJobData>(PostDeployMigrationQueueName, {
    ...defaultOptions,
    defaultJobOptions: {
      attempts: 1, // No retries
    },
  });

  const worker = new Worker<PostDeployJobData>(
    PostDeployMigrationQueueName,
    async (job) => tryRunInRequestContext(job.data.requestId, job.data.traceId, async () => jobProcessor(job)),
    {
      ...config.bullmq,
      ...defaultOptions,
    }
  );
  addVerboseQueueLogging<PostDeployJobData>(queue, worker, getJobDataLoggingFields);

  return { queue, worker, name: PostDeployMigrationQueueName };
};

export async function isClusterCompatible(migrationNumber: number): Promise<boolean> {
  if (!enforceStrictMigrationVersionChecks()) {
    return true;
  }
  const servers = await getRegisteredServers(true);
  const entry = getPostDeployManifestEntry(migrationNumber);
  const requiredVersion = entry.serverVersion;
  return servers.every((server) => semver.gte(server.version, requiredVersion));
}

export async function jobProcessor(job: Job<PostDeployJobData>): Promise<void> {
  const asyncJob = await getSystemRepo().readResource<AsyncJob>('AsyncJob', job.data.asyncJobId);

  if (!isJobCompatible(asyncJob)) {
    await moveToDelayedAndThrow(job, 'Post-deploy migration delayed since this worker is not compatible');
  }

  if (!isJobActive(asyncJob)) {
    globalLogger.info(`${PostDeployMigrationQueueName} processor skipping job since AsyncJob is not active`, {
      jobId: job.id,
      ...getJobDataLoggingFields(job),
      asyncJobStatus: asyncJob.status,
    });
    return;
  }

  if (job.data.type === 'dynamic') {
    await runDynamicMigration(getSystemRepo(), job as Job<DynamicPostDeployJobData>);
    return;
  }

  const migrationNumber = asyncJob.dataVersion;
  if (!migrationNumber) {
    throw new Error(`Post-deploy migration number (AsyncJob.dataVersion) not found in ${getReferenceString(asyncJob)}`);
  }

  let migration: PostDeployMigration;
  try {
    migration = getPostDeployMigration(migrationNumber);
  } catch (err: any) {
    if (err instanceof MigrationDefinitionNotFoundError) {
      await moveToDelayedAndThrow(
        job,
        'Post-deploy migration delayed since migration definition was not found on this worker'
      );
    }
    throw err;
  }

  if (migration.type !== job.data.type) {
    throw new Error(`Post-deploy migration ${migrationNumber} is not a ${job.data.type} migration`);
  }

  if (!(await isClusterCompatible(migrationNumber))) {
    await moveToDelayedAndThrow(
      job,
      `Post-deploy migration v${migrationNumber} delayed since the server cluster is not compatible`
    );
  }

  const result: PostDeployJobRunResult = await migration.run(getSystemRepo(), job, job.data);

  switch (result) {
    case 'ineligible': {
      await moveToDelayedAndThrow(job, 'Post-deploy migration delayed since worker is not eligible to execute it');
      break;
    }
    case 'finished':
    case 'interrupted':
      break;
    default:
      result satisfies never;
      throw new Error(`Unexpected PostDeployMigration.run(${migrationNumber}) result: ${result}`);
  }
}

async function runDynamicMigration(
  repo: Repository,
  job: Job<DynamicPostDeployJobData>
): Promise<PostDeployJobRunResult> {
  const asyncJob = await repo.readResource<AsyncJob>('AsyncJob', job.data.asyncJobId);
  const exec = new AsyncJobExecutor(repo, asyncJob);
  const results: MigrationActionResult[] = [];
  try {
    await withLongRunningDatabaseClient(async (client) => {
      await executeMigrationActions(client, results, job.data.migrationActions);
    });
    const output = getAsyncJobOutputFromMigrationActionResults(results);
    await exec.completeJob(repo, output);
  } catch (err: any) {
    const errorMsg = normalizeErrorString(err);
    globalLogger.error('Post-deploy migration threw an error', {
      error: errorMsg,
      asyncJob: getReferenceString(asyncJob),
      type: job.data.type,
      dataVersion: asyncJob.dataVersion,
    });
    await exec.failJob(repo, err);
  }
  return 'finished';
}

export async function runCustomMigration(
  repo: Repository,
  job: Job<CustomPostDeployMigrationJobData> | undefined,
  jobData: CustomPostDeployMigrationJobData,
  callback: (
    client: PoolClient,
    results: MigrationActionResult[],
    job: Job<CustomPostDeployMigrationJobData> | undefined,
    jobData: CustomPostDeployMigrationJobData
  ) => Promise<void>
): Promise<PostDeployJobRunResult> {
  const asyncJob = await repo.readResource<AsyncJob>('AsyncJob', jobData.asyncJobId);
  const exec = new AsyncJobExecutor(repo, asyncJob);
  const results: MigrationActionResult[] = [];
  try {
    await withLongRunningDatabaseClient(async (client) => {
      await callback(client, results, job, jobData);
    });
    const output = getAsyncJobOutputFromMigrationActionResults(results);
    await exec.completeJob(repo, output);
  } catch (err: any) {
    const errorMsg = normalizeErrorString(err);
    globalLogger.error('Post-deploy migration threw an error', {
      error: errorMsg,
      asyncJob: getReferenceString(asyncJob),
      type: jobData.type,
      dataVersion: asyncJob.dataVersion,
    });
    const output = getAsyncJobOutputFromMigrationActionResults(results);
    await exec.failJob(repo, err, output);
  }
  return 'finished';
}

function getAsyncJobOutputFromMigrationActionResults(results: MigrationActionResult[]): Parameters {
  return {
    resourceType: 'Parameters',
    parameter: results.map((r) => {
      const { name, durationMs, ...rest } = r;
      const part: ParametersParameter[] = [
        {
          name: 'durationMs',
          valueInteger: durationMs,
        },
      ];
      for (const [name, value] of Object.entries(rest)) {
        const typedValue = toTypedValue(value);
        if (typedValue.type === 'undefined') {
          continue;
        }
        if ([PropertyType.integer, PropertyType.decimal, PropertyType.boolean].includes(typedValue.type as any)) {
          part.push({
            name: name,
            ['value' + capitalize(typedValue.type)]: value,
          });
        } else {
          part.push({
            name: name,
            valueString: value?.toString(),
          });
        }
      }
      return {
        name,
        part,
      };
    }),
  };
}

export function prepareCustomMigrationJobData(asyncJob: WithId<AsyncJob>): CustomPostDeployMigrationJobData {
  const ctx = tryGetRequestContext();
  return {
    type: 'custom',
    asyncJobId: asyncJob.id,
    requestId: ctx?.requestId,
    traceId: ctx?.traceId,
  };
}

export function prepareDynamicMigrationJobData(
  asyncJob: WithId<AsyncJob>,
  migrationActions: MigrationAction[]
): DynamicPostDeployJobData {
  const ctx = tryGetRequestContext();
  return {
    type: 'dynamic',
    migrationActions,
    asyncJobId: asyncJob.id,
    requestId: ctx?.requestId,
    traceId: ctx?.traceId,
  };
}

export async function addPostDeployMigrationJobData<T extends PostDeployJobData>(
  jobData: T,
  options?: JobsOptions
): Promise<Job<T> | undefined> {
  const asyncJob = await getSystemRepo().readResource<AsyncJob>('AsyncJob', jobData.asyncJobId);
  const deduplicationId = `v${asyncJob.dataVersion}`;
  const queue = queueRegistry.get<PostDeployJobData>(PostDeployMigrationQueueName);
  if (!queue) {
    throw new Error(`Job queue ${PostDeployMigrationQueueName} not available`);
  }

  const job = await queue.add('PostDeployMigrationJobData', jobData, {
    ...options,
    deduplication: { id: deduplicationId },
  });

  globalLogger.debug('Added post-deploy migration job', {
    jobId: job.id,
    ...getJobDataLoggingFields(job),
  });

  return job as Job<T>;
}
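For context, the exported helpers in this file are typically used in pairs: a prepare*JobData function builds the job payload from an AsyncJob resource, and addPostDeployMigrationJobData enqueues that payload on the BullMQ queue consumed by initPostDeployMigrationWorker. The sketch below is illustrative only and not part of the file; it assumes it runs inside the Medplum server codebase (where the relative import resolves and the queue has been initialized), and enqueueCustomPostDeployMigration is a hypothetical wrapper.

// Illustrative sketch (not part of post-deploy-migration.ts). Assumes execution inside
// the Medplum server codebase after the post-deploy migration worker has been registered.
import type { WithId } from '@medplum/core';
import type { AsyncJob } from '@medplum/fhirtypes';
import { prepareCustomMigrationJobData, addPostDeployMigrationJobData } from './post-deploy-migration';

// Hypothetical wrapper: build the custom migration payload from an existing AsyncJob
// resource and enqueue it, returning the BullMQ job id (if the job was added).
export async function enqueueCustomPostDeployMigration(asyncJob: WithId<AsyncJob>): Promise<string | undefined> {
  const jobData = prepareCustomMigrationJobData(asyncJob);
  const job = await addPostDeployMigrationJobData(jobData);
  return job?.id;
}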
