Skip to main content
Glama
repo.ts — 103 kB
// SPDX-FileCopyrightText: Copyright Orangebot, Inc. and Medplum contributors // SPDX-License-Identifier: Apache-2.0 import type { BackgroundJobInteraction, Filter, SearchParameterDetails, SearchRequest, TypedValue, TypedValueWithPath, ValidatorOptions, WithId, } from '@medplum/core'; import { AccessPolicyInteraction, accessPolicySupportsInteraction, allOk, badRequest, convertToSearchableDates, convertToSearchableNumbers, convertToSearchableQuantities, convertToSearchableReferences, convertToSearchableStrings, convertToSearchableTokens, convertToSearchableUris, createReference, deepClone, deepEquals, DEFAULT_MAX_SEARCH_COUNT, evalFhirPathTyped, extractAccountReferences, flatMapFilter, forbidden, formatSearchQuery, getReferenceString, getStatus, gone, isGone, isNotFound, isObject, isOk, isResource, isResourceWithId, isUUID, normalizeErrorString, normalizeOperationOutcome, notFound, OperationOutcomeError, Operator, parseReference, parseSearchRequest, preconditionFailed, PropertyType, protectedResourceTypes, readInteractions, resolveId, satisfiedAccessPolicy, SearchParameterType, serverError, sleep, stringify, toPeriod, toTypedValue, validateResource, validateResourceType, } from '@medplum/core'; import type { CreateResourceOptions, ReadHistoryOptions, UpdateResourceOptions } from '@medplum/fhir-router'; import { FhirRepository, RepositoryMode } from '@medplum/fhir-router'; import type { AccessPolicy, AccessPolicyResource, Binary, Bundle, BundleEntry, CodeableConcept, Coding, Meta, OperationOutcome, OperationOutcomeIssue, Project, Reference, Resource, ResourceType, SearchParameter, StructureDefinition, ValueSet, } from '@medplum/fhirtypes'; import { Readable } from 'node:stream'; import type { Pool, PoolClient } from 'pg'; import type { Operation } from 'rfc6902'; import { v4 } from 'uuid'; import { getConfig } from '../config/loader'; import type { ArrayColumnPaddingConfig } from '../config/types'; import { syntheticR4Project, systemResourceProjectId } from 
'../constants'; import { AuthenticatedRequestContext, tryGetRequestContext } from '../context'; import { DatabaseMode, getDatabasePool } from '../database'; import { getLogger } from '../logger'; import { incrementCounter, recordHistogramValue } from '../otel/otel'; import { getRedis } from '../redis'; import { getBinaryStorage } from '../storage/loader'; import type { AuditEventSubtype } from '../util/auditevent'; import { AuditEventOutcome, createAuditEvent, CreateInteraction, DeleteInteraction, HistoryInteraction, logAuditEvent, PatchInteraction, ReadInteraction, RestfulOperationType, SearchInteraction, UpdateInteraction, VreadInteraction, } from '../util/auditevent'; import { patchObject } from '../util/patch'; import { addBackgroundJobs } from '../workers'; import { addSubscriptionJobs } from '../workers/subscription'; import type { FhirRateLimiter } from './fhirquota'; import { validateResourceWithJsonSchema } from './jsonschema'; import type { HumanNameResource } from './lookups/humanname'; import { getHumanNameSortValue } from './lookups/humanname'; import { getStandardAndDerivedSearchParameters } from './lookups/util'; import { clamp } from './operations/utils/parameters'; import { findTerminologyResource } from './operations/utils/terminology'; import { validateCodingInValueSet } from './operations/valuesetvalidatecode'; import { getPatients } from './patient'; import { preCommitValidation } from './precommit'; import { replaceConditionalReferences, validateResourceReferences } from './references'; import type { ResourceCap } from './resource-cap'; import { getFullUrl } from './response'; import { rewriteAttachments, RewriteMode } from './rewrite'; import type { SearchOptions } from './search'; import { buildSearchExpression, searchByReferenceImpl, searchImpl } from './search'; import type { ColumnSearchParameterImplementation } from './searchparameter'; import { getSearchParameterImplementation, lookupTables } from './searchparameter'; import type { 
Expression,
  TransactionIsolationLevel
} from './sql';
import {
  Condition,
  DeleteQuery,
  Disjunction,
  InsertQuery,
  normalizeDatabaseError,
  periodToRangeString,
  PostgresError,
  SelectQuery,
} from './sql';
import { buildTokenColumns } from './token-column';

// Transaction retry tuning: serialization failures (see retryableTransactionErrorCodes)
// are retried with exponential backoff starting at the base delay below.
const defaultTransactionAttempts = 2;
const defaultExpBackoffBaseDelayMs = 50;
const retryableTransactionErrorCodes: string[] = [PostgresError.SerializationFailure];

/**
 * The RepositoryContext interface defines standard metadata for repository actions.
 * In practice, there will be one Repository per HTTP request.
 * And the RepositoryContext represents the context of that request,
 * such as "who is the current user?" and "what is the current project?"
 */
export interface RepositoryContext {
  /**
   * The current author reference.
   * This should be a FHIR reference string (i.e., "resourceType/id").
   * Where resource type is ClientApplication, Patient, Practitioner, etc.
   * This value will be included in every resource as meta.author.
   */
  author: Reference;

  /**
   * Optional individual, device, or organization for whom the change was made.
   * This value will be included in every resource as meta.onBehalfOf.
   */
  onBehalfOf?: Reference;

  /** Optional remote network address of the caller (usage not shown in this chunk — presumably audit logging). */
  remoteAddress?: string;

  /**
   * Projects that the Repository is allowed to access.
   * This should include the ID/UUID of the current project, but may also include other accessory Projects.
   * If this is undefined, the current user is a server user (e.g. Super Admin)
   * The usual case has two elements: the user's Project and the base R4 Project
   * The user's "primary" Project will be the first element in the array (i.e. projects[0])
   * This value will be included in every resource as meta.project.
   */
  projects?: WithId<Project>[];

  /** Current Project of the authenticated user, or none for the system repository. */
  currentProject?: WithId<Project>;

  /**
   * Optional access policy restricting which resources and interactions are permitted.
   * NOTE(review): the original comment here described a "compartments" restriction,
   * which does not match this field — confirm intended semantics.
   */
  accessPolicy?: AccessPolicy;

  /**
   * Optional flag for system administrators,
   * which grants system-level access.
   */
  superAdmin?: boolean;

  /**
   * Optional flag for project administrators,
   * which grants additional project-level access.
   */
  projectAdmin?: boolean;

  /**
   * Optional flag to validate resources in strict mode.
   * Strict mode validates resources against StructureDefinition resources,
   * which includes strict date validation, backbone elements, and more.
   * Non-strict mode uses the official FHIR JSONSchema definition, which is
   * significantly more relaxed.
   */
  strictMode?: boolean;

  /**
   * Optional flag to validate references on write operations.
   * If enabled, the repository will check that all references are valid,
   * and that the current user has access to the referenced resource.
   */
  checkReferencesOnWrite?: boolean;

  /** If enabled, required terminology bindings are checked against ValueSets during strict validation. */
  validateTerminology?: boolean;

  /**
   * Optional flag to include Medplum extended meta fields.
   * Medplum tracks additional metadata for each resource, such as:
   * 1) "author" - Reference to the last user who modified the resource.
   * 2) "project" - Reference to the project that owns the resource.
   * 3) "compartment" - References to all compartments the resource is in.
   */
  extendedMode?: boolean;
}

/** A resource plus its owning project ID, as stored in the Redis cache. */
export interface CacheEntry<T extends Resource = Resource> {
  resource: T;
  projectId: string;
}

export interface InteractionOptions {
  verbose?: boolean;
}

export interface ReadResourceOptions extends InteractionOptions {
  /** If true, only the cache is consulted; a cache miss surfaces as "not found". */
  checkCacheOnly?: boolean;
}

export interface ResendSubscriptionsOptions extends InteractionOptions {
  interaction?: BackgroundJobInteraction;
  subscription?: string;
}

export interface ProcessAllResourcesOptions {
  delayBetweenPagesMs?: number;
}

/**
 * The Repository class manages reading and writing to the FHIR repository.
 * It is a thin layer on top of the database.
 * Repository instances should be created per author and project.
*/
export class Repository extends FhirRepository<PoolClient> implements Disposable {
  private readonly context: RepositoryContext;
  // Optional pinned database connection; when provided by the caller, this
  // repository is not "disposable" (it does not own the connection).
  private conn?: PoolClient;
  private readonly disposable: boolean = true;
  // Depth of nested transactions; 0 means no transaction is active.
  private transactionDepth = 0;
  private closed = false;
  mode: RepositoryMode;
  // Callbacks run before/after transaction commit (see preCommit/postCommit usage below).
  private preCommitCallbacks: (() => Promise<void>)[] = [];
  private postCommitCallbacks: (() => Promise<void>)[] = [];

  /**
   * The version to be set on resources when they are inserted/updated into the database.
   * The value should be incremented each time there is a change in the schema (really just columns)
   * of the resource tables or when there are code changes to `buildResourceRow`.
   *
   * Version history:
   *
   * 1. 02/27/25 - Added `__version` column (https://github.com/medplum/medplum/pull/6033)
   * 2. 04/09/25 - Added qualification-code search param for `Practitioner` (https://github.com/medplum/medplum/pull/6280)
   * 3. 04/09/25 - Added __tokens column for `token-column` search strategy (https://github.com/medplum/medplum/pull/6291)
   * 4. 04/25/25 - Consider `resource.id` in lookup table batch reindex (https://github.com/medplum/medplum/pull/6479)
   * 5. 04/29/25 - Added `status` param for `Flag` resources (https://github.com/medplum/medplum/pull/6500)
   * 6. 06/12/25 - Added columns per token search parameter (https://github.com/medplum/medplum/pull/6727)
   * 7. 06/25/25 - Added search params `ProjectMembership-identifier`, `Immunization-encounter`, `AllergyIntolerance-encounter` (https://github.com/medplum/medplum/pull/6868)
   * 8. 08/06/25 - Added Task to Patient compartment (https://github.com/medplum/medplum/pull/7194)
   * 9. 08/19/25 - Added search parameter `ServiceRequest-reason-code` (https://github.com/medplum/medplum/pull/7271)
   * 10. 08/27/25 - Added HumanName sort columns (https://github.com/medplum/medplum/pull/7304)
   * 11. 09/25/25 - Added ConceptMapping lookup table (https://github.com/medplum/medplum/pull/7469)
   * 12. 12/01/25 - Added search param `Bot-cds-hook` (https://github.com/medplum/medplum/pull/7933)
   */
  static readonly VERSION: number = 12;

  constructor(context: RepositoryContext, conn?: PoolClient) {
    super();
    this.context = context;
    // Every repository can read the synthetic base R4 project resources.
    this.context.projects?.push(syntheticR4Project);
    if (!this.context.author?.reference) {
      throw new Error('Invalid author reference');
    }
    if (conn) {
      this.conn = conn;
      this.disposable = false;
    }
    // Default to writer mode
    // In the future, as we do more testing and validation, we will explore defaulting to reader mode
    // However, for now, we default to writer and only use reader mode for requests guaranteed not to have consistency risks
    this.mode = RepositoryMode.WRITER;
  }

  /** Returns a new Repository sharing this repository's context and (if any) pinned connection. */
  clone(): Repository {
    return new Repository(this.context, this.conn);
  }

  /** Switches between reader and writer database modes. */
  setMode(mode: RepositoryMode): void {
    this.mode = mode;
  }

  /** Returns the request-scoped FHIR rate limiter; super admins are not rate limited. */
  private rateLimiter(): FhirRateLimiter | undefined {
    return !this.isSuperAdmin() ? tryGetRequestContext()?.fhirRateLimiter : undefined;
  }

  /** Returns the request-scoped resource cap; super admins are not capped. */
  private resourceCap(): ResourceCap | undefined {
    const context = tryGetRequestContext();
    return !this.isSuperAdmin() && context instanceof AuthenticatedRequestContext ? context.resourceCap : undefined;
  }

  currentProject(): WithId<Project> | undefined {
    return this.context.currentProject;
  }

  /**
   * Returns a project by ID.
   * This handles the common case where the project ID is the same as the current project ID,
   * but also supports the super admin case where the project ID is different.
   * @param projectId - The project ID to look up.
   * @returns The project, or undefined if not found.
*/
  private async getProjectById(projectId: string | undefined): Promise<WithId<Project> | undefined> {
    if (!projectId) {
      return undefined;
    }
    if (projectId === this.context.currentProject?.id) {
      return this.context.currentProject;
    }
    // Super admin case: read any project through the system repository.
    return getSystemRepo().readResource<Project>('Project', projectId);
  }

