advanced_query
Execute complex genomic queries in batches using parallel or sequential strategies to retrieve gene information and analysis results.
Instructions
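Advanced batch query with support for complex query strategies.
Args:
- queries: list of queries; each element is a dict of the form {"query": str, "type": str}
- strategy: execution strategy (parallel or sequential)
- delay: interval between queries, in seconds
Returns: batch query results
Examples:

    advanced_query([
        {"query": "TP53", "type": "info"},
        {"query": "BRCA1", "type": "info"},
        {"query": "cancer", "type": "search"}
    ])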
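
The argument shape described above can also be assembled programmatically. The following is a minimal sketch: `build_queries` is a hypothetical helper, not part of genome_mcp, and it assumes only the two type values that appear in the example ("info" and "search").

```python
from typing import Any


def build_queries(genes: list[str], search_terms: list[str]) -> list[dict[str, Any]]:
    """Build the `queries` argument: one dict per query, in call order."""
    # Gene symbols become "info" queries; free-text terms become "search" queries.
    queries: list[dict[str, Any]] = [{"query": g, "type": "info"} for g in genes]
    queries += [{"query": t, "type": "search"} for t in search_terms]
    return queries


payload = build_queries(["TP53", "BRCA1"], ["cancer"])
print(payload)
```

The resulting list has the same structure as the literal in the example and could be passed as the `queries` argument.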
Input Schema
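| Name | Required | Description | Default |
|---|---|---|---|
| queries | Yes | List of queries; each element is {"query": str, "type": str} | |
| strategy | No | Execution strategy (parallel or sequential) | parallel |
| delay | No | Interval between queries, in seconds | 0.34 |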
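
A caller may want to check arguments against this table before invoking the tool. The sketch below is one way to do that; `validate_advanced_query_args` and `ALLOWED_STRATEGIES` are hypothetical names, not part of genome_mcp. Note that the handler shown in the Implementation Reference treats any strategy value other than "parallel" as sequential, so rejecting unknown values here is a stricter choice made for illustration.

```python
from typing import Any

# Assumption: only the two strategies named in the docstring are meaningful.
ALLOWED_STRATEGIES = {"parallel", "sequential"}


def validate_advanced_query_args(
    queries: Any, strategy: str = "parallel", delay: float = 0.34
) -> list[str]:
    """Return a list of problems; an empty list means the arguments look usable."""
    problems: list[str] = []
    if not isinstance(queries, list):
        problems.append("queries must be a list")
    else:
        for i, item in enumerate(queries):
            # Each element needs at least a string "query"; "type" is optional.
            if not isinstance(item, dict) or not isinstance(item.get("query"), str):
                problems.append(f"queries[{i}] needs a string 'query' field")
    if strategy not in ALLOWED_STRATEGIES:
        problems.append(f"unknown strategy: {strategy!r}")
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(delay, bool) or not isinstance(delay, (int, float)) or delay < 0:
        problems.append("delay must be a non-negative number")
    return problems


print(validate_advanced_query_args([{"query": "TP53", "type": "info"}]))
```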
Implementation Reference
- src/genome_mcp/core/tools.py:352-426 (handler) The main handler function for the 'advanced_query' tool, including the execution logic for batch queries with parallel or sequential strategies. Registered via the @mcp.tool() decorator.

      @mcp.tool()
      async def advanced_query(
          queries: list[dict[str, Any]],
          strategy: str = "parallel",
          delay: float = 0.34,  # NCBI API rate limit
      ) -> AdvancedQueryResult:
          """
          Advanced batch query with support for complex query strategies

          Args:
              queries: list of queries; each element contains {"query": str, "type": str}
              strategy: execution strategy (parallel/sequential)
              delay: interval between queries (seconds)

          Returns:
              Batch query results

          Examples:
              advanced_query([
                  {"query": "TP53", "type": "info"},
                  {"query": "BRCA1", "type": "info"},
                  {"query": "cancer", "type": "search"}
              ])
          """
          results = {}

          async def execute_single_query(index: int, query_dict: dict[str, Any]):
              try:
                  parsed = QueryParser.parse_by_type(
                      query_dict["query"], query_dict.get("type", "auto")
                  )
                  result = await _query_executor.execute(parsed, **query_dict)
                  results[index] = result
                  await asyncio.sleep(delay)  # respect the rate limit
              except ValidationError as e:
                  results[index] = format_simple_error(
                      e, query=query_dict.get("query", ""), operation="advanced_query"
                  )
              except Exception as e:
                  results[index] = format_simple_error(
                      e, query=query_dict.get("query", ""), operation="advanced_query"
                  )

          if strategy == "parallel":
              # Concurrent queries
              await asyncio.gather(
                  *[execute_single_query(i, q) for i, q in enumerate(queries)]
              )
          else:
              # Sequential queries (suitable for dependent queries)
              for i, query_dict in enumerate(queries):
                  try:
                      parsed = QueryParser.parse_by_type(
                          query_dict["query"], query_dict.get("type", "auto")
                      )
                      result = await _query_executor.execute(parsed, **query_dict)
                      results[i] = result
                      await asyncio.sleep(delay)  # respect the rate limit
                  except ValidationError as e:
                      results[i] = format_simple_error(
                          e, query=query_dict.get("query", ""), operation="advanced_query"
                      )
                  except Exception as e:
                      results[i] = format_simple_error(
                          e, query=query_dict.get("query", ""), operation="advanced_query"
                      )

          return {
              "strategy": strategy,
              "total_queries": len(queries),
              "successful": len([r for r in results.values() if "error" not in r]),
              "results": results,
          }

- src/genome_mcp/core/types.py:59-66 (schema) Type definition (TypedDict) for the return type of the advanced_query tool.

      class AdvancedQueryResult(TypedDict):
          """Advanced query result type"""

          strategy: str
          total_queries: int
          successful: int
          results: dict[int, dict[str, Any]]

- src/genome_mcp/core/tools.py:352-426 (registration) The @mcp.tool() decorator registers the advanced_query function with the MCP server.
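
The dispatch pattern in the handler and the shape of its result can be tried out in isolation. The following is a simplified sketch, not the project's code: `fake_execute` stands in for the real parser and executor, `run_batch` and `failed_indices` are hypothetical names, and the error entry format (a dict with an "error" key) is an assumption based on how the handler counts successes.

```python
import asyncio
from typing import Any


async def fake_execute(query: str) -> dict[str, Any]:
    """Stand-in for the real executor (assumption); fails on an empty query."""
    await asyncio.sleep(0)  # yield control, as a network call would
    if not query:
        raise ValueError("empty query")
    return {"query": query, "data": f"record for {query}"}


async def run_batch(
    queries: list[dict[str, Any]], strategy: str = "parallel", delay: float = 0.0
) -> dict[str, Any]:
    results: dict[int, dict[str, Any]] = {}

    async def run_one(index: int, item: dict[str, Any]) -> None:
        try:
            results[index] = await fake_execute(item["query"])
            await asyncio.sleep(delay)
        except Exception as exc:  # one failure must not abort the batch
            results[index] = {"error": str(exc), "query": item.get("query", "")}

    if strategy == "parallel":
        # All coroutines start together, so the per-query sleep does not
        # space out the requests; it only delays each task's completion.
        await asyncio.gather(*(run_one(i, q) for i, q in enumerate(queries)))
    else:
        # One query at a time, with `delay` seconds after each successful call.
        for i, q in enumerate(queries):
            await run_one(i, q)

    return {
        "strategy": strategy,
        "total_queries": len(queries),
        "successful": sum("error" not in r for r in results.values()),
        "results": results,
    }


def failed_indices(result: dict[str, Any]) -> list[int]:
    """Indices of queries whose result entry carries an "error" key."""
    return sorted(i for i, r in result["results"].items() if "error" in r)


batch = [{"query": "TP53", "type": "info"}, {"query": "", "type": "info"}]
parallel = asyncio.run(run_batch(batch, "parallel"))
sequential = asyncio.run(run_batch(batch, "sequential"))
print(parallel["successful"], failed_indices(parallel))
```

Because results are keyed by the position of each query in the input list, a caller can map every entry, including failures, back to the query that produced it regardless of which strategy ran.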