scan_crypto_flow
Scan multiple cryptocurrencies for orderflow signals to assess portfolio-wide risk. Returns strongest buy/sell signals sorted by confidence.
Instructions
Scan multiple cryptocurrencies for orderflow signals at once.
Returns a summary with strongest buy/sell signals and individual results
sorted by confidence. Use this for portfolio-wide risk assessment.
Args:
symbols: Comma-separated trading pairs (default: BTC, ETH, SOL, BNB, XRP)
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| symbols | No | Comma-separated trading pairs | BTCUSDT,ETHUSDT,SOLUSDT,BNBUSDT,XRPUSDT |
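The handler splits `symbols` on commas, trims whitespace, and upper-cases each entry before building request paths. The sketch below mirrors that parsing step so its effect on unusual input is easy to see. The function name `normalize_symbols` is hypothetical and is not part of horus_mcp_public.py.

```python
def normalize_symbols(symbols: str) -> list[str]:
    """Mirror the handler's parsing: split on commas, trim, upper-case."""
    return [s.strip().upper() for s in symbols.split(",")]


# Mixed case and stray spaces are normalized.
print(normalize_symbols("btcusdt, ethusdt ,SOLUSDT"))

# A trailing comma produces an empty entry, which this parsing does not filter out.
print(normalize_symbols("BTCUSDT,"))
```

Because empty entries survive this step, callers may want to avoid trailing commas or empty strings in `symbols`.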
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | JSON string containing the scan summary | |
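Since `result` is a JSON string, a client has to parse it before use. The following is a minimal sketch of that step. The top-level keys (`scanned`, `available`, `strongest_buy`, `strongest_sell`, `results`) match the handler's summary, but the sample payload, the confidence values, and the helper name `coverage_report` are assumptions made for illustration.

```python
import json

# Hypothetical tool output; the values are illustrative, not real market data.
sample_result = json.dumps({
    "scanned": 3,
    "available": 2,
    "strongest_buy": "BTCUSDT",
    "strongest_sell": "ETHUSDT",
    "results": [
        {"symbol": "BTCUSDT", "signal": "BUY_PRESSURE", "confidence": 0.8},
        {"symbol": "ETHUSDT", "signal": "SELL_PRESSURE", "confidence": 0.4},
        {"symbol": "SOLUSDT", "signal": "UNAVAILABLE", "error": "No data"},
    ],
})


def coverage_report(result: str) -> dict:
    """Parse the tool's JSON string and pull out a few portfolio-level facts."""
    summary = json.loads(result)
    scanned = summary["scanned"]
    unavailable = [
        r["symbol"] for r in summary["results"] if r.get("signal") == "UNAVAILABLE"
    ]
    return {
        # Share of requested symbols that returned usable data.
        "coverage": summary["available"] / scanned if scanned else 0.0,
        "strongest_buy": summary["strongest_buy"],
        "strongest_sell": summary["strongest_sell"],
        "unavailable": unavailable,
    }


report = coverage_report(sample_result)
print(report)
```

Checking `coverage` before acting on `strongest_buy` or `strongest_sell` helps avoid drawing conclusions from a scan in which most symbols failed.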
Implementation Reference
- horus_mcp_public.py:127-159 (handler) Tool 3: Multi-Symbol Scanner. An async function decorated with @mcp.tool() that scans multiple crypto symbols for orderflow signals concurrently using asyncio.gather, sorts the results by confidence, and returns a JSON summary with the strongest buy/sell signals.

      @mcp.tool()
      async def scan_crypto_flow(symbols: str = "BTCUSDT,ETHUSDT,SOLUSDT,BNBUSDT,XRPUSDT") -> str:
          """Scan multiple cryptocurrencies for orderflow signals at once.

          Returns a summary with strongest buy/sell signals and individual results
          sorted by confidence. Use this for portfolio-wide risk assessment.

          Args:
              symbols: Comma-separated trading pairs (default: BTC, ETH, SOL, BNB, XRP)
          """
          import asyncio

          symbol_list = [s.strip().upper() for s in symbols.split(",")]
          tasks = [_fetch(f"/v1/flow/crypto/{sym}") for sym in symbol_list]
          raw_results = await asyncio.gather(*tasks)

          results = []
          for sym, data in zip(symbol_list, raw_results):
              if not data.get("error"):
                  results.append(data)
              else:
                  results.append({"symbol": sym, "signal": "UNAVAILABLE", "error": data.get("detail", "No data")})

          results.sort(key=lambda x: x.get("confidence", 0), reverse=True)

          summary = {
              "scanned": len(symbol_list),
              "available": len([r for r in results if r.get("signal") != "UNAVAILABLE"]),
              "strongest_buy": next((r["symbol"] for r in results if r.get("signal") == "BUY_PRESSURE"), None),
              "strongest_sell": next((r["symbol"] for r in results if r.get("signal") == "SELL_PRESSURE"), None),
              "results": results,
          }
          return json.dumps(summary, indent=2)

- horus_mcp_public.py:127-127 (registration) The @mcp.tool() decorator registers scan_crypto_flow as an MCP tool on the FastMCP server instance.

      @mcp.tool()

- horus_mcp_public.py:58-89 (helper) The _fetch helper function is used internally by scan_crypto_flow to make HTTP requests to the RapidAPI endpoint.

      async def _fetch(endpoint: str) -> dict:
          """Fetch data from the live RapidAPI endpoint."""
          async with httpx.AsyncClient(timeout=10.0) as client:
              try:
                  resp = await client.get(
                      f"{RAPIDAPI_BASE_URL}{endpoint}",
                      headers=HEADERS,
                  )
                  if resp.status_code == 200:
                      return resp.json()
                  elif resp.status_code in [401, 403]:
                      return {
                          "error": True,
                          "signal": "UNAUTHORIZED",
                          "detail": "Invalid or missing RAPIDAPI_KEY. Please verify your RapidAPI subscription."
                      }
                  elif resp.status_code == 429:
                      return {
                          "error": True,
                          "signal": "RATE_LIMITED",
                          "detail": "You have exceeded your RapidAPI quota. Please upgrade your plan."
                      }
                  return {
                      "error": True,
                      "status_code": resp.status_code,
                      "detail": resp.text,
                  }
              except Exception as e:
                  return {
                      "error": True,
                      "detail": f"Network Error: {str(e)}"
                  }
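The gather, sort, and summarize flow of the handler can be tried offline by swapping the HTTP helper for a stub. This is only a sketch under stated assumptions: `fake_fetch`, `FAKE_RESPONSES`, and `scan` are hypothetical names, and the canned confidence values are invented for illustration. No request is made to the real API.

```python
import asyncio

# Hypothetical canned responses standing in for the live endpoint.
FAKE_RESPONSES = {
    "BTCUSDT": {"symbol": "BTCUSDT", "signal": "BUY_PRESSURE", "confidence": 0.6},
    "ETHUSDT": {"symbol": "ETHUSDT", "signal": "BUY_PRESSURE", "confidence": 0.9},
    "SOLUSDT": {"symbol": "SOLUSDT", "signal": "SELL_PRESSURE", "confidence": 0.7},
}


async def fake_fetch(symbol: str) -> dict:
    """Stub for the HTTP helper: unknown symbols return an error dict."""
    await asyncio.sleep(0)  # yield to the event loop, as a real request would
    return FAKE_RESPONSES.get(symbol, {"error": True, "detail": "No data"})


async def scan(symbol_list: list[str]) -> dict:
    # Run all lookups concurrently; gather preserves the input order.
    raw = await asyncio.gather(*(fake_fetch(s) for s in symbol_list))
    results = []
    for sym, data in zip(symbol_list, raw):
        if not data.get("error"):
            results.append(data)
        else:
            results.append({"symbol": sym, "signal": "UNAVAILABLE",
                            "error": data.get("detail", "No data")})
    # Highest confidence first; entries without a confidence sort as 0.
    results.sort(key=lambda r: r.get("confidence", 0), reverse=True)
    return {
        "scanned": len(symbol_list),
        "available": sum(1 for r in results if r["signal"] != "UNAVAILABLE"),
        # After sorting, the first match is the highest-confidence one.
        "strongest_buy": next((r["symbol"] for r in results
                               if r["signal"] == "BUY_PRESSURE"), None),
        "strongest_sell": next((r["symbol"] for r in results
                                if r["signal"] == "SELL_PRESSURE"), None),
        "results": results,
    }


summary = asyncio.run(scan(["BTCUSDT", "ETHUSDT", "SOLUSDT", "XRPUSDT"]))
print(summary["strongest_buy"], summary["strongest_sell"], summary["available"])
```

As in the handler, any error response is reported with the signal `UNAVAILABLE`, so a rate-limited or unauthorized call can only be told apart from missing data by reading the `error` text.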