"""
MCP Server for Cryptocurrency Market Data.
Provides tools for fetching real-time and historical crypto data.
"""
import asyncio
import json
from typing import Any, Sequence
from mcp.server import Server
from mcp.types import Tool, TextContent
from mcp.server.stdio import stdio_server
from .crypto_api import CryptoAPI
from .cache import CachedCryptoAPI
class CryptoMCPServer:
    """MCP server exposing cryptocurrency market-data tools over stdio.

    Holds a ``CryptoAPI`` instance and a ``CachedCryptoAPI`` wrapping it, and
    registers the ``list_tools`` / ``call_tool`` handlers that route each
    tool call to one of the two.
    """

    def __init__(self, exchange_id: str = 'binance', cache_ttl: int = 60):
        """
        Initialize the MCP server.

        Args:
            exchange_id: Exchange to use (default: binance)
            cache_ttl: Cache time-to-live in seconds
        """
        self.server = Server("crypto-mcp-server")
        self.crypto_api = CryptoAPI(exchange_id)
        self.cached_api = CachedCryptoAPI(self.crypto_api, cache_ttl)
        # Register request handlers
        self._setup_handlers()

    @staticmethod
    def _tool_specs() -> list[dict[str, Any]]:
        """Return the keyword arguments used to build each advertised ``Tool``.

        A fresh list is built on every call so callers may mutate the result.
        Each entry carries ``name``, ``description`` and ``inputSchema``.
        """
        return [
            {
                "name": "get_crypto_price",
                "description": "Get current price for a cryptocurrency pair (e.g., BTC/USDT)",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "symbol": {
                            "type": "string",
                            "description": "Trading pair symbol (e.g., 'BTC/USDT', 'ETH/USDT')",
                        },
                        "use_cache": {
                            "type": "boolean",
                            "description": "Whether to use cached data (default: true)",
                            "default": True,
                        },
                    },
                    "required": ["symbol"],
                },
            },
            {
                "name": "get_multiple_prices",
                "description": "Get current prices for multiple cryptocurrency pairs",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "symbols": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "List of trading pairs (e.g., ['BTC/USDT', 'ETH/USDT'])",
                        },
                    },
                    "required": ["symbols"],
                },
            },
            {
                "name": "get_historical_data",
                "description": "Get historical OHLCV (Open, High, Low, Close, Volume) data",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "symbol": {
                            "type": "string",
                            "description": "Trading pair symbol",
                        },
                        "timeframe": {
                            "type": "string",
                            "description": "Timeframe (e.g., '1m', '5m', '1h', '1d')",
                            "default": "1d",
                        },
                        "limit": {
                            "type": "integer",
                            "description": "Number of data points to fetch",
                            "default": 100,
                        },
                        "use_cache": {
                            "type": "boolean",
                            "description": "Whether to use cached data",
                            "default": True,
                        },
                    },
                    "required": ["symbol"],
                },
            },
            {
                "name": "get_market_summary",
                "description": "Get comprehensive market summary including 24h stats and price changes",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "symbol": {
                            "type": "string",
                            "description": "Trading pair symbol",
                        },
                        "use_cache": {
                            "type": "boolean",
                            "description": "Whether to use cached data",
                            "default": True,
                        },
                    },
                    "required": ["symbol"],
                },
            },
            {
                "name": "get_orderbook",
                "description": "Get current order book (bids and asks) for a trading pair",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "symbol": {
                            "type": "string",
                            "description": "Trading pair symbol",
                        },
                        "limit": {
                            "type": "integer",
                            "description": "Depth of order book",
                            "default": 20,
                        },
                    },
                    "required": ["symbol"],
                },
            },
            {
                "name": "search_symbols",
                "description": "Search for available trading pairs containing a specific cryptocurrency",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Search query (e.g., 'BTC', 'ETH')",
                        },
                    },
                    "required": ["query"],
                },
            },
            {
                "name": "get_supported_exchanges",
                "description": "Get list of all supported cryptocurrency exchanges",
                "inputSchema": {
                    "type": "object",
                    "properties": {},
                },
            },
            {
                "name": "clear_cache",
                "description": "Clear all cached data",
                "inputSchema": {
                    "type": "object",
                    "properties": {},
                },
            },
            {
                "name": "get_cache_stats",
                "description": "Get cache statistics (total entries, valid entries, etc.)",
                "inputSchema": {
                    "type": "object",
                    "properties": {},
                },
            },
        ]

    def _clear_cache(self) -> dict[str, str]:
        """Clear the cached API's data and return a confirmation payload."""
        self.cached_api.clear_cache()
        return {"status": "Cache cleared successfully"}

    def _execute_tool(self, name: str, arguments: Any) -> Any:
        """Run the tool called ``name`` and return its raw result.

        Args:
            name: Tool name, one of the names listed by ``_tool_specs``.
            arguments: Mapping of tool arguments; ``None`` is treated as an
                empty mapping so argument-less calls do not fail.

        Returns:
            Whatever the underlying API method returns (serialized to JSON
            by the ``call_tool`` handler).

        Raises:
            ValueError: If ``name`` is not a known tool or a required
                argument is absent.
        """
        args = {} if arguments is None else arguments

        def required(key: str) -> Any:
            # A bare KeyError stringifies to just "'symbol'", which tells the
            # client nothing; raise a descriptive error instead.
            try:
                return args[key]
            except KeyError:
                raise ValueError(f"Missing required argument: '{key}'") from None

        # Every entry is a zero-argument callable so that nothing (argument
        # lookup, attribute access, API call) runs until the tool is selected.
        handlers = {
            "get_crypto_price": lambda: self.cached_api.get_current_price(
                required("symbol"),
                args.get("use_cache", True),
            ),
            "get_multiple_prices": lambda: self.crypto_api.get_multiple_prices(
                required("symbols"),
            ),
            "get_historical_data": lambda: self.cached_api.get_historical_ohlcv(
                required("symbol"),
                args.get("timeframe", "1d"),
                args.get("limit", 100),
                args.get("use_cache", True),
            ),
            "get_market_summary": lambda: self.cached_api.get_market_summary(
                required("symbol"),
                args.get("use_cache", True),
            ),
            "get_orderbook": lambda: self.crypto_api.get_orderbook(
                required("symbol"),
                args.get("limit", 20),
            ),
            "search_symbols": lambda: self.crypto_api.search_symbols(
                required("query"),
            ),
            "get_supported_exchanges": lambda: CryptoAPI.get_supported_exchanges(),
            "clear_cache": lambda: self._clear_cache(),
            "get_cache_stats": lambda: self.cached_api.get_cache_stats(),
        }

        handler = handlers.get(name)
        if handler is None:
            raise ValueError(f"Unknown tool: {name}")
        return handler()

    def _setup_handlers(self):
        """Set up MCP request handlers."""

        @self.server.list_tools()
        async def list_tools() -> list[Tool]:
            """List available tools."""
            return [Tool(**spec) for spec in self._tool_specs()]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: Any) -> Sequence[TextContent]:
            """Handle tool calls.

            Returns a single text item holding either the JSON-encoded tool
            result or a JSON error object with ``error``, ``tool`` and
            ``arguments`` keys.
            """
            # NOTE(review): the API methods are called synchronously here; if
            # they perform network I/O they will block the event loop. Confirm
            # the cache is thread-safe before moving them to a worker thread.
            try:
                payload = self._execute_tool(name, arguments)
                text = json.dumps(payload, indent=2)
            except Exception as e:
                # Deliberate catch-all at the tool boundary: failures are
                # reported to the client as a JSON payload, not raised.
                error_msg = {
                    "error": str(e),
                    "tool": name,
                    "arguments": arguments,
                }
                text = json.dumps(error_msg, indent=2)
            return [TextContent(type="text", text=text)]

    async def run(self):
        """Run the MCP server over the stdio transport until it exits."""
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options(),
            )
async def main():
    """Create the server with exchange 'binance' and a 60-second cache TTL, then run it."""
    await CryptoMCPServer(exchange_id='binance', cache_ttl=60).run()
# Start the server only when this file is executed as the main program, not
# when it is imported. NOTE: the relative imports at the top of the file mean
# it has to be launched as a module inside its package (``python -m ...``).
if __name__ == "__main__":
    asyncio.run(main())