get_token_pools_v3
Retrieve Uniswap V3 pools for a specific token address and generate a markdown table with details like Version, ID, Pair, Fee Tier, Volume USD, and Liquidity for analysis and integration.
Instructions
Query all Uniswap V3 pools for a specific token and return as a formatted markdown table.
Parameters:
token_address (str): The Ethereum address of the token to query (e.g., '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48').
ctx (Context): The API context for logging and error handling.
Returns:
A markdown-formatted string containing a table with columns: Version, ID, Pair, Fee Tier, Volume USD, Liquidity.
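To make the return shape concrete, here is a minimal standard-library sketch of a table with the documented columns. The tool itself builds the table with pandas, so the exact spacing and alignment markers of its output will differ. `render_markdown_table` is a hypothetical helper, and every value in the sample row is a placeholder, not real pool data.

```python
# Column order taken from the tool's documented return value.
COLUMNS = ["Version", "ID", "Pair", "Fee Tier", "Volume USD", "Liquidity"]


def render_markdown_table(rows):
    """Hypothetical helper: render a list of dicts as a pipe-delimited table."""
    header = "| " + " | ".join(COLUMNS) + " |"
    divider = "|" + "|".join("---" for _ in COLUMNS) + "|"
    body = [
        "| " + " | ".join(str(row[col]) for col in COLUMNS) + " |"
        for row in rows
    ]
    return "\n".join([header, divider, *body])


# Placeholder row: all values below are invented for illustration only.
sample_rows = [
    {
        "Version": "v3",
        "ID": "0x<pool-address>",
        "Pair": "USDC/WETH",
        "Fee Tier": "500",
        "Volume USD": "<volume>",
        "Liquidity": "<liquidity>",
    }
]

table = render_markdown_table(sample_rows)
print(table)
```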
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| token_address | Yes | The Ethereum address of the token to query (str) | |
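The schema only requires `token_address` to be a string, so a caller may want to check the value before invoking the tool. The sketch below is one way to do that; `normalize_token_address` is a hypothetical helper and is not part of the tool. It lowercases the address because the query helper does the same before sending it as a GraphQL variable.

```python
import re

# "0x" followed by exactly 40 hexadecimal characters (20 bytes).
_ADDRESS_RE = re.compile(r"0x[0-9a-fA-F]{40}")


def normalize_token_address(token_address: str) -> str:
    """Hypothetical helper: validate an address and return it lowercased."""
    candidate = token_address.strip()
    if not _ADDRESS_RE.fullmatch(candidate):
        raise ValueError(f"Not a 20-byte hex address: {token_address!r}")
    return candidate.lower()


# Mixed-case spelling of the example address from the parameter description.
normalized = normalize_token_address("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48")
print(normalized)  # 0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48
```

This checks the format only; it does not verify an EIP-55 checksum or confirm that a token contract exists at the address.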
Implementation Reference
- main.py:516-549 (handler) Handler function decorated with @mcp.tool() that implements the get_token_pools_v3 tool. It queries Uniswap V3 pools for a given token address using the helper query_pools_v3, formats the results into a pandas DataFrame, and returns a markdown table.

  ```python
  @mcp.tool()
  async def get_token_pools_v3(token_address: str, ctx: Context) -> str:
      """
      Query all Uniswap V3 pools for a specific token and return as a formatted markdown table.

      Parameters:
          token_address (str): The Ethereum address of the token to query (e.g., '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48').
          ctx (Context): The API context for logging and error handling.

      Returns:
          A markdown-formatted string containing a table with columns: Version, ID, Pair, Fee Tier, Volume USD, Liquidity.
      """
      ctx.info(f"Querying V3 pools for token: {token_address}")
      try:
          pools = await query_pools_v3(token_address)
          ctx.info(f"Found {len(pools)} V3 pools")
          # Create DataFrame directly from pools list
          df = pd.DataFrame([
              {
                  "Version": "v3",
                  "ID": pool.id,
                  "Pair": pool.pair,
                  "Fee Tier": pool.feeTier,
                  "Volume USD": pool.volumeUSD,
                  "Liquidity": pool.liquidity
              }
              for pool in pools
          ])
          return df.to_markdown(index=False)
      except Exception as e:
          ctx.error(f"Failed to query V3 pools: {str(e)}")
          raise
  ```
- main.py:176-236 (helper) Helper function that performs the core GraphQL query to the Uniswap V3 subgraph to fetch all pools containing the specified token, ordered by volumeUSD descending, and parses the response into Pool objects.

  ```python
  async def query_pools_v3(token_address: str) -> List[Pool]:
      query = """
      query($token: Bytes!) {
          pools(
              where: {
                  or: [
                      {token0: $token},
                      {token1: $token}
                  ]
              }
              orderBy: volumeUSD
              orderDirection: desc
          ) {
              id
              token0 { id symbol }
              token1 { id symbol }
              feeTier
              liquidity
              volumeUSD
              feesUSD
          }
      }
      """
      async with httpx.AsyncClient() as client:
          response = await client.post(
              UNISWAP_V3_SUBGRAPH,
              headers={
                  "Authorization": f"Bearer {API_KEY}"
              },
              json={
                  "query": query,
                  "variables": {"token": token_address.lower()}
              }
          )
          response.raise_for_status()
          data = response.json()
          if "errors" in data:
              raise ValueError(f"GraphQL errors: {data['errors']}")
          return [
              Pool(
                  id=pool["id"],
                  token0=pool["token0"]["id"],
                  token1=pool["token1"]["id"],
                  feeTier=pool["feeTier"],
                  liquidity=pool["liquidity"],
                  volumeUSD=pool["volumeUSD"],
                  feesUSD=pool["feesUSD"],
                  pair=f"{pool['token0']['symbol']}/{pool['token1']['symbol']}"
              )
              for pool in data["data"]["pools"]
          ]
  ```
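The parsing step of the helper can be exercised without a network call. The sketch below rests on several assumptions: the `Pool` dataclass is inferred from the keyword arguments the helper passes (its real definition is not shown here), the numeric fields are typed as `str` on the assumption that the subgraph serializes large numbers as JSON strings, and `parse_pools`, `fee_tier_percent`, and the sample response are hypothetical placeholders.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class Pool:
    """Assumed shape, inferred from the keyword arguments in query_pools_v3."""
    id: str
    token0: str
    token1: str
    feeTier: str
    liquidity: str
    volumeUSD: str
    feesUSD: str
    pair: str


def parse_pools(data: Dict[str, Any]) -> List[Pool]:
    """Hypothetical helper mirroring the response handling in query_pools_v3."""
    if "errors" in data:
        raise ValueError(f"GraphQL errors: {data['errors']}")
    return [
        Pool(
            id=pool["id"],
            token0=pool["token0"]["id"],
            token1=pool["token1"]["id"],
            feeTier=pool["feeTier"],
            liquidity=pool["liquidity"],
            volumeUSD=pool["volumeUSD"],
            feesUSD=pool["feesUSD"],
            pair=f"{pool['token0']['symbol']}/{pool['token1']['symbol']}",
        )
        for pool in data["data"]["pools"]
    ]


def fee_tier_percent(fee_tier: str) -> float:
    """Convert a fee tier in hundredths of a basis point to a percentage."""
    return int(fee_tier) / 10_000


# Placeholder response: ids and amounts are invented for illustration only.
sample_response = {
    "data": {
        "pools": [
            {
                "id": "0x<pool-address>",
                "token0": {"id": "0x<token0-address>", "symbol": "USDC"},
                "token1": {"id": "0x<token1-address>", "symbol": "WETH"},
                "feeTier": "500",
                "liquidity": "0",
                "volumeUSD": "0",
                "feesUSD": "0",
            }
        ]
    }
}

pools = parse_pools(sample_response)
print(pools[0].pair, fee_tier_percent(pools[0].feeTier))
```

Keeping the parsing separate from the HTTP call, as in this sketch, makes the error branch and the pair formatting easy to test with canned responses.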