"""
Get real-time price data for specific tokens on blockchain networks.
"""
import json
from typing import Optional, Union
from mcp import types
from pydantic import BaseModel
from ...helpers import make_odos_request, resolve_chain_id, resolve_token_address
from ...mcp import mcp
class GetTokenPriceArgs(BaseModel):
    """Arguments for getting token price"""
    # NOTE(review): the docstring above is kept verbatim; pydantic may expose
    # a model's docstring as the schema description seen by tool clients —
    # confirm before rewording it.

    # Chain identifier as a string or an integer; get_token_price passes it
    # through resolve_chain_id() before using it.
    chain_id: Union[str, int]
    # Token reference; get_token_price hands it, with the resolved chain id,
    # to resolve_token_address() to obtain the address placed in the URL.
    token_addr: str
    # Optional currency selector. When truthy it is forwarded unchanged as the
    # "currencyId" query parameter; None or an empty string sends no parameter.
    # The camelCase name is part of the tool's input schema, so it must stay.
    currencyId: Optional[str] = None
@mcp.tool(
    name="get_token_price",
    description=(
        "Get the token price info for a given chain and address."
        " Price will be null if asset is valid but price is not available"
    ),
)
async def get_token_price(args: GetTokenPriceArgs) -> list[types.TextContent]:
    """
    Fetch the price of one token on one chain and report it as text.

    The chain id and token address are resolved through the project helpers,
    then a GET request is issued to ``/pricing/token/{chain}/{address}``.
    ``currencyId`` is sent as a query parameter only when it is truthy.

    Args:
        args: Tool arguments (``chain_id``, ``token_addr`` and the optional
            ``currencyId``).

    Returns:
        A single-element list holding a ``TextContent``. On success the text
        is a JSON document with the keys ``message``, ``chainId``,
        ``tokenAddress``, ``price`` and ``currencyId``. When a
        ``ConnectionError``, ``ValueError``, ``KeyError`` or ``TypeError`` is
        raised, the text is a human-readable error line instead. Any other
        exception type propagates to the caller.
    """
    try:
        network_id = resolve_chain_id(args.chain_id)
        token_address = await resolve_token_address(network_id, args.token_addr)

        # Chain and token travel in the path; the currency is the only
        # optional query parameter.
        endpoint = f"/pricing/token/{network_id}/{token_address}"
        query = {"currencyId": args.currencyId} if args.currencyId else {}

        pricing = await make_odos_request(endpoint, method="GET", params=query)

        payload = {
            "message": "Successfully fetched token price.",
            "chainId": network_id,
            "tokenAddress": token_address,
            "price": pricing.get("price"),
            "currencyId": pricing.get("currencyId"),
        }
        # Serialization stays inside the try block so that a failure here is
        # reported through the same handlers as a failed lookup.
        return [types.TextContent(type="text", text=json.dumps(payload, indent=2))]
    except ConnectionError as exc:
        failure_text = f"❌ API Error in get_token_price: {exc!s}"
    except (ValueError, KeyError, TypeError) as exc:
        failure_text = f"❌ Unexpected Error in get_token_price: {exc!s}"

    # Only reached after one of the handlers above has built a message.
    return [types.TextContent(type="text", text=failure_text)]