We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/chestnutforty/mcp-metaculus'
If you have feedback on, or need assistance with, the MCP directory API, please join our Discord server.
from typing import Annotated
from fastmcp import FastMCP
from datetime import datetime, timezone
import httpx
import os
import re
import json
from dotenv import load_dotenv
# Guidance shown to LLM clients describing when to invoke this server's tools.
_SERVER_INSTRUCTIONS = "Provides historical prediction data from Metaculus forecasting platform. Use this to retrieve question details, community predictions, and user comments for specific Metaculus questions."

# FastMCP server instance; functions decorated with @mcp.tool below are
# registered as MCP tools under the "metaculus" server name.
mcp = FastMCP(name="metaculus", instructions=_SERVER_INSTRUCTIONS)
def _format_entry_prediction(entry: dict, question_type: str) -> str:
    """Render the central community prediction of one history entry as text.

    Format depends on question type: a percentage for binary questions, a
    median (plus a range when interval bounds are present) for numeric/date
    questions, a list of per-option percentages for multiple choice, and a
    raw dump of ``centers`` for anything else.
    """
    centers = entry.get("centers")
    if question_type == "binary":
        return f"{centers[0]*100:.1f}%" if centers and centers[0] is not None else "N/A"
    if question_type in ("numeric", "date"):
        if not (centers and centers[0] is not None):
            return "N/A"
        prediction = f"median: {centers[0]:.2f}"
        lower = entry.get("interval_lower_bounds")
        upper = entry.get("interval_upper_bounds")
        if lower and upper and lower[0] is not None and upper[0] is not None:
            prediction += f" (range: {lower[0]:.2f} - {upper[0]:.2f})"
        return prediction
    if question_type == "multiple_choice":
        if not centers:
            return "N/A"
        return f"options: {[f'{c*100:.1f}%' if c is not None else 'N/A' for c in centers]}"
    return str(entry.get("centers", "N/A"))


def _filter_comments(raw_comments: list, cutoff_dt: datetime) -> list:
    """Keep comments posted at or before ``cutoff_dt``.

    Comments with no ``time_posted`` or with an unparseable timestamp are
    kept (included "to be safe"), matching a best-effort filtering policy.
    """
    filtered = []
    for comment in raw_comments:
        if "time_posted" not in comment:
            filtered.append(comment)
            continue
        try:
            # Firecrawl may emit trailing-'Z' timestamps; fromisoformat on
            # older Pythons needs the explicit '+00:00' offset form.
            comment_time = datetime.fromisoformat(comment["time_posted"].replace('Z', '+00:00'))
        except (ValueError, TypeError, AttributeError):
            # Unparseable date: include the comment rather than drop data.
            filtered.append(comment)
            continue
        if comment_time <= cutoff_dt:
            filtered.append(comment)
    return filtered


async def _fetch_comments_via_firecrawl(question_url: str, cutoff_dt: datetime, firecrawl_key: str) -> str:
    """Scrape the question page's comments via Firecrawl JSON extraction.

    Returns a JSON dump of the comments posted at or before ``cutoff_dt``,
    "No comments before cutoff date" when all were filtered out, "N/A" when
    the scrape response is unusable, or the raw payload as a string when it
    is not a list. Transport/parse exceptions propagate to the caller.
    """
    payload = {
        "url": question_url if question_url.endswith('/') else f"{question_url}/",
        "onlyMainContent": False,
        # Accept a cached snapshot up to 48h old (milliseconds) to save credits.
        "maxAge": 172800000,
        "parsers": ["pdf"],
        "formats": [{
            "type": "json",
            "schema": {
                "type": "object",
                "required": ["comments"],
                "properties": {
                    "comments": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "required": ["content"],
                            "properties": {
                                "content": {"type": "string"},
                                "time_posted": {"type": "string"},
                                "upvotes": {"type": "number"},
                                "downvotes": {"type": "number"},
                                "changed_my_mind_votes": {"type": "number"},
                                "author": {"type": "string"}
                            }
                        }
                    }
                }
            }
        }]
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.firecrawl.dev/v2/scrape",
            json=payload,
            headers={
                "Authorization": f"Bearer {firecrawl_key}",
                "Content-Type": "application/json"
            },
            timeout=60.0
        )
    if not response.is_success:
        return "N/A"
    firecrawl_data = response.json()
    if "data" not in firecrawl_data or "json" not in firecrawl_data["data"]:
        return "N/A"
    raw_comments = firecrawl_data["data"]["json"].get("comments", "")
    if not isinstance(raw_comments, list):
        return str(raw_comments)
    filtered_comments = _filter_comments(raw_comments, cutoff_dt)
    return json.dumps(filtered_comments, indent=2) if filtered_comments else "No comments before cutoff date"


