Dataproc MCP Server

by warrenzhu25

create_cluster

Create a new Dataproc cluster in Google Cloud by specifying the cluster name and, optionally, the worker instance count, machine type, disk size, and image version.

Instructions

Create a new Dataproc cluster.

Args:
    cluster_name: Name for the new cluster
    project_id: Google Cloud project ID (optional, uses gcloud config default)
    region: Dataproc region (optional, uses gcloud config default)
    num_instances: Number of worker instances
    machine_type: Machine type for cluster nodes
    disk_size_gb: Boot disk size in GB
    image_version: Dataproc image version

Input Schema

Name           Required  Description                                  Default
cluster_name   Yes       Name for the new cluster                     -
project_id     No        Google Cloud project ID                      gcloud config default
region         No        Dataproc region                              gcloud config default
num_instances  No        Number of worker instances                   2
machine_type   No        Machine type for cluster nodes               n1-standard-4
disk_size_gb   No        Boot disk size in GB                         100
image_version  No        Dataproc image version                       2.1-debian11
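Optional fields fall back to the defaults above when omitted. A minimal sketch of how a caller's partial arguments merge with those defaults (the dict and `with_defaults` helper are illustrative, not the server's actual schema objects):

```python
# Schema defaults for create_cluster, as listed in the table above.
DEFAULTS = {
    "num_instances": 2,
    "machine_type": "n1-standard-4",
    "disk_size_gb": 100,
    "image_version": "2.1-debian11",
}

def with_defaults(args: dict) -> dict:
    """Merge a partial tool call with the schema defaults.

    cluster_name is the only required field, so it must be present in args;
    explicit arguments override the defaults.
    """
    if "cluster_name" not in args:
        raise ValueError("cluster_name is required")
    return {**DEFAULTS, **args}

merged = with_defaults({"cluster_name": "etl-cluster", "num_instances": 4})
```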

Implementation Reference

  • MCP tool handler for 'create_cluster' decorated with @mcp.tool(). Resolves project and region, instantiates DataprocClient, and delegates to its create_cluster method.
    @mcp.tool()
    async def create_cluster(
        cluster_name: str,
        project_id: str | None = None,
        region: str | None = None,
        num_instances: int = 2,
        machine_type: str = "n1-standard-4",
        disk_size_gb: int = 100,
        image_version: str = "2.1-debian11",
    ) -> str:
        """Create a new Dataproc cluster.
    
        Args:
            cluster_name: Name for the new cluster
            project_id: Google Cloud project ID (optional, uses gcloud config default)
            region: Dataproc region (optional, uses gcloud config default)
            num_instances: Number of worker instances
            machine_type: Machine type for cluster nodes
            disk_size_gb: Boot disk size in GB
            image_version: Dataproc image version
        """
        resolved = resolve_project_and_region(project_id, region)
        if isinstance(resolved, str):  # Error message
            return resolved
        project_id, region = resolved
    
        client = DataprocClient()
        try:
            result = await client.create_cluster(
                project_id=project_id,
                region=region,
                cluster_name=cluster_name,
                num_instances=num_instances,
                machine_type=machine_type,
                disk_size_gb=disk_size_gb,
                image_version=image_version,
            )
            return str(result)
        except Exception as e:
            logger.error("Failed to create cluster", error=str(e))
            return f"Error: {str(e)}"
  • Core implementation of cluster creation in DataprocClient class. Constructs ClusterConfig using provided parameters and invokes the Google Cloud Dataproc API's create_cluster method via run_in_executor.
    async def create_cluster(
        self,
        project_id: str,
        region: str,
        cluster_name: str,
        num_instances: int = 2,
        machine_type: str = "n1-standard-4",
        disk_size_gb: int = 100,
        image_version: str = "2.1-debian11",
    ) -> dict[str, Any]:
        """Create a new Dataproc cluster."""
        try:
            loop = asyncio.get_running_loop()
            client = self._get_cluster_client(region)
    
            # Configure cluster
            cluster_config = types.ClusterConfig(
                master_config=types.InstanceGroupConfig(
                    num_instances=1,
                    machine_type_uri=machine_type,
                    disk_config=types.DiskConfig(
                        boot_disk_type="pd-standard", boot_disk_size_gb=disk_size_gb
                    ),
                ),
                worker_config=types.InstanceGroupConfig(
                    num_instances=num_instances,
                    machine_type_uri=machine_type,
                    disk_config=types.DiskConfig(
                        boot_disk_type="pd-standard", boot_disk_size_gb=disk_size_gb
                    ),
                ),
                software_config=types.SoftwareConfig(image_version=image_version),
            )
    
            cluster = types.Cluster(
                project_id=project_id, cluster_name=cluster_name, config=cluster_config
            )
    
            request = types.CreateClusterRequest(
                project_id=project_id, region=region, cluster=cluster
            )
    
            # Create cluster (this is a long-running operation)
            operation = await loop.run_in_executor(None, client.create_cluster, request)
    
            operation_name = getattr(operation, "name", str(operation))
            return {
                "operation_name": operation_name,
                "cluster_name": cluster_name,
                "status": "CREATING",
                "message": f"Cluster creation initiated. Operation: {operation_name}",
            }
    
        except Exception as e:
            logger.error("Failed to create cluster", error=str(e))
            raise
  • Helper function used by the create_cluster handler to resolve project_id and region from explicit arguments or gcloud config defaults, returning an error string when either cannot be resolved.
    def resolve_project_and_region(
        project_id: str | None, region: str | None
    ) -> tuple[str, str] | str:
        """Resolve project_id and region from parameters or gcloud config defaults.
    
        Returns:
            Tuple of (project_id, region) if successful, error message string if failed.
        """
        # Resolve project_id
        if project_id is None:
            project_id = get_default_project()
            if project_id is None:
                return "Error: No project_id provided and no default project configured in gcloud. Run 'gcloud config set project PROJECT_ID' or provide project_id parameter."
    
        # Resolve region
        if region is None:
            region = get_default_region()
            if region is None:
                return "Error: No region provided and no default region configured in gcloud. Run 'gcloud config set compute/region REGION' or provide region parameter."
    
        return project_id, region
