Skip to main content
Glama

wormhole-metrics-mcp

main.py (16.2 kB)
"""MCP server exposing Wormholescan cross-chain metrics as markdown tables.

Every tool queries one Wormholescan REST endpoint, loads the JSON payload into
a pandas DataFrame and returns the table rendered as markdown.  Errors are
never raised to the MCP client; they are returned as a plain message string.
"""

import asyncio
import json
from typing import Any, Dict, Iterable, List, Optional

import httpx
import pandas as pd
from mcp.server.fastmcp import FastMCP

# Create the MCP server instance using FastMCP
mcp = FastMCP(
    name="wormhole-metrics-mcp",
    version="0.1.0"
)

API_BASE = "https://api.wormholescan.io"

# Wormhole chain id (as a string) -> human-readable chain name.
WORMHOLE_CHAINS = {
    "1": "Solana",
    "2": "Ethereum",
    "4": "BNB Smart Chain",
    "5": "Polygon",
    "6": "Avalanche",
    "8": "Algorand",
    "10": "Fantom",
    "13": "Kaia",
    "14": "Celo",
    "15": "NEAR",
    "16": "Moonbeam",
    "17": "Neon",
    "18": "Terra 2.0",
    "19": "Injective",
    "20": "Osmosis",
    "21": "Sui",
    "22": "Aptos",
    "23": "Arbitrum",
    "24": "Optimism",
    "25": "Gnosis",
    "26": "Pythnet",
    "30": "Base",
    "32": "Sei",
    "34": "Scroll",
    "35": "Mantle",
    "36": "Blast",
    "37": "X Layer",
    "38": "Linea",
    "39": "Berachain",
    "40": "Seievm",
    "43": "SNAXchain",
    "44": "Unichain",
    "45": "World Chain",
    "46": "Ink",
    "47": "HyperEVM",
    "48": "Monad",
    "50": "Mezo",
    "52": "Sonic",
    "53": "Converge",
    "4000": "Cosmos Hub",
    "4001": "Evmos",
    "4002": "Kujira",
    "4003": "Neutron",
    "4004": "Celestia",
    "4005": "Stargaze",
    "4006": "SEDA",
    "4007": "Dymension",
    "4008": "Provenance",
    "4009": "Noble",
}

# Returned instead of a table when an endpoint yields no rows; building a
# DataFrame from an empty list has no columns, so column access would fail.
_NO_DATA_MESSAGE = "No data returned for the given parameters."


def id2name(id) -> str:  # noqa: A002 - parameter name kept for compatibility
    """Return the chain name for a Wormhole chain id.

    Args:
        id: Chain id as an int or string.

    Returns:
        The name from WORMHOLE_CHAINS, or the id itself (as a string) when
        the id is not in the table.
    """
    chain_id = str(id)
    return WORMHOLE_CHAINS.get(chain_id, chain_id)


def _describe_error(exc: Exception) -> str:
    """Return the exception message, or its class name if the message is empty."""
    # Some exceptions carry no message; an empty tool result would hide the
    # failure from the client entirely.
    return str(exc) or type(exc).__name__