  /**
   * Creates a FHIR resource, enforcing rate limits and the project resource cap.
   * Audit-logs the outcome (success after commit; failure immediately).
   * @param resource - The resource to create.
   * @param options - Create options, e.g. `assignedId` to honor a client-supplied ID.
   * @returns The created resource with server-assigned metadata.
   */
  async createResource<T extends Resource>(resource: T, options?: CreateResourceOptions): Promise<WithId<T>> {
    await this.rateLimiter()?.recordWrite();
    await this.resourceCap()?.created();
    if (options?.assignedId && resource.id && !this.context.superAdmin) {
      // NB: To be removed after proper client assigned ID support is added
      const systemRepo = getSystemRepo();
      try {
        const existing = await systemRepo.readResourceImpl(resource.resourceType, resource.id);
        if (existing) {
          throw new Error('Assigned ID is already in use');
        }
      } catch (err) {
        // "not found" means the assigned ID is free; anything else is a real error.
        if (!isNotFound(normalizeOperationOutcome(err))) {
          throw err;
        }
      }
    }
    const resourceWithId = {
      ...resource,
      id: options?.assignedId && resource.id ? resource.id : this.generateId(),
    };
    const startTime = Date.now();
    try {
      const result = await this.updateResourceImpl(resourceWithId, true);
      const durationMs = Date.now() - startTime;
      // Defer the success audit event until the surrounding transaction (if any) commits.
      await this.postCommit(async () => {
        this.logEvent(CreateInteraction, AuditEventOutcome.Success, undefined, { resource: result, durationMs });
      });
      return result;
    } catch (err) {
      const durationMs = Date.now() - startTime;
      this.logEvent(CreateInteraction, AuditEventOutcome.MinorFailure, err, {
        durationMs,
        resource: { type: resource.resourceType },
      });
      throw err;
    }
  }

  /** Generates a new resource/version ID (UUID v4). */
  generateId(): string {
    return v4();
  }

  /**
   * Reads a resource by type and ID, with audit logging and hidden-field removal.
   * @param resourceType - The FHIR resource type.
   * @param id - The resource ID (must be a UUID).
   * @param options - Read options (e.g. `checkCacheOnly`).
   * @returns The resource.
   */
  async readResource<T extends Resource>(
    resourceType: T['resourceType'],
    id: string,
    options?: ReadResourceOptions
  ): Promise<WithId<T>> {
    await this.rateLimiter()?.recordRead();
    const startTime = Date.now();
    try {
      const result = this.removeHiddenFields(await this.readResourceImpl<T>(resourceType, id, options));
      const durationMs = Date.now() - startTime;
      this.logEvent(ReadInteraction, AuditEventOutcome.Success, undefined, { resource: result, durationMs });
      return result;
    } catch (err) {
      const durationMs = Date.now() - startTime;
      this.logEvent(ReadInteraction, AuditEventOutcome.MinorFailure, err, {
        resource: { reference: `${resourceType}/${id}` },
        durationMs,
      });
      throw err;
    }
  }

  /**
   * Core read path: checks access policy, then the cache, then the database.
   * Throws "not found" for invalid IDs and "forbidden" for disallowed resource types.
   */
  private async readResourceImpl<T extends Resource>(
    resourceType: T['resourceType'],
    id: string,
    options?: ReadResourceOptions
  ): Promise<WithId<T>> {
    if (!id || !isUUID(id)) {
      throw new OperationOutcomeError(notFound);
    }
    validateResourceType(resourceType);
    if (!this.supportsInteraction(AccessPolicyInteraction.READ, resourceType)) {
      throw new OperationOutcomeError(forbidden);
    }
    const cacheRecord = await this.getCacheEntry<T>(resourceType, id);
    if (cacheRecord) {
      // This is an optimization to avoid a database query.
      // However, it depends on all values in the cache having "meta.compartment"
      // Old versions of Medplum did not populate "meta.compartment"
      // So this optimization is blocked until we add a migration.
      // if (!this.canReadCacheEntry(cacheRecord)) {
      //   throw new OperationOutcomeError(notFound);
      // }
      if (this.canPerformInteraction(AccessPolicyInteraction.READ, cacheRecord.resource)) {
        return cacheRecord.resource;
      }
    }
    if (options?.checkCacheOnly) {
      throw new OperationOutcomeError(notFound);
    }
    return this.readResourceFromDatabase(resourceType, id);
  }

  /**
   * Reads a resource from the database (reader connection), applying security filters,
   * and refreshes the cache with the result. Throws "gone" for deleted resources.
   */
  private async readResourceFromDatabase<T extends Resource>(resourceType: string, id: string): Promise<T> {
    if (!isUUID(id)) {
      throw new OperationOutcomeError(notFound);
    }
    const builder = new SelectQuery(resourceType).column('content').column('deleted').where('id', '=', id);
    this.addSecurityFilters(builder, resourceType);
    const rows = await builder.execute(this.getDatabaseClient(DatabaseMode.READER));
    if (rows.length === 0) {
      throw new OperationOutcomeError(notFound);
    }
    if (rows[0].deleted) {
      throw new OperationOutcomeError(gone);
    }
    const resource = JSON.parse(rows[0].content as string) as WithId<T>;
    await this.setCacheEntry(resource);
    return resource;
  }

  /**
   * Reads a batch of references in one pass (single cache round trip).
   * Each element of the result is either the resource or the per-reference Error,
   * positionally matching the input array.
   */
  async readReferences<T extends Resource>(references: Reference<T>[]): Promise<(WithId<T> | Error)[]> {
    await this.rateLimiter()?.recordRead(references.length);
    const cacheEntries = await this.getCacheEntries(references);
    const result: (WithId<T> | Error)[] = new Array(references.length);
    for (let i = 0; i < result.length; i++) {
      const startTime = Date.now();
      const reference = references[i];
      const cacheEntry = cacheEntries[i];
      let entryResult = await this.processReadReferenceEntry(reference, cacheEntry);
      const durationMs = Date.now() - startTime;
      if (entryResult instanceof Error) {
        const reference = references[i]; // shadows the outer `reference` with the same value
        this.logEvent(ReadInteraction, AuditEventOutcome.MinorFailure, entryResult, {
          resource: reference,
          durationMs,
        });
      } else {
        entryResult = this.removeHiddenFields(entryResult);
        this.logEvent(ReadInteraction, AuditEventOutcome.Success, undefined, { resource: entryResult, durationMs });
      }
      result[i] = entryResult as WithId<T> | Error;
    }
    return result;
  }

