# fhir_mcp_server.py (25.2 kB)
"""
Progressive Discovery FHIR MCP Server
A FastMCP-based server that progressively discovers FHIR resource operations
based on user needs, inspired by Klavis progressive discovery patterns.
Architecture:
- Discovery Groups: FHIR resources organized into logical categories
- Progressive Loading: Tools are discovered based on user intent
- Native PDF Conversion: Converts medical PDFs to FHIR resources
"""
from typing import Any, Dict, List, Optional, Literal
import json
from datetime import datetime
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
# Single FastMCP application instance; every @mcp.tool() function in this
# module registers itself on it, and mcp.run() at the bottom starts it.
mcp = FastMCP("FHIR Progressive Discovery Server")
# ============================================================================
# PROGRESSIVE DISCOVERY ARCHITECTURE
# ============================================================================
class ResourceGroup(BaseModel):
    """A named bundle of related FHIR resource types used for discovery.

    Fields:
        name: Human-readable title of the group.
        description: Short summary; also searched during intent matching.
        resources: FHIR resource type names that belong to the group.
        priority: Ranking hint (defaults to 0). Lower numbers receive a
            larger relevance boost in ``discover_resource_groups``.
    """

    name: str
    description: str
    resources: List[str]
    priority: int = 0
# Define FHIR resource categories for progressive discovery
# Catalogue of discovery groups, keyed by the group id that the discovery tools
# accept and return. Insertion order matters: it is the tie-break order when
# groups end up with equal relevance scores.
RESOURCE_GROUPS = {
    # --- priority 1: core patient-facing data -------------------------------
    "clinical": ResourceGroup(
        name="Clinical Resources",
        description="Patient clinical data: conditions, observations, procedures, medications, allergies, diagnostic reports",
        resources=[
            "Condition", "Observation", "Procedure", "Medication",
            "MedicationRequest", "MedicationStatement", "Immunization",
            "AllergyIntolerance", "DiagnosticReport", "Specimen",
        ],
        priority=1,
    ),
    "administrative": ResourceGroup(
        name="Administrative Resources",
        description="Individuals and entities: patients, practitioners, organizations, locations",
        resources=[
            "Patient", "Practitioner", "PractitionerRole", "Person",
            "RelatedPerson", "Organization", "Location", "Group",
            "HealthcareService",
        ],
        priority=1,
    ),
    # --- priority 2 / 3: supporting domains ---------------------------------
    "workflow": ResourceGroup(
        name="Workflow Resources",
        description="Care coordination and scheduling: encounters, appointments, episodes, tasks",
        resources=[
            "Encounter", "EpisodeOfCare", "Appointment", "AppointmentResponse",
            "Schedule", "Slot", "Task", "ServiceRequest", "CareTeam", "CarePlan",
        ],
        priority=2,
    ),
    "financial": ResourceGroup(
        name="Financial Resources",
        description="Billing and claims: coverage, claims, invoices, payments",
        resources=[
            "Coverage", "Claim", "ClaimResponse", "Invoice",
            "PaymentNotice", "PaymentReconciliation", "Account",
            "ChargeItem", "ExplanationOfBenefit",
        ],
        priority=3,
    ),
    "clinical_reasoning": ResourceGroup(
        name="Clinical Reasoning Resources",
        description="Clinical decision support and guidelines",
        # NOTE(review): "RequestGroup" looks like the pre-R5 name of
        # RequestOrchestration - confirm against the installed FHIR version.
        resources=[
            "ActivityDefinition", "PlanDefinition", "Questionnaire",
            "QuestionnaireResponse", "RequestGroup", "RiskAssessment",
            "ClinicalImpression", "DetectedIssue", "Goal",
        ],
        priority=3,
    ),
    "diagnostics": ResourceGroup(
        name="Diagnostic Resources",
        description="Imaging and diagnostic studies",
        # NOTE(review): "Media" appears to have been dropped in R5 - confirm.
        resources=[
            "ImagingStudy", "Media", "BodyStructure", "MolecularSequence",
            "GenomicStudy",
        ],
        priority=2,
    ),
    # Overlaps on purpose with "clinical": the medication types listed there
    # are repeated here next to the dispensing/administration types.
    "medications": ResourceGroup(
        name="Medication Resources",
        description="Comprehensive medication management",
        resources=[
            "Medication", "MedicationAdministration", "MedicationDispense",
            "MedicationKnowledge", "MedicationRequest", "MedicationStatement",
        ],
        priority=2,
    ),
    "documents": ResourceGroup(
        name="Document Resources",
        description="Clinical documents and compositions",
        # NOTE(review): "ClinicalDocument" does not look like a FHIR resource
        # type and "DocumentManifest" appears to be pre-R5 - verify both.
        resources=[
            "DocumentReference", "DocumentManifest", "Composition",
            "ClinicalDocument",
        ],
        priority=2,
    ),
    # --- priority 4: specialised domains ------------------------------------
    "research": ResourceGroup(
        name="Research Resources",
        description="Clinical research and studies",
        resources=[
            "ResearchStudy", "ResearchSubject", "Evidence", "EvidenceVariable",
            "Citation",
        ],
        priority=4,
    ),
    "terminology": ResourceGroup(
        name="Terminology Resources",
        description="Code systems, value sets, and concept maps",
        resources=[
            "CodeSystem", "ValueSet", "ConceptMap", "NamingSystem",
        ],
        priority=4,
    ),
    "security": ResourceGroup(
        name="Security & Provenance",
        description="Audit trails, consent, and data provenance",
        resources=[
            "AuditEvent", "Provenance", "Consent", "Permission",
        ],
        priority=3,
    ),
}
# Ids of the groups that have been opened through explore_resource_group().
# NOTE: this is a single module-level set, so it is shared by every caller in
# the process; nothing in this module keeps it separately per session.
_discovered_groups: set = set()
# ============================================================================
# DISCOVERY TOOLS
# ============================================================================
@mcp.tool()
def discover_resource_groups(
    intent: str = Field(description="User's intent or domain (e.g., 'patient care', 'billing', 'research')")
) -> Dict[str, Any]:
    """
    Discover available FHIR resource groups based on user intent.
    This is the entry point for progressive discovery - call this first to understand
    what resource categories are available before diving into specific resources.
    Returns groups ranked by relevance to the intent.

    Scoring: each whitespace-separated word of the intent that occurs as a
    substring of a group's name, description or resource names adds 10 points,
    and every group gets a priority boost of ``(5 - priority) * 2``. At most the
    five best-scoring groups are returned.
    """
    terms = intent.lower().split()

    def _relevance(group):
        # Flatten everything searchable about the group into one lowercase string.
        haystack = f"{group.name} {group.description} {' '.join(group.resources)}".lower()
        keyword_points = sum(10 for term in terms if term in haystack)
        # Lower priority numbers are boosted more.
        return keyword_points + (5 - group.priority) * 2

    ranking = [(gid, _relevance(grp)) for gid, grp in RESOURCE_GROUPS.items()]
    # Stable sort: equally scored groups keep their catalogue order.
    ranking.sort(key=lambda pair: pair[1], reverse=True)

    # With priorities between 1 and 4 the boost alone is positive, so the
    # "> 0" filter only matters for groups configured with priority >= 5.
    best = [
        {
            "group_id": gid,
            "name": RESOURCE_GROUPS[gid].name,
            "description": RESOURCE_GROUPS[gid].description,
            "resource_count": len(RESOURCE_GROUPS[gid].resources),
            "relevance_score": points,
        }
        for gid, points in ranking[:5]
        if points > 0
    ]
    return {"intent": intent, "discovered_groups": best}
