Skip to main content
Glama

uniswap-poolspy-mcp

main.py (6.41 kB)
"""MCP server that lists newly created Uniswap V3 pools via The Graph subgraphs."""

import json
import os
import sys
from datetime import datetime, timedelta, timezone

import httpx
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP

# Load environment variables from .env file
load_dotenv()

# Retrieve the API key from environment variables
API_KEY = os.getenv("THEGRAPH_API_KEY")
if not API_KEY:
    raise ValueError("THEGRAPH_API_KEY is not set in the .env file")

# Subgraph deployment id per supported chain.
_SUBGRAPH_IDS = {
    "ethereum": "5zvR82QoaXYFyDEKLZ9t6v9adgnptxYpKpSbxtgVENFV",
    "base": "GqzP4Xaehti8KSfQmv3ZctFSjnSUYZ4En5NRsiTbvZpz",
    "optimism": "Cghf4LfVqPiFw6fp6Y5X5Ubc8UpmUhSfJL82zwiBFLaj",
    "arbitrum": "FbCGRftH4a3yZugY7TnbYgPJVEv2LvMT6oF1fxPe9aJM",
    "polygon": "3hCPRGf4z88VC5rsBKU5AA9FBBq5nF3jbKJG7VZCbhjm",
    "bsc": "A1fvJWQLBeUAggX2WQTMm3FKjXTekNXo77ZySun4YN2m",
    "avalanche": "GVH9h9KZ9CqheUEL93qMbq7QwgoBu32QXQDPR6bev4Eo",
    "celo": "ESdrTJ3twMwWVoQ1hUE2u7PugEHX3QkenudD6aXCkDQ4",
    "blast": "2LHovKznvo8YmKC9ZprPjsYAZDCc4K5q4AYz8s3cnQn1",
}

# Full gateway URL per chain (the API key is embedded in the path).
SUBGRAPH_URLS = {
    chain: f"https://gateway.thegraph.com/api/{API_KEY}/subgraphs/id/{subgraph_id}"
    for chain, subgraph_id in _SUBGRAPH_IDS.items()
}

# User-facing sort key -> subgraph field name used in `orderBy`.
ORDER_BY_OPTIONS = {
    "timestamp": "createdAtTimestamp",
    "txcount": "txCount",
    "volume": "volumeUSD",
    "tvl": "totalValueLockedUSD",
}

# Upper bound accepted for the GraphQL `first` argument.
MAX_POOLS_PER_QUERY = 1000

# Initialize the MCP server
mcp = FastMCP("Uniswap-PoolSpy")


def _log(message: str) -> None:
    """Write a diagnostic line to stderr.

    stdout is deliberately avoided: when the server runs over the stdio
    transport, stdout carries the protocol messages and stray output would
    corrupt the stream.
    """
    print(message, file=sys.stderr)


async def query_subgraph(chain: str, query: str, variables: dict = None) -> dict:
    """POST a GraphQL query to the subgraph for ``chain`` and return the decoded JSON.

    Parameters:
        chain: Key into ``SUBGRAPH_URLS``.
        query: GraphQL query text.
        variables: Optional GraphQL variables; omitted from the payload when falsy.

    Raises:
        KeyError: If ``chain`` is not a key of ``SUBGRAPH_URLS``.
        RuntimeError: If the HTTP status is not 200 or the response contains
            a top-level ``errors`` entry.
    """
    payload = {"query": query}
    if variables:
        payload["variables"] = variables

    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.post(SUBGRAPH_URLS[chain], json=payload)

    if response.status_code != 200:
        raise RuntimeError(
            f"Subgraph query failed with status {response.status_code}: {response.text}"
        )

    result = response.json()
    if "errors" in result:
        raise RuntimeError(f"GraphQL errors: {json.dumps(result['errors'], indent=2)}")
    return result