@mcp.tool(exclude_args=["cutoff_date"])
async def get_metaculus_question_info(
    question_url: Annotated[str, "The full Metaculus question URL (e.g., 'https://www.metaculus.com/questions/39771/')"],
    cutoff_date: Annotated[str, "ISO format date (YYYY-MM-DD) - only return predictions made before this date"] = ""
) -> str:
    """Get Metaculus question information with historical predictions filtered by cutoff date.

    Args:
        question_url: Full Metaculus question URL.
        cutoff_date: Only return predictions before this date (YYYY-MM-DD).
            The empty-string default means "today" and is resolved per call;
            the previous ``datetime.now().strftime(...)`` default was
            evaluated once at import time and went stale in a long-running
            server process.

    Returns:
        Formatted string with question details, predictions, and comments.
        All failures are reported as human-readable "Error ..." strings
        instead of raised exceptions, so the MCP client always gets text.
    """
    try:
        load_dotenv()
        if not cutoff_date:
            # Resolve "today" at call time, in UTC to match the comparisons below.
            cutoff_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")

        # Extract the numeric post ID from the question URL.
        post_id_match = re.search(r'/questions/(\d+)', question_url)
        if not post_id_match:
            return f"Error: Could not extract post ID from URL: {question_url}. Expected format: https://www.metaculus.com/questions/[ID]"
        post_id = post_id_match.group(1)

        # Fetch the post (question container) from the Metaculus API.
        api_url = f"https://www.metaculus.com/api/posts/{post_id}/"
        async with httpx.AsyncClient() as client:
            response = await client.get(api_url, timeout=30.0)
        if not response.is_success:
            return f"Error fetching question from Metaculus API: {response.status_code} {response.text[:200]}"
        data = response.json()
        question = data.get("question")
        if not question:
            return f"Error: No question found in post {post_id}"

        # Question metadata.
        title = question.get("title", "N/A")
        question_id = question.get("id", "N/A")
        question_type = question.get("type", "N/A")
        background_info = question.get("background_info", "")
        resolution_criteria = question.get("resolution_criteria", "")
        fine_print = question.get("fine_print", "")

        # Community prediction history (recency-weighted aggregation series).
        history = question.get("aggregations", {}).get("recency_weighted", {}).get("history", [])
        if not history:
            return f"Error: No prediction history found for question {question_id}"

        # Interpret the cutoff as midnight UTC on that date. A malformed
        # date raises ValueError, reported via the handler below.
        cutoff_dt = datetime.fromisoformat(cutoff_date).replace(tzinfo=timezone.utc)
        cutoff_timestamp = cutoff_dt.timestamp()
        filtered_history = [
            entry for entry in history
            if entry.get("end_time") is not None and entry["end_time"] <= cutoff_timestamp
        ]
        if not filtered_history:
            return f"No prediction history available before cutoff date {cutoff_date} for question {question_id}"

        # Comments are best-effort: only attempted when a Firecrawl key is
        # configured, and any scrape failure degrades to an inline note.
        comments_text = "N/A"
        if firecrawl_key := os.environ.get("FIRECRAWL_API_KEY"):
            try:
                comments_text = await _fetch_comments_via_firecrawl(question_url, cutoff_dt, firecrawl_key)
            except Exception as e:
                comments_text = f"Error fetching comments: {str(e)}"

        # Assemble the report.
        parts = [
            "--- Metaculus Question ---",
            f"Title: {title}",
            f"Question ID: {question_id}",
            f"Post ID: {post_id}",
            f"Type: {question_type}",
            f"URL: {question_url}",
            "",
            "Background:",
            background_info,
            "",
            "Resolution Criteria:",
            resolution_criteria,
        ]
        if fine_print and fine_print != "N/A":
            parts.extend(["", "Fine Print:", fine_print])
        if comments_text and comments_text != "N/A":
            parts.extend(["", "User Comments:", comments_text])
        parts.extend([
            "",
            "Community Prediction History:",
            "Recent predictions:"
        ])

        # Show the last 15 entries (chronological, oldest first).
        recent_entries = filtered_history[-15:]
        for entry in recent_entries:
            start_dt = datetime.fromtimestamp(entry["start_time"], tz=timezone.utc)
            end_dt = datetime.fromtimestamp(entry["end_time"], tz=timezone.utc)
            forecaster_count = entry.get("forecaster_count", 0)
            prediction = _format_entry_prediction(entry, question_type)
            time_range = f"{start_dt.strftime('%Y-%m-%d %H:%M')} to {end_dt.strftime('%Y-%m-%d %H:%M')}"
            parts.append(f" {time_range}: {prediction} ({forecaster_count} forecasters)")
        # Tag the last printed line as the most recent prediction.
        if recent_entries:
            parts[-1] += " [MOST RECENT]"

        parts.append(f"\nTotal historical entries: {len(filtered_history)}")
        return "\n".join(parts)
    except ValueError as e:
        return f"Error parsing date '{cutoff_date}'. Please use ISO format (YYYY-MM-DD): {str(e)}"
    except Exception as e:
        return f"Error retrieving Metaculus question history: {str(e)}"