Skip to main content
Glama
crypto_adapter.py9.73 kB
# src/server/domain/adapters/crypto_adapter.py """CryptoAdapter using CoinGecko API for cryptocurrency data. All methods are async and use httpx for HTTP requests. """ import asyncio from datetime import datetime from decimal import Decimal from typing import Any, Dict, List, Optional import httpx from src.server.domain.adapters.base import BaseDataAdapter from src.server.domain.types import ( AdapterCapability, Asset, AssetPrice, AssetSearchQuery, AssetSearchResult, AssetType, DataSource, Exchange, MarketInfo, MarketStatus, ) from src.server.utils.logger import logger class CryptoAdapter(BaseDataAdapter): name = "crypto" def __init__(self, cache): super().__init__(DataSource.CRYPTO) self.cache = cache self.logger = logger def get_capabilities(self) -> List[AdapterCapability]: """Declare Crypto adapter's capabilities.""" return [ AdapterCapability(asset_type=AssetType.CRYPTO, exchanges={Exchange.CRYPTO}), ] def convert_to_source_ticker(self, internal_ticker: str) -> str: """Convert CRYPTO:SYMBOL to CoinGecko ID format.""" if ":" in internal_ticker: symbol = internal_ticker.split(":")[1].lower() else: symbol = internal_ticker.lower() # Map common symbols to CoinGecko IDs symbol_map = { "btc": "bitcoin", "eth": "ethereum", "usdt": "tether", "bnb": "binancecoin", "usdc": "usd-coin", "xrp": "ripple", "ada": "cardano", "doge": "dogecoin", "sol": "solana", "dot": "polkadot", } return symbol_map.get(symbol, symbol) def convert_to_internal_ticker( self, source_ticker: str, default_exchange: Optional[str] = None ) -> str: """Convert CoinGecko ID to CRYPTO:SYMBOL.""" # Reverse map CoinGecko IDs to symbols id_map = { "bitcoin": "BTC", "ethereum": "ETH", "tether": "USDT", "binancecoin": "BNB", "usd-coin": "USDC", "ripple": "XRP", "cardano": "ADA", "dogecoin": "DOGE", "solana": "SOL", "polkadot": "DOT", } symbol = id_map.get(source_ticker.lower(), source_ticker.upper()) return f"CRYPTO:{symbol}" async def _run(self, func, *args, **kwargs): loop = asyncio.get_event_loop() return await loop.run_in_executor(None, lambda: func(*args, **kwargs)) def _to_coingecko_id(self, ticker: str) -> str: return self.convert_to_source_ticker(ticker) async def get_asset_info(self, ticker: str) -> Optional[Asset]: """Fetch detailed asset information.""" cache_key = f"crypto:info:{ticker}" cached = await self.cache.get(cache_key) if cached: return Asset.model_validate(cached) coin_id = self._to_coingecko_id(ticker) url = f"https://api.coingecko.com/api/v3/coins/{coin_id}" try: async with httpx.AsyncClient() as client: resp = await client.get(url, timeout=30) if resp.status_code == 404: return None resp.raise_for_status() data = resp.json() asset = Asset( ticker=ticker, asset_type=AssetType.CRYPTO, name=data.get("name", ticker), market_info=MarketInfo( exchange="CRYPTO", country="Global", currency="USD", timezone="UTC", market_status=MarketStatus.OPEN, ), source_mappings={DataSource.CRYPTO: coin_id}, properties={ "symbol": data.get("symbol", "").upper(), "description": data.get("description", {}).get("en", ""), "homepage": data.get("links", {}).get("homepage", [""])[0], "market_cap_rank": data.get("market_cap_rank"), }, ) await self.cache.set(cache_key, asset.model_dump(), ttl=3600) return asset except Exception as e: self.logger.warning(f"Failed to fetch asset info for {ticker}: {e}") return None async def get_real_time_price(self, ticker: str) -> Optional[AssetPrice]: """Fetch current price.""" cache_key = f"crypto:price:{ticker}" cached = await self.cache.get(cache_key) if cached: return AssetPrice.from_dict(cached) coin_id = self._to_coingecko_id(ticker) url = ( f"https://api.coingecko.com/api/v3/simple/price" f"?ids={coin_id}&vs_currencies=usd&include_market_cap=true&include_24hr_vol=true&include_24hr_change=true" ) try: async with httpx.AsyncClient() as client: resp = await client.get(url, timeout=30) resp.raise_for_status() data = resp.json() if coin_id not in data: return None item = data[coin_id] asset_price = AssetPrice( ticker=ticker, price=Decimal(str(item["usd"])), currency="USD", timestamp=datetime.utcnow(), volume=Decimal(str(item.get("usd_24h_vol", 0))), change=None, # Can calculate if we had prev close change_percent=Decimal(str(item.get("usd_24h_change", 0))), market_cap=Decimal(str(item.get("usd_market_cap", 0))), source=DataSource.CRYPTO, ) await self.cache.set(cache_key, asset_price.to_dict(), ttl=30) return asset_price except Exception as e: self.logger.warning(f"Failed to fetch price for {ticker}: {e}") return None async def get_historical_prices( self, ticker: str, start_date: datetime, end_date: datetime, interval: str = "1d", ) -> List[AssetPrice]: """Fetch historical prices.""" start_ts = int(start_date.timestamp()) end_ts = int(end_date.timestamp()) cache_key = f"crypto:history:{ticker}:{start_ts}:{end_ts}:{interval}" cached = await self.cache.get(cache_key) if cached: return [AssetPrice.from_dict(item) for item in cached] coin_id = self._to_coingecko_id(ticker) url = f"https://api.coingecko.com/api/v3/coins/{coin_id}/market_chart/range" params = { "vs_currency": "usd", "from": start_ts, "to": end_ts, } try: async with httpx.AsyncClient() as client: resp = await client.get(url, params=params, timeout=30) resp.raise_for_status() data = resp.json() prices = [] # data["prices"] is list of [timestamp, price] # data["total_volumes"] is list of [timestamp, volume] # We assume they align or just use prices for ts, price_val in data.get("prices", []): timestamp = datetime.utcfromtimestamp(ts / 1000) price = AssetPrice( ticker=ticker, price=Decimal(str(price_val)), currency="USD", timestamp=timestamp, volume=Decimal("0"), # Could match with total_volumes open_price=Decimal( str(price_val) ), # OHLC not available in this endpoint high_price=Decimal(str(price_val)), low_price=Decimal(str(price_val)), close_price=Decimal(str(price_val)), source=DataSource.CRYPTO, ) prices.append(price) await self.cache.set(cache_key, [p.to_dict() for p in prices], ttl=3600) return prices except Exception as e: self.logger.error(f"Failed to fetch history for {ticker}: {e}") return [] async def search_assets(self, query: AssetSearchQuery) -> List[AssetSearchResult]: """Search cryptocurrencies.""" cache_key = f"crypto:search:{query.query}" cached = await self.cache.get(cache_key) if cached: return [AssetSearchResult(**item) for item in cached] url = f"https://api.coingecko.com/api/v3/search?query={query.query}" try: async with httpx.AsyncClient() as client: resp = await client.get(url, timeout=30) resp.raise_for_status() data = resp.json() results = [] for coin in data.get("coins", [])[: query.limit]: internal_ticker = self.convert_to_internal_ticker(coin.get("id", "")) results.append( AssetSearchResult( ticker=internal_ticker, asset_type=AssetType.CRYPTO, name=coin.get("name", ""), exchange="CRYPTO", country="Global", currency="USD", relevance_score=1.0, # Simple score ) ) await self.cache.set(cache_key, [r.model_dump() for r in results], ttl=3600) return results except Exception as e: self.logger.warning(f"Crypto search failed: {e}") return []

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/huweihua123/stock-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server