Skip to main content
Glama

Stock MCP Server

by huweihua123
akshare_service.py38.1 kB
""" AKShare 数据服务 - 使用统一连接管理 基于参考文件 cankao/akshare_utils.py 的经过验证的API实现 """ import pandas as pd from typing import Dict, Optional, Any from datetime import datetime import logging import warnings import threading import requests import socket try: from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry except Exception: HTTPAdapter = None Retry = None try: import akshare as ak except ImportError: ak = None from ..utils.symbol_processor import get_symbol_processor from ..exception.exception import DataNotFoundError logger = logging.getLogger("akshare_service") logging.basicConfig(level=logging.INFO) warnings.filterwarnings("ignore") class AkshareService: """封装 AKShare 的数据服务(经过验证优化的版本)""" def __init__(self): """初始化AKShare服务""" if ak is None: self.connected = False logger.error("❌ AKShare未安装,请执行 'pip install akshare'") raise ImportError("akshare 未安装") try: # 测试连接 _ = ak.stock_info_a_code_name() self.connected = True # 设置更长的超时时间 self._configure_timeout() self.symbol_processor = get_symbol_processor() logger.info("✅ AKShare初始化成功") except Exception as e: self.connected = False logger.error(f"❌ AKShare连接失败: {e}") raise ConnectionError(f"AKShare 连接失败: {e}") from e def _configure_timeout(self, default_timeout: int = 60): """配置AKShare的超时设置""" try: socket.setdefaulttimeout(default_timeout) if HTTPAdapter and Retry: retry_strategy = Retry( total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], ) adapter = HTTPAdapter(max_retries=retry_strategy) session = requests.Session() session.mount("http://", adapter) session.mount("https://", adapter) self._session = session logger.info("🔧 AKShare超时配置完成: 60秒超时,3次重试") except Exception as e: logger.error(f"⚠️ AKShare超时配置失败: {e}") logger.info("🔧 使用默认超时设置") # ==================== A股数据接口 ==================== def get_stock_daily( self, symbol: str, start_date: str, end_date: str ) -> pd.DataFrame: """获取A股日线数据""" if not self.connected: raise ConnectionError("AKShare未连接") try: ak_symbol = self.symbol_processor.get_akshare_format(symbol) logger.info( f"📊 AKShare获取A股日线: {symbol} -> {ak_symbol} ({start_date} ~ {end_date})" ) df = ak.stock_zh_a_hist( symbol=ak_symbol, period="daily", start_date=start_date.replace("-", ""), end_date=end_date.replace("-", ""), adjust="", ) if df is None or df.empty: raise DataNotFoundError( f"未获取到 {symbol} 在 {start_date}~{end_date} 的日线数据" ) # 标准化列名 mapping = { "日期": "date", "开盘": "open", "收盘": "close", "最高": "high", "最低": "low", "成交量": "volume", "成交额": "amount", } for k, v in mapping.items(): if k in df.columns: df = df.rename(columns={k: v}) if "date" in df.columns: df["date"] = pd.to_datetime(df["date"]) df = df.sort_values("date") logger.info(f"✅ 成功获取A股数据: {ak_symbol}, {len(df)}条记录") return df except Exception as e: logger.error(f"❌ 获取A股日线失败: {symbol}, 错误: {e}") raise def get_stock_info(self, symbol: str) -> Dict[str, Any]: """获取A股基本信息""" if not self.connected: raise ConnectionError("AKShare未连接") try: ak_symbol = self.symbol_processor.get_akshare_format(symbol) info_df = ak.stock_info_a_code_name() row = info_df[info_df["code"] == ak_symbol] if row.empty: raise DataNotFoundError(f"未找到 {symbol} 的基本信息") return { "symbol": ak_symbol, "name": row.iloc[0]["name"], "source": "akshare", } except Exception as e: logger.error(f"❌ 获取A股信息失败: {symbol}, 错误: {e}") raise def get_financial_data(self, symbol: str) -> Dict[str, Optional[pd.DataFrame]]: """获取股票财务数据""" if not self.connected: logger.error(f"❌ AKShare未连接,无法获取{symbol}财务数据") return {} try: ak_symbol = self.symbol_processor.get_akshare_format(symbol) logger.info(f"🔍 开始获取 {symbol} -> {ak_symbol} 的AKShare财务数据") financial_data: Dict[str, Optional[pd.DataFrame]] = {} # 1. 主要财务指标 try: logger.debug(f"📊 获取 {ak_symbol} 主要财务指标...") main_indicators = ak.stock_financial_abstract(symbol=ak_symbol) if main_indicators is not None and not main_indicators.empty: financial_data["main_indicators"] = main_indicators logger.info( f"✅ 获取主要财务指标成功: {len(main_indicators)}条记录" ) else: logger.warning(f"⚠️ {symbol}主要财务指标为空") except Exception as e: logger.warning(f"❌ 获取主要财务指标失败: {e}") # 2. 资产负债表 try: if hasattr(ak, "stock_balance_sheet_by_report_em"): balance_sheet = ak.stock_balance_sheet_by_report_em( symbol=ak_symbol ) if balance_sheet is not None and not balance_sheet.empty: financial_data["balance_sheet"] = balance_sheet logger.debug(f"✅ 获取资产负债表: {len(balance_sheet)}条") except Exception as e: logger.debug(f"获取资产负债表失败: {e}") # 3. 利润表 try: if hasattr(ak, "stock_profit_sheet_by_report_em"): income_statement = ak.stock_profit_sheet_by_report_em( symbol=ak_symbol ) if income_statement is not None and not income_statement.empty: financial_data["income_statement"] = income_statement logger.debug(f"✅ 获取利润表: {len(income_statement)}条") except Exception as e: logger.debug(f"获取利润表失败: {e}") # 4. 现金流量表 try: if hasattr(ak, "stock_cash_flow_sheet_by_report_em"): cash_flow = ak.stock_cash_flow_sheet_by_report_em(symbol=ak_symbol) if cash_flow is not None and not cash_flow.empty: financial_data["cash_flow"] = cash_flow logger.debug(f"✅ 获取现金流量表: {len(cash_flow)}条") except Exception as e: logger.debug(f"获取现金流量表失败: {e}") if financial_data: logger.info( f"✅ 财务数据获取完成: {symbol}, 包含{len(financial_data)}个数据集" ) else: logger.warning(f"⚠️ 未能获取{symbol}的任何财务数据") return financial_data except Exception as e: logger.exception(f"❌ 获取财务数据失败: {symbol}, 错误: {e}") return {} # ==================== 财务数据增强接口 ==================== def get_hk_financial_report( self, symbol: str, report_type: str = "资产负债表", indicator: str = "年度" ) -> Optional[pd.DataFrame]: """ 获取港股财务报表 Args: symbol: 股票代码(如 00700) report_type: 报表类型,可选 {"资产负债表", "利润表", "现金流量表"} indicator: 报告期类型,可选 {"年度", "报告期"} Returns: 财务报表数据 """ if not self.connected: return None try: # 确保股票代码格式正确(5位数字) ak_symbol = symbol.lstrip("0").zfill(5) logger.info( f"📊 获取港股财务报表: {symbol} -> {ak_symbol}, {report_type}, {indicator}" ) df = ak.stock_financial_hk_report_em( stock=ak_symbol, symbol=report_type, indicator=indicator ) if df is not None and not df.empty: logger.info(f"✅ 获取港股{report_type}成功: {len(df)}条记录") return df else: logger.warning(f"⚠️ 港股{report_type}数据为空") return None except Exception as e: logger.error(f"❌ 获取港股{report_type}失败: {e}") return None def get_hk_financial_indicator( self, symbol: str, indicator: str = "年度" ) -> Optional[pd.DataFrame]: """ 获取港股主要财务指标 Args: symbol: 股票代码(如 00700) indicator: 报告期类型,可选 {"年度", "报告期"} Returns: 主要财务指标数据 """ if not self.connected: return None try: ak_symbol = symbol.lstrip("0").zfill(5) logger.info(f"📊 获取港股主要指标: {symbol} -> {ak_symbol}, {indicator}") df = ak.stock_financial_hk_analysis_indicator_em( symbol=ak_symbol, indicator=indicator ) if df is not None and not df.empty: logger.info(f"✅ 获取港股主要指标成功: {len(df)}条记录") return df else: logger.warning(f"⚠️ 港股主要指标数据为空") return None except Exception as e: logger.error(f"❌ 获取港股主要指标失败: {e}") return None def get_us_financial_report( self, symbol: str, report_type: str = "资产负债表", indicator: str = "年报" ) -> Optional[pd.DataFrame]: """ 获取美股财务报表 Args: symbol: 股票代码(如 TSLA) report_type: 报表类型,可选 {"资产负债表", "综合损益表", "现金流量表"} indicator: 报告期类型,可选 {"年报", "单季报", "累计季报"} Returns: 财务报表数据 """ if not self.connected: return None try: logger.info(f"📊 获取美股财务报表: {symbol}, {report_type}, {indicator}") df = ak.stock_financial_us_report_em( stock=symbol, symbol=report_type, indicator=indicator ) if df is not None and not df.empty: logger.info(f"✅ 获取美股{report_type}成功: {len(df)}条记录") return df else: logger.warning(f"⚠️ 美股{report_type}数据为空") return None except Exception as e: logger.error(f"❌ 获取美股{report_type}失败: {e}") return None def get_us_financial_indicator( self, symbol: str, indicator: str = "年报" ) -> Optional[pd.DataFrame]: """ 获取美股主要财务指标 Args: symbol: 股票代码(如 TSLA) indicator: 报告期类型,可选 {"年报", "单季报", "累计季报"} Returns: 主要财务指标数据 """ if not self.connected: return None try: logger.info(f"📊 获取美股主要指标: {symbol}, {indicator}") df = ak.stock_financial_us_analysis_indicator_em( symbol=symbol, indicator=indicator ) if df is not None and not df.empty: logger.info(f"✅ 获取美股主要指标成功: {len(df)}条记录") return df else: logger.warning(f"⚠️ 美股主要指标数据为空") return None except Exception as e: logger.error(f"❌ 获取美股主要指标失败: {e}") return None def get_stock_basic_info_xq( self, symbol: str, market: str = "cn" ) -> Optional[Dict[str, Any]]: """ 获取雪球的股票基本信息 Args: symbol: 股票代码 market: 市场类型,可选 {"cn", "us", "hk"} Returns: 基本信息字典 """ if not self.connected: return None try: logger.info(f"📊 从雪球获取{market}股票基本信息: {symbol}") if market == "cn": # A股:需要带市场前缀,如 SH600519 if not symbol.startswith(("SH", "SZ")): if symbol.startswith("6"): symbol = f"SH{symbol}" else: symbol = f"SZ{symbol}" df = ak.stock_individual_basic_info_xq(symbol=symbol) elif market == "us": df = ak.stock_individual_basic_info_us_xq(symbol=symbol) elif market == "hk": # 港股:确保5位数字格式 symbol = symbol.lstrip("0").zfill(5) df = ak.stock_individual_basic_info_hk_xq(symbol=symbol) else: logger.error(f"❌ 不支持的市场类型: {market}") return None if df is not None and not df.empty: # 转换为字典 result = dict(zip(df["item"], df["value"])) logger.info(f"✅ 获取雪球基本信息成功: {len(result)}个字段") return result else: logger.warning(f"⚠️ 雪球基本信息数据为空") return None except Exception as e: logger.error(f"❌ 获取雪球基本信息失败: {e}") return None # ==================== 港股数据接口 ==================== def get_hk_daily(self, symbol: str, start_date: str, end_date: str) -> pd.DataFrame: """获取港股日线数据(带超时保护)""" if not self.connected: raise ConnectionError("AKShare未连接") ak_symbol = self.symbol_processor.get_akshare_format(symbol) logger.info( f"🇭🇰 AKShare获取港股数据: {symbol} -> {ak_symbol} ({start_date} ~ {end_date})" ) result = [None] exception = [None] def fetch_data(): try: # symbol_processor 已经处理了代码格式 # hk_symbol = self._normalize_hk_symbol(symbol) result[0] = ak.stock_hk_hist( symbol=ak_symbol, period="daily", start_date=start_date.replace("-", ""), end_date=end_date.replace("-", ""), adjust="", ) except Exception as e: exception[0] = e thread = threading.Thread(target=fetch_data, daemon=True) thread.start() thread.join(timeout=60) if thread.is_alive(): raise TimeoutError(f"获取港股 {symbol} 日线超时(60秒)") if exception[0]: raise exception[0] df = result[0] if df is None or df.empty: raise DataNotFoundError( f"未获取到港股 {symbol} 在 {start_date}~{end_date} 的数据" ) # 标准化列名 mapping = { "日期": "date", "开盘": "open", "收盘": "close", "最高": "high", "最低": "low", "成交量": "volume", "成交额": "amount", } for k, v in mapping.items(): if k in df.columns: df = df.rename(columns={k: v}) if "date" in df.columns: df["date"] = pd.to_datetime(df["date"]) df = df.sort_values("date") df["symbol"] = ak_symbol logger.info(f"✅ 港股数据获取成功: {ak_symbol}, {len(df)}条记录") return df def get_hk_info(self, symbol: str) -> Dict[str, Any]: """ 获取港股基本信息(优化版 - 不使用全市场数据) 性能优化:移除了全市场数据调用,避免下载2000+只股票数据 推荐使用 get_stock_basic_info_xq() 和 get_hk_financial_indicator() """ if not self.connected: return { "symbol": symbol, "name": f"港股{symbol}", "currency": "HKD", "exchange": "HKG", "source": "akshare_unavailable", } try: ak_symbol = self.symbol_processor.get_akshare_format(symbol) logger.info(f"🇭🇰 获取港股信息: {symbol} -> {ak_symbol}") # 优化:直接返回基本信息,不调用全市场数据 # 详细信息应该通过专用接口获取: # - 基本信息: get_stock_basic_info_xq(symbol, "hk") # - 财务指标: get_hk_financial_indicator(symbol) return { "symbol": symbol, "name": f"港股{symbol}", "currency": "HKD", "exchange": "HKG", "source": "akshare", } except Exception as e: logger.error(f"❌ 获取港股信息失败: {e}") return { "symbol": symbol, "name": f"港股{symbol}", "currency": "HKD", "exchange": "HKG", "source": "akshare_error", "error": str(e), } # ==================== 美股数据接口 ==================== def get_us_daily(self, symbol: str, start_date: str, end_date: str) -> pd.DataFrame: """获取美股日线数据(使用新浪美股历史数据接口)""" if not self.connected: raise ConnectionError("AKShare未连接") ak_symbol = self.symbol_processor.get_akshare_format(symbol) logger.info( f"🇺🇸 AKShare获取美股数据: {symbol} -> {ak_symbol} ({start_date} ~ {end_date})" ) result = [None] exception = [None] def fetch_data(): try: # 使用AKShare的新浪美股历史数据接口 full_data = ak.stock_us_daily(symbol=ak_symbol, adjust="") if full_data is None or full_data.empty: logger.warning(f"⚠️ 美股历史数据为空: {symbol}") result[0] = pd.DataFrame() return # 过滤日期范围 if "date" in full_data.columns: full_data["date"] = pd.to_datetime(full_data["date"]) start_dt = pd.to_datetime(start_date) end_dt = pd.to_datetime(end_date) filtered_data = full_data[ (full_data["date"] >= start_dt) & (full_data["date"] <= end_dt) ].copy() if filtered_data.empty: logger.warning( f"⚠️ 指定日期范围内无美股数据: {symbol} ({start_date} ~ {end_date})" ) else: logger.debug( f"✅ 获取美股数据成功: {symbol}, {len(filtered_data)}条" ) result[0] = filtered_data else: result[0] = full_data except Exception as e: logger.error(f"❌ 获取美股数据失败: {symbol}, 错误: {e}") exception[0] = e thread = threading.Thread(target=fetch_data, daemon=True) thread.start() thread.join(timeout=120) # 美股数据较大,增加超时时间 if thread.is_alive(): raise TimeoutError(f"获取美股 {symbol} 日线超时(120秒)") if exception[0]: raise exception[0] df = result[0] if df is None or df.empty: raise DataNotFoundError( f"未获取到美股 {symbol} 在 {start_date}~{end_date} 的数据" ) # 确保列名正确 required_columns = ["date", "open", "high", "low", "close", "volume"] for col in required_columns: if col not in df.columns: logger.warning(f"⚠️ 美股数据缺少列 {col}") df[col] = 0 if col == "volume" else 0.0 if "date" in df.columns: df["date"] = pd.to_datetime(df["date"]) df = df.sort_values("date").reset_index(drop=True) df["symbol"] = ak_symbol logger.info(f"✅ 美股数据处理完成: {ak_symbol}, {len(df)}条记录") return df def get_us_info(self, symbol: str) -> Dict[str, Any]: """ 获取美股基本信息(优化版 - 不使用全市场数据) 性能优化:移除了全市场数据调用,避免下载3000+只股票数据 推荐使用 get_stock_basic_info_xq() 和 get_us_financial_indicator() """ if not self.connected: return { "symbol": symbol, "name": f"美股{symbol}", "currency": "USD", "exchange": "US", "source": "akshare_unavailable", } try: ak_symbol = self.symbol_processor.get_akshare_format(symbol) # 优化:使用预设名称,不调用全市场数据 # 详细信息应该通过专用接口获取: # - 基本信息: get_stock_basic_info_xq(symbol, "us") # - 财务指标: get_us_financial_indicator(symbol) stock_name = self._get_us_stock_name(ak_symbol) return { "symbol": ak_symbol, "name": stock_name, "currency": "USD", "exchange": "US", "source": "akshare", } except Exception as e: logger.error(f"❌ 获取美股信息失败: {e}") return { "symbol": symbol, "name": f"美股{symbol}", "currency": "USD", "exchange": "US", "source": "akshare_error", "error": str(e), } def _get_us_stock_name(self, symbol: str) -> str: """获取美股名称(使用常见映射)""" common_us_stocks = { "AAPL": "苹果公司", "MSFT": "微软公司", "GOOGL": "谷歌A类股", "GOOG": "谷歌C类股", "AMZN": "亚马逊公司", "TSLA": "特斯拉公司", "META": "Meta平台", "NVDA": "英伟达公司", "NFLX": "奈飞公司", "AMD": "超威半导体", "INTC": "英特尔公司", "CRM": "Salesforce", "ORCL": "甲骨文公司", "ADBE": "Adobe公司", "PYPL": "PayPal公司", "DIS": "迪士尼公司", "BA": "波音公司", "JPM": "摩根大通", "V": "Visa公司", "MA": "万事达卡", } if symbol in common_us_stocks: logger.info(f"✅ 使用预设名称: {symbol} -> {common_us_stocks[symbol]}") return common_us_stocks[symbol] else: logger.info(f"⚠️ 使用默认名称: {symbol}") return f"美股{symbol}" # ==================== 新闻数据接口 ==================== def get_stock_news_em(self, symbol: str, max_news: int = 20) -> pd.DataFrame: """获取东方财富个股新闻""" if not self.connected: logger.error("[东方财富新闻] ❌ AKShare未连接") return pd.DataFrame() start_time = datetime.now() ak_symbol = self.symbol_processor.get_akshare_format(symbol) logger.info(f"[东方财富新闻] 获取股票 {symbol} -> {ak_symbol} 的新闻数据") try: result = [None] exception = [None] def fetch_news(): try: result[0] = ak.stock_news_em(symbol=ak_symbol) except Exception as e: exception[0] = e thread = threading.Thread(target=fetch_news) thread.daemon = True thread.start() thread.join(timeout=30) if thread.is_alive(): elapsed = (datetime.now() - start_time).total_seconds() logger.warning( f"[东方财富新闻] ⚠️ 获取超时(30秒): {symbol}, 耗时: {elapsed:.2f}秒" ) raise TimeoutError(f"东方财富新闻获取超时(30秒): {symbol}") if exception[0]: raise exception[0] news_df = result[0] if news_df is not None and not news_df.empty: if len(news_df) > max_news: news_df = news_df.head(max_news) elapsed = (datetime.now() - start_time).total_seconds() logger.info( f"[东方财富新闻] ✅ 获取成功: {ak_symbol}, 共{len(news_df)}条, 耗时: {elapsed:.2f}秒" ) return news_df else: elapsed = (datetime.now() - start_time).total_seconds() logger.warning( f"[东方财富新闻] ⚠️ 数据为空: {symbol}, 耗时: {elapsed:.2f}秒" ) return pd.DataFrame() except Exception as e: elapsed = (datetime.now() - start_time).total_seconds() logger.error( f"[东方财富新闻] ❌ 获取失败: {symbol}, 错误: {e}, 耗时: {elapsed:.2f}秒" ) return pd.DataFrame() # ==================== 全市场数据接口 ==================== def get_china_market_spot(self) -> pd.DataFrame: """ 获取A股全市场实时行情数据 包含市盈率、市净率等估值指标 Returns: pd.DataFrame: 全市场数据 """ if not self.connected: raise ConnectionError("AKShare未连接") try: logger.info("📊 获取A股全市场实时数据...") df = ak.stock_zh_a_spot_em() if df is not None and not df.empty: logger.info(f"✅ 获取A股全市场数据成功: {len(df)} 只股票") return df else: logger.warning("⚠️ A股全市场数据为空") return pd.DataFrame() except Exception as e: logger.error(f"❌ 获取A股全市场数据失败: {e}") raise def get_hk_market_spot(self) -> pd.DataFrame: """ 获取港股全市场实时行情数据 Returns: pd.DataFrame: 全市场数据 """ if not self.connected: raise ConnectionError("AKShare未连接") try: logger.info("📊 获取港股全市场实时数据...") df = ak.stock_hk_spot_em() if df is not None and not df.empty: logger.info(f"✅ 获取港股全市场数据成功: {len(df)} 只股票") return df else: logger.warning("⚠️ 港股全市场数据为空") return pd.DataFrame() except Exception as e: logger.error(f"❌ 获取港股全市场数据失败: {e}") raise def get_us_market_spot(self) -> pd.DataFrame: """ 获取美股全市场实时行情数据 Returns: pd.DataFrame: 全市场数据 """ if not self.connected: raise ConnectionError("AKShare未连接") try: logger.info("📊 获取美股全市场实时数据...") df = ak.stock_us_spot_em() if df is not None and not df.empty: logger.info(f"✅ 获取美股全市场数据成功: {len(df)} 只股票") return df else: logger.warning("⚠️ 美股全市场数据为空") return pd.DataFrame() except Exception as e: logger.error(f"❌ 获取美股全市场数据失败: {e}") raise def get_stock_spot_info( self, symbol: str, market: str = "china" ) -> Optional[Dict[str, Any]]: """ 从全市场数据中获取单只股票的实时信息(含市盈率等) ⚠️ 性能警告: - 会下载整个市场的数据(A股4000+,港股2000+,美股3000+) - 首次请求耗时15-30秒 - 占用大量内存(~50MB) ⚠️ 不推荐用于单只股票查询! 推荐使用场景: - 批量查询(超过10只股票) - 市场概览 - 板块分析 单只股票查询请使用专用接口: - A股:get_stock_info() + Tushare财务指标 - 港股:get_stock_basic_info_xq("symbol", "hk") + get_hk_financial_indicator() - 美股:get_stock_basic_info_xq("symbol", "us") + get_us_financial_indicator() Args: symbol: 股票代码 market: 市场类型 (china/hk/us) Returns: Dict: 股票实时信息 """ # 添加性能警告 logger.warning( f"⚠️ 正在使用全市场数据接口获取单只股票({symbol})信息," f"这会下载整个{market}市场数据,建议使用专用接口" ) try: from ..utils.market_data_cache import get_market_data_cache # 使用15分钟缓存 cache = get_market_data_cache(ttl=900) # 15分钟缓存 # 尝试从缓存获取全市场数据 market_data = cache.get(market, "spot") if market_data is None: # 缓存未命中,获取新数据 logger.info(f"📊 缓存未命中,获取{market}全市场数据...") if market == "china": market_data = self.get_china_market_spot() elif market == "hk": market_data = self.get_hk_market_spot() elif market == "us": market_data = self.get_us_market_spot() else: logger.error(f"❌ 不支持的市场类型: {market}") return None # 写入缓存 if market_data is not None and not market_data.empty: cache.set(market, market_data, "spot") if market_data is None or market_data.empty: logger.warning(f"⚠️ {market}全市场数据为空") return None # 查找指定股票 ak_symbol = self.symbol_processor.get_akshare_format(symbol) # 不同市场的代码列名不同 code_column = "代码" if market == "china": # A股: 去掉前缀的纯数字代码 clean_code = ak_symbol stock_row = market_data[market_data[code_column] == clean_code] elif market == "hk": # 港股: 5位数字代码 clean_code = ak_symbol.zfill(5) stock_row = market_data[market_data[code_column] == clean_code] elif market == "us": # 美股: 股票代码 stock_row = market_data[market_data[code_column] == symbol.upper()] else: return None if stock_row.empty: logger.warning(f"⚠️ 在{market}全市场数据中未找到 {symbol} ({ak_symbol})") return None # 转换为字典 info = stock_row.iloc[0].to_dict() logger.info(f"✅ 从全市场数据获取 {symbol} 信息成功") return info except Exception as e: logger.error(f"❌ 获取股票实时信息失败: {symbol}, {e}") return None # ==================== 便捷函数 ==================== _global_service = None def get_akshare_service() -> AkshareService: """获取AKShare服务单例""" global _global_service if _global_service is None: _global_service = AkshareService() return _global_service def get_hk_stock_data_akshare( symbol: str, start_date: str = None, end_date: str = None ) -> str: """获取港股数据(便捷函数)""" try: service = get_akshare_service() data = service.get_hk_daily(symbol, start_date, end_date) if data is not None and not data.empty: return _format_hk_stock_data(symbol, data, start_date, end_date) else: return f"❌ 无法获取港股 {symbol} 的数据" except Exception as e: return f"❌ AKShare港股数据获取失败: {e}" def get_us_stock_data_akshare( symbol: str, start_date: str = None, end_date: str = None ) -> str: """获取美股数据(便捷函数)""" try: service = get_akshare_service() data = service.get_us_daily(symbol, start_date, end_date) if data is not None and not data.empty: return _format_us_stock_data(symbol, data, start_date, end_date) else: return f"❌ 无法获取美股 {symbol} 的数据" except Exception as e: return f"❌ AKShare美股数据获取失败: {e}" def _format_hk_stock_data( symbol: str, data: pd.DataFrame, start_date: str, end_date: str ) -> str: """格式化港股数据""" try: service = get_akshare_service() stock_info = service.get_hk_info(symbol) stock_name = stock_info.get("name", f"港股{symbol}") latest_price = data["close"].iloc[-1] first_price = data["close"].iloc[0] price_change = latest_price - first_price price_change_pct = (price_change / first_price) * 100 report = f""" 🇭🇰 港股数据报告 (AKShare) ================ 股票信息: - 代码: {symbol} - 名称: {stock_name} - 货币: 港币 (HKD) - 交易所: 香港交易所 (HKG) 价格信息: - 最新价格: HK${latest_price:.2f} - 期间涨跌: HK${price_change:+.2f} ({price_change_pct:+.2f}%) - 期间最高: HK${data['high'].max():.2f} - 期间最低: HK${data['low'].min():.2f} 交易信息: - 数据期间: {start_date} 至 {end_date} - 交易天数: {len(data)}天 - 平均成交量: {data['volume'].mean():,.0f}股 最近5个交易日: """ for _, row in data.tail(5).iterrows(): date_str = row["date"].strftime("%Y-%m-%d") report += f"- {date_str}: 开盘HK${row['open']:.2f}, 收盘HK${row['close']:.2f}, 成交量{row['volume']:,.0f}\n" report += "\n数据来源: AKShare (港股)\n" return report except Exception as e: logger.error(f"❌ 格式化港股数据失败: {e}") return f"❌ 港股数据格式化失败: {symbol}" def _format_us_stock_data( symbol: str, data: pd.DataFrame, start_date: str, end_date: str ) -> str: """格式化美股数据""" try: service = get_akshare_service() stock_info = service.get_us_info(symbol) stock_name = stock_info.get("name", f"美股{symbol}") latest_price = data["close"].iloc[-1] first_price = data["close"].iloc[0] price_change = latest_price - first_price price_change_pct = (price_change / first_price) * 100 report = f""" 🇺🇸 美股数据报告 (AKShare) ================ 股票信息: - 代码: {symbol} - 名称: {stock_name} - 货币: 美元 (USD) - 交易所: 美国交易所 (US) 价格信息: - 最新价格: ${latest_price:.2f} - 期间涨跌: ${price_change:+.2f} ({price_change_pct:+.2f}%) - 期间最高: ${data['high'].max():.2f} - 期间最低: ${data['low'].min():.2f} 交易信息: - 数据期间: {start_date} 至 {end_date} - 交易天数: {len(data)}天 - 平均成交量: {data['volume'].mean():,.0f}股 最近5个交易日: """ for _, row in data.tail(5).iterrows(): date_str = row["date"].strftime("%Y-%m-%d") report += f"- {date_str}: 开盘${row['open']:.2f}, 收盘${row['close']:.2f}, 成交量{row['volume']:,.0f}\n" report += "\n数据来源: AKShare (美股)\n" return report except Exception as e: logger.error(f"❌ 格式化美股数据失败: {e}") return f"❌ 美股数据格式化失败: {symbol}"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/huweihua123/stock-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server