Skip to main content
Glama
subscription.ts27.7 kB
// SPDX-FileCopyrightText: Copyright Orangebot, Inc. and Medplum contributors // SPDX-License-Identifier: Apache-2.0 import type { BackgroundJobContext, BackgroundJobInteraction, WithId } from '@medplum/core'; import { AccessPolicyInteraction, ContentType, Operator, createReference, deepClone, getExtension, getExtensionValue, getReferenceString, isGone, isNotFound, isString, normalizeOperationOutcome, resourceMatchesSubscriptionCriteria, satisfiedAccessPolicy, stringify, } from '@medplum/core'; import type { Bot, ClientApplication, Patient, Practitioner, Project, ProjectMembership, Reference, RelatedPerson, Resource, ResourceType, Subscription, } from '@medplum/fhirtypes'; import type { Job, QueueBaseOptions } from 'bullmq'; import { Queue, Worker } from 'bullmq'; import fetch from 'node-fetch'; import { createHmac } from 'node:crypto'; import { executeBot } from '../bots/execute'; import { getRequestContext, runInAsyncContext, tryGetRequestContext, tryRunInRequestContext } from '../context'; import { buildAccessPolicy } from '../fhir/accesspolicy'; import { isPreCommitSubscription } from '../fhir/precommit'; import type { Repository, ResendSubscriptionsOptions } from '../fhir/repo'; import { getSystemRepo } from '../fhir/repo'; import { RewriteMode, rewriteAttachments } from '../fhir/rewrite'; import { getLogger, globalLogger } from '../logger'; import type { AuthState } from '../oauth/middleware'; import { recordHistogramValue } from '../otel/otel'; import { getRedis } from '../redis'; import type { SubEventsOptions } from '../subscriptions/websockets'; import { parseTraceparent } from '../traceparent'; import { AuditEventOutcome, createSubscriptionAuditEvent } from '../util/auditevent'; import type { WorkerInitializer } from './utils'; import { addVerboseQueueLogging, findProjectMembership, isJobSuccessful, queueRegistry } from './utils'; /** * The timeout for outbound rest-hook subscription HTTP requests. 
* fetch receives this value and aborts the request once REQUEST_TIMEOUT milliseconds have elapsed.
 */
const REQUEST_TIMEOUT = 2 * 60 * 1_000; // 120,000 ms = 120 seconds = 2 minutes

/**
 * Hard ceiling on the number of times a single job may be attempted.
 * With exponential backoff, 19 attempts spans about 73 hours (2^18 seconds).
 */
const MAX_JOB_ATTEMPTS = 19;

/**
 * Number of attempts a job gets when the subscription does not say otherwise.
 * The subscription-max-attempts extension overrides this value.
 */
const DEFAULT_ATTEMPTS = 4;

/**
 * Attempt budget for the job preamble (loading the subscription and the resource).
 * A failing preamble points at the Medplum server rather than at the client's hook, bot, etc,
 * so it is limited separately. The budget is deliberately large to leave time for server issues
 * to be resolved before the job is dropped.
 */
const MAX_PREAMBLE_ATTEMPTS = MAX_JOB_ATTEMPTS;

/*
 * The subscription worker inspects every resource change,
 * and executes FHIR Subscription resources for those changes.
 *
 * The common case is to perform an outbound HTTP request to a webhook.
 * But Subscriptions can include email and SMS notifications.
*/
export interface SubscriptionJobData {
  /** ID of the Subscription resource the job is executed for. */
  readonly subscriptionId: string;
  /** Resource type of the resource that triggered the subscription. */
  readonly resourceType: ResourceType;
  /** Channel type of the subscription as known when the job was enqueued. */
  readonly channelType?: Subscription['channel']['type'];
  /** ID of the resource that triggered the subscription. */
  readonly id: string;
  /** Version ID of the triggering resource at enqueue time. */
  readonly versionId: string;
  readonly interaction: 'create' | 'update' | 'delete';
  /** ISO-8601 timestamp taken when the subscriptions were evaluated. */
  readonly requestTime: string;
  readonly requestId?: string;
  readonly traceId?: string;
  readonly authState?: AuthState;
  /** When true, job progress is logged at info level instead of debug level. */
  readonly verbose?: boolean;
}

