Skip to main content
Glama

MCP Crypto API Servers

coingecko_server.py84.7 kB
"""CoinGecko MCP Server This server provides tools for interacting with the CoinGecko API to get information about cryptocurrencies, exchanges, and market data. Based on CoinGecko API Pro documentation: https://docs.coingecko.com/reference/introduction """ from __future__ import annotations from typing import Any, Dict, List, Optional import urllib.parse import json import sys import traceback import os import time from datetime import datetime from dotenv import load_dotenv from mcp.server import FastMCP # Load environment variables from .env file load_dotenv() # --------------------------------------------------------------------------- # Constants & configuration # --------------------------------------------------------------------------- # Initialize MCP server mcp = FastMCP() # CoinGecko API configuration - using demo API (no key required) BASE_URL = "https://api.coingecko.com/api/v3" API_KEY = None # Debug mode - set to True for detailed logging DEBUG = True # Will be loaded from .env # --------------------------------------------------------------------------- # Helper functions # --------------------------------------------------------------------------- async def fetch_json(url: str, headers: dict = None, params: dict = None) -> dict: """Fetch JSON data from a URL with optional headers and parameters. Args: url: URL to fetch data from headers: Optional headers to include in the request params: Optional query parameters Returns: dict: JSON response data or error dict """ import aiohttp import asyncio # Debug info print(f"DEBUG - Fetching URL: {url}", file=sys.stderr) if params: print(f"DEBUG - With params: {params}", file=sys.stderr) if headers: print(f"DEBUG - With headers: {headers}", file=sys.stderr) try: async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers, params=params, timeout=30) as response: if response.status == 200: return await response.json() else: error_text = await response.text() print(f"ERROR - HTTP {response.status}: {error_text}", file=sys.stderr) return {"error": f"HTTP {response.status}: {error_text}"} except asyncio.TimeoutError: print(f"ERROR - Request timed out: {url}", file=sys.stderr) return {"error": "Request timed out"} except Exception as e: print(f"ERROR - Exception in fetch_json: {str(e)}", file=sys.stderr) return {"error": str(e)} async def fetch_coingecko_json(endpoint: str, params: dict = None) -> dict: """Fungsi helper untuk mengambil data JSON dari CoinGecko API dengan API key. Args: endpoint: Endpoint API CoinGecko (tanpa base URL) params: Parameter query tambahan Returns: API response as dictionary """ if not params: params = {} # Standard headers for demo API headers = { "Accept": "application/json", "User-Agent": "MCP-CoinGecko-Client/1.0" } # Build full URL url = f"{BASE_URL}{endpoint}" if endpoint.startswith('/') else f"{BASE_URL}/{endpoint}" if DEBUG: print(f"DEBUG - CoinGecko API request: {url}", file=sys.stderr) try: # Fetch data using common helper data = await fetch_json(url, headers=headers, params=params) # Handle rate limiting for demo API if "error" in data and ("rate limit" in str(data["error"]).lower() or "429" in str(data)): print("WARNING - Rate limit hit, waiting 60 seconds", file=sys.stderr) time.sleep(60) return await fetch_coingecko_json(endpoint, params) if "error" in data and DEBUG: print(f"DEBUG - API Error: {data['error']}", file=sys.stderr) return data except Exception as e: print(f"ERROR in fetch_coingecko_json: {str(e)}", file=sys.stderr) return {"error": str(e)} # --------------------------------------------------------------------------- # Coin API endpoints # --------------------------------------------------------------------------- # Dokumentasi: https://docs.coingecko.com/reference/coins-markets @mcp.tool("get_coin_markets") async def get_coin_markets(vs_currency: str = "usd", category: str = None, order: str = "market_cap_desc", per_page: int = 20, page: int = 1) -> str: """Get list of coins with market data. Args: vs_currency: The target currency of market data (usd, eur, jpy, etc.) category: Filter by coin category order: Sort results by field (market_cap_desc, volume_desc, id_asc, etc.) per_page: Number of results per page (1-250) page: Page number """ try: # Validasi parameter if per_page < 1 or per_page > 250: return "❌ Error: per_page must be between 1 and 250" # Set parameters params = { "vs_currency": vs_currency, "order": order, "per_page": per_page, "page": page, "sparkline": "false" } # Add optional parameters if provided if category: params["category"] = category # Fetch data data = await fetch_coingecko_json("/coins/markets", params) if "error" in data: return f"❌ Error fetching coin markets: {data['error']}" if not isinstance(data, list): return f"❌ Invalid response format: {data}" if not data: return "No coin market data found for the specified parameters" # Format response as markdown result = f"## Coin Markets (vs {vs_currency.upper()})\n\n" # Add category info if provided if category: result += f"Category: {category}\n\n" # Format as table result += "| # | Coin | Symbol | Price | 24h % | 7d % | Market Cap | Volume (24h) |\n" result += "|---|------|--------|-------|------|------|------------|----------------|\n" for i, coin in enumerate(data): name = coin.get("name", "N/A") symbol = coin.get("symbol", "N/A").upper() price = coin.get("current_price", 0) price_change_24h = coin.get("price_change_percentage_24h", 0) price_change_7d = coin.get("price_change_percentage_7d_in_currency", 0) market_cap = coin.get("market_cap", 0) volume = coin.get("total_volume", 0) # Format values with appropriate precision if price is not None: if price < 0.01 and price > 0: price_str = f"{price:.8f}" else: price_str = f"{price:,.2f}" else: price_str = "N/A" # Format price changes with arrows if price_change_24h is not None: arrow_24h = "⬆" if price_change_24h > 0 else "⬇" if price_change_24h < 0 else "" price_change_24h_str = f"{arrow_24h} {price_change_24h:+.2f}%" else: price_change_24h_str = "N/A" if price_change_7d is not None: arrow_7d = "⬆" if price_change_7d > 0 else "⬇" if price_change_7d < 0 else "" price_change_7d_str = f"{arrow_7d} {price_change_7d:+.2f}%" else: price_change_7d_str = "N/A" # Format market cap and volume with appropriate units if market_cap is not None: if market_cap >= 1_000_000_000: market_cap_str = f"${market_cap/1_000_000_000:.2f}B" elif market_cap >= 1_000_000: market_cap_str = f"${market_cap/1_000_000:.2f}M" else: market_cap_str = f"${market_cap:,.0f}" else: market_cap_str = "N/A" if volume is not None: if volume >= 1_000_000_000: volume_str = f"${volume/1_000_000_000:.2f}B" elif volume >= 1_000_000: volume_str = f"${volume/1_000_000:.2f}M" else: volume_str = f"${volume:,.0f}" else: volume_str = "N/A" result += f"| {i+1} | {name} | {symbol} | ${price_str} | {price_change_24h_str} | {price_change_7d_str} | {market_cap_str} | {volume_str} |\n" # Add pagination info result += f"\n*Page {page}, showing {len(data)} results per page*\n" return result except Exception as e: return f"❌ Error fetching coin markets: {str(e)}" # Dokumentasi: https://docs.coingecko.com/reference/ping-server @mcp.tool("ping") async def ping() -> str: """Check if the CoinGecko API server is up and running.""" try: data = await fetch_coingecko_json("/ping") if "gecko_says" in data: return "✅ CoinGecko API server is up and running!" else: return f"❌ CoinGecko API server returned unexpected response: {data}" except Exception as e: return f"❌ Error checking CoinGecko API server: {str(e)}" # Dokumentasi: https://docs.coingecko.com/reference/coins-list @mcp.tool("get_coin_list") async def get_coin_list(include_platform: bool = False) -> str: """Get list of all supported coins with id, name, and symbol. Args: include_platform: Include platform contract addresses """ try: # Fetch coin list from CoinGecko API params = {"include_platform": "true" if include_platform else "false"} data = await fetch_coingecko_json("/coins/list", params=params) if "error" in data: return f"❌ Error fetching coin list: {data['error']}" # Check if data is a list if not isinstance(data, list): return f"❌ Unexpected response format: {data}" # Limit the number of coins to display (to avoid overwhelming output) max_coins = 20 limited_data = data[:max_coins] # Format response as markdown table result = "## CoinGecko Coin List\n\n" result += "| ID | Symbol | Name |\n" result += "|-----|--------|------|\n" for coin in limited_data: coin_id = coin.get("id", "N/A") symbol = coin.get("symbol", "N/A") name = coin.get("name", "N/A") result += f"| {coin_id} | {symbol} | {name} |\n" # Add note about limited results if len(data) > max_coins: result += f"\n*Showing {max_coins} of {len(data)} coins*\n\n" result += f"**Total coins available:** {len(data)}" return result except Exception as e: return f"❌ Error fetching coin list: {str(e)}" # Dokumentasi: https://docs.coingecko.com/reference/simple-price @mcp.tool("get_coin_price") async def get_coin_price(ids: str, vs_currencies: str = "usd") -> str: """Get current price of coins in any currency. Args: ids: Comma-separated coin ids (e.g. bitcoin,ethereum) vs_currencies: Comma-separated currency codes (e.g. usd,eur) """ try: # Validasi input if not ids: return "❌ Error: Coin IDs are required" # Siapkan parameter params = { "ids": ids, "vs_currencies": vs_currencies, "include_market_cap": "true", "include_24hr_vol": "true", "include_24hr_change": "true", "include_last_updated_at": "true" } # Ambil data dari API data = await fetch_coingecko_json("/simple/price", params) if "error" in data: return f"❌ Error fetching coin prices: {data['error']}" if not data or len(data) == 0: return "❌ No price data found for the specified coins" # Format response as markdown table result = "## CoinGecko Coin Prices\n\n" # Split currencies currencies = vs_currencies.split(",") # Format as table result += "| Coin | Currency | Price | Market Cap | 24h Volume | 24h Change | Last Updated |\n" result += "|------|----------|-------|------------|------------|------------|----------------|\n" for coin_id, prices in data.items(): for currency in currencies: if currency in prices: price = prices.get(currency, "N/A") market_cap = prices.get(f"{currency}_market_cap", "N/A") vol_24h = prices.get(f"{currency}_24h_vol", "N/A") change_24h = prices.get(f"{currency}_24h_change", "N/A") # Format change with sign if change_24h != "N/A": change_sign = "🔺" if change_24h >= 0 else "🔻" change_formatted = f"{change_sign} {abs(change_24h):.2f}%" else: change_formatted = "N/A" # Format last updated last_updated_at = prices.get("last_updated_at", None) if last_updated_at: last_updated = datetime.fromtimestamp(last_updated_at).strftime('%Y-%m-%d %H:%M:%S') else: last_updated = "N/A" # Format numbers for better readability if market_cap != "N/A" and market_cap > 1000000: market_cap = f"${market_cap/1000000:.2f}M" if vol_24h != "N/A" and vol_24h > 1000000: vol_24h = f"${vol_24h/1000000:.2f}M" result += f"| {coin_id} | {currency.upper()} | ${price} | {market_cap} | {vol_24h} | {change_formatted} | {last_updated} |\n" return result except Exception as e: print(f"ERROR in get_coin_price: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil harga koin: {str(e)}" # Dokumentasi: https://docs.coingecko.com/reference/coins-id @mcp.tool("get_coin_detail") async def get_coin_detail(id: str, localization: bool = False, tickers: bool = True, market_data: bool = True, community_data: bool = True, developer_data: bool = True) -> str: """Get current data for a coin including price, market cap, volume, and more. Args: id: The coin id (e.g. bitcoin, ethereum) localization: Include localized data tickers: Include ticker data market_data: Include market data community_data: Include community data developer_data: Include developer data """ try: # Validasi parameter if not id: return "❌ Error: Coin ID is required" # Set parameters params = { "localization": str(localization).lower(), "tickers": str(tickers).lower(), "market_data": str(market_data).lower(), "community_data": str(community_data).lower(), "developer_data": str(developer_data).lower(), "sparkline": "false" } # Fetch data data = await fetch_coingecko_json(f"/coins/{id}", params) if "error" in data: return f"❌ Error fetching coin details: {data['error']}" # Format response as markdown result = f"## {data.get('name', 'N/A')} ({data.get('symbol', 'N/A').upper()})\n\n" # Add basic info result += "### Basic Information\n\n" result += f"**ID:** {data.get('id', 'N/A')} \n" result += f"**Symbol:** {data.get('symbol', 'N/A').upper()} \n" result += f"**Name:** {data.get('name', 'N/A')} \n" # Add image if available if 'image' in data and 'large' in data['image']: result += f"**Logo:** ![{data.get('name', 'Coin')} Logo]({data['image']['large']}) \n" # Add market data if available if market_data and 'market_data' in data: market = data['market_data'] result += "\n### Market Data\n\n" # Current price if 'current_price' in market: result += "**Current Price:** \n" for currency, price in list(market['current_price'].items())[:5]: # Limit to 5 currencies result += f"- {currency.upper()}: {price:,.8f} \n" # Market cap if 'market_cap' in market: usd_market_cap = market['market_cap'].get('usd', 0) if usd_market_cap: if usd_market_cap >= 1_000_000_000: market_cap_str = f"${usd_market_cap/1_000_000_000:.2f}B" elif usd_market_cap >= 1_000_000: market_cap_str = f"${usd_market_cap/1_000_000:.2f}M" else: market_cap_str = f"${usd_market_cap:,.0f}" result += f"**Market Cap:** {market_cap_str} \n" # 24h volume if 'total_volume' in market: usd_volume = market['total_volume'].get('usd', 0) if usd_volume: if usd_volume >= 1_000_000_000: volume_str = f"${usd_volume/1_000_000_000:.2f}B" elif usd_volume >= 1_000_000: volume_str = f"${usd_volume/1_000_000:.2f}M" else: volume_str = f"${usd_volume:,.0f}" result += f"**24h Trading Volume:** {volume_str} \n" # Price changes if 'price_change_percentage_24h' in market: change_24h = market['price_change_percentage_24h'] if change_24h is not None: arrow = "⬆" if change_24h > 0 else "⬇" if change_24h < 0 else "" result += f"**24h Change:** {arrow} {change_24h:+.2f}% \n" if 'price_change_percentage_7d' in market: change_7d = market['price_change_percentage_7d'] if change_7d is not None: arrow = "⬆" if change_7d > 0 else "⬇" if change_7d < 0 else "" result += f"**7d Change:** {arrow} {change_7d:+.2f}% \n" if 'price_change_percentage_30d' in market: change_30d = market['price_change_percentage_30d'] if change_30d is not None: arrow = "⬆" if change_30d > 0 else "⬇" if change_30d < 0 else "" result += f"**30d Change:** {arrow} {change_30d:+.2f}% \n" # All-time high if 'ath' in market and 'ath_date' in market and 'ath_change_percentage' in market: ath_usd = market['ath'].get('usd') ath_date_usd = market['ath_date'].get('usd') ath_change_percentage_usd = market['ath_change_percentage'].get('usd') if ath_usd and ath_date_usd: ath_date_formatted = ath_date_usd.split('T')[0] # Extract just the date part result += f"**All-Time High:** ${ath_usd:,.2f} ({ath_date_formatted}) \n" if ath_change_percentage_usd is not None: result += f"**From ATH:** {ath_change_percentage_usd:+.2f}% \n" # Add community data if available if community_data and 'community_data' in data: community = data['community_data'] result += "\n### Community Data\n\n" if 'twitter_followers' in community: twitter = community['twitter_followers'] if twitter: result += f"**Twitter Followers:** {twitter:,} \n" if 'reddit_subscribers' in community: reddit = community['reddit_subscribers'] if reddit: result += f"**Reddit Subscribers:** {reddit:,} \n" # Add developer data if available if developer_data and 'developer_data' in data: dev = data['developer_data'] result += "\n### Developer Data\n\n" if 'forks' in dev: forks = dev['forks'] if forks is not None: result += f"**Forks:** {forks} \n" if 'stars' in dev: stars = dev['stars'] if stars is not None: result += f"**Stars:** {stars} \n" if 'subscribers' in dev: subscribers = dev['subscribers'] if subscribers is not None: result += f"**Subscribers:** {subscribers} \n" if 'total_issues' in dev: issues = dev['total_issues'] if issues is not None: result += f"**Total Issues:** {issues} \n" if 'closed_issues' in dev: closed = dev['closed_issues'] if closed is not None: result += f"**Closed Issues:** {closed} \n" # Add links if available if 'links' in data: links = data['links'] result += "\n### Links\n\n" if 'homepage' in links and links['homepage']: homepage = links['homepage'][0] if homepage: result += f"**Website:** {homepage} \n" if 'blockchain_site' in links and links['blockchain_site']: blockchain = links['blockchain_site'][0] if blockchain: result += f"**Blockchain Explorer:** {blockchain} \n" if 'official_forum_url' in links and links['official_forum_url']: forum = links['official_forum_url'][0] if forum: result += f"**Official Forum:** {forum} \n" if 'chat_url' in links and links['chat_url']: chat = links['chat_url'][0] if chat: result += f"**Chat:** {chat} \n" if 'announcement_url' in links and links['announcement_url']: announcement = links['announcement_url'][0] if announcement: result += f"**Announcements:** {announcement} \n" # Add description if available if 'description' in data and 'en' in data['description'] and data['description']['en']: description = data['description']['en'] # Truncate if too long if len(description) > 500: description = description[:500] + "..." result += "\n### Description\n\n" result += f"{description}\n" return result except Exception as e: return f"❌ Error fetching market chart data: {str(e)}" # Dokumentasi: https://docs.coingecko.com/reference/coins-id-market-chart @mcp.tool("get_coin_market_chart") async def get_coin_market_chart(id: str, vs_currency: str = "usd", days: str = "14", interval: str = None) -> str: """Get historical market data for a coin. Args: id: Coin id (e.g. bitcoin) vs_currency: Currency code (e.g. usd) days: Data up to number of days ago (any integer or 'max') interval: Data interval (e.g. 'daily', leave empty for auto granularity) """ try: # Validasi input if not id: return "❌ Error: Coin ID is required" # Validasi interval parameter jika disediakan if interval and interval not in ["daily"]: return "❌ Error: 'interval' must be 'daily' or empty" # Siapkan parameter params = { "vs_currency": vs_currency, "days": days } # Tambahkan interval jika disediakan if interval: params["interval"] = interval # Ambil data dari API data = await fetch_coingecko_json(f"/coins/{id}/market_chart", params) if "error" in data: return f"❌ Error fetching market chart: {data['error']}" # Format response as markdown result = f"## {id.capitalize()} Market Chart ({days} days)\n\n" # Process prices prices = data.get("prices", []) if not prices: return f"{result}❌ No price data found" # Get first and last price first_price = prices[0][1] if len(prices) > 0 else None last_price = prices[-1][1] if len(prices) > 0 else None # Calculate change if first_price and last_price: change = ((last_price - first_price) / first_price) * 100 change_sign = "🔺" if change >= 0 else "🔻" change_str = f"{change_sign} {abs(change):.2f}%" else: change_str = "N/A" # Summary result += "### Summary\n\n" result += f"- Currency: {vs_currency.upper()}\n" result += f"- Period: {days} days\n" result += f"- Starting price: ${first_price:.4f}\n" if first_price else "" result += f"- Current price: ${last_price:.4f}\n" if last_price else "" result += f"- Change: {change_str}\n\n" # Market caps market_caps = data.get("market_caps", []) if market_caps and len(market_caps) > 0: last_market_cap = market_caps[-1][1] if last_market_cap > 1000000000: result += f"- Market Cap: ${last_market_cap/1000000000:.2f}B\n" elif last_market_cap > 1000000: result += f"- Market Cap: ${last_market_cap/1000000:.2f}M\n" else: result += f"- Market Cap: ${last_market_cap:,.0f}\n" # Volumes volumes = data.get("total_volumes", []) if volumes and len(volumes) > 0: last_volume = volumes[-1][1] if last_volume > 1000000000: result += f"- 24h Volume: ${last_volume/1000000000:.2f}B\n\n" elif last_volume > 1000000: result += f"- 24h Volume: ${last_volume/1000000:.2f}M\n\n" else: result += f"- 24h Volume: ${last_volume:,.0f}\n\n" # Sample data result += "### Recent Price Data\n\n" result += "| Date | Price | Market Cap | Volume |\n" result += "|------|-------|------------|--------|\n" # Get last 5 data points with even spacing sample_size = min(5, len(prices)) step = max(1, len(prices) // sample_size) sample_indices = [len(prices) - 1 - (i * step) for i in range(sample_size)] sample_indices.sort() # Sort in ascending order for idx in sample_indices: if idx < 0 or idx >= len(prices): continue ts = prices[idx][0] # Find matching data points price_val = prices[idx][1] if idx < len(prices) else None mcap_val = market_caps[idx][1] if idx < len(market_caps) else None vol_val = volumes[idx][1] if idx < len(volumes) else None # Format timestamp date_str = datetime.fromtimestamp(ts/1000).strftime('%Y-%m-%d') # Format values price_str = f"${price_val:.4f}" if price_val is not None else "N/A" if mcap_val is not None: if mcap_val > 1000000000: mcap_str = f"${mcap_val/1000000000:.2f}B" elif mcap_val > 1000000: mcap_str = f"${mcap_val/1000000:.2f}M" else: mcap_str = f"${mcap_val:,.0f}" else: mcap_str = "N/A" if vol_val is not None: if vol_val > 1000000000: vol_str = f"${vol_val/1000000000:.2f}B" elif vol_val > 1000000: vol_str = f"${vol_val/1000000:.2f}M" else: vol_str = f"${vol_val:,.0f}" else: vol_str = "N/A" result += f"| {date_str} | {price_str} | {mcap_str} | {vol_str} |\n" # Tambahkan informasi tambahan result += "\n### Informasi Tambahan\n\n" result += "*Untuk informasi lebih detail tentang koin ini, gunakan fungsi `get_coin_detail`*\n" result += "*Data disediakan oleh CoinGecko API*\n" return result except Exception as e: return f"❌ Error fetching market chart data: {str(e)}" # Dokumentasi: https://docs.coingecko.com/reference/search-trending @mcp.tool("get_trending_coins") async def get_trending_coins() -> str: """Get trending search coins on CoinGecko in the last 24 hours.""" try: # Ambil data dari API data = await fetch_coingecko_json("/search/trending") if "error" in data: return f"❌ Error fetching trending coins: {data['error']}" # Format response as markdown result = "## Trending Coins (24h)\n\n" # Check if we have trending coins coins = data.get("coins", []) if not coins: return f"{result}❌ No trending coins found" # Format as table result += "| Rank | Coin | Symbol | Market Cap Rank | Price (BTC) |\n" result += "|------|------|--------|----------------|-------------|\n" for i, item in enumerate(coins): coin = item.get("item", {}) name = coin.get("name", "N/A") symbol = coin.get("symbol", "N/A").upper() market_cap_rank = coin.get("market_cap_rank", "N/A") price_btc = coin.get("price_btc", "N/A") # Format BTC price with appropriate precision if price_btc != "N/A": price_btc = f"{price_btc:.8f} BTC" # Add coin ID for reference coin_id = coin.get("id", "N/A") result += f"| {i+1} | {name} | {symbol} | {market_cap_rank} | {price_btc} |\n" return result except Exception as e: return f"❌ Error fetching trending coins: {str(e)}" @mcp.tool("search_coins") async def search_coins(query: str) -> str: """Search for coins, categories and markets. Args: query: Search query """ try: # Validasi input if not query: return "❌ Error: Search query is required" # Ambil data dari API data = await fetch_coingecko_json("/search", {"query": query}) if "error" in data: return f"❌ Error searching for '{query}': {data['error']}" # Format response as markdown result = f"## Search Results: '{query}'\n\n" # Process coins coins = data.get("coins", []) if coins: result += f"### Coins ({len(coins)})\n\n" result += "| ID | Name | Symbol | Market Cap Rank |\n" result += "|-----|------|--------|--------------------|\n" # Limit to first 15 coins for readability display_limit = min(15, len(coins)) for coin in coins[:display_limit]: coin_id = coin.get("id", "N/A") name = coin.get("name", "N/A") symbol = coin.get("symbol", "N/A").upper() market_cap_rank = coin.get("market_cap_rank", "N/A") result += f"| {coin_id} | {name} | {symbol} | {market_cap_rank} |\n" if len(coins) > display_limit: result += f"\n*Showing {display_limit} of {len(coins)} coins*\n" else: result += f"### Coins\n\nNo coins found matching '{query}'.\n" # Process exchanges exchanges = data.get("exchanges", []) if exchanges: result += f"\n### Exchanges ({len(exchanges)})\n\n" result += "| ID | Name | Market Type |\n" result += "|-----|------|------------|\n" # Limit to first 10 exchanges for readability display_limit = min(10, len(exchanges)) for exchange in exchanges[:display_limit]: exchange_id = exchange.get("id", "N/A") name = exchange.get("name", "N/A") market_type = exchange.get("market_type", "N/A") result += f"| {exchange_id} | {name} | {market_type} |\n" if len(exchanges) > display_limit: result += f"\n*Showing {display_limit} of {len(exchanges)} exchanges*\n" # Process categories categories = data.get("categories", []) if categories: result += f"\n### Categories ({len(categories)})\n\n" result += "| ID | Name |\n" result += "|-----|------|\n" # Limit to first 10 categories for readability display_limit = min(10, len(categories)) for category in categories[:display_limit]: category_id = category.get("id", "N/A") name = category.get("name", "N/A") result += f"| {category_id} | {name} |\n" if len(categories) > display_limit: result += f"\n*Showing {display_limit} of {len(categories)} categories*\n" return result except Exception as e: return f"❌ Error searching for '{query}': {str(e)}" # Dokumentasi: https://docs.coingecko.com/reference/coins-categories-list @mcp.tool("get_coin_categories") async def get_coin_categories() -> str: """Get list of all coin categories.""" try: # Fetch data data = await fetch_coingecko_json("/coins/categories/list") if "error" in data: return f"❌ Error fetching coin categories: {data['error']}" if not isinstance(data, list): return f"❌ Invalid response format: {data}" if not data: return "No coin categories found" # Format response as markdown result = "## Coin Categories\n\n" # Format as table result += "| Category ID | Category Name |\n" result += "|-------------|-----------------|\n" for category in data: category_id = category.get("category_id", "N/A") name = category.get("name", "N/A") result += f"| {category_id} | {name} |\n" # Add count info result += f"\n*Total categories: {len(data)}*\n" return result except Exception as e: return f"❌ Error fetching coin categories: {str(e)}" # Dokumentasi: https://docs.coingecko.com/reference/coins-categories @mcp.tool("get_coin_category_details") async def get_coin_category_details(category_id: str) -> str: """Get market data for a specific coin category. Args: category_id: The category id (e.g. 'decentralized-finance-defi') """ try: # Validasi parameter if not category_id: return "❌ Error: Category ID is required" # Fetch data data = await fetch_coingecko_json("/coins/categories") if "error" in data: return f"❌ Error fetching category details: {data['error']}" if not isinstance(data, list): return f"❌ Invalid response format: {data}" # Find the requested category category = None for cat in data: if cat.get("id") == category_id: category = cat break if not category: return f"❌ Category '{category_id}' not found" # Format response as markdown result = f"## {category.get('name', 'N/A')} Category\n\n" # Add market data market_cap = category.get("market_cap", 0) if market_cap: if market_cap >= 1_000_000_000: market_cap_str = f"${market_cap/1_000_000_000:.2f}B" elif market_cap >= 1_000_000: market_cap_str = f"${market_cap/1_000_000:.2f}M" else: market_cap_str = f"${market_cap:,.0f}" result += f"**Market Cap:** {market_cap_str} \n" volume_24h = category.get("volume_24h", 0) if volume_24h: if volume_24h >= 1_000_000_000: volume_str = f"${volume_24h/1_000_000_000:.2f}B" elif volume_24h >= 1_000_000: volume_str = f"${volume_24h/1_000_000:.2f}M" else: volume_str = f"${volume_24h:,.0f}" result += f"**24h Volume:** {volume_str} \n" # Add price changes market_cap_change_24h = category.get("market_cap_change_24h", 0) if market_cap_change_24h is not None: arrow = "⬆" if market_cap_change_24h > 0 else "⬇" if market_cap_change_24h < 0 else "" result += f"**24h Market Cap Change:** {arrow} {market_cap_change_24h:+.2f}% \n" # Add top coins in category top_coins = category.get("top_3_coins", []) if top_coins: result += "\n**Top Coins in Category:** \n" for i, coin_url in enumerate(top_coins[:3]): result += f"{i+1}. ![Coin]({coin_url}) \n" # Add updated timestamp if available updated_at = category.get("updated_at", "") if updated_at: result += f"\n*Last updated: {updated_at}*\n" return result except Exception as e: return f"❌ Error fetching category details: {str(e)}" # Market API endpoints # --------------------------------------------------------------------------- # Dokumentasi: https://docs.coingecko.com/reference/global @mcp.tool("get_global_data") async def get_global_data() -> str: """Get cryptocurrency global market data.""" try: # Ambil data dari API data = await fetch_coingecko_json("/global") if "error" in data: return f"❌ Error fetching global market data: {data['error']}" # Extract data data = data.get("data", {}) if not data: return "❌ No global market data found" # Format response as markdown result = "## Global Cryptocurrency Market Data\n\n" # Market cap total_market_cap = data.get("total_market_cap", {}) total_market_cap_usd = total_market_cap.get("usd", 0) # Volume total_volume = data.get("total_volume", {}) total_volume_usd = total_volume.get("usd", 0) # Format market cap and volume with appropriate units if total_market_cap_usd >= 1_000_000_000_000: market_cap_formatted = f"${total_market_cap_usd/1_000_000_000_000:.2f}T" elif total_market_cap_usd >= 1_000_000_000: market_cap_formatted = f"${total_market_cap_usd/1_000_000_000:.2f}B" else: market_cap_formatted = f"${total_market_cap_usd:,.0f}" if total_volume_usd >= 1_000_000_000_000: volume_formatted = f"${total_volume_usd/1_000_000_000_000:.2f}T" elif total_volume_usd >= 1_000_000_000: volume_formatted = f"${total_volume_usd/1_000_000_000:.2f}B" else: volume_formatted = f"${total_volume_usd:,.0f}" # Market cap percentage market_cap_percentage = data.get("market_cap_percentage", {}) # Format summary result += "### Summary\n\n" result += "| Metric | Value |\n" result += "|--------|-------|\n" result += f"| Active Cryptocurrencies | {data.get('active_cryptocurrencies', 'N/A')} |\n" result += f"| Active Markets | {data.get('markets', 'N/A')} |\n" result += f"| Total Market Cap | {market_cap_formatted} |\n" result += f"| Total 24h Volume | {volume_formatted} |\n" result += f"| BTC Dominance | {data.get('market_cap_percentage', {}).get('btc', 0):.2f}% |\n" result += f"| ETH Dominance | {data.get('market_cap_percentage', {}).get('eth', 0):.2f}% |\n" result += f"| Market Cap Change 24h | {data.get('market_cap_change_percentage_24h_usd', 0):.2f}% |\n" # Market cap percentage (dominance) result += "\n### Market Cap Percentage (Dominance)\n\n" result += "| Coin | Percentage |\n" result += "|------|------------|\n" # Sort by percentage sorted_percentages = sorted(market_cap_percentage.items(), key=lambda x: x[1], reverse=True) for coin, percentage in sorted_percentages[:10]: # Top 10 coins by dominance coin_upper = coin.upper() result += f"| {coin_upper} | {percentage:.2f}% |\n" # Add update time if available if "updated_at" in data: from datetime import datetime update_time = datetime.fromtimestamp(data["updated_at"]).strftime('%Y-%m-%d %H:%M:%S') result += f"\n*Data updated at: {update_time}*\n" return result except Exception as e: return f"❌ Error fetching global market data: {str(e)}" @mcp.tool("get_top_gainers_losers") async def get_top_gainers_losers(vs_currency: str = "usd", time_period: str = "24h", top_count: int = 10) -> str: """Get top gainers and losers in the market. Args: vs_currency: Currency code (e.g. usd) time_period: Time period (1h, 24h, 7d, 14d, 30d, 200d, 1y) top_count: Number of top gainers/losers to show """ try: # Validasi parameter valid_periods = ["1h", "24h", "7d", "14d", "30d", "200d", "1y"] if time_period not in valid_periods: return f"❌ Error: Invalid time period. Use one of: {', '.join(valid_periods)}" if top_count < 1 or top_count > 100: return f"❌ Error: top_count must be between 1 and 100" # Map time_period ke parameter price_change_percentage period_map = { "1h": "1h", "24h": "24h", "7d": "7d", "14d": "14d", "30d": "30d", "200d": "200d", "1y": "1y" } price_change_key = f"price_change_percentage_{period_map[time_period]}_in_currency" # Set parameters for demo API params = { "vs_currency": vs_currency, "order": "market_cap_desc", "per_page": 250, # Get more coins to find top gainers/losers "page": 1, "price_change_percentage": period_map[time_period] } if DEBUG: print(f"DEBUG - Fetching top gainers/losers for {time_period}", file=sys.stderr) data = await fetch_coingecko_json("/coins/markets", params) if "error" in data: return f"❌ Error fetching top gainers/losers: {data['error']}" if not isinstance(data, list): return f"❌ Invalid response format: {data}" # Filter out coins with None price change filtered_data = [coin for coin in data if coin.get(price_change_key) is not None] if not filtered_data: return f"❌ No price change data available for period {time_period}" # Sort by price change percentage gainers = sorted(filtered_data, key=lambda x: x.get(price_change_key, 0), reverse=True) losers = sorted(filtered_data, key=lambda x: x.get(price_change_key, 0)) # Limit to top_count top_gainers = gainers[:top_count] top_losers = losers[:top_count] # Format response as markdown result = f"## Top Gainers and Losers ({time_period})\n\n" # Format top gainers result += f"### Top {top_count} Gainers\n\n" result += f"| Coin | Symbol | Price ({vs_currency.upper()}) | {time_period} Change | Market Cap |\n" result += f"|------|--------|-----------------|--------------|------------|\n" for coin in top_gainers: name = coin.get("name", "N/A") symbol = coin.get("symbol", "N/A").upper() price = coin.get("current_price", 0) price_change = coin.get(price_change_key, 0) market_cap = coin.get("market_cap", 0) # Format values with appropriate precision if price < 0.01 and price > 0: price_str = f"{price:.8f}" else: price_str = f"{price:,.2f}" price_change_str = f"{price_change:+.2f}%" # Format market cap with appropriate units if market_cap >= 1_000_000_000: market_cap_str = f"${market_cap/1_000_000_000:.2f}B" elif market_cap >= 1_000_000: market_cap_str = f"${market_cap/1_000_000:.2f}M" else: market_cap_str = f"${market_cap:,.0f}" result += f"| {name} | {symbol} | {price_str} | {price_change_str} | {market_cap_str} |\n" # Format top losers result += f"\n### Top {top_count} Losers\n\n" result += f"| Coin | Symbol | Price ({vs_currency.upper()}) | {time_period} Change | Market Cap |\n" result += f"|------|--------|-----------------|--------------|------------|\n" for coin in top_losers: name = coin.get("name", "N/A") symbol = coin.get("symbol", "N/A").upper() price = coin.get("current_price", 0) price_change = coin.get(price_change_key, 0) market_cap = coin.get("market_cap", 0) # Format values with appropriate precision if price < 0.01 and price > 0: price_str = f"{price:.8f}" else: price_str = f"{price:,.2f}" price_change_str = f"{price_change:+.2f}%" # Format market cap with appropriate units if market_cap >= 1_000_000_000: market_cap_str = f"${market_cap/1_000_000_000:.2f}B" elif market_cap >= 1_000_000: market_cap_str = f"${market_cap/1_000_000:.2f}M" else: market_cap_str = f"${market_cap:,.0f}" result += f"| {name} | {symbol} | {price_str} | {price_change_str} | {market_cap_str} |\n" return result except Exception as e: return f"❌ Error fetching top gainers/losers: {str(e)}" # Dokumentasi: https://docs.coingecko.com/reference/coins-id-market-chart-range @mcp.tool("get_coin_market_chart_range") async def get_coin_market_chart_range(id: str, vs_currency: str = "usd", from_timestamp: int = None, to_timestamp: int = None) -> str: """Get historical market data for a coin within a specific time range. Args: id: Coin id (e.g. bitcoin) vs_currency: Currency code (e.g. usd) from_timestamp: Starting date in UNIX timestamp to_timestamp: Ending date in UNIX timestamp """ try: # Validasi parameter if not id: return "❌ Error: Coin ID is required" # Default timestamps jika tidak disediakan if from_timestamp is None: # Default 7 days ago from_timestamp = int(time.time()) - (7 * 24 * 60 * 60) if to_timestamp is None: # Default to current time to_timestamp = int(time.time()) if from_timestamp >= to_timestamp: return "❌ Error: from_timestamp must be earlier than to_timestamp" # Set parameters params = { "vs_currency": vs_currency, "from": from_timestamp, "to": to_timestamp } # Fetch data data = await fetch_coingecko_json(f"/coins/{id}/market_chart/range", params) if "error" in data: return f"❌ Error fetching market chart data: {data['error']}" # Format response as markdown result = f"## Market Chart Data for {id.capitalize()} ({vs_currency.upper()})\n\n" # Add date range from_date = datetime.fromtimestamp(from_timestamp).strftime('%Y-%m-%d %H:%M') to_date = datetime.fromtimestamp(to_timestamp).strftime('%Y-%m-%d %H:%M') result += f"*From {from_date} to {to_date}*\n\n" # Process price data prices = data.get("prices", []) if prices: result += "### Price Data\n\n" result += "| Date | Price |\n" result += "|------|-------|\n" # Show only a sample of price data points (first, middle, and last few) sample_size = 7 total_points = len(prices) if total_points <= sample_size * 3: # If we have few data points, show all sample_prices = prices else: # Otherwise, show first few, middle few, and last few first = prices[:sample_size] middle_start = total_points // 2 - sample_size // 2 middle = prices[middle_start:middle_start + sample_size] last = prices[-sample_size:] sample_prices = first + [None] + middle + [None] + last for price_data in sample_prices: if price_data is None: result += "| ... | ... |\n" continue timestamp, price = price_data date = datetime.fromtimestamp(timestamp/1000).strftime('%Y-%m-%d %H:%M') # Format price based on value if vs_currency.lower() in ["btc", "eth", "ltc"] or price < 1: price_str = f"{price:.8f}" else: price_str = f"{price:.2f}" result += f"| {date} | {price_str} |\n" result += f"\n*Total price data points: {len(prices)}*\n" # Process market cap data market_caps = data.get("market_caps", []) if market_caps: result += "\n### Market Cap Data\n\n" # Calculate average, min, and max market cap if market_caps: market_cap_values = [mc[1] for mc in market_caps] avg_market_cap = sum(market_cap_values) / len(market_cap_values) min_market_cap = min(market_cap_values) max_market_cap = max(market_cap_values) # Format values for readability if avg_market_cap >= 1_000_000_000: avg_str = f"${avg_market_cap/1_000_000_000:.2f}B" min_str = f"${min_market_cap/1_000_000_000:.2f}B" max_str = f"${max_market_cap/1_000_000_000:.2f}B" elif avg_market_cap >= 1_000_000: avg_str = f"${avg_market_cap/1_000_000:.2f}M" min_str = f"${min_market_cap/1_000_000:.2f}M" max_str = f"${max_market_cap/1_000_000:.2f}M" else: avg_str = f"${avg_market_cap:.2f}" min_str = f"${min_market_cap:.2f}" max_str = f"${max_market_cap:.2f}" result += f"**Average Market Cap:** {avg_str} \n" result += f"**Min Market Cap:** {min_str} \n" result += f"**Max Market Cap:** {max_str} \n" result += f"**Data Points:** {len(market_caps)} \n" # Process volume data total_volumes = data.get("total_volumes", []) if total_volumes: result += "\n### Trading Volume Data\n\n" # Calculate average, min, and max volume if total_volumes: volume_values = [vol[1] for vol in total_volumes] avg_volume = sum(volume_values) / len(volume_values) min_volume = min(volume_values) max_volume = max(volume_values) # Format values for readability if avg_volume >= 1_000_000_000: avg_str = f"${avg_volume/1_000_000_000:.2f}B" min_str = f"${min_volume/1_000_000_000:.2f}B" max_str = f"${max_volume/1_000_000_000:.2f}B" elif avg_volume >= 1_000_000: avg_str = f"${avg_volume/1_000_000:.2f}M" min_str = f"${min_volume/1_000_000:.2f}M" max_str = f"${max_volume/1_000_000:.2f}M" else: avg_str = f"${avg_volume:.2f}" min_str = f"${min_volume:.2f}" max_str = f"${max_volume:.2f}" result += f"**Average Volume:** {avg_str} \n" result += f"**Min Volume:** {min_str} \n" result += f"**Max Volume:** {max_str} \n" result += f"**Data Points:** {len(total_volumes)} \n" return result except Exception as e: return f"❌ Error fetching market chart range data: {str(e)}" # Dokumentasi: https://docs.coingecko.com/reference/coins-id-tickers @mcp.tool("get_coin_tickers") async def get_coin_tickers(id: str, exchange_ids: str = None, include_exchange_logo: bool = False, page: int = 1, order: str = "trust_score_desc", depth: bool = False) -> str: """Get ticker data for a specific coin. Args: id: Coin id (e.g. bitcoin) exchange_ids: Comma-separated exchange ids (e.g. 'binance,gdax') include_exchange_logo: Include exchange logo URLs page: Page number order: Sort tickers by order (trust_score_desc, trust_score_asc, volume_desc) depth: Include 2% orderbook depth """ try: # Validasi parameter if not id: return "❌ Error: Coin ID is required" # Validasi order parameter valid_orders = ["trust_score_desc", "trust_score_asc", "volume_desc"] if order not in valid_orders: return f"❌ Error: Invalid order parameter. Valid options are: {', '.join(valid_orders)}" # Set parameters params = { "page": page, "order": order, "depth": str(depth).lower(), "include_exchange_logo": str(include_exchange_logo).lower() } # Add exchange_ids if provided if exchange_ids: params["exchange_ids"] = exchange_ids # Fetch data data = await fetch_coingecko_json(f"/coins/{id}/tickers", params) if "error" in data: return f"❌ Error fetching ticker data: {data['error']}" tickers = data.get("tickers", []) if not tickers: return f"❌ No ticker data found for {id}" # Format response as markdown result = f"## Ticker Data for {id.capitalize()}\n\n" # Add pagination info if available if "total" in data and "per_page" in data and "last" in data: total = data.get("total", 0) per_page = data.get("per_page", 0) last_page = data.get("last", 0) current_page = page result += f"*Page {current_page} of {last_page} (Total: {total} tickers, {per_page} per page)*\n\n" # Format as table result += "| Exchange | Pair | Price | Volume | Trust Score | Last Updated |\n" result += "|----------|------|-------|--------|-------------|-------------|\n" for ticker in tickers[:20]: # Limit to first 20 entries for readability exchange = ticker.get("market", {}).get("name", "Unknown") base = ticker.get("base", "") target = ticker.get("target", "") pair = f"{base}/{target}" # Get price price = ticker.get("last", 0) if price < 0.000001: price_str = f"{price:.8f}" elif price < 1: price_str = f"{price:.6f}" else: price_str = f"{price:.2f}" # Format volume volume = ticker.get("volume", 0) if volume >= 1_000_000_000: volume_str = f"${volume/1_000_000_000:.2f}B" elif volume >= 1_000_000: volume_str = f"${volume/1_000_000:.2f}M" elif volume >= 1_000: volume_str = f"${volume/1_000:.2f}K" else: volume_str = f"${volume:.2f}" # Get trust score trust_score = ticker.get("trust_score", "") if trust_score == "green": trust_score_str = "✅ High" elif trust_score == "yellow": trust_score_str = "⚠️ Medium" elif trust_score == "red": trust_score_str = "❌ Low" else: trust_score_str = "N/A" # Format timestamp timestamp = ticker.get("last_traded_at", "") if timestamp: try: dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00")) timestamp_str = dt.strftime("%Y-%m-%d %H:%M") except: timestamp_str = timestamp else: timestamp_str = "N/A" result += f"| {exchange} | {pair} | {price_str} | {volume_str} | {trust_score_str} | {timestamp_str} |\n" # Add note if data was truncated if len(tickers) > 20: result += f"\n*Note: Showing 20 of {len(tickers)} tickers*\n" # Add market info if "market_cap_rank" in data: result += f"\n**Market Cap Rank:** #{data['market_cap_rank']}\n" # Add bid-ask spread if available and depth was requested if depth and any("cost_to_move_up_usd" in ticker or "cost_to_move_down_usd" in ticker for ticker in tickers[:5]): result += "\n### Order Book Depth (2%)\n\n" result += "| Exchange | Pair | Cost to Move Up | Cost to Move Down |\n" result += "|----------|------|----------------|------------------|\n" for ticker in [t for t in tickers[:10] if "cost_to_move_up_usd" in t or "cost_to_move_down_usd" in t]: exchange = ticker.get("market", {}).get("name", "Unknown") base = ticker.get("base", "") target = ticker.get("target", "") pair = f"{base}/{target}" cost_up = ticker.get("cost_to_move_up_usd", 0) cost_down = ticker.get("cost_to_move_down_usd", 0) # Format costs if cost_up >= 1_000_000: cost_up_str = f"${cost_up/1_000_000:.2f}M" elif cost_up >= 1_000: cost_up_str = f"${cost_up/1_000:.2f}K" else: cost_up_str = f"${cost_up:.2f}" if cost_down >= 1_000_000: cost_down_str = f"${cost_down/1_000_000:.2f}M" elif cost_down >= 1_000: cost_down_str = f"${cost_down/1_000:.2f}K" else: cost_down_str = f"${cost_down:.2f}" result += f"| {exchange} | {pair} | {cost_up_str} | {cost_down_str} |\n" return result except Exception as e: return f"❌ Error fetching ticker data: {str(e)}" # Dokumentasi: https://docs.coingecko.com/reference/coins-id-ohlc @mcp.tool("get_coin_ohlc") async def get_coin_ohlc(id: str, vs_currency: str = "usd", days: int = 7) -> str: """Get Open, High, Low, Close data for a coin. Args: id: Coin id (e.g. bitcoin) vs_currency: Currency code (e.g. usd) days: Data up to number of days (1, 7, 14, 30, 90, 180, 365) """ try: # Validasi parameter if not id: return "❌ Error: Coin ID is required" # Validasi days parameter valid_days = [1, 7, 14, 30, 90, 180, 365] if days not in valid_days: return f"❌ Error: Invalid days parameter. Valid options are: {', '.join(map(str, valid_days))}" # Set parameters params = { "vs_currency": vs_currency, "days": days } # Fetch data data = await fetch_coingecko_json(f"/coins/{id}/ohlc", params) if "error" in data: return f"❌ Error fetching OHLC data: {data['error']}" if not isinstance(data, list) or not data: return f"❌ No OHLC data found for {id}" # Format response as markdown result = f"## OHLC Data for {id.capitalize()} ({vs_currency.upper()})\n\n" result += f"*Last {days} days*\n\n" # Format as table result += "| Date | Open | High | Low | Close |\n" result += "|------|------|------|-----|-------|\n" # OHLC data format: [timestamp, open, high, low, close] for entry in data[:20]: # Limit to first 20 entries for readability if len(entry) >= 5: timestamp = entry[0] open_price = entry[1] high = entry[2] low = entry[3] close = entry[4] # Convert timestamp to date date = datetime.fromtimestamp(timestamp/1000).strftime('%Y-%m-%d %H:%M') # Format prices if vs_currency.lower() in ["btc", "eth", "ltc"]: # Use more decimal places for crypto base currencies open_str = f"{open_price:.8f}" high_str = f"{high:.8f}" low_str = f"{low:.8f}" close_str = f"{close:.8f}" elif open_price < 1: # Use more decimal places for small values open_str = f"{open_price:.6f}" high_str = f"{high:.6f}" low_str = f"{low:.6f}" close_str = f"{close:.6f}" else: open_str = f"{open_price:.2f}" high_str = f"{high:.2f}" low_str = f"{low:.2f}" close_str = f"{close:.2f}" result += f"| {date} | {open_str} | {high_str} | {low_str} | {close_str} |\n" # Add note if data was truncated if len(data) > 20: result += f"\n*Note: Showing 20 of {len(data)} data points*\n" # Add chart hint result += "\n*For visualization, consider plotting this data on a candlestick chart*\n" return result except Exception as e: return f"❌ Error fetching OHLC data: {str(e)}" # --------------------------------------------------------------------------- # NFT API endpoints # --------------------------------------------------------------------------- # Dokumentasi: https://docs.coingecko.com/reference/nfts-list @mcp.tool("get_nfts") async def get_nfts(per_page: int = 20, page: int = 1, order: str = "h24_volume_native_desc", asset_platform_id: str = None) -> str: """Get list of NFT collections with market data. Args: per_page: Number of results per page (1-250) page: Page number order: Sort results by field (market_cap_desc, volume_desc, id_asc, etc.) asset_platform_id: Filter by platform (e.g. 'ethereum') """ try: # Validasi parameter if per_page < 1 or per_page > 250: return "❌ Error: per_page must be between 1 and 250" # Validate order parameter valid_orders = ["h24_volume_native_desc", "h24_volume_native_asc", "floor_price_native_desc", "floor_price_native_asc", "market_cap_native_desc", "market_cap_native_asc"] if order not in valid_orders: return f"❌ Error: Invalid order parameter. Valid options are: {', '.join(valid_orders)}" # Set parameters params = { "per_page": per_page, "page": page, "order": order } # Add asset_platform_id if provided if asset_platform_id: params["asset_platform_id"] = asset_platform_id # Fetch data data = await fetch_coingecko_json("/nfts/list", params) if "error" in data: return f"❌ Error fetching NFT collections: {data['error']}" if not isinstance(data, list): return f"❌ Invalid response format: {data}" if not data: return "No NFT collections found for the specified parameters" # Format response as markdown result = "## NFT Collections\n\n" # Format as table result += "| # | Collection | Symbol | Contract | Platform |\n" result += "|---|-----------|--------|----------|----------|\n" for i, nft in enumerate(data): name = nft.get("name", "N/A") symbol = nft.get("symbol", "N/A") contract = nft.get("contract_address", "N/A") if len(contract) > 10: contract = contract[:6] + "..." + contract[-4:] platform = nft.get("asset_platform_id", "N/A") result += f"| {i+1} | {name} | {symbol} | {contract} | {platform} |\n" # Add pagination info result += f"\n*Page {page}, showing {len(data)} results per page*\n" return result except Exception as e: return f"❌ Error fetching NFT collections: {str(e)}" # Dokumentasi: https://docs.coingecko.com/reference/nfts-id @mcp.tool("get_nft_detail") async def get_nft_detail(id: str) -> str: """Get detailed data for a specific NFT collection. Args: id: The NFT collection id (e.g. 'cryptopunks') """ try: # Validasi parameter if not id: return "❌ Error: NFT collection ID is required" # Fetch data data = await fetch_coingecko_json(f"/nfts/{id}") if "error" in data: return f"❌ Error fetching NFT collection details: {data['error']}" # Format response as markdown result = f"## {data.get('name', 'N/A')} NFT Collection\n\n" # Add basic info result += "### Basic Information\n\n" result += f"**ID:** {data.get('id', 'N/A')} \n" result += f"**Name:** {data.get('name', 'N/A')} \n" result += f"**Symbol:** {data.get('symbol', 'N/A')} \n" # Add image if available if 'image' in data and data['image'].get('small'): result += f"**Image:** ![{data.get('name', 'NFT')} Image]({data['image']['small']}) \n" # Add contract info contract = data.get('contract_address', 'N/A') result += f"**Contract Address:** {contract} \n" platform = data.get('asset_platform_id', 'N/A') result += f"**Platform:** {platform} \n" # Add market data if available if 'floor_price' in data and data['floor_price']: floor_price = data['floor_price'].get('native_currency', 'N/A') if floor_price != 'N/A': result += f"**Floor Price:** {floor_price} {data['floor_price'].get('symbol', '')} \n" floor_price_usd = data['floor_price'].get('usd', 'N/A') if floor_price_usd != 'N/A': result += f"**Floor Price (USD):** ${floor_price_usd:,.2f} \n" if 'market_cap' in data and data['market_cap']: market_cap = data['market_cap'].get('native_currency', 'N/A') if market_cap != 'N/A': result += f"**Market Cap:** {market_cap:,.2f} {data['market_cap'].get('symbol', '')} \n" market_cap_usd = data['market_cap'].get('usd', 'N/A') if market_cap_usd != 'N/A': if market_cap_usd >= 1_000_000_000: market_cap_str = f"${market_cap_usd/1_000_000_000:.2f}B" elif market_cap_usd >= 1_000_000: market_cap_str = f"${market_cap_usd/1_000_000:.2f}M" else: market_cap_str = f"${market_cap_usd:,.0f}" result += f"**Market Cap (USD):** {market_cap_str} \n" if 'volume_24h' in data and data['volume_24h']: volume = data['volume_24h'].get('native_currency', 'N/A') if volume != 'N/A': result += f"**24h Volume:** {volume:,.2f} {data['volume_24h'].get('symbol', '')} \n" volume_usd = data['volume_24h'].get('usd', 'N/A') if volume_usd != 'N/A': if volume_usd >= 1_000_000: volume_str = f"${volume_usd/1_000_000:.2f}M" elif volume_usd >= 1_000: volume_str = f"${volume_usd/1_000:.2f}K" else: volume_str = f"${volume_usd:,.2f}" result += f"**24h Volume (USD):** {volume_str} \n" # Add number of unique addresses if available if 'number_of_unique_addresses' in data: addresses = data.get('number_of_unique_addresses', 'N/A') if addresses != 'N/A': result += f"**Unique Addresses:** {addresses:,} \n" # Add number of unique tokens if available if 'number_of_unique_tokens' in data: tokens = data.get('number_of_unique_tokens', 'N/A') if tokens != 'N/A': result += f"**Unique Tokens:** {tokens:,} \n" # Add links if available if 'links' in data: result += "\n### Links\n\n" links = data['links'] if 'homepage' in links and links['homepage']: homepage = links['homepage'][0] if homepage: result += f"**Website:** {homepage} \n" if 'twitter' in links and links['twitter']: twitter = links['twitter'][0] if twitter: result += f"**Twitter:** {twitter} \n" if 'discord' in links and links['discord']: discord = links['discord'][0] if discord: result += f"**Discord:** {discord} \n" # Add description if available description = data.get('description', '') if description: # Truncate if too long if len(description) > 500: description = description[:500] + "..." result += "\n### Description\n\n" result += f"{description}\n" return result except Exception as e: return f"❌ Error fetching NFT collection details: {str(e)}" # --------------------------------------------------------------------------- # Exchange API endpoints # --------------------------------------------------------------------------- # Dokumentasi: https://docs.coingecko.com/reference/exchanges @mcp.tool("get_exchanges") async def get_exchanges(per_page: int = 20, page: int = 1) -> str: """Get list of all exchanges with basic data. Args: per_page: Number of results per page (1-250) page: Page number """ try: # Validasi parameter if per_page < 1 or per_page > 250: return "❌ Error: per_page must be between 1 and 250" # Set parameters params = { "per_page": per_page, "page": page } # Fetch data data = await fetch_coingecko_json("/exchanges", params) if "error" in data: return f"❌ Error fetching exchanges: {data['error']}" if not isinstance(data, list): return f"❌ Invalid response format: {data}" if not data: return "No exchanges found for the specified parameters" # Format response as markdown result = "## Cryptocurrency Exchanges\n\n" # Format as table result += "| # | Exchange | Trust Score | 24h Volume (BTC) | Year Established |\n" result += "|---|----------|-------------|-----------------|---------------------|\n" for i, exchange in enumerate(data): name = exchange.get("name", "N/A") trust_score = exchange.get("trust_score", "N/A") volume_btc = exchange.get("trade_volume_24h_btc", 0) year = exchange.get("year_established", "N/A") # Format volume with appropriate precision if volume_btc is not None and volume_btc > 0: if volume_btc >= 10000: volume_str = f"{volume_btc:,.0f} BTC" else: volume_str = f"{volume_btc:,.2f} BTC" else: volume_str = "N/A" result += f"| {i+1} | {name} | {trust_score} | {volume_str} | {year} |\n" # Add pagination info result += f"\n*Page {page}, showing {len(data)} results per page*\n" return result except Exception as e: return f"❌ Error fetching exchanges: {str(e)}" # Dokumentasi: https://docs.coingecko.com/reference/exchanges-id @mcp.tool("get_exchange_detail") async def get_exchange_detail(id: str) -> str: """Get detailed data for a specific exchange. Args: id: The exchange id (e.g. 'binance', 'gdax') """ try: # Validasi parameter if not id: return "❌ Error: Exchange ID is required" # Fetch data data = await fetch_coingecko_json(f"/exchanges/{id}") if "error" in data: return f"❌ Error fetching exchange details: {data['error']}" # Format response as markdown result = f"## {data.get('name', 'N/A')} Exchange\n\n" # Add basic info result += "### Basic Information\n\n" result += f"**ID:** {data.get('id', 'N/A')} \n" result += f"**Name:** {data.get('name', 'N/A')} \n" # Add image if available if 'image' in data: result += f"**Logo:** ![{data.get('name', 'Exchange')} Logo]({data['image']}) \n" # Add year established if available year = data.get('year_established') if year: result += f"**Year Established:** {year} \n" # Add country if available country = data.get('country') if country: result += f"**Country:** {country} \n" # Add trust score if available trust_score = data.get('trust_score') if trust_score is not None: result += f"**Trust Score:** {trust_score}/10 \n" trust_rank = data.get('trust_score_rank') if trust_rank is not None: result += f"**Trust Rank:** #{trust_rank} \n" # Add trading volume volume_btc = data.get('trade_volume_24h_btc') if volume_btc is not None: if volume_btc >= 10000: volume_str = f"{volume_btc:,.0f} BTC" else: volume_str = f"{volume_btc:,.2f} BTC" result += f"**24h Trading Volume:** {volume_str} \n" # Add links if available if 'url' in data: result += "\n### Links\n\n" if 'website' in data['url'] and data['url']['website']: website = data['url']['website'][0] if website: result += f"**Website:** {website} \n" if 'twitter' in data['url'] and data['url']['twitter']: twitter = data['url']['twitter'][0] if twitter: result += f"**Twitter:** {twitter} \n" if 'facebook' in data['url'] and data['url']['facebook']: facebook = data['url']['facebook'][0] if facebook: result += f"**Facebook:** {facebook} \n" if 'telegram' in data['url'] and data['url']['telegram']: telegram = data['url']['telegram'][0] if telegram: result += f"**Telegram:** {telegram} \n" # Add description if available description = data.get('description', '') if description: # Truncate if too long if len(description) > 500: description = description[:500] + "..." result += "\n### Description\n\n" result += f"{description}\n" # Add tickers if available tickers = data.get('tickers', []) if tickers: result += "\n### Top Trading Pairs\n\n" result += "| Base | Target | Volume | Price | Spread | Last Updated |\n" result += "|------|--------|--------|-------|--------|-------------|\n" # Limit to top 10 tickers for ticker in tickers[:10]: base = ticker.get('base', 'N/A') target = ticker.get('target', 'N/A') volume = ticker.get('volume', 0) price = ticker.get('last', 0) bid_ask_spread = ticker.get('bid_ask_spread_percentage', 0) last_traded = ticker.get('last_traded_at', '') # Format values if volume is not None and volume > 0: if volume >= 1_000_000: volume_str = f"{volume/1_000_000:.2f}M" elif volume >= 1_000: volume_str = f"{volume/1_000:.2f}K" else: volume_str = f"{volume:.2f}" else: volume_str = "N/A" if price is not None: if price < 0.01 and price > 0: price_str = f"{price:.8f}" else: price_str = f"{price:.4f}" else: price_str = "N/A" if bid_ask_spread is not None: spread_str = f"{bid_ask_spread:.2f}%" else: spread_str = "N/A" if last_traded: # Format timestamp to just show date try: last_traded_str = last_traded.split('T')[0] except: last_traded_str = last_traded else: last_traded_str = "N/A" result += f"| {base} | {target} | {volume_str} | {price_str} | {spread_str} | {last_traded_str} |\n" return result except Exception as e: return f"❌ Error fetching exchange details: {str(e)}" # --------------------------------------------------------------------------- # Simple API endpoints # --------------------------------------------------------------------------- # Dokumentasi: https://docs.coingecko.com/reference/simple-supported-currencies @mcp.tool("get_supported_vs_currencies") async def get_supported_vs_currencies() -> str: """Get list of supported vs currencies.""" try: # Fetch data data = await fetch_coingecko_json("/simple/supported_vs_currencies") if "error" in data: return f"❌ Error fetching supported currencies: {data['error']}" if not isinstance(data, list) or not data: return "❌ No supported currencies found" # Format response as markdown result = "## Supported VS Currencies\n\n" # Group currencies by type (fiat, crypto, commodity) fiat_currencies = ["usd", "eur", "jpy", "gbp", "aud", "cad", "chf", "cny", "hkd", "nzd", "sek", "krw", "sgd", "nok", "mxn", "inr", "rub", "zar", "try", "brl", "twd", "dkk", "pln", "thb", "idr", "czk", "aed", "ars", "clp", "cop", "egp", "ils", "kwd", "myr", "ngn", "php", "pkr", "sar", "vef", "vnd"] crypto_currencies = ["btc", "eth", "ltc", "bch", "bnb", "eos", "xrp", "xlm", "dot", "yfi", "aave", "uni"] commodity_currencies = ["xag", "xau"] # Categorize currencies categorized_currencies = { "Fiat": [], "Crypto": [], "Commodity": [], "Other": [] } for currency in data: currency = currency.lower() if currency in fiat_currencies: categorized_currencies["Fiat"].append(currency) elif currency in crypto_currencies: categorized_currencies["Crypto"].append(currency) elif currency in commodity_currencies: categorized_currencies["Commodity"].append(currency) else: categorized_currencies["Other"].append(currency) # Format each category for category, currencies in categorized_currencies.items(): if currencies: result += f"### {category} Currencies\n\n" # Format as a table with multiple columns currencies.sort() columns = 5 # Number of columns in the table rows = (len(currencies) + columns - 1) // columns # Ceiling division # Create table header result += "|" + "||".join([" Currency " for _ in range(min(columns, len(currencies)))]) + "|\n" result += "|" + "||".join([":------:" for _ in range(min(columns, len(currencies)))]) + "|\n" # Create table rows for i in range(rows): row = "" for j in range(columns): idx = i + j * rows if idx < len(currencies): row += f" `{currencies[idx].upper()}` |" else: row += " |" result += "|" + row + "\n" result += "\n" # Add total count result += f"**Total supported currencies:** {len(data)}\n" return result except Exception as e: return f"❌ Error fetching supported currencies: {str(e)}" # --------------------------------------------------------------------------- # Main entry point # --------------------------------------------------------------------------- if __name__ == "__main__": # Jalankan server MCP mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/adhinugroho1711/mcp-trading'

If you have feedback or need assistance with the MCP directory API, please join our Discord server