async def _fetch_json(path: str, params: Optional[Dict[str, str]] = None) -> Any:
    """GET ``API_BASE + path`` and return the decoded JSON body.

    Raises:
        httpx.HTTPStatusError: If the response status is 4xx/5xx.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{API_BASE}{path}", params=params)
        response.raise_for_status()
        return response.json()


def _rows_to_markdown(
    rows: List[Dict[str, Any]],
    numeric_columns: Iterable[str],
    sort_by: Optional[str] = None,
    ascending: bool = True,
) -> str:
    """Render a list of row dicts as a markdown table.

    Args:
        rows: One dict per table row.
        numeric_columns: Columns to coerce with ``pd.to_numeric``
            (unparseable values become NaN).
        sort_by: Optional column to sort by.
        ascending: Sort direction used when ``sort_by`` is given.

    Returns:
        The markdown table, or a "no data" message when ``rows`` is empty.
    """
    if not rows:
        return _NO_DATA_MESSAGE
    df = pd.DataFrame(rows)
    for column in numeric_columns:
        df[column] = pd.to_numeric(df[column], errors="coerce")
    if sort_by is not None:
        df = df.sort_values(sort_by, ascending=ascending)
    return df.to_markdown(index=False)


# Define the get_cross_chain_activity tool
@mcp.tool()
async def get_cross_chain_activity(
    timeSpan: str = "7d",
    by: str = "notional",
    app: str = ""
) -> str:
    """
    Fetch cross-chain activity data from Wormholescan API and return it as a
    source-chain x destination-chain table.

    Args:
        timeSpan: Time span for data (7d, 30d, 90d, 1y, all-time). Default: 7d
        by: Render results by notional or tx count. Default: notional
        app: Comma-separated list of apps. Default: all apps

    Returns:
        Markdown table (string) of volume per source/destination chain pair,
        or an error / "no data" message
    """
    try:
        # Validate parameters
        valid_time_spans = {"7d", "30d", "90d", "1y", "all-time"}
        # NOTE(review): confirm the API accepts the literal "tx count" for `by`.
        valid_by = {"notional", "tx count"}
        if timeSpan not in valid_time_spans:
            raise ValueError(f"Invalid timeSpan. Must be one of {valid_time_spans}")
        if by not in valid_by:
            raise ValueError(f"Invalid 'by' parameter. Must be one of {valid_by}")

        # Construct query parameters
        params = {"timeSpan": timeSpan, "by": by}
        if app:
            params["apps"] = app  # API expects 'apps' as query param name

        data = await _fetch_json("/api/v1/x-chain-activity", params)

        # Flatten the nested source -> destinations structure into rows.
        # `or []` also covers keys that are present but null.
        rows = []
        for tx in data.get("txs") or []:
            source_chain = id2name(tx.get("chain"))
            for dest in tx.get("destinations") or []:
                rows.append({
                    "source_chain": source_chain,
                    "dest_chain": id2name(dest.get("chain")),
                    "volume": dest.get("volume"),
                })

        if not rows:
            return _NO_DATA_MESSAGE

        df = pd.DataFrame(rows)
        pivot_df = df.pivot(index="source_chain", columns="dest_chain", values="volume")
        # Convert volume to numeric and show missing pairs as empty cells
        pivot_df = pivot_df.apply(pd.to_numeric, errors="coerce").fillna("")
        return pivot_df.to_markdown()
    except Exception as e:
        return _describe_error(e)


# Define the get_money_flow tool
@mcp.tool()
async def get_money_flow(
    timespan: str = "1d",
    from_date: str = "",
    to_date: str = "",
    appId: str = "",
    sourceChain: str = "",
    targetChain: str = ""
) -> str:
    """
    Fetch transaction count and volume data from Wormholescan API for a specific period.

    Args:
        timespan: Time span for data (1h, 1d, 1mo, 1y). Default: 1d
        from_date: From date in ISO 8601 format (e.g., 2024-01-01T15:04:05Z). Default: empty
        to_date: To date in ISO 8601 format (e.g., 2024-01-01T15:04:05Z). Default: empty
        appId: Application ID to filter results. Default: empty
        sourceChain: Source chain ID to filter results. Default: empty
        targetChain: Target chain ID to filter results. Default: empty

    Returns:
        Markdown table (string) of transaction count and volume per period,
        or an error / "no data" message
    """
    try:
        # Validate parameters
        valid_timespans = {"1h", "1d", "1mo", "1y"}
        if timespan not in valid_timespans:
            raise ValueError(f"Invalid timespan. Must be one of {valid_timespans}")

        # Construct query parameters; empty filters are left out of the query
        params = {"timespan": timespan}
        optional_filters = {
            "from": from_date,
            "to": to_date,
            "appId": appId,
            "sourceChain": sourceChain,
            "targetChain": targetChain,
        }
        params.update({key: value for key, value in optional_filters.items() if value})

        data = await _fetch_json("/api/v1/x-chain-activity/tops", params)

        rows = [
            {
                "from": item.get("from"),
                "to": item.get("to"),
                "source_chain": id2name(item.get("emitter_chain")),
                "volume": item.get("volume"),
                "count": item.get("count"),
            }
            for item in data or []
        ]
        # Sort by 'from' date for readability
        return _rows_to_markdown(rows, ("volume", "count"), sort_by="from")
    except Exception as e:
        return _describe_error(e)


# Define the get_top_assets_by_volume tool
@mcp.tool()
async def get_top_assets_by_volume(
    timeSpan: str = "7d"
) -> str:
    """
    Fetch top assets by volume from Wormholescan API.

    Args:
        timeSpan: Time span for data (7d, 15d, 30d). Default: 7d

    Returns:
        Markdown table (string) of top assets sorted by volume descending,
        or an error / "no data" message
    """
    try:
        # Validate parameters
        valid_time_spans = {"7d", "15d", "30d"}
        if timeSpan not in valid_time_spans:
            raise ValueError(f"Invalid timeSpan. Must be one of {valid_time_spans}")

        data = await _fetch_json(
            "/api/v1/top-assets-by-volume", {"timeSpan": timeSpan}
        )

        rows = [
            {
                "emitter_chain": id2name(item.get("emitterChain")),
                "symbol": item.get("symbol"),
                "token_chain": id2name(item.get("tokenChain")),
                "token_address": item.get("tokenAddress"),
                "volume": item.get("volume"),
            }
            for item in data.get("assets") or []
        ]
        return _rows_to_markdown(rows, ("volume",), sort_by="volume", ascending=False)
    except Exception as e:
        return _describe_error(e)


# Define the get_top_chain_pairs_by_num_transfers tool
@mcp.tool()
async def get_top_chain_pairs_by_num_transfers(
    timeSpan: str = "7d"
) -> str:
    """
    Fetch top chain pairs by number of transfers from Wormholescan API.

    Args:
        timeSpan: Time span for data (7d, 15d, 30d). Default: 7d

    Returns:
        Markdown table (string) of chain pairs sorted by number of transfers
        descending, or an error / "no data" message
    """
    try:
        # Validate parameters
        valid_time_spans = {"7d", "15d", "30d"}
        if timeSpan not in valid_time_spans:
            raise ValueError(f"Invalid timeSpan. Must be one of {valid_time_spans}")

        data = await _fetch_json(
            "/api/v1/top-chain-pairs-by-num-transfers", {"timeSpan": timeSpan}
        )

        rows = [
            {
                "source_chain": id2name(item.get("emitterChain")),
                "destination_chain": id2name(item.get("destinationChain")),
                "number_of_transfers": item.get("numberOfTransfers"),
            }
            for item in data.get("chainPairs") or []
        ]
        return _rows_to_markdown(
            rows,
            ("number_of_transfers",),
            sort_by="number_of_transfers",
            ascending=False,
        )
    except Exception as e:
        return _describe_error(e)


# Define the get_top_symbols_by_volume tool
@mcp.tool()
async def get_top_symbols_by_volume(
    timeSpan: str = "7d"
) -> str:
    """
    Fetch top symbols by volume from Wormholescan API.

    Args:
        timeSpan: Time span for data (7d, 15d, 30d). Default: 7d

    Returns:
        Markdown table (string) of top symbols sorted by volume descending,
        or an error / "no data" message
    """
    try:
        # Validate parameters
        valid_time_spans = {"7d", "15d", "30d"}
        if timeSpan not in valid_time_spans:
            raise ValueError(f"Invalid timeSpan. Must be one of {valid_time_spans}")

        data = await _fetch_json(
            "/api/v1/top-symbols-by-volume", {"timeSpan": timeSpan}
        )

        rows = [
            {
                "symbol": item.get("symbol"),
                "volume": item.get("volume"),
                "txs": item.get("txs"),
            }
            for item in data.get("symbols") or []
        ]
        return _rows_to_markdown(
            rows, ("volume", "txs"), sort_by="volume", ascending=False
        )
    except Exception as e:
        return _describe_error(e)


# Define the get_top100_corridors tool
@mcp.tool()
async def get_top100_corridors(
    timeSpan: str = "2d"
) -> str:
    """
    Fetch top 100 token corridors by number of transactions from Wormholescan API.

    Args:
        timeSpan: Time span for data (2d, 7d). Default: 2d

    Returns:
        Markdown table (string) of corridors sorted by transaction count
        descending, or an error / "no data" message
    """
    try:
        # Validate parameters
        valid_time_spans = {"2d", "7d"}
        if timeSpan not in valid_time_spans:
            raise ValueError(f"Invalid timeSpan. Must be one of {valid_time_spans}")

        data = await _fetch_json("/api/v1/top-100-corridors", {"timeSpan": timeSpan})

        rows = [
            {
                "source_chain": id2name(item.get("emitter_chain")),
                "target_chain": id2name(item.get("target_chain")),
                "token_chain": id2name(item.get("token_chain")),
                "token_address": item.get("token_address"),
                "txs": item.get("txs"),
            }
            for item in data.get("corridors") or []
        ]
        return _rows_to_markdown(rows, ("txs",), sort_by="txs", ascending=False)
    except Exception as e:
        return _describe_error(e)


# Define the get_kpi_list tool
@mcp.tool()
async def get_kpi_list() -> str:
    """
    Fetch a list of KPIs for Wormhole from Wormholescan API.

    Returns:
        Single-row markdown table (string) of Wormhole KPIs, or an error message
    """
    try:
        data = await _fetch_json("/api/v1/scorecards")

        kpi_fields = (
            "24h_messages",
            "total_messages",
            "total_tx_count",
            "total_volume",
            "tvl",
            "24h_volume",
            "7d_volume",
            "30d_volume",
        )
        # One row holding every KPI; all of them are coerced to numbers.
        rows = [{field: data.get(field) for field in kpi_fields}]
        return _rows_to_markdown(rows, kpi_fields)
    except Exception as e:
        return _describe_error(e)


if __name__ == "__main__":
    mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kukapay/wormhole-metrics-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.