Skip to main content
Glama
warrenzhu25

Dataproc MCP Server

by warrenzhu25

get_batch_job

Retrieve detailed information about a specific Dataproc batch job using project ID, region, and batch ID parameters to monitor job status and configuration.

Instructions

Get details of a specific batch job.

Args:
    project_id: Google Cloud project ID
    region: Dataproc region
    batch_id: Batch job ID

Input Schema

JSON Schema (table form):

| Name       | Required | Description | Default |
|------------|----------|-------------|---------|
| project_id | Yes      |             |         |
| region     | Yes      |             |         |
| batch_id   | Yes      |             |         |

Implementation Reference

  • MCP tool handler for 'get_batch_job'. Creates DataprocBatchClient and calls its get_batch_job method to retrieve and format batch job details.
    @mcp.tool()
    async def get_batch_job(project_id: str, region: str, batch_id: str) -> str:
        """Get details of a specific batch job.
    
        Args:
            project_id: Google Cloud project ID
            region: Dataproc region
            batch_id: Batch job ID
        """
        # Delegate the actual API work to the batch client; this handler only
        # formats the result (or the error) as a string for the MCP caller.
        client = DataprocBatchClient()
        try:
            details = await client.get_batch_job(project_id, region, batch_id)
            return str(details)
        except Exception as exc:
            # Tool handlers report failures as text rather than raising, so the
            # MCP caller always receives a response.
            logger.error("Failed to get batch job", error=str(exc))
            return f"Error: {str(exc)}"
  • Core helper method in DataprocBatchClient that calls the Dataproc BatchController API to get batch details and constructs a comprehensive dictionary with job config, runtime info, environment config, state history, etc.
    async def get_batch_job(
        self, project_id: str, region: str, batch_id: str
    ) -> dict[str, Any]:
        """Get details of a specific batch job.

        Args:
            project_id: Google Cloud project ID
            region: Dataproc region
            batch_id: Batch job ID

        Returns:
            A dictionary describing the batch: identity, state, job/runtime/
            environment configuration, runtime info, and state history.

        Raises:
            Exception: Any error from the Dataproc API is logged and re-raised.
        """
        try:
            # get_running_loop() is the correct call from inside a coroutine;
            # asyncio.get_event_loop() is deprecated for this use since 3.10.
            loop = asyncio.get_running_loop()
            client = self._get_batch_client(region)

            request = types.GetBatchRequest(
                name=f"projects/{project_id}/locations/{region}/batches/{batch_id}"
            )

            # The Dataproc client call is blocking; run it in the default
            # executor so the event loop is not stalled.
            batch = await loop.run_in_executor(None, client.get_batch, request)

            return {
                "name": batch.name,
                "batch_id": batch.name.split("/")[-1],
                # `x or None` / plain dict()/list() conversions are equivalent
                # to the verbose `x if x else ...` form for proto fields, whose
                # empty values are falsy ("" / empty map / empty repeated).
                "uuid": batch.uuid or None,
                "state": batch.state.name,
                "state_message": batch.state_message,
                "state_time": batch.state_time.isoformat()
                if batch.state_time
                else None,
                "create_time": batch.create_time.isoformat()
                if batch.create_time
                else None,
                "creator": batch.creator or None,
                "labels": dict(batch.labels),
                "job_type": self._get_batch_job_type(batch),
                "job_config": self._extract_job_config(batch),
                "runtime_config": self._extract_runtime_config(batch),
                "environment_config": self._extract_environment_config(batch),
                "runtime_info": self._extract_runtime_info(batch),
                "operation": batch.operation or None,
                "state_history": [
                    {
                        "state": entry.state.name,
                        "state_message": entry.state_message,
                        "state_start_time": entry.state_start_time.isoformat()
                        if entry.state_start_time
                        else None,
                    }
                    for entry in batch.state_history
                ],
            }

        except Exception as e:
            logger.error("Failed to get batch job", error=str(e))
            raise

    def _extract_runtime_info(self, batch: types.Batch) -> dict[str, Any]:
        """Build the runtime_info section: endpoints, output URIs, and usage."""
        if not batch.runtime_info:
            return {}
        info = batch.runtime_info
        runtime_info: dict[str, Any] = {
            "endpoints": dict(info.endpoints),
            "output_uri": info.output_uri or None,
            "diagnostic_output_uri": info.diagnostic_output_uri or None,
        }
        # Usage figures are stringified to keep the result JSON-friendly.
        if info.approximate_usage:
            runtime_info["approximate_usage"] = {
                "milli_dcu_seconds": str(info.approximate_usage.milli_dcu_seconds),
                "shuffle_storage_gb_seconds": str(
                    info.approximate_usage.shuffle_storage_gb_seconds
                ),
            }
        if info.current_usage:
            runtime_info["current_usage"] = {
                "milli_dcu": str(info.current_usage.milli_dcu),
                "shuffle_storage_gb": str(info.current_usage.shuffle_storage_gb),
            }
        return runtime_info

    def _extract_job_config(self, batch: types.Batch) -> dict[str, Any]:
        """Build the job_config section for whichever batch workload is set."""
        if batch.spark_batch:
            spark = batch.spark_batch
            return {
                "main_class": spark.main_class or None,
                "main_jar_file_uri": spark.main_jar_file_uri or None,
                "jar_file_uris": list(spark.jar_file_uris),
                "file_uris": list(spark.file_uris),
                "archive_uris": list(spark.archive_uris),
                "args": list(spark.args),
            }
        if batch.pyspark_batch:
            pyspark = batch.pyspark_batch
            return {
                "main_python_file_uri": pyspark.main_python_file_uri,
                "python_file_uris": list(pyspark.python_file_uris),
                "jar_file_uris": list(pyspark.jar_file_uris),
                "file_uris": list(pyspark.file_uris),
                "archive_uris": list(pyspark.archive_uris),
                "args": list(pyspark.args),
            }
        if batch.spark_sql_batch:
            sql = batch.spark_sql_batch
            return {
                "query_file_uri": sql.query_file_uri,
                "query_variables": dict(sql.query_variables),
                "jar_file_uris": list(sql.jar_file_uris),
            }
        if batch.spark_r_batch:
            spark_r = batch.spark_r_batch
            return {
                "main_r_file_uri": spark_r.main_r_file_uri,
                "file_uris": list(spark_r.file_uris),
                "archive_uris": list(spark_r.archive_uris),
                "args": list(spark_r.args),
            }
        # Unknown/unset workload type: no configuration to report.
        return {}

    def _extract_runtime_config(self, batch: types.Batch) -> dict[str, Any]:
        """Build the runtime_config section: version, image, and properties."""
        if not batch.runtime_config:
            return {}
        config = batch.runtime_config
        return {
            "version": config.version or None,
            "container_image": config.container_image or None,
            "properties": dict(config.properties),
        }

    def _extract_environment_config(self, batch: types.Batch) -> dict[str, Any]:
        """Build the environment_config section: execution and peripherals."""
        if not batch.environment_config:
            return {}
        environment_config: dict[str, Any] = {
            "execution_config": {},
            "peripherals_config": {},
        }

        exec_config = batch.environment_config.execution_config
        if exec_config:
            environment_config["execution_config"] = {
                "service_account": exec_config.service_account or None,
                "network_uri": exec_config.network_uri or None,
                "subnetwork_uri": exec_config.subnetwork_uri or None,
                "network_tags": list(exec_config.network_tags),
                "kms_key": exec_config.kms_key or None,
            }

        periph_config = batch.environment_config.peripherals_config
        if periph_config:
            environment_config["peripherals_config"] = {
                "metastore_service": periph_config.metastore_service or None,
                "spark_history_server_config": {},
            }
            history_config = periph_config.spark_history_server_config
            if history_config:
                environment_config["peripherals_config"][
                    "spark_history_server_config"
                ] = {
                    "dataproc_cluster": history_config.dataproc_cluster or None,
                }

        return environment_config
  • Helper function used by get_batch_job to determine the job type (spark, pyspark, etc.) from the batch object.
    def _get_batch_job_type(self, batch: types.Batch) -> str:
        """Extract job type from batch object.

        Checks each workload field in turn and returns the matching label,
        or "unknown" when no workload field is set.
        """
        type_by_field = (
            ("spark_batch", "spark"),
            ("pyspark_batch", "pyspark"),
            ("spark_sql_batch", "spark_sql"),
            ("spark_r_batch", "spark_r"),
        )
        for field, label in type_by_field:
            if getattr(batch, field):
                return label
        return "unknown"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/warrenzhu25/dataproc-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server