Skip to main content
Glama
bulkexporter.ts4.84 kB
// SPDX-FileCopyrightText: Copyright Orangebot, Inc. and Medplum contributors // SPDX-License-Identifier: Apache-2.0 import type { WithId } from '@medplum/core'; import { getReferenceString } from '@medplum/core'; import type { AsyncJob, Binary, Bundle, Parameters, Project, Resource } from '@medplum/fhirtypes'; import { PassThrough } from 'node:stream'; import { getBinaryStorage } from '../../../storage/loader'; import type { Repository } from '../../repo'; import { getSystemRepo } from '../../repo'; const NDJSON_CONTENT_TYPE = 'application/fhir+ndjson'; class BulkFileWriter { readonly binary: WithId<Binary>; private readonly stream: PassThrough; private readonly writerPromise: Promise<void>; constructor(binary: WithId<Binary>) { this.binary = binary; const filename = `export.ndjson`; this.stream = new PassThrough(); this.writerPromise = getBinaryStorage().writeBinary(binary, filename, NDJSON_CONTENT_TYPE, this.stream); } async write(resource: Resource): Promise<void> { const data = JSON.stringify(resource) + '\n'; // Handle backpressure - if write buffer is full, wait for drain if (!this.stream.write(data)) { await new Promise<void>((resolve, reject) => { this.stream.once('drain', () => resolve()); this.stream.once('error', (err) => reject(err)); }); } } close(): Promise<void> { this.stream.end(); return this.writerPromise; } } export class BulkExporter { readonly repo: Repository; private resource: WithId<AsyncJob> | undefined; readonly writers: Record<string, BulkFileWriter> = {}; readonly resourceSets = new Map<string, Set<string>>(); constructor(repo: Repository) { this.repo = repo; } async start(url: string): Promise<WithId<AsyncJob>> { this.resource = await this.repo.createResource<AsyncJob>({ resourceType: 'AsyncJob', status: 'active', request: url, requestTime: new Date().toISOString(), }); return this.resource; } async getWriter(resourceType: string): Promise<BulkFileWriter> { let writer = this.writers[resourceType]; if (!writer) { const binary = await this.repo.createResource<Binary>({ resourceType: 'Binary', contentType: NDJSON_CONTENT_TYPE, }); writer = new BulkFileWriter(binary); this.writers[resourceType] = writer; } return writer; } async closeWriter(resourceType: string): Promise<void> { const writer = this.writers[resourceType]; if (writer) { await writer.close(); // Keep reference for formatOutput(), but free the stream resources } // Clear tracking for this resource type to free memory this.resourceSets.delete(resourceType); } async writeBundle(bundle: Bundle<WithId<Resource>>): Promise<void> { if (bundle.entry) { for (const entry of bundle.entry) { if (entry.resource) { await this.writeResource(entry.resource); } } } } async writeResource(resource: WithId<Resource>): Promise<void> { const resourceType = resource.resourceType; const ref = getReferenceString(resource); // Get or create the Set for this resource type let exportedResources = this.resourceSets.get(resourceType); if (!exportedResources) { exportedResources = new Set<string>(); this.resourceSets.set(resourceType, exportedResources); } // Only write if not already tracked if (!exportedResources.has(ref)) { const writer = await this.getWriter(resourceType); await writer.write(resource); exportedResources.add(ref); } } async close(project: Project): Promise<AsyncJob> { if (!this.resource) { throw new Error('Export must be started before calling close()'); } for (const writer of Object.values(this.writers)) { await writer.close(); } // Clear remaining tracked resources to free memory immediately this.resourceSets.clear(); // Update the AsyncJob const systemRepo = getSystemRepo(); const asyncJob = await systemRepo.readResource<AsyncJob>('AsyncJob', this.resource.id); if (asyncJob.status !== 'cancelled') { return systemRepo.updateResource<AsyncJob>({ ...this.resource, meta: { project: project.id, }, status: 'completed', transactionTime: new Date().toISOString(), output: this.formatOutput(), }); } return this.resource; } formatOutput(): Parameters { return { resourceType: 'Parameters', parameter: Object.entries(this.writers).map(([resourceType, writer]) => ({ name: 'output', part: [ { name: 'type', valueCode: resourceType }, { name: 'url', valueUri: getReferenceString(writer.binary) }, ], })), }; } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/medplum/medplum'

If you have feedback or need assistance with the MCP directory API, please join our Discord server