NIH RePORTER MCP

by jbdamask
Verified
# server.py from typing import Any, List, Dict, Optional import httpx import os import logging import json from dotenv import load_dotenv # Import MCP libraries try: from mcp.server.fastmcp import FastMCP except ImportError as e: logging.error(f"Failed to import MCP libraries: {e}") raise # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(os.path.join(os.path.dirname(os.path.abspath(__file__)), "mcp-nih-reporter.log")), logging.StreamHandler() ] ) logger = logging.getLogger('nih-reporter-mcp') logger.info("Starting NIH RePORTER MCP server") # Load environment variables from .env file load_dotenv() # Configuration API_NAME = "NIH RePORTER" API_BASE = "https://api.reporter.nih.gov/v2" # Initialize FastMCP server mcp = FastMCP(API_NAME) class NIHReporterClient: """Client for interacting with the NIH RePORTER API""" def __init__(self): self.headers = { "Content-Type": "application/json", } async def get_projects(self, criteria: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Get projects from NIH RePORTER API""" logger.info(f"Fetching projects from NIH RePORTER with criteria: {criteria}") async with httpx.AsyncClient() as client: payload = { "criteria": criteria or {}, "limit": criteria.get("limit", 50), "offset": criteria.get("offset", 0), "sort_field": criteria.get("sort_field", "project_start_date"), "sort_order": criteria.get("sort_order", "desc") } logger.debug(f"Sending payload to NIH API: {json.dumps(payload, indent=2)}") try: response = await client.post( f"{API_BASE}/projects/search", headers=self.headers, json=payload ) response.raise_for_status() response_data = response.json() logger.debug(f"Received response: {json.dumps(response_data, indent=2)}") return response_data except httpx.HTTPStatusError as e: logger.error(f"HTTP error occurred: {e.response.status_code} - {e.response.text}") raise except json.JSONDecodeError as e: logger.error(f"Failed to parse API response: {e}") logger.error(f"Raw response: {response.text}") raise except Exception as e: logger.error(f"Unexpected error during API call: {str(e)}") raise def format_project_results(self, results: Dict[str, Any], include_publications: bool = False) -> str: """Format project results into markdown string with optional publication links""" logger.debug(f"Formatting results: {json.dumps(results, indent=2)}") if not results.get("results"): logger.info("No results found in API response") return "No projects found." try: formatted_results = [] for project in results["results"]: # Format amount safely award_amount = project.get('award_amount') amount_str = f"${award_amount:,.2f}" if award_amount is not None else "N/A" # Get organization details safely org = project.get('organization', {}) org_parts = [] if org.get('org_name'): org_parts.append(org.get('org_name')) if org.get('org_city'): org_parts.append(org.get('org_city')) if org.get('org_state'): org_parts.append(org.get('org_state')) org_details = ", ".join(org_parts) if org_parts else "N/A" # Format dates safely start_date = project.get('project_start_date') or 'N/A' end_date = project.get('project_end_date') or 'N/A' # Get study section info safely study_section = project.get('study_section', {}) study_name = study_section.get('study_section_name', 'N/A') srg_code = study_section.get('srg_code', '') study_info = f"{study_name} ({srg_code})" if srg_code else study_name # Handle PIs safely pis = project.get('principal_investigators', []) pi_names = [pi.get('full_name', 'N/A') for pi in pis if pi.get('full_name')] pi_str = ", ".join(pi_names) if pi_names else "N/A" # Build project info with markdown formatting project_info = [ f"### {project.get('project_title', 'Untitled Project')}", "", f"**Project Number:** `{project.get('project_num', 'N/A')}`", f"**Principal Investigator(s):** {pi_str}", f"**Organization:** {org_details}", f"**Fiscal Year:** {project.get('fiscal_year', 'N/A')}", f"**Award Amount:** {amount_str}", f"**Project Period:** {start_date} to {end_date}", f"**Study Section:** {study_info}" ] # Add funding mechanism if available mechanism = project.get('funding_mechanism') if mechanism: project_info.append(f"**Funding Mechanism:** {mechanism}") # Add IC Code if available ic_code = project.get('agency_ic_admin') if ic_code: project_info.append(f"**Institute/Center:** {ic_code}") # Add RCDC terms if available rcdc_terms = project.get('rcdc_terms', []) if rcdc_terms: terms_str = ", ".join(f"`{term}`" for term in rcdc_terms if term) if terms_str: project_info.append(f"**RCDC Terms:** {terms_str}") # Add abstract if it exists abstract = project.get('abstract_text') if abstract: project_info.extend([ "", "#### Abstract", abstract ]) # Add PHR if it exists phr = project.get('phr_text') if phr: project_info.extend([ "", "#### Public Health Relevance", phr ]) # Add publications section if available if include_publications and project.get('related_publications'): project_info.extend([ "", "#### Related Publications" ]) for pub in project.get('related_publications', []): pmid = pub.get('pmid') title = pub.get('title', 'Untitled Publication') pub_info = [""] # Always show the PMID if we have it if pmid: pub_info.append(f"##### {title} (PMID: [{pmid}](https://pubmed.ncbi.nlm.nih.gov/{pmid}/))") else: pub_info.append(f"##### {title}") # Add other details if we have them if pub.get('authors'): author_str = ", ".join(pub['authors']) pub_info.append(f"**Authors:** {author_str}") if pub.get('journal_title'): pub_info.append(f"**Journal:** {pub['journal_title']}") if pub.get('publication_year'): pub_info.append(f"**Year:** {pub['publication_year']}") if pub.get('doi'): pub_info.append(f"**DOI:** [{pub['doi']}](https://doi.org/{pub['doi']})") project_info.extend(pub_info) project_info.extend(["", "---", ""]) formatted_results.append("\n".join(filter(None, project_info))) total = f"# NIH RePORTER Search Results\n\n**Total matching projects:** {results.get('meta', {}).get('total', 0)}" return f"{total}\n\n" + "\n".join(formatted_results) except Exception as e: logger.error(f"Error formatting results: {str(e)}") logger.error(f"Results that caused error: {json.dumps(results, indent=2)}") raise async def get_publications(self, criteria: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Get publications from NIH RePORTER API""" logger.info(f"Fetching publications from NIH RePORTER with criteria: {criteria}") async with httpx.AsyncClient() as client: # Construct the payload according to API specification payload = { "criteria": criteria.get("criteria", {}), "limit": criteria.get("limit", 50), "offset": criteria.get("offset", 0), "sort_field": criteria.get("sort_field", "core_project_nums"), "sort_order": criteria.get("sort_order", "desc") } # Add publication years if specified if "publication_years" in criteria.get("criteria", {}): payload["criteria"]["publication_years"] = criteria["criteria"]["publication_years"] logger.debug(f"Sending payload to NIH Publications API: {json.dumps(payload, indent=2)}") try: response = await client.post( f"{API_BASE}/publications/search", headers=self.headers, json=payload ) response.raise_for_status() response_data = response.json() # If we got PMIDs, fetch the full publication details from PubMed if response_data.get("results"): pmids = [str(result.get("pmid")) for result in response_data["results"] if result.get("pmid")] if pmids: async with httpx.AsyncClient() as pubmed_client: # Use E-utilities to get full publication details pubmed_url = f"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?db=pubmed&id={','.join(pmids)}&retmode=json" pubmed_response = await pubmed_client.get(pubmed_url) pubmed_data = pubmed_response.json() # Update our results with PubMed data for result in response_data["results"]: if result.get("pmid"): pmid = str(result["pmid"]) if pmid in pubmed_data.get("result", {}): pub_details = pubmed_data["result"][pmid] result.update({ "title": pub_details.get("title", ""), "authors": [author.get("name", "") for author in pub_details.get("authors", [])], "journal_title": pub_details.get("fulljournalname", ""), "publication_year": pub_details.get("pubdate", "").split()[0] if pub_details.get("pubdate") else None }) logger.debug(f"Received response: {json.dumps(response_data, indent=2)}") return response_data except httpx.HTTPStatusError as e: logger.error(f"HTTP error occurred: {e.response.status_code} - {e.response.text}") raise except json.JSONDecodeError as e: logger.error(f"Failed to parse API response: {e}") logger.error(f"Raw response: {response.text}") raise except Exception as e: logger.error(f"Unexpected error during API call: {str(e)}") raise def format_publication_results(self, results: Dict[str, Any], include_projects: bool = False) -> str: """Format publication results into markdown string with optional project links""" logger.debug(f"Formatting publication results: {json.dumps(results, indent=2)}") if not results.get("results"): logger.info("No publications found in API response") return "No publications found." try: formatted_results = [] for pub in results["results"]: # Format authors safely authors = pub.get('authors', []) author_str = ", ".join(authors) if authors else "N/A" pub_info = [ f"### {pub.get('title', 'Untitled Publication')}", "", f"**Authors:** {author_str}", f"**PMID:** `{pub.get('pmid', 'N/A')}`", f"**Core Project Number:** `{pub.get('core_project_num', 'N/A')}`" ] # Add publication year if available if pub.get('publication_year'): pub_info.append(f"**Publication Year:** {pub['publication_year']}") # Add journal info if available if pub.get('journal_title'): pub_info.append(f"**Journal:** {pub['journal_title']}") # Add DOI if available if pub.get('doi'): pub_info.append(f"**DOI:** [{pub['doi']}](https://doi.org/{pub['doi']})") # Add project links if available if pub.get('core_project_num'): pub_info.extend([ "", "#### Related NIH Projects", f"- Core Project: `{pub['core_project_num']}`" ]) pub_info.extend(["", "---", ""]) formatted_results.append("\n".join(filter(None, pub_info))) total = f"# NIH RePORTER Publication Results\n\n**Total matching publications:** {results.get('meta', {}).get('total', 0)}" return f"{total}\n\n" + "\n".join(formatted_results) except Exception as e: logger.error(f"Error formatting publication results: {str(e)}") logger.error(f"Results that caused error: {json.dumps(results, indent=2)}") raise # Initialize API client api_client = NIHReporterClient() @mcp.tool() async def search_projects( fiscal_years: Optional[str] = None, pi_names: Optional[str] = None, organization: Optional[str] = None, org_state: Optional[str] = None, org_city: Optional[str] = None, org_type: Optional[str] = None, org_department: Optional[str] = None, min_amount: Optional[float] = None, max_amount: Optional[float] = None, covid_response: Optional[str] = None, funding_mechanism: Optional[str] = None, ic_code: Optional[str] = None, rcdc_terms: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, newly_added_only: Optional[bool] = False, include_abstracts: Optional[bool] = True, limit: Optional[int] = 10 ) -> str: """ Search for NIH funded projects with advanced criteria Args: fiscal_years: Comma-separated list of fiscal years (e.g., "2022,2023") pi_names: Comma-separated list of PI names (will match any of the names) organization: Name of the organization org_state: Two-letter state code (e.g., "CA", "NY") org_city: City name org_type: Organization type org_department: Department name min_amount: Minimum award amount max_amount: Maximum award amount covid_response: COVID-19 response category (options: "Reg-CV", "CV", "C3", "C4", "C5", "C6") funding_mechanism: Type of funding (e.g., "R01", "F32", "K99") ic_code: Institute or Center code (e.g., "NCI", "NIMH") rcdc_terms: Comma-separated RCDC terms for research categorization start_date: Project start date (YYYY-MM-DD) end_date: Project end date (YYYY-MM-DD) newly_added_only: Only show recently added projects include_abstracts: Include project abstracts in results limit: Maximum number of results to return (default: 10, max: 50) """ try: logger.info(f"Advanced search request received with parameters: {locals()}") criteria = {} # Basic criteria if fiscal_years: try: # Handle escaped quotes and clean the input string years_str = fiscal_years.replace('\\"', '').replace('"', '').strip() years = [int(year.strip()) for year in years_str.split(",") if year.strip()] if not years: raise ValueError("No valid years found after parsing") criteria["fiscal_years"] = years except ValueError as e: logger.error(f"Invalid fiscal years format: {fiscal_years}, error: {str(e)}") return f"Error: Invalid fiscal years format. Please provide comma-separated years (e.g., 2020,2021)" # Handle multiple PI names if pi_names: try: # Handle escaped quotes and clean the input string names_str = pi_names.replace('\\"', '').replace('"', '').strip() names = [name.strip() for name in names_str.split(",") if name.strip()] if not names: raise ValueError("No valid names found after parsing") criteria["pi_names"] = [{"any_name": name} for name in names] except Exception as e: logger.error(f"Invalid PI names format: {pi_names}, error: {str(e)}") return f"Error: Invalid PI names format. Please provide comma-separated names" # Organization criteria org_criteria = {} if organization: org_criteria["org_names"] = [organization.strip().strip('"').strip("'")] if org_state: org_criteria["org_states"] = [org_state.strip().strip('"').strip("'").upper()] if org_city: org_criteria["org_cities"] = [org_city.strip().strip('"').strip("'")] if org_type: org_criteria["org_types"] = [org_type.strip().strip('"').strip("'")] if org_department: org_criteria["org_depts"] = [org_department.strip().strip('"').strip("'")] if org_criteria: criteria.update(org_criteria) # Award amount range if min_amount is not None or max_amount is not None: criteria["award_amount_range"] = { "min_amount": min_amount if min_amount is not None else 0, "max_amount": max_amount if max_amount is not None else float('inf') } # COVID response if covid_response: criteria["covid_response"] = [covid_response] # Funding mechanism if funding_mechanism: criteria["funding_mechanism"] = funding_mechanism.strip().strip('"').strip("'") # Institute/Center code if ic_code: criteria["agency_ic_admin"] = ic_code.strip().strip('"').strip("'").upper() # RCDC terms if rcdc_terms: try: terms_str = rcdc_terms.strip().strip('"').strip("'") terms = [term.strip() for term in terms_str.split(",")] criteria["rcdc_terms"] = terms except Exception as e: logger.error(f"Invalid RCDC terms format: {rcdc_terms}") return f"Error: Invalid RCDC terms format. Please provide comma-separated terms without quotes" # Date criteria if start_date or end_date: criteria["date_range"] = { "start_date": start_date.strip().strip('"').strip("'") if start_date else None, "end_date": end_date.strip().strip('"').strip("'") if end_date else None } # Other filters if newly_added_only: criteria["newly_added_projects_only"] = True # Include fields include_fields = [ "project_num", "project_title", "principal_investigators", "organization", "fiscal_year", "award_amount", "project_start_date", "project_end_date", "funding_mechanism", "agency_ic_admin", "rcdc_terms" ] if include_abstracts: include_fields.extend(["abstract_text", "phr_text"]) # Ensure limit is within bounds try: criteria["limit"] = min(max(1, int(limit)), 50) except (ValueError, TypeError): logger.error(f"Invalid limit value: {limit}") return f"Error: Invalid limit value. Please provide a number between 1 and 50" logger.info(f"Constructed search criteria: {json.dumps(criteria, indent=2)}") results = await api_client.get_projects(criteria) return api_client.format_project_results(results) except Exception as e: logger.error(f"Project search failed: {str(e)}", exc_info=True) return f"Project search failed: {str(e)}\nPlease check the logs for more details." @mcp.tool() async def test_connection() -> str: """Test the connection to the NIH RePORTER API""" try: # Try to fetch a single project as a test result = await api_client.get_projects({"limit": 1}) return "Successfully connected to NIH RePORTER API" except Exception as e: logger.error(f"Connection test failed: {e}") return f"Connection test failed: {str(e)}" @mcp.tool() async def search_publications( pmids: Optional[str] = None, core_project_nums: Optional[str] = None, limit: Optional[int] = 10 ) -> str: """ Search for publications linked to NIH projects Args: pmids: Comma-separated list of PubMed IDs core_project_nums: Comma-separated list of NIH core project numbers limit: Maximum number of results to return (default: 10, max: 50) """ try: logger.info(f"Publication search request received with parameters: {locals()}") criteria = {} # Handle PMIDs if pmids: pmid_list = [pmid.strip() for pmid in pmids.split(",")] criteria["pmids"] = pmid_list # Handle core project numbers if core_project_nums: logger.info(f"Processing core_project_nums input: {core_project_nums}") # Clean the input string of any quotes clean_input = core_project_nums.strip().strip('"').strip("'") logger.info(f"Cleaned input: {clean_input}") proj_list = [num.strip() for num in clean_input.split(",")] logger.info(f"Created project list: {proj_list}") criteria["core_project_nums"] = proj_list # Ensure limit is within bounds criteria["limit"] = min(max(1, limit), 50) logger.info(f"Constructed publication search criteria: {json.dumps(criteria, indent=2)}") results = await api_client.get_publications({"criteria": criteria}) return api_client.format_publication_results(results) except Exception as e: logger.error(f"Publication search failed: {str(e)}", exc_info=True) return f"Publication search failed: {str(e)}\nPlease check the logs for more details." @mcp.tool() async def search_combined( # Project search parameters fiscal_years: Optional[str] = None, pi_names: Optional[str] = None, organization: Optional[str] = None, org_state: Optional[str] = None, funding_mechanism: Optional[str] = None, ic_code: Optional[str] = None, min_amount: Optional[float] = None, max_amount: Optional[float] = None, covid_response: Optional[str] = None, # Publication parameters include_publications: Optional[bool] = True, publication_years: Optional[str] = None, # General parameters limit: Optional[int] = 10 ) -> str: """ Search for NIH projects and their related publications in a single query Args: fiscal_years: Comma-separated list of fiscal years (e.g., "2022,2023") pi_names: Comma-separated list of PI names organization: Name of the organization org_state: Two-letter state code (e.g., "CA", "NY") funding_mechanism: Type of funding (e.g., "R01", "F32", "K99") ic_code: Institute or Center code (e.g., "NCI", "NIMH") min_amount: Minimum award amount max_amount: Maximum award amount covid_response: COVID-19 response category include_publications: Whether to include related publications publication_years: Comma-separated list of publication years limit: Maximum number of results to return (default: 10, max: 50) """ try: logger.info(f"Combined search request received with parameters: {locals()}") # First, search for projects project_criteria = {} if fiscal_years: try: # Handle escaped quotes and clean the input string years_str = fiscal_years.replace('\\"', '').replace('"', '').strip() years = [int(year.strip()) for year in years_str.split(",") if year.strip()] if not years: raise ValueError("No valid years found after parsing") project_criteria["fiscal_years"] = years except ValueError as e: logger.error(f"Invalid fiscal years format: {fiscal_years}, error: {str(e)}") return f"Error: Invalid fiscal years format. Please provide comma-separated years (e.g., 2020,2021)" if pi_names: try: # Handle escaped quotes and clean the input string names_str = pi_names.replace('\\"', '').replace('"', '').strip() names = [name.strip() for name in names_str.split(",") if name.strip()] if not names: raise ValueError("No valid names found after parsing") project_criteria["pi_names"] = [{"any_name": name} for name in names] except Exception as e: logger.error(f"Invalid PI names format: {pi_names}, error: {str(e)}") return f"Error: Invalid PI names format. Please provide comma-separated names" if organization: project_criteria["org_names"] = [organization] if org_state: project_criteria["org_states"] = [org_state.upper()] if funding_mechanism: project_criteria["funding_mechanism"] = funding_mechanism.strip().strip('"').strip("'") if ic_code: project_criteria["agency_ic_admin"] = ic_code.strip().strip('"').strip("'").upper() if min_amount is not None or max_amount is not None: project_criteria["award_amount_range"] = { "min_amount": min_amount if min_amount is not None else 0, "max_amount": max_amount if max_amount is not None else float('inf') } if covid_response: project_criteria["covid_response"] = [covid_response] project_criteria["limit"] = min(max(1, limit), 50) logger.info(f"Searching for projects with criteria: {json.dumps(project_criteria, indent=2)}") project_results = await api_client.get_projects(project_criteria) # If we want publications, get them for each project if include_publications: project_nums = [] for project in project_results.get("results", []): if project.get("project_num"): project_nums.append(project["project_num"]) if project_nums: logger.info(f"Found project numbers for publication search: {project_nums}") pub_criteria = { "criteria": { "core_project_nums": project_nums }, "limit": 100 # Get more publications since they're related } # Only add publication years if explicitly specified by the user if publication_years: try: years_str = publication_years.strip().strip('"').strip("'") years = [int(year.strip()) for year in years_str.split(",")] pub_criteria["criteria"]["publication_years"] = years logger.info(f"Filtering publications by years: {years}") except ValueError as e: logger.error(f"Invalid publication years format: {publication_years}") return f"Error: Invalid publication years format. Please provide comma-separated years without quotes (e.g., 2020,2021)" logger.info(f"Searching for publications with criteria: {json.dumps(pub_criteria, indent=2)}") pub_results = await api_client.get_publications(pub_criteria) # Add publications to each project pub_by_project = {} for pub in pub_results.get("results", []): proj_num = pub.get("core_project_num") if proj_num: if proj_num not in pub_by_project: pub_by_project[proj_num] = [] pub_by_project[proj_num].append(pub) for project in project_results.get("results", []): proj_num = project.get("project_num") if proj_num in pub_by_project: project["related_publications"] = pub_by_project[proj_num] return api_client.format_project_results(project_results, include_publications=include_publications) except Exception as e: logger.error(f"Combined search failed: {str(e)}", exc_info=True) return f"Combined search failed: {str(e)}\nPlease check the logs for more details." # Run the server when this script is executed directly if __name__ == "__main__": mcp.run(transport='stdio')