
submit_job

Submit a job to a Dataproc cluster by specifying the project ID, region, cluster name, job type, main file, and optional arguments, JAR files, and properties. Supports Spark, PySpark, Spark SQL, Hive, Pig, and Hadoop job types.

Instructions

Submit a job to a Dataproc cluster.

Args:
  project_id: Google Cloud project ID
  region: Dataproc region
  cluster_name: Target cluster name
  job_type: Type of job (spark, pyspark, spark_sql, hive, pig, hadoop)
  main_file: Main file/class for the job
  args: Job arguments
  jar_files: JAR files to include
  properties: Job properties

Input Schema

Name          Required  Description
args          No        Job arguments
cluster_name  Yes       Target cluster name
jar_files     No        JAR files to include
job_type      Yes       Type of job (spark, pyspark, spark_sql, hive, pig, hadoop)
main_file     Yes       Main file/class for the job
project_id    Yes       Google Cloud project ID
properties    No        Job properties
region        Yes       Dataproc region
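
Note that main_file is interpreted per job type in the implementation shown below: a fully qualified class name for spark and hadoop jobs, a Cloud Storage URI to a Python file for pyspark, and a query file URI for spark_sql and hive. Two hypothetical payloads (all project, bucket, and class names are placeholders, not values from this server) illustrate the difference:

    # Hypothetical argument payloads; every identifier and URI below is a placeholder.
    spark_job_args = {
        "project_id": "my-project",
        "region": "us-central1",
        "cluster_name": "analytics-cluster",
        "job_type": "spark",
        "main_file": "org.example.WordCount",              # main class for spark/hadoop
        "jar_files": ["gs://my-bucket/jars/wordcount.jar"],
        "args": ["gs://my-bucket/input/", "gs://my-bucket/output/"],
        "properties": {"spark.executor.memory": "4g"},
    }

    pyspark_job_args = {
        "project_id": "my-project",
        "region": "us-central1",
        "cluster_name": "analytics-cluster",
        "job_type": "pyspark",
        "main_file": "gs://my-bucket/jobs/etl.py",          # Python file URI for pyspark
        "args": ["--date", "2024-01-01"],
    }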

Implementation Reference

  • MCP tool handler for 'submit_job', registered via the @mcp.tool() decorator. It handles the input parameters, creates a DataprocClient, and calls its submit_job method. A client-side invocation sketch follows after this list.
    @mcp.tool()
    async def submit_job(
        project_id: str,
        region: str,
        cluster_name: str,
        job_type: str,
        main_file: str,
        args: list[str] | None = None,
        jar_files: list[str] | None = None,
        properties: dict[str, str] | None = None,
    ) -> str:
        """Submit a job to a Dataproc cluster.

        Args:
            project_id: Google Cloud project ID
            region: Dataproc region
            cluster_name: Target cluster name
            job_type: Type of job (spark, pyspark, spark_sql, hive, pig, hadoop)
            main_file: Main file/class for the job
            args: Job arguments
            jar_files: JAR files to include
            properties: Job properties
        """
        client = DataprocClient()
        try:
            result = await client.submit_job(
                project_id=project_id,
                region=region,
                cluster_name=cluster_name,
                job_type=job_type,
                main_file=main_file,
                args=args or [],
                jar_files=jar_files or [],
                properties=properties or {},
            )
            return str(result)
        except Exception as e:
            logger.error("Failed to submit job", error=str(e))
            return f"Error: {str(e)}"
  • Helper method in DataprocClient that implements the core logic: constructs a Google Cloud Dataproc Job object based on job_type (spark, pyspark, etc.), creates a SubmitJobRequest, and submits it via JobControllerClient. A sketch of the regional client helper it uses follows after this list.
    async def submit_job(
        self,
        project_id: str,
        region: str,
        cluster_name: str,
        job_type: str,
        main_file: str,
        args: list[str] | None = None,
        jar_files: list[str] | None = None,
        properties: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """Submit a job to a Dataproc cluster."""
        try:
            loop = asyncio.get_event_loop()
            client = self._get_job_client(region)

            args = args or []
            jar_files = jar_files or []
            properties = properties or {}

            # Configure job based on type
            placement_config = types.JobPlacement(cluster_name=cluster_name)

            if job_type == "spark":
                job_config = types.SparkJob(
                    main_class=main_file,
                    jar_file_uris=jar_files,
                    args=args,
                    properties=properties,
                )
                job = types.Job(placement=placement_config, spark_job=job_config)
            elif job_type == "pyspark":
                job_config = types.PySparkJob(
                    main_python_file_uri=main_file,
                    args=args,
                    jar_file_uris=jar_files,
                    properties=properties,
                )
                job = types.Job(placement=placement_config, pyspark_job=job_config)
            elif job_type == "spark_sql":
                job_config = types.SparkSqlJob(
                    query_file_uri=main_file,
                    jar_file_uris=jar_files,
                    properties=properties,
                )
                job = types.Job(placement=placement_config, spark_sql_job=job_config)
            elif job_type == "hive":
                job_config = types.HiveJob(
                    query_file_uri=main_file,
                    jar_file_uris=jar_files,
                    properties=properties,
                )
                job = types.Job(placement=placement_config, hive_job=job_config)
            elif job_type == "hadoop":
                job_config = types.HadoopJob(
                    main_class=main_file,
                    jar_file_uris=jar_files,
                    args=args,
                    properties=properties,
                )
                job = types.Job(placement=placement_config, hadoop_job=job_config)
            else:
                raise ValueError(f"Unsupported job type: {job_type}")

            request = types.SubmitJobRequest(
                project_id=project_id, region=region, job=job
            )
            job_result = await loop.run_in_executor(None, client.submit_job, request)

            return {
                "job_id": job_result.reference.job_id,
                "job_type": job_type,
                "cluster_name": cluster_name,
                "status": job_result.status.state.name,
                "driver_output_uri": job_result.driver_output_resource_uri,
                "submission_time": datetime.now().isoformat(),
            }
        except Exception as e:
            logger.error("Failed to submit job", error=str(e))
            raise
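
The method above relies on self._get_job_client(region), which is not shown on this page. A minimal sketch of what such a helper commonly looks like with the google-cloud-dataproc library is given below; it follows the library's standard regional-endpoint convention and is an assumption, not this server's actual code (the real helper is a DataprocClient method and may also cache clients per region).

    from google.cloud import dataproc_v1

    def get_job_client(region: str) -> dataproc_v1.JobControllerClient:
        """Create a JobControllerClient pinned to a Dataproc regional endpoint (sketch)."""
        # Dataproc jobs must be submitted to the region-specific API endpoint.
        return dataproc_v1.JobControllerClient(
            client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
        )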
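
To see the handler from a caller's perspective, here is a minimal client-side sketch using the MCP Python SDK over stdio. The launch command ("dataproc-mcp") and all job parameters are placeholders assumed for the example; consult the server's own documentation for the actual startup command.

    import asyncio

    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client

    async def main() -> None:
        # The launch command is an assumption; use however this server is actually started.
        server = StdioServerParameters(command="dataproc-mcp", args=[])
        async with stdio_client(server) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                # Invoke the registered tool with a hypothetical Hive job.
                result = await session.call_tool(
                    "submit_job",
                    arguments={
                        "project_id": "my-project",
                        "region": "us-central1",
                        "cluster_name": "analytics-cluster",
                        "job_type": "hive",
                        "main_file": "gs://my-bucket/queries/report.hql",
                    },
                )
                print(result.content)

    asyncio.run(main())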

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/warrenzhu25/dataproc-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.