Europe PMC Literature Search MCP Server

pubmed_search.py (29.2 kB)
from typing import Any


class PubMedService:
    """PubMed keyword search service (kept under 500 lines)."""

    def __init__(self, logger=None):
        import logging
        import re

        self.logger = logger or logging.getLogger(__name__)
        self.re = re  # keep a module reference for internal use
        self.base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"
        self.headers = {"User-Agent": "PubMedSearch/1.0"}
        self.MONTH_MAP = {
            "Jan": "01", "Feb": "02", "Mar": "03", "Apr": "04",
            "May": "05", "Jun": "06", "Jul": "07", "Aug": "08",
            "Sep": "09", "Oct": "10", "Nov": "11", "Dec": "12",
        }

    # ------------------------ Shared helpers ------------------------ #
    @staticmethod
    def _validate_email(email: str) -> bool:
        return bool(email and "@" in email and "." in email.split("@")[-1])

    def _format_date_range(self, start_date: str, end_date: str) -> str:
        """Build a PubMed date-filter clause (PDAT)."""
        from datetime import datetime

        fmt_in = ["%Y-%m-%d", "%Y/%m/%d", "%Y%m%d"]

        def _parse(d):
            if not d:
                return None
            for f in fmt_in:
                try:
                    return datetime.strptime(d, f)
                except ValueError:
                    continue
            return None

        start_dt, end_dt = _parse(start_date), _parse(end_date)
        if not (start_dt or end_dt):
            return ""
        if start_dt and not end_dt:
            end_dt = datetime.now()
        if end_dt and not start_dt:
            # PubMed indexes records back into the 1800s; default to 1800-01-01
            start_dt = datetime.strptime("1800-01-01", "%Y-%m-%d")
        if start_dt > end_dt:
            start_dt, end_dt = end_dt, start_dt
        return f"({start_dt.strftime('%Y/%m/%d')}[PDAT] : {end_dt.strftime('%Y/%m/%d')}[PDAT])"

    # ------------------------ Core parsing ------------------------ #
    def _process_article(self, article_xml):
        if article_xml is None:
            return None
        try:
            medline = article_xml.find("./MedlineCitation")
            if medline is None:
                return None
            pmid = medline.findtext("./PMID")
            article = medline.find("./Article")
            if article is None:
                return None

            title_elem = article.find("./ArticleTitle")
            title = (
                "".join(title_elem.itertext()).strip()
                if title_elem is not None
                else "No title"
            )

            # Authors
            authors = []
            for author in article.findall("./AuthorList/Author"):
                last = author.findtext("LastName", "").strip()
                fore = author.findtext("ForeName", "").strip()
                coll = author.findtext("CollectiveName")
                if coll:
                    authors.append(coll.strip())
                elif last or fore:
                    authors.append(f"{fore} {last}".strip())

            # Journal
            journal_raw = article.findtext("./Journal/Title", "Unknown journal")
            journal = self.re.sub(r"\s*\(.*?\)\s*", "", journal_raw).strip() or journal_raw

            # Publication date
            pub_date_elem = article.find("./Journal/JournalIssue/PubDate")
            pub_date = "Date unknown"
            if pub_date_elem is not None:
                year = pub_date_elem.findtext("Year")
                month = pub_date_elem.findtext("Month", "01")
                day = pub_date_elem.findtext("Day", "01")
                if month in self.MONTH_MAP:
                    month = self.MONTH_MAP[month]
                month = month.zfill(2) if month.isdigit() else "01"
                day = day.zfill(2) if day.isdigit() else "01"
                if year and year.isdigit():
                    pub_date = f"{year}-{month}-{day}"

            # Abstract
            abs_parts = [
                "".join(n.itertext()).strip()
                for n in article.findall("./Abstract/AbstractText")
            ]
            abstract = " ".join([p for p in abs_parts if p]) if abs_parts else "No abstract"

            # Extract DOI and PMC ID (from PubmedData)
            doi = None
            doi_link = None
            pmc_id = None
            pmc_link = None
            pubmed_data = article_xml.find("./PubmedData")
            if pubmed_data is not None:
                doi_elem = pubmed_data.find("./ArticleIdList/ArticleId[@IdType='doi']")
                if doi_elem is not None and doi_elem.text:
                    doi = doi_elem.text.strip()
                    doi_link = f"https://doi.org/{doi}"
                pmc_elem = pubmed_data.find("./ArticleIdList/ArticleId[@IdType='pmc']")
                if pmc_elem is not None and pmc_elem.text:
                    pmc_id = pmc_elem.text.strip()
                    if pmc_id.startswith("PMC"):
                        pmc_link = f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmc_id}/"

            return {
                "pmid": pmid or "N/A",
                "pmid_link": f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/" if pmid else None,
                "title": title,
                "authors": authors,
                "journal_name": journal,
                "publication_date": pub_date,
                "abstract": abstract,
                "doi": doi,
                "doi_link": doi_link,
                "pmc_id": pmc_id,
                "pmc_link": pmc_link,
                "arxiv_id": None,
                "arxiv_link": None,
                "semantic_scholar_id": None,
                "semantic_scholar_link": None,
            }
        except Exception as e:
            self.logger.warning(f"Failed to parse article: {e}")
            return None

    # ------------------------ Journal quality assessment ------------------------ #
    def _load_journal_cache(self):
        """Load the local journal-info cache."""
        import json
        import os

        try:
            cache_path = os.path.join(os.path.dirname(__file__), "resource", "journal_info.json")
            if os.path.exists(cache_path):
                with open(cache_path, encoding="utf-8") as f:
                    return json.load(f)
            return {}
        except Exception as e:
            self.logger.warning(f"Failed to load journal cache: {e}")
            return {}

    def _save_journal_cache(self, cache_data):
        """Persist journal info to the local cache."""
        import json
        import os

        try:
            cache_path = os.path.join(os.path.dirname(__file__), "resource", "journal_info.json")
            os.makedirs(os.path.dirname(cache_path), exist_ok=True)
            with open(cache_path, "w", encoding="utf-8") as f:
                json.dump(cache_data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            self.logger.warning(f"Failed to save journal cache: {e}")

    def _query_easyscholar_api(self, journal_name: str, secret_key: str):
        """Query the EasyScholar API for journal information."""
        import requests

        try:
            url = "https://www.easyscholar.cc/open/getPublicationRank"
            params = {"secretKey": secret_key, "publicationName": journal_name}
            self.logger.info(f"Querying EasyScholar API for journal: {journal_name}")
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            if data.get("code") == 200 and data.get("data"):
                return data["data"]
            self.logger.warning(
                f"EasyScholar API returned an error: {data.get('msg', 'Unknown error')}"
            )
            return None
        except requests.exceptions.RequestException as e:
            self.logger.warning(f"EasyScholar API request failed: {e}")
            return None
        except Exception as e:
            self.logger.warning(f"EasyScholar API processing error: {e}")
            return None

    def _extract_quality_metrics(self, rank_data):
        """Extract quality metrics from journal ranking data."""
        if not rank_data:
            return {}
        metrics = {}
        # Impact factor
        if "sciif" in rank_data:
            metrics["impact_factor"] = rank_data["sciif"]
        # SCI quartile / zone
        if "sci" in rank_data:
            metrics["sci_quartile"] = rank_data["sci"]
        if "sciUp" in rank_data:
            metrics["sci_zone"] = rank_data["sciUp"]
        if "sciUpSmall" in rank_data:
            metrics["sci_zone_detail"] = rank_data["sciUpSmall"]
        # JCI
        if "jci" in rank_data:
            metrics["jci"] = rank_data["jci"]
        # Five-year impact factor
        if "sciif5" in rank_data:
            metrics["impact_factor_5year"] = rank_data["sciif5"]
        return metrics

    def get_journal_quality(self, journal_name: str, secret_key: str = None):
        """Get journal quality information (impact factor, quartile, etc.)."""
        if not journal_name or not journal_name.strip():
            return {"error": "Journal name must not be empty"}
        journal_name = journal_name.strip()

        # 1. Try the local cache first
        cache = self._load_journal_cache()
        if journal_name in cache:
            rank_data = cache[journal_name].get("rank", {})
            metrics = self._extract_quality_metrics(rank_data)
            if metrics:
                self.logger.info(f"Journal info served from local cache: {journal_name}")
                return {
                    "journal_name": journal_name,
                    "source": "local_cache",
                    "quality_metrics": metrics,
                    "error": None,
                }

        # 2. Fall back to the EasyScholar API when a key is provided
        if secret_key:
            api_data = self._query_easyscholar_api(journal_name, secret_key)
            if api_data:
                # Cache the result
                if journal_name not in cache:
                    cache[journal_name] = {}
                cache[journal_name]["rank"] = {}
                # Official ranking data
                if "officialRank" in api_data:
                    official = api_data["officialRank"]
                    if "select" in official:
                        cache[journal_name]["rank"].update(official["select"])
                    elif "all" in official:
                        cache[journal_name]["rank"].update(official["all"])
                # Custom ranking data: entries look like "<uuid>&&&<level>"
                # and are resolved against the rankInfo list
                if "customRank" in api_data:
                    custom = api_data["customRank"]
                    if "rankInfo" in custom and "rank" in custom:
                        rank_info_map = {info["uuid"]: info for info in custom["rankInfo"]}
                        for rank_entry in custom["rank"]:
                            if "&&&" in rank_entry:
                                uuid, rank_level = rank_entry.split("&&&", 1)
                                if uuid in rank_info_map:
                                    info = rank_info_map[uuid]
                                    abbr_name = info.get("abbName", "")
                                    rank_text = ""
                                    if rank_level == "1":
                                        rank_text = info.get("oneRankText", "")
                                    elif rank_level == "2":
                                        rank_text = info.get("twoRankText", "")
                                    elif rank_level == "3":
                                        rank_text = info.get("threeRankText", "")
                                    elif rank_level == "4":
                                        rank_text = info.get("fourRankText", "")
                                    elif rank_level == "5":
                                        rank_text = info.get("fiveRankText", "")
                                    if abbr_name and rank_text:
                                        cache[journal_name]["rank"][abbr_name.lower()] = rank_text
                self._save_journal_cache(cache)
                metrics = self._extract_quality_metrics(cache[journal_name]["rank"])
                self.logger.info(f"Journal info fetched from EasyScholar API: {journal_name}")
                return {
                    "journal_name": journal_name,
                    "source": "easyscholar_api",
                    "quality_metrics": metrics,
                    "error": None,
                }

        # 3. Nothing found
        return {
            "journal_name": journal_name,
            "source": None,
            "quality_metrics": {},
            "error": "No journal quality information found"
            + (" (no EasyScholar API key provided)" if not secret_key else ""),
        }

    def evaluate_articles_quality(self, articles: list, secret_key: str = None):
        """Evaluate journal quality for a batch of articles."""
        if not articles:
            return []
        evaluated_articles = []
        for article in articles:
            journal_name = article.get("journal_name")
            article_copy = article.copy()
            if journal_name:
                article_copy["journal_quality"] = self.get_journal_quality(
                    journal_name, secret_key
                )
            else:
                article_copy["journal_quality"] = {
                    "journal_name": None,
                    "source": None,
                    "quality_metrics": {},
                    "error": "No journal information",
                }
            evaluated_articles.append(article_copy)
        return evaluated_articles

    # ------------------------ Public interface ------------------------ #
    def search(
        self,
        keyword: str,
        email: str = None,
        start_date: str = None,
        end_date: str = None,
        max_results: int = 10,
    ):
        """Keyword search against PubMed; returns the same structure as Europe PMC."""
        import time
        import xml.etree.ElementTree as ET

        import requests

        start_time = time.time()
        try:
            if email and not self._validate_email(email):
                self.logger.info(
                    "Invalid email format; the email parameter will be omitted from requests"
                )
                email = None

            # Build the query term
            term = keyword.strip()
            date_filter = self._format_date_range(start_date, end_date)
            if date_filter:
                term = f"{term} AND {date_filter}"

            esearch_params = {
                "db": "pubmed",
                "term": term,
                "retmax": str(max_results),
                "retmode": "xml",
            }
            if email:
                esearch_params["email"] = email
            self.logger.info(f"PubMed ESearch: {term}")
            r = requests.get(
                self.base_url + "esearch.fcgi",
                params=esearch_params,
                headers=self.headers,
                timeout=15,
            )
            r.raise_for_status()
            ids = ET.fromstring(r.content).findall(".//Id")
            if not ids:
                return {"articles": [], "message": "No matching articles found", "error": None}
            pmids = [elem.text for elem in ids[:max_results]]

            # EFetch
            efetch_params = {
                "db": "pubmed",
                "id": ",".join(pmids),
                "retmode": "xml",
                "rettype": "xml",
            }
            if email:
                efetch_params["email"] = email
            self.logger.info(f"PubMed EFetch for {len(pmids)} articles")
            r2 = requests.get(
                self.base_url + "efetch.fcgi",
                params=efetch_params,
                headers=self.headers,
                timeout=20,
            )
            r2.raise_for_status()
            root = ET.fromstring(r2.content)
            articles = []
            for art in root.findall(".//PubmedArticle"):
                info = self._process_article(art)
                if info:
                    articles.append(info)
            return {
                "articles": articles,
                "error": None,
                "message": (
                    f"Found {len(articles)} matching articles"
                    if articles
                    else "No matching articles found"
                ),
                "processing_time": round(time.time() - start_time, 2),
            }
        except requests.exceptions.RequestException as e:
            return {"articles": [], "error": f"Network request error: {e}", "message": None}
        except Exception as e:
            return {"articles": [], "error": f"Processing error: {e}", "message": None}

    # ------------------------ Citing-article retrieval ------------------------ #
    def get_citing_articles(self, pmid: str, email: str = None, max_results: int = 20):
        """Fetch articles citing the given PMID (Semantic Scholar, backfilled via PubMed)."""
        import time
        import xml.etree.ElementTree as ET

        import requests

        start_time = time.time()
        try:
            if not pmid or not pmid.isdigit():
                return {"citing_articles": [], "error": "Invalid PMID", "message": None}
            if email and not self._validate_email(email):
                email = None

            # 1. Get the citation list from the Semantic Scholar Graph API
            ss_url = f"https://api.semanticscholar.org/graph/v1/paper/PMID:{pmid}/citations"
            ss_params = {
                "fields": "title,year,authors,venue,externalIds,publicationDate",
                "limit": max_results,
            }
            self.logger.info(f"Semantic Scholar citation lookup: {ss_url}")
            ss_resp = requests.get(ss_url, params=ss_params, timeout=20)
            if ss_resp.status_code != 200:
                return {
                    "citing_articles": [],
                    "error": f"Semantic Scholar error {ss_resp.status_code}",
                    "message": None,
                }
            ss_data = ss_resp.json()
            ss_items = ss_data.get("data", [])
            if not ss_items:
                return {
                    "citing_articles": [],
                    "total_count": 0,
                    "message": "No citing articles found",
                    "error": None,
                }

            pmid_list = []
            interim_articles = []
            for item in ss_items:
                paper = item.get("citingPaper") or item.get("paper") or {}
                ext_ids = paper.get("externalIds", {})
                ss_pmid = ext_ids.get("PubMed") or ext_ids.get("PMID")
                if ss_pmid and str(ss_pmid).isdigit():
                    pmid_list.append(str(ss_pmid))
                else:
                    # Build a full record for papers without a PMID
                    doi = ext_ids.get("DOI")
                    arxiv_id = ext_ids.get("ArXiv")
                    ss_paper_id = paper.get("paperId")
                    doi_link = f"https://doi.org/{doi}" if doi else None
                    arxiv_link = f"https://arxiv.org/abs/{arxiv_id}" if arxiv_id else None
                    ss_link = (
                        f"https://www.semanticscholar.org/paper/{ss_paper_id}"
                        if ss_paper_id
                        else None
                    )
                    # Link preference: DOI > arXiv > Semantic Scholar
                    primary_link = doi_link or arxiv_link or ss_link
                    interim_articles.append(
                        {
                            "pmid": None,
                            "pmid_link": primary_link,
                            "title": paper.get("title"),
                            "authors": (
                                [a.get("name") for a in paper.get("authors", [])]
                                if paper.get("authors")
                                else None
                            ),
                            "journal_name": paper.get("venue"),
                            "publication_date": paper.get("publicationDate")
                            or (str(paper["year"]) if paper.get("year") else None),
                            "abstract": None,
                            "doi": doi,
                            "doi_link": doi_link,
                            "arxiv_id": arxiv_id,
                            "arxiv_link": arxiv_link,
                            "semantic_scholar_id": ss_paper_id,
                            "semantic_scholar_link": ss_link,
                        }
                    )

            # 2. Backfill records that do have a PMID via PubMed EFetch
            citing_articles = []
            if pmid_list:
                efetch_params = {
                    "db": "pubmed",
                    "id": ",".join(pmid_list),
                    "retmode": "xml",
                    "rettype": "xml",
                }
                if email:
                    efetch_params["email"] = email
                r2 = requests.get(
                    self.base_url + "efetch.fcgi",
                    params=efetch_params,
                    headers=self.headers,
                    timeout=20,
                )
                r2.raise_for_status()
                root = ET.fromstring(r2.content)
                for art in root.findall(".//PubmedArticle"):
                    info = self._process_article(art)
                    if info:
                        citing_articles.append(info)
            citing_articles.extend(interim_articles)
            return {
                "citing_articles": citing_articles,
                "total_count": len(ss_items),
                "error": None,
                "message": (
                    f"Retrieved {len(citing_articles)} citing articles "
                    "(Semantic Scholar + PubMed)"
                ),
                "processing_time": round(time.time() - start_time, 2),
            }
        except requests.exceptions.RequestException as e:
            return {"citing_articles": [], "error": f"Network request error: {e}", "message": None}
        except Exception as e:
            return {"citing_articles": [], "error": f"Processing error: {e}", "message": None}

    def get_pmc_fulltext_html(self, pmc_id: str) -> dict[str, Any]:
        """Fetch full-text content for an article by PMC ID.

        Overview:
        - Retrieves the article's full record from the PMC database by PMC ID
        - Full text is available for open-access articles
        - Returns the article's basic metadata together with the full-text content

        Args:
        - pmc_id: required, a PMC identifier (e.g. "PMC1234567")

        Returns:
        - pmc_id: PMC identifier
        - pmc_link: link to the article on PMC
        - title: article title
        - authors: author list
        - journal_name: journal name
        - publication_date: publication date
        - abstract: abstract
        - fulltext_html: full-text content (JATS XML in practice; see note in code)
        - fulltext_available: whether full text could be retrieved
        - error: error message, if any

        Use cases:
        - Retrieving the full text of open-access articles
        - In-depth analysis of article content
        - Collecting material for academic research

        Notes:
        - Built on the official NCBI E-utilities API
        - Supports full-text retrieval for open-access articles
        - Complete error handling
        """
        import xml.etree.ElementTree as ET

        import requests

        try:
            # Validate the PMC ID
            if not pmc_id or not pmc_id.strip():
                return {
                    "pmc_id": None,
                    "pmc_link": None,
                    "title": None,
                    "authors": [],
                    "journal_name": None,
                    "publication_date": None,
                    "abstract": None,
                    "fulltext_html": None,
                    "fulltext_available": False,
                    "error": "PMC ID must not be empty",
                }

            # Normalize the PMC ID format
            normalized_pmc_id = pmc_id.strip()
            if not normalized_pmc_id.startswith("PMC"):
                normalized_pmc_id = f"PMC{normalized_pmc_id}"

            pmc_link = f"https://www.ncbi.nlm.nih.gov/pmc/articles/{normalized_pmc_id}/"

            # Request the full-text XML from PMC
            xml_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
            params = {"db": "pmc", "id": normalized_pmc_id, "rettype": "xml", "retmode": "xml"}
            self.logger.info(f"Requesting PMC full text: {normalized_pmc_id}")
            response = requests.get(xml_url, params=params, timeout=30)
            response.raise_for_status()

            # Parse the XML
            root = ET.fromstring(response.content)

            # Title
            title = root.findtext(".//article-title") or "No title"

            # Authors
            authors = []
            for author_elem in root.findall(".//contrib[@contrib-type='author']"):
                name = author_elem.findtext(".//name/surname")
                forename = author_elem.findtext(".//name/given-names")
                if name and forename:
                    authors.append(f"{forename} {name}")
                elif name:
                    authors.append(name)

            # Journal
            journal_name = root.findtext(".//journal-title")
            if not journal_name:
                journal_name = root.findtext(".//journal-id", "Unknown journal")

            # Publication date
            pub_date = root.findtext(".//pub-date/year")
            if pub_date:
                month = root.findtext(".//pub-date/month", "01")
                day = root.findtext(".//pub-date/day", "01")
                pub_date = f"{pub_date}-{month.zfill(2)}-{day.zfill(2)}"

            # Abstract
            abstract = root.findtext(".//abstract")
            if not abstract:
                abstract_parts = [
                    "".join(elem.itertext()).strip()
                    for elem in root.findall(".//abstract//p")
                ]
                abstract = (
                    " ".join([p for p in abstract_parts if p])
                    if abstract_parts
                    else "No abstract"
                )

            # Full text: PMC EFetch returns JATS XML, not rendered HTML.
            # The raw XML is returned here; rendering it as HTML would be a separate step.
            fulltext_available = True
            fulltext_html = response.text

            return {
                "pmc_id": normalized_pmc_id,
                "pmc_link": pmc_link,
                "title": title,
                "authors": authors,
                "journal_name": journal_name,
                "publication_date": pub_date,
                "abstract": abstract,
                "fulltext_html": fulltext_html,
                "fulltext_available": fulltext_available,
                "error": None,
            }
        except requests.exceptions.RequestException as e:
            return {
                "pmc_id": pmc_id,
                "pmc_link": (
                    f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmc_id}/" if pmc_id else None
                ),
                "title": None,
                "authors": [],
                "journal_name": None,
                "publication_date": None,
                "abstract": None,
                "fulltext_html": None,
                "fulltext_available": False,
                "error": f"Network request error: {e}",
            }
        except ET.ParseError as e:
            return {
                "pmc_id": pmc_id,
                "pmc_link": (
                    f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmc_id}/" if pmc_id else None
                ),
                "title": None,
                "authors": [],
                "journal_name": None,
                "publication_date": None,
                "abstract": None,
                "fulltext_html": None,
                "fulltext_available": False,
                "error": f"XML parse error: {e}",
            }
        except Exception as e:
            self.logger.error(f"Error while fetching PMC full text: {e}")
            return {
                "pmc_id": pmc_id,
                "pmc_link": (
                    f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmc_id}/" if pmc_id else None
                ),
                "title": None,
                "authors": [],
                "journal_name": None,
                "publication_date": None,
                "abstract": None,
                "fulltext_html": None,
                "fulltext_available": False,
                "error": f"Processing error: {e}",
            }


def create_pubmed_service(logger=None):
    """Factory function, kept for interface consistency."""
    return PubMedService(logger)


MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gqy20/article-mcp'
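
The same endpoint can be consumed programmatically. The sketch below uses only Python's standard library and assumes the endpoint returns JSON; the exact response schema is not documented here.

# Minimal sketch: fetch this server's directory entry.
# Assumption: the endpoint returns a JSON body.
import json
import urllib.request

url = "https://glama.ai/api/mcp/v1/servers/gqy20/article-mcp"
with urllib.request.urlopen(url) as resp:
    entry = json.load(resp)
print(json.dumps(entry, indent=2))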

If you have feedback or need assistance with the MCP directory API, please join our Discord server.