#!/usr/bin/env python3
"""
Google Scholar MCP Server
This MCP server provides tools to search and extract information from Google Scholar
for research purposes using web scraping techniques.
"""
import json
import logging
import re
import time
import urllib.parse
from typing import Any, Dict, List, Optional
import requests
from bs4 import BeautifulSoup
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp.types import Tool
import mcp.types as types
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("google-scholar-mcp")
# Initialize the MCP server
server = Server("google-scholar-mcp")
# User-Agent to mimic a regular browser
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
class GoogleScholarScraper:
"""Google Scholar scraper with rate limiting and error handling"""
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': USER_AGENT,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
})
self.last_request_time = 0
        self.request_delay = 5  # minimum seconds between requests, to reduce the chance of being blocked
def _rate_limit(self):
"""Implement rate limiting to be respectful to Google Scholar"""
current_time = time.time()
time_since_last = current_time - self.last_request_time
if time_since_last < self.request_delay:
time.sleep(self.request_delay - time_since_last)
self.last_request_time = time.time()
def search_papers(self, query: str, num_results: int = 10, start_year: Optional[int] = None, end_year: Optional[int] = None) -> List[Dict[str, Any]]:
"""Search for papers on Google Scholar"""
self._rate_limit()
        # Construct search URL
        params = {
            'q': query,
            'hl': 'en',
            'num': str(min(num_results, 20))  # Google Scholar returns at most 20 results per page
        }
        # Add year-range filters only when they are specified
        if start_year:
            params['as_ylo'] = str(start_year)
        if end_year:
            params['as_yhi'] = str(end_year)
url = f"https://scholar.google.com/scholar?{urllib.parse.urlencode(params)}"
try:
            response = self.session.get(url, timeout=15)
            # Check for blocking before raising on status so a 429 still gets the friendlier message
            if response.status_code == 429 or "blocked" in response.text.lower():
                raise Exception("Google Scholar blocked the request. Please wait a few minutes and try simpler queries.")
            response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
papers = []
            # Find all organic result entries (CSS selector is robust to class-attribute ordering)
            for result in soup.select('div.gs_r.gs_or.gs_scl'):
paper = self._extract_paper_info(result)
if paper:
papers.append(paper)
if not papers:
logger.warning("No papers found - might be blocked or no results")
return papers[:num_results]
except Exception as e:
error_msg = str(e)
if "Connection" in error_msg or "Remote" in error_msg:
error_msg = "Google Scholar connection failed. You may be temporarily blocked. Try again in a few minutes with simpler queries."
logger.error(f"Error searching Google Scholar: {error_msg}")
raise Exception(error_msg)
def _extract_paper_info(self, result_div) -> Optional[Dict[str, Any]]:
"""Extract paper information from a search result div"""
try:
paper = {}
# Title and link
title_elem = result_div.find('h3', {'class': 'gs_rt'})
if title_elem:
title_link = title_elem.find('a')
if title_link:
paper['title'] = title_link.get_text().strip()
paper['url'] = title_link.get('href', '')
else:
paper['title'] = title_elem.get_text().strip()
paper['url'] = ''
else:
return None
# Authors and publication info
author_elem = result_div.find('div', {'class': 'gs_a'})
if author_elem:
author_text = author_elem.get_text().strip()
paper['authors_and_publication'] = author_text
                # Parse authors (and, below, the year) from the "Authors - Venue, Year - Publisher" pattern
                parts = author_text.split(' - ')
                paper['authors'] = parts[0].strip()
if len(parts) >= 2:
pub_info = parts[1].strip()
paper['publication_info'] = pub_info
# Extract year
year_match = re.search(r'\b(19|20)\d{2}\b', pub_info)
if year_match:
paper['year'] = int(year_match.group())
# Abstract/snippet
snippet_elem = result_div.find('div', {'class': 'gs_rs'})
if snippet_elem:
paper['snippet'] = snippet_elem.get_text().strip()
# Citation info
citation_elem = result_div.find('div', {'class': 'gs_fl'})
if citation_elem:
citation_links = citation_elem.find_all('a')
for link in citation_links:
text = link.get_text().strip()
if 'Cited by' in text:
cited_by_match = re.search(r'Cited by (\d+)', text)
if cited_by_match:
paper['cited_by'] = int(cited_by_match.group(1))
# PDF link
pdf_elem = result_div.find('div', {'class': 'gs_or_ggsm'})
if pdf_elem:
pdf_link = pdf_elem.find('a')
if pdf_link:
paper['pdf_url'] = pdf_link.get('href', '')
return paper
except Exception as e:
logger.error(f"Error extracting paper info: {e}")
return None
def get_paper_details(self, paper_url: str) -> Dict[str, Any]:
"""Get detailed information about a specific paper"""
self._rate_limit()
try:
response = self.session.get(paper_url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
details = {}
            # Extraction would need to be customized per publisher site;
            # for now return the URL plus a short, whitespace-normalized text preview.
            details['url'] = paper_url
            text = ' '.join(soup.get_text().split())
            details['scraped_content'] = text[:1000] + ("..." if len(text) > 1000 else "")
return details
except Exception as e:
logger.error(f"Error getting paper details: {e}")
raise
# Initialize scraper
scraper = GoogleScholarScraper()
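
# Quick standalone sketch (illustrative; bypasses MCP and queries Google Scholar directly):
#   papers = scraper.search_papers("graph neural networks", num_results=5)
#   for p in papers:
#       print(p["title"], p.get("year"), p.get("cited_by"))
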
@server.list_tools()
async def handle_list_tools() -> List[Tool]:
"""List available tools for Google Scholar operations"""
return [
Tool(
name="search_papers",
description="Search for academic papers on Google Scholar",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query for papers (e.g., 'machine learning neural networks')"
},
"num_results": {
"type": "integer",
"description": "Number of results to return (default: 10, max: 20)",
"default": 10,
"minimum": 1,
"maximum": 20
},
"start_year": {
"type": "integer",
"description": "Earliest publication year to include (optional)"
},
"end_year": {
"type": "integer",
"description": "Latest publication year to include (optional)"
}
},
"required": ["query"]
}
),
Tool(
name="get_author_papers",
description="Search for papers by a specific author",
inputSchema={
"type": "object",
"properties": {
"author_name": {
"type": "string",
"description": "Name of the author to search for"
},
"num_results": {
"type": "integer",
"description": "Number of results to return (default: 10)",
"default": 10,
"minimum": 1,
"maximum": 20
}
},
"required": ["author_name"]
}
),
Tool(
name="search_recent_papers",
description="Search for recent papers in a specific field",
inputSchema={
"type": "object",
"properties": {
"field": {
"type": "string",
"description": "Research field or topic"
},
"years_back": {
"type": "integer",
"description": "How many years back to search (default: 2)",
"default": 2,
"minimum": 1,
"maximum": 10
},
"num_results": {
"type": "integer",
"description": "Number of results to return (default: 10)",
"default": 10
}
},
"required": ["field"]
}
),
Tool(
name="get_highly_cited_papers",
description="Search for highly cited papers in a topic",
inputSchema={
"type": "object",
"properties": {
"topic": {
"type": "string",
"description": "Research topic or field"
},
"min_citations": {
"type": "integer",
"description": "Minimum number of citations (default: 100)",
"default": 100
},
"num_results": {
"type": "integer",
"description": "Number of results to return (default: 10)",
"default": 10
}
},
"required": ["topic"]
}
)
]
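
# Example arguments an MCP client might send for search_papers (illustrative values):
#   {"query": "transformer language models", "num_results": 5, "start_year": 2020, "end_year": 2024}
# The handler below returns the results as a single JSON-formatted TextContent block.
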
@server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
"""Handle tool calls for Google Scholar operations"""
try:
if name == "search_papers":
query = arguments["query"]
num_results = arguments.get("num_results", 10)
start_year = arguments.get("start_year")
end_year = arguments.get("end_year")
papers = scraper.search_papers(query, num_results, start_year, end_year)
result = {
"search_query": query,
"filters": {
"start_year": start_year,
"end_year": end_year
},
"total_results": len(papers),
"papers": papers
}
return [types.TextContent(
type="text",
text=json.dumps(result, indent=2, ensure_ascii=False)
)]
elif name == "get_author_papers":
author_name = arguments["author_name"]
num_results = arguments.get("num_results", 10)
            # Restrict results to the named author using Scholar's author: operator
            query = f'author:"{author_name}"'
papers = scraper.search_papers(query, num_results)
result = {
"author": author_name,
"total_papers_found": len(papers),
"papers": papers
}
return [types.TextContent(
type="text",
text=json.dumps(result, indent=2, ensure_ascii=False)
)]
elif name == "search_recent_papers":
field = arguments["field"]
years_back = arguments.get("years_back", 2)
num_results = arguments.get("num_results", 10)
current_year = time.localtime().tm_year
start_year = current_year - years_back
papers = scraper.search_papers(field, num_results, start_year, current_year)
result = {
"field": field,
"time_range": f"{start_year}-{current_year}",
"total_results": len(papers),
"papers": papers
}
return [types.TextContent(
type="text",
text=json.dumps(result, indent=2, ensure_ascii=False)
)]
elif name == "get_highly_cited_papers":
topic = arguments["topic"]
min_citations = arguments.get("min_citations", 100)
num_results = arguments.get("num_results", 10)
            # Over-fetch (still capped at 20 per page) so the citation filter has enough candidates
            papers = scraper.search_papers(topic, num_results * 2)
highly_cited = [p for p in papers if p.get("cited_by", 0) >= min_citations][:num_results]
result = {
"topic": topic,
"minimum_citations": min_citations,
"total_highly_cited": len(highly_cited),
"papers": highly_cited
}
return [types.TextContent(
type="text",
text=json.dumps(result, indent=2, ensure_ascii=False)
)]
else:
raise ValueError(f"Unknown tool: {name}")
except Exception as e:
error_msg = f"Error executing {name}: {str(e)}"
logger.error(error_msg)
return [types.TextContent(
type="text",
text=json.dumps({"error": error_msg}, indent=2)
)]
async def main():
"""Main function to run the MCP server"""
# Import here to avoid issues with event loop
from mcp.server.stdio import stdio_server
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="google-scholar-mcp",
server_version="1.0.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
if __name__ == "__main__":
import asyncio
asyncio.run(main())