import argparse
import json
import os
import sys
from typing import Any

import httpx
from dotenv import load_dotenv
from fastmcp import FastMCP

from fhir_validator import (
    load_consolidated_fhir_schema,
    compile_fhir_schema,
    validate_fhir_resource,
)
# Import Zus-specific extensions
from zus_extensions import (
    get_patient_zus_upid as zus_get_patient_zus_upid,
    calculate_name_similarity
)
# Parse command line arguments
parser = argparse.ArgumentParser(description="FHIR MCP Server")
parser.add_argument(
    "--env-file",
    type=str,
    help="Path to environment file to load",
)
# parse_known_args() returns unrecognised arguments in `unknown` instead of
# aborting, so extra flags on the command line do not stop the module loading.
args, unknown = parser.parse_known_args()

# Load environment variables from .env file only if --env-file flag is provided.
# This happens before the os.getenv() calls that follow, so values from the
# file are visible to them.
if args.env_file:
    load_dotenv(args.env_file)
# Configuration from environment variables (read once, when the module loads).

# Surrounding whitespace and trailing slashes are removed so that URLs built as
# f"{FHIR_BASE_URL}/{resource_type}" never contain a double slash
# (e.g. ".../fhir//Patient") when the variable is set to ".../fhir/".
FHIR_BASE_URL = os.getenv("FHIR_BASE_URL", "http://localhost:8080/fhir").strip().rstrip("/")

# Only the literal word "true" (any case, surrounding whitespace ignored)
# enables a flag; every other value disables it.
FHIR_ALLOW_READ = os.getenv("FHIR_ALLOW_READ", "true").strip().lower() == "true"
FHIR_ALLOW_WRITE = os.getenv("FHIR_ALLOW_WRITE", "true").strip().lower() == "true"

# Bearer token; an empty string means no Authorization header is configured.
FHIR_AUTH_TOKEN = os.getenv("FHIR_AUTH_TOKEN", "")

# HTTP Methods configuration (takes precedence over ALLOW_READ/ALLOW_WRITE)
# Format: comma-separated list like "GET,POST,PUT" or empty for default behavior
FHIR_ALLOWED_METHODS = os.getenv("FHIR_ALLOWED_METHODS", "")

# None means "not configured" (fall back to the read/write flags).
# NOTE: a non-empty value that contains only commas/whitespace yields an empty
# set, which allows no method at all.
_allowed_methods_set = (
    {m.strip().upper() for m in FHIR_ALLOWED_METHODS.split(",") if m.strip()}
    if FHIR_ALLOWED_METHODS
    else None
)
mcp = FastMCP("FHIR MCP Server")

# Load and compile FHIR validator schema.
# fhir_schema is pre-set so the name exists even when loading fails.
fhir_schema = None
try:
    fhir_schema = load_consolidated_fhir_schema()
    compiled_validator = compile_fhir_schema(fhir_schema)
except Exception as e:
    # If schema loading fails, set to None and validation will be skipped
    compiled_validator = None
    # Write the warning to stderr rather than stdout: if this server is run
    # over the MCP stdio transport (the launch code is not visible here),
    # stdout is reserved for protocol messages and stray text would corrupt it.
    print(f"Warning: Could not load FHIR schema: {e}", file=sys.stderr)
def is_http_method_allowed(method: str) -> bool:
    """
    Decide whether the given HTTP method may be used against the FHIR server.

    Resolution order:
    1. When an explicit method allow-list (FHIR_ALLOWED_METHODS) is configured,
       membership in that list is the only criterion.
    2. Otherwise the legacy flags apply: GET follows FHIR_ALLOW_READ;
       POST, PUT, PATCH and DELETE follow FHIR_ALLOW_WRITE; anything else
       is rejected.

    Args:
        method: HTTP method name; compared case-insensitively.

    Returns:
        True when the method is permitted, False otherwise.
    """
    verb = method.upper()

    # An explicit allow-list overrides the read/write flags entirely.
    if _allowed_methods_set is not None:
        return verb in _allowed_methods_set

    if verb == "GET":
        return FHIR_ALLOW_READ
    if verb in ("POST", "PUT", "PATCH", "DELETE"):
        return FHIR_ALLOW_WRITE
    return False
