Dataproc MCP Server

by warrenzhu25

get_cluster

Retrieve detailed information about a specific Dataproc cluster, including its configuration, status, and operational metrics, to monitor and manage Google Cloud data processing resources.

Instructions

Get details of a specific Dataproc cluster.

Args:
    cluster_name: Name of the cluster
    project_id: Google Cloud project ID (optional, uses gcloud config default)
    region: Dataproc region (optional, uses gcloud config default)
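
The fallback for the two optional arguments can be sketched roughly as follows. This is a minimal illustration, not the server's own resolution code: `resolve_with_defaults` and the `defaults` mapping are hypothetical names, with `defaults` standing in for whatever values the server reads from the gcloud configuration.

```python
from __future__ import annotations


def resolve_with_defaults(
    project_id: str | None,
    region: str | None,
    defaults: dict[str, str],
) -> tuple[str, str] | str:
    """Return (project_id, region), or an error message if either is missing.

    `defaults` is a hypothetical stand-in for values read from gcloud config.
    """
    # Explicit arguments win; otherwise fall back to the configured defaults.
    project = project_id or defaults.get("project")
    resolved_region = region or defaults.get("region")
    if not project:
        return "Error: no project_id given and no default project configured"
    if not resolved_region:
        return "Error: no region given and no default region configured"
    return project, resolved_region
```

Returning either a tuple or an error string keeps the caller simple: it can check the result type once and pass the message straight back to the user.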

Input Schema

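Name          Required  Description                                                 Default
cluster_name  Yes       Name of the cluster
project_id    No        Google Cloud project ID (gcloud config default if omitted)  None
region        No        Dataproc region (gcloud config default if omitted)          None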
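
As an illustration of how a client might invoke this tool, the sketch below builds an MCP `tools/call` JSON-RPC request that matches the schema above. The helper name `build_get_cluster_call` and the values `example-cluster` and `us-central1` are placeholders chosen for the example; sending the request over a transport is left out.

```python
import json


def build_get_cluster_call(cluster_name, project_id=None, region=None, request_id=1):
    """Build an MCP `tools/call` JSON-RPC request for the get_cluster tool."""
    arguments = {"cluster_name": cluster_name}
    # Optional arguments are omitted when unset so the server applies its defaults.
    if project_id is not None:
        arguments["project_id"] = project_id
    if region is not None:
        arguments["region"] = region
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "get_cluster", "arguments": arguments},
    }


# Placeholder values; only cluster_name is required by the schema.
request = build_get_cluster_call("example-cluster", region="us-central1")
print(json.dumps(request, indent=2))
```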

Implementation Reference

  • MCP tool registration and handler for 'get_cluster'. Resolves project/region, creates DataprocClient, calls client.get_cluster, handles errors, and returns stringified result.
    @mcp.tool()
    async def get_cluster(
        cluster_name: str, project_id: str | None = None, region: str | None = None
    ) -> str:
        """Get details of a specific Dataproc cluster.
    
        Args:
            cluster_name: Name of the cluster
            project_id: Google Cloud project ID (optional, uses gcloud config default)
            region: Dataproc region (optional, uses gcloud config default)
        """
        resolved = resolve_project_and_region(project_id, region)
        if isinstance(resolved, str):  # Error message
            return resolved
        project_id, region = resolved
    
        client = DataprocClient()
        try:
            result = await client.get_cluster(project_id, region, cluster_name)
            return str(result)
        except Exception as e:
            logger.error("Failed to get cluster", error=str(e))
            return f"Error: {e}"
  • Helper method in DataprocClient that performs the actual Google Cloud Dataproc get_cluster API call and converts the response into a structured dict.
    async def get_cluster(
        self, project_id: str, region: str, cluster_name: str
    ) -> dict[str, Any]:
        """Get details of a specific cluster."""
        try:
            loop = asyncio.get_running_loop()
            client = self._get_cluster_client(region)
    
            request = types.GetClusterRequest(
                project_id=project_id, region=region, cluster_name=cluster_name
            )
    
            cluster = await loop.run_in_executor(None, client.get_cluster, request)
    
            return {
                "name": cluster.cluster_name,
                "status": cluster.status.state.name,
                "status_detail": cluster.status.detail,
                "num_instances": cluster.config.worker_config.num_instances,
                "master_machine_type": cluster.config.master_config.machine_type_uri.split(
                    "/"
                )[-1],
                "worker_machine_type": cluster.config.worker_config.machine_type_uri.split(
                    "/"
                )[-1],
                "disk_size_gb": cluster.config.master_config.disk_config.boot_disk_size_gb,
                "image_version": cluster.config.software_config.image_version,
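                # Note: state_start_time is when the cluster entered its current
                # state, which is not necessarily when it was created.
                "creation_time": cluster.status.state_start_time.isoformat()
                if cluster.status.state_start_time
                else None,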
                "zone": cluster.config.gce_cluster_config.zone_uri.split("/")[-1]
                if cluster.config.gce_cluster_config.zone_uri
                else None,
                "metrics": {
                    "hdfs_capacity_mb": getattr(
                        cluster.metrics.hdfs_metrics, "capacity_mb", None
                    )
                    if cluster.metrics and cluster.metrics.hdfs_metrics
                    else None,
                    "yarn_allocated_memory_mb": getattr(
                        cluster.metrics.yarn_metrics, "allocated_memory_mb", None
                    )
                    if cluster.metrics and cluster.metrics.yarn_metrics
                    else None,
                },
            }
    
        except Exception as e:
            logger.error("Failed to get cluster", error=str(e))
            raise
  • Supporting method to create the regional ClusterControllerClient used by get_cluster.
    def _get_cluster_client(self, region: str) -> dataproc_v1.ClusterControllerClient:
        """Get cluster controller client with regional endpoint."""
        # Configure regional endpoint
        regional_endpoint = f"{region}-dataproc.googleapis.com"
        client_opts = client_options.ClientOptions(api_endpoint=regional_endpoint)
    
        return dataproc_v1.ClusterControllerClient(
            credentials=self._credentials, client_options=client_opts
        )
  • Input schema defined by function parameters and type annotations for the MCP tool.
    async def get_cluster(
        cluster_name: str, project_id: str | None = None, region: str | None = None
    ) -> str:
        """Get details of a specific Dataproc cluster.
    
        Args:
            cluster_name: Name of the cluster
            project_id: Google Cloud project ID (optional, uses gcloud config default)
            region: Dataproc region (optional, uses gcloud config default)
        """

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/warrenzhu25/dataproc-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.