"""Unified FDA search tool that accepts any combination of identifiers or descriptive information. Automatically determines optimal search strategy."""
import time
from typing import Annotated, Optional
from datetime import datetime
import httpx
from pydantic import Field
from fastmcp import Context
from fastmcp.exceptions import ToolError
from core.app import mcp
def calculate_confidence(search_type: str, match_quality: float = 1.0) -> float:
    """Return the overall confidence for a search.

    The score is the base weight assigned to ``search_type`` multiplied by
    ``match_quality`` and rounded to two decimals. A search type that has no
    entry in the weight table uses a base weight of 0.5.
    """
    fallback_weight = 0.5
    weights_by_type = dict(
        k_number_exact=1.0,
        pma_exact=1.0,
        product_code_exact_name=0.9,
        product_code_fuzzy_name=0.75,
        fuzzy_name_manufacturer=0.6,
        fuzzy_name_only=0.5,
    )
    if search_type in weights_by_type:
        weight = weights_by_type[search_type]
    else:
        weight = fallback_weight
    return round(weight * match_quality, 2)
def normalize_company_name(name: str) -> str:
    """Reduce a company name to a lowercase core name for comparison.

    Steps, in order:
      1. Lowercase the name and drop parenthesised segments such as
         ``(Pty)`` or ``(UK)``.
      2. Collapse runs of whitespace.
      3. Repeatedly strip trailing separators (space, comma, semicolon) and
         one trailing legal-entity suffix until nothing more can be removed,
         so stacked suffixes ("Acme Inc. Corp.") and comma-separated
         suffixes ("Acme, Inc.") are fully handled.

    Args:
        name: Raw company name; may be empty or ``None``.

    Returns:
        The normalized name, or ``""`` when ``name`` is falsy.
    """
    import re

    if not name:
        return ""

    # Every suffix starts with a space so that only whole trailing words are
    # removed (a name that *is* just "inc" is left alone).
    legal_suffixes = [
        ' inc.', ' inc', ' incorporated',
        ' corp.', ' corp', ' corporation',
        ' ltd.', ' ltd', ' limited',
        ' llc', ' l.l.c.', ' l.l.c',
        ' pty ltd', ' pty. ltd.', ' pty',
        ' sa', ' s.a.',
        ' gmbh', ' ag',
        ' co.', ' co', ' company',
        ' group', ' international',
    ]
    # Try the longest suffix first so ' pty. ltd.' wins over ' ltd.'.
    ordered_suffixes = sorted(legal_suffixes, key=len, reverse=True)

    # Parenthesised segments are removed *before* suffix stripping; otherwise
    # a trailing "(UK)" would hide the real suffix from endswith().
    normalized = re.sub(r'\([^)]*\)', '', name.lower())
    normalized = ' '.join(normalized.split())

    removed_suffix = True
    while removed_suffix:
        removed_suffix = False
        normalized = normalized.rstrip(' ,;')
        for suffix in ordered_suffixes:
            if normalized.endswith(suffix):
                normalized = normalized[:-len(suffix)]
                removed_suffix = True
                break

    return normalized
def fuzzy_match_score(text1: str, text2: str, is_company_name: bool = False) -> float:
    """Score how closely two strings match, from 0.0 to 1.0.

    Scoring, after normalization:
      * 1.0 when the strings are equal,
      * 0.8 when one contains the other,
      * otherwise the share of distinct whitespace-separated words the two
        strings have in common (intersection over union), rounded to two
        decimals.

    Args:
        text1: First string.
        text2: Second string.
        is_company_name: When true, both strings go through
            ``normalize_company_name``; otherwise they are only lowercased
            and stripped.

    Returns:
        The score; 0.0 when either input is empty before or after
        normalization.
    """
    if not text1 or not text2:
        return 0.0

    if is_company_name:
        t1 = normalize_company_name(text1)
        t2 = normalize_company_name(text2)
    else:
        t1 = text1.lower().strip()
        t2 = text2.lower().strip()

    # Normalization can reduce an input to "" (whitespace only, or nothing
    # but a parenthesised segment). An empty string is a substring of every
    # string, so without this guard it would score 0.8 against anything.
    if not t1 or not t2:
        return 0.0

    if t1 == t2:
        return 1.0

    if t1 in t2 or t2 in t1:
        return 0.8

    # Both strings are non-empty and stripped here, so both word sets (and
    # therefore their union) are non-empty.
    words1 = set(t1.split())
    words2 = set(t2.split())
    return round(len(words1 & words2) / len(words1 | words2), 2)
