
Dataproc MCP Server

by warrenzhu25

create_batch_job

Create and submit Dataproc batch jobs for Spark, PySpark, or Spark SQL workloads on Google Cloud. Configure job parameters, dependencies, and execution environment.

Instructions

Create a Dataproc batch job.

Args:
    project_id: Google Cloud project ID
    region: Dataproc region
    batch_id: Unique identifier for the batch job
    job_type: Type of batch job (spark, pyspark, spark_sql)
    main_file: Main file/class for the job
    args: Job arguments
    jar_files: JAR files to include
    properties: Job properties
    service_account: Service account email
    network_uri: Network URI
    subnetwork_uri: Subnetwork URI
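
For illustration, a hypothetical argument set for a PySpark batch job (the project, bucket, and batch names below are placeholders, not values taken from this server):

    create_batch_job_arguments = {
        "project_id": "my-project",                 # placeholder Google Cloud project ID
        "region": "us-central1",
        "batch_id": "nightly-etl-20240101",         # unique identifier for the batch
        "job_type": "pyspark",
        "main_file": "gs://my-bucket/jobs/etl.py",  # GCS URI of the main Python file
        "args": ["--date", "2024-01-01"],
        "jar_files": ["gs://my-bucket/libs/deps.jar"],
        "properties": {"spark.executor.memory": "4g"},
    }

Note that the meaning of main_file depends on job_type: per the implementation below, it is treated as a main class for "spark", a main Python file URI for "pyspark", and a query file URI for "spark_sql".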

Input Schema

Name             Required  Description  Default
project_id       Yes
region           Yes
batch_id         Yes
job_type         Yes
main_file        Yes
args             No
jar_files        No
properties       No
service_account  No
network_uri      No
subnetwork_uri   No

Output Schema

Name    Required  Description  Default
result  Yes
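
The result is a single string. Judging from the implementation below, a successful call returns the str() form of a dict along these lines (the operation name shown is a placeholder):

    {
        "operation_name": "projects/my-project/regions/us-central1/operations/...",
        "batch_id": "nightly-etl-20240101",
        "job_type": "pyspark",
        "status": "CREATING",
        "message": "Batch job creation initiated. Operation: projects/.../operations/...",
    }

On failure, the handler returns a string of the form "Error: <message>" instead of raising.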

Implementation Reference

  • MCP tool handler and registration for 'create_batch_job'. This is the tool's entry point: it is decorated with @mcp.tool(), defines the input schema via its type-hinted parameters, and delegates execution to DataprocBatchClient.
    @mcp.tool()
    async def create_batch_job(
        project_id: str,
        region: str,
        batch_id: str,
        job_type: str,
        main_file: str,
        args: list[str] | None = None,
        jar_files: list[str] | None = None,
        properties: dict[str, str] | None = None,
        service_account: str | None = None,
        network_uri: str | None = None,
        subnetwork_uri: str | None = None,
    ) -> str:
        """Create a Dataproc batch job.
    
        Args:
            project_id: Google Cloud project ID
            region: Dataproc region
            batch_id: Unique identifier for the batch job
            job_type: Type of batch job (spark, pyspark, spark_sql)
            main_file: Main file/class for the job
            args: Job arguments
            jar_files: JAR files to include
            properties: Job properties
            service_account: Service account email
            network_uri: Network URI
            subnetwork_uri: Subnetwork URI
        """
        batch_client = DataprocBatchClient()
        try:
            result = await batch_client.create_batch_job(
                project_id=project_id,
                region=region,
                batch_id=batch_id,
                job_type=job_type,
                main_file=main_file,
                args=args or [],
                jar_files=jar_files or [],
                properties=properties or {},
                service_account=service_account,
                network_uri=network_uri,
                subnetwork_uri=subnetwork_uri,
            )
            return str(result)
        except Exception as e:
            logger.error("Failed to create batch job", error=str(e))
            return f"Error: {str(e)}"
  • Core implementation of batch job creation in the DataprocBatchClient class. It builds the batch configuration for the given job_type (spark, pyspark, spark_sql), sets up the runtime and execution environment, and submits the create request to the Google Cloud Dataproc API.
    async def create_batch_job(
        self,
        project_id: str,
        region: str,
        batch_id: str,
        job_type: str,
        main_file: str,
        args: list[str] | None = None,
        jar_files: list[str] | None = None,
        properties: dict[str, str] | None = None,
        service_account: str | None = None,
        network_uri: str | None = None,
        subnetwork_uri: str | None = None,
    ) -> dict[str, Any]:
        """Create a batch job."""
        try:
            loop = asyncio.get_event_loop()
            client = self._get_batch_client(region)
    
            args = args or []
            jar_files = jar_files or []
            properties = properties or {}
    
            # Configure runtime
            runtime_config = types.RuntimeConfig()
            if properties:
                runtime_config.properties = properties
    
            # Configure environment
            environment_config = types.EnvironmentConfig()
            if service_account or network_uri or subnetwork_uri:
                execution_config = types.ExecutionConfig()
                if service_account:
                    execution_config.service_account = service_account
                if network_uri:
                    execution_config.network_uri = network_uri
                if subnetwork_uri:
                    execution_config.subnetwork_uri = subnetwork_uri
                environment_config.execution_config = execution_config
    
            # Configure job based on type
            if job_type == "spark":
                job_config = types.SparkBatch(
                    main_class=main_file, jar_file_uris=jar_files, args=args
                )
                batch = types.Batch(
                    runtime_config=runtime_config,
                    environment_config=environment_config,
                    spark_batch=job_config,
                )
            elif job_type == "pyspark":
                job_config = types.PySparkBatch(
                    main_python_file_uri=main_file, args=args, jar_file_uris=jar_files
                )
                batch = types.Batch(
                    runtime_config=runtime_config,
                    environment_config=environment_config,
                    pyspark_batch=job_config,
                )
            elif job_type == "spark_sql":
                job_config = types.SparkSqlBatch(
                    query_file_uri=main_file, jar_file_uris=jar_files
                )
                batch = types.Batch(
                    runtime_config=runtime_config,
                    environment_config=environment_config,
                    spark_sql_batch=job_config,
                )
            else:
                raise ValueError(f"Unsupported batch job type: {job_type}")
    
            request = types.CreateBatchRequest(
                parent=f"projects/{project_id}/locations/{region}",
                batch=batch,
                batch_id=batch_id,
            )
    
            operation = await loop.run_in_executor(None, client.create_batch, request)
    
            operation_name = getattr(operation, "name", str(operation))
            return {
                "operation_name": operation_name,
                "batch_id": batch_id,
                "job_type": job_type,
                "status": "CREATING",
                "message": f"Batch job creation initiated. Operation: {operation_name}",
            }
    
        except Exception as e:
            logger.error("Failed to create batch job", error=str(e))
            raise
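
For comparison, a minimal sketch of creating an equivalent PySpark batch directly with the google-cloud-dataproc client library, without the MCP wrapper. The project, bucket, and batch names are placeholders, and this follows the standard BatchControllerClient API rather than anything specific to this repository:

    from google.cloud import dataproc_v1

    region = "us-central1"
    # The serverless Batch API is regional, so point the client at a regional endpoint.
    client = dataproc_v1.BatchControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    batch = dataproc_v1.Batch(
        pyspark_batch=dataproc_v1.PySparkBatch(
            main_python_file_uri="gs://my-bucket/jobs/etl.py",  # placeholder GCS path
            args=["--date", "2024-01-01"],
        ),
        runtime_config=dataproc_v1.RuntimeConfig(
            properties={"spark.executor.memory": "4g"}
        ),
    )

    operation = client.create_batch(
        request=dataproc_v1.CreateBatchRequest(
            parent=f"projects/my-project/locations/{region}",
            batch=batch,
            batch_id="nightly-etl-20240101",
        )
    )

    # create_batch returns a long-running operation. Calling result() blocks until
    # the batch reaches a terminal state; the MCP tool above returns immediately
    # with the operation name instead of waiting.
    finished_batch = operation.result()
    print(finished_batch.state)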
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries the full burden of behavioral disclosure. While 'Create' implies a write/mutation operation, the description doesn't mention authentication requirements, rate limits, side effects, what happens on failure, or whether the job starts running immediately. For a creation tool with 11 parameters, this is a significant gap in behavioral context.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is efficiently structured with a clear purpose statement followed by a parameter list. Every sentence serves a purpose, though the parameter explanations are quite brief. The front-loaded purpose statement helps the agent quickly understand the tool's function.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 3/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool's complexity (11 parameters, a creation operation) and the presence of an output schema, the description covers the basics but lacks important context. It explains the parameters well but omits behavioral aspects and usage guidelines, and it doesn't use the output schema to explain what happens after creation.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 4/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The description provides a parameter list with brief explanations for all 11 parameters, adding substantial value beyond the input schema, which has 0% description coverage. It clarifies what each parameter represents (e.g., 'job_type: Type of batch job (spark, pyspark, spark_sql)'), though some explanations could say more about expected formats.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the verb 'Create' and the resource 'Dataproc batch job', making the purpose immediately understandable. However, it doesn't differentiate this tool from siblings like 'submit_job' or 'create_cluster'; an explicit comparison would be needed for a perfect score.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides no guidance on when to use this tool versus alternatives like 'submit_job' or 'create_cluster'. It lacks context about prerequisites, dependencies, or typical use cases, leaving the agent with insufficient information to make appropriate selection decisions.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/warrenzhu25/dataproc-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.