get_latest_liquidations
Retrieve the most recent Binance liquidation events as a Markdown table, sorted by timestamp in descending order. Use `limit` to control how many events are returned (default: 10, max: 1000).
Instructions
Retrieve the latest liquidation events from Binance in a table format.
Args:
limit (int): The maximum number of liquidation events to return (default: 10, max: 1000).
ctx (Context, optional): The MCP context for logging and server interaction. Defaults to None.
Returns:
str: A Markdown table containing the latest liquidation events, sorted by timestamp in descending order.
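The sort, limit, and format behaviour described above can be sketched as a standalone function. This is a minimal illustration, not the server's code: `Event` and `format_latest` are hypothetical names, and the sketch formats times in UTC so the output is reproducible.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class Event:  # hypothetical stand-in for the server's LiquidationEvent
    symbol: str
    side: str
    price: float
    quantity: float
    timestamp: int  # milliseconds since the Unix epoch


def format_latest(events: list[Event], limit: int = 10) -> str:
    """Return the newest `limit` events as a Markdown table."""
    # Newest first, then keep only the first `limit` rows.
    newest = sorted(events, key=lambda e: e.timestamp, reverse=True)[:limit]
    if not newest:
        return "No liquidation events available."
    rows = [
        "| Symbol | Side | Price | Quantity | Time |",
        "|--------|------|-------|----------|------|",
    ]
    for e in newest:
        # Assumption: UTC is used here; the time zone is a choice of this sketch.
        t = datetime.fromtimestamp(e.timestamp / 1000, tz=timezone.utc)
        rows.append(f"| {e.symbol} | {e.side} | {e.price} | {e.quantity} | {t:%H:%M:%S} |")
    return "\n".join(rows) + "\n"


# Made-up sample data for illustration only.
sample = [
    Event("BTCUSDT", "SELL", 9910.0, 0.014, 1568014460893),
    Event("ETHUSDT", "BUY", 180.5, 2.0, 1568014470000),
]
print(format_latest(sample, limit=1))
```

With `limit=1`, only the event with the larger timestamp appears in the table.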
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
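| ctx | No | The MCP context for logging and server interaction. | None |
| limit | No | The maximum number of liquidation events to return (max: 1000). | 10 |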
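Because `limit` is a plain integer, a caller may pass values outside the documented range. The handler in the implementation reference caps it with `min(limit, MAX_LIQUIDATIONS)`, which bounds the upper end only; in Python, a negative value used as a slice end drops items from the tail instead of returning nothing. The sketch below shows one possible guard. `clamp_limit` is a hypothetical helper, not part of the server.

```python
MAX_LIQUIDATIONS = 1000  # matches the documented maximum


def clamp_limit(limit: int = 10) -> int:
    """Force `limit` into the range 0..MAX_LIQUIDATIONS."""
    return max(0, min(limit, MAX_LIQUIDATIONS))


events = list(range(5))
# Upper bound only: a negative limit slices from the end of the list.
print(events[:min(-1, MAX_LIQUIDATIONS)])
# Clamped at both ends: a negative limit yields an empty result.
print(events[:clamp_limit(-1)])
```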
Implementation Reference
- main.py:88-114 (handler) The handler function decorated with @mcp.tool(), which defines and registers the get_latest_liquidations tool. It fetches liquidation events from the application context, sorts and limits them, then formats them into a Markdown table.

      @mcp.tool()
      def get_latest_liquidations(limit: int = 10, ctx: Context | None = None) -> str:
          """Retrieve the latest liquidation events from Binance in a table format.

          Args:
              limit (int): The maximum number of liquidation events to return (default: 10, max: 1000).
              ctx (Context, optional): The MCP context for logging and server interaction. Defaults to None.

          Returns:
              str: A Markdown table containing the latest liquidation events, sorted by timestamp in descending order.
          """
          app_ctx = ctx.request_context.lifespan_context if ctx else mcp.get_context().request_context.lifespan_context
          filtered = [vars(event) for event in app_ctx.liquidations]
          filtered = sorted(filtered, key=lambda x: x['timestamp'], reverse=True)[:min(limit, MAX_LIQUIDATIONS)]

          if ctx:
              ctx.info(f"Retrieved {len(filtered)} latest liquidation events")

          # Generate Markdown table
          if not filtered:
              return "No liquidation events available."

          table = "| Symbol | Side | Price | Quantity | Time |\n"
          table += "|--------|------|-------|----------|------|\n"

          for event in filtered:
              dt = datetime.fromtimestamp(event['timestamp'] / 1000.0)
              time_str = dt.strftime("%H:%M:%S")
              table += f"| {event['symbol']} | {event['side']} | {event['price']} | {event['quantity']} | {time_str} |\n"

          return table

- main.py:52-79 (helper) Helper function that listens to the Binance WebSocket stream for force liquidation orders and populates the liquidations list in the app context, which is used by the tool.

      async def listen_binance_liquidations(ctx: AppContext):
          """Listen for liquidation events from Binance WebSocket"""
          if not ctx.binance_ws:
              return

          try:
              async for message in ctx.binance_ws:
                  data = json.loads(message)
                  event_data = data.get('o', {})
                  event = LiquidationEvent(
                      symbol=event_data.get('s', ''),
                      side=event_data.get('S', ''),
                      price=float(event_data.get('p', 0)),
                      quantity=float(event_data.get('q', 0)),
                      timestamp=int(event_data.get('T', 0))
                  )
                  ctx.liquidations.append(event)
                  # Maintain max 1000 events
                  if len(ctx.liquidations) > MAX_LIQUIDATIONS:
                      ctx.liquidations.pop(0)
          except Exception as e:
              ctx.liquidations.append(LiquidationEvent(
                  symbol="ERROR",
                  side="NONE",
                  price=0.0,
                  quantity=0.0,
                  timestamp=int(asyncio.get_event_loop().time() * 1000),
              ))

- main.py:17-23 (helper) Dataclass defining the structure of a liquidation event, used to store parsed WebSocket data.

      @dataclass
      class LiquidationEvent:
          symbol: str
          side: str  # BUY or SELL
          price: float
          quantity: float
          timestamp: int
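The ingestion path in the helpers above (parse the `o` object of each message, append it, keep at most a fixed number of events) can be tried without a live WebSocket. The sketch below is an illustration under stated assumptions: `parse_force_order` is a hypothetical name, the payload is a made-up sample that carries only the keys the listener reads, and `collections.deque(maxlen=...)` is swapped in for the listener's `list.pop(0)` trimming.

```python
import json
from collections import deque
from dataclasses import dataclass


@dataclass
class LiquidationEvent:
    symbol: str
    side: str  # BUY or SELL
    price: float
    quantity: float
    timestamp: int  # milliseconds since the Unix epoch


def parse_force_order(message: str) -> LiquidationEvent:
    """Map the 'o' object of a force-order message onto the dataclass."""
    order = json.loads(message).get("o", {})
    return LiquidationEvent(
        symbol=order.get("s", ""),
        side=order.get("S", ""),
        price=float(order.get("p", 0)),
        quantity=float(order.get("q", 0)),
        timestamp=int(order.get("T", 0)),
    )


# Illustrative payload containing only the fields the listener reads.
raw = '{"e": "forceOrder", "o": {"s": "BTCUSDT", "S": "SELL", "p": "9910", "q": "0.014", "T": 1568014460893}}'

# A deque with maxlen discards the oldest entry automatically when full.
buffer: deque[LiquidationEvent] = deque(maxlen=3)
for i in range(5):
    event = parse_force_order(raw)
    event.timestamp += i  # make each sample event distinct
    buffer.append(event)

print(len(buffer), buffer[0].timestamp)
```

A bounded deque evicts from the left in constant time, whereas `list.pop(0)` shifts every remaining element; at a cap of 1000 events either approach is workable.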