# DICOM MCP Server
# by ChristianHinge
# Verified
"""
DICOM Client.
This module provides a clean interface to pynetdicom functionality,
abstracting the details of DICOM networking.
"""
import os
import shutil
import tempfile
import time
from typing import Any, Dict, List, Optional, Tuple

from pydicom import dcmread
from pydicom.dataset import Dataset
from pynetdicom import AE, evt, build_role
from pynetdicom.sop_class import (
    PatientRootQueryRetrieveInformationModelFind,
    StudyRootQueryRetrieveInformationModelFind,
    PatientRootQueryRetrieveInformationModelGet,
    PatientRootQueryRetrieveInformationModelMove,
    StudyRootQueryRetrieveInformationModelGet,
    StudyRootQueryRetrieveInformationModelMove,
    Verification,
    EncapsulatedPDFStorage,  # Add specific SOP class for PDF
)

from .attributes import get_attributes_for_level
class DicomClient:
    """DICOM networking client that handles communication with DICOM nodes.

    Every operation opens its own association with the remote node, performs
    a single DIMSE exchange (C-ECHO, C-FIND or C-GET) and releases the
    association again before returning.
    """

    # C-FIND "Pending" status codes. 0xFF01 is "pending, but one or more
    # optional keys were not supported"; responses with that status still
    # carry a matching identifier and must not be dropped.
    _PENDING_STATUSES = (0xFF00, 0xFF01)

    def __init__(self, host: str, port: int, calling_aet: str, called_aet: str):
        """Initialize DICOM client.

        Args:
            host: DICOM node hostname or IP
            port: DICOM node port
            calling_aet: Local AE title (our AE title)
            called_aet: Remote AE title (the node we're connecting to)
        """
        self.host = host
        self.port = port
        self.called_aet = called_aet
        self.calling_aet = calling_aet

        # Create the Application Entity
        self.ae = AE(ae_title=calling_aet)

        # Request the presentation contexts used by this client. Only the
        # Encapsulated PDF storage class is requested (instead of every
        # storage class) because it is the only object type retrieved here.
        for sop_class in (
            Verification,
            PatientRootQueryRetrieveInformationModelFind,
            PatientRootQueryRetrieveInformationModelGet,
            PatientRootQueryRetrieveInformationModelMove,
            StudyRootQueryRetrieveInformationModelFind,
            StudyRootQueryRetrieveInformationModelGet,
            StudyRootQueryRetrieveInformationModelMove,
            EncapsulatedPDFStorage,
        ):
            self.ae.add_requested_context(sop_class)

    def _describe_node(self) -> str:
        """Return the ``host:port (Called AE: ..., Calling AE: ...)`` text used in messages."""
        return (
            f"{self.host}:{self.port} "
            f"(Called AE: {self.called_aet}, Calling AE: {self.calling_aet})"
        )

    def verify_connection(self) -> Tuple[bool, str]:
        """Verify connectivity to the DICOM node using C-ECHO.

        Returns:
            Tuple of (success, message)
        """
        assoc = self.ae.associate(self.host, self.port, ae_title=self.called_aet)
        if not assoc.is_established:
            return False, f"Failed to associate with DICOM node at {self._describe_node()}"

        try:
            status = assoc.send_c_echo()
        finally:
            # Release even when send_c_echo raises, so the association
            # is never left open.
            assoc.release()

        if status and status.Status == 0:
            return True, f"Connection successful to {self._describe_node()}"
        return False, f"C-ECHO failed with status: {status.Status if status else 'None'}"

    def find(self, query_dataset: "Dataset", query_model) -> List[Dict[str, Any]]:
        """Execute a C-FIND request.

        Args:
            query_dataset: Dataset containing query parameters
            query_model: DICOM query model (Patient/StudyRoot)

        Returns:
            List of dictionaries, one per pending response that carried an
            identifier. Responses with any other status are ignored.

        Raises:
            Exception: If association fails
        """
        assoc = self.ae.associate(self.host, self.port, ae_title=self.called_aet)
        if not assoc.is_established:
            raise Exception(f"Failed to associate with DICOM node at {self._describe_node()}")

        results = []
        try:
            for status, identifier in assoc.send_c_find(query_dataset, query_model):
                if status and status.Status in self._PENDING_STATUSES and identifier:
                    results.append(self._dataset_to_dict(identifier))
        finally:
            # Always release the association
            assoc.release()
        return results

    def _build_query(
        self,
        retrieve_level: str,
        attribute_level: str,
        required_keys: Dict[str, Any],
        optional_keys: Dict[str, Any],
        attribute_preset: str,
        additional_attrs: Optional[List[str]],
        exclude_attrs: Optional[List[str]],
    ) -> "Dataset":
        """Build a C-FIND identifier shared by all ``query_*`` methods.

        Args:
            retrieve_level: Value stored in QueryRetrieveLevel
            attribute_level: Level name passed to get_attributes_for_level
            required_keys: Keyword -> value pairs that are always set
            optional_keys: Keyword -> value pairs set only when the value is truthy
            attribute_preset: Attribute preset name
            additional_attrs: Additional attributes to include
            exclude_attrs: Attributes to exclude

        Returns:
            The populated query dataset
        """
        ds = Dataset()
        ds.QueryRetrieveLevel = retrieve_level
        for keyword, value in required_keys.items():
            setattr(ds, keyword, value)
        for keyword, value in optional_keys.items():
            if value:
                setattr(ds, keyword, value)

        # Ask for the preset's return attributes as empty values, without
        # clobbering any matching key that was set above.
        attrs = get_attributes_for_level(
            attribute_level, attribute_preset, additional_attrs, exclude_attrs
        )
        for attr in attrs:
            if not hasattr(ds, attr):
                setattr(ds, attr, "")
        return ds

    def query_patient(
        self,
        patient_id: Optional[str] = None,
        name_pattern: Optional[str] = None,
        birth_date: Optional[str] = None,
        attribute_preset: str = "standard",
        additional_attrs: Optional[List[str]] = None,
        exclude_attrs: Optional[List[str]] = None,
    ) -> List[Dict[str, Any]]:
        """Query for patients matching criteria.

        Args:
            patient_id: Patient ID
            name_pattern: Patient name pattern (can include wildcards * and ?)
            birth_date: Patient birth date (YYYYMMDD)
            attribute_preset: Attribute preset (minimal, standard, extended)
            additional_attrs: Additional attributes to include
            exclude_attrs: Attributes to exclude

        Returns:
            List of matching patient records
        """
        ds = self._build_query(
            "PATIENT",
            "patient",
            {},
            {
                "PatientID": patient_id,
                "PatientName": name_pattern,
                "PatientBirthDate": birth_date,
            },
            attribute_preset,
            additional_attrs,
            exclude_attrs,
        )
        return self.find(ds, PatientRootQueryRetrieveInformationModelFind)

    def query_study(
        self,
        patient_id: Optional[str] = None,
        study_date: Optional[str] = None,
        modality: Optional[str] = None,
        study_description: Optional[str] = None,
        accession_number: Optional[str] = None,
        study_instance_uid: Optional[str] = None,
        attribute_preset: str = "standard",
        additional_attrs: Optional[List[str]] = None,
        exclude_attrs: Optional[List[str]] = None,
    ) -> List[Dict[str, Any]]:
        """Query for studies matching criteria.

        Args:
            patient_id: Patient ID
            study_date: Study date or range (YYYYMMDD or YYYYMMDD-YYYYMMDD)
            modality: Modalities in study
            study_description: Study description (can include wildcards)
            accession_number: Accession number
            study_instance_uid: Study Instance UID
            attribute_preset: Attribute preset (minimal, standard, extended)
            additional_attrs: Additional attributes to include
            exclude_attrs: Attributes to exclude

        Returns:
            List of matching study records
        """
        ds = self._build_query(
            "STUDY",
            "study",
            {},
            {
                "PatientID": patient_id,
                "StudyDate": study_date,
                "ModalitiesInStudy": modality,
                "StudyDescription": study_description,
                "AccessionNumber": accession_number,
                "StudyInstanceUID": study_instance_uid,
            },
            attribute_preset,
            additional_attrs,
            exclude_attrs,
        )
        return self.find(ds, StudyRootQueryRetrieveInformationModelFind)

    def query_series(
        self,
        study_instance_uid: str,
        series_instance_uid: Optional[str] = None,
        modality: Optional[str] = None,
        series_number: Optional[str] = None,
        series_description: Optional[str] = None,
        attribute_preset: str = "standard",
        additional_attrs: Optional[List[str]] = None,
        exclude_attrs: Optional[List[str]] = None,
    ) -> List[Dict[str, Any]]:
        """Query for series matching criteria.

        Args:
            study_instance_uid: Study Instance UID (required)
            series_instance_uid: Series Instance UID
            modality: Modality (e.g. "CT", "MR")
            series_number: Series number
            series_description: Series description (can include wildcards)
            attribute_preset: Attribute preset (minimal, standard, extended)
            additional_attrs: Additional attributes to include
            exclude_attrs: Attributes to exclude

        Returns:
            List of matching series records
        """
        ds = self._build_query(
            "SERIES",
            "series",
            {"StudyInstanceUID": study_instance_uid},
            {
                "SeriesInstanceUID": series_instance_uid,
                "Modality": modality,
                "SeriesNumber": series_number,
                "SeriesDescription": series_description,
            },
            attribute_preset,
            additional_attrs,
            exclude_attrs,
        )
        return self.find(ds, StudyRootQueryRetrieveInformationModelFind)

    def query_instance(
        self,
        series_instance_uid: str,
        sop_instance_uid: Optional[str] = None,
        instance_number: Optional[str] = None,
        attribute_preset: str = "standard",
        additional_attrs: Optional[List[str]] = None,
        exclude_attrs: Optional[List[str]] = None,
    ) -> List[Dict[str, Any]]:
        """Query for instances matching criteria.

        Args:
            series_instance_uid: Series Instance UID (required)
            sop_instance_uid: SOP Instance UID
            instance_number: Instance number
            attribute_preset: Attribute preset (minimal, standard, extended)
            additional_attrs: Additional attributes to include
            exclude_attrs: Attributes to exclude

        Returns:
            List of matching instance records
        """
        ds = self._build_query(
            "IMAGE",
            "instance",
            {"SeriesInstanceUID": series_instance_uid},
            {
                "SOPInstanceUID": sop_instance_uid,
                "InstanceNumber": instance_number,
            },
            attribute_preset,
            additional_attrs,
            exclude_attrs,
        )
        return self.find(ds, StudyRootQueryRetrieveInformationModelFind)

    @staticmethod
    def _safe_file_stem(uid: Any) -> str:
        """Reduce a peer-supplied UID to characters that are safe in a file name.

        Path separators are dropped so a malformed UID cannot make the stored
        file escape the temporary directory. Well-formed UIDs (digits and
        dots) pass through unchanged.
        """
        stem = "".join(ch for ch in str(uid) if ch.isalnum() or ch in "._-")
        return stem or "unknown"

    def extract_pdf_text_from_dicom(
        self,
        study_instance_uid: str,
        series_instance_uid: str,
        sop_instance_uid: str,
    ) -> Dict[str, Any]:
        """Retrieve a DICOM instance with encapsulated PDF and extract its text content.

        The instance is fetched with C-GET into a fresh temporary directory
        and, when it is an Encapsulated PDF, its document is written next to
        it as ``extracted.pdf`` and parsed with PyPDF2.

        The temporary directory is kept only when a DICOM file is returned to
        the caller through ``file_path``; it is removed when nothing was
        retrieved or when an exception is raised.

        Args:
            study_instance_uid: Study Instance UID
            series_instance_uid: Series Instance UID
            sop_instance_uid: SOP Instance UID

        Returns:
            Dictionary with extracted text information and status:
            {
                "success": bool,
                "message": str,
                "text_content": str,
                "file_path": str  # Path to the temporary DICOM file
            }
        """
        temp_dir = tempfile.mkdtemp()
        result = None
        try:
            result = self._retrieve_pdf_text(
                temp_dir, study_instance_uid, series_instance_uid, sop_instance_uid
            )
            return result
        finally:
            # Nothing in the directory is referenced by the caller unless a
            # file path was handed back, so do not leave it behind.
            if result is None or not result["file_path"]:
                shutil.rmtree(temp_dir, ignore_errors=True)

    def _retrieve_pdf_text(
        self,
        temp_dir: str,
        study_instance_uid: str,
        series_instance_uid: str,
        sop_instance_uid: str,
    ) -> Dict[str, Any]:
        """Run the C-GET into ``temp_dir`` and build the result dictionary."""
        # Identifier for the C-GET request
        query = Dataset()
        query.QueryRetrieveLevel = "IMAGE"
        query.StudyInstanceUID = study_instance_uid
        query.SeriesInstanceUID = series_instance_uid
        query.SOPInstanceUID = sop_instance_uid

        received_files: List[str] = []

        def handle_store(event):
            """Store each dataset the peer sends via C-STORE during the C-GET."""
            stored = event.dataset
            sop_instance = stored.SOPInstanceUID if hasattr(stored, "SOPInstanceUID") else "unknown"

            # Make sure file meta information exists before writing the file
            if not hasattr(stored, "file_meta") or not hasattr(stored.file_meta, "TransferSyntaxUID"):
                from pydicom.dataset import FileMetaDataset

                if not hasattr(stored, "file_meta"):
                    stored.file_meta = FileMetaDataset()
                if event.context.transfer_syntax:
                    stored.file_meta.TransferSyntaxUID = event.context.transfer_syntax
                else:
                    # Fall back to Explicit VR Little Endian
                    stored.file_meta.TransferSyntaxUID = "1.2.840.10008.1.2.1"
                if not hasattr(stored.file_meta, "MediaStorageSOPClassUID") and hasattr(stored, "SOPClassUID"):
                    stored.file_meta.MediaStorageSOPClassUID = stored.SOPClassUID
                if not hasattr(stored.file_meta, "MediaStorageSOPInstanceUID") and hasattr(stored, "SOPInstanceUID"):
                    stored.file_meta.MediaStorageSOPInstanceUID = stored.SOPInstanceUID

            file_path = os.path.join(temp_dir, f"{self._safe_file_stem(sop_instance)}.dcm")
            stored.save_as(file_path, write_like_original=False)
            received_files.append(file_path)
            return 0x0000  # Success

        handlers = [(evt.EVT_C_STORE, handle_store)]

        # SCP/SCU role selection: tell the peer that this AE accepts the SCP
        # (receiver) role for Encapsulated PDF so it can C-STORE to us during
        # the C-GET.
        role = build_role(EncapsulatedPDFStorage, scp_role=True)

        assoc = self.ae.associate(
            self.host,
            self.port,
            ae_title=self.called_aet,
            evt_handlers=handlers,
            ext_neg=[role],
        )
        if not assoc.is_established:
            return {
                "success": False,
                "message": f"Failed to associate with DICOM node at {self.host}:{self.port}",
                "text_content": "",
                "file_path": "",
            }

        success = False
        message = "C-GET operation failed"
        try:
            # NOTE(review): the identifier carries no PatientID, which the
            # Patient Root model nominally expects at IMAGE level; confirm
            # whether the Study Root GET model should be used instead.
            responses = assoc.send_c_get(query, PatientRootQueryRetrieveInformationModelGet)
            for status, _identifier in responses:
                if not status:
                    # Empty status: no valid response was received
                    success = False
                    message = "C-GET operation failed"
                    continue
                status_int = status.Status if hasattr(status, "Status") else 0
                if status_int == 0x0000:  # Success
                    success = True
                    message = "C-GET operation completed successfully"
                elif status_int == 0xFF00:  # Pending
                    success = True
                    message = "C-GET operation in progress"
                else:
                    # A later failure/warning/cancel must override an earlier
                    # "pending" so the final outcome is what gets reported.
                    success = False
                    message = f"C-GET operation failed with status 0x{status_int:04X}"
        finally:
            # Always release the association
            assoc.release()

        if not received_files:
            if success:
                # The exchange finished cleanly but nothing was stored, so
                # there is no document to report on.
                success = False
                message = "C-GET operation completed but no DICOM instance was received"
            return {
                "success": success,
                "message": message,
                "text_content": "",
                "file_path": "",
            }

        # Only the first received instance is inspected
        dicom_file = received_files[0]
        instance = dcmread(dicom_file)
        if not (hasattr(instance, "SOPClassUID") and instance.SOPClassUID == EncapsulatedPDFStorage):
            return {
                "success": False,
                "message": "Retrieved DICOM instance does not contain an encapsulated PDF",
                "text_content": "",
                "file_path": dicom_file,
            }

        # Write the embedded document next to the DICOM file
        pdf_path = os.path.join(temp_dir, "extracted.pdf")
        with open(pdf_path, "wb") as pdf_file:
            pdf_file.write(instance.EncapsulatedDocument)

        import PyPDF2

        with open(pdf_path, "rb") as pdf_file:
            reader = PyPDF2.PdfReader(pdf_file)
            # Guard against pages for which no text is returned
            extracted_text = "\n".join(page.extract_text() or "" for page in reader.pages)

        return {
            "success": True,
            "message": "Successfully extracted text from PDF in DICOM",
            "text_content": extracted_text,
            "file_path": dicom_file,
        }

    @staticmethod
    def _dataset_to_dict(dataset: "Dataset") -> Dict[str, Any]:
        """Convert a DICOM dataset to a dictionary.

        Elements are keyed by keyword. Elements whose keyword is empty are
        keyed by the string form of their tag instead, so they do not
        overwrite one another under a shared empty key. Sequences are
        converted recursively into lists of dictionaries.

        Args:
            dataset: DICOM dataset

        Returns:
            Dictionary representation of the dataset
        """
        if hasattr(dataset, "is_empty") and dataset.is_empty():
            return {}

        result = {}
        for elem in dataset:
            key = getattr(elem, "keyword", "") or str(elem.tag)
            if elem.VR == "SQ":
                result[key] = [DicomClient._dataset_to_dict(item) for item in elem.value]
                continue
            try:
                # Multi-valued elements become plain lists
                result[key] = list(elem.value) if elem.VM > 1 else elem.value
            except Exception:
                # Fall back to string representation
                result[key] = str(elem.value)
        return result