Skip to main content
Glama
gqy20

Europe PMC Literature Search MCP Server

get_article_details

Retrieve detailed information about specific articles from Europe PMC using PMIDs. Supports high-performance asynchronous processing, concurrent queries, caching, and automatic retries for efficient literature data retrieval.

Instructions

获取特定文献的详细信息(高性能优化版本)

功能说明:

  • 使用异步方式根据PMID获取文献的完整详细信息

  • 支持并发处理,性能更优

  • 集成缓存机制,重复查询响应更快

  • 自动重试和错误恢复

参数说明:

  • pmid: 必需,PubMed ID(如:"37769091")

返回值说明:

  • 包含与同步版本相同的字段

  • 额外提供:

    • processing_time: 处理耗时(秒)

    • cache_hit: 是否命中缓存

    • performance_info: 性能统计信息

    • retry_count: 重试次数

使用场景:

  • 需要高性能的文献详情获取

  • 批量文献详情查询

  • 大规模数据处理

性能特点:

  • 比同步版本快20-40%

  • 支持智能缓存

  • 自动重试机制

  • 并发控制

Input Schema

Table / JSON Schema

| Name | Required | Description                                 | Default |
| ---- | -------- | ------------------------------------------- | ------- |
| pmid | Yes      | PubMed ID of the article (e.g. "37769091")  | —       |

