get_kline

Fetch K-line candlestick data for tokens on various blockchains to analyze price movements and trends in decentralized exchanges.

Instructions

Fetch K-line data for a specified token on a given chain and return it as a formatted table.

Parameters:
    chain (str): Blockchain network (e.g., 'eth', 'bsc', 'solana')
    address (str): Token contract address
    timeframe (str): K-line timeframe (e.g., '1m', '5m', '15m', '1h', '4h', '12h', '1d'). Default: '1m'
    end_time (str, optional): ISO 8601 timestamp for data end time (e.g., '2025-07-03T02:14:00Z'). Default: current UTC time
    limit (int): Number of data points to return (max 1000). Default: 100

Returns:
    str: String containing pair name and K-line data in a formatted table
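
The documented constraints can be checked on the caller's side before invoking the tool. The following is a minimal sketch rather than the server's own code: `validate_kline_args` is a hypothetical helper, the chain list assumes the three networks the handler accepts, and the lower bound of 1 on `limit` is an assumption (the description only states the maximum).

```python
from datetime import datetime
from typing import Optional

# Values taken from the tool description; the helper itself is hypothetical.
SUPPORTED_CHAINS = ("eth", "bsc", "solana")
SUPPORTED_TIMEFRAMES = ("1m", "5m", "15m", "1h", "4h", "12h", "1d")
MAX_LIMIT = 1000


def validate_kline_args(chain: str, address: str, timeframe: str = "1m",
                        end_time: Optional[str] = None, limit: int = 100) -> dict:
    """Return the arguments as a dict, or raise ValueError on a violation."""
    if chain not in SUPPORTED_CHAINS:
        raise ValueError(f"Unsupported chain: {chain!r}")
    if not address:
        raise ValueError("address must be a non-empty string")
    if timeframe not in SUPPORTED_TIMEFRAMES:
        raise ValueError(f"Invalid timeframe: {timeframe!r}")
    # Assumption: at least one data point must be requested.
    if not 1 <= limit <= MAX_LIMIT:
        raise ValueError(f"limit must be between 1 and {MAX_LIMIT}")

    args = {"chain": chain, "address": address,
            "timeframe": timeframe, "limit": limit}
    if end_time is not None:
        # fromisoformat() rejects a trailing 'Z' before Python 3.11,
        # so normalise it to an explicit UTC offset before parsing.
        datetime.fromisoformat(end_time.replace("Z", "+00:00"))
        args["end_time"] = end_time
    return args
```

A call such as `validate_kline_args("eth", "0x...", "1h", limit=50)` returns the argument dictionary unchanged, while an unknown timeframe like `'1n'` raises `ValueError`.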

Input Schema

Name       Required  Description  Default
chain      Yes
address    Yes
timeframe  No                     1m
end_time   No
limit      No

Output Schema

Name    Required  Description  Default
result  Yes
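
Because `result` is a single string (a `# Pair:` header, a blank line, then a grid-style table), a client that wants structured rows has to parse it. The following is a minimal sketch under that assumption; `parse_kline_result` is a hypothetical helper and the sample text is made up to match the shape the handler produces.

```python
from typing import Dict, List, Tuple


def parse_kline_result(result: str) -> Tuple[str, List[Dict[str, str]]]:
    """Split the tool's text output into the pair name and a list of row dicts."""
    header, _, table = result.partition("\n\n")
    if not header.startswith("# Pair: "):
        raise ValueError("Missing '# Pair:' header")
    pair = header[len("# Pair: "):].strip()

    # Keep only the '|'-delimited content lines; border lines start with '+'.
    lines = [ln for ln in table.splitlines() if ln.startswith("|")]
    cells = [[c.strip() for c in ln.strip("|").split("|")] for ln in lines]
    if not cells:
        return pair, []
    columns, rows = cells[0], cells[1:]
    return pair, [dict(zip(columns, row)) for row in rows]


# Made-up sample in the same shape as the tool's output.
sample = (
    "# Pair: WETH/USDC\n\n"
    "+---------------------+--------+--------+--------+--------+---------+\n"
    "| Timestamp           | Open   | High   | Low    | Close  | Volume  |\n"
    "+=====================+========+========+========+========+=========+\n"
    "| 2025-07-03 02:13:00 | 2500.1 | 2501.0 | 2499.8 | 2500.7 | 1234.5  |\n"
    "+---------------------+--------+--------+--------+--------+---------+\n"
)
pair, rows = parse_kline_result(sample)
```

All cell values stay strings here; converting them to `float` or `datetime` is left to the caller.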

Implementation Reference

  • main.py:73-129 (handler)
    Main handler for the 'get_kline' tool, registered via the @mcp.tool() decorator. It validates the chain, looks up the best pool, fetches the K-line data, and formats the output as a table.
    @mcp.tool()
    async def get_kline(chain: str, address: str, timeframe: str = "1m", end_time: Optional[str] = None, limit: int = 100, ctx: Context = None) -> str:
        """
        Fetch K-line data for a specified token on a given chain and return it as a formatted table.
        
        Parameters:
            chain (str): Blockchain network (e.g., 'eth', 'bsc', 'solana')
            address (str): Token contract address
            timeframe (str): K-line timeframe (e.g., '1m', '5m', '15m', '1h', '4h', '12h', '1d'). Default: '1m'
            end_time (str, optional): ISO 8601 timestamp for data end time (e.g., '2025-07-03T02:14:00Z'). Default: current UTC time
            limit (int): Number of data points to return (max 1000). Default: 100
        
        Returns:
            str: String containing pair name and K-line data in a formatted table
        """
        # Context.info() is a coroutine, so it must be awaited to actually log
        await ctx.info(f"Fetching K-line data for {chain}/{address}")
        
        # Validate inputs
        supported_chains = ["eth", "bsc", "solana"]
        if chain not in supported_chains:
            raise ValueError(f"Unsupported chain. Must be one of: {', '.join(supported_chains)}")
        
        # Get best pool
        pool_data = await get_best_pool(chain, address)
        pool_address = pool_data["attributes"]["address"]
        
        # Fetch K-line data
        kline_response = await get_kline_data(
            chain,
            pool_address,
            timeframe,
            end_time,
            limit
        )
        
        # Format output
        # The pool name is split on spaces; parts 0 and 2 are taken as the two token symbols
        name_parts = pool_data["attributes"]["name"].split(" ")
        pair = f"{name_parts[0]}/{name_parts[2]}"
        kline_data = kline_response["data"]["attributes"]["ohlcv_list"]
        
        # Convert K-line data to table
        headers = ["Timestamp", "Open", "High", "Low", "Close", "Volume"]
        table_data = [
            [
                datetime.fromtimestamp(row[0]).strftime("%Y-%m-%d %H:%M:%S"),
                f"{row[1]:.8f}",
                f"{row[2]:.8f}",
                f"{row[3]:.8f}",
                f"{row[4]:.8f}",
                f"{row[5]:.8f}"
            ]
            for row in kline_data
        ]
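        # disable_numparse keeps the pre-formatted 8-decimal strings as-is;
        # by default tabulate re-parses numeric-looking strings and reformats them
        table = tabulate(table_data, headers=headers, tablefmt="grid", disable_numparse=True)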
        
        # Context.info() is a coroutine, so it must be awaited to actually log
        await ctx.info(f"Successfully fetched {len(kline_data)} K-line data points")
        
        return f"# Pair: {pair}\n\n{table}"
  • main.py:30-72 (helper)
    Helper function to fetch raw OHLCV (K-line) data from the GeckoTerminal API for a specific pool.
    async def get_kline_data(chain: str, pool_address: str, timeframe: str, end_time: Optional[str], limit: int) -> Dict[str, Any]:
        """Fetch K-line data for a specific pool"""
        # Map timeframe to API aggregate parameter and endpoint
        timeframe_map = {
            "1m": {"aggregate": "1", "endpoint": "minute"},
            "5m": {"aggregate": "5", "endpoint": "minute"},
            "15m": {"aggregate": "15", "endpoint": "minute"},
            "1h": {"aggregate": "1", "endpoint": "hour"},
            "4h": {"aggregate": "4", "endpoint": "hour"},
            "12h": {"aggregate": "12", "endpoint": "hour"},
            "1d": {"aggregate": "1", "endpoint": "day"}
        }
        
        if timeframe not in timeframe_map:
            raise ValueError(f"Invalid timeframe. Must be one of: {', '.join(timeframe_map.keys())}")
        
        if limit > 1000:
            raise ValueError("Limit cannot exceed 1000")
        
        # limit is already guaranteed to be <= 1000 by the check above
        params = {
            "aggregate": timeframe_map[timeframe]["aggregate"],
            "limit": str(limit)
        }
        
        if end_time:
            try:
                # Parse once; a trailing 'Z' is rewritten as an explicit UTC offset
                parsed_end = datetime.fromisoformat(end_time.replace("Z", "+00:00"))
                params["before_timestamp"] = str(int(parsed_end.timestamp()))
            except ValueError:
                raise ValueError("Invalid end_time format. Must be ISO 8601 string")
        else:
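            # Use the current time as default end_time. datetime.now().timestamp()
            # yields the correct Unix epoch; utcnow() returns a naive datetime that
            # .timestamp() would misread as local time on non-UTC machines.
            params["before_timestamp"] = str(int(datetime.now().timestamp()))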
        
        async with httpx.AsyncClient() as client:
            url = f"{BASE_URL}/networks/{chain}/pools/{pool_address}/ohlcv/{timeframe_map[timeframe]['endpoint']}"
            try:
                response = await client.get(url, params=params)
                response.raise_for_status()
                return response.json()
            except httpx.HTTPError as e:
                raise ValueError(f"Failed to fetch K-line data: {str(e)}")
  • main.py:14-28 (helper)
    Helper function to find the pool with the highest liquidity for a given token address on the chain.
    async def get_best_pool(chain: str, token_address: str) -> Dict[str, Any]:
        """Fetch the pool with highest liquidity for a given token"""
        async with httpx.AsyncClient() as client:
            url = f"{BASE_URL}/networks/{chain}/tokens/{token_address}/pools"
            try:
                response = await client.get(url)
                response.raise_for_status()
                pools = response.json().get("data", [])
                if not pools:
                    raise ValueError("No pools found for the specified token")
                # Sort pools by reserve_in_usd (highest liquidity first)
                # Treat a missing/null reserve as 0 so float() cannot fail on None
                pools.sort(key=lambda x: float(x["attributes"].get("reserve_in_usd") or 0), reverse=True)
                return pools[0]
            except httpx.HTTPError as e:
                raise ValueError(f"Failed to fetch pools: {str(e)}")
  • main.py:75-87 (schema)
    Docstring providing input parameters schema and description for the get_kline tool.
    """
    Fetch K-line data for a specified token on a given chain and return it as a formatted table.
    
    Parameters:
        chain (str): Blockchain network (e.g., 'eth', 'bsc', 'solana')
        address (str): Token contract address
        timeframe (str): K-line timeframe (e.g., '1m', '5m', '15m', '1h', '4h', '12h', '1d'). Default: '1m'
        end_time (str, optional): ISO 8601 timestamp for data end time (e.g., '2025-07-03T02:14:00Z'). Default: current UTC time
        limit (int): Number of data points to return (max 1000). Default: 100
    
    Returns:
        str: String containing pair name and K-line data in a formatted table
    """

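The two helpers referenced above boil down to three pure steps: mapping the timeframe to an endpoint and `aggregate` value, converting `end_time` to Unix seconds, and picking the pool with the largest `reserve_in_usd`. The following is a minimal, network-free sketch of those steps, not the code in main.py: the function names are hypothetical, and treating an offset-less timestamp as UTC is an assumption.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

# (aggregate, endpoint) per timeframe, mirroring timeframe_map in the helper.
TIMEFRAMES = {
    "1m": ("1", "minute"), "5m": ("5", "minute"), "15m": ("15", "minute"),
    "1h": ("1", "hour"), "4h": ("4", "hour"), "12h": ("12", "hour"),
    "1d": ("1", "day"),
}


def to_unix_seconds(end_time: Optional[str]) -> int:
    """Convert an ISO 8601 string to Unix seconds; None means the current time."""
    if end_time is None:
        return int(datetime.now(timezone.utc).timestamp())
    parsed = datetime.fromisoformat(end_time.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # Assumption: a timestamp without an offset is meant as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return int(parsed.timestamp())


def build_ohlcv_request(timeframe: str, end_time: Optional[str],
                        limit: int) -> Tuple[str, Dict[str, str]]:
    """Return the endpoint suffix and query parameters, without any I/O."""
    if timeframe not in TIMEFRAMES:
        raise ValueError(f"Invalid timeframe. Must be one of: {', '.join(TIMEFRAMES)}")
    if limit > 1000:
        raise ValueError("Limit cannot exceed 1000")
    aggregate, endpoint = TIMEFRAMES[timeframe]
    params = {
        "aggregate": aggregate,
        "limit": str(limit),
        "before_timestamp": str(to_unix_seconds(end_time)),
    }
    return endpoint, params


def pick_best_pool(pools: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Return the pool with the largest reserve_in_usd; missing values count as 0."""
    if not pools:
        raise ValueError("No pools found for the specified token")
    return max(pools, key=lambda p: float(p["attributes"].get("reserve_in_usd") or 0))
```

Keeping these steps free of HTTP calls makes them easy to unit-test; the real helpers would then only need to issue the request with the returned endpoint and parameters.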

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kukapay/dex-kline-mcp'
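
The same request can be made from Python with only the standard library. This is a minimal sketch: `build_request` and `fetch_server_info` are hypothetical names, and decoding the body as a JSON object is an assumption about the response format.

```python
import json
import urllib.request

API_URL = "https://glama.ai/api/mcp/v1/servers/kukapay/dex-kline-mcp"


def build_request(url: str = API_URL) -> urllib.request.Request:
    """Build the GET request without sending it."""
    return urllib.request.Request(url, method="GET")


def fetch_server_info(url: str = API_URL, timeout: float = 10.0) -> dict:
    """Send the request and decode the body, assuming it is JSON."""
    with urllib.request.urlopen(build_request(url), timeout=timeout) as response:
        return json.load(response)
```

Calling `fetch_server_info()` performs the network request; `build_request()` alone can be used to inspect the URL and method first.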

If you have feedback or need assistance with the MCP directory API, please join our Discord server.