Skip to main content
Glama

get_full_market_data

Retrieve comprehensive historical and real-time market data for specified stock codes, with customizable periods, date ranges, and fields to analyze trends effectively.

Instructions

获取历史+最新行情数据

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
codesYes股票代码列表,用逗号分隔,例如 "000001.SZ,600519.SH"
end_dateNo结束日期,格式为 "YYYYMMDD",为空表示当前日期
fieldsNo字段列表,用逗号分隔,为空表示所有字段
periodNo周期,例如 "1d", "1m", "5m" 等1d
start_dateNo开始日期,格式为 "YYYYMMDD"

Implementation Reference

  • The primary handler function that executes the tool logic. It parses input, validates codes, fetches full market data using xtdata.get_market_data with historical parameters (count=-1), converts numpy arrays to lists, and returns the data as a dictionary.
    async def get_full_market_data(input: GetMarketDataInput) -> Dict[str, Any]:
        """
        获取历史+最新行情数据
        
        Args:
            codes: 股票代码列表,用逗号分隔,例如 "000001.SZ,600519.SH"
            period: 周期,例如 "1d", "1m", "5m" 等
            start_date: 开始日期,格式为 "YYYYMMDD"
            end_date: 结束日期,格式为 "YYYYMMDD",为空表示当前日期
            fields: 字段列表,用逗号分隔,为空表示所有字段
            
        Returns:
            历史+最新行情数据
        """
        try:
            # 确保XTQuant数据中心已初始化
            ensure_xtdc_initialized()
            
            if xtdata is None:
                return {"error": "xtdata模块未正确加载"}
            
            # 解析股票代码列表
            codes = [code.strip() for code in input.codes.split(",") if code.strip()]
            if not codes:
                return {"error": "未提供有效的股票代码"}
            
            # 过滤有效的股票代码
            valid_codes = []
            for code in codes:
                # 检查股票代码格式
                if "." in code and len(code) >= 6:
                    valid_codes.append(code)
            
            if not valid_codes:
                return {"error": "未提供有效的股票代码"}
            
            # 解析字段列表
            fields = []
            if input.fields:
                fields = [field.strip() for field in input.fields.split(",") if field.strip()]
            
            # 如果未指定字段,使用默认字段
            if not fields:
                fields = ["open", "high", "low", "close", "volume"]
            
            print(f"获取历史+最新行情数据: 股票={valid_codes}, 周期={input.period}, 字段={fields}, 开始日期={input.start_date}, 结束日期={input.end_date}")
            
            try:
                # 获取历史+最新行情数据
                print(f"调用xtdata.get_market_data({fields}, {valid_codes}, {input.period}, {input.start_date}, {input.end_date}, count=-1)")
                data = xtdata.get_market_data(fields, valid_codes, period=input.period, 
                                              start_time=input.start_date, end_time=input.end_date, count=-1)
                
                # 处理返回值
                if data is None:
                    return {"error": "获取历史+最新行情数据失败"}
                
                # 将数据转换为可序列化的格式
                result = {}
                for code, stock_data in data.items():
                    code_result = {}
                    for field, values in stock_data.items():
                        # 将numpy数组转换为列表
                        if hasattr(values, "tolist"):
                            code_result[field] = values.tolist()
                        else:
                            code_result[field] = list(values)
                    result[code] = code_result
                
                return result
            except Exception as e:
                print(f"获取历史+最新行情数据出错: {str(e)}")
                traceback.print_exc()
                return {"error": f"获取历史+最新行情数据失败: {str(e)}"}
        except Exception as e:
            print(f"处理历史+最新行情数据请求出错: {str(e)}")
            traceback.print_exc()
            return {"error": str(e)}
  • Pydantic model defining the input schema for the get_full_market_data tool (shared with similar market data tools), including codes, period, dates, and fields.
    class GetMarketDataInput(BaseModel):
        codes: str  # 股票代码列表,用逗号分隔,例如 "000001.SZ,600519.SH"
        period: str = "1d"  # 周期,例如 "1d", "1m", "5m" 等
        start_date: str = ""  # 开始日期,格式为 "YYYYMMDD"
        end_date: str = ""  # 结束日期,格式为 "YYYYMMDD",为空表示当前日期
        fields: str = ""  # 字段列表,用逗号分隔,为空表示所有字段
  • Registration of the tool in the @server.list_tools() function, specifying name, description, and JSON input schema matching the Pydantic model.
    types.Tool(
        name="get_full_market_data",
        description="获取历史+最新行情数据",
        inputSchema={
            "type": "object",
            "properties": {
                "codes": {
                    "type": "string",
                    "description": "股票代码列表,用逗号分隔,例如 \"000001.SZ,600519.SH\""
                },
                "period": {
                    "type": "string",
                    "description": "周期,例如 \"1d\", \"1m\", \"5m\" 等",
                    "default": "1d"
                },
                "start_date": {
                    "type": "string",
                    "description": "开始日期,格式为 \"YYYYMMDD\"",
                    "default": ""
                },
                "end_date": {
                    "type": "string",
                    "description": "结束日期,格式为 \"YYYYMMDD\",为空表示当前日期",
                    "default": ""
                },
                "fields": {
                    "type": "string",
                    "description": "字段列表,用逗号分隔,为空表示所有字段",
                    "default": ""
                }
            },
            "required": ["codes"]
        }
  • Dispatch logic in the @server.call_tool() handler that parses arguments, creates the input model, calls the get_full_market_data function, and formats the JSON response.
    elif name == "get_full_market_data":
        if not arguments or "codes" not in arguments:
            return [types.TextContent(type="text", text="错误: 缺少必要参数 'codes'")]
        
        # 处理字符串格式的codes参数
        codes_str = arguments["codes"]
        codes = [code.strip() for code in codes_str.split(",") if code.strip()]
        
        period = arguments.get("period", "1d")
        start_date = arguments.get("start_date", "")
        end_date = arguments.get("end_date", "")
        
        # 处理字符串格式的fields参数
        fields_str = arguments.get("fields", "")
        fields = [field.strip() for field in fields_str.split(",") if field.strip()]
        
        input_model = GetMarketDataInput(codes=codes_str, period=period, start_date=start_date, end_date=end_date, fields=fields_str)
        result = await get_full_market_data(input_model)
        return [types.TextContent(type="text", text=json.dumps(result, ensure_ascii=False, indent=2))]
Install Server

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dfkai/xtquantai'

If you have feedback or need assistance with the MCP directory API, please join our Discord server