get_multiple_studies
Retrieve detailed information for multiple cancer genomics studies in a single call by passing a list of study IDs. Because the studies are fetched concurrently, a batched request is much faster than issuing one sequential request per study.
Instructions
Get details for multiple studies concurrently.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| study_ids | Yes | List of study IDs to fetch | (none) |
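
For orientation, a hypothetical call shape is sketched below. The study IDs are illustrative cBioPortal identifiers, and the commented response structure mirrors what the handler shown under Implementation Reference returns.

```python
# Hypothetical tool arguments (study IDs are illustrative).
arguments = {"study_ids": ["acc_tcga", "brca_tcga"]}

# Expected response shape, based on the handler shown below:
# {
#     "studies": {
#         "acc_tcga": {...},   # study details, or {"error": "..."} on failure
#         "brca_tcga": {...},
#     },
#     "metadata": {"count": 2, "errors": 0, "execution_time": 0.31, "concurrent": True},
# }
```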
Implementation Reference
- Core handler implementation. Fetches each requested study concurrently with `asyncio.gather` and assembles the results into a dictionary keyed by study ID, with metadata covering the request count, error count, and execution time. A standalone sketch of the underlying concurrency pattern appears after this list.

  ```python
  @handle_api_errors("get multiple studies")
  async def get_multiple_studies(self, study_ids: List[str]) -> Dict:
      """
      Get details for multiple studies concurrently.

      This method demonstrates the power of async concurrency by fetching
      multiple studies in parallel, which is much faster than sequential requests.

      Args:
          study_ids: List of study IDs to fetch

      Returns:
          Dictionary mapping study IDs to their details, with metadata about the operation
      """
      if not study_ids:
          return {
              "studies": {},
              "metadata": {"count": 0, "errors": 0, "concurrent": True},
          }

      # Create a reusable async function for fetching a single study
      async def fetch_study(study_id):
          try:
              data = await self.api_client.make_api_request(f"studies/{study_id}")
              return {"study_id": study_id, "data": data, "success": True}
          except Exception as e:
              return {"study_id": study_id, "error": str(e), "success": False}

      # Create tasks for all study IDs and run them concurrently
      tasks = [fetch_study(study_id) for study_id in study_ids]
      start_time = time.perf_counter()

      # Use asyncio.gather to execute all tasks concurrently
      results = await asyncio.gather(*tasks)
      end_time = time.perf_counter()

      # Process results into a structured response
      studies_dict = {}
      error_count = 0
      for result in results:
          if result["success"]:
              studies_dict[result["study_id"]] = result["data"]
          else:
              studies_dict[result["study_id"]] = {"error": result["error"]}
              error_count += 1

      return {
          "studies": studies_dict,
          "metadata": {
              "count": len(study_ids),
              "errors": error_count,
              "execution_time": round(end_time - start_time, 3),
              "concurrent": True,
          },
      }
  ```
- cbioportal_mcp/server.py:211-213 (handler). Top-level MCP tool handler method that delegates to the StudiesEndpoints implementation.

  ```python
  async def get_multiple_studies(self, study_ids: List[str]) -> Dict:
      """Get details for multiple studies concurrently."""
      return await self.studies.get_multiple_studies(study_ids)
  ```
- cbioportal_mcp/server.py:96-131 (registration). Registers the get_multiple_studies method (and the other endpoint methods) as MCP tools via FastMCP's add_tool. A minimal standalone registration sketch appears after this list.

  ```python
  def _register_tools(self):
      """Register tool methods as MCP tools."""
      # List of methods to register as tools (explicitly defined)
      tool_methods = [
          # Pagination utilities
          "paginate_results",
          "collect_all_results",
          # Studies endpoints
          "get_cancer_studies",
          "get_cancer_types",
          "search_studies",
          "get_study_details",
          "get_multiple_studies",
          # Genes endpoints
          "search_genes",
          "get_genes",
          "get_multiple_genes",
          "get_mutations_in_gene",
          # Samples endpoints
          "get_samples_in_study",
          "get_sample_list_id",
          # Molecular profiles endpoints
          "get_molecular_profiles",
          "get_clinical_data",
          "get_gene_panels_for_study",
          "get_gene_panel_details",
      ]

      for method_name in tool_methods:
          if hasattr(self, method_name):
              method = getattr(self, method_name)
              self.mcp.add_tool(method)
              logger.debug(f"Registered tool: {method_name}")
          else:
              logger.warning(f"Method {method_name} not found for tool registration")
  ```
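
A design point worth noting in the core handler: each per-study coroutine catches its own exceptions, so `asyncio.gather` never needs `return_exceptions=True` and a single failing study cannot abort the batch. The following is a minimal, self-contained sketch of that pattern, not the project's actual code; `fetch_one_study` is a stand-in for the server's `api_client.make_api_request` call.

```python
import asyncio
import time


async def fetch_one_study(study_id: str) -> dict:
    """Stand-in for an HTTP call such as api_client.make_api_request."""
    await asyncio.sleep(0.1)  # simulate network latency
    return {"studyId": study_id}


async def fetch_many(study_ids: list[str]) -> dict:
    async def fetch_safely(study_id: str) -> dict:
        # Errors stay attached to the study that produced them.
        try:
            data = await fetch_one_study(study_id)
            return {"study_id": study_id, "data": data, "success": True}
        except Exception as exc:
            return {"study_id": study_id, "error": str(exc), "success": False}

    start = time.perf_counter()
    results = await asyncio.gather(*(fetch_safely(s) for s in study_ids))
    elapsed = round(time.perf_counter() - start, 3)

    return {
        "studies": {
            r["study_id"]: r.get("data", {"error": r.get("error")}) for r in results
        },
        "metadata": {
            "count": len(study_ids),
            "errors": sum(not r["success"] for r in results),
            "execution_time": elapsed,
            "concurrent": True,
        },
    }


if __name__ == "__main__":
    # Three simulated fetches complete in roughly one latency period, not three.
    print(asyncio.run(fetch_many(["acc_tcga", "brca_tcga", "luad_tcga"])))
```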
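
To show the registration step in isolation, here is a minimal sketch assuming the FastMCP class from the MCP Python SDK; the import path, server name, and stub tool body are illustrative assumptions, while the `add_tool` call itself is the same one `_register_tools` uses.

```python
# Minimal sketch, assuming FastMCP from the MCP Python SDK; names are illustrative.
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("cbioportal-demo")


async def get_multiple_studies(study_ids: list[str]) -> dict:
    """Get details for multiple studies concurrently (stub body for illustration)."""
    return {"studies": {}, "metadata": {"count": len(study_ids), "errors": 0, "concurrent": True}}


# Same registration call used in _register_tools above; FastMCP derives the tool
# name and description from the function name and docstring.
mcp.add_tool(get_multiple_studies)

if __name__ == "__main__":
    mcp.run()  # serves over stdio by default
```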