create_batch_job
Submit batch jobs to Google Cloud Dataproc. Supports Spark, PySpark, and Spark SQL job types; specify the main file or class, JAR files, and arguments, and configure runtime properties, service account, and network settings for job execution.
Instructions
Create a Dataproc batch job.
Args:
- project_id: Google Cloud project ID
- region: Dataproc region
- batch_id: Unique identifier for the batch job
- job_type: Type of batch job (spark, pyspark, spark_sql)
- main_file: Main file/class for the job
- args: Job arguments
- jar_files: JAR files to include
- properties: Job properties
- service_account: Service account email
- network_uri: Network URI
- subnetwork_uri: Subnetwork URI
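For illustration, a typical set of arguments for a PySpark batch might look like the following sketch. All resource names (project, bucket paths, batch ID, service account) are placeholders, not values from this project:

```python
# Illustrative create_batch_job arguments for a PySpark workload.
# Every resource name below is a placeholder, not a real project or bucket.
example_arguments = {
    "project_id": "my-gcp-project",
    "region": "us-central1",
    "batch_id": "daily-etl-20240101",
    "job_type": "pyspark",
    "main_file": "gs://my-bucket/jobs/etl.py",
    "args": ["--date", "2024-01-01"],
    "jar_files": ["gs://my-bucket/libs/deps.jar"],
    "properties": {"spark.executor.instances": "2"},
    "service_account": "dataproc-runner@my-gcp-project.iam.gserviceaccount.com",
}
```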
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| args | No | Job arguments | None |
| batch_id | Yes | Unique identifier for the batch job | |
| jar_files | No | JAR files to include | None |
| job_type | Yes | Type of batch job (spark, pyspark, spark_sql) | |
| main_file | Yes | Main file/class for the job | |
| network_uri | No | Network URI | None |
| project_id | Yes | Google Cloud project ID | |
| properties | No | Job properties | None |
| region | Yes | Dataproc region | |
| service_account | No | Service account email | None |
| subnetwork_uri | No | Subnetwork URI | None |
Implementation Reference
- MCP tool handler and registration for create_batch_job via the @mcp.tool() decorator. The input schema is defined by the type hints and docstring; execution is delegated to DataprocBatchClient.

```python
@mcp.tool()
async def create_batch_job(
    project_id: str,
    region: str,
    batch_id: str,
    job_type: str,
    main_file: str,
    args: list[str] | None = None,
    jar_files: list[str] | None = None,
    properties: dict[str, str] | None = None,
    service_account: str | None = None,
    network_uri: str | None = None,
    subnetwork_uri: str | None = None,
) -> str:
    """Create a Dataproc batch job.

    Args:
        project_id: Google Cloud project ID
        region: Dataproc region
        batch_id: Unique identifier for the batch job
        job_type: Type of batch job (spark, pyspark, spark_sql)
        main_file: Main file/class for the job
        args: Job arguments
        jar_files: JAR files to include
        properties: Job properties
        service_account: Service account email
        network_uri: Network URI
        subnetwork_uri: Subnetwork URI
    """
    batch_client = DataprocBatchClient()
    try:
        result = await batch_client.create_batch_job(
            project_id=project_id,
            region=region,
            batch_id=batch_id,
            job_type=job_type,
            main_file=main_file,
            args=args or [],
            jar_files=jar_files or [],
            properties=properties or {},
            service_account=service_account,
            network_uri=network_uri,
            subnetwork_uri=subnetwork_uri,
        )
        return str(result)
    except Exception as e:
        logger.error("Failed to create batch job", error=str(e))
        return f"Error: {str(e)}"
```
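The handler references an mcp server object defined elsewhere in the codebase. As a rough sketch only, a server exposing such tools could be set up with the MCP Python SDK's FastMCP class; the server name and transport below are assumptions, not taken from this project:

```python
# Minimal sketch of a FastMCP server that a @mcp.tool() decorator could attach to.
# The server name "dataproc-mcp" is illustrative; the real project may differ.
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("dataproc-mcp")

# Tool handlers such as create_batch_job would be defined here with @mcp.tool().

if __name__ == "__main__":
    mcp.run()  # serves the registered tools (stdio transport by default)
```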
- Core helper method in DataprocBatchClient that implements the batch job creation logic, constructing protobuf messages based on job_type and invoking the Google Cloud Dataproc API.

```python
async def create_batch_job(
    self,
    project_id: str,
    region: str,
    batch_id: str,
    job_type: str,
    main_file: str,
    args: list[str] | None = None,
    jar_files: list[str] | None = None,
    properties: dict[str, str] | None = None,
    service_account: str | None = None,
    network_uri: str | None = None,
    subnetwork_uri: str | None = None,
) -> dict[str, Any]:
    """Create a batch job."""
    try:
        loop = asyncio.get_event_loop()
        client = self._get_batch_client(region)

        args = args or []
        jar_files = jar_files or []
        properties = properties or {}

        # Configure runtime
        runtime_config = types.RuntimeConfig()
        if properties:
            runtime_config.properties = properties

        # Configure environment
        environment_config = types.EnvironmentConfig()
        if service_account or network_uri or subnetwork_uri:
            execution_config = types.ExecutionConfig()
            if service_account:
                execution_config.service_account = service_account
            if network_uri:
                execution_config.network_uri = network_uri
            if subnetwork_uri:
                execution_config.subnetwork_uri = subnetwork_uri
            environment_config.execution_config = execution_config

        # Configure job based on type
        if job_type == "spark":
            job_config = types.SparkBatch(
                main_class=main_file, jar_file_uris=jar_files, args=args
            )
            batch = types.Batch(
                runtime_config=runtime_config,
                environment_config=environment_config,
                spark_batch=job_config,
            )
        elif job_type == "pyspark":
            job_config = types.PySparkBatch(
                main_python_file_uri=main_file, args=args, jar_file_uris=jar_files
            )
            batch = types.Batch(
                runtime_config=runtime_config,
                environment_config=environment_config,
                pyspark_batch=job_config,
            )
        elif job_type == "spark_sql":
            job_config = types.SparkSqlBatch(
                query_file_uri=main_file, jar_file_uris=jar_files
            )
            batch = types.Batch(
                runtime_config=runtime_config,
                environment_config=environment_config,
                spark_sql_batch=job_config,
            )
        else:
            raise ValueError(f"Unsupported batch job type: {job_type}")

        request = types.CreateBatchRequest(
            parent=f"projects/{project_id}/locations/{region}",
            batch=batch,
            batch_id=batch_id,
        )

        operation = await loop.run_in_executor(None, client.create_batch, request)
        operation_name = getattr(operation, "name", str(operation))

        return {
            "operation_name": operation_name,
            "batch_id": batch_id,
            "job_type": job_type,
            "status": "CREATING",
            "message": f"Batch job creation initiated. Operation: {operation_name}",
        }
    except Exception as e:
        logger.error("Failed to create batch job", error=str(e))
        raise
```
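Note that the helper returns as soon as the create operation is accepted (status CREATING) rather than waiting for the workload to finish. For comparison, below is a minimal sketch of calling the Dataproc Serverless batch API directly and blocking until the batch reaches a terminal state, assuming the google-cloud-dataproc client library; the project, bucket path, and batch ID are placeholders:

```python
# Sketch: direct use of the Dataproc batch API, waiting for the batch to finish.
# Resource names are placeholders; requires the google-cloud-dataproc package.
from google.cloud import dataproc_v1

project_id, region = "my-gcp-project", "us-central1"

client = dataproc_v1.BatchControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

batch = dataproc_v1.Batch(
    pyspark_batch=dataproc_v1.PySparkBatch(
        main_python_file_uri="gs://my-bucket/jobs/etl.py",
        args=["--date", "2024-01-01"],
    ),
    runtime_config=dataproc_v1.RuntimeConfig(
        properties={"spark.executor.instances": "2"}
    ),
)

operation = client.create_batch(
    request=dataproc_v1.CreateBatchRequest(
        parent=f"projects/{project_id}/locations/{region}",
        batch=batch,
        batch_id="daily-etl-20240101",
    )
)

result = operation.result()  # blocks until the batch reaches a terminal state
print(result.state, result.state_message)
```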