Skip to main content
Glama
h-lu
by h-lu
semantic.py17.2 kB
# paper_search_mcp/academic_platforms/semantic.py """ SemanticSearcher - Semantic Scholar 论文搜索 2025 最佳实践版本: - 支持 API Key(提升速率限制,获取专用配额) - 只请求必要字段(减少延迟和配额消耗) - 指数退避重试机制 - 使用 PyMuPDF4LLM 提取 PDF(替代 PyPDF2) - Session 复用 """ from typing import List, Optional from datetime import datetime import requests import time import os import re import logging from ..paper import Paper import pymupdf4llm logger = logging.getLogger(__name__) class PaperSource: """Abstract base class for paper sources""" def search(self, query: str, **kwargs) -> List[Paper]: raise NotImplementedError def download_pdf(self, paper_id: str, save_path: str) -> str: raise NotImplementedError def read_paper(self, paper_id: str, save_path: str) -> str: raise NotImplementedError class SemanticSearcher(PaperSource): """Semantic Scholar 论文搜索器 使用 Semantic Scholar Academic Graph API 搜索论文。 2025 最佳实践: - API Key 提供专用速率限制(1 RPS 起步,可申请提升) - 只请求必要字段减少延迟 - 指数退避处理 429 错误 - 支持多种论文 ID 格式(DOI, arXiv, PMID 等) 环境变量: - SEMANTIC_SCHOLAR_API_KEY: API 密钥(推荐) 获取 API Key: https://www.semanticscholar.org/product/api """ BASE_URL = "https://api.semanticscholar.org/graph/v1" # 只请求必要字段(2025 最佳实践) DEFAULT_FIELDS = [ "title", "abstract", "year", "citationCount", "authors", "url", "publicationDate", "externalIds", "fieldsOfStudy", "openAccessPdf" ] def __init__( self, api_key: Optional[str] = None, timeout: int = 30, max_retries: int = 3 ): """初始化 Semantic Scholar 搜索器 Args: api_key: API Key(默认从环境变量获取) timeout: 请求超时时间(秒) max_retries: 最大重试次数 """ self.api_key = api_key or os.environ.get('SEMANTIC_SCHOLAR_API_KEY', '') self.timeout = timeout self.max_retries = max_retries # Session 复用 self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'paper_search_mcp/1.0', 'Accept': 'application/json', }) # 添加 API Key 到 headers if self.api_key: self.session.headers['x-api-key'] = self.api_key logger.info("Using authenticated access with API key") else: logger.warning( "No SEMANTIC_SCHOLAR_API_KEY set. " "Using shared rate limit (5000 req/5min shared with all users)" ) # 速率限制追踪 self._last_request_time = 0.0 # 有 API Key = 1 RPS,无 API Key = 共享池 self.min_request_interval = 1.0 if self.api_key else 0.5 def _rate_limit_wait(self): """速率限制等待""" elapsed = time.time() - self._last_request_time if elapsed < self.min_request_interval: time.sleep(self.min_request_interval - elapsed) self._last_request_time = time.time() def _make_request( self, endpoint: str, params: dict, retry_count: int = 0 ) -> Optional[requests.Response]: """发送 API 请求,带重试机制 Args: endpoint: API 端点路径 params: 请求参数 retry_count: 当前重试次数 Returns: Response 对象或 None(发生错误时) """ self._rate_limit_wait() url = f"{self.BASE_URL}/{endpoint}" try: response = self.session.get(url, params=params, timeout=self.timeout) # 处理 429 速率限制 if response.status_code == 429: if retry_count < self.max_retries: # 指数退避 + 随机抖动 wait_time = (2 ** retry_count) + (time.time() % 1) logger.warning(f"Rate limited (429), retrying in {wait_time:.1f}s...") time.sleep(wait_time) return self._make_request(endpoint, params, retry_count + 1) else: logger.error(f"Rate limited after {self.max_retries} retries") return None response.raise_for_status() return response except requests.exceptions.RequestException as e: if retry_count < self.max_retries: wait_time = 2 ** retry_count logger.warning(f"Request failed, retrying in {wait_time}s: {e}") time.sleep(wait_time) return self._make_request(endpoint, params, retry_count + 1) logger.error(f"Request failed after {self.max_retries} retries: {e}") return None def _parse_date(self, date_str: str) -> Optional[datetime]: """解析日期字符串""" if not date_str: return None try: return datetime.strptime(date_str.strip(), "%Y-%m-%d") except ValueError: # 尝试只解析年份 try: return datetime.strptime(date_str.strip()[:4], "%Y") except ValueError: return None def _extract_pdf_url(self, open_access_pdf: dict) -> str: """从 openAccessPdf 字段提取 PDF URL""" if not open_access_pdf: return "" # 直接获取 URL if open_access_pdf.get('url'): return open_access_pdf['url'] # 从 disclaimer 中提取 disclaimer = open_access_pdf.get('disclaimer', '') if disclaimer: # 匹配 URL 模式 url_pattern = r'https?://[^\s,)"]+' matches = re.findall(url_pattern, disclaimer) if matches: # 优先返回 DOI 或 arXiv URL for url in matches: if 'doi.org' in url or 'arxiv.org' in url: # 转换 arXiv abs 链接为 PDF 链接 if 'arxiv.org/abs/' in url: return url.replace('/abs/', '/pdf/') + '.pdf' return url return matches[0] return "" def _parse_paper(self, data: dict) -> Optional[Paper]: """解析论文数据 Args: data: API 返回的论文数据 Returns: Paper 对象或 None """ try: paper_id = data.get('paperId', '') if not paper_id: return None # 作者 authors = [ author.get('name', '') for author in data.get('authors', []) if author.get('name') ] # DOI external_ids = data.get('externalIds', {}) or {} doi = external_ids.get('DOI', '') # PDF URL pdf_url = self._extract_pdf_url(data.get('openAccessPdf')) return Paper( paper_id=paper_id, title=data.get('title', 'Untitled'), authors=authors, abstract=data.get('abstract', ''), url=data.get('url', ''), pdf_url=pdf_url, published_date=self._parse_date(data.get('publicationDate', '')), source="semantic", categories=data.get('fieldsOfStudy', []) or [], doi=doi, citations=data.get('citationCount', 0) or 0, ) except Exception as e: logger.warning(f"Failed to parse paper: {e}") return None def search( self, query: str, year: Optional[str] = None, max_results: int = 10 ) -> List[Paper]: """搜索论文 Args: query: 搜索关键词 year: 年份过滤(支持格式:"2019", "2016-2020", "2010-", "-2015") max_results: 最大返回数量 Returns: List[Paper]: 论文列表 """ params = { "query": query, "limit": min(max_results, 100), # API 限制 "fields": ",".join(self.DEFAULT_FIELDS), } if year: params["year"] = year response = self._make_request("paper/search", params) if not response: return [] try: data = response.json() results = data.get('data', []) except Exception as e: logger.error(f"Failed to parse response: {e}") return [] papers = [] for item in results[:max_results]: paper = self._parse_paper(item) if paper: papers.append(paper) logger.info(f"Found {len(papers)} papers for query: {query}") return papers def get_paper_details(self, paper_id: str) -> Optional[Paper]: """获取单篇论文详情 Args: paper_id: 论文 ID,支持多种格式: - Semantic Scholar ID: "649def34f8be52c8b66281af98ae884c09aef38b" - DOI: "DOI:10.18653/v1/N18-3011" - arXiv: "ARXIV:2106.15928" - PMID: "PMID:19872477" - ACL: "ACL:W12-3903" - URL: "URL:https://arxiv.org/abs/2106.15928" Returns: Paper 对象或 None """ params = {"fields": ",".join(self.DEFAULT_FIELDS)} response = self._make_request(f"paper/{paper_id}", params) if not response: return None try: data = response.json() return self._parse_paper(data) except Exception as e: logger.error(f"Failed to get paper details: {e}") return None def download_pdf(self, paper_id: str, save_path: str) -> str: """下载论文 PDF Args: paper_id: 论文 ID(支持多种格式) save_path: 保存目录 Returns: 下载的文件路径或错误信息 """ paper = self.get_paper_details(paper_id) if not paper: return f"Error: Could not find paper {paper_id}" if not paper.pdf_url: return f"Error: No PDF URL available for paper {paper_id}" pdf_url = paper.pdf_url logger.info(f"Downloading PDF from: {pdf_url}") try: # 直接使用 requests 下载 pdf_response = requests.get(pdf_url, timeout=60) pdf_response.raise_for_status() # 验证下载的内容是 PDF content_type = pdf_response.headers.get('Content-Type', '') content = pdf_response.content # 检查是否是 PDF(通过内容头部) if not content.startswith(b'%PDF') and 'application/pdf' not in content_type: logger.warning(f"Downloaded content is not a PDF. Content-Type: {content_type}") # 如果是 HTML 页面(如 OSTI),尝试提取真实 PDF 链接 if b'<html' in content[:1000].lower() or b'<!doctype' in content[:1000].lower(): logger.error("Downloaded HTML instead of PDF. The URL may require browser access.") return f"Error: URL {pdf_url} returned HTML, not PDF. This may require direct browser download." # 准备保存路径 os.makedirs(save_path, exist_ok=True) safe_id = paper_id.replace('/', '_').replace(':', '_') filename = f"semantic_{safe_id}.pdf" pdf_path = os.path.join(save_path, filename) with open(pdf_path, "wb") as f: f.write(content) # 最终验证 file_size = os.path.getsize(pdf_path) if file_size < 1000: os.remove(pdf_path) return f"Error: Downloaded file too small ({file_size} bytes)" logger.info(f"PDF downloaded successfully: {pdf_path} ({file_size} bytes)") return pdf_path except requests.exceptions.Timeout: return f"Error: Download timed out for {pdf_url}" except requests.exceptions.RequestException as e: logger.error(f"PDF download error: {e}") return f"Error downloading PDF: {e}" def read_paper(self, paper_id: str, save_path: str) -> str: """下载并提取论文文本 使用 PyMuPDF4LLM 提取 Markdown 格式。 Args: paper_id: 论文 ID save_path: 保存目录 Returns: 提取的文本内容或错误信息 """ # 先下载 PDF pdf_path = self.download_pdf(paper_id, save_path) if pdf_path.startswith("Error"): return pdf_path # 获取论文元数据 paper = self.get_paper_details(paper_id) try: text = pymupdf4llm.to_markdown(pdf_path, show_progress=False) logger.info(f"Extracted {len(text)} characters using PyMuPDF4LLM") if not text.strip(): return f"PDF downloaded to {pdf_path}, but no text could be extracted." # 添加元数据 metadata = "" if paper: metadata = f"# {paper.title}\n\n" metadata += f"**Authors**: {', '.join(paper.authors)}\n" metadata += f"**Published**: {paper.published_date}\n" metadata += f"**URL**: {paper.url}\n" metadata += f"**PDF**: {pdf_path}\n\n" metadata += "---\n\n" return metadata + text except Exception as e: logger.error(f"Failed to extract text: {e}") return f"Error extracting text: {e}" # ============================================================ # 测试代码 # ============================================================ if __name__ == "__main__": logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) searcher = SemanticSearcher() # 配置信息 print("=" * 60) print("SemanticSearcher Configuration") print("=" * 60) print(f"API Key: {'Configured' if searcher.api_key else 'Not set (shared rate limit)'}") print(f"PDF Extraction: PyMuPDF4LLM") # 测试搜索 print("\n" + "=" * 60) print("1. Testing search...") print("=" * 60) query = "machine learning" papers = searcher.search(query, max_results=3) print(f"Found {len(papers)} papers:") for i, paper in enumerate(papers, 1): print(f"\n{i}. {paper.title[:60]}...") print(f" Authors: {', '.join(paper.authors[:3])}{'...' if len(paper.authors) > 3 else ''}") print(f" DOI: {paper.doi or 'N/A'}") print(f" Citations: {paper.citations}") print(f" PDF: {'Available' if paper.pdf_url else 'Not available'}") # 测试获取详情 print("\n" + "=" * 60) print("2. Testing get_paper_details...") print("=" * 60) if papers: paper_id = papers[0].paper_id details = searcher.get_paper_details(paper_id) if details: print(f"Title: {details.title}") print(f"Abstract: {details.abstract[:200]}..." if details.abstract else "No abstract") print(f"PDF URL: {details.pdf_url or 'Not available'}") # 测试下载 PDF print("\n" + "=" * 60) print("3. Testing download_pdf...") print("=" * 60) if papers: # 找一个有 PDF URL 的论文 paper_with_pdf = None for p in papers: if p.pdf_url: paper_with_pdf = p break if paper_with_pdf: print(f"Downloading: {paper_with_pdf.title[:50]}...") print(f"PDF URL: {paper_with_pdf.pdf_url}") from pathlib import Path save_dir = str(Path.home() / "paper_downloads") result = searcher.download_pdf(paper_with_pdf.paper_id, save_dir) if result.startswith("Error"): print(f"❌ Download failed: {result}") else: print(f"✅ Downloaded to: {result}") else: print("No paper with available PDF found in search results") print("\n✅ All tests completed!")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/h-lu/paper-search-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server