Skip to main content
Glama
gqy20

Europe PMC Literature Search MCP Server

get_article_details

Retrieve detailed information about specific articles from Europe PMC using PMIDs. Supports high-performance asynchronous processing, concurrent queries, caching, and automatic retries for efficient literature data retrieval.

Instructions

获取特定文献的详细信息(高性能优化版本)

功能说明:

  • 使用异步方式根据PMID获取文献的完整详细信息

  • 支持并发处理,性能更优

  • 集成缓存机制,重复查询响应更快

  • 自动重试和错误恢复

参数说明:

  • pmid: 必需,PubMed ID(如:"37769091")

返回值说明:

  • 包含与同步版本相同的字段

  • 额外提供:

    • processing_time: 处理耗时(秒)

    • cache_hit: 是否命中缓存

    • performance_info: 性能统计信息

    • retry_count: 重试次数

使用场景:

  • 需要高性能的文献详情获取

  • 批量文献详情查询

  • 大规模数据处理

性能特点:

  • 比同步版本快20-40%

  • 支持智能缓存

  • 自动重试机制

  • 并发控制

Input Schema

Table / JSON Schema

| Name | Required | Description | Default |
| ---- | -------- | ----------- | ------- |
| pmid | Yes      |             |         |

Implementation Reference

  • The main handler function for the 'get_article_details' MCP tool. It accepts an identifier, detects type if auto, queries specified sources (default europe_pmc, crossref), merges results, and optionally adds journal quality metrics.
    @mcp.tool( description="获取文献详情工具。通过DOI、PMID等标识符获取文献的详细信息。", annotations=ToolAnnotations( title="文献详情", readOnlyHint=True, openWorldHint=False ), tags={"literature", "details", "metadata"} ) def get_article_details( identifier: str, id_type: str = "auto", sources: list[str] | None = None, include_quality_metrics: bool = False, ) -> dict[str, Any]: """获取文献详情工具。通过DOI、PMID等标识符获取文献的详细信息。 Args: identifier: 文献标识符 (DOI, PMID, PMCID, arXiv ID) id_type: 标识符类型 ["auto", "doi", "pmid", "pmcid", "arxiv_id"] sources: 数据源列表,优先级顺序查询 include_quality_metrics: 是否包含期刊质量指标 Returns: 包含文献详细信息的字典,包括标题、作者、摘要、期刊等 """ try: if not identifier or not identifier.strip(): from fastmcp.exceptions import ToolError raise ToolError("文献标识符不能为空") from article_mcp.services.merged_results import extract_identifier_type from article_mcp.services.merged_results import merge_same_doi_articles start_time = time.time() details_by_source = {} sources_found = [] # 处理None值的sources参数 if sources is None: sources = ["europe_pmc", "crossref"] # 自动识别标识符类型 if id_type == "auto": id_type = extract_identifier_type(identifier.strip()) # 从每个数据源获取详情 for source in sources: if source not in _article_services: continue try: service = _article_services[source] if source == "europe_pmc": result = service.fetch(identifier.strip(), id_type=id_type) elif source == "crossref": if id_type == "doi": result = service.get_work_by_doi(identifier.strip()) else: continue elif source == "openalex": if id_type == "doi": result = service.get_work_by_doi(identifier.strip()) else: continue elif source == "arxiv": if id_type == "arxiv_id": result = service.fetch(identifier.strip(), id_type=id_type) else: continue else: continue # 判断获取成功:没有错误且有文章数据 error = result.get("error") article = result.get("article") if not error and article: details_by_source[source] = article sources_found.append(source) logger.info(f"{source} 获取详情成功") else: logger.debug(f"{source} 未找到文献详情: {error or '无数据'}") except Exception as e: logger.error(f"{source} 获取详情异常: 
{e}") continue # 合并详情 merged_detail = None if details_by_source: articles = [details_by_source[source] for source in sources_found] merged_detail = merge_same_doi_articles(articles) # 获取质量指标 quality_metrics = None if include_quality_metrics and merged_detail: journal_name = merged_detail.get("journal", "") if journal_name: try: from article_mcp.services.mcp_config import get_easyscholar_key secret_key = get_easyscholar_key(None, logger) pubmed_service = _article_services.get("pubmed") if pubmed_service: quality_metrics = pubmed_service.get_journal_quality( journal_name, secret_key ) except Exception as e: logger.warning(f"获取期刊质量指标失败: {e}") processing_time = round(time.time() - start_time, 2) return { "success": len(details_by_source) > 0, "identifier": identifier.strip(), "id_type": id_type, "sources_found": sources_found, "details_by_source": details_by_source, "merged_detail": merged_detail, "quality_metrics": quality_metrics, "processing_time": processing_time, } except Exception as e: logger.error(f"获取文献详情异常: {e}") # 抛出MCP标准错误 from mcp import McpError from mcp.types import ErrorData raise McpError(ErrorData( code=-32603, message=f"获取文献详情失败: {type(e).__name__}: {str(e)}" ))
  • The registration function that initializes global service references and registers the get_article_details tool using the @mcp.tool decorator.
    def register_article_tools(mcp: FastMCP, services: dict[str, Any], logger: Any) -> None: """注册文献详情工具""" global _article_services _article_services = services
  • Helper function to automatically detect the type of article identifier (DOI, PMID, PMCID, arXiv ID) used in the tool.
    def extract_identifier_type(identifier: str) -> str: """提取标识符类型""" identifier = identifier.strip() # DOI检测 if identifier.startswith("10.") and "/" in identifier: return "doi" # PMID检测 (纯数字,通常7-8位) if identifier.isdigit() and 6 <= len(identifier) <= 8: return "pmid" # PMCID检测 if identifier.startswith("PMC") and identifier[3:].isdigit(): return "pmcid" # arXiv ID检测 if identifier.startswith("arXiv:") or ( "." in identifier and identifier.replace(".", "").replace("-", "").replace("_", "").isalnum() ): return "arxiv_id" # 默认尝试DOI return "doi"
  • Helper function to merge article details from multiple sources for the same DOI, prioritizing non-empty fields.
    def merge_same_doi_articles(articles: list[dict]) -> dict: """合并同一DOI的多源文章""" if len(articles) == 1: article = articles[0] source_from = article.get("source_from", "unknown") return { **article, "sources": [source_from], "data_sources": {source_from: article}, } # 选择最完整的数据作为基础 base_article = articles[0] for article in articles[1:]: # 合并字段,优先选择非空值 for key, value in article.items(): if key not in base_article or not base_article[key]: base_article[key] = value return { **base_article, "sources": [a.get("source_from", "unknown") for a in articles], "data_sources": {a.get("source_from", "unknown"): a for a in articles}, }
