Skip to main content
Glama

PubMed Enhanced Search

by leescot
pubmed_enhanced_mcp_server.py17.5 kB
import logging import requests import time import re import xml.etree.ElementTree as ET from mcp.server.fastmcp import FastMCP from typing import Dict, Any, List, Optional, Tuple # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s') logger = logging.getLogger(__name__) mcp = FastMCP("PubmedEnhanced") BASE_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils" def make_request_with_retry(url, params, max_retries=3, wait_time=1.0): """發送帶有重試機制的請求""" for i in range(max_retries): try: response = requests.get(url, params=params) response.raise_for_status() return response except requests.exceptions.RequestException as e: logger.error(f"Request failed: {str(e)}") if i < max_retries - 1: # Don't sleep on the last retry time.sleep(wait_time) wait_time *= 2 else: raise @mcp.tool() async def search_pubmed(keywords: List[str] = [], journal: Optional[str] = None, num_results: int = 10, sort_by: str = "relevance") -> Dict[str, Any]: """ Search the PubMed database using specified keywords and optional journal name. This function allows users to search the PubMed database by providing keywords and an optional journal name. It returns a specified number of results in a formatted dictionary. Parameters: - keywords (List[str]): Keywords to search for in PubMed without field restrictions. - journal (Optional[str]): Journal name to limit the search to a specific journal. - num_results (int): Maximum number of results to return. Default is 10. - sort_by (str): Sort order for results. Options: "relevance" (default), "date_desc" (newest first), "date_asc" (oldest first). Returns: - Dict[str, Any]: A dictionary containing the success status, a list of results with PubMed IDs, links, abstracts, and the total number of results found. """ try: query_parts = [] if keywords: keyword_query = " OR ".join([keyword for keyword in keywords]) query_parts.append(f"({keyword_query})") if journal: query_parts.append(f"{journal}[Journal]") query = " AND ".join(query_parts) if query_parts else "" if not query: return { "success": False, "error": "No search parameters provided. Please specify keywords or journal.", "results": [] } logger.info(f"Search query: {query}") # Map sort_by parameter to PubMed sort parameter sort_param = "" if sort_by == "date_desc": sort_param = "pub date" elif sort_by == "date_asc": sort_param = "pub date" # We'll reverse the results later search_url = f"{BASE_URL}/esearch.fcgi" search_params = { "db": "pubmed", "term": query, "retmax": num_results, "retmode": "json" } if sort_param: search_params["sort"] = sort_param search_response = make_request_with_retry(search_url, search_params) search_data = search_response.json() pmids = search_data.get("esearchresult", {}).get("idlist", []) if sort_by == "date_asc" and sort_param: pmids.reverse() formatted_results = await format_paper_details(pmids) return { "success": True, "results": formatted_results, "total_results": int(search_data.get("esearchresult", {}).get("count", "0")) } except Exception as e: logger.error(f"Error in search_pubmed: {str(e)}") return { "success": False, "error": str(e), "results": [] } @mcp.tool() async def get_mesh_terms(search_word: str) -> Dict[str, Any]: """ Get MeSH (Medical Subject Headings) terms related to a search word. This function queries the PubMed MeSH database to find relevant medical terminology that matches the provided search term. Useful for finding standardized medical terms. Parameters: - search_word (str): The word or phrase to search for in the MeSH database. Returns: - Dict[str, Any]: A dictionary containing success status and a list of MeSH terms. """ try: # 步驟 1:查詢 MeSH IDs search_url = f"{BASE_URL}/esearch.fcgi" search_params = { "db": "mesh", "term": search_word, "retmode": "xml" } search_response = make_request_with_retry(search_url, search_params) try: tree = ET.fromstring(search_response.text) mesh_ids = [id_elem.text for id_elem in tree.findall(".//Id")] except ET.ParseError as e: logger.error(f"XML Parse Error in search response: {str(e)}") raise if not mesh_ids: logger.info(f"No MeSH IDs found for term: {search_word}") return { "success": True, "mesh_terms": [] } # 步驟 2:獲取描述符 fetch_url = f"{BASE_URL}/efetch.fcgi" fetch_params = { "db": "mesh", "id": ",".join(mesh_ids), "retmode": "text" } fetch_response = make_request_with_retry(fetch_url, fetch_params) mesh_terms = parse_mesh_text_response(fetch_response.text) logger.debug(f"Parsed MeSH terms: {mesh_terms}") return { "success": True, "mesh_terms": mesh_terms } except Exception as e: logger.error(f"Error in get_mesh_terms: {str(e)}") return { "success": False, "error": str(e), "mesh_terms": [] } @mcp.tool() async def get_pubmed_count(search_terms: List[str]) -> Dict[str, Any]: """ Get the number of PubMed results for multiple search terms. This function queries PubMed and returns the count of results for each provided search term. Useful for comparing the prevalence of different medical terms or concepts in the literature. Parameters: - search_terms (List[str]): List of search terms to query in PubMed. Returns: - Dict[str, Any]: A dictionary containing success status and counts for each search term. """ try: if not search_terms: return { "success": False, "error": "No search terms provided", "counts": {} } base_url = f"{BASE_URL}/esearch.fcgi" counts = {} for term in search_terms: params = { "db": "pubmed", "term": term, "retmode": "xml" } response = make_request_with_retry(base_url, params) counts[term] = extract_count_from_xml(response.text) return { "success": True, "counts": counts } except Exception as e: logger.error(f"Error in get_pubmed_count: {str(e)}") return { "success": False, "error": str(e), "counts": {} } @mcp.tool() async def format_paper_details(pubmed_ids: List[str]) -> List[Dict[str, Any]]: """ Fetch and format details of multiple PubMed articles. This function retrieves details for a list of PubMed IDs and formats them into a list of dictionaries containing article information. Parameters: - pubmed_ids (List[str]): A list of PubMed IDs to fetch details for. Returns: - List[Dict[str, Any]]: A list of dictionaries, each containing details of a PubMed article. """ try: if not pubmed_ids: return [] fetch_url = f"{BASE_URL}/efetch.fcgi" fetch_params = { "db": "pubmed", "id": ",".join(pubmed_ids), "retmode": "xml" } fetch_response = make_request_with_retry(fetch_url, fetch_params) return parse_article_details(fetch_response.content) except Exception as e: logger.error(f"Error in format_paper_details: {str(e)}") return [] def parse_article_details(xml_content) -> List[Dict[str, Any]]: """Parse XML content to extract article details""" root = ET.fromstring(xml_content) articles = root.findall(".//PubmedArticle") results = [] for article in articles: title = article.findtext(".//ArticleTitle", default="N/A") # Extract abstract with sections if available abstract_sections = article.findall(".//Abstract/AbstractText") if abstract_sections: abstract_parts = [] for section in abstract_sections: label = section.get("Label", "") text = section.text or "" if label: abstract_parts.append(f"{label}: {text}") else: abstract_parts.append(text) abstract = " ".join(abstract_parts) else: abstract = article.findtext(".//Abstract/AbstractText", default="N/A") journal = article.findtext(".//Journal/Title", default="N/A") volume = article.findtext(".//Journal/JournalIssue/Volume", default="N/A") issue = article.findtext(".//Journal/JournalIssue/Issue", default="N/A") pages = article.findtext(".//Pagination/MedlinePgn", default="N/A") # Extract DOI if available doi_elem = article.find(".//ELocationID[@EIdType='doi']") doi = doi_elem.text if doi_elem is not None else "N/A" # Extract publication date elements year = article.findtext(".//PubDate/Year", default="") month = article.findtext(".//PubDate/Month", default="") day = article.findtext(".//PubDate/Day", default="") # Format publication date pubdate_parts = [part for part in [year, month, day] if part] pubdate = "-".join(pubdate_parts) if pubdate_parts else "N/A" # Extract authors authors = [] for author in article.findall(".//Author"): lastname = author.findtext("LastName", default="") forename = author.findtext("ForeName", default="") initials = author.findtext("Initials", default="") # Create author name based on available information if lastname and forename: authors.append(f"{lastname} {forename}") elif lastname and initials: authors.append(f"{lastname} {initials}") elif lastname: authors.append(lastname) # Extract keywords/MeSH terms keywords = [] for keyword in article.findall(".//MeshHeading/DescriptorName"): if keyword.text: keywords.append(keyword.text) pmid = article.findtext(".//PMID", default="N/A") results.append({ "pubmed_id": pmid, "link": f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/", "title": title, "authors": authors, "source": journal, "volume": volume, "issue": issue, "pages": pages, "doi": doi, "pubdate": pubdate, "abstract": abstract, "keywords": keywords[:10] if keywords else [] # Limit to 10 keywords }) return results def parse_mesh_text_response(text): """Parse the text response from MeSH API to extract terms""" entries = [] current_entry = "" pattern = r'^\d+: (.+?)(?=\n|$)' for line in text.split('\n'): if re.match(r'^\d+:', line): if current_entry: match = re.search(pattern, current_entry) if match: entries.append(match.group(1).strip()) current_entry = line else: current_entry += "\n" + line if current_entry: match = re.search(pattern, current_entry) if match: entries.append(match.group(1).strip()) return entries def extract_count_from_xml(xml_text): """Extract count value from XML response""" tree = ET.fromstring(xml_text) count_element = tree.find("Count") if count_element is not None: return int(count_element.text) else: raise ValueError("Count element not found in the XML response") @mcp.tool() async def pico_search(p_terms: List[str] = [], i_terms: List[str] = [], c_terms: List[str] = [], o_terms: List[str] = []) -> Dict[str, Any]: """ Perform PICO (Population, Intervention, Comparison, Outcome) based PubMed search with synonyms. This function takes lists of terms for each PICO element, combines them with OR within each element, and then performs various AND combinations between elements. Returns search queries and result counts. Parameters: - p_terms (List[str]): Population terms/synonyms (at least 2 recommended) - i_terms (List[str]): Intervention terms/synonyms (at least 2 recommended) - c_terms (List[str]): Comparison terms/synonyms (optional, at least 2 recommended if provided) - o_terms (List[str]): Outcome terms/synonyms (optional, at least 2 recommended if provided) Returns: - Dict[str, Any]: A dictionary containing individual element searches and combination searches with queries and result counts """ try: if len(p_terms) < 1 or len(i_terms) < 1: return { "success": False, "error": "At least P (Population) and I (Intervention) terms are required with multiple synonyms recommended.", "results": {} } results = {} # Function to construct element query and get count async def process_element(element_name: str, terms: List[str]) -> Tuple[str, int]: if not terms: return "", 0 element_query = " OR ".join([f"({term})" for term in terms]) full_query = f"({element_query})" # Get count using the existing function count_result = await get_pubmed_count([full_query]) count = count_result.get("counts", {}).get(full_query, 0) return full_query, count # Process each PICO element individually p_query, p_count = await process_element("Population", p_terms) i_query, i_count = await process_element("Intervention", i_terms) c_query, c_count = await process_element("Comparison", c_terms) o_query, o_count = await process_element("Outcome", o_terms) # Store individual element results results["individual"] = { "P": {"query": p_query, "count": p_count}, "I": {"query": i_query, "count": i_count} } if c_terms: results["individual"]["C"] = {"query": c_query, "count": c_count} if o_terms: results["individual"]["O"] = {"query": o_query, "count": o_count} # Perform combinations combinations = {} # P AND I pi_query = f"{p_query} AND {i_query}" pi_count_result = await get_pubmed_count([pi_query]) combinations["P_AND_I"] = { "query": pi_query, "count": pi_count_result.get("counts", {}).get(pi_query, 0) } # P AND I AND C (if C exists) if c_terms: pic_query = f"{p_query} AND {i_query} AND {c_query}" pic_count_result = await get_pubmed_count([pic_query]) combinations["P_AND_I_AND_C"] = { "query": pic_query, "count": pic_count_result.get("counts", {}).get(pic_query, 0) } # P AND I AND O (if O exists) if o_terms: pio_query = f"{p_query} AND {i_query} AND {o_query}" pio_count_result = await get_pubmed_count([pio_query]) combinations["P_AND_I_AND_O"] = { "query": pio_query, "count": pio_count_result.get("counts", {}).get(pio_query, 0) } # P AND I AND C AND O (if both C and O exist) if c_terms and o_terms: pico_query = f"{p_query} AND {i_query} AND {c_query} AND {o_query}" pico_count_result = await get_pubmed_count([pico_query]) combinations["P_AND_I_AND_C_AND_O"] = { "query": pico_query, "count": pico_count_result.get("counts", {}).get(pico_query, 0) } results["combinations"] = combinations return { "success": True, "results": results } except Exception as e: logger.error(f"Error in pico_search: {str(e)}") return { "success": False, "error": str(e), "results": {} } if __name__ == "__main__": mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leescot/pubmed-mcp-smithery'

If you have feedback or need assistance with the MCP directory API, please join our Discord server