"""
DICOM MCP Server
by fluxinc
GNU Lesser General Public License v2.1 only
"""
from mcp.server.fastmcp import FastMCP
from pynetdicom import AE
from pynetdicom.sop_class import Verification
import sys
import yaml
import os
# Startup diagnostic, written to stderr rather than stdout
# (presumably so stdout stays free for MCP stdio traffic -- confirm).
print("Starting DICOM MCP server...", file=sys.stderr)
# Load nodes configuration
def load_nodes_config():
    """Load the nodes configuration from the nodes.yaml file next to this module.

    Returns:
        The parsed configuration as a dictionary in which the 'nodes' and
        'local_ae_titles' keys are always present and never None. If the
        file cannot be read or parsed, or its top-level document is not a
        mapping (for example an empty file), the empty configuration
        {"nodes": {}, "local_ae_titles": []} is returned instead.
    """
    empty_config = {"nodes": {}, "local_ae_titles": []}
    try:
        config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'nodes.yaml')
        with open(config_path, 'r', encoding='utf-8') as file:
            config = yaml.safe_load(file)
    except Exception as e:
        # Deliberately best-effort: a missing or broken file must not take
        # the server down, so report the problem and serve an empty config.
        print(f"Error loading nodes configuration: {str(e)}", file=sys.stderr)
        return empty_config

    # safe_load() yields None for an empty document and may yield a list or
    # scalar; callers rely on dict methods, so reject anything else.
    if not isinstance(config, dict):
        print(
            "Error loading nodes configuration: expected a mapping at the top "
            f"level, got {type(config).__name__}",
            file=sys.stderr,
        )
        return empty_config

    # A section written with no value (e.g. a bare "nodes:") parses to None;
    # replace it with the matching empty container.
    for key, empty_value in empty_config.items():
        if config.get(key) is None:
            config[key] = empty_value
    return config
# Build the FastMCP application object that the @mcp.tool() decorators below
# register against. If construction fails, report it on stderr and terminate
# the process with exit status 1.
try:
    mcp = FastMCP("DICOM")
    print("Successfully initialized FastMCP", file=sys.stderr)
except Exception as init_error:
    print(f"Error initializing FastMCP: {init_error!s}", file=sys.stderr)
    sys.exit(1)
@mcp.tool()
def list_dicom_nodes() -> dict:
    """List all configured DICOM nodes from the nodes.yaml file

    Returns:
        Dictionary with two lists:
        'nodes' - one entry per remote node with 'name', 'ae_title', 'ip',
        'port' and 'description';
        'local_ae_titles' - one entry per local AE with 'name', 'ae_title'
        and 'description'.
        Settings missing from the configuration are reported as '' (or 0
        for 'port').
    """
    print("Listing DICOM nodes", file=sys.stderr)
    config = load_nodes_config() or {}

    # A YAML section or entry written without a value parses to None, so
    # every level is guarded with `or` before dict/list methods are used.
    remote_nodes = config.get('nodes') or {}
    local_entries = config.get('local_ae_titles') or []

    return {
        # Remote nodes
        'nodes': [
            {
                'name': name,
                'ae_title': (details or {}).get('ae_title', ''),
                'ip': (details or {}).get('ip', ''),
                'port': (details or {}).get('port', 0),
                'description': (details or {}).get('description', ''),
            }
            for name, details in remote_nodes.items()
        ],
        # Local AE titles
        'local_ae_titles': [
            {
                'name': (ae or {}).get('name', ''),
                'ae_title': (ae or {}).get('ae_title', ''),
                'description': (ae or {}).get('description', ''),
            }
            for ae in local_entries
        ],
    }
