Skip to main content
Glama
warrenzhu25

Dataproc MCP Server

by warrenzhu25

submit_job

Submit data processing jobs to Google Cloud Dataproc clusters for Spark, PySpark, Hive, and Hadoop workloads using specified project, region, and cluster parameters.

Instructions

Submit a job to a Dataproc cluster.

Args:
    project_id: Google Cloud project ID
    region: Dataproc region
    cluster_name: Target cluster name
    job_type: Type of job (spark, pyspark, spark_sql, hive, pig, hadoop)
    main_file: Main file/class for the job
    args: Job arguments
    jar_files: JAR files to include
    properties: Job properties

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
project_idYes
regionYes
cluster_nameYes
job_typeYes
main_fileYes
argsNo
jar_filesNo
propertiesNo

Output Schema

TableJSON Schema
NameRequiredDescriptionDefault
resultYes

Implementation Reference

  • MCP tool handler and registration for 'submit_job'. This is the primary entry point for the tool, decorated with @mcp.tool(). It validates inputs via type hints (serving as schema) and delegates to DataprocClient.submit_job.
    @mcp.tool()
    async def submit_job(
        project_id: str,
        region: str,
        cluster_name: str,
        job_type: str,
        main_file: str,
        args: list[str] | None = None,
        jar_files: list[str] | None = None,
        properties: dict[str, str] | None = None,
    ) -> str:
        """Submit a job to a Dataproc cluster.
    
        Args:
            project_id: Google Cloud project ID
            region: Dataproc region
            cluster_name: Target cluster name
            job_type: Type of job (spark, pyspark, spark_sql, hive, pig, hadoop)
            main_file: Main file/class for the job
            args: Job arguments
            jar_files: JAR files to include
            properties: Job properties
        """
        client = DataprocClient()
        try:
            result = await client.submit_job(
                project_id=project_id,
                region=region,
                cluster_name=cluster_name,
                job_type=job_type,
                main_file=main_file,
                args=args or [],
                jar_files=jar_files or [],
                properties=properties or {},
            )
            return str(result)
        except Exception as e:
            logger.error("Failed to submit job", error=str(e))
            return f"Error: {str(e)}"
  • Supporting utility in DataprocClient class that implements the actual job submission logic to Google Cloud Dataproc API, handling different job types (spark, pyspark, etc.) and constructing the SubmitJobRequest.
    async def submit_job(
        self,
        project_id: str,
        region: str,
        cluster_name: str,
        job_type: str,
        main_file: str,
        args: list[str] | None = None,
        jar_files: list[str] | None = None,
        properties: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """Submit a job to a Dataproc cluster."""
        try:
            loop = asyncio.get_event_loop()
            client = self._get_job_client(region)
    
            args = args or []
            jar_files = jar_files or []
            properties = properties or {}
    
            # Configure job based on type
            placement_config = types.JobPlacement(cluster_name=cluster_name)
    
            if job_type == "spark":
                job_config = types.SparkJob(
                    main_class=main_file,
                    jar_file_uris=jar_files,
                    args=args,
                    properties=properties,
                )
                job = types.Job(placement=placement_config, spark_job=job_config)
            elif job_type == "pyspark":
                job_config = types.PySparkJob(
                    main_python_file_uri=main_file,
                    args=args,
                    jar_file_uris=jar_files,
                    properties=properties,
                )
                job = types.Job(placement=placement_config, pyspark_job=job_config)
            elif job_type == "spark_sql":
                job_config = types.SparkSqlJob(
                    query_file_uri=main_file,
                    jar_file_uris=jar_files,
                    properties=properties,
                )
                job = types.Job(placement=placement_config, spark_sql_job=job_config)
            elif job_type == "hive":
                job_config = types.HiveJob(
                    query_file_uri=main_file,
                    jar_file_uris=jar_files,
                    properties=properties,
                )
                job = types.Job(placement=placement_config, hive_job=job_config)
            elif job_type == "hadoop":
                job_config = types.HadoopJob(
                    main_class=main_file,
                    jar_file_uris=jar_files,
                    args=args,
                    properties=properties,
                )
                job = types.Job(placement=placement_config, hadoop_job=job_config)
            else:
                raise ValueError(f"Unsupported job type: {job_type}")
    
            request = types.SubmitJobRequest(
                project_id=project_id, region=region, job=job
            )
    
            job_result = await loop.run_in_executor(None, client.submit_job, request)
    
            return {
                "job_id": job_result.reference.job_id,
                "job_type": job_type,
                "cluster_name": cluster_name,
                "status": job_result.status.state.name,
                "driver_output_uri": job_result.driver_output_resource_uri,
                "submission_time": datetime.now().isoformat(),
            }
    
        except Exception as e:
            logger.error("Failed to submit job", error=str(e))
            raise
Behavior2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries full burden for behavioral disclosure. It states this is a job submission operation (implying a write/mutation), but doesn't describe what happens after submission (e.g., job execution, status tracking), potential side effects, error conditions, or authentication requirements. For a mutation tool with zero annotation coverage, this is insufficient.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is appropriately sized and well-structured with a clear opening statement followed by a parameter list. Every sentence earns its place, though the parameter explanations could be slightly more detailed for complex fields like 'properties'.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness3/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given this is a mutation tool with 8 parameters, no annotations, but with an output schema (which reduces need to describe return values), the description is moderately complete. It covers parameter semantics well but lacks behavioral context about what happens after submission, error handling, or prerequisites. For a job submission operation, more context about execution flow would be helpful.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters5/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The description provides excellent parameter semantics beyond the schema. With 0% schema description coverage, the description compensates fully by explaining what each parameter represents (e.g., 'Google Cloud project ID', 'Type of job', 'Main file/class for the job'), including the specific job_type enum values. This adds significant value beyond the bare schema.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the action ('submit a job') and target resource ('to a Dataproc cluster'), providing specific context. However, it doesn't distinguish this tool from sibling tools like 'create_batch_job' or 'get_job', which appear to be related job operations in the same system.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

No guidance is provided about when to use this tool versus alternatives like 'create_batch_job' or 'get_job'. The description mentions only what the tool does, not when it's appropriate or what prerequisites might be needed (e.g., cluster must be running).

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/warrenzhu25/dataproc-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server