/**
 * Collects the identifying fields of a subscription job for structured logging.
 * @param job - The subscription job.
 * @returns The fields to attach to log entries about this job.
 */
function getLoggingFields(job: Job<SubscriptionJobData>): Record<string, string | undefined> {
  const { subscriptionId, resourceType, id, versionId, interaction, channelType, requestId, traceId } = job.data;
  return {
    subscription: `Subscription/${subscriptionId}`,
    resource: `${resourceType}/${id}`,
    versionId,
    interaction,
    channelType,
    requestId,
    traceId,
  };
}

const queueName = 'SubscriptionQueue';
const jobName = 'SubscriptionJobData';

/**
 * Creates the subscription queue and its worker, and wires up logging and duration metrics.
 * @param config - The server config; `config.redis` is the connection and `config.bullmq` supplies extra worker options.
 * @returns The queue, the worker, and the queue name.
 */
export const initSubscriptionWorker: WorkerInitializer = (config) => {
  const baseOptions: QueueBaseOptions = { connection: config.redis };
  const toSeconds = (milliseconds: number): number => milliseconds / 1000;

  const queue = new Queue<SubscriptionJobData>(queueName, {
    ...baseOptions,
    defaultJobOptions: {
      // 1 second * 2^18 = 73 hours
      attempts: MAX_JOB_ATTEMPTS,
      backoff: { type: 'exponential', delay: 1000 },
    },
  });

  const worker = new Worker<SubscriptionJobData>(
    queueName,
    (job) => {
      const { authState, requestId, traceId } = job.data;
      // Re-establish the originating auth state when the job carries one;
      // otherwise fall back to a plain request context.
      if (authState) {
        return runInAsyncContext(authState, requestId, traceId, () => execSubscriptionJob(job));
      }
      return tryRunInRequestContext(requestId, traceId, () => execSubscriptionJob(job));
    },
    { ...baseOptions, ...config.bullmq }
  );

  addVerboseQueueLogging<SubscriptionJobData>(queue, worker, getLoggingFields);

  worker.on('active', (job) => {
    // Queue latency is only meaningful for the first attempt
    if (job.attemptsMade !== 0) {
      return;
    }
    recordHistogramValue('medplum.subscription.queuedDuration', toSeconds(Date.now() - (job.timestamp as number)));
  });

  worker.on('completed', (job) => {
    const finishedOn = job.finishedOn as number;
    recordHistogramValue(
      'medplum.subscription.executionDuration',
      toSeconds(finishedOn - (job.processedOn as number))
    );
    recordHistogramValue('medplum.subscription.totalDuration', toSeconds(finishedOn - (job.timestamp as number)));
  });

  worker.on('failed', (job) => {
    if (!job) {
      return;
    }
    recordHistogramValue(
      'medplum.subscription.failedExecutionDuration',
      toSeconds((job.finishedOn as number) - (job.processedOn as number))
    );
  });

  return { queue, worker, name: queueName };
};

/**
 * Looks up the subscription queue in the queue registry.
 * This is used by the unit tests.
 * @returns The subscription queue (if available).
 */
export function getSubscriptionQueue(): Queue<SubscriptionJobData> | undefined {
  return queueRegistry.get(queueName);
}

/**
 * Checks if this resource should create a notification for this `Subscription` based on the access policy that should be applied for this `Subscription`.
 * The `AccessPolicy` of author's `ProjectMembership` for this resource's `Project` is used when evaluating whether the `AccessPolicy` is satisfied.
 *
 * Currently we log if the `AccessPolicy` is not satisfied only.
 *
 * TODO: Actually prevent notifications for `Subscriptions` where the `AccessPolicy` is not satisfied (for rest-hook subscriptions)
 *
 * @param resource - The resource to evaluate against the `AccessPolicy`.
 * @param project - The project containing the resource.
* @param subscription - The `Subscription` whose author's `AccessPolicy` is evaluated.
 * @returns For `websocket` subscriptions, whether the access policy is satisfied; for every other channel type, always true.
 */
