create_batch_job

Create and submit Dataproc batch jobs for Spark, PySpark, or Spark SQL workloads on Google Cloud. Configure job parameters, dependencies, and execution environment.

Instructions

Create a Dataproc batch job.

Args:
    project_id: Google Cloud project ID
    region: Dataproc region
    batch_id: Unique identifier for the batch job
    job_type: Type of batch job (spark, pyspark, or spark_sql)
    main_file: Main entry point for the job (main class for spark, main Python file URI for pyspark, query file URI for spark_sql)
    args: Arguments passed to the job
    jar_files: JAR file URIs to include
    properties: Spark runtime properties
    service_account: Service account email used to run the workload
    network_uri: Network URI
    subnetwork_uri: Subnetwork URI

Input Schema

Name             Required  Description                                                         Default
project_id       Yes       Google Cloud project ID
region           Yes       Dataproc region
batch_id         Yes       Unique identifier for the batch job
job_type         Yes       Type of batch job (spark, pyspark, spark_sql)
main_file        Yes       Main entry point (main class, Python file URI, or query file URI)
args             No        Arguments passed to the job
jar_files        No        JAR file URIs to include
properties       No        Spark runtime properties
service_account  No        Service account email used to run the workload
network_uri      No        Network URI
subnetwork_uri   No        Subnetwork URI
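
How main_file is interpreted depends on job_type (see the implementation reference below): the main class for spark, the main Python file URI for pyspark, or the query file URI for spark_sql. The argument sets below are hypothetical sketches; every project, bucket, and batch identifier is an illustrative placeholder.

    # Hypothetical argument sets for the three job types. All identifiers
    # (project, bucket, batch IDs) are placeholders, not real defaults.
    spark_args = {
        "project_id": "my-project",
        "region": "us-central1",
        "batch_id": "wordcount-001",
        "job_type": "spark",
        "main_file": "com.example.WordCount",  # interpreted as the main class
        "jar_files": ["gs://my-bucket/jars/wordcount.jar"],
        "args": ["gs://my-bucket/input/", "gs://my-bucket/output/"],
    }

    pyspark_args = {
        "project_id": "my-project",
        "region": "us-central1",
        "batch_id": "etl-2024-01-01",
        "job_type": "pyspark",
        "main_file": "gs://my-bucket/jobs/etl.py",  # interpreted as the main Python file URI
        "args": ["--date", "2024-01-01"],
        "properties": {"spark.executor.instances": "4"},
    }

    spark_sql_args = {
        "project_id": "my-project",
        "region": "us-central1",
        "batch_id": "report-001",
        "job_type": "spark_sql",
        "main_file": "gs://my-bucket/queries/report.sql",  # interpreted as the query file URI
    }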

Implementation Reference

  • MCP tool handler and registration for 'create_batch_job'. This is the tool's entry point: it is decorated with @mcp.tool(), defines the input schema through its type hints and defaults, and delegates execution to DataprocBatchClient. (An end-to-end client invocation sketch follows this list.)
    @mcp.tool()
    async def create_batch_job(
        project_id: str,
        region: str,
        batch_id: str,
        job_type: str,
        main_file: str,
        args: list[str] | None = None,
        jar_files: list[str] | None = None,
        properties: dict[str, str] | None = None,
        service_account: str | None = None,
        network_uri: str | None = None,
        subnetwork_uri: str | None = None,
    ) -> str:
        """Create a Dataproc batch job.

        Args:
            project_id: Google Cloud project ID
            region: Dataproc region
            batch_id: Unique identifier for the batch job
            job_type: Type of batch job (spark, pyspark, spark_sql)
            main_file: Main file/class for the job
            args: Job arguments
            jar_files: JAR files to include
            properties: Job properties
            service_account: Service account email
            network_uri: Network URI
            subnetwork_uri: Subnetwork URI
        """
        batch_client = DataprocBatchClient()
        try:
            result = await batch_client.create_batch_job(
                project_id=project_id,
                region=region,
                batch_id=batch_id,
                job_type=job_type,
                main_file=main_file,
                args=args or [],
                jar_files=jar_files or [],
                properties=properties or {},
                service_account=service_account,
                network_uri=network_uri,
                subnetwork_uri=subnetwork_uri,
            )
            return str(result)
        except Exception as e:
            logger.error("Failed to create batch job", error=str(e))
            return f"Error: {str(e)}"
  • Core implementation of batch job creation in the DataprocBatchClient class. It builds the batch configuration for the given job_type (spark, pyspark, spark_sql), sets up the runtime and execution environment, and submits the CreateBatchRequest to the Google Cloud Dataproc API. (The regional client helper it calls, _get_batch_client, is sketched after this list.)
    async def create_batch_job(
        self,
        project_id: str,
        region: str,
        batch_id: str,
        job_type: str,
        main_file: str,
        args: list[str] | None = None,
        jar_files: list[str] | None = None,
        properties: dict[str, str] | None = None,
        service_account: str | None = None,
        network_uri: str | None = None,
        subnetwork_uri: str | None = None,
    ) -> dict[str, Any]:
        """Create a batch job."""
        try:
            loop = asyncio.get_event_loop()
            client = self._get_batch_client(region)

            args = args or []
            jar_files = jar_files or []
            properties = properties or {}

            # Configure runtime
            runtime_config = types.RuntimeConfig()
            if properties:
                runtime_config.properties = properties

            # Configure environment
            environment_config = types.EnvironmentConfig()
            if service_account or network_uri or subnetwork_uri:
                execution_config = types.ExecutionConfig()
                if service_account:
                    execution_config.service_account = service_account
                if network_uri:
                    execution_config.network_uri = network_uri
                if subnetwork_uri:
                    execution_config.subnetwork_uri = subnetwork_uri
                environment_config.execution_config = execution_config

            # Configure job based on type
            if job_type == "spark":
                job_config = types.SparkBatch(
                    main_class=main_file, jar_file_uris=jar_files, args=args
                )
                batch = types.Batch(
                    runtime_config=runtime_config,
                    environment_config=environment_config,
                    spark_batch=job_config,
                )
            elif job_type == "pyspark":
                job_config = types.PySparkBatch(
                    main_python_file_uri=main_file, args=args, jar_file_uris=jar_files
                )
                batch = types.Batch(
                    runtime_config=runtime_config,
                    environment_config=environment_config,
                    pyspark_batch=job_config,
                )
            elif job_type == "spark_sql":
                job_config = types.SparkSqlBatch(
                    query_file_uri=main_file, jar_file_uris=jar_files
                )
                batch = types.Batch(
                    runtime_config=runtime_config,
                    environment_config=environment_config,
                    spark_sql_batch=job_config,
                )
            else:
                raise ValueError(f"Unsupported batch job type: {job_type}")

            request = types.CreateBatchRequest(
                parent=f"projects/{project_id}/locations/{region}",
                batch=batch,
                batch_id=batch_id,
            )

            operation = await loop.run_in_executor(None, client.create_batch, request)
            operation_name = getattr(operation, "name", str(operation))

            return {
                "operation_name": operation_name,
                "batch_id": batch_id,
                "job_type": job_type,
                "status": "CREATING",
                "message": f"Batch job creation initiated. Operation: {operation_name}",
            }
        except Exception as e:
            logger.error("Failed to create batch job", error=str(e))
            raise
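
The method above delegates endpoint setup to a _get_batch_client helper that is not shown on this page. A minimal sketch of such a helper, assuming the google-cloud-dataproc library (serverless batches are served from regional endpoints), might look like this:

    # Sketch only, not the server's actual helper: build a BatchControllerClient
    # pinned to the given region's Dataproc endpoint.
    from google.api_core.client_options import ClientOptions
    from google.cloud import dataproc_v1

    def _get_batch_client(region: str) -> dataproc_v1.BatchControllerClient:
        """Return a batch client that talks to the given region's endpoint."""
        return dataproc_v1.BatchControllerClient(
            client_options=ClientOptions(
                api_endpoint=f"{region}-dataproc.googleapis.com:443"
            )
        )

Note that create_batch returns a long-running operation. The implementation reports it immediately with a CREATING status rather than blocking; a caller that needs the finished batch can typically wait on the operation with operation.result().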
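
Putting the pieces together, here is a minimal sketch of invoking the tool from an MCP client over stdio, assuming the official mcp Python SDK; the launch command ("dataproc-mcp") and all argument values are illustrative assumptions rather than values published for this server:

    # Sketch: call create_batch_job through an MCP stdio client session.
    # The server command and every argument value are placeholders.
    import asyncio

    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client

    async def main() -> None:
        server = StdioServerParameters(command="dataproc-mcp")  # assumed launch command
        async with stdio_client(server) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                result = await session.call_tool(
                    "create_batch_job",
                    {
                        "project_id": "my-project",
                        "region": "us-central1",
                        "batch_id": "etl-2024-01-01",
                        "job_type": "pyspark",
                        "main_file": "gs://my-bucket/jobs/etl.py",
                        "args": ["--date", "2024-01-01"],
                    },
                )
                # The tool returns plain text: str(result_dict) on success, "Error: ..." on failure.
                print(result.content)

    asyncio.run(main())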

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/warrenzhu25/dataproc-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.