# create_cluster
Set up a new Dataproc cluster by specifying the project ID, region, cluster name, number of worker instances, machine type, boot disk size, and image version, streamlining big data processing on Google Cloud.
## Instructions

Create a new Dataproc cluster.

Args:

- `project_id`: Google Cloud project ID (optional; uses the gcloud config default)
- `region`: Dataproc region (optional; uses the gcloud config default)
- `cluster_name`: Name for the new cluster
- `num_instances`: Number of worker instances (default: 2)
- `machine_type`: Machine type for cluster nodes (default: `n1-standard-4`)
- `disk_size_gb`: Boot disk size in GB (default: 100)
- `image_version`: Dataproc image version (default: `2.1-debian11`)
## Input Schema

| Name | Required | Description | Default |
|---|---|---|---|
| cluster_name | Yes | Name for the new cluster | |
| disk_size_gb | No | Boot disk size in GB | 100 |
| image_version | No | Dataproc image version | 2.1-debian11 |
| machine_type | No | Machine type for cluster nodes | n1-standard-4 |
| num_instances | No | Number of worker instances | 2 |
| project_id | No | Google Cloud project ID; falls back to the gcloud config default | |
| region | No | Dataproc region; falls back to the gcloud config default | |
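For illustration, here is one way a client could invoke this tool over stdio with the Python MCP SDK. The server launch command, cluster name, and region below are placeholder assumptions, not values taken from this project:

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Hypothetical launch command; adjust to however the server is installed.
params = StdioServerParameters(command="python", args=["-m", "dataproc_mcp_server"])


async def main() -> None:
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Only cluster_name is strictly required; project_id and region
            # fall back to the gcloud config defaults resolved server-side.
            result = await session.call_tool(
                "create_cluster",
                arguments={
                    "cluster_name": "example-cluster",
                    "region": "us-central1",
                    "num_instances": 2,
                },
            )
            print(result.content)


asyncio.run(main())
```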
## Implementation Reference

- `src/dataproc_mcp_server/server.py:70-110` (handler): MCP tool handler for `create_cluster`. Decorated with `@mcp.tool()`, which registers the tool and derives its input schema from the type hints. It resolves the project and region, then delegates to `DataprocClient.create_cluster`.

```python
@mcp.tool()
async def create_cluster(
    cluster_name: str,
    project_id: str | None = None,
    region: str | None = None,
    num_instances: int = 2,
    machine_type: str = "n1-standard-4",
    disk_size_gb: int = 100,
    image_version: str = "2.1-debian11",
) -> str:
    """Create a new Dataproc cluster.

    Args:
        cluster_name: Name for the new cluster
        project_id: Google Cloud project ID (optional, uses gcloud config default)
        region: Dataproc region (optional, uses gcloud config default)
        num_instances: Number of worker instances
        machine_type: Machine type for cluster nodes
        disk_size_gb: Boot disk size in GB
        image_version: Dataproc image version
    """
    resolved = resolve_project_and_region(project_id, region)
    if isinstance(resolved, str):  # Error message
        return resolved
    project_id, region = resolved

    client = DataprocClient()
    try:
        result = await client.create_cluster(
            project_id=project_id,
            region=region,
            cluster_name=cluster_name,
            num_instances=num_instances,
            machine_type=machine_type,
            disk_size_gb=disk_size_gb,
            image_version=image_version,
        )
        return str(result)
    except Exception as e:
        logger.error("Failed to create cluster", error=str(e))
        return f"Error: {str(e)}"
```
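The `resolve_project_and_region` helper is not shown in this reference. As a minimal sketch of its contract (returns a `(project_id, region)` tuple on success, or an error string), assuming the project comes from Application Default Credentials and the region from the gcloud CLI config; both fallback mechanisms are assumptions, not this project's actual code:

```python
import subprocess

import google.auth


def resolve_project_and_region(
    project_id: str | None, region: str | None
) -> tuple[str, str] | str:
    """Return (project_id, region), or an error string if neither the
    arguments nor the local gcloud configuration can supply them."""
    if project_id is None:
        # google.auth.default() returns (credentials, project) for ADC.
        _, project_id = google.auth.default()
    if region is None:
        # Fall back to the gcloud CLI's configured Dataproc region.
        region = subprocess.run(
            ["gcloud", "config", "get-value", "dataproc/region"],
            capture_output=True,
            text=True,
        ).stdout.strip() or None
    if not project_id or not region:
        return "Error: project_id and region could not be resolved"
    return project_id, region
```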
- `DataprocClient.create_cluster` (client helper): constructs the `ClusterConfig` and `CreateClusterRequest`, then calls the Google Cloud Dataproc API to initiate cluster creation.

```python
async def create_cluster(
    self,
    project_id: str,
    region: str,
    cluster_name: str,
    num_instances: int = 2,
    machine_type: str = "n1-standard-4",
    disk_size_gb: int = 100,
    image_version: str = "2.1-debian11",
) -> dict[str, Any]:
    """Create a new Dataproc cluster."""
    try:
        loop = asyncio.get_event_loop()
        client = self._get_cluster_client(region)

        # Configure cluster: one master plus the requested workers, all on
        # the same machine type with standard persistent boot disks.
        cluster_config = types.ClusterConfig(
            master_config=types.InstanceGroupConfig(
                num_instances=1,
                machine_type_uri=machine_type,
                disk_config=types.DiskConfig(
                    boot_disk_type="pd-standard", boot_disk_size_gb=disk_size_gb
                ),
            ),
            worker_config=types.InstanceGroupConfig(
                num_instances=num_instances,
                machine_type_uri=machine_type,
                disk_config=types.DiskConfig(
                    boot_disk_type="pd-standard", boot_disk_size_gb=disk_size_gb
                ),
            ),
            software_config=types.SoftwareConfig(image_version=image_version),
        )

        cluster = types.Cluster(
            project_id=project_id, cluster_name=cluster_name, config=cluster_config
        )
        request = types.CreateClusterRequest(
            project_id=project_id, region=region, cluster=cluster
        )

        # Create cluster (this is a long-running operation); run the blocking
        # client call in a thread so the event loop stays responsive.
        operation = await loop.run_in_executor(None, client.create_cluster, request)
        operation_name = getattr(operation, "name", str(operation))

        return {
            "operation_name": operation_name,
            "cluster_name": cluster_name,
            "status": "CREATING",
            "message": f"Cluster creation initiated. Operation: {operation_name}",
        }
    except Exception as e:
        logger.error("Failed to create cluster", error=str(e))
        raise
```
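The `_get_cluster_client` helper is referenced but not shown. Dataproc cluster operations must target a regional API endpoint, so one plausible implementation (an assumption, not the project's actual code) is:

```python
from google.cloud import dataproc_v1


def _get_cluster_client(self, region: str) -> dataproc_v1.ClusterControllerClient:
    # Cluster control-plane calls must go to the regional endpoint;
    # the global endpoint rejects most cluster requests.
    return dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
```

Because the handler returns immediately with the operation name rather than waiting for the cluster to become `RUNNING`, callers can poll that name via the standard long-running operations API to track provisioning progress.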