async function satisfiesAccessPolicy(
  resource: Resource,
  project: WithId<Project>,
  subscription: Subscription
): Promise<boolean> {
  let allowed = true;
  try {
    // We can assert author because any time a resource is updated, the author will be set to the previous author
    // or, if it doesn't exist, the current Repository author, which must exist for Repository to successfully construct
    const author = subscription.meta?.author as Reference;
    const membership = await findProjectMembership(project.id, author);
    if (!membership) {
      const projectReference = getReferenceString(project);
      const authorReference = getReferenceString(author);
      const subReference = getReferenceString(subscription);
      globalLogger.warn(
        `[Subscription Access Policy]: No membership for subscription author '${authorReference}' in project '${projectReference}'`,
        { subscription: subReference, project: projectReference }
      );
      allowed = false;
    } else {
      const accessPolicy = await buildAccessPolicy(membership);
      allowed = Boolean(satisfiedAccessPolicy(resource, AccessPolicyInteraction.READ, accessPolicy));
      // WebSocket subscriptions are actually blocked below, so only the log-only channels warn here
      if (!allowed && subscription.channel.type !== 'websocket') {
        const resourceReference = getReferenceString(resource);
        const subReference = getReferenceString(subscription);
        const projectReference = getReferenceString(project);
        globalLogger.warn(
          `[Subscription Access Policy]: Access Policy not satisfied on '${resourceReference}' for '${subReference}'`,
          { subscription: subReference, project: projectReference, accessPolicy }
        );
      }
    }
  } catch (err: unknown) {
    const projectReference = getReferenceString(project);
    const resourceReference = getReferenceString(resource);
    const subReference = getReferenceString(subscription);
    globalLogger.warn(
      `[Subscription Access Policy]: Error occurred while checking access policy for resource '${resourceReference}' against '${subReference}'`,
      { subscription: subReference, project: projectReference, error: err }
    );
    allowed = false;
  }

  // Always return true if channelType !== websocket for now
  if (subscription.channel.type !== 'websocket') {
    return true;
  }
  return allowed;
}

/**
 * Enqueues one subscription job per matching subscription for a changed resource.
 *
 * Structural considerations:
 *   1) One resource change can spawn multiple subscription jobs.
 *   2) Subscription jobs can fail, and must be retried independently.
 *   3) Subscriptions should be evaluated at the time of the resource change.
 *
 * Every subscription is therefore evaluated here, at the moment of the change (create, update, or delete),
 * and a job is enqueued for each match. Matching `websocket` subscriptions are not enqueued; they are
 * collected and published to Redis in a single message instead.
 * @param resource - The resource that was created or updated.
 * @param previousVersion - The previous version of the resource.
 * @param context - The background job context.
 * @param options - The resend subscriptions options.
 */