def get_fhir_headers(custom_headers: dict[str, str] | None = None) -> dict[str, str]:
"""
Get headers for FHIR API requests.
Args:
custom_headers: Optional dictionary of additional custom headers to include.
For example, Zus servers use {"Zus-Account": "builder-id"}.
Returns:
Dictionary of HTTP headers for FHIR requests
"""
headers = {
"Content-Type": "application/fhir+json",
"Accept": "application/fhir+json",
}
if FHIR_AUTH_TOKEN:
headers["Authorization"] = f"Bearer {FHIR_AUTH_TOKEN}"
if custom_headers:
headers.update(custom_headers)
return headers
def _format_error_body(label: str, response: httpx.Response) -> str:
    """Render an error response as "<label>:" followed by pretty JSON, or by the raw text if the body is not JSON."""
    try:
        return f"{label}:\n{json.dumps(response.json(), indent=2)}"
    except ValueError:
        # json.JSONDecodeError / UnicodeDecodeError are both ValueError subclasses.
        return f"{label}: {response.text}"


def _id_from_location(location: str, resource_type: str) -> str | None:
    """Return the path segment that follows the last `resource_type` segment in a Location header value, or None."""
    segments = [part for part in location.split("?", 1)[0].split("/") if part]
    # Scan right-to-left so a base URL that happens to contain the type name
    # does not shadow the real ".../<type>/<id>[/_history/<vid>]" tail.
    for index in range(len(segments) - 2, -1, -1):
        if segments[index] == resource_type:
            return segments[index + 1]
    return None


@mcp.tool()
async def write_fhir_resource(resource: dict[str, Any], custom_headers: dict[str, str] | None = None) -> str:
    """
    Write a FHIR resource to the FHIR server.

    This tool:
    1. Picks PUT when the resource has an "id", POST otherwise, and checks
       that the method is permitted by the server configuration
    2. Validates the FHIR resource using fhir-validator (skipped when the
       schema could not be loaded)
    3. Sends the resource to {FHIR_BASE_URL}/{resourceType}[/{id}]
    4. Returns validation errors or server errors for correction

    Args:
        resource: A FHIR resource as a JSON object (dict)
        custom_headers: Optional dictionary of custom HTTP headers to include in the request.
            For Zus servers, use {"Zus-Account": "builder-id"} for multi-tenant access.

    Returns:
        A status message indicating success or detailed error information.
        Failures are reported in the returned string rather than raised.
    """
    # Determine if this is a create or update first to check permissions
    resource_id = resource.get("id")
    method = "PUT" if resource_id else "POST"

    # Check if the HTTP method is allowed
    if not is_http_method_allowed(method):
        if _allowed_methods_set is not None:
            return f"Error: HTTP method {method} is not allowed. Allowed methods: {', '.join(sorted(_allowed_methods_set))}"
        return "Error: Write operations are not allowed. Set FHIR_ALLOW_WRITE=true to enable."

    # Validate the resource first
    if compiled_validator:
        try:
            is_valid, error_message = validate_fhir_resource(
                resource, compiled_validator
            )
        except Exception as e:
            return f"Validation error: {str(e)}"
        if not is_valid:
            return f"Validation failed:\n{error_message}"

    # Extract resource type
    resource_type = resource.get("resourceType")
    if not resource_type:
        return "Error: Resource must have a 'resourceType' field"

    # Build the endpoint URL: update (PUT) targets the instance, create (POST) the type
    if resource_id:
        url = f"{FHIR_BASE_URL}/{resource_type}/{resource_id}"
    else:
        url = f"{FHIR_BASE_URL}/{resource_type}"

    # "posted" is kept as before; PUT previously rendered as the non-word "putd".
    action = "put" if method == "PUT" else "posted"

    # Send the request to FHIR server
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.request(
                method=method, url=url, json=resource, headers=get_fhir_headers(custom_headers)
            )
            status = response.status_code

            if status in (200, 201):
                # Echo resource content only when reads are permitted. Use the
                # same rule as the read tools so FHIR_ALLOWED_METHODS keeps its
                # precedence over FHIR_ALLOW_READ here too.
                if not is_http_method_allowed("GET"):
                    created_id = resource_id or "(created)"
                    return f"Success: {resource_type} resource {action} successfully.\nID: {created_id}"

                # A successful write may come back with no (or a non-JSON)
                # body; that must still be reported as success.
                try:
                    response_data = response.json()
                except ValueError:
                    response_data = None

                if isinstance(response_data, dict):
                    created_id = response_data.get("id", resource_id)
                    return f"Success: {resource_type} resource {action} successfully.\nID: {created_id}\n\nResponse:\n{json.dumps(response_data, indent=2)}"

                created_id = (
                    resource_id
                    or _id_from_location(response.headers.get("Location", ""), resource_type)
                    or "(created)"
                )
                return f"Success: {resource_type} resource {action} successfully.\nID: {created_id}"

            if status == 400:
                # Bad request - return details for fixing
                return _format_error_body("Server validation error (400)", response)
            if status == 404:
                if method == "PUT":
                    return f"Error: Resource {resource_type}/{resource_id} not found. Use POST to create a new resource."
                return f"Error: Endpoint {url} not found. Verify the resource type is correct."
            if status == 401:
                return "Error: Authentication failed. Check FHIR_AUTH_TOKEN configuration."
            if status == 403:
                return "Error: Authorization failed. Insufficient permissions to write this resource."
            if status == 422:
                # Unprocessable entity - business rule violation
                return _format_error_body("Server business rule error (422)", response)

            # Other errors
            return f"Server error ({status}):\n{response.text}"
    except httpx.TimeoutException:
        return "Error: Request to FHIR server timed out after 30 seconds"
    except httpx.RequestError as e:
        return f"Error connecting to FHIR server: {str(e)}\nVerify FHIR_BASE_URL is correct: {FHIR_BASE_URL}"
    except Exception as e:
        # Last-resort boundary: the tool reports problems as text instead of raising.
        return f"Unexpected error: {str(e)}"
