get_kline
Fetch K-line (candlestick) data for a token on a supported blockchain, given the token address, timeframe, and limit. Returns a formatted table for analysis.
Instructions
Fetch K-line data for a specified token on a given chain and return it as a formatted table.
Parameters:
chain (str): Blockchain network (e.g., 'eth', 'bsc', 'solana')
address (str): Token contract address
timeframe (str): K-line timeframe; one of '1m', '5m', '15m', '1h', '4h', '12h', '1d'. Default: '1m'
end_time (str, optional): ISO 8601 timestamp for data end time (e.g., '2025-07-03T02:14:00Z'). Default: current UTC time
limit (int): Number of data points to return (max 1000). Default: 100
Returns:
str: String containing pair name and K-line data in a formatted table
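
The end_time parameter is an ISO 8601 string, while the implementation listed under Implementation Reference passes a Unix timestamp to the upstream API. The following is a minimal sketch of that conversion, not the tool's own code: `end_time_to_epoch` is a hypothetical helper name, and treating offset-less timestamps as UTC is an assumption made here for illustration.

```python
from datetime import datetime, timezone
from typing import Optional


def end_time_to_epoch(end_time: Optional[str] = None) -> int:
    """Convert an ISO 8601 end_time into Unix epoch seconds (hypothetical helper)."""
    if end_time is None:
        # Documented default: the current UTC time.
        return int(datetime.now(timezone.utc).timestamp())
    try:
        # Older fromisoformat() versions reject a trailing 'Z', so normalise it.
        parsed = datetime.fromisoformat(end_time.replace("Z", "+00:00"))
    except ValueError as exc:
        raise ValueError("Invalid end_time format. Must be ISO 8601 string") from exc
    if parsed.tzinfo is None:
        # Assumption: timestamps without an offset are meant as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return int(parsed.timestamp())


print(end_time_to_epoch("2025-07-03T02:14:00Z"))  # 1751508840
```

Using timezone-aware datetimes keeps the result independent of the machine's local time zone.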
Input Schema
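| Name | Required | Description | Default |
|---|---|---|---|
| address | Yes | Token contract address | |
| chain | Yes | Blockchain network ('eth', 'bsc', 'solana') | |
| end_time | No | ISO 8601 timestamp for data end time (e.g., '2025-07-03T02:14:00Z') | current UTC time |
| limit | No | Number of data points to return (max 1000) | 100 |
| timeframe | No | K-line timeframe ('1m', '5m', '15m', '1h', '4h', '12h', '1d') | 1m |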
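
As an illustration of how the schema's required fields, defaults, and limits fit together, here is a small sketch that checks a set of arguments before they would be sent to the tool. The function name `normalize_arguments` and the constants are hypothetical; the allowed values are taken from the parameter descriptions and the implementation listed in this document.

```python
from typing import Any, Dict, Optional

# Values taken from the parameter descriptions and the listed implementation.
SUPPORTED_CHAINS = ("eth", "bsc", "solana")
SUPPORTED_TIMEFRAMES = ("1m", "5m", "15m", "1h", "4h", "12h", "1d")
MAX_LIMIT = 1000


def normalize_arguments(
    chain: str,
    address: str,
    timeframe: str = "1m",
    end_time: Optional[str] = None,
    limit: int = 100,
) -> Dict[str, Any]:
    """Apply schema defaults and reject out-of-range values (hypothetical helper)."""
    if chain not in SUPPORTED_CHAINS:
        raise ValueError(f"Unsupported chain. Must be one of: {', '.join(SUPPORTED_CHAINS)}")
    if timeframe not in SUPPORTED_TIMEFRAMES:
        raise ValueError(f"Invalid timeframe. Must be one of: {', '.join(SUPPORTED_TIMEFRAMES)}")
    if limit > MAX_LIMIT:
        raise ValueError("Limit cannot exceed 1000")
    arguments: Dict[str, Any] = {
        "chain": chain,
        "address": address,
        "timeframe": timeframe,
        "limit": limit,
    }
    # end_time is optional; omit it so the tool falls back to the current UTC time.
    if end_time is not None:
        arguments["end_time"] = end_time
    return arguments


# The address below is a placeholder, not a real token contract.
print(normalize_arguments("eth", "0xTOKEN"))
```

Only `chain` and `address` must be supplied; the other keys take the defaults from the table.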
Implementation Reference
- main.py:73-129 (handler): The handler function for the 'get_kline' MCP tool, decorated with @mcp.tool(). It validates inputs, finds the best pool, fetches K-line data, formats it into a table using tabulate, and returns a formatted string.

      @mcp.tool()
      async def get_kline(chain: str, address: str, timeframe: str = "1m", end_time: Optional[str] = None,
                          limit: int = 100, ctx: Context = None) -> str:
          """
          Fetch K-line data for a specified token on a given chain and return it as a formatted table.

          Parameters:
              chain (str): Blockchain network (e.g., 'eth', 'bsc', 'solana')
              address (str): Token contract address
              timeframe (str): K-line timeframe (e.g., '1m', '5m', '15m', '1h', '4h', '12h', '1d'). Default: '1m'
              end_time (str, optional): ISO 8601 timestamp for data end time (e.g., '2025-07-03T02:14:00Z'). Default: current UTC time
              limit (int): Number of data points to return (max 1000). Default: 100

          Returns:
              str: String containing pair name and K-line data in a formatted table
          """
          ctx.info(f"Fetching K-line data for {chain}/{address}")

          # Validate inputs
          supported_chains = ["eth", "bsc", "solana"]
          if chain not in supported_chains:
              raise ValueError(f"Unsupported chain. Must be one of: {', '.join(supported_chains)}")

          # Get best pool
          pool_data = await get_best_pool(chain, address)
          pool_address = pool_data["attributes"]["address"]

          # Fetch K-line data
          kline_response = await get_kline_data(
              chain, pool_address, timeframe, end_time, limit
          )

          # Format output
          pair = f"{pool_data['attributes']['name'].split(' ')[0]}/{pool_data['attributes']['name'].split(' ')[2]}"
          kline_data = kline_response["data"]["attributes"]["ohlcv_list"]

          # Convert K-line data to table
          headers = ["Timestamp", "Open", "High", "Low", "Close", "Volume"]
          table_data = [
              [
                  datetime.fromtimestamp(row[0]).strftime("%Y-%m-%d %H:%M:%S"),
                  f"{row[1]:.8f}",
                  f"{row[2]:.8f}",
                  f"{row[3]:.8f}",
                  f"{row[4]:.8f}",
                  f"{row[5]:.8f}"
              ] for row in kline_data
          ]

          table = tabulate(table_data, headers=headers, tablefmt="grid")

          ctx.info(f"Successfully fetched {len(kline_data)} K-line data points")
          return f"# Pair: {pair}\n\n{table}"
- main.py:75-87 (schema): Docstring providing the input schema (parameters with types and descriptions) and output description for the get_kline tool.

      """
      Fetch K-line data for a specified token on a given chain and return it as a formatted table.

      Parameters:
          chain (str): Blockchain network (e.g., 'eth', 'bsc', 'solana')
          address (str): Token contract address
          timeframe (str): K-line timeframe (e.g., '1m', '5m', '15m', '1h', '4h', '12h', '1d'). Default: '1m'
          end_time (str, optional): ISO 8601 timestamp for data end time (e.g., '2025-07-03T02:14:00Z'). Default: current UTC time
          limit (int): Number of data points to return (max 1000). Default: 100

      Returns:
          str: String containing pair name and K-line data in a formatted table
      """
- main.py:30-72 (helper): Helper function to fetch raw OHLCV (K-line) data from the GeckoTerminal API for a specific pool address, handling timeframe mapping, parameters, and API call.

      async def get_kline_data(chain: str, pool_address: str, timeframe: str, end_time: Optional[str],
                               limit: int) -> Dict[str, Any]:
          """Fetch K-line data for a specific pool"""
          # Map timeframe to API aggregate parameter and endpoint
          timeframe_map = {
              "1m": {"aggregate": "1", "endpoint": "minute"},
              "5m": {"aggregate": "5", "endpoint": "minute"},
              "15m": {"aggregate": "15", "endpoint": "minute"},
              "1h": {"aggregate": "1", "endpoint": "hour"},
              "4h": {"aggregate": "4", "endpoint": "hour"},
              "12h": {"aggregate": "12", "endpoint": "hour"},
              "1d": {"aggregate": "1", "endpoint": "day"}
          }

          if timeframe not in timeframe_map:
              raise ValueError(f"Invalid timeframe. Must be one of: {', '.join(timeframe_map.keys())}")

          if limit > 1000:
              raise ValueError("Limit cannot exceed 1000")

          params = {
              "aggregate": timeframe_map[timeframe]["aggregate"],
              "limit": str(min(limit, 1000))
          }

          if end_time:
              try:
                  datetime.fromisoformat(end_time.replace("Z", "+00:00"))
                  params["before_timestamp"] = str(int(datetime.fromisoformat(end_time.replace("Z", "+00:00")).timestamp()))
              except ValueError:
                  raise ValueError("Invalid end_time format. Must be ISO 8601 string")
          else:
              # Use current UTC time as default end_time
              params["before_timestamp"] = str(int(datetime.utcnow().timestamp()))

          async with httpx.AsyncClient() as client:
              url = f"{BASE_URL}/networks/{chain}/pools/{pool_address}/ohlcv/{timeframe_map[timeframe]['endpoint']}"
              try:
                  response = await client.get(url, params=params)
                  response.raise_for_status()
                  return response.json()
              except httpx.HTTPError as e:
                  raise ValueError(f"Failed to fetch K-line data: {str(e)}")
- main.py:14-29 (helper): Helper function to find and return the pool with the highest liquidity (reserve_in_usd) for a given token address on the specified chain.

      async def get_best_pool(chain: str, token_address: str) -> Dict[str, Any]:
          """Fetch the pool with highest liquidity for a given token"""
          async with httpx.AsyncClient() as client:
              url = f"{BASE_URL}/networks/{chain}/tokens/{token_address}/pools"
              try:
                  response = await client.get(url)
                  response.raise_for_status()
                  pools = response.json().get("data", [])
                  if not pools:
                      raise ValueError("No pools found for the specified token")
                  # Sort pools by reserve_in_usd (highest liquidity first)
                  pools.sort(key=lambda x: float(x["attributes"]["reserve_in_usd"]), reverse=True)
                  return pools[0]
              except httpx.HTTPError as e:
                  raise ValueError(f"Failed to fetch pools: {str(e)}")
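
The data flow through the handler and its two helpers can be followed offline with a small sketch. The code below is not the project's code: the function names (`pick_best_pool`, `pair_name`, `ohlcv_request`, `format_row`) and the sample pool data are hypothetical, the pool name format "BASE / QUOTE ..." is inferred from how the handler splits the name, and rendering timestamps in UTC is a choice made here for reproducibility. No network call is made and tabulate is not used.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple

# Timeframe mapping copied from the listed get_kline_data helper.
TIMEFRAME_MAP = {
    "1m": ("1", "minute"), "5m": ("5", "minute"), "15m": ("15", "minute"),
    "1h": ("1", "hour"), "4h": ("4", "hour"), "12h": ("12", "hour"),
    "1d": ("1", "day"),
}


def pick_best_pool(pools: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Return the pool with the largest reserve_in_usd (highest liquidity)."""
    if not pools:
        raise ValueError("No pools found for the specified token")
    return max(pools, key=lambda p: float(p["attributes"]["reserve_in_usd"]))


def pair_name(pool_name: str) -> str:
    """Turn a name like 'WETH / USDC 0.05%' into 'WETH/USDC'."""
    parts = pool_name.split(" ")
    return f"{parts[0]}/{parts[2]}"


def ohlcv_request(chain: str, pool_address: str, timeframe: str, limit: int) -> Tuple[str, Dict[str, str]]:
    """Build the relative OHLCV path and query parameters for one request."""
    aggregate, endpoint = TIMEFRAME_MAP[timeframe]
    path = f"/networks/{chain}/pools/{pool_address}/ohlcv/{endpoint}"
    return path, {"aggregate": aggregate, "limit": str(limit)}


def format_row(row: List[float]) -> List[str]:
    """Format one [timestamp, open, high, low, close, volume] row; timestamp shown in UTC."""
    stamp = datetime.fromtimestamp(row[0], tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    return [stamp] + [f"{value:.8f}" for value in row[1:6]]


# Made-up sample data, for illustration only.
sample_pools = [
    {"attributes": {"name": "WETH / USDT 0.3%", "address": "0xpoolA", "reserve_in_usd": "1200.5"}},
    {"attributes": {"name": "WETH / USDC 0.05%", "address": "0xpoolB", "reserve_in_usd": "98000.0"}},
]

best = pick_best_pool(sample_pools)
print(pair_name(best["attributes"]["name"]))
print(ohlcv_request("eth", best["attributes"]["address"], "4h", 100))
print(format_row([0, 1, 2, 0.5, 1.5, 10]))
```

Note that `reserve_in_usd` arrives as a string in the sample, which is why it is converted with `float` before comparison, as the listed helper also does.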