coingecko_server.py•84.7 kB
"""CoinGecko MCP Server
This server provides tools for interacting with the CoinGecko API to get information
about cryptocurrencies, exchanges, and market data.
Based on CoinGecko API Pro documentation: https://docs.coingecko.com/reference/introduction
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
import urllib.parse
import json
import sys
import traceback
import os
import time
from datetime import datetime
from dotenv import load_dotenv
from mcp.server import FastMCP
# Load variables from a local .env file into the process environment before
# the configuration below is evaluated.
load_dotenv()
# ---------------------------------------------------------------------------
# Constants & configuration
# ---------------------------------------------------------------------------
# MCP server instance; the @mcp.tool(...) decorators below register tools on it.
mcp = FastMCP()
# Base URL that fetch_coingecko_json prepends to every endpoint path.
BASE_URL = "https://api.coingecko.com/api/v3"
# No API key is configured (demo API).
# NOTE(review): API_KEY is not read by any code visible in this section —
# confirm whether it is used further down before removing it.
API_KEY = None
# Debug mode: when True, helpers print extra diagnostics to stderr.
DEBUG = True # NOTE(review): hard-coded; nothing visible here loads it from .env
# ---------------------------------------------------------------------------
# Helper functions
# ---------------------------------------------------------------------------
async def fetch_json(url: str, headers: dict = None, params: dict = None) -> dict:
    """Fetch JSON data from a URL with an HTTP GET request.

    Args:
        url: URL to fetch data from.
        headers: Optional headers to include in the request.
        params: Optional query parameters.

    Returns:
        The decoded JSON body when the server answers HTTP 200. For any
        other status, a timeout, or any other exception, a dict of the form
        ``{"error": "<message>"}`` is returned instead of raising.
    """
    import aiohttp
    import asyncio

    # Request diagnostics are labelled DEBUG, so they follow the module-wide
    # DEBUG switch instead of being printed unconditionally.
    if DEBUG:
        print(f"DEBUG - Fetching URL: {url}", file=sys.stderr)
        if params:
            print(f"DEBUG - With params: {params}", file=sys.stderr)
        if headers:
            print(f"DEBUG - With headers: {headers}", file=sys.stderr)

    # aiohttp documents ClientTimeout as the timeout type; a bare number is
    # only accepted for backward compatibility.
    timeout = aiohttp.ClientTimeout(total=30)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, headers=headers, params=params) as response:
                if response.status == 200:
                    return await response.json()
                error_text = await response.text()
                print(f"ERROR - HTTP {response.status}: {error_text}", file=sys.stderr)
                return {"error": f"HTTP {response.status}: {error_text}"}
    except asyncio.TimeoutError:
        print(f"ERROR - Request timed out: {url}", file=sys.stderr)
        return {"error": "Request timed out"}
    except Exception as e:
        # Best-effort contract: callers check for an "error" key rather than
        # handling exceptions, so every failure is converted to that shape.
        print(f"ERROR - Exception in fetch_json: {str(e)}", file=sys.stderr)
        return {"error": str(e)}
async def fetch_coingecko_json(endpoint: str, params: dict = None) -> dict:
    """Fetch JSON from a CoinGecko API endpoint.

    Args:
        endpoint: CoinGecko API endpoint path (without the base URL); a
            leading slash is optional.
        params: Additional query parameters.

    Returns:
        The API response as returned by ``fetch_json``. On failure this is a
        dict containing an ``"error"`` key. When the API reports a rate
        limit, the request is retried a bounded number of times with a
        60-second pause between attempts; the last error is returned if the
        limit persists.
    """
    import asyncio

    # Bound the retries so a persistent rate limit cannot recurse/loop forever.
    max_rate_limit_retries = 3
    retry_delay_seconds = 60

    if not params:
        params = {}
    # Standard headers for demo API
    headers = {
        "Accept": "application/json",
        "User-Agent": "MCP-CoinGecko-Client/1.0"
    }
    # Build full URL
    url = f"{BASE_URL}{endpoint}" if endpoint.startswith('/') else f"{BASE_URL}/{endpoint}"
    if DEBUG:
        print(f"DEBUG - CoinGecko API request: {url}", file=sys.stderr)
    try:
        attempt = 0
        while True:
            data = await fetch_json(url, headers=headers, params=params)
            error_text = None
            if isinstance(data, dict) and "error" in data:
                error_text = str(data["error"])
            # fetch_json reports HTTP failures as "HTTP <status>: <body>".
            rate_limited = error_text is not None and (
                "rate limit" in error_text.lower() or "HTTP 429" in error_text
            )
            if not rate_limited or attempt >= max_rate_limit_retries:
                break
            attempt += 1
            print("WARNING - Rate limit hit, waiting 60 seconds", file=sys.stderr)
            # asyncio.sleep yields to the event loop; time.sleep would freeze
            # every other coroutine on the server for the whole wait.
            await asyncio.sleep(retry_delay_seconds)
        if error_text is not None and DEBUG:
            print(f"DEBUG - API Error: {data['error']}", file=sys.stderr)
        return data
    except Exception as e:
        print(f"ERROR in fetch_coingecko_json: {str(e)}", file=sys.stderr)
        return {"error": str(e)}
# ---------------------------------------------------------------------------
# Coin API endpoints
# ---------------------------------------------------------------------------
# Documentation: https://docs.coingecko.com/reference/coins-markets
@mcp.tool("get_coin_markets")
async def get_coin_markets(vs_currency: str = "usd", category: str = None, order: str = "market_cap_desc", per_page: int = 20, page: int = 1) -> str:
    """Get list of coins with market data.

    Args:
        vs_currency: The target currency of market data (usd, eur, jpy, etc.)
        category: Filter by coin category
        order: Sort results by field (market_cap_desc, volume_desc, id_asc, etc.)
        per_page: Number of results per page (1-250)
        page: Page number

    Returns:
        A markdown table with one row per coin, or a message starting with
        "❌" when validation or the request fails.
    """

    def _format_change(pct):
        """Render a percentage change with a direction arrow; None -> N/A."""
        if pct is None:
            return "N/A"
        arrow = "⬆" if pct > 0 else "⬇" if pct < 0 else ""
        return f"{arrow} {pct:+.2f}%"

    def _format_amount(amount):
        """Abbreviate a large amount with B/M suffixes; None -> N/A."""
        if amount is None:
            return "N/A"
        if amount >= 1_000_000_000:
            return f"${amount/1_000_000_000:.2f}B"
        if amount >= 1_000_000:
            return f"${amount/1_000_000:.2f}M"
        return f"${amount:,.0f}"

    try:
        # Validate parameters
        if per_page < 1 or per_page > 250:
            return "❌ Error: per_page must be between 1 and 250"
        params = {
            "vs_currency": vs_currency,
            "order": order,
            "per_page": per_page,
            "page": page,
            "sparkline": "false",
            # The 7d column reads price_change_percentage_7d_in_currency,
            # which the API only includes when this timeframe is requested.
            "price_change_percentage": "7d",
        }
        # Add optional parameters if provided
        if category:
            params["category"] = category
        data = await fetch_coingecko_json("/coins/markets", params)
        if "error" in data:
            return f"❌ Error fetching coin markets: {data['error']}"
        if not isinstance(data, list):
            return f"❌ Invalid response format: {data}"
        if not data:
            return "No coin market data found for the specified parameters"

        lines = [f"## Coin Markets (vs {vs_currency.upper()})\n\n"]
        if category:
            lines.append(f"Category: {category}\n\n")
        lines.append("| # | Coin | Symbol | Price | 24h % | 7d % | Market Cap | Volume (24h) |\n")
        lines.append("|---|------|--------|-------|------|------|------------|----------------|\n")
        for rank, coin in enumerate(data, start=1):
            name = coin.get("name", "N/A")
            # The API may send null for any field; never call methods on None.
            symbol = (coin.get("symbol") or "N/A").upper()
            # Missing values are shown as N/A rather than a made-up 0.
            price = coin.get("current_price")
            if price is None:
                price_str = "N/A"
            elif 0 < price < 0.01:
                # Sub-cent prices need more decimals to be meaningful.
                price_str = f"{price:.8f}"
            else:
                price_str = f"{price:,.2f}"
            change_24h_str = _format_change(coin.get("price_change_percentage_24h"))
            change_7d_str = _format_change(coin.get("price_change_percentage_7d_in_currency"))
            market_cap_str = _format_amount(coin.get("market_cap"))
            volume_str = _format_amount(coin.get("total_volume"))
            lines.append(
                f"| {rank} | {name} | {symbol} | ${price_str} | {change_24h_str} | {change_7d_str} | {market_cap_str} | {volume_str} |\n"
            )
        # Add pagination info
        lines.append(f"\n*Page {page}, showing {len(data)} results per page*\n")
        return "".join(lines)
    except Exception as e:
        return f"❌ Error fetching coin markets: {str(e)}"
# Documentation: https://docs.coingecko.com/reference/ping-server
@mcp.tool("ping")
async def ping() -> str:
    """Report whether the CoinGecko API answers the /ping endpoint.

    Returns:
        A "✅" message when the response contains the ``gecko_says`` key,
        otherwise a "❌" message carrying the unexpected payload or the
        exception text.
    """
    try:
        payload = await fetch_coingecko_json("/ping")
        if "gecko_says" not in payload:
            return f"❌ CoinGecko API server returned unexpected response: {payload}"
        return "✅ CoinGecko API server is up and running!"
    except Exception as exc:
        return f"❌ Error checking CoinGecko API server: {str(exc)}"
# Documentation: https://docs.coingecko.com/reference/coins-list
@mcp.tool("get_coin_list")
async def get_coin_list(include_platform: bool = False) -> str:
    """List supported coins (id, symbol, name) as a markdown table.

    Only the first 20 coins are rendered to keep the output short; the total
    count is always appended.

    Args:
        include_platform: Include platform contract addresses in the request.

    Returns:
        The markdown table, or a message starting with "❌" on failure.
    """
    try:
        platform_flag = "true" if include_platform else "false"
        data = await fetch_coingecko_json("/coins/list", params={"include_platform": platform_flag})
        if "error" in data:
            return f"❌ Error fetching coin list: {data['error']}"
        if not isinstance(data, list):
            return f"❌ Unexpected response format: {data}"

        # Cap the number of rendered rows so the output stays readable.
        max_coins = 20
        total = len(data)
        pieces = [
            "## CoinGecko Coin List\n\n",
            "| ID | Symbol | Name |\n",
            "|-----|--------|------|\n",
        ]
        pieces.extend(
            f"| {entry.get('id', 'N/A')} | {entry.get('symbol', 'N/A')} | {entry.get('name', 'N/A')} |\n"
            for entry in data[:max_coins]
        )
        if total > max_coins:
            pieces.append(f"\n*Showing {max_coins} of {total} coins*\n\n")
        pieces.append(f"**Total coins available:** {total}")
        return "".join(pieces)
    except Exception as exc:
        return f"❌ Error fetching coin list: {str(exc)}"
# Documentation: https://docs.coingecko.com/reference/simple-price
@mcp.tool("get_coin_price")
async def get_coin_price(ids: str, vs_currencies: str = "usd") -> str:
    """Get current price of coins in any currency.

    Args:
        ids: Comma-separated coin ids (e.g. bitcoin,ethereum)
        vs_currencies: Comma-separated currency codes (e.g. usd,eur)

    Returns:
        A markdown table with one row per coin/currency pair, or an error
        message when validation or the request fails.
    """
    try:
        # Validate input
        if not ids:
            return "❌ Error: Coin IDs are required"
        # Normalise the comma-separated inputs. The response is looked up by
        # currency code below, so "USD" or "usd, eur" would otherwise never
        # match and the table would come back empty.
        coin_ids = [part.strip() for part in ids.split(",") if part.strip()]
        currencies = [part.strip().lower() for part in vs_currencies.split(",") if part.strip()]
        if not coin_ids:
            return "❌ Error: Coin IDs are required"
        if not currencies:
            return "❌ Error: At least one currency code is required"
        params = {
            "ids": ",".join(coin_ids),
            "vs_currencies": ",".join(currencies),
            "include_market_cap": "true",
            "include_24hr_vol": "true",
            "include_24hr_change": "true",
            "include_last_updated_at": "true"
        }
        # Fetch data from the API
        data = await fetch_coingecko_json("/simple/price", params)
        if "error" in data:
            return f"❌ Error fetching coin prices: {data['error']}"
        if not data:
            return "❌ No price data found for the specified coins"
        # Format response as markdown table
        result = "## CoinGecko Coin Prices\n\n"
        result += "| Coin | Currency | Price | Market Cap | 24h Volume | 24h Change | Last Updated |\n"
        result += "|------|----------|-------|------------|------------|------------|----------------|\n"
        for coin_id, prices in data.items():
            for currency in currencies:
                if currency not in prices:
                    continue
                price = prices[currency]
                market_cap = prices.get(f"{currency}_market_cap")
                vol_24h = prices.get(f"{currency}_24h_vol")
                change_24h = prices.get(f"{currency}_24h_change")
                # Any of these may be null in the response; comparing None
                # with a number would raise and abort the whole table.
                price_str = f"${price}" if price is not None else "N/A"
                if change_24h is None:
                    change_formatted = "N/A"
                else:
                    change_sign = "🔺" if change_24h >= 0 else "🔻"
                    change_formatted = f"{change_sign} {abs(change_24h):.2f}%"
                # Format last updated
                last_updated_at = prices.get("last_updated_at", None)
                if last_updated_at:
                    last_updated = datetime.fromtimestamp(last_updated_at).strftime('%Y-%m-%d %H:%M:%S')
                else:
                    last_updated = "N/A"
                # Abbreviate values above one million for readability
                if market_cap is None:
                    market_cap = "N/A"
                elif market_cap > 1000000:
                    market_cap = f"${market_cap/1000000:.2f}M"
                if vol_24h is None:
                    vol_24h = "N/A"
                elif vol_24h > 1000000:
                    vol_24h = f"${vol_24h/1000000:.2f}M"
                result += f"| {coin_id} | {currency.upper()} | {price_str} | {market_cap} | {vol_24h} | {change_formatted} | {last_updated} |\n"
        return result
    except Exception as e:
        print(f"ERROR in get_coin_price: {str(e)}", file=sys.stderr)
        return f"Terjadi kesalahan saat mengambil harga koin: {str(e)}"
# Documentation: https://docs.coingecko.com/reference/coins-id
@mcp.tool("get_coin_detail")
async def get_coin_detail(id: str, localization: bool = False, tickers: bool = True, market_data: bool = True, community_data: bool = True, developer_data: bool = True) -> str:
    """Get current data for a coin including price, market cap, volume, and more.

    Args:
        id: The coin id (e.g. bitcoin, ethereum)
        localization: Include localized data
        tickers: Include ticker data
        market_data: Include market data
        community_data: Include community data
        developer_data: Include developer data

    Returns:
        A markdown report with basic info, market, community, developer,
        links and description sections (each only when present in the
        response), or a message starting with "❌" on failure.
    """

    def _format_usd(amount):
        """Abbreviate a dollar amount with B/M suffixes."""
        if amount >= 1_000_000_000:
            return f"${amount/1_000_000_000:.2f}B"
        if amount >= 1_000_000:
            return f"${amount/1_000_000:.2f}M"
        return f"${amount:,.0f}"

    def _format_change(pct):
        """Render a percentage change with a direction arrow."""
        arrow = "⬆" if pct > 0 else "⬇" if pct < 0 else ""
        return f"{arrow} {pct:+.2f}%"

    try:
        # Validate parameters
        if not id:
            return "❌ Error: Coin ID is required"
        params = {
            "localization": str(localization).lower(),
            "tickers": str(tickers).lower(),
            "market_data": str(market_data).lower(),
            "community_data": str(community_data).lower(),
            "developer_data": str(developer_data).lower(),
            "sparkline": "false"
        }
        data = await fetch_coingecko_json(f"/coins/{id}", params)
        if "error" in data:
            return f"❌ Error fetching coin details: {data['error']}"

        name = data.get('name', 'N/A')
        # A null symbol must not crash the whole report.
        symbol = (data.get('symbol') or 'N/A').upper()
        result = f"## {name} ({symbol})\n\n"
        # Basic info
        result += "### Basic Information\n\n"
        result += f"**ID:** {data.get('id', 'N/A')} \n"
        result += f"**Symbol:** {symbol} \n"
        result += f"**Name:** {name} \n"
        # Render the logo URL; previously the URL was checked for but the
        # emitted line contained no image at all.
        logo_url = (data.get('image') or {}).get('large')
        if logo_url:
            result += f"**Logo:** ![{name} logo]({logo_url}) \n"

        # Market data
        if market_data and 'market_data' in data:
            market = data['market_data'] or {}
            result += "\n### Market Data\n\n"
            if 'current_price' in market:
                result += "**Current Price:** \n"
                # Limit to 5 currencies
                for currency, price in list((market['current_price'] or {}).items())[:5]:
                    if price is None:
                        continue  # a null price cannot be number-formatted
                    result += f"- {currency.upper()}: {price:,.8f} \n"
            usd_market_cap = (market.get('market_cap') or {}).get('usd', 0)
            if usd_market_cap:
                result += f"**Market Cap:** {_format_usd(usd_market_cap)} \n"
            usd_volume = (market.get('total_volume') or {}).get('usd', 0)
            if usd_volume:
                result += f"**24h Trading Volume:** {_format_usd(usd_volume)} \n"
            # Price changes
            change_fields = (
                ("24h", 'price_change_percentage_24h'),
                ("7d", 'price_change_percentage_7d'),
                ("30d", 'price_change_percentage_30d'),
            )
            for label, key in change_fields:
                pct = market.get(key)
                if pct is not None:
                    result += f"**{label} Change:** {_format_change(pct)} \n"
            # All-time high
            if 'ath' in market and 'ath_date' in market and 'ath_change_percentage' in market:
                ath_usd = (market['ath'] or {}).get('usd')
                ath_date_usd = (market['ath_date'] or {}).get('usd')
                ath_change_percentage_usd = (market['ath_change_percentage'] or {}).get('usd')
                if ath_usd and ath_date_usd:
                    # Keep only the date part of the ISO timestamp.
                    ath_date_formatted = ath_date_usd.split('T')[0]
                    result += f"**All-Time High:** ${ath_usd:,.2f} ({ath_date_formatted}) \n"
                    if ath_change_percentage_usd is not None:
                        result += f"**From ATH:** {ath_change_percentage_usd:+.2f}% \n"

        # Community data
        if community_data and 'community_data' in data:
            community = data['community_data'] or {}
            result += "\n### Community Data\n\n"
            community_fields = (
                ("Twitter Followers", 'twitter_followers'),
                ("Reddit Subscribers", 'reddit_subscribers'),
            )
            for label, key in community_fields:
                count = community.get(key)
                if count:
                    result += f"**{label}:** {count:,} \n"

        # Developer data
        if developer_data and 'developer_data' in data:
            dev = data['developer_data'] or {}
            result += "\n### Developer Data\n\n"
            dev_fields = (
                ("Forks", 'forks'),
                ("Stars", 'stars'),
                ("Subscribers", 'subscribers'),
                ("Total Issues", 'total_issues'),
                ("Closed Issues", 'closed_issues'),
            )
            for label, key in dev_fields:
                value = dev.get(key)
                if value is not None:
                    result += f"**{label}:** {value} \n"

        # Links: each field is a list; only its first non-empty entry is shown.
        if 'links' in data:
            links = data['links'] or {}
            result += "\n### Links\n\n"
            link_fields = (
                ("Website", 'homepage'),
                ("Blockchain Explorer", 'blockchain_site'),
                ("Official Forum", 'official_forum_url'),
                ("Chat", 'chat_url'),
                ("Announcements", 'announcement_url'),
            )
            for label, key in link_fields:
                urls = links.get(key)
                if urls and urls[0]:
                    result += f"**{label}:** {urls[0]} \n"

        # Description, truncated to 500 characters
        description = (data.get('description') or {}).get('en')
        if description:
            if len(description) > 500:
                description = description[:500] + "..."
            result += "\n### Description\n\n"
            result += f"{description}\n"
        return result
    except Exception as e:
        # The message previously said "market chart data" (copy-paste slip).
        return f"❌ Error fetching coin details: {str(e)}"
# Documentation: https://docs.coingecko.com/reference/coins-id-market-chart
@mcp.tool("get_coin_market_chart")
async def get_coin_market_chart(id: str, vs_currency: str = "usd", days: str = "14", interval: str = None) -> str:
    """Get historical market data for a coin.

    Args:
        id: Coin id (e.g. bitcoin)
        vs_currency: Currency code (e.g. usd)
        days: Data up to number of days ago (any integer or 'max')
        interval: Data interval (e.g. 'daily', leave empty for auto granularity)

    Returns:
        A markdown report with a summary (first/last price, change, latest
        market cap and volume) and a table of up to five sampled data
        points, or a message starting with "❌" on failure.
    """

    def _format_usd(amount):
        """Abbreviate a dollar amount with B/M suffixes; None -> N/A."""
        if amount is None:
            return "N/A"
        if amount > 1000000000:
            return f"${amount/1000000000:.2f}B"
        if amount > 1000000:
            return f"${amount/1000000:.2f}M"
        return f"${amount:,.0f}"

    try:
        # Validate input
        if not id:
            return "❌ Error: Coin ID is required"
        if interval and interval not in ["daily"]:
            return "❌ Error: 'interval' must be 'daily' or empty"
        params = {
            "vs_currency": vs_currency,
            "days": days
        }
        if interval:
            params["interval"] = interval
        data = await fetch_coingecko_json(f"/coins/{id}/market_chart", params)
        if "error" in data:
            return f"❌ Error fetching market chart: {data['error']}"
        result = f"## {id.capitalize()} Market Chart ({days} days)\n\n"
        # Each series is a list of [timestamp_ms, value] pairs.
        prices = data.get("prices", [])
        if not prices:
            return f"{result}❌ No price data found"
        first_price = prices[0][1]
        last_price = prices[-1][1]
        # Change over the period; skipped when either end is missing or zero
        # (a zero start would divide by zero).
        if first_price and last_price:
            change = ((last_price - first_price) / first_price) * 100
            change_sign = "🔺" if change >= 0 else "🔻"
            change_str = f"{change_sign} {abs(change):.2f}%"
        else:
            change_str = "N/A"
        # Summary
        result += "### Summary\n\n"
        result += f"- Currency: {vs_currency.upper()}\n"
        result += f"- Period: {days} days\n"
        if first_price:
            result += f"- Starting price: ${first_price:.4f}\n"
        if last_price:
            result += f"- Current price: ${last_price:.4f}\n"
        result += f"- Change: {change_str}\n\n"
        # Latest market cap / volume. A null latest value is skipped instead
        # of being compared with a number, which would abort the report.
        market_caps = data.get("market_caps") or []
        if market_caps and market_caps[-1][1] is not None:
            result += f"- Market Cap: {_format_usd(market_caps[-1][1])}\n"
        volumes = data.get("total_volumes") or []
        if volumes and volumes[-1][1] is not None:
            result += f"- 24h Volume: {_format_usd(volumes[-1][1])}\n\n"
        # Sample data
        result += "### Recent Price Data\n\n"
        result += "| Date | Price | Market Cap | Volume |\n"
        result += "|------|-------|------------|--------|\n"
        # Pick up to 5 points, evenly spaced backwards from the latest one,
        # then show them in chronological order.
        sample_size = min(5, len(prices))
        step = max(1, len(prices) // sample_size)
        sample_indices = sorted(len(prices) - 1 - (i * step) for i in range(sample_size))
        for idx in sample_indices:
            if idx < 0 or idx >= len(prices):
                continue
            ts, price_val = prices[idx][0], prices[idx][1]
            # The other series are matched by index and may be shorter.
            mcap_val = market_caps[idx][1] if idx < len(market_caps) else None
            vol_val = volumes[idx][1] if idx < len(volumes) else None
            # Timestamps are in milliseconds.
            date_str = datetime.fromtimestamp(ts/1000).strftime('%Y-%m-%d')
            price_str = f"${price_val:.4f}" if price_val is not None else "N/A"
            result += f"| {date_str} | {price_str} | {_format_usd(mcap_val)} | {_format_usd(vol_val)} |\n"
        # Additional information footer
        result += "\n### Informasi Tambahan\n\n"
        result += "*Untuk informasi lebih detail tentang koin ini, gunakan fungsi `get_coin_detail`*\n"
        result += "*Data disediakan oleh CoinGecko API*\n"
        return result
    except Exception as e:
        return f"❌ Error fetching market chart data: {str(e)}"
# Documentation: https://docs.coingecko.com/reference/search-trending
@mcp.tool("get_trending_coins")
async def get_trending_coins() -> str:
    """Get trending search coins on CoinGecko in the last 24 hours.

    Returns:
        A markdown table (rank, name, symbol, market-cap rank, BTC price),
        or a message containing "❌" when nothing is found or the request
        fails.
    """
    try:
        data = await fetch_coingecko_json("/search/trending")
        if "error" in data:
            return f"❌ Error fetching trending coins: {data['error']}"
        result = "## Trending Coins (24h)\n\n"
        coins = data.get("coins", [])
        if not coins:
            return f"{result}❌ No trending coins found"
        rows = [
            "| Rank | Coin | Symbol | Market Cap Rank | Price (BTC) |\n",
            "|------|------|--------|----------------|-------------|\n",
        ]
        for rank, entry in enumerate(coins, start=1):
            # Each entry wraps the coin fields in an "item" object.
            coin = entry.get("item") or {}
            name = coin.get("name", "N/A")
            # Null fields are rendered as N/A instead of raising (symbol,
            # price) or printing "None" (rank).
            symbol = (coin.get("symbol") or "N/A").upper()
            market_cap_rank = coin.get("market_cap_rank")
            if market_cap_rank is None:
                market_cap_rank = "N/A"
            price_btc = coin.get("price_btc")
            if isinstance(price_btc, (int, float)):
                price_text = f"{price_btc:.8f} BTC"
            else:
                price_text = "N/A"
            rows.append(f"| {rank} | {name} | {symbol} | {market_cap_rank} | {price_text} |\n")
        return result + "".join(rows)
    except Exception as e:
        return f"❌ Error fetching trending coins: {str(e)}"
@mcp.tool("search_coins")
async def search_coins(query: str) -> str:
    """Search CoinGecko for coins, exchanges and categories.

    Args:
        query: Search query

    Returns:
        A markdown report with a coins section (always present), plus
        exchanges and categories sections when they have results; each
        section is capped (15 coins, 10 exchanges, 10 categories). A message
        starting with "❌" is returned on failure.
    """
    try:
        if not query:
            return "❌ Error: Search query is required"
        data = await fetch_coingecko_json("/search", {"query": query})
        if "error" in data:
            return f"❌ Error searching for '{query}': {data['error']}"

        sections = [f"## Search Results: '{query}'\n\n"]

        # Coins (capped at 15 rows)
        coin_hits = data.get("coins", [])
        if not coin_hits:
            sections.append(f"### Coins\n\nNo coins found matching '{query}'.\n")
        else:
            shown = min(15, len(coin_hits))
            sections.append(f"### Coins ({len(coin_hits)})\n\n")
            sections.append("| ID | Name | Symbol | Market Cap Rank |\n")
            sections.append("|-----|------|--------|--------------------|\n")
            for hit in coin_hits[:shown]:
                ticker = hit.get("symbol", "N/A").upper()
                sections.append(
                    f"| {hit.get('id', 'N/A')} | {hit.get('name', 'N/A')} | {ticker} | {hit.get('market_cap_rank', 'N/A')} |\n"
                )
            if len(coin_hits) > shown:
                sections.append(f"\n*Showing {shown} of {len(coin_hits)} coins*\n")

        # Exchanges (capped at 10 rows)
        exchange_hits = data.get("exchanges", [])
        if exchange_hits:
            shown = min(10, len(exchange_hits))
            sections.append(f"\n### Exchanges ({len(exchange_hits)})\n\n")
            sections.append("| ID | Name | Market Type |\n")
            sections.append("|-----|------|------------|\n")
            for hit in exchange_hits[:shown]:
                sections.append(
                    f"| {hit.get('id', 'N/A')} | {hit.get('name', 'N/A')} | {hit.get('market_type', 'N/A')} |\n"
                )
            if len(exchange_hits) > shown:
                sections.append(f"\n*Showing {shown} of {len(exchange_hits)} exchanges*\n")

        # Categories (capped at 10 rows)
        category_hits = data.get("categories", [])
        if category_hits:
            shown = min(10, len(category_hits))
            sections.append(f"\n### Categories ({len(category_hits)})\n\n")
            sections.append("| ID | Name |\n")
            sections.append("|-----|------|\n")
            for hit in category_hits[:shown]:
                sections.append(f"| {hit.get('id', 'N/A')} | {hit.get('name', 'N/A')} |\n")
            if len(category_hits) > shown:
                sections.append(f"\n*Showing {shown} of {len(category_hits)} categories*\n")

        return "".join(sections)
    except Exception as exc:
        return f"❌ Error searching for '{query}': {str(exc)}"
# Documentation: https://docs.coingecko.com/reference/coins-categories-list
@mcp.tool("get_coin_categories")
async def get_coin_categories() -> str:
    """List every coin category as a markdown table of id and name.

    Returns:
        The table followed by the total count, a plain notice when the list
        is empty, or a message starting with "❌" on failure.
    """
    try:
        data = await fetch_coingecko_json("/coins/categories/list")
        if "error" in data:
            return f"❌ Error fetching coin categories: {data['error']}"
        if not isinstance(data, list):
            return f"❌ Invalid response format: {data}"
        if not data:
            return "No coin categories found"

        pieces = [
            "## Coin Categories\n\n",
            "| Category ID | Category Name |\n",
            "|-------------|-----------------|\n",
        ]
        pieces.extend(
            f"| {entry.get('category_id', 'N/A')} | {entry.get('name', 'N/A')} |\n"
            for entry in data
        )
        pieces.append(f"\n*Total categories: {len(data)}*\n")
        return "".join(pieces)
    except Exception as exc:
        return f"❌ Error fetching coin categories: {str(exc)}"
# Documentation: https://docs.coingecko.com/reference/coins-categories
@mcp.tool("get_coin_category_details")
async def get_coin_category_details(category_id: str) -> str:
    """Get market data for a specific coin category.

    The full category list is fetched and the entry whose ``id`` equals
    ``category_id`` is reported.

    Args:
        category_id: The category id (e.g. 'decentralized-finance-defi')

    Returns:
        A markdown summary (market cap, 24h volume, 24h market-cap change,
        top coins, last update), or a message starting with "❌" on failure.
    """

    def _format_usd(amount):
        """Abbreviate a dollar amount with B/M suffixes."""
        if amount >= 1_000_000_000:
            return f"${amount/1_000_000_000:.2f}B"
        if amount >= 1_000_000:
            return f"${amount/1_000_000:.2f}M"
        return f"${amount:,.0f}"

    try:
        # Validate parameters
        if not category_id:
            return "❌ Error: Category ID is required"
        data = await fetch_coingecko_json("/coins/categories")
        if "error" in data:
            return f"❌ Error fetching category details: {data['error']}"
        if not isinstance(data, list):
            return f"❌ Invalid response format: {data}"
        # Find the requested category
        category = next((cat for cat in data if cat.get("id") == category_id), None)
        if not category:
            return f"❌ Category '{category_id}' not found"
        result = f"## {category.get('name', 'N/A')} Category\n\n"
        market_cap = category.get("market_cap", 0)
        if market_cap:
            result += f"**Market Cap:** {_format_usd(market_cap)} \n"
        volume_24h = category.get("volume_24h", 0)
        if volume_24h:
            result += f"**24h Volume:** {_format_usd(volume_24h)} \n"
        # Only report the change when the API supplied one; a missing value
        # is no longer displayed as a made-up +0.00%.
        market_cap_change_24h = category.get("market_cap_change_24h")
        if market_cap_change_24h is not None:
            arrow = "⬆" if market_cap_change_24h > 0 else "⬇" if market_cap_change_24h < 0 else ""
            result += f"**24h Market Cap Change:** {arrow} {market_cap_change_24h:+.2f}% \n"
        # Top coins: render each entry as an image. Previously the loop
        # emitted only the bare numbering and never used the value.
        # NOTE(review): entries are assumed to be image URLs — confirm
        # against the API reference.
        top_coins = category.get("top_3_coins", [])
        if top_coins:
            result += "\n**Top Coins in Category:** \n"
            for position, coin_url in enumerate(top_coins[:3], start=1):
                result += f"{position}. ![Coin {position}]({coin_url}) \n"
        # Add updated timestamp if available
        updated_at = category.get("updated_at", "")
        if updated_at:
            result += f"\n*Last updated: {updated_at}*\n"
        return result
    except Exception as e:
        return f"❌ Error fetching category details: {str(e)}"
# Market API endpoints
# ---------------------------------------------------------------------------
# Documentation: https://docs.coingecko.com/reference/global
@mcp.tool("get_global_data")
async def get_global_data() -> str:
    """Report global cryptocurrency market figures as markdown.

    Returns:
        A summary table (active coins and markets, total market cap and 24h
        volume in USD, BTC/ETH dominance, 24h market-cap change), a table of
        the ten largest dominance shares, and the update time when present;
        or a message starting with "❌" on failure.
    """

    def _scaled_usd(amount):
        """Abbreviate a dollar amount with T/B suffixes."""
        if amount >= 1_000_000_000_000:
            return f"${amount/1_000_000_000_000:.2f}T"
        if amount >= 1_000_000_000:
            return f"${amount/1_000_000_000:.2f}B"
        return f"${amount:,.0f}"

    try:
        response = await fetch_coingecko_json("/global")
        if "error" in response:
            return f"❌ Error fetching global market data: {response['error']}"
        # The figures live under the top-level "data" key.
        stats = response.get("data", {})
        if not stats:
            return "❌ No global market data found"

        cap_text = _scaled_usd(stats.get("total_market_cap", {}).get("usd", 0))
        volume_text = _scaled_usd(stats.get("total_volume", {}).get("usd", 0))
        dominance = stats.get("market_cap_percentage", {})

        lines = [
            "## Global Cryptocurrency Market Data\n\n",
            "### Summary\n\n",
            "| Metric | Value |\n",
            "|--------|-------|\n",
            f"| Active Cryptocurrencies | {stats.get('active_cryptocurrencies', 'N/A')} |\n",
            f"| Active Markets | {stats.get('markets', 'N/A')} |\n",
            f"| Total Market Cap | {cap_text} |\n",
            f"| Total 24h Volume | {volume_text} |\n",
            f"| BTC Dominance | {stats.get('market_cap_percentage', {}).get('btc', 0):.2f}% |\n",
            f"| ETH Dominance | {stats.get('market_cap_percentage', {}).get('eth', 0):.2f}% |\n",
            f"| Market Cap Change 24h | {stats.get('market_cap_change_percentage_24h_usd', 0):.2f}% |\n",
            "\n### Market Cap Percentage (Dominance)\n\n",
            "| Coin | Percentage |\n",
            "|------|------------|\n",
        ]
        # Largest shares first; only the top 10 are listed.
        ranked = sorted(dominance.items(), key=lambda pair: pair[1], reverse=True)
        lines.extend(f"| {coin.upper()} | {share:.2f}% |\n" for coin, share in ranked[:10])

        if "updated_at" in stats:
            from datetime import datetime
            stamp = datetime.fromtimestamp(stats["updated_at"]).strftime('%Y-%m-%d %H:%M:%S')
            lines.append(f"\n*Data updated at: {stamp}*\n")
        return "".join(lines)
    except Exception as exc:
        return f"❌ Error fetching global market data: {str(exc)}"
def _format_movers_table(title, coins, vs_currency, time_period, price_change_key):
    """Render one markdown section (heading + table) for a list of coins.

    Args:
        title: Section heading text (e.g. "Top 10 Gainers").
        coins: Coin dicts, already sorted and truncated by the caller. Each
            must contain a non-None value under ``price_change_key``.
        vs_currency: Quote currency code shown in the price column header.
        time_period: Period label shown in the change column header.
        price_change_key: Key holding the percentage change for that period.

    Returns:
        str: Markdown for the section.
    """
    section = f"### {title}\n\n"
    section += f"| Coin | Symbol | Price ({vs_currency.upper()}) | {time_period} Change | Market Cap |\n"
    section += "|------|--------|-----------------|--------------|------------|\n"

    for coin in coins:
        name = coin.get("name") or "N/A"
        symbol = (coin.get("symbol") or "N/A").upper()
        price = coin.get("current_price")
        market_cap = coin.get("market_cap")
        change = coin[price_change_key]

        # A null price or market cap must not abort the whole table.
        if price is None:
            price_str = "N/A"
        elif 0 < price < 0.01:
            # Sub-cent prices need extra precision to be meaningful.
            price_str = f"{price:.8f}"
        else:
            price_str = f"{price:,.2f}"

        if market_cap is None:
            market_cap_str = "N/A"
        elif market_cap >= 1_000_000_000:
            market_cap_str = f"${market_cap/1_000_000_000:.2f}B"
        elif market_cap >= 1_000_000:
            market_cap_str = f"${market_cap/1_000_000:.2f}M"
        else:
            market_cap_str = f"${market_cap:,.0f}"

        section += f"| {name} | {symbol} | {price_str} | {change:+.2f}% | {market_cap_str} |\n"

    return section


@mcp.tool("get_top_gainers_losers")
async def get_top_gainers_losers(vs_currency: str = "usd", time_period: str = "24h", top_count: int = 10) -> str:
    """Get top gainers and losers in the market.

    The ranking is computed locally over the first page (250 coins, ordered by
    market cap) returned by ``/coins/markets``.

    Args:
        vs_currency: Currency code (e.g. usd)
        time_period: Time period (1h, 24h, 7d, 14d, 30d, 200d, 1y)
        top_count: Number of top gainers/losers to show (1-100)

    Returns:
        str: Markdown with a gainers table and a losers table, or a "❌ ..."
        message on invalid input, API error or missing data.
    """
    try:
        # Validate parameters
        valid_periods = ["1h", "24h", "7d", "14d", "30d", "200d", "1y"]
        if time_period not in valid_periods:
            return f"❌ Error: Invalid time period. Use one of: {', '.join(valid_periods)}"
        if top_count < 1 or top_count > 100:
            return "❌ Error: top_count must be between 1 and 100"

        # The period label is used verbatim both as the request parameter and
        # inside the response key.
        price_change_key = f"price_change_percentage_{time_period}_in_currency"

        params = {
            "vs_currency": vs_currency,
            "order": "market_cap_desc",
            "per_page": 250,  # Get more coins to find top gainers/losers
            "page": 1,
            "price_change_percentage": time_period,
        }

        if DEBUG:
            print(f"DEBUG - Fetching top gainers/losers for {time_period}", file=sys.stderr)

        data = await fetch_coingecko_json("/coins/markets", params)
        if isinstance(data, dict) and "error" in data:
            return f"❌ Error fetching top gainers/losers: {data['error']}"
        if not isinstance(data, list):
            return f"❌ Invalid response format: {data}"

        # Coins without a change value for this period cannot be ranked.
        ranked = [coin for coin in data if coin.get(price_change_key) is not None]
        if not ranked:
            return f"❌ No price change data available for period {time_period}"

        def change_of(coin):
            return coin[price_change_key]

        top_gainers = sorted(ranked, key=change_of, reverse=True)[:top_count]
        top_losers = sorted(ranked, key=change_of)[:top_count]

        # Format response as markdown
        result = f"## Top Gainers and Losers ({time_period})\n\n"
        result += _format_movers_table(
            f"Top {top_count} Gainers", top_gainers, vs_currency, time_period, price_change_key
        )
        result += "\n" + _format_movers_table(
            f"Top {top_count} Losers", top_losers, vs_currency, time_period, price_change_key
        )
        return result
    except Exception as e:
        return f"❌ Error fetching top gainers/losers: {str(e)}"
# Documentation: https://docs.coingecko.com/reference/coins-id-market-chart-range
def _format_series_stats(series, label):
    """Summarize a list of ``[timestamp, value]`` pairs as markdown lines.

    Args:
        series: Sequence of two-item entries; the second item is the value.
        label: Metric name used in the output (e.g. "Market Cap").

    Returns:
        str: Average/min/max lines followed by the data-point count. All three
        figures share one unit (B, M or none) chosen from the average. Entries
        whose value is None are ignored for the statistics.
    """
    values = [point[1] for point in series if point[1] is not None]
    count_line = f"**Data Points:** {len(series)} \n"
    if not values:
        return count_line

    average = sum(values) / len(values)
    if average >= 1_000_000_000:
        divisor, suffix = 1_000_000_000, "B"
    elif average >= 1_000_000:
        divisor, suffix = 1_000_000, "M"
    else:
        divisor, suffix = 1, ""

    def fmt(value):
        return f"${value / divisor:.2f}{suffix}"

    return (
        f"**Average {label}:** {fmt(average)} \n"
        f"**Min {label}:** {fmt(min(values))} \n"
        f"**Max {label}:** {fmt(max(values))} \n"
        + count_line
    )


@mcp.tool("get_coin_market_chart_range")
async def get_coin_market_chart_range(id: str, vs_currency: str = "usd", from_timestamp: int = None, to_timestamp: int = None) -> str:
    """Get historical market data for a coin within a specific time range.

    Args:
        id: Coin id (e.g. bitcoin)
        vs_currency: Currency code (e.g. usd)
        from_timestamp: Starting date in UNIX timestamp (defaults to 7 days ago)
        to_timestamp: Ending date in UNIX timestamp (defaults to now)

    Returns:
        str: Markdown with a sampled price table plus market-cap and volume
        summaries, or a "❌ ..." message on invalid input or API error.
    """
    try:
        # Validate parameters
        if not id:
            return "❌ Error: Coin ID is required"

        # Default timestamps when not provided
        now = int(time.time())
        if from_timestamp is None:
            from_timestamp = now - (7 * 24 * 60 * 60)
        if to_timestamp is None:
            to_timestamp = now
        if from_timestamp >= to_timestamp:
            return "❌ Error: from_timestamp must be earlier than to_timestamp"

        params = {
            "vs_currency": vs_currency,
            "from": from_timestamp,
            "to": to_timestamp,
        }
        data = await fetch_coingecko_json(f"/coins/{id}/market_chart/range", params)
        if "error" in data:
            return f"❌ Error fetching market chart data: {data['error']}"

        # Format response as markdown
        result = f"## Market Chart Data for {id.capitalize()} ({vs_currency.upper()})\n\n"

        # Dates are rendered with datetime.fromtimestamp, i.e. in the server's
        # local timezone.
        from_date = datetime.fromtimestamp(from_timestamp).strftime('%Y-%m-%d %H:%M')
        to_date = datetime.fromtimestamp(to_timestamp).strftime('%Y-%m-%d %H:%M')
        result += f"*From {from_date} to {to_date}*\n\n"

        # Price data
        prices = data.get("prices", [])
        if prices:
            result += "### Price Data\n\n"
            result += "| Date | Price |\n"
            result += "|------|-------|\n"

            # Show only a sample: first, middle and last few points, with a
            # None marker standing in for each elided stretch.
            sample_size = 7
            total_points = len(prices)
            if total_points <= sample_size * 3:
                sample_prices = prices
            else:
                middle_start = total_points // 2 - sample_size // 2
                sample_prices = (
                    prices[:sample_size]
                    + [None]
                    + prices[middle_start:middle_start + sample_size]
                    + [None]
                    + prices[-sample_size:]
                )

            high_precision = vs_currency.lower() in ["btc", "eth", "ltc"]
            for price_data in sample_prices:
                if price_data is None:
                    result += "| ... | ... |\n"
                    continue
                timestamp, price = price_data
                date = datetime.fromtimestamp(timestamp/1000).strftime('%Y-%m-%d %H:%M')
                if price is None:
                    price_str = "N/A"
                elif high_precision or price < 1:
                    price_str = f"{price:.8f}"
                else:
                    price_str = f"{price:.2f}"
                result += f"| {date} | {price_str} |\n"
            result += f"\n*Total price data points: {len(prices)}*\n"

        # Market cap data
        market_caps = data.get("market_caps", [])
        if market_caps:
            result += "\n### Market Cap Data\n\n"
            result += _format_series_stats(market_caps, "Market Cap")

        # Volume data
        total_volumes = data.get("total_volumes", [])
        if total_volumes:
            result += "\n### Trading Volume Data\n\n"
            result += _format_series_stats(total_volumes, "Volume")

        return result
    except Exception as e:
        return f"❌ Error fetching market chart range data: {str(e)}"
# Documentation: https://docs.coingecko.com/reference/coins-id-tickers
def _compact_ticker_amount(value, prefix="$"):
    """Format a number with a B/M/K suffix and two decimals; None becomes "N/A"."""
    if value is None:
        return "N/A"
    for threshold, suffix in ((1_000_000_000, "B"), (1_000_000, "M"), (1_000, "K")):
        if value >= threshold:
            return f"{prefix}{value / threshold:.2f}{suffix}"
    return f"{prefix}{value:.2f}"


@mcp.tool("get_coin_tickers")
async def get_coin_tickers(id: str, exchange_ids: str = None, include_exchange_logo: bool = False, page: int = 1, order: str = "trust_score_desc", depth: bool = False) -> str:
    """Get ticker data for a specific coin.

    Args:
        id: Coin id (e.g. bitcoin)
        exchange_ids: Comma-separated exchange ids (e.g. 'binance,gdax')
        include_exchange_logo: Include exchange logo URLs
        page: Page number
        order: Sort tickers by order (trust_score_desc, trust_score_asc, volume_desc)
        depth: Include 2% orderbook depth

    Returns:
        str: Markdown table of up to 20 tickers (plus an order-book depth table
        when requested and available), or a "❌ ..." message on failure.
    """
    max_rows = 20
    max_depth_rows = 10
    trust_labels = {"green": "✅ High", "yellow": "⚠️ Medium", "red": "❌ Low"}

    try:
        # Validate parameters
        if not id:
            return "❌ Error: Coin ID is required"
        valid_orders = ["trust_score_desc", "trust_score_asc", "volume_desc"]
        if order not in valid_orders:
            return f"❌ Error: Invalid order parameter. Valid options are: {', '.join(valid_orders)}"

        params = {
            "page": page,
            "order": order,
            # The API expects lowercase "true"/"false" strings.
            "depth": str(depth).lower(),
            "include_exchange_logo": str(include_exchange_logo).lower(),
        }
        if exchange_ids:
            params["exchange_ids"] = exchange_ids

        data = await fetch_coingecko_json(f"/coins/{id}/tickers", params)
        if "error" in data:
            return f"❌ Error fetching ticker data: {data['error']}"

        tickers = data.get("tickers", [])
        if not tickers:
            return f"❌ No ticker data found for {id}"

        # Format response as markdown
        result = f"## Ticker Data for {id.capitalize()}\n\n"

        # Pagination info, only when the payload carries it
        if "total" in data and "per_page" in data and "last" in data:
            total = data.get("total", 0)
            per_page = data.get("per_page", 0)
            last_page = data.get("last", 0)
            result += f"*Page {page} of {last_page} (Total: {total} tickers, {per_page} per page)*\n\n"

        result += "| Exchange | Pair | Price | Volume | Trust Score | Last Updated |\n"
        result += "|----------|------|-------|--------|-------------|-------------|\n"

        for ticker in tickers[:max_rows]:
            exchange = (ticker.get("market") or {}).get("name", "Unknown")
            base = ticker.get("base", "")
            target = ticker.get("target", "")
            pair = f"{base}/{target}"

            price = ticker.get("last")
            if price is None:
                price_str = "N/A"
            elif price < 0.000001:
                price_str = f"{price:.8f}"
            elif price < 1:
                price_str = f"{price:.6f}"
            else:
                price_str = f"{price:.2f}"

            # "volume" is denominated in the base asset, so only the converted
            # USD figure may carry a dollar sign.
            usd_volume = (ticker.get("converted_volume") or {}).get("usd")
            if usd_volume is not None:
                volume_str = _compact_ticker_amount(usd_volume, "$")
            else:
                raw_volume = ticker.get("volume")
                volume_str = _compact_ticker_amount(raw_volume, "")
                if raw_volume is not None and base:
                    volume_str += f" {base}"

            trust_score_str = trust_labels.get(ticker.get("trust_score"), "N/A")

            timestamp = ticker.get("last_traded_at", "")
            if timestamp:
                try:
                    dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
                    timestamp_str = dt.strftime("%Y-%m-%d %H:%M")
                except (ValueError, AttributeError, TypeError):
                    # Unparseable value: show it as received.
                    timestamp_str = str(timestamp)
            else:
                timestamp_str = "N/A"

            result += f"| {exchange} | {pair} | {price_str} | {volume_str} | {trust_score_str} | {timestamp_str} |\n"

        # Add note if data was truncated
        if len(tickers) > max_rows:
            result += f"\n*Note: Showing {max_rows} of {len(tickers)} tickers*\n"

        if "market_cap_rank" in data:
            result += f"\n**Market Cap Rank:** #{data['market_cap_rank']}\n"

        # Order-book depth: the same slice decides whether the section is
        # shown and which rows it contains.
        depth_rows = [
            t for t in tickers[:max_depth_rows]
            if "cost_to_move_up_usd" in t or "cost_to_move_down_usd" in t
        ]
        if depth and depth_rows:
            result += "\n### Order Book Depth (2%)\n\n"
            result += "| Exchange | Pair | Cost to Move Up | Cost to Move Down |\n"
            result += "|----------|------|----------------|------------------|\n"
            for ticker in depth_rows:
                exchange = (ticker.get("market") or {}).get("name", "Unknown")
                pair = f"{ticker.get('base', '')}/{ticker.get('target', '')}"
                cost_up_str = _compact_ticker_amount(ticker.get("cost_to_move_up_usd"))
                cost_down_str = _compact_ticker_amount(ticker.get("cost_to_move_down_usd"))
                result += f"| {exchange} | {pair} | {cost_up_str} | {cost_down_str} |\n"

        return result
    except Exception as e:
        return f"❌ Error fetching ticker data: {str(e)}"
# Documentation: https://docs.coingecko.com/reference/coins-id-ohlc
@mcp.tool("get_coin_ohlc")
async def get_coin_ohlc(id: str, vs_currency: str = "usd", days: int = 7) -> str:
    """Get Open, High, Low, Close data for a coin.

    Args:
        id: Coin id (e.g. bitcoin)
        vs_currency: Currency code (e.g. usd)
        days: Data up to number of days (1, 7, 14, 30, 90, 180, 365)

    Returns:
        str: Markdown table with the first 20 candles of the response, or a
        "❌ ..." message on invalid input, API error or empty data.
    """
    max_rows = 20
    allowed_days = (1, 7, 14, 30, 90, 180, 365)

    try:
        # Guard clauses for the two validated inputs
        if not id:
            return "❌ Error: Coin ID is required"
        if days not in allowed_days:
            options = ', '.join(str(choice) for choice in allowed_days)
            return f"❌ Error: Invalid days parameter. Valid options are: {options}"

        candles = await fetch_coingecko_json(
            f"/coins/{id}/ohlc",
            {"vs_currency": vs_currency, "days": days},
        )
        if "error" in candles:
            return f"❌ Error fetching OHLC data: {candles['error']}"
        if not (isinstance(candles, list) and candles):
            return f"❌ No OHLC data found for {id}"

        pieces = [
            f"## OHLC Data for {id.capitalize()} ({vs_currency.upper()})\n\n",
            f"*Last {days} days*\n\n",
            "| Date | Open | High | Low | Close |\n",
            "|------|------|------|-----|-------|\n",
        ]

        # Crypto quote currencies always get 8 decimals.
        crypto_quote = vs_currency.lower() in ["btc", "eth", "ltc"]

        # Each candle is [timestamp_ms, open, high, low, close]; shorter
        # entries are skipped.
        for candle in candles[:max_rows]:
            if len(candle) < 5:
                continue
            stamp_ms, open_price, high, low, close = (candle[k] for k in range(5))
            when = datetime.fromtimestamp(stamp_ms / 1000).strftime('%Y-%m-%d %H:%M')

            # The opening price picks the precision for the whole row.
            if crypto_quote:
                digits = 8
            elif open_price < 1:
                digits = 6
            else:
                digits = 2
            cells = [f"{value:.{digits}f}" for value in (open_price, high, low, close)]
            pieces.append(f"| {when} | " + " | ".join(cells) + " |\n")

        if len(candles) > max_rows:
            pieces.append(f"\n*Note: Showing 20 of {len(candles)} data points*\n")
        pieces.append("\n*For visualization, consider plotting this data on a candlestick chart*\n")

        return "".join(pieces)
    except Exception as e:
        return f"❌ Error fetching OHLC data: {str(e)}"
# ---------------------------------------------------------------------------
# NFT API endpoints
# ---------------------------------------------------------------------------
# Documentation: https://docs.coingecko.com/reference/nfts-list
@mcp.tool("get_nfts")
async def get_nfts(per_page: int = 20, page: int = 1, order: str = "h24_volume_native_desc", asset_platform_id: str = None) -> str:
    """Get list of NFT collections.

    Args:
        per_page: Number of results per page (1-250)
        page: Page number
        order: Sort order. One of h24_volume_native_desc, h24_volume_native_asc,
            floor_price_native_desc, floor_price_native_asc,
            market_cap_native_desc, market_cap_native_asc
        asset_platform_id: Filter by platform (e.g. 'ethereum')

    Returns:
        str: Markdown table of collections, or a message describing the
        validation/API error or the empty result.
    """
    try:
        # Validate parameters
        if per_page < 1 or per_page > 250:
            return "❌ Error: per_page must be between 1 and 250"

        valid_orders = [
            "h24_volume_native_desc", "h24_volume_native_asc",
            "floor_price_native_desc", "floor_price_native_asc",
            "market_cap_native_desc", "market_cap_native_asc",
        ]
        if order not in valid_orders:
            return f"❌ Error: Invalid order parameter. Valid options are: {', '.join(valid_orders)}"

        params = {
            "per_page": per_page,
            "page": page,
            "order": order,
        }
        if asset_platform_id:
            params["asset_platform_id"] = asset_platform_id

        data = await fetch_coingecko_json("/nfts/list", params)
        if isinstance(data, dict) and "error" in data:
            return f"❌ Error fetching NFT collections: {data['error']}"
        if not isinstance(data, list):
            return f"❌ Invalid response format: {data}"
        if not data:
            return "No NFT collections found for the specified parameters"

        # Format response as markdown
        result = "## NFT Collections\n\n"
        result += "| # | Collection | Symbol | Contract | Platform |\n"
        result += "|---|-----------|--------|----------|----------|\n"

        for position, nft in enumerate(data, start=1):
            # "or" also covers keys that are present but null.
            name = nft.get("name") or "N/A"
            symbol = nft.get("symbol") or "N/A"
            platform = nft.get("asset_platform_id") or "N/A"
            contract = nft.get("contract_address") or "N/A"
            if len(contract) > 10:
                # Abbreviate long addresses to keep the table narrow.
                contract = contract[:6] + "..." + contract[-4:]
            result += f"| {position} | {name} | {symbol} | {contract} | {platform} |\n"

        # Add pagination info
        result += f"\n*Page {page}, showing {len(data)} results per page*\n"
        return result
    except Exception as e:
        return f"❌ Error fetching NFT collections: {str(e)}"
# Documentation: https://docs.coingecko.com/reference/nfts-id
def _nft_link(value):
    """Return one URL from a link field that may be a string or a list of strings."""
    if isinstance(value, (list, tuple)):
        value = value[0] if value else None
    return value if isinstance(value, str) and value else None


def _nft_is_number(value):
    """Return True for int/float values (bool excluded) that numeric format specs accept."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


