Skip to main content
Glama

dex-kline-mcp

main.py5.26 kB
import asyncio import httpx from datetime import datetime from mcp.server.fastmcp import FastMCP, Context from typing import Dict, Any, Optional from tabulate import tabulate # Initialize MCP server mcp = FastMCP("DexKlineMCP", dependencies=["httpx", "tabulate"]) # Base URL for Geckoterminal API BASE_URL = "https://api.geckoterminal.com/api/v2" async def get_best_pool(chain: str, token_address: str) -> Dict[str, Any]: """Fetch the pool with highest liquidity for a given token""" async with httpx.AsyncClient() as client: url = f"{BASE_URL}/networks/{chain}/tokens/{token_address}/pools" try: response = await client.get(url) response.raise_for_status() pools = response.json().get("data", []) if not pools: raise ValueError("No pools found for the specified token") # Sort pools by reserve_in_usd (highest liquidity first) pools.sort(key=lambda x: float(x["attributes"]["reserve_in_usd"]), reverse=True) return pools[0] except httpx.HTTPError as e: raise ValueError(f"Failed to fetch pools: {str(e)}") async def get_kline_data(chain: str, pool_address: str, timeframe: str, end_time: Optional[str], limit: int) -> Dict[str, Any]: """Fetch K-line data for a specific pool""" # Map timeframe to API aggregate parameter and endpoint timeframe_map = { "1m": {"aggregate": "1", "endpoint": "minute"}, "5m": {"aggregate": "5", "endpoint": "minute"}, "15m": {"aggregate": "15", "endpoint": "minute"}, "1h": {"aggregate": "1", "endpoint": "hour"}, "4h": {"aggregate": "4", "endpoint": "hour"}, "12h": {"aggregate": "12", "endpoint": "hour"}, "1d": {"aggregate": "1", "endpoint": "day"} } if timeframe not in timeframe_map: raise ValueError(f"Invalid timeframe. Must be one of: {', '.join(timeframe_map.keys())}") if limit > 1000: raise ValueError("Limit cannot exceed 1000") params = { "aggregate": timeframe_map[timeframe]["aggregate"], "limit": str(min(limit, 1000)) } if end_time: try: datetime.fromisoformat(end_time.replace("Z", "+00:00")) params["before_timestamp"] = str(int(datetime.fromisoformat(end_time.replace("Z", "+00:00")).timestamp())) except ValueError: raise ValueError("Invalid end_time format. Must be ISO 8601 string") else: # Use current UTC time as default end_time params["before_timestamp"] = str(int(datetime.utcnow().timestamp())) async with httpx.AsyncClient() as client: url = f"{BASE_URL}/networks/{chain}/pools/{pool_address}/ohlcv/{timeframe_map[timeframe]['endpoint']}" try: response = await client.get(url, params=params) response.raise_for_status() return response.json() except httpx.HTTPError as e: raise ValueError(f"Failed to fetch K-line data: {str(e)}") @mcp.tool() async def get_kline(chain: str, address: str, timeframe: str = "1m", end_time: Optional[str] = None, limit: int = 100, ctx: Context = None) -> str: """ Fetch K-line data for a specified token on a given chain and return it as a formatted table. Parameters: chain (str): Blockchain network (e.g., 'eth', 'bsc', 'solana') address (str): Token contract address timeframe (str): K-line timeframe (e.g., '1m', '5m', '15m', '1h', '4h', '12h', '1d'). Default: '1n' end_time (str, optional): ISO 8601 timestamp for data end time (e.g., '2025-07-03T02:14:00Z'). Default: current UTC time limit (int): Number of data points to return (max 1000). Default: 100 Returns: str: String containing pair name and K-line data in a formatted table """ ctx.info(f"Fetching K-line data for {chain}/{address}") # Validate inputs supported_chains = ["eth", "bsc", "solana"] if chain not in supported_chains: raise ValueError(f"Unsupported chain. Must be one of: {', '.join(supported_chains)}") # Get best pool pool_data = await get_best_pool(chain, address) pool_address = pool_data["attributes"]["address"] # Fetch K-line data kline_response = await get_kline_data( chain, pool_address, timeframe, end_time, limit ) # Format output pair = f"{pool_data['attributes']['name'].split(' ')[0]}/{pool_data['attributes']['name'].split(' ')[2]}" kline_data = kline_response["data"]["attributes"]["ohlcv_list"] # Convert K-line data to table headers = ["Timestamp", "Open", "High", "Low", "Close", "Volume"] table_data = [ [ datetime.fromtimestamp(row[0]).strftime("%Y-%m-%d %H:%M:%S"), f"{row[1]:.8f}", f"{row[2]:.8f}", f"{row[3]:.8f}", f"{row[4]:.8f}", f"{row[5]:.8f}" ] for row in kline_data ] table = tabulate(table_data, headers=headers, tablefmt="grid") ctx.info(f"Successfully fetched {len(kline_data)} K-line data points") return f"# Pair: {pair}\n\n{table}" if __name__ == "__main__": mcp.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kukapay/dex-kline-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server