Skip to main content
Glama

MCP Crypto API Servers

moralis_server.py71.3 kB
"""Moralis MCP Server This server provides tools for interacting with the Moralis Solana API to get information about tokens, NFTs, prices, and wallet data on the Solana blockchain. """ from __future__ import annotations from typing import Any, Dict, List, Optional import urllib.parse import json import sys import traceback import os from dotenv import load_dotenv from mcp.server.fastmcp import FastMCP from common import fetch_json # Load environment variables from .env file load_dotenv() # --------------------------------------------------------------------------- # Constants & configuration # --------------------------------------------------------------------------- # Initialize FastMCP mcp = FastMCP(name="moralis-server") # Moralis API configuration BASE_URL = "https://solana-gateway.moralis.io" API_KEY = None # Will be loaded from environment # --------------------------------------------------------------------------- # Helper functions # --------------------------------------------------------------------------- async def fetch_moralis_json(endpoint: str, params: dict = None) -> dict: """Fungsi helper untuk mengambil data JSON dari Moralis API dengan API key. Args: endpoint: Endpoint API Moralis (tanpa base URL) params: Parameter query tambahan Returns: dict: Data JSON dari respons API atau dict dengan error """ global API_KEY # Load API key jika belum diload if not API_KEY: load_dotenv() API_KEY = os.getenv("MORALIS_API_KEY") if not API_KEY: return {"error": "Moralis API key not found. Please set MORALIS_API_KEY in your .env file."} # Siapkan params jika belum ada if not params: params = {} # Set headers dengan API key headers = {"X-API-Key": API_KEY} # Bangun URL lengkap url = f"{BASE_URL}{endpoint}" if endpoint.startswith('/') else f"{BASE_URL}/{endpoint}" # Debug info print(f"DEBUG - Fetching URL: {url}", file=sys.stderr) print(f"DEBUG - With params: {params}", file=sys.stderr) print(f"DEBUG - With headers: {headers}", file=sys.stderr) # Kirim request dan return data JSON try: data = await fetch_json(url, headers=headers, params=params) if data and isinstance(data, dict) and "error" in data: print(f"DEBUG - API Error: {data['error']}", file=sys.stderr) return data except Exception as e: print(f"ERROR in fetch_moralis_json: {str(e)}", file=sys.stderr) return {"error": str(e)} # --------------------------------------------------------------------------- # Token API endpoints # --------------------------------------------------------------------------- @mcp.tool() async def get_token_metadata(network: str, address: str) -> str: """Get token metadata including name, symbol, decimals, and supply. Args: network: Network identifier (mainnet, devnet) address: Token mint address """ try: # Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah: # /token/{network}/{address}/metadata endpoint = f"/token/{network}/{address}/metadata" print(f"DEBUG - Fetching token metadata for {address} on {network}", file=sys.stderr) data = await fetch_moralis_json(endpoint) if not data: return f"Tidak dapat mengambil metadata token untuk {address} di {network}." if "error" in data: return f"Tidak dapat mengambil metadata token untuk {address} di {network}. Error: {data.get('error')}" # Format response as markdown result = f"# Token Metadata: {data.get('name', 'Unknown')}\n\n" result += f"| Property | Value |\n" result += f"|----------|-------|\n" result += f"| Address | `{data.get('address', 'N/A')}` |\n" result += f"| Name | {data.get('name', 'N/A')} |\n" result += f"| Symbol | {data.get('symbol', 'N/A')} |\n" result += f"| Decimals | {data.get('decimals', 'N/A')} |\n" result += f"| Total Supply | {data.get('total_supply', 'N/A')} |\n" result += f"| Current Supply | {data.get('current_supply', 'N/A')} |\n" result += f"| Verified Collection | {data.get('verified_collection', False)} |\n" result += f"| Possible Spam | {data.get('possible_spam', False)} |\n" if data.get('logo'): result += f"\n![Token Logo]({data.get('logo')})\n" return result except Exception as e: print(f"ERROR in get_token_metadata: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil metadata token: {str(e)}" @mcp.tool() async def get_token_price(network: str, address: str) -> str: """Get current token price in USD and native SOL. Args: network: Network identifier (mainnet, devnet) address: Token mint address """ try: # Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah: # /token/{network}/{address}/price endpoint = f"/token/{network}/{address}/price" print(f"DEBUG - Fetching token price for {address} on {network}", file=sys.stderr) data = await fetch_moralis_json(endpoint) if not data: return f"Tidak dapat mengambil harga token untuk {address} di {network}." if "error" in data: return f"Tidak dapat mengambil harga token untuk {address} di {network}. Error: {data.get('error')}" # Format response as markdown result = f"# Token Price: {data.get('tokenName', 'Unknown')} ({data.get('tokenSymbol', 'Unknown')})\n\n" result += f"| Property | Value |\n" result += f"|----------|-------|\n" # Format USD price usd_price = data.get('usdPrice') if usd_price is not None: try: usd_price_float = float(usd_price) if usd_price_float < 0.01: formatted_usd = f"${usd_price_float:.8f}" else: formatted_usd = f"${usd_price_float:.4f}" except (ValueError, TypeError): formatted_usd = f"${usd_price}" else: formatted_usd = "N/A" # Format native price native_price = data.get('nativePrice') if native_price is not None: try: native_price_float = float(native_price) formatted_native = f"{native_price_float:.8f} SOL" except (ValueError, TypeError): formatted_native = f"{native_price} SOL" else: formatted_native = "N/A" result += f"| USD Price | {formatted_usd} |\n" result += f"| SOL Price | {formatted_native} |\n" result += f"| 24h Change | {data.get('24hrChange', 'N/A')}% |\n" result += f"| 24h Volume | ${data.get('24hrVolume', 'N/A')} |\n" result += f"| Market Cap | ${data.get('marketCap', 'N/A')} |\n" return result except Exception as e: print(f"ERROR in get_token_price: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil harga token: {str(e)}" @mcp.tool() async def get_multiple_token_prices(network: str, tokens: str) -> str: """Get prices for multiple tokens in a single request. Args: network: Network identifier (mainnet, devnet) tokens: Comma-separated list of token addresses """ try: # Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah: # /token/{network}/prices (POST) endpoint = f"/token/{network}/prices" # Parse token addresses token_addresses = [addr.strip() for addr in tokens.split(",")] # Prepare request body request_body = {"tokens": token_addresses} print(f"DEBUG - Fetching prices for multiple tokens on {network}: {token_addresses}", file=sys.stderr) # Untuk POST request, kita perlu menggunakan fetch_json langsung url = f"{BASE_URL}{endpoint}" headers = {"X-API-Key": API_KEY, "Content-Type": "application/json"} # Kirim POST request data = await fetch_json(url, method="POST", headers=headers, json=request_body) if not data: return f"Tidak dapat mengambil harga untuk token-token yang diminta di {network}." if "error" in data: return f"Tidak dapat mengambil harga untuk token-token yang diminta di {network}. Error: {data.get('error')}" # Format response as markdown result = f"# Multiple Token Prices\n\n" result += f"| Token | Symbol | USD Price | SOL Price | 24h Change |\n" result += f"|-------|--------|-----------|-----------|------------|\n" for token_data in data: # Format USD price usd_price = token_data.get('usdPrice') if usd_price is not None: try: usd_price_float = float(usd_price) if usd_price_float < 0.01: formatted_usd = f"${usd_price_float:.8f}" else: formatted_usd = f"${usd_price_float:.4f}" except (ValueError, TypeError): formatted_usd = f"${usd_price}" else: formatted_usd = "N/A" # Format native price native_price = token_data.get('nativePrice') if native_price is not None: try: native_price_float = float(native_price) formatted_native = f"{native_price_float:.8f} SOL" except (ValueError, TypeError): formatted_native = f"{native_price} SOL" else: formatted_native = "N/A" # Get 24h change change_24h = token_data.get('24hrChange', 'N/A') if change_24h != 'N/A': change_24h = f"{change_24h}%" result += f"| {token_data.get('tokenName', 'Unknown')} | {token_data.get('tokenSymbol', 'Unknown')} | {formatted_usd} | {formatted_native} | {change_24h} |\n" return result except Exception as e: print(f"ERROR in get_multiple_token_prices: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil harga multiple token: {str(e)}" @mcp.tool() async def get_pump_fun_tokens(status: str = None, sort_by: str = None, limit: int = 50) -> str: """Get Pump.fun tokens data. Args: status: Token status (bonding, graduated, all) sort_by: Sort criteria (created_at, market_cap, volume) limit: Results limit (max 100) """ try: # Berdasarkan dokumentasi API terbaru, Pump.fun tidak lagi menggunakan endpoint khusus # Sebagai gantinya, kita perlu mengambil daftar token dari Jupiter atau Raydium # dan kemudian memfilter token-token yang berasal dari Pump.fun # Siapkan parameter params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100 if status: params["status"] = status if sort_by: params["sort_by"] = sort_by # Coba endpoint untuk mendapatkan token populer di Solana endpoint = "/market-data/solana/tokens" # Debug info print(f"DEBUG - Fetching Solana tokens with params: {params}", file=sys.stderr) data = await fetch_moralis_json(endpoint, params) if not data: return f"Tidak dapat mengambil data token Solana." if "error" in data: return f"Tidak dapat mengambil data token Solana. Error: {data.get('error')}" # Format response as markdown result = f"# Popular Solana Tokens\n\n" # Check data structure tokens = [] if isinstance(data, list): tokens = data elif isinstance(data, dict) and "tokens" in data: tokens = data["tokens"] elif isinstance(data, dict) and "result" in data: tokens = data["result"] if not tokens: return "Tidak ada data token yang ditemukan." # Filter token yang mungkin dari Pump.fun (berdasarkan metadata atau properti tertentu) # Catatan: Ini hanya perkiraan karena tidak ada flag eksplisit untuk token Pump.fun pump_tokens = [] for token in tokens: # Cek apakah token memiliki indikasi berasal dari Pump.fun # Misalnya, memiliki properti tertentu atau dari exchange tertentu if token.get("exchange") == "pump-fun" or "pump.fun" in str(token.get("name", "")).lower() or "pump.fun" in str(token.get("description", "")).lower(): pump_tokens.append(token) # Jika tidak ada token Pump.fun yang ditemukan, tampilkan token populer saja display_tokens = pump_tokens if pump_tokens else tokens # Format as table result += f"| Name | Symbol | Price (USD) | Market Cap | Volume 24h | Change 24h |\n" result += f"|------|--------|-------------|------------|------------|------------|\n" # Limit to first 20 tokens for readability display_limit = min(20, len(display_tokens)) for token in display_tokens[:display_limit]: # Format token name and symbol name = token.get("name", "Unknown") symbol = token.get("symbol", "Unknown") # Format price price_usd = "N/A" if "price" in token and token["price"] is not None: try: price_val = float(token["price"]) if price_val < 0.01: price_usd = f"${price_val:.8f}" else: price_usd = f"${price_val:.4f}" except (ValueError, TypeError): price_usd = str(token["price"]) # Format market cap market_cap = "N/A" if "marketCap" in token and token["marketCap"] is not None: try: mcap_val = float(token["marketCap"]) market_cap = f"${mcap_val:,.2f}" except (ValueError, TypeError): market_cap = str(token["marketCap"]) # Format volume volume = "N/A" if "volume24h" in token and token["volume24h"] is not None: try: vol_val = float(token["volume24h"]) volume = f"${vol_val:,.2f}" except (ValueError, TypeError): volume = str(token["volume24h"]) # Format price change price_change = "N/A" if "priceChange24h" in token and token["priceChange24h"] is not None: try: change_val = float(token["priceChange24h"]) price_change = f"{change_val:+.2f}%" except (ValueError, TypeError): price_change = str(token["priceChange24h"]) result += f"| {name} | {symbol} | {price_usd} | {market_cap} | {volume} | {price_change} |\n" # Show total count if more than display limit if len(display_tokens) > display_limit: result += f"\n*Showing {display_limit} of {len(display_tokens)} tokens*\n" # Tambahkan catatan jika tidak ada token Pump.fun yang ditemukan if not pump_tokens and tokens: result += f"\n**Note:** Tidak dapat mengidentifikasi token Pump.fun secara spesifik. Menampilkan token Solana populer sebagai gantinya.\n" return result except Exception as e: print(f"ERROR in get_pump_fun_tokens: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil data token: {str(e)}" @mcp.tool() async def get_pump_fun_token_price(network: str, address: str) -> str: """Get price data specifically for Pump.fun tokens. Args: network: Network identifier (mainnet, devnet) address: Pump.fun token mint address """ try: # Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah: # /token/{network}/{address}/price/pump-fun endpoint = f"/token/{network}/{address}/price/pump-fun" print(f"DEBUG - Fetching Pump.fun token price for {address} on {network}", file=sys.stderr) data = await fetch_moralis_json(endpoint) if not data: return f"Tidak dapat mengambil data harga token Pump.fun untuk {address} di {network}." if "error" in data: # Jika endpoint khusus Pump.fun gagal, coba endpoint harga token biasa print(f"DEBUG - Pump.fun endpoint failed, trying regular token price endpoint", file=sys.stderr) return await get_token_price(network, address) # Format response as markdown result = f"# Pump.fun Token Price: {data.get('tokenName', 'Unknown')} ({data.get('tokenSymbol', 'Unknown')})\n\n" result += f"| Property | Value |\n" result += f"|----------|-------|\n" # Format USD price usd_price = data.get('usdPrice') if usd_price is not None: try: usd_price_float = float(usd_price) if usd_price_float < 0.01: formatted_usd = f"${usd_price_float:.8f}" else: formatted_usd = f"${usd_price_float:.4f}" except (ValueError, TypeError): formatted_usd = f"${usd_price}" else: formatted_usd = "N/A" # Format SOL price sol_price = data.get('solPrice') if sol_price is not None: try: sol_price_float = float(sol_price) formatted_sol = f"{sol_price_float:.8f} SOL" except (ValueError, TypeError): formatted_sol = f"{sol_price} SOL" else: formatted_sol = "N/A" # Format market cap market_cap = "N/A" if "marketCap" in data and data["marketCap"] is not None: try: mcap_val = float(data["marketCap"]) market_cap = f"${mcap_val:,.2f}" except (ValueError, TypeError): market_cap = str(data["marketCap"]) # Format liquidity liquidity = "N/A" if "liquidityUsd" in data and data["liquidityUsd"] is not None: try: liq_val = float(data["liquidityUsd"]) liquidity = f"${liq_val:,.2f}" except (ValueError, TypeError): liquidity = str(data["liquidityUsd"]) # Format volume volume = "N/A" if "volume24h" in data and data["volume24h"] is not None: try: vol_val = float(data["volume24h"]) volume = f"${vol_val:,.2f}" except (ValueError, TypeError): volume = str(data["volume24h"]) # Format price change price_change = "N/A" if "priceChange24h" in data and data["priceChange24h"] is not None: try: change_val = float(data["priceChange24h"]) price_change = f"{change_val:+.2f}%" except (ValueError, TypeError): price_change = str(data["priceChange24h"]) result += f"| USD Price | {formatted_usd} |\n" result += f"| SOL Price | {formatted_sol} |\n" result += f"| Market Cap | {market_cap} |\n" result += f"| Liquidity | {liquidity} |\n" result += f"| 24h Volume | {volume} |\n" result += f"| 24h Price Change | {price_change} |\n" # Bonding curve progress (jika ada) if "bondingCurveProgress" in data and data["bondingCurveProgress"] is not None: try: progress = float(data["bondingCurveProgress"]) * 100 result += f"| Bonding Curve Progress | {progress:.2f}% |\n" except (ValueError, TypeError): result += f"| Bonding Curve Progress | {data['bondingCurveProgress']} |\n" # Status graduated if "isGraduated" in data: status = "Graduated" if data["isGraduated"] else "Bonding" result += f"| Status | {status} |\n" # Raydium pool (jika ada) if "raydiumPool" in data and data["raydiumPool"]: result += f"| Raydium Pool | `{data['raydiumPool']}` |\n" return result except Exception as e: print(f"ERROR in get_pump_fun_token_price: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil data harga token Pump.fun: {str(e)}" @mcp.tool() async def get_native_balance(network: str, address: str) -> str: """Get native SOL balance for a wallet address. Args: network: Network identifier (mainnet, devnet) address: Wallet address """ try: # Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah: # /account/{network}/{address}/balance endpoint = f"/account/{network}/{address}/balance" print(f"DEBUG - Fetching native balance for {address} on {network}", file=sys.stderr) data = await fetch_moralis_json(endpoint) if not data: return f"Tidak dapat mengambil saldo SOL untuk {address} di {network}." if "error" in data: return f"Tidak dapat mengambil saldo SOL untuk {address} di {network}. Error: {data.get('error')}" # Format response as markdown result = f"# Native SOL Balance\n\n" result += f"| Property | Value |\n" result += f"|----------|-------|\n" result += f"| Address | `{address}` |\n" # Format balance balance = data.get('solana') if balance is not None: try: balance_float = float(balance) formatted_balance = f"{balance_float:.9f} SOL" except (ValueError, TypeError): formatted_balance = f"{balance} SOL" else: formatted_balance = "0 SOL" result += f"| Balance | {formatted_balance} |\n" return result except Exception as e: print(f"ERROR in get_native_balance: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil saldo SOL: {str(e)}" @mcp.tool() async def get_token_balances(network: str, address: str, limit: int = 100) -> str: """Get token balances for a wallet address. Args: network: Network identifier (mainnet, devnet) address: Wallet address limit: Maximum number of results (default 100) """ try: # Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah: # /account/{network}/{address}/tokens endpoint = f"/account/{network}/{address}/tokens" # Siapkan parameter params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100 print(f"DEBUG - Fetching token balances for {address} on {network}", file=sys.stderr) data = await fetch_moralis_json(endpoint, params=params) if not data: return f"Tidak dapat mengambil saldo token untuk {address} di {network}." if "error" in data: return f"Tidak dapat mengambil saldo token untuk {address} di {network}. Error: {data.get('error')}" # Format response as markdown result = f"# Token Balances for {address}\n\n" # Check if data is a list tokens = data if isinstance(data, list) else data.get("tokens", []) if not tokens: return f"Tidak ada saldo token yang ditemukan untuk {address} di {network}." # Format as table result += f"| Token | Symbol | Balance | USD Value |\n" result += f"|-------|--------|---------|------------|\n" # Limit to first 20 tokens for readability display_limit = min(20, len(tokens)) for token in tokens[:display_limit]: # Format token name and symbol name = token.get("name", "Unknown") symbol = token.get("symbol", "Unknown") # Format balance balance = "0" if "balance" in token and token["balance"] is not None: try: decimals = int(token.get("decimals", 0)) raw_balance = int(token["balance"]) formatted_balance = raw_balance / (10 ** decimals) balance = f"{formatted_balance:,.4f}" except (ValueError, TypeError): balance = str(token["balance"]) # Format USD value usd_value = "N/A" if "usd_value" in token and token["usd_value"] is not None: try: usd_val = float(token["usd_value"]) usd_value = f"${usd_val:,.2f}" except (ValueError, TypeError): usd_value = str(token["usd_value"]) result += f"| {name} | {symbol} | {balance} | {usd_value} |\n" # Show total count if more than display limit if len(tokens) > display_limit: result += f"\n*Showing {display_limit} of {len(tokens)} tokens*\n" return result except Exception as e: print(f"ERROR in get_token_balances: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil saldo token: {str(e)}" @mcp.tool() async def get_token_pairs(network: str, address: str) -> str: """Get trading pairs and liquidity data for a token. Args: network: Network identifier (mainnet, devnet) address: Token mint address """ try: # Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah: # /token/{network}/{address}/pairs endpoint = f"/token/{network}/{address}/pairs" print(f"DEBUG - Fetching token pairs for {address} on {network}", file=sys.stderr) data = await fetch_moralis_json(endpoint) if not data: return f"Tidak dapat mengambil data pair token untuk {address} di {network}." if "error" in data: return f"Tidak dapat mengambil data pair token untuk {address} di {network}. Error: {data.get('error')}" # Format response as markdown result = f"# Trading Pairs for Token\n\n" # Check if data is a list or has a pairs property pairs = data if isinstance(data, list) else data.get("pairs", []) if not pairs: return f"Tidak ada pair trading yang ditemukan untuk token {address} di {network}." # Format as table result += f"| Pair | DEX | Liquidity | Volume 24h | Price |\n" result += f"|------|-----|-----------|-----------|-------|\n" # Limit to first 15 pairs for readability display_limit = min(15, len(pairs)) for pair in pairs[:display_limit]: # Format pair name pair_name = pair.get("name", "Unknown") # Format DEX name dex = pair.get("dex", "Unknown") # Format liquidity liquidity = "N/A" if "liquidity" in pair and pair["liquidity"] is not None: try: liq_val = float(pair["liquidity"]) liquidity = f"${liq_val:,.2f}" except (ValueError, TypeError): liquidity = str(pair["liquidity"]) # Format volume volume = "N/A" if "volume_24h" in pair and pair["volume_24h"] is not None: try: vol_val = float(pair["volume_24h"]) volume = f"${vol_val:,.2f}" except (ValueError, TypeError): volume = str(pair["volume_24h"]) # Format price price = "N/A" if "price" in pair and pair["price"] is not None: try: price_val = float(pair["price"]) if price_val < 0.01: price = f"${price_val:.8f}" else: price = f"${price_val:.4f}" except (ValueError, TypeError): price = str(pair["price"]) result += f"| {pair_name} | {dex} | {liquidity} | {volume} | {price} |\n" # Show total count if more than display limit if len(pairs) > display_limit: result += f"\n*Showing {display_limit} of {len(pairs)} pairs*\n" return result except Exception as e: print(f"ERROR in get_token_pairs: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil data pair token: {str(e)}" @mcp.tool() async def get_token_swaps(network: str, address: str, from_date: str = None, to_date: str = None, limit: int = 100) -> str: """Get swap transactions for a token. Args: network: Network identifier (mainnet, devnet) address: Token mint address from_date: Start date (ISO 8601) to_date: End date (ISO 8601) limit: Results limit (max 100) """ try: # Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah: # /token/{network}/{address}/swaps endpoint = f"/token/{network}/{address}/swaps" # Siapkan parameter params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100 if from_date: params["from_date"] = from_date if to_date: params["to_date"] = to_date print(f"DEBUG - Fetching token swaps for {address} on {network}", file=sys.stderr) data = await fetch_moralis_json(endpoint, params=params) if not data: return f"Tidak dapat mengambil data swap token untuk {address} di {network}." if "error" in data: return f"Tidak dapat mengambil data swap token untuk {address} di {network}. Error: {data.get('error')}" # Format response as markdown result = f"# Token Swap Transactions\n\n" # Check if data is a list or has a swaps property swaps = data if isinstance(data, list) else data.get("swaps", []) if not swaps: return f"Tidak ada transaksi swap yang ditemukan untuk token {address} di {network}." # Format as table result += f"| Type | Amount | Value | Price | Time | Tx Hash |\n" result += f"|------|--------|-------|-------|------|---------|\n" # Limit to first 15 swaps for readability display_limit = min(15, len(swaps)) for swap in swaps[:display_limit]: # Format transaction type (buy/sell) tx_type = swap.get("type", "Unknown").capitalize() # Format amount amount = "N/A" if "token_amount" in swap and swap["token_amount"] is not None: try: amount_val = float(swap["token_amount"]) amount = f"{amount_val:,.4f}" except (ValueError, TypeError): amount = str(swap["token_amount"]) # Format USD value value = "N/A" if "usd_value" in swap and swap["usd_value"] is not None: try: value_val = float(swap["usd_value"]) value = f"${value_val:,.2f}" except (ValueError, TypeError): value = str(swap["usd_value"]) # Format price price = "N/A" if "price" in swap and swap["price"] is not None: try: price_val = float(swap["price"]) if price_val < 0.01: price = f"${price_val:.8f}" else: price = f"${price_val:.4f}" except (ValueError, TypeError): price = str(swap["price"]) # Format timestamp timestamp = "N/A" if "block_timestamp" in swap and swap["block_timestamp"] is not None: try: from datetime import datetime dt = datetime.fromisoformat(swap["block_timestamp"].replace('Z', '+00:00')) timestamp = dt.strftime('%Y-%m-%d %H:%M:%S') except (ValueError, TypeError): timestamp = str(swap["block_timestamp"]) # Format transaction hash tx_hash = swap.get("transaction_hash", "N/A") if len(tx_hash) > 10: tx_hash = f"`{tx_hash[:6]}...{tx_hash[-4:]}`" result += f"| {tx_type} | {amount} | {value} | {price} | {timestamp} | {tx_hash} |\n" # Show total count if more than display limit if len(swaps) > display_limit: result += f"\n*Showing {display_limit} of {len(swaps)} swap transactions*\n" return result except Exception as e: print(f"ERROR in get_token_swaps: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil data swap token: {str(e)}" @mcp.tool() async def get_nft_transfers(network: str, address: str, from_date: str = None, to_date: str = None, limit: int = 100) -> str: """Get transfer history for a specific NFT. Args: network: Network identifier (mainnet, devnet) address: NFT mint address from_date: Start date (ISO 8601) to_date: End date (ISO 8601) limit: Results limit (max 100) """ try: # Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah: # /nft/{network}/{address}/transfers endpoint = f"/nft/{network}/{address}/transfers" # Siapkan parameter params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100 if from_date: params["from_date"] = from_date if to_date: params["to_date"] = to_date print(f"DEBUG - Fetching NFT transfers for {address} on {network}", file=sys.stderr) data = await fetch_moralis_json(endpoint, params=params) if not data: return f"Tidak dapat mengambil data transfer NFT untuk {address} di {network}." if "error" in data: return f"Tidak dapat mengambil data transfer NFT untuk {address} di {network}. Error: {data.get('error')}" # Format response as markdown result = f"# NFT Transfer History\n\n" # Check if data is a list or has a transfers property transfers = data if isinstance(data, list) else data.get("transfers", []) if not transfers: return f"Tidak ada riwayat transfer yang ditemukan untuk NFT {address} di {network}." # Format as table result += f"| From | To | Time | Tx Hash |\n" result += f"|------|----|----|---------|\n" # Limit to first 15 transfers for readability display_limit = min(15, len(transfers)) for transfer in transfers[:display_limit]: # Format from address from_address = transfer.get("from_address", "N/A") if len(from_address) > 10: from_address = f"`{from_address[:6]}...{from_address[-4:]}`" # Format to address to_address = transfer.get("to_address", "N/A") if len(to_address) > 10: to_address = f"`{to_address[:6]}...{to_address[-4:]}`" # Format timestamp timestamp = "N/A" if "block_timestamp" in transfer and transfer["block_timestamp"] is not None: try: from datetime import datetime dt = datetime.fromisoformat(transfer["block_timestamp"].replace('Z', '+00:00')) timestamp = dt.strftime('%Y-%m-%d %H:%M:%S') except (ValueError, TypeError): timestamp = str(transfer["block_timestamp"]) # Format transaction hash tx_hash = transfer.get("transaction_hash", "N/A") if len(tx_hash) > 10: tx_hash = f"`{tx_hash[:6]}...{tx_hash[-4:]}`" result += f"| {from_address} | {to_address} | {timestamp} | {tx_hash} |\n" # Show total count if more than display limit if len(transfers) > display_limit: result += f"\n*Showing {display_limit} of {len(transfers)} transfers*\n" return result except Exception as e: print(f"ERROR in get_nft_transfers: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil data transfer NFT: {str(e)}" @mcp.tool() async def get_wallet_nfts(network: str, address: str, limit: int = 100) -> str: """Get all NFTs owned by a wallet address. Args: network: Network identifier (mainnet, devnet) address: Wallet address limit: Results limit (max 100) """ try: # Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah: # /account/{network}/{address}/nfts endpoint = f"/account/{network}/{address}/nfts" # Siapkan parameter params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100 print(f"DEBUG - Fetching NFTs for wallet {address} on {network}", file=sys.stderr) data = await fetch_moralis_json(endpoint, params=params) if not data: return f"Tidak dapat mengambil data NFT untuk wallet {address} di {network}." if "error" in data: return f"Tidak dapat mengambil data NFT untuk wallet {address} di {network}. Error: {data.get('error')}" # Format response as markdown result = f"# NFTs Owned by {address}\n\n" # Check if data is a list or has a nfts property nfts = data if isinstance(data, list) else data.get("nfts", []) if not nfts: return f"Tidak ada NFT yang ditemukan untuk wallet {address} di {network}." # Format as table result += f"| Name | Collection | Token ID | Image |\n" result += f"|------|-----------|----------|-------|\n" # Limit to first 15 NFTs for readability display_limit = min(15, len(nfts)) for nft in nfts[:display_limit]: # Format NFT name name = nft.get("name", "Unnamed NFT") if not name: name = "Unnamed NFT" # Format collection name collection = nft.get("collection_name", "Unknown Collection") if not collection: collection = "Unknown Collection" # Format token ID token_id = nft.get("token_id", "N/A") if token_id and len(str(token_id)) > 8: token_id = f"{str(token_id)[:4]}...{str(token_id)[-4:]}" # Format image URL image = "No Image" if "image" in nft and nft["image"]: image_url = nft["image"] if image_url.startswith("http"): image = f"[View](${image_url})" result += f"| {name} | {collection} | {token_id} | {image} |\n" # Show total count if more than display limit if len(nfts) > display_limit: result += f"\n*Showing {display_limit} of {len(nfts)} NFTs*\n" return result except Exception as e: print(f"ERROR in get_wallet_nfts: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil data NFT wallet: {str(e)}" @mcp.tool() async def search_nfts(network: str, query: str = None, collection: str = None, limit: int = 100) -> str: """Search NFTs by metadata criteria. Args: network: Network identifier (mainnet, devnet) query: Search query (name, description, attributes) collection: Collection address filter limit: Results limit (max 100) """ try: # Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah: # /nft/{network}/search endpoint = f"/nft/{network}/search" # Siapkan parameter params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100 if query: params["query"] = query if collection: params["collection"] = collection print(f"DEBUG - Searching NFTs on {network} with query: {query}", file=sys.stderr) data = await fetch_moralis_json(endpoint, params=params) if not data: return f"Tidak dapat mencari NFT di {network}." if "error" in data: return f"Tidak dapat mencari NFT di {network}. Error: {data.get('error')}" # Format response as markdown result = f"# NFT Search Results\n\n" # Check if data is a list or has a results property nfts = data if isinstance(data, list) else data.get("results", []) if not nfts: return f"Tidak ada NFT yang ditemukan untuk pencarian di {network}." # Format as table result += f"| Name | Collection | Token ID | Image |\n" result += f"|------|-----------|----------|-------|\n" # Limit to first 15 NFTs for readability display_limit = min(15, len(nfts)) for nft in nfts[:display_limit]: # Format NFT name name = nft.get("name", "Unnamed NFT") if not name: name = "Unnamed NFT" # Format collection name collection = nft.get("collection_name", "Unknown Collection") if not collection: collection = "Unknown Collection" # Format token ID token_id = nft.get("token_id", "N/A") if token_id and len(str(token_id)) > 8: token_id = f"{str(token_id)[:4]}...{str(token_id)[-4:]}" # Format image URL image = "No Image" if "image" in nft and nft["image"]: image_url = nft["image"] if image_url.startswith("http"): image = f"[View](${image_url})" result += f"| {name} | {collection} | {token_id} | {image} |\n" # Show total count if more than display limit if len(nfts) > display_limit: result += f"\n*Showing {display_limit} of {len(nfts)} NFTs*\n" return result except Exception as e: print(f"ERROR in search_nfts: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mencari NFT: {str(e)}" @mcp.tool() async def get_portfolio(network: str, address: str) -> str: """Get comprehensive wallet portfolio including tokens, NFTs, and native balance. Args: network: Network identifier (mainnet, devnet) address: Wallet address """ try: # Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah: # /account/{network}/{address}/portfolio endpoint = f"/account/{network}/{address}/portfolio" print(f"DEBUG - Fetching portfolio for {address} on {network}", file=sys.stderr) data = await fetch_moralis_json(endpoint) if not data: return f"Tidak dapat mengambil data portfolio untuk {address} di {network}." if "error" in data: return f"Tidak dapat mengambil data portfolio untuk {address} di {network}. Error: {data.get('error')}" # Format response as markdown result = f"# Portfolio for {address}\n\n" # Format native balance section native_balance = data.get("native_balance", {}) if native_balance: result += "## Native Balance\n\n" result += f"| Symbol | Balance | USD Value |\n" result += f"|--------|---------|-----------|\n" # Format balance balance = "0" if "balance" in native_balance and native_balance["balance"] is not None: try: decimals = int(native_balance.get("decimals", 9)) # SOL has 9 decimals raw_balance = int(native_balance["balance"]) formatted_balance = raw_balance / (10 ** decimals) balance = f"{formatted_balance:,.9f}" except (ValueError, TypeError): balance = str(native_balance["balance"]) # Format USD value usd_value = "N/A" if "usd_value" in native_balance and native_balance["usd_value"] is not None: try: usd_val = float(native_balance["usd_value"]) usd_value = f"${usd_val:,.2f}" except (ValueError, TypeError): usd_value = str(native_balance["usd_value"]) result += f"| SOL | {balance} | {usd_value} |\n\n" # Format tokens section tokens = data.get("tokens", []) if tokens: result += "## Tokens\n\n" result += f"| Token | Symbol | Balance | USD Value |\n" result += f"|-------|--------|---------|-----------|\n" # Limit to first 15 tokens for readability display_limit = min(15, len(tokens)) for token in tokens[:display_limit]: # Format token name and symbol name = token.get("name", "Unknown") symbol = token.get("symbol", "Unknown") # Format balance balance = "0" if "balance" in token and token["balance"] is not None: try: decimals = int(token.get("decimals", 0)) raw_balance = int(token["balance"]) formatted_balance = raw_balance / (10 ** decimals) balance = f"{formatted_balance:,.4f}" except (ValueError, TypeError): balance = str(token["balance"]) # Format USD value usd_value = "N/A" if "usd_value" in token and token["usd_value"] is not None: try: usd_val = float(token["usd_value"]) usd_value = f"${usd_val:,.2f}" except (ValueError, TypeError): usd_value = str(token["usd_value"]) result += f"| {name} | {symbol} | {balance} | {usd_value} |\n" # Show total count if more than display limit if len(tokens) > display_limit: result += f"\n*Showing {display_limit} of {len(tokens)} tokens*\n\n" else: result += "\n" # Format NFTs section nfts = data.get("nfts", []) if nfts: result += "## NFTs\n\n" result += f"| Name | Collection | Token ID |\n" result += f"|------|-----------|----------|\n" # Limit to first 10 NFTs for readability display_limit = min(10, len(nfts)) for nft in nfts[:display_limit]: # Format NFT name name = nft.get("name", "Unnamed NFT") if not name: name = "Unnamed NFT" # Format collection name collection = nft.get("collection_name", "Unknown Collection") if not collection: collection = "Unknown Collection" # Format token ID token_id = nft.get("token_id", "N/A") if token_id and len(str(token_id)) > 8: token_id = f"{str(token_id)[:4]}...{str(token_id)[-4:]}" result += f"| {name} | {collection} | {token_id} |\n" # Show total count if more than display limit if len(nfts) > display_limit: result += f"\n*Showing {display_limit} of {len(nfts)} NFTs*\n" # If no data in any section if not native_balance and not tokens and not nfts: result += "No portfolio data found for this address.\n" return result except Exception as e: print(f"ERROR in get_portfolio: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil data portfolio: {str(e)}" @mcp.tool() async def get_wallet_token_transfers(network: str, address: str, limit: int = 100, from_date: str = None, to_date: str = None) -> str: """Get token transfer history for a wallet address. Args: network: Network identifier (mainnet, devnet) address: Wallet address limit: Results limit (max 100) from_date: Start date (ISO 8601) to_date: End date (ISO 8601) """ try: # Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah: # /account/{network}/{address}/token-transfers endpoint = f"/account/{network}/{address}/token-transfers" # Siapkan parameter params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100 if from_date: params["from_date"] = from_date if to_date: params["to_date"] = to_date print(f"DEBUG - Fetching token transfers for wallet {address} on {network}", file=sys.stderr) data = await fetch_moralis_json(endpoint, params=params) if not data: return f"Tidak dapat mengambil data transfer token untuk wallet {address} di {network}." if "error" in data: return f"Tidak dapat mengambil data transfer token untuk wallet {address} di {network}. Error: {data.get('error')}" # Format response as markdown result = f"# Token Transfer History for {address}\n\n" # Check if data is a list or has a transfers property transfers = data if isinstance(data, list) else data.get("transfers", []) if not transfers: return f"Tidak ada riwayat transfer token yang ditemukan untuk wallet {address} di {network}." # Format as table result += f"| Token | From | To | Amount | Time | Tx Hash |\n" result += f"|-------|------|----|----|------|---------|\n" # Limit to first 15 transfers for readability display_limit = min(15, len(transfers)) for transfer in transfers[:display_limit]: # Format token name/symbol token_name = transfer.get("token_name", "Unknown") token_symbol = transfer.get("token_symbol", "") token = token_name if token_symbol: token = f"{token_name} ({token_symbol})" # Format from address from_address = transfer.get("from_address", "N/A") if len(from_address) > 10: from_address = f"`{from_address[:6]}...{from_address[-4:]}`" # Format to address to_address = transfer.get("to_address", "N/A") if len(to_address) > 10: to_address = f"`{to_address[:6]}...{to_address[-4:]}`" # Format amount amount = "N/A" if "value" in transfer and transfer["value"] is not None: try: decimals = int(transfer.get("decimals", 0)) raw_amount = int(transfer["value"]) formatted_amount = raw_amount / (10 ** decimals) amount = f"{formatted_amount:,.4f}" except (ValueError, TypeError): amount = str(transfer["value"]) # Format timestamp timestamp = "N/A" if "block_timestamp" in transfer and transfer["block_timestamp"] is not None: try: from datetime import datetime dt = datetime.fromisoformat(transfer["block_timestamp"].replace('Z', '+00:00')) timestamp = dt.strftime('%Y-%m-%d %H:%M:%S') except (ValueError, TypeError): timestamp = str(transfer["block_timestamp"]) # Format transaction hash tx_hash = transfer.get("transaction_hash", "N/A") if len(tx_hash) > 10: tx_hash = f"`{tx_hash[:6]}...{tx_hash[-4:]}`" result += f"| {token} | {from_address} | {to_address} | {amount} | {timestamp} | {tx_hash} |\n" # Show total count if more than display limit if len(transfers) > display_limit: result += f"\n*Showing {display_limit} of {len(transfers)} transfers*\n" return result except Exception as e: print(f"ERROR in get_wallet_token_transfers: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil data transfer token wallet: {str(e)}" @mcp.tool() async def get_wallet_nft_transfers(network: str, address: str, from_date: str = None, to_date: str = None, limit: int = 100) -> str: """Get NFT transfer history for a wallet address. Args: network: Network identifier (mainnet, devnet) address: Wallet address from_date: Start date (ISO 8601) to_date: End date (ISO 8601) limit: Results limit (max 100) """ try: # Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah: # /account/{network}/{address}/nft-transfers endpoint = f"/account/{network}/{address}/nft-transfers" # Siapkan parameter params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100 if from_date: params["from_date"] = from_date if to_date: params["to_date"] = to_date print(f"DEBUG - Fetching NFT transfers for wallet {address} on {network}", file=sys.stderr) data = await fetch_moralis_json(endpoint, params=params) if not data: return f"Tidak dapat mengambil data transfer NFT untuk wallet {address} di {network}." if "error" in data: return f"Tidak dapat mengambil data transfer NFT untuk wallet {address} di {network}. Error: {data.get('error')}" # Format response as markdown result = f"# NFT Transfer History for {address}\n\n" # Check if data is a list or has a transfers property transfers = data if isinstance(data, list) else data.get("transfers", []) if not transfers: return f"Tidak ada riwayat transfer NFT yang ditemukan untuk wallet {address} di {network}." # Format as table result += f"| NFT | From | To | Time | Tx Hash |\n" result += f"|-----|------|----|----|---------|\n" # Limit to first 15 transfers for readability display_limit = min(15, len(transfers)) for transfer in transfers[:display_limit]: # Format NFT name nft_name = transfer.get("name", "Unknown NFT") if not nft_name: nft_name = "Unknown NFT" # Format from address from_address = transfer.get("from_address", "N/A") if len(from_address) > 10: from_address = f"`{from_address[:6]}...{from_address[-4:]}`" # Format to address to_address = transfer.get("to_address", "N/A") if len(to_address) > 10: to_address = f"`{to_address[:6]}...{to_address[-4:]}`" # Format timestamp timestamp = "N/A" if "block_timestamp" in transfer and transfer["block_timestamp"] is not None: try: from datetime import datetime dt = datetime.fromisoformat(transfer["block_timestamp"].replace('Z', '+00:00')) timestamp = dt.strftime('%Y-%m-%d %H:%M:%S') except (ValueError, TypeError): timestamp = str(transfer["block_timestamp"]) # Format transaction hash tx_hash = transfer.get("transaction_hash", "N/A") if len(tx_hash) > 10: tx_hash = f"`{tx_hash[:6]}...{tx_hash[-4:]}`" result += f"| {nft_name} | {from_address} | {to_address} | {timestamp} | {tx_hash} |\n" # Show total count if more than display limit if len(transfers) > display_limit: result += f"\n*Showing {display_limit} of {len(transfers)} transfers*\n" return result except Exception as e: print(f"ERROR in get_wallet_nft_transfers: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil data transfer NFT wallet: {str(e)}" @mcp.tool() async def get_wallet_swaps(network: str, address: str, from_date: str = None, to_date: str = None, limit: int = 100) -> str: """Get swap transaction history for a wallet. Args: network: Network identifier (mainnet, devnet) address: Wallet address from_date: Start date (ISO 8601) to_date: End date (ISO 8601) limit: Results limit (max 100) """ try: # Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah: # /account/{network}/{address}/swaps endpoint = f"/account/{network}/{address}/swaps" # Siapkan parameter params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100 if from_date: params["from_date"] = from_date if to_date: params["to_date"] = to_date print(f"DEBUG - Fetching swaps for wallet {address} on {network}", file=sys.stderr) data = await fetch_moralis_json(endpoint, params=params) if not data: return f"Tidak dapat mengambil data swap untuk wallet {address} di {network}." if "error" in data: return f"Tidak dapat mengambil data swap untuk wallet {address} di {network}. Error: {data.get('error')}" # Format response as markdown result = f"# Swap Transaction History for {address}\n\n" # Check if data is a list or has a swaps property swaps = data if isinstance(data, list) else data.get("swaps", []) if not swaps: return f"Tidak ada riwayat transaksi swap yang ditemukan untuk wallet {address} di {network}." # Format as table result += f"| From Token | To Token | Amount | Value | Time | Tx Hash |\n" result += f"|-----------|---------|--------|-------|------|---------|\n" # Limit to first 15 swaps for readability display_limit = min(15, len(swaps)) for swap in swaps[:display_limit]: # Format from token from_token = "Unknown" if "from_token_symbol" in swap and swap["from_token_symbol"]: from_token = swap["from_token_symbol"] # Format to token to_token = "Unknown" if "to_token_symbol" in swap and swap["to_token_symbol"]: to_token = swap["to_token_symbol"] # Format amount amount = "N/A" if "from_amount" in swap and swap["from_amount"] is not None: try: from_decimals = int(swap.get("from_token_decimals", 0)) raw_amount = int(swap["from_amount"]) formatted_amount = raw_amount / (10 ** from_decimals) amount = f"{formatted_amount:,.4f} {from_token}" except (ValueError, TypeError): amount = str(swap["from_amount"]) # Format USD value value = "N/A" if "usd_value" in swap and swap["usd_value"] is not None: try: usd_val = float(swap["usd_value"]) value = f"${usd_val:,.2f}" except (ValueError, TypeError): value = str(swap["usd_value"]) # Format timestamp timestamp = "N/A" if "block_timestamp" in swap and swap["block_timestamp"] is not None: try: from datetime import datetime dt = datetime.fromisoformat(swap["block_timestamp"].replace('Z', '+00:00')) timestamp = dt.strftime('%Y-%m-%d %H:%M:%S') except (ValueError, TypeError): timestamp = str(swap["block_timestamp"]) # Format transaction hash tx_hash = swap.get("transaction_hash", "N/A") if len(tx_hash) > 10: tx_hash = f"`{tx_hash[:6]}...{tx_hash[-4:]}`" result += f"| {from_token} | {to_token} | {amount} | {value} | {timestamp} | {tx_hash} |\n" # Show total count if more than display limit if len(swaps) > display_limit: result += f"\n*Showing {display_limit} of {len(swaps)} swap transactions*\n" return result except Exception as e: print(f"ERROR in get_wallet_swaps: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil data swap wallet: {str(e)}" @mcp.tool() async def get_token_price_history(network: str, address: str, from_date: str = None, to_date: str = None, interval: str = "24h") -> str: """Get historical price data for a token over time. Args: network: Network identifier (mainnet, devnet) address: Token mint address from_date: Start date (ISO 8601) to_date: End date (ISO 8601) interval: Price data interval (1h, 6h, 24h) """ try: # Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah: # /token/{network}/{address}/price/history endpoint = f"/token/{network}/{address}/price/history" # Siapkan parameter params = {"interval": interval} if from_date: params["from_date"] = from_date if to_date: params["to_date"] = to_date print(f"DEBUG - Fetching token price history for {address} on {network}", file=sys.stderr) data = await fetch_moralis_json(endpoint, params=params) if not data: return f"Tidak dapat mengambil data riwayat harga token untuk {address} di {network}." if "error" in data: return f"Tidak dapat mengambil data riwayat harga token untuk {address} di {network}. Error: {data.get('error')}" # Format response as markdown result = f"# Token Price History\n\n" # Check if data is a list or has a prices property prices = data if isinstance(data, list) else data.get("prices", []) if not prices: return f"Tidak ada data riwayat harga yang ditemukan untuk token {address} di {network}." # Format as table result += f"| Date | Price (USD) | Volume | Market Cap |\n" result += f"|------|-------------|--------|------------|\n" # Limit to first 20 price points for readability display_limit = min(20, len(prices)) for price in prices[:display_limit]: # Format timestamp timestamp = "N/A" if "date" in price and price["date"] is not None: try: from datetime import datetime dt = datetime.fromisoformat(price["date"].replace('Z', '+00:00')) timestamp = dt.strftime('%Y-%m-%d %H:%M') except (ValueError, TypeError): timestamp = str(price["date"]) # Format price price_usd = "N/A" if "price" in price and price["price"] is not None: try: price_val = float(price["price"]) if price_val < 0.01: price_usd = f"${price_val:.8f}" else: price_usd = f"${price_val:.4f}" except (ValueError, TypeError): price_usd = str(price["price"]) # Format volume volume = "N/A" if "volume" in price and price["volume"] is not None: try: vol_val = float(price["volume"]) volume = f"${vol_val:,.2f}" except (ValueError, TypeError): volume = str(price["volume"]) # Format market cap market_cap = "N/A" if "market_cap" in price and price["market_cap"] is not None: try: mcap_val = float(price["market_cap"]) market_cap = f"${mcap_val:,.2f}" except (ValueError, TypeError): market_cap = str(price["market_cap"]) result += f"| {timestamp} | {price_usd} | {volume} | {market_cap} |\n" # Show total count if more than display limit if len(prices) > display_limit: result += f"\n*Showing {display_limit} of {len(prices)} price points*\n" return result except Exception as e: print(f"ERROR in get_token_price_history: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil data riwayat harga token: {str(e)}" @mcp.tool() async def get_ohlcv_by_token(network: str, address: str, timeframe: str, from_date: str = None, to_date: str = None, limit: int = 500) -> str: """Get OHLCV candlestick data for a token across all pairs. Args: network: Network identifier (mainnet, devnet) address: Token mint address timeframe: Timeframe for candles (1m, 5m, 15m, 30m, 1h, 4h, 1d, 1w) from_date: Start date (ISO 8601) to_date: End date (ISO 8601) limit: Number of candles to return (max 2000) """ try: # Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah: # /token/{network}/{address}/ohlc endpoint = f"/token/{network}/{address}/ohlc" # Siapkan parameter params = { "timeframe": timeframe, "limit": min(limit, 2000) # Ensure limit doesn't exceed 2000 } if from_date: params["from_date"] = from_date if to_date: params["to_date"] = to_date print(f"DEBUG - Fetching OHLCV data for {address} on {network} with timeframe {timeframe}", file=sys.stderr) data = await fetch_moralis_json(endpoint, params=params) if not data: return f"Tidak dapat mengambil data OHLCV untuk token {address} di {network}." if "error" in data: return f"Tidak dapat mengambil data OHLCV untuk token {address} di {network}. Error: {data.get('error')}" # Format response as markdown result = f"# OHLCV Data for Token ({timeframe})\n\n" # Check if data is a list or has a candles property candles = data if isinstance(data, list) else data.get("candles", []) if not candles: return f"Tidak ada data OHLCV yang ditemukan untuk token {address} di {network} dengan timeframe {timeframe}." # Format as table result += f"| Time | Open | High | Low | Close | Volume |\n" result += f"|------|------|------|-----|-------|--------|\n" # Limit to first 15 candles for readability display_limit = min(15, len(candles)) for candle in candles[:display_limit]: # Format timestamp timestamp = "N/A" if "timestamp" in candle and candle["timestamp"] is not None: try: from datetime import datetime dt = datetime.fromisoformat(candle["timestamp"].replace('Z', '+00:00')) timestamp = dt.strftime('%Y-%m-%d %H:%M') except (ValueError, TypeError): timestamp = str(candle["timestamp"]) # Format OHLC values open_price = format_price(candle.get("open")) high_price = format_price(candle.get("high")) low_price = format_price(candle.get("low")) close_price = format_price(candle.get("close")) # Format volume volume = "N/A" if "volume" in candle and candle["volume"] is not None: try: vol_val = float(candle["volume"]) volume = f"${vol_val:,.2f}" except (ValueError, TypeError): volume = str(candle["volume"]) result += f"| {timestamp} | {open_price} | {high_price} | {low_price} | {close_price} | {volume} |\n" # Show total count if more than display limit if len(candles) > display_limit: result += f"\n*Showing {display_limit} of {len(candles)} candles*\n" return result except Exception as e: print(f"ERROR in get_ohlcv_by_token: {str(e)}", file=sys.stderr) return f"Terjadi kesalahan saat mengambil data OHLCV token: {str(e)}" # Helper function for formatting price values def format_price(price_value): if price_value is None: return "N/A" try: price_val = float(price_value) if price_val < 0.01: return f"${price_val:.8f}" else: return f"${price_val:.4f}" except (ValueError, TypeError): return str(price_value) # Tambahkan fungsi-fungsi lain sesuai kebutuhan if __name__ == "__main__": # Jalankan server MCP mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/adhinugroho1711/mcp-trading'

If you have feedback or need assistance with the MCP directory API, please join our Discord server