download.ts
// SPDX-FileCopyrightText: Copyright Orangebot, Inc. and Medplum contributors
// SPDX-License-Identifier: Apache-2.0
import type { BackgroundJobContext, TypedValueWithPath, WithId } from '@medplum/core';
import {
  arrayify,
  crawlTypedValue,
  getReferenceString,
  isGone,
  normalizeOperationOutcome,
  pathToJSONPointer,
  toTypedValue,
} from '@medplum/core';
import type { Binary, Project, Resource, ResourceType } from '@medplum/fhirtypes';
import type { Job, QueueBaseOptions } from 'bullmq';
import { Queue, Worker } from 'bullmq';
import fetch from 'node-fetch';
import type { Readable } from 'node:stream';
import { Pointer } from 'rfc6902';
import { getConfig } from '../config/loader';
import { tryGetRequestContext, tryRunInRequestContext } from '../context';
import { getSystemRepo } from '../fhir/repo';
import { getLogger, globalLogger } from '../logger';
import { getBinaryStorage } from '../storage/loader';
import { parseTraceparent } from '../traceparent';
import type { WorkerInitializer } from './utils';
import { queueRegistry } from './utils';

/*
 * The download worker inspects resources,
 * looking for external URLs that need to be downloaded.
 *
 * If an external URL is found, the worker attempts to download the content
 * and store it using the Medplum server storage service.
 *
 * On successfully downloading the content, the worker updates the resource
 * to reference the new Binary resource.
 */

export interface DownloadJobData {
  readonly resourceType: ResourceType;
  readonly id: string;
  readonly url: string;
  readonly requestId?: string;
  readonly traceId?: string;
}

const queueName = 'DownloadQueue';
const jobName = 'DownloadJobData';

export const initDownloadWorker: WorkerInitializer = (config) => {
  const defaultOptions: QueueBaseOptions = {
    connection: config.redis,
  };

  const queue = new Queue<DownloadJobData>(queueName, {
    ...defaultOptions,
    defaultJobOptions: {
      attempts: 3,
      backoff: {
        type: 'exponential',
        delay: 1000,
      },
    },
  });

  const worker = new Worker<DownloadJobData>(
    queueName,
    (job) => tryRunInRequestContext(job.data.requestId, job.data.traceId, () => execDownloadJob(job)),
    {
      ...defaultOptions,
      ...config.bullmq,
    }
  );
  worker.on('completed', (job) => globalLogger.info(`Completed job ${job.id} successfully`));
  worker.on('failed', (job, err) => globalLogger.info(`Failed job ${job?.id} with ${err}`));

  return { queue, worker, name: queueName };
};

/**
 * Returns the download queue instance.
 * This is used by the unit tests.
 * @returns The download queue (if available).
 */
export function getDownloadQueue(): Queue<DownloadJobData> | undefined {
  return queueRegistry.get(queueName);
}
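/*
 * Illustrative sketch (not part of the original file): how a job could be
 * enqueued by hand, e.g. from a test. The id and url values are hypothetical;
 * in production, jobs are enqueued by addDownloadJobs() below when a resource
 * is created or updated. This mirrors what addDownloadJobData() does.
 *
 *   const queue = getDownloadQueue();
 *   await queue?.add(jobName, {
 *     resourceType: 'DocumentReference',
 *     id: '00000000-0000-0000-0000-000000000000',
 *     url: 'https://example.com/report.pdf',
 *   });
 */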
/**
 * Adds all download jobs for a given resource.
 *
 * There are a few important structural considerations:
 * 1) One resource change can spawn multiple download jobs.
 * 2) Download jobs can fail, and must be retried independently.
 * 3) Downloads should be evaluated at the time of the resource change.
 *
 * So, when a resource changes (create or update), we evaluate all downloaders
 * at that moment in time. For each matching download, we enqueue the job.
 * The only purpose of the job is to make the outbound HTTP request,
 * not to re-evaluate the download.
 * @param resource - The resource that was created or updated.
 * @param previousVersion - The previous version of the resource, if available.
 * @param context - The background job context.
 */
export async function addDownloadJobs(
  resource: WithId<Resource>,
  previousVersion: Resource | undefined,
  context: BackgroundJobContext
): Promise<void> {
  if (!getConfig().autoDownloadEnabled) {
    return;
  }

  const project = context?.project;
  if (!project) {
    return;
  }

  const ctx = tryGetRequestContext();
  for (const attachment of getAttachments(resource)) {
    // Only process allowed external URLs
    const url = attachment.value.url;
    if (!isExternalUrl(url) || !isUrlAllowedByProject(project, url)) {
      continue;
    }

    // Skip if this mutation didn't adjust the URL in question.
    // Note that there are some cases where we detect a change when an element
    // _moved_ in the path tree without actually changing. For example, if you
    // delete an element at array index 0, the remaining items in the array
    // will shift their index down by 1.
    //
    // Given that this is a low-frequency type of mutation, we prefer to pay
    // the (low) cost of a double-enqueued download job instead of trying to
    // detect path moves on every mutation.
    const pointer = Pointer.fromJSON(`${pathToJSONPointer(attachment.path)}/url`);
    if (pointer.get(resource) === pointer.get(previousVersion)) {
      continue;
    }

    await addDownloadJobData({
      resourceType: resource.resourceType,
      id: resource.id,
      url,
      requestId: ctx?.requestId,
      traceId: ctx?.traceId,
    });
  }
}

/**
 * Determines if a content URL is an external URL.
 *
 * URLs are "internal" if:
 * 1) They refer to a fully qualified fhir/R4/Binary/ endpoint.
 * 2) They refer to the Medplum storage URL.
 * 3) They refer to a Binary in canonical form (i.e., "Binary/123").
 * @param url - The Media content URL.
 * @returns True if the URL is an external URL.
 */
function isExternalUrl(url: string | undefined): url is string {
  return !!(
    url &&
    url.startsWith('https://') &&
    !url.startsWith(getConfig().baseUrl + 'fhir/R4/Binary/') &&
    !url.startsWith(getConfig().storageBaseUrl) &&
    !url.startsWith('Binary/')
  );
}

/**
 * Determines if a URL is allowed for auto-download.
 * @param project - The project settings.
 * @param url - The URL to check.
 * @returns True if the URL is allowed for auto-download.
 */
function isUrlAllowedByProject(project: Project, url: string): boolean {
  if (project.setting?.find((s) => s.name === 'autoDownloadEnabled')?.valueBoolean === false) {
    // If the project has auto-download disabled, then ignore all URLs.
    return false;
  }

  const allowedUrlPrefixes =
    project.setting?.find((s) => s.name === 'autoDownloadAllowedUrlPrefixes')?.valueString?.split(',') ?? [];
  if (allowedUrlPrefixes.length > 0 && !allowedUrlPrefixes.some((prefix) => url.startsWith(prefix))) {
    // If allowed URLs are specified and the URL does not match an allowed prefix, then ignore it.
    return false;
  }

  const ignoredUrlPrefixes =
    project.setting?.find((s) => s.name === 'autoDownloadIgnoredUrlPrefixes')?.valueString?.split(',') ?? [];
  if (ignoredUrlPrefixes.some((prefix) => url.startsWith(prefix))) {
    // If the URL matches an ignored prefix, then ignore it.
    return false;
  }

  return true;
}

/**
 * Adds a download job to the queue.
 * @param job - The download job details.
 */
async function addDownloadJobData(job: DownloadJobData): Promise<void> {
  const queue = getDownloadQueue();
  if (queue) {
    await queue.add(jobName, job);
  }
}
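/*
 * Illustrative sketch (not part of the original file): a Project whose
 * settings exercise each branch of isUrlAllowedByProject() above. The setting
 * names come from the code; the prefix values are hypothetical.
 *
 *   const project: Project = {
 *     resourceType: 'Project',
 *     setting: [
 *       { name: 'autoDownloadEnabled', valueBoolean: true },
 *       { name: 'autoDownloadAllowedUrlPrefixes', valueString: 'https://cdn.example.com/' },
 *       { name: 'autoDownloadIgnoredUrlPrefixes', valueString: 'https://cdn.example.com/private/' },
 *     ],
 *   };
 *
 *   isUrlAllowedByProject(project, 'https://cdn.example.com/a.png');         // true
 *   isUrlAllowedByProject(project, 'https://cdn.example.com/private/b.png'); // false (ignored prefix)
 *   isUrlAllowedByProject(project, 'https://other.example.com/c.png');       // false (no allowed prefix match)
 */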
/**
 * Executes a download job.
 * @param job - The download job details.
 */
export async function execDownloadJob<T extends Resource = Resource>(job: Job<DownloadJobData>): Promise<void> {
  const systemRepo = getSystemRepo();
  const log = getLogger();
  const { resourceType, id, url } = job.data;

  let resource: T;
  try {
    resource = await systemRepo.readResource<T>(resourceType, id);
  } catch (err) {
    const outcome = normalizeOperationOutcome(err);
    if (isGone(outcome)) {
      // If the resource was deleted, then stop processing it.
      return;
    }
    throw err;
  }

  if (!JSON.stringify(resource).includes(url)) {
    // If the resource no longer includes the URL, then stop processing it.
    return;
  }

  const projectId = resource.meta?.project;
  if (!projectId) {
    return;
  }

  const project = await systemRepo.readResource<Project>('Project', projectId);
  if (!isUrlAllowedByProject(project, url)) {
    return;
  }

  const headers: HeadersInit = {};
  const traceId = job.data.traceId;
  if (traceId) {
    headers['x-trace-id'] = traceId;
    if (parseTraceparent(traceId)) {
      headers['traceparent'] = traceId;
    }
  }

  const reference = getReferenceString(resource);
  try {
    log.info('Requesting content at: ' + url);
    const response = await fetch(url, {
      headers,
    });

    log.info('Received status: ' + response.status);
    if (response.status >= 400) {
      throw new Error('Received status ' + response.status);
    }

    const contentDisposition = response.headers.get('content-disposition') as string | undefined;
    const contentType = response.headers.get('content-type') as string;
    const binary = await systemRepo.createResource<Binary>({
      resourceType: 'Binary',
      contentType,
      meta: {
        project: resource.meta?.project,
      },
      securityContext: {
        reference,
      },
    });

    if (response.body === null) {
      throw new Error('Received null response body');
    }

    // From the node-fetch docs:
    // Note that while the Fetch Standard requires the property to always be a WHATWG ReadableStream,
    // in node-fetch it is a Node.js Readable stream.
    await getBinaryStorage().writeBinary(binary, contentDisposition, contentType, response.body as Readable);
    log.info('Downloaded content successfully', { binaryId: binary.id });

    // Re-fetch the resource so we are mutating as recent a copy as possible
    // (there may have been other mutations applied while we were writing the
    // object into storage).
    resource = await systemRepo.readResource<T>(resourceType, id);
    const attachments = getAttachments(resource);
    const patches = attachments
      .filter((attachment) => attachment.value.url === url)
      .map((value) => ({
        op: 'replace' as const,
        path: `${pathToJSONPointer(value.path)}/url`,
        value: `Binary/${binary.id}`,
      }));

    if (patches.length === 0) {
      // This can happen if we double-enqueued auto-download jobs for the same
      // URL, or if a user amended the resource faster than this job ran.
      log.info('Download succeeded but original URL no longer found in resource', {
        resourceType,
        id,
        url,
        binaryId: binary.id,
      });
      return;
    }

    await systemRepo.patchResource(
      resourceType,
      id,
      [...patches, { op: 'replace', path: '/meta/author', value: { reference: 'system' } }],
      { ifMatch: resource.meta?.versionId }
    );
  } catch (err) {
    log.info('Download error', { projectId, reference, url, err });
    throw err;
  }
}

export function getAttachments(resource: Resource): TypedValueWithPath[] {
  const attachments: TypedValueWithPath[] = [];
  crawlTypedValue(
    toTypedValue(resource),
    {
      visitProperty: (_parent, _key, _path, propertyValues) => {
        for (const propertyValue of propertyValues) {
          for (const value of arrayify(propertyValue)) {
            if (value.type === 'Attachment') {
              attachments.push(value);
            }
          }
        }
      },
    },
    { initialPath: '' }
  );
  return attachments;
}
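/*
 * Illustrative sketch (not part of the original file): the JSON patch that
 * execDownloadJob() applies after downloading the content of, say, a
 * DocumentReference attachment. The pointer path and Binary id shown here are
 * hypothetical; the real path is built by pathToJSONPointer(attachment.path).
 *
 *   [
 *     { op: 'replace', path: '/content/0/attachment/url', value: 'Binary/<binary-id>' },
 *     { op: 'replace', path: '/meta/author', value: { reference: 'system' } },
 *   ]
 *
 * The patch is applied with ifMatch set to the versionId of the freshly
 * re-read resource, so a concurrent edit causes the patch to fail and the
 * job to be retried by BullMQ (up to the 3 attempts configured above).
 */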
