moralis_server.py•71.3 kB
"""Moralis MCP Server
This server provides tools for interacting with the Moralis Solana API to get information
about tokens, NFTs, prices, and wallet data on the Solana blockchain.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
import urllib.parse
import json
import sys
import traceback
import os
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from common import fetch_json
# Load environment variables from .env file
load_dotenv()
# ---------------------------------------------------------------------------
# Constants & configuration
# ---------------------------------------------------------------------------
# Initialize FastMCP
mcp = FastMCP(name="moralis-server")
# Moralis API configuration
BASE_URL = "https://solana-gateway.moralis.io"
API_KEY = None # Will be loaded from environment
# ---------------------------------------------------------------------------
# Helper functions
# ---------------------------------------------------------------------------
async def fetch_moralis_json(endpoint: str, params: dict = None) -> dict:
"""Fungsi helper untuk mengambil data JSON dari Moralis API dengan API key.
Args:
endpoint: Endpoint API Moralis (tanpa base URL)
params: Parameter query tambahan
Returns:
dict: Data JSON dari respons API atau dict dengan error
"""
global API_KEY
# Load API key jika belum diload
if not API_KEY:
load_dotenv()
API_KEY = os.getenv("MORALIS_API_KEY")
if not API_KEY:
return {"error": "Moralis API key not found. Please set MORALIS_API_KEY in your .env file."}
# Siapkan params jika belum ada
if not params:
params = {}
# Set headers dengan API key
headers = {"X-API-Key": API_KEY}
# Bangun URL lengkap
url = f"{BASE_URL}{endpoint}" if endpoint.startswith('/') else f"{BASE_URL}/{endpoint}"
# Debug info
print(f"DEBUG - Fetching URL: {url}", file=sys.stderr)
print(f"DEBUG - With params: {params}", file=sys.stderr)
print(f"DEBUG - With headers: {headers}", file=sys.stderr)
# Kirim request dan return data JSON
try:
data = await fetch_json(url, headers=headers, params=params)
if data and isinstance(data, dict) and "error" in data:
print(f"DEBUG - API Error: {data['error']}", file=sys.stderr)
return data
except Exception as e:
print(f"ERROR in fetch_moralis_json: {str(e)}", file=sys.stderr)
return {"error": str(e)}
# ---------------------------------------------------------------------------
# Token API endpoints
# ---------------------------------------------------------------------------
@mcp.tool()
async def get_token_metadata(network: str, address: str) -> str:
"""Get token metadata including name, symbol, decimals, and supply.
Args:
network: Network identifier (mainnet, devnet)
address: Token mint address
"""
try:
# Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah:
# /token/{network}/{address}/metadata
endpoint = f"/token/{network}/{address}/metadata"
print(f"DEBUG - Fetching token metadata for {address} on {network}", file=sys.stderr)
data = await fetch_moralis_json(endpoint)
if not data:
return f"Tidak dapat mengambil metadata token untuk {address} di {network}."
if "error" in data:
return f"Tidak dapat mengambil metadata token untuk {address} di {network}. Error: {data.get('error')}"
# Format response as markdown
result = f"# Token Metadata: {data.get('name', 'Unknown')}\n\n"
result += f"| Property | Value |\n"
result += f"|----------|-------|\n"
result += f"| Address | `{data.get('address', 'N/A')}` |\n"
result += f"| Name | {data.get('name', 'N/A')} |\n"
result += f"| Symbol | {data.get('symbol', 'N/A')} |\n"
result += f"| Decimals | {data.get('decimals', 'N/A')} |\n"
result += f"| Total Supply | {data.get('total_supply', 'N/A')} |\n"
result += f"| Current Supply | {data.get('current_supply', 'N/A')} |\n"
result += f"| Verified Collection | {data.get('verified_collection', False)} |\n"
result += f"| Possible Spam | {data.get('possible_spam', False)} |\n"
if data.get('logo'):
result += f"\n})\n"
return result
except Exception as e:
print(f"ERROR in get_token_metadata: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mengambil metadata token: {str(e)}"
@mcp.tool()
async def get_token_price(network: str, address: str) -> str:
"""Get current token price in USD and native SOL.
Args:
network: Network identifier (mainnet, devnet)
address: Token mint address
"""
try:
# Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah:
# /token/{network}/{address}/price
endpoint = f"/token/{network}/{address}/price"
print(f"DEBUG - Fetching token price for {address} on {network}", file=sys.stderr)
data = await fetch_moralis_json(endpoint)
if not data:
return f"Tidak dapat mengambil harga token untuk {address} di {network}."
if "error" in data:
return f"Tidak dapat mengambil harga token untuk {address} di {network}. Error: {data.get('error')}"
# Format response as markdown
result = f"# Token Price: {data.get('tokenName', 'Unknown')} ({data.get('tokenSymbol', 'Unknown')})\n\n"
result += f"| Property | Value |\n"
result += f"|----------|-------|\n"
# Format USD price
usd_price = data.get('usdPrice')
if usd_price is not None:
try:
usd_price_float = float(usd_price)
if usd_price_float < 0.01:
formatted_usd = f"${usd_price_float:.8f}"
else:
formatted_usd = f"${usd_price_float:.4f}"
except (ValueError, TypeError):
formatted_usd = f"${usd_price}"
else:
formatted_usd = "N/A"
# Format native price
native_price = data.get('nativePrice')
if native_price is not None:
try:
native_price_float = float(native_price)
formatted_native = f"{native_price_float:.8f} SOL"
except (ValueError, TypeError):
formatted_native = f"{native_price} SOL"
else:
formatted_native = "N/A"
result += f"| USD Price | {formatted_usd} |\n"
result += f"| SOL Price | {formatted_native} |\n"
result += f"| 24h Change | {data.get('24hrChange', 'N/A')}% |\n"
result += f"| 24h Volume | ${data.get('24hrVolume', 'N/A')} |\n"
result += f"| Market Cap | ${data.get('marketCap', 'N/A')} |\n"
return result
except Exception as e:
print(f"ERROR in get_token_price: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mengambil harga token: {str(e)}"
@mcp.tool()
async def get_multiple_token_prices(network: str, tokens: str) -> str:
"""Get prices for multiple tokens in a single request.
Args:
network: Network identifier (mainnet, devnet)
tokens: Comma-separated list of token addresses
"""
try:
# Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah:
# /token/{network}/prices (POST)
endpoint = f"/token/{network}/prices"
# Parse token addresses
token_addresses = [addr.strip() for addr in tokens.split(",")]
# Prepare request body
request_body = {"tokens": token_addresses}
print(f"DEBUG - Fetching prices for multiple tokens on {network}: {token_addresses}", file=sys.stderr)
# Untuk POST request, kita perlu menggunakan fetch_json langsung
url = f"{BASE_URL}{endpoint}"
headers = {"X-API-Key": API_KEY, "Content-Type": "application/json"}
# Kirim POST request
data = await fetch_json(url, method="POST", headers=headers, json=request_body)
if not data:
return f"Tidak dapat mengambil harga untuk token-token yang diminta di {network}."
if "error" in data:
return f"Tidak dapat mengambil harga untuk token-token yang diminta di {network}. Error: {data.get('error')}"
# Format response as markdown
result = f"# Multiple Token Prices\n\n"
result += f"| Token | Symbol | USD Price | SOL Price | 24h Change |\n"
result += f"|-------|--------|-----------|-----------|------------|\n"
for token_data in data:
# Format USD price
usd_price = token_data.get('usdPrice')
if usd_price is not None:
try:
usd_price_float = float(usd_price)
if usd_price_float < 0.01:
formatted_usd = f"${usd_price_float:.8f}"
else:
formatted_usd = f"${usd_price_float:.4f}"
except (ValueError, TypeError):
formatted_usd = f"${usd_price}"
else:
formatted_usd = "N/A"
# Format native price
native_price = token_data.get('nativePrice')
if native_price is not None:
try:
native_price_float = float(native_price)
formatted_native = f"{native_price_float:.8f} SOL"
except (ValueError, TypeError):
formatted_native = f"{native_price} SOL"
else:
formatted_native = "N/A"
# Get 24h change
change_24h = token_data.get('24hrChange', 'N/A')
if change_24h != 'N/A':
change_24h = f"{change_24h}%"
result += f"| {token_data.get('tokenName', 'Unknown')} | {token_data.get('tokenSymbol', 'Unknown')} | {formatted_usd} | {formatted_native} | {change_24h} |\n"
return result
except Exception as e:
print(f"ERROR in get_multiple_token_prices: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mengambil harga multiple token: {str(e)}"
@mcp.tool()
async def get_pump_fun_tokens(status: str = None, sort_by: str = None, limit: int = 50) -> str:
"""Get Pump.fun tokens data.
Args:
status: Token status (bonding, graduated, all)
sort_by: Sort criteria (created_at, market_cap, volume)
limit: Results limit (max 100)
"""
try:
# Berdasarkan dokumentasi API terbaru, Pump.fun tidak lagi menggunakan endpoint khusus
# Sebagai gantinya, kita perlu mengambil daftar token dari Jupiter atau Raydium
# dan kemudian memfilter token-token yang berasal dari Pump.fun
# Siapkan parameter
params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100
if status:
params["status"] = status
if sort_by:
params["sort_by"] = sort_by
# Coba endpoint untuk mendapatkan token populer di Solana
endpoint = "/market-data/solana/tokens"
# Debug info
print(f"DEBUG - Fetching Solana tokens with params: {params}", file=sys.stderr)
data = await fetch_moralis_json(endpoint, params)
if not data:
return f"Tidak dapat mengambil data token Solana."
if "error" in data:
return f"Tidak dapat mengambil data token Solana. Error: {data.get('error')}"
# Format response as markdown
result = f"# Popular Solana Tokens\n\n"
# Check data structure
tokens = []
if isinstance(data, list):
tokens = data
elif isinstance(data, dict) and "tokens" in data:
tokens = data["tokens"]
elif isinstance(data, dict) and "result" in data:
tokens = data["result"]
if not tokens:
return "Tidak ada data token yang ditemukan."
# Filter token yang mungkin dari Pump.fun (berdasarkan metadata atau properti tertentu)
# Catatan: Ini hanya perkiraan karena tidak ada flag eksplisit untuk token Pump.fun
pump_tokens = []
for token in tokens:
# Cek apakah token memiliki indikasi berasal dari Pump.fun
# Misalnya, memiliki properti tertentu atau dari exchange tertentu
if token.get("exchange") == "pump-fun" or "pump.fun" in str(token.get("name", "")).lower() or "pump.fun" in str(token.get("description", "")).lower():
pump_tokens.append(token)
# Jika tidak ada token Pump.fun yang ditemukan, tampilkan token populer saja
display_tokens = pump_tokens if pump_tokens else tokens
# Format as table
result += f"| Name | Symbol | Price (USD) | Market Cap | Volume 24h | Change 24h |\n"
result += f"|------|--------|-------------|------------|------------|------------|\n"
# Limit to first 20 tokens for readability
display_limit = min(20, len(display_tokens))
for token in display_tokens[:display_limit]:
# Format token name and symbol
name = token.get("name", "Unknown")
symbol = token.get("symbol", "Unknown")
# Format price
price_usd = "N/A"
if "price" in token and token["price"] is not None:
try:
price_val = float(token["price"])
if price_val < 0.01:
price_usd = f"${price_val:.8f}"
else:
price_usd = f"${price_val:.4f}"
except (ValueError, TypeError):
price_usd = str(token["price"])
# Format market cap
market_cap = "N/A"
if "marketCap" in token and token["marketCap"] is not None:
try:
mcap_val = float(token["marketCap"])
market_cap = f"${mcap_val:,.2f}"
except (ValueError, TypeError):
market_cap = str(token["marketCap"])
# Format volume
volume = "N/A"
if "volume24h" in token and token["volume24h"] is not None:
try:
vol_val = float(token["volume24h"])
volume = f"${vol_val:,.2f}"
except (ValueError, TypeError):
volume = str(token["volume24h"])
# Format price change
price_change = "N/A"
if "priceChange24h" in token and token["priceChange24h"] is not None:
try:
change_val = float(token["priceChange24h"])
price_change = f"{change_val:+.2f}%"
except (ValueError, TypeError):
price_change = str(token["priceChange24h"])
result += f"| {name} | {symbol} | {price_usd} | {market_cap} | {volume} | {price_change} |\n"
# Show total count if more than display limit
if len(display_tokens) > display_limit:
result += f"\n*Showing {display_limit} of {len(display_tokens)} tokens*\n"
# Tambahkan catatan jika tidak ada token Pump.fun yang ditemukan
if not pump_tokens and tokens:
result += f"\n**Note:** Tidak dapat mengidentifikasi token Pump.fun secara spesifik. Menampilkan token Solana populer sebagai gantinya.\n"
return result
except Exception as e:
print(f"ERROR in get_pump_fun_tokens: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mengambil data token: {str(e)}"
@mcp.tool()
async def get_pump_fun_token_price(network: str, address: str) -> str:
"""Get price data specifically for Pump.fun tokens.
Args:
network: Network identifier (mainnet, devnet)
address: Pump.fun token mint address
"""
try:
# Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah:
# /token/{network}/{address}/price/pump-fun
endpoint = f"/token/{network}/{address}/price/pump-fun"
print(f"DEBUG - Fetching Pump.fun token price for {address} on {network}", file=sys.stderr)
data = await fetch_moralis_json(endpoint)
if not data:
return f"Tidak dapat mengambil data harga token Pump.fun untuk {address} di {network}."
if "error" in data:
# Jika endpoint khusus Pump.fun gagal, coba endpoint harga token biasa
print(f"DEBUG - Pump.fun endpoint failed, trying regular token price endpoint", file=sys.stderr)
return await get_token_price(network, address)
# Format response as markdown
result = f"# Pump.fun Token Price: {data.get('tokenName', 'Unknown')} ({data.get('tokenSymbol', 'Unknown')})\n\n"
result += f"| Property | Value |\n"
result += f"|----------|-------|\n"
# Format USD price
usd_price = data.get('usdPrice')
if usd_price is not None:
try:
usd_price_float = float(usd_price)
if usd_price_float < 0.01:
formatted_usd = f"${usd_price_float:.8f}"
else:
formatted_usd = f"${usd_price_float:.4f}"
except (ValueError, TypeError):
formatted_usd = f"${usd_price}"
else:
formatted_usd = "N/A"
# Format SOL price
sol_price = data.get('solPrice')
if sol_price is not None:
try:
sol_price_float = float(sol_price)
formatted_sol = f"{sol_price_float:.8f} SOL"
except (ValueError, TypeError):
formatted_sol = f"{sol_price} SOL"
else:
formatted_sol = "N/A"
# Format market cap
market_cap = "N/A"
if "marketCap" in data and data["marketCap"] is not None:
try:
mcap_val = float(data["marketCap"])
market_cap = f"${mcap_val:,.2f}"
except (ValueError, TypeError):
market_cap = str(data["marketCap"])
# Format liquidity
liquidity = "N/A"
if "liquidityUsd" in data and data["liquidityUsd"] is not None:
try:
liq_val = float(data["liquidityUsd"])
liquidity = f"${liq_val:,.2f}"
except (ValueError, TypeError):
liquidity = str(data["liquidityUsd"])
# Format volume
volume = "N/A"
if "volume24h" in data and data["volume24h"] is not None:
try:
vol_val = float(data["volume24h"])
volume = f"${vol_val:,.2f}"
except (ValueError, TypeError):
volume = str(data["volume24h"])
# Format price change
price_change = "N/A"
if "priceChange24h" in data and data["priceChange24h"] is not None:
try:
change_val = float(data["priceChange24h"])
price_change = f"{change_val:+.2f}%"
except (ValueError, TypeError):
price_change = str(data["priceChange24h"])
result += f"| USD Price | {formatted_usd} |\n"
result += f"| SOL Price | {formatted_sol} |\n"
result += f"| Market Cap | {market_cap} |\n"
result += f"| Liquidity | {liquidity} |\n"
result += f"| 24h Volume | {volume} |\n"
result += f"| 24h Price Change | {price_change} |\n"
# Bonding curve progress (jika ada)
if "bondingCurveProgress" in data and data["bondingCurveProgress"] is not None:
try:
progress = float(data["bondingCurveProgress"]) * 100
result += f"| Bonding Curve Progress | {progress:.2f}% |\n"
except (ValueError, TypeError):
result += f"| Bonding Curve Progress | {data['bondingCurveProgress']} |\n"
# Status graduated
if "isGraduated" in data:
status = "Graduated" if data["isGraduated"] else "Bonding"
result += f"| Status | {status} |\n"
# Raydium pool (jika ada)
if "raydiumPool" in data and data["raydiumPool"]:
result += f"| Raydium Pool | `{data['raydiumPool']}` |\n"
return result
except Exception as e:
print(f"ERROR in get_pump_fun_token_price: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mengambil data harga token Pump.fun: {str(e)}"
@mcp.tool()
async def get_native_balance(network: str, address: str) -> str:
"""Get native SOL balance for a wallet address.
Args:
network: Network identifier (mainnet, devnet)
address: Wallet address
"""
try:
# Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah:
# /account/{network}/{address}/balance
endpoint = f"/account/{network}/{address}/balance"
print(f"DEBUG - Fetching native balance for {address} on {network}", file=sys.stderr)
data = await fetch_moralis_json(endpoint)
if not data:
return f"Tidak dapat mengambil saldo SOL untuk {address} di {network}."
if "error" in data:
return f"Tidak dapat mengambil saldo SOL untuk {address} di {network}. Error: {data.get('error')}"
# Format response as markdown
result = f"# Native SOL Balance\n\n"
result += f"| Property | Value |\n"
result += f"|----------|-------|\n"
result += f"| Address | `{address}` |\n"
# Format balance
balance = data.get('solana')
if balance is not None:
try:
balance_float = float(balance)
formatted_balance = f"{balance_float:.9f} SOL"
except (ValueError, TypeError):
formatted_balance = f"{balance} SOL"
else:
formatted_balance = "0 SOL"
result += f"| Balance | {formatted_balance} |\n"
return result
except Exception as e:
print(f"ERROR in get_native_balance: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mengambil saldo SOL: {str(e)}"
@mcp.tool()
async def get_token_balances(network: str, address: str, limit: int = 100) -> str:
"""Get token balances for a wallet address.
Args:
network: Network identifier (mainnet, devnet)
address: Wallet address
limit: Maximum number of results (default 100)
"""
try:
# Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah:
# /account/{network}/{address}/tokens
endpoint = f"/account/{network}/{address}/tokens"
# Siapkan parameter
params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100
print(f"DEBUG - Fetching token balances for {address} on {network}", file=sys.stderr)
data = await fetch_moralis_json(endpoint, params=params)
if not data:
return f"Tidak dapat mengambil saldo token untuk {address} di {network}."
if "error" in data:
return f"Tidak dapat mengambil saldo token untuk {address} di {network}. Error: {data.get('error')}"
# Format response as markdown
result = f"# Token Balances for {address}\n\n"
# Check if data is a list
tokens = data if isinstance(data, list) else data.get("tokens", [])
if not tokens:
return f"Tidak ada saldo token yang ditemukan untuk {address} di {network}."
# Format as table
result += f"| Token | Symbol | Balance | USD Value |\n"
result += f"|-------|--------|---------|------------|\n"
# Limit to first 20 tokens for readability
display_limit = min(20, len(tokens))
for token in tokens[:display_limit]:
# Format token name and symbol
name = token.get("name", "Unknown")
symbol = token.get("symbol", "Unknown")
# Format balance
balance = "0"
if "balance" in token and token["balance"] is not None:
try:
decimals = int(token.get("decimals", 0))
raw_balance = int(token["balance"])
formatted_balance = raw_balance / (10 ** decimals)
balance = f"{formatted_balance:,.4f}"
except (ValueError, TypeError):
balance = str(token["balance"])
# Format USD value
usd_value = "N/A"
if "usd_value" in token and token["usd_value"] is not None:
try:
usd_val = float(token["usd_value"])
usd_value = f"${usd_val:,.2f}"
except (ValueError, TypeError):
usd_value = str(token["usd_value"])
result += f"| {name} | {symbol} | {balance} | {usd_value} |\n"
# Show total count if more than display limit
if len(tokens) > display_limit:
result += f"\n*Showing {display_limit} of {len(tokens)} tokens*\n"
return result
except Exception as e:
print(f"ERROR in get_token_balances: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mengambil saldo token: {str(e)}"
@mcp.tool()
async def get_token_pairs(network: str, address: str) -> str:
"""Get trading pairs and liquidity data for a token.
Args:
network: Network identifier (mainnet, devnet)
address: Token mint address
"""
try:
# Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah:
# /token/{network}/{address}/pairs
endpoint = f"/token/{network}/{address}/pairs"
print(f"DEBUG - Fetching token pairs for {address} on {network}", file=sys.stderr)
data = await fetch_moralis_json(endpoint)
if not data:
return f"Tidak dapat mengambil data pair token untuk {address} di {network}."
if "error" in data:
return f"Tidak dapat mengambil data pair token untuk {address} di {network}. Error: {data.get('error')}"
# Format response as markdown
result = f"# Trading Pairs for Token\n\n"
# Check if data is a list or has a pairs property
pairs = data if isinstance(data, list) else data.get("pairs", [])
if not pairs:
return f"Tidak ada pair trading yang ditemukan untuk token {address} di {network}."
# Format as table
result += f"| Pair | DEX | Liquidity | Volume 24h | Price |\n"
result += f"|------|-----|-----------|-----------|-------|\n"
# Limit to first 15 pairs for readability
display_limit = min(15, len(pairs))
for pair in pairs[:display_limit]:
# Format pair name
pair_name = pair.get("name", "Unknown")
# Format DEX name
dex = pair.get("dex", "Unknown")
# Format liquidity
liquidity = "N/A"
if "liquidity" in pair and pair["liquidity"] is not None:
try:
liq_val = float(pair["liquidity"])
liquidity = f"${liq_val:,.2f}"
except (ValueError, TypeError):
liquidity = str(pair["liquidity"])
# Format volume
volume = "N/A"
if "volume_24h" in pair and pair["volume_24h"] is not None:
try:
vol_val = float(pair["volume_24h"])
volume = f"${vol_val:,.2f}"
except (ValueError, TypeError):
volume = str(pair["volume_24h"])
# Format price
price = "N/A"
if "price" in pair and pair["price"] is not None:
try:
price_val = float(pair["price"])
if price_val < 0.01:
price = f"${price_val:.8f}"
else:
price = f"${price_val:.4f}"
except (ValueError, TypeError):
price = str(pair["price"])
result += f"| {pair_name} | {dex} | {liquidity} | {volume} | {price} |\n"
# Show total count if more than display limit
if len(pairs) > display_limit:
result += f"\n*Showing {display_limit} of {len(pairs)} pairs*\n"
return result
except Exception as e:
print(f"ERROR in get_token_pairs: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mengambil data pair token: {str(e)}"
@mcp.tool()
async def get_token_swaps(network: str, address: str, from_date: str = None, to_date: str = None, limit: int = 100) -> str:
"""Get swap transactions for a token.
Args:
network: Network identifier (mainnet, devnet)
address: Token mint address
from_date: Start date (ISO 8601)
to_date: End date (ISO 8601)
limit: Results limit (max 100)
"""
try:
# Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah:
# /token/{network}/{address}/swaps
endpoint = f"/token/{network}/{address}/swaps"
# Siapkan parameter
params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100
if from_date:
params["from_date"] = from_date
if to_date:
params["to_date"] = to_date
print(f"DEBUG - Fetching token swaps for {address} on {network}", file=sys.stderr)
data = await fetch_moralis_json(endpoint, params=params)
if not data:
return f"Tidak dapat mengambil data swap token untuk {address} di {network}."
if "error" in data:
return f"Tidak dapat mengambil data swap token untuk {address} di {network}. Error: {data.get('error')}"
# Format response as markdown
result = f"# Token Swap Transactions\n\n"
# Check if data is a list or has a swaps property
swaps = data if isinstance(data, list) else data.get("swaps", [])
if not swaps:
return f"Tidak ada transaksi swap yang ditemukan untuk token {address} di {network}."
# Format as table
result += f"| Type | Amount | Value | Price | Time | Tx Hash |\n"
result += f"|------|--------|-------|-------|------|---------|\n"
# Limit to first 15 swaps for readability
display_limit = min(15, len(swaps))
for swap in swaps[:display_limit]:
# Format transaction type (buy/sell)
tx_type = swap.get("type", "Unknown").capitalize()
# Format amount
amount = "N/A"
if "token_amount" in swap and swap["token_amount"] is not None:
try:
amount_val = float(swap["token_amount"])
amount = f"{amount_val:,.4f}"
except (ValueError, TypeError):
amount = str(swap["token_amount"])
# Format USD value
value = "N/A"
if "usd_value" in swap and swap["usd_value"] is not None:
try:
value_val = float(swap["usd_value"])
value = f"${value_val:,.2f}"
except (ValueError, TypeError):
value = str(swap["usd_value"])
# Format price
price = "N/A"
if "price" in swap and swap["price"] is not None:
try:
price_val = float(swap["price"])
if price_val < 0.01:
price = f"${price_val:.8f}"
else:
price = f"${price_val:.4f}"
except (ValueError, TypeError):
price = str(swap["price"])
# Format timestamp
timestamp = "N/A"
if "block_timestamp" in swap and swap["block_timestamp"] is not None:
try:
from datetime import datetime
dt = datetime.fromisoformat(swap["block_timestamp"].replace('Z', '+00:00'))
timestamp = dt.strftime('%Y-%m-%d %H:%M:%S')
except (ValueError, TypeError):
timestamp = str(swap["block_timestamp"])
# Format transaction hash
tx_hash = swap.get("transaction_hash", "N/A")
if len(tx_hash) > 10:
tx_hash = f"`{tx_hash[:6]}...{tx_hash[-4:]}`"
result += f"| {tx_type} | {amount} | {value} | {price} | {timestamp} | {tx_hash} |\n"
# Show total count if more than display limit
if len(swaps) > display_limit:
result += f"\n*Showing {display_limit} of {len(swaps)} swap transactions*\n"
return result
except Exception as e:
print(f"ERROR in get_token_swaps: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mengambil data swap token: {str(e)}"
@mcp.tool()
async def get_nft_transfers(network: str, address: str, from_date: str = None, to_date: str = None, limit: int = 100) -> str:
"""Get transfer history for a specific NFT.
Args:
network: Network identifier (mainnet, devnet)
address: NFT mint address
from_date: Start date (ISO 8601)
to_date: End date (ISO 8601)
limit: Results limit (max 100)
"""
try:
# Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah:
# /nft/{network}/{address}/transfers
endpoint = f"/nft/{network}/{address}/transfers"
# Siapkan parameter
params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100
if from_date:
params["from_date"] = from_date
if to_date:
params["to_date"] = to_date
print(f"DEBUG - Fetching NFT transfers for {address} on {network}", file=sys.stderr)
data = await fetch_moralis_json(endpoint, params=params)
if not data:
return f"Tidak dapat mengambil data transfer NFT untuk {address} di {network}."
if "error" in data:
return f"Tidak dapat mengambil data transfer NFT untuk {address} di {network}. Error: {data.get('error')}"
# Format response as markdown
result = f"# NFT Transfer History\n\n"
# Check if data is a list or has a transfers property
transfers = data if isinstance(data, list) else data.get("transfers", [])
if not transfers:
return f"Tidak ada riwayat transfer yang ditemukan untuk NFT {address} di {network}."
# Format as table
result += f"| From | To | Time | Tx Hash |\n"
result += f"|------|----|----|---------|\n"
# Limit to first 15 transfers for readability
display_limit = min(15, len(transfers))
for transfer in transfers[:display_limit]:
# Format from address
from_address = transfer.get("from_address", "N/A")
if len(from_address) > 10:
from_address = f"`{from_address[:6]}...{from_address[-4:]}`"
# Format to address
to_address = transfer.get("to_address", "N/A")
if len(to_address) > 10:
to_address = f"`{to_address[:6]}...{to_address[-4:]}`"
# Format timestamp
timestamp = "N/A"
if "block_timestamp" in transfer and transfer["block_timestamp"] is not None:
try:
from datetime import datetime
dt = datetime.fromisoformat(transfer["block_timestamp"].replace('Z', '+00:00'))
timestamp = dt.strftime('%Y-%m-%d %H:%M:%S')
except (ValueError, TypeError):
timestamp = str(transfer["block_timestamp"])
# Format transaction hash
tx_hash = transfer.get("transaction_hash", "N/A")
if len(tx_hash) > 10:
tx_hash = f"`{tx_hash[:6]}...{tx_hash[-4:]}`"
result += f"| {from_address} | {to_address} | {timestamp} | {tx_hash} |\n"
# Show total count if more than display limit
if len(transfers) > display_limit:
result += f"\n*Showing {display_limit} of {len(transfers)} transfers*\n"
return result
except Exception as e:
print(f"ERROR in get_nft_transfers: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mengambil data transfer NFT: {str(e)}"
@mcp.tool()
async def get_wallet_nfts(network: str, address: str, limit: int = 100) -> str:
"""Get all NFTs owned by a wallet address.
Args:
network: Network identifier (mainnet, devnet)
address: Wallet address
limit: Results limit (max 100)
"""
try:
# Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah:
# /account/{network}/{address}/nfts
endpoint = f"/account/{network}/{address}/nfts"
# Siapkan parameter
params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100
print(f"DEBUG - Fetching NFTs for wallet {address} on {network}", file=sys.stderr)
data = await fetch_moralis_json(endpoint, params=params)
if not data:
return f"Tidak dapat mengambil data NFT untuk wallet {address} di {network}."
if "error" in data:
return f"Tidak dapat mengambil data NFT untuk wallet {address} di {network}. Error: {data.get('error')}"
# Format response as markdown
result = f"# NFTs Owned by {address}\n\n"
# Check if data is a list or has a nfts property
nfts = data if isinstance(data, list) else data.get("nfts", [])
if not nfts:
return f"Tidak ada NFT yang ditemukan untuk wallet {address} di {network}."
# Format as table
result += f"| Name | Collection | Token ID | Image |\n"
result += f"|------|-----------|----------|-------|\n"
# Limit to first 15 NFTs for readability
display_limit = min(15, len(nfts))
for nft in nfts[:display_limit]:
# Format NFT name
name = nft.get("name", "Unnamed NFT")
if not name:
name = "Unnamed NFT"
# Format collection name
collection = nft.get("collection_name", "Unknown Collection")
if not collection:
collection = "Unknown Collection"
# Format token ID
token_id = nft.get("token_id", "N/A")
if token_id and len(str(token_id)) > 8:
token_id = f"{str(token_id)[:4]}...{str(token_id)[-4:]}"
# Format image URL
image = "No Image"
if "image" in nft and nft["image"]:
image_url = nft["image"]
if image_url.startswith("http"):
image = f"[View](${image_url})"
result += f"| {name} | {collection} | {token_id} | {image} |\n"
# Show total count if more than display limit
if len(nfts) > display_limit:
result += f"\n*Showing {display_limit} of {len(nfts)} NFTs*\n"
return result
except Exception as e:
print(f"ERROR in get_wallet_nfts: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mengambil data NFT wallet: {str(e)}"
@mcp.tool()
async def search_nfts(network: str, query: str = None, collection: str = None, limit: int = 100) -> str:
"""Search NFTs by metadata criteria.
Args:
network: Network identifier (mainnet, devnet)
query: Search query (name, description, attributes)
collection: Collection address filter
limit: Results limit (max 100)
"""
try:
# Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah:
# /nft/{network}/search
endpoint = f"/nft/{network}/search"
# Siapkan parameter
params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100
if query:
params["query"] = query
if collection:
params["collection"] = collection
print(f"DEBUG - Searching NFTs on {network} with query: {query}", file=sys.stderr)
data = await fetch_moralis_json(endpoint, params=params)
if not data:
return f"Tidak dapat mencari NFT di {network}."
if "error" in data:
return f"Tidak dapat mencari NFT di {network}. Error: {data.get('error')}"
# Format response as markdown
result = f"# NFT Search Results\n\n"
# Check if data is a list or has a results property
nfts = data if isinstance(data, list) else data.get("results", [])
if not nfts:
return f"Tidak ada NFT yang ditemukan untuk pencarian di {network}."
# Format as table
result += f"| Name | Collection | Token ID | Image |\n"
result += f"|------|-----------|----------|-------|\n"
# Limit to first 15 NFTs for readability
display_limit = min(15, len(nfts))
for nft in nfts[:display_limit]:
# Format NFT name
name = nft.get("name", "Unnamed NFT")
if not name:
name = "Unnamed NFT"
# Format collection name
collection = nft.get("collection_name", "Unknown Collection")
if not collection:
collection = "Unknown Collection"
# Format token ID
token_id = nft.get("token_id", "N/A")
if token_id and len(str(token_id)) > 8:
token_id = f"{str(token_id)[:4]}...{str(token_id)[-4:]}"
# Format image URL
image = "No Image"
if "image" in nft and nft["image"]:
image_url = nft["image"]
if image_url.startswith("http"):
image = f"[View](${image_url})"
result += f"| {name} | {collection} | {token_id} | {image} |\n"
# Show total count if more than display limit
if len(nfts) > display_limit:
result += f"\n*Showing {display_limit} of {len(nfts)} NFTs*\n"
return result
except Exception as e:
print(f"ERROR in search_nfts: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mencari NFT: {str(e)}"
@mcp.tool()
async def get_portfolio(network: str, address: str) -> str:
"""Get comprehensive wallet portfolio including tokens, NFTs, and native balance.
Args:
network: Network identifier (mainnet, devnet)
address: Wallet address
"""
try:
# Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah:
# /account/{network}/{address}/portfolio
endpoint = f"/account/{network}/{address}/portfolio"
print(f"DEBUG - Fetching portfolio for {address} on {network}", file=sys.stderr)
data = await fetch_moralis_json(endpoint)
if not data:
return f"Tidak dapat mengambil data portfolio untuk {address} di {network}."
if "error" in data:
return f"Tidak dapat mengambil data portfolio untuk {address} di {network}. Error: {data.get('error')}"
# Format response as markdown
result = f"# Portfolio for {address}\n\n"
# Format native balance section
native_balance = data.get("native_balance", {})
if native_balance:
result += "## Native Balance\n\n"
result += f"| Symbol | Balance | USD Value |\n"
result += f"|--------|---------|-----------|\n"
# Format balance
balance = "0"
if "balance" in native_balance and native_balance["balance"] is not None:
try:
decimals = int(native_balance.get("decimals", 9)) # SOL has 9 decimals
raw_balance = int(native_balance["balance"])
formatted_balance = raw_balance / (10 ** decimals)
balance = f"{formatted_balance:,.9f}"
except (ValueError, TypeError):
balance = str(native_balance["balance"])
# Format USD value
usd_value = "N/A"
if "usd_value" in native_balance and native_balance["usd_value"] is not None:
try:
usd_val = float(native_balance["usd_value"])
usd_value = f"${usd_val:,.2f}"
except (ValueError, TypeError):
usd_value = str(native_balance["usd_value"])
result += f"| SOL | {balance} | {usd_value} |\n\n"
# Format tokens section
tokens = data.get("tokens", [])
if tokens:
result += "## Tokens\n\n"
result += f"| Token | Symbol | Balance | USD Value |\n"
result += f"|-------|--------|---------|-----------|\n"
# Limit to first 15 tokens for readability
display_limit = min(15, len(tokens))
for token in tokens[:display_limit]:
# Format token name and symbol
name = token.get("name", "Unknown")
symbol = token.get("symbol", "Unknown")
# Format balance
balance = "0"
if "balance" in token and token["balance"] is not None:
try:
decimals = int(token.get("decimals", 0))
raw_balance = int(token["balance"])
formatted_balance = raw_balance / (10 ** decimals)
balance = f"{formatted_balance:,.4f}"
except (ValueError, TypeError):
balance = str(token["balance"])
# Format USD value
usd_value = "N/A"
if "usd_value" in token and token["usd_value"] is not None:
try:
usd_val = float(token["usd_value"])
usd_value = f"${usd_val:,.2f}"
except (ValueError, TypeError):
usd_value = str(token["usd_value"])
result += f"| {name} | {symbol} | {balance} | {usd_value} |\n"
# Show total count if more than display limit
if len(tokens) > display_limit:
result += f"\n*Showing {display_limit} of {len(tokens)} tokens*\n\n"
else:
result += "\n"
# Format NFTs section
nfts = data.get("nfts", [])
if nfts:
result += "## NFTs\n\n"
result += f"| Name | Collection | Token ID |\n"
result += f"|------|-----------|----------|\n"
# Limit to first 10 NFTs for readability
display_limit = min(10, len(nfts))
for nft in nfts[:display_limit]:
# Format NFT name
name = nft.get("name", "Unnamed NFT")
if not name:
name = "Unnamed NFT"
# Format collection name
collection = nft.get("collection_name", "Unknown Collection")
if not collection:
collection = "Unknown Collection"
# Format token ID
token_id = nft.get("token_id", "N/A")
if token_id and len(str(token_id)) > 8:
token_id = f"{str(token_id)[:4]}...{str(token_id)[-4:]}"
result += f"| {name} | {collection} | {token_id} |\n"
# Show total count if more than display limit
if len(nfts) > display_limit:
result += f"\n*Showing {display_limit} of {len(nfts)} NFTs*\n"
# If no data in any section
if not native_balance and not tokens and not nfts:
result += "No portfolio data found for this address.\n"
return result
except Exception as e:
print(f"ERROR in get_portfolio: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mengambil data portfolio: {str(e)}"
@mcp.tool()
async def get_wallet_token_transfers(network: str, address: str, limit: int = 100, from_date: str = None, to_date: str = None) -> str:
"""Get token transfer history for a wallet address.
Args:
network: Network identifier (mainnet, devnet)
address: Wallet address
limit: Results limit (max 100)
from_date: Start date (ISO 8601)
to_date: End date (ISO 8601)
"""
try:
# Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah:
# /account/{network}/{address}/token-transfers
endpoint = f"/account/{network}/{address}/token-transfers"
# Siapkan parameter
params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100
if from_date:
params["from_date"] = from_date
if to_date:
params["to_date"] = to_date
print(f"DEBUG - Fetching token transfers for wallet {address} on {network}", file=sys.stderr)
data = await fetch_moralis_json(endpoint, params=params)
if not data:
return f"Tidak dapat mengambil data transfer token untuk wallet {address} di {network}."
if "error" in data:
return f"Tidak dapat mengambil data transfer token untuk wallet {address} di {network}. Error: {data.get('error')}"
# Format response as markdown
result = f"# Token Transfer History for {address}\n\n"
# Check if data is a list or has a transfers property
transfers = data if isinstance(data, list) else data.get("transfers", [])
if not transfers:
return f"Tidak ada riwayat transfer token yang ditemukan untuk wallet {address} di {network}."
# Format as table
result += f"| Token | From | To | Amount | Time | Tx Hash |\n"
result += f"|-------|------|----|----|------|---------|\n"
# Limit to first 15 transfers for readability
display_limit = min(15, len(transfers))
for transfer in transfers[:display_limit]:
# Format token name/symbol
token_name = transfer.get("token_name", "Unknown")
token_symbol = transfer.get("token_symbol", "")
token = token_name
if token_symbol:
token = f"{token_name} ({token_symbol})"
# Format from address
from_address = transfer.get("from_address", "N/A")
if len(from_address) > 10:
from_address = f"`{from_address[:6]}...{from_address[-4:]}`"
# Format to address
to_address = transfer.get("to_address", "N/A")
if len(to_address) > 10:
to_address = f"`{to_address[:6]}...{to_address[-4:]}`"
# Format amount
amount = "N/A"
if "value" in transfer and transfer["value"] is not None:
try:
decimals = int(transfer.get("decimals", 0))
raw_amount = int(transfer["value"])
formatted_amount = raw_amount / (10 ** decimals)
amount = f"{formatted_amount:,.4f}"
except (ValueError, TypeError):
amount = str(transfer["value"])
# Format timestamp
timestamp = "N/A"
if "block_timestamp" in transfer and transfer["block_timestamp"] is not None:
try:
from datetime import datetime
dt = datetime.fromisoformat(transfer["block_timestamp"].replace('Z', '+00:00'))
timestamp = dt.strftime('%Y-%m-%d %H:%M:%S')
except (ValueError, TypeError):
timestamp = str(transfer["block_timestamp"])
# Format transaction hash
tx_hash = transfer.get("transaction_hash", "N/A")
if len(tx_hash) > 10:
tx_hash = f"`{tx_hash[:6]}...{tx_hash[-4:]}`"
result += f"| {token} | {from_address} | {to_address} | {amount} | {timestamp} | {tx_hash} |\n"
# Show total count if more than display limit
if len(transfers) > display_limit:
result += f"\n*Showing {display_limit} of {len(transfers)} transfers*\n"
return result
except Exception as e:
print(f"ERROR in get_wallet_token_transfers: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mengambil data transfer token wallet: {str(e)}"
@mcp.tool()
async def get_wallet_nft_transfers(network: str, address: str, from_date: str = None, to_date: str = None, limit: int = 100) -> str:
"""Get NFT transfer history for a wallet address.
Args:
network: Network identifier (mainnet, devnet)
address: Wallet address
from_date: Start date (ISO 8601)
to_date: End date (ISO 8601)
limit: Results limit (max 100)
"""
try:
# Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah:
# /account/{network}/{address}/nft-transfers
endpoint = f"/account/{network}/{address}/nft-transfers"
# Siapkan parameter
params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100
if from_date:
params["from_date"] = from_date
if to_date:
params["to_date"] = to_date
print(f"DEBUG - Fetching NFT transfers for wallet {address} on {network}", file=sys.stderr)
data = await fetch_moralis_json(endpoint, params=params)
if not data:
return f"Tidak dapat mengambil data transfer NFT untuk wallet {address} di {network}."
if "error" in data:
return f"Tidak dapat mengambil data transfer NFT untuk wallet {address} di {network}. Error: {data.get('error')}"
# Format response as markdown
result = f"# NFT Transfer History for {address}\n\n"
# Check if data is a list or has a transfers property
transfers = data if isinstance(data, list) else data.get("transfers", [])
if not transfers:
return f"Tidak ada riwayat transfer NFT yang ditemukan untuk wallet {address} di {network}."
# Format as table
result += f"| NFT | From | To | Time | Tx Hash |\n"
result += f"|-----|------|----|----|---------|\n"
# Limit to first 15 transfers for readability
display_limit = min(15, len(transfers))
for transfer in transfers[:display_limit]:
# Format NFT name
nft_name = transfer.get("name", "Unknown NFT")
if not nft_name:
nft_name = "Unknown NFT"
# Format from address
from_address = transfer.get("from_address", "N/A")
if len(from_address) > 10:
from_address = f"`{from_address[:6]}...{from_address[-4:]}`"
# Format to address
to_address = transfer.get("to_address", "N/A")
if len(to_address) > 10:
to_address = f"`{to_address[:6]}...{to_address[-4:]}`"
# Format timestamp
timestamp = "N/A"
if "block_timestamp" in transfer and transfer["block_timestamp"] is not None:
try:
from datetime import datetime
dt = datetime.fromisoformat(transfer["block_timestamp"].replace('Z', '+00:00'))
timestamp = dt.strftime('%Y-%m-%d %H:%M:%S')
except (ValueError, TypeError):
timestamp = str(transfer["block_timestamp"])
# Format transaction hash
tx_hash = transfer.get("transaction_hash", "N/A")
if len(tx_hash) > 10:
tx_hash = f"`{tx_hash[:6]}...{tx_hash[-4:]}`"
result += f"| {nft_name} | {from_address} | {to_address} | {timestamp} | {tx_hash} |\n"
# Show total count if more than display limit
if len(transfers) > display_limit:
result += f"\n*Showing {display_limit} of {len(transfers)} transfers*\n"
return result
except Exception as e:
print(f"ERROR in get_wallet_nft_transfers: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mengambil data transfer NFT wallet: {str(e)}"
@mcp.tool()
async def get_wallet_swaps(network: str, address: str, from_date: str = None, to_date: str = None, limit: int = 100) -> str:
"""Get swap transaction history for a wallet.
Args:
network: Network identifier (mainnet, devnet)
address: Wallet address
from_date: Start date (ISO 8601)
to_date: End date (ISO 8601)
limit: Results limit (max 100)
"""
try:
# Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah:
# /account/{network}/{address}/swaps
endpoint = f"/account/{network}/{address}/swaps"
# Siapkan parameter
params = {"limit": min(limit, 100)} # Ensure limit doesn't exceed 100
if from_date:
params["from_date"] = from_date
if to_date:
params["to_date"] = to_date
print(f"DEBUG - Fetching swaps for wallet {address} on {network}", file=sys.stderr)
data = await fetch_moralis_json(endpoint, params=params)
if not data:
return f"Tidak dapat mengambil data swap untuk wallet {address} di {network}."
if "error" in data:
return f"Tidak dapat mengambil data swap untuk wallet {address} di {network}. Error: {data.get('error')}"
# Format response as markdown
result = f"# Swap Transaction History for {address}\n\n"
# Check if data is a list or has a swaps property
swaps = data if isinstance(data, list) else data.get("swaps", [])
if not swaps:
return f"Tidak ada riwayat transaksi swap yang ditemukan untuk wallet {address} di {network}."
# Format as table
result += f"| From Token | To Token | Amount | Value | Time | Tx Hash |\n"
result += f"|-----------|---------|--------|-------|------|---------|\n"
# Limit to first 15 swaps for readability
display_limit = min(15, len(swaps))
for swap in swaps[:display_limit]:
# Format from token
from_token = "Unknown"
if "from_token_symbol" in swap and swap["from_token_symbol"]:
from_token = swap["from_token_symbol"]
# Format to token
to_token = "Unknown"
if "to_token_symbol" in swap and swap["to_token_symbol"]:
to_token = swap["to_token_symbol"]
# Format amount
amount = "N/A"
if "from_amount" in swap and swap["from_amount"] is not None:
try:
from_decimals = int(swap.get("from_token_decimals", 0))
raw_amount = int(swap["from_amount"])
formatted_amount = raw_amount / (10 ** from_decimals)
amount = f"{formatted_amount:,.4f} {from_token}"
except (ValueError, TypeError):
amount = str(swap["from_amount"])
# Format USD value
value = "N/A"
if "usd_value" in swap and swap["usd_value"] is not None:
try:
usd_val = float(swap["usd_value"])
value = f"${usd_val:,.2f}"
except (ValueError, TypeError):
value = str(swap["usd_value"])
# Format timestamp
timestamp = "N/A"
if "block_timestamp" in swap and swap["block_timestamp"] is not None:
try:
from datetime import datetime
dt = datetime.fromisoformat(swap["block_timestamp"].replace('Z', '+00:00'))
timestamp = dt.strftime('%Y-%m-%d %H:%M:%S')
except (ValueError, TypeError):
timestamp = str(swap["block_timestamp"])
# Format transaction hash
tx_hash = swap.get("transaction_hash", "N/A")
if len(tx_hash) > 10:
tx_hash = f"`{tx_hash[:6]}...{tx_hash[-4:]}`"
result += f"| {from_token} | {to_token} | {amount} | {value} | {timestamp} | {tx_hash} |\n"
# Show total count if more than display limit
if len(swaps) > display_limit:
result += f"\n*Showing {display_limit} of {len(swaps)} swap transactions*\n"
return result
except Exception as e:
print(f"ERROR in get_wallet_swaps: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mengambil data swap wallet: {str(e)}"
@mcp.tool()
async def get_token_price_history(network: str, address: str, from_date: str = None, to_date: str = None, interval: str = "24h") -> str:
"""Get historical price data for a token over time.
Args:
network: Network identifier (mainnet, devnet)
address: Token mint address
from_date: Start date (ISO 8601)
to_date: End date (ISO 8601)
interval: Price data interval (1h, 6h, 24h)
"""
try:
# Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah:
# /token/{network}/{address}/price/history
endpoint = f"/token/{network}/{address}/price/history"
# Siapkan parameter
params = {"interval": interval}
if from_date:
params["from_date"] = from_date
if to_date:
params["to_date"] = to_date
print(f"DEBUG - Fetching token price history for {address} on {network}", file=sys.stderr)
data = await fetch_moralis_json(endpoint, params=params)
if not data:
return f"Tidak dapat mengambil data riwayat harga token untuk {address} di {network}."
if "error" in data:
return f"Tidak dapat mengambil data riwayat harga token untuk {address} di {network}. Error: {data.get('error')}"
# Format response as markdown
result = f"# Token Price History\n\n"
# Check if data is a list or has a prices property
prices = data if isinstance(data, list) else data.get("prices", [])
if not prices:
return f"Tidak ada data riwayat harga yang ditemukan untuk token {address} di {network}."
# Format as table
result += f"| Date | Price (USD) | Volume | Market Cap |\n"
result += f"|------|-------------|--------|------------|\n"
# Limit to first 20 price points for readability
display_limit = min(20, len(prices))
for price in prices[:display_limit]:
# Format timestamp
timestamp = "N/A"
if "date" in price and price["date"] is not None:
try:
from datetime import datetime
dt = datetime.fromisoformat(price["date"].replace('Z', '+00:00'))
timestamp = dt.strftime('%Y-%m-%d %H:%M')
except (ValueError, TypeError):
timestamp = str(price["date"])
# Format price
price_usd = "N/A"
if "price" in price and price["price"] is not None:
try:
price_val = float(price["price"])
if price_val < 0.01:
price_usd = f"${price_val:.8f}"
else:
price_usd = f"${price_val:.4f}"
except (ValueError, TypeError):
price_usd = str(price["price"])
# Format volume
volume = "N/A"
if "volume" in price and price["volume"] is not None:
try:
vol_val = float(price["volume"])
volume = f"${vol_val:,.2f}"
except (ValueError, TypeError):
volume = str(price["volume"])
# Format market cap
market_cap = "N/A"
if "market_cap" in price and price["market_cap"] is not None:
try:
mcap_val = float(price["market_cap"])
market_cap = f"${mcap_val:,.2f}"
except (ValueError, TypeError):
market_cap = str(price["market_cap"])
result += f"| {timestamp} | {price_usd} | {volume} | {market_cap} |\n"
# Show total count if more than display limit
if len(prices) > display_limit:
result += f"\n*Showing {display_limit} of {len(prices)} price points*\n"
return result
except Exception as e:
print(f"ERROR in get_token_price_history: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mengambil data riwayat harga token: {str(e)}"
@mcp.tool()
async def get_ohlcv_by_token(network: str, address: str, timeframe: str, from_date: str = None, to_date: str = None, limit: int = 500) -> str:
"""Get OHLCV candlestick data for a token across all pairs.
Args:
network: Network identifier (mainnet, devnet)
address: Token mint address
timeframe: Timeframe for candles (1m, 5m, 15m, 30m, 1h, 4h, 1d, 1w)
from_date: Start date (ISO 8601)
to_date: End date (ISO 8601)
limit: Number of candles to return (max 2000)
"""
try:
# Berdasarkan dokumentasi Moralis API, format endpoint yang benar adalah:
# /token/{network}/{address}/ohlc
endpoint = f"/token/{network}/{address}/ohlc"
# Siapkan parameter
params = {
"timeframe": timeframe,
"limit": min(limit, 2000) # Ensure limit doesn't exceed 2000
}
if from_date:
params["from_date"] = from_date
if to_date:
params["to_date"] = to_date
print(f"DEBUG - Fetching OHLCV data for {address} on {network} with timeframe {timeframe}", file=sys.stderr)
data = await fetch_moralis_json(endpoint, params=params)
if not data:
return f"Tidak dapat mengambil data OHLCV untuk token {address} di {network}."
if "error" in data:
return f"Tidak dapat mengambil data OHLCV untuk token {address} di {network}. Error: {data.get('error')}"
# Format response as markdown
result = f"# OHLCV Data for Token ({timeframe})\n\n"
# Check if data is a list or has a candles property
candles = data if isinstance(data, list) else data.get("candles", [])
if not candles:
return f"Tidak ada data OHLCV yang ditemukan untuk token {address} di {network} dengan timeframe {timeframe}."
# Format as table
result += f"| Time | Open | High | Low | Close | Volume |\n"
result += f"|------|------|------|-----|-------|--------|\n"
# Limit to first 15 candles for readability
display_limit = min(15, len(candles))
for candle in candles[:display_limit]:
# Format timestamp
timestamp = "N/A"
if "timestamp" in candle and candle["timestamp"] is not None:
try:
from datetime import datetime
dt = datetime.fromisoformat(candle["timestamp"].replace('Z', '+00:00'))
timestamp = dt.strftime('%Y-%m-%d %H:%M')
except (ValueError, TypeError):
timestamp = str(candle["timestamp"])
# Format OHLC values
open_price = format_price(candle.get("open"))
high_price = format_price(candle.get("high"))
low_price = format_price(candle.get("low"))
close_price = format_price(candle.get("close"))
# Format volume
volume = "N/A"
if "volume" in candle and candle["volume"] is not None:
try:
vol_val = float(candle["volume"])
volume = f"${vol_val:,.2f}"
except (ValueError, TypeError):
volume = str(candle["volume"])
result += f"| {timestamp} | {open_price} | {high_price} | {low_price} | {close_price} | {volume} |\n"
# Show total count if more than display limit
if len(candles) > display_limit:
result += f"\n*Showing {display_limit} of {len(candles)} candles*\n"
return result
except Exception as e:
print(f"ERROR in get_ohlcv_by_token: {str(e)}", file=sys.stderr)
return f"Terjadi kesalahan saat mengambil data OHLCV token: {str(e)}"
# Helper function for formatting price values
def format_price(price_value):
if price_value is None:
return "N/A"
try:
price_val = float(price_value)
if price_val < 0.01:
return f"${price_val:.8f}"
else:
return f"${price_val:.4f}"
except (ValueError, TypeError):
return str(price_value)
# Tambahkan fungsi-fungsi lain sesuai kebutuhan
if __name__ == "__main__":
# Jalankan server MCP
mcp.run()