get_recent_trades
Retrieve recent public trades (the tape) for any symbol on a specified exchange. Use to inspect order flow, identify large trades, or compute short-term trade-driven metrics.
Instructions
Get recent public trades (the tape) for a symbol on a specific exchange.
Use to inspect order flow, identify large prints, or compute very short-term trade-driven metrics.
Args:
- exchange_id: CCXT exchange ID.
- symbol: Unified symbol, e.g. "BTC/USDT".
- limit: Number of recent trades to return (max ~1000 depending on venue).
Returns:
Array of trades with `id`, `timestamp`, `datetime`, `symbol`, `side`, `price`, `amount`, `cost`, `takerOrMaker`.
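As an illustration of the "trade-driven metrics" mentioned above, here is a minimal sketch that summarizes a list of trades shaped like the returned array. The function name `summarize_trades`, the `large_notional` threshold, and the sample trades are hypothetical and not part of the tool. The sketch recomputes notional as `price * amount` rather than relying on `cost`, on the assumption that `cost` may be absent on some venues.

```python
from typing import Any


def summarize_trades(trades: list[dict[str, Any]], large_notional: float) -> dict[str, Any]:
    """Summarize trade dicts that carry `id`, `side`, `price`, and `amount`.

    Hypothetical helper for illustration; not part of the tool itself.
    """
    buy_amount = sum(t["amount"] for t in trades if t.get("side") == "buy")
    sell_amount = sum(t["amount"] for t in trades if t.get("side") == "sell")
    total_amount = sum(t["amount"] for t in trades)
    notional = sum(t["price"] * t["amount"] for t in trades)

    return {
        # Volume-weighted average price over the window (None if no volume).
        "vwap": notional / total_amount if total_amount else None,
        # Signed imbalance in [-1, 1]: positive means more buy-side volume.
        "imbalance": (buy_amount - sell_amount) / total_amount if total_amount else None,
        # Ids of trades whose notional meets or exceeds the threshold.
        "large_prints": [
            t["id"] for t in trades if t["price"] * t["amount"] >= large_notional
        ],
    }


# Made-up sample data in the shape described under "Returns".
sample = [
    {"id": "1", "side": "buy", "price": 100.0, "amount": 2.0},
    {"id": "2", "side": "sell", "price": 101.0, "amount": 1.0},
    {"id": "3", "side": "buy", "price": 102.0, "amount": 5.0},
]
summary = summarize_trades(sample, large_notional=500.0)
```

With this sample, `summary["large_prints"]` contains only the third trade, since it is the only one with a notional of at least 500.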
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
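| exchange_id | Yes | CCXT exchange ID. | |
| symbol | Yes | Unified symbol, e.g. "BTC/USDT". | |
| limit | No | Number of recent trades to return; clamped to 1-1000 (the effective maximum depends on the venue). | 50 |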
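The sketch below shows how a caller might build arguments that match this schema. `normalize_arguments` is a hypothetical client-side helper, not part of the tool. It mirrors two behaviors visible in the implementation reference: the exchange id is lower-cased and stripped, and `limit` is clamped to the range 1-1000.

```python
from typing import Any


def normalize_arguments(exchange_id: str, symbol: str, limit: int = 50) -> dict[str, Any]:
    """Build a get_recent_trades argument dict (hypothetical helper).

    The normalization mirrors what the server-side code is shown to do,
    so the caller can see the values that will actually be used.
    """
    return {
        "exchange_id": exchange_id.lower().strip(),
        "symbol": symbol,
        # Same clamp expression as the handler: at least 1, at most 1000.
        "limit": max(1, min(limit, 1000)),
    }


args = normalize_arguments(" Binance ", "BTC/USDT", limit=5000)
```

Here `args["limit"]` ends up as 1000 and `args["exchange_id"]` as `"binance"`.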
Implementation Reference
- coin_mcp/ccxt_tools.py:119-143 (handler) Handler function for the get_recent_trades tool. Decorated with @mcp.tool(), it fetches recent public trades from a given exchange using CCXT's fetch_trades, with limit clamped to 1-1000.

```python
@mcp.tool()
async def get_recent_trades(
    exchange_id: str,
    symbol: str,
    limit: int = 50,
) -> Any:
    """Get recent public trades (the tape) for a symbol on a specific exchange.

    Use to inspect order flow, identify large prints, or compute very
    short-term trade-driven metrics.

    Args:
        exchange_id: CCXT exchange ID.
        symbol: Unified symbol, e.g. "BTC/USDT".
        limit: Number of recent trades to return (max ~1000 depending on venue).

    Returns:
        Array of trades with `id`, `timestamp`, `datetime`, `symbol`, `side`,
        `price`, `amount`, `cost`, `takerOrMaker`.
    """
    def _do() -> Any:
        ex = _get_ccxt_exchange(exchange_id)
        return ex.fetch_trades(symbol, limit=max(1, min(limit, 1000)))

    return await _ccxt_call(_do)
```

- coin_mcp/ccxt_tools.py:119-119 (registration) Registration via @mcp.tool() decorator on the FastMCP instance imported from core.

```python
@mcp.tool()
```

- coin_mcp/core.py:309-330 (helper) Shared helper _ccxt_call that runs blocking CCXT calls in a thread executor, with optional per-exchange locking and structured error handling.

```python
async def _ccxt_call(fn, *args, exchange_id: str | None = None, **kwargs):
    """Run a blocking CCXT call in a thread; return structured error on failure.

    When `exchange_id` is supplied, the call is serialized through the
    per-exchange lock so concurrent fetches against the same instance don't
    corrupt CCXT's internal session/rate-limit state. When omitted (legacy
    callers), no lock is taken — preserving prior behavior.
    """
    loop = asyncio.get_running_loop()

    def _runner():
        if exchange_id:
            with _per_lock(exchange_id):
                return fn(*args, **kwargs)
        return fn(*args, **kwargs)

    try:
        return await loop.run_in_executor(None, _runner)
    except ccxt.BaseError as e:
        return {"error": f"{type(e).__name__}: {e}"}
    except Exception as e:  # pragma: no cover - safety net
        return {"error": f"unexpected error: {type(e).__name__}: {e}"}
```

- coin_mcp/core.py:257-306 (helper) Helper that gets or builds a cached CCXT exchange instance with LRU eviction and thread-safe construction.

```python
def _get_ccxt_exchange(exchange_id: str) -> ccxt.Exchange:
    """Get (or build & cache) a CCXT exchange instance by id.

    The cache is bounded (LRU, max `_CCXT_CACHE_MAX` entries) and thread-safe.
    On first construction `load_markets()` is pre-called so concurrent method
    calls don't race on the lazy-initialized markets table.
    """
    exchange_id = exchange_id.lower().strip()
    cls = getattr(ccxt, exchange_id, None)
    if cls is None or not isinstance(cls, type) or not issubclass(cls, ccxt.Exchange):
        raise ValueError(
            f"Unknown CCXT exchange id: {exchange_id!r}. "
            f"Use list_supported_exchanges to see valid ids."
        )

    with _exchange_cache_lock:
        if exchange_id in _exchange_cache:
            _exchange_cache.move_to_end(exchange_id)
            return _exchange_cache[exchange_id]

    # Build outside the cache lock; serialize construction per-id.
    lock = _per_lock(exchange_id)
    with lock:
        # Re-check after acquiring per-id lock — another thread may have built
        # and cached it while we waited.
        with _exchange_cache_lock:
            if exchange_id in _exchange_cache:
                _exchange_cache.move_to_end(exchange_id)
                return _exchange_cache[exchange_id]

        inst = cls({"enableRateLimit": True, "timeout": int(DEFAULT_TIMEOUT * 1000)})
        # Pre-load markets so the first per-method call doesn't race on lazy
        # initialization. Failures (geo-block, no markets endpoint) are
        # ignored here; the actual call site will surface a clean error.
        try:
            inst.load_markets()
        except Exception:
            pass

        with _exchange_cache_lock:
            _exchange_cache[exchange_id] = inst
            _exchange_cache.move_to_end(exchange_id)
            while len(_exchange_cache) > _CCXT_CACHE_MAX:
                evicted_id, evicted_inst = _exchange_cache.popitem(last=False)
                try:
                    if hasattr(evicted_inst, "close"):
                        evicted_inst.close()
                except Exception:
                    pass
        return inst
```

- coin_mcp/ccxt_tools.py:119-143 (schema) Schema definition via type annotations: exchange_id (str), symbol (str), limit (int, default 50). Returns Any (array of trade dicts).
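The pattern described for the `_ccxt_call` helper (run a blocking call in a thread, optionally under a lock, and return a structured error instead of raising) can be tried without CCXT installed. The following is a simplified sketch under stated assumptions, not the project's code: `call_blocking` and `fake_fetch` are hypothetical names, the lock is passed in directly rather than looked up per exchange, and every exception is caught in a single handler.

```python
import asyncio
import threading
from typing import Any, Callable, Optional


async def call_blocking(
    fn: Callable[..., Any],
    *args: Any,
    lock: Optional[threading.Lock] = None,
    **kwargs: Any,
) -> Any:
    """Run a blocking function in the default thread pool (hypothetical helper).

    Returns the function's result, or {"error": "..."} if it raises.
    """
    loop = asyncio.get_running_loop()

    def runner() -> Any:
        # Serialize calls that share a lock; otherwise call straight through.
        if lock is not None:
            with lock:
                return fn(*args, **kwargs)
        return fn(*args, **kwargs)

    try:
        return await loop.run_in_executor(None, runner)
    except Exception as e:
        return {"error": f"{type(e).__name__}: {e}"}


def fake_fetch(symbol: str, limit: int) -> list[dict]:
    """Stand-in for a blocking exchange call; rejects symbols without '/'."""
    if "/" not in symbol:
        raise ValueError(f"bad symbol: {symbol!r}")
    return [{"symbol": symbol, "id": str(i)} for i in range(limit)]


ok = asyncio.run(call_blocking(fake_fetch, "BTC/USDT", limit=2))
failed = asyncio.run(call_blocking(fake_fetch, "BTCUSDT", limit=2, lock=threading.Lock()))
```

In this sketch `ok` is a list of two dicts, while `failed` is a dict with a single `"error"` key, so a caller can tell the two cases apart by checking the result type.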