export async function addSubscriptionJobs(
  resource: WithId<Resource>,
  previousVersion: Resource | undefined,
  context: BackgroundJobContext,
  options?: ResendSubscriptionsOptions
): Promise<void> {
  // Never send subscriptions for audit events
  if (resource.resourceType === 'AuditEvent') {
    return;
  }

  // websocket subscriptions are persisted differently than other subscriptions
  // and are generally thought of as more transient in a way that firing subscriptions
  // for websocket subscriptions themselves would be quite noisy and not particularly useful
  // since they cannot be found through traditional means like reading by resource ID.
  if (resource.resourceType === 'Subscription' && resource.channel?.type === 'websocket') {
    return;
  }

  const ctx = tryGetRequestContext();
  const logger = getLogger();
  const logFn = options?.verbose ? logger.info : logger.debug;

  const project = context?.project;
  if (!project) {
    return;
  }

  const requestTime = new Date().toISOString();
  const candidates = await getSubscriptions(resource, project);
  logFn(`Evaluate ${candidates.length} subscription(s)`);

  const wsEvents: [Resource, string, SubEventsOptions][] = [];

  for (const subscription of candidates) {
    // Ignore pre-commit subscriptions
    if (isPreCommitSubscription(subscription)) {
      continue;
    }

    if (options?.subscription && options.subscription !== getReferenceString(subscription)) {
      logFn('Subscription does not match options.subscription');
      continue;
    }

    let matches: boolean;
    try {
      matches = await matchesCriteria(resource, previousVersion, subscription, context);
      logFn(`Subscription matchesCriteria(${resource.id}, ${subscription.id}) = ${matches}`);
    } catch (err) {
      // If we throw when evaluating the criteria, log and continue
      logFn('Error when evaluating matchesCriteria for resource against Subscription', {
        resourceType: resource.resourceType,
        resource: resource.id,
        subscription: subscription.id,
        err,
      });
      continue;
    }

    if (!matches) {
      continue;
    }

    const permitted = await satisfiesAccessPolicy(resource, project, subscription);
    if (!permitted) {
      logFn(`Subscription satisfiesAccessPolicy(${resource.id}) = false`);
      continue;
    }

    if (subscription.channel.type === 'websocket') {
      wsEvents.push([resource, subscription.id, { includeResource: true }]);
      continue;
    }

    await addSubscriptionJobData({
      subscriptionId: subscription.id,
      resourceType: resource.resourceType,
      channelType: subscription.channel.type,
      id: resource.id,
      versionId: resource.meta?.versionId as string,
      interaction: context.interaction,
      requestTime,
      requestId: ctx?.requestId,
      traceId: ctx?.traceId,
      authState: ctx?.authState,
      verbose: options?.verbose,
    });
  }

  if (wsEvents.length) {
    await getRedis().publish('medplum:subscriptions:r4:websockets', JSON.stringify(wsEvents));
  }
}

/**
 * Determines if the resource matches the subscription criteria.
* @param resource - The resource that was created or updated.
 * @param previousVersion - The previous version of the resource, handed to the matcher on request.
 * @param subscription - The subscription.
 * @param context - Background job context.
 * @returns True if the resource matches the subscription criteria.
 */
async function matchesCriteria(
  resource: Resource,
  previousVersion: Resource | undefined,
  subscription: Subscription,
  context: BackgroundJobContext
): Promise<boolean> {
  const { logger } = getRequestContext();
  return resourceMatchesSubscriptionCriteria({
    resource,
    subscription,
    context,
    logger,
    // The previous version is already in hand, so no lookup is needed
    getPreviousResource: async (): Promise<Resource | undefined> => previousVersion,
  });
}

/**
 * Adds a subscription job to the queue.
 * Does nothing when the subscription queue is not registered.
 * @param job - The subscription job details.
 */
async function addSubscriptionJobData(job: SubscriptionJobData): Promise<void> {
  const queue = queueRegistry.get(queueName);
  if (!queue) {
    return;
  }
  await queue.add(jobName, job);
}

