stock_mcp_server.py19.2 kB
#!/usr/bin/env python3
"""
股票分析MCP服务器
基于fastmcp和AKShare API实现股票数据分析功能
支持CherryStudio的SSE标准协议
"""
import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
import pandas as pd
import akshare as ak
from fastmcp import FastMCP
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 创建MCP服务器实例
mcp = FastMCP("股票分析工具")
class StockAnalyzer:
"""股票分析器类"""
def __init__(self):
self.cache = {}
self.cache_timeout = 300 # 5分钟缓存
def _get_cache_key(self, func_name: str, **kwargs) -> str:
"""生成缓存键"""
return f"{func_name}_{hash(str(sorted(kwargs.items())))}"
def _is_cache_valid(self, timestamp: datetime) -> bool:
"""检查缓存是否有效"""
return (datetime.now() - timestamp).seconds < self.cache_timeout
def _get_from_cache(self, key: str) -> Optional[Any]:
"""从缓存获取数据"""
if key in self.cache:
data, timestamp = self.cache[key]
if self._is_cache_valid(timestamp):
return data
else:
del self.cache[key]
return None
def _set_cache(self, key: str, data: Any):
"""设置缓存"""
self.cache[key] = (data, datetime.now())
# 创建股票分析器实例
analyzer = StockAnalyzer()
@mcp.tool()
def get_stock_realtime_data(symbol: str) -> Dict[str, Any]:
"""
获取股票实时行情数据
Args:
symbol: 股票代码,如 '000001' 或 'sh000001'
Returns:
包含实时行情数据的字典
"""
try:
cache_key = analyzer._get_cache_key("realtime", symbol=symbol)
cached_data = analyzer._get_from_cache(cache_key)
if cached_data:
return cached_data
# 获取实时数据
df = ak.stock_zh_a_spot_em()
stock_data = df[df['代码'] == symbol]
if stock_data.empty:
return {"error": f"未找到股票代码 {symbol} 的数据"}
result = {
"股票代码": symbol,
"股票名称": stock_data.iloc[0]['名称'],
"最新价": float(stock_data.iloc[0]['最新价']),
"涨跌幅": float(stock_data.iloc[0]['涨跌幅']),
"涨跌额": float(stock_data.iloc[0]['涨跌额']),
"成交量": int(stock_data.iloc[0]['成交量']),
"成交额": float(stock_data.iloc[0]['成交额']),
"振幅": float(stock_data.iloc[0]['振幅']),
"最高": float(stock_data.iloc[0]['最高']),
"最低": float(stock_data.iloc[0]['最低']),
"今开": float(stock_data.iloc[0]['今开']),
"昨收": float(stock_data.iloc[0]['昨收']),
"更新时间": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
analyzer._set_cache(cache_key, result)
return result
except Exception as e:
logger.error(f"获取实时数据失败: {e}")
return {"error": f"获取实时数据失败: {str(e)}"}
@mcp.tool()
def get_stock_history_data(symbol: str, period: str = "daily", start_date: str = "", end_date: str = "") -> Dict[str, Any]:
"""
获取股票历史数据
Args:
symbol: 股票代码
period: 数据周期 ('daily', 'weekly', 'monthly')
start_date: 开始日期 (YYYYMMDD格式)
end_date: 结束日期 (YYYYMMDD格式)
Returns:
包含历史数据的字典
"""
try:
cache_key = analyzer._get_cache_key("history", symbol=symbol, period=period, start_date=start_date, end_date=end_date)
cached_data = analyzer._get_from_cache(cache_key)
if cached_data:
return cached_data
# 设置默认日期范围
if not end_date:
end_date = datetime.now().strftime("%Y%m%d")
if not start_date:
start_date = (datetime.now() - timedelta(days=365)).strftime("%Y%m%d")
# 获取历史数据
if period == "daily":
df = ak.stock_zh_a_hist(symbol=symbol, period="daily", start_date=start_date, end_date=end_date)
elif period == "weekly":
df = ak.stock_zh_a_hist(symbol=symbol, period="weekly", start_date=start_date, end_date=end_date)
elif period == "monthly":
df = ak.stock_zh_a_hist(symbol=symbol, period="monthly", start_date=start_date, end_date=end_date)
else:
return {"error": "不支持的周期类型"}
if df.empty:
return {"error": f"未找到股票代码 {symbol} 的历史数据"}
# 转换数据格式
history_data = []
for _, row in df.tail(100).iterrows(): # 限制返回最近100条记录
history_data.append({
"日期": row['日期'].strftime("%Y-%m-%d") if pd.notna(row['日期']) else "",
"开盘": float(row['开盘']) if pd.notna(row['开盘']) else 0,
"收盘": float(row['收盘']) if pd.notna(row['收盘']) else 0,
"最高": float(row['最高']) if pd.notna(row['最高']) else 0,
"最低": float(row['最低']) if pd.notna(row['最低']) else 0,
"成交量": int(row['成交量']) if pd.notna(row['成交量']) else 0,
"成交额": float(row['成交额']) if pd.notna(row['成交额']) else 0,
"振幅": float(row['振幅']) if pd.notna(row['振幅']) else 0,
"涨跌幅": float(row['涨跌幅']) if pd.notna(row['涨跌幅']) else 0,
"涨跌额": float(row['涨跌额']) if pd.notna(row['涨跌额']) else 0,
"换手率": float(row['换手率']) if pd.notna(row['换手率']) else 0
})
result = {
"股票代码": symbol,
"数据周期": period,
"开始日期": start_date,
"结束日期": end_date,
"数据条数": len(history_data),
"历史数据": history_data
}
analyzer._set_cache(cache_key, result)
return result
except Exception as e:
logger.error(f"获取历史数据失败: {e}")
return {"error": f"获取历史数据失败: {str(e)}"}
@mcp.tool()
def calculate_technical_indicators(symbol: str, indicators: List[str] = None) -> Dict[str, Any]:
"""
计算技术指标
Args:
symbol: 股票代码
indicators: 指标列表 ['ma', 'macd', 'rsi', 'boll', 'kdj']
Returns:
包含技术指标的字典
"""
try:
if indicators is None:
indicators = ['ma', 'macd', 'rsi']
cache_key = analyzer._get_cache_key("indicators", symbol=symbol, indicators=str(indicators))
cached_data = analyzer._get_from_cache(cache_key)
if cached_data:
return cached_data
# 获取历史数据用于计算指标
end_date = datetime.now().strftime("%Y%m%d")
start_date = (datetime.now() - timedelta(days=365)).strftime("%Y%m%d")
df = ak.stock_zh_a_hist(symbol=symbol, period="daily", start_date=start_date, end_date=end_date)
if df.empty:
return {"error": f"未找到股票代码 {symbol} 的数据"}
result = {"股票代码": symbol, "技术指标": {}}
# 计算移动平均线
if 'ma' in indicators:
df['MA5'] = df['收盘'].rolling(window=5).mean()
df['MA10'] = df['收盘'].rolling(window=10).mean()
df['MA20'] = df['收盘'].rolling(window=20).mean()
df['MA60'] = df['收盘'].rolling(window=60).mean()
latest_data = df.iloc[-1]
result["技术指标"]["移动平均线"] = {
"MA5": float(latest_data['MA5']) if pd.notna(latest_data['MA5']) else None,
"MA10": float(latest_data['MA10']) if pd.notna(latest_data['MA10']) else None,
"MA20": float(latest_data['MA20']) if pd.notna(latest_data['MA20']) else None,
"MA60": float(latest_data['MA60']) if pd.notna(latest_data['MA60']) else None,
"当前价格": float(latest_data['收盘'])
}
# 计算RSI
if 'rsi' in indicators:
delta = df['收盘'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
result["技术指标"]["RSI"] = {
"RSI14": float(rsi.iloc[-1]) if pd.notna(rsi.iloc[-1]) else None,
"信号": "超买" if rsi.iloc[-1] > 70 else "超卖" if rsi.iloc[-1] < 30 else "正常"
}
# 计算MACD
if 'macd' in indicators:
exp1 = df['收盘'].ewm(span=12).mean()
exp2 = df['收盘'].ewm(span=26).mean()
macd_line = exp1 - exp2
signal_line = macd_line.ewm(span=9).mean()
histogram = macd_line - signal_line
result["技术指标"]["MACD"] = {
"MACD": float(macd_line.iloc[-1]) if pd.notna(macd_line.iloc[-1]) else None,
"信号线": float(signal_line.iloc[-1]) if pd.notna(signal_line.iloc[-1]) else None,
"柱状图": float(histogram.iloc[-1]) if pd.notna(histogram.iloc[-1]) else None,
"趋势": "看涨" if histogram.iloc[-1] > 0 else "看跌"
}
analyzer._set_cache(cache_key, result)
return result
except Exception as e:
logger.error(f"计算技术指标失败: {e}")
return {"error": f"计算技术指标失败: {str(e)}"}
@mcp.tool()
def get_market_sentiment(symbol: str = "") -> Dict[str, Any]:
"""
获取市场情绪分析
Args:
symbol: 股票代码,为空时获取整体市场情绪
Returns:
包含市场情绪数据的字典
"""
try:
cache_key = analyzer._get_cache_key("sentiment", symbol=symbol)
cached_data = analyzer._get_from_cache(cache_key)
if cached_data:
return cached_data
result = {"分析时间": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
if symbol:
# 个股情绪分析
# 获取资金流向数据
try:
df_flow = ak.stock_individual_fund_flow(stock=symbol, market="sh" if symbol.startswith("6") else "sz")
if not df_flow.empty:
latest_flow = df_flow.iloc[-1]
result["个股情绪"] = {
"股票代码": symbol,
"主力净流入": float(latest_flow['主力净流入']) if '主力净流入' in latest_flow else 0,
"超大单净流入": float(latest_flow['超大单净流入']) if '超大单净流入' in latest_flow else 0,
"大单净流入": float(latest_flow['大单净流入']) if '大单净流入' in latest_flow else 0,
"中单净流入": float(latest_flow['中单净流入']) if '中单净流入' in latest_flow else 0,
"小单净流入": float(latest_flow['小单净流入']) if '小单净流入' in latest_flow else 0,
"情绪判断": "积极" if latest_flow.get('主力净流入', 0) > 0 else "消极"
}
except:
result["个股情绪"] = {"错误": "无法获取个股资金流向数据"}
# 市场整体情绪
try:
# 获取A股市场总体情况
df_market = ak.stock_zh_a_spot_em()
if not df_market.empty:
up_count = len(df_market[df_market['涨跌幅'] > 0])
down_count = len(df_market[df_market['涨跌幅'] < 0])
total_count = len(df_market)
result["市场情绪"] = {
"上涨股票数": up_count,
"下跌股票数": down_count,
"平盘股票数": total_count - up_count - down_count,
"上涨比例": round(up_count / total_count * 100, 2),
"下跌比例": round(down_count / total_count * 100, 2),
"市场情绪": "乐观" if up_count > down_count else "悲观" if down_count > up_count else "中性"
}
except:
result["市场情绪"] = {"错误": "无法获取市场整体数据"}
# 获取北向资金流向
try:
df_north = ak.stock_hsgt_fund_flow_summary_em()
if not df_north.empty:
latest_north = df_north.iloc[-1]
result["北向资金"] = {
"日期": latest_north['日期'].strftime("%Y-%m-%d") if pd.notna(latest_north['日期']) else "",
"沪股通净流入": float(latest_north['沪股通净流入']) if pd.notna(latest_north['沪股通净流入']) else 0,
"深股通净流入": float(latest_north['深股通净流入']) if pd.notna(latest_north['深股通净流入']) else 0,
"北向资金净流入": float(latest_north['北向资金净流入']) if pd.notna(latest_north['北向资金净流入']) else 0,
"外资情绪": "积极" if latest_north.get('北向资金净流入', 0) > 0 else "消极"
}
except:
result["北向资金"] = {"错误": "无法获取北向资金数据"}
analyzer._set_cache(cache_key, result)
return result
except Exception as e:
logger.error(f"获取市场情绪失败: {e}")
return {"error": f"获取市场情绪失败: {str(e)}"}
@mcp.tool()
def search_stock_info(keyword: str) -> Dict[str, Any]:
"""
搜索股票信息
Args:
keyword: 搜索关键词(股票名称或代码)
Returns:
包含搜索结果的字典
"""
try:
cache_key = analyzer._get_cache_key("search", keyword=keyword)
cached_data = analyzer._get_from_cache(cache_key)
if cached_data:
return cached_data
# 获取所有A股列表
df = ak.stock_zh_a_spot_em()
# 搜索匹配的股票
matches = df[
df['名称'].str.contains(keyword, na=False) |
df['代码'].str.contains(keyword, na=False)
].head(10) # 限制返回10个结果
if matches.empty:
return {"error": f"未找到包含关键词 '{keyword}' 的股票"}
results = []
for _, row in matches.iterrows():
results.append({
"股票代码": row['代码'],
"股票名称": row['名称'],
"最新价": float(row['最新价']),
"涨跌幅": float(row['涨跌幅']),
"成交量": int(row['成交量']),
"市值": float(row['总市值']) if '总市值' in row else 0
})
result = {
"搜索关键词": keyword,
"匹配数量": len(results),
"搜索结果": results
}
analyzer._set_cache(cache_key, result)
return result
except Exception as e:
logger.error(f"搜索股票信息失败: {e}")
return {"error": f"搜索股票信息失败: {str(e)}"}
@mcp.tool()
def get_stock_news(symbol: str = "", limit: int = 10) -> Dict[str, Any]:
"""
获取股票相关新闻
Args:
symbol: 股票代码,为空时获取市场新闻
limit: 新闻数量限制
Returns:
包含新闻数据的字典
"""
try:
cache_key = analyzer._get_cache_key("news", symbol=symbol, limit=limit)
cached_data = analyzer._get_from_cache(cache_key)
if cached_data:
return cached_data
result = {"获取时间": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
if symbol:
# 获取个股新闻
try:
df_news = ak.stock_news_em(symbol=symbol)
if not df_news.empty:
news_list = []
for _, row in df_news.head(limit).iterrows():
news_list.append({
"标题": row['新闻标题'] if '新闻标题' in row else "",
"发布时间": row['发布时间'].strftime("%Y-%m-%d %H:%M:%S") if pd.notna(row.get('发布时间')) else "",
"新闻内容": row['新闻内容'][:200] + "..." if len(str(row.get('新闻内容', ''))) > 200 else str(row.get('新闻内容', '')),
"来源": row['新闻来源'] if '新闻来源' in row else ""
})
result["个股新闻"] = {
"股票代码": symbol,
"新闻数量": len(news_list),
"新闻列表": news_list
}
else:
result["个股新闻"] = {"错误": f"未找到股票 {symbol} 的相关新闻"}
except:
result["个股新闻"] = {"错误": "无法获取个股新闻数据"}
else:
# 获取市场新闻
try:
df_market_news = ak.stock_news_em()
if not df_market_news.empty:
market_news_list = []
for _, row in df_market_news.head(limit).iterrows():
market_news_list.append({
"标题": row['新闻标题'] if '新闻标题' in row else "",
"发布时间": row['发布时间'].strftime("%Y-%m-%d %H:%M:%S") if pd.notna(row.get('发布时间')) else "",
"新闻内容": row['新闻内容'][:200] + "..." if len(str(row.get('新闻内容', ''))) > 200 else str(row.get('新闻内容', '')),
"来源": row['新闻来源'] if '新闻来源' in row else ""
})
result["市场新闻"] = {
"新闻数量": len(market_news_list),
"新闻列表": market_news_list
}
else:
result["市场新闻"] = {"错误": "未找到市场新闻"}
except:
result["市场新闻"] = {"错误": "无法获取市场新闻数据"}
analyzer._set_cache(cache_key, result)
return result
except Exception as e:
logger.error(f"获取新闻失败: {e}")
return {"error": f"获取新闻失败: {str(e)}"}
if __name__ == "__main__":
# 启动MCP服务器
mcp.run()