"""
Import Radiology Data into FHIR
Links MIMIC-CXR radiology images to FHIR Patient resources by:
1. Extracting unique subject IDs from MIMICCXRImages
2. Matching to existing FHIR Patients (by name similarity or ID patterns)
3. Creating new Synthea patients for unmatched subjects
4. Populating PatientImageMapping table
Part of Feature 007: FHIR Radiology Integration
"""
import argparse
import json
import os
import sys
import random
import string
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple, Any
import requests
# Add project root to path
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
from src.db.connection import get_connection
from src.setup.create_patient_mapping import (
lookup_patient_mapping,
insert_patient_mapping,
get_mapping_stats
)
from src.adapters.fhir_radiology_adapter import (
FHIRRadiologyAdapter,
ImagingStudyData,
DiagnosticReportData
)
# Configuration
FHIR_BASE_URL = os.getenv('FHIR_BASE_URL', 'http://localhost:52773/fhir/r4')
SYNTHEA_NAMES = [
("James", "Wilson"), ("Sarah", "Connor"), ("Michael", "Chen"),
("Emily", "Johnson"), ("David", "Brown"), ("Jessica", "Martinez"),
("Robert", "Garcia"), ("Lisa", "Anderson"), ("William", "Taylor"),
("Jennifer", "Thomas"), ("Christopher", "Jackson"), ("Amanda", "White"),
("Daniel", "Harris"), ("Michelle", "Martin"), ("Matthew", "Thompson"),
("Ashley", "Robinson"), ("Andrew", "Clark"), ("Stephanie", "Lewis"),
("Joshua", "Lee"), ("Nicole", "Walker")
]
def get_mimic_studies_for_subject(subject_id: str) -> List[Dict]:
"""
Get all imaging studies for a MIMIC subject from the MIMICCXRImages table.
Args:
subject_id: MIMIC subject ID (e.g., 'p10002428')
Returns:
List of dicts with study_id, view_position, image_path
"""
conn = get_connection()
cursor = conn.cursor()
try:
sql = """
SELECT DISTINCT StudyID, ViewPosition, ImagePath
FROM VectorSearch.MIMICCXRImages
WHERE SubjectID = ?
"""
cursor.execute(sql, (subject_id,))
studies = []
for row in cursor.fetchall():
studies.append({
'study_id': row[0],
'view_position': row[1],
'image_path': row[2]
})
return studies
finally:
cursor.close()
conn.close()
def get_mimic_subject_ids(limit: Optional[int] = None) -> List[str]:
"""
Get unique MIMIC subject IDs from the MIMICCXRImages table.
Args:
limit: Maximum number of subjects to return (for testing)
Returns:
List of unique subject IDs (e.g., ['p10002428', 'p10003187', ...])
"""
conn = get_connection()
cursor = conn.cursor()
try:
if limit:
sql = f"SELECT DISTINCT TOP {limit} SubjectID FROM VectorSearch.MIMICCXRImages"
else:
sql = "SELECT DISTINCT SubjectID FROM VectorSearch.MIMICCXRImages"
cursor.execute(sql)
subjects = [row[0] for row in cursor.fetchall()]
return subjects
finally:
cursor.close()
conn.close()
def search_fhir_patients(name: Optional[str] = None, count: int = 100) -> List[Dict]:
"""
Search FHIR Patient resources.
Args:
name: Patient name to search for
count: Maximum results to return
Returns:
List of FHIR Patient resources
"""
session = requests.Session()
session.headers.update({
'Accept': 'application/fhir+json'
})
params = {'_count': count}
if name:
params['name'] = name
try:
response = session.get(f"{FHIR_BASE_URL}/Patient", params=params)
response.raise_for_status()
bundle = response.json()
patients = []
for entry in bundle.get('entry', []):
if 'resource' in entry:
patients.append(entry['resource'])
return patients
except Exception as e:
print(f"Error searching FHIR patients: {e}", file=sys.stderr)
return []
def get_patient_name(patient: Dict) -> str:
"""Extract display name from FHIR Patient resource."""
names = patient.get('name', [])
if not names:
return "Unknown"
name = names[0]
# Try text first
if 'text' in name:
return name['text']
# Build from parts
parts = []
if 'given' in name:
parts.extend(name['given'])
if 'family' in name:
parts.append(name['family'])
return ' '.join(parts) if parts else "Unknown"
def match_patient_for_subject(
subject_id: str,
fhir_patients: List[Dict],
used_patient_ids: set
) -> Optional[Tuple[str, str, float, str]]:
"""
Try to match a MIMIC subject to an existing FHIR patient.
Uses various heuristics:
- Random assignment from available pool (demo mode)
- Name pattern matching
- ID suffix matching
Args:
subject_id: MIMIC subject ID (e.g., 'p10002428')
fhir_patients: List of available FHIR Patient resources
used_patient_ids: Set of already-used patient IDs
Returns:
Tuple of (patient_id, patient_name, confidence, match_type) or None
"""
# Filter to available patients (not already used)
available = [p for p in fhir_patients if p.get('id') not in used_patient_ids]
if not available:
return None
# For demo: randomly assign to create a diverse patient set
# In production, this would use more sophisticated matching
patient = random.choice(available)
patient_id = patient.get('id', '')
patient_name = get_patient_name(patient)
# Mark as used
used_patient_ids.add(patient_id)
return (patient_id, patient_name, 0.85, 'random_assignment')
def create_synthea_patient(subject_id: str) -> Tuple[str, str]:
"""
Create a new synthetic patient using Synthea-like naming.
Args:
subject_id: MIMIC subject ID for generating patient
Returns:
Tuple of (patient_id, patient_name)
"""
# Generate deterministic name based on subject_id hash
hash_val = hash(subject_id)
name_idx = abs(hash_val) % len(SYNTHEA_NAMES)
given, family = SYNTHEA_NAMES[name_idx]
# Add random suffix for uniqueness
suffix = ''.join(random.choices(string.digits, k=3))
patient_id = f"synthea-{subject_id.replace('p', '')}"
full_name = f"{given} {family}"
# Create FHIR Patient resource
patient_resource = {
"resourceType": "Patient",
"id": patient_id,
"identifier": [
{
"system": "urn:mimic-cxr:subject",
"value": subject_id
},
{
"system": "urn:synthea:patient",
"value": patient_id
}
],
"name": [
{
"use": "official",
"family": family,
"given": [given],
"text": full_name
}
],
"gender": "unknown",
"birthDate": f"{1940 + abs(hash_val) % 60}-01-01"
}
# POST to FHIR server
adapter = FHIRRadiologyAdapter()
try:
result = adapter.put_resource(patient_resource)
print(f" Created Synthea patient: {full_name} ({patient_id})")
return (patient_id, full_name)
except Exception as e:
print(f" Failed to create Synthea patient: {e}", file=sys.stderr)
# Return the ID anyway for mapping purposes
return (patient_id, full_name)
def import_subject_mappings(
subjects: List[str],
fhir_patients: List[Dict],
create_synthea: bool = True,
dry_run: bool = False
) -> Dict[str, Any]:
"""
Import patient mappings for MIMIC subjects.
Args:
subjects: List of MIMIC subject IDs
fhir_patients: List of available FHIR patients
create_synthea: Whether to create Synthea patients for unmatched
dry_run: If True, don't actually insert mappings
Returns:
Statistics dict with counts of operations
"""
stats = {
'total_subjects': len(subjects),
'already_mapped': 0,
'matched_to_fhir': 0,
'synthea_created': 0,
'unlinked': 0,
'errors': 0
}
used_patient_ids = set()
for i, subject_id in enumerate(subjects):
if (i + 1) % 50 == 0:
print(f"Processing {i + 1}/{len(subjects)}...")
# Check if already mapped
existing = lookup_patient_mapping(subject_id)
if existing:
stats['already_mapped'] += 1
used_patient_ids.add(existing['FHIRPatientID'])
continue
# Try to match to existing FHIR patient
match = match_patient_for_subject(subject_id, fhir_patients, used_patient_ids)
if match:
patient_id, patient_name, confidence, match_type = match
if not dry_run:
try:
insert_patient_mapping(
mimic_subject_id=subject_id,
fhir_patient_id=patient_id,
fhir_patient_name=patient_name,
match_confidence=confidence,
match_type=match_type
)
stats['matched_to_fhir'] += 1
except Exception as e:
print(f"Error inserting mapping for {subject_id}: {e}", file=sys.stderr)
stats['errors'] += 1
else:
stats['matched_to_fhir'] += 1
elif create_synthea:
# Create new Synthea patient
if not dry_run:
try:
patient_id, patient_name = create_synthea_patient(subject_id)
insert_patient_mapping(
mimic_subject_id=subject_id,
fhir_patient_id=patient_id,
fhir_patient_name=patient_name,
match_confidence=1.0,
match_type='synthea_generated'
)
stats['synthea_created'] += 1
except Exception as e:
print(f"Error creating Synthea patient for {subject_id}: {e}", file=sys.stderr)
stats['errors'] += 1
else:
stats['synthea_created'] += 1
else:
stats['unlinked'] += 1
return stats
def generate_unlinked_report(output_path: Optional[str] = None) -> Dict[str, Any]:
"""
Generate report of MIMIC subjects without patient mappings (FR-007).
Args:
output_path: Optional path to write JSON report
Returns:
Report dict with unlinked subjects
"""
conn = get_connection()
cursor = conn.cursor()
try:
# Find subjects without mappings
sql = """
SELECT DISTINCT i.SubjectID
FROM VectorSearch.MIMICCXRImages i
LEFT JOIN VectorSearch.PatientImageMapping m
ON i.SubjectID = m.MIMICSubjectID
WHERE m.MIMICSubjectID IS NULL
"""
cursor.execute(sql)
unlinked = [row[0] for row in cursor.fetchall()]
report = {
'generated_at': datetime.now().isoformat(),
'total_unlinked': len(unlinked),
'subjects': unlinked
}
if output_path:
with open(output_path, 'w') as f:
json.dump(report, f, indent=2)
print(f"Unlinked report written to: {output_path}")
return report
finally:
cursor.close()
conn.close()
def create_imaging_studies_for_subject(
subject_id: str,
patient_id: str,
adapter: FHIRRadiologyAdapter,
match_encounters: bool = True,
dry_run: bool = False
) -> Dict[str, Any]:
"""
Create FHIR ImagingStudy resources for a MIMIC subject (T021).
Uses 24-hour window encounter matching (T019) to link studies to encounters.
Args:
subject_id: MIMIC subject ID
patient_id: FHIR Patient resource ID
adapter: FHIRRadiologyAdapter instance
match_encounters: Whether to attempt encounter matching
dry_run: If True, don't actually create resources
Returns:
Statistics dict with counts of operations
"""
stats = {
'studies_found': 0,
'imaging_studies_created': 0,
'encounters_matched': 0,
'errors': 0
}
# Get studies for this subject from database
studies = get_mimic_studies_for_subject(subject_id)
stats['studies_found'] = len(studies)
# Group by study_id (may have multiple images per study)
unique_studies = {}
for study in studies:
study_id = study['study_id']
if study_id not in unique_studies:
unique_studies[study_id] = study
for study_id, study_info in unique_studies.items():
try:
# Check if ImagingStudy already exists
existing = adapter.get_imaging_study(study_id)
if existing:
continue # Skip already created
# Determine encounter via 24-hour window matching (T019)
encounter_id = None
if match_encounters:
# Use current date as study date (in real scenario, extract from MIMIC metadata)
# For demo purposes, try to find any encounter for this patient
study_date = datetime.now()
encounter_id = adapter.lookup_encounter_by_date(
patient_id=patient_id,
study_date=study_date,
window_hours=24
)
if encounter_id:
stats['encounters_matched'] += 1
# Build ImagingStudy data
imaging_data = ImagingStudyData(
study_id=study_id,
subject_id=subject_id,
patient_id=patient_id,
modality="CR", # Chest X-ray
num_series=1,
num_instances=1,
encounter_id=encounter_id,
description=f"Chest X-ray ({study_info.get('view_position', 'unknown view')})"
)
if not dry_run:
resource = adapter.build_imaging_study(imaging_data)
adapter.put_resource(resource)
stats['imaging_studies_created'] += 1
except Exception as e:
print(f" Error creating ImagingStudy for {study_id}: {e}", file=sys.stderr)
stats['errors'] += 1
return stats
def main():
"""Main entry point for import script."""
parser = argparse.ArgumentParser(
description='Import MIMIC-CXR radiology data into FHIR'
)
parser.add_argument(
'--limit', type=int, default=None,
help='Limit number of subjects to process (for testing)'
)
parser.add_argument(
'--no-synthea', action='store_true',
help='Do not create Synthea patients for unmatched subjects'
)
parser.add_argument(
'--dry-run', action='store_true',
help='Preview changes without modifying database'
)
parser.add_argument(
'--unlinked-report', type=str,
help='Path to write unlinked subjects report'
)
parser.add_argument(
'--stats-only', action='store_true',
help='Only show current mapping statistics'
)
parser.add_argument(
'--create-imaging-studies', action='store_true',
help='Create FHIR ImagingStudy resources for mapped subjects (T021)'
)
parser.add_argument(
'--no-encounter-matching', action='store_true',
help='Skip 24-hour window encounter matching during ImagingStudy creation (T019)'
)
args = parser.parse_args()
# Stats only mode
if args.stats_only:
stats = get_mapping_stats()
print("\n=== Current Mapping Statistics ===")
print(f"Total mappings: {stats['total_mappings']}")
print(f"By match type:")
for match_type, count in stats['by_match_type'].items():
print(f" - {match_type}: {count}")
print(f"Average confidence: {stats['avg_confidence']:.2%}")
return
print("=== FHIR Radiology Import ===")
print(f"FHIR Base URL: {FHIR_BASE_URL}")
if args.dry_run:
print("MODE: Dry run (no changes will be made)")
print()
# Get MIMIC subjects
print("Fetching MIMIC subject IDs...")
subjects = get_mimic_subject_ids(limit=args.limit)
print(f"Found {len(subjects)} unique subjects")
# Get existing FHIR patients
print("Fetching existing FHIR patients...")
fhir_patients = search_fhir_patients(count=500)
print(f"Found {len(fhir_patients)} FHIR patients")
# Import mappings
print("\nImporting patient mappings...")
stats = import_subject_mappings(
subjects=subjects,
fhir_patients=fhir_patients,
create_synthea=not args.no_synthea,
dry_run=args.dry_run
)
# Print summary
print("\n=== Import Summary ===")
print(f"Total subjects processed: {stats['total_subjects']}")
print(f"Already mapped: {stats['already_mapped']}")
print(f"Matched to FHIR patients: {stats['matched_to_fhir']}")
print(f"Synthea patients created: {stats['synthea_created']}")
print(f"Left unlinked: {stats['unlinked']}")
print(f"Errors: {stats['errors']}")
# Generate unlinked report if requested
if args.unlinked_report:
print("\nGenerating unlinked subjects report...")
generate_unlinked_report(args.unlinked_report)
# Create ImagingStudy resources if requested (T021)
if args.create_imaging_studies:
print("\n=== Creating FHIR ImagingStudy Resources ===")
adapter = FHIRRadiologyAdapter()
imaging_stats = {
'total_subjects': 0,
'studies_found': 0,
'imaging_studies_created': 0,
'encounters_matched': 0,
'errors': 0
}
# Get all mapped subjects
conn = get_connection()
cursor = conn.cursor()
try:
cursor.execute("""
SELECT MIMICSubjectID, FHIRPatientID
FROM VectorSearch.PatientImageMapping
""")
mapped_subjects = [(row[0], row[1]) for row in cursor.fetchall()]
finally:
cursor.close()
conn.close()
print(f"Found {len(mapped_subjects)} mapped subjects")
for i, (subject_id, patient_id) in enumerate(mapped_subjects):
if (i + 1) % 50 == 0:
print(f"Processing {i + 1}/{len(mapped_subjects)}...")
subject_stats = create_imaging_studies_for_subject(
subject_id=subject_id,
patient_id=patient_id,
adapter=adapter,
match_encounters=not args.no_encounter_matching,
dry_run=args.dry_run
)
imaging_stats['total_subjects'] += 1
imaging_stats['studies_found'] += subject_stats['studies_found']
imaging_stats['imaging_studies_created'] += subject_stats['imaging_studies_created']
imaging_stats['encounters_matched'] += subject_stats['encounters_matched']
imaging_stats['errors'] += subject_stats['errors']
# Print ImagingStudy summary
print("\n=== ImagingStudy Creation Summary ===")
print(f"Subjects processed: {imaging_stats['total_subjects']}")
print(f"Studies found in MIMIC: {imaging_stats['studies_found']}")
print(f"ImagingStudy resources created: {imaging_stats['imaging_studies_created']}")
print(f"Encounters matched (24h window): {imaging_stats['encounters_matched']}")
print(f"Errors: {imaging_stats['errors']}")
if __name__ == '__main__':
main()