Skip to main content
Glama
h-lu
by h-lu
medrxiv.py8.51 kB
# paper_search_mcp/academic_platforms/medrxiv.py """ MedRxivSearcher - medRxiv 医学预印本搜索 2025 最佳实践版本: - 使用 PyMuPDF4LLM 提取 PDF - 日志记录 - 分页处理 - 速率限制(1秒间隔) """ from typing import List, Optional import requests import os import time from datetime import datetime, timedelta import logging from ..paper import Paper import pymupdf4llm logger = logging.getLogger(__name__) class PaperSource: """Abstract base class for paper sources""" def search(self, query: str, **kwargs) -> List[Paper]: raise NotImplementedError def download_pdf(self, paper_id: str, save_path: str) -> str: raise NotImplementedError def read_paper(self, paper_id: str, save_path: str) -> str: raise NotImplementedError class MedRxivSearcher(PaperSource): """medRxiv 医学预印本搜索器 使用 bioRxiv API(共享 API)搜索医学预印本。 2025 最佳实践: - 分页获取(每页 100 篇) - 请求间隔 1 秒 - 指数退避重试 """ BASE_URL = "https://api.biorxiv.org/details/medrxiv" def __init__(self, timeout: int = 30, max_retries: int = 3): self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'paper_search_mcp/1.0' }) self.timeout = timeout self.max_retries = max_retries self._last_request_time = 0.0 def _rate_limit_wait(self): """速率限制:1秒间隔""" elapsed = time.time() - self._last_request_time if elapsed < 1.0: time.sleep(1.0 - elapsed) self._last_request_time = time.time() def _make_request(self, url: str, retry_count: int = 0) -> Optional[requests.Response]: """发送请求,带重试机制""" self._rate_limit_wait() try: response = self.session.get(url, timeout=self.timeout) if response.status_code == 429: if retry_count < self.max_retries: wait_time = (2 ** retry_count) + 1 logger.warning(f"Rate limited, retrying in {wait_time}s...") time.sleep(wait_time) return self._make_request(url, retry_count + 1) return None response.raise_for_status() return response except requests.exceptions.RequestException as e: if retry_count < self.max_retries: wait_time = 2 ** retry_count logger.warning(f"Request failed, retrying in {wait_time}s: {e}") time.sleep(wait_time) return self._make_request(url, retry_count + 1) logger.error(f"Request failed after {self.max_retries} retries: {e}") return None def search(self, query: str, max_results: int = 10, days: int = 30) -> List[Paper]: """搜索 medRxiv 论文 Args: query: 分类名称(如 "cardiovascular medicine", "infectious diseases") max_results: 最大返回数量 days: 搜索最近 N 天的论文 Returns: List[Paper]: 论文列表 """ end_date = datetime.now().strftime('%Y-%m-%d') start_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') # 格式化分类 category = query.lower().replace(' ', '_') papers = [] cursor = 0 while len(papers) < max_results: url = f"{self.BASE_URL}/{start_date}/{end_date}/{cursor}" if category: url += f"?category={category}" response = self._make_request(url) if not response: break try: data = response.json() collection = data.get('collection', []) except Exception as e: logger.error(f"Failed to parse response: {e}") break if not collection: break for item in collection: if len(papers) >= max_results: break paper = self._parse_item(item) if paper: papers.append(paper) if len(collection) < 100: break # 没有更多结果 cursor += 100 logger.info(f"Found {len(papers)} papers for query: {query}") return papers[:max_results] def _parse_item(self, item: dict) -> Optional[Paper]: """解析 API 返回的论文数据""" try: doi = item.get('doi', '') version = item.get('version', '1') date_str = item.get('date', '') published_date = None if date_str: try: published_date = datetime.strptime(date_str, '%Y-%m-%d') except ValueError: pass return Paper( paper_id=doi, title=item.get('title', ''), authors=item.get('authors', '').split('; '), abstract=item.get('abstract', ''), url=f"https://www.medrxiv.org/content/{doi}v{version}", pdf_url=f"https://www.medrxiv.org/content/{doi}v{version}.full.pdf", published_date=published_date, source="medrxiv", categories=[item.get('category', '')], doi=doi ) except Exception as e: logger.warning(f"Failed to parse item: {e}") return None def download_pdf(self, paper_id: str, save_path: str) -> str: """下载 PDF Args: paper_id: medRxiv DOI save_path: 保存目录 Returns: 下载的文件路径或错误信息 """ if not paper_id: return "Error: paper_id is empty" pdf_url = f"https://www.medrxiv.org/content/{paper_id}v1.full.pdf" try: response = self.session.get( pdf_url, timeout=self.timeout, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'} ) response.raise_for_status() os.makedirs(save_path, exist_ok=True) filename = f"{paper_id.replace('/', '_')}.pdf" pdf_path = os.path.join(save_path, filename) with open(pdf_path, 'wb') as f: f.write(response.content) logger.info(f"PDF downloaded: {pdf_path}") return pdf_path except Exception as e: logger.error(f"PDF download failed: {e}") return f"Error downloading PDF: {e}" def read_paper(self, paper_id: str, save_path: str) -> str: """下载并提取论文文本 Args: paper_id: medRxiv DOI save_path: 保存目录 Returns: 提取的 Markdown 文本 """ pdf_path = os.path.join(save_path, f"{paper_id.replace('/', '_')}.pdf") if not os.path.exists(pdf_path): result = self.download_pdf(paper_id, save_path) if result.startswith("Error"): return result pdf_path = result try: text = pymupdf4llm.to_markdown(pdf_path, show_progress=False) logger.info(f"Extracted {len(text)} characters from {pdf_path}") return text except Exception as e: logger.error(f"Failed to extract text: {e}") return f"Error extracting text: {e}" if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') searcher = MedRxivSearcher() print("=" * 60) print("Testing MedRxivSearcher...") print("=" * 60) papers = searcher.search("infectious diseases", max_results=2, days=7) print(f"Found {len(papers)} papers") for i, paper in enumerate(papers, 1): print(f"\n{i}. {paper.title[:60]}...") print(f" DOI: {paper.doi}") print(f" PDF: {paper.pdf_url}") print("\n✅ Test completed!")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/h-lu/paper-search-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server