examples.py7.45 kB
#!/usr/bin/env python3
"""
股票分析MCP工具使用示例
"""
import json
from stock_mcp_server import (
get_stock_realtime_data,
get_stock_history_data,
calculate_technical_indicators,
get_market_sentiment,
search_stock_info,
get_stock_news
)
def example_realtime_analysis():
"""示例:实时行情分析"""
print("=" * 60)
print("示例1: 实时行情分析")
print("=" * 60)
# 获取平安银行实时数据
stock_code = "000001"
print(f"正在获取 {stock_code} 的实时行情...")
realtime_data = get_stock_realtime_data(stock_code)
if "error" not in realtime_data:
print(f"股票名称: {realtime_data['股票名称']}")
print(f"最新价: {realtime_data['最新价']}")
print(f"涨跌幅: {realtime_data['涨跌幅']}%")
print(f"成交量: {realtime_data['成交量']:,}")
print(f"成交额: {realtime_data['成交额']:,.2f}")
# 简单分析
if realtime_data['涨跌幅'] > 5:
print("📈 分析: 强势上涨")
elif realtime_data['涨跌幅'] > 0:
print("📊 分析: 温和上涨")
elif realtime_data['涨跌幅'] < -5:
print("📉 分析: 大幅下跌")
else:
print("📊 分析: 小幅波动")
else:
print(f"获取数据失败: {realtime_data['error']}")
def example_technical_analysis():
"""示例:技术指标分析"""
print("\n" + "=" * 60)
print("示例2: 技术指标分析")
print("=" * 60)
stock_code = "000001"
print(f"正在计算 {stock_code} 的技术指标...")
indicators = calculate_technical_indicators(stock_code, ["ma", "macd", "rsi"])
if "error" not in indicators:
tech_data = indicators['技术指标']
# 移动平均线分析
if '移动平均线' in tech_data:
ma_data = tech_data['移动平均线']
current_price = ma_data['当前价格']
print(f"\n📊 移动平均线分析:")
print(f"当前价格: {current_price}")
print(f"MA5: {ma_data['MA5']}")
print(f"MA20: {ma_data['MA20']}")
if current_price > ma_data['MA5'] > ma_data['MA20']:
print("✅ 多头排列,趋势向上")
elif current_price < ma_data['MA5'] < ma_data['MA20']:
print("❌ 空头排列,趋势向下")
else:
print("⚖️ 均线纠缠,方向不明")
# RSI分析
if 'RSI' in tech_data:
rsi_data = tech_data['RSI']
print(f"\n📈 RSI分析:")
print(f"RSI14: {rsi_data['RSI14']:.2f}")
print(f"信号: {rsi_data['信号']}")
# MACD分析
if 'MACD' in tech_data:
macd_data = tech_data['MACD']
print(f"\n📉 MACD分析:")
print(f"趋势: {macd_data['趋势']}")
print(f"MACD: {macd_data['MACD']:.4f}")
else:
print(f"计算技术指标失败: {indicators['error']}")
def example_market_sentiment():
"""示例:市场情绪分析"""
print("\n" + "=" * 60)
print("示例3: 市场情绪分析")
print("=" * 60)
print("正在分析市场整体情绪...")
sentiment = get_market_sentiment()
if "error" not in sentiment:
if '市场情绪' in sentiment:
market_data = sentiment['市场情绪']
print(f"\n🏛️ 市场整体情况:")
print(f"上涨股票: {market_data['上涨股票数']} ({market_data['上涨比例']}%)")
print(f"下跌股票: {market_data['下跌股票数']} ({market_data['下跌比例']}%)")
print(f"市场情绪: {market_data['市场情绪']}")
if '北向资金' in sentiment:
north_data = sentiment['北向资金']
print(f"\n💰 北向资金流向:")
print(f"北向资金净流入: {north_data['北向资金净流入']:,.2f}万元")
print(f"外资情绪: {north_data['外资情绪']}")
else:
print(f"获取市场情绪失败: {sentiment.get('error', '未知错误')}")
def example_stock_search():
"""示例:股票搜索"""
print("\n" + "=" * 60)
print("示例4: 股票搜索")
print("=" * 60)
keyword = "银行"
print(f"搜索关键词: {keyword}")
search_result = search_stock_info(keyword)
if "error" not in search_result:
print(f"\n🔍 找到 {search_result['匹配数量']} 只相关股票:")
for i, stock in enumerate(search_result['搜索结果'][:5], 1):
print(f"{i}. {stock['股票名称']}({stock['股票代码']}) - "
f"价格: {stock['最新价']} 涨跌幅: {stock['涨跌幅']}%")
else:
print(f"搜索失败: {search_result['error']}")
def example_comprehensive_analysis():
"""示例:综合分析"""
print("\n" + "=" * 60)
print("示例5: 综合股票分析")
print("=" * 60)
stock_code = "000001"
print(f"对股票 {stock_code} 进行综合分析...")
# 1. 基本信息
realtime = get_stock_realtime_data(stock_code)
if "error" not in realtime:
print(f"\n📋 基本信息:")
print(f"股票名称: {realtime['股票名称']}")
print(f"最新价: {realtime['最新价']} ({realtime['涨跌幅']:+.2f}%)")
# 2. 技术面分析
indicators = calculate_technical_indicators(stock_code, ["ma", "rsi"])
if "error" not in indicators:
tech_data = indicators['技术指标']
print(f"\n🔧 技术面:")
if 'RSI' in tech_data:
rsi_signal = tech_data['RSI']['信号']
print(f"RSI信号: {rsi_signal}")
# 3. 资金面分析
sentiment = get_market_sentiment(stock_code)
if "error" not in sentiment and '个股情绪' in sentiment:
fund_data = sentiment['个股情绪']
if '主力净流入' in fund_data:
net_inflow = fund_data['主力净流入']
print(f"\n💰 资金面:")
print(f"主力净流入: {net_inflow:,.2f}万元")
print(f"资金情绪: {fund_data.get('情绪判断', '未知')}")
# 4. 新闻面
news = get_stock_news(stock_code, 3)
if "error" not in news and '个股新闻' in news:
news_data = news['个股新闻']
if news_data.get('新闻数量', 0) > 0:
print(f"\n📰 最新消息:")
for i, item in enumerate(news_data['新闻列表'][:2], 1):
print(f"{i}. {item['标题']}")
print(f"\n✅ {stock_code} 综合分析完成")
def main():
"""主函数"""
print("🚀 股票分析MCP工具使用示例")
print("本示例将演示各种分析功能的使用方法")
try:
# 运行各种示例
example_realtime_analysis()
example_technical_analysis()
example_market_sentiment()
example_stock_search()
example_comprehensive_analysis()
print("\n" + "=" * 60)
print("🎉 所有示例运行完成!")
print("您可以根据这些示例来使用股票分析工具")
print("=" * 60)
except Exception as e:
print(f"\n❌ 运行示例时出现错误: {e}")
print("请检查网络连接和依赖包安装情况")
if __name__ == "__main__":
main()