async def search_510k_by_k_number(k_number: str, ctx: Context) -> Optional[dict]:
    """Look up a single 510(k) record by its K-number via the openFDA API.

    Args:
        k_number: K-number to search for. It is stripped and percent-encoded
            before being placed in the request URL.
        ctx: Context used for progress logging.

    Returns:
        A dict with ``device_name``, ``manufacturer``, ``fda_identifiers``,
        ``approval_info``, ``device_classification`` and ``match_quality``
        built from the first result, or ``None`` when the API answers 404 or
        returns no results.

    Raises:
        ToolError: On a non-200/404 status, a timeout, a transport error, or
            any other failure while handling the response.
    """
    from urllib.parse import quote

    await ctx.info(f"Searching 510(k) database for K-number: {k_number}")

    # The URL is assembled by hand, so the caller-supplied value has to be
    # percent-encoded here; otherwise characters such as '&', '#' or spaces
    # would alter or truncate the query string.
    encoded_k_number = quote(str(k_number).strip(), safe="")
    url = f"https://api.fda.gov/device/510k.json?search=k_number:{encoded_k_number}&limit=1"

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url)

        if response.status_code == 404:
            return None
        if response.status_code != 200:
            raise ToolError(f"FDA API error: {response.status_code}")

        data = response.json()
        if not data.get("results"):
            return None
        result = data["results"][0]

        # `or ""` also covers a key that is present with a null value.
        device_name = result.get("device_name") or ""
        upper_name = device_name.upper()

        # Classification fields are read from the top level first and then
        # from the nested `openfda` object.
        # NOTE(review): openFDA documents device_class / regulation_number
        # under `openfda` — confirm against the API field reference.
        openfda = result.get("openfda") or {}
        regulation_number = result.get("regulation_number") or openfda.get("regulation_number")
        device_class = result.get("device_class") or openfda.get("device_class") or ""

        return {
            "device_name": device_name,
            "manufacturer": result.get("applicant", ""),
            "fda_identifiers": {
                "k_number": result.get("k_number"),
                "pma_number": None,
                "product_code": result.get("product_code"),
                "regulation_number": regulation_number,
                "device_class": str(device_class).strip(),
            },
            "approval_info": {
                "status": "cleared",
                "clearance_type": "510(k)",
                "decision_date": result.get("decision_date", ""),
                "decision_code": result.get("decision_code", ""),
                # Prefer the record's own decision_description; the
                # `statement` key is kept as a fallback.
                "decision_description": result.get("decision_description") or result.get("statement", ""),
            },
            "device_classification": {
                # Heuristic: OTC status is inferred from wording in the name.
                "is_otc": "OTC" in upper_name or "OVER THE COUNTER" in upper_name,
                # NOTE(review): hard-coded for every record, not derived
                # from the API response — confirm this is intended.
                "requires_prescription": False,
                "advisory_committee": result.get("advisory_committee", ""),
            },
            "match_quality": {
                "confidence": 1.0,
                "match_reason": "Exact K-number match",
            },
        }
    except ToolError:
        # Already a user-facing error (e.g. the non-200 case above); do not
        # wrap it a second time in the generic handler below.
        raise
    except httpx.TimeoutException as e:
        raise ToolError("FDA API request timed out") from e
    except httpx.HTTPError as e:
        raise ToolError(f"FDA API request failed: {str(e)}") from e
    except Exception as e:
        raise ToolError(f"FDA API request failed: {str(e)}") from e
