"""AI Research MCP Server - Main server implementation."""
import os
import asyncio
from datetime import datetime, timezone
from typing import Any, List, Dict, Optional
from mcp.server import Server
from mcp.types import (
    Tool,
    TextContent,
    Resource,
)
from mcp.server.stdio import stdio_server
from .data_sources import (
ArxivClient,
GithubClient,
HuggingFaceClient,
PapersWithCodeClient,
)
from .utils import CacheManager
class AIResearchServer:
"""MCP Server for AI research tracking."""
def __init__(self):
"""Initialize the server."""
self.server = Server("ai-research-mcp")
self.cache = CacheManager(os.getenv("CACHE_DIR", ".cache"))
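        # Assumption: CacheManager persists entries under CACHE_DIR keyed by the
        # raw strings passed to get()/set(). The keys built below embed Python
        # list reprs (e.g. "arxiv_['LLM']_7"), so a manager that hashes or
        # sanitizes keys before touching the filesystem is assumed.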
# Initialize clients
self.arxiv = ArxivClient()
self.github = GithubClient(os.getenv("GITHUB_TOKEN"))
self.huggingface = HuggingFaceClient()
self.papers_with_code = PapersWithCodeClient()
# Cache expiry times (in seconds)
self.cache_expiry = {
"github": int(os.getenv("CACHE_EXPIRY_GITHUB", "3600")),
"arxiv": int(os.getenv("CACHE_EXPIRY_ARXIV", "7200")),
"summary": int(os.getenv("CACHE_EXPIRY_SUMMARY", "86400")),
}
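        # Illustrative environment overrides (values in seconds), e.g.:
        #   CACHE_EXPIRY_GITHUB=1800 CACHE_EXPIRY_ARXIV=3600 CACHE_EXPIRY_SUMMARY=43200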
self._setup_handlers()
def _setup_handlers(self):
"""Setup MCP handlers."""
@self.server.list_tools()
async def list_tools() -> List[Tool]:
"""List available tools."""
return [
Tool(
name="search_latest_papers",
description="Search for latest AI/ML research papers from multiple sources (arXiv, Papers with Code, Hugging Face)",
inputSchema={
"type": "object",
"properties": {
"keywords": {
"type": "array",
"items": {"type": "string"},
"description": "Keywords to search for (e.g., ['LLM', 'multimodal'])",
},
"days": {
"type": "integer",
"description": "Number of days to look back (1-30)",
"default": 7,
},
"sources": {
"type": "array",
"items": {
"type": "string",
"enum": ["arxiv", "papers_with_code", "huggingface"],
},
"description": "Data sources to search (default: all)",
},
"max_results": {
"type": "integer",
"description": "Maximum number of results per source",
"default": 20,
},
},
},
),
Tool(
name="search_github_repos",
description="Search for trending AI/ML GitHub repositories",
inputSchema={
"type": "object",
"properties": {
"keywords": {
"type": "array",
"items": {"type": "string"},
"description": "Keywords to search for",
},
"topics": {
"type": "array",
"items": {"type": "string"},
"description": "GitHub topics to filter by (e.g., ['llm', 'transformer'])",
},
"min_stars": {
"type": "integer",
"description": "Minimum number of stars",
"default": 50,
},
"days": {
"type": "integer",
"description": "Look for repos updated in last N days",
"default": 30,
},
"max_results": {
"type": "integer",
"description": "Maximum number of results",
"default": 25,
},
},
},
),
Tool(
name="get_daily_papers",
description="Get today's featured AI papers from Hugging Face",
inputSchema={
"type": "object",
"properties": {
"days": {
"type": "integer",
"description": "Number of days to look back (1-7)",
"default": 1,
},
},
},
),
Tool(
name="get_trending_repos",
description="Get trending AI/ML repositories on GitHub",
inputSchema={
"type": "object",
"properties": {
"period": {
"type": "string",
"enum": ["daily", "weekly", "monthly"],
"description": "Time period for trending",
"default": "weekly",
},
"language": {
"type": "string",
"description": "Filter by programming language",
},
"max_results": {
"type": "integer",
"description": "Maximum number of results",
"default": 25,
},
},
},
),
Tool(
name="get_trending_models",
description="Get trending AI models from Hugging Face",
inputSchema={
"type": "object",
"properties": {
"task": {
"type": "string",
"description": "Filter by task (e.g., 'text-generation', 'image-classification')",
},
"sort": {
"type": "string",
"enum": ["downloads", "likes", "trending", "created"],
"description": "Sort criterion",
"default": "downloads",
},
"limit": {
"type": "integer",
"description": "Maximum number of results",
"default": 30,
},
},
},
),
Tool(
name="search_by_area",
description="Search papers and repos by research area (llm, vision, robotics, bioinfo, etc.)",
inputSchema={
"type": "object",
"properties": {
"area": {
"type": "string",
"description": "Research area: llm, vision, robotics, bioinfo, rl, graph, etc.",
},
"days": {
"type": "integer",
"description": "Number of days to look back",
"default": 7,
},
"include_papers": {
"type": "boolean",
"description": "Include papers from arXiv",
"default": True,
},
"include_repos": {
"type": "boolean",
"description": "Include GitHub repositories",
"default": True,
},
},
"required": ["area"],
},
),
Tool(
name="generate_daily_summary",
description="Generate a comprehensive daily summary of AI research activity",
inputSchema={
"type": "object",
"properties": {
"include_papers": {
"type": "boolean",
"description": "Include papers section",
"default": True,
},
"include_repos": {
"type": "boolean",
"description": "Include GitHub repos section",
"default": True,
},
"include_models": {
"type": "boolean",
"description": "Include Hugging Face models section",
"default": True,
},
},
},
),
Tool(
name="generate_weekly_summary",
description="Generate a comprehensive weekly summary of AI research activity",
inputSchema={
"type": "object",
"properties": {
"include_papers": {
"type": "boolean",
"description": "Include papers section",
"default": True,
},
"include_repos": {
"type": "boolean",
"description": "Include GitHub repos section",
"default": True,
},
"include_models": {
"type": "boolean",
"description": "Include Hugging Face models section",
"default": True,
},
},
},
),
]
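        # Example call_tool payload (illustrative only; client framing may differ):
        #   name="search_latest_papers",
        #   arguments={"keywords": ["LLM"], "days": 3, "sources": ["arxiv"], "max_results": 10}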
@self.server.call_tool()
        async def call_tool(name: str, arguments: Any) -> List[TextContent]:
            """Handle tool calls."""
            arguments = arguments or {}  # the SDK may pass None for tools called without arguments
            try:
if name == "search_latest_papers":
result = await self._search_latest_papers(**arguments)
elif name == "search_github_repos":
result = await self._search_github_repos(**arguments)
elif name == "get_daily_papers":
result = await self._get_daily_papers(**arguments)
elif name == "get_trending_repos":
result = await self._get_trending_repos(**arguments)
elif name == "get_trending_models":
result = await self._get_trending_models(**arguments)
elif name == "search_by_area":
result = await self._search_by_area(**arguments)
elif name == "generate_daily_summary":
result = await self._generate_daily_summary(**arguments)
elif name == "generate_weekly_summary":
result = await self._generate_weekly_summary(**arguments)
else:
raise ValueError(f"Unknown tool: {name}")
return [TextContent(type="text", text=result)]
except Exception as e:
error_msg = f"Error executing {name}: {str(e)}"
return [TextContent(type="text", text=error_msg)]
@self.server.list_resources()
async def list_resources() -> List[Resource]:
"""List available resources."""
return [
Resource(
uri="ai-research://daily-summary",
name="Daily AI Research Summary",
mimeType="text/markdown",
description="Today's AI research summary including papers, repos, and models",
),
Resource(
uri="ai-research://weekly-summary",
name="Weekly AI Research Summary",
mimeType="text/markdown",
description="This week's AI research summary",
),
]
@self.server.read_resource()
        async def read_resource(uri) -> str:
            """Read a resource."""
            # Newer MCP SDKs pass a pydantic AnyUrl here, so compare as a string.
            uri = str(uri)
            if uri == "ai-research://daily-summary":
# Try cache first
cached = self.cache.get("daily-summary", self.cache_expiry["summary"])
if cached:
return cached
# Generate new summary
summary = await self._generate_daily_summary()
self.cache.set("daily-summary", summary)
return summary
elif uri == "ai-research://weekly-summary":
cached = self.cache.get("weekly-summary", self.cache_expiry["summary"])
if cached:
return cached
summary = await self._generate_weekly_summary()
self.cache.set("weekly-summary", summary)
return summary
else:
raise ValueError(f"Unknown resource: {uri}")
async def _search_latest_papers(
self,
keywords: Optional[List[str]] = None,
days: int = 7,
sources: Optional[List[str]] = None,
max_results: int = 20,
) -> str:
"""Search for latest papers."""
if sources is None:
sources = ["arxiv", "papers_with_code", "huggingface"]
all_papers = []
# Search arXiv
if "arxiv" in sources:
cache_key = f"arxiv_{keywords}_{days}"
cached = self.cache.get(cache_key, self.cache_expiry["arxiv"])
if cached:
all_papers.extend(cached)
else:
papers = await asyncio.to_thread(
self.arxiv.search_papers,
keywords=keywords,
days=days,
max_results=max_results,
)
self.cache.set(cache_key, papers)
all_papers.extend(papers)
# Search Papers with Code
if "papers_with_code" in sources:
cache_key = f"pwc_{keywords}_{days}"
cached = self.cache.get(cache_key, self.cache_expiry["arxiv"])
if cached:
all_papers.extend(cached)
else:
papers = await asyncio.to_thread(
self.papers_with_code.get_latest_papers,
days=days,
items_per_page=max_results,
)
self.cache.set(cache_key, papers)
all_papers.extend(papers)
# Get Hugging Face daily papers
if "huggingface" in sources:
cache_key = f"hf_daily_{days}"
cached = self.cache.get(cache_key, self.cache_expiry["arxiv"])
if cached:
all_papers.extend(cached)
else:
papers = await asyncio.to_thread(
self.huggingface.get_daily_papers,
days=min(days, 7),
)
self.cache.set(cache_key, papers)
all_papers.extend(papers)
# Format results
return self._format_papers(all_papers, keywords)
async def _search_github_repos(
self,
keywords: Optional[List[str]] = None,
topics: Optional[List[str]] = None,
min_stars: int = 50,
days: int = 30,
max_results: int = 25,
) -> str:
"""Search GitHub repositories."""
cache_key = f"github_{keywords}_{topics}_{min_stars}_{days}"
cached = self.cache.get(cache_key, self.cache_expiry["github"])
if cached:
repos = cached
else:
repos = await asyncio.to_thread(
self.github.search_repositories,
keywords=keywords,
topics=topics,
min_stars=min_stars,
pushed_since=f"{days}d",
max_results=max_results,
)
self.cache.set(cache_key, repos)
return self._format_repos(repos)
async def _get_daily_papers(self, days: int = 1) -> str:
"""Get daily featured papers from Hugging Face."""
cache_key = f"hf_daily_{days}"
cached = self.cache.get(cache_key, 3600 * 12) # 12 hour cache
if cached:
papers = cached
else:
papers = await asyncio.to_thread(
self.huggingface.get_daily_papers,
days=days,
)
self.cache.set(cache_key, papers)
return self._format_papers(papers)
async def _get_trending_repos(
self,
period: str = "weekly",
language: Optional[str] = None,
max_results: int = 25,
) -> str:
"""Get trending repositories."""
cache_key = f"github_trending_{period}_{language}"
cached = self.cache.get(cache_key, 3600) # 1 hour cache
if cached:
repos = cached
else:
repos = await asyncio.to_thread(
self.github.get_trending_repositories,
period=period,
language=language,
max_results=max_results,
)
self.cache.set(cache_key, repos)
return self._format_repos(repos)
async def _get_trending_models(
self,
task: Optional[str] = None,
sort: str = "downloads",
limit: int = 30,
) -> str:
"""Get trending models from Hugging Face."""
cache_key = f"hf_models_{task}_{sort}"
cached = self.cache.get(cache_key, 3600)
if cached:
models = cached
else:
models = await asyncio.to_thread(
self.huggingface.get_trending_models,
task=task,
sort=sort,
limit=limit,
)
self.cache.set(cache_key, models)
return self._format_models(models)
async def _search_by_area(
self,
area: str,
days: int = 7,
include_papers: bool = True,
include_repos: bool = True,
) -> str:
"""Search by research area."""
results = []
if include_papers:
papers = await asyncio.to_thread(
self.arxiv.get_latest_by_area,
area=area,
days=days,
)
results.append(f"## Papers ({len(papers)})\n\n{self._format_papers(papers)}")
if include_repos:
repos = await asyncio.to_thread(
self.github.search_by_area,
area=area,
days=days,
)
results.append(f"## Repositories ({len(repos)})\n\n{self._format_repos(repos)}")
return f"# AI Research: {area.upper()}\n\n" + "\n\n".join(results)
async def _generate_daily_summary(
self,
include_papers: bool = True,
include_repos: bool = True,
include_models: bool = True,
) -> str:
"""Generate daily summary."""
sections = []
sections.append(f"# AI Research Daily Summary\n*Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}*\n")
if include_papers:
# Get papers from multiple sources
hf_papers = await asyncio.to_thread(self.huggingface.get_daily_papers, days=1)
arxiv_papers = await asyncio.to_thread(self.arxiv.get_latest_papers, days=1, max_results=20)
all_papers = hf_papers + arxiv_papers
sections.append(f"## 📄 Today's Featured Papers ({len(all_papers)})\n\n{self._format_papers(all_papers[:15])}")
if include_repos:
repos = await asyncio.to_thread(self.github.get_trending_repositories, period="daily")
sections.append(f"## 🔥 Trending Repositories ({len(repos)})\n\n{self._format_repos(repos[:10])}")
if include_models:
models = await asyncio.to_thread(self.huggingface.get_llm_models, limit=15)
sections.append(f"## 🤖 Popular Models ({len(models)})\n\n{self._format_models(models[:10])}")
return "\n\n".join(sections)
async def _generate_weekly_summary(
self,
include_papers: bool = True,
include_repos: bool = True,
include_models: bool = True,
) -> str:
"""Generate weekly summary."""
sections = []
sections.append(f"# AI Research Weekly Summary\n*Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}*\n")
if include_papers:
hf_papers = await asyncio.to_thread(self.huggingface.get_daily_papers, days=7)
arxiv_papers = await asyncio.to_thread(self.arxiv.get_latest_papers, days=7, max_results=50)
pwc_papers = await asyncio.to_thread(self.papers_with_code.get_latest_papers, days=7)
all_papers = hf_papers + arxiv_papers + pwc_papers
sections.append(f"## 📄 This Week's Papers ({len(all_papers)})\n\n{self._format_papers(all_papers[:30])}")
if include_repos:
repos = await asyncio.to_thread(self.github.get_trending_repositories, period="weekly", max_results=30)
sections.append(f"## 🔥 Trending Repositories ({len(repos)})\n\n{self._format_repos(repos[:20])}")
if include_models:
models = await asyncio.to_thread(self.huggingface.get_recent_models, days=7, limit=25)
sections.append(f"## 🤖 New & Updated Models ({len(models)})\n\n{self._format_models(models[:15])}")
return "\n\n".join(sections)
def _format_papers(self, papers: List[Dict], keywords: Optional[List[str]] = None) -> str:
"""Format papers as markdown."""
if not papers:
return "*No papers found.*"
# Deduplicate by title
seen_titles = set()
unique_papers = []
for paper in papers:
            title = (paper.get("title") or "").strip().lower()
if title and title not in seen_titles:
seen_titles.add(title)
unique_papers.append(paper)
# Sort by date (most recent first)
def get_date(paper):
date_str = paper.get("published") or paper.get("updated") or ""
try:
dt = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
# Ensure timezone-aware
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
            except (ValueError, TypeError):
                return datetime.min.replace(tzinfo=timezone.utc)
unique_papers.sort(key=get_date, reverse=True)
lines = []
for i, paper in enumerate(unique_papers, 1):
            title = paper.get("title") or "Untitled"
            authors = paper.get("authors") or []
            author_str = authors[0] if authors else "Unknown"
            if len(authors) > 1:
                author_str += " et al."
            url = paper.get("url", "")
            source = paper.get("source", "unknown")
            published = (paper.get("published") or "")[:10]  # date portion of the timestamp
lines.append(f"### {i}. [{title}]({url})")
lines.append(f"*{author_str} • {published} • {source}*")
# Add summary/abstract (truncated)
            summary = paper.get("summary") or paper.get("abstract") or ""
            if summary:
                summary = summary.replace("\n", " ").strip()
                if len(summary) > 200:
                    summary = summary[:200] + "..."
                lines.append(f"\n{summary}")
# Add GitHub link if available
            github_url = paper.get("github_url")
            if github_url:
                stars = paper.get("stars") or 0
                star_str = f" ⭐ {stars:,}" if stars else ""
                lines.append(f"\n💻 [Code]({github_url}){star_str}")
lines.append("") # Empty line between papers
return "\n".join(lines)
def _format_repos(self, repos: List[Dict]) -> str:
"""Format repositories as markdown."""
if not repos:
return "*No repositories found.*"
lines = []
for i, repo in enumerate(repos, 1):
            name = repo.get("full_name", "Unknown")
            # The GitHub API returns null (not a missing key) for empty fields,
            # so fall back with `or` rather than a .get() default.
            description = repo.get("description") or "No description"
            url = repo.get("url", "")
            stars = repo.get("stars") or 0
            language = repo.get("language") or ""
            topics = repo.get("topics") or []
            lines.append(f"### {i}. [{name}]({url})")
            lines.append(f"⭐ {stars:,} • {language}" if language else f"⭐ {stars:,}")
            lines.append(f"\n{description}")
if topics:
topic_tags = " ".join(f"`{t}`" for t in topics[:5])
lines.append(f"\n{topic_tags}")
lines.append("")
return "\n".join(lines)
def _format_models(self, models: List[Dict]) -> str:
"""Format models as markdown."""
if not models:
return "*No models found.*"
lines = []
for i, model in enumerate(models, 1):
model_id = model.get("id", "Unknown")
url = model.get("url", "")
downloads = model.get("downloads", 0)
likes = model.get("likes", 0)
task = model.get("pipeline_tag", "")
lines.append(f"### {i}. [{model_id}]({url})")
lines.append(f"📥 {downloads:,} downloads • ❤️ {likes} likes")
if task:
lines.append(f"Task: `{task}`")
lines.append("")
return "\n".join(lines)
async def run(self):
"""Run the server."""
async with stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
self.server.create_initialization_options(),
)
def main():
    """Main entry point."""
    server = AIResearchServer()
    asyncio.run(server.run())
if __name__ == "__main__":
main()
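# ------------------------------------------------------------------------------
# Usage sketch (hypothetical; exercises a formatting helper directly without
# starting the stdio transport). The dict shape below assumes the fields the
# data-source clients are expected to return; adjust it to match your clients.
#
#     server = AIResearchServer()
#     print(server._format_papers([{
#         "title": "Example Paper",
#         "authors": ["A. Author", "B. Author"],
#         "url": "https://arxiv.org/abs/0000.00000",
#         "source": "arxiv",
#         "published": "2024-01-01T00:00:00Z",
#         "summary": "An example abstract.",
#     }]))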