get_latest_liquidations
Retrieve Binance liquidation events in a table format to monitor market volatility and risk exposure.
Instructions
Retrieve the latest liquidation events from Binance in a table format.
Args:
limit (int): The maximum number of liquidation events to return (default: 10, max: 1000).
ctx (Context, optional): The MCP context for logging and server interaction. Defaults to None.
Returns:
str: A Markdown table containing the latest liquidation events, sorted by timestamp in descending order.
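The return behaviour described above (newest first, capped by `limit`, rendered as a Markdown table) can be sketched on plain dictionaries. This is a minimal illustration, not the server's code: `format_liquidations` and the sample events are hypothetical, and it formats times in UTC so the output is reproducible, whereas the handler shown under Implementation Reference uses local time.

```python
from datetime import datetime, timezone

MAX_LIQUIDATIONS = 1000  # cap stated in the docstring


def format_liquidations(events: list[dict], limit: int = 10) -> str:
    """Sort events newest-first, keep at most `limit`, render a Markdown table."""
    newest = sorted(events, key=lambda e: e["timestamp"], reverse=True)
    newest = newest[: min(limit, MAX_LIQUIDATIONS)]
    if not newest:
        return "No liquidation events available."

    rows = [
        "| Symbol | Side | Price | Quantity | Time |",
        "|--------|------|-------|----------|------|",
    ]
    for e in newest:
        # Timestamps are in milliseconds; UTC is used here for reproducible output.
        t = datetime.fromtimestamp(e["timestamp"] / 1000.0, tz=timezone.utc)
        rows.append(
            f"| {e['symbol']} | {e['side']} | {e['price']} | {e['quantity']} | {t:%H:%M:%S} |"
        )
    return "\n".join(rows) + "\n"


# Made-up sample events, for illustration only.
sample = [
    {"symbol": "BTCUSDT", "side": "SELL", "price": 60000.0, "quantity": 0.5, "timestamp": 1_700_000_000_000},
    {"symbol": "ETHUSDT", "side": "BUY", "price": 3000.0, "quantity": 2.0, "timestamp": 1_700_000_060_000},
]
print(format_liquidations(sample, limit=1))
```

With `limit=1` only the ETHUSDT row is kept, because it carries the larger timestamp.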
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
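| limit | No | Maximum number of liquidation events to return (max: 1000). | 10 |
| ctx | No | MCP context for logging and server interaction. | None |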
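As a rough sketch of how these arguments travel over the wire, the snippet below builds the JSON-RPC 2.0 `tools/call` request that an MCP client would send for this tool. The helper name `build_tool_call` and the request `id` are hypothetical; only `limit` is passed, on the assumption that clients leave `ctx` to the server.

```python
import json


def build_tool_call(limit: int = 10, request_id: int = 1) -> str:
    """Build a JSON-RPC 2.0 `tools/call` request for get_latest_liquidations."""
    request = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "get_latest_liquidations",
            # ctx is omitted on purpose; it is assumed to be resolved server-side.
            "arguments": {"limit": limit},
        },
    }
    return json.dumps(request)


print(build_tool_call(limit=25))
```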
Implementation Reference
- main.py:88-114 (handler) The handler function for the 'get_latest_liquidations' tool, decorated with @mcp.tool() for registration. It extracts liquidation events from the application lifespan context, sorts them by timestamp descending, limits to the specified number, logs the count if context provided, and formats the data into a Markdown table.

  ```python
  @mcp.tool()
  def get_latest_liquidations(limit: int = 10, ctx: Context | None = None) -> str:
      """Retrieve the latest liquidation events from Binance in a table format.

      Args:
          limit (int): The maximum number of liquidation events to return (default: 10, max: 1000).
          ctx (Context, optional): The MCP context for logging and server interaction. Defaults to None.

      Returns:
          str: A Markdown table containing the latest liquidation events, sorted by timestamp in descending order.
      """
      app_ctx = ctx.request_context.lifespan_context if ctx else mcp.get_context().request_context.lifespan_context
      filtered = [vars(event) for event in app_ctx.liquidations]
      filtered = sorted(filtered, key=lambda x: x['timestamp'], reverse=True)[:min(limit, MAX_LIQUIDATIONS)]

      if ctx:
          ctx.info(f"Retrieved {len(filtered)} latest liquidation events")

      # Generate Markdown table
      if not filtered:
          return "No liquidation events available."

      table = "| Symbol | Side | Price | Quantity | Time |\n"
      table += "|--------|------|-------|----------|------|\n"
      for event in filtered:
          dt = datetime.fromtimestamp(event['timestamp'] / 1000.0)
          time_str = dt.strftime("%H:%M:%S")
          table += f"| {event['symbol']} | {event['side']} | {event['price']} | {event['quantity']} | {time_str} |\n"

      return table
  ```
- main.py:17-23 (helper) Dataclass defining the structure for individual liquidation events stored in the application context, used by the tool indirectly.

  ```python
  @dataclass
  class LiquidationEvent:
      symbol: str
      side: str  # BUY or SELL
      price: float
      quantity: float
      timestamp: int
  ```
- main.py:52-78 (helper) Helper function that listens to the Binance WebSocket stream for force liquidation orders and populates the liquidations list in the app context, providing the data source for the tool.

  ```python
  async def listen_binance_liquidations(ctx: AppContext):
      """Listen for liquidation events from Binance WebSocket"""
      if not ctx.binance_ws:
          return

      try:
          async for message in ctx.binance_ws:
              data = json.loads(message)
              event_data = data.get('o', {})
              event = LiquidationEvent(
                  symbol=event_data.get('s', ''),
                  side=event_data.get('S', ''),
                  price=float(event_data.get('p', 0)),
                  quantity=float(event_data.get('q', 0)),
                  timestamp=int(event_data.get('T', 0))
              )
              ctx.liquidations.append(event)
              # Maintain max 1000 events
              if len(ctx.liquidations) > MAX_LIQUIDATIONS:
                  ctx.liquidations.pop(0)
      except Exception as e:
          ctx.liquidations.append(LiquidationEvent(
              symbol="ERROR",
              side="NONE",
              price=0.0,
              quantity=0.0,
              timestamp=int(asyncio.get_event_loop().time() * 1000),
          ))
  ```
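The parsing and buffering steps of the listener can be tried without a live WebSocket. The sketch below is an illustration under stated assumptions: `parse_liquidation` is a hypothetical helper that mirrors the field mapping above, the sample message is made up and contains only the keys the listener reads, and `collections.deque(maxlen=...)` is swapped in for the list plus `pop(0)` used in main.py, as one possible way to keep the buffer bounded.

```python
import json
from collections import deque
from dataclasses import dataclass

MAX_LIQUIDATIONS = 1000


@dataclass
class LiquidationEvent:
    symbol: str
    side: str  # BUY or SELL
    price: float
    quantity: float
    timestamp: int  # milliseconds


def parse_liquidation(message: str) -> LiquidationEvent:
    """Map one raw stream message onto a LiquidationEvent, as the listener does."""
    order = json.loads(message).get("o", {})
    return LiquidationEvent(
        symbol=order.get("s", ""),
        side=order.get("S", ""),
        price=float(order.get("p", 0)),
        quantity=float(order.get("q", 0)),
        timestamp=int(order.get("T", 0)),
    )


# deque(maxlen=...) discards the oldest entry automatically once it is full.
buffer: deque[LiquidationEvent] = deque(maxlen=MAX_LIQUIDATIONS)

# Made-up message containing only the fields the listener reads.
raw = '{"o": {"s": "BTCUSDT", "S": "SELL", "p": "9910", "q": "0.014", "T": 1568014460893}}'
buffer.append(parse_liquidation(raw))
print(buffer[-1])
```

A bounded deque drops the oldest element in constant time, while `list.pop(0)` shifts every remaining element; at a cap of 1000 events either approach is likely adequate.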