submit_job
Submit data processing jobs to Google Cloud Dataproc clusters for Spark, PySpark, Hive, and Hadoop workloads using specified project, region, and cluster parameters.
Instructions
Submit a job to a Dataproc cluster.
Args:
project_id: Google Cloud project ID
region: Dataproc region
cluster_name: Target cluster name
job_type: Type of job (spark, pyspark, spark_sql, hive, pig, hadoop)
main_file: Main file/class for the job
args: Job arguments
jar_files: JAR files to include
properties: Job properties
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| project_id | Yes | Google Cloud project ID | |
| region | Yes | Dataproc region | |
| cluster_name | Yes | Target cluster name | |
| job_type | Yes | Type of job (spark, pyspark, spark_sql, hive, pig, hadoop) | |
| main_file | Yes | Main file/class for the job | |
| args | No | Job arguments | None |
| jar_files | No | JAR files to include | None |
| properties | No | Job properties | None |
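For orientation, here is a minimal sketch of invoking this tool from an MCP client over stdio. The server launch command (`python -m dataproc_mcp_server`), project ID, cluster name, and GCS paths are illustrative assumptions, not part of this reference.

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    # Assumption: the server runs as a local stdio process; adjust the command
    # and module name to match your actual deployment.
    server = StdioServerParameters(command="python", args=["-m", "dataproc_mcp_server"])
    async with stdio_client(server) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Example PySpark submission; all argument values are placeholders.
            result = await session.call_tool(
                "submit_job",
                {
                    "project_id": "my-project",
                    "region": "us-central1",
                    "cluster_name": "my-cluster",
                    "job_type": "pyspark",
                    "main_file": "gs://my-bucket/jobs/etl.py",
                    "args": ["--date", "2024-01-01"],
                },
            )
            print(result.content)


asyncio.run(main())
```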
Implementation Reference
- MCP tool handler and registration for `submit_job`. This is the primary entry point for the tool, decorated with `@mcp.tool()`. It validates inputs via type hints (which serve as the schema) and delegates to `DataprocClient.submit_job`.

```python
@mcp.tool()
async def submit_job(
    project_id: str,
    region: str,
    cluster_name: str,
    job_type: str,
    main_file: str,
    args: list[str] | None = None,
    jar_files: list[str] | None = None,
    properties: dict[str, str] | None = None,
) -> str:
    """Submit a job to a Dataproc cluster.

    Args:
        project_id: Google Cloud project ID
        region: Dataproc region
        cluster_name: Target cluster name
        job_type: Type of job (spark, pyspark, spark_sql, hive, pig, hadoop)
        main_file: Main file/class for the job
        args: Job arguments
        jar_files: JAR files to include
        properties: Job properties
    """
    client = DataprocClient()
    try:
        result = await client.submit_job(
            project_id=project_id,
            region=region,
            cluster_name=cluster_name,
            job_type=job_type,
            main_file=main_file,
            args=args or [],
            jar_files=jar_files or [],
            properties=properties or {},
        )
        return str(result)
    except Exception as e:
        logger.error("Failed to submit job", error=str(e))
        return f"Error: {str(e)}"
```
- Supporting method on the `DataprocClient` class that implements the actual job submission against the Google Cloud Dataproc API, building the per-type job configuration (spark, pyspark, spark_sql, hive, hadoop) and constructing the `SubmitJobRequest`.

```python
async def submit_job(
    self,
    project_id: str,
    region: str,
    cluster_name: str,
    job_type: str,
    main_file: str,
    args: list[str] | None = None,
    jar_files: list[str] | None = None,
    properties: dict[str, str] | None = None,
) -> dict[str, Any]:
    """Submit a job to a Dataproc cluster."""
    try:
        loop = asyncio.get_event_loop()
        client = self._get_job_client(region)

        args = args or []
        jar_files = jar_files or []
        properties = properties or {}

        # Configure job based on type
        placement_config = types.JobPlacement(cluster_name=cluster_name)

        if job_type == "spark":
            job_config = types.SparkJob(
                main_class=main_file,
                jar_file_uris=jar_files,
                args=args,
                properties=properties,
            )
            job = types.Job(placement=placement_config, spark_job=job_config)
        elif job_type == "pyspark":
            job_config = types.PySparkJob(
                main_python_file_uri=main_file,
                args=args,
                jar_file_uris=jar_files,
                properties=properties,
            )
            job = types.Job(placement=placement_config, pyspark_job=job_config)
        elif job_type == "spark_sql":
            job_config = types.SparkSqlJob(
                query_file_uri=main_file,
                jar_file_uris=jar_files,
                properties=properties,
            )
            job = types.Job(placement=placement_config, spark_sql_job=job_config)
        elif job_type == "hive":
            job_config = types.HiveJob(
                query_file_uri=main_file,
                jar_file_uris=jar_files,
                properties=properties,
            )
            job = types.Job(placement=placement_config, hive_job=job_config)
        elif job_type == "hadoop":
            job_config = types.HadoopJob(
                main_class=main_file,
                jar_file_uris=jar_files,
                args=args,
                properties=properties,
            )
            job = types.Job(placement=placement_config, hadoop_job=job_config)
        else:
            raise ValueError(f"Unsupported job type: {job_type}")

        request = types.SubmitJobRequest(
            project_id=project_id, region=region, job=job
        )
        job_result = await loop.run_in_executor(None, client.submit_job, request)

        return {
            "job_id": job_result.reference.job_id,
            "job_type": job_type,
            "cluster_name": cluster_name,
            "status": job_result.status.state.name,
            "driver_output_uri": job_result.driver_output_resource_uri,
            "submission_time": datetime.now().isoformat(),
        }
    except Exception as e:
        logger.error("Failed to submit job", error=str(e))
        raise
```
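The `DataprocClient.submit_job` method above calls `self._get_job_client(region)`, a helper not shown in this reference. A plausible minimal sketch, assuming the standard `google-cloud-dataproc` client library and its regional endpoint convention (`{region}-dataproc.googleapis.com:443`); the actual helper may differ.

```python
from google.cloud import dataproc_v1
from google.cloud.dataproc_v1 import types  # the 'types' module used above


class DataprocClient:
    def _get_job_client(self, region: str) -> dataproc_v1.JobControllerClient:
        # Dataproc job requests must target the regional service endpoint,
        # otherwise the API rejects region-scoped operations.
        return dataproc_v1.JobControllerClient(
            client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
        )
```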
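The `mcp` object that registers the handler via `@mcp.tool()` is a FastMCP server instance from the official MCP Python SDK. A minimal sketch of that wiring follows; the server name is hypothetical, and the assumption that `logger` comes from structlog is inferred only from the keyword-style error logging above.

```python
import structlog  # assumption: keyword-style logging calls suggest structlog
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("dataproc-mcp-server")  # hypothetical server name
logger = structlog.get_logger()

# Tool handlers such as submit_job are registered with @mcp.tool() decorators.

if __name__ == "__main__":
    # Serves the registered tools, over stdio by default.
    mcp.run()
```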