create_cluster
Create a new Dataproc cluster in Google Cloud by specifying cluster name, worker instances, machine type, disk size, and image version.
Instructions
Create a new Dataproc cluster.
Args:
cluster_name: Name for the new cluster
project_id: Google Cloud project ID (optional, uses gcloud config default)
region: Dataproc region (optional, uses gcloud config default)
num_instances: Number of worker instances
machine_type: Machine type for cluster nodes
disk_size_gb: Boot disk size in GB
image_version: Dataproc image version
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
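| cluster_name | Yes | Name for the new cluster | |
| project_id | No | Google Cloud project ID (optional, uses gcloud config default) | |
| region | No | Dataproc region (optional, uses gcloud config default) | |
| num_instances | No | Number of worker instances | 2 |
| machine_type | No | Machine type for cluster nodes | n1-standard-4 |
| disk_size_gb | No | Boot disk size in GB | 100 |
| image_version | No | Dataproc image version | 2.1-debian11 |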
Implementation Reference
# src/dataproc_mcp_server/server.py:70-110 (handler)
# MCP tool handler for 'create_cluster' decorated with @mcp.tool(). Resolves
# project and region, instantiates DataprocClient, and delegates to its
# create_cluster method.
@mcp.tool()
async def create_cluster(
    cluster_name: str,
    project_id: str | None = None,
    region: str | None = None,
    num_instances: int = 2,
    machine_type: str = "n1-standard-4",
    disk_size_gb: int = 100,
    image_version: str = "2.1-debian11",
) -> str:
    """Create a new Dataproc cluster.

    Args:
        cluster_name: Name for the new cluster
        project_id: Google Cloud project ID (optional, uses gcloud config default)
        region: Dataproc region (optional, uses gcloud config default)
        num_instances: Number of worker instances
        machine_type: Machine type for cluster nodes
        disk_size_gb: Boot disk size in GB
        image_version: Dataproc image version
    """
    # The resolver hands back either a (project, region) pair or a ready-made
    # error message; a plain string is passed straight through to the caller.
    outcome = resolve_project_and_region(project_id, region)
    if isinstance(outcome, str):
        return outcome
    target_project, target_region = outcome

    dataproc = DataprocClient()
    creation_args = {
        "project_id": target_project,
        "region": target_region,
        "cluster_name": cluster_name,
        "num_instances": num_instances,
        "machine_type": machine_type,
        "disk_size_gb": disk_size_gb,
        "image_version": image_version,
    }
    # Tool boundary: failures are reported as an "Error: ..." string rather
    # than propagated as exceptions.
    try:
        creation = await dataproc.create_cluster(**creation_args)
        return str(creation)
    except Exception as e:
        logger.error("Failed to create cluster", error=str(e))
        return f"Error: {str(e)}"
# Core implementation of cluster creation in DataprocClient class. Constructs
# ClusterConfig using provided parameters and invokes the Google Cloud Dataproc
# API's create_cluster method via run_in_executor.
async def create_cluster(
    self,
    project_id: str,
    region: str,
    cluster_name: str,
    num_instances: int = 2,
    machine_type: str = "n1-standard-4",
    disk_size_gb: int = 100,
    image_version: str = "2.1-debian11",
) -> dict[str, Any]:
    """Create a new Dataproc cluster.

    Builds a cluster with a single master node and ``num_instances`` workers.
    Master and workers share the same machine type and a ``pd-standard`` boot
    disk of ``disk_size_gb`` GB. The create request is submitted from the
    default executor; this method returns as soon as the operation has been
    started and does not wait for the cluster to finish provisioning.

    Args:
        project_id: Google Cloud project that will own the cluster.
        region: Dataproc region, also used to select the cluster client.
        cluster_name: Name for the new cluster.
        num_instances: Number of worker instances.
        machine_type: Machine type used for both master and worker nodes.
        disk_size_gb: Boot disk size in GB for every node.
        image_version: Dataproc image version.

    Returns:
        A dict with ``operation_name``, ``cluster_name``, ``status`` (always
        ``"CREATING"``) and a human-readable ``message``.

    Raises:
        Exception: Any error raised while building or submitting the request
            is logged and re-raised unchanged.
    """
    try:
        # Inside a coroutine the running loop is the one to use;
        # get_running_loop() avoids get_event_loop()'s policy-dependent and
        # deprecated implicit loop handling.
        loop = asyncio.get_running_loop()
        client = self._get_cluster_client(region)

        def node_group(count: int):
            # Master and workers differ only in instance count.
            return types.InstanceGroupConfig(
                num_instances=count,
                machine_type_uri=machine_type,
                disk_config=types.DiskConfig(
                    boot_disk_type="pd-standard", boot_disk_size_gb=disk_size_gb
                ),
            )

        # Configure cluster
        cluster_config = types.ClusterConfig(
            master_config=node_group(1),
            worker_config=node_group(num_instances),
            software_config=types.SoftwareConfig(image_version=image_version),
        )

        cluster = types.Cluster(
            project_id=project_id, cluster_name=cluster_name, config=cluster_config
        )

        request = types.CreateClusterRequest(
            project_id=project_id, region=region, cluster=cluster
        )

        # Create cluster (this is a long-running operation). The client call
        # is handed to the default executor and only the returned operation
        # handle is awaited here.
        operation = await loop.run_in_executor(None, client.create_cluster, request)

        # Fall back to the string form if the operation object has no "name".
        operation_name = getattr(operation, "name", str(operation))
        return {
            "operation_name": operation_name,
            "cluster_name": cluster_name,
            "status": "CREATING",
            "message": f"Cluster creation initiated. Operation: {operation_name}",
        }

    except Exception as e:
        logger.error("Failed to create cluster", error=str(e))
        raise
- Helper function used by create_cluster handler to resolve project_id and region from inputs or gcloud defaults, returning error string if unresolved.def resolve_project_and_region( project_id: str | None, region: str | None ) -> tuple[str, str] | str: """Resolve project_id and region from parameters or gcloud config defaults. Returns: Tuple of (project_id, region) if successful, error message string if failed. """ # Resolve project_id if project_id is None: project_id = get_default_project() if project_id is None: return "Error: No project_id provided and no default project configured in gcloud. Run 'gcloud config set project PROJECT_ID' or provide project_id parameter." # Resolve region if region is None: region = get_default_region() if region is None: return "Error: No region provided and no default region configured in gcloud. Run 'gcloud config set compute/region REGION' or provide region parameter." return project_id, region