#!/usr/bin/env python3
"""
加密货币市场数据和技术指标计算 MCP 服务
结合了:
1. Go 项目的指标计算方式(序列增量计算,O(n)时间复杂度)
2. Aster DEX API 数据获取(加密货币)
3. 清晰的数据结构和返回格式
使用方式:
python crypto_indicators_mcp.py
"""
import json
import logging
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime
import math
import requests
import numpy as np
import talib
# MCP SDK
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
# Configure the root logger once at import time: INFO level, timestamped records.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by the classes and functions in this file.
logger = logging.getLogger(__name__)
# ============================================================================
# 数据结构定义(参考 Go 项目的 market.Data)
# ============================================================================
@dataclass
class IntradaySeriesData:
    """Most recent N points of each price/indicator series, ordered oldest to newest.

    Every list is produced by ``IndicatorCalculator.safe_get_last_n``, which
    replaces NaN entries with ``None``; the element type is therefore
    ``Optional[float]`` rather than plain ``float``.
    """
    mid_prices: List[Optional[float]]     # closing prices
    volume_values: List[Optional[float]]  # traded volume
    ema20_values: List[Optional[float]]   # EMA(20)
    ema60_values: List[Optional[float]]   # EMA(60)
    macd_dif: List[Optional[float]]       # MACD DIF (the MACD line)
    macd_dea: List[Optional[float]]       # MACD DEA (the signal line)
    macd_hist: List[Optional[float]]      # MACD histogram
    rsi7_values: List[Optional[float]]    # RSI(7)
    rsi14_values: List[Optional[float]]   # RSI(14)
    atr14_values: List[Optional[float]]   # ATR(14)
    bb_upper: List[Optional[float]]       # Bollinger upper band
    bb_middle: List[Optional[float]]      # Bollinger middle band
    bb_lower: List[Optional[float]]       # Bollinger lower band
@dataclass
class OIData:
    """Open-interest figures for one symbol."""
    latest: float   # most recent open-interest value
    average: float  # average open-interest value
@dataclass
class MarketData:
    """Snapshot of one symbol on one timeframe (modelled on the Go project's market.Data)."""
    symbol: str
    timeframe: str
    current_price: float
    price_change_1h: float  # price change over the last hour, in percent
    price_change_4h: float  # price change over the last four hours, in percent
    current_ema20: float
    current_ema60: float
    current_macd_hist: float  # MACD histogram (HIST = DIF - DEA)
    current_rsi7: float
    current_rsi14: float
    current_atr14: float
    open_interest: Optional[OIData]
    funding_rate: float
    intraday_series: IntradaySeriesData
    timestamp: str  # ISO-format timestamp
@dataclass
class Kline:
    """One candlestick (kline) bar."""
    open_time: int   # bar open time as an integer timestamp
    open: float      # opening price
    high: float      # highest price
    low: float       # lowest price
    close: float     # closing price
    volume: float    # traded volume
    close_time: int  # bar close time as an integer timestamp
