Skip to main content
Glama
warrenzhu25

Dataproc MCP Server

by warrenzhu25

create_batch_job

Create and submit Dataproc batch jobs for Spark, PySpark, or Spark SQL workloads on Google Cloud. Configure job parameters, dependencies, and execution environment.

Instructions

Create a Dataproc batch job.

Args:
    project_id: Google Cloud project ID
    region: Dataproc region
    batch_id: Unique identifier for the batch job
    job_type: Type of batch job (spark, pyspark, spark_sql)
    main_file: Main file/class for the job
    args: Job arguments
    jar_files: JAR files to include
    properties: Job properties
    service_account: Service account email
    network_uri: Network URI
    subnetwork_uri: Subnetwork URI

Input Schema

Table | JSON Schema
Name | Required | Description | Default
project_id | Yes
region | Yes
batch_id | Yes
job_type | Yes
main_file | Yes
args | No
jar_files | No
properties | No
service_account | No
network_uri | No
subnetwork_uri | No

Implementation Reference

  • MCP tool handler and registration for 'create_batch_job'. This is the entry point for the tool, decorated with @mcp.tool(), defining the input schema via type hints and parameters, and delegating execution to DataprocBatchClient.
    @mcp.tool()
    async def create_batch_job(
        project_id: str,
        region: str,
        batch_id: str,
        job_type: str,
        main_file: str,
        args: list[str] | None = None,
        jar_files: list[str] | None = None,
        properties: dict[str, str] | None = None,
        service_account: str | None = None,
        network_uri: str | None = None,
        subnetwork_uri: str | None = None,
    ) -> str:
        """Create a Dataproc batch job.

        Thin MCP entry point: all work is delegated to
        DataprocBatchClient.create_batch_job; the result (or the error)
        is rendered to a plain string for the MCP client.

        Args:
            project_id: Google Cloud project ID
            region: Dataproc region
            batch_id: Unique identifier for the batch job
            job_type: Type of batch job (spark, pyspark, spark_sql)
            main_file: Main file/class for the job
            args: Job arguments
            jar_files: JAR files to include
            properties: Job properties
            service_account: Service account email
            network_uri: Network URI
            subnetwork_uri: Subnetwork URI
        """
        client = DataprocBatchClient()
        try:
            # None-valued collection parameters are normalized to empty
            # containers before delegation.
            outcome = await client.create_batch_job(
                project_id=project_id,
                region=region,
                batch_id=batch_id,
                job_type=job_type,
                main_file=main_file,
                args=args if args is not None else [],
                jar_files=jar_files if jar_files is not None else [],
                properties=properties if properties is not None else {},
                service_account=service_account,
                network_uri=network_uri,
                subnetwork_uri=subnetwork_uri,
            )
        except Exception as exc:
            # Surface the failure as a string rather than raising, per the
            # tool-handler convention in this file.
            logger.error("Failed to create batch job", error=str(exc))
            return f"Error: {str(exc)}"
        return str(outcome)
  • Core implementation of batch job creation in DataprocBatchClient class. Handles Google Cloud Dataproc API calls to create the batch based on job_type (spark, pyspark, spark_sql), configures runtime, environment, and submits the request.
    async def create_batch_job(
        self,
        project_id: str,
        region: str,
        batch_id: str,
        job_type: str,
        main_file: str,
        args: list[str] | None = None,
        jar_files: list[str] | None = None,
        properties: dict[str, str] | None = None,
        service_account: str | None = None,
        network_uri: str | None = None,
        subnetwork_uri: str | None = None,
    ) -> dict[str, Any]:
        """Create a Dataproc serverless batch job.

        Builds the Batch resource for the requested job type and submits a
        CreateBatchRequest via the regional batch client. The blocking API
        call is pushed onto the default executor so the event loop is not
        stalled.

        Args:
            project_id: Google Cloud project ID.
            region: Dataproc region (also selects the regional client).
            batch_id: Unique identifier for the batch job.
            job_type: One of "spark", "pyspark", "spark_sql".
            main_file: Main class (spark), main python file URI (pyspark),
                or query file URI (spark_sql).
            args: Job arguments (note: not applied for spark_sql).
            jar_files: JAR file URIs to include.
            properties: Runtime properties for the batch.
            service_account: Service account email for execution.
            network_uri: Network URI for execution.
            subnetwork_uri: Subnetwork URI for execution.

        Returns:
            A dict describing the initiated operation (operation_name,
            batch_id, job_type, status, message).

        Raises:
            ValueError: If job_type is not one of the supported values.
            Exception: Any error from the Dataproc API is logged and
                re-raised unchanged.
        """
        try:
            # get_event_loop() is deprecated inside coroutines (3.10+);
            # get_running_loop() is the correct call here.
            loop = asyncio.get_running_loop()
            client = self._get_batch_client(region)

            args = args or []
            jar_files = jar_files or []
            properties = properties or {}

            # Configure runtime properties (empty dict means API defaults).
            runtime_config = types.RuntimeConfig()
            if properties:
                runtime_config.properties = properties

            # Configure execution environment only when at least one
            # override was supplied.
            environment_config = types.EnvironmentConfig()
            if service_account or network_uri or subnetwork_uri:
                execution_config = types.ExecutionConfig()
                if service_account:
                    execution_config.service_account = service_account
                if network_uri:
                    execution_config.network_uri = network_uri
                if subnetwork_uri:
                    execution_config.subnetwork_uri = subnetwork_uri
                environment_config.execution_config = execution_config

            # Build the type-specific payload.
            if job_type == "spark":
                batch = types.Batch(
                    runtime_config=runtime_config,
                    environment_config=environment_config,
                    spark_batch=types.SparkBatch(
                        main_class=main_file, jar_file_uris=jar_files, args=args
                    ),
                )
            elif job_type == "pyspark":
                batch = types.Batch(
                    runtime_config=runtime_config,
                    environment_config=environment_config,
                    pyspark_batch=types.PySparkBatch(
                        main_python_file_uri=main_file,
                        args=args,
                        jar_file_uris=jar_files,
                    ),
                )
            elif job_type == "spark_sql":
                # NOTE(review): SparkSqlBatch takes no positional args;
                # any `args` passed for spark_sql are silently ignored here
                # (query_variables would be the closest equivalent — confirm
                # intended behavior before wiring them through).
                batch = types.Batch(
                    runtime_config=runtime_config,
                    environment_config=environment_config,
                    spark_sql_batch=types.SparkSqlBatch(
                        query_file_uri=main_file, jar_file_uris=jar_files
                    ),
                )
            else:
                raise ValueError(f"Unsupported batch job type: {job_type}")

            request = types.CreateBatchRequest(
                parent=f"projects/{project_id}/locations/{region}",
                batch=batch,
                batch_id=batch_id,
            )

            # create_batch is a blocking gRPC call; run it in the executor.
            operation = await loop.run_in_executor(None, client.create_batch, request)

            # Long-running operations expose .name; fall back to str() for
            # anything else (e.g. test doubles).
            operation_name = getattr(operation, "name", str(operation))
            return {
                "operation_name": operation_name,
                "batch_id": batch_id,
                "job_type": job_type,
                "status": "CREATING",
                "message": f"Batch job creation initiated. Operation: {operation_name}",
            }

        except Exception as e:
            logger.error("Failed to create batch job", error=str(e))
            raise

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/warrenzhu25/dataproc-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server