Xueqiu MCP

by liqiongyu

capital_flow

Retrieve the current day's capital inflow and outflow data for a Chinese stock, identified by its stock code, at one-minute granularity.

Instructions

Get the current day's capital inflow and outflow data, at one-minute intervals.

Input Schema

Name          Required    Description    Default
stock_code    No          (none)         SZ000002
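
The table describes a single optional string parameter with a default value. As a rough illustration of what that means for a caller, the sketch below fills in the default when the argument is omitted. The `INPUT_SCHEMA` dictionary is reconstructed from the table and is an assumption (the schema the server actually publishes may carry extra fields), `apply_defaults` is a hypothetical helper, and `SH600000` is only an illustrative symbol.

```python
# Assumed JSON Schema, reconstructed from the table above; not copied from the server.
INPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "stock_code": {"type": "string", "default": "SZ000002"},
    },
    "required": [],
}


def apply_defaults(arguments, schema=INPUT_SCHEMA):
    """Return a copy of `arguments` with missing defaults filled in (hypothetical helper)."""
    filled = dict(arguments)
    for name, spec in schema["properties"].items():
        if name not in filled and "default" in spec:
            filled[name] = spec["default"]
    return filled


print(apply_defaults({}))                          # {'stock_code': 'SZ000002'}
print(apply_defaults({"stock_code": "SH600000"}))  # {'stock_code': 'SH600000'}
```

Calling the tool with no arguments is therefore equivalent to asking for `SZ000002`.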

Implementation Reference

  • main.py:106-110 (handler)
    The handler function for the 'capital_flow' MCP tool. It calls pysnowball's ball.capital_flow to fetch intraday capital flow data (minute-level) for a given stock code and processes the result using process_data before returning it as a dict.
        @mcp.tool()
        def capital_flow(stock_code: str = "SZ000002") -> dict:
            """Get the current day's capital inflow and outflow data, at one-minute intervals."""
            result = ball.capital_flow(stock_code)
            return process_data(result)
  • main.py:34-61 (helper)
    Shared helper function used by the capital_flow tool (and others) to process the raw data, primarily converting timestamps to readable datetime strings.
        def process_data(data, process_config=None):
            """
            General-purpose data processing function; can be extended with further processing operations.

            Args:
                data: the raw data
                process_config: configuration dict specifying which processing operations to run,
                    e.g. {'convert_timestamps': True, 'other_process': params}

            Returns:
                The processed data
            """
            if process_config is None:
                # Default configuration
                process_config = {
                    'convert_timestamps': True
                }

            # If timestamp conversion is enabled
            if process_config.get('convert_timestamps', True):
                data = convert_timestamps(data)

            # More data processing logic can be added here
            # For example:
            # if 'format_numbers' in process_config:
            #     data = format_numbers(data, **process_config['format_numbers'])

            return data
  • main.py:14-31 (helper)
    Helper function called by process_data to recursively convert timestamp fields in the data to formatted datetime strings.
        def convert_timestamps(data):
            """Recursively convert all timestamps in the data to datetime strings"""
            if isinstance(data, dict):
                for key, value in list(data.items()):
                    if key == 'timestamp' and isinstance(value, (int, float)) and value > 1000000000000:  # millisecond timestamp
                        data[key] = datetime.datetime.fromtimestamp(value/1000).strftime('%Y-%m-%d %H:%M:%S')
                    elif key == 'timestamp' and isinstance(value, (int, float)) and value > 1000000000:  # second timestamp
                        data[key] = datetime.datetime.fromtimestamp(value).strftime('%Y-%m-%d %H:%M:%S')
                    elif key.endswith('_date') and isinstance(value, (int, float)) and value > 1000000000000:  # millisecond timestamp
                        data[key] = datetime.datetime.fromtimestamp(value/1000).strftime('%Y-%m-%d %H:%M:%S')
                    elif key.endswith('_date') and isinstance(value, (int, float)) and value > 1000000000:  # second timestamp
                        data[key] = datetime.datetime.fromtimestamp(value).strftime('%Y-%m-%d %H:%M:%S')
                    elif isinstance(value, (dict, list)):
                        data[key] = convert_timestamps(value)
            elif isinstance(data, list):
                for i, item in enumerate(data):
                    data[i] = convert_timestamps(item)
            return data
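
To see how the two helpers behave together, the sketch below runs them on a small payload. The helper bodies are a condensed restatement of the code listed above, written to follow the same branching rules so the example runs on its own. The `sample` payload is hypothetical: the field names `data`, `items`, and `amount` are invented for illustration and are not taken from an actual Xueqiu response.

```python
import copy
import datetime

FMT = '%Y-%m-%d %H:%M:%S'


def _to_text(value):
    """Format an epoch value; values above 1e12 are treated as milliseconds."""
    seconds = value / 1000 if value > 1000000000000 else value
    return datetime.datetime.fromtimestamp(seconds).strftime(FMT)


def convert_timestamps(data):
    """Condensed restatement of the helper above (same branching rules)."""
    if isinstance(data, dict):
        for key, value in list(data.items()):
            is_time_key = key == 'timestamp' or key.endswith('_date')
            if is_time_key and isinstance(value, (int, float)) and value > 1000000000:
                data[key] = _to_text(value)
            elif isinstance(value, (dict, list)):
                data[key] = convert_timestamps(value)
    elif isinstance(data, list):
        for i, item in enumerate(data):
            data[i] = convert_timestamps(item)
    return data


def process_data(data, process_config=None):
    """Condensed restatement of process_data: conversion is on unless disabled."""
    if process_config is None:
        process_config = {'convert_timestamps': True}
    if process_config.get('convert_timestamps', True):
        data = convert_timestamps(data)
    return data


# Hypothetical payload shape, for illustration only.
sample = {
    'data': {
        'symbol': 'SZ000002',
        'items': [
            {'timestamp': 1700000000000, 'amount': 1250000.0},
            {'timestamp': 1700000060000, 'amount': -830000.0},
        ],
    }
}

raw = copy.deepcopy(sample)
converted = process_data(raw)
untouched = process_data(copy.deepcopy(sample), {'convert_timestamps': False})

print(converted['data']['items'][0]['timestamp'])  # local-time string
print(untouched['data']['items'][0]['timestamp'])  # still the raw integer
print(converted is raw)                            # True: the input is mutated in place
```

Two properties are worth noting. `datetime.datetime.fromtimestamp` formats in the local timezone of the machine running the server, so the strings are not timezone-qualified. The conversion also mutates its input, so a caller that needs the raw epoch values should copy the data first or pass `{'convert_timestamps': False}`.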

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/liqiongyu/xueqiu_mcp'
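
For callers working in Python rather than the shell, the same request can be sketched with the standard library. The URL is the one from the curl command; `build_request` and `fetch_server` are hypothetical helper names, and decoding the body as JSON is an assumption, since the response format is not described here.

```python
import json
import urllib.request

API_ROOT = "https://glama.ai/api/mcp/v1/servers"


def build_request(owner, name):
    """Build the same GET request that the curl command sends."""
    return urllib.request.Request(f"{API_ROOT}/{owner}/{name}", method="GET")


def fetch_server(owner, name, timeout=10):
    """Send the request and decode the body, assuming it is JSON."""
    with urllib.request.urlopen(build_request(owner, name), timeout=timeout) as resp:
        return json.load(resp)


request = build_request("liqiongyu", "xueqiu_mcp")
print(request.get_method(), request.full_url)

# To perform the call (requires network access):
# info = fetch_server("liqiongyu", "xueqiu_mcp")
```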

If you have feedback or need assistance with the MCP directory API, please join our Discord server.