list_jobs
Retrieve jobs in a Dataproc region by project ID and region, optionally narrowed to one cluster and filtered by job state. Simplify job management and monitoring in Google Cloud Dataproc environments.
Instructions
List jobs in a Dataproc cluster.
Args:
project_id: Google Cloud project ID
region: Dataproc region
cluster_name: Cluster name (optional)
job_states: Filter by job states (optional)
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| cluster_name | No | Cluster name | None |
| job_states | No | Filter by job states | None |
| project_id | Yes | Google Cloud project ID | |
| region | Yes | Dataproc region | |
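
To make the required/optional split in the table concrete, here is a minimal sketch of assembling an arguments object for this tool. The helper name `build_list_jobs_args` and the example values (`my-project`, `us-central1`) are hypothetical and not part of the tool; only the four field names come from the schema above.

```python
def build_list_jobs_args(project_id, region, cluster_name=None, job_states=None):
    """Assemble an arguments dict for list_jobs (hypothetical helper).

    project_id and region are required; the optional fields are left out
    entirely when the caller does not supply them.
    """
    if not project_id or not region:
        raise ValueError("project_id and region are required")

    args = {"project_id": project_id, "region": region}
    if cluster_name is not None:
        args["cluster_name"] = cluster_name
    if job_states is not None:
        args["job_states"] = list(job_states)
    return args


# Example values are placeholders, not real resources.
example_args = build_list_jobs_args(
    "my-project", "us-central1", job_states=["RUNNING", "DONE"]
)
```

Leaving an optional field out and passing it as None should be equivalent here, since both parameters default to None in the handler signature.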
Implementation Reference
- The MCP tool handler and registration for 'list_jobs'. The @mcp.tool() decorator registers the tool, and this async function handles the execution by calling the DataprocClient.

      @mcp.tool()
      async def list_jobs(
          project_id: str,
          region: str,
          cluster_name: str | None = None,
          job_states: list[str] | None = None,
      ) -> str:
          """List jobs in a Dataproc cluster.

          Args:
              project_id: Google Cloud project ID
              region: Dataproc region
              cluster_name: Cluster name (optional)
              job_states: Filter by job states
          """
          client = DataprocClient()
          try:
              result = await client.list_jobs(
                  project_id=project_id,
                  region=region,
                  cluster_name=cluster_name,
                  job_states=job_states or [],
              )
              return str(result)
          except Exception as e:
              logger.error("Failed to list jobs", error=str(e))
              return f"Error: {str(e)}"
- Helper method in DataprocClient class that performs the actual Google Cloud Dataproc API call to list jobs, filters by states if provided, and formats the response.

      async def list_jobs(
          self,
          project_id: str,
          region: str,
          cluster_name: str | None = None,
          job_states: list[str] | None = None,
      ) -> dict[str, Any]:
          """List jobs in a region."""
          try:
              loop = asyncio.get_event_loop()
              client = self._get_job_client(region)

              request = types.ListJobsRequest(
                  project_id=project_id,
                  region=region,
                  cluster_name=cluster_name,
                  job_state_matcher=types.ListJobsRequest.StateMatcherType.ALL,
              )

              response = await loop.run_in_executor(None, client.list_jobs, request)

              jobs = []
              for job in response:
                  # Filter by states if provided
                  if job_states and job.status.state.name not in job_states:
                      continue

                  jobs.append(
                      {
                          "job_id": job.reference.job_id,
                          "cluster_name": job.placement.cluster_name,
                          "status": job.status.state.name,
                          "job_type": self._get_job_type(job),
                          "submission_time": job.status.state_start_time.isoformat()
                          if job.status.state_start_time
                          else None,
                          "driver_output_uri": job.driver_output_resource_uri,
                      }
                  )

              return {
                  "jobs": jobs,
                  "total_count": len(jobs),
                  "project_id": project_id,
                  "region": region,
                  "cluster_name": cluster_name,
              }

          except Exception as e:
              logger.error("Failed to list jobs", error=str(e))
              raise
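
Two behaviors in the code above are easy to miss: the state filter is applied client-side by comparing exact state names, and the handler returns `str(result)`, which is a Python dict repr rather than JSON. The sketch below reproduces both on stand-in data without calling any Google Cloud API. `FakeJob`, `filter_by_state`, and `summarize` are hypothetical names introduced for illustration, and the result dict is reduced to two keys.

```python
import ast
from dataclasses import dataclass


@dataclass
class FakeJob:
    """Hypothetical stand-in for a Dataproc job; only the fields used here."""
    job_id: str
    state: str  # state name as compared by the helper, e.g. "RUNNING"


def filter_by_state(jobs, job_states=None):
    """Keep jobs whose state name is in job_states; keep all if the filter is empty."""
    wanted = job_states or []
    return [job for job in jobs if not wanted or job.state in wanted]


def summarize(jobs, job_states=None):
    """Build a reduced version of the helper's result dict."""
    kept = filter_by_state(jobs, job_states)
    return {
        "jobs": [{"job_id": job.job_id, "status": job.state} for job in kept],
        "total_count": len(kept),
    }


jobs = [
    FakeJob("job-a", "RUNNING"),
    FakeJob("job-b", "DONE"),
    FakeJob("job-c", "ERROR"),
]

result = summarize(jobs, ["DONE", "ERROR"])

# The handler returns str(result): single-quoted keys, so not valid JSON.
text = str(result)

# A caller that needs structured data back could parse the repr like this.
parsed = ast.literal_eval(text)
```

Because the comparison is an exact string match, a filter value such as "done" would match nothing in this sketch. Note also that `total_count` counts the jobs left after filtering, not the number returned by the API.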