/**
 * Loads the list of all subscriptions in this repository.
 * @param resource - The resource that was created or updated.
 * @param project - The project that contains this resource.
 * @returns The list of all subscriptions in this repository.
*/
async function getSubscriptions(resource: Resource, project: WithId<Project>): Promise<WithId<Subscription>[]> {
  // Number of cache misses at which the project's active set is pruned
  const staleEntryThreshold = 50;

  const projectId = project.id;
  const systemRepo = getSystemRepo();

  // Source 1: active subscriptions of this project found through a repository search
  const subscriptions = await systemRepo.searchResources<Subscription>({
    resourceType: 'Subscription',
    count: 1000,
    filters: [
      { code: '_project', operator: Operator.EQUALS, value: projectId },
      { code: 'status', operator: Operator.EQUALS, value: 'active' },
    ],
  });

  // Source 2: subscriptions referenced by the project's "active" set in Redis
  const redis = getRedis();
  const setKey = `medplum:subscriptions:r4:project:${projectId}:active`;
  const cachedRefs = await redis.smembers(setKey);
  if (cachedRefs.length === 0) {
    return subscriptions;
  }

  // Check the feature flag before loading the entries, so that projects without
  // WebSocket subscriptions do not pay for an MGET whose result would be discarded.
  if (!project.features?.includes('websocket-subscriptions')) {
    globalLogger.debug(
      `[WebSocket Subscriptions]: subscription for resource '${getReferenceString(resource)}' might have been fired but WebSocket subscriptions are not enabled for project '${project.name ?? getReferenceString(project)}'`
    );
    return subscriptions;
  }

  const cachedEntries = await redis.mget(cachedRefs);
  const liveEntries = cachedEntries.filter(Boolean);

  // Set members whose entry is missing are dead references; prune them once enough have piled up
  if (cachedEntries.length - liveEntries.length >= staleEntryThreshold) {
    getLogger().warn('Excessive subscription cache miss', {
      numKeys: cachedRefs.length,
      hitRate: liveEntries.length / cachedEntries.length,
      projectId,
    });
    const staleRefs = cachedRefs.filter((_ref, index) => !cachedEntries[index]);
    await redis.srem(setKey, staleRefs);
  }

  // Each entry is a JSON document; join them into one array literal so a single parse handles all of them
  const parsedEntries = JSON.parse('[' + liveEntries.join(',') + ']') as {
    resource: WithId<Subscription>;
    projectId: string;
  }[];
  for (const entry of parsedEntries) {
    subscriptions.push(entry.resource);
  }

  return subscriptions;
}

/**
 * Executes a subscription job.
 * @param job - The subscription job details.
* @throws Rethrows preamble errors while preamble attempts remain, and delivery errors while the subscription's attempts remain, so that the job is retried.
 */
export async function execSubscriptionJob(job: Job<SubscriptionJobData>): Promise<void> {
  let subscription: WithId<Subscription> | undefined;
  let rewrittenResource: Resource;

  // Preamble: load the subscription and the resource version to deliver
  try {
    const systemRepo = getSystemRepo();
    const { subscriptionId, resourceType, id, versionId, verbose } = job.data;
    const logger = getLogger();
    const logFn = verbose ? logger.info : logger.debug;

    subscription = await tryGetSubscription(systemRepo, subscriptionId, job.data.channelType);
    if (!subscription) {
      // If the subscription was deleted, then stop processing it.
      logFn(`Subscription ${subscriptionId} not found`);
      return;
    }

    const channelType = subscription.channel.type;
    if (channelType !== 'rest-hook') {
      logFn(`Subscription ${subscriptionId} has unsupported channel type ${channelType}`);
      return;
    }

    if (subscription.status !== 'active') {
      // If the subscription has been disabled, then stop processing it.
      logFn(`Subscription ${subscriptionId} is not active`);
      return;
    }

    if (job.data.interaction !== 'delete') {
      const currentVersion = await tryGetCurrentVersion(systemRepo, resourceType, id);
      if (!currentVersion) {
        // If the resource was deleted, then stop processing it.
        logFn(`Resource ${resourceType}/${id} not found`);
        return;
      }

      const isRetry = job.attemptsMade > 0;
      if (isRetry && currentVersion.meta?.versionId !== versionId) {
        // If this is a retry and the resource is not the current version, then stop processing it.
        logFn(`Resource ${resourceType}/${id} is not the current version`);
        return;
      }
    }

    const versionedResource = await systemRepo.readVersion(resourceType, id, versionId);

    // Deliver the resource with rewritten attachments, so that subscribers get the same
    // attachment URLs they would get if they did a search
    rewrittenResource = await rewriteAttachments(RewriteMode.PRESIGNED_URL, systemRepo, deepClone(versionedResource));
  } catch (err) {
    if (job.attemptsMade < MAX_PREAMBLE_ATTEMPTS) {
      throw err;
    }
    // Too many errors in the preamble, give up.
    // Record the final error so that the reason the job was dropped is not lost.
    globalLogger.error('Subscription job preamble failed too many times, giving up', {
      ...getLoggingFields(job),
      error: err instanceof Error ? err.message : String(err),
    });
    return;
  }

  // Delivery: errors here are considered to be issues in the client's rest hook or bot
  // and should trigger retries according to the subscription's max attempts
  try {
    if (subscription.channel?.endpoint?.startsWith('Bot/')) {
      await execBot(job, subscription, rewrittenResource, job.data.interaction, job.data.requestTime);
    } else {
      await sendRestHook(job, subscription, rewrittenResource, job.data.interaction, job.data.requestTime);
    }
  } catch (err) {
    await catchJobError(subscription, job, err);
  }
}