async def search_510k_fuzzy(device_name: Optional[str], manufacturer: Optional[str], product_code: Optional[str], ctx: Context) -> list[dict]:
    """Search the openFDA 510(k) endpoint by device name, manufacturer and/or product code.

    The query ANDs together whichever criteria are given: the product code,
    a phrase built from the first three words of ``device_name``, and a
    wildcard term built from the first two words of the normalized
    ``manufacturer``. At most five records are requested.

    Args:
        device_name: Free-text device name, or ``None``.
        manufacturer: Free-text manufacturer name, or ``None``.
        product_code: FDA product code, or ``None``.
        ctx: Context used for progress and warning messages.

    Returns:
        A list of result dicts sorted by descending
        ``match_quality.confidence``. Empty when no criteria produce a query
        term, when the API answers 404 or any other non-200 status (a
        warning is logged for the latter), or when there are no results.

    Raises:
        ToolError: On a timeout, a transport error, or any other failure
            while handling the response.
    """
    from urllib.parse import quote

    await ctx.info(f"Performing fuzzy search: device_name={device_name}, manufacturer={manufacturer}, product_code={product_code}")

    def encode_term(term: str) -> str:
        # Percent-encode caller-supplied text: the URL is built by hand, so a
        # raw '&', '#' or '?' (e.g. "Johnson & Johnson") would otherwise
        # truncate or corrupt the query string.
        return quote(term, safe="")

    search_parts = []
    if product_code:
        search_parts.append(f"product_code:{encode_term(str(product_code).strip())}")
    if device_name:
        # Use the first 3 words as a phrase. Double quotes are dropped first
        # because one inside the phrase would terminate it early.
        name_words = device_name.replace('"', ' ').split()[:3]
        if name_words:
            device_name_query = "+".join(encode_term(word) for word in name_words)
            search_parts.append(f'device_name:"{device_name_query}"')
    if manufacturer:
        # Use the first 2 words of the normalized (suffix-free) company name
        # inside a wildcard term.
        mfr_words = normalize_company_name(manufacturer).split()[:2]
        if mfr_words:
            mfr_query = "+".join(encode_term(word) for word in mfr_words)
            search_parts.append(f'applicant:*{mfr_query}*')

    if not search_parts:
        return []

    search_query = "+AND+".join(search_parts)
    # The '+' separators must reach the API literally, so the URL is
    # assembled here instead of being passed as request params.
    url = f"https://api.fda.gov/device/510k.json?search={search_query}&limit=5"

    try:
        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.get(url)

        if response.status_code == 404:
            return []
        if response.status_code != 200:
            await ctx.warning(f"FDA API returned status {response.status_code}")
            return []

        data = response.json()
        if not data.get("results"):
            return []

        results = []
        for result in data["results"]:
            # `or ""` also covers keys that are present with a null value,
            # which would otherwise crash the .upper() call below and fail
            # the whole search.
            record_name = result.get("device_name") or ""
            record_applicant = result.get("applicant") or ""
            upper_name = record_name.upper()

            # A criterion the caller did not supply scores a neutral 0.5.
            name_score = fuzzy_match_score(device_name, record_name, is_company_name=False) if device_name else 0.5
            mfr_score = fuzzy_match_score(manufacturer, record_applicant, is_company_name=True) if manufacturer else 0.5
            match_score = (name_score + mfr_score) / 2

            # Classification fields are read from the top level first and
            # then from the nested `openfda` object.
            # NOTE(review): openFDA documents device_class /
            # regulation_number under `openfda` — confirm against the API
            # field reference.
            openfda = result.get("openfda") or {}
            regulation_number = result.get("regulation_number") or openfda.get("regulation_number")
            device_class = result.get("device_class") or openfda.get("device_class") or ""

            results.append({
                "device_name": record_name,
                "manufacturer": record_applicant,
                "fda_identifiers": {
                    "k_number": result.get("k_number"),
                    "pma_number": None,
                    "product_code": result.get("product_code"),
                    "regulation_number": regulation_number,
                    "device_class": str(device_class).strip(),
                },
                "approval_info": {
                    "status": "cleared",
                    "clearance_type": "510(k)",
                    "decision_date": result.get("decision_date", ""),
                    "decision_code": result.get("decision_code", ""),
                    # Prefer the record's own decision_description; the
                    # `statement` key is kept as a fallback.
                    "decision_description": result.get("decision_description") or result.get("statement", ""),
                },
                "device_classification": {
                    # Heuristic: OTC status is inferred from the name.
                    "is_otc": "OTC" in upper_name or "OVER THE COUNTER" in upper_name,
                    # NOTE(review): hard-coded for every record, not derived
                    # from the API response — confirm this is intended.
                    "requires_prescription": False,
                    "advisory_committee": result.get("advisory_committee", ""),
                },
                "match_quality": {
                    "confidence": round(match_score, 2),
                    "match_reason": f"Fuzzy match (name: {name_score:.2f}, mfr: {mfr_score:.2f})",
                },
            })

        # Best match first.
        results.sort(key=lambda x: x["match_quality"]["confidence"], reverse=True)
        return results
    except httpx.TimeoutException as e:
        raise ToolError("FDA API request timed out") from e
    except httpx.HTTPError as e:
        raise ToolError(f"FDA API request failed: {str(e)}") from e
    except Exception as e:
        raise ToolError(f"FDA API request failed: {str(e)}") from e
