"""
Search FDA database using precise device identifiers (K-number, PMA, UDI).
This is the most reliable search method when FDA numbers are known.
"""
from typing import Dict, Any, Optional
from pydantic import BaseModel, Field
from core.app import mcp
from fastmcp import Context
import aiohttp
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# FDA API endpoints: the base URL is concatenated with one of the paths below
# to build the request URL for the 510(k), PMA, or UDI search respectively.
FDA_API_BASE = "https://api.fda.gov"
FDA_510K_ENDPOINT = "/device/510k.json"
FDA_PMA_ENDPOINT = "/device/pma.json"
FDA_UDI_ENDPOINT = "/device/udi.json"
class FDADevice(BaseModel):
    """Normalized device record assembled from an FDA API search result.

    The four identity fields and ``clearance_type`` are required; every
    other field has a default. ``raw_data`` carries the API record the
    model was built from, so callers can reach fields not mapped here.
    """

    # Required identity fields.
    device_name: str = Field(..., description="Device trade/proprietary name")
    manufacturer: str = Field(..., description="Manufacturer/applicant name")
    fda_number: str = Field(..., description="FDA clearance/approval number")
    decision_date: str = Field(..., description="FDA decision date")

    # Optional classification details.
    product_code: Optional[str] = Field(
        None, description="FDA product classification code"
    )
    device_class: Optional[str] = Field(
        None, description="FDA device class (1, 2, or 3)"
    )
    regulation_number: Optional[str] = Field(
        None, description="CFR regulation number"
    )

    # Which database the record came from and its resulting status.
    clearance_type: str = Field(..., description="Type: 510(k)/PMA/UDI")
    fda_status: str = Field(
        "cleared", description="FDA status: cleared/approved/registered"
    )

    # Optional review metadata.
    review_advisory_committee: Optional[str] = Field(
        None, description="FDA review committee"
    )
    decision_code: Optional[str] = Field(None, description="FDA decision code")

    # Untouched source record; a fresh dict per instance when not supplied.
    raw_data: Dict[str, Any] = Field(
        default_factory=dict, description="Raw FDA API response"
    )
@mcp.tool
async def search_fda_by_identifier(
    k_number: Optional[str] = None,
    pma_number: Optional[str] = None,
    udi: Optional[str] = None,
    ctx: Context = None
) -> Optional[Dict[str, Any]]:
    """
    Search FDA database using precise device identifiers.

    This is the most reliable FDA search method. Use when you have
    specific FDA identifiers from the device or its documentation.
    Searches 510(k), PMA, or UDI databases based on provided identifiers.

    Identifiers are tried in the order K-number, PMA number, UDI. The first
    lookup that yields a device wins; an identifier that is missing or that
    yields nothing falls through to the next one.

    Args:
        k_number: FDA 510(k) clearance number (e.g., "K223137")
        pma_number: FDA PMA approval number (e.g., "P200012")
        udi: FDA Unique Device Identifier
        ctx: FastMCP context for logging

    Returns:
        Dictionary with complete FDA information if found,
        None if not found in FDA database
    """
    if ctx:
        identifier = k_number or pma_number or udi or "no identifier"
        await ctx.info(f"Searching FDA database for: {identifier}")

    # (identifier value, lookup coroutine) pairs in priority order. The
    # helpers are resolved at call time, so they may be defined further down
    # the module.
    lookups = (
        (k_number, _search_510k),
        (pma_number, _search_pma),
        (udi, _search_udi),
    )
    for value, lookup in lookups:
        if not value:
            continue
        device = await lookup(value, ctx)
        if device:
            return device.model_dump()

    if ctx:
        await ctx.warning("No device found with provided identifiers")
    return None
