"""BigBugAI MCP tools implementation.
This module defines tool functions and a registration helper `register_tools` to
attach them to a FastMCP server instance for both stdio and HTTP transports.
"""
from __future__ import annotations
import os
from typing import Any, cast
import httpx
from .auth import guarded
from .models import TokenAnalysisReq, TrendingReq
def _base_url() -> str:
return os.getenv("BTUNIFIED_API", "https://api.bigbug.ai")
def _auth_headers() -> dict[str, str] | None:
"""Build Authorization headers for BigBugAI API calls if a token is set.
The token is sourced from one of the following env vars (first found wins):
- BIGBUGAI_API_KEY
- BIGBUGAI_API_TOKEN
- BIGBUGAI_MCP_API_KEY (fallback; commonly used for local MCP auth)
"""
token = (
os.getenv("BIGBUGAI_API_KEY")
or os.getenv("BIGBUGAI_API_TOKEN")
or os.getenv("BIGBUGAI_MCP_API_KEY")
)
if not token:
return None
return {
"Authorization": f"Bearer {token}",
"X-API-Key": token,
"Accept": "application/json",
"User-Agent": "bigbugai-mcp/0.1",
}
async def _get(path: str, params: dict[str, Any] | None = None) -> dict[str, Any] | list[dict[str, Any]]:
"""Perform a GET request to the BigBugAI API.
Args:
path: API path starting with '/'.
params: Query params to include.
Returns:
Parsed JSON body as a dict or list.
Raises:
httpx.HTTPStatusError: if non-2xx response.
httpx.RequestError: for network errors.
ValueError: if response is not JSON.
"""
base = _base_url()
if not path.startswith("/"):
path = "/" + path
url = base + path
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.get(url, params=params, headers=_auth_headers())
try:
resp.raise_for_status()
except httpx.HTTPStatusError as e:
# Surface status and text for debugging
detail = resp.text[:500]
raise httpx.HTTPStatusError(
f"GET {url} failed with status {resp.status_code}: {detail}",
request=e.request,
response=e.response,
) from None
try:
data: Any = resp.json()
except ValueError as e: # JSON parse error
raise ValueError(f"Invalid JSON from {url}: {resp.text[:200]}") from e
if not isinstance(data, (dict, list)):
raise ValueError(f"Unexpected JSON shape from {url}: {type(data).__name__}")
return cast(dict[str, Any] | list[dict[str, Any]], data)
@guarded
async def get_trending_tokens(req: TrendingReq) -> list[dict[str, Any]]:
"""Fetch trending tokens from BigBugAI.
Calls GET {BTUNIFIED_API}/api/tokens/newly-ingested by default.
If that path returns 404, we automatically fall back to a small set of
candidate endpoints that may be used by newer API versions:
- /v1/trending/tokens
- /v1/tokens/trending
- /v1/trending
- /v1/market/trending
You can override the primary path via env var BTUNIFIED_TRENDING_PATH.
Normalization: if the response is an object containing an `items` array, the
array is returned. Otherwise, if the response is already an array, it's returned
directly.
"""
# Candidate endpoints in order of preference; deduplicated while preserving order
paths = list(
dict.fromkeys(
[
os.getenv("BTUNIFIED_TRENDING_PATH", "/api/tokens/newly-ingested"),
"/v1/trending/tokens",
"/v1/tokens/trending",
"/v1/trending",
"/v1/market/trending",
]
)
)
last_404: httpx.HTTPStatusError | None = None
for path in paths:
try:
# Some endpoints (like /api/tokens/newly-ingested) may not accept 'limit'
params = None if path == "/api/tokens/newly-ingested" else {"limit": req.limit}
data = await _get(path, params=params)
except httpx.HTTPStatusError as e:
# Try next candidate only on 404; otherwise bubble up
if e.response is not None and e.response.status_code == 404:
last_404 = e
continue
raise
# Normalize response shape
if isinstance(data, dict):
items = data.get("items")
if isinstance(items, list):
return items
# If dict but not items, return empty list for stability
return []
return data
# If we tried all candidates and only got 404s, raise a helpful error
tried = ", ".join(paths)
if last_404 is not None:
raise httpx.HTTPStatusError(
f"Trending tokens endpoint not found (404). Tried: {tried}",
request=last_404.request,
response=last_404.response,
)
# Defensive fallback
raise RuntimeError(f"Unable to fetch trending tokens; attempted paths: {tried}")
@guarded
async def token_analysis_by_contract(req: TokenAnalysisReq) -> dict[str, Any]:
"""Fetch token analysis for a contract address on a given chain.
Calls GET {BTUNIFIED_API}/api/token-intel/{chain}/{contract_address}/report
and returns a JSON object with analysis fields.
"""
# Normalize inputs for the path parameters
chain = req.chain.strip().lower()
# Map common aliases to API-expected slugs
chain_aliases = {
"eth": "ethereum",
"ethereum": "ethereum",
"bnb": "bsc",
"bsc": "bsc",
"binance-smart-chain": "bsc",
"sol": "solana",
"solana": "solana",
"arb": "arbitrum",
"arbitrum": "arbitrum",
"op": "optimism",
"optimism": "optimism",
"avax": "avalanche",
"avalanche": "avalanche",
"matic": "polygon",
"polygon": "polygon",
"base": "base",
"fantom": "fantom",
"ftm": "fantom",
}
chain = chain_aliases.get(chain, chain)
address = req.address.strip()
data = await _get(f"/api/token-intel/{chain}/{address}/report")
if not isinstance(data, dict):
raise ValueError("Expected a JSON object for token analysis response")
return data
def register_tools(mcp: Any) -> None:
"""Register tools with a FastMCP server instance.
We define small wrapper functions in this module so FastMCP can evaluate
type annotations without dealing with decorator globals.
"""
async def _wrap_get_trending_tokens(req: TrendingReq) -> list[dict[str, Any]]:
"""Fetch trending tokens from BigBugAI (wrapper)."""
return await get_trending_tokens(req)
async def _wrap_token_analysis_by_contract(req: TokenAnalysisReq) -> dict[str, Any]:
"""Token analysis by contract (wrapper)."""
return await token_analysis_by_contract(req)
# Register with FastMCP using explicit names
mcp.add_tool(_wrap_get_trending_tokens, name="get_trending_tokens")
mcp.add_tool(_wrap_token_analysis_by_contract, name="token_analysis_by_contract")