@mcp.tool(
    annotations={
        "readOnlyHint": True,
        "idempotentHint": True,
        "openWorldHint": False,
    }
)
async def search_fda_device(
    device_identifiers: Annotated[Optional[dict], Field(description="FDA identifiers if available (k_number, pma_number, product_code, udi)")] = None,
    device_description: Annotated[Optional[dict], Field(description="Device descriptive information for fuzzy search (device_name, manufacturer)")] = None,
    search_options: Annotated[Optional[dict], Field(description="Search options (fuzzy_search, max_results)")] = None,
    ctx: Context = None,
) -> dict:
    """Unified FDA search tool that accepts any combination of identifiers or descriptive information. Automatically determines optimal search strategy.

    Strategy priority (first match wins):
      1. ``k_number`` present: exact K-number lookup.
      2. ``pma_number`` present: not implemented; returns no results and a
         warning.
      3. ``product_code`` and ``device_name`` present, fuzzy search enabled:
         hybrid product-code + name search.
      4. ``device_name`` or ``manufacturer`` present, fuzzy search enabled:
         fuzzy search.
    Any other combination (for example only ``udi``, or only
    ``product_code``) raises ``ToolError``.

    Args:
        device_identifiers: FDA identifiers (k_number, pma_number, product_code, udi)
        device_description: Device info for fuzzy search (device_name, manufacturer)
        search_options: Search options. ``fuzzy_search`` defaults to True;
            ``max_results`` defaults to 5 and must be a positive integer.
        ctx: FastMCP context for logging and capabilities

    Returns:
        Dict with ``success``, ``data`` (``search_strategy``, ``results``,
        ``result_count``), ``warnings``, ``confidence`` and ``metadata``
        (``execution_time_ms``, ``data_source``, ``search_type``).

    Raises:
        ToolError: If validation fails or FDA API request fails
    """
    # perf_counter is monotonic, so the elapsed time cannot go negative or
    # jump when the system clock is adjusted.
    start_time = time.perf_counter()
    await ctx.info("Executing search_fda_device")

    # Parse inputs; each argument may be omitted entirely.
    device_identifiers = device_identifiers or {}
    device_description = device_description or {}
    search_options = search_options or {}

    k_number = device_identifiers.get("k_number")
    pma_number = device_identifiers.get("pma_number")
    product_code = device_identifiers.get("product_code")
    # udi counts as a supplied criterion below, but no strategy uses it yet.
    udi = device_identifiers.get("udi")
    device_name = device_description.get("device_name")
    manufacturer = device_description.get("manufacturer")
    fuzzy_search = search_options.get("fuzzy_search", True)

    # Validate at least some search criteria provided
    if not any([k_number, pma_number, product_code, udi, device_name, manufacturer]):
        raise ToolError("At least one search parameter must be provided (identifier or device description)")

    # max_results arrives in an untyped dict: reject values that would make
    # the comparison below raise TypeError (e.g. "5x", None) or that would
    # slice the result list incorrectly (0 or negative).
    try:
        max_results = int(search_options.get("max_results", 5))
    except (TypeError, ValueError):
        raise ToolError("max_results must be a positive integer") from None
    if max_results < 1:
        raise ToolError("max_results must be a positive integer")

    results = []
    warnings = []
    search_strategy = "unknown"
    search_type = "unknown"
    data_source = "FDA 510(k) Database"

    # Search strategy logic
    if k_number:
        # Priority 1: K-number exact match
        await ctx.info(f"Using K-number search strategy: {k_number}")
        search_strategy = "identifier"
        search_type = "k_number_exact"
        result = await search_510k_by_k_number(k_number, ctx)
        if result:
            results = [result]
        else:
            warnings.append(f"No FDA device found with K-number: {k_number}")
    elif pma_number:
        # Priority 2: PMA number (not implemented yet)
        await ctx.warning("PMA search not yet implemented")
        warnings.append("PMA search not yet implemented - please use K-number or device name")
        search_strategy = "identifier"
        search_type = "pma_exact"
    elif product_code and device_name and fuzzy_search:
        # Priority 3: Product code + fuzzy name
        await ctx.info("Using product code + fuzzy name strategy")
        search_strategy = "hybrid"
        search_type = "product_code_fuzzy_name"
        results = await search_510k_fuzzy(device_name, manufacturer, product_code, ctx)
        if not results:
            warnings.append("No devices found matching product code and device name")
    elif (device_name or manufacturer) and fuzzy_search:
        # Priority 4: Fuzzy name/manufacturer search
        await ctx.info("Using fuzzy search strategy")
        search_strategy = "fuzzy"
        search_type = "fuzzy_name_manufacturer" if (device_name and manufacturer) else "fuzzy_name_only"
        results = await search_510k_fuzzy(device_name, manufacturer, product_code, ctx)
        if not results:
            warnings.append("No devices found matching device name or manufacturer")
        elif len(results) > 1:
            warnings.append(f"Fuzzy search returned {len(results)} candidates - verify device name matches")
    else:
        raise ToolError("Insufficient search parameters or fuzzy_search disabled")

    # Limit results. search_510k_fuzzy requests at most 5 records, so a
    # larger max_results does not yield more than that.
    if len(results) > max_results:
        results = results[:max_results]
        warnings.append(f"Results limited to top {max_results} matches")

    # Overall confidence: the strategy's base weight scaled by the best
    # (first) result's own match confidence.
    if results:
        confidence = calculate_confidence(search_type, results[0]["match_quality"]["confidence"])
    else:
        confidence = 0.0

    execution_time = round((time.perf_counter() - start_time) * 1000)

    return {
        "success": True,
        "data": {
            "search_strategy": search_strategy,
            "results": results,
            "result_count": len(results),
        },
        "warnings": warnings,
        "confidence": confidence,
        "metadata": {
            "execution_time_ms": execution_time,
            "data_source": data_source,
            "search_type": search_type,
        },
    }