@mcp.tool()
def explore_resource_group(
    group_id: str = Field(description="ID of the resource group to explore (e.g., 'clinical', 'administrative')")
) -> Dict[str, Any]:
    """
    Explore a specific FHIR resource group to see available resources and operations.
    Call this after discovering groups to dive deeper into specific resource types.

    An unknown ``group_id`` yields an ``error`` entry together with the list of
    valid ids. A known id is also recorded in the module-level set of
    discovered groups.
    """
    selected = RESOURCE_GROUPS.get(group_id)
    if selected is None:
        return {
            "error": f"Unknown group_id: {group_id}",
            "available_groups": list(RESOURCE_GROUPS),
        }

    # Remember that this group has been opened.
    _discovered_groups.add(group_id)

    operations = [
        "create_resource",
        "validate_resource",
        "search_resources",
        "get_resource_schema",
    ]
    guidance = [
        "Use get_resource_schema() to see structure of a specific resource type",
        "Use create_resource() to create a new FHIR resource",
        "Use validate_resource() to validate existing FHIR JSON",
        "Use search_resources() to query resources",
    ]
    return {
        "group_id": group_id,
        "name": selected.name,
        "description": selected.description,
        "resources": selected.resources,
        "available_operations": operations,
        "next_steps": guidance,
    }
