get_multiple_studies
Retrieve detailed information for multiple cancer genomics studies simultaneously from cBioPortal to analyze genomic data, mutations, and clinical information.
Instructions
Get details for multiple studies concurrently.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| study_ids | Yes | List of study IDs to fetch | |
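
To illustrate the input, the sketch below builds the `tools/call` request an MCP client would send for this tool. The study IDs and the request `id` are placeholder values chosen for illustration, not taken from this page.

```python
import json

# Placeholder study IDs for illustration; substitute IDs valid on your cBioPortal instance.
study_ids = ["brca_tcga", "luad_tcga"]

# JSON-RPC 2.0 envelope that MCP uses for tool invocation.
request = {
    "jsonrpc": "2.0",
    "id": 1,  # arbitrary request id
    "method": "tools/call",
    "params": {
        "name": "get_multiple_studies",
        "arguments": {"study_ids": study_ids},
    },
}

print(json.dumps(request, indent=2))
```

The only argument is `study_ids`, passed as a JSON array of strings.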
Implementation Reference
- The primary handler function implementing get_multiple_studies. Fetches details for multiple studies concurrently using asyncio.gather, handles errors per study, and returns a dictionary with results and metadata.

      async def get_multiple_studies(self, study_ids: List[str]) -> Dict:
          """
          Get details for multiple studies concurrently.

          This method demonstrates the power of async concurrency by fetching
          multiple studies in parallel, which is much faster than sequential requests.

          Args:
              study_ids: List of study IDs to fetch

          Returns:
              Dictionary mapping study IDs to their details, with metadata about the operation
          """
          if not study_ids:
              return {
                  "studies": {},
                  "metadata": {"count": 0, "errors": 0, "concurrent": True},
              }

          # Create a reusable async function for fetching a single study
          async def fetch_study(study_id):
              try:
                  data = await self.api_client.make_api_request(f"studies/{study_id}")
                  return {"study_id": study_id, "data": data, "success": True}
              except Exception as e:
                  return {"study_id": study_id, "error": str(e), "success": False}

          # Create tasks for all study IDs and run them concurrently
          tasks = [fetch_study(study_id) for study_id in study_ids]
          start_time = time.perf_counter()
          # Use asyncio.gather to execute all tasks concurrently
          results = await asyncio.gather(*tasks)
          end_time = time.perf_counter()

          # Process results into a structured response
          studies_dict = {}
          error_count = 0
          for result in results:
              if result["success"]:
                  studies_dict[result["study_id"]] = result["data"]
              else:
                  studies_dict[result["study_id"]] = {"error": result["error"]}
                  error_count += 1

          return {
              "studies": studies_dict,
              "metadata": {
                  "count": len(study_ids),
                  "errors": error_count,
                  "execution_time": round(end_time - start_time, 3),
                  "concurrent": True,
              },
          }

- cbioportal_mcp/server.py:97-131 (registration) Registers 'get_multiple_studies' (line 108 in the tool_methods list) as an MCP tool by calling FastMCP.add_tool in a loop over the predefined tool methods.

      """Register tool methods as MCP tools."""
      # List of methods to register as tools (explicitly defined)
      tool_methods = [
          # Pagination utilities
          "paginate_results",
          "collect_all_results",
          # Studies endpoints
          "get_cancer_studies",
          "get_cancer_types",
          "search_studies",
          "get_study_details",
          "get_multiple_studies",
          # Genes endpoints
          "search_genes",
          "get_genes",
          "get_multiple_genes",
          "get_mutations_in_gene",
          # Samples endpoints
          "get_samples_in_study",
          "get_sample_list_id",
          # Molecular profiles endpoints
          "get_molecular_profiles",
          "get_clinical_data",
          "get_gene_panels_for_study",
          "get_gene_panel_details",
      ]

      for method_name in tool_methods:
          if hasattr(self, method_name):
              method = getattr(self, method_name)
              self.mcp.add_tool(method)
              logger.debug(f"Registered tool: {method_name}")
          else:
              logger.warning(f"Method {method_name} not found for tool registration")

- cbioportal_mcp/server.py:211-213 (helper) Thin wrapper in the main server class that forwards the get_multiple_studies call to the StudiesEndpoints instance (self.studies).

      async def get_multiple_studies(self, study_ids: List[str]) -> Dict:
          """Get details for multiple studies concurrently."""
          return await self.studies.get_multiple_studies(study_ids)
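
To see the response shape without a network connection, the following minimal sketch applies the same `asyncio.gather` pattern as the primary handler to a stub client. `FakeApiClient`, its simulated latency, the `missing_study` ID, and the free-function form of `get_multiple_studies` are all assumptions made for illustration; only `make_api_request`, the `studies/{study_id}` endpoint, and the response keys come from the code shown here.

```python
import asyncio
import time
from typing import Any, Dict, List


class FakeApiClient:
    """Hypothetical stand-in for the real API client; makes no network calls."""

    async def make_api_request(self, endpoint: str) -> Dict[str, Any]:
        await asyncio.sleep(0.01)  # simulate network latency
        study_id = endpoint.split("/", 1)[1]
        if study_id == "missing_study":
            raise ValueError(f"study not found: {study_id}")
        return {"studyId": study_id, "name": f"Study {study_id}"}


async def get_multiple_studies(api_client: Any, study_ids: List[str]) -> Dict[str, Any]:
    """Free-function version of the handler's fetch-and-aggregate pattern."""
    if not study_ids:
        return {"studies": {}, "metadata": {"count": 0, "errors": 0, "concurrent": True}}

    async def fetch_study(study_id: str) -> Dict[str, Any]:
        # Errors are caught per study so one failure does not cancel the batch.
        try:
            data = await api_client.make_api_request(f"studies/{study_id}")
            return {"study_id": study_id, "data": data, "success": True}
        except Exception as e:
            return {"study_id": study_id, "error": str(e), "success": False}

    start_time = time.perf_counter()
    results = await asyncio.gather(*(fetch_study(s) for s in study_ids))
    end_time = time.perf_counter()

    studies: Dict[str, Any] = {}
    error_count = 0
    for result in results:
        if result["success"]:
            studies[result["study_id"]] = result["data"]
        else:
            studies[result["study_id"]] = {"error": result["error"]}
            error_count += 1

    return {
        "studies": studies,
        "metadata": {
            "count": len(study_ids),
            "errors": error_count,
            "execution_time": round(end_time - start_time, 3),
            "concurrent": True,
        },
    }


def demo() -> Dict[str, Any]:
    ids = ["brca_tcga", "missing_study", "luad_tcga"]  # placeholder IDs
    return asyncio.run(get_multiple_studies(FakeApiClient(), ids))


if __name__ == "__main__":
    response = demo()
    print(response["metadata"])
    print(response["studies"]["missing_study"])
```

Because each failure is converted into an `{"error": ...}` entry inside `fetch_study`, `asyncio.gather` never sees an exception, so callers should check each entry for an `error` key. Note also that `count` reflects `len(study_ids)`, so duplicate IDs in the input are counted even though they collapse into a single key in `studies`.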