import httpx
import logging
import sys
import os
from mcp_app import mcp
# Configure the root logger: INFO and above go to stderr, each record prefixed
# with "[LOG] ". force=True removes any handlers already attached to the root
# logger so this configuration always takes effect.
# NOTE(review): stderr is presumably chosen so stdout stays free for the MCP
# transport - confirm against how the server is launched.
logging.basicConfig(
    format="[LOG] %(message)s",
    level=logging.INFO,
    stream=sys.stderr,
    force=True,
)
@mcp.tool()
async def convert_crypto(amount: float, from_coin: str, to_coin: str) -> str:
    """
    Convert cryptocurrency using real-time rates (e.g., BTC to ETH, USDC to USD).

    The rate is fetched from the CoinGecko ``/simple/price`` endpoint. If the
    ``COINGECKO_API_KEY`` environment variable is set, it is sent in the
    ``x-cg-demo-api-key`` header.

    Args:
        amount: Volume to convert (e.g., 1.5)
        from_coin: Coin ID/Symbol (e.g., 'bitcoin', 'ethereum', 'usdc')
        to_coin: Target currency (e.g., 'usd', 'eur', 'eth')

    Returns:
        A human-readable conversion line on success, otherwise a message
        starting with "Error". Failures are reported in the returned string
        rather than raised.
    """
    # bool is a subclass of int, so it must be rejected explicitly.
    if isinstance(amount, bool) or not isinstance(amount, (int, float)):
        return "Error: Amount must be a number."
    if not isinstance(from_coin, str):
        return "Error: From Coin must be a string"
    if not isinstance(to_coin, str):
        return "Error: To Coin must be a string"

    # Normalise before use: response keys are looked up in lowercase below.
    from_coin = from_coin.strip().lower()
    to_coin = to_coin.strip().lower()
    if not from_coin:
        return "Error: From Coin must not be empty."
    if not to_coin:
        return "Error: To Coin must not be empty."

    api_key = os.getenv("COINGECKO_API_KEY")
    headers = {"x-cg-demo-api-key": api_key} if api_key else {}

    logging.info("Converting %s %s to %s", amount, from_coin, to_coin)

    url = "https://api.coingecko.com/api/v3/simple/price"
    # Passing the values via `params` lets httpx URL-encode them, so input
    # containing '&', '#', spaces, etc. cannot alter the query string.
    params = {"ids": from_coin, "vs_currencies": to_coin}

    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, params=params, headers=headers)
            logging.info("Response: %s", response)

            # Without this check an error payload (rate limit, bad API key,
            # server error) would be misreported as "coin not found".
            if response.status_code != 200:
                return (
                    "Error fetching crypto prices: "
                    f"HTTP {response.status_code} from CoinGecko."
                )

            data = response.json()

            if not isinstance(data, dict) or from_coin not in data:
                return f"Error: Coin '{from_coin}' not found. Try using the full name (e.g., 'bitcoin' instead of 'btc')."

            prices = data[from_coin]
            if not isinstance(prices, dict) or to_coin not in prices:
                return f"Error: Target currency '{to_coin}' not supported."

            rate = prices[to_coin]
            if isinstance(rate, bool) or not isinstance(rate, (int, float)):
                return f"Error: No numeric rate returned for '{from_coin}' in '{to_coin}'."

            total = amount * rate
            return f"{amount} {from_coin.upper()} = {total} {to_coin.upper()} (Rate: {rate})"
        except Exception as exc:
            # Tool boundary: always answer with a string, but keep the
            # traceback in the log instead of silently discarding it.
            logging.exception("Failed to fetch crypto prices")
            return f"Error fetching crypto prices: {exc}"