/**
 * Tells whether a failed read means that the resource no longer exists.
 * @param err - The error thrown by the repository read.
 * @returns True if the normalized outcome is "gone" or "not found".
 */
function isGoneOrNotFound(err: unknown): boolean {
  const outcome = normalizeOperationOutcome(err);
  return isGone(outcome) || isNotFound(outcome);
}

/**
 * Reads a subscription, treating a deleted or missing subscription as absent.
 * @param systemRepo - The repository to read from.
 * @param subscriptionId - The subscription ID.
 * @param channelType - The channel type recorded on the job; `websocket` restricts the read to the cache.
 * @returns The subscription, or undefined if it is gone or not found.
 * @throws Rethrows any error other than "gone" or "not found".
 */
async function tryGetSubscription(
  systemRepo: Repository,
  subscriptionId: string,
  channelType: SubscriptionJobData['channelType']
): Promise<WithId<Subscription> | undefined> {
  try {
    return await systemRepo.readResource<Subscription>('Subscription', subscriptionId, {
      checkCacheOnly: channelType === 'websocket',
    });
  } catch (err) {
    // If the Subscription was marked as deleted in the database, this will return "gone"
    // However, deleted WebSocket subscriptions will return "not found"
    if (isGoneOrNotFound(err)) {
      // If the subscription was deleted, then stop processing it.
      return undefined;
    }
    // Otherwise re-throw
    throw err;
  }
}

/**
 * Reads the current version of a resource, treating a deleted or missing resource as absent.
 * @param systemRepo - The repository to read from.
 * @param resourceType - The resource type.
 * @param id - The resource ID.
 * @returns The resource, or undefined if it is gone or not found.
 * @throws Rethrows any error other than "gone" or "not found".
 */
async function tryGetCurrentVersion<T extends Resource = Resource>(
  systemRepo: Repository,
  resourceType: T['resourceType'],
  id: string
): Promise<T | undefined> {
  try {
    return await systemRepo.readResource(resourceType, id);
  } catch (err) {
    if (isGoneOrNotFound(err)) {
      // If the resource was deleted, then stop processing it.
      return undefined;
    }
    // Otherwise re-throw
    throw err;
  }
}

/**
 * Sends a rest-hook subscription.
 *
 * POSTs the resource (or `{}` for a delete) to the subscription endpoint, writes an audit event
 * for the attempt, and records how long the fetch took.
 * @param job - The subscription job details.
 * @param subscription - The subscription.
 * @param resource - The resource that triggered the subscription.
 * @param interaction - The interaction type.
 * @param requestTime - The request time.
 * @throws The fetch error, or an error carrying the response status when the response does not count as successful.
 */
