pancakeswap-poolspy-mcp
by kukapay
from mcp.server.fastmcp import FastMCP
import httpx
import json
from datetime import datetime, timedelta
from dotenv import load_dotenv
import os
# Load environment variables from .env file
load_dotenv()
# Retrieve the API key from environment variables
API_KEY = os.getenv("THEGRAPH_API_KEY")
if not API_KEY:
raise ValueError("THEGRAPH_API_KEY is not set in the .env file")
SUBGRAPH_URL = f"https://gateway.thegraph.com/api/{API_KEY}/subgraphs/id/A1fvJWQLBeUAggX2WQTMm3FKjXTekNXo77ZySun4YN2m"
# Initialize the MCP server
mcp = FastMCP("PancakeSwap-PoolSpy")
# Async function to query the subgraph using httpx
async def query_subgraph(query: str, variables: dict = None) -> dict:
async with httpx.AsyncClient(timeout=10.0) as client:
payload = {"query": query}
if variables:
payload["variables"] = variables
response = await client.post(SUBGRAPH_URL, json=payload)
if response.status_code != 200:
raise Exception(f"Subgraph query failed with status {response.status_code}: {response.text}")
result = response.json()
if "errors" in result:
raise Exception(f"GraphQL errors: {json.dumps(result['errors'], indent=2)}")
return result
# Fetch recently created pools with a customizable time range and limit
async def fetch_recent_pools(time_range_seconds: int = 300, limit: int = 100) -> list[dict]:
time_ago = int((datetime.utcnow() - timedelta(seconds=time_range_seconds)).timestamp())
query = """
query RecentPools($timestamp: BigInt!, $limit: Int!) {
pools(
where: { createdAtTimestamp_gt: $timestamp }
orderBy: createdAtTimestamp
orderDirection: desc
first: $limit
) {
id
token0 { symbol }
token1 { symbol }
createdAtTimestamp
createdAtBlockNumber
txCount
volumeUSD
totalValueLockedUSD
}
}
"""
variables = {"timestamp": str(time_ago), "limit": limit} # Convert timestamp to string for BigInt
try:
result = await query_subgraph(query, variables)
pools = result.get("data", {}).get("pools", [])
if not pools:
print(f"No pools found for timestamp > {time_ago}. Response: {json.dumps(result, indent=2)}")
return pools
except Exception as e:
print(f"Error in fetch_recent_pools: {str(e)}")
raise
# MCP Tool: List newly created trading pools
@mcp.tool()
async def get_new_pools_bsc(time_range_seconds: int = 300, limit: int = 100) -> str:
"""
Returns a list of trading pools created in the specified time range on Pancake Swap V3 BNB Smart Chain.
Parameters:
time_range_seconds (int): The time range in seconds to look back for new pools.
Default is 300 seconds (5 minutes).
limit (int): The maximum number of pools to return.
Default is 100 pools.
"""
try:
pools = await fetch_recent_pools(time_range_seconds=time_range_seconds, limit=limit)
time_range_minutes = time_range_seconds // 60 # Convert to minutes for display
output = f"Newly Created Trading Pools (Last {time_range_minutes} Minutes, Limit: {limit}):\n"
for pool in pools:
timestamp = datetime.fromtimestamp(int(pool["createdAtTimestamp"])).strftime('%Y-%m-%d %H:%M:%S')
volume_usd = float(pool["volumeUSD"]) # Ensure float for formatting
tvl_usd = float(pool["totalValueLockedUSD"]) # Ensure float for formatting
output += (
f"Pool Address: {pool['id']}\n"
f"Tokens: {pool['token0']['symbol']}/{pool['token1']['symbol']}\n"
f"Created At: {timestamp}\n"
f"Block Number: {pool['createdAtBlockNumber']}\n"
f"Transaction Count: {pool['txCount']}\n"
f"Volume (USD): {volume_usd:.2f}\n"
f"Total Value Locked (USD): {tvl_usd:.2f}\n\n"
)
return output if pools else f"No pools created in the last {time_range_minutes} minutes."
except Exception as e:
return f"Error fetching new pools: {str(e)}"
# Run the server
if __name__ == "__main__":
mcp.run()