parse_hl7_message
Convert raw HL7 v2.x messages into human-readable format with field names, values, and explanations for segments like MSH, PID, PV1, OBR, and OBX.
Instructions
Parse an HL7 v2.x message into human-readable format. Returns parsed segments with field names, values, table lookups, and explanations. Supports MSH, PID, PV1, OBR, OBX, ORC, DG1, AL1, NK1, IN1, and more.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| message | Yes | Raw HL7 v2.x message string (pipe-delimited). Segments separated by \r, \n, or \r\n. |
Implementation Reference
- The implementation of the `parse_hl7_message` function, which parses a raw HL7 message string into a human-readable format.
def parse_hl7_message(message: str) -> str: """Parse an HL7 v2.x message into human-readable format. Args: message: Raw HL7 message string (pipe-delimited). Segments can be separated by \\r, \\n, or \\r\\n. Returns: Parsed segments with field names, values, and explanations. """ if not message or not message.strip(): return "Error: Empty message provided." # Normalize line endings and split into segments clean = message.strip() # Handle escaped newlines from JSON input clean = clean.replace("\\r\\n", "\n").replace("\\r", "\n").replace("\\n", "\n") # Handle actual carriage returns clean = clean.replace("\r\n", "\n").replace("\r", "\n") segments = [s.strip() for s in clean.split("\n") if s.strip()] if not segments: return "Error: No segments found in message." # Validate MSH segment first_seg = segments[0] if not first_seg.startswith("MSH"): return ( "Error: Message must start with MSH segment.\n" f"First segment starts with: '{first_seg[:10]}...'" ) # Parse encoding characters from MSH if len(first_seg) < 8: return "Error: MSH segment too short to contain encoding characters." field_sep = first_seg[3] # Usually '|' encoding_chars = first_seg[4:8] if len(first_seg) >= 8 else "^~\\&" component_sep = encoding_chars[0] if len(encoding_chars) > 0 else "^" repetition_sep = encoding_chars[1] if len(encoding_chars) > 1 else "~" escape_char = encoding_chars[2] if len(encoding_chars) > 2 else "\\" subcomponent_sep = encoding_chars[3] if len(encoding_chars) > 3 else "&" # Parse message header to get message type msh_fields = _split_msh(first_seg, field_sep) msg_type = msh_fields[8] if len(msh_fields) > 8 else "Unknown" msg_type_clean = msg_type.split(component_sep)[0] + "^" + msg_type.split(component_sep)[1] if component_sep in msg_type else msg_type # Build output parts = [ "=" * 70, f"HL7 MESSAGE PARSE RESULT", f"=" * 70, f"Message Type: {msg_type_clean}", ] # Look up message type info type_info = HL7_MESSAGE_TYPES.get(msg_type_clean) if type_info: parts.append(f"Description: {type_info['name']}") parts.append(f" {type_info['description']}") parts.append("") # Parse each segment for seg_text in segments: seg_id = seg_text.split(field_sep)[0] if field_sep in seg_text else seg_text[:3] if seg_id == "MSH": fields = _split_msh(seg_text, field_sep) else: fields = seg_text.split(field_sep) parts.append("-" * 70) seg_info = HL7_SEGMENTS.get(seg_id) if seg_info: parts.append(f"Segment: {seg_id} — {seg_info['name']}") parts.append(f" {seg_info['description']}") else: parts.append(f"Segment: {seg_id} (not in standard dictionary — may be Z-segment or custom)") parts.append("") # Parse fields for i, value in enumerate(fields): if i == 0: continue # Skip segment ID if not value: continue # Skip empty fields field_name = "Unknown" field_desc = "" table_ref = None if seg_info: field_def = _get_field_def(seg_info, i) if field_def: field_name = field_def["name"] field_desc = field_def.get("description", "") table_ref = field_def.get("table") # Format the field output display_value = value if len(value) > 120: display_value = value[:120] + "..." parts.append(f" {seg_id}-{i}: {field_name}") parts.append(f" Value: {display_value}") # Look up table values if applicable if table_ref and value: table_lookup = _lookup_table_value(table_ref, value, component_sep) if table_lookup: parts.append(f" Table {table_ref}: {table_lookup}") # Parse components for complex fields if component_sep in value and len(value.split(component_sep)) > 1: components = value.split(component_sep) non_empty = [(j, c) for j, c in enumerate(components, 1) if c] if len(non_empty) > 1: parts.append(f" Components:") for comp_idx, comp_val in non_empty: parts.append(f" .{comp_idx}: {comp_val}") parts.append("") parts.append("=" * 70) return "\n".join(parts)