create_batch_job
Create and submit Dataproc batch jobs for Spark, PySpark, or Spark SQL workloads on Google Cloud. Configure job parameters, dependencies, and execution environment.
Instructions
Create a Dataproc batch job.
Args:
project_id: Google Cloud project ID
region: Dataproc region
batch_id: Unique identifier for the batch job
job_type: Type of batch job (spark, pyspark, spark_sql)
main_file: Main file/class for the job
args: Job arguments
jar_files: JAR files to include
properties: Job properties
service_account: Service account email
network_uri: Network URI
subnetwork_uri: Subnetwork URI
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| project_id | Yes | Google Cloud project ID | |
| region | Yes | Dataproc region | |
| batch_id | Yes | Unique identifier for the batch job | |
| job_type | Yes | Type of batch job (spark, pyspark, spark_sql) | |
| main_file | Yes | Main file/class for the job | |
| args | No | Job arguments | None |
| jar_files | No | JAR files to include | None |
| properties | No | Job properties | None |
| service_account | No | Service account email | None |
| network_uri | No | Network URI | None |
| subnetwork_uri | No | Subnetwork URI | None |
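As a rough illustration of the schema, a PySpark submission could pass arguments like the following. The project ID, region, batch ID, and gs:// paths are made-up placeholders, not values from this project:

```python
# Hypothetical example arguments for this tool; all values are placeholders.
example_arguments = {
    "project_id": "my-project",
    "region": "us-central1",
    "batch_id": "daily-etl-001",
    "job_type": "pyspark",
    "main_file": "gs://my-bucket/jobs/etl.py",
    "args": ["--date", "2024-01-01"],
    "jar_files": ["gs://my-bucket/libs/deps.jar"],
    "properties": {"spark.executor.instances": "2"},
}
```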
Implementation Reference
- MCP tool handler and registration for `create_batch_job`. This is the entry point for the tool: it is decorated with `@mcp.tool()`, defines the input schema via type hints and parameters, and delegates execution to `DataprocBatchClient`.

```python
@mcp.tool()
async def create_batch_job(
    project_id: str,
    region: str,
    batch_id: str,
    job_type: str,
    main_file: str,
    args: list[str] | None = None,
    jar_files: list[str] | None = None,
    properties: dict[str, str] | None = None,
    service_account: str | None = None,
    network_uri: str | None = None,
    subnetwork_uri: str | None = None,
) -> str:
    """Create a Dataproc batch job.

    Args:
        project_id: Google Cloud project ID
        region: Dataproc region
        batch_id: Unique identifier for the batch job
        job_type: Type of batch job (spark, pyspark, spark_sql)
        main_file: Main file/class for the job
        args: Job arguments
        jar_files: JAR files to include
        properties: Job properties
        service_account: Service account email
        network_uri: Network URI
        subnetwork_uri: Subnetwork URI
    """
    batch_client = DataprocBatchClient()
    try:
        result = await batch_client.create_batch_job(
            project_id=project_id,
            region=region,
            batch_id=batch_id,
            job_type=job_type,
            main_file=main_file,
            args=args or [],
            jar_files=jar_files or [],
            properties=properties or {},
            service_account=service_account,
            network_uri=network_uri,
            subnetwork_uri=subnetwork_uri,
        )
        return str(result)
    except Exception as e:
        logger.error("Failed to create batch job", error=str(e))
        return f"Error: {str(e)}"
```
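Because the handler is a plain coroutine, it can also be exercised outside the MCP server, for example in a quick smoke test. This is only a sketch: the values below are placeholders and assume valid Google Cloud credentials in the environment.

```python
import asyncio

# Minimal sketch of calling the handler directly; all values are placeholders.
async def main() -> None:
    result = await create_batch_job(
        project_id="my-project",
        region="us-central1",
        batch_id="smoke-test-001",
        job_type="spark",
        main_file="org.example.SparkApp",      # main class for a spark job
        jar_files=["gs://my-bucket/app.jar"],
    )
    print(result)  # stringified result dict, or "Error: ..." on failure

if __name__ == "__main__":
    asyncio.run(main())
```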
- Core implementation of batch job creation in the `DataprocBatchClient` class. It handles the Google Cloud Dataproc API call, building the batch according to `job_type` (spark, pyspark, spark_sql), configuring the runtime and environment, and submitting the request.

```python
async def create_batch_job(
    self,
    project_id: str,
    region: str,
    batch_id: str,
    job_type: str,
    main_file: str,
    args: list[str] | None = None,
    jar_files: list[str] | None = None,
    properties: dict[str, str] | None = None,
    service_account: str | None = None,
    network_uri: str | None = None,
    subnetwork_uri: str | None = None,
) -> dict[str, Any]:
    """Create a batch job."""
    try:
        loop = asyncio.get_event_loop()
        client = self._get_batch_client(region)

        args = args or []
        jar_files = jar_files or []
        properties = properties or {}

        # Configure runtime
        runtime_config = types.RuntimeConfig()
        if properties:
            runtime_config.properties = properties

        # Configure environment
        environment_config = types.EnvironmentConfig()
        if service_account or network_uri or subnetwork_uri:
            execution_config = types.ExecutionConfig()
            if service_account:
                execution_config.service_account = service_account
            if network_uri:
                execution_config.network_uri = network_uri
            if subnetwork_uri:
                execution_config.subnetwork_uri = subnetwork_uri
            environment_config.execution_config = execution_config

        # Configure job based on type
        if job_type == "spark":
            job_config = types.SparkBatch(
                main_class=main_file, jar_file_uris=jar_files, args=args
            )
            batch = types.Batch(
                runtime_config=runtime_config,
                environment_config=environment_config,
                spark_batch=job_config,
            )
        elif job_type == "pyspark":
            job_config = types.PySparkBatch(
                main_python_file_uri=main_file, args=args, jar_file_uris=jar_files
            )
            batch = types.Batch(
                runtime_config=runtime_config,
                environment_config=environment_config,
                pyspark_batch=job_config,
            )
        elif job_type == "spark_sql":
            job_config = types.SparkSqlBatch(
                query_file_uri=main_file, jar_file_uris=jar_files
            )
            batch = types.Batch(
                runtime_config=runtime_config,
                environment_config=environment_config,
                spark_sql_batch=job_config,
            )
        else:
            raise ValueError(f"Unsupported batch job type: {job_type}")

        request = types.CreateBatchRequest(
            parent=f"projects/{project_id}/locations/{region}",
            batch=batch,
            batch_id=batch_id,
        )

        operation = await loop.run_in_executor(None, client.create_batch, request)
        operation_name = getattr(operation, "name", str(operation))

        return {
            "operation_name": operation_name,
            "batch_id": batch_id,
            "job_type": job_type,
            "status": "CREATING",
            "message": f"Batch job creation initiated. Operation: {operation_name}",
        }
    except Exception as e:
        logger.error("Failed to create batch job", error=str(e))
        raise
```
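The tool returns immediately with the long-running operation name rather than blocking until the batch finishes. If a caller wants to confirm that the batch reached a terminal state, one option (a sketch, not part of this codebase, assuming the google-cloud-dataproc client library and placeholder project/region/batch values) is to poll the batch resource directly:

```python
import time

from google.cloud import dataproc_v1

# Sketch of polling a batch to a terminal state after create_batch_job returns.
# Assumes the google-cloud-dataproc client library; values are placeholders.
def wait_for_batch(project_id: str, region: str, batch_id: str) -> dataproc_v1.Batch:
    client = dataproc_v1.BatchControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    name = f"projects/{project_id}/locations/{region}/batches/{batch_id}"
    terminal = {
        dataproc_v1.Batch.State.SUCCEEDED,
        dataproc_v1.Batch.State.FAILED,
        dataproc_v1.Batch.State.CANCELLED,
    }
    while True:
        batch = client.get_batch(name=name)
        if batch.state in terminal:
            return batch
        time.sleep(30)  # batches typically run for minutes; poll sparingly
```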