Skip to main content
Glama
al-one

MCP Server for stock and crypto

stock_sector_fund_flow_rank

Analyze fund flow trends across Chinese A-share market sectors to identify investment opportunities and track capital movements.

Instructions

获取中国A股市场(上证、深证)的行业资金流向数据

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
daysNo天数,仅支持: {'今日','5日','10日'},如果需要获取今日数据,请确保是交易日今日
cateNo仅支持: {'行业资金流','概念资金流','地域资金流'}行业资金流

Implementation Reference

  • The primary handler function for the 'stock_sector_fund_flow_rank' tool. It is decorated with @mcp.tool for registration. Fetches data from akshare via ak_cache, sorts by '今日涨跌幅', drops '序号' column, selects top 20 and bottom 20 rows, and returns as CSV string. Includes input schema via Field annotations.
    @mcp.tool(
        title="A股板块资金流",
        description="获取中国A股市场(上证、深证)的行业资金流向数据",
    )
    def stock_sector_fund_flow_rank(
        days: str = Field("今日", description="天数,仅支持: {'今日','5日','10日'},如果需要获取今日数据,请确保是交易日"),
        cate: str = Field("行业资金流", description="仅支持: {'行业资金流','概念资金流','地域资金流'}"),
    ):
        dfs = ak_cache(ak.stock_sector_fund_flow_rank, indicator=days, sector_type=cate, ttl=1200)
        if dfs is None:
            return "获取数据失败"
        try:
            dfs.sort_values("今日涨跌幅", ascending=False, inplace=True)
            dfs.drop(columns=["序号"], inplace=True)
        except Exception:
            pass
        try:
            dfs = pd.concat([dfs.head(20), dfs.tail(20)])
            return dfs.to_csv(index=False, float_format="%.2f").strip()
        except Exception as exc:
            return str(exc)
  • The ak_cache helper function used by the handler to cache the akshare data fetch with TTL support.
    def ak_cache(fun, *args, **kwargs) -> pd.DataFrame | None:
        key = kwargs.pop("key", None)
        if not key:
            key = f"{fun.__name__}-{args}-{kwargs}"
        ttl1 = kwargs.pop("ttl", 86400)
        ttl2 = kwargs.pop("ttl2", None)
        cache = CacheKey.init(key, ttl1, ttl2)
        all = cache.get()
        if all is None:
            try:
                _LOGGER.info("Request akshare: %s", [key, args, kwargs])
                all = fun(*args, **kwargs)
                cache.set(all)
            except Exception as exc:
                _LOGGER.exception(str(exc))
        return all
  • The @mcp.tool decorator registers the tool with title and description.
    @mcp.tool(
        title="A股板块资金流",
        description="获取中国A股市场(上证、深证)的行业资金流向数据",
    )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/al-one/mcp-aktools'

If you have feedback or need assistance with the MCP directory API, please join our Discord server