Skip to main content
Glama

Stock MCP Server

by huweihua123
market_service.py25 kB
#!/usr/bin/env python3 """ 市场数据服务 - 优化版本 整合优化后的数据源(akshare_optimized, tushare_optimized, tdx_service, yfinance_service) 实现智能降级机制,并能够生成完整的市场技术分析报告 """ import logging import warnings from typing import Dict, Optional, List, Any from datetime import datetime, timedelta import pandas as pd import numpy as np from ..utils.symbol_processor import get_symbol_processor from ..utils.data_source_strategy import get_data_source_strategy from ..exception.exception import DataNotFoundError logger = logging.getLogger("market_service") warnings.filterwarnings("ignore") class MarketDataService: """市场数据服务 - 支持多数据源降级和报告生成""" def __init__(self): """初始化市场数据服务""" self.symbol_processor = get_symbol_processor() self.strategy = get_data_source_strategy() self.services = {} self._init_services() def _init_services(self): """初始化各数据源服务""" # 1. Tushare优化服务 try: from .tushare_service import get_tushare_service self.services["tushare"] = get_tushare_service() logger.info("✅ Tushare优化服务初始化成功") except Exception as e: logger.warning(f"⚠️ Tushare优化服务初始化失败: {e}") # 2. AKShare优化服务 try: from .akshare_service import get_akshare_service self.services["akshare"] = get_akshare_service() logger.info("✅ AKShare优化服务初始化成功") except Exception as e: logger.warning(f"⚠️ AKShare优化服务初始化失败: {e}") # 3. 通达信服务 try: from .tdx_service import get_tdx_service self.services["tdx"] = get_tdx_service() logger.info("✅ 通达信服务初始化成功") except Exception as e: logger.warning(f"⚠️ 通达信服务初始化失败: {e}") # 4. YFinance服务 try: from .yfinance_service import YFinanceService self.services["yfinance"] = YFinanceService() logger.info("✅ YFinance服务初始化成功") except Exception as e: logger.warning(f"⚠️ YFinance服务初始化失败: {e}") def get_data_source_priority(self, symbol: str) -> List[str]: """ 根据股票代码获取数据源优先级列表 Args: symbol: 股票代码 Returns: List[str]: 数据源优先级列表 """ return self.strategy.get_market_data_sources(symbol) def get_stock_daily_data( self, symbol: str, start_date: str = None, end_date: str = None ) -> pd.DataFrame: """ 获取股票日线数据(带智能降级) Args: symbol: 股票代码 start_date: 开始日期 'YYYY-MM-DD' end_date: 结束日期 'YYYY-MM-DD' Returns: pd.DataFrame: 标准化的日线数据 """ # 设置默认日期 if end_date is None: end_date = datetime.now().strftime("%Y-%m-%d") if start_date is None: start_date = (datetime.now() - timedelta(days=180)).strftime("%Y-%m-%d") # 获取数据源优先级 data_sources = self.get_data_source_priority(symbol) logger.info(f"📊 获取 {symbol} 的市场数据 ({start_date} 到 {end_date})") logger.info(f"🔄 数据源优先级: {data_sources}") last_error = None for source in data_sources: if source not in self.services: continue try: logger.info(f"🔄 尝试从 {source} 获取数据...") data = self._get_data_from_source(source, symbol, start_date, end_date) if data is not None and not data.empty: logger.info(f"✅ 成功从 {source} 获取 {len(data)} 条数据") return self._standardize_data(data, source) except Exception as e: last_error = e logger.warning(f"⚠️ {source} 获取失败: {e}") continue # 所有数据源都失败 raise DataNotFoundError( f"无法从任何数据源获取 {symbol} 的数据。最后错误: {last_error}" ) def _get_data_from_source( self, source: str, symbol: str, start_date: str, end_date: str ) -> Optional[pd.DataFrame]: """从指定数据源获取数据""" service = self.services.get(source) if not service: return None classification = self.symbol_processor.classifier.classify_stock(symbol) if source == "tushare": # Tushare优化服务 return service.get_stock_daily(symbol, start_date, end_date) elif source == "akshare": # AKShare优化服务 if classification["is_china"]: return service.get_stock_daily(symbol, start_date, end_date) elif classification["is_hk"]: return service.get_hk_daily(symbol, start_date, end_date) elif classification["is_us"]: return service.get_us_daily(symbol, start_date, end_date) elif source == "tdx": # 通达信服务(仅支持A股) if classification["is_china"]: return service.get_stock_daily(symbol, start_date, end_date) elif source == "yfinance": # YFinance服务 yf_symbol = self.symbol_processor.get_yfinance_format(symbol) return service.get_stock_daily(yf_symbol, start_date, end_date) return None def _standardize_data(self, data: pd.DataFrame, source: str) -> pd.DataFrame: """标准化数据格式""" if data.empty: return data # 确保必要的列存在 required_columns = ["date", "open", "high", "low", "close", "volume"] # 列名映射 column_mapping = { "trade_date": "date", "datetime": "date", "Open": "open", "High": "high", "Low": "low", "Close": "close", "Volume": "volume", "vol": "volume", "amount": "turnover", "turnover": "turnover", } # 重命名列 data = data.rename(columns=column_mapping) # 确保日期列是datetime类型 if "date" in data.columns: data["date"] = pd.to_datetime(data["date"]) # 排序 if "date" in data.columns: data = data.sort_values("date") # 添加数据源标识 data["source"] = source return data def calculate_technical_indicators(self, data: pd.DataFrame) -> Dict[str, Any]: """ 计算技术指标 Args: data: 包含OHLCV的DataFrame Returns: Dict: 技术指标字典 """ if data.empty or len(data) < 20: return {} indicators = {} try: # 移动平均线 indicators["MA5"] = ( float(data["close"].rolling(5).mean().iloc[-1]) if len(data) >= 5 else None ) indicators["MA10"] = ( float(data["close"].rolling(10).mean().iloc[-1]) if len(data) >= 10 else None ) indicators["MA20"] = ( float(data["close"].rolling(20).mean().iloc[-1]) if len(data) >= 20 else None ) indicators["MA60"] = ( float(data["close"].rolling(60).mean().iloc[-1]) if len(data) >= 60 else None ) # RSI if len(data) >= 14: delta = data["close"].diff() gain = (delta.where(delta > 0, 0)).rolling(14).mean() loss = (-delta.where(delta < 0, 0)).rolling(14).mean() rs = gain / loss indicators["RSI"] = float((100 - (100 / (1 + rs))).iloc[-1]) # MACD if len(data) >= 26: exp1 = data["close"].ewm(span=12, adjust=False).mean() exp2 = data["close"].ewm(span=26, adjust=False).mean() macd = exp1 - exp2 signal = macd.ewm(span=9, adjust=False).mean() histogram = macd - signal indicators["MACD"] = float(macd.iloc[-1]) indicators["MACD_Signal"] = float(signal.iloc[-1]) indicators["MACD_Histogram"] = float(histogram.iloc[-1]) # 布林带 if len(data) >= 20: sma = data["close"].rolling(20).mean() std = data["close"].rolling(20).std() indicators["BOLL_Upper"] = float((sma + 2 * std).iloc[-1]) indicators["BOLL_Middle"] = float(sma.iloc[-1]) indicators["BOLL_Lower"] = float((sma - 2 * std).iloc[-1]) # KDJ if len(data) >= 9: low_min = data["low"].rolling(9).min() high_max = data["high"].rolling(9).max() rsv = (data["close"] - low_min) / (high_max - low_min) * 100 k = rsv.ewm(com=2, adjust=False).mean() d = k.ewm(com=2, adjust=False).mean() j = 3 * k - 2 * d indicators["KDJ_K"] = float(k.iloc[-1]) indicators["KDJ_D"] = float(d.iloc[-1]) indicators["KDJ_J"] = float(j.iloc[-1]) # ATR (平均真实波幅) if len(data) >= 14: high_low = data["high"] - data["low"] high_close = np.abs(data["high"] - data["close"].shift()) low_close = np.abs(data["low"] - data["close"].shift()) tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1) indicators["ATR"] = float(tr.rolling(14).mean().iloc[-1]) except Exception as e: logger.error(f"❌ 计算技术指标失败: {e}") return indicators def generate_market_report( self, symbol: str, start_date: str = None, end_date: str = None ) -> str: """ 生成完整的市场技术分析报告 Args: symbol: 股票代码 start_date: 开始日期 end_date: 结束日期 Returns: str: Markdown格式的分析报告 """ try: # 获取股票数据 data = self.get_stock_daily_data(symbol, start_date, end_date) if data.empty: return f"❌ 无法获取 {symbol} 的市场数据" # 获取股票分类信息 classification = self.symbol_processor.classifier.classify_stock(symbol) # 计算技术指标 indicators = self.calculate_technical_indicators(data) # 生成报告 report = self._format_market_report( symbol, data, classification, indicators, start_date, end_date ) return report except Exception as e: logger.error(f"❌ 生成市场报告失败: {e}") return f"❌ 生成 {symbol} 的市场报告失败: {str(e)}" def _format_market_report( self, symbol: str, data: pd.DataFrame, classification: Dict, indicators: Dict, start_date: str, end_date: str, ) -> str: """格式化市场分析报告""" # 基本信息 latest = data.iloc[-1] earliest = data.iloc[0] # 计算涨跌幅 price_change = latest["close"] - earliest["close"] price_change_pct = (price_change / earliest["close"]) * 100 # 计算波动率 returns = data["close"].pct_change().dropna() volatility = returns.std() * np.sqrt(252) * 100 # 年化波动率 report = f""" # {symbol} 股票技术分析报告 ## 一、基本信息 - **股票代码**: {symbol} - **市场**: {classification['market_name']} - **交易所**: {classification['exchange']} - **板块**: {classification['board']} - **币种**: {classification['currency']} - **分析期间**: {start_date} 至 {end_date} - **数据来源**: {data['source'].iloc[-1]} --- ## 二、价格趋势分析 ### 2.1 价格概览 - **最新价格**: {latest['close']:.2f} {classification['currency']} - **开盘价**: {latest['open']:.2f} - **最高价**: {latest['high']:.2f} - **最低价**: {latest['low']:.2f} - **成交量**: {latest['volume']:,.0f} ### 2.2 期间表现 - **期初价格**: {earliest['close']:.2f} - **期间最高**: {data['high'].max():.2f} - **期间最低**: {data['low'].min():.2f} - **期间涨跌**: {price_change:+.2f} ({price_change_pct:+.2f}%) - **年化波动率**: {volatility:.2f}% ### 2.3 趋势判断 {self._analyze_trend(data, indicators)} --- ## 三、技术指标分析 ### 3.1 移动平均线系统 {self._analyze_moving_averages(indicators, latest['close'])} ### 3.2 动量指标 {self._analyze_momentum_indicators(indicators)} ### 3.3 趋势指标 {self._analyze_trend_indicators(indicators)} ### 3.4 波动性指标 {self._analyze_volatility_indicators(indicators, latest['close'])} --- ## 四、成交量分析 {self._analyze_volume(data)} --- ## 五、支撑与压力位 {self._analyze_support_resistance(data)} --- ## 六、投资建议 {self._generate_trading_advice(data, indicators, classification)} --- ## 七、风险提示 ⚠️ **重要声明**: - 本报告基于历史数据和技术指标分析生成,仅供参考,不构成投资建议 - 技术分析存在滞后性,市场随时可能发生变化 - 投资有风险,入市需谨慎 - 请结合基本面分析和自身风险承受能力做出投资决策 --- *报告生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}* """ return report def _analyze_trend(self, data: pd.DataFrame, indicators: Dict) -> str: """分析价格趋势""" latest_close = data["close"].iloc[-1] trend_signals = [] # MA趋势判断 if indicators.get("MA5") and indicators.get("MA10") and indicators.get("MA20"): ma5 = indicators["MA5"] ma10 = indicators["MA10"] ma20 = indicators["MA20"] if ma5 > ma10 > ma20: trend_signals.append("✅ **多头排列**: MA5 > MA10 > MA20,短期趋势向上") elif ma5 < ma10 < ma20: trend_signals.append("⚠️ **空头排列**: MA5 < MA10 < MA20,短期趋势向下") else: trend_signals.append("⚡ **均线纠缠**: 均线系统混乱,趋势不明") # 价格与均线关系 if indicators.get("MA20"): if latest_close > indicators["MA20"]: trend_signals.append( f"📈 价格位于MA20上方 ({latest_close:.2f} > {indicators['MA20']:.2f}),处于相对强势" ) else: trend_signals.append( f"📉 价格位于MA20下方 ({latest_close:.2f} < {indicators['MA20']:.2f}),处于相对弱势" ) return "\n".join(trend_signals) if trend_signals else "暂无明确趋势信号" def _analyze_moving_averages(self, indicators: Dict, current_price: float) -> str: """分析移动平均线""" ma_analysis = [] for period in ["MA5", "MA10", "MA20", "MA60"]: if indicators.get(period): ma_value = indicators[period] diff = current_price - ma_value diff_pct = (diff / ma_value) * 100 ma_analysis.append( f"- **{period}**: {ma_value:.2f} (偏离度: {diff_pct:+.2f}%)" ) return "\n".join(ma_analysis) if ma_analysis else "暂无移动平均线数据" def _analyze_momentum_indicators(self, indicators: Dict) -> str: """分析动量指标""" momentum_analysis = [] # RSI分析 if indicators.get("RSI"): rsi = indicators["RSI"] if rsi > 70: momentum_analysis.append( f"- **RSI**: {rsi:.2f} - 🔴 超买区域,可能面临回调压力" ) elif rsi < 30: momentum_analysis.append( f"- **RSI**: {rsi:.2f} - 🟢 超卖区域,可能存在反弹机会" ) else: momentum_analysis.append(f"- **RSI**: {rsi:.2f} - ⚪ 中性区域") # KDJ分析 if ( indicators.get("KDJ_K") and indicators.get("KDJ_D") and indicators.get("KDJ_J") ): k = indicators["KDJ_K"] d = indicators["KDJ_D"] j = indicators["KDJ_J"] kdj_signal = "中性" if k > d and k > 50: kdj_signal = "多头信号" elif k < d and k < 50: kdj_signal = "空头信号" momentum_analysis.append( f"- **KDJ**: K={k:.2f}, D={d:.2f}, J={j:.2f} - {kdj_signal}" ) return "\n".join(momentum_analysis) if momentum_analysis else "暂无动量指标数据" def _analyze_trend_indicators(self, indicators: Dict) -> str: """分析趋势指标""" trend_analysis = [] # MACD分析 if indicators.get("MACD") and indicators.get("MACD_Signal"): macd = indicators["MACD"] signal = indicators["MACD_Signal"] hist = indicators.get("MACD_Histogram", macd - signal) if macd > signal and hist > 0: trend_analysis.append( f"- **MACD**: 金叉向上 (MACD={macd:.4f}, Signal={signal:.4f})" ) elif macd < signal and hist < 0: trend_analysis.append( f"- **MACD**: 死叉向下 (MACD={macd:.4f}, Signal={signal:.4f})" ) else: trend_analysis.append( f"- **MACD**: 震荡整理 (MACD={macd:.4f}, Signal={signal:.4f})" ) return "\n".join(trend_analysis) if trend_analysis else "暂无趋势指标数据" def _analyze_volatility_indicators( self, indicators: Dict, current_price: float ) -> str: """分析波动性指标""" volatility_analysis = [] # 布林带分析 if ( indicators.get("BOLL_Upper") and indicators.get("BOLL_Middle") and indicators.get("BOLL_Lower") ): upper = indicators["BOLL_Upper"] middle = indicators["BOLL_Middle"] lower = indicators["BOLL_Lower"] position = "中轨附近" if current_price >= upper: position = "上轨或上轨上方,超买区域" elif current_price <= lower: position = "下轨或下轨下方,超卖区域" elif current_price > middle: position = "中轨上方,相对强势" else: position = "中轨下方,相对弱势" volatility_analysis.append( f"- **布林带**: 上轨={upper:.2f}, 中轨={middle:.2f}, 下轨={lower:.2f}" ) volatility_analysis.append(f" 当前价格位于{position}") # ATR分析 if indicators.get("ATR"): atr = indicators["ATR"] volatility_analysis.append(f"- **ATR (平均真实波幅)**: {atr:.2f}") return ( "\n".join(volatility_analysis) if volatility_analysis else "暂无波动性指标数据" ) def _analyze_volume(self, data: pd.DataFrame) -> str: """分析成交量""" recent_volume = data["volume"].iloc[-5:].mean() avg_volume = data["volume"].mean() volume_ratio = recent_volume / avg_volume if avg_volume > 0 else 0 volume_analysis = f""" - **近5日平均成交量**: {recent_volume:,.0f} - **期间平均成交量**: {avg_volume:,.0f} - **成交量比率**: {volume_ratio:.2f}倍 """ if volume_ratio > 1.5: volume_analysis += "📈 **成交量放大**: 近期成交活跃,市场关注度提升" elif volume_ratio < 0.7: volume_analysis += "📉 **成交量萎缩**: 近期成交清淡,市场观望情绪浓厚" else: volume_analysis += "⚖️ **成交量正常**: 维持在平均水平" return volume_analysis def _analyze_support_resistance(self, data: pd.DataFrame) -> str: """分析支撑位和阻力位""" recent_data = data.tail(20) # 计算关键价位 resistance_levels = [] support_levels = [] # 最近高点作为阻力位 high_max = recent_data["high"].max() resistance_levels.append(high_max) # 最近低点作为支撑位 low_min = recent_data["low"].min() support_levels.append(low_min) # 添加均线作为动态支撑/阻力 current_price = data["close"].iloc[-1] analysis = f""" ### 静态支撑与阻力 - **阻力位1**: {resistance_levels[0]:.2f} (近期高点) - **支撑位1**: {support_levels[0]:.2f} (近期低点) ### 动态支撑与阻力 - 短期均线(MA5/MA10)可作为动态支撑/阻力参考 - 中期均线(MA20/MA60)可作为趋势判断依据 """ return analysis def _generate_trading_advice( self, data: pd.DataFrame, indicators: Dict, classification: Dict ) -> str: """生成交易建议""" signals = [] score = 0 # 综合评分 (-100 到 +100) # RSI信号 if indicators.get("RSI"): rsi = indicators["RSI"] if rsi > 70: signals.append("⚠️ RSI超买,注意风险") score -= 20 elif rsi < 30: signals.append("✅ RSI超卖,可能存在机会") score += 20 # MACD信号 if indicators.get("MACD_Histogram"): if indicators["MACD_Histogram"] > 0: signals.append("✅ MACD多头") score += 15 else: signals.append("⚠️ MACD空头") score -= 15 # 均线信号 current_price = data["close"].iloc[-1] if indicators.get("MA20"): if current_price > indicators["MA20"]: signals.append("✅ 价格位于MA20上方") score += 10 else: signals.append("⚠️ 价格位于MA20下方") score -= 10 # 趋势信号 if indicators.get("MA5") and indicators.get("MA10") and indicators.get("MA20"): if indicators["MA5"] > indicators["MA10"] > indicators["MA20"]: signals.append("✅ 均线多头排列") score += 25 elif indicators["MA5"] < indicators["MA10"] < indicators["MA20"]: signals.append("⚠️ 均线空头排列") score -= 25 # 生成建议 advice = "\n".join(signals) + "\n\n" if score > 30: advice += "### 📈 **建议: 积极关注**\n" advice += "技术指标整体偏多,短期可能存在上涨机会,但仍需关注市场整体环境和基本面情况。" elif score < -30: advice += "### 📉 **建议: 谨慎观望**\n" advice += "技术指标整体偏空,短期面临调整压力,建议等待更好的入场时机。" else: advice += "### ⚖️ **建议: 中性观望**\n" advice += "技术指标信号混杂,市场方向不明确,建议等待更清晰的信号再做决策。" advice += f"\n\n**综合评分**: {score}/100" return advice # ==================== 便捷函数 ==================== _global_service = None def get_market_service() -> MarketDataService: """获取市场数据服务单例""" global _global_service if _global_service is None: _global_service = MarketDataService() return _global_service def get_stock_market_data( symbol: str, start_date: str = None, end_date: str = None ) -> pd.DataFrame: """获取股票市场数据(便捷函数)""" service = get_market_service() return service.get_stock_daily_data(symbol, start_date, end_date) def generate_market_analysis_report( symbol: str, start_date: str = None, end_date: str = None ) -> str: """生成市场分析报告(便捷函数)""" service = get_market_service() return service.generate_market_report(symbol, start_date, end_date)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/huweihua123/stock-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server