
MCP Metaculus Server

metaculus_server.py (10 kB)
from typing import Annotated
from fastmcp import FastMCP
from datetime import datetime, timezone
import httpx
import os
import re
import json
from dotenv import load_dotenv

mcp = FastMCP(
    name="metaculus",
    instructions="Provides historical prediction data from Metaculus forecasting platform. Use this to retrieve question details, community predictions, and user comments for specific Metaculus questions."
)


@mcp.tool(exclude_args=["cutoff_date"])
async def get_metaculus_question_info(
    question_url: Annotated[str, "The full Metaculus question URL (e.g., 'https://www.metaculus.com/questions/39771/')"],
    cutoff_date: Annotated[str, "ISO format date (YYYY-MM-DD) - only return predictions made before this date"] = ""
) -> str:
    """Get Metaculus question information with historical predictions filtered by cutoff date.

    Args:
        question_url: Full Metaculus question URL
        cutoff_date: Only return predictions before this date

    Returns:
        Formatted string with question details, predictions, and comments
    """
    try:
        load_dotenv()

        # Default the cutoff to today at call time (a datetime.now() default in the
        # signature would be frozen once, when the module is imported).
        if not cutoff_date:
            cutoff_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")

        # Extract post ID from URL
        post_id_match = re.search(r'/questions/(\d+)', question_url)
        if not post_id_match:
            return f"Error: Could not extract post ID from URL: {question_url}. Expected format: https://www.metaculus.com/questions/[ID]"
        post_id = post_id_match.group(1)

        # Fetch question from Metaculus API
        api_url = f"https://www.metaculus.com/api/posts/{post_id}/"
        async with httpx.AsyncClient() as client:
            response = await client.get(api_url, timeout=30.0)
        if not response.is_success:
            return f"Error fetching question from Metaculus API: {response.status_code} {response.text[:200]}"

        data = response.json()
        question = data.get("question")
        if not question:
            return f"Error: No question found in post {post_id}"

        # Extract metadata
        title = question.get("title", "N/A")
        question_id = question.get("id", "N/A")
        question_type = question.get("type", "N/A")
        background_info = question.get("background_info", "")
        resolution_criteria = question.get("resolution_criteria", "")
        fine_print = question.get("fine_print", "")

        # Extract and filter prediction history
        aggregations = question.get("aggregations", {})
        recency_weighted = aggregations.get("recency_weighted", {})
        history = recency_weighted.get("history", [])
        if not history:
            return f"Error: No prediction history found for question {question_id}"

        # Filter by cutoff date
        cutoff_dt = datetime.fromisoformat(cutoff_date).replace(tzinfo=timezone.utc)
        cutoff_timestamp = cutoff_dt.timestamp()
        filtered_history = [
            entry for entry in history
            if entry.get("end_time") is not None and entry.get("end_time") <= cutoff_timestamp
        ]
        if not filtered_history:
            return f"No prediction history available before cutoff date {cutoff_date} for question {question_id}"

        # Fetch comments using Firecrawl (optional)
        comments_text = "N/A"
        if firecrawl_key := os.environ.get("FIRECRAWL_API_KEY"):
            try:
                firecrawl_url = "https://api.firecrawl.dev/v2/scrape"
                firecrawl_payload = {
                    "url": question_url if question_url.endswith('/') else f"{question_url}/",
                    "onlyMainContent": False,
                    "maxAge": 172800000,
                    "parsers": ["pdf"],
                    "formats": [{
                        "type": "json",
                        "schema": {
                            "type": "object",
                            "required": ["comments"],
                            "properties": {
                                "comments": {
                                    "type": "array",
                                    "items": {
                                        "type": "object",
                                        "required": ["content"],
                                        "properties": {
                                            "content": {"type": "string"},
                                            "time_posted": {"type": "string"},
                                            "upvotes": {"type": "number"},
                                            "downvotes": {"type": "number"},
                                            "changed_my_mind_votes": {"type": "number"},
                                            "author": {"type": "string"}
                                        }
                                    }
                                }
                            }
                        }
                    }]
                }

                async with httpx.AsyncClient() as client:
                    firecrawl_response = await client.post(
                        firecrawl_url,
                        json=firecrawl_payload,
                        headers={
                            "Authorization": f"Bearer {firecrawl_key}",
                            "Content-Type": "application/json"
                        },
                        timeout=60.0
                    )

                if firecrawl_response.is_success:
                    firecrawl_data = firecrawl_response.json()
                    if "data" in firecrawl_data and "json" in firecrawl_data["data"]:
                        raw_comments = firecrawl_data["data"]["json"].get("comments", "")
                        # Filter comments by cutoff_date if time_posted is available
                        if isinstance(raw_comments, list):
                            filtered_comments = []
                            for comment in raw_comments:
                                if "time_posted" in comment:
                                    try:
                                        comment_time = datetime.fromisoformat(comment["time_posted"].replace('Z', '+00:00'))
                                        if comment_time <= cutoff_dt:
                                            filtered_comments.append(comment)
                                    except Exception:
                                        # If we can't parse the date, include the comment to be safe
                                        filtered_comments.append(comment)
                                else:
                                    # No timestamp, include it
                                    filtered_comments.append(comment)
                            comments_text = json.dumps(filtered_comments, indent=2) if filtered_comments else "No comments before cutoff date"
                        else:
                            comments_text = str(raw_comments)
            except Exception as e:
                comments_text = f"Error fetching comments: {str(e)}"

        # Format output
        parts = [
            "--- Metaculus Question ---",
            f"Title: {title}",
            f"Question ID: {question_id}",
            f"Post ID: {post_id}",
            f"Type: {question_type}",
            f"URL: {question_url}",
            "",
            "Background:",
            background_info,
            "",
            "Resolution Criteria:",
            resolution_criteria,
        ]
        if fine_print and fine_print != "N/A":
            parts.extend(["", "Fine Print:", fine_print])

        # Add comments
        if comments_text and comments_text != "N/A":
            parts.extend(["", "User Comments:", comments_text])

        parts.extend([
            "",
            "Community Prediction History:",
            "Recent predictions:"
        ])

        # Show last 15 entries
        recent_entries = filtered_history[-15:]
        for entry in recent_entries:
            start_dt = datetime.fromtimestamp(entry["start_time"], tz=timezone.utc)
            end_dt = datetime.fromtimestamp(entry["end_time"], tz=timezone.utc)
            forecaster_count = entry.get("forecaster_count", 0)

            # Format prediction by question type
            if question_type == "binary":
                centers = entry.get("centers", [])
                prediction = f"{centers[0]*100:.1f}%" if centers and centers[0] is not None else "N/A"
            elif question_type in ["numeric", "date"]:
                lower = entry.get("interval_lower_bounds")
                upper = entry.get("interval_upper_bounds")
                centers = entry.get("centers")
                if centers and centers[0] is not None:
                    prediction = f"median: {centers[0]:.2f}"
                    if lower and upper and lower[0] is not None and upper[0] is not None:
                        prediction += f" (range: {lower[0]:.2f} - {upper[0]:.2f})"
                else:
                    prediction = "N/A"
            elif question_type == "multiple_choice":
                centers = entry.get("centers", [])
                prediction = f"options: {[f'{c*100:.1f}%' if c is not None else 'N/A' for c in centers]}" if centers else "N/A"
            else:
                prediction = str(entry.get("centers", "N/A"))

            time_range = f"{start_dt.strftime('%Y-%m-%d %H:%M')} to {end_dt.strftime('%Y-%m-%d %H:%M')}"
            parts.append(f" {time_range}: {prediction} ({forecaster_count} forecasters)")

        # Mark most recent
        if recent_entries:
            parts[-1] += " [MOST RECENT]"

        parts.append(f"\nTotal historical entries: {len(filtered_history)}")

        return "\n".join(parts)

    except ValueError as e:
        return f"Error parsing date '{cutoff_date}'. Please use ISO format (YYYY-MM-DD): {str(e)}"
    except Exception as e:
        return f"Error retrieving Metaculus question history: {str(e)}"
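The listing above defines the server but does not show a run entry point, and the Firecrawl comment scraping only activates when FIRECRAWL_API_KEY is present in the environment or a .env file. The following is a minimal test sketch, not part of the published server: it assumes FastMCP 2.x, whose Client can connect in-memory to a FastMCP instance, and the file name test_client.py and the question URL are illustrative placeholders.

# test_client.py - hypothetical local test harness for the server above
import asyncio

from fastmcp import Client

from metaculus_server import mcp  # the FastMCP instance defined in the listing


async def main() -> None:
    # Connect in-memory to the server and call its single tool.
    async with Client(mcp) as client:
        result = await client.call_tool(
            "get_metaculus_question_info",
            {"question_url": "https://www.metaculus.com/questions/39771/"},
        )
        # Print the raw tool result; its exact wrapper type depends on the
        # installed fastmcp version, so no fields are assumed here.
        print(result)


if __name__ == "__main__":
    asyncio.run(main())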

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/chestnutforty/mcp-metaculus'
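An equivalent request from Python, sketched with httpx (the same HTTP client the server uses); the response schema is not documented here, so the JSON body is simply printed as returned.

import httpx

# Fetch this server's entry from the Glama MCP directory API.
resp = httpx.get(
    "https://glama.ai/api/mcp/v1/servers/chestnutforty/mcp-metaculus",
    timeout=30.0,
)
resp.raise_for_status()
print(resp.json())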

If you have feedback or need assistance with the MCP directory API, please join our Discord server.