# ============================================================================
# 技术指标计算(参考 Go 项目的序列增量计算方式)
# ============================================================================
class IndicatorCalculator:
    """Stateless helpers that compute technical-indicator series with TA-Lib.

    Each ``calculate_*`` method returns empty array(s) when the input holds
    too few samples for the requested period; otherwise it returns whatever
    TA-Lib produces for the full input.
    """

    @staticmethod
    def _as_float64(values) -> np.ndarray:
        """Return *values* as a float64 array.

        TA-Lib only accepts double-precision input, so integer or float32
        arrays are converted here instead of failing inside the library.
        """
        return np.asarray(values, dtype=np.float64)

    @staticmethod
    def calculate_ema_sequence(close_prices: np.ndarray, period: int) -> np.ndarray:
        """Compute the EMA series of *close_prices*.

        Args:
            close_prices: Closing prices, oldest first.
            period: EMA look-back period.

        Returns:
            The EMA series, or an empty array when fewer than *period*
            prices are supplied.
        """
        prices = IndicatorCalculator._as_float64(close_prices)
        if len(prices) < period:
            return np.array([])
        return talib.EMA(prices, timeperiod=period)

    @staticmethod
    def calculate_macd_sequence(
        close_prices: np.ndarray,
        fastperiod: int = 12,
        slowperiod: int = 26,
        signalperiod: int = 9,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Compute the MACD series (DIF, DEA, HIST).

        The histogram is multiplied by 2 so it matches the convention used
        by the reference Go project and by exchange charts.

        Args:
            close_prices: Closing prices, oldest first.
            fastperiod: Fast EMA period (default 12).
            slowperiod: Slow EMA period (default 26).
            signalperiod: Signal-line period (default 9).

        Returns:
            ``(dif, dea, hist)``; three empty arrays when fewer than
            *slowperiod* prices are supplied.
        """
        prices = IndicatorCalculator._as_float64(close_prices)
        if len(prices) < slowperiod:
            return np.array([]), np.array([]), np.array([])
        dif, dea, hist = talib.MACD(
            prices,
            fastperiod=fastperiod,
            slowperiod=slowperiod,
            signalperiod=signalperiod,
        )
        return dif, dea, hist * 2.0

    @staticmethod
    def calculate_rsi_sequence(close_prices: np.ndarray, period: int) -> np.ndarray:
        """Compute the RSI series.

        Returns:
            The RSI series, or an empty array unless more than *period*
            prices are supplied.
        """
        prices = IndicatorCalculator._as_float64(close_prices)
        if len(prices) <= period:
            return np.array([])
        return talib.RSI(prices, timeperiod=period)

    @staticmethod
    def calculate_atr_sequence(high: np.ndarray, low: np.ndarray,
                               close: np.ndarray, period: int) -> np.ndarray:
        """Compute the ATR series from high/low/close prices.

        Returns:
            The ATR series, or an empty array unless more than *period*
            closes are supplied.
        """
        to_f64 = IndicatorCalculator._as_float64
        highs, lows, closes = to_f64(high), to_f64(low), to_f64(close)
        if len(closes) <= period:
            return np.array([])
        return talib.ATR(highs, lows, closes, timeperiod=period)

    @staticmethod
    def calculate_bollinger_bands(
        close_prices: np.ndarray,
        period: int = 20,
        nbdev: float = 2.0,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Compute Bollinger Bands with the same deviation above and below.

        Returns:
            ``(upper, middle, lower)``; three empty arrays when fewer than
            *period* prices are supplied.
        """
        prices = IndicatorCalculator._as_float64(close_prices)
        if len(prices) < period:
            return np.array([]), np.array([]), np.array([])
        # matype=0 is passed through unchanged from the original call.
        return talib.BBANDS(
            prices,
            timeperiod=period,
            nbdevup=nbdev,
            nbdevdn=nbdev,
            matype=0,
        )

    @staticmethod
    def safe_get_last_n(sequence: np.ndarray, n: int = 10) -> List[Optional[float]]:
        """Return the last *n* values of *sequence* as plain floats.

        NaN entries become ``None`` so the result can be JSON-serialised.

        Args:
            sequence: Series to read from (may be empty).
            n: Number of trailing values wanted.

        Returns:
            At most *n* values, oldest first; ``[]`` when the sequence is
            empty or *n* is not positive.
        """
        # Guard n <= 0 explicitly: sequence[-0:] would return the whole series.
        if n <= 0 or len(sequence) == 0:
            return []
        tail = (float(item) for item in sequence[-n:])
        return [None if math.isnan(value) else value for value in tail]
# ============================================================================
# 数据获取(Aster API)
# ============================================================================
# Supported timeframes mapped to the interval string sent to the Aster API.
# Each key maps to itself.
TIMEFRAME_MAPPING = {
    tf: tf
    for tf in (
        '1m', '3m', '5m', '15m', '30m',
        '1h', '2h', '4h', '6h', '12h',
        '1d', '1w',
    )
}
class CryptoDataProvider:
    """Fetch market data from the Aster futures REST API and derive indicators."""

    # Bar length in minutes per timeframe; used to turn an hour window into
    # a number of bars when computing price changes.
    _TIMEFRAME_MINUTES = {
        '1m': 1, '3m': 3, '5m': 5, '15m': 15, '30m': 30,
        '1h': 60, '2h': 120, '4h': 240, '6h': 360, '12h': 720,
        '1d': 1440, '1w': 10080,
    }

    def __init__(self, base_url: str = "https://fapi.asterdex.com"):
        """Store the API base URL (no request is made here)."""
        # Drop trailing slashes so endpoint paths never contain '//'.
        self.base_url = base_url.rstrip('/')
        logger.info(f"✅ Aster API 连接已配置: {base_url}")

    def normalize_symbol(self, symbol: str) -> str:
        """Upper-case *symbol* and append ``USDT`` unless it already ends with it."""
        symbol = symbol.strip().upper()
        if not symbol.endswith("USDT"):
            symbol = symbol + "USDT"
        return symbol

    def get_klines(self, symbol: str, interval: str, limit: int = 1000) -> List["Kline"]:
        """Fetch candlesticks for *symbol*.

        Args:
            symbol: Trading pair (e.g. 'BTCUSDT'); normalised internally.
            interval: Timeframe key from ``TIMEFRAME_MAPPING`` (case-insensitive).
            limit: Number of bars to request (default 1000).

        Returns:
            Parsed ``Kline`` objects in the order the API returned them; an
            empty list for an unsupported interval, a failed request or an
            empty/unexpected payload. Rows that cannot be parsed are skipped.
        """
        symbol = self.normalize_symbol(symbol)
        interval = interval.strip().lower()
        if interval not in TIMEFRAME_MAPPING:
            logger.error(f"不支持的时间周期: {interval}")
            return []

        try:
            response = requests.get(
                f"{self.base_url}/fapi/v1/klines",
                params={
                    'symbol': symbol,
                    'interval': TIMEFRAME_MAPPING[interval],
                    'limit': limit,
                },
                timeout=10,
            )
            response.raise_for_status()
            raw_data = response.json()
        except Exception as e:
            # Deliberately broad: any transport or decoding failure yields "no data".
            logger.error(f"❌ 获取 {symbol} K线数据失败: {e}")
            return []

        # A non-list payload (e.g. an error object) is not kline data.
        if not raw_data or not isinstance(raw_data, list):
            logger.warning(f"未获取到 {symbol} 的K线数据")
            return []

        klines = []
        skipped = 0
        for item in raw_data:
            try:
                klines.append(Kline(
                    open_time=int(item[0]),
                    open=float(item[1]),
                    high=float(item[2]),
                    low=float(item[3]),
                    close=float(item[4]),
                    volume=float(item[5]),
                    close_time=int(item[6]),
                ))
            except (IndexError, KeyError, TypeError, ValueError):
                # Short or malformed row: drop it instead of losing the whole batch.
                skipped += 1
        if skipped:
            logger.warning(f"{symbol} 跳过 {skipped} 条格式异常的K线数据")
        logger.info(f"✅ 成功获取 {symbol} ({interval}) K线数据,共 {len(klines)} 根")
        return klines

    def get_open_interest(self, symbol: str) -> Optional["OIData"]:
        """Fetch the current open interest.

        Returns:
            ``OIData`` whose ``latest`` and ``average`` both hold the single
            value returned by the endpoint, or ``None`` when the request fails.
        """
        symbol = self.normalize_symbol(symbol)
        try:
            response = requests.get(
                f"{self.base_url}/fapi/v1/openInterest",
                params={'symbol': symbol},
                timeout=10,
            )
            response.raise_for_status()
            oi = float(response.json().get('openInterest', 0))
            return OIData(latest=oi, average=oi)
        except Exception as e:
            # Best-effort lookup: callers treat None as "unavailable".
            logger.error(f"❌ 获取 {symbol} OI数据失败: {e}")
            return None

    def get_funding_rate(self, symbol: str) -> float:
        """Fetch the last funding rate; returns 0.0 when the request fails."""
        symbol = self.normalize_symbol(symbol)
        try:
            response = requests.get(
                f"{self.base_url}/fapi/v1/premiumIndex",
                params={'symbol': symbol},
                timeout=10,
            )
            response.raise_for_status()
            return float(response.json().get('lastFundingRate', 0))
        except Exception as e:
            # Best-effort lookup: 0.0 is the documented fallback.
            logger.error(f"❌ 获取 {symbol} 资金费率失败: {e}")
            return 0.0

    def calculate_price_change(self, klines: List["Kline"], timeframe: str, hours: float) -> float:
        """Compute the percentage change of the close over the last *hours*.

        Args:
            klines: Bars ordered oldest to newest.
            timeframe: Timeframe of *klines* (case-insensitive).
            hours: Size of the look-back window in hours.

        Returns:
            The change in percent, or 0.0 when it cannot be computed: an
            unknown timeframe, a window shorter than one bar, too few bars,
            or a non-positive reference price.
        """
        if not klines or len(klines) < 2:
            return 0.0
        minutes_per_bar = self._TIMEFRAME_MINUTES.get(str(timeframe).strip().lower())
        if minutes_per_bar is None:
            return 0.0
        # Whole bars that fit in the window; 0 means the bar is longer than the window.
        bars_back = int(hours * 60 / minutes_per_bar)
        if bars_back <= 0 or len(klines) < bars_back + 1:
            return 0.0
        price_old = klines[-(bars_back + 1)].close
        price_current = klines[-1].close
        if price_old > 0:
            return ((price_current - price_old) / price_old) * 100
        return 0.0

    @staticmethod
    def _latest_or_zero(sequence) -> float:
        """Return the last value of *sequence*, or 0.0 when it is empty or NaN."""
        if len(sequence) == 0:
            return 0.0
        latest = float(sequence[-1])
        return 0.0 if math.isnan(latest) else latest

    def get_market_data(self, symbol: str, timeframe: str = '1h', limit: int = 1000) -> Optional["MarketData"]:
        """Build a full ``MarketData`` snapshot for one symbol and timeframe.

        Args:
            symbol: Trading pair (e.g. 'BTC', 'BTCUSDT').
            timeframe: Timeframe key (case-insensitive), default '1h'.
            limit: Number of bars to request (default 1000).

        Returns:
            The populated ``MarketData``, or ``None`` when fewer than 20 bars
            are available.
        """
        # Function-scope import: the file header only imports `datetime` itself.
        from datetime import timezone

        symbol = self.normalize_symbol(symbol)
        # Normalise once so kline fetching and price-change maths see the same key.
        timeframe = timeframe.strip().lower()

        klines = self.get_klines(symbol, timeframe, limit)
        if len(klines) < 20:
            logger.warning(f"{symbol} K线数据不足(需要至少20根)")
            return None

        close = np.array([k.close for k in klines], dtype=np.float64)
        high = np.array([k.high for k in klines], dtype=np.float64)
        low = np.array([k.low for k in klines], dtype=np.float64)
        volume = np.array([k.volume for k in klines], dtype=np.float64)

        calc = IndicatorCalculator()
        ema20_seq = calc.calculate_ema_sequence(close, 20)
        ema60_seq = calc.calculate_ema_sequence(close, 60)
        dif_seq, dea_seq, hist_seq = calc.calculate_macd_sequence(close)
        rsi7_seq = calc.calculate_rsi_sequence(close, 7)
        rsi14_seq = calc.calculate_rsi_sequence(close, 14)
        atr14_seq = calc.calculate_atr_sequence(high, low, close, 14)
        bb_upper, bb_middle, bb_lower = calc.calculate_bollinger_bands(close, 20, 2.0)

        # Open interest falls back to zeros so the field is always populated.
        oi_data = self.get_open_interest(symbol)
        if not oi_data:
            oi_data = OIData(latest=0, average=0)
        funding_rate = self.get_funding_rate(symbol)

        # Last 10 points of every series, NaN mapped to None.
        tail = calc.safe_get_last_n
        intraday_series = IntradaySeriesData(
            mid_prices=tail(close, 10),
            volume_values=tail(volume, 10),
            ema20_values=tail(ema20_seq, 10),
            ema60_values=tail(ema60_seq, 10),
            macd_dif=tail(dif_seq, 10),
            macd_dea=tail(dea_seq, 10),
            macd_hist=tail(hist_seq, 10),
            rsi7_values=tail(rsi7_seq, 10),
            rsi14_values=tail(rsi14_seq, 10),
            atr14_values=tail(atr14_seq, 10),
            bb_upper=tail(bb_upper, 10),
            bb_middle=tail(bb_middle, 10),
            bb_lower=tail(bb_lower, 10),
        )

        latest = self._latest_or_zero
        # datetime.utcnow() is deprecated; this yields the same naive-UTC text plus 'Z'.
        timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + 'Z'
        market_data = MarketData(
            symbol=symbol,
            timeframe=timeframe,
            current_price=float(close[-1]),
            price_change_1h=self.calculate_price_change(klines, timeframe, 1.0),
            price_change_4h=self.calculate_price_change(klines, timeframe, 4.0),
            current_ema20=latest(ema20_seq),
            current_ema60=latest(ema60_seq),
            current_macd_hist=latest(hist_seq),
            current_rsi7=latest(rsi7_seq),
            current_rsi14=latest(rsi14_seq),
            current_atr14=latest(atr14_seq),
            open_interest=oi_data,
            funding_rate=funding_rate,
            intraday_series=intraday_series,
            timestamp=timestamp,
        )
        logger.info(f"✅ 成功获取 {symbol} ({timeframe}) 市场数据")
        return market_data
# ============================================================================
# MCP 服务器
# ============================================================================
class CryptoIndicatorsMCPServer:
    """MCP server that exposes the crypto indicator tools over stdio."""

    # Timeframes analysed when the caller does not supply any.
    _DEFAULT_TIMEFRAMES = ("1d", "4h", "1h", "15m")

    def __init__(self, base_url: str = "https://fapi.asterdex.com"):
        """Create the MCP server, the data provider and register the tools."""
        self.server = Server("crypto-indicators")
        self.data_provider = CryptoDataProvider(base_url)
        self._register_tools()
        logger.info("📊 加密货币指标 MCP 服务器已初始化")

    def _register_tools(self):
        """Register the tool listing and tool dispatch handlers on the server."""

        @self.server.list_tools()
        async def handle_list_tools() -> list[types.Tool]:
            """Describe every tool this server offers."""
            return [
                types.Tool(
                    name="get_crypto_indicators",
                    description="获取指定加密货币和周期的技术指标数据(支持分钟/小时/日/周多周期)",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "symbol": {
                                "type": "string",
                                "description": "交易对代码(例如:BTC, ETH, BTCUSDT, ETHUSDT)",
                            },
                            "timeframe": {
                                "type": "string",
                                "description": "时间周期:1m/3m/5m/15m/30m/1h/2h/4h/6h/12h/1d/1w,默认 1h",
                                "default": "1h",
                            },
                            "limit": {
                                "type": "integer",
                                "description": "获取K线数量(默认1000)",
                                "default": 1000,
                            },
                        },
                        "required": ["symbol"],
                    },
                ),
                types.Tool(
                    name="get_multi_timeframe_analysis",
                    description="获取指定加密货币的多时间框架分析(同时获取多个周期的数据)",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "symbol": {
                                "type": "string",
                                "description": "交易对代码(例如:BTC, ETH)",
                            },
                            "timeframes": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "时间周期列表(默认:['1d', '4h', '1h', '15m'])",
                                "default": ["1d", "4h", "1h", "15m"],
                            },
                        },
                        "required": ["symbol"],
                    },
                ),
            ]

        @self.server.call_tool()
        async def handle_call_tool(
            name: str, arguments: dict | None
        ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
            """Dispatch a tool call by name; unknown names raise ValueError."""
            if name == "get_crypto_indicators":
                return await self._handle_get_crypto_indicators(arguments or {})
            elif name == "get_multi_timeframe_analysis":
                return await self._handle_multi_timeframe_analysis(arguments or {})
            else:
                raise ValueError(f"未知工具: {name}")

    @staticmethod
    def _error_response(message: str) -> list:
        """Wrap *message* as the JSON error payload returned to the client."""
        return [types.TextContent(
            type="text",
            text=json.dumps({"error": message}, ensure_ascii=False, indent=2),
        )]

    async def _handle_get_crypto_indicators(self, arguments: dict) -> "list[types.TextContent]":
        """Handle ``get_crypto_indicators``.

        Args:
            arguments: Tool arguments; reads ``symbol`` (required),
                ``timeframe`` (default '1h') and ``limit`` (default 1000).

        Returns:
            One text item: the formatted report, or a JSON error object when
            an argument is invalid or the data could not be fetched.
        """
        # Function-scope import: the file only imports asyncio under its __main__ guard.
        import asyncio

        symbol = arguments.get("symbol")
        if not isinstance(symbol, str) or not symbol.strip():
            return self._error_response("缺少必需参数: symbol")
        timeframe = str(arguments.get("timeframe") or "1h")

        raw_limit = arguments.get("limit")
        try:
            limit = 1000 if raw_limit is None else int(raw_limit)
        except (TypeError, ValueError):
            limit = 0
        if limit <= 0:
            return self._error_response("参数 limit 必须为正整数")

        # The provider does blocking HTTP; run it off the event loop.
        market_data = await asyncio.to_thread(
            self.data_provider.get_market_data, symbol, timeframe, limit
        )
        if not market_data:
            return self._error_response(f"获取 {symbol} ({timeframe}) 数据失败")

        return [types.TextContent(
            type="text",
            text=self._format_market_data(market_data),
        )]

    def _collect_timeframes(self, symbol: str, timeframes: list) -> dict:
        """Fetch market data for each timeframe in order; ``None`` marks a failure."""
        return {
            tf: self.data_provider.get_market_data(symbol, tf, 1000)
            for tf in timeframes
        }

    async def _handle_multi_timeframe_analysis(self, arguments: dict) -> "list[types.TextContent]":
        """Handle ``get_multi_timeframe_analysis``.

        Args:
            arguments: Tool arguments; reads ``symbol`` (required) and
                ``timeframes`` (a list, or a single string; defaults to
                1d/4h/1h/15m when missing or empty).

        Returns:
            One text item with a section per requested timeframe, or a JSON
            error object when ``symbol`` is missing.
        """
        # Function-scope import: the file only imports asyncio under its __main__ guard.
        import asyncio

        symbol = arguments.get("symbol")
        if not isinstance(symbol, str) or not symbol.strip():
            return self._error_response("缺少必需参数: symbol")

        timeframes = arguments.get("timeframes") or list(self._DEFAULT_TIMEFRAMES)
        if isinstance(timeframes, str):
            # A bare string would otherwise be iterated character by character.
            timeframes = [timeframes]
        # De-duplicate while keeping the caller's order.
        timeframes = list(dict.fromkeys(str(tf) for tf in timeframes))

        # Blocking HTTP work runs in a worker thread, sequentially as before.
        results = await asyncio.to_thread(self._collect_timeframes, symbol, timeframes)

        symbol_normalized = self.data_provider.normalize_symbol(symbol)
        parts = [f"# {symbol_normalized} 多时间框架分析\n\n"]
        for tf, data in results.items():
            if not data:
                parts.append(f"## {tf} 周期:获取 {tf} 数据失败\n\n")
                continue
            parts.append(f"## {tf} 周期\n")
            parts.append(f"当前价格: ${data.current_price:.2f}\n")
            parts.append(f"EMA20: ${data.current_ema20:.2f}\n")
            parts.append(f"EMA60: ${data.current_ema60:.2f}\n")
            parts.append(f"MACD柱: {data.current_macd_hist:.3f}\n")
            parts.append(f"RSI(7): {data.current_rsi7:.2f}\n")
            parts.append(f"RSI(14): {data.current_rsi14:.2f}\n")
            parts.append(f"ATR(14): ${data.current_atr14:.2f}\n")
            parts.append(f"1h价格变化: {data.price_change_1h:.2f}%\n")
            parts.append(f"4h价格变化: {data.price_change_4h:.2f}%\n")
            parts.append(f"资金费率: {data.funding_rate:.6f}\n")
            if data.open_interest:
                parts.append(f"持仓量: {data.open_interest.latest:.2f}\n")
            parts.append("\n")

        return [types.TextContent(type="text", text="".join(parts))]

    @staticmethod
    def _format_series(values: List[Optional[float]], precision: int = 2, prefix: str = "") -> str:
        """Render *values* as ``[a, b, ...]``; ``None`` entries print as ``NaN``."""
        rendered = [
            "NaN" if value is None else f"{prefix}{value:.{precision}f}"
            for value in values
        ]
        return "[" + ", ".join(rendered) + "]"

    def _format_market_data(self, data: "MarketData") -> str:
        """Render *data* as the Markdown report returned by ``get_crypto_indicators``.

        The report lists the current values first, then the trailing series
        (oldest to newest) and finally the snapshot timestamp.
        """
        series = data.intraday_series
        render = self._format_series

        parts = [
            f"# {data.symbol} ({data.timeframe}) 市场数据\n\n",
            f"**当前价格**: ${data.current_price:.2f}\n",
            f"**EMA20**: ${data.current_ema20:.2f}\n",
            f"**EMA60**: ${data.current_ema60:.2f}\n",
            f"**MACD柱**: {data.current_macd_hist:.3f}\n",
            f"**RSI(7)**: {data.current_rsi7:.2f}\n",
            f"**RSI(14)**: {data.current_rsi14:.2f}\n",
            f"**ATR(14)**: ${data.current_atr14:.2f}\n",
            f"**1小时价格变化**: {data.price_change_1h:.2f}%\n",
            f"**4小时价格变化**: {data.price_change_4h:.2f}%\n",
            f"**资金费率**: {data.funding_rate:.6f}\n",
        ]
        if data.open_interest:
            parts.append(
                f"**持仓量**: 最新={data.open_interest.latest:.2f}, 平均={data.open_interest.average:.2f}\n"
            )
        parts.append("\n## 日内系列数据(最近10个数据点,从旧到新)\n\n")
        parts.extend([
            f"**价格**: {render(series.mid_prices, 2, '$')}\n\n",
            f"**成交量**: {render(series.volume_values, 0)}\n\n",
            f"**EMA20**: {render(series.ema20_values, 2, '$')}\n\n",
            f"**EMA60**: {render(series.ema60_values, 2, '$')}\n\n",
            f"**MACD DIF**: {render(series.macd_dif, 3)}\n\n",
            f"**MACD DEA**: {render(series.macd_dea, 3)}\n\n",
            f"**MACD HIST**: {render(series.macd_hist, 3)}\n\n",
            f"**RSI(7)**: {render(series.rsi7_values, 2)}\n\n",
            f"**RSI(14)**: {render(series.rsi14_values, 2)}\n\n",
            f"**ATR(14)**: {render(series.atr14_values, 2, '$')}\n\n",
            f"**布林带上轨**: {render(series.bb_upper, 2, '$')}\n\n",
            f"**布林带中轨**: {render(series.bb_middle, 2, '$')}\n\n",
            f"**布林带下轨**: {render(series.bb_lower, 2, '$')}\n\n",
        ])
        parts.append(f"*数据时间: {data.timestamp}*\n")
        return "".join(parts)

    async def run(self):
        """Serve MCP requests over stdio until the streams close."""
        logger.info("🚀 启动加密货币指标 MCP 服务器...")
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="crypto-indicators",
                    server_version="1.0.0",
                    capabilities=self.server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={},
                    ),
                ),
            )
# ============================================================================
# 主程序
# ============================================================================
async def main():
    """Log the startup banner, then build the MCP server and run it."""
    divider = "=" * 60
    for banner_line in (divider, "📊 加密货币市场数据和技术指标计算 MCP 服务", divider):
        logger.info(banner_line)
    # Construct the server with its default base URL and serve until it returns.
    await CryptoIndicatorsMCPServer().run()
# Script entry point: start the event loop only when this file is executed directly.
if __name__ == "__main__":
    import asyncio
    asyncio.run(main())