Glama

get_kline

Fetch K-line candlestick data for tokens on various blockchains to analyze price movements and trends in decentralized exchanges.

Instructions

Fetch K-line data for a specified token on a given chain and return it as a formatted table.

Parameters:
    chain (str): Blockchain network (e.g., 'eth', 'bsc', 'solana')
    address (str): Token contract address
    timeframe (str): K-line timeframe; one of '1m', '5m', '15m', '1h', '4h', '12h', '1d'. Default: '1m'
    end_time (str, optional): ISO 8601 timestamp for data end time (e.g., '2025-07-03T02:14:00Z'). Default: current UTC time
    limit (int): Number of data points to return (max 1000). Default: 100

Returns:
    str: String containing pair name and K-line data in a formatted table
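
To make the end_time parameter concrete, the following is a minimal sketch of how an ISO 8601 string with a trailing 'Z' can be turned into Unix epoch seconds using only the standard library. The helper name to_epoch_seconds is hypothetical and not part of the server, and treating timestamps without an offset as UTC is an assumption of this sketch.

```python
from datetime import datetime, timezone
from typing import Optional


def to_epoch_seconds(end_time: Optional[str] = None) -> int:
    """Convert an ISO 8601 string to Unix epoch seconds (hypothetical helper)."""
    if end_time is None:
        # Default: the current moment, taken as a timezone-aware UTC datetime.
        return int(datetime.now(timezone.utc).timestamp())
    # Older Python versions reject a trailing 'Z', so normalise it to an offset.
    parsed = datetime.fromisoformat(end_time.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # Assumption of this sketch: timestamps without an offset are UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return int(parsed.timestamp())


if __name__ == "__main__":
    print(to_epoch_seconds("2025-07-03T02:14:00Z"))
```

Working with timezone-aware datetimes keeps the result independent of the host machine's local timezone.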

Input Schema

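Name       Required  Description                                                   Default
chain      Yes       Blockchain network: 'eth', 'bsc', or 'solana'                 -
address    Yes       Token contract address                                        -
timeframe  No        K-line timeframe: '1m', '5m', '15m', '1h', '4h', '12h', '1d'  1m
end_time   No        ISO 8601 timestamp for the data end time                      current UTC time
limit      No        Number of data points to return (max 1000)                    100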
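
As an illustration of the schema above, here is a small sketch that assembles and checks an arguments dictionary before it is sent to the tool. The function build_get_kline_args and the constant names are hypothetical; the allowed values mirror the tool description, while the lower bound of 1 on limit is an assumption of this sketch rather than something the server is documented to enforce.

```python
from typing import Any, Dict, Optional

# Allowed values as listed in the tool description.
SUPPORTED_CHAINS = ("eth", "bsc", "solana")
SUPPORTED_TIMEFRAMES = ("1m", "5m", "15m", "1h", "4h", "12h", "1d")
MAX_LIMIT = 1000


def build_get_kline_args(
    chain: str,
    address: str,
    timeframe: str = "1m",
    end_time: Optional[str] = None,
    limit: int = 100,
) -> Dict[str, Any]:
    """Validate inputs and return a get_kline arguments dict (hypothetical helper)."""
    if chain not in SUPPORTED_CHAINS:
        raise ValueError(f"Unsupported chain: {chain!r}")
    if timeframe not in SUPPORTED_TIMEFRAMES:
        raise ValueError(f"Invalid timeframe: {timeframe!r}")
    # Assumption: a limit below 1 is rejected here; the source only states the maximum.
    if not 1 <= limit <= MAX_LIMIT:
        raise ValueError(f"limit must be between 1 and {MAX_LIMIT}")
    args: Dict[str, Any] = {
        "chain": chain,
        "address": address,
        "timeframe": timeframe,
        "limit": limit,
    }
    # end_time is optional; leave it out so the tool falls back to the current time.
    if end_time is not None:
        args["end_time"] = end_time
    return args


if __name__ == "__main__":
    # The address below is a placeholder, not a real token contract.
    print(build_get_kline_args("eth", "0x0000000000000000000000000000000000000000"))
```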

Implementation Reference

  • main.py:73-129 (handler)
    Main handler for the 'get_kline' tool, registered via the @mcp.tool() decorator. It validates the chain, looks up the best pool, fetches the K-line data, and formats the output as a table.
    @mcp.tool()
    async def get_kline(chain: str, address: str, timeframe: str = "1m", end_time: Optional[str] = None, limit: int = 100, ctx: Context = None) -> str:
        """
        Fetch K-line data for a specified token on a given chain and return it as a formatted table.
        
        Parameters:
            chain (str): Blockchain network (e.g., 'eth', 'bsc', 'solana')
            address (str): Token contract address
            timeframe (str): K-line timeframe (e.g., '1m', '5m', '15m', '1h', '4h', '12h', '1d'). Default: '1m'
            end_time (str, optional): ISO 8601 timestamp for data end time (e.g., '2025-07-03T02:14:00Z'). Default: current UTC time
            limit (int): Number of data points to return (max 1000). Default: 100
        
        Returns:
            str: String containing pair name and K-line data in a formatted table
        """
        await ctx.info(f"Fetching K-line data for {chain}/{address}")
        
        # Validate inputs
        supported_chains = ["eth", "bsc", "solana"]
        if chain not in supported_chains:
            raise ValueError(f"Unsupported chain. Must be one of: {', '.join(supported_chains)}")
        
        # Get best pool
        pool_data = await get_best_pool(chain, address)
        pool_address = pool_data["attributes"]["address"]
        
        # Fetch K-line data
        kline_response = await get_kline_data(
            chain,
            pool_address,
            timeframe,
            end_time,
            limit
        )
        
        # Format output
        pair = f"{pool_data['attributes']['name'].split(' ')[0]}/{pool_data['attributes']['name'].split(' ')[2]}"
        kline_data = kline_response["data"]["attributes"]["ohlcv_list"]
        
        # Convert K-line data to table
        headers = ["Timestamp", "Open", "High", "Low", "Close", "Volume"]
        table_data = [
            [
                datetime.fromtimestamp(row[0]).strftime("%Y-%m-%d %H:%M:%S"),
                f"{row[1]:.8f}",
                f"{row[2]:.8f}",
                f"{row[3]:.8f}",
                f"{row[4]:.8f}",
                f"{row[5]:.8f}"
            ]
            for row in kline_data
        ]
        table = tabulate(table_data, headers=headers, tablefmt="grid")
        
        await ctx.info(f"Successfully fetched {len(kline_data)} K-line data points")
        
        return f"# Pair: {pair}\n\n{table}"
  • main.py:30-72 (helper)
    Helper function that fetches raw OHLCV (K-line) data from the GeckoTerminal API for a specific pool.
    async def get_kline_data(chain: str, pool_address: str, timeframe: str, end_time: Optional[str], limit: int) -> Dict[str, Any]:
        """Fetch K-line data for a specific pool"""
        # Map timeframe to API aggregate parameter and endpoint
        timeframe_map = {
            "1m": {"aggregate": "1", "endpoint": "minute"},
            "5m": {"aggregate": "5", "endpoint": "minute"},
            "15m": {"aggregate": "15", "endpoint": "minute"},
            "1h": {"aggregate": "1", "endpoint": "hour"},
            "4h": {"aggregate": "4", "endpoint": "hour"},
            "12h": {"aggregate": "12", "endpoint": "hour"},
            "1d": {"aggregate": "1", "endpoint": "day"}
        }
        
        if timeframe not in timeframe_map:
            raise ValueError(f"Invalid timeframe. Must be one of: {', '.join(timeframe_map.keys())}")
        
        if limit > 1000:
            raise ValueError("Limit cannot exceed 1000")
        
        params = {
            "aggregate": timeframe_map[timeframe]["aggregate"],
            "limit": str(min(limit, 1000))
        }
        
        if end_time:
            try:
                datetime.fromisoformat(end_time.replace("Z", "+00:00"))
                params["before_timestamp"] = str(int(datetime.fromisoformat(end_time.replace("Z", "+00:00")).timestamp()))
            except ValueError:
                raise ValueError("Invalid end_time format. Must be ISO 8601 string")
        else:
            # Default end_time is now; utcnow().timestamp() would be skewed on non-UTC hosts
            params["before_timestamp"] = str(int(datetime.now().timestamp()))
        
        async with httpx.AsyncClient() as client:
            url = f"{BASE_URL}/networks/{chain}/pools/{pool_address}/ohlcv/{timeframe_map[timeframe]['endpoint']}"
            try:
                response = await client.get(url, params=params)
                response.raise_for_status()
                return response.json()
            except httpx.HTTPError as e:
                raise ValueError(f"Failed to fetch K-line data: {str(e)}")
  • main.py:14-28 (helper)
    Helper function that finds the pool with the highest liquidity (reserve_in_usd) for a given token address on the chain.
    async def get_best_pool(chain: str, token_address: str) -> Dict[str, Any]:
        """Fetch the pool with highest liquidity for a given token"""
        async with httpx.AsyncClient() as client:
            url = f"{BASE_URL}/networks/{chain}/tokens/{token_address}/pools"
            try:
                response = await client.get(url)
                response.raise_for_status()
                pools = response.json().get("data", [])
                if not pools:
                    raise ValueError("No pools found for the specified token")
                # Sort pools by reserve_in_usd (highest liquidity first)
                pools.sort(key=lambda x: float(x["attributes"]["reserve_in_usd"]), reverse=True)
                return pools[0]
            except httpx.HTTPError as e:
                raise ValueError(f"Failed to fetch pools: {str(e)}")
  • main.py:75-87 (schema)
    Docstring providing input parameters schema and description for the get_kline tool.
    """
    Fetch K-line data for a specified token on a given chain and return it as a formatted table.
    
    Parameters:
        chain (str): Blockchain network (e.g., 'eth', 'bsc', 'solana')
        address (str): Token contract address
        timeframe (str): K-line timeframe (e.g., '1m', '5m', '15m', '1h', '4h', '12h', '1d'). Default: '1m'
        end_time (str, optional): ISO 8601 timestamp for data end time (e.g., '2025-07-03T02:14:00Z'). Default: current UTC time
        limit (int): Number of data points to return (max 1000). Default: 100
    
    Returns:
        str: String containing pair name and K-line data in a formatted table
    """
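
The pool selection and row formatting steps described in the handler and helpers can be tried offline. The sketch below reuses the same field names (attributes, name, reserve_in_usd) and the same row layout (timestamp, open, high, low, close, volume) as the code above, but the function names are hypothetical and the sample values are invented for illustration. Rendering timestamps explicitly in UTC is a choice of this sketch; the handler above formats them in the host's local time.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List


def pick_best_pool(pools: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Return the pool with the largest reserve_in_usd (hypothetical helper)."""
    if not pools:
        raise ValueError("No pools found for the specified token")
    # reserve_in_usd is read as a string here, so convert it before comparing.
    return max(pools, key=lambda p: float(p["attributes"]["reserve_in_usd"]))


def pair_from_pool_name(name: str) -> str:
    """Turn a name like 'BASE / QUOTE 0.05%' into 'BASE/QUOTE'."""
    # Assumption: the name has the form 'BASE / QUOTE', optionally with a suffix.
    parts = name.split(" ")
    return f"{parts[0]}/{parts[2]}"


def format_row(row: List[float]) -> List[str]:
    """Format one OHLCV row: a UTC timestamp plus five 8-decimal numbers."""
    ts, *values = row
    stamp = datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    return [stamp] + [f"{v:.8f}" for v in values]


if __name__ == "__main__":
    # Invented sample data, shaped like the fields the handler reads.
    sample_pools = [
        {"attributes": {"name": "WETH / USDT", "reserve_in_usd": "980000"}},
        {"attributes": {"name": "WETH / USDC 0.05%", "reserve_in_usd": "1250000.5"}},
    ]
    best = pick_best_pool(sample_pools)
    print(pair_from_pool_name(best["attributes"]["name"]))
    print(format_row([1751508840, 1.5, 2.0, 1.25, 1.75, 1000.0]))
```

Using max instead of sorting the whole list avoids mutating the caller's data and picks the same pool as a descending sort would.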

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kukapay/dex-kline-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.