async def fetch_recent_pools(chain: str, order_by: str, time_range_seconds: int, limit: int) -> list[dict]:
    """Fetch pools created within the last ``time_range_seconds`` seconds.

    Parameters:
        chain: Key into ``SUBGRAPH_URLS``.
        order_by: Key into ``ORDER_BY_OPTIONS`` selecting the sort field
            (results are sorted in descending order).
        time_range_seconds: Look-back window in seconds.
        limit: Maximum number of pools requested from the subgraph.

    Returns:
        The list of pool dicts from the response (empty if none matched).

    Raises:
        KeyError: If ``order_by`` is not a key of ``ORDER_BY_OPTIONS``.
        Exception: Any error raised by ``query_subgraph`` is logged and re-raised.
    """
    # Use an aware UTC datetime: calling .timestamp() on the naive value from
    # datetime.utcnow() would interpret it as *local* time and shift the cutoff
    # by the host's UTC offset.
    cutoff = datetime.now(timezone.utc) - timedelta(seconds=time_range_seconds)
    time_ago = int(cutoff.timestamp())

    order_field = ORDER_BY_OPTIONS[order_by]
    query = f"""
    query RecentPools($timestamp: BigInt!, $limit: Int!) {{
        pools(
            where: {{ createdAtTimestamp_gt: $timestamp }}
            orderBy: {order_field}
            orderDirection: desc
            first: $limit
        ) {{
            id
            token0 {{ symbol }}
            token1 {{ symbol }}
            createdAtTimestamp
            createdAtBlockNumber
            txCount
            volumeUSD
            totalValueLockedUSD
        }}
    }}
    """
    # Convert timestamp to string for BigInt
    variables = {"timestamp": str(time_ago), "limit": limit}

    try:
        result = await query_subgraph(chain, query, variables)
    except Exception as e:
        _log(f"Error in fetch_recent_pools: {str(e)}")
        raise

    # Tolerate both a missing "data" key and an explicit `"data": null`.
    pools = (result.get("data") or {}).get("pools") or []
    if not pools:
        _log(f"No pools found for timestamp > {time_ago}. Response: {json.dumps(result, indent=2)}")
    return pools


def _format_pool(pool: dict) -> str:
    """Render one pool dict from the subgraph as a human-readable text block."""
    # fromtimestamp() without tz renders in the server's local timezone.
    timestamp = datetime.fromtimestamp(int(pool["createdAtTimestamp"])).strftime('%Y-%m-%d %H:%M:%S')
    volume_usd = float(pool["volumeUSD"])  # Ensure float for formatting
    tvl_usd = float(pool["totalValueLockedUSD"])  # Ensure float for formatting
    return (
        f"Pool Address: {pool['id']}\n"
        f"Tokens: {pool['token0']['symbol']}/{pool['token1']['symbol']}\n"
        f"Created At: {timestamp}\n"
        f"Block Number: {pool['createdAtBlockNumber']}\n"
        f"Transaction Count: {pool['txCount']}\n"
        f"Volume (USD): {volume_usd:.2f}\n"
        f"Total Value Locked (USD): {tvl_usd:.2f}\n\n"
    )


# MCP Tool: List newly created trading pools
@mcp.tool()
async def get_new_pools(chain: str = "ethereum", order_by: str = "timestamp", time_range_seconds: int = 300, limit: int = 100) -> str:
    """
    Returns a list of trading pools created in the specified time range on Uniswap V3.

    Parameters:
        chain (str): The blockchain on which Uniswap is deployed. Default is 'ethereum'.
            Supported options include: 'ethereum', 'base', 'optimism', 'arbitrum', 'polygon', 'bsc', 'avalanche', 'celo' and 'blast'.
        order_by (str): The field to sort data in descending order before returning to the user. Default is 'timestamp'.
            Supported options include:
            - timestamp: Sort by Timestamp
            - txcount: Sort by Transaction Count
            - tvl: Sort by Total Value Locked
            - volume: Sort by Volume
        time_range_seconds (int): The time range in seconds to look back for new pools. Must be positive. Default is 300 seconds (5 minutes).
        limit (int): The maximum number of pools to return (1 to 1000). Default is 100 pools.
    """
    # Every failure (validation, network, GraphQL, malformed pool data) is
    # reported to the caller as text rather than raised.
    try:
        chain = chain.lower()
        if chain not in SUBGRAPH_URLS:
            raise ValueError(f"Chain must be one of {list(SUBGRAPH_URLS)}")

        order_by = order_by.lower()
        if order_by not in ORDER_BY_OPTIONS:
            raise ValueError(f"Order_by must be one of {list(ORDER_BY_OPTIONS)}")

        if time_range_seconds <= 0:
            raise ValueError("time_range_seconds must be a positive number of seconds")
        if not 1 <= limit <= MAX_POOLS_PER_QUERY:
            raise ValueError(f"limit must be between 1 and {MAX_POOLS_PER_QUERY}")

        pools = await fetch_recent_pools(chain, order_by, time_range_seconds=time_range_seconds, limit=limit)

        # Convert to whole minutes for display (floors, so < 60 s shows as 0).
        time_range_minutes = time_range_seconds // 60
        if not pools:
            return f"No pools created in the last {time_range_minutes} minutes."

        header = f"Newly Created Trading Pools (Last {time_range_minutes} Minutes, Limit: {limit}):\n"
        return header + "".join(_format_pool(pool) for pool in pools)
    except Exception as e:
        return f"Error fetching new pools: {str(e)}"


# Run the server
if __name__ == "__main__":
    mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kukapay/uniswap-poolspy-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.