get_batch_job

Retrieve detailed information about a specific batch job on Google Cloud Dataproc. Provide project ID, region, and batch ID to access job status and execution details.

Instructions

Get details of a specific batch job.

Args:
    project_id: Google Cloud project ID
    region: Dataproc region
    batch_id: Batch job ID

Input Schema

Name         Required   Description               Default
batch_id     Yes        Batch job ID              —
project_id   Yes        Google Cloud project ID   —
region       Yes        Dataproc region           —
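
All three arguments are required strings. As a rough sketch, the tool could be invoked from an MCP client roughly as follows; the MCP Python SDK calls are standard, but the server launch command and all argument values are placeholders rather than details taken from this page:

    import asyncio

    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client

    # Placeholder launch command; substitute the actual entrypoint for this server.
    server = StdioServerParameters(command="python", args=["-m", "dataproc_mcp"])

    async def main() -> None:
        async with stdio_client(server) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                result = await session.call_tool(
                    "get_batch_job",
                    arguments={
                        "project_id": "my-project",      # placeholder
                        "region": "us-central1",         # placeholder
                        "batch_id": "my-batch-001",      # placeholder
                    },
                )
                print(result.content)

    asyncio.run(main())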

Implementation Reference

  • MCP tool registration and wrapper handler for 'get_batch_job'. Delegates to DataprocBatchClient.get_batch_job and serializes the result to string.
    @mcp.tool()
    async def get_batch_job(project_id: str, region: str, batch_id: str) -> str:
        """Get details of a specific batch job.

        Args:
            project_id: Google Cloud project ID
            region: Dataproc region
            batch_id: Batch job ID
        """
        batch_client = DataprocBatchClient()
        try:
            result = await batch_client.get_batch_job(project_id, region, batch_id)
            return str(result)
        except Exception as e:
            logger.error("Failed to get batch job", error=str(e))
            return f"Error: {str(e)}"
  • Core handler implementation that fetches the batch job from Google Cloud Dataproc API, extracts and formats comprehensive details including job config, runtime info, environment config, and state history.
    async def get_batch_job(
        self, project_id: str, region: str, batch_id: str
    ) -> dict[str, Any]:
        """Get details of a specific batch job."""
        try:
            loop = asyncio.get_event_loop()
            client = self._get_batch_client(region)
            request = types.GetBatchRequest(
                name=f"projects/{project_id}/locations/{region}/batches/{batch_id}"
            )
            batch = await loop.run_in_executor(None, client.get_batch, request)

            # Extract runtime info if available
            runtime_info = {}
            if batch.runtime_info:
                runtime_info = {
                    "endpoints": dict(batch.runtime_info.endpoints) if batch.runtime_info.endpoints else {},
                    "output_uri": batch.runtime_info.output_uri if batch.runtime_info.output_uri else None,
                    "diagnostic_output_uri": batch.runtime_info.diagnostic_output_uri
                    if batch.runtime_info.diagnostic_output_uri
                    else None,
                }

                # Add usage information if available
                if batch.runtime_info.approximate_usage:
                    runtime_info["approximate_usage"] = {
                        "milli_dcu_seconds": str(batch.runtime_info.approximate_usage.milli_dcu_seconds),
                        "shuffle_storage_gb_seconds": str(
                            batch.runtime_info.approximate_usage.shuffle_storage_gb_seconds
                        ),
                    }
                if batch.runtime_info.current_usage:
                    runtime_info["current_usage"] = {
                        "milli_dcu": str(batch.runtime_info.current_usage.milli_dcu),
                        "shuffle_storage_gb": str(batch.runtime_info.current_usage.shuffle_storage_gb),
                    }

            # Extract job configuration details
            job_config: dict[str, Any] = {}
            job_type = self._get_batch_job_type(batch)
            if batch.spark_batch:
                job_config = {
                    "main_class": batch.spark_batch.main_class if batch.spark_batch.main_class else None,
                    "main_jar_file_uri": batch.spark_batch.main_jar_file_uri
                    if batch.spark_batch.main_jar_file_uri
                    else None,
                    "jar_file_uris": list(batch.spark_batch.jar_file_uris) if batch.spark_batch.jar_file_uris else [],
                    "file_uris": list(batch.spark_batch.file_uris) if batch.spark_batch.file_uris else [],
                    "archive_uris": list(batch.spark_batch.archive_uris) if batch.spark_batch.archive_uris else [],
                    "args": list(batch.spark_batch.args) if batch.spark_batch.args else [],
                }
            elif batch.pyspark_batch:
                job_config = {
                    "main_python_file_uri": batch.pyspark_batch.main_python_file_uri,
                    "python_file_uris": list(batch.pyspark_batch.python_file_uris)
                    if batch.pyspark_batch.python_file_uris
                    else [],
                    "jar_file_uris": list(batch.pyspark_batch.jar_file_uris)
                    if batch.pyspark_batch.jar_file_uris
                    else [],
                    "file_uris": list(batch.pyspark_batch.file_uris) if batch.pyspark_batch.file_uris else [],
                    "archive_uris": list(batch.pyspark_batch.archive_uris)
                    if batch.pyspark_batch.archive_uris
                    else [],
                    "args": list(batch.pyspark_batch.args) if batch.pyspark_batch.args else [],
                }
            elif batch.spark_sql_batch:
                job_config = {
                    "query_file_uri": batch.spark_sql_batch.query_file_uri,
                    "query_variables": dict(batch.spark_sql_batch.query_variables)
                    if batch.spark_sql_batch.query_variables
                    else {},
                    "jar_file_uris": list(batch.spark_sql_batch.jar_file_uris)
                    if batch.spark_sql_batch.jar_file_uris
                    else [],
                }
            elif batch.spark_r_batch:
                job_config = {
                    "main_r_file_uri": batch.spark_r_batch.main_r_file_uri,
                    "file_uris": list(batch.spark_r_batch.file_uris) if batch.spark_r_batch.file_uris else [],
                    "archive_uris": list(batch.spark_r_batch.archive_uris) if batch.spark_r_batch.archive_uris else [],
                    "args": list(batch.spark_r_batch.args) if batch.spark_r_batch.args else [],
                }

            # Extract runtime config details
            runtime_config = {}
            if batch.runtime_config:
                runtime_config = {
                    "version": batch.runtime_config.version if batch.runtime_config.version else None,
                    "container_image": batch.runtime_config.container_image
                    if batch.runtime_config.container_image
                    else None,
                    "properties": dict(batch.runtime_config.properties) if batch.runtime_config.properties else {},
                }

            # Extract environment config details
            environment_config: dict[str, Any] = {}
            if batch.environment_config:
                environment_config = {
                    "execution_config": {},
                    "peripherals_config": {},
                }
                if batch.environment_config.execution_config:
                    exec_config = batch.environment_config.execution_config
                    environment_config["execution_config"] = {
                        "service_account": exec_config.service_account if exec_config.service_account else None,
                        "network_uri": exec_config.network_uri if exec_config.network_uri else None,
                        "subnetwork_uri": exec_config.subnetwork_uri if exec_config.subnetwork_uri else None,
                        "network_tags": list(exec_config.network_tags) if exec_config.network_tags else [],
                        "kms_key": exec_config.kms_key if exec_config.kms_key else None,
                    }
                if batch.environment_config.peripherals_config:
                    periph_config = batch.environment_config.peripherals_config
                    environment_config["peripherals_config"] = {
                        "metastore_service": periph_config.metastore_service
                        if periph_config.metastore_service
                        else None,
                        "spark_history_server_config": {},
                    }
                    if periph_config.spark_history_server_config:
                        environment_config["peripherals_config"]["spark_history_server_config"] = {
                            "dataproc_cluster": periph_config.spark_history_server_config.dataproc_cluster
                            if periph_config.spark_history_server_config.dataproc_cluster
                            else None,
                        }

            return {
                "name": batch.name,
                "batch_id": batch.name.split("/")[-1],
                "uuid": batch.uuid if batch.uuid else None,
                "state": batch.state.name,
                "state_message": batch.state_message,
                "state_time": batch.state_time.isoformat() if batch.state_time else None,
                "create_time": batch.create_time.isoformat() if batch.create_time else None,
                "creator": batch.creator if batch.creator else None,
                "labels": dict(batch.labels) if batch.labels else {},
                "job_type": job_type,
                "job_config": job_config,
                "runtime_config": runtime_config,
                "environment_config": environment_config,
                "runtime_info": runtime_info,
                "operation": batch.operation if batch.operation else None,
                "state_history": [
                    {
                        "state": state.state.name,
                        "state_message": state.state_message,
                        "state_start_time": state.state_start_time.isoformat()
                        if state.state_start_time
                        else None,
                    }
                    for state in batch.state_history
                ],
            }
        except Exception as e:
            logger.error("Failed to get batch job", error=str(e))
            raise
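
For context, the core of this handler is a single Dataproc API lookup. Below is a minimal sketch of the same lookup made directly with the google-cloud-dataproc client library; the project, region, and batch ID values are placeholders, and credentials are assumed to come from Application Default Credentials:

    from google.cloud import dataproc_v1

    # Placeholder identifiers; replace with real values.
    project_id, region, batch_id = "my-project", "us-central1", "my-batch-001"

    # Dataproc serverless batch operations use a regional endpoint.
    client = dataproc_v1.BatchControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    batch = client.get_batch(
        name=f"projects/{project_id}/locations/{region}/batches/{batch_id}"
    )
    print(batch.state.name, batch.creator, batch.create_time)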
  • Helper utility to determine the type of batch job (spark, pyspark, etc.) from the batch object. Used within get_batch_job.
    def _get_batch_job_type(self, batch: types.Batch) -> str:
        """Extract job type from batch object."""
        if batch.spark_batch:
            return "spark"
        elif batch.pyspark_batch:
            return "pyspark"
        elif batch.spark_sql_batch:
            return "spark_sql"
        elif batch.spark_r_batch:
            return "spark_r"
        else:
            return "unknown"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/warrenzhu25/dataproc-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.