async function sendRestHook(
  job: Job<SubscriptionJobData>,
  subscription: WithId<Subscription>,
  resource: Resource,
  interaction: BackgroundJobInteraction,
  requestTime: string
): Promise<void> {
  const log = getLogger();

  const url = subscription.channel?.endpoint as string;
  if (!url) {
    // This can happen if a user updates the Subscription after the job is created.
    log.debug(`Ignore rest hook missing URL`);
    return;
  }

  const body = interaction === 'delete' ? '{}' : stringify(resource);
  const headers = buildRestHookHeaders(job, subscription, resource, interaction, body);

  // The failure is remembered rather than thrown right away, so that the
  // duration metric and log entry below are written for failed attempts too.
  let failure: Error | undefined;
  const fetchStartTime = Date.now();
  let fetchEndTime: number;

  try {
    log.info('Sending rest hook to: ' + url);
    // NOTE(review): these headers include the entries copied from channel.header and the signature
    log.debug('Rest hook headers: ' + JSON.stringify(headers, undefined, 2));
    const response = await fetch(url, { method: 'POST', headers, body, timeout: REQUEST_TIMEOUT });
    fetchEndTime = Date.now();
    log.info('Received rest hook status: ' + response.status);

    const success = isJobSuccessful(subscription, response.status);
    const outcome = success ? AuditEventOutcome.Success : AuditEventOutcome.MinorFailure;
    await createSubscriptionAuditEvent(
      resource,
      requestTime,
      outcome,
      `Attempt ${job.attemptsMade} received status ${response.status}`,
      subscription
    );

    if (!success) {
      failure = new Error('Received status ' + response.status);
    }
  } catch (ex) {
    fetchEndTime = Date.now();
    log.info('Subscription exception: ' + ex);
    await createSubscriptionAuditEvent(
      resource,
      requestTime,
      AuditEventOutcome.MinorFailure,
      `Attempt ${job.attemptsMade} received error ${ex}`,
      subscription
    );
    failure = ex as Error;
  }

  const fetchDurationMs = fetchEndTime - fetchStartTime;
  recordHistogramValue('medplum.subscription.restHookFetchDuration', fetchDurationMs / 1000);
  log.info('Subscription rest hook fetch duration', {
    fetchDurationMs,
    subscription: subscription.id,
    project: subscription.meta?.project,
  });

  if (failure) {
    throw failure;
  }
}

/**
 * Builds a collection of HTTP request headers for the rest-hook subscription.
 * @param job - The subscription job.
 * @param subscription - The subscription resource.
 * @param resource - The trigger resource.
 * @param interaction - The interaction type.
 * @param body - The request body.
 * @returns The HTTP request headers.
*/
function buildRestHookHeaders(
  job: Job<SubscriptionJobData>,
  subscription: WithId<Subscription>,
  resource: Resource,
  interaction: BackgroundJobInteraction,
  body: string
): Record<string, string> {
  const headers: Record<string, string> = {
    'Content-Type': ContentType.FHIR_JSON,
    'X-Medplum-Subscription': subscription.id,
    'X-Medplum-Interaction': interaction,
  };

  if (interaction === 'delete') {
    headers['X-Medplum-Deleted-Resource'] = `${resource.resourceType}/${resource.id}`;
  }

  // Custom headers are stored as "Name: value" strings.
  // Split on the FIRST colon only: the value itself may contain colons (for example a URL),
  // and everything after the first colon belongs to the value.
  for (const headerLine of subscription.channel?.header ?? []) {
    const separatorIndex = headerLine.indexOf(':');
    const name = separatorIndex === -1 ? '' : headerLine.slice(0, separatorIndex).trim();
    if (!name) {
      // No colon or no header name: skip the entry instead of failing the whole delivery.
      // The entry itself is not logged, since header values can carry credentials.
      getLogger().warn('Ignoring malformed subscription channel header', { subscription: subscription.id });
      continue;
    }
    headers[name] = headerLine.slice(separatorIndex + 1).trim();
  }

  // Look for signature secret in Medplum extension
  // Note that the first version of the extension used a different URL
  // We still support the old URL for backwards compatibility
  const secret =
    getExtensionValue(subscription, 'https://www.medplum.com/fhir/StructureDefinition/subscription-secret') ||
    getExtensionValue(subscription, 'https://www.medplum.com/fhir/StructureDefinition-subscriptionSecret');
  if (secret && isString(secret)) {
    // Hex-encoded HMAC-SHA256 of the request body, keyed with the secret
    headers['X-Signature'] = createHmac('sha256', secret).update(body).digest('hex');
  }

  const traceId = job.data.traceId;
  if (traceId) {
    headers['x-trace-id'] = traceId;
    // Only forward the trace ID as `traceparent` when it parses as one
    if (parseTraceparent(traceId)) {
      headers['traceparent'] = traceId;
    }
  }

  return headers;
}

