stock_zt_pool_strong_em
Retrieve Chinese A-share market (Shanghai, Shenzhen) strong stock pool data to identify high-performing stocks for analysis and decision-making.
Instructions
Retrieve strong stock pool data for the Chinese A-share market (Shanghai and Shenzhen).
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| date | No | Trading date (optional); defaults to the most recent trading day. Format: 20251231 | |
| limit | No | Number of rows to return (int, 30-100) | 50 |
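The schema above can be exercised with a small argument builder. This is a minimal sketch rather than part of mcp_aktools: `build_arguments` is a hypothetical helper, and the 30-100 check reflects the parameter description only. The `Field` definition quoted in the implementation reference carries no explicit bounds, so the server itself may accept other values.

```python
from datetime import datetime


def build_arguments(date: str = "", limit: int = 50) -> dict:
    """Build an arguments dict for stock_zt_pool_strong_em (hypothetical helper)."""
    args = {"limit": int(limit)}
    # Assumption: enforce the documented 30-100 range on the client side.
    if not 30 <= args["limit"] <= 100:
        raise ValueError("limit should be between 30 and 100")
    if date:
        # The documented format is eight digits, e.g. 20251231.
        if len(date) != 8 or not date.isdigit():
            raise ValueError("date should look like 20251231")
        # strptime raises ValueError for impossible calendar dates.
        datetime.strptime(date, "%Y%m%d")
        args["date"] = date
    # An omitted date lets the tool fall back to the most recent trading day.
    return args
```

Leaving `date` out of the returned dict mirrors the tool's default of an empty string, which triggers the fallback to the most recent trading day.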
Implementation Reference
- mcp_aktools/__init__.py:267-280 (handler) The main handler function for the 'stock_zt_pool_strong_em' tool. It calls ak.stock_zt_pool_strong_em through ak_cache (TTL 1200 seconds), drops the '序号' (row number), '流通市值' (circulating market cap), and '总市值' (total market cap) columns, sorts by '成交额' (trading value) in descending order, keeps the first `limit` rows, and returns the result as CSV.

  ```python
  def stock_zt_pool_strong_em(
      date: str = Field("", description="交易日日期(可选),默认为最近的交易日,格式: 20251231"),
      limit: int = Field(50, description="返回数量(int,30-100)", strict=False),
  ):
      if not date:
          date = recent_trade_date().strftime("%Y%m%d")
      dfs = ak_cache(ak.stock_zt_pool_strong_em, date=date, ttl=1200)
      try:
          dfs.drop(columns=["序号", "流通市值", "总市值"], inplace=True)
      except Exception:
          pass
      dfs.sort_values("成交额", ascending=False, inplace=True)
      dfs = dfs.head(int(limit))
      return dfs.to_csv(index=False, float_format="%.2f").strip()
  ```
- mcp_aktools/__init__.py:263-266 (registration) The @mcp.tool decorator registers the 'stock_zt_pool_strong_em' tool with FastMCP, including its title and description.

  ```python
  @mcp.tool(
      title="A股强势股池",
      description="获取中国A股市场(上证、深证)的强势股池数据",
  )
  ```
- mcp_aktools/__init__.py:268-269 (schema) Pydantic Field definitions for the tool's input parameters: date (optional trading date) and limit (number of results).

  ```python
  date: str = Field("", description="交易日日期(可选),默认为最近的交易日,格式: 20251231"),
  limit: int = Field(50, description="返回数量(int,30-100)", strict=False),
  ```
- mcp_aktools/__init__.py:564-580 (helper) Helper function for caching akshare API calls. The tool uses it to cache ak.stock_zt_pool_strong_em results with a TTL.

  ```python
  def ak_cache(fun, *args, **kwargs) -> pd.DataFrame | None:
      key = kwargs.pop("key", None)
      if not key:
          key = f"{fun.__name__}-{args}-{kwargs}"
      ttl1 = kwargs.pop("ttl", 86400)
      ttl2 = kwargs.pop("ttl2", None)
      cache = CacheKey.init(key, ttl1, ttl2)
      all = cache.get()
      if all is None:
          try:
              _LOGGER.info("Request akshare: %s", [key, args, kwargs])
              all = fun(*args, **kwargs)
              cache.set(all)
          except Exception as exc:
              _LOGGER.exception(str(exc))
      return all
  ```
- mcp_aktools/__init__.py:229-239 (helper) Helper function that determines the most recent trading date. The tool uses it as the default for the 'date' parameter.

  ```python
  def recent_trade_date():
      now = datetime.now().date()
      dfs = ak_cache(ak.tool_trade_date_hist_sina, ttl=43200)
      if dfs is None:
          return now
      dfs.sort_values("trade_date", ascending=False, inplace=True)
      for d in dfs["trade_date"]:
          if d <= now:
              return d
      return now
  ```
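The two helpers referenced above follow a common pattern: cache an upstream call under a key built from the function name and arguments, return None when the call fails, and pick the newest trading date that is not in the future. The sketch below illustrates that pattern using only the standard library. It is not the project's code: `cached_call`, `latest_trade_date`, and the in-memory `_CACHE` dict are hypothetical stand-ins, since the `CacheKey` class used by `ak_cache` is not shown here.

```python
import time
from datetime import date
from typing import Any, Callable, Optional

# Hypothetical in-memory store: cache key -> (expiry timestamp, cached value).
_CACHE: dict[str, tuple[float, Any]] = {}


def cached_call(
    fun: Callable[..., Any], *args: Any, ttl: float = 86400, **kwargs: Any
) -> Optional[Any]:
    """Return fun(*args, **kwargs), reusing a cached result for up to ttl seconds."""
    key = f"{fun.__name__}-{args}-{kwargs}"
    now = time.monotonic()
    hit = _CACHE.get(key)
    if hit is not None and hit[0] > now:
        return hit[1]
    try:
        value = fun(*args, **kwargs)
    except Exception:
        # As in the documented helper, a failed upstream call yields None.
        return None
    _CACHE[key] = (now + ttl, value)
    return value


def latest_trade_date(trade_dates: list[date], today: date) -> date:
    """Pick the newest trading date not after today, falling back to today."""
    for d in sorted(trade_dates, reverse=True):
        if d <= today:
            return d
    return today
```

Because a failed call returns None instead of raising, callers need to check for None before using the result, as `recent_trade_date` does before sorting.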