@mcp.tool("get_nft_detail")
async def get_nft_detail(id: str) -> str:
    """Get detailed data for a specific NFT collection.

    Args:
        id: The NFT collection id (e.g. 'cryptopunks')

    Returns:
        str: Markdown with basic info, market figures, links and description,
        or a "❌ ..." message on invalid input or API error. Fields that are
        missing or null in the response are omitted.
    """
    try:
        # Validate parameters
        if not id:
            return "❌ Error: NFT collection ID is required"

        data = await fetch_coingecko_json(f"/nfts/{id}")
        if "error" in data:
            return f"❌ Error fetching NFT collection details: {data['error']}"

        name = data.get('name', 'N/A')
        result = f"## {name} NFT Collection\n\n"

        # Basic info
        result += "### Basic Information\n\n"
        result += f"**ID:** {data.get('id', 'N/A')} \n"
        result += f"**Name:** {name} \n"
        result += f"**Symbol:** {data.get('symbol', 'N/A')} \n"

        image = data.get('image')
        small_image = image.get('small') if isinstance(image, dict) else None
        if small_image:
            result += f"**Image:**  \n"

        result += f"**Contract Address:** {data.get('contract_address', 'N/A')} \n"
        result += f"**Platform:** {data.get('asset_platform_id', 'N/A')} \n"

        # The native currency symbol is a top-level field in the API response;
        # a per-section "symbol" is still honoured if one is present.
        native_symbol = data.get('native_currency_symbol') or ''

        floor = data.get('floor_price')
        if isinstance(floor, dict):
            native = floor.get('native_currency')
            if native is not None:
                result += f"**Floor Price:** {native} {floor.get('symbol') or native_symbol} \n"
            usd = floor.get('usd')
            if _nft_is_number(usd):
                result += f"**Floor Price (USD):** ${usd:,.2f} \n"

        cap = data.get('market_cap')
        if isinstance(cap, dict):
            native = cap.get('native_currency')
            if _nft_is_number(native):
                result += f"**Market Cap:** {native:,.2f} {cap.get('symbol') or native_symbol} \n"
            usd = cap.get('usd')
            if _nft_is_number(usd):
                if usd >= 1_000_000_000:
                    market_cap_str = f"${usd/1_000_000_000:.2f}B"
                elif usd >= 1_000_000:
                    market_cap_str = f"${usd/1_000_000:.2f}M"
                else:
                    market_cap_str = f"${usd:,.0f}"
                result += f"**Market Cap (USD):** {market_cap_str} \n"

        volume = data.get('volume_24h')
        if isinstance(volume, dict):
            native = volume.get('native_currency')
            if _nft_is_number(native):
                result += f"**24h Volume:** {native:,.2f} {volume.get('symbol') or native_symbol} \n"
            usd = volume.get('usd')
            if _nft_is_number(usd):
                if usd >= 1_000_000:
                    volume_str = f"${usd/1_000_000:.2f}M"
                elif usd >= 1_000:
                    volume_str = f"${usd/1_000:.2f}K"
                else:
                    volume_str = f"${usd:,.2f}"
                result += f"**24h Volume (USD):** {volume_str} \n"

        # Holder / token counts; a null count is skipped instead of crashing
        # the thousands-separator format.
        for key, label in (
            ('number_of_unique_addresses', 'Unique Addresses'),
            ('number_of_unique_tokens', 'Unique Tokens'),
        ):
            count = data.get(key)
            if _nft_is_number(count):
                result += f"**{label}:** {count:,} \n"
            elif count is not None:
                result += f"**{label}:** {count} \n"

        # Links: each field is a plain URL string in the API response, so
        # indexing it with [0] would yield only its first character.
        links = data.get('links')
        if isinstance(links, dict):
            link_lines = ""
            for key, label in (('homepage', 'Website'), ('twitter', 'Twitter'), ('discord', 'Discord')):
                url = _nft_link(links.get(key))
                if url:
                    link_lines += f"**{label}:** {url} \n"
            if link_lines:
                result += "\n### Links\n\n" + link_lines

        # Description, truncated to keep the output short
        description = data.get('description', '')
        if description:
            if len(description) > 500:
                description = description[:500] + "..."
            result += "\n### Description\n\n"
            result += f"{description}\n"

        return result
    except Exception as e:
        return f"❌ Error fetching NFT collection details: {str(e)}"
