get_batch_job
Retrieve detailed information about a specific batch job on Google Cloud Dataproc. Provide project ID, region, and batch ID to access job status and execution details.
Instructions
Get details of a specific batch job.
Args:
- `project_id`: Google Cloud project ID
- `region`: Dataproc region
- `batch_id`: Batch job ID
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| batch_id | Yes | Batch job ID | |
| project_id | Yes | Google Cloud project ID | |
| region | Yes | Dataproc region | |
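
All three parameters are plain strings with no defaults; together they identify the Dataproc batch resource the handler looks up. A minimal sketch with hypothetical values (substitute your own project, region, and batch ID):

```python
# Hypothetical argument values for a get_batch_job call.
arguments = {
    "project_id": "my-gcp-project",  # Google Cloud project ID
    "region": "us-central1",         # Dataproc region
    "batch_id": "my-pyspark-batch",  # Batch job ID
}

# The handler assembles them into the Dataproc batch resource name:
name = (
    f"projects/{arguments['project_id']}"
    f"/locations/{arguments['region']}"
    f"/batches/{arguments['batch_id']}"
)
print(name)
# projects/my-gcp-project/locations/us-central1/batches/my-pyspark-batch
```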
Implementation Reference
- `src/dataproc_mcp_server/server.py:337-352` (registration): MCP tool registration and wrapper handler for `get_batch_job`. Delegates to `DataprocBatchClient.get_batch_job` and serializes the result to a string.

```python
@mcp.tool()
async def get_batch_job(project_id: str, region: str, batch_id: str) -> str:
    """Get details of a specific batch job.

    Args:
        project_id: Google Cloud project ID
        region: Dataproc region
        batch_id: Batch job ID
    """
    batch_client = DataprocBatchClient()
    try:
        result = await batch_client.get_batch_job(project_id, region, batch_id)
        return str(result)
    except Exception as e:
        logger.error("Failed to get batch job", error=str(e))
        return f"Error: {str(e)}"
```
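For callers, here is a minimal sketch of invoking the tool over MCP with the Python `mcp` client SDK. The launch command, argument values, and exact client imports are assumptions and may differ for your installation and SDK version:

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Assumed launch command for the server; adjust to how this package is installed.
server_params = StdioServerParameters(command="dataproc-mcp-server")


async def main() -> None:
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Call the registered tool by name with the three required arguments.
            result = await session.call_tool(
                "get_batch_job",
                {
                    "project_id": "my-gcp-project",   # hypothetical
                    "region": "us-central1",          # hypothetical
                    "batch_id": "my-pyspark-batch",   # hypothetical
                },
            )
            print(result.content)


asyncio.run(main())
```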
- Core handler implementation that fetches the batch job from the Google Cloud Dataproc API, then extracts and formats comprehensive details including job config, runtime info, environment config, and state history.

```python
async def get_batch_job(
    self, project_id: str, region: str, batch_id: str
) -> dict[str, Any]:
    """Get details of a specific batch job."""
    try:
        loop = asyncio.get_event_loop()
        client = self._get_batch_client(region)

        request = types.GetBatchRequest(
            name=f"projects/{project_id}/locations/{region}/batches/{batch_id}"
        )

        batch = await loop.run_in_executor(None, client.get_batch, request)

        # Extract runtime info if available
        runtime_info = {}
        if batch.runtime_info:
            runtime_info = {
                "endpoints": dict(batch.runtime_info.endpoints)
                if batch.runtime_info.endpoints
                else {},
                "output_uri": batch.runtime_info.output_uri
                if batch.runtime_info.output_uri
                else None,
                "diagnostic_output_uri": batch.runtime_info.diagnostic_output_uri
                if batch.runtime_info.diagnostic_output_uri
                else None,
            }

            # Add usage information if available
            if batch.runtime_info.approximate_usage:
                runtime_info["approximate_usage"] = {
                    "milli_dcu_seconds": str(
                        batch.runtime_info.approximate_usage.milli_dcu_seconds
                    ),
                    "shuffle_storage_gb_seconds": str(
                        batch.runtime_info.approximate_usage.shuffle_storage_gb_seconds
                    ),
                }

            if batch.runtime_info.current_usage:
                runtime_info["current_usage"] = {
                    "milli_dcu": str(batch.runtime_info.current_usage.milli_dcu),
                    "shuffle_storage_gb": str(
                        batch.runtime_info.current_usage.shuffle_storage_gb
                    ),
                }

        # Extract job configuration details
        job_config: dict[str, Any] = {}
        job_type = self._get_batch_job_type(batch)

        if batch.spark_batch:
            job_config = {
                "main_class": batch.spark_batch.main_class
                if batch.spark_batch.main_class
                else None,
                "main_jar_file_uri": batch.spark_batch.main_jar_file_uri
                if batch.spark_batch.main_jar_file_uri
                else None,
                "jar_file_uris": list(batch.spark_batch.jar_file_uris)
                if batch.spark_batch.jar_file_uris
                else [],
                "file_uris": list(batch.spark_batch.file_uris)
                if batch.spark_batch.file_uris
                else [],
                "archive_uris": list(batch.spark_batch.archive_uris)
                if batch.spark_batch.archive_uris
                else [],
                "args": list(batch.spark_batch.args)
                if batch.spark_batch.args
                else [],
            }
        elif batch.pyspark_batch:
            job_config = {
                "main_python_file_uri": batch.pyspark_batch.main_python_file_uri,
                "python_file_uris": list(batch.pyspark_batch.python_file_uris)
                if batch.pyspark_batch.python_file_uris
                else [],
                "jar_file_uris": list(batch.pyspark_batch.jar_file_uris)
                if batch.pyspark_batch.jar_file_uris
                else [],
                "file_uris": list(batch.pyspark_batch.file_uris)
                if batch.pyspark_batch.file_uris
                else [],
                "archive_uris": list(batch.pyspark_batch.archive_uris)
                if batch.pyspark_batch.archive_uris
                else [],
                "args": list(batch.pyspark_batch.args)
                if batch.pyspark_batch.args
                else [],
            }
        elif batch.spark_sql_batch:
            job_config = {
                "query_file_uri": batch.spark_sql_batch.query_file_uri,
                "query_variables": dict(batch.spark_sql_batch.query_variables)
                if batch.spark_sql_batch.query_variables
                else {},
                "jar_file_uris": list(batch.spark_sql_batch.jar_file_uris)
                if batch.spark_sql_batch.jar_file_uris
                else [],
            }
        elif batch.spark_r_batch:
            job_config = {
                "main_r_file_uri": batch.spark_r_batch.main_r_file_uri,
                "file_uris": list(batch.spark_r_batch.file_uris)
                if batch.spark_r_batch.file_uris
                else [],
                "archive_uris": list(batch.spark_r_batch.archive_uris)
                if batch.spark_r_batch.archive_uris
                else [],
                "args": list(batch.spark_r_batch.args)
                if batch.spark_r_batch.args
                else [],
            }

        # Extract runtime config details
        runtime_config = {}
        if batch.runtime_config:
            runtime_config = {
                "version": batch.runtime_config.version
                if batch.runtime_config.version
                else None,
                "container_image": batch.runtime_config.container_image
                if batch.runtime_config.container_image
                else None,
                "properties": dict(batch.runtime_config.properties)
                if batch.runtime_config.properties
                else {},
            }

        # Extract environment config details
        environment_config: dict[str, Any] = {}
        if batch.environment_config:
            environment_config = {
                "execution_config": {},
                "peripherals_config": {},
            }

            if batch.environment_config.execution_config:
                exec_config = batch.environment_config.execution_config
                environment_config["execution_config"] = {
                    "service_account": exec_config.service_account
                    if exec_config.service_account
                    else None,
                    "network_uri": exec_config.network_uri
                    if exec_config.network_uri
                    else None,
                    "subnetwork_uri": exec_config.subnetwork_uri
                    if exec_config.subnetwork_uri
                    else None,
                    "network_tags": list(exec_config.network_tags)
                    if exec_config.network_tags
                    else [],
                    "kms_key": exec_config.kms_key if exec_config.kms_key else None,
                }

            if batch.environment_config.peripherals_config:
                periph_config = batch.environment_config.peripherals_config
                environment_config["peripherals_config"] = {
                    "metastore_service": periph_config.metastore_service
                    if periph_config.metastore_service
                    else None,
                    "spark_history_server_config": {},
                }

                if periph_config.spark_history_server_config:
                    environment_config["peripherals_config"][
                        "spark_history_server_config"
                    ] = {
                        "dataproc_cluster": periph_config.spark_history_server_config.dataproc_cluster
                        if periph_config.spark_history_server_config.dataproc_cluster
                        else None,
                    }

        return {
            "name": batch.name,
            "batch_id": batch.name.split("/")[-1],
            "uuid": batch.uuid if batch.uuid else None,
            "state": batch.state.name,
            "state_message": batch.state_message,
            "state_time": batch.state_time.isoformat() if batch.state_time else None,
            "create_time": batch.create_time.isoformat()
            if batch.create_time
            else None,
            "creator": batch.creator if batch.creator else None,
            "labels": dict(batch.labels) if batch.labels else {},
            "job_type": job_type,
            "job_config": job_config,
            "runtime_config": runtime_config,
            "environment_config": environment_config,
            "runtime_info": runtime_info,
            "operation": batch.operation if batch.operation else None,
            "state_history": [
                {
                    "state": state.state.name,
                    "state_message": state.state_message,
                    "state_start_time": state.state_start_time.isoformat()
                    if state.state_start_time
                    else None,
                }
                for state in batch.state_history
            ],
        }
    except Exception as e:
        logger.error("Failed to get batch job", error=str(e))
        raise
```
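Because `DataprocBatchClient.get_batch_job` is independent of the MCP layer, it can also be exercised directly when debugging. A sketch assuming Application Default Credentials are configured; the import path and identifiers below are hypothetical:

```python
import asyncio

# Module path is an assumption; import DataprocBatchClient from wherever it
# lives in this package.
from dataproc_mcp_server.dataproc_batch_client import DataprocBatchClient


async def main() -> None:
    client = DataprocBatchClient()
    # Hypothetical identifiers; requires valid Google Cloud credentials.
    details = await client.get_batch_job(
        "my-gcp-project", "us-central1", "my-pyspark-batch"
    )
    print(details["state"])       # e.g. RUNNING, SUCCEEDED, FAILED
    print(details["job_type"])    # spark / pyspark / spark_sql / spark_r
    print(details["runtime_info"].get("output_uri"))


asyncio.run(main())
```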
- Helper utility that determines the batch job type (spark, pyspark, spark_sql, spark_r) from the batch object. Used within `get_batch_job`.

```python
def _get_batch_job_type(self, batch: types.Batch) -> str:
    """Extract job type from batch object."""
    if batch.spark_batch:
        return "spark"
    elif batch.pyspark_batch:
        return "pyspark"
    elif batch.spark_sql_batch:
        return "spark_sql"
    elif batch.spark_r_batch:
        return "spark_r"
    else:
        return "unknown"
```