async def _search_510k(k_number: str, ctx: Optional[Context]) -> Optional[FDADevice]:
    """Search FDA 510(k) database.

    Args:
        k_number: 510(k) number as supplied by the caller; it is upper-cased
            and stripped of dashes, spaces and double quotes before querying.
        ctx: Optional context used for progress/warning/error messages.

    Returns:
        An ``FDADevice`` built from the first matching record, or ``None``
        when the API reports no match, answers with a non-200 status, or the
        request/parsing fails (failures are logged, never raised).
    """
    try:
        # Normalize K-number format. Double quotes are removed as well so the
        # value cannot close the quoted phrase in the search expression early.
        k_clean = (
            k_number.upper().replace("-", "").replace(" ", "").replace('"', "")
        )
        params = {
            "search": f'k_number:"{k_clean}"',
            "limit": "1",
        }
        url = f"{FDA_API_BASE}{FDA_510K_ENDPOINT}"
        timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, params=params) as response:
                if response.status == 404:
                    # 404 is treated as "no such record", not as an error.
                    if ctx:
                        await ctx.info(f"No 510(k) found for {k_number}")
                    return None
                if response.status != 200:
                    if ctx:
                        await ctx.warning(
                            f"FDA API returned status {response.status}"
                        )
                    return None
                data = await response.json()

        results = data.get("results")
        if not results:
            # A successful response without results is also a miss.
            if ctx:
                await ctx.info(f"No 510(k) found for {k_number}")
            return None

        record = results[0]
        # ``get(key) or default`` rather than ``get(key, default)``: a key
        # that is present but null would otherwise hand None to a required
        # str field and the whole record would be lost to a validation error.
        device = FDADevice(
            device_name=record.get("device_name") or "Unknown Device",
            manufacturer=record.get("applicant") or "Unknown Manufacturer",
            fda_number=record.get("k_number") or k_number,
            decision_date=record.get("decision_date") or "",
            product_code=record.get("product_code"),
            device_class=_extract_device_class(record),
            regulation_number=record.get("regulation_number"),
            clearance_type="510(k)",
            fda_status="cleared",
            review_advisory_committee=record.get("advisory_committee"),
            decision_code=record.get("decision_code"),
            raw_data=record,
        )
        if ctx:
            await ctx.info(f"Found 510(k) device: {device.device_name}")
        return device
    except Exception as e:
        # Best-effort lookup: report the failure (with traceback) and let the
        # caller fall back to other identifiers.
        logger.exception("Error searching 510(k): %s", e)
        if ctx:
            await ctx.error(f"510(k) search failed: {e}")
    return None
async def _search_pma(pma_number: str, ctx: Optional[Context]) -> Optional[FDADevice]:
    """Search FDA PMA database.

    Args:
        pma_number: PMA number as supplied by the caller; it is upper-cased
            and stripped of dashes, spaces and double quotes before querying.
        ctx: Optional context used for progress/warning/error messages.

    Returns:
        An ``FDADevice`` built from the first matching record, or ``None``
        when the API reports no match, answers with a non-200 status, or the
        request/parsing fails (failures are logged, never raised).
    """
    try:
        # Normalize PMA number format. Double quotes are removed as well so
        # the value cannot close the quoted phrase in the search expression.
        pma_clean = (
            pma_number.upper().replace("-", "").replace(" ", "").replace('"', "")
        )
        params = {
            "search": f'pma_number:"{pma_clean}"',
            "limit": "1",
        }
        url = f"{FDA_API_BASE}{FDA_PMA_ENDPOINT}"
        timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, params=params) as response:
                if response.status == 404:
                    # 404 is treated as "no such record", not as an error.
                    if ctx:
                        await ctx.info(f"No PMA found for {pma_number}")
                    return None
                if response.status != 200:
                    if ctx:
                        await ctx.warning(
                            f"FDA API returned status {response.status}"
                        )
                    return None
                data = await response.json()

        results = data.get("results")
        if not results:
            # A successful response without results is also a miss.
            if ctx:
                await ctx.info(f"No PMA found for {pma_number}")
            return None

        record = results[0]
        # ``or`` chains instead of ``get(key, default)`` so that keys present
        # with a null value still fall back to the next candidate instead of
        # failing validation of the required str fields.
        device = FDADevice(
            device_name=(
                record.get("trade_name")
                or record.get("device_name")
                or "Unknown Device"
            ),
            manufacturer=record.get("applicant") or "Unknown Manufacturer",
            fda_number=record.get("pma_number") or pma_number,
            decision_date=record.get("decision_date") or "",
            product_code=record.get("product_code"),
            device_class="3",  # PMA devices are typically Class III
            clearance_type="PMA",
            fda_status="approved",
            review_advisory_committee=record.get("advisory_committee"),
            decision_code=record.get("decision_code"),
            raw_data=record,
        )
        if ctx:
            await ctx.info(f"Found PMA device: {device.device_name}")
        return device
    except Exception as e:
        # Best-effort lookup: report the failure (with traceback) and let the
        # caller fall back to other identifiers.
        logger.exception("Error searching PMA: %s", e)
        if ctx:
            await ctx.error(f"PMA search failed: {e}")
    return None