@mcp.tool()
async def read_fhir_resource(resource_type: str, resource_id: str, custom_headers: dict[str, str] | None = None) -> str:
    """
    Fetch a single FHIR resource, addressed by type and ID.

    Issues GET {FHIR_BASE_URL}/{resource_type}/{resource_id} after confirming
    that GET is permitted by the server configuration.

    Args:
        resource_type: The FHIR resource type (e.g., "Patient", "Observation")
        resource_id: The ID of the resource to read
        custom_headers: Optional dictionary of custom HTTP headers to include in the request.
            For Zus servers, use {"Zus-Account": "builder-id"} for multi-tenant access.

    Returns:
        The resource as indented JSON text, or an error message. Failures are
        reported in the returned string rather than raised.
    """
    # Refuse early when reads are disabled; the wording depends on which
    # configuration mechanism is in force.
    if not is_http_method_allowed("GET"):
        if _allowed_methods_set is None:
            return "Error: Read operations are not allowed. Set FHIR_ALLOW_READ=true to enable."
        return f"Error: HTTP method GET is not allowed. Allowed methods: {', '.join(sorted(_allowed_methods_set))}"

    target = f"{FHIR_BASE_URL}/{resource_type}/{resource_id}"

    # Fixed replies for the status codes that get a dedicated message.
    known_failures = {
        404: f"Error: Resource {resource_type}/{resource_id} not found.",
        401: "Error: Authentication failed. Check FHIR_AUTH_TOKEN configuration.",
        403: "Error: Authorization failed. Insufficient permissions to read this resource.",
    }

    try:
        async with httpx.AsyncClient(timeout=30.0) as http:
            reply = await http.get(target, headers=get_fhir_headers(custom_headers))
            status = reply.status_code
            if status == 200:
                return json.dumps(reply.json(), indent=2)
            if status in known_failures:
                return known_failures[status]
            return f"Server error ({status}):\n{reply.text}"
    except httpx.TimeoutException:
        return "Error: Request to FHIR server timed out after 30 seconds"
    except httpx.RequestError as exc:
        return f"Error connecting to FHIR server: {str(exc)}\nVerify FHIR_BASE_URL is correct: {FHIR_BASE_URL}"
    except Exception as exc:
        return f"Unexpected error: {str(exc)}"