@mcp.tool()
def list_all_resource_groups() -> Dict[str, Any]:
    """
    List all available FHIR resource groups in the system.
    Use this for a complete overview of available resource categories.

    Returns the number of groups plus, for each group in catalogue order, its
    id, name, description, resource count and priority.
    """
    summaries = []
    for gid, grp in RESOURCE_GROUPS.items():
        summaries.append({
            "group_id": gid,
            "name": grp.name,
            "description": grp.description,
            "resource_count": len(grp.resources),
            "priority": grp.priority,
        })
    return {"total_groups": len(RESOURCE_GROUPS), "groups": summaries}
# ============================================================================
# FHIR RESOURCE OPERATIONS
# ============================================================================
@mcp.tool()
def get_resource_schema(
    resource_type: str = Field(description="FHIR resource type (e.g., 'Patient', 'Observation')")
) -> Dict[str, Any]:
    """
    Get the schema/structure for a specific FHIR resource type.
    Returns field definitions, required fields, and examples.

    The schema is the JSON schema generated by the ``fhir.resources`` model
    class for ``resource_type``. Any failure (unknown type, library missing)
    is reported as an ``error``/``hint`` dictionary instead of being raised.
    """
    try:
        # Imported lazily so a missing library becomes an error result.
        from fhir.resources import get_fhir_model_class

        model_schema = get_fhir_model_class(resource_type).model_json_schema()
        return {
            "resource_type": resource_type,
            "fhir_version": "R5",
            "schema": model_schema,
            "description": model_schema.get("description", ""),
            "required_fields": model_schema.get("required", []),
        }
    except Exception as exc:
        return {
            "error": f"Failed to get schema for {resource_type}: {str(exc)}",
            "hint": "Make sure the resource type is valid and fhir.resources is installed",
        }
@mcp.tool()
def create_resource(
    resource_type: str = Field(description="FHIR resource type (e.g., 'Patient', 'Observation')"),
    resource_data: Dict[str, Any] = Field(description="Resource data as JSON/dict")
) -> Dict[str, Any]:
    """
    Create and validate a FHIR resource with the provided data.
    Returns the validated resource in FHIR JSON format.

    The resource is only built and validated in memory; nothing is sent to a
    FHIR server. On success the result carries ``success=True``, the resource
    as a JSON-compatible dict, its ``id`` (or ``"not-assigned"``) and a UTC
    ``created_at`` timestamp. Any failure is returned as ``success=False``
    with the error text rather than raised.
    """
    try:
        from datetime import timezone

        from fhir.resources import get_fhir_model_class

        # Instantiating the model class performs the FHIR validation.
        resource_class = get_fhir_model_class(resource_type)
        resource = resource_class(**resource_data)

        # Round-trip through the model's JSON dump to get plain JSON types.
        fhir_json = json.loads(resource.model_dump_json(exclude_none=True))

        # datetime.utcnow() is deprecated; take an aware UTC time and drop the
        # tzinfo so the text keeps its "<naive ISO>Z" shape.
        created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
        return {
            "success": True,
            "resource_type": resource_type,
            "resource": fhir_json,
            "id": fhir_json.get("id", "not-assigned"),
            "created_at": created_at
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "resource_type": resource_type,
            "hint": "Check that resource_data matches FHIR schema requirements"
        }
