Skip to main content
Glama
server.py45.3 kB
from datetime import datetime from typing import Annotated, Literal, Callable, Any, List import io import json import akshare as ak import akshare_one as ako import pandas as pd from akshare_one import indicators from fastmcp import FastMCP from pydantic import Field from .cache_utils import cached_data_fetch def _fetch_data_with_fallback( fetch_func: Callable[..., pd.DataFrame], primary_source: str, fallback_sources: List[str], **kwargs: Any, ) -> pd.DataFrame: """ 通用的数据源故障切换辅助函数。 按优先级尝试数据源,直到获取到有效数据或所有数据源都失败。 Args: fetch_func: 实际调用 akshare 或 akshare_one 获取数据的函数。 这个函数应该接受 'source' 参数,或者在内部处理 source 的映射。 primary_source: 用户指定的首选数据源。 fallback_sources: 备用数据源列表,按优先级排序。 **kwargs: 传递给 fetch_func 的其他参数。 Returns: pd.DataFrame: 获取到的数据。 Raises: RuntimeError: 如果所有数据源都未能获取到有效数据。 """ if primary_source is None: return fetch_func(**kwargs) data_source_priority = [primary_source] + fallback_sources # 移除重复项并保持顺序 seen = set() unique_data_source_priority = [] for x in data_source_priority: if x not in seen: unique_data_source_priority.append(x) seen.add(x) df = None errors = [] for current_source in unique_data_source_priority: try: # 假设 fetch_func 能够接受 source 参数 # 或者 fetch_func 内部根据 kwargs 中的 source 参数进行逻辑判断 temp_df = fetch_func(source=current_source, **kwargs) if temp_df is not None and not temp_df.empty: print(f"成功从数据源 '{current_source}' 获取数据。") df = temp_df break else: errors.append(f"数据源 '{current_source}' 返回空数据。") except Exception as e: errors.append(f"从数据源 '{current_source}' 获取数据失败: {str(e)}") if df is None or df.empty: raise RuntimeError( f"所有数据源都未能获取到有效数据。详细错误: {'; '.join(errors)}" ) return df def _get_market_from_symbol(symbol: str) -> str: """ 根据股票代码判断所属市场。 上海证券交易所: sh (600, 601, 603, 605, 688 开头) 深圳证券交易所: sz (000, 001, 002, 300 开头) 北京证券交易所: bj (830, 870, 880 开头) 默认返回 "sh" """ if symbol.startswith(("600", "601", "603", "605", "688")): return "sh" elif symbol.startswith(("000", "001", "002", "300")): return "sz" elif symbol.startswith(("830", "870", "880")): return "bj" return "sh" # 默认上海市场 def _format_dataframe_output( df: pd.DataFrame, output_format: Literal["json", "csv", "xml", "excel", "markdown", "html"], ) -> str: """ 根据指定的格式格式化 DataFrame 输出。 """ if df.empty: return json.dumps([]) if output_format == "json": return df.to_json(orient="records", force_ascii=False) elif output_format == "csv": return df.to_csv(index=False) elif output_format == "xml": return df.to_xml(index=False) elif output_format == "excel": # 使用 BytesIO 将 Excel 写入内存 output = io.BytesIO() df.to_excel(output, index=False, engine='openpyxl') # 返回 base64 编码的二进制数据,或者直接返回字节流 # 为了兼容性,这里尝试返回 utf-8 编码的字符串,但对于二进制文件,通常直接传输字节流更合适 return output.getvalue().decode("utf-8", errors="ignore") elif output_format == "markdown": return df.to_markdown(index=False) elif output_format == "html": return df.to_html(index=False) else: return df.to_json(orient="records", force_ascii=False) mcp = FastMCP(name="china-stock-mcp") # 初始化 FastMCP 服务器实例 @mcp.tool( name="get_hist_data", description="获取指定的股票历史行情数据及技术指标" ) def get_hist_data( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], interval: Annotated[ Literal["minute", "hour", "day", "week", "month", "year"], Field(description="时间周期: minute, hour, day, week, month, year。默认:day"), ] = "day", interval_multiplier: Annotated[int, Field(description="时间周期乘数", ge=1)] = 1, start_date: Annotated[ str, Field(description="开始日期,格式为 YYYY-MM-DD") ] = "1970-01-01", end_date: Annotated[ str, Field(description="结束日期,格式为 YYYY-MM-DD") ] = "2030-12-31", adjust: Annotated[ Literal["none", "qfq", "hfq"], Field(description="复权类型: none, qfq(前复权), hfq(后复权)。默认:none"), ] = "none", indicators_list: Annotated[ str | list[ Literal[ "SMA", "EMA", "RSI", "MACD", "BOLL", "STOCH", "ATR", "CCI", "ADX", "WILLR", "AD", "ADOSC", "OBV", "MOM", "SAR", "TSF", "APO", "AROON", "AROONOSC", "BOP", "CMO", "DX", "MFI", "MINUS_DI", "MINUS_DM", "PLUS_DI", "PLUS_DM", "PPO", "ROC", "ROCP", "ROCR", "ROCR100", "TRIX", "ULTOSC", ] ]|None, Field( description="要添加的技术指标,可以是逗号分隔的字符串(例如: 'SMA,EMA')或字符串列表(例如: ['SMA', 'EMA'])。支持的指标包括: SMA, EMA, RSI, MACD, BOLL, STOCH, ATR, CCI, ADX, WILLR, AD, ADOSC, OBV, MOM, SAR, TSF, APO, AROON, AROONOSC, BOP, CMO, DX, MFI, MINUS_DI, MINUS_DM, PLUS_DI, PLUS_DM, PPO, ROC, ROCP, ROCR, ROCR100, TRIX, ULTOSC。常用指标:SMA, EMA, RSI, MACD, BOLL, STOCH, OBV, MFI,建议不超过10个。" ), ] = "SMA, EMA, RSI, MACD, BOLL, STOCH, OBV, MFI", output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取股票历史行情数据.""" # 定义内部 fetch_func def hist_data_fetcher(source: str, **kwargs: Any) -> pd.DataFrame: return ako.get_hist_data(source=source, **kwargs) df = _fetch_data_with_fallback( fetch_func=hist_data_fetcher, primary_source="eastmoney", fallback_sources=["eastmoney_direct", "sina"], symbol=symbol, interval=interval, interval_multiplier=interval_multiplier, start_date=start_date, end_date=end_date, adjust=adjust, ) indicator_map = { "SMA": (indicators.get_sma, {"window": 20}), "EMA": (indicators.get_ema, {"window": 20}), "RSI": (indicators.get_rsi, {"window": 14}), "MACD": (indicators.get_macd, {"fast": 12, "slow": 26, "signal": 9}), "BOLL": (indicators.get_bollinger_bands, {"window": 20, "std": 2}), "STOCH": ( indicators.get_stoch, {"window": 14, "smooth_d": 3, "smooth_k": 3}, ), "ATR": (indicators.get_atr, {"window": 14}), "CCI": (indicators.get_cci, {"window": 14}), "ADX": (indicators.get_adx, {"window": 14}), "WILLR": (indicators.get_willr, {"window": 14}), "AD": (indicators.get_ad, {}), "ADOSC": (indicators.get_adosc, {"fast_period": 3, "slow_period": 10}), "OBV": (indicators.get_obv, {}), "MOM": (indicators.get_mom, {"window": 10}), "SAR": (indicators.get_sar, {"acceleration": 0.02, "maximum": 0.2}), "TSF": (indicators.get_tsf, {"window": 14}), "APO": ( indicators.get_apo, {"fast_period": 12, "slow_period": 26, "ma_type": 0}, ), "AROON": (indicators.get_aroon, {"window": 14}), "AROONOSC": (indicators.get_aroonosc, {"window": 14}), "BOP": (indicators.get_bop, {}), "CMO": (indicators.get_cmo, {"window": 14}), "DX": (indicators.get_dx, {"window": 14}), "MFI": (indicators.get_mfi, {"window": 14}), "MINUS_DI": (indicators.get_minus_di, {"window": 14}), "MINUS_DM": (indicators.get_minus_dm, {"window": 14}), "PLUS_DI": (indicators.get_plus_di, {"window": 14}), "PLUS_DM": (indicators.get_plus_dm, {"window": 14}), "PPO": ( indicators.get_ppo, {"fast_period": 12, "slow_period": 26, "ma_type": 0}, ), "ROC": (indicators.get_roc, {"window": 10}), "ROCP": (indicators.get_rocp, {"window": 10}), "ROCR": (indicators.get_rocr, {"window": 10}), "ROCR100": (indicators.get_rocr100, {"window": 10}), "TRIX": (indicators.get_trix, {"window": 30}), "ULTOSC": ( indicators.get_ultosc, {"window1": 7, "window2": 14, "window3": 28}, ), } print("indicators_list: ", indicators_list) if indicators_list is None: indicators_list = [] elif isinstance(indicators_list, str): indicators_list = [ indicator.strip() for indicator in indicators_list.split(",") if indicator.strip() ] # 过滤掉无效的指标 if indicators_list: valid_indicators = [] for indicator in indicators_list: if indicator in indicator_map: valid_indicators.append(indicator) else: print(f"警告: 指标 '{indicator}' 不存在,将被忽略。") indicators_list = valid_indicators temp = [] for indicator in indicators_list: if indicator in indicator_map: func, params = indicator_map[indicator] indicator_df = func(df, **params) temp.append(indicator_df) if temp: df = df.join(temp) return _format_dataframe_output(df, output_format) @mcp.tool( name="get_realtime_data", description="获取指定的股票实时行情数据" ) def get_realtime_data( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取实时股票行情数据. 'eastmoney_direct' """ # 定义内部 fetch_func def realtime_data_fetcher(source: str, **kwargs: Any) -> pd.DataFrame: return ako.get_realtime_data(source=source, **kwargs) df = _fetch_data_with_fallback( fetch_func=realtime_data_fetcher, primary_source="eastmoney", fallback_sources=["eastmoney_direct","xueqiu"], symbol=symbol, ) return _format_dataframe_output(df, output_format) @mcp.tool(name="get_news_data", description="获取股票相关的新闻数据") def get_news_data( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取股票相关新闻数据.""" df = ako.get_news_data(symbol=symbol, source="eastmoney") return _format_dataframe_output(df, output_format) @mcp.tool(name="get_balance_sheet", description="获取公司的资产负债表") def get_balance_sheet( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取公司资产负债表数据.""" df = ako.get_balance_sheet(symbol=symbol, source="sina") if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool(name="get_income_statement", description="获取指定股票代码的公司的利润表") def get_income_statement( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取公司利润表数据.""" df = ako.get_income_statement(symbol=symbol, source="sina") if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool(name="get_cash_flow", description="获取指定股票代码的公司的现金流量表") def get_cash_flow( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取公司现金流量表数据.""" df = ako.get_cash_flow(symbol=symbol, source="sina") if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool(name="get_fund_flow", description="获取股票资金流向数据 (近100交易日)") def get_fund_flow( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: market = _get_market_from_symbol(symbol) df = ak.stock_individual_fund_flow(stock=symbol, market=market) if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool(name="get_inner_trade_data", description="获取公司内部股东交易数据") def get_inner_trade_data( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取公司内部股东交易数据.""" df = ako.get_inner_trade_data(symbol, source="xueqiu") if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool(name="get_financial_metrics", description="获取公司关键财务指标") def get_financial_metrics( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """ 获取三大财务报表的关键财务指标. """ def get_get_financial_metrics_fetcher(source: str, **kwargs: Any) -> pd.DataFrame: if source == "sina": return ak.stock_financial_abstract(**kwargs) elif source == "ths": return ak.stock_financial_abstract_ths(**kwargs,indicator="按报告期") else: return ako.get_financial_metrics(**kwargs) df = _fetch_data_with_fallback( fetch_func=get_get_financial_metrics_fetcher, primary_source="sina", fallback_sources=[ "eastmoney_direct", "ths" ], symbol=symbol, ) if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool( name="get_time_info", description="获取当前时间(ISO格式、时间戳)和最近一个交易日" ) def get_time_info() -> dict: """获取当前时间(ISO格式、时间戳)和最近一个交易日.""" local_time = datetime.now().astimezone() current_date = local_time.date() # 获取交易日历数据 trade_date_df = ak.tool_trade_date_hist_sina() trade_dates = [d for d in trade_date_df["trade_date"]] # 提取所有交易日期 # 筛选出小于等于当前日期的交易日,并按降序排列 past_dates = sorted([d for d in trade_dates if d <= current_date], reverse=True) # 找到最近的一个交易日 last_trading_day = past_dates[0].strftime("%Y-%m-%d") if past_dates else None return { "iso_format": local_time.isoformat(), "timestamp": local_time.timestamp(), "last_trading_day": last_trading_day, } @mcp.tool(name="get_stock_basic_info", description="获取指定股票的基本概要信息") def get_stock_basic_info( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取股票基本概要信息,支持 A 股和港股""" # 定义内部 fetch_func def get_stock_basic_info_fetcher(source: str, **kwargs: Any) -> pd.DataFrame: if source == "eastmoney": df = ak.stock_individual_info_em(symbol) elif source == "xueqiu": df = ak.stock_individual_basic_info_xq(symbol) elif source == "cninfo": df = ak.stock_profile_cninfo(symbol) elif source == "xq": df = ak.stock_individual_basic_info_hk_xq(symbol) return df df = _fetch_data_with_fallback( fetch_func=get_stock_basic_info_fetcher, primary_source="cninfo", fallback_sources=[ "eastmoney", "xq", "xueqiu", ], symbol=symbol, ) if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool(name="get_macro_data", description="获取宏观经济指标数据,支持多个指标") def get_macro_data( indicator: Annotated[ str | list[ Literal["money_supply", "gdp", "cpi", "pmi", "stock_summary"] ] | None, Field( description="要获取的宏观经济指标,可以是逗号分隔的字符串(例如: 'gdp,cpi')或字符串列表(例如: ['gdp', 'cpi'])。支持的指标包括: money_supply, gdp, cpi, pmi, stock_summary。默认: 'gdp'" ), ] = "gdp", output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取宏观经济数据""" def _clean_macro_data(df: pd.DataFrame) -> pd.DataFrame: """ 通用数据清洗函数,删除全 null 列和全 null 行。 """ if df.empty: return df # 删除所有列值都为 null 的列 df = df.dropna(axis=1, how='all') # 删除所有行值都为 null 的行 df = df.dropna(axis=0, how='all') return df macro_indicator_map = { "money_supply": ak.macro_china_money_supply, "gdp": ak.macro_china_gdp_yearly, "cpi": ak.macro_china_cpi_yearly, "pmi": ak.macro_china_pmi_yearly, "stock_summary": ak.macro_china_stock_market_cap, } @cached_data_fetch() def get_macro_data_fetcher( indicator_name: str, **kwargs: Any ) -> pd.DataFrame: fetch_func = macro_indicator_map.get(indicator_name) if fetch_func: return fetch_func(**kwargs) else: raise ValueError(f"不支持的宏观经济指标: {indicator_name}") if indicator is None: indicator_list = ["gdp"] # 默认指标 elif isinstance(indicator, str): indicator_list = [ ind.strip() for ind in indicator.split(",") if ind.strip() ] else: # 假设是 list[Literal[...]] indicator_list = indicator # 过滤掉无效的指标 valid_indicators = [] for ind in indicator_list: if ind in macro_indicator_map: valid_indicators.append(ind) else: print(f"警告: 宏观经济指标 '{ind}' 不存在,将被忽略。") if not valid_indicators: print("警告: 未找到有效的宏观经济指标,将返回空数据。") return _format_dataframe_output(pd.DataFrame(), output_format) df_list = [] for ind in valid_indicators: try: temp_df = get_macro_data_fetcher(ind) if temp_df is not None and not temp_df.empty: temp_df = _clean_macro_data(temp_df) temp_df['indicator'] = ind # 添加指标名称列 df_list.append(temp_df) except Exception as e: print(f"获取宏观经济指标 '{ind}' 失败: {e}") if df_list: df = pd.concat(df_list, ignore_index=True) else: df = pd.DataFrame() # 如果没有获取到任何数据,返回空的DataFrame return _format_dataframe_output(df, output_format) @mcp.tool(name="get_investor_sentiment", description="分析投资者情绪数据,包括用户关注指数、日度市场参与意愿、股票评级记录和机构参与度。") def get_investor_sentiment( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """分析散户和机构投资者的投资情绪""" def get_investor_sentiment_fetcher( symbol: str, indicator: str, **kwargs: Any ) -> pd.DataFrame: """获取投资情绪数据""" if indicator == "用户关注指数": df = ak.stock_comment_detail_scrd_focus_em(symbol) elif indicator == "日度市场参与意愿": df = ak.stock_comment_detail_scrd_desire_daily_em(symbol) # elif indicator == "northbound_flow": # df = ak.stock_hsgt_fund_flow_summary_em() elif indicator == "股票评级记录": df = ak.stock_institute_recommend_detail(symbol) elif indicator == "机构参与度": df = ak.stock_comment_detail_zlkp_jgcyd_em(symbol) return df def get_all_investor_sentiment_fetcher( symbol: str, **kwargs: Any ) -> pd.DataFrame: df_list = [] indicators = [ "用户关注指数", "日度市场参与意愿", "股票评级记录", "机构参与度", ] for indicator in indicators: indicator_df = get_investor_sentiment_fetcher(symbol, indicator, **kwargs) if indicator_df is not None and not indicator_df.empty: # 为DataFrame添加指标名称列,以便区分不同指标的数据 indicator_df['indicator'] = indicator df_list.append(indicator_df) if df_list: # 使用 pd.concat 一次性合并所有DataFrame df = pd.concat(df_list, ignore_index=True) else: # 如果没有获取到任何数据,返回空的DataFrame df = pd.DataFrame() return df df = get_all_investor_sentiment_fetcher(symbol) if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool(name="get_shareholder_info", description="获取指定股票的股东情况") def get_shareholder_info( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")] , output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取股东情况""" def get_shareholder_info_fetcher( symbol: str, **kwargs: Any ) -> pd.DataFrame: """获取股东数据""" return ak.stock_zh_a_gdhs_detail_em(symbol) df = get_shareholder_info_fetcher(symbol) if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool(name="get_product_info", description="获取公司主要产品/业务构成") def get_product_info( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取产品情况""" def get_product_info_fetcher( source:str,**kwargs: Any ) -> pd.DataFrame: if source == "ths": df = ak.stock_zyjs_ths(symbol) # elif data_source == "eastmoney": else: df = ak.stock_zygc_em(symbol) return df df = _fetch_data_with_fallback( fetch_func=get_product_info_fetcher, primary_source="ths", fallback_sources=["eastmoney"], symbol=symbol, ) if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool(name="get_profit_forecast", description="获取股票的业绩预测数据,包括预测年报净利润和每股收益") def get_profit_forecast( symbol: Annotated[str, Field(description="股票代码 (例如: '600519')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """ 获取股票的业绩预测数据。 """ supported_indicators = ["预测年报净利润", "预测年报每股收益"] df_list = [] for ind in supported_indicators: temp_df = ak.stock_profit_forecast_ths(symbol=symbol, indicator=ind) if not temp_df.empty: temp_df["indicator"] = ind # 添加指标列以便区分 df_list.append(temp_df) if df_list: df = pd.concat(df_list, ignore_index=True) else: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool( name="get_stock_fhps_detail", description="获取指定股票的分红配送情况" ) def get_stock_fhps_detail( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取指定股票的分红配送情况,支持降级使用 stock_fhps_detail_ths 和 stock_dividend_cninfo.""" def _fhps_detail_fetcher(source: str, **kwargs: Any) -> pd.DataFrame: if source == "em": return ak.stock_fhps_detail_em(**kwargs) elif source == "ths": return ak.stock_fhps_detail_ths(**kwargs) elif source == "cninfo": return ak.stock_dividend_cninfo(symbol=kwargs["symbol"]) else: raise ValueError(f"不支持的数据源: {source}") df = _fetch_data_with_fallback( fetch_func=_fhps_detail_fetcher, primary_source="em", fallback_sources=["ths", "cninfo"], symbol=symbol, ) if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool( name="get_stock_cyq", description="获取指定股票的筹码分布情况" ) def get_stock_cyq( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取指定股票的筹码分布情况.""" df = ak.stock_cyq_em(symbol=symbol) if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool( name="get_stock_research_report", description="获取指定股票的个股研报及盈利预测" ) def get_stock_research_report( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取指定股票的个股研报及盈利预测.""" df = ak.stock_research_report_em(symbol=symbol) if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool( name="get_stock_circulate_stock_holder", description="获取指定股票的流通股东情况" ) def get_stock_circulate_stock_holder( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取指定股票的流通股东情况,支持降级使用 stock_main_stock_holder.""" def _circulate_stock_holder_fetcher(source: str, **kwargs: Any) -> pd.DataFrame: if source == "akshare": return ak.stock_circulate_stock_holder(**kwargs) elif source == "main_stock_holder": return ak.stock_main_stock_holder(**kwargs) else: raise ValueError(f"不支持的数据源: {source}") df = _fetch_data_with_fallback( fetch_func=_circulate_stock_holder_fetcher, primary_source="akshare", fallback_sources=["main_stock_holder"], symbol=symbol, ) if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool( name="get_stock_management_change", description="获取指定股票的高管持股变动情况" ) def get_stock_management_change( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取指定股票的高管持股变动情况,支持降级使用 stock_shareholder_change_ths、stock_share_hold_change_szse、stock_share_hold_change_bse.""" def _management_change_fetcher(source: str, **kwargs: Any) -> pd.DataFrame: if source == "ths": return ak.stock_management_change_ths(**kwargs) elif source == "shareholder_change_ths": return ak.stock_shareholder_change_ths(**kwargs) else: raise ValueError(f"不支持的数据源: {source}") df = _fetch_data_with_fallback( fetch_func=_management_change_fetcher, primary_source="ths", fallback_sources=["shareholder_change_ths"], symbol=symbol, ) if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool( name="get_stock_restricted_release_queue", description="获取指定股票的个股限售解禁情况" ) def get_stock_restricted_release_queue( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取指定股票的个股限售解禁情况,支持降级使用 stock_restricted_release_queue_em.""" def _restricted_release_queue_fetcher(source: str, **kwargs: Any) -> pd.DataFrame: if source == "sina": return ak.stock_restricted_release_queue_sina(**kwargs) elif source == "em": return ak.stock_restricted_release_queue_em(**kwargs) else: raise ValueError(f"不支持的数据源: {source}") df = _fetch_data_with_fallback( fetch_func=_restricted_release_queue_fetcher, primary_source="sina", fallback_sources=["em"], symbol=symbol, ) if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool( name="get_stock_value", description="获取指定股票的个股估值分析数据" ) def get_stock_value( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取指定股票的个股估值分析数据.""" df = ak.stock_value_em(symbol=symbol) if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool( name="get_stock_a_code_name", description="获取沪深京 A 股股票代码和股票简称数据" ) def get_stock_a_code_name( output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取沪深京 A 股股票代码和股票简称数据.""" @cached_data_fetch() def get_stock_a_code_name_fetch() -> pd.DataFrame: return ak.stock_info_a_code_name() df = get_stock_a_code_name_fetch() if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool( name="get_stock_volatility", description="通过分钟级历史行情计算指定个股的波动率指标" ) def get_stock_volatility( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], start_date: Annotated[ str, Field(description="开始日期,格式为 YYYY-MM-DD HH:MM:SS (例如: '2021-10-20 09:30:00')") ], end_date: Annotated[ str, Field(description="结束日期,格式为 YYYY-MM-DD HH:MM:SS (例如: '2024-11-01 15:00:00')") ], period: Annotated[ Literal["1", "5", "15", "30", "60"], Field(description="分钟级历史行情时间周期,分钟级别 (例如: '1', '5', '15', '30', '60')"), ], adjust: Annotated[ Literal["none", "qfq", "hfq"], Field(description="复权类型: none, qfq(前复权), hfq(后复权)。默认:none"), ] = "none", output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """计算指定个股的已实现波动率指标.""" stock_df = ak.rv_from_stock_zh_a_hist_min_em( symbol=symbol, start_date=start_date, end_date=end_date, period=period, adjust=adjust, ) if stock_df.empty: return _format_dataframe_output(pd.DataFrame(), output_format) volatility_yz_rv_df = ak.volatility_yz_rv(data=stock_df) return _format_dataframe_output(volatility_yz_rv_df, output_format) @mcp.tool( name="get_all_cni_indices", description="获取所有指数的代码和基本信息" ) def get_all_cni_indices( output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取所有指数的代码和基本信息""" @cached_data_fetch() def _fetch_all_cni_indices() -> pd.DataFrame: df = ak.index_all_cni() # 去除实时变动数据 columns_to_drop = [ "收盘点位", "涨跌幅", "PE滚动", "成交量", "成交额", "总市值", "自由流通市值", ] df = df.drop(columns=[col for col in columns_to_drop if col in df.columns]) return df df = _fetch_all_cni_indices() if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool( name="get_cni_index_hist", description="获取指定指数的日频率历史行情数据" ) def get_cni_index_hist( symbol: Annotated[str, Field(description="指数代码 (例如: '399005')")], start_date: Annotated[str, Field(description="开始日期,格式为 YYYYMMDD (例如: '20230114')")], end_date: Annotated[str, Field(description="结束日期,格式为 YYYYMMDD (例如: '20240114')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取指定指数的日频率历史行情数据.""" df = ak.index_hist_cni(symbol=symbol, start_date=start_date, end_date=end_date) if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool( name="get_cni_index_detail", description="获取指定指数的成分股样本详情" ) def get_cni_index_detail( symbol: Annotated[str, Field(description="指数代码 (例如: '399001')")], date: Annotated[str, Field(description="日期,格式为 YYYYMM (例如: '202404')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取指定指数的成分股样本详情.""" df = ak.index_detail_cni(symbol=symbol, date=date) if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool( name="get_stock_technical_rank", description="获取技术选股指标数据,包括创新高、创新低、连续上涨、连续下跌、持续放量、持续缩量、向上突破、向下突破、量价齐升、量价齐跌、险资举牌。" ) def get_stock_technical_rank( indicator_name: Annotated[ Literal[ "创新高-创月新高", "创新高-半年新高", "创新高-一年新高", "创新高-历史新高", "创新低-创月新低", "创新低-半年新低", "创新低-一年新低", "创新低-历史新低", "连续上涨", "连续下跌", "持续放量", "持续缩量", "向上突破-5日均线", "向上突破-10日均线", "向上突破-20日均线", "向上突破-30日均线", "向上突破-60日均线", "向上突破-90日均线", "向上突破-250日均线", "向上突破-500日均线", "向下突破-5日均线", "向下突破-10日均线", "向下突破-20日均线", "向下突破-30日均线", "向下突破-60日均线", "向下突破-90日均线", "向下突破-250日均线", "向下突破-500日均线", "量价齐升", "量价齐跌", "险资举牌", ], Field(description="要获取的技术指标名称"), ], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """根据指定的指标名称获取同花顺技术选股数据。""" df = pd.DataFrame() if indicator_name.startswith("创新高"): symbol_map = { "创新高-创月新高": "创月新高", "创新高-半年新高": "半年新高", "创新高-一年新高": "一年新高", "创新高-历史新高": "历史新高", } symbol_param = symbol_map.get(indicator_name) if symbol_param: df = ak.stock_rank_cxg_ths(symbol=symbol_param) elif indicator_name.startswith("创新低"): symbol_map = { "创新低-创月新低": "创月新低", "创新低-半年新低": "半年新低", "创新低-一年新低": "一年新低", "创新低-历史新低": "历史新低", } symbol_param = symbol_map.get(indicator_name) if symbol_param: df = ak.stock_rank_cxd_ths(symbol=symbol_param) elif indicator_name == "连续上涨": df = ak.stock_rank_lxsz_ths() elif indicator_name == "连续下跌": df = ak.stock_rank_lxxd_ths() elif indicator_name == "持续放量": df = ak.stock_rank_cxfl_ths() elif indicator_name == "持续缩量": df = ak.stock_rank_cxsl_ths() elif indicator_name.startswith("向上突破"): symbol_map = { "向上突破-5日均线": "5日均线", "向上突破-10日均线": "10日均线", "向上突破-20日均线": "20日均线", "向上突破-30日均线": "30日均线", "向上突破-60日均线": "60日均线", "向上突破-90日均线": "90日均线", "向上突破-250日均线": "250日均线", "向上突破-500日均线": "500日均线", } symbol_param = symbol_map.get(indicator_name) if symbol_param: df = ak.stock_rank_xstp_ths(symbol=symbol_param) elif indicator_name.startswith("向下突破"): symbol_map = { "向下突破-5日均线": "5日均线", "向下突破-10日均线": "10日均线", "向下突破-20日均线": "20日均线", "向下突破-30日均线": "30日均线", "向下突破-60日均线": "60日均线", "向下突破-90日均线": "90日均线", "向下突破-250日均线": "250日均线", "向下突破-500日均线": "500日均线", } symbol_param = symbol_map.get(indicator_name) if symbol_param: df = ak.stock_rank_xxtp_ths(symbol=symbol_param) elif indicator_name == "量价齐升": df = ak.stock_rank_ljqs_ths() elif indicator_name == "量价齐跌": df = ak.stock_rank_ljqd_ths() elif indicator_name == "险资举牌": df = ak.stock_rank_xzjp_ths() if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format) @mcp.tool( name="get_stock_board_industry_summary", description="获取所有行业板块实时行情数据" ) def get_stock_board_industry_summary( output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str: """获取同花顺行业板块实时行情数据.""" def _fetch_stock_board_industry_summary_ths() -> pd.DataFrame: return ak.stock_board_industry_summary_ths() df = _fetch_stock_board_industry_summary_ths() if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format)

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/xinkuang/china-stock-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server