Skip to main content
Glama
trading_tools.py (33.5 kB)
"""
Hyperliquid Trading Operations Module

This module provides a comprehensive set of trading tools for interacting with
the Hyperliquid DEX, including order placement, modification, cancellation,
leverage management, and advanced order types.

Author: Backend Developer
Date: 2025-11-09
"""

from typing import Dict, List, Optional, Any, Union
import time
from datetime import datetime, timedelta


class TradingTools:
    """
    TradingTools provides methods for executing trading operations on Hyperliquid.

    This class wraps the exchange and info clients handed to the constructor
    and exposes a uniform, dict-based interface for:
    - Single and batch order placement
    - Order cancellation and modification
    - TWAP (Time-Weighted Average Price) orders
    - Leverage and margin management
    - Dead man's switch for risk management

    Every public method catches its own errors and reports them through a
    result dict (``success``/``error``) instead of raising to the caller.
    """

    def __init__(self, exchange_client, info_client, account_address: str):
        """
        Initialize TradingTools with Hyperliquid SDK clients.

        Args:
            exchange_client: Hyperliquid Exchange client instance
            info_client: Hyperliquid Info client instance
            account_address: Ethereum address of the trading account
        """
        self.exchange = exchange_client
        self.info = info_client
        self.account = account_address

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _timestamp() -> str:
        """Return the current time as a naive-UTC ISO-8601 string.

        ``datetime.utcnow()`` is kept on purpose so the emitted format (no
        ``+00:00`` suffix) stays what existing consumers already receive.
        """
        return datetime.utcnow().isoformat()

    def _failure(self, error: Any, **context: Any) -> Dict[str, Any]:
        """Build a failure result: ``success``, ``error``, *context*, ``timestamp``."""
        return {
            "success": False,
            "error": error,
            **context,
            "timestamp": self._timestamp(),
        }

    @staticmethod
    def _is_ok(response: Any) -> bool:
        """Return True when *response* is a non-empty mapping with status "ok"."""
        return (
            bool(response)
            and hasattr(response, "get")
            and response.get("status") == "ok"
        )

    @staticmethod
    def _error_detail(response: Any, default: str) -> Any:
        """Extract the error payload of a rejected response.

        Falls back to *default* when the client returned nothing usable
        (e.g. ``None``), so callers never hit an ``AttributeError`` while
        trying to report a failure.
        """
        if response and hasattr(response, "get"):
            return response.get("response", default)
        return default

    @staticmethod
    def _order_statuses(response: Any) -> List[Any]:
        """Return ``response["response"]["data"]["statuses"]`` or ``[]``."""
        payload = response.get("response") if hasattr(response, "get") else None
        data = payload.get("data") if isinstance(payload, dict) else None
        statuses = data.get("statuses") if isinstance(data, dict) else None
        return statuses if isinstance(statuses, list) else []

    @staticmethod
    def _parse_order_status(status: Any) -> tuple:
        """Interpret one per-order status entry.

        Returns:
            ``(order_id, state, error)`` where ``state`` is ``"placed"`` for a
            resting order, ``"filled"`` for an immediately filled one and
            ``None`` when the entry has neither; ``error`` is the exchange's
            per-order error (or ``None``).
        """
        if not isinstance(status, dict):
            return None, None, None
        if status.get("error"):
            return None, None, status["error"]
        for key, state in (("resting", "placed"), ("filled", "filled")):
            detail = status.get(key)
            if detail:
                order_id = detail.get("oid") if isinstance(detail, dict) else None
                return order_id, state, None
        return None, None, None

    @staticmethod
    def _build_batch_request(order: Dict[str, Any], idx: int) -> Dict[str, Any]:
        """Validate one batch entry and convert it to an exchange request.

        Raises:
            ValueError: If a required field is missing or the size is not
                positive.
        """
        coin = order.get("coin")
        is_buy = order.get("is_buy")
        size = order.get("size")
        price = order.get("price")

        if not coin or is_buy is None or size is None or not price:
            raise ValueError("Missing required fields: coin, is_buy, size, price")
        if size <= 0:
            raise ValueError(f"Order size must be positive, got {size}")

        order_type = order.get("order_type", "limit")
        request = {
            "coin": coin,
            "is_buy": is_buy,
            "sz": size,
            "limit_px": price,
            "order_type": order_type,
            "reduce_only": order.get("reduce_only", False),
            # Kept in the request for backward compatibility with what was
            # previously sent to the exchange client.
            "order_index": idx,
        }
        if order_type == "limit":
            request["tif"] = order.get("tif", "Gtc")
        cloid = order.get("cloid")
        if cloid:
            request["cloid"] = cloid
        return request

    # ------------------------------------------------------------------
    # Order placement
    # ------------------------------------------------------------------

    async def place_order(
        self,
        coin: str,
        is_buy: bool,
        size: float,
        price: float,
        order_type: str = "limit",
        tif: str = "Gtc",
        reduce_only: bool = False,
        cloid: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Place a single order on Hyperliquid.

        Args:
            coin: Trading pair symbol (e.g., "BTC", "ETH")
            is_buy: True for buy order, False for sell order
            size: Order size in base currency units
            price: Limit price (ignored for market orders)
            order_type: "limit" or "market"
            tif: Time in force - "Gtc" (Good til Cancel), "Ioc" (Immediate or
                Cancel), "Alo" (Add Liquidity Only)
            reduce_only: If True, order can only reduce existing position
            cloid: Client order ID for tracking (optional)

        Returns:
            Dict containing:
            - success: bool
            - order_id: int (if successful; taken from the resting or the
              filled status entry)
            - status: str ("placed" for a resting order, "filled" for an
              immediately filled one)
            - response: dict (full exchange response)
            - error: str (if failed)

        Note:
            Validation problems are reported through the result dict
            ("Validation error: ..."), they are not raised to the caller.
        """
        try:
            # Input validation
            if size <= 0:
                raise ValueError(f"Order size must be positive, got {size}")
            if order_type not in ("limit", "market"):
                raise ValueError(
                    f"Invalid order_type: {order_type}. Must be 'limit' or 'market'"
                )
            if tif not in ("Gtc", "Ioc", "Alo"):
                raise ValueError(
                    f"Invalid time in force: {tif}. Must be 'Gtc', 'Ioc', or 'Alo'"
                )

            # Optional keyword arguments: tif only applies to limit orders.
            optional: Dict[str, Any] = {}
            if order_type == "limit":
                optional["tif"] = tif
            if cloid:
                optional["cloid"] = cloid

            response = self.exchange.order(
                coin=coin,
                is_buy=is_buy,
                sz=size,
                limit_px=price,
                order_type=order_type,
                reduce_only=reduce_only,
                **optional
            )

            if not self._is_ok(response):
                return self._failure(
                    self._error_detail(response, "Unknown error"), coin=coin
                )

            # A top-level "ok" can still carry a per-order rejection.
            statuses = self._order_statuses(response)
            order_id, state, order_error = self._parse_order_status(
                statuses[0] if statuses else None
            )
            if order_error is not None:
                return self._failure(order_error, coin=coin)

            return {
                "success": True,
                "order_id": order_id,
                "status": state or "placed",
                "coin": coin,
                "side": "buy" if is_buy else "sell",
                "size": size,
                "price": price,
                "order_type": order_type,
                "tif": tif,
                "reduce_only": reduce_only,
                "cloid": cloid,
                "timestamp": self._timestamp(),
                "response": response
            }

        except ValueError as ve:
            return self._failure(f"Validation error: {str(ve)}", coin=coin)
        except Exception as e:
            return self._failure(f"Order placement failed: {str(e)}", coin=coin)

    async def place_batch_orders(self, orders: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Place multiple orders in a single batch request.

        Orders that fail local validation are NOT sent to the exchange; they
        get a failure entry of their own. The remaining orders are submitted
        together and each exchange status is mapped back to the position of
        its order in *orders*.

        Args:
            orders: List of order dictionaries, each containing:
                - coin: str
                - is_buy: bool
                - size: float
                - price: float
                - order_type: str (optional, default "limit")
                - tif: str (optional, default "Gtc")
                - reduce_only: bool (optional, default False)
                - cloid: str (optional)

        Returns:
            List of result dictionaries, one per order, each containing:
            - success: bool
            - order_id: int (if successful)
            - status: str ("placed" or "filled")
            - error: str (if failed)
            - order_index: int (position in batch)

        Example:
            orders = [
                {"coin": "BTC", "is_buy": True, "size": 0.1, "price": 50000},
                {"coin": "ETH", "is_buy": False, "size": 1.0, "price": 3000}
            ]
            results = await place_batch_orders(orders)
        """
        try:
            if not orders:
                return [self._failure("No orders provided")]

            results: List[Any] = [None] * len(orders)
            requests: List[Dict[str, Any]] = []
            request_indices: List[int] = []  # requests[i] belongs to orders[request_indices[i]]

            # Validate everything first; only valid entries reach the exchange.
            for idx, order in enumerate(orders):
                try:
                    requests.append(self._build_batch_request(order, idx))
                    request_indices.append(idx)
                except (AttributeError, TypeError, ValueError) as exc:
                    coin = order.get("coin") if isinstance(order, dict) else None
                    results[idx] = self._failure(str(exc), coin=coin, order_index=idx)

            if not requests:
                return results

            batch_response = self.exchange.bulk_orders(requests)

            if self._is_ok(batch_response):
                statuses = self._order_statuses(batch_response)
                for position, idx in enumerate(request_indices):
                    order = orders[idx]
                    if position >= len(statuses):
                        results[idx] = self._failure(
                            "No status returned by exchange",
                            coin=order.get("coin"),
                            order_index=idx,
                        )
                        continue

                    order_id, state, order_error = self._parse_order_status(
                        statuses[position]
                    )
                    if state is not None:
                        results[idx] = {
                            "success": True,
                            "order_id": order_id,
                            "status": state,
                            "coin": order.get("coin"),
                            "side": "buy" if order.get("is_buy") else "sell",
                            "size": order.get("size"),
                            "price": order.get("price"),
                            "order_index": idx,
                            "timestamp": self._timestamp()
                        }
                    else:
                        results[idx] = self._failure(
                            order_error or "Order not resting",
                            coin=order.get("coin"),
                            order_index=idx,
                        )
            else:
                # The whole submission was rejected.
                detail = self._error_detail(batch_response, "Batch order failed")
                for idx in request_indices:
                    results[idx] = self._failure(
                        detail, coin=orders[idx].get("coin"), order_index=idx
                    )

            return results

        except Exception as e:
            return [self._failure(f"Batch order execution failed: {str(e)}")]

    # ------------------------------------------------------------------
    # Cancellation and modification
    # ------------------------------------------------------------------

    async def cancel_order(
        self,
        coin: str,
        order_id: Optional[int] = None,
        cloid: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Cancel a specific order by order ID or client order ID.

        When both identifiers are given, the exchange order ID wins and the
        client order ID is not sent.

        Args:
            coin: Trading pair symbol
            order_id: Exchange order ID (optional if cloid provided)
            cloid: Client order ID (optional if order_id provided)

        Returns:
            Dict containing:
            - success: bool
            - order_id: int (canceled order ID)
            - status: str
            - response: dict (full exchange response)
            - error: str (if failed; a "Validation error: ..." entry is
              returned when neither order_id nor cloid is provided)
        """
        try:
            if not order_id and not cloid:
                raise ValueError("Must provide either order_id or cloid")

            optional: Dict[str, Any] = {}
            if cloid and not order_id:
                optional["cloid"] = cloid

            response = self.exchange.cancel(
                coin=coin,
                oid=order_id if order_id else None,
                **optional
            )

            if self._is_ok(response):
                return {
                    "success": True,
                    "order_id": order_id,
                    "cloid": cloid,
                    "coin": coin,
                    "status": "canceled",
                    "timestamp": self._timestamp(),
                    "response": response
                }
            return self._failure(
                self._error_detail(response, "Cancellation failed"),
                order_id=order_id,
                cloid=cloid,
                coin=coin,
            )

        except ValueError as ve:
            return self._failure(f"Validation error: {str(ve)}", coin=coin)
        except Exception as e:
            return self._failure(f"Order cancellation failed: {str(e)}", coin=coin)

    async def cancel_all_orders(self, coin: Optional[str] = None) -> Dict[str, Any]:
        """
        Cancel all open orders, optionally filtered by coin.

        Open orders are fetched through the info client for the configured
        account and then canceled one by one; a failure on one order does not
        stop the remaining cancellations.

        Args:
            coin: Trading pair symbol (if None, cancels ALL orders across all coins)

        Returns:
            Dict containing:
            - success: bool (True only if no cancellation failed)
            - canceled_count: int (number of orders canceled)
            - failed_count: int (number of failed cancellations)
            - orders: list (details of canceled orders)
            - failed_orders: list or None
            - status: str ("completed", "partial", "no_open_orders",
              "no_matching_orders")
            - error: str (if failed)

        Warning:
            If coin is None, this will cancel ALL open orders on the account.
            Use with caution.
        """
        try:
            open_orders = self.info.open_orders(self.account)

            if not open_orders:
                return {
                    "success": True,
                    "canceled_count": 0,
                    "orders": [],
                    "status": "no_open_orders",
                    "coin": coin,
                    "timestamp": self._timestamp()
                }

            targets = [
                order for order in open_orders
                if coin is None or order.get("coin") == coin
            ]

            if not targets:
                return {
                    "success": True,
                    "canceled_count": 0,
                    "orders": [],
                    "status": "no_matching_orders",
                    "coin": coin,
                    "timestamp": self._timestamp()
                }

            canceled_orders = []
            failed_cancellations = []

            for order in targets:
                order_coin = order.get("coin")
                order_id = order.get("oid")
                try:
                    cancel_response = self.exchange.cancel(
                        coin=order_coin,
                        oid=order_id
                    )
                except Exception as e:
                    # Best effort: record the failure and keep canceling.
                    failed_cancellations.append({
                        "coin": order_coin,
                        "order_id": order_id,
                        "error": str(e)
                    })
                    continue

                if self._is_ok(cancel_response):
                    canceled_orders.append({
                        "coin": order_coin,
                        "order_id": order_id,
                        "side": order.get("side"),
                        "size": order.get("sz"),
                        "price": order.get("limitPx")
                    })
                else:
                    failed_cancellations.append({
                        "coin": order_coin,
                        "order_id": order_id,
                        "error": self._error_detail(cancel_response, "Unknown error")
                    })

            return {
                "success": len(failed_cancellations) == 0,
                "canceled_count": len(canceled_orders),
                "failed_count": len(failed_cancellations),
                "orders": canceled_orders,
                "failed_orders": failed_cancellations if failed_cancellations else None,
                "status": "completed" if not failed_cancellations else "partial",
                "coin": coin,
                "timestamp": self._timestamp()
            }

        except Exception as e:
            return self._failure(f"Cancel all orders failed: {str(e)}", coin=coin)

    async def modify_order(
        self,
        coin: str,
        order_id: int,
        new_price: Optional[float] = None,
        new_size: Optional[float] = None
    ) -> Dict[str, Any]:
        """
        Modify an existing order's price and/or size.

        Args:
            coin: Trading pair symbol
            order_id: Exchange order ID to modify
            new_price: New limit price (optional)
            new_size: New order size (optional)

        Returns:
            Dict containing:
            - success: bool
            - order_id: int (modified order ID)
            - modifications: dict (what was changed)
            - status: str
            - error: str (if failed; a "Validation error: ..." entry is
              returned when neither new_price nor new_size is provided or
              new_size is not positive)

        Note:
            On Hyperliquid, modifying an order may result in a new order ID
            as it could cancel and replace the original order.
        """
        try:
            if new_price is None and new_size is None:
                raise ValueError("Must provide at least one of new_price or new_size")
            if new_size is not None and new_size <= 0:
                raise ValueError(f"New size must be positive, got {new_size}")

            # Only the fields that actually change are forwarded.
            changes: Dict[str, Any] = {}
            if new_price is not None:
                changes["limit_px"] = new_price
            if new_size is not None:
                changes["sz"] = new_size

            response = self.exchange.modify_order(
                coin=coin,
                oid=order_id,
                **changes
            )

            if self._is_ok(response):
                modifications = {}
                if new_price is not None:
                    modifications["price"] = new_price
                if new_size is not None:
                    modifications["size"] = new_size

                return {
                    "success": True,
                    "order_id": order_id,
                    "coin": coin,
                    "modifications": modifications,
                    "status": "modified",
                    "timestamp": self._timestamp(),
                    "response": response
                }
            return self._failure(
                self._error_detail(response, "Modification failed"),
                order_id=order_id,
                coin=coin,
            )

        except ValueError as ve:
            return self._failure(
                f"Validation error: {str(ve)}", order_id=order_id, coin=coin
            )
        except Exception as e:
            return self._failure(
                f"Order modification failed: {str(e)}", order_id=order_id, coin=coin
            )

    # ------------------------------------------------------------------
    # Advanced order types
    # ------------------------------------------------------------------

    async def place_twap_order(
        self,
        coin: str,
        is_buy: bool,
        total_size: float,
        duration_minutes: int,
        randomize: bool = False
    ) -> Dict[str, Any]:
        """
        Place a Time-Weighted Average Price (TWAP) order.

        TWAP orders split a large order into smaller chunks executed over time
        to minimize market impact and achieve better average execution price.

        Args:
            coin: Trading pair symbol
            is_buy: True for buy, False for sell
            total_size: Total size to execute over the duration
            duration_minutes: Time period to spread the order over (in minutes)
            randomize: If True, randomize slice timing to reduce predictability

        Returns:
            Dict containing:
            - success: bool
            - twap_id: str (TWAP order identifier; a local "twap_<epoch>"
              placeholder is used when the response carries none)
            - total_size: float
            - duration_minutes: int
            - estimated_slices: int (approximate number of child orders)
            - status: str
            - error: str (if failed)

        Note:
            TWAP orders are automatically managed by the exchange. The actual
            number and timing of slices is determined by the exchange algorithm.
        """
        try:
            # Input validation
            if total_size <= 0:
                raise ValueError(f"Total size must be positive, got {total_size}")
            if duration_minutes <= 0:
                raise ValueError(f"Duration must be positive, got {duration_minutes}")

            # Estimate number of slices (typically 1 per minute)
            estimated_slices = max(1, duration_minutes)

            response = self.exchange.twap_order(
                coin=coin,
                is_buy=is_buy,
                sz=total_size,
                duration=duration_minutes * 60,  # Convert to seconds
                randomize=randomize
            )

            if self._is_ok(response):
                twap_data = response.get("response", {}).get("data", {})
                # Single clock read so start and end are exactly
                # duration_minutes apart.
                started_at = datetime.utcnow()
                return {
                    "success": True,
                    "twap_id": twap_data.get("twap_id", f"twap_{int(time.time())}"),
                    "coin": coin,
                    "side": "buy" if is_buy else "sell",
                    "total_size": total_size,
                    "duration_minutes": duration_minutes,
                    "estimated_slices": estimated_slices,
                    "randomize": randomize,
                    "status": "active",
                    "start_time": started_at.isoformat(),
                    "estimated_end_time": (
                        started_at + timedelta(minutes=duration_minutes)
                    ).isoformat(),
                    "response": response
                }
            return self._failure(
                self._error_detail(response, "TWAP order creation failed"), coin=coin
            )

        except ValueError as ve:
            return self._failure(f"Validation error: {str(ve)}", coin=coin)
        except Exception as e:
            return self._failure(f"TWAP order placement failed: {str(e)}", coin=coin)

    # ------------------------------------------------------------------
    # Leverage and margin
    # ------------------------------------------------------------------

    async def adjust_leverage(
        self,
        coin: str,
        leverage: int,
        is_cross: bool = True
    ) -> Dict[str, Any]:
        """
        Adjust leverage for a perpetual trading pair.

        Args:
            coin: Trading pair symbol
            leverage: Leverage multiplier (e.g., 10 for 10x); must be in 1..50
            is_cross: True for cross margin, False for isolated margin

        Returns:
            Dict containing:
            - success: bool
            - coin: str
            - leverage: int
            - margin_mode: str ("cross" or "isolated")
            - status: str
            - error: str (if failed)

        Warning:
            Higher leverage increases both potential profits and losses.
            Ensure you understand the risks before using high leverage.

        Note:
            Maximum leverage varies by asset. Common limits:
            - BTC/ETH: 50x
            - Altcoins: 20-30x
        """
        try:
            # Input validation
            if leverage <= 0:
                raise ValueError(f"Leverage must be positive, got {leverage}")
            if leverage > 50:
                raise ValueError(f"Leverage {leverage}x exceeds typical maximum of 50x")

            response = self.exchange.update_leverage(
                coin=coin,
                leverage=leverage,
                is_cross=is_cross
            )

            if self._is_ok(response):
                return {
                    "success": True,
                    "coin": coin,
                    "leverage": leverage,
                    "margin_mode": "cross" if is_cross else "isolated",
                    "status": "updated",
                    "timestamp": self._timestamp(),
                    "response": response
                }
            return self._failure(
                self._error_detail(response, "Leverage update failed"), coin=coin
            )

        except ValueError as ve:
            return self._failure(f"Validation error: {str(ve)}", coin=coin)
        except Exception as e:
            return self._failure(f"Leverage adjustment failed: {str(e)}", coin=coin)

    async def modify_isolated_margin(
        self,
        coin: str,
        amount: float,
        is_add: bool = True
    ) -> Dict[str, Any]:
        """
        Add or remove USDC from an isolated margin position.

        Args:
            coin: Trading pair symbol
            amount: Amount of USDC to add or remove (always positive)
            is_add: True to add margin, False to remove margin

        Returns:
            Dict containing:
            - success: bool
            - coin: str
            - amount: float
            - action: str ("added" or "removed")
            - status: str
            - error: str (if failed)

        Note:
            This only works for positions in isolated margin mode. For cross
            margin positions, use adjust_leverage() to switch to isolated
            mode first.

        Warning:
            Removing too much margin may trigger liquidation if position
            margin falls below maintenance requirements.
        """
        try:
            # Input validation
            if amount <= 0:
                raise ValueError(f"Amount must be positive, got {amount}")

            # NOTE(review): the direction is conveyed through is_buy and the
            # amount is always positive - confirm the exchange client treats
            # is_buy=False as a margin removal.
            response = self.exchange.update_isolated_margin(
                coin=coin,
                is_buy=is_add,
                ntli=amount
            )

            if self._is_ok(response):
                return {
                    "success": True,
                    "coin": coin,
                    "amount": amount,
                    "action": "added" if is_add else "removed",
                    "status": "updated",
                    "timestamp": self._timestamp(),
                    "response": response
                }
            return self._failure(
                self._error_detail(response, "Isolated margin update failed"),
                coin=coin,
            )

        except ValueError as ve:
            return self._failure(f"Validation error: {str(ve)}", coin=coin)
        except Exception as e:
            return self._failure(
                f"Isolated margin modification failed: {str(e)}", coin=coin
            )

    # ------------------------------------------------------------------
    # Risk management
    # ------------------------------------------------------------------

    async def update_dead_mans_switch(self, delay_seconds: int) -> Dict[str, Any]:
        """
        Configure dead man's switch to auto-cancel all orders after a delay.

        The dead man's switch is a safety mechanism that automatically cancels
        all open orders if not refreshed within the specified delay. This is
        useful for preventing runaway orders if your trading bot crashes or
        loses connection.

        Args:
            delay_seconds: Time in seconds before auto-canceling (minimum 5)

        Returns:
            Dict containing:
            - success: bool
            - delay_seconds: int
            - trigger_time: str (ISO timestamp when switch will trigger,
              computed locally from the current time)
            - status: str
            - error: str (if failed)

        Note:
            You must call this method periodically (before the delay expires)
            to prevent the switch from triggering. Common pattern is to call
            this every time you successfully communicate with the exchange.

        Warning:
            If the delay expires, ALL open orders will be canceled
            immediately. Set the delay high enough to account for network
            latency and temporary disconnections.
        """
        try:
            # Input validation
            if delay_seconds < 5:
                raise ValueError(
                    f"Delay must be at least 5 seconds, got {delay_seconds}"
                )

            response = self.exchange.update_dead_mans_switch(
                timeout=delay_seconds
            )

            # Local estimate taken right after the exchange call returned.
            trigger_time = datetime.utcnow() + timedelta(seconds=delay_seconds)

            if self._is_ok(response):
                return {
                    "success": True,
                    "delay_seconds": delay_seconds,
                    "trigger_time": trigger_time.isoformat(),
                    "status": "armed",
                    "timestamp": self._timestamp(),
                    "response": response
                }
            return self._failure(
                self._error_detail(response, "Dead man's switch update failed")
            )

        except ValueError as ve:
            return self._failure(f"Validation error: {str(ve)}")
        except Exception as e:
            return self._failure(f"Dead man's switch update failed: {str(e)}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/caiovicentino/hyperliquid-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.