get_top100_corridors
Fetch the top 100 token corridors by transaction count from Wormholescan API to analyze cross-chain activity patterns.
Instructions
Fetch top 100 token corridors by number of transactions from Wormholescan API.
Args:
timeSpan: Time span for data (2d, 7d). Default: 2d
Returns:
String representation of a pandas DataFrame containing top 100 corridors
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| timeSpan | No | 2d |
Implementation Reference
- main.py:403-462 (handler)The main handler function for the 'get_top100_corridors' tool. It is registered via the @mcp.tool() decorator. Fetches top 100 corridors data from the Wormholescan API based on timeSpan (2d or 7d), transforms it into a pandas DataFrame with chain names resolved via id2name, sorts by txs descending, and returns a markdown table.# Define the get_top100_corridors tool @mcp.tool() async def get_top100_corridors( timeSpan: str = "2d" ) -> str: """ Fetch top 100 token corridors by number of transactions from Wormholescan API. Args: timeSpan: Time span for data (2d, 7d). Default: 2d Returns: String representation of a pandas DataFrame containing top 100 corridors """ try: # Validate parameters valid_time_spans = {"2d", "7d"} if timeSpan not in valid_time_spans: raise ValueError(f"Invalid timeSpan. Must be one of {valid_time_spans}") # Construct query parameters params = {"timeSpan": timeSpan} # Make API request async with httpx.AsyncClient() as client: response = await client.get( f"{API_BASE}/api/v1/top-100-corridors", params=params ) response.raise_for_status() # Parse JSON response data = response.json() # Transform data for DataFrame rows = [ { "source_chain": id2name(item.get("emitter_chain")), "target_chain": id2name(item.get("target_chain")), "token_chain": id2name(item.get("token_chain")), "token_address": item.get("token_address"), "txs": item.get("txs") } for item in data.get("corridors", []) ] # Create DataFrame df = pd.DataFrame(rows) # Convert txs to numeric df["txs"] = pd.to_numeric(df["txs"], errors="coerce") # Sort by txs descending for readability df = df.sort_values("txs", ascending=False) return df.to_markdown(index=False) except Exception as e: return str(e)