Skip to main content
Glama

get_latest_liquidations

Retrieve Binance liquidation events in a Markdown table format, sorted by timestamp in descending order. Specify the limit to control the number of events returned.

Instructions

Retrieve the latest liquidation events from Binance in a table format.

Args: limit (int): The maximum number of liquidation events to return (default: 10, max: 1000). ctx (Context, optional): The MCP context for logging and server interaction. Defaults to None. Returns: str: A Markdown table containing the latest liquidation events, sorted by timestamp in descending order.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
ctxNo
limitNo

Implementation Reference

  • main.py:88-114 (handler)
    The handler function decorated with @mcp.tool(), which defines and registers the get_latest_liquidations tool. It fetches liquidation events from the application context, sorts and limits them, then formats into a Markdown table.
    @mcp.tool() def get_latest_liquidations(limit: int = 10, ctx: Context | None = None) -> str: """Retrieve the latest liquidation events from Binance in a table format. Args: limit (int): The maximum number of liquidation events to return (default: 10, max: 1000). ctx (Context, optional): The MCP context for logging and server interaction. Defaults to None. Returns: str: A Markdown table containing the latest liquidation events, sorted by timestamp in descending order. """ app_ctx = ctx.request_context.lifespan_context if ctx else mcp.get_context().request_context.lifespan_context filtered = [vars(event) for event in app_ctx.liquidations] filtered = sorted(filtered, key=lambda x: x['timestamp'], reverse=True)[:min(limit, MAX_LIQUIDATIONS)] if ctx: ctx.info(f"Retrieved {len(filtered)} latest liquidation events") # Generate Markdown table if not filtered: return "No liquidation events available." table = "| Symbol | Side | Price | Quantity | Time |\n" table += "|--------|------|-------|----------|------|\n" for event in filtered: dt = datetime.fromtimestamp(event['timestamp'] / 1000.0) time_str = dt.strftime("%H:%M:%S") table += f"| {event['symbol']} | {event['side']} | {event['price']} | {event['quantity']} | {time_str} |\n" return table
  • main.py:52-79 (helper)
    Helper function that listens to the Binance WebSocket stream for force liquidation orders and populates the liquidations list in the app context, which is used by the tool.
    async def listen_binance_liquidations(ctx: AppContext): """Listen for liquidation events from Binance WebSocket""" if not ctx.binance_ws: return try: async for message in ctx.binance_ws: data = json.loads(message) event_data = data.get('o', {}) event = LiquidationEvent( symbol=event_data.get('s', ''), side=event_data.get('S', ''), price=float(event_data.get('p', 0)), quantity=float(event_data.get('q', 0)), timestamp=int(event_data.get('T', 0)) ) ctx.liquidations.append(event) # Maintain max 1000 events if len(ctx.liquidations) > MAX_LIQUIDATIONS: ctx.liquidations.pop(0) except Exception as e: ctx.liquidations.append(LiquidationEvent( symbol="ERROR", side="NONE", price=0.0, quantity=0.0, timestamp=int(asyncio.get_event_loop().time() * 1000), ))
  • main.py:17-23 (helper)
    Dataclass defining the structure of a liquidation event, used to store parsed WebSocket data.
    @dataclass class LiquidationEvent: symbol: str side: str # BUY or SELL price: float quantity: float timestamp: int

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kukapay/crypto-liquidations-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server