@mcp.tool()
def validate_resource(
    resource_json: str = Field(description="FHIR resource as JSON string")
) -> Dict[str, Any]:
    """
    Validate an existing FHIR resource JSON string.
    Returns validation result with any errors found.

    The resource type is read from the ``resourceType`` field and the JSON is
    validated against the matching ``fhir.resources`` model. The result always
    contains ``valid``; failures add an ``error`` message (malformed JSON,
    JSON that is not an object, missing ``resourceType``, or the model's
    validation error).
    """
    try:
        from fhir.resources import get_fhir_model_class

        resource_dict = json.loads(resource_json)

        # Valid JSON is not necessarily an object (e.g. "[]" or "42"); report
        # that clearly instead of failing on the .get() below.
        if not isinstance(resource_dict, dict):
            return {
                "valid": False,
                "error": "FHIR resource JSON must be an object with a 'resourceType' field"
            }

        resource_type = resource_dict.get("resourceType")
        if not resource_type:
            return {
                "valid": False,
                "error": "Missing 'resourceType' field in resource"
            }

        # Validation happens as a side effect of parsing; the parsed model
        # itself is not needed.
        get_fhir_model_class(resource_type).model_validate_json(resource_json)
        return {
            "valid": True,
            "resource_type": resource_type,
            "id": resource_dict.get("id", "not-assigned"),
            "message": "Resource is valid FHIR"
        }
    except json.JSONDecodeError as e:
        return {
            "valid": False,
            "error": f"Invalid JSON: {str(e)}"
        }
    except Exception as e:
        return {
            "valid": False,
            "error": str(e)
        }
