# metaculus_server.py
from typing import Annotated
from fastmcp import FastMCP
from datetime import datetime, timezone
import httpx
import os
import re
import json
from dotenv import load_dotenv
mcp = FastMCP(
name="metaculus",
instructions="Provides historical prediction data from Metaculus forecasting platform. Use this to retrieve question details, community predictions, and user comments for specific Metaculus questions."
)
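# cutoff_date is hidden from the client-facing tool schema (FastMCP requires excluded arguments to have a default).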
@mcp.tool(exclude_args=["cutoff_date"])
async def get_metaculus_question_info(
question_url: Annotated[str, "The full Metaculus question URL (e.g., 'https://www.metaculus.com/questions/39771/')"],
    cutoff_date: Annotated[str | None, "ISO format date (YYYY-MM-DD) - only return predictions made before this date"] = None
) -> str:
"""Get Metaculus question information with historical predictions filtered by cutoff date.
Args:
question_url: Full Metaculus question URL
cutoff_date: Only return predictions before this date
Returns:
Formatted string with question details, predictions, and comments
"""
    try:
        load_dotenv()

        # Resolve the default cutoff at call time; a default computed at import
        # time would go stale in a long-running server process.
        if cutoff_date is None:
            cutoff_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")

# Extract post ID from URL
post_id_match = re.search(r'/questions/(\d+)', question_url)
if not post_id_match:
return f"Error: Could not extract post ID from URL: {question_url}. Expected format: https://www.metaculus.com/questions/[ID]"
post_id = post_id_match.group(1)
# Fetch question from Metaculus API
api_url = f"https://www.metaculus.com/api/posts/{post_id}/"
async with httpx.AsyncClient() as client:
response = await client.get(api_url, timeout=30.0)
if not response.is_success:
return f"Error fetching question from Metaculus API: {response.status_code} {response.text[:200]}"
data = response.json()
question = data.get("question")
if not question:
return f"Error: No question found in post {post_id}"
# Extract metadata
title = question.get("title", "N/A")
question_id = question.get("id", "N/A")
question_type = question.get("type", "N/A")
background_info = question.get("background_info", "")
resolution_criteria = question.get("resolution_criteria", "")
fine_print = question.get("fine_print", "")
# Extract and filter prediction history
aggregations = question.get("aggregations", {})
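        # "recency_weighted" is the community prediction aggregate exposed by the Metaculus API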
recency_weighted = aggregations.get("recency_weighted", {})
history = recency_weighted.get("history", [])
if not history:
return f"Error: No prediction history found for question {question_id}"
# Filter by cutoff date
cutoff_dt = datetime.fromisoformat(cutoff_date).replace(tzinfo=timezone.utc)
cutoff_timestamp = cutoff_dt.timestamp()
filtered_history = [
entry for entry in history
if entry.get("end_time") is not None and entry.get("end_time") <= cutoff_timestamp
]
if not filtered_history:
return f"No prediction history available before cutoff date {cutoff_date} for question {question_id}"
# Fetch comments using Firecrawl (optional)
comments_text = "N/A"
if firecrawl_key := os.environ.get("FIRECRAWL_API_KEY"):
try:
firecrawl_url = "https://api.firecrawl.dev/v2/scrape"
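                # Ask Firecrawl to extract the page's comments as structured JSON matching the schema below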
firecrawl_payload = {
"url": question_url if question_url.endswith('/') else f"{question_url}/",
"onlyMainContent": False,
"maxAge": 172800000,
"parsers": ["pdf"],
"formats": [{
"type": "json",
"schema": {
"type": "object",
"required": ["comments"],
"properties": {
"comments": {
"type": "array",
"items": {
"type": "object",
"required": ["content"],
"properties": {
"content": {"type": "string"},
"time_posted": {"type": "string"},
"upvotes": {"type": "number"},
"downvotes": {"type": "number"},
"changed_my_mind_votes": {"type": "number"},
"author": {"type": "string"}
}
}
}
}
}
}]
}
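                # Firecrawl extraction is slower than the Metaculus API call, so allow a longer timeout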
async with httpx.AsyncClient() as client:
firecrawl_response = await client.post(
firecrawl_url,
json=firecrawl_payload,
headers={
"Authorization": f"Bearer {firecrawl_key}",
"Content-Type": "application/json"
},
timeout=60.0
)
if firecrawl_response.is_success:
firecrawl_data = firecrawl_response.json()
if "data" in firecrawl_data and "json" in firecrawl_data["data"]:
raw_comments = firecrawl_data["data"]["json"].get("comments", "")
# Filter comments by cutoff_date if time_posted is available
if isinstance(raw_comments, list):
filtered_comments = []
for comment in raw_comments:
if "time_posted" in comment:
try:
comment_time = datetime.fromisoformat(comment["time_posted"].replace('Z', '+00:00'))
if comment_time <= cutoff_dt:
filtered_comments.append(comment)
                                    except Exception:
# If we can't parse date, include the comment to be safe
filtered_comments.append(comment)
else:
# No timestamp, include it
filtered_comments.append(comment)
comments_text = json.dumps(filtered_comments, indent=2) if filtered_comments else "No comments before cutoff date"
else:
comments_text = str(raw_comments)
except Exception as e:
comments_text = f"Error fetching comments: {str(e)}"
# Format output
parts = [
"--- Metaculus Question ---",
f"Title: {title}",
f"Question ID: {question_id}",
f"Post ID: {post_id}",
f"Type: {question_type}",
f"URL: {question_url}",
"",
"Background:",
background_info,
"",
"Resolution Criteria:",
resolution_criteria,
]
if fine_print and fine_print != "N/A":
parts.extend(["", "Fine Print:", fine_print])
# Add comments
if comments_text and comments_text != "N/A":
parts.extend(["", "User Comments:", comments_text])
parts.extend([
"",
f"Community Prediction History:",
"Recent predictions:"
])
# Show last 15 entries
recent_entries = filtered_history[-15:]
for entry in recent_entries:
start_dt = datetime.fromtimestamp(entry["start_time"], tz=timezone.utc)
end_dt = datetime.fromtimestamp(entry["end_time"], tz=timezone.utc)
forecaster_count = entry.get("forecaster_count", 0)
# Format prediction by question type
if question_type == "binary":
centers = entry.get("centers", [])
prediction = f"{centers[0]*100:.1f}%" if centers and centers[0] is not None else "N/A"
elif question_type in ["numeric", "date"]:
lower = entry.get("interval_lower_bounds")
upper = entry.get("interval_upper_bounds")
centers = entry.get("centers")
if centers and centers[0] is not None:
prediction = f"median: {centers[0]:.2f}"
if lower and upper and lower[0] is not None and upper[0] is not None:
prediction += f" (range: {lower[0]:.2f} - {upper[0]:.2f})"
else:
prediction = "N/A"
elif question_type == "multiple_choice":
centers = entry.get("centers", [])
prediction = f"options: {[f'{c*100:.1f}%' if c is not None else 'N/A' for c in centers]}" if centers else "N/A"
else:
prediction = str(entry.get("centers", "N/A"))
time_range = f"{start_dt.strftime('%Y-%m-%d %H:%M')} to {end_dt.strftime('%Y-%m-%d %H:%M')}"
parts.append(f" {time_range}: {prediction} ({forecaster_count} forecasters)")
# Mark most recent
if recent_entries:
parts[-1] += " [MOST RECENT]"
parts.append(f"\nTotal historical entries: {len(filtered_history)}")
return "\n".join(parts)
except ValueError as e:
return f"Error parsing date '{cutoff_date}'. Please use ISO format (YYYY-MM-DD): {str(e)}"
except Exception as e:
return f"Error retrieving Metaculus question history: {str(e)}"