main.py
import httpx
import pandas as pd
from mcp.server.fastmcp import FastMCP
# Create the MCP server instance using FastMCP
mcp = FastMCP(
name="wormhole-metrics-mcp",
version="0.1.0"
)
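# Tools registered below with @mcp.tool() get their input schema from the function
# signatures and their descriptions from the docstrings (FastMCP derives both
# automatically).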
API_BASE = "https://api.wormholescan.io"
WORMHOLE_CHAINS = {
"1": "Solana",
"2": "Ethereum",
"4": "BNB Smart Chain",
"5": "Polygon",
"6": "Avalanche",
"8": "Algorand",
"10": "Fantom",
"13": "Kaia",
"14": "Celo",
"15": "NEAR",
"16": "Moonbeam",
"17": "Neon",
"18": "Terra 2.0",
"19": "Injective",
"20": "Osmosis",
"21": "Sui",
"22": "Aptos",
"23": "Arbitrum",
"24": "Optimism",
"25": "Gnosis",
"26": "Pythnet",
"30": "Base",
"32": "Sei",
"34": "Scroll",
"35": "Mantle",
"36": "Blast",
"37": "X Layer",
"38": "Linea",
"39": "Berachain",
"40": "Seievm",
"43": "SNAXchain",
"44": "Unichain",
"45": "World Chain",
"46": "Ink",
"47": "HyperEVM",
"48": "Monad",
"50": "Mezo",
"52": "Sonic",
"53": "Converge",
"4000": "Cosmos Hub",
"4001": "Evmos",
"4002": "Kujira",
"4003": "Neutron",
"4004": "Celestia",
"4005": "Stargaze",
"4006": "SEDA",
"4007": "Dymension",
"4008": "Provenance",
"4009": "Noble"
}
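# Chain IDs follow the Wormhole chain-ID registry; entries in the 4000 range are
# Cosmos-SDK chains connected through Wormhole Gateway.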
def id2name(chain_id: int | str) -> str:
    """Map a Wormhole chain ID to its display name, falling back to the ID itself."""
    chain_id = str(chain_id)
    return WORMHOLE_CHAINS.get(chain_id, chain_id)
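# For example, id2name(2) and id2name("2") both return "Ethereum", while an ID
# that is not in the table, e.g. id2name(999), falls back to the string "999".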
# Define the get_cross_chain_activity tool
@mcp.tool()
async def get_cross_chain_activity(
timeSpan: str = "7d",
by: str = "notional",
app: str = ""
) -> str:
"""
    Fetch cross-chain activity data from the Wormholescan API and return it as a Markdown table.
Args:
timeSpan: Time span for data (7d, 30d, 90d, 1y, all-time). Default: 7d
by: Render results by notional or tx count. Default: notional
app: Comma-separated list of apps. Default: all apps
Returns:
        Markdown table of cross-chain activity (volume or transaction count, per `by`) with source chains as rows and destination chains as columns
"""
try:
# Validate parameters
valid_time_spans = {"7d", "30d", "90d", "1y", "all-time"}
valid_by = {"notional", "tx count"}
if timeSpan not in valid_time_spans:
raise ValueError(f"Invalid timeSpan. Must be one of {valid_time_spans}")
if by not in valid_by:
raise ValueError(f"Invalid 'by' parameter. Must be one of {valid_by}")
# Construct query parameters
params = {
"timeSpan": timeSpan,
"by": by
}
if app:
params["apps"] = app # API expects 'apps' as query param name
# Make API request
async with httpx.AsyncClient() as client:
response = await client.get(
f"{API_BASE}/api/v1/x-chain-activity",
params=params
)
response.raise_for_status()
# Parse JSON response
data = response.json()
# Flatten the nested data for DataFrame
rows = []
for tx in data.get("txs", []):
source_chain = tx.get("chain")
for dest in tx.get("destinations", []):
rows.append({
"source_chain": id2name(source_chain),
"dest_chain": id2name(dest.get("chain")),
"volume": dest.get("volume"),
})
        # Create DataFrame; bail out early if the API returned no activity
        df = pd.DataFrame(rows)
        if df.empty:
            return "No cross-chain activity data returned for the given parameters."
        # Pivot into a source-chain x destination-chain matrix
        pivot_df = df.pivot(index="source_chain", columns="dest_chain", values="volume")
        # Convert values to numeric and replace NaN with empty strings for display
        pivot_df = pivot_df.apply(pd.to_numeric, errors="coerce").fillna("")
        return pivot_df.to_markdown()
    except Exception as e:
        return f"Error fetching cross-chain activity: {e}"
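# A minimal direct call for local testing (bypasses the MCP transport; assumes the
# @mcp.tool() decorator returns the original coroutine, as the current `mcp` SDK does):
#
#   import asyncio
#   print(asyncio.run(get_cross_chain_activity(timeSpan="30d", by="notional")))
#
# The result is a Markdown matrix with source chains as rows and destination
# chains as columns.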
# Define the get_money_flow tool
@mcp.tool()
async def get_money_flow(
timespan: str = "1d",
from_date: str = "",
to_date: str = "",
appId: str = "",
sourceChain: str = "",
targetChain: str = ""
) -> str:
"""
Fetch transaction count and volume data from Wormholescan API for a specific period.
Args:
timespan: Time span for data (1h, 1d, 1mo, 1y). Default: 1d
from_date: From date in ISO 8601 format (e.g., 2024-01-01T15:04:05Z). Default: empty
to_date: To date in ISO 8601 format (e.g., 2024-01-01T15:04:05Z). Default: empty
appId: Application ID to filter results. Default: empty
sourceChain: Source chain ID to filter results. Default: empty
targetChain: Target chain ID to filter results. Default: empty
Returns:
        Markdown table of transaction count and volume data
"""
try:
# Validate parameters
valid_timespans = {"1h", "1d", "1mo", "1y"}
if timespan not in valid_timespans:
raise ValueError(f"Invalid timespan. Must be one of {valid_timespans}")
# Construct query parameters
params = {"timespan": timespan}
if from_date:
params["from"] = from_date
if to_date:
params["to"] = to_date
if appId:
params["appId"] = appId
if sourceChain:
params["sourceChain"] = sourceChain
if targetChain:
params["targetChain"] = targetChain
# Make API request
async with httpx.AsyncClient() as client:
response = await client.get(
f"{API_BASE}/api/v1/x-chain-activity/tops",
params=params
)
response.raise_for_status()
# Parse JSON response
data = response.json()
# Transform data for DataFrame
rows = [
{
"from": item.get("from"),
"to": item.get("to"),
"source_chain": id2name(item.get("emitter_chain")),
"volume": item.get("volume"),
"count": item.get("count")
}
for item in data
]
        # Create DataFrame; bail out early if the API returned no data
        df = pd.DataFrame(rows)
        if df.empty:
            return "No money flow data returned for the given parameters."
        # Convert numeric columns
        df["volume"] = pd.to_numeric(df["volume"], errors="coerce")
        df["count"] = pd.to_numeric(df["count"], errors="coerce")
        # Sort by 'from' date for readability
        df = df.sort_values("from")
        return df.to_markdown(index=False)
    except Exception as e:
        return f"Error fetching money flow data: {e}"
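# The sourceChain/targetChain filters take the numeric Wormhole chain IDs from
# WORMHOLE_CHAINS above, passed as strings. A hypothetical direct call restricting
# results to Solana -> Ethereum flows:
#
#   asyncio.run(get_money_flow(timespan="1d", sourceChain="1", targetChain="2"))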
# Define the get_top_assets_by_volume tool
@mcp.tool()
async def get_top_assets_by_volume(
timeSpan: str = "7d"
) -> str:
"""
Fetch top assets by volume from Wormholescan API.
Args:
timeSpan: Time span for data (7d, 15d, 30d). Default: 7d
Returns:
        Markdown table of top assets by volume
"""
try:
# Validate parameters
valid_time_spans = {"7d", "15d", "30d"}
if timeSpan not in valid_time_spans:
raise ValueError(f"Invalid timeSpan. Must be one of {valid_time_spans}")
# Construct query parameters
params = {"timeSpan": timeSpan}
# Make API request
async with httpx.AsyncClient() as client:
response = await client.get(
f"{API_BASE}/api/v1/top-assets-by-volume",
params=params
)
response.raise_for_status()
# Parse JSON response
data = response.json()
# Transform data for DataFrame
rows = [
{
"emitter_chain": id2name(item.get("emitterChain")),
"symbol": item.get("symbol"),
"token_chain": id2name(item.get("tokenChain")),
"token_address": item.get("tokenAddress"),
"volume": item.get("volume")
}
for item in data.get("assets", [])
]
        # Create DataFrame; bail out early if the API returned no assets
        df = pd.DataFrame(rows)
        if df.empty:
            return "No asset volume data returned for the given time span."
        # Convert volume to numeric and sort descending for readability
        df["volume"] = pd.to_numeric(df["volume"], errors="coerce")
        df = df.sort_values("volume", ascending=False)
        return df.to_markdown(index=False)
    except Exception as e:
        return f"Error fetching top assets by volume: {e}"
# Define the get_top_chain_pairs_by_num_transfers tool
@mcp.tool()
async def get_top_chain_pairs_by_num_transfers(
timeSpan: str = "7d"
) -> str:
"""
Fetch top chain pairs by number of transfers from Wormholescan API.
Args:
timeSpan: Time span for data (7d, 15d, 30d). Default: 7d
Returns:
        Markdown table of top chain pairs by number of transfers
"""
try:
# Validate parameters
valid_time_spans = {"7d", "15d", "30d"}
if timeSpan not in valid_time_spans:
raise ValueError(f"Invalid timeSpan. Must be one of {valid_time_spans}")
# Construct query parameters
params = {"timeSpan": timeSpan}
# Make API request
async with httpx.AsyncClient() as client:
response = await client.get(
f"{API_BASE}/api/v1/top-chain-pairs-by-num-transfers",
params=params
)
response.raise_for_status()
# Parse JSON response
data = response.json()
# Transform data for DataFrame
rows = [
{
"source_chain": id2name(item.get("emitterChain")),
"destination_chain": id2name(item.get("destinationChain")),
"number_of_transfers": item.get("numberOfTransfers")
}
for item in data.get("chainPairs", [])
]
        # Create DataFrame; bail out early if the API returned no chain pairs
        df = pd.DataFrame(rows)
        if df.empty:
            return "No chain pair data returned for the given time span."
        # Convert number_of_transfers to numeric and sort descending for readability
        df["number_of_transfers"] = pd.to_numeric(df["number_of_transfers"], errors="coerce")
        df = df.sort_values("number_of_transfers", ascending=False)
        return df.to_markdown(index=False)
    except Exception as e:
        return f"Error fetching top chain pairs: {e}"
# Define the get_top_symbols_by_volume tool
@mcp.tool()
async def get_top_symbols_by_volume(
timeSpan: str = "7d"
) -> str:
"""
Fetch top symbols by volume from Wormholescan API.
Args:
timeSpan: Time span for data (7d, 15d, 30d). Default: 7d
Returns:
        Markdown table of top symbols by volume
"""
try:
# Validate parameters
valid_time_spans = {"7d", "15d", "30d"}
if timeSpan not in valid_time_spans:
raise ValueError(f"Invalid timeSpan. Must be one of {valid_time_spans}")
# Construct query parameters
params = {"timeSpan": timeSpan}
# Make API request
async with httpx.AsyncClient() as client:
response = await client.get(
f"{API_BASE}/api/v1/top-symbols-by-volume",
params=params
)
response.raise_for_status()
# Parse JSON response
data = response.json()
# Transform data for DataFrame
rows = [
{
"symbol": item.get("symbol"),
"volume": item.get("volume"),
"txs": item.get("txs")
}
for item in data.get("symbols", [])
]
        # Create DataFrame; bail out early if the API returned no symbols
        df = pd.DataFrame(rows)
        if df.empty:
            return "No symbol volume data returned for the given time span."
        # Convert numeric columns and sort by volume descending for readability
        df["volume"] = pd.to_numeric(df["volume"], errors="coerce")
        df["txs"] = pd.to_numeric(df["txs"], errors="coerce")
        df = df.sort_values("volume", ascending=False)
        return df.to_markdown(index=False)
    except Exception as e:
        return f"Error fetching top symbols by volume: {e}"
# Define the get_top100_corridors tool
@mcp.tool()
async def get_top100_corridors(
timeSpan: str = "2d"
) -> str:
"""
Fetch top 100 token corridors by number of transactions from Wormholescan API.
Args:
timeSpan: Time span for data (2d, 7d). Default: 2d
Returns:
        Markdown table of the top 100 corridors
"""
try:
# Validate parameters
valid_time_spans = {"2d", "7d"}
if timeSpan not in valid_time_spans:
raise ValueError(f"Invalid timeSpan. Must be one of {valid_time_spans}")
# Construct query parameters
params = {"timeSpan": timeSpan}
# Make API request
async with httpx.AsyncClient() as client:
response = await client.get(
f"{API_BASE}/api/v1/top-100-corridors",
params=params
)
response.raise_for_status()
# Parse JSON response
data = response.json()
# Transform data for DataFrame
rows = [
{
"source_chain": id2name(item.get("emitter_chain")),
"target_chain": id2name(item.get("target_chain")),
"token_chain": id2name(item.get("token_chain")),
"token_address": item.get("token_address"),
"txs": item.get("txs")
}
for item in data.get("corridors", [])
]
        # Create DataFrame; bail out early if the API returned no corridors
        df = pd.DataFrame(rows)
        if df.empty:
            return "No corridor data returned for the given time span."
        # Convert txs to numeric and sort descending for readability
        df["txs"] = pd.to_numeric(df["txs"], errors="coerce")
        df = df.sort_values("txs", ascending=False)
        return df.to_markdown(index=False)
    except Exception as e:
        return f"Error fetching top 100 corridors: {e}"
# Define the get_kpi_list tool
@mcp.tool()
async def get_kpi_list() -> str:
"""
Fetch a list of KPIs for Wormhole from Wormholescan API.
Returns:
        Markdown table of Wormhole KPIs
"""
try:
# Make API request
async with httpx.AsyncClient() as client:
response = await client.get(
f"{API_BASE}/api/v1/scorecards"
)
response.raise_for_status()
# Parse JSON response
data = response.json()
# Transform data for DataFrame
rows = [{
"24h_messages": data.get("24h_messages"),
"total_messages": data.get("total_messages"),
"total_tx_count": data.get("total_tx_count"),
"total_volume": data.get("total_volume"),
"tvl": data.get("tvl"),
"24h_volume": data.get("24h_volume"),
"7d_volume": data.get("7d_volume"),
"30d_volume": data.get("30d_volume")
}]
# Create DataFrame
df = pd.DataFrame(rows)
# Convert numeric columns
for col in df.columns:
df[col] = pd.to_numeric(df[col], errors="coerce")
return df.to_markdown(index=False)
    except Exception as e:
        return f"Error fetching KPI list: {e}"
if __name__ == "__main__":
mcp.run()
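# mcp.run() serves the registered tools over the default stdio transport, so the
# server can be launched by any MCP-compatible client (e.g. an entry in the
# client's MCP server configuration that runs `python main.py`).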