# ---------------------------------------------------------------------------
# Exchange API endpoints
# ---------------------------------------------------------------------------
# Documentation: https://docs.coingecko.com/reference/exchanges
@mcp.tool("get_exchanges")
async def get_exchanges(per_page: int = 20, page: int = 1) -> str:
    """Get list of all exchanges with basic data.

    Args:
        per_page: Number of results per page (1-250)
        page: Page number

    Returns:
        str: Markdown table of exchanges, or a message describing the
        validation/API error or the empty result.
    """
    def or_na(value):
        # dict.get's default only covers missing keys; a key that is present
        # with a null value would otherwise be printed as "None".
        return "N/A" if value is None else value

    try:
        # Validate parameters
        if per_page < 1 or per_page > 250:
            return "❌ Error: per_page must be between 1 and 250"

        params = {
            "per_page": per_page,
            "page": page,
        }
        data = await fetch_coingecko_json("/exchanges", params)
        if isinstance(data, dict) and "error" in data:
            return f"❌ Error fetching exchanges: {data['error']}"
        if not isinstance(data, list):
            return f"❌ Invalid response format: {data}"
        if not data:
            return "No exchanges found for the specified parameters"

        # Format response as markdown
        result = "## Cryptocurrency Exchanges\n\n"
        result += "| # | Exchange | Trust Score | 24h Volume (BTC) | Year Established |\n"
        result += "|---|----------|-------------|-----------------|---------------------|\n"

        for position, exchange in enumerate(data, start=1):
            name = or_na(exchange.get("name"))
            trust_score = or_na(exchange.get("trust_score"))
            year = or_na(exchange.get("year_established"))
            volume_btc = exchange.get("trade_volume_24h_btc", 0)

            # Large volumes drop the decimals; missing or zero volume is N/A.
            if volume_btc is not None and volume_btc > 0:
                if volume_btc >= 10000:
                    volume_str = f"{volume_btc:,.0f} BTC"
                else:
                    volume_str = f"{volume_btc:,.2f} BTC"
            else:
                volume_str = "N/A"

            result += f"| {position} | {name} | {trust_score} | {volume_str} | {year} |\n"

        # Add pagination info
        result += f"\n*Page {page}, showing {len(data)} results per page*\n"
        return result
    except Exception as e:
        return f"❌ Error fetching exchanges: {str(e)}"