@mcp.tool()
def convert_between_formats(
    resource_data: str = Field(description="FHIR resource as JSON, XML, or YAML string"),
    input_format: Literal["json", "xml", "yaml"] = Field(description="Input format"),
    output_format: Literal["json", "xml", "yaml"] = Field(description="Desired output format")
) -> Dict[str, Any]:
    """
    Convert a FHIR resource between different formats (JSON, XML, YAML).

    The input is parsed into the ``fhir.resources`` model named by its resource
    type (``resourceType`` for JSON/YAML, the root element name for XML) and
    then dumped in ``output_format``. Returns ``success=True`` with the
    converted text in ``output``, or ``success=False`` with an ``error``.

    NOTE(review): the XML/YAML model helpers used here are assumed to exist in
    the installed fhir.resources version - confirm.
    """
    try:
        import re

        from fhir.resources import get_fhir_model_class

        missing_type = {"success": False, "error": "Missing 'resourceType' field in resource"}

        # --- parse the input ------------------------------------------------
        if input_format == "json":
            resource_dict = json.loads(resource_data)
            resource_type = resource_dict.get("resourceType") if isinstance(resource_dict, dict) else None
            if not resource_type:
                return missing_type
            resource_class = get_fhir_model_class(resource_type)
            resource = resource_class.model_validate_json(resource_data)
        elif input_format == "xml":
            # The resource type is the name of the first element. Comments are
            # removed first so a tag inside a comment is not picked up; the
            # "<?xml ...?>" prolog never matches because "?" is not a name
            # character. The name may end at whitespace, ">" or "/", so a root
            # without attributes ("<Patient>") is recognised, and an optional
            # namespace prefix is skipped.
            without_comments = re.sub(r'<!--.*?-->', '', resource_data, flags=re.DOTALL)
            match = re.search(
                r'<(?:[A-Za-z_][\w.\-]*:)?([A-Za-z_][\w.\-]*)(?=[\s/>])',
                without_comments
            )
            if not match:
                return {"success": False, "error": "Could not determine resource type from XML"}
            resource_type = match.group(1)
            resource_class = get_fhir_model_class(resource_type)
            resource = resource_class.model_validate_xml(resource_data)
        elif input_format == "yaml":
            import yaml
            # safe_load only builds plain Python objects from the document.
            resource_dict = yaml.safe_load(resource_data)
            resource_type = resource_dict.get("resourceType") if isinstance(resource_dict, dict) else None
            if not resource_type:
                return missing_type
            resource_class = get_fhir_model_class(resource_type)
            resource = resource_class.model_validate(resource_dict)
        else:
            return {"success": False, "error": f"Unsupported input format: {input_format}"}

        # --- serialise to the requested format -------------------------------
        if output_format == "json":
            output = resource.model_dump_json(exclude_none=True, indent=2)
        elif output_format == "xml":
            output = resource.model_dump_xml()
        elif output_format == "yaml":
            output = resource.model_dump_yaml()
        else:
            return {"success": False, "error": f"Unsupported output format: {output_format}"}

        return {
            "success": True,
            "resource_type": resource_type,
            "input_format": input_format,
            "output_format": output_format,
            "output": output
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
# ============================================================================
# PDF TO FHIR CONVERSION
# ============================================================================
@mcp.tool()
def convert_pdf_to_fhir(
    pdf_path: str = Field(description="Path to the PDF file to convert"),
    document_type: str = Field(
        default="clinical_note",
        description="Type of document: 'clinical_note', 'lab_report', 'discharge_summary', 'prescription'"
    ),
    patient_reference: Optional[str] = Field(
        default=None,
        description="Patient reference ID if known (e.g., 'Patient/123')"
    )
) -> Dict[str, Any]:
    """
    Convert a medical PDF document to FHIR format.
    Extracts text from the PDF and builds a FHIR DocumentReference describing the
    file, typed with a LOINC code chosen from the document type. Fields found by
    simple pattern matching on the text are returned alongside it.

    Returns a dictionary with ``success``. On success it also holds the
    DocumentReference as JSON, a text preview, the text length, the page count
    and the pattern-matched fields; on failure it holds an ``error`` message.
    Unknown ``document_type`` values fall back to the outpatient-note code.
    """
    from datetime import timezone
    from pathlib import Path

    # Checked on its own so that only a genuinely missing PyPDF2 produces the
    # "not installed" message; other import problems are reported below.
    try:
        import PyPDF2
    except ImportError:
        return {
            "success": False,
            "error": "PyPDF2 library not installed. Install with: pip install PyPDF2"
        }

    try:
        # NOTE(review): pdf_path is opened exactly as given. If callers are not
        # trusted, restrict it to an allowed directory before reading.
        pdf_file = Path(pdf_path)
        if not pdf_file.exists():
            return {
                "success": False,
                "error": f"PDF file not found: {pdf_path}"
            }

        # Read everything needed from the reader while the file is still open.
        with open(pdf_file, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            # A page without extractable text contributes an empty string.
            page_texts = [page.extract_text() or "" for page in pdf_reader.pages]
        page_count = len(page_texts)
        text_content = "".join(f"{page_text}\n" for page_text in page_texts)

        file_size = pdf_file.stat().st_size
        file_name = pdf_file.name

        from fhir.resources.documentreference import DocumentReference

        # Document type -> (LOINC code, display text).
        document_type_codes = {
            "clinical_note": ("34108-1", "Outpatient Note"),
            "lab_report": ("11502-2", "Laboratory report"),
            "discharge_summary": ("18842-5", "Discharge summary"),
            "prescription": ("57833-6", "Prescription for medication")
        }
        code, display = document_type_codes.get(
            document_type,
            ("34108-1", "Outpatient Note")
        )

        # datetime.utcnow() is deprecated; take an aware UTC time and drop the
        # tzinfo so the text keeps its "<naive ISO>Z" shape.
        timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

        doc_ref_data = {
            "resourceType": "DocumentReference",
            "status": "current",
            "type": {
                "coding": [{
                    "system": "http://loinc.org",
                    "code": code,
                    "display": display
                }]
            },
            "content": [{
                # The PDF bytes are deliberately not embedded (no "data"
                # element) to keep the response small.
                "attachment": {
                    "contentType": "application/pdf",
                    "title": file_name,
                    "size": file_size,
                    "creation": timestamp
                },
                # FHIR R5 carries the format code in content.profile; the R4
                # "content.format" element is not part of the R5 model.
                # NOTE(review): assumes the default fhir.resources models are
                # R5, as the rest of this module reports - confirm.
                "profile": [{
                    "valueCoding": {
                        "system": "http://ihe.net/fhir/ValueSet/IHE.FormatCode.codesystem",
                        "code": "urn:ihe:iti:xds:2017:mimeTypeSufficient",
                        "display": "mimeType Sufficient"
                    }
                }]
            }],
            "date": timestamp
        }

        if patient_reference:
            doc_ref_data["subject"] = {"reference": patient_reference}

        # Use the beginning of the extracted text as the description.
        if text_content.strip():
            doc_ref_data["description"] = text_content[:500]

        # Constructing the model validates the data.
        doc_ref = DocumentReference(**doc_ref_data)
        fhir_json = json.loads(doc_ref.model_dump_json(exclude_none=True))

        # Basic regex-based extraction of structured fields from the text.
        extracted_data = _extract_clinical_data(text_content, document_type)

        return {
            "success": True,
            "document_reference": fhir_json,
            "extracted_text_preview": text_content[:1000],
            "text_length": len(text_content),
            "page_count": page_count,
            "extracted_structured_data": extracted_data,
            "hint": "Use create_resource() to persist this DocumentReference to your FHIR server"
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Failed to convert PDF: {str(e)}"
        }
def _extract_clinical_data(text: str, document_type: str) -> Dict[str, Any]:
"""
Extract structured clinical data from text.
This is a basic implementation - can be enhanced with NLP/AI.
"""
import re
extracted = {
"document_type": document_type,
"extracted_fields": {}
}
# Basic pattern matching for common clinical data
patterns = {
"dates": r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b',
"patient_id": r'(?:Patient ID|MRN|Medical Record):\s*(\S+)',
"blood_pressure": r'(?:BP|Blood Pressure):\s*(\d{2,3}/\d{2,3})',
"temperature": r'(?:Temp|Temperature):\s*([\d.]+)\s*°?[FCfc]?',
"medications": r'(?:Medication|Rx|Prescribed):\s*([^\n]+)',
"diagnoses": r'(?:Diagnosis|Dx):\s*([^\n]+)',
}
for field, pattern in patterns.items():
matches = re.findall(pattern, text, re.IGNORECASE)
if matches:
extracted["extracted_fields"][field] = matches
return extracted
@mcp.tool()
def batch_convert_pdfs_to_fhir(
    pdf_directory: str = Field(description="Directory containing PDF files to convert"),
    document_type: str = Field(default="clinical_note", description="Type of documents"),
    patient_reference: Optional[str] = Field(default=None, description="Patient reference if applicable")
) -> Dict[str, Any]:
    """
    Batch convert multiple PDF files in a directory to FHIR DocumentReferences.

    Only files directly inside ``pdf_directory`` are considered (no recursion).
    They are matched by a case-insensitive ``.pdf`` extension and processed in
    file-name order so the result is reproducible. Each file is converted with
    ``convert_pdf_to_fhir``; a failure of one file does not stop the batch.

    Returns ``success=True`` with totals and one ``{"file", "result"}`` entry
    per PDF, or ``success=False`` with an ``error`` when the directory is
    missing, contains no PDFs, or an unexpected error occurs.
    """
    try:
        from pathlib import Path

        pdf_dir = Path(pdf_directory)
        if not pdf_dir.exists() or not pdf_dir.is_dir():
            return {
                "success": False,
                "error": f"Directory not found: {pdf_directory}"
            }

        # Sorted for a deterministic order; suffix compared in lower case so
        # "REPORT.PDF" is picked up on case-sensitive file systems too.
        pdf_files = sorted(
            (
                entry for entry in pdf_dir.iterdir()
                if entry.is_file() and entry.suffix.lower() == ".pdf"
            ),
            key=lambda entry: entry.name
        )
        if not pdf_files:
            return {
                "success": False,
                "error": f"No PDF files found in {pdf_directory}"
            }

        results = [
            {
                "file": pdf_file.name,
                "result": convert_pdf_to_fhir(
                    pdf_path=str(pdf_file),
                    document_type=document_type,
                    patient_reference=patient_reference
                )
            }
            for pdf_file in pdf_files
        ]

        successful = sum(1 for entry in results if entry["result"].get("success"))
        return {
            "success": True,
            "total_files": len(pdf_files),
            "successful_conversions": successful,
            "failed_conversions": len(pdf_files) - successful,
            "results": results
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Batch conversion failed: {str(e)}"
        }
# ============================================================================
# SEARCH AND QUERY
# ============================================================================
@mcp.tool()
def search_resources(
    resource_type: str = Field(description="FHIR resource type to search"),
    search_params: Dict[str, str] = Field(
        description="Search parameters as key-value pairs (e.g., {'name': 'John', 'birthdate': '1990-01-01'})"
    )
) -> Dict[str, Any]:
    """
    Search for FHIR resources with specified parameters.
    Note: This is a client-side validation tool. For actual search, connect to a FHIR server.

    No search is executed. Each parameter name is compared with the property
    names in the JSON schema of the ``fhir.resources`` model for
    ``resource_type`` and sorted into ``valid_params`` or ``invalid_params``.
    Failures (for example an unknown resource type) are returned as an
    ``error`` entry.
    """
    try:
        from fhir.resources import get_fhir_model_class

        # Looking up the model also proves that the resource type exists.
        model_schema = get_fhir_model_class(resource_type).model_json_schema()
        known_fields = model_schema.get("properties", {})

        # Split the parameters by whether their name is a schema property.
        recognised = {
            name: value for name, value in search_params.items()
            if name in known_fields
        }
        unrecognised = {
            name: value for name, value in search_params.items()
            if name not in known_fields
        }

        return {
            "resource_type": resource_type,
            "search_params": search_params,
            "valid_params": recognised,
            "invalid_params": unrecognised,
            "note": "This validates search parameters. To execute searches, connect to a FHIR server.",
            "hint": "Common search params: _id, _lastUpdated, _tag, _profile, _security",
        }
    except Exception as exc:
        return {
            "error": f"Search validation failed: {str(exc)}"
        }
# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================
@mcp.tool()
def get_fhir_version_info() -> Dict[str, Any]:
    """
    Get information about supported FHIR versions and the current configuration.

    The version labels and feature list are fixed values defined here; only
    ``library_version`` is read from the installed ``fhir.resources`` package
    (``"unknown"`` when it exposes no ``__version__``). If the package cannot
    be imported, an ``error`` entry is returned instead.
    """
    try:
        import fhir.resources
    except Exception as exc:
        return {
            "error": str(exc)
        }

    feature_list = [
        "JSON serialization",
        "XML serialization (experimental)",
        "YAML serialization (experimental)",
        "Pydantic v2 validation",
        "Progressive discovery",
        "PDF to FHIR conversion",
    ]
    return {
        "default_version": "R5",
        "supported_versions": ["R5", "R4B", "STU3"],
        "library_version": getattr(fhir.resources, "__version__", "unknown"),
        "features": feature_list,
    }
if __name__ == "__main__":
    # Start the MCP server only when this file is executed as a script,
    # not when it is imported as a module.
    mcp.run()