get_article_details
Retrieve detailed information about specific articles from Europe PMC using PMIDs. Supports high-performance asynchronous processing, concurrent queries, caching, and automatic retries for efficient literature data retrieval.
Instructions
获取特定文献的详细信息(高性能优化版本)
功能说明:
使用异步方式根据PMID获取文献的完整详细信息
支持并发处理,性能更优
集成缓存机制,重复查询响应更快
自动重试和错误恢复
参数说明:
pmid: 必需,PubMed ID(如:"37769091")
返回值说明:
包含与同步版本相同的字段
额外提供:
processing_time: 处理耗时(秒)
cache_hit: 是否命中缓存
performance_info: 性能统计信息
retry_count: 重试次数
使用场景:
需要高性能的文献详情获取
批量文献详情查询
大规模数据处理
性能特点:
比同步版本快20-40%
支持智能缓存
自动重试机制
并发控制
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| pmid | Yes | PubMed ID of the article (e.g. "37769091") | — |
Implementation Reference
# The main handler function for the 'get_article_details' MCP tool. It accepts an
# identifier, detects type if auto, queries specified sources (default europe_pmc,
# crossref), merges results, and optionally adds journal quality metrics.
@mcp.tool(
    description="获取文献详情工具。通过DOI、PMID等标识符获取文献的详细信息。",
    annotations=ToolAnnotations(
        title="文献详情", readOnlyHint=True, openWorldHint=False
    ),
    tags={"literature", "details", "metadata"},
)
def get_article_details(
    identifier: str,
    id_type: str = "auto",
    sources: list[str] | None = None,
    include_quality_metrics: bool = False,
) -> dict[str, Any]:
    """Fetch detailed article metadata for a single identifier.

    Args:
        identifier: Article identifier (DOI, PMID, PMCID, arXiv ID).
        id_type: One of ["auto", "doi", "pmid", "pmcid", "arxiv_id"];
            "auto" detects the type from the identifier's shape.
        sources: Data sources queried in priority order
            (default: ["europe_pmc", "crossref"]).
        include_quality_metrics: Whether to look up journal quality
            metrics for the merged record (best effort).

    Returns:
        Dict with per-source details, a merged record, optional quality
        metrics, the detected id_type and the processing time in seconds.

    Raises:
        ToolError: If the identifier is empty or whitespace.
        McpError: On any unexpected internal failure (code -32603).
    """
    # FIX: validate BEFORE the try block. Previously the ToolError was
    # raised inside the try and the blanket `except Exception` below
    # re-wrapped it into a generic McpError, hiding the specific
    # validation message semantics from the client.
    if not identifier or not identifier.strip():
        from fastmcp.exceptions import ToolError

        raise ToolError("文献标识符不能为空")

    try:
        from article_mcp.services.merged_results import extract_identifier_type
        from article_mcp.services.merged_results import merge_same_doi_articles

        start_time = time.time()
        details_by_source = {}
        sources_found = []

        # Default source priority when the caller passes None.
        if sources is None:
            sources = ["europe_pmc", "crossref"]

        # Auto-detect the identifier type when requested.
        if id_type == "auto":
            id_type = extract_identifier_type(identifier.strip())

        # Query each configured source in order; sources not present in
        # the service registry are silently skipped.
        for source in sources:
            if source not in _article_services:
                continue
            try:
                service = _article_services[source]
                if source == "europe_pmc":
                    result = service.fetch(identifier.strip(), id_type=id_type)
                elif source == "crossref":
                    if id_type == "doi":
                        result = service.get_work_by_doi(identifier.strip())
                    else:
                        continue  # CrossRef lookup here only resolves DOIs
                elif source == "openalex":
                    if id_type == "doi":
                        result = service.get_work_by_doi(identifier.strip())
                    else:
                        continue  # OpenAlex lookup here only resolves DOIs
                elif source == "arxiv":
                    if id_type == "arxiv_id":
                        result = service.fetch(identifier.strip(), id_type=id_type)
                    else:
                        continue
                else:
                    continue

                # Success = no error reported and an article payload present.
                error = result.get("error")
                article = result.get("article")
                if not error and article:
                    details_by_source[source] = article
                    sources_found.append(source)
                    logger.info(f"{source} 获取详情成功")
                else:
                    logger.debug(f"{source} 未找到文献详情: {error or '无数据'}")
            except Exception as e:
                # One failing source must not abort the whole lookup.
                logger.error(f"{source} 获取详情异常: {e}")
                continue

        # Merge the per-source records into one best-effort record.
        merged_detail = None
        if details_by_source:
            articles = [details_by_source[source] for source in sources_found]
            merged_detail = merge_same_doi_articles(articles)

        # Optionally enrich with journal quality metrics; failures here
        # are logged but never fail the whole call.
        quality_metrics = None
        if include_quality_metrics and merged_detail:
            journal_name = merged_detail.get("journal", "")
            if journal_name:
                try:
                    from article_mcp.services.mcp_config import get_easyscholar_key

                    secret_key = get_easyscholar_key(None, logger)
                    pubmed_service = _article_services.get("pubmed")
                    if pubmed_service:
                        quality_metrics = pubmed_service.get_journal_quality(
                            journal_name, secret_key
                        )
                except Exception as e:
                    logger.warning(f"获取期刊质量指标失败: {e}")

        processing_time = round(time.time() - start_time, 2)
        return {
            "success": len(details_by_source) > 0,
            "identifier": identifier.strip(),
            "id_type": id_type,
            "sources_found": sources_found,
            "details_by_source": details_by_source,
            "merged_detail": merged_detail,
            "quality_metrics": quality_metrics,
            "processing_time": processing_time,
        }
    except Exception as e:
        logger.error(f"获取文献详情异常: {e}")
        # Wrap unexpected failures in the standard MCP internal-error code.
        from mcp import McpError
        from mcp.types import ErrorData

        raise McpError(
            ErrorData(
                code=-32603,
                message=f"获取文献详情失败: {type(e).__name__}: {str(e)}",
            )
        )
