main.py
import asyncio
import httpx
from datetime import datetime, timezone
from mcp.server.fastmcp import FastMCP, Context
from typing import Dict, Any, Optional
from tabulate import tabulate
# Initialize MCP server
mcp = FastMCP("DexKlineMCP", dependencies=["httpx", "tabulate"])
# Base URL for Geckoterminal API
BASE_URL = "https://api.geckoterminal.com/api/v2"
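# GeckoTerminal endpoints used below (as built in the URL f-strings in this file):
#   GET /networks/{network}/tokens/{token_address}/pools            -> pools for a token
#   GET /networks/{network}/pools/{pool_address}/ohlcv/{timeframe}  -> OHLCV candles
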
async def get_best_pool(chain: str, token_address: str) -> Dict[str, Any]:
    """Fetch the pool with highest liquidity for a given token"""
    async with httpx.AsyncClient() as client:
        url = f"{BASE_URL}/networks/{chain}/tokens/{token_address}/pools"
        try:
            response = await client.get(url)
            response.raise_for_status()
            pools = response.json().get("data", [])
            if not pools:
                raise ValueError("No pools found for the specified token")
            # Sort pools by reserve_in_usd (highest liquidity first)
            pools.sort(key=lambda x: float(x["attributes"]["reserve_in_usd"]), reverse=True)
            return pools[0]
        except httpx.HTTPError as e:
            raise ValueError(f"Failed to fetch pools: {str(e)}")

async def get_kline_data(chain: str, pool_address: str, timeframe: str, end_time: Optional[str], limit: int) -> Dict[str, Any]:
    """Fetch K-line (OHLCV) data for a specific pool"""
    # Map timeframe to API aggregate parameter and endpoint
    timeframe_map = {
        "1m": {"aggregate": "1", "endpoint": "minute"},
        "5m": {"aggregate": "5", "endpoint": "minute"},
        "15m": {"aggregate": "15", "endpoint": "minute"},
        "1h": {"aggregate": "1", "endpoint": "hour"},
        "4h": {"aggregate": "4", "endpoint": "hour"},
        "12h": {"aggregate": "12", "endpoint": "hour"},
        "1d": {"aggregate": "1", "endpoint": "day"}
    }
    if timeframe not in timeframe_map:
        raise ValueError(f"Invalid timeframe. Must be one of: {', '.join(timeframe_map.keys())}")
    if limit > 1000:
        raise ValueError("Limit cannot exceed 1000")
    params = {
        "aggregate": timeframe_map[timeframe]["aggregate"],
        "limit": str(limit)
    }
    if end_time:
        try:
            # Parse the ISO 8601 string once and reuse it for the timestamp
            end_dt = datetime.fromisoformat(end_time.replace("Z", "+00:00"))
            params["before_timestamp"] = str(int(end_dt.timestamp()))
        except ValueError:
            raise ValueError("Invalid end_time format. Must be an ISO 8601 string")
    else:
        # Default to the current UTC time (timezone-aware, to avoid local-offset skew)
        params["before_timestamp"] = str(int(datetime.now(timezone.utc).timestamp()))
    async with httpx.AsyncClient() as client:
        url = f"{BASE_URL}/networks/{chain}/pools/{pool_address}/ohlcv/{timeframe_map[timeframe]['endpoint']}"
        try:
            response = await client.get(url, params=params)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:
            raise ValueError(f"Failed to fetch K-line data: {str(e)}")

@mcp.tool()
async def get_kline(chain: str, address: str, timeframe: str = "1m", end_time: Optional[str] = None, limit: int = 100, ctx: Context = None) -> str:
    """
    Fetch K-line data for a specified token on a given chain and return it as a formatted table.

    Parameters:
        chain (str): Blockchain network (e.g., 'eth', 'bsc', 'solana')
        address (str): Token contract address
        timeframe (str): K-line timeframe ('1m', '5m', '15m', '1h', '4h', '12h', '1d'). Default: '1m'
        end_time (str, optional): ISO 8601 timestamp for data end time (e.g., '2025-07-03T02:14:00Z'). Default: current UTC time
        limit (int): Number of data points to return (max 1000). Default: 100

    Returns:
        str: String containing the pair name and K-line data in a formatted table
    """
    if ctx:
        await ctx.info(f"Fetching K-line data for {chain}/{address}")
    # Validate inputs
    supported_chains = ["eth", "bsc", "solana"]
    if chain not in supported_chains:
        raise ValueError(f"Unsupported chain. Must be one of: {', '.join(supported_chains)}")
    # Get the pool with the highest liquidity
    pool_data = await get_best_pool(chain, address)
    pool_address = pool_data["attributes"]["address"]
    # Fetch K-line data
    kline_response = await get_kline_data(chain, pool_address, timeframe, end_time, limit)
    # Format output; pool names look like "BASE / QUOTE", so the tokens sit at indices 0 and 2
    name_parts = pool_data["attributes"]["name"].split(" ")
    pair = f"{name_parts[0]}/{name_parts[2]}"
    kline_data = kline_response["data"]["attributes"]["ohlcv_list"]
    # Convert K-line data to a table (timestamps rendered in UTC)
    headers = ["Timestamp", "Open", "High", "Low", "Close", "Volume"]
    table_data = [
        [
            datetime.fromtimestamp(row[0], tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
            f"{row[1]:.8f}",
            f"{row[2]:.8f}",
            f"{row[3]:.8f}",
            f"{row[4]:.8f}",
            f"{row[5]:.8f}"
        ]
        for row in kline_data
    ]
    table = tabulate(table_data, headers=headers, tablefmt="grid")
    if ctx:
        await ctx.info(f"Successfully fetched {len(kline_data)} K-line data points")
    return f"# Pair: {pair}\n\n{table}"

if __name__ == "__main__":
    mcp.run()