@mcp.tool()
async def search_fhir_resources(
    resource_type: str, search_params: dict[str, str] | None = None, custom_headers: dict[str, str] | None = None
) -> str:
    """
    Search for FHIR resources using query parameters.

    Issues GET {FHIR_BASE_URL}/{resource_type} with the given query
    parameters after confirming that GET is permitted.

    Args:
        resource_type: The FHIR resource type to search (e.g., "Patient", "Observation")
        search_params: Optional dictionary of search parameters (e.g., {"name": "Smith", "gender": "female"})
        custom_headers: Optional dictionary of custom HTTP headers to include in the request.
            For Zus servers, use {"Zus-Account": "builder-id"} for multi-tenant access.

    Returns:
        A one-line summary followed by the FHIR Bundle as indented JSON, or an
        error message. Failures are reported in the returned string rather
        than raised.
    """
    # Check if GET method is allowed
    if not is_http_method_allowed("GET"):
        if _allowed_methods_set is not None:
            return f"Error: HTTP method GET is not allowed. Allowed methods: {', '.join(sorted(_allowed_methods_set))}"
        return "Error: Read operations are not allowed. Set FHIR_ALLOW_READ=true to enable."

    url = f"{FHIR_BASE_URL}/{resource_type}"
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(
                url, params=search_params or {}, headers=get_fhir_headers(custom_headers)
            )
            status = response.status_code

            if status == 200:
                bundle_data = response.json()
                total = bundle_data.get("total")
                if total is None:
                    # Bundle.total is optional. Reporting 0 when it is absent
                    # would contradict a bundle that does contain entries, so
                    # fall back to the number of entries actually returned.
                    # NOTE(review): entries may include non-match items
                    # (e.g. included resources); this is a page count only.
                    page_count = len(bundle_data.get("entry") or [])
                    summary = f"Found {page_count} {resource_type} resource(s) in this page (total not reported by server)"
                else:
                    summary = f"Found {total} {resource_type} resource(s)"
                return f"{summary}\n\n{json.dumps(bundle_data, indent=2)}"

            if status == 400:
                try:
                    error_data = response.json()
                except ValueError:
                    # Body is not JSON; show it verbatim.
                    return f"Invalid search parameters (400): {response.text}"
                return f"Invalid search parameters (400):\n{json.dumps(error_data, indent=2)}"
            if status == 401:
                return "Error: Authentication failed. Check FHIR_AUTH_TOKEN configuration."
            if status == 403:
                return "Error: Authorization failed. Insufficient permissions to search resources."
            if status == 404:
                return f"Error: Resource type {resource_type} not found or not supported."
            return f"Server error ({status}):\n{response.text}"
    except httpx.TimeoutException:
        return "Error: Request to FHIR server timed out after 30 seconds"
    except httpx.RequestError as e:
        return f"Error connecting to FHIR server: {str(e)}\nVerify FHIR_BASE_URL is correct: {FHIR_BASE_URL}"
    except Exception as e:
        # Last-resort boundary: the tool reports problems as text instead of raising.
        return f"Unexpected error: {str(e)}"
@mcp.tool()
async def get_patient_zus_upid(
    first_name: str, last_name: str, builder_id: str | None = None
) -> str:
    """
    Look up the Zus UPID (Universal Patient ID) of a patient by name.

    **This is a Zus-specific tool** for working with Zus Health's FHIR API.

    This function is a thin MCP wrapper: the lookup itself is delegated to
    `zus_extensions.get_patient_zus_upid`, which is handed this module's
    method-permission check and configured method allow-list.

    Args:
        first_name: Patient's first name
        last_name: Patient's last name
        builder_id: Optional Zus builder ID (string) to filter the search

    Returns:
        The string produced by the Zus helper (the UPID value, or an error
        message if it was not found)
    """
    upid_or_error = await zus_get_patient_zus_upid(
        first_name=first_name,
        last_name=last_name,
        builder_id=builder_id,
        # Pass the permission machinery in explicitly; the helper is given
        # these rather than importing them from this module.
        is_http_method_allowed_fn=is_http_method_allowed,
        _allowed_methods_set=_allowed_methods_set,
    )
    return upid_or_error
@mcp.tool()
async def get_fhir_config() -> str:
    """
    Report the FHIR settings this server is currently running with.

    The auth token itself is never included; only whether one is configured.

    Returns:
        A multi-line, human-readable summary of the configuration
    """
    auth_state = "Configured" if FHIR_AUTH_TOKEN else "Not configured"

    if _allowed_methods_set is None:
        methods_line = "- Allowed HTTP Methods: Not configured (using Read/Write settings)"
    else:
        methods_line = f"- Allowed HTTP Methods: {', '.join(sorted(_allowed_methods_set))} (overrides Read/Write settings)"

    report_lines = [
        "FHIR Server Configuration:",
        f"- Base URL: {FHIR_BASE_URL}",
        f"- Read Allowed: {FHIR_ALLOW_READ}",
        f"- Write Allowed: {FHIR_ALLOW_WRITE}",
        f"- Authentication: {auth_state}",
        methods_line,
    ]
    return "\n".join(report_lines)