submit_job
Submit a job to a Dataproc cluster by specifying the project ID, region, cluster name, job type, main file, and optional arguments, JAR files, and properties. Supports Spark, PySpark, Spark SQL, Hive, Pig, and Hadoop job types.
Instructions
Submit a job to a Dataproc cluster.
Args:
- `project_id`: Google Cloud project ID
- `region`: Dataproc region
- `cluster_name`: Target cluster name
- `job_type`: Type of job (`spark`, `pyspark`, `spark_sql`, `hive`, `pig`, `hadoop`)
- `main_file`: Main file/class for the job
- `args`: Job arguments
- `jar_files`: JAR files to include
- `properties`: Job properties
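Note that `main_file` is interpreted per `job_type` by the client implementation under Implementation Reference: `spark` and `hadoop` use it as the main class, `pyspark` uses it as the main Python file URI, and `spark_sql` and `hive` use it as the query file URI.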
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| args | No | Job arguments | `None` |
| cluster_name | Yes | Target cluster name | |
| jar_files | No | JAR files to include | `None` |
| job_type | Yes | Type of job (`spark`, `pyspark`, `spark_sql`, `hive`, `pig`, `hadoop`) | |
| main_file | Yes | Main file/class for the job | |
| project_id | Yes | Google Cloud project ID | |
| properties | No | Job properties | `None` |
| region | Yes | Dataproc region | |
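For illustration only, a PySpark submission might use arguments like the following; the project, bucket, cluster, and property values are placeholders, not taken from the source.

```python
# Example tool arguments for a PySpark submission (all values are placeholders).
example_args = {
    "project_id": "my-project",
    "region": "us-central1",
    "cluster_name": "analytics-cluster",
    "job_type": "pyspark",
    "main_file": "gs://my-bucket/jobs/etl.py",
    "args": ["--date", "2024-01-01"],
    "jar_files": ["gs://my-bucket/libs/deps.jar"],
    "properties": {"spark.executor.memory": "4g"},
}
```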
Implementation Reference
- MCP tool handler for `submit_job`, registered via the `@mcp.tool()` decorator. It accepts the input parameters, creates a `DataprocClient`, and calls its `submit_job` method:

  ```python
  @mcp.tool()
  async def submit_job(
      project_id: str,
      region: str,
      cluster_name: str,
      job_type: str,
      main_file: str,
      args: list[str] | None = None,
      jar_files: list[str] | None = None,
      properties: dict[str, str] | None = None,
  ) -> str:
      """Submit a job to a Dataproc cluster.

      Args:
          project_id: Google Cloud project ID
          region: Dataproc region
          cluster_name: Target cluster name
          job_type: Type of job (spark, pyspark, spark_sql, hive, pig, hadoop)
          main_file: Main file/class for the job
          args: Job arguments
          jar_files: JAR files to include
          properties: Job properties
      """
      client = DataprocClient()
      try:
          result = await client.submit_job(
              project_id=project_id,
              region=region,
              cluster_name=cluster_name,
              job_type=job_type,
              main_file=main_file,
              args=args or [],
              jar_files=jar_files or [],
              properties=properties or {},
          )
          return str(result)
      except Exception as e:
          logger.error("Failed to submit job", error=str(e))
          return f"Error: {str(e)}"
  ```
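The handler refers to an `mcp` server instance, a `logger`, and the `DataprocClient` helper defined elsewhere in the module. As a rough sketch of that surrounding context, assuming the FastMCP server from the MCP Python SDK and a structlog logger (both are assumptions; the actual module may differ):

```python
# Sketch only: assumed surrounding setup for the handler above.
from mcp.server.fastmcp import FastMCP
import structlog

mcp = FastMCP("dataproc")        # @mcp.tool() registers submit_job on this server
logger = structlog.get_logger()  # supports logger.error("...", error=...) keyword fields

if __name__ == "__main__":
    mcp.run()  # serves the registered tools (stdio transport by default)
```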
- Helper method in `DataprocClient` that implements the core logic: it builds the Google Cloud Dataproc `Job` object for the given `job_type` (spark, pyspark, spark_sql, hive, hadoop), wraps it in a `SubmitJobRequest`, and submits it via the `JobControllerClient`:

  ```python
  async def submit_job(
      self,
      project_id: str,
      region: str,
      cluster_name: str,
      job_type: str,
      main_file: str,
      args: list[str] | None = None,
      jar_files: list[str] | None = None,
      properties: dict[str, str] | None = None,
  ) -> dict[str, Any]:
      """Submit a job to a Dataproc cluster."""
      try:
          loop = asyncio.get_event_loop()
          client = self._get_job_client(region)

          args = args or []
          jar_files = jar_files or []
          properties = properties or {}

          # Configure job based on type
          placement_config = types.JobPlacement(cluster_name=cluster_name)

          if job_type == "spark":
              job_config = types.SparkJob(
                  main_class=main_file,
                  jar_file_uris=jar_files,
                  args=args,
                  properties=properties,
              )
              job = types.Job(placement=placement_config, spark_job=job_config)
          elif job_type == "pyspark":
              job_config = types.PySparkJob(
                  main_python_file_uri=main_file,
                  args=args,
                  jar_file_uris=jar_files,
                  properties=properties,
              )
              job = types.Job(placement=placement_config, pyspark_job=job_config)
          elif job_type == "spark_sql":
              job_config = types.SparkSqlJob(
                  query_file_uri=main_file,
                  jar_file_uris=jar_files,
                  properties=properties,
              )
              job = types.Job(placement=placement_config, spark_sql_job=job_config)
          elif job_type == "hive":
              job_config = types.HiveJob(
                  query_file_uri=main_file,
                  jar_file_uris=jar_files,
                  properties=properties,
              )
              job = types.Job(placement=placement_config, hive_job=job_config)
          elif job_type == "hadoop":
              job_config = types.HadoopJob(
                  main_class=main_file,
                  jar_file_uris=jar_files,
                  args=args,
                  properties=properties,
              )
              job = types.Job(placement=placement_config, hadoop_job=job_config)
          else:
              raise ValueError(f"Unsupported job type: {job_type}")

          request = types.SubmitJobRequest(
              project_id=project_id, region=region, job=job
          )

          job_result = await loop.run_in_executor(None, client.submit_job, request)

          return {
              "job_id": job_result.reference.job_id,
              "job_type": job_type,
              "cluster_name": cluster_name,
              "status": job_result.status.state.name,
              "driver_output_uri": job_result.driver_output_resource_uri,
              "submission_time": datetime.now().isoformat(),
          }
      except Exception as e:
          logger.error("Failed to submit job", error=str(e))
          raise
  ```
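`_get_job_client(region)` is referenced above but not shown in this section. A minimal sketch of what such a helper typically looks like, assuming the standard regional-endpoint pattern for `google-cloud-dataproc` (an illustration, not the actual implementation):

```python
from google.cloud import dataproc_v1

def _get_job_client(self, region: str) -> dataproc_v1.JobControllerClient:
    # Dataproc job operations go through a regional endpoint,
    # e.g. us-central1-dataproc.googleapis.com.
    return dataproc_v1.JobControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
```

The blocking `client.submit_job` call is pushed onto the default executor via `run_in_executor` so the async MCP handler is not blocked while the request is in flight; the returned dictionary summarizes the server-assigned job ID and initial job state.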