#!/usr/bin/env python3
"""
加密货币指标 MCP 客户端示例
演示如何使用 MCP 客户端调用加密货币指标服务
用法:
python crypto_indicators_client_example.py
"""
import asyncio
import json
from contextlib import AsyncExitStack
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def main():
"""主函数"""
print("="*60)
print("📊 加密货币指标 MCP 客户端示例")
print("="*60)
# 服务器参数
server_params = StdioServerParameters(
command="python3",
args=["crypto_indicators_mcp.py"],
env=None,
)
async with AsyncExitStack() as stack:
# 连接到 MCP 服务器
print("\n🔌 正在连接到 MCP 服务器...")
stdio_transport = await stack.enter_async_context(stdio_client(server_params))
stdio, write = stdio_transport
session = await stack.enter_async_context(ClientSession(stdio, write))
# 初始化会话
print("✅ 连接成功,正在初始化...")
await session.initialize()
# 列出可用工具
print("\n📋 可用工具列表:")
tools_result = await session.list_tools()
for tool in tools_result.tools:
print(f" • {tool.name}: {tool.description}")
# 示例 1: 获取单个加密货币的指标
print("\n" + "="*60)
print("示例 1: 获取 BTC 的1小时指标")
print("="*60)
result1 = await session.call_tool(
"get_crypto_indicators",
arguments={
"symbol": "BTC",
"timeframe": "1h",
"limit": 500,
}
)
for content in result1.content:
if content.type == "text":
print(content.text)
# 示例 2: 多时间框架分析
print("\n" + "="*60)
print("示例 2: ETH 多时间框架分析")
print("="*60)
result2 = await session.call_tool(
"get_multi_timeframe_analysis",
arguments={
"symbol": "ETH",
"timeframes": ["1d", "4h", "1h", "15m"],
}
)
for content in result2.content:
if content.type == "text":
print(content.text)
# 示例 3: 获取山寨币指标
print("\n" + "="*60)
print("示例 3: SOL 15分钟指标")
print("="*60)
result3 = await session.call_tool(
"get_crypto_indicators",
arguments={
"symbol": "SOL",
"timeframe": "15m",
"limit": 200,
}
)
for content in result3.content:
if content.type == "text":
print(content.text)
print("\n✅ 客户端示例执行完成")
if __name__ == "__main__":
asyncio.run(main())