Implementation Reference

  • The main handler function for the 'get_article_details' MCP tool. It accepts an identifier, detects type if auto, queries specified sources (default europe_pmc, crossref), merges results, and optionally adds journal quality metrics.
    @mcp.tool(
        description="获取文献详情工具。通过DOI、PMID等标识符获取文献的详细信息。",
        annotations=ToolAnnotations(
            title="文献详情",
            readOnlyHint=True,
            openWorldHint=False
        ),
        tags={"literature", "details", "metadata"}
    )
    def get_article_details(
        identifier: str,
        id_type: str = "auto",
        sources: list[str] | None = None,
        include_quality_metrics: bool = False,
    ) -> dict[str, Any]:
        """Fetch detailed article metadata by identifier (DOI, PMID, PMCID, arXiv ID).

        Queries each configured data source in priority order, collects the
        per-source records, and merges them into a single article dict.

        Args:
            identifier: Article identifier (DOI, PMID, PMCID, or arXiv ID).
            id_type: One of "auto", "doi", "pmid", "pmcid", "arxiv_id".
                "auto" detects the type from the identifier's shape.
            sources: Data sources to query, in priority order. Defaults to
                ["europe_pmc", "crossref"] when None.
            include_quality_metrics: When True, additionally look up journal
                quality metrics for the merged record's journal (best-effort).

        Returns:
            Dict with keys: success, identifier, id_type, sources_found,
            details_by_source, merged_detail, quality_metrics, and
            processing_time (seconds, rounded to 2 decimals).

        Raises:
            ToolError: If `identifier` is empty or whitespace-only.
            McpError: Wrapping any other unexpected failure (code -32603).
        """
        try:
            if not identifier or not identifier.strip():
                from fastmcp.exceptions import ToolError
                raise ToolError("文献标识符不能为空")

            from article_mcp.services.merged_results import extract_identifier_type
            from article_mcp.services.merged_results import merge_same_doi_articles

            start_time = time.time()
            details_by_source: dict[str, Any] = {}
            sources_found: list[str] = []

            # Default source list when the caller passed None.
            if sources is None:
                sources = ["europe_pmc", "crossref"]

            # Auto-detect the identifier type from its shape.
            if id_type == "auto":
                id_type = extract_identifier_type(identifier.strip())

            # Query each data source; unknown or inapplicable sources are skipped.
            for source in sources:
                if source not in _article_services:
                    continue

                try:
                    service = _article_services[source]
                    if source == "europe_pmc":
                        result = service.fetch(identifier.strip(), id_type=id_type)
                    elif source == "crossref":
                        # Crossref lookup here only supports DOIs.
                        if id_type == "doi":
                            result = service.get_work_by_doi(identifier.strip())
                        else:
                            continue
                    elif source == "openalex":
                        # OpenAlex lookup here only supports DOIs.
                        if id_type == "doi":
                            result = service.get_work_by_doi(identifier.strip())
                        else:
                            continue
                    elif source == "arxiv":
                        if id_type == "arxiv_id":
                            result = service.fetch(identifier.strip(), id_type=id_type)
                        else:
                            continue
                    else:
                        continue

                    # Success means: no error reported AND an article payload present.
                    error = result.get("error")
                    article = result.get("article")
                    if not error and article:
                        details_by_source[source] = article
                        sources_found.append(source)
                        logger.info(f"{source} 获取详情成功")
                    else:
                        logger.debug(f"{source} 未找到文献详情: {error or '无数据'}")

                except Exception as e:
                    # One failing source must not abort the whole lookup.
                    logger.error(f"{source} 获取详情异常: {e}")
                    continue

            # Merge the per-source records into a single article dict.
            merged_detail = None
            if details_by_source:
                articles = [details_by_source[source] for source in sources_found]
                merged_detail = merge_same_doi_articles(articles)

            # Optionally enrich with journal quality metrics (best-effort;
            # failures are logged and ignored).
            quality_metrics = None
            if include_quality_metrics and merged_detail:
                journal_name = merged_detail.get("journal", "")
                if journal_name:
                    try:
                        from article_mcp.services.mcp_config import get_easyscholar_key

                        secret_key = get_easyscholar_key(None, logger)
                        pubmed_service = _article_services.get("pubmed")
                        if pubmed_service:
                            quality_metrics = pubmed_service.get_journal_quality(
                                journal_name, secret_key
                            )
                    except Exception as e:
                        logger.warning(f"获取期刊质量指标失败: {e}")

            processing_time = round(time.time() - start_time, 2)

            return {
                "success": len(details_by_source) > 0,
                "identifier": identifier.strip(),
                "id_type": id_type,
                "sources_found": sources_found,
                "details_by_source": details_by_source,
                "merged_detail": merged_detail,
                "quality_metrics": quality_metrics,
                "processing_time": processing_time,
            }

        except Exception as e:
            logger.error(f"获取文献详情异常: {e}")
            # Re-raise as an MCP-standard internal error so clients receive a
            # structured failure instead of a bare Python exception.
            from mcp import McpError
            from mcp.types import ErrorData
            raise McpError(ErrorData(
                code=-32603,
                message=f"获取文献详情失败: {type(e).__name__}: {str(e)}"
            ))
  • The registration function that initializes global service references and registers the get_article_details tool using the @mcp.tool decorator.
    def register_article_tools(mcp: FastMCP, services: dict[str, Any], logger: Any) -> None:
        """Register the article-detail tools on the given FastMCP instance.

        Caches *services* in the module-level ``_article_services`` registry
        that the tool handlers read at call time.

        NOTE(review): only the service-caching step is visible in this chunk;
        the @mcp.tool registration presumably happens inside this function in
        the full module -- confirm against the complete source.
        """
        global _article_services
        _article_services = services
  • Helper function to automatically detect the type of article identifier (DOI, PMID, PMCID, arXiv ID) used in the tool.
    def extract_identifier_type(identifier: str) -> str:
        """Infer the identifier type from the identifier's shape.

        Args:
            identifier: Raw identifier string; surrounding whitespace is ignored.

        Returns:
            One of "doi", "pmid", "pmcid", or "arxiv_id". Falls back to "doi"
            when no pattern matches.
        """
        identifier = identifier.strip()

        # DOI: registrant prefix always starts with "10." and contains "/".
        if identifier.startswith("10.") and "/" in identifier:
            return "doi"

        # PMID: any all-digit string. PMIDs are assigned sequentially starting
        # from 1, so short IDs (fewer than 6 digits) are valid too -- the
        # previous 6-8 digit window misclassified them as DOIs.
        if identifier.isdigit():
            return "pmid"

        # PMCID: "PMC" followed by digits.
        if identifier.startswith("PMC") and identifier[3:].isdigit():
            return "pmcid"

        # arXiv: explicit "arXiv:" prefix, or a dotted token that is
        # alphanumeric after stripping ".", "-", "_" (e.g. "2301.00001").
        # NOTE(review): this heuristic is broad and may swallow malformed DOIs
        # lacking the "10." prefix -- confirm acceptable for callers.
        if identifier.startswith("arXiv:") or (
            "." in identifier
            and identifier.replace(".", "").replace("-", "").replace("_", "").isalnum()
        ):
            return "arxiv_id"

        # Default: treat as a DOI, the most common identifier in practice.
        return "doi"
  • Helper function to merge article details from multiple sources for the same DOI, prioritizing non-empty fields.
    def merge_same_doi_articles(articles: list[dict]) -> dict:
        """Merge article records for the same DOI from multiple sources.

        Field-by-field merge that keeps the first non-empty value seen, in
        the order the records appear in *articles*. Input dicts are never
        mutated (the previous implementation wrote merged fields back into
        ``articles[0]``).

        Args:
            articles: Per-source article dicts; each may carry a
                "source_from" key naming its source.

        Returns:
            Merged dict with extra keys "sources" (list of source names) and
            "data_sources" (source name -> original record). Returns {} for
            an empty input list (previously an IndexError).
        """
        # Guard: nothing to merge.
        if not articles:
            return {}

        if len(articles) == 1:
            article = articles[0]
            source_from = article.get("source_from", "unknown")
            return {
                **article,
                "sources": [source_from],
                "data_sources": {source_from: article},
            }

        # Copy the first record so the caller's dict is not mutated.
        base_article = dict(articles[0])
        for article in articles[1:]:
            # Keep the first non-empty value for each field.
            for key, value in article.items():
                if key not in base_article or not base_article[key]:
                    base_article[key] = value

        return {
            **base_article,
            "sources": [a.get("source_from", "unknown") for a in articles],
            "data_sources": {a.get("source_from", "unknown"): a for a in articles},
        }
  • Service-level helper method in EuropePMCService for synchronously fetching article details, called by the tool handler.
    def get_article_details_sync(
        self, identifier: str, id_type: str = "pmid", include_fulltext: bool = False
    ) -> dict[str, Any]:
        """Synchronously fetch article details from the Europe PMC API, with caching.

        Args:
            identifier: Identifier value (e.g. a PMID or PMCID).
            id_type: Identifier type; "pmid" and "pmcid" get dedicated query
                syntax, anything else is queried as "<ID_TYPE>:<identifier>".
            include_fulltext: When True, also fetch PMC full-text HTML
                (requires a "pmcid" in the result and a configured
                ``self.pubmed_service``).

        Returns:
            Dict with "article" (parsed record or None) and "error" (message
            string or None). Results are served from the cache when present.
        """
        self.logger.info(f"获取文献详情: {id_type}={identifier}")

        def fetch_from_api():
            # Up to 3 attempts with exponential backoff on transient failures
            # (rate limiting, service unavailable, timeouts, connection errors).
            max_retries = 3
            for attempt in range(max_retries):
                try:
                    # Build the query according to the identifier type.
                    if id_type.lower() == "pmid":
                        query = f"EXT_ID:{identifier}"
                    elif id_type.lower() == "pmcid":
                        # PMCIDs use dedicated query syntax; tolerate a bare
                        # numeric ID by prepending the "PMC" prefix.
                        if identifier.startswith("PMC"):
                            query = f"PMCID:{identifier}"
                        else:
                            query = f"PMCID:PMC{identifier}"
                    else:
                        query = f"{id_type.upper()}:{identifier}"

                    params = {"query": query, "format": "json", "resultType": "core"}
                    session = self._get_sync_session()
                    response = session.get(self.detail_url, params=params, timeout=30)

                    # Handle HTTP status codes explicitly.
                    if response.status_code == 429:  # rate limited
                        self.logger.warning(
                            f"遇到速率限制,等待后重试 ({attempt + 1}/{max_retries})"
                        )
                        time.sleep(2**attempt)  # exponential backoff
                        continue
                    elif response.status_code == 503:  # service unavailable
                        self.logger.warning(
                            f"服务暂时不可用,等待后重试 ({attempt + 1}/{max_retries})"
                        )
                        time.sleep(2**attempt)  # exponential backoff
                        continue
                    elif response.status_code != 200:
                        return {
                            "error": f"API 请求失败: HTTP {response.status_code}",
                            "article": None,
                        }

                    # NOTE(review): status is guaranteed 200 at this point, so
                    # this call is effectively a no-op.
                    response.raise_for_status()

                    data = response.json()
                    results = data.get("resultList", {}).get("result", [])

                    if not results:
                        return {
                            "error": f"未找到 {id_type.upper()} 为 {identifier} 的文献",
                            "article": None,
                        }

                    # Only the first hit is parsed; the query targets a unique ID.
                    article_info = self.process_europe_pmc_article(results[0])

                    # Optionally fetch PMC full text when a PMC ID is present
                    # and the PubMed service is available (best-effort).
                    if (
                        include_fulltext
                        and article_info
                        and article_info.get("pmcid")
                        and self.pubmed_service
                    ):
                        try:
                            pmc_id = article_info["pmcid"]
                            self.logger.info(f"获取PMC全文: {pmc_id}")
                            fulltext_result = self.pubmed_service.get_pmc_fulltext_html(pmc_id)
                            if not fulltext_result.get("error"):
                                article_info["fulltext"] = {
                                    "html": fulltext_result.get("fulltext_html"),
                                    "available": fulltext_result.get("fulltext_available", False),
                                    "title": fulltext_result.get("title"),
                                    "authors": fulltext_result.get("authors"),
                                    "abstract": fulltext_result.get("abstract"),
                                }
                            else:
                                self.logger.warning(
                                    f"获取PMC全文失败: {fulltext_result.get('error')}"
                                )
                        except Exception as e:
                            # Full text is optional: log and return details anyway.
                            self.logger.error(f"获取PMC全文时发生错误: {str(e)}")

                    return (
                        {"article": article_info, "error": None}
                        if article_info
                        else {"error": "处理文献信息失败", "article": None}
                    )

                except requests.exceptions.Timeout:
                    self.logger.warning(f"请求超时,重试 ({attempt + 1}/{max_retries})")
                    if attempt < max_retries - 1:
                        time.sleep(2**attempt)  # exponential backoff
                        continue
                    else:
                        return {
                            "error": f"获取文献详情超时: {id_type}={identifier}",
                            "article": None,
                        }
                except requests.exceptions.ConnectionError:
                    self.logger.warning(f"连接错误,重试 ({attempt + 1}/{max_retries})")
                    if attempt < max_retries - 1:
                        time.sleep(2**attempt)  # exponential backoff
                        continue
                    else:
                        return {"error": f"连接到API失败: {id_type}={identifier}", "article": None}
                except Exception as e:
                    # Unexpected errors are not retried.
                    self.logger.error(f"获取文献详情时发生未预期错误: {str(e)}")
                    return {"error": f"获取文献详情失败: {str(e)}", "article": None}

            # All retries exhausted via `continue` (429/503 on the last attempt).
            return {"error": f"经过 {max_retries} 次重试后仍失败", "article": None}

        # Cache key is per identifier-type/value pair.
        cache_key = f"article_{id_type}_{identifier}"
        return self._get_cached_or_fetch_sync(cache_key, fetch_from_api)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gqy20/article-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server