Skip to main content
Glama
receive-oru-message.ts23.7 kB
// SPDX-FileCopyrightText: Copyright Orangebot, Inc. and Medplum contributors // SPDX-License-Identifier: Apache-2.0 import { createReference, findResourceByCode, getReferenceString, Hl7Message, LOINC, normalizeErrorString, parseHl7DateTime, SNOMED, streamToBuffer, UCUM, } from '@medplum/core'; import type { BotEvent, Hl7Segment, MedplumClient } from '@medplum/core'; import type { Attachment, CodeableConcept, DiagnosticReport, Media, Observation, Organization, Quantity, Range, Reference, ServiceRequest, Specimen, } from '@medplum/fhirtypes'; import type { ReadStream } from 'ssh2'; import { default as SftpClient } from 'ssh2-sftp-client'; // Timezone offset of partner lab const PARTNER_TIMEZONE = '-05:00'; /** * This Bot demonstrates how to read lab results from an SFTP server to create `Observation` and `DiagnosticReport` * resources. Incoming data is the the form of HL7v2 ORU messages * * See: https://v2plus.hl7.org/2021Jan/message-structure/ORU_R01.html * @param medplum - The Medplum client * @param event - The Bot event * @returns The Bot result */ export async function handler(medplum: MedplumClient, event: BotEvent): Promise<any> { // Read SFTP connection data from project secrets (TODO link) const host = event.secrets['SFTP_HOST'].valueString; const user = event.secrets['SFTP_USER'].valueString; const key = event.secrets['SFTP_PRIVATE_KEY'].valueString; // Read the `Organization` resource corresponding to this lab const performer = await medplum.searchOne('Organization', { name: 'Acme Clinical Labs' }); if (!performer) { throw new Error(`Could not find Organization ${performer}`); } // Connect to SFTP const sftp = new SftpClient(); try { console.log('Connecting to SFTP...'); // Connect to SFTP server await sftp.connect({ host: host, username: user, privateKey: key, retries: 5, retry_factor: 2, retry_minTimeout: 2000, }); // Read *.oru files from SFTP server const fileContents = await readSFTPDirectory(sftp, 'out'); // Parse file contents into structured Hl7Message objects const messages = fileContents.map(Hl7Message.parse); for (const message of messages) { try { await processOruMessage(medplum, message, performer); } catch (err: any) { console.error(err.message); } } } catch (error: any) { console.error(error.message); throw error; } finally { console.log('Closing SFTP connection'); await sftp.end(); } } /** * Parses patient lab results from an HL7 ORU message (https://v2plus.hl7.org/2021Jan/message-structure/ORU_R01.html) * based on mappings between Hl7v2 and FHIR (https://build.fhir.org/ig/HL7/v2-to-fhir/segment_maps.html) to create * and/or update FHIR ServiceRequest, DiagnosticReport, and Observation resources * * Note: While ORU is an HL7 standard, every LIS/EHR system implements the standard in a slightly different way * * @param medplum - The Medplum Client object * @param message - Parsed ORU message. * @param performer - The `Organization` resource corresponding to the lab that performed the test. */ export async function processOruMessage( medplum: MedplumClient, message: Hl7Message, performer: Organization ): Promise<void> { // Parse the HL7 MSH segment // See: https://v2plus.hl7.org/2021Jan/segment-definition/MSH.html const messageType = message.header.getComponent(9, 1); // Message type should be "ORU" if (messageType !== 'ORU') { const err = new Error('Invalid message type: ' + messageType); throw err; } // Parse the HL7 PID segment to read the patient id // See: https://v2plus.hl7.org/2021Jan/segment-definition/PID.html const orderId = message.getSegment('PID')?.getComponent(2, 1); if (!orderId || orderId.length === 0) { throw new Error('Missing order id'); } // Find the existing `ServiceRequest` (i.e. lab order) with this id const serviceRequest = await medplum.searchOne('ServiceRequest', `identifier=${orderId}`); if (!serviceRequest) { throw new Error(`Could not find ServiceRequest with id ${orderId}`); } else { console.log(`Processing order with id ${orderId}...`); } // Find any existing `Observation`, `DiagnosticReport`, and `Specimen` resources associated with this order const existingObservations = await medplum.searchResources('Observation', { 'based-on': getReferenceString(serviceRequest), }); const existingReport = await medplum.searchOne('DiagnosticReport', `based-on=${getReferenceString(serviceRequest)}`); let existingSpecimens: Specimen[] = []; if (serviceRequest.specimen) { existingSpecimens = await Promise.all( serviceRequest.specimen.map((specimenRef) => medplum.readReference(specimenRef)) ); } // Update the collection date of each specimen associated with the order // See: https://v2plus.hl7.org/2021Jan/segment-definition/OBR.html const collectionDate = parseHl7DateTime(message.getSegment('OBR')?.getField(14)?.toString(), { tzOffset: PARTNER_TIMEZONE, }); if (collectionDate) { await Promise.all( existingSpecimens.map(async (specimen) => { specimen.receivedTime = collectionDate; }) ); } // Create / Update the observations let observations = processObxSegments(message, serviceRequest, performer); // If there's an existing report, just update it with the new values const report: DiagnosticReport = existingReport ?? { resourceType: 'DiagnosticReport', status: 'preliminary', code: serviceRequest.code as CodeableConcept, subject: serviceRequest.subject, basedOn: [createReference(serviceRequest)], performer: [createReference(performer)], }; // Check to see if the order was cancelled await handleOrderCancellation(medplum, message, serviceRequest, observations, report, existingSpecimens); // Upload any embedded PDF reports, and attach them to the DiagnosticReport await uploadEmbeddedPdfs(medplum, report, `report_${orderId}.pdf`, message); // Update observations in Medplum observations = await Promise.all( observations.map((obs) => createOrUpdateObservation(medplum, obs, existingObservations)) ); // Update the DiagnosticReport in medplum report.result = observations.map(createReference); if (existingReport) { await medplum.updateResource(report); } else { await medplum.createResource(report); } // Update the Specimens in Medplum for (const specimen of existingSpecimens) { await medplum.updateResource(specimen); } } /** * Check if the order was cancelled and set the appropriate status fields if so * @param medplum - MedplumClient object * @param message - parsed HL7 message * @param serviceRequest - `ServiceRequest` representing this order * @param observations - parsed `Observation` resources * @param report - `DiagnosticReport` for these results * @param specimens - `Specimen` associated with the results */ async function handleOrderCancellation( medplum: MedplumClient, message: Hl7Message, serviceRequest: ServiceRequest, observations: Observation[], report: DiagnosticReport, specimens: Specimen[] ): Promise<void> { const cancellationReason = await getCancellationReason(medplum, message, serviceRequest); // Set the `Observation.dataAbsentReason` on all cancelled orders for (const observation of observations) { if (observation.status === 'cancelled') { observation.dataAbsentReason = cancellationReason; } } // Mark the report as cancelled if a cancellation reason was found report.status = cancellationReason ? 'cancelled' : 'preliminary'; // Mark the specimen as unsatisfactory for (const specimen of specimens) { specimen.status = 'unsatisfactory'; if (cancellationReason) { specimen.condition = [cancellationReason]; } } } /** * Check to see if the order was cancelled. If so, return the cancellation reason * * @param medplum - The MedplumClient object. * @param message - The parsed HL7 message. * @param serviceRequest - The ServiceRequest resource. * @returns The cancellation reason, if any. */ async function getCancellationReason( medplum: MedplumClient, message: Hl7Message, serviceRequest: ServiceRequest ): Promise<CodeableConcept | undefined> { // Read the first OBR Segment to find the order status // See: https://v2plus.hl7.org/2021Jan/segment-definition/OBR.html const obrIndex = message.segments.findIndex((segment) => segment.name === 'OBR'); const obr = message.getSegment(obrIndex); const obrStatus = OBSERVATION_STATUS[obr?.getComponent(25, 1) as string]; const isCancelled = obrStatus === 'cancelled'; // If the order was cancelled, read the following note (NTE) segment to extract the cancellation reason // Note: How this cancellation reason is sent will differ across LIS/EHR systems if (isCancelled) { const obrNote = message.getSegment(obrIndex + 1) as Hl7Segment; const noteText = obrNote.getField(3).toString().toUpperCase(); // Create a note on the order to describe the cancellation reason if (!serviceRequest.note) { serviceRequest.note = []; } serviceRequest.note.push({ text: `Order Cancelled by Partner Lab: ${noteText}` }); await medplum.updateResource(serviceRequest); return CANCELLATION_REASONS[noteText]; } return undefined; } /* SFTP Handling Code */ async function readSFTPDirectory(sftp: SftpClient, dir: string, batchSize = 25): Promise<string[]> { const results: string[] = []; // List all *.oru files in directory const fileList = await sftp.list(dir, (fileInfo) => fileInfo.name.endsWith('oru')); // Read all files. To avoid overwhelming the SFTP server, read in batches of `batchSize` in parallel for (let i = 0; i < fileList.length; i += batchSize) { await Promise.allSettled( fileList.slice(i, i + batchSize).map(async (fileInfo): Promise<Buffer> => { // Read each file as a `Buffer` object const filePath = `${dir}/${fileInfo.name}`; let stream: ReadStream | undefined; let result: Buffer | undefined = undefined; try { stream = sftp.createReadStream(filePath); result = await streamToBuffer(stream); } catch (err) { // Throw any errors related to file reading console.error(`[${filePath}] Error processing file: ${normalizeErrorString(err)}`); throw err; } finally { if (stream) { stream.destroy(); } } return result; }) ).then((outcomes: PromiseSettledResult<Buffer>[]) => { // Convert every file that was successfully read to a string to return for (const outcome of outcomes) { if (outcome.status === 'fulfilled') { results.push(outcome.value.toString('utf-8')); } } }); } return results; } /** * Loop through all the segments (i.e. lines) and convert the OBX segments into FHIR Observations * Some observations have notes that are stored in subsequent lines, so we need to keep a pointer to the last * processed observation * * @param message - Parsed HL7 Message * @param serviceRequest - Current `ServiceRequest` representing the lab order * @param performer - A reference to the performing lab * @returns An array of `Observation` resources. */ function processObxSegments( message: Hl7Message, serviceRequest: ServiceRequest, performer: Organization ): Observation[] { const observations: Observation[] = []; let prevObservation: Observation | undefined; // Loop through all segments, handling observation segments (OBX) and note segments (nte) for (const segment of message.segments) { if (segment.name === 'OBX') { // We're starting to process a new observation, so enqueue the previous observation if (prevObservation) { observations.push(prevObservation); } prevObservation = processObservation(segment, serviceRequest, performer); } else if (segment.name === 'NTE') { // Handle associated notes // See https://v2plus.hl7.org/2021Jan/segment-definition/NTE.html const noteText = segment.getComponent(3, 1).toString(); if (!prevObservation) { console.warn(`Warning: Received the following note with no observation: '${noteText}'`); continue; } if (!prevObservation?.note) { prevObservation.note = []; } prevObservation.note.push({ text: noteText }); } } if (prevObservation) { observations.push(prevObservation); } return observations; } /** * Convert an OBX segment into a FHIR `Observation` resource. This function handles two different types of OBX messages * NM - Numerical Observations * ST - Structured text observations. * * Note that some systems send numerical values with comparators (e.g. <, >) as structured text fields * * @param segment - The OBX segment to convert. * @param serviceRequest - The current `ServiceRequest` representing the lab order. * @param performer - The performing lab. * @returns The converted `Observation` resource. */ function processObservation( segment: Hl7Segment, serviceRequest: ServiceRequest, performer: Organization ): Observation | undefined { // Convert the reported code into a standard LOINC code // Note: The reported coding scheme will vary across labs const reportedCode = segment.getComponent(3, 4); const code = LOINC_CODES[reportedCode]; if (!code) { console.error(`Unrecognized code '${segment.getField(3)?.toString()}'. Skipping....`); return undefined; } const observation: Observation = { resourceType: 'Observation', basedOn: [createReference(serviceRequest)], code, subject: serviceRequest.subject, status: OBSERVATION_STATUS[segment.getComponent(11, 1)], issued: parseHl7DateTime(segment.getComponent(14, 1)), performer: [createReference(performer)], specimen: serviceRequest.specimen?.[0], }; // Extract numerical value and units const valueType = segment.getComponent(2, 1); const value = segment.getComponent(5, 1); const unit = segment.getComponent(6, 1); const refRange = segment.getComponent(7, 1); let quantity: Quantity | undefined = { unit, system: UCUM }; let interpretation: CodeableConcept | undefined; if (valueType === 'NM') { // If it's a numerical observation, populate a valueQuantity interpretation = INTERPRETATION_CODES[segment.getComponent(8, 1)]; quantity.value = Number.parseFloat(value); } else if (valueType === 'ST') { // If it's a structured text field, check for the following conditions: // 1. Unable to calculate (Note: Different labs will represent this differently) // 2. Cancelled // 3. Numerical value with a comparator (<, >, <=, >=) if (value === 'UNABLE TO CALCULATE') { quantity = undefined; observation.dataAbsentReason = { text: 'Result Blanked by Partner Lab' }; } else if (observation.status !== 'cancelled') { quantity = { ...quantity, ...parseValueWithComparator(value) }; interpretation = INTERPRETATION_CODES[segment.getField(8).getComponent(1)]; } } Object.assign(observation, { referenceRange: [parseReferenceRange(refRange, unit)], interpretation: interpretation && [interpretation], valueQuantity: quantity, }); return observation; } /** * Check if there is an existing `Observation` resource with the same LOINC code, and update if necessary * @param medplum - The Medplum Client * @param updatedObservation - The latest `Observation` * @param existingObservations - The existing `Observation` on the server * @returns The updated `Observation` resource. */ function createOrUpdateObservation( medplum: MedplumClient, updatedObservation: Observation, existingObservations: Observation[] ): Promise<Observation> { const existingObservation = findResourceByCode( existingObservations, updatedObservation.code as CodeableConcept, LOINC ); if (existingObservation) { return medplum.updateResource({ ...(existingObservation as Observation), ...updatedObservation, }); } return medplum.createResource(updatedObservation); } /** * Upload any embedded rendered PDF reports in the HL7 message as a FHIR `Media` resource * and attach it to the diagnostic report in the `DiagnosticReport.presentedForm` property * @param medplum - The Medplum Client. * @param report - The current `DiagnosticReport` resource. * @param filename - The name of the PDF file. * @param message - The HL7 message. * @returns The uploaded `Media` resources. */ async function uploadEmbeddedPdfs( medplum: MedplumClient, report: DiagnosticReport, filename: string, message: Hl7Message ): Promise<Media[]> { // Upload PDF reports const pdfLines = message.segments.filter((seg) => seg.getComponent(3, 2) === 'PDFBASE64'); const media = await Promise.all( pdfLines.map(async (segment: Hl7Segment) => { const encodedData = segment.getComponent(5, 5); const decodedData = Buffer.from(encodedData, 'base64'); return medplum.createMedia({ data: decodedData, contentType: 'application/pdf', filename, additionalFields: { subject: report.subject, basedOn: report.basedOn as Reference<ServiceRequest>[], }, }); }) ); if (media.length > 0) { if (!report.presentedForm) { report.presentedForm = []; } report.presentedForm.push(...media.filter((m) => m.content).map((m) => m.content as Attachment)); } return media; } /* Parsing Utilities */ function parseValueWithComparator(value: string): Quantity | undefined { const match = value.match(/([<>][=]?)(\d+(\.\d+)?)/); if (match) { return { comparator: match[1] as Quantity['comparator'], value: Number.parseFloat(match[2]), }; } return undefined; } function parseReferenceRange(rangeString: string, unit: string): Range { rangeString = rangeString.trim(); const system = UCUM; if (rangeString.includes('-')) { const [low, high, ..._] = rangeString.split('-'); return { low: { value: Number.parseFloat(low), unit, system }, high: { value: Number.parseFloat(high), unit, system }, }; } for (const comparator of ['<=', '>=', '<', '>']) { if (rangeString.startsWith(comparator)) { const side = comparator.includes('<') ? 'high' : 'low'; return { [side]: { value: Number.parseFloat(rangeString.substring(comparator.length)), unit, system, comparator: rangeString.substring(0, comparator?.length) as Quantity['comparator'], }, }; } } return {}; } /** Code Mappings */ const INTERPRETATION_CODES: Record<string, CodeableConcept> = { '': { text: 'Normal', coding: [ { display: 'Normal', code: 'N', system: 'http://terminology.hl7.org/CodeSystem/v3-ObservationInterpretation', }, ], }, H: { text: 'High', coding: [ { display: 'High', code: 'H', system: 'http://terminology.hl7.org/CodeSystem/v3-ObservationInterpretation', }, ], }, HH: { text: ' Critical high', coding: [ { display: 'Critical high', code: 'HH', system: 'http://terminology.hl7.org/CodeSystem/v3-ObservationInterpretation', }, ], }, L: { text: 'Low', coding: [ { display: 'Low', code: 'L', system: 'http://terminology.hl7.org/CodeSystem/v3-ObservationInterpretation' }, ], }, LL: { text: 'Critical low', coding: [ { display: 'Critical low', code: 'LL', system: 'http://terminology.hl7.org/CodeSystem/v3-ObservationInterpretation', }, ], }, A: { text: 'Abnormal', coding: [ { display: 'Abnormal', code: 'A', system: 'http://terminology.hl7.org/CodeSystem/v3-ObservationInterpretation', }, ], }, AA: { text: 'Critical abnormal', coding: [ { display: 'Critical abnormal', code: 'AA', system: 'http://terminology.hl7.org/CodeSystem/v3-ObservationInterpretation', }, ], }, }; const OBSERVATION_STATUS: Record<string, Observation['status']> = { F: 'final', I: 'registered', P: 'preliminary', C: 'corrected', X: 'cancelled', }; const LOINC_CODES: Record<string, CodeableConcept> = { BUN: { coding: [ { system: LOINC, code: 'BUN', display: 'BUN', }, ], text: 'BUN', }, CHOL: { coding: [ { system: LOINC, code: 'CHOL', display: 'CHOL', }, ], text: 'CHOL', }, CREA: { coding: [ { system: LOINC, code: 'CREAT', display: 'CREAT', }, ], text: 'CREAT', }, HDLC3: { coding: [ { system: LOINC, code: 'HDL', display: 'HDL', }, ], text: 'HDL', }, TRIG: { coding: [ { system: LOINC, code: 'TRIG', display: 'TRIG', }, ], text: 'TRIG', }, TSH: { coding: [ { system: LOINC, code: 'TSH', display: 'TSH', }, ], text: 'TSH', }, DLDL: { coding: [ { system: LOINC, code: 'LDL', display: 'LDL-direct', }, ], text: 'LDL-direct', }, HGBA1C: { coding: [ { system: LOINC, code: 'HBA1C', display: 'HBA1C', }, ], text: 'HBA1C', }, LDL: { coding: [ { system: LOINC, code: 'LDL-CALCULATED', display: 'LDL-CALCULATED', }, ], text: 'LDL-CALCULATED', }, }; export const CANCELLATION_REASONS: Record<string, CodeableConcept> = { CLOTTING: { text: 'Clotting', coding: [ { system: 'http://terminology.hl7.org/CodeSystem/v2-0490', code: 'RC', display: 'Clotting', }, { system: SNOMED, code: '281279002', display: 'Sample clotted', }, ], }, 'QUANTITY NOT SUFFICIENT': { text: 'Quantity Not Sufficient', coding: [ { system: 'http://terminology.hl7.org/CodeSystem/v2-0490', code: 'QS', display: 'Quantity not sufficient', }, { system: SNOMED, code: '281268007', display: 'Insufficient sample', }, ], }, 'SAMPLE GROSSLY HEMOLYZED': { text: 'Hemolysis', coding: [ { system: 'http://terminology.hl7.org/CodeSystem/v2-0490', code: 'RH', display: 'Hemolysis', }, { system: SNOMED, code: '281288006', display: 'Sample grossly hemolyzed', }, ], }, 'SAMPLE STABILITY EXPIRED': { text: 'Expired', coding: [ { system: 'http://terminology.hl7.org/CodeSystem/v2-0490', code: 'EX', display: 'Expired', }, ], }, 'SAMPLE MISLABELED': { text: 'Labeling', coding: [ { system: 'http://terminology.hl7.org/CodeSystem/v2-0490', code: 'RM', display: 'Labeling', }, { system: SNOMED, code: '281265005', display: 'Sample incorrectly labeled', }, ], }, 'IMPROPER STORAGE': { text: 'Improper Storage', coding: [ { system: 'http://terminology.hl7.org/CodeSystem/v2-0490', code: 'RR', display: 'Improper Storage', }, ], }, };

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/medplum/medplum'

If you have feedback or need assistance with the MCP directory API, please join our Discord server