Europe PMC Literature Search MCP Server

similar_articles.py (11.6 kB)
import logging
import re
import xml.etree.ElementTree as ET
from datetime import datetime
from datetime import timedelta
from typing import Any

import requests

# Module-level logger
logger = logging.getLogger(__name__)

# Map month names to zero-padded numbers
MONTH_MAP = {
    "Jan": "01", "Feb": "02", "Mar": "03", "Apr": "04",
    "May": "05", "Jun": "06", "Jul": "07", "Aug": "08",
    "Sep": "09", "Oct": "10", "Nov": "11", "Dec": "12",
}

# NCBI E-utils configuration
NCBI_BASE_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"
TOOL_NAME = "europe_pmc_mcp_server"
EFETCH_BATCH_SIZE = 100  # articles per EFetch batch (currently unused)


def parse_pubmed_article(article_xml: ET.Element | None) -> dict[str, Any] | None:
    """Parse a PubmedArticle XML element into a metadata dict."""
    if article_xml is None:
        return None
    pmid = None
    try:
        medline_citation = article_xml.find("./MedlineCitation")
        pubmed_data = article_xml.find("./PubmedData")
        if medline_citation is None:
            return None
        pmid = medline_citation.findtext("./PMID")
        article = medline_citation.find("./Article")
        if article is None or pmid is None:
            return None

        # Extract the title (itertext flattens inline markup such as <i>)
        title_element = article.find("./ArticleTitle")
        title = (
            "".join(title_element.itertext()).strip()
            if title_element is not None
            else "Title not found"
        )

        # Extract authors; collective names take precedence over personal names
        author_list = []
        author_elements = article.findall("./AuthorList/Author")
        for author in author_elements:
            last_name = author.findtext("LastName")
            fore_name = author.findtext("ForeName")
            collective_name = author.findtext("CollectiveName")
            if collective_name:
                author_list.append(collective_name.strip())
            elif last_name:
                name_parts = []
                if fore_name:
                    name_parts.append(fore_name.strip())
                name_parts.append(last_name.strip())
                author_list.append(" ".join(name_parts))

        # Extract the abstract, preserving section labels (e.g. METHODS:)
        abstract_parts = []
        abstract_elements = article.findall("./Abstract/AbstractText")
        if abstract_elements:
            for part in abstract_elements:
                label = part.get("Label")
                text = "".join(part.itertext()).strip()
                if label and text:
                    abstract_parts.append(f"{label.upper()}: {text}")
                elif text:
                    abstract_parts.append(text)
        abstract = "\n".join(abstract_parts) if abstract_parts else None

        # Extract the PMCID, if the article is in PubMed Central
        pmcid = None
        pmcid_link = None
        if pubmed_data is not None:
            pmc_element = pubmed_data.find("./ArticleIdList/ArticleId[@IdType='pmc']")
            if pmc_element is not None and pmc_element.text:
                pmcid_raw = pmc_element.text.strip().upper()
                if pmcid_raw.startswith("PMC"):
                    pmcid = pmcid_raw
                    pmcid_link = f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmcid}/"

        # Extract the journal name, stripping parenthesized qualifiers
        journal_title_raw = article.findtext("./Journal/Title")
        journal_name = None
        if journal_title_raw:
            journal_name = re.sub(r"\s*\(.*?\)\s*", "", journal_title_raw).strip()
            if not journal_name:
                journal_name = journal_title_raw.strip()

        # Extract the publication date, normalizing to YYYY-MM-DD
        pub_date_element = article.find("./Journal/JournalIssue/PubDate")
        publication_date = None
        if pub_date_element is not None:
            year = pub_date_element.findtext("Year")
            if year and year.isdigit():
                month = pub_date_element.findtext("Month", "01")
                day = pub_date_element.findtext("Day", "01")
                # Month may be a name ("Jan"), a number, or something else
                if month in MONTH_MAP:
                    month = MONTH_MAP[month]
                elif month.isdigit():
                    month = month.zfill(2)
                else:
                    month = "01"
                day = day.zfill(2) if day.isdigit() else "01"
                publication_date = f"{year}-{month}-{day}"

        return {
            "title": title,
            "authors": author_list if author_list else None,
            "journal": journal_name,
            "publication_date": publication_date,
            "pmid": pmid,
            "pmid_link": f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/",
            "pmcid": pmcid,
            "pmcid_link": pmcid_link,
            "abstract": abstract,
        }
    except Exception as e:
        logger.error(f"Error parsing article PMID {pmid or 'UNKNOWN'}: {e}")
        return None


def get_similar_articles_by_doi(
    doi: str, email: str | None = None, max_results: int = 20
) -> dict[str, Any]:
    """Fetch articles similar to the one identified by the given DOI."""
    try:
        # Validate the DOI
        if not doi or not doi.strip():
            return {
                "original_article": None,
                "similar_articles": [],
                "total_similar_count": 0,
                "retrieved_count": 0,
                "error": "DOI must not be empty",
            }

        if not email:
            email = "user@example.com"
        headers = {"User-Agent": f"{TOOL_NAME}/1.0 ({email})"}

        # Step 1: resolve the DOI to a PMID via ESearch
        logger.info(f"Searching for the PMID of DOI {doi}")
        esearch_params = {
            "db": "pubmed",
            "term": doi,
            "retmax": 1,
            "retmode": "xml",
            "email": email,
            "tool": TOOL_NAME,
        }
        response = requests.get(
            f"{NCBI_BASE_URL}esearch.fcgi", params=esearch_params, headers=headers
        )
        response.raise_for_status()
        esearch_xml = ET.fromstring(response.content)
        ids = esearch_xml.findall(".//Id")
        if not ids:
            return {
                "original_article": None,
                "similar_articles": [],
                "total_similar_count": 0,
                "message": f"No PubMed record found for DOI: {doi}",
            }
        initial_pmid = ids[0].text
        logger.info(f"Found initial article PMID: {initial_pmid}")

        # Step 2: fetch the initial article's details via EFetch
        efetch_params = {
            "db": "pubmed",
            "id": initial_pmid,
            "rettype": "xml",
            "retmode": "xml",
            "email": email,
            "tool": TOOL_NAME,
        }
        response = requests.get(
            f"{NCBI_BASE_URL}efetch.fcgi", params=efetch_params, headers=headers
        )
        response.raise_for_status()
        efetch_xml = ET.fromstring(response.content)
        original_article_xml = efetch_xml.find(".//PubmedArticle")
        original_article = parse_pubmed_article(original_article_xml)
        if not original_article:
            return {
                "original_article": None,
                "similar_articles": [],
                "total_similar_count": 0,
                "error": f"Could not parse article details for initial PMID: {initial_pmid}",
            }

        # Step 3: find related articles via ELink, storing the result set
        # on the NCBI history server (WebEnv/QueryKey)
        elink_params = {
            "dbfrom": "pubmed",
            "db": "pubmed",
            "id": initial_pmid,
            "linkname": "pubmed_pubmed",
            "cmd": "neighbor_history",
            "email": email,
            "tool": TOOL_NAME,
        }
        response = requests.get(
            f"{NCBI_BASE_URL}elink.fcgi", params=elink_params, headers=headers
        )
        response.raise_for_status()
        elink_xml = ET.fromstring(response.content)
        webenv_elink = elink_xml.findtext(".//WebEnv")
        query_key_elink = elink_xml.findtext(".//LinkSetDbHistory/QueryKey")
        if not webenv_elink or not query_key_elink:
            return {
                "original_article": original_article,
                "similar_articles": [],
                "total_similar_count": 0,
                "message": "Found the original article, but no related articles",
            }

        # Step 4: restrict the related set to the last 5 years via ESearch
        today = datetime.now()
        five_years_ago = today - timedelta(days=5 * 365.25)
        min_date = five_years_ago.strftime("%Y/%m/%d")
        max_date = today.strftime("%Y/%m/%d")
        esearch_params2 = {
            "db": "pubmed",
            "query_key": query_key_elink,
            "WebEnv": webenv_elink,
            "retmax": str(max_results),
            "retmode": "xml",
            "datetype": "pdat",
            "mindate": min_date,
            "maxdate": max_date,
            "email": email,
            "tool": TOOL_NAME,
            "usehistory": "y",
        }
        response = requests.get(
            f"{NCBI_BASE_URL}esearch.fcgi", params=esearch_params2, headers=headers
        )
        response.raise_for_status()
        esearch_xml2 = ET.fromstring(response.content)
        total_count = int(esearch_xml2.findtext(".//Count", "0"))
        webenv_filtered = esearch_xml2.findtext(".//WebEnv")
        query_key_filtered = esearch_xml2.findtext(".//QueryKey")
        if total_count == 0:
            return {
                "original_article": original_article,
                "similar_articles": [],
                "total_similar_count": 0,
                "message": "No related articles found within the last 5 years",
            }

        # Step 5: batch-fetch details of the related articles via EFetch
        similar_articles = []
        actual_fetch_count = min(total_count, max_results)
        efetch_params_batch = {
            "db": "pubmed",
            "query_key": query_key_filtered,
            "WebEnv": webenv_filtered,
            "retstart": "0",
            "retmax": str(actual_fetch_count),
            "rettype": "xml",
            "retmode": "xml",
            "email": email,
            "tool": TOOL_NAME,
        }
        response = requests.get(
            f"{NCBI_BASE_URL}efetch.fcgi", params=efetch_params_batch, headers=headers
        )
        response.raise_for_status()
        efetch_xml_batch = ET.fromstring(response.content)
        article_elements = efetch_xml_batch.findall(".//PubmedArticle")
        for article_xml in article_elements:
            article_details = parse_pubmed_article(article_xml)
            if article_details:
                similar_articles.append(article_details)
        logger.info(f"Successfully retrieved {len(similar_articles)} related articles")

        return {
            "original_article": original_article,
            "similar_articles": similar_articles,
            "total_similar_count": total_count,
            "retrieved_count": len(similar_articles),
            "message": f"Found and retrieved {len(similar_articles)} related articles",
        }

    except requests.exceptions.RequestException as e:
        logger.error(f"Network request error: {e}")
        return {"error": f"Network request error: {e}"}
    except ET.ParseError as e:
        logger.error(f"XML parsing error: {e}")
        return {"error": f"XML parsing error: {e}"}
    except Exception as e:
        logger.error(f"Error while fetching similar articles: {e}")
        return {"error": f"Error while fetching similar articles: {e}"}
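
Below is a minimal usage sketch of the function above. It is illustrative only: the DOI and email are placeholders (per NCBI E-utilities etiquette, the email should identify the real caller), and the import assumes this file is on the Python path as similar_articles.

from similar_articles import get_similar_articles_by_doi

# Placeholder DOI and contact email -- replace with real values.
result = get_similar_articles_by_doi(
    doi="10.1000/example-doi",
    email="you@example.org",
    max_results=5,
)

if result.get("error"):
    print("Error:", result["error"])
elif result.get("original_article"):
    print("Original:", result["original_article"]["title"])
    for article in result["similar_articles"]:
        print(f"- {article['title']} ({article['publication_date']})")
else:
    # No PubMed record matched the DOI; the dict carries a message instead.
    print(result.get("message"))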
