Skip to main content
Glama
cron.ts7.02 kB
// SPDX-FileCopyrightText: Copyright Orangebot, Inc. and Medplum contributors // SPDX-License-Identifier: Apache-2.0 import type { BackgroundJobContext, WithId } from '@medplum/core'; import { ContentType, createReference } from '@medplum/core'; import type { Bot, Project, Resource, Timing } from '@medplum/fhirtypes'; import type { Job, QueueBaseOptions } from 'bullmq'; import { Queue, Worker } from 'bullmq'; import { isValidCron } from 'cron-validator'; import { executeBot } from '../bots/execute'; import { getSystemRepo } from '../fhir/repo'; import { getLogger, globalLogger } from '../logger'; import type { WorkerInitializer } from './utils'; import { findProjectMembership, queueRegistry } from './utils'; const daysOfWeekConversion = { sun: 0, mon: 1, tue: 2, wed: 3, thu: 4, fri: 5, sat: 6 }; const MAX_BOTS_PER_PAGE = 500; /* * The Cron worker inspects resources takes a bot, * if it has the Cron property, will add it as a repeatable * Cron job */ export interface CronJobData { readonly resourceType: string; readonly botId: string; } const queueName = 'CronQueue'; export const initCronWorker: WorkerInitializer = (config) => { const defaultOptions: QueueBaseOptions = { connection: config.redis, }; const queue = new Queue<CronJobData>(queueName, { ...defaultOptions, defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 1000, }, }, }); const worker = new Worker<CronJobData>(queueName, execBot, { ...defaultOptions, ...config.bullmq, }); worker.on('completed', (job) => globalLogger.info(`Completed job ${job.id} successfully`)); worker.on('failed', (job, err) => globalLogger.info(`Failed job ${job?.id} with ${err}`)); return { queue, worker, name: queueName }; }; /** * Returns the Cron queue instance. * This is used by the unit tests. * @returns The Cron queue (if available). */ export function getCronQueue(): Queue<CronJobData> | undefined { return queueRegistry.get(queueName); } /** * Updates the Cron job for the given resource. * Only applies changes if the effective cron string has changed. * @param resource - The resource that was created or updated. * @param previousVersion - The previous version of the resource, if available. * @param context - The background job context. */ export async function addCronJobs( resource: WithId<Resource>, previousVersion: Resource | undefined, context: BackgroundJobContext ): Promise<void> { const queue = queueRegistry.get(queueName); if (!queue) { // The queue is not available return; } if (resource.resourceType !== 'Bot') { // For now we have only the bot to execute on a timed job return; } const logger = getLogger(); const bot = resource; // Adding a new feature for project that allows users to add a cron const project = context?.project; if (!project?.features?.includes('cron')) { logger.debug('Cron not enabled. Cron needs to be enabled in project to create cron job for bot'); return; } const oldCronStr = getCronStringForBot(previousVersion as Bot); const newCronStr = getCronStringForBot(bot); logger.debug('Cron job for bot', { botId: bot.id, oldCronStr, newCronStr }); if (oldCronStr === newCronStr) { // No change in cron job return; } if (newCronStr) { logger.info('Upsert cron job for bot', { botId: bot.id }); await queue.upsertJobScheduler( bot.id, { pattern: newCronStr, }, { data: { resourceType: bot.resourceType, botId: bot.id, }, } ); } else { logger.info('Removing cron job for bot', { botId: bot.id }); await queue.removeJobScheduler(bot.id); } } function getCronStringForBot(bot: Bot | undefined): string | undefined { if (bot?.cronTiming) { const timingStr = convertTimingToCron(bot.cronTiming); if (timingStr) { return timingStr; } } if (bot?.cronString && isValidCron(bot.cronString)) { return bot.cronString; } // Otherwise, this is not a valid cron job return undefined; } /** * BullMQ repeat option, which conducts the job has a cron-parser's pattern * @param timing - The Cron property from the bot, which is a Timing Type. * @returns The cron string. */ export function convertTimingToCron(timing: Timing): string | undefined { let minute = '0'; let hour = '*'; // The timing input doesn't have a feature for this const dayOfMonth = '*'; // The timing input doesn't have a feature for this const month = '*'; let dayOfWeek = '*'; if (!timing.repeat) { return undefined; } // if period isn't available, we'll have it at 1 const repeat = timing.repeat.period ? timing.repeat.period : 1; // Keep it a max rate of Once a minute for the time being if (repeat > 24 && repeat < 60) { // If more than once an hour we'll need to add to the rate of every Nth min const timesAnHour = Math.ceil((24 * 60) / repeat); minute = `*/${timesAnHour}`; } else { const timesADay = Math.ceil(24 / repeat); hour = `*/${timesADay}`; } // Days of the week const days = timing.repeat.dayOfWeek ? timing.repeat.dayOfWeek : undefined; if (days) { const daysCronFormat = []; for (const day of days) { daysCronFormat.push(daysOfWeekConversion[day]); } dayOfWeek = daysCronFormat.join(','); } return `${minute} ${hour} ${dayOfMonth} ${month} ${dayOfWeek}`; } export async function execBot(job: Job<CronJobData>): Promise<void> { const systemRepo = getSystemRepo(); const bot = await systemRepo.readReference<Bot>({ reference: 'Bot/' + job.data.botId }); const project = bot.meta?.project as string; const runAs = await findProjectMembership(project, createReference(bot)); if (!runAs) { throw new Error('Could not find project membership for bot'); } await executeBot({ bot, runAs, input: bot, contentType: ContentType.FHIR_JSON }); } export async function removeBullMQJobByKey(botId: string): Promise<void> { const queue = queueRegistry.get(queueName); if (queue) { await queue.removeJobScheduler(botId); } } export async function reloadCronBots(): Promise<void> { const queue = queueRegistry.get(queueName); if (queue) { // Clears all jobs from the cron queue, including active ones await queue.obliterate({ force: true }); const systemRepo = getSystemRepo(); await systemRepo.processAllResources<Bot>( { resourceType: 'Bot', count: MAX_BOTS_PER_PAGE }, async (bot) => { // If the bot has a cron, then add a scheduler for it if (bot.cronString || bot.cronTiming) { // We pass `undefined` as previous version to make sure that the latest cron string is used const project = await systemRepo.readResource<Project>('Project', bot.meta?.project as string); await addCronJobs(bot, undefined, { project, interaction: 'update' }); } }, { delayBetweenPagesMs: 1000 } ); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/medplum/medplum'

If you have feedback or need assistance with the MCP directory API, please join our Discord server