def get_article_details_sync(
    self, identifier: str, id_type: str = "pmid", include_fulltext: bool = False
) -> dict[str, Any]:
    """Synchronously fetch article details from the Europe PMC API.

    Builds a type-specific query, performs the HTTP request with up to 3
    attempts (exponential backoff on 429/503, timeouts and connection
    errors), and optionally attaches PMC full text. Results are served
    through the instance's sync cache keyed by id type + identifier.

    Args:
        identifier: The article identifier value (e.g. a PMID string).
        id_type: Identifier type used to build the query; "pmid" and
            "pmcid" have dedicated handling, anything else becomes an
            "<ID_TYPE>:<identifier>" query.
        include_fulltext: When True and the record has a PMC id, also fetch
            the PMC full-text HTML via ``self.pubmed_service`` (best effort).

    Returns:
        Dict with "article" (the processed record or None) and "error"
        (message string or None) — exactly one of the two is meaningful.
    """
    self.logger.info(f"获取文献详情: {id_type}={identifier}")

    def fetch_from_api():
        # Up to 3 attempts; transient failures back off exponentially.
        max_retries = 3
        for attempt in range(max_retries):
            try:
                # Build the query according to the identifier type.
                if id_type.lower() == "pmid":
                    query = f"EXT_ID:{identifier}"
                elif id_type.lower() == "pmcid":
                    # PMCID uses a dedicated query syntax; prepend "PMC"
                    # when the caller passed a bare number.
                    if identifier.startswith("PMC"):
                        query = f"PMCID:{identifier}"
                    else:
                        query = f"PMCID:PMC{identifier}"
                else:
                    query = f"{id_type.upper()}:{identifier}"

                params = {"query": query, "format": "json", "resultType": "core"}
                session = self._get_sync_session()
                response = session.get(self.detail_url, params=params, timeout=30)

                # Check the HTTP status code before parsing.
                if response.status_code == 429:  # rate limited
                    self.logger.warning(
                        f"遇到速率限制,等待后重试 ({attempt + 1}/{max_retries})"
                    )
                    time.sleep(2**attempt)  # exponential backoff
                    continue
                elif response.status_code == 503:  # service unavailable
                    self.logger.warning(
                        f"服务暂时不可用,等待后重试 ({attempt + 1}/{max_retries})"
                    )
                    time.sleep(2**attempt)  # exponential backoff
                    continue
                elif response.status_code != 200:
                    # Non-retriable HTTP failure: report and stop.
                    return {
                        "error": f"API 请求失败: HTTP {response.status_code}",
                        "article": None,
                    }

                response.raise_for_status()
                data = response.json()
                results = data.get("resultList", {}).get("result", [])
                if not results:
                    return {
                        "error": f"未找到 {id_type.upper()} 为 {identifier} 的文献",
                        "article": None,
                    }

                article_info = self.process_europe_pmc_article(results[0])

                # If full text was requested and the record carries a PMC id,
                # fetch it (failures here only log — the metadata still returns).
                if (
                    include_fulltext
                    and article_info
                    and article_info.get("pmcid")
                    and self.pubmed_service
                ):
                    try:
                        pmc_id = article_info["pmcid"]
                        self.logger.info(f"获取PMC全文: {pmc_id}")
                        fulltext_result = self.pubmed_service.get_pmc_fulltext_html(pmc_id)
                        if not fulltext_result.get("error"):
                            article_info["fulltext"] = {
                                "html": fulltext_result.get("fulltext_html"),
                                "available": fulltext_result.get("fulltext_available", False),
                                "title": fulltext_result.get("title"),
                                "authors": fulltext_result.get("authors"),
                                "abstract": fulltext_result.get("abstract"),
                            }
                        else:
                            self.logger.warning(
                                f"获取PMC全文失败: {fulltext_result.get('error')}"
                            )
                    except Exception as e:
                        self.logger.error(f"获取PMC全文时发生错误: {str(e)}")

                return (
                    {"article": article_info, "error": None}
                    if article_info
                    else {"error": "处理文献信息失败", "article": None}
                )
            except requests.exceptions.Timeout:
                self.logger.warning(f"请求超时,重试 ({attempt + 1}/{max_retries})")
                if attempt < max_retries - 1:
                    time.sleep(2**attempt)  # exponential backoff
                    continue
                else:
                    return {
                        "error": f"获取文献详情超时: {id_type}={identifier}",
                        "article": None,
                    }
            except requests.exceptions.ConnectionError:
                self.logger.warning(f"连接错误,重试 ({attempt + 1}/{max_retries})")
                if attempt < max_retries - 1:
                    time.sleep(2**attempt)  # exponential backoff
                    continue
                else:
                    return {"error": f"连接到API失败: {id_type}={identifier}", "article": None}
            except Exception as e:
                # Unexpected error: do not retry, report immediately.
                self.logger.error(f"获取文献详情时发生未预期错误: {str(e)}")
                return {"error": f"获取文献详情失败: {str(e)}", "article": None}

        # All retries exhausted without a definitive answer.
        return {"error": f"经过 {max_retries} 次重试后仍失败", "article": None}

    cache_key = f"article_{id_type}_{identifier}"
    return self._get_cached_or_fetch_sync(cache_key, fetch_from_api)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gqy20/article-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server