/**
 * Executes a Bot subscription.
 *
 * The subscription endpoint is read as a Bot reference. The bot runs as the membership of the
 * resource author when `bot.runAsUser` is set, and as the membership of the bot itself otherwise.
 * @param job - The subscription job.
 * @param subscription - The subscription.
 * @param resource - The resource that triggered the subscription.
 * @param interaction - The interaction type.
 * @param requestTime - The request time.
 * @throws If no project membership can be found to run the bot as.
 */
async function execBot(
  job: Job<SubscriptionJobData>,
  subscription: WithId<Subscription>,
  resource: Resource,
  interaction: BackgroundJobInteraction,
  requestTime: string
): Promise<void> {
  const ctx = getRequestContext();

  const url = subscription.channel?.endpoint as string;
  if (!url) {
    // This can happen if a user updates the Subscription after the job is created.
    ctx.logger.debug(`Ignore rest hook missing URL`);
    return;
  }

  // URL should be a Bot reference string
  const systemRepo = getSystemRepo();
  const bot = await systemRepo.readReference<Bot>({ reference: url });

  const project = bot.meta?.project as string;
  const requester = resource.meta?.author as Reference<
    Bot | ClientApplication | Patient | Practitioner | RelatedPerson
  >;

  const runAs: WithId<ProjectMembership> | undefined = await findProjectMembership(
    project,
    bot.runAsUser ? requester : createReference(bot)
  );
  if (!runAs) {
    throw new Error('Could not find project membership for bot');
  }

  // A delete has no current resource to pass along, so the deleted resource is wrapped instead
  const body = interaction === 'delete' ? { deletedResource: resource } : resource;
  const headers = buildRestHookHeaders(job, subscription, resource, interaction, JSON.stringify(body));

  await executeBot({
    subscription,
    bot,
    runAs,
    requester,
    input: body,
    contentType: ContentType.FHIR_JSON,
    requestTime,
    traceId: ctx.traceId,
    headers,
  });
}

/**
 * Decides whether a failed delivery should be retried.
 *
 * The attempt limit comes from the subscription-max-attempts extension, falling back to DEFAULT_ATTEMPTS.
 * While attempts remain, the job priority is lowered and the error is rethrown, which triggers a retry.
 * Once the limit is reached, the error is swallowed so that no further retry happens.
 * @param subscription - The subscription whose delivery failed.
 * @param job - The subscription job.
 * @param err - The delivery error.
 * @throws Rethrows `err` while attempts remain.
 */
async function catchJobError(subscription: Subscription, job: Job<SubscriptionJobData>, err: unknown): Promise<void> {
  const maxJobAttempts =
    getExtension(subscription, 'https://medplum.com/fhir/StructureDefinition/subscription-max-attempts')
      ?.valueInteger ?? DEFAULT_ATTEMPTS;

  if (job.attemptsMade >= maxJobAttempts) {
    // If the maxJobAttempts equals the jobs.attemptsMade, we won't throw, which won't trigger a retry
    globalLogger.debug(`Max attempts made for job ${job.id}, subscription: ${subscription.id}`);
    return;
  }

  globalLogger.debug(`Retrying job due to error: ${err}`);
  // Lower the job priority
  // "Note that the priorities go from 1 to 2 097 152, where a lower number is always a higher priority than higher numbers."
  // "Jobs without a `priority`` assigned will get the most priority."
  // See: https://docs.bullmq.io/guide/jobs/prioritized
  await job.changePriority({ priority: 1 + job.attemptsMade });
  throw err;
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/medplum/medplum'

If you have feedback or need assistance with the MCP directory API, please join our Discord server