# src/article_mcp/tools/core/article_tools.py:14-18 (registration)
# The registration function that initializes global service references and
# registers the get_article_details tool using the @mcp.tool decorator.
def register_article_tools(mcp: FastMCP, services: dict[str, Any], logger: Any) -> None:
    """Register the article-detail tools on the given FastMCP server.

    Stores the per-source service registry in the module-level
    ``_article_services`` global so tool handlers can look up their
    data-source clients.

    NOTE(review): this excerpt (lines 14-18) shows only the global
    assignment; the @mcp.tool registrations that use the ``mcp`` and
    ``logger`` parameters are presumably in the elided remainder of the
    function — confirm against the full file.
    """
    global _article_services
    _article_services = services
# Helper function to automatically detect the type of article identifier
# (DOI, PMID, PMCID, arXiv ID) used in the tool.
def extract_identifier_type(identifier: str) -> str:
    """Classify an identifier as "doi", "pmid", "pmcid" or "arxiv_id".

    Args:
        identifier: Raw identifier string; surrounding whitespace is ignored.

    Returns:
        One of "doi", "pmid", "pmcid", "arxiv_id"; defaults to "doi"
        when nothing matches.
    """
    identifier = identifier.strip()
    # DOI: always starts with the "10." registrant prefix and has a suffix.
    if identifier.startswith("10.") and "/" in identifier:
        return "doi"
    # PMID: a bare number. FIX: PMIDs are 1-8 digits; the previous
    # 6 <= len <= 8 restriction misrouted short PMIDs to the DOI
    # fallback, even though a DOI can never be all digits.
    if identifier.isdigit():
        return "pmid"
    # PMCID: "PMC" followed by digits.
    if identifier.startswith("PMC") and identifier[3:].isdigit():
        return "pmcid"
    # arXiv: explicit "arXiv:" prefix, or a dotted alphanumeric token
    # (dots/dashes/underscores stripped before the check), e.g. "2301.00001".
    if identifier.startswith("arXiv:") or (
        "." in identifier
        and identifier.replace(".", "").replace("-", "").replace("_", "").isalnum()
    ):
        return "arxiv_id"
    # Default: treat it as a DOI attempt.
    return "doi"
# Helper function to merge article details from multiple sources for the
# same DOI, prioritizing non-empty fields.
def merge_same_doi_articles(articles: list[dict]) -> dict:
    """Merge per-source records for the same DOI into a single dict.

    Fields come from the first record; empty or missing fields are
    filled in from later records. Input dicts are never mutated.

    Args:
        articles: Per-source article dicts, each ideally carrying a
            "source_from" key naming its origin.

    Returns:
        Merged dict with extra "sources" (origin names, in input order)
        and "data_sources" (origin -> original record) keys; an empty
        dict for an empty input list.
    """
    # Robustness: an empty input previously raised IndexError below.
    if not articles:
        return {}

    if len(articles) == 1:
        article = articles[0]
        source_from = article.get("source_from", "unknown")
        return {
            **article,
            "sources": [source_from],
            "data_sources": {source_from: article},
        }

    # FIX: copy the first record. The original code aliased articles[0]
    # and wrote merged fields into it, mutating the caller's per-source
    # data (e.g. details_by_source in the tool handler).
    base_article = dict(articles[0])
    for article in articles[1:]:
        # Fill only fields that are missing or falsy in the base record.
        for key, value in article.items():
            if key not in base_article or not base_article[key]:
                base_article[key] = value

    return {
        **base_article,
        "sources": [a.get("source_from", "unknown") for a in articles],
        "data_sources": {a.get("source_from", "unknown"): a for a in articles},
    }