@mcp.tool()
def dicom_cecho_by_name(node_name: str, local_ae_name: str = "default") -> dict:
    """Perform a DICOM C-ECHO verification with a remote node using its configured name

    Args:
        node_name: The name of the remote node as configured in nodes.yaml
        local_ae_name: The name of the local AE title to use (default: "default")

    Returns:
        Dictionary containing success status and message. A node that is
        unknown, or that lacks 'ae_title', 'ip' or 'port', yields
        success=False with an explanatory message instead of an exception.
    """
    print(f"Attempting C-ECHO to node '{node_name}' using local AE '{local_ae_name}'", file=sys.stderr)
    config = load_nodes_config() or {}

    # Find the remote node ("nodes:" with no value parses to None)
    nodes = config.get('nodes') or {}
    if node_name not in nodes:
        return {
            'success': False,
            'message': f"Node '{node_name}' not found in configuration"
        }
    node = nodes[node_name] or {}

    # Report incomplete entries through the normal result dictionary rather
    # than letting a KeyError escape from the tool.
    missing = [field for field in ('ae_title', 'ip', 'port') if node.get(field) in (None, '')]
    if missing:
        return {
            'success': False,
            'message': f"Node '{node_name}' is missing required setting(s): {', '.join(missing)}"
        }

    # Find the local AE title; "MCP_DICOM" is used when no entry matches
    # local_ae_name or the matching entry has no usable 'ae_title'.
    local_ae_title = "MCP_DICOM"
    for ae in config.get('local_ae_titles') or []:
        if ae and ae.get('name') == local_ae_name:
            local_ae_title = ae.get('ae_title') or local_ae_title
            break

    # Call the original dicom_cecho function with the resolved parameters
    return dicom_cecho(
        remote_ae_title=node['ae_title'],
        ip=node['ip'],
        port=node['port'],
        local_ae_title=local_ae_title
    )
@mcp.tool()
def dicom_cecho(remote_ae_title: str, ip: str, port: int, local_ae_title: str = "MCP_DICOM") -> dict:
    """Perform a DICOM C-ECHO verification with a remote node

    Args:
        remote_ae_title: The AE title of the remote DICOM node
        ip: IP address of the remote node
        port: Port number of the remote node
        local_ae_title: The AE title to use for the local end (default: "MCP_DICOM")

    Returns:
        Dictionary containing success status and message. Any exception
        raised while setting up or running the exchange is caught and
        reported as success=False with the error text in 'message'.
    """
    print(f"Attempting C-ECHO to {remote_ae_title}@{ip}:{port} from {local_ae_title}", file=sys.stderr)
    result = {
        'success': False,
        'message': ''
    }
    try:
        # AE construction is inside the try block so that an AE title the
        # library rejects is reported in the result, not raised to the caller.
        ae = AE(ae_title=local_ae_title)
        # Add requested context for Verification (C-ECHO)
        ae.add_requested_context(Verification)

        # Associate with peer AE. The remote title is passed as str, the
        # same way the local title is passed to AE() above.
        assoc = ae.associate(
            addr=ip,
            port=port,
            ae_title=remote_ae_title
        )
        if not assoc.is_established:
            result['message'] = f"Association rejected or failed with {remote_ae_title}"
            return result

        try:
            # Send C-ECHO request
            status = assoc.send_c_echo()
            if status:
                # 0x0000 is treated as success; any other code is reported.
                status_code = status.Status
                if status_code == 0x0000:
                    result['success'] = True
                    result['message'] = f"C-ECHO successful with {remote_ae_title}"
                else:
                    result['message'] = f"C-ECHO failed with status: 0x{status_code:04x}"
            else:
                result['message'] = "Connection timed out or received invalid response"
        finally:
            # Release the association even if send_c_echo() raised, so the
            # connection is not left open.
            assoc.release()
        return result
    except Exception as e:
        print(f"Error during C-ECHO: {str(e)}", file=sys.stderr)
        return {
            'success': False,
            'message': f"Error during C-ECHO: {str(e)}"
        }
if __name__ == "__main__":
    # Start the MCP server
    print("Starting MCP server on 0.0.0.0:8080", file=sys.stderr)
    try:
        # FastMCP.run() from mcp.server.fastmcp takes a transport name, not
        # host/port keyword arguments; the bind address is read from the
        # server settings, so set it there and use the SSE network transport.
        # NOTE(review): the transport is inferred from the log message above,
        # and newer SDK releases may derive transport-security defaults from
        # the host given to FastMCP(...) -- confirm remote clients are
        # accepted, or pass host/port to the constructor instead.
        mcp.settings.host = "0.0.0.0"  # Adjust host/port as needed
        mcp.settings.port = 8080
        mcp.run(transport="sse")
    except Exception as e:
        print(f"Error running MCP server: {str(e)}", file=sys.stderr)
        sys.exit(1)