async def _search_udi(udi: str, ctx: Optional[Context]) -> Optional[FDADevice]:
    """Search FDA UDI database.

    Args:
        udi: Device identifier to look up. Surrounding whitespace and double
            quotes are removed before it is placed in the query.
        ctx: Optional context used for progress/warning/error messages.

    Returns:
        An ``FDADevice`` built from the first matching record, or ``None``
        when the API reports no match, answers with a non-200 status, or the
        request/parsing fails (failures are logged, never raised). When the
        record's ``identifiers`` list carries a 510(k) or PMA entry, the
        returned device reports that number and type instead of the UDI.
    """
    try:
        # Double quotes are removed so the value cannot close the quoted
        # phrase in the search expression early.
        udi_clean = udi.strip().replace('"', "")
        params = {
            # The UDI endpoint indexes device identifiers under
            # ``identifiers.id``; it has no top-level ``udi`` field to match.
            "search": f'identifiers.id:"{udi_clean}"',
            "limit": "1",
        }
        url = f"{FDA_API_BASE}{FDA_UDI_ENDPOINT}"
        timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, params=params) as response:
                if response.status == 404:
                    # 404 is treated as "no such record", not as an error.
                    if ctx:
                        await ctx.info(f"No UDI found for {udi}")
                    return None
                if response.status != 200:
                    if ctx:
                        await ctx.warning(
                            f"FDA API returned status {response.status}"
                        )
                    return None
                data = await response.json()

        results = data.get("results")
        if not results:
            # A successful response without results is also a miss.
            if ctx:
                await ctx.info(f"No UDI found for {udi}")
            return None

        record = results[0]
        raw_class = record.get("device_class")
        # UDI records have different structure. ``or`` chains are used so
        # that keys present with a null value still fall back instead of
        # failing validation of the required str fields.
        # NOTE(review): decision_date is filled from
        # commercial_distribution_end_date, as before — confirm that is the
        # intended source field.
        device = FDADevice(
            device_name=(
                record.get("brand_name")
                or record.get("device_description")
                or "Unknown Device"
            ),
            manufacturer=record.get("company_name") or "Unknown Manufacturer",
            fda_number=udi,
            decision_date=record.get("commercial_distribution_end_date") or "",
            device_class=str(raw_class) if raw_class else None,
            clearance_type="UDI",
            fda_status="registered",
            raw_data=record,
        )

        # Try to find associated 510(k) or PMA. A missing/null list or a
        # non-dict entry is skipped rather than aborting the lookup; if
        # several entries match, the last one wins.
        # NOTE(review): premarket numbers may be exposed under a different
        # key of the UDI record — verify against the openFDA field reference.
        for identifier in record.get("identifiers") or []:
            if not isinstance(identifier, dict):
                continue
            kind = identifier.get("type")
            if kind == "510(k)":
                device.fda_number = identifier.get("id") or udi
                device.clearance_type = "510(k)"
                device.fda_status = "cleared"
            elif kind == "PMA":
                device.fda_number = identifier.get("id") or udi
                device.clearance_type = "PMA"
                device.fda_status = "approved"

        if ctx:
            await ctx.info(f"Found UDI device: {device.device_name}")
        return device
    except Exception as e:
        # Best-effort lookup: report the failure (with traceback) and return
        # None to the caller.
        logger.exception("Error searching UDI: %s", e)
        if ctx:
            await ctx.error(f"UDI search failed: {e}")
    return None
def _extract_device_class(record: Dict[str, Any]) -> Optional[str]:
"""Extract device class from FDA record."""
# Try different field names
device_class = record.get("device_class")
if device_class:
return str(device_class)
# Sometimes encoded in product code info
product_code_info = record.get("product_code_info")
if product_code_info and isinstance(product_code_info, dict):
return product_code_info.get("device_class")
# Check openfda section
openfda = record.get("openfda", {})
if openfda.get("device_class"):
return openfda["device_class"]
return None