get_stock_restricted_release_queue
Retrieve restricted share release schedules for Chinese stocks to track when locked-up shares become available for trading on the market.
Instructions
获取指定股票的个股限售解禁情况
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| symbol | Yes | 股票代码 (例如: '000001') | |
| output_format | No | 输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown | markdown |
Implementation Reference
- src/china_stock_mcp/server.py:857-859 (registration)Registration of the 'get_stock_restricted_release_queue' tool using the @mcp.tool decorator.@mcp.tool( name="get_stock_restricted_release_queue", description="获取指定股票的个股限售解禁情况" )
- Input schema for the tool: symbol (stock code) and output_format.def get_stock_restricted_release_queue( symbol: Annotated[str, Field(description="股票代码 (例如: '000001')")], output_format: Annotated[ Literal["json", "csv", "xml", "excel", "markdown", "html"], Field(description="输出数据格式: json, csv, xml, excel, markdown, html。默认: markdown"), ] = "markdown" ) -> str:
- src/china_stock_mcp/server.py:867-886 (handler)Handler implementation: defines a fetcher for sina and em sources, uses _fetch_data_with_fallback to get data, handles empty df, and formats output."""获取指定股票的个股限售解禁情况,支持降级使用 stock_restricted_release_queue_em.""" def _restricted_release_queue_fetcher(source: str, **kwargs: Any) -> pd.DataFrame: if source == "sina": return ak.stock_restricted_release_queue_sina(**kwargs) elif source == "em": return ak.stock_restricted_release_queue_em(**kwargs) else: raise ValueError(f"不支持的数据源: {source}") df = _fetch_data_with_fallback( fetch_func=_restricted_release_queue_fetcher, primary_source="sina", fallback_sources=["em"], symbol=symbol, ) if df.empty: df = pd.DataFrame() return _format_dataframe_output(df, output_format)
- src/china_stock_mcp/server.py:15-72 (helper)Shared helper function for data fetching with fallback sources, used by the tool.def _fetch_data_with_fallback( fetch_func: Callable[..., pd.DataFrame], primary_source: str, fallback_sources: List[str], **kwargs: Any, ) -> pd.DataFrame: """ 通用的数据源故障切换辅助函数。 按优先级尝试数据源,直到获取到有效数据或所有数据源都失败。 Args: fetch_func: 实际调用 akshare 或 akshare_one 获取数据的函数。 这个函数应该接受 'source' 参数,或者在内部处理 source 的映射。 primary_source: 用户指定的首选数据源。 fallback_sources: 备用数据源列表,按优先级排序。 **kwargs: 传递给 fetch_func 的其他参数。 Returns: pd.DataFrame: 获取到的数据。 Raises: RuntimeError: 如果所有数据源都未能获取到有效数据。 """ if primary_source is None: return fetch_func(**kwargs) data_source_priority = [primary_source] + fallback_sources # 移除重复项并保持顺序 seen = set() unique_data_source_priority = [] for x in data_source_priority: if x not in seen: unique_data_source_priority.append(x) seen.add(x) df = None errors = [] for current_source in unique_data_source_priority: try: # 假设 fetch_func 能够接受 source 参数 # 或者 fetch_func 内部根据 kwargs 中的 source 参数进行逻辑判断 temp_df = fetch_func(source=current_source, **kwargs) if temp_df is not None and not temp_df.empty: print(f"成功从数据源 '{current_source}' 获取数据。") df = temp_df break else: errors.append(f"数据源 '{current_source}' 返回空数据。") except Exception as e: errors.append(f"从数据源 '{current_source}' 获取数据失败: {str(e)}") if df is None or df.empty: raise RuntimeError( f"所有数据源都未能获取到有效数据。详细错误: {'; '.join(errors)}" ) return df