get_job
Retrieve detailed information about a specific Dataproc job, including status, configuration, and execution details, by providing project ID, region, and job ID.
Instructions
Get details of a specific job.
Args:
project_id: Google Cloud project ID
region: Dataproc region
job_id: Job ID
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| project_id | Yes | ||
| region | Yes | ||
| job_id | Yes |
Implementation Reference
- MCP tool handler function for 'get_job', decorated with @mcp.tool(). It instantiates DataprocClient and calls its get_job method to retrieve and return job details as a string.@mcp.tool() async def get_job(project_id: str, region: str, job_id: str) -> str: """Get details of a specific job. Args: project_id: Google Cloud project ID region: Dataproc region job_id: Job ID """ client = DataprocClient() try: result = await client.get_job(project_id, region, job_id) return str(result) except Exception as e: logger.error("Failed to get job", error=str(e)) return f"Error: {str(e)}"
- Core implementation of get_job in DataprocClient class. Uses Google Cloud Dataproc_v1 JobControllerClient to fetch job details via API and formats the response dictionary.async def get_job( self, project_id: str, region: str, job_id: str ) -> dict[str, Any]: """Get details of a specific job.""" try: loop = asyncio.get_event_loop() client = self._get_job_client(region) request = types.GetJobRequest( project_id=project_id, region=region, job_id=job_id ) job = await loop.run_in_executor(None, client.get_job, request) return { "job_id": job.reference.job_id, "cluster_name": job.placement.cluster_name, "status": job.status.state.name, "status_detail": job.status.details, "job_type": self._get_job_type(job), "submission_time": job.status.state_start_time.isoformat() if job.status.state_start_time else None, "start_time": job.status.state_start_time.isoformat() if job.status.state_start_time else None, "end_time": job.status.state_start_time.isoformat() if job.status.state_start_time else None, "driver_output_uri": job.driver_output_resource_uri, "driver_control_files_uri": job.driver_control_files_uri, } except Exception as e: logger.error("Failed to get job", error=str(e)) raise
- Helper method to determine the type of a Dataproc job based on which job config is present.def _get_job_type(self, job: types.Job) -> str: """Extract job type from job object.""" if job.spark_job: return "spark" elif job.pyspark_job: return "pyspark" elif job.spark_sql_job: return "spark_sql" elif job.hive_job: return "hive" elif job.pig_job: return "pig" elif job.hadoop_job: return "hadoop" else: return "unknown"