
get_latest_liquidations

Retrieve Binance liquidation events in a table format to monitor market volatility and risk exposure.

Instructions

Retrieve the latest liquidation events from Binance in a table format.

Args:
    limit (int): The maximum number of liquidation events to return (default: 10, max: 1000).
    ctx (Context, optional): The MCP context for logging and server interaction. Defaults to None.

Returns:
    str: A Markdown table containing the latest liquidation events, sorted by timestamp in descending order.
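To make the documented bounds on `limit` concrete, here is a minimal sketch of how the argument could be normalized before it is used as a slice length. The helper name `clamp_limit` is hypothetical and not part of the server, and the value of `MAX_LIQUIDATIONS` is assumed to be 1000 to match the documented maximum. The handler in the implementation reference applies only the upper bound, via `min(limit, MAX_LIQUIDATIONS)`; the lower bound shown here is an added assumption.

```python
MAX_LIQUIDATIONS = 1000  # assumed value, matching the documented maximum


def clamp_limit(limit: int = 10, maximum: int = MAX_LIQUIDATIONS) -> int:
    """Hypothetical helper: force `limit` into the range [0, maximum]."""
    return max(0, min(limit, maximum))


print(clamp_limit())      # 10
print(clamp_limit(5000))  # 1000
print(clamp_limit(-3))    # 0
```

Without a lower bound, a negative `limit` used directly in a slice such as `events[:limit]` would drop items from the end of the list rather than return an empty result, which is why this sketch clamps at zero.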

Input Schema

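| Name  | Required | Description                                                     | Default |
|-------|----------|-----------------------------------------------------------------|---------|
| limit | No       | The maximum number of liquidation events to return (max: 1000). | 10      |
| ctx   | No       | The MCP context for logging and server interaction.             | None    |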

Implementation Reference

  • main.py:88-114 (handler)
    The handler function for the 'get_latest_liquidations' tool, decorated with @mcp.tool() for registration. It reads the liquidation events from the application lifespan context, sorts them by timestamp in descending order, truncates the list to the requested limit, logs the count if a context is provided, and formats the result as a Markdown table.
    @mcp.tool()
    def get_latest_liquidations(limit: int = 10, ctx: Context | None = None) -> str:
        """Retrieve the latest liquidation events from Binance in a table format.

        Args:
            limit (int): The maximum number of liquidation events to return (default: 10, max: 1000).
            ctx (Context, optional): The MCP context for logging and server interaction. Defaults to None.

        Returns:
            str: A Markdown table containing the latest liquidation events, sorted by timestamp in descending order.
        """
        app_ctx = ctx.request_context.lifespan_context if ctx else mcp.get_context().request_context.lifespan_context
        filtered = [vars(event) for event in app_ctx.liquidations]
        filtered = sorted(filtered, key=lambda x: x['timestamp'], reverse=True)[:min(limit, MAX_LIQUIDATIONS)]

        if ctx:
            ctx.info(f"Retrieved {len(filtered)} latest liquidation events")

        # Generate Markdown table
        if not filtered:
            return "No liquidation events available."

        table = "| Symbol | Side | Price | Quantity | Time |\n"
        table += "|--------|------|-------|----------|------|\n"
        for event in filtered:
            dt = datetime.fromtimestamp(event['timestamp'] / 1000.0)
            time_str = dt.strftime("%H:%M:%S")
            table += f"| {event['symbol']} | {event['side']} | {event['price']} | {event['quantity']} | {time_str} |\n"

        return table
  • main.py:17-23 (helper)
    Dataclass defining the structure for individual liquidation events stored in the application context, used by the tool indirectly.
    @dataclass
    class LiquidationEvent:
        symbol: str
        side: str  # BUY or SELL
        price: float
        quantity: float
        timestamp: int
  • main.py:52-78 (helper)
    Helper function that listens to the Binance WebSocket stream for force liquidation orders and populates the liquidations list in the app context, providing the data source for the tool.
    async def listen_binance_liquidations(ctx: AppContext):
        """Listen for liquidation events from Binance WebSocket"""
        if not ctx.binance_ws:
            return

        try:
            async for message in ctx.binance_ws:
                data = json.loads(message)
                event_data = data.get('o', {})
                event = LiquidationEvent(
                    symbol=event_data.get('s', ''),
                    side=event_data.get('S', ''),
                    price=float(event_data.get('p', 0)),
                    quantity=float(event_data.get('q', 0)),
                    timestamp=int(event_data.get('T', 0))
                )
                ctx.liquidations.append(event)
                # Maintain max 1000 events
                if len(ctx.liquidations) > MAX_LIQUIDATIONS:
                    ctx.liquidations.pop(0)
        except Exception as e:
            ctx.liquidations.append(LiquidationEvent(
                symbol="ERROR",
                side="NONE",
                price=0.0,
                quantity=0.0,
                timestamp=int(asyncio.get_event_loop().time() * 1000),
            ))
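
The three pieces above (event dataclass, stream listener, and table handler) can be tried end to end without a WebSocket or an MCP server. The following is a minimal sketch under stated assumptions, not the server's code: the sample messages are made up for illustration, `parse_event` and `render_table` are hypothetical names, `MAX_LIQUIDATIONS` is assumed to be 1000, and two deliberate substitutions are made. A `collections.deque` with `maxlen` stands in for the manual `pop(0)` trimming, and timestamps are rendered in UTC rather than local time so the output does not depend on the machine's timezone.

```python
import json
from collections import deque
from dataclasses import asdict, dataclass
from datetime import datetime, timezone

MAX_LIQUIDATIONS = 1000  # assumed, per the "Maintain max 1000 events" comment


@dataclass
class LiquidationEvent:
    symbol: str
    side: str  # BUY or SELL
    price: float
    quantity: float
    timestamp: int  # milliseconds since the Unix epoch


def parse_event(message: str) -> LiquidationEvent:
    """Map the 'o' object of a force-order message onto a LiquidationEvent."""
    order = json.loads(message).get("o", {})
    return LiquidationEvent(
        symbol=order.get("s", ""),
        side=order.get("S", ""),
        price=float(order.get("p", 0)),
        quantity=float(order.get("q", 0)),
        timestamp=int(order.get("T", 0)),
    )


def render_table(events, limit: int = 10) -> str:
    """Return a newest-first Markdown table capped at `limit` rows."""
    rows = sorted((asdict(e) for e in events), key=lambda r: r["timestamp"], reverse=True)
    rows = rows[: max(0, min(limit, MAX_LIQUIDATIONS))]
    if not rows:
        return "No liquidation events available."
    lines = [
        "| Symbol | Side | Price | Quantity | Time |",
        "|--------|------|-------|----------|------|",
    ]
    for r in rows:
        # UTC here is a choice made for this sketch; the handler above uses local time.
        t = datetime.fromtimestamp(r["timestamp"] / 1000.0, tz=timezone.utc)
        lines.append(
            f"| {r['symbol']} | {r['side']} | {r['price']} | {r['quantity']} | {t:%H:%M:%S} |"
        )
    return "\n".join(lines) + "\n"


# A bounded deque discards the oldest event automatically once it is full.
buffer = deque(maxlen=MAX_LIQUIDATIONS)

# Made-up sample payloads, for illustration only.
samples = [
    '{"e": "forceOrder", "o": {"s": "BTCUSDT", "S": "SELL", "p": "50000.5", "q": "0.25", "T": 1700000000000}}',
    '{"e": "forceOrder", "o": {"s": "ETHUSDT", "S": "BUY", "p": "2000", "q": "1.5", "T": 1700000060000}}',
]
for raw in samples:
    buffer.append(parse_event(raw))

print(render_table(buffer, limit=1))
```

With `limit=1`, only the newer of the two sample events (the ETHUSDT one) appears in the table. The `deque` choice avoids the linear-time cost of `list.pop(0)`, at the price of differing from the list-based storage shown in the listener.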

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kukapay/crypto-liquidations-mcp'
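
The same request can be made from Python using only the standard library. This is a rough sketch: the helper names `server_url` and `fetch_server` are hypothetical, and it assumes the endpoint returns a JSON object, which the page does not state explicitly.

```python
import json
import urllib.request

API_BASE = "https://glama.ai/api/mcp/v1/servers"


def server_url(owner: str, repo: str) -> str:
    """Build the directory URL for one server (hypothetical helper)."""
    return f"{API_BASE}/{owner}/{repo}"


def fetch_server(owner: str, repo: str, timeout: float = 10.0) -> dict:
    """GET the server record; assumes the response body is a JSON object."""
    with urllib.request.urlopen(server_url(owner, repo), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Calling `fetch_server("kukapay", "crypto-liquidations-mcp")` issues the same GET request as the curl command above; the fields of the returned record are not documented on this page, so inspect the result before relying on any particular key.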