create_batch_job

Create batch jobs on Google Cloud Dataproc. Supports Spark, PySpark, and Spark SQL job types, and accepts a main file or class, JAR files, job arguments, runtime properties, and optional service account and network settings.

Instructions

Create a Dataproc batch job.

Args:
    project_id: Google Cloud project ID
    region: Dataproc region
    batch_id: Unique identifier for the batch job
    job_type: Type of batch job (spark, pyspark, spark_sql)
    main_file: Main file/class for the job
    args: Job arguments
    jar_files: JAR files to include
    properties: Job properties
    service_account: Service account email
    network_uri: Network URI
    subnetwork_uri: Subnetwork URI
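
For example, a tool call for a PySpark job might pass arguments like the sketch below; the project, bucket paths, and property values are illustrative placeholders, not values from this page.

    # Hypothetical create_batch_job arguments for a PySpark batch.
    # project_id, batch_id, gs:// paths, and property values are placeholders.
    example_arguments = {
        "project_id": "my-project",
        "region": "us-central1",
        "batch_id": "nightly-etl-001",        # must be unique within the project/region
        "job_type": "pyspark",                # spark | pyspark | spark_sql
        "main_file": "gs://my-bucket/jobs/etl.py",
        "args": ["--date", "2024-01-01"],
        "jar_files": ["gs://my-bucket/libs/deps.jar"],
        "properties": {"spark.executor.memory": "4g"},
        "service_account": "dataproc-runner@my-project.iam.gserviceaccount.com",
    }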

Input Schema

Name             Required  Description
args             No        Job arguments
batch_id         Yes       Unique identifier for the batch job
jar_files        No        JAR files to include
job_type         Yes       Type of batch job (spark, pyspark, spark_sql)
main_file        Yes       Main file/class for the job
network_uri      No        Network URI
project_id       Yes       Google Cloud project ID
properties       No        Job properties
region           Yes       Dataproc region
service_account  No        Service account email
subnetwork_uri   No        Subnetwork URI

Implementation Reference

  • MCP tool handler and registration for create_batch_job via the @mcp.tool() decorator. The input schema is defined by the function's type hints and docstring; execution is delegated to DataprocBatchClient.
    @mcp.tool()
    async def create_batch_job(
        project_id: str,
        region: str,
        batch_id: str,
        job_type: str,
        main_file: str,
        args: list[str] | None = None,
        jar_files: list[str] | None = None,
        properties: dict[str, str] | None = None,
        service_account: str | None = None,
        network_uri: str | None = None,
        subnetwork_uri: str | None = None,
    ) -> str:
        """Create a Dataproc batch job.

        Args:
            project_id: Google Cloud project ID
            region: Dataproc region
            batch_id: Unique identifier for the batch job
            job_type: Type of batch job (spark, pyspark, spark_sql)
            main_file: Main file/class for the job
            args: Job arguments
            jar_files: JAR files to include
            properties: Job properties
            service_account: Service account email
            network_uri: Network URI
            subnetwork_uri: Subnetwork URI
        """
        batch_client = DataprocBatchClient()
        try:
            result = await batch_client.create_batch_job(
                project_id=project_id,
                region=region,
                batch_id=batch_id,
                job_type=job_type,
                main_file=main_file,
                args=args or [],
                jar_files=jar_files or [],
                properties=properties or {},
                service_account=service_account,
                network_uri=network_uri,
                subnetwork_uri=subnetwork_uri,
            )
            return str(result)
        except Exception as e:
            logger.error("Failed to create batch job", error=str(e))
            return f"Error: {str(e)}"
  • Core helper method in DataprocBatchClient that implements the batch job creation logic, constructing protobuf messages based on job_type and invoking the Google Cloud Dataproc API.
    async def create_batch_job(
        self,
        project_id: str,
        region: str,
        batch_id: str,
        job_type: str,
        main_file: str,
        args: list[str] | None = None,
        jar_files: list[str] | None = None,
        properties: dict[str, str] | None = None,
        service_account: str | None = None,
        network_uri: str | None = None,
        subnetwork_uri: str | None = None,
    ) -> dict[str, Any]:
        """Create a batch job."""
        try:
            loop = asyncio.get_event_loop()
            client = self._get_batch_client(region)

            args = args or []
            jar_files = jar_files or []
            properties = properties or {}

            # Configure runtime
            runtime_config = types.RuntimeConfig()
            if properties:
                runtime_config.properties = properties

            # Configure environment
            environment_config = types.EnvironmentConfig()
            if service_account or network_uri or subnetwork_uri:
                execution_config = types.ExecutionConfig()
                if service_account:
                    execution_config.service_account = service_account
                if network_uri:
                    execution_config.network_uri = network_uri
                if subnetwork_uri:
                    execution_config.subnetwork_uri = subnetwork_uri
                environment_config.execution_config = execution_config

            # Configure job based on type
            if job_type == "spark":
                job_config = types.SparkBatch(
                    main_class=main_file, jar_file_uris=jar_files, args=args
                )
                batch = types.Batch(
                    runtime_config=runtime_config,
                    environment_config=environment_config,
                    spark_batch=job_config,
                )
            elif job_type == "pyspark":
                job_config = types.PySparkBatch(
                    main_python_file_uri=main_file, args=args, jar_file_uris=jar_files
                )
                batch = types.Batch(
                    runtime_config=runtime_config,
                    environment_config=environment_config,
                    pyspark_batch=job_config,
                )
            elif job_type == "spark_sql":
                job_config = types.SparkSqlBatch(
                    query_file_uri=main_file, jar_file_uris=jar_files
                )
                batch = types.Batch(
                    runtime_config=runtime_config,
                    environment_config=environment_config,
                    spark_sql_batch=job_config,
                )
            else:
                raise ValueError(f"Unsupported batch job type: {job_type}")

            request = types.CreateBatchRequest(
                parent=f"projects/{project_id}/locations/{region}",
                batch=batch,
                batch_id=batch_id,
            )

            operation = await loop.run_in_executor(None, client.create_batch, request)
            operation_name = getattr(operation, "name", str(operation))

            return {
                "operation_name": operation_name,
                "batch_id": batch_id,
                "job_type": job_type,
                "status": "CREATING",
                "message": f"Batch job creation initiated. Operation: {operation_name}",
            }
        except Exception as e:
            logger.error("Failed to create batch job", error=str(e))
            raise
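
  • For orientation only: a minimal standalone sketch (not code from this repository) of creating a pyspark batch with the google-cloud-dataproc client directly, mirroring what the helper above constructs. Project, region, and gs:// paths are placeholders, and the regional endpoint setup is an assumption about what _get_batch_client provides.
    # Sketch only: direct use of google-cloud-dataproc for a PySpark batch.
    # All identifiers and paths below are illustrative placeholders.
    from google.cloud import dataproc_v1
    from google.cloud.dataproc_v1 import types

    project_id, region, batch_id = "my-project", "us-central1", "nightly-etl-001"

    # Dataproc batch workloads are served from regional endpoints.
    client = dataproc_v1.BatchControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    batch = types.Batch(
        pyspark_batch=types.PySparkBatch(
            main_python_file_uri="gs://my-bucket/jobs/etl.py",
            args=["--date", "2024-01-01"],
        ),
        runtime_config=types.RuntimeConfig(properties={"spark.executor.memory": "4g"}),
    )

    operation = client.create_batch(
        request=types.CreateBatchRequest(
            parent=f"projects/{project_id}/locations/{region}",
            batch=batch,
            batch_id=batch_id,
        )
    )

    # create_batch returns a long-running operation. Blocking on it yields the
    # finished Batch resource; the MCP helper above instead returns immediately
    # with the operation name and a CREATING status.
    result = operation.result()
    print(result.state, result.name)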

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/warrenzhu25/dataproc-mcp'
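
The same request in Python (a hypothetical sketch; the response schema is defined by the Glama API, not by this page):

    # Hypothetical Python equivalent of the curl command above.
    import requests

    resp = requests.get("https://glama.ai/api/mcp/v1/servers/warrenzhu25/dataproc-mcp")
    resp.raise_for_status()
    print(resp.json())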

If you have feedback or need assistance with the MCP directory API, please join our Discord server.