# Service-level helper method in EuropePMCService for synchronously fetching
# article details, called by the tool handler.
def get_article_details_sync(
    self, identifier: str, id_type: str = "pmid", include_fulltext: bool = False
) -> dict[str, Any]:
    """Synchronously fetch one article's details from the Europe PMC API.

    Args:
        identifier: Identifier value (e.g. a PMID string).
        id_type: Identifier kind used to build the fielded query
            ("pmid", "pmcid", or any other type used as a query prefix).
        include_fulltext: When True and the record exposes a PMC id,
            also fetch the PMC full text via ``self.pubmed_service``.

    Returns:
        Dict with "article" (processed record or None) and "error"
        (message or None). Results go through the sync cache keyed by
        id_type + identifier.
    """
    self.logger.info(f"获取文献详情: {id_type}={identifier}")

    def fetch_from_api():
        # Up to 3 attempts; transient failures back off exponentially.
        max_retries = 3
        for attempt in range(max_retries):
            try:
                # Build the fielded query for the identifier type.
                if id_type.lower() == "pmid":
                    query = f"EXT_ID:{identifier}"
                elif id_type.lower() == "pmcid":
                    # PMCID uses its own query syntax; tolerate a bare
                    # numeric id by prepending "PMC".
                    if identifier.startswith("PMC"):
                        query = f"PMCID:{identifier}"
                    else:
                        query = f"PMCID:PMC{identifier}"
                else:
                    query = f"{id_type.upper()}:{identifier}"

                params = {"query": query, "format": "json", "resultType": "core"}
                session = self._get_sync_session()
                response = session.get(self.detail_url, params=params, timeout=30)

                # HTTP status handling: retry on 429/503, fail fast otherwise.
                if response.status_code == 429:  # rate limited
                    self.logger.warning(
                        f"遇到速率限制,等待后重试 ({attempt + 1}/{max_retries})"
                    )
                    time.sleep(2**attempt)  # exponential backoff
                    continue
                elif response.status_code == 503:  # service unavailable
                    self.logger.warning(
                        f"服务暂时不可用,等待后重试 ({attempt + 1}/{max_retries})"
                    )
                    time.sleep(2**attempt)  # exponential backoff
                    continue
                elif response.status_code != 200:
                    return {
                        "error": f"API 请求失败: HTTP {response.status_code}",
                        "article": None,
                    }

                response.raise_for_status()
                data = response.json()

                results = data.get("resultList", {}).get("result", [])
                if not results:
                    return {
                        "error": f"未找到 {id_type.upper()} 为 {identifier} 的文献",
                        "article": None,
                    }

                article_info = self.process_europe_pmc_article(results[0])

                # Optionally attach the PMC full text when the record has a
                # PMC id and a pubmed service is available.
                if (
                    include_fulltext
                    and article_info
                    and article_info.get("pmcid")
                    and self.pubmed_service
                ):
                    try:
                        pmc_id = article_info["pmcid"]
                        self.logger.info(f"获取PMC全文: {pmc_id}")
                        fulltext_result = self.pubmed_service.get_pmc_fulltext_html(pmc_id)
                        if not fulltext_result.get("error"):
                            article_info["fulltext"] = {
                                "html": fulltext_result.get("fulltext_html"),
                                "available": fulltext_result.get("fulltext_available", False),
                                "title": fulltext_result.get("title"),
                                "authors": fulltext_result.get("authors"),
                                "abstract": fulltext_result.get("abstract"),
                            }
                        else:
                            self.logger.warning(
                                f"获取PMC全文失败: {fulltext_result.get('error')}"
                            )
                    except Exception as e:
                        # Full text is best effort; keep the base record.
                        self.logger.error(f"获取PMC全文时发生错误: {str(e)}")

                return (
                    {"article": article_info, "error": None}
                    if article_info
                    else {"error": "处理文献信息失败", "article": None}
                )

            except requests.exceptions.Timeout:
                self.logger.warning(f"请求超时,重试 ({attempt + 1}/{max_retries})")
                if attempt < max_retries - 1:
                    time.sleep(2**attempt)  # exponential backoff
                    continue
                else:
                    return {
                        "error": f"获取文献详情超时: {id_type}={identifier}",
                        "article": None,
                    }
            except requests.exceptions.ConnectionError:
                self.logger.warning(f"连接错误,重试 ({attempt + 1}/{max_retries})")
                if attempt < max_retries - 1:
                    time.sleep(2**attempt)  # exponential backoff
                    continue
                else:
                    return {"error": f"连接到API失败: {id_type}={identifier}", "article": None}
            except Exception as e:
                # Unexpected errors are not retried.
                self.logger.error(f"获取文献详情时发生未预期错误: {str(e)}")
                return {"error": f"获取文献详情失败: {str(e)}", "article": None}

        # All retries exhausted (e.g. persistent 429/503 responses).
        return {"error": f"经过 {max_retries} 次重试后仍失败", "article": None}

    cache_key = f"article_{id_type}_{identifier}"
    return self._get_cached_or_fetch_sync(cache_key, fetch_from_api)