  /**
   * Resolves a single reference, preferring the provided cache entry.
   * Returns (rather than throws) "not found"/"gone" errors so callers can report per-entry.
   */
  private async processReadReferenceEntry(
    reference: Reference,
    cacheEntry: CacheEntry | undefined
  ): Promise<Resource | Error> {
    if (!reference.reference?.match(/^[A-Z][a-zA-Z]+\//)) {
      // Non-local references cannot be resolved
      return new OperationOutcomeError(notFound);
    }
    try {
      const [resourceType, id] = parseReference(reference);
      validateResourceType(resourceType);
      if (!this.supportsInteraction(AccessPolicyInteraction.READ, resourceType)) {
        return new OperationOutcomeError(forbidden);
      }
      if (cacheEntry) {
        if (!this.canPerformInteraction(AccessPolicyInteraction.READ, cacheEntry.resource)) {
          return new OperationOutcomeError(notFound);
        }
        return cacheEntry.resource;
      }
      return await this.readResourceFromDatabase(resourceType, id);
    } catch (err) {
      if (err instanceof OperationOutcomeError) {
        if (isNotFound(err.outcome) || isGone(err.outcome)) {
          // Only return "not found" or "gone" errors
          return err;
        }
        // Other errors should be treated as database errors
        throw err;
      }
      throw new
OperationOutcomeError(normalizeOperationOutcome(err), { cause: err });
    }
  }

  /** Reads the resource a reference points at, throwing badRequest on malformed references. */
  async readReference<T extends Resource>(reference: Reference<T>): Promise<WithId<T>> {
    let parts: [T['resourceType'], string];
    try {
      parts = parseReference(reference);
    } catch (_err) {
      throw new OperationOutcomeError(badRequest('Invalid reference'));
    }
    return this.readResource(parts[0], parts[1]);
  }

  /**
   * Returns resource history.
   *
   * Results are sorted with oldest versions last
   *
   * See: https://www.hl7.org/fhir/http.html#history
   * @param resourceType - The FHIR resource type.
   * @param id - The FHIR resource ID.
   * @param options - The read history options.
   * @returns Operation outcome and a history bundle.
   */
  async readHistory<T extends Resource>(
    resourceType: T['resourceType'],
    id: string,
    options?: ReadHistoryOptions
  ): Promise<Bundle<T>> {
    await this.rateLimiter()?.recordHistory();
    const startTime = Date.now();
    try {
      let resource: T | undefined = undefined;
      try {
        resource = await this.readResourceImpl<T>(resourceType, id);
        if (!this.canPerformInteraction(AccessPolicyInteraction.HISTORY, resource)) {
          throw new OperationOutcomeError(forbidden);
        }
      } catch (err) {
        // "gone" is allowed: history of a deleted resource is still readable.
        if (!(err instanceof OperationOutcomeError) || !isGone(err.outcome)) {
          throw err;
        }
      }
      if (options?.offset !== undefined) {
        const maxOffset = getConfig().maxSearchOffset;
        if (maxOffset !== undefined && options.offset > maxOffset) {
          throw new OperationOutcomeError(
            badRequest(`Search offset exceeds maximum (got ${options.offset}, max ${maxOffset})`)
          );
        }
      }
      const rows = await new SelectQuery(resourceType + '_History')
        .column('versionId')
        .column('id')
        .column('content')
        .column('lastUpdated')
        .where('id', '=', id)
        .orderBy('lastUpdated', true)
        .limit(clamp(0, options?.limit ?? 100, DEFAULT_MAX_SEARCH_COUNT))
        .offset(Math.max(0, options?.offset ?? 0))
        .execute(this.getDatabaseClient(DatabaseMode.READER));
      const countRows = await new SelectQuery(resourceType + '_History')
        .raw('COUNT(*)::int AS "count"')
        .where('id', '=', id)
        .execute(this.getDatabaseClient(DatabaseMode.READER));
      const totalCount = countRows[0].count as number;
      const entries: BundleEntry<T>[] = [];
      for (const row of rows) {
        // Rows with no content represent deleted versions; report them as "gone".
        const resource = row.content ? this.removeHiddenFields(JSON.parse(row.content as string)) : undefined;
        const outcome: OperationOutcome = row.content
          ? allOk
          : {
              resourceType: 'OperationOutcome',
              id: 'gone',
              issue: [
                {
                  severity: 'error',
                  code: 'deleted',
                  details: {
                    text: 'Deleted on ' + row.lastUpdated,
                  },
                },
              ],
            };
        entries.push({
          fullUrl: getFullUrl(resourceType, row.id),
          request: {
            method: 'GET',
            url: `${resourceType}/${row.id}/_history/${row.versionId}`,
          },
          response: {
            status: getStatus(outcome).toString(),
            outcome,
          },
          resource,
        });
      }
      const durationMs = Date.now() - startTime;
      this.logEvent(HistoryInteraction, AuditEventOutcome.Success, undefined, { resource, durationMs });
      return {
        resourceType: 'Bundle',
        type: 'history',
        entry: entries,
        total: totalCount,
      };
    } catch (err) {
      const durationMs = Date.now() - startTime;
      this.logEvent(HistoryInteraction, AuditEventOutcome.MinorFailure, err, {
        resource: { reference: `${resourceType}/${id}` },
        durationMs,
      });
      throw err;
    }
  }

  /**
   * Reads a specific version of a resource (FHIR "vread").
   * @param resourceType - The FHIR resource type.
   * @param id - The resource ID (UUID).
   * @param vid - The version ID (UUID).
   * @returns The historical version of the resource.
   */
  async readVersion<T extends Resource>(resourceType: T['resourceType'], id: string, vid: string): Promise<T> {
    await this.rateLimiter()?.recordRead();
    const startTime = Date.now();
    const versionReference = { reference: `${resourceType}/${id}/_history/${vid}` };
    try {
      if (!isUUID(id) || !isUUID(vid)) {
        throw new OperationOutcomeError(notFound);
      }
      try {
        const resource = await this.readResourceImpl<T>(resourceType, id);
        if (!this.canPerformInteraction(AccessPolicyInteraction.VREAD, resource)) {
          throw new OperationOutcomeError(forbidden);
        }
      } catch (err) {
        // A deleted ("gone") resource may still have readable versions.
        if (!isGone(normalizeOperationOutcome(err))) {
          throw err;
        }
      }
      const rows = await new SelectQuery(resourceType + '_History')
        .column('content')
        .where('id', '=', id)
        .where('versionId', '=', vid)
        .execute(this.getDatabaseClient(DatabaseMode.READER));
      if (rows.length === 0) {
        throw new OperationOutcomeError(notFound);
      }
      const result = this.removeHiddenFields(JSON.parse(rows[0].content as string));
      const durationMs = Date.now() - startTime;
      this.logEvent(VreadInteraction, AuditEventOutcome.Success, undefined, { resource: versionReference, durationMs });
      return result;
    } catch (err) {
      const durationMs = Date.now() - startTime;
      this.logEvent(VreadInteraction, AuditEventOutcome.MinorFailure, err, { resource: versionReference, durationMs });
      throw err;
    }
  }

  /**
   * Updates a FHIR resource. Conditional updates (`ifMatch`) run inside a transaction.
   * Audit-logs success after commit and failure immediately.
   */
  async updateResource<T extends Resource>(resource: T, options?: UpdateResourceOptions): Promise<WithId<T>> {
    await this.rateLimiter()?.recordWrite();
    const startTime = Date.now();
    try {
      let result: WithId<T>;
      if (options?.ifMatch) {
        // Conditional update requires transaction
        result = await this.withTransaction(() => this.updateResourceImpl(resource, false, options));
      } else {
        result = await this.updateResourceImpl(resource, false, options);
      }
      const durationMs = Date.now() - startTime;
      await this.postCommit(async () => {
        this.logEvent(UpdateInteraction, AuditEventOutcome.Success, undefined, { resource: result, durationMs });
      });
      return result;
    } catch (err) {
      const durationMs = Date.now() - startTime;
      this.logEvent(UpdateInteraction, AuditEventOutcome.MinorFailure, err, { resource, durationMs });
      throw err;
    }
  }

  /**
   * Validates that the resource has a proper UUID id, applies the project's default
   * profiles (mutating resource.meta), and checks the access policy for the interaction.
   * @returns The resource, narrowed to WithId.
   */
  private checkResourcePermissions<T extends Resource>(resource: T, interaction: AccessPolicyInteraction): WithId<T> {
    if (!isResourceWithId(resource)) {
      throw new OperationOutcomeError(badRequest('Missing id'));
    }
    const { resourceType, id } = resource;
    if (!isUUID(id)) {
      throw new OperationOutcomeError(badRequest('Invalid id'));
    }
    // Add default profiles before validating resource
    if (!resource.meta?.profile) {
      const defaultProfiles = this.currentProject()?.defaultProfile?.find(
        (o) =>
o.resourceType === resourceType
      )?.profile;
      if (defaultProfiles?.length) {
        resource.meta = { ...resource.meta, profile: defaultProfiles };
      }
    }
    if (!this.supportsInteraction(interaction, resourceType)) {
      throw new OperationOutcomeError(forbidden);
    }
    return resource;
  }

  /**
   * Core create/update path: permission checks, pre-commit validation, attachment and
   * conditional-reference rewriting, meta construction, validation, and storage.
   * @param resource - The incoming resource.
   * @param create - True for create, false for update-in-place.
   * @param options - Update options (e.g. `ifMatch` version precondition).
   * @returns The stored resource (deep-cloned, hidden fields removed).
   */
  private async updateResourceImpl<T extends Resource>(
    resource: T,
    create: boolean,
    options?: UpdateResourceOptions
  ): Promise<WithId<T>> {
    const interaction = create ? AccessPolicyInteraction.CREATE : AccessPolicyInteraction.UPDATE;
    let validatedResource = this.checkResourcePermissions(resource, interaction);
    const { resourceType, id } = validatedResource;
    const preCommitResult = await preCommitValidation(this, validatedResource, 'update');
    // A pre-commit bot may return a modified copy of the same resource; re-check it.
    if (
      isResourceWithId(preCommitResult, validatedResource.resourceType) &&
      preCommitResult.id === validatedResource.id
    ) {
      validatedResource = this.checkResourcePermissions(preCommitResult, interaction);
    }
    const existing = create ? undefined : await this.checkExistingResource<T>(resourceType, id);
    if (existing) {
      (existing.meta as Meta).compartment = this.getCompartments(existing); // Update compartments with latest rules
      if (!this.canPerformInteraction(interaction, existing)) {
        // Check before the update
        throw new OperationOutcomeError(forbidden);
      }
      if (options?.ifMatch && existing.meta?.versionId !== options.ifMatch) {
        throw new OperationOutcomeError(preconditionFailed);
      }
    }
    let updated = await rewriteAttachments(RewriteMode.REFERENCE, this, {
      ...this.restoreReadonlyFields(validatedResource, existing),
    });
    updated = await replaceConditionalReferences(this, updated);
    const resultMeta: Meta = {
      ...updated.meta,
      versionId: this.generateId(),
      lastUpdated: this.getLastUpdated(existing, validatedResource),
      author: this.getAuthor(validatedResource),
      onBehalfOf: this.context.onBehalfOf,
    };
    const result = { ...updated, meta: resultMeta };
    const projectId = this.getProjectId(existing, updated);
    if (projectId) {
      resultMeta.project = projectId;
    }
    const accounts = await this.getAccounts(existing, updated);
    if (accounts) {
      resultMeta.account = accounts[0];
      resultMeta.accounts = accounts;
    }
    resultMeta.compartment = this.getCompartments(result);
    // Validate resource after all modifications and touchups above are done
    await this.validateResource(result);
    if (this.context.checkReferencesOnWrite) {
      await this.preCommit(async () => {
        await validateResourceReferences(this, result);
      });
    }
    // No-op update: return the existing resource without writing a new version.
    if (this.isNotModified(existing, result)) {
      this.removeHiddenFields(existing);
      return existing;
    }
    if (!this.isResourceWriteable(existing, result, interaction)) {
      // Check after the update
      throw new OperationOutcomeError(forbidden);
    }
    await this.handleStorage(result, create);
    await this.postCommit(async () => this.handleBinaryUpdate(existing, result));
    await this.postCommit(async () => {
      const project = await this.getProjectById(projectId);
      await addBackgroundJobs(result, existing, { project, interaction });
    });
    const output = deepClone(result);
    return this.removeHiddenFields(output);
  }

  /**
   * Handles a Binary resource update.
   * If the resource has embedded base-64 data, writes the data to the binary storage.
   * Otherwise if the resource already exists, copies the existing binary to the new resource.
   * @param existing - Existing binary if it exists.
   * @param resource - The resource to write to the database.
   */
  private async handleBinaryUpdate<T extends Resource>(existing: T | undefined, resource: T): Promise<void> {
    if (resource.resourceType !== 'Binary') {
      return;
    }
    if (resource.data) {
      await this.handleBinaryData(resource);
    } else if (existing) {
      await getBinaryStorage().copyBinary(existing as Binary, resource);
    }
  }

  /**
   * Handles a Binary resource with embedded base-64 data.
   * Writes the data to the binary storage and removes the data field from the resource.
   * @param resource - The resource to write to the database.
   */
  private async handleBinaryData(resource: Binary): Promise<void> {
    // Parse result.data as a base64 string
    const buffer = Buffer.from(resource.data as string, 'base64');
    // Convert buffer to a Readable stream
    const stream = new Readable({
      read() {
        this.push(buffer);
        this.push(null); // Signifies the end of the stream (EOF)
      },
    });
    // Write the stream to the binary storage
    await getBinaryStorage().writeBinary(resource, undefined, resource.contentType, stream);
    // Remove the data field from the resource
    resource.data = undefined;
  }

  /**
   * Handles persisting data to at-rest storage: cache and/or database.
   * This method handles all the special cases for storage, including cache invalidation.
   * @param resource - The resource to store.
   * @param create - Whether the resource is being create, or updated in place.
   */
  private async handleStorage(resource: WithId<Resource>, create: boolean): Promise<void> {
    if (!this.isCacheOnly(resource)) {
      await this.writeToDatabase(resource, create);
    }
    await this.setCacheEntry(resource);
    // Handle special cases for resource caching
    if (resource.resourceType === 'Subscription' && resource.channel?.type === 'websocket') {
      const redis = getRedis();
      const project = resource?.meta?.project;
      if (!project) {
        throw new OperationOutcomeError(serverError(new Error('No project connected to the specified Subscription.')));
      }
      // WebSocket Subscriptions are also cache-only, but also need to be added to a special cache key
      await redis.sadd(`medplum:subscriptions:r4:project:${project}:active`, `Subscription/${resource.id}`);
    }
    if (resource.resourceType === 'StructureDefinition') {
      await removeCachedProfile(resource);
    }
  }

  /**
   * Validates a resource against the current project configuration.
   * If strict mode is enabled (default), validates against base StructureDefinition and all profiles.
   * If strict mode is disabled, validates against the legacy JSONSchema validator.
   * Throws on validation errors.
   * Returns silently on success.
* @param resource - The candidate resource to validate.
   */
  async validateResource(resource: Resource): Promise<void> {
    if (this.context.strictMode) {
      await this.validateResourceStrictly(resource);
    } else {
      // Perform loose validation first to detect any severe issues
      validateResourceWithJsonSchema(resource);
      // Attempt strict validation and log warnings on failure
      try {
        await this.validateResourceStrictly(resource);
      } catch (err: any) {
        getLogger().warn('Strict validation would fail', {
          resource: getReferenceString(resource),
          err,
        });
      }
    }
  }

  /**
   * Strict validation: base FHIR spec, then profiles, then (optionally) terminology
   * bindings. Records validation latency in an OTel histogram.
   */
  async validateResourceStrictly(resource: Resource): Promise<void> {
    const logger = getLogger();
    const start = process.hrtime.bigint();
    // Prepare validator options
    let options: ValidatorOptions | undefined;
    if (this.context.validateTerminology) {
      const tokens = Object.create(null);
      options = { ...options, collect: { tokens } };
    }
    // Validate resource against base FHIR spec
    const issues = validateResource(resource, options);
    for (const issue of issues) {
      logger.warn(`Validator warning: ${issue.details?.text}`, { project: this.context.projects?.[0]?.id, issue });
    }
    // Validate profiles after verifying compliance with base spec
    const profileUrls = resource.meta?.profile;
    if (profileUrls) {
      await this.validateProfiles(resource, profileUrls, options);
    }
    // (Optionally) check any required terminology bindings found
    if (this.context.validateTerminology && options?.collect?.tokens) {
      await this.validateTerminology(options.collect.tokens, issues);
      if (issues.some((iss) => iss.severity === 'error')) {
        throw new OperationOutcomeError({ resourceType: 'OperationOutcome', issue: issues });
      }
    }
    // Track latency for successful validation
    const durationMs = Number(process.hrtime.bigint() - start) / 1e6; // Convert nanoseconds to milliseconds
    recordHistogramValue('medplum.server.validationDurationMs', durationMs, { options: { unit: 'ms' } });
    if (durationMs > 10) {
      logger.debug('High validator latency', {
        resourceType: resource.resourceType,
        id: resource.id,
        durationMs,
      });
    }
  }

  /**
   * Validates the resource against each referenced profile, logging (not throwing)
   * when a profile cannot be found.
   */
  private async validateProfiles(resource: Resource, profileUrls: string[], options?: ValidatorOptions): Promise<void> {
    const logger = getLogger();
    for (const url of profileUrls) {
      const loadStart = process.hrtime.bigint();
      const profile = await this.loadProfile(url);
      const loadTime = Number(process.hrtime.bigint() - loadStart);
      if (!profile) {
        logger.warn('Unknown profile referenced', {
          resource: `${resource.resourceType}/${resource.id}`,
          url,
        });
        continue;
      }
      const validateStart = process.hrtime.bigint();
      validateResource(resource, { ...options, profile });
      const validateTime = Number(process.hrtime.bigint() - validateStart);
      logger.debug('Profile loaded', {
        url,
        loadTime,
        validateTime,
      });
    }
  }

  /**
   * Checks collected terminology tokens against their bound ValueSets,
   * appending "error" issues for values that do not validate.
   * @param tokens - Map of ValueSet URL to collected values (from the validator's `collect` option).
   * @param issues - Issue list to append validation errors to (mutated in place).
   */
  private async validateTerminology(
    tokens: Record<string, TypedValueWithPath[]>,
    issues: OperationOutcomeIssue[]
  ): Promise<void> {
    for (const [url, values] of Object.entries(tokens)) {
      const valueSet = await findTerminologyResource<ValueSet>(this, 'ValueSet', url);
      // Memoizes validation results for plain (non-Coding) values within this ValueSet.
      const resultCache: Record<string, boolean | undefined> = Object.create(null);
      for (const value of values) {
        let codings: Coding[] | undefined;
        switch (value.type) {
          case 'CodeableConcept':
            codings = (value.value as CodeableConcept).coding;
            break;
          case 'Coding':
            codings = [value.value as Coding];
            break;
          default: {
            const cachedResult = resultCache[`${value.type}|${value.value}`];
            if (cachedResult === false) {
              issues.push({
                severity: 'error',
                code: 'value',
                details: { text: `Value ${JSON.stringify(value.value)} did not satisfy terminology binding ${url}` },
                expression: [value.path],
              });
            }
            if (cachedResult !== undefined) {
              continue;
            }
            codings = [{ code: value.value as string }];
            break;
          }
        }
        if (!codings?.length) {
          continue;
        }
        const matchedCoding = await validateCodingInValueSet(this, valueSet, codings);
        resultCache[`${value.type}|${value.value}`] = Boolean(matchedCoding);
        if (!matchedCoding) {
          issues.push({
            severity: 'error',
            code: 'value',
            details: { text: `Value ${JSON.stringify(value.value)} did not satisfy terminology binding ${url}` },
            expression: [value.path],
          });
        }
      }
    }
  }

  /**
   * Loads a StructureDefinition profile by URL, preferring the Redis cache
   * (keyed per accessible project) and falling back to a database search.
   */
  private async loadProfile(url: string): Promise<StructureDefinition | undefined> {
    if (this.context.projects?.length) {
      // Try loading from cache, using all available Project IDs
      const cacheKeys = this.context.projects.map((p) => getProfileCacheKey(p.id, url));
      const results = await getRedis().mget(...cacheKeys);
      const cachedProfile = results.find(Boolean) as string | undefined;
      if (cachedProfile) {
        return (JSON.parse(cachedProfile) as CacheEntry<StructureDefinition>).resource;
      }
    }
    // Fall back to loading from the DB; descending version sort approximates version resolution for some cases
    const profile = await this.searchOne<StructureDefinition>({
      resourceType: 'StructureDefinition',
      filters: [
        {
          code: 'url',
          operator: Operator.EQUALS,
          value: url,
        },
      ],
      sortRules: [
        {
          code: 'version',
          descending: true,
        },
        {
          code: 'date',
          descending: true,
        },
      ],
    });
    if (this.context.projects?.length && profile) {
      // Store loaded profile in cache
      await cacheProfile(profile);
    }
    return profile;
  }

  /**
   * Writes the resource to the database.
   * This is a single atomic operation inside of a transaction.
   * @param resource - The resource to write to the database.
   * @param create - If true, then the resource is being created.
   */
  private async writeToDatabase<T extends WithId<Resource>>(resource: T, create: boolean): Promise<void> {
    await this.ensureInTransaction(async (client) => {
      await this.writeResource(client, resource);
      await this.writeResourceVersion(client, resource);
      await this.writeLookupTables(client, resource, create);
    });
  }

  /**
   * Tries to return the existing resource, if it is available.
   * Handles the following cases:
   * - Previous version exists
   * - Previous version was deleted, and user is restoring it
   * - Previous version does not exist, and user does not have permission to create by ID
   * - Previous version does not exist, and user does have permission to create by ID
   * @param resourceType - The FHIR resource type.
   * @param id - The resource ID.
   * @returns The existing resource, if found.
   */
  private async checkExistingResource<T extends Resource>(
    resourceType: T['resourceType'],
    id: string
  ): Promise<WithId<T> | undefined> {
    try {
      return await this.readResourceImpl<T>(resourceType, id);
    } catch (err) {
      const outcome = normalizeOperationOutcome(err);
      if (!isOk(outcome) && !isNotFound(outcome) && !isGone(outcome)) {
        throw new OperationOutcomeError(outcome, { cause: err });
      }
      if (isNotFound(outcome) && !this.canSetId()) {
        throw new OperationOutcomeError(outcome, { cause: err });
      }
      // Otherwise, it is ok if the resource is not found.
      // This is an "update" operation, and the outcome is "not-found" or "gone",
      // and the current user has permission to create a new version.
      return undefined;
    }
  }

  /**
   * Returns true if the resource is not modified from the existing resource.
   * @param existing - The existing resource.
   * @param updated - The updated resource.
   * @returns True if the resource is not modified.
   */
  private isNotModified<T extends Resource>(existing: T | undefined, updated: T): existing is T {
    if (!existing) {
      return false;
    }
    // When stricter FHIR validation is enabled, then this can be removed.
    // At present, there are some cases where a server accepts "empty" values that escape the deep equals.
    const cleanExisting = JSON.parse(stringify(existing));
    const cleanUpdated = JSON.parse(stringify(updated));
    return deepEquals(cleanExisting, cleanUpdated);
  }

  /**
   * Reindexes the resource.
   * This is only available to the system and super admin accounts.
   * This should not result in any change to the resource or its history.
* @param resourceType - The resource type. * @param id - The resource ID. * @returns Promise to complete. */ async reindexResource<T extends Resource = Resource>(resourceType: T['resourceType'], id: string): Promise<void> { if (!this.isSuperAdmin()) { throw new OperationOutcomeError(forbidden); } await this.withTransaction(async (conn) => { const resource = await this.readResourceImpl<T>(resourceType, id); return this.reindexResources(conn, [resource]); }); } /** * Internal implementation of reindexing a resource. * This accepts a resource as a parameter, rather than a resource type and ID. * When doing a bulk reindex, this will be more efficient because it avoids unnecessary reads. * @param conn - Database client to use for reindex operations. * @param resources - The resource(s) to reindex. */ async reindexResources<T extends Resource>(conn: PoolClient, resources: WithId<T>[]): Promise<void> { if (!this.isSuperAdmin()) { throw new OperationOutcomeError(forbidden); } // Since the page size could be relatively large (1k+), preferring a simple for loop with re-used variables // eslint-disable-next-line @typescript-eslint/prefer-for-of for (let i = 0; i < resources.length; i++) { const resource = resources[i]; const meta = resource.meta as Meta; meta.compartment = this.getCompartments(resource); if (!meta.project) { const projectRef = meta.compartment.find((r) => r.reference?.startsWith('Project/')); meta.project = resolveId(projectRef); } } await this.batchWriteLookupTables(conn, resources, false); await this.batchWriteResources(conn, resources); } /** * Resends subscriptions for the resource. * This is only available to the admin accounts. * This should not result in any change to the resource or its history. * @param resourceType - The resource type. * @param id - The resource ID. * @param options - Additional options. * @returns Promise to complete. 
 */
async resendSubscriptions<T extends Resource = Resource>(
  resourceType: T['resourceType'],
  id: string,
  options?: ResendSubscriptionsOptions
): Promise<void> {
  if (!this.isSuperAdmin() && !this.isProjectAdmin()) {
    throw new OperationOutcomeError(forbidden);
  }

  const resource = await this.readResourceImpl<T>(resourceType, id);
  const interaction = options?.interaction ?? 'update';
  let previousVersion: T | undefined;

  if (interaction === 'update') {
    const history = await this.readHistory(resourceType, id, { limit: 2 });
    // Only resend for the current version; a mismatch means the resource changed since it was read
    if (history.entry?.[0]?.resource?.meta?.versionId !== resource.meta?.versionId) {
      throw new OperationOutcomeError(preconditionFailed);
    }
    previousVersion = history.entry?.[1]?.resource;
  }

  return addSubscriptionJobs(
    resource,
    previousVersion,
    {
      project: await this.getProjectById(resource.meta?.project),
      interaction,
    },
    options
  );
}

/**
 * Soft-deletes the resource: marks the current row deleted, writes a deletion
 * entry to the history table, removes lookup table rows, and queues "delete"
 * subscription jobs. Deleting an already-deleted resource returns successfully.
 * Emits an AuditEvent for both success and failure.
 * @param resourceType - The FHIR resource type.
 * @param id - The resource ID.
 */
async deleteResource<T extends Resource = Resource>(resourceType: T['resourceType'], id: string): Promise<void> {
  await this.rateLimiter()?.recordWrite();
  const startTime = Date.now();
  let resource: WithId<T>;
  try {
    resource = await this.readResourceImpl<T>(resourceType, id);
  } catch (err) {
    const outcomeErr = err as OperationOutcomeError;
    if (isGone(outcomeErr.outcome)) {
      return; // Resource is already deleted, return successfully
    }
    throw err;
  }

  try {
    if (!this.canPerformInteraction(AccessPolicyInteraction.DELETE, resource)) {
      throw new OperationOutcomeError(forbidden);
    }
    await preCommitValidation(this, resource, 'delete');
    await this.deleteCacheEntry(resourceType, id);

    if (!this.isCacheOnly(resource)) {
      await this.ensureInTransaction(async (conn) => {
        const lastUpdated = new Date();
        const content = '';
        // Tombstone row: deleted=true with empty content and sentinel version
        const columns: Record<string, any> = {
          id,
          lastUpdated,
          deleted: true,
          projectId: resource.meta?.project ?? systemResourceProjectId,
          content,
          __version: -1,
        };
        if (resourceType !== 'Binary') {
          columns['compartments'] = this.getCompartments(resource).map((ref) => resolveId(ref));
        }
        // Clear search parameter columns by building them from an empty resource shell
        for (const searchParam of getStandardAndDerivedSearchParameters(resourceType)) {
          this.buildColumn({ resourceType } as Resource, columns, searchParam);
        }
        await new InsertQuery(resourceType, [columns]).mergeOnConflict().execute(conn);

        await new InsertQuery(resourceType + '_History', [
          {
            id,
            versionId: this.generateId(),
            lastUpdated,
            content,
          },
        ]).execute(conn);

        await this.deleteFromLookupTables(conn, resource);
        const durationMs = Date.now() - startTime;
        await this.postCommit(async () => {
          this.logEvent(DeleteInteraction, AuditEventOutcome.Success, undefined, { resource, durationMs });
        });
      });
    }

    await addSubscriptionJobs(resource, resource, {
      project: await this.getProjectById(resource.meta?.project),
      interaction: 'delete',
    });
  } catch (err) {
    const durationMs = Date.now() - startTime;
    this.logEvent(DeleteInteraction, AuditEventOutcome.MinorFailure, err, {
      resource: { reference: `${resourceType}/${id}` },
      durationMs,
    });
    throw err;
  }
}

/**
 * Applies a JSON Patch to the resource inside a transaction and writes the
 * updated version via the standard update pipeline.
 * Emits an AuditEvent for both success and failure.
 * @param resourceType - The FHIR resource type.
 * @param id - The resource ID.
 * @param patch - The JSON Patch (RFC 6902) operations to apply.
 * @param options - Additional update options.
 * @returns The patched resource as stored.
 */
async patchResource<T extends Resource>(
  resourceType: T['resourceType'],
  id: string,
  patch: Operation[],
  options?: UpdateResourceOptions
): Promise<WithId<T>> {
  await this.rateLimiter()?.recordWrite();
  const startTime = Date.now();
  try {
    return await this.ensureInTransaction(async () => {
      const resource = await this.readResourceFromDatabase<T>(resourceType, id);

      // Guard against a patch that rewrites identity fields
      if (resource.resourceType !== resourceType) {
        throw new OperationOutcomeError(badRequest('Incorrect resource type'));
      }
      if (resource.id !== id) {
        throw new OperationOutcomeError(badRequest('Incorrect ID'));
      }

      patchObject(resource, patch);

      const result = await this.updateResourceImpl(resource, false, options);
      const durationMs = Date.now() - startTime;
      await this.postCommit(async () => {
        this.logEvent(PatchInteraction, AuditEventOutcome.Success, undefined, { resource: result, durationMs });
      });
      return result;
    });
  } catch (err) {
    const durationMs = Date.now() - startTime;
    this.logEvent(PatchInteraction, AuditEventOutcome.MinorFailure, err, {
      resource: { reference: `${resourceType}/${id}` },
      durationMs,
    });
    throw err;
  }
}

/**
 * Permanently deletes the specified resource and all of its history.
 * This is only available to the system and super admin accounts.
 * @param resourceType - The FHIR resource type.
 * @param id - The resource ID.
 */
async expungeResource(resourceType: string, id: string): Promise<void> {
  await this.expungeResources(resourceType, [id]);
}

/**
 * Permanently deletes the specified resources and all of its history.
 * This is only available to the system and super admin accounts.
 * @param resourceType - The FHIR resource type.
 * @param ids - The resource IDs.
 */
async expungeResources(resourceType: string, ids: string[]): Promise<void> {
  if (!this.isSuperAdmin() && !this.isProjectAdmin()) {
    throw new OperationOutcomeError(forbidden);
  }
  if (ids.length === 0) {
    return;
  }

  await this.withTransaction(async (client) => {
    for (const id of ids) {
      await this.deleteFromLookupTables(client, { resourceType, id } as Resource);
    }
    // NOTE(review): main-table deletes use a WRITER client fetched here rather than the
    // transaction's `client` — confirm getDatabaseClient returns the transactional
    // connection when called inside withTransaction.
    const db = this.getDatabaseClient(DatabaseMode.WRITER);
    await new DeleteQuery(resourceType).where('id', 'IN', ids).execute(db);
    await new DeleteQuery(resourceType + '_History').where('id', 'IN', ids).execute(db);
    await this.postCommit(() => this.deleteCacheEntries(resourceType, ids));
  });

  incrementCounter(
    `medplum.fhir.interaction.delete.count`,
    { attributes: { resourceType, result: 'success' } },
    ids.length
  );
  await this.resourceCap()?.deleted(ids.length);
}

/**
 * Purges resources of the specified type that were last updated before the specified date.
 * This is only available to the system and super admin accounts.
 * @param resourceType - The FHIR resource type.
 * @param before - The date before which resources should be purged.
 */
async purgeResources(resourceType: ResourceType, before: string): Promise<void> {
  if (!this.isSuperAdmin()) {
    throw new OperationOutcomeError(forbidden);
  }

  const client = this.getDatabaseClient(DatabaseMode.WRITER);

  // Delete from lookup tables first
  // These operations use the main resource table for lastUpdated, so must come first
  for (const lookupTable of lookupTables) {
    await lookupTable.purgeValuesBefore(client, resourceType, before);
  }

  await new DeleteQuery(resourceType).where('lastUpdated', '<=', before).execute(client);
  await new DeleteQuery(resourceType + '_History').where('lastUpdated', '<=', before).execute(client);
}

/**
 * Executes a FHIR search and returns the result bundle.
 * Emits an AuditEvent for both success and failure.
 * @param searchRequest - The parsed search request.
 * @param options - Additional search options.
 * @returns Bundle of matching resources.
 */
async search<T extends Resource>(
  searchRequest: SearchRequest<T>,
  options?: SearchOptions
): Promise<Bundle<WithId<T>>> {
  await this.rateLimiter()?.recordSearch();
  const startTime = Date.now();
  try {
    // Resource type validation is performed in the searchImpl function
    const result = await searchImpl(this, searchRequest, options);
    const durationMs = Date.now() - startTime;

    this.logEvent(SearchInteraction, AuditEventOutcome.Success, undefined, { searchRequest, durationMs });
    return result;
  } catch (err) {
    const durationMs = Date.now() - startTime;
    this.logEvent(SearchInteraction, AuditEventOutcome.MinorFailure, err, { searchRequest, durationMs });
    throw err;
  }
}

/**
 * Iterates through all pages of a search, invoking `process` for each resource.
 * Follows "next" links until exhausted; optionally sleeps between pages.
 * @param initialSearchRequest - The first page's search request.
 * @param process - Callback invoked for each resource with an ID.
 * @param options - Additional options (e.g. delayBetweenPagesMs).
 */
async processAllResources<T extends Resource>(
  initialSearchRequest: SearchRequest<T>,
  process: (resource: WithId<T>) => Promise<void>,
  options?: ProcessAllResourcesOptions
): Promise<void> {
  let searchRequest: SearchRequest<T> | undefined = initialSearchRequest;
  while (searchRequest) {
    const bundle: Bundle<T> = await this.search<T>(searchRequest);
    if (!bundle.entry?.length) {
      break;
    }
    for (const entry of bundle.entry) {
      if (entry.resource?.id) {
        await process(entry.resource as WithId<T>);
      }
    }
    const nextLink = bundle.link?.find((b) => b.relation === 'next');
    if (nextLink) {
      searchRequest = parseSearchRequest<T>(nextLink.url);
      if (options?.delayBetweenPagesMs) {
        await sleep(options.delayBetweenPagesMs);
      }
    } else {
      searchRequest = undefined;
    }
  }
}

/**
 * Performs one search per reference value in a single round trip, returning
 * matches grouped by reference. Logs one AuditEvent per reference (each with
 * the effective per-reference filter) on success.
 * @param searchRequest - The base search request applied to each reference.
 * @param referenceField - The search parameter code used to filter by reference.
 * @param references - The reference strings to search by.
 * @returns Map of reference string to matching resources.
 */
async searchByReference<T extends Resource>(
  searchRequest: SearchRequest<T>,
  referenceField: string,
  references: string[]
): Promise<Record<string, WithId<T>[]>> {
  await this.rateLimiter()?.recordSearch(references.length);
  const startTime = Date.now();
  try {
    const result = await searchByReferenceImpl(this, searchRequest, referenceField, references);
    const durationMs = Date.now() - startTime;
    for (const ref of references) {
      const refFilter: Filter = { code: referenceField, operator: 'eq', value: ref };
      const refSearch: SearchRequest = {
        ...searchRequest,
        filters: searchRequest.filters ? [...searchRequest.filters, refFilter] : [refFilter],
      };
      this.logEvent(SearchInteraction, AuditEventOutcome.Success, undefined, {
        searchRequest: refSearch,
        durationMs,
      });
    }
    return result;
  } catch (err) {
    const durationMs = Date.now() - startTime;
    this.logEvent(SearchInteraction, AuditEventOutcome.MinorFailure, err, { searchRequest, durationMs });
    throw err;
  }
}

/**
 * Adds filters to ignore soft-deleted resources.
 * @param builder - The select query builder.
 */
addDeletedFilter(builder: SelectQuery): void {
  builder.where('deleted', '=', false);
}

/**
 * Adds security filters to the select query.
 * @param builder - The select query builder.
 * @param resourceType - The resource type for compartments.
 */
addSecurityFilters(builder: SelectQuery, resourceType: string): void {
  // No compartment restrictions for admins.
  if (!this.isSuperAdmin()) {
    this.addProjectFilters(builder, resourceType);
  }
  this.addAccessPolicyFilters(builder, resourceType);
}

/**
 * Adds the "project" filter to the select query.
 * @param builder - The select query builder.
 * @param resourceType - The resource type being searched.
 */
private addProjectFilters(builder: SelectQuery, resourceType: string): void {
  if (this.context.projects?.length) {
    const projectIds = [this.context.projects[0].id]; // Always include the first project
    for (let i = 1; i < this.context.projects.length; i++) {
      const project = this.context.projects[i];
      if (
        resourceType === 'Project' || // When searching for projects, include all projects
        project.id === this.context.currentProject?.id || // Always include the current project (usually the same as the first project)
        !project.exportedResourceType?.length || // Include projects that do not specify exported resource types
        project.exportedResourceType?.includes(resourceType as ResourceType) // Include projects that export resourceType
      ) {
        projectIds.push(project.id);
      }
    }
    builder.where('projectId', 'IN', projectIds);
  }
}

/**
 * Adds access policy filters to the select query.
 * @param builder - The select query builder.
 * @param resourceType - The resource type being searched.
 */
private addAccessPolicyFilters(builder: SelectQuery, resourceType: string): void {
  const accessPolicy = this.context.accessPolicy;
  if (!accessPolicy?.resource) {
    return;
  }

  // Binary has no search parameters, so it cannot be restricted by an access policy
  if (resourceType === 'Binary') {
    return;
  }

  // Matching policies are OR'ed together; any one policy may grant access
  const expressions: Expression[] = [];

  for (const policy of accessPolicy.resource) {
    if (policy.resourceType === resourceType || policy.resourceType === '*') {
      const policyCompartmentId = resolveId(policy.compartment);
      if (policyCompartmentId) {
        // Deprecated - to be removed
        // Add compartment restriction for the access policy.
        expressions.push(
          new Condition('compartments', 'ARRAY_OVERLAPS_AND_IS_NOT_NULL', policyCompartmentId, 'UUID[]')
        );
      } else if (policy.criteria) {
        if (!policy.criteria.startsWith(policy.resourceType + '?')) {
          getLogger().warn('Invalid access policy criteria', {
            accessPolicy: accessPolicy.id,
            resourceType: policy.resourceType,
            criteria: policy.criteria,
          });
          return; // Ignore invalid access policy criteria
        }

        // Add subquery for access policy criteria.
        let criteria = policy.criteria;
        if (policy.resourceType === '*') {
          // Rewrite wildcard criteria against the concrete resource type being searched
          const queryIndex = criteria.indexOf('?');
          criteria = resourceType + '?' + criteria.slice(queryIndex + 1);
        }
        const searchRequest = parseSearchRequest(criteria);
        const accessPolicyExpression = buildSearchExpression(
          this,
          builder,
          searchRequest.resourceType,
          searchRequest
        );
        if (accessPolicyExpression) {
          expressions.push(accessPolicyExpression);
        }
      } else {
        // Allow access to all resources in the compartment.
        return;
      }
    }
  }

  if (expressions.length > 0) {
    builder.predicate.expressions.push(new Disjunction(expressions));
  }
}

/**
 * Builds the full column map for the resource's row in its main table,
 * including all search parameter columns. Records indexing latency to the
 * `medplum.server.indexingDurationMs` histogram.
 * @param resource - The resource to build the row for.
 * @returns Column name to value map for insertion.
 */
private buildResourceRow(resource: Resource): Record<string, any> {
  const resourceType = resource.resourceType;
  const meta = resource.meta as Meta;
  const content = stringify(resource);

  const row: Record<string, any> = {
    id: resource.id,
    lastUpdated: meta.lastUpdated,
    deleted: false,
    projectId: meta.project ?? systemResourceProjectId,
    content,
    __version: Repository.VERSION,
  };

  const searchParams = getStandardAndDerivedSearchParameters(resourceType);
  if (searchParams.length > 0) {
    const startTime = process.hrtime.bigint();
    try {
      for (const searchParam of searchParams) {
        this.buildColumn(resource, row, searchParam);
      }
    } catch (err) {
      getLogger().error('Error building row for resource', {
        resource: `${resourceType}/${resource.id}`,
        err,
      });
      throw err;
    }
    recordHistogramValue(
      'medplum.server.indexingDurationMs',
      Number(process.hrtime.bigint() - startTime) / 1e6, // High resolution time, converted from ns to ms
      {
        options: { unit: 'ms' },
      }
    );
  }
  return row;
}

/**
 * Writes the resource to the resource table.
 * This builds all search parameter columns.
 * This does *not* write the version to the history table.
 * @param client - The database client inside the transaction.
 * @param resource - The resource.
 */
private async writeResource(client: PoolClient, resource: Resource): Promise<void> {
  await new InsertQuery(resource.resourceType, [this.buildResourceRow(resource)]).mergeOnConflict().execute(client);
}

/**
 * Writes multiple resources of the same type to the resource table in one insert.
 * @param client - The database client inside the transaction.
 * @param resources - The resources to write (all of the same resource type).
 */
private async batchWriteResources(client: PoolClient, resources: Resource[]): Promise<void> {
  if (!resources.length) {
    return;
  }

  await new InsertQuery(
    resources[0].resourceType,
    resources.map((r) => this.buildResourceRow(r))
  )
    .mergeOnConflict()
    .execute(client);
}

/**
 * Writes a version of the resource to the resource history table.
 * @param client - The database client inside the transaction.
 * @param resource - The resource.
 */
private async writeResourceVersion(client: PoolClient, resource: Resource): Promise<void> {
  const resourceType = resource.resourceType;
  const meta = resource.meta as Meta;
  const content = stringify(resource);

  await new InsertQuery(resourceType + '_History', [
    {
      id: resource.id,
      versionId: meta.versionId,
      lastUpdated: meta.lastUpdated,
      content,
    },
  ]).execute(client);
}

/**
 * Builds a list of compartments for the resource for writing.
* FHIR compartments are used for two purposes. * 1) Search narrowing (i.e., /Patient/123/Observation searches within the patient compartment). * 2) Access controls. * @param resource - The resource. * @returns The list of compartments for the resource. */ private getCompartments(resource: WithId<Resource>): Reference[] { const compartments = new Set<string>(); if (resource.meta?.project && isUUID(resource.meta.project)) { // Deprecated - to be removed after migrating all tables to use "projectId" column compartments.add('Project/' + resource.meta.project); } if (resource.resourceType === 'User' && resource.project?.reference && isUUID(resolveId(resource.project) ?? '')) { // Deprecated - to be removed after migrating all tables to use "projectId" column compartments.add(resource.project.reference); } if (resource.meta?.accounts) { for (const account of resource.meta.accounts) { const id = resolveId(account); if (!account.reference?.startsWith('Project/') && id && isUUID(id)) { compartments.add(account.reference as string); } } } else if (resource.meta?.account && !resource.meta.account.reference?.startsWith('Project/')) { const id = resolveId(resource.meta.account); if (id && isUUID(id)) { compartments.add(resource.meta.account.reference as string); } } for (const patient of getPatients(resource)) { const patientId = resolveId(patient); if (patientId && isUUID(patientId)) { compartments.add(patient.reference); } } const results: Reference[] = []; for (const reference of compartments.values()) { results.push({ reference }); } return results; } /** * Builds the columns to write for a given resource and search parameter. * If nothing to write, then no columns will be added. * Some search parameters can result in multiple columns (for example, Reference objects). * @param resource - The resource to write. * @param columns - The output columns to write. * @param searchParam - The search parameter definition. 
*/ private buildColumn(resource: Resource, columns: Record<string, any>, searchParam: SearchParameter): void { if ( searchParam.code === '_id' || searchParam.code === '_lastUpdated' || searchParam.code === '_compartment:identifier' || searchParam.code === '_deleted' || searchParam.type === 'composite' ) { return; } if (searchParam.code === '_compartment') { columns['compartments'] = resource.meta?.compartment?.map((ref) => resolveId(ref)) ?? []; return; } const impl = getSearchParameterImplementation(resource.resourceType, searchParam); if (impl.searchStrategy === 'lookup-table') { if (impl.sortColumnName) { columns[impl.sortColumnName] = getHumanNameSortValue((resource as HumanNameResource).name, searchParam); } return; } const typedValues = evalFhirPathTyped(impl.parsedExpression, [toTypedValue(resource)]); // Handle special case for "MeasureReport-period" // This is a trial for using "tstzrange" columns for date/time ranges. // Eventually, this special case will go away, and this will become the default behavior for all "date" search parameters. if (searchParam.id === 'MeasureReport-period') { columns['period_range'] = this.buildPeriodColumn(typedValues[0]?.value); } if (impl.searchStrategy === 'token-column') { buildTokenColumns(searchParam, impl, columns, resource, { paddingConfig: getArrayPaddingConfig(searchParam, resource.resourceType), }); return; } impl satisfies ColumnSearchParameterImplementation; const columnValues = this.buildColumnValues(searchParam, impl, typedValues); if (impl.array) { columns[impl.columnName] = columnValues.length > 0 ? columnValues : undefined; } else { columns[impl.columnName] = columnValues[0]; } } /** * Builds a single value for a given search parameter. * If the search parameter is an array, then this method will be called for each element. * If the search parameter is not an array, then this method will be called for the value. * @param searchParam - The search parameter definition. 
 * @param details - The extra search parameter details.
 * @param typedValues - The FHIR resource value.
 * @returns The column value.
 */
private buildColumnValues(
  searchParam: SearchParameter,
  details: SearchParameterDetails,
  typedValues: TypedValue[]
): (boolean | number | string | undefined | null)[] {
  if (details.type === SearchParameterType.BOOLEAN) {
    const value = typedValues[0]?.value;
    if (value === undefined || value === null) {
      return [null];
    }
    return [value === true || value === 'true'];
  }

  if (details.type === SearchParameterType.DATE) {
    // "Date" column is a special case that only applies when the following conditions are true:
    // 1. The search parameter is a date type.
    // 2. The underlying FHIR ElementDefinition referred to by the search parameter has a type of "date".
    return flatMapFilter(convertToSearchableDates(typedValues), (p) => (p.start ?? p.end)?.substring(0, 10));
  }

  if (details.type === SearchParameterType.DATETIME) {
    // Future work: write the whole period to the DB after migrating all "date" search parameters to use a tstzrange.
    return flatMapFilter(convertToSearchableDates(typedValues), (p) => p.start ?? p.end);
  }

  if (searchParam.type === 'number') {
    // Future work: write the whole range to the DB after migrating all "number" search parameters to use a range.
    return flatMapFilter(convertToSearchableNumbers(typedValues), ([low, high]) => low ?? high);
  }

  if (searchParam.type === 'quantity') {
    // Future work: write the whole range to the DB after migrating all "quantity" search parameters to use a range.
    return flatMapFilter(convertToSearchableQuantities(typedValues), (q) => q.value);
  }

  if (searchParam.type === 'reference') {
    return flatMapFilter(convertToSearchableReferences(typedValues), truncateTextColumn);
  }

  if (searchParam.type === 'token') {
    return flatMapFilter(convertToSearchableTokens(typedValues), (t) => truncateTextColumn(t.value));
  }

  if (searchParam.type === 'string') {
    return flatMapFilter(convertToSearchableStrings(typedValues), truncateTextColumn);
  }

  if (searchParam.type === 'uri') {
    return flatMapFilter(convertToSearchableUris(typedValues), truncateTextColumn);
  }

  if (searchParam.type === 'special' || searchParam.type === 'composite') {
    // Special and composite search parameters are not supported in the database.
    return [];
  }

  throw new Error('Unrecognized search parameter type: ' + searchParam.type);
}

/**
 * Builds the column value for a "date" search parameter.
 * This is currently in trial mode. The intention is for this to replace all "date" and "date/time" search parameters.
 * @param value - The FHIRPath result value.
 * @returns The period column string value.
 */
private buildPeriodColumn(value: any): string | undefined {
  const period = toPeriod(value);
  if (period) {
    return periodToRangeString(period);
  }
  return undefined;
}

/**
 * Writes resources values to the lookup tables.
 * @param client - The database client inside the transaction.
 * @param resource - The resource to index.
 * @param create - If true, then the resource is being created.
 */
private async writeLookupTables(client: PoolClient, resource: WithId<Resource>, create: boolean): Promise<void> {
  for (const lookupTable of lookupTables) {
    await lookupTable.indexResource(client, resource, create);
  }
}

/**
 * Writes multiple resources to each lookup table in a single batch.
 * @param client - The database client inside the transaction.
 * @param resources - The resources to index.
 * @param create - If true, then the resources are being created.
 */
private async batchWriteLookupTables<T extends Resource>(
  client: PoolClient,
  resources: WithId<T>[],
  create: boolean
): Promise<void> {
  for (const lookupTable of lookupTables) {
    await lookupTable.batchIndexResources(client, resources, create);
  }
}

/**
 * Deletes values from lookup tables.
 * @param client - The database client inside the transaction.
 * @param resource - The resource to delete.
 */
private async deleteFromLookupTables(client: Pool | PoolClient, resource: Resource): Promise<void> {
  for (const lookupTable of lookupTables) {
    await lookupTable.deleteValuesForResource(client, resource);
  }
}

/**
 * Returns the last updated timestamp for the resource.
 * During historical data migration, some client applications are allowed
 * to override the timestamp.
 * @param existing - Existing resource if one exists.
 * @param resource - The FHIR resource.
 * @returns The last updated date.
 */
private getLastUpdated(existing: Resource | undefined, resource: Resource): string {
  if (!existing) {
    // If the resource has a specified "lastUpdated",
    // and there is no existing version,
    // and the current context is a ClientApplication (i.e., OAuth client credentials),
    // then allow the ClientApplication to set the date.
    const lastUpdated = resource.meta?.lastUpdated;
    if (lastUpdated && this.canWriteProtectedMeta()) {
      return lastUpdated;
    }
  }

  // Otherwise, use "now"
  return new Date().toISOString();
}

/**
 * Returns the project ID for the resource.
 * If it is a public resource type, then returns the public project ID.
 * If it is a protected resource type, then returns the Medplum project ID.
 * Otherwise, by default, return the current context project ID.
 * @param existing - Existing resource if one exists.
 * @param updated - The FHIR resource.
 * @returns The project ID.
 */
private getProjectId(existing: Resource | undefined, updated: Resource): string | undefined {
  if (updated.resourceType === 'Project') {
    return updated.id;
  }

  if (updated.resourceType === 'ProjectMembership') {
    return resolveId(updated.project);
  }

  if (updated.resourceType === 'User' && this.isSuperAdmin()) {
    // Super admins can add, remove, and change the project compartment of users.
    return updated?.meta?.project;
  }

  if (protectedResourceTypes.includes(updated.resourceType)) {
    return undefined;
  }

  const submittedProjectId = updated.meta?.project;
  if (submittedProjectId && this.canWriteProtectedMeta()) {
    // If the resource has a project (whether provided or from existing),
    // and the current context is allowed to write meta,
    // then use the provided value.
    return submittedProjectId;
  }

  return existing?.meta?.project ?? this.context.projects?.[0]?.id;
}

/**
 * Returns the author reference.
 * If the current context is allowed to write meta,
 * and the provided resource includes an author reference,
 * then use the provided value.
 * Otherwise uses the current context profile.
 * @param resource - The FHIR resource.
 * @returns The author value.
 */
getAuthor(resource?: Resource): Reference {
  // If the resource has an author (whether provided or from existing),
  // and the current context is allowed to write meta,
  // then use the provided value.
  const author = resource?.meta?.author;
  if (author && this.canWriteProtectedMeta()) {
    return author;
  }

  return this.context.author;
}

/**
 * Computes the account references for the resource.
 * If the caller is allowed to write accounts, the submitted `meta` accounts are used as-is.
 * Otherwise, accounts are derived from the creator's access policy compartment (on create only)
 * and from the accounts of associated patients.
 * @param existing - Current (soon to be previous) resource, if one exists.
 * @param updated - The incoming updated resource.
 * @returns The account values.
 */
private async getAccounts(
  existing: WithId<Resource> | undefined,
  updated: WithId<Resource>
): Promise<Reference[] | undefined> {
  if (updated.meta && this.canWriteAccount()) {
    // If the user specifies accounts, and they have permission, then use the provided accounts.
    const updatedAccounts = extractAccountReferences(updated.meta);
    return updatedAccounts;
  }

  const accounts = new Set<string>();
  if (!existing && this.context.accessPolicy?.compartment?.reference) {
    // If the creator's access policy specifies a compartment, then use it as the account.
    // The writer's access policy is only applied at resource creation: simply editing a
    // resource does NOT pull it into the user's account.
    accounts.add(this.context.accessPolicy.compartment.reference);
  }

  if (updated.resourceType === 'Patient') {
    // When examining a Patient resource, we only look at the individual patient
    // We should not call `getPatients` and `readReference`
    const existingAccounts = extractAccountReferences(existing?.meta);
    if (existingAccounts?.length) {
      for (const account of existingAccounts) {
        accounts.add(account.reference as string);
      }
    }
  } else {
    const systemRepo = getSystemRepo(this.conn); // Re-use DB connection to preserve transaction state
    const patients = await systemRepo.readReferences(getPatients(updated));
    for (const patient of patients) {
      if (patient instanceof Error) {
        getLogger().debug('Error setting patient compartment', patient);
        continue;
      }

      // If the patient has an account, then use it as the resource account.
      const patientAccounts = extractAccountReferences(patient.meta);
      if (patientAccounts?.length) {
        for (const account of patientAccounts) {
          if (account.reference) {
            accounts.add(account.reference);
          }
        }
      }
    }
  }

  if (accounts.size < 1) {
    return undefined;
  }

  const result: Reference[] = [];
  for (const reference of accounts) {
    result.push({ reference });
  }
  return result;
}

/**
 * Determines if the current user can manually set the ID field.
 * This is very powerful, and reserved for the system account.
 * @returns True if the current user can manually set the ID field.
 */
private canSetId(): boolean {
  return this.isSuperAdmin();
}

/**
 * Determines if the current user can manually set certain protected meta fields
 * such as author, project, lastUpdated, etc.
 * @returns True if the current user can manually set protected meta fields.
   */
  private canWriteProtectedMeta(): boolean {
    return this.isSuperAdmin();
  }

  /**
   * Determines if the current user can manually set account references on a resource.
   * Requires extended mode plus super admin or project admin privileges.
   * @returns True if the current user can manually set accounts.
   */
  private canWriteAccount(): boolean {
    return Boolean(this.context.extendedMode && (this.isSuperAdmin() || this.isProjectAdmin()));
  }

  /**
   * Verifies that the current user would be allowed to perform the given interaction,
   * without the full check on the specific resource being interacted with.
   * @param interaction - The FHIR interaction being performed.
   * @param resourceType - The type of resource the interaction is performed on.
   * @returns True when the interaction is permitted by the access policy for the given resource type.
   */
  supportsInteraction(interaction: AccessPolicyInteraction, resourceType: string): boolean {
    if (!this.isSuperAdmin() && protectedResourceTypes.includes(resourceType)) {
      // Protected (server-critical) resource types are reserved for super admins.
      return false;
    }
    if (!this.context.accessPolicy) {
      // No access policy means no further restrictions.
      return true;
    }
    return accessPolicySupportsInteraction(this.context.accessPolicy, interaction, resourceType as ResourceType);
  }

  /**
   * Determines if the current user can actually perform some interaction on the specified resource.
   * This is a more in-depth check, e.g. after building the candidate result of a write operation.
   * @param interaction - The interaction to be performed.
   * @param resource - The resource.
   * @returns The access policy permitting the interaction, or undefined if not permitted.
   */
  canPerformInteraction(interaction: AccessPolicyInteraction, resource: Resource): AccessPolicyResource | undefined {
    if (!this.isSuperAdmin()) {
      // Only Super Admins can access server-critical resource types
      if (protectedResourceTypes.includes(resource.resourceType)) {
        return undefined;
      }
      // Non-Superusers can only access resources in their Project, with read-only access to linked Projects
      if (readInteractions.includes(interaction)) {
        if (!this.context.projects?.some((p) => p.id === resource.meta?.project)) {
          return undefined;
        }
      } else if (resource.meta?.project !== this.context.projects?.[0]?.id) {
        // Writes are limited to the primary (first) project.
        return undefined;
      }
    }
    return satisfiedAccessPolicy(resource, interaction, this.context.accessPolicy);
  }

  /**
   * Check that a resource can be written in its current form.
   * @param previous - The resource before updates were applied.
   * @param current - The resource as it will be written.
   * @param interaction - The FHIR interaction being performed.
   * @returns True if the current user can write the specified resource type.
   */
  private isResourceWriteable(
    previous: Resource | undefined,
    current: Resource,
    interaction: 'create' | 'update'
  ): boolean {
    const matchingPolicy = this.canPerformInteraction(interaction, current);
    if (!matchingPolicy) {
      return false;
    }
    if (!matchingPolicy.writeConstraint) {
      return true;
    }

    // Every writeConstraint invariant must evaluate to exactly [true];
    // %before / %after expose the previous and candidate versions to the FHIRPath expression.
    return matchingPolicy.writeConstraint.every((constraint) => {
      const invariant = evalFhirPathTyped(
        constraint.expression as string,
        [{ type: current.resourceType, value: current }],
        {
          '%before': { type: previous?.resourceType ?? 'undefined', value: previous },
          '%after': { type: current.resourceType, value: current },
        }
      );
      return invariant.length === 1 && invariant[0].value === true;
    });
  }

  /**
   * Returns true if the resource is "cache only" and not written to the database.
   * This is a highly specialized use case for internal system resources.
   * @param resource - The candidate resource.
   * @returns True if the resource should be cached only and not written to the database.
   */
  private isCacheOnly(resource: Resource): boolean {
    // Client-credentials and "execute" logins are ephemeral.
    if (resource.resourceType === 'Login' && (resource.authMethod === 'client' || resource.authMethod === 'execute')) {
      return true;
    }
    // WebSocket subscriptions only live as long as the connection.
    if (resource.resourceType === 'Subscription' && resource.channel?.type === 'websocket') {
      return true;
    }
    return false;
  }

  /**
   * Removes hidden fields from a resource as defined by the access policy.
   * This should be called for any "read" operation.
   * @param input - The input resource.
   * @returns The resource with hidden fields removed.
   */
  removeHiddenFields<T extends Resource>(input: T): T {
    const policy = satisfiedAccessPolicy(input, AccessPolicyInteraction.READ, this.context.accessPolicy);
    if (policy?.hiddenFields) {
      for (const field of policy.hiddenFields) {
        this.removeField(input, field);
      }
    }
    if (!this.context.extendedMode && input.meta) {
      // Without extended mode, strip Medplum-specific meta fields from the output.
      const meta = input.meta as Meta;
      meta.author = undefined;
      meta.project = undefined;
      meta.account = undefined;
      meta.compartment = undefined;
    }
    return input;
  }

  /**
   * Overwrites readonly fields from a resource as defined by the access policy.
   * If no original (i.e., this is the first version), then blank them out.
   * This should be called for any "write" operation.
   * @param input - The input resource.
   * @param original - The previous version, if it exists.
   * @returns The resource with restored hidden fields.
   */
  private restoreReadonlyFields<T extends Resource>(input: T, original: T | undefined): T {
    const policy = satisfiedAccessPolicy(
      original ?? input,
      original ? AccessPolicyInteraction.UPDATE : AccessPolicyInteraction.CREATE,
      this.context.accessPolicy
    );
    if (!policy?.readonlyFields && !policy?.hiddenFields) {
      return input;
    }
    // Hidden fields are treated as readonly as well: both are stripped from the
    // incoming write and, when possible, restored from the original version.
    const fieldsToRestore = [];
    if (policy.readonlyFields) {
      fieldsToRestore.push(...policy.readonlyFields);
    }
    if (policy.hiddenFields) {
      fieldsToRestore.push(...policy.hiddenFields);
    }
    for (const field of fieldsToRestore) {
      this.removeField(input, field);
      // only top-level fields can be restored.
      // choice-of-type fields technically aren't allowed in readonlyFields/hiddenFields,
      // but that isn't currently enforced at write time, so exclude them here
      if (original && !field.includes('.') && !field.endsWith('[x]')) {
        const value = original[field as keyof T];
        if (value) {
          input[field as keyof T] = value;
        }
      }
    }
    return input;
  }

  /**
   * Removes a field from the input resource; supports nested fields.
   * @param input - The input resource.
   * @param path - The path to the field to remove
   */
  private removeField<T extends Resource>(input: T, path: string): void {
    // Breadth-first walk: `last` holds all objects at the current path depth.
    let last: any[] = [input];
    const pathParts = path.split('.');
    for (let i = 0; i < pathParts.length; i++) {
      const pathPart = pathParts[i];
      if (i === pathParts.length - 1) {
        // final key part
        last.forEach((item) => {
          resolveFieldName(item, pathPart).forEach((k) => {
            delete item[k];
          });
        });
      } else {
        // intermediate key part
        const next: any[] = [];
        for (const lastItem of last) {
          for (const k of resolveFieldName(lastItem, pathPart)) {
            if (lastItem[k] !== undefined) {
              // Arrays are flattened so each element is visited at the next depth.
              if (Array.isArray(lastItem[k])) {
                next.push(...lastItem[k]);
              } else if (isObject(lastItem[k])) {
                next.push(lastItem[k]);
              }
            }
          }
        }
        last = next;
      }
    }
  }

  isSuperAdmin(): boolean {
    return !!this.context.superAdmin;
  }

  isProjectAdmin(): boolean {
    return !!this.context.projectAdmin;
  }

  /**
   * Logs an AuditEvent for a restful operation.
   * @param subtype - The AuditEvent subtype.
   * @param outcome - The AuditEvent outcome.
   * @param description - The description. Can be a string, object, or Error.
   *   Will be normalized to a string.
   * @param options - Additional AuditEvent options.
   * @param options.resource - Optional resource to associate with the AuditEvent.
   * @param options.searchRequest - Optional search parameters to associate with the AuditEvent.
   * @param options.durationMs - Duration of the operation, used for generating metrics.
   */
  private logEvent(
    subtype: AuditEventSubtype,
    outcome: AuditEventOutcome,
    description?: unknown,
    options?: {
      resource?: Resource | Reference;
      searchRequest?: SearchRequest;
      durationMs?: number;
    }
  ): void {
    if (this.context.author.reference === 'system') {
      // Don't log system events.
      return;
    }
    let outcomeDesc: string | undefined = undefined;
    if (description) {
      outcomeDesc = normalizeErrorString(description);
    }
    let query: string | undefined = undefined;
    if (options?.searchRequest) {
      query = options.searchRequest.resourceType + formatSearchQuery(options.searchRequest);
    }
    const resource = options?.resource;
    const auditEvent = createAuditEvent(
      RestfulOperationType,
      subtype,
      this.context.projects?.[0]?.id as string,
      this.context.author,
      this.context.remoteAddress,
      outcome,
      {
        description: outcomeDesc,
        resource,
        searchQuery: query,
        durationMs: options?.durationMs,
      }
    );
    logAuditEvent(auditEvent);

    if (options?.durationMs !== undefined && outcome === AuditEventOutcome.Success) {
      const duration = options.durationMs / 1000; // Convert milliseconds to (fractional) seconds for the histogram
      recordHistogramValue('medplum.fhir.interaction.' + subtype.code, duration, {
        attributes: {
          resourceType: isResource(resource) ? resource?.resourceType : undefined,
        },
      });
    }
    incrementCounter(`medplum.fhir.interaction.${subtype.code}.count`, {
      attributes: {
        resourceType: isResource(resource) ? resource?.resourceType : undefined,
        result: outcome === AuditEventOutcome.Success ? 'success' : 'failure',
      },
    });

    if (getConfig().saveAuditEvents && isResource(resource) && resource?.resourceType !== 'AuditEvent') {
      // Persist the AuditEvent itself (fire-and-forget; errors are logged to console).
      auditEvent.id = this.generateId();
      this.updateResourceImpl(auditEvent, true).catch(console.error);
    }
  }

  /**
   * Returns a database client.
   * Use this method when you don't care if you're in a transaction or not.
   * For example, use this method for "read by ID".
   * The return value can either be a pool client or a pool.
   * If in a transaction, then returns the transaction client (PoolClient).
   * Otherwise, returns the pool (Pool).
   * @param mode - The database mode.
   * @returns The database client.
   */
  getDatabaseClient(mode: DatabaseMode): Pool | PoolClient {
    this.assertNotClosed();
    if (this.conn) {
      // If in a transaction, then use the transaction client.
      return this.conn;
    }
    if (mode === DatabaseMode.WRITER) {
      // If we ever use a writer, then all subsequent operations must use a writer.
      this.mode = RepositoryMode.WRITER;
    }
    return getDatabasePool(this.mode === RepositoryMode.WRITER ? DatabaseMode.WRITER : mode);
  }

  /**
   * Returns a proper database connection.
   * Unlike getDatabaseClient(), this method always returns a PoolClient.
   * The connection is checked out once and reused until released.
   * @param mode - The database mode.
   * @returns Database connection.
   */
  private async getConnection(mode: DatabaseMode): Promise<PoolClient> {
    this.assertNotClosed();
    if (!this.conn) {
      this.conn = await getDatabasePool(mode).connect();
    }
    return this.conn;
  }

  /**
   * Releases the database connection.
   * Include an error to remove the connection from the pool.
   * See: https://github.com/brianc/node-postgres/blob/master/packages/pg-pool/index.js#L333
   * @param err - Optional error to remove the connection from the pool.
   */
  private releaseConnection(err?: boolean | Error): void {
    // Only release connections this repository owns (`disposable`).
    if (this.conn && this.disposable) {
      this.conn.release(err);
      this.conn = undefined;
    }
  }

  /**
   * Executes the callback inside a database transaction, retrying retryable
   * serialization failures with jittered exponential backoff.
   * @param callback - Receives the transaction client; its result is returned on success.
   * @param options - Optional settings; `serializable` selects SERIALIZABLE isolation.
   * @returns The result of the callback once the transaction commits.
   */
  async withTransaction<TResult>(
    callback: (client: PoolClient) => Promise<TResult>,
    options?: { serializable: boolean }
  ): Promise<TResult> {
    const config = getConfig();
    const transactionAttempts = config.transactionAttempts ?? defaultTransactionAttempts;
    let error: OperationOutcomeError | undefined;
    for (let attempt = 0; attempt < transactionAttempts; attempt++) {
      const attemptStartTime = Date.now();
      try {
        const client = await this.beginTransaction(options?.serializable ? 'SERIALIZABLE' : undefined);
        const result = await callback(client);
        await this.commitTransaction();
        if (attempt > 0) {
          getLogger().info('Completed transaction', {
            attempt,
            attemptDurationMs: Date.now() - attemptStartTime,
            transactionAttempts,
            serializable: options?.serializable ?? false,
          });
        }
        return result;
      } catch (err) {
        const operationOutcomeError = normalizeDatabaseError(err);
        // Assigning here and throwing below is necessary to satisfy TypeScript
        error = operationOutcomeError;
        // Ensure transaction is rolled back before attempting any retry
        await this.rollbackTransaction(operationOutcomeError);
        if (!this.isRetryableTransactionError(operationOutcomeError)) {
          break; // Fall through to throw statement outside of the loop
        }
      } finally {
        this.endTransaction();
      }

      const attemptDurationMs = Date.now() - attemptStartTime;
      if (attempt + 1 < transactionAttempts) {
        const baseDelayMs = config.transactionExpBackoffBaseDelayMs ?? defaultExpBackoffBaseDelayMs;
        // Attempts are 0-indexed, so first wait after first attempt will be somewhere between 75% and 125% of baseDelayMs
        // This calculation results in something like this for the default values:
        // Between attempt 0 and 1: 50 * (2^0) = 50 * [0.75, 1.25] = **[37.5, 62.5] ms**
        // Between attempt 1 and 2: 50 * (2^1) = 100 * [0.75, 1.25] = **[75, 125] ms**
        // etc...
        const delayMs = Math.ceil(baseDelayMs * 2 ** attempt * (0.75 + Math.random() * 0.5));
        getLogger().info('Retrying transaction', {
          attempt,
          attemptDurationMs,
          transactionAttempts,
          serializable: options?.serializable ?? false,
          delayMs,
          baseDelayMs,
        });
        await sleep(delayMs);
      } else {
        getLogger().info('Transaction failed final attempt', {
          attempt,
          attemptDurationMs,
          transactionAttempts,
          serializable: options?.serializable ?? false,
        });
      }
    }

    // Cannot be undefined: either the function returns normally from the `try` block,
    // or `error` is assigned at top of `catch` block before reaching this line
    throw error;
  }

  /**
   * Begins a transaction, or a savepoint when already inside a transaction.
   * @param isolationLevel - Isolation level for the outermost transaction only.
   * @returns The connection executing the transaction.
   */
  private async beginTransaction(isolationLevel: TransactionIsolationLevel = 'REPEATABLE READ'): Promise<PoolClient> {
    this.assertNotClosed();
    this.transactionDepth++;
    const conn = await this.getConnection(DatabaseMode.WRITER);
    if (this.transactionDepth === 1) {
      // Outermost: start a real transaction.
      await conn.query('BEGIN ISOLATION LEVEL ' + isolationLevel);
    } else {
      // Nested: emulate with a savepoint named by depth.
      await conn.query('SAVEPOINT sp' + this.transactionDepth);
    }
    return conn;
  }

  /**
   * Commits the current transaction level: COMMIT at depth 1 (running pre-commit
   * callbacks first and post-commit callbacks after), RELEASE SAVEPOINT otherwise.
   */
  private async commitTransaction(): Promise<void> {
    this.assertInTransaction();
    const conn = await this.getConnection(DatabaseMode.WRITER);
    if (this.transactionDepth === 1) {
      await this.processPreCommit();
      await conn.query('COMMIT');
      this.transactionDepth--;
      this.releaseConnection();
      await this.processPostCommit();
    } else {
      await conn.query('RELEASE SAVEPOINT sp' + this.transactionDepth);
      this.transactionDepth--;
    }
  }

  /**
   * Rolls back the current transaction level: ROLLBACK at depth 1 (removing the
   * connection from the pool via the error), ROLLBACK TO SAVEPOINT otherwise.
   * @param error - The error that triggered the rollback.
   */
  private async rollbackTransaction(error: Error): Promise<void> {
    this.assertInTransaction();
    const conn = await this.getConnection(DatabaseMode.WRITER);
    if (this.transactionDepth === 1) {
      await conn.query('ROLLBACK');
      this.transactionDepth--;
      this.releaseConnection(error);
    } else {
      await conn.query('ROLLBACK TO SAVEPOINT sp' + this.transactionDepth);
      this.transactionDepth--;
    }
  }

  // Releases the connection once all transaction levels have unwound.
  private endTransaction(): void {
    if (this.transactionDepth === 0) {
      this.releaseConnection();
    }
  }

  private assertInTransaction(): void {
    if (this.transactionDepth <= 0) {
      throw new Error('Not in transaction');
    }
  }

  /**
   * Registers a callback to run just before COMMIT, or runs it immediately
   * when not inside a transaction. Errors propagate and abort the commit.
   */
  async preCommit(fn: () => Promise<void>): Promise<void> {
    if (this.transactionDepth) {
      this.preCommitCallbacks.push(fn);
    } else {
      // rely on thrown errors bubbling up from here to halt the transaction
      await fn();
    }
  }

  private async processPreCommit(): Promise<void> {
    // Swap out the list first so callbacks that register more callbacks don't recurse.
    const callbacks = this.preCommitCallbacks;
    this.preCommitCallbacks = [];
    for (const cb of callbacks) {
      // rely on thrown errors bubbling up from here to halt the transaction
      await cb();
    }
  }

  /**
   * Registers a callback to run after COMMIT, or runs it immediately when not
   * inside a transaction. Errors are logged, never propagated.
   */
  async postCommit(fn: () => Promise<void>): Promise<void> {
    if (this.transactionDepth) {
      this.postCommitCallbacks.push(fn);
    } else {
      await this.invokePostCommitCallback(fn);
    }
  }

  private async processPostCommit(): Promise<void> {
    // Swap out the list first so callbacks that register more callbacks don't recurse.
    const callbacks = this.postCommitCallbacks;
    this.postCommitCallbacks = [];
    for (const cb of callbacks) {
      await this.invokePostCommitCallback(cb);
    }
  }

  // Runs a post-commit callback, logging (not rethrowing) any error.
  private async invokePostCommitCallback(fn: () => Promise<void>): Promise<void> {
    try {
      await fn();
    } catch (err) {
      if (err instanceof Error) {
        getLogger().error('Error processing post-commit callback', err);
      } else {
        getLogger().error('Error processing post-commit callback', { err });
      }
    }
  }

  /**
   * Checks whether an error represents a serialization conflict that can safely be retried.
   * NOTE: Retrying a transaction must be done in full: the entire `Repository.withTransaction()` block
   * should be re-executed, in a new transaction.
   * @param err - The error to check.
   * @returns True if the error indicates a retryable transaction failure.
   */
  private isRetryableTransactionError(err: OperationOutcomeError): boolean {
    if (this.transactionDepth) {
      // Nested transactions (i.e. savepoints) are NOT retryable per the Postgres docs;
      // the entire transaction must have been rolled back before anything can be retried:
      // "It is important to retry the complete transaction, including all logic
      // that decides which SQL to issue and/or which values to use"
      // @see https://www.postgresql.org/docs/16/mvcc-serialization-failure-handling.html
      return false;
    }

    if (err.outcome.issue.length !== 1) {
      // Multiple errors combined cannot be guaranteed to be retryable
      return false;
    }
    const issue = err.outcome.issue[0];
    return Boolean(
      issue.code === 'conflict' &&
        issue.details?.coding?.some((c) => retryableTransactionErrorCodes.includes(c.code as string))
    );
  }

  /**
   * Tries to read a cache entry from Redis by resource type and ID.
   * @param resourceType - The resource type.
   * @param id - The resource ID.
   * @returns The cache entry if found; otherwise, undefined.
   */
  private async getCacheEntry<T extends Resource>(
    resourceType: string,
    id: string
  ): Promise<CacheEntry<WithId<T>> | undefined> {
    // No cache access allowed mid-transaction
    if (this.transactionDepth) {
      return undefined;
    }
    const cachedValue = await getRedis().get(getCacheKey(resourceType, id));
    return cachedValue ? (JSON.parse(cachedValue) as CacheEntry<WithId<T>>) : undefined;
  }

  /**
   * Performs a bulk read of cache entries from Redis.
   * @param references - Array of FHIR references.
   * @returns Array of cache entries or undefined, index-aligned with the input.
   */
  private async getCacheEntries(references: Reference[]): Promise<(CacheEntry | undefined)[]> {
    // No cache access allowed mid-transaction
    if (this.transactionDepth) {
      return new Array(references.length);
    }
    const referenceKeys: string[] = [];
    // Build referenceKeys only for valid input references and track
    // their indices in the original array so that the result array
    // is constructed in the correct order.
    const referenceKeyIndices: number[] = new Array(references.length);
    for (let i = 0; i < references.length; i++) {
      const r = references[i];
      if (r.reference) {
        referenceKeys.push(r.reference);
        referenceKeyIndices[i] = referenceKeys.length - 1;
      }
    }

    if (referenceKeys.length === 0) {
      // Return early to avoid calling mget() with no args, which is an error
      return new Array(references.length);
    }

    const cachedValues = await getRedis().mget(referenceKeys);
    const result = new Array<CacheEntry | undefined>(references.length);
    for (let i = 0; i < references.length; i++) {
      if (referenceKeyIndices[i] === undefined) {
        // Input reference was invalid (no `reference` string).
        result[i] = undefined;
      } else {
        const cachedValue = cachedValues[referenceKeyIndices[i]];
        result[i] = cachedValue ? (JSON.parse(cachedValue) as CacheEntry) : undefined;
      }
    }
    return result;
  }

  /**
   * Writes a cache entry to Redis.
   * Mid-transaction, the write is deferred to a post-commit callback with a
   * deep clone of the resource (so later mutations don't leak into the cache).
   * @param resource - The resource to cache.
   */
  private async setCacheEntry(resource: WithId<Resource>): Promise<void> {
    // No cache access allowed mid-transaction
    if (this.transactionDepth) {
      const cachedResource = deepClone(resource);
      await this.postCommit(() => {
        return this.setCacheEntry(cachedResource);
      });
      return;
    }

    const projectId = resource.meta?.project;
    await getRedis().set(
      getCacheKey(resource.resourceType, resource.id),
      stringify({ resource, projectId }),
      'EX',
      REDIS_CACHE_EX_SECONDS
    );
  }

  /**
   * Deletes a cache entry from Redis; deferred to post-commit mid-transaction.
   * @param resourceType - The resource type.
   * @param id - The resource ID.
   */
  private async deleteCacheEntry(resourceType: string, id: string): Promise<void> {
    // No cache access allowed mid-transaction
    if (this.transactionDepth) {
      await this.postCommit(() => this.deleteCacheEntry(resourceType, id));
      return;
    }
    await getRedis().del(getCacheKey(resourceType, id));
  }

  /**
   * Deletes cache entries from Redis.
   * @param resourceType - The resource type.
   * @param ids - The resource IDs.
   */
  private async deleteCacheEntries(resourceType: string, ids: string[]): Promise<void> {
    // No cache access allowed mid-transaction
    if (this.transactionDepth) {
      await this.postCommit(() => this.deleteCacheEntries(resourceType, ids));
      return;
    }

    const cacheKeys = ids.map((id) => {
      return getCacheKey(resourceType, id);
    });
    await getRedis().del(cacheKeys);
  }

  /**
   * Runs the callback in the current transaction if one is active,
   * otherwise wraps it in a new transaction.
   */
  async ensureInTransaction<TResult>(callback: (client: PoolClient) => Promise<TResult>): Promise<TResult> {
    if (this.transactionDepth) {
      const client = await this.getConnection(DatabaseMode.WRITER);
      return callback(client);
    } else {
      return this.withTransaction(callback);
    }
  }

  // Returns the repository context. NOTE(review): despite the name, this returns
  // the RepositoryContext, not server config — confirm before renaming.
  getConfig(): RepositoryContext {
    return this.context;
  }

  // NOTE(review): `Symbol.dispose` normally takes no arguments; the optional
  // `removeConnection` flag is only meaningful when this is invoked directly.
  [Symbol.dispose](removeConnection?: boolean): void {
    this.assertNotClosed();
    if (this.transactionDepth > 0) {
      // Bad state, remove connection from pool
      getLogger().error('Closing Repository with active transaction');
      this.releaseConnection(new Error('Closing Repository with active transaction'));
    } else {
      // Good state, return healthy connection to pool
      this.releaseConnection(removeConnection);
    }
    this.closed = true;
  }

  private assertNotClosed(): void {
    if (this.closed) {
      throw new Error('Already closed');
    }
  }
}

const REDIS_CACHE_EX_SECONDS = 24 * 60 * 60; // 24 hours in seconds
const PROFILE_CACHE_EX_SECONDS = 5 * 60; // 5 minutes in seconds

/**
 * Returns the redis cache key for the given resource type and resource ID.
 * @param resourceType - The resource type.
 * @param id - The resource ID.
 * @returns The Redis cache key.
 */
function getCacheKey(resourceType: string, id: string): string {
  return `${resourceType}/${id}`;
}

/**
 * Writes a FHIR profile cache entry to Redis.
 * @param profile - The profile structure definition.
 */
async function cacheProfile(profile: StructureDefinition): Promise<void> {
  if (!profile.url || !profile.meta?.project) {
    return;
  }
  // Re-read through the system repo before caching — presumably to cache the
  // canonical stored version rather than the caller's copy; verify against callers.
  profile = await getSystemRepo().readReference(createReference(profile));
  await getRedis().set(
    getProfileCacheKey(profile.meta?.project as string, profile.url),
    JSON.stringify({ resource: profile, projectId: profile.meta?.project }),
    'EX',
    PROFILE_CACHE_EX_SECONDS
  );
}

/**
 * Removes a FHIR profile cache entry from Redis.
 * @param profile - The profile structure definition.
 */
async function removeCachedProfile(profile: StructureDefinition): Promise<void> {
  if (!profile.url || !profile.meta?.project) {
    return;
  }
  await getRedis().del(getProfileCacheKey(profile.meta.project, profile.url));
}

/**
 * Returns the redis cache key for the given profile resource.
 * @param projectId - The ID of the Project to which the profile belongs.
 * @param url - The canonical URL of the profile.
 * @returns The Redis cache key.
 */
function getProfileCacheKey(projectId: string, url: string): string {
  return `Project/${projectId}/StructureDefinition/${url}`;
}

/**
 * Constructs a super-admin system repository, optionally bound to an existing
 * database connection (to participate in an ongoing transaction).
 * @param conn - Optional connection to reuse.
 * @returns A system Repository instance.
 */
export function getSystemRepo(conn?: PoolClient): Repository {
  return new Repository(
    {
      superAdmin: true,
      strictMode: true,
      extendedMode: true,
      author: {
        reference: 'system',
      },
      // System repo does not have an associated Project; it can write to any
    },
    conn
  );
}

function lowercaseFirstLetter(str: string): string {
  return str.charAt(0).toLowerCase() + str.slice(1);
}

/**
 * Resolves a possibly choice-of-type field name ("value[x]") to the concrete
 * keys present on the input object; plain names are returned as-is.
 * @param input - The object whose keys are examined.
 * @param fieldName - A field name, optionally ending in "[x]".
 * @returns The matching concrete key names.
 */
function resolveFieldName(input: any, fieldName: string): string[] {
  if (!fieldName.endsWith('[x]')) {
    return [fieldName];
  }
  const baseKey = fieldName.slice(0, -3);
  return Object.keys(input).filter((k) => {
    if (k.startsWith(baseKey)) {
      // The suffix must be a known FHIR property type (in either case form).
      const maybePropertyType = k.substring(baseKey.length);
      if (maybePropertyType in PropertyType || lowercaseFirstLetter(maybePropertyType) in PropertyType) {
        return true;
      }
    }
    return false;
  });
}

/**
 * Sets a property value on a typed value by converting the FHIRPath-style
 * path into a JSON Pointer and applying a JSON Patch "replace" operation.
 * @param target - The typed value to mutate.
 * @param path - Dotted/indexed property path (e.g. "name[0].given").
 * @param replacement - The new typed value.
 */
export function setTypedPropertyValue(target: TypedValue, path: string, replacement: TypedValue): void {
  // Convert "a[0].b" style paths into "/a/0/b" JSON Pointer form.
  let patchPath = '/' + path.replaceAll(/\[|\]\.|\./g, '/');
  if (patchPath.endsWith(']')) {
    patchPath = patchPath.slice(0, -1);
  }
  patchObject(target.value, [{ op: 'replace', path: patchPath, value: replacement.value }]);
}

const textEncoder = new TextEncoder();

/**
 * Apply a maximum string length to ensure the value can accommodate the maximum
 * size for a btree index entry: 2704 bytes. If the string is too large,
 * be as conservative as possible to avoid write errors by truncating to 675 characters
 * to accommodate the entire string being 4-byte UTF-8 code points.
 * @param value - The column value to truncate.
 * @returns The possibly truncated column value.
 */
function truncateTextColumn(value: string | undefined): string | undefined {
  if (!value) {
    return undefined;
  }

  // Measure encoded byte length, not character count.
  if (textEncoder.encode(value).length <= 2704) {
    return value;
  }

  // Array.from splits on code points, so surrogate pairs are not cut in half.
  return Array.from(value).slice(0, 675).join('');
}

/**
 * Looks up the array-column padding configuration for a search parameter,
 * filtered by resource type when the config entry specifies one.
 * @param searchParam - The search parameter.
 * @param resourceType - The resource type being indexed.
 * @returns The padding config, or undefined when none applies.
 */
function getArrayPaddingConfig(
  searchParam: SearchParameter,
  resourceType: string
): ArrayColumnPaddingConfig | undefined {
  const paddingConfigEntry = getConfig().arrayColumnPadding?.[searchParam.code];
  if (
    paddingConfigEntry &&
    (paddingConfigEntry.resourceType === undefined || paddingConfigEntry.resourceType.includes(resourceType))
  ) {
    return paddingConfigEntry.config;
  }
  return undefined;
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/medplum/medplum'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.