get_job
Retrieve details of a specific Dataproc job by providing the Google Cloud project ID, region, and job ID. Simplifies job monitoring and management on the Dataproc MCP Server.
Instructions
Get details of a specific job.
Args:
- project_id: Google Cloud project ID
- region: Dataproc region
- job_id: Job ID
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| job_id | Yes | Job ID | |
| project_id | Yes | Google Cloud project ID | |
| region | Yes | Dataproc region | |
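
For illustration, a call to this tool needs only the three required string fields. A minimal sketch of the arguments, using hypothetical placeholder values:

```python
# Hypothetical example arguments for a get_job tool call (all values are placeholders).
arguments = {
    "project_id": "my-gcp-project",
    "region": "us-central1",
    "job_id": "job-1234567890",
}
```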
Implementation Reference
- MCP tool handler for the `get_job` tool. Decorated with `@mcp.tool()`, which registers it and infers the input schema from the function signature. Delegates to `DataprocClient.get_job()`:

```python
@mcp.tool()
async def get_job(project_id: str, region: str, job_id: str) -> str:
    """Get details of a specific job.

    Args:
        project_id: Google Cloud project ID
        region: Dataproc region
        job_id: Job ID
    """
    client = DataprocClient()
    try:
        result = await client.get_job(project_id, region, job_id)
        return str(result)
    except Exception as e:
        logger.error("Failed to get job", error=str(e))
        return f"Error: {str(e)}"
```
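A minimal sketch of exercising the handler outside the MCP protocol, assuming the `@mcp.tool()` decorator returns the original coroutine unchanged (some FastMCP versions wrap it in a tool object instead) and that Google Cloud credentials are available; the project, region, and job ID values are hypothetical:

```python
import asyncio

# Minimal sketch: call the handler directly, bypassing the MCP server.
# Assumes @mcp.tool() leaves the coroutine callable and Application Default
# Credentials are configured; all argument values are placeholders.
async def main() -> None:
    details = await get_job("my-gcp-project", "us-central1", "job-1234567890")
    print(details)  # stringified result dict, or "Error: ..." on failure

asyncio.run(main())
```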
- Core implementation of `get_job`, built on the Google Cloud `dataproc_v1` `JobControllerClient.get_job()` API call. Called by the MCP handler above:

```python
async def get_job(
    self, project_id: str, region: str, job_id: str
) -> dict[str, Any]:
    """Get details of a specific job."""
    try:
        loop = asyncio.get_event_loop()
        client = self._get_job_client(region)
        request = types.GetJobRequest(
            project_id=project_id, region=region, job_id=job_id
        )
        job = await loop.run_in_executor(None, client.get_job, request)
        return {
            "job_id": job.reference.job_id,
            "cluster_name": job.placement.cluster_name,
            "status": job.status.state.name,
            "status_detail": job.status.details,
            "job_type": self._get_job_type(job),
            "submission_time": job.status.state_start_time.isoformat()
            if job.status.state_start_time
            else None,
            "start_time": job.status.state_start_time.isoformat()
            if job.status.state_start_time
            else None,
            "end_time": job.status.state_start_time.isoformat()
            if job.status.state_start_time
            else None,
            "driver_output_uri": job.driver_output_resource_uri,
            "driver_control_files_uri": job.driver_control_files_uri,
        }
    except Exception as e:
        logger.error("Failed to get job", error=str(e))
        raise
```
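For reference, a successful call resolves to a plain dict shaped like the hypothetical example below; note that, as written, `submission_time`, `start_time`, and `end_time` are all derived from `job.status.state_start_time`, so they carry the same value:

```python
# Illustrative (hypothetical) return value for a completed PySpark job.
{
    "job_id": "job-1234567890",
    "cluster_name": "example-cluster",
    "status": "DONE",
    "status_detail": "",
    "job_type": "pyspark",
    "submission_time": "2024-01-01T12:00:00+00:00",
    "start_time": "2024-01-01T12:00:00+00:00",
    "end_time": "2024-01-01T12:00:00+00:00",
    "driver_output_uri": "gs://example-bucket/driver-output/",
    "driver_control_files_uri": "gs://example-bucket/driver-control/",
}
```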
- Helper method to create a JobControllerClient instance for the region, used in get_job:

```python
def _get_job_client(self, region: str) -> dataproc_v1.JobControllerClient:
    """Get job controller client with regional endpoint."""
    # Configure regional endpoint
    regional_endpoint = f"{region}-dataproc.googleapis.com"
    client_opts = client_options.ClientOptions(api_endpoint=regional_endpoint)
    return dataproc_v1.JobControllerClient(
        credentials=self._credentials, client_options=client_opts
    )
```
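As a standalone sketch (not part of the server code), this is what the regional endpoint wiring looks like with the `google-cloud-dataproc` library; it assumes Application Default Credentials are available and uses `us-central1` as an arbitrary example region. Pinning the client to the regional endpoint matters because Dataproc job APIs are region-scoped, and the default global endpoint typically rejects requests for non-global regions:

```python
# Standalone sketch: a JobControllerClient pinned to a regional endpoint.
# Assumes google-cloud-dataproc is installed and Application Default
# Credentials are available; "us-central1" is an arbitrary example region.
from google.api_core import client_options
from google.cloud import dataproc_v1

opts = client_options.ClientOptions(api_endpoint="us-central1-dataproc.googleapis.com")
job_client = dataproc_v1.JobControllerClient(client_options=opts)
```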
- Helper to determine the job type from the Job object, used in the get_job response:

```python
def _get_job_type(self, job: types.Job) -> str:
    """Extract job type from job object."""
    if job.spark_job:
        return "spark"
    elif job.pyspark_job:
        return "pyspark"
    elif job.spark_sql_job:
        return "spark_sql"
    elif job.hive_job:
        return "hive"
    elif job.pig_job:
        return "pig"
    elif job.hadoop_job:
        return "hadoop"
    else:
        return "unknown"
```