submit_job

Submit Spark, PySpark, Spark SQL, Hive, and Hadoop jobs to a Google Cloud Dataproc cluster in the specified project and region.

Instructions

Submit a job to a Dataproc cluster.

Args:
    project_id: Google Cloud project ID
    region: Dataproc region
    cluster_name: Target cluster name
    job_type: Type of job (spark, pyspark, spark_sql, hive, pig, hadoop)
    main_file: Main file/class for the job (the main class for spark and hadoop jobs, the main Python file URI for pyspark jobs, the query file URI for spark_sql and hive jobs)
    args: Job arguments
    jar_files: JAR files to include
    properties: Job properties

Input Schema

Name           Required   Description                                                   Default
project_id     Yes        Google Cloud project ID
region         Yes        Dataproc region
cluster_name   Yes        Target cluster name
job_type       Yes        Type of job (spark, pyspark, spark_sql, hive, pig, hadoop)
main_file      Yes        Main file/class for the job
args           No         Job arguments                                                 None
jar_files      No         JAR files to include                                          None
properties     No         Job properties                                                None
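
For illustration, here is a minimal sketch of how an MCP client could call this tool over stdio using the Python MCP SDK. The server launch command and all argument values are placeholders, not taken from this server's documentation:

    import asyncio

    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client


    async def main() -> None:
        # Hypothetical launch command for the Dataproc MCP server.
        server = StdioServerParameters(command="dataproc-mcp")
        async with stdio_client(server) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                result = await session.call_tool(
                    "submit_job",
                    arguments={
                        "project_id": "my-project",                 # placeholder
                        "region": "us-central1",
                        "cluster_name": "my-cluster",               # placeholder
                        "job_type": "pyspark",
                        "main_file": "gs://my-bucket/jobs/etl.py",  # placeholder GCS URI
                        "args": ["--date", "2024-01-01"],
                        "properties": {"spark.executor.memory": "4g"},
                    },
                )
                print(result.content)


    asyncio.run(main())

On success the tool returns the stringified result dictionary produced by DataprocClient.submit_job (job ID, job type, cluster name, status, driver output URI, submission time); on failure it returns a string beginning with "Error:".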

Implementation Reference

  • MCP tool handler and registration for 'submit_job'. This is the primary entry point for the tool, decorated with @mcp.tool(). It validates inputs via type hints (serving as schema) and delegates to DataprocClient.submit_job.
    @mcp.tool()
    async def submit_job(
        project_id: str,
        region: str,
        cluster_name: str,
        job_type: str,
        main_file: str,
        args: list[str] | None = None,
        jar_files: list[str] | None = None,
        properties: dict[str, str] | None = None,
    ) -> str:
        """Submit a job to a Dataproc cluster.

        Args:
            project_id: Google Cloud project ID
            region: Dataproc region
            cluster_name: Target cluster name
            job_type: Type of job (spark, pyspark, spark_sql, hive, pig, hadoop)
            main_file: Main file/class for the job
            args: Job arguments
            jar_files: JAR files to include
            properties: Job properties
        """
        client = DataprocClient()
        try:
            result = await client.submit_job(
                project_id=project_id,
                region=region,
                cluster_name=cluster_name,
                job_type=job_type,
                main_file=main_file,
                args=args or [],
                jar_files=jar_files or [],
                properties=properties or {},
            )
            return str(result)
        except Exception as e:
            logger.error("Failed to submit job", error=str(e))
            return f"Error: {str(e)}"
  • Supporting method on the DataprocClient class that implements the actual job submission against the Google Cloud Dataproc API. It builds the job configuration for the supported job types (spark, pyspark, spark_sql, hive, hadoop), constructs the SubmitJobRequest, and submits it through a region-specific job client; a sketch of the _get_job_client helper it relies on follows the code below.
    async def submit_job(
        self,
        project_id: str,
        region: str,
        cluster_name: str,
        job_type: str,
        main_file: str,
        args: list[str] | None = None,
        jar_files: list[str] | None = None,
        properties: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """Submit a job to a Dataproc cluster."""
        try:
            loop = asyncio.get_event_loop()
            client = self._get_job_client(region)
            args = args or []
            jar_files = jar_files or []
            properties = properties or {}

            # Configure job based on type
            placement_config = types.JobPlacement(cluster_name=cluster_name)

            if job_type == "spark":
                job_config = types.SparkJob(
                    main_class=main_file,
                    jar_file_uris=jar_files,
                    args=args,
                    properties=properties,
                )
                job = types.Job(placement=placement_config, spark_job=job_config)
            elif job_type == "pyspark":
                job_config = types.PySparkJob(
                    main_python_file_uri=main_file,
                    args=args,
                    jar_file_uris=jar_files,
                    properties=properties,
                )
                job = types.Job(placement=placement_config, pyspark_job=job_config)
            elif job_type == "spark_sql":
                job_config = types.SparkSqlJob(
                    query_file_uri=main_file,
                    jar_file_uris=jar_files,
                    properties=properties,
                )
                job = types.Job(placement=placement_config, spark_sql_job=job_config)
            elif job_type == "hive":
                job_config = types.HiveJob(
                    query_file_uri=main_file,
                    jar_file_uris=jar_files,
                    properties=properties,
                )
                job = types.Job(placement=placement_config, hive_job=job_config)
            elif job_type == "hadoop":
                job_config = types.HadoopJob(
                    main_class=main_file,
                    jar_file_uris=jar_files,
                    args=args,
                    properties=properties,
                )
                job = types.Job(placement=placement_config, hadoop_job=job_config)
            else:
                raise ValueError(f"Unsupported job type: {job_type}")

            request = types.SubmitJobRequest(
                project_id=project_id, region=region, job=job
            )
            job_result = await loop.run_in_executor(None, client.submit_job, request)

            return {
                "job_id": job_result.reference.job_id,
                "job_type": job_type,
                "cluster_name": cluster_name,
                "status": job_result.status.state.name,
                "driver_output_uri": job_result.driver_output_resource_uri,
                "submission_time": datetime.now().isoformat(),
            }
        except Exception as e:
            logger.error("Failed to submit job", error=str(e))
            raise
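
The _get_job_client helper referenced above is not shown on this page. A minimal sketch of what it plausibly looks like, assuming the standard google-cloud-dataproc library and the conventional regional endpoint pattern (the real helper may differ, for example by caching clients or customizing credentials):

    from google.cloud import dataproc_v1


    def _get_job_client(self, region: str) -> dataproc_v1.JobControllerClient:
        # Assumption: a JobControllerClient pointed at the regional
        # Dataproc endpoint for the requested region.
        return dataproc_v1.JobControllerClient(
            client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
        )

Because the generated Dataproc client is synchronous, the method wraps the blocking client.submit_job call in loop.run_in_executor so it does not block the async MCP server.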
