health_check
Ping multiple upstream cryptocurrency data sources in parallel to verify operational status and measure latency. Returns per-source status and response times. Use when users report data outages or staleness.
Instructions
Parallel-ping every upstream data source and report status + latency.
DEBUGGING tool. Call only when the user reports something is broken ("X is down", "data looks stale") or when you suspect an upstream is degraded. Do NOT call on every request — issues fresh network calls.
Pings in parallel:
CoinGecko: /ping
DefiLlama: /protocols?limit=1
DexScreener: /latest/dex/search?q=BTC
Alternative.me: /fng/?limit=1
CCXT/Binance: fetch_status() (fetch_ticker fallback)
Returns:
Object with checked_at (ISO8601 UTC), all_ok (bool — true iff every
source succeeded), and sources — array sorted by latency asc. Each
entry has source, ok, latency_ms, and detail (short success
identifier or <ExceptionType>: <first 100 chars> on failure).
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- coin_mcp/aggregate.py:187-251 (handler)The @mcp.tool() decorated async function health_check that implements the 'health_check' tool. It parallel-pings CoinGecko, DefiLlama, DexScreener, Alternative.me, and CCXT/Binance, measures latency, and returns a structured dict with checked_at, all_ok, and sorted sources.
@mcp.tool() async def health_check() -> dict[str, Any]: """Parallel-ping every upstream data source and report status + latency. DEBUGGING tool. Call only when the user reports something is broken ("X is down", "data looks stale") or when you suspect an upstream is degraded. Do NOT call on every request — issues fresh network calls. Pings in parallel: - CoinGecko: /ping - DefiLlama: /protocols?limit=1 - DexScreener: /latest/dex/search?q=BTC - Alternative.me: /fng/?limit=1 - CCXT/Binance: fetch_status() (fetch_ticker fallback) Returns: Object with `checked_at` (ISO8601 UTC), `all_ok` (bool — true iff every source succeeded), and `sources` — array sorted by latency asc. Each entry has `source`, `ok`, `latency_ms`, and `detail` (short success identifier or `<ExceptionType>: <first 100 chars>` on failure). """ pings: list[Awaitable[dict[str, Any]]] = [ _timed( "CoinGecko", lambda: _http_get(f"{COINGECKO_PUBLIC_BASE}/ping"), _cg_detail, ), _timed( "DefiLlama", lambda: _http_get(f"{DEFILLAMA_BASE}/protocols", params={"limit": 1}), _llama_detail, ), _timed( "DexScreener", lambda: _http_get( f"{DEXSCREENER_BASE}/latest/dex/search", params={"q": "BTC"} ), _dex_detail, ), _timed( "Alternative.me", lambda: _http_get(f"{ALTERNATIVE_BASE}/fng/", params={"limit": 1}), _fng_detail, ), _timed("CCXT (binance)", _ping_ccxt_binance_inner, _binance_detail), ] results = await asyncio.gather(*pings, return_exceptions=True) sources: list[dict[str, Any]] = [] for r in results: if isinstance(r, Exception): sources.append({ "source": "unknown", "ok": False, "latency_ms": 0, "detail": f"{type(r).__name__}: {_short(r)}", }) else: sources.append(r) sources.sort(key=lambda s: s.get("latency_ms", 0)) return { "checked_at": _now_iso_utc(), "all_ok": all(s.get("ok") for s in sources), "sources": sources, } - coin_mcp/aggregate.py:188-207 (schema)The docstring serves as the tool's schema definition (accepted by FastMCP tool decorator) describing inputs (none), parallel pings, and the return shape: {checked_at, all_ok, sources[{source, ok, latency_ms, detail}]}.
async def health_check() -> dict[str, Any]: """Parallel-ping every upstream data source and report status + latency. DEBUGGING tool. Call only when the user reports something is broken ("X is down", "data looks stale") or when you suspect an upstream is degraded. Do NOT call on every request — issues fresh network calls. Pings in parallel: - CoinGecko: /ping - DefiLlama: /protocols?limit=1 - DexScreener: /latest/dex/search?q=BTC - Alternative.me: /fng/?limit=1 - CCXT/Binance: fetch_status() (fetch_ticker fallback) Returns: Object with `checked_at` (ISO8601 UTC), `all_ok` (bool — true iff every source succeeded), and `sources` — array sorted by latency asc. Each entry has `source`, `ok`, `latency_ms`, and `detail` (short success identifier or `<ExceptionType>: <first 100 chars>` on failure). """ - coin_mcp/aggregate.py:60-65 (helper)_timed helper function that runs a coroutine, measures elapsed time, checks for errors via is_error(), extracts detail via success_detail callback, and formats the result as a health-check row dict.
async def _timed( source: str, coro_fn: Callable[[], Awaitable[Any]], success_detail: Callable[[Any], str], ) -> dict[str, Any]: """Run `coro_fn()`, time it, and shape the result into a health row. - coin_mcp/aggregate.py:146-184 (helper)Helper functions used by health_check to extract detail strings from each data source's response: _cg_detail (CoinGecko), _llama_detail (DefiLlama), _dex_detail (DexScreener), _fng_detail (Alternative.me/FNG), _binance_detail (CCXT Binance).
def _cg_detail(resp: Any) -> str: return (resp or {}).get("gecko_says") or "ok" def _llama_detail(resp: Any) -> str: return f"{len(resp) if isinstance(resp, list) else 0} protocols" def _dex_detail(resp: Any) -> str: return f"{len((resp or {}).get('pairs') or [])} pairs" def _fng_detail(resp: Any) -> str: data = (resp or {}).get("data") or [] if not data: return "no data" d = data[0] return f"FnG {d.get('value', '?')} ({d.get('value_classification', '?')})" async def _ping_ccxt_binance_inner() -> Any: """Pre-warm binance, then call fetch_status (or fetch_ticker fallback).""" def _do() -> Any: ex = _get_ccxt_exchange("binance") if ex.has.get("fetchStatus"): return {"kind": "status", "value": ex.fetch_status()} return {"kind": "ticker", "value": ex.fetch_ticker("BTC/USDT")} return await _safe_ccxt("binance", _do, warm_timeout=15.0, call_timeout=15.0) def _binance_detail(resp: Any) -> str: if not isinstance(resp, dict): return "ok" if resp.get("kind") == "status": return f"status={(resp.get('value') or {}).get('status', '?')}" if resp.get("kind") == "ticker": return f"BTC/USDT last={(resp.get('value') or {}).get('last')}" return "ok" - server.py:18-27 (registration)The aggregate module (which contains the health_check tool) is imported in server.py, which triggers its @mcp.tool() registration on the shared FastMCP instance.
from coin_mcp import coingecko # noqa: F401 from coin_mcp import ccxt_tools # noqa: F401 from coin_mcp import sentiment # noqa: F401 from coin_mcp import indicators # noqa: F401 from coin_mcp import defillama # noqa: F401 from coin_mcp import dexscreener # noqa: F401 from coin_mcp import derivatives # noqa: F401 from coin_mcp import aggregate # noqa: F401 from coin_mcp import prompts # noqa: F401 from coin_mcp import resources # noqa: F401