# Documentation: https://docs.coingecko.com/reference/exchanges-id
def _exchange_links(data):
    """Collect (label, value) link pairs from an exchange payload.

    Handles ``url`` as a plain string together with the separate
    ``twitter_handle`` / ``facebook_url`` / ``telegram_url`` fields, and also
    the dict-of-lists shape (``url['website'][0]`` etc.) that the previous
    implementation expected.
    """
    def first(value):
        # Accept either a string or a list of strings.
        if isinstance(value, (list, tuple)):
            value = value[0] if value else None
        return value if isinstance(value, str) and value else None

    url_field = data.get('url')
    legacy = url_field if isinstance(url_field, dict) else {}

    twitter = first(legacy.get('twitter'))
    handle = first(data.get('twitter_handle'))
    if not twitter and handle:
        twitter = f"@{handle.lstrip('@')}"

    candidates = (
        ("Website", first(legacy.get('website')) or first(url_field)),
        ("Twitter", twitter),
        ("Facebook", first(legacy.get('facebook')) or first(data.get('facebook_url'))),
        ("Telegram", first(legacy.get('telegram')) or first(data.get('telegram_url'))),
    )
    return [(label, value) for label, value in candidates if value]


@mcp.tool("get_exchange_detail")
async def get_exchange_detail(id: str) -> str:
    """Get detailed data for a specific exchange.

    Args:
        id: The exchange id (e.g. 'binance', 'gdax')

    Returns:
        str: Markdown with basic info, links, description and up to 10 trading
        pairs, or a "❌ ..." message on invalid input or API error.
    """
    try:
        # Validate parameters
        if not id:
            return "❌ Error: Exchange ID is required"

        data = await fetch_coingecko_json(f"/exchanges/{id}")
        if "error" in data:
            return f"❌ Error fetching exchange details: {data['error']}"

        name = data.get('name', 'N/A')
        result = f"## {name} Exchange\n\n"

        # Basic info
        result += "### Basic Information\n\n"
        result += f"**ID:** {data.get('id', 'N/A')} \n"
        result += f"**Name:** {name} \n"

        image = data.get('image')
        if image:
            result += f"**Logo:**  \n"

        year = data.get('year_established')
        if year:
            result += f"**Year Established:** {year} \n"

        country = data.get('country')
        if country:
            result += f"**Country:** {country} \n"

        trust_score = data.get('trust_score')
        if trust_score is not None:
            result += f"**Trust Score:** {trust_score}/10 \n"

        trust_rank = data.get('trust_score_rank')
        if trust_rank is not None:
            result += f"**Trust Rank:** #{trust_rank} \n"

        volume_btc = data.get('trade_volume_24h_btc')
        if volume_btc is not None:
            if volume_btc >= 10000:
                volume_str = f"{volume_btc:,.0f} BTC"
            else:
                volume_str = f"{volume_btc:,.2f} BTC"
            result += f"**24h Trading Volume:** {volume_str} \n"

        # Links: the section header is emitted only when a link exists.
        links = _exchange_links(data)
        if links:
            result += "\n### Links\n\n"
            for label, value in links:
                result += f"**{label}:** {value} \n"

        # Description, truncated to keep the output short
        description = data.get('description', '')
        if description:
            if len(description) > 500:
                description = description[:500] + "..."
            result += "\n### Description\n\n"
            result += f"{description}\n"

        # Trading pairs
        tickers = data.get('tickers', [])
        if tickers:
            result += "\n### Top Trading Pairs\n\n"
            result += "| Base | Target | Volume | Price | Spread | Last Updated |\n"
            result += "|------|--------|--------|-------|--------|-------------|\n"

            for ticker in tickers[:10]:
                base = ticker.get('base', 'N/A')
                target = ticker.get('target', 'N/A')
                volume = ticker.get('volume', 0)
                price = ticker.get('last', 0)
                bid_ask_spread = ticker.get('bid_ask_spread_percentage', 0)
                last_traded = ticker.get('last_traded_at', '')

                if volume is not None and volume > 0:
                    if volume >= 1_000_000:
                        volume_str = f"{volume/1_000_000:.2f}M"
                    elif volume >= 1_000:
                        volume_str = f"{volume/1_000:.2f}K"
                    else:
                        volume_str = f"{volume:.2f}"
                else:
                    volume_str = "N/A"

                if price is None:
                    price_str = "N/A"
                elif 0 < price < 0.01:
                    price_str = f"{price:.8f}"
                else:
                    price_str = f"{price:.4f}"

                if bid_ask_spread is not None:
                    spread_str = f"{bid_ask_spread:.2f}%"
                else:
                    spread_str = "N/A"

                if last_traded:
                    # Keep only the date part of an ISO timestamp; a value
                    # that is not a string is shown as-is.
                    try:
                        last_traded_str = last_traded.split('T')[0]
                    except AttributeError:
                        last_traded_str = str(last_traded)
                else:
                    last_traded_str = "N/A"

                result += f"| {base} | {target} | {volume_str} | {price_str} | {spread_str} | {last_traded_str} |\n"

        return result
    except Exception as e:
        return f"❌ Error fetching exchange details: {str(e)}"
# ---------------------------------------------------------------------------
# Simple API endpoints
# ---------------------------------------------------------------------------
# Documentation: https://docs.coingecko.com/reference/simple-supported-currencies
def _currency_grid(codes, max_columns=5):
    """Lay out currency codes as a markdown table filled column by column.

    Args:
        codes: Non-empty list of currency code strings.
        max_columns: Maximum number of table columns.

    Returns:
        str: Markdown table whose header, separator and body rows all have the
        same number of cells.
    """
    codes = sorted(codes)
    n_cols = min(max_columns, len(codes))
    n_rows = (len(codes) + n_cols - 1) // n_cols  # Ceiling division

    grid = "|" + "|".join([" Currency "] * n_cols) + "|\n"
    grid += "|" + "|".join([":------:"] * n_cols) + "|\n"
    for row in range(n_rows):
        cells = []
        for col in range(n_cols):
            # Column-major order: read down the first column, then the next.
            idx = row + col * n_rows
            cells.append(f" `{codes[idx].upper()}` " if idx < len(codes) else " ")
        grid += "|" + "|".join(cells) + "|\n"
    return grid


@mcp.tool("get_supported_vs_currencies")
async def get_supported_vs_currencies() -> str:
    """Get list of supported vs currencies.

    Returns:
        str: Markdown with one table per category (Fiat, Crypto, Commodity,
        Other) and the total count, or a "❌ ..." message on failure.
    """
    # Known codes per category; anything not listed here lands in "Other".
    known_categories = {
        "Fiat": {
            "usd", "eur", "jpy", "gbp", "aud", "cad", "chf", "cny", "hkd", "nzd",
            "sek", "krw", "sgd", "nok", "mxn", "inr", "rub", "zar", "try", "brl",
            "twd", "dkk", "pln", "thb", "idr", "czk", "aed", "ars", "clp", "cop",
            "egp", "ils", "kwd", "myr", "ngn", "php", "pkr", "sar", "vef", "vnd",
        },
        "Crypto": {
            "btc", "eth", "ltc", "bch", "bnb", "eos", "xrp", "xlm", "dot", "yfi",
            "aave", "uni",
        },
        "Commodity": {"xag", "xau"},
    }

    try:
        data = await fetch_coingecko_json("/simple/supported_vs_currencies")
        if isinstance(data, dict) and "error" in data:
            return f"❌ Error fetching supported currencies: {data['error']}"
        if not isinstance(data, list) or not data:
            return "❌ No supported currencies found"

        # Dict insertion order fixes the order of the output sections.
        categorized = {"Fiat": [], "Crypto": [], "Commodity": [], "Other": []}
        for raw_code in data:
            code = raw_code.lower()
            for category, members in known_categories.items():
                if code in members:
                    categorized[category].append(code)
                    break
            else:
                categorized["Other"].append(code)

        # Format response as markdown
        result = "## Supported VS Currencies\n\n"
        for category, codes in categorized.items():
            if codes:
                result += f"### {category} Currencies\n\n"
                result += _currency_grid(codes)
                result += "\n"

        # Add total count
        result += f"**Total supported currencies:** {len(data)}\n"
        return result
    except Exception as e:
        return f"❌ Error fetching supported currencies: {str(e)}"
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Run the MCP server when this file is executed as a script
    mcp.run()