Dataproc MCP Server

by warrenzhu25

list_jobs

Retrieve and filter Dataproc job listings by project, region, cluster, and state to monitor and manage data processing workflows.

Instructions

List jobs in a Dataproc cluster.

Args:
    project_id: Google Cloud project ID
    region: Dataproc region
    cluster_name: Cluster name (optional)
    job_states: Filter by job states

Input Schema

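Name          Required  Description                                             Default
project_id    Yes       Google Cloud project ID                                 -
region        Yes       Dataproc region                                         -
cluster_name  No        Cluster name (optional)                                 None
job_states    No        Filter by job states, given as a list of state names    None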
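
As a rough illustration of how these arguments travel over the wire, the sketch below builds an MCP `tools/call` JSON-RPC request for `list_jobs`. The project ID, region, and state values are placeholders, the `build_list_jobs_call` helper is hypothetical, and the state names assume that `job_states` expects Dataproc state names such as `RUNNING` (the implementation further down compares against `job.status.state.name`).

```python
import json


def build_list_jobs_call(project_id, region, cluster_name=None, job_states=None, request_id=1):
    """Build a JSON-RPC 2.0 'tools/call' request for the list_jobs tool (hypothetical helper)."""
    arguments = {"project_id": project_id, "region": region}
    # Optional parameters are left out when not supplied, so the tool's defaults apply.
    if cluster_name is not None:
        arguments["cluster_name"] = cluster_name
    if job_states is not None:
        arguments["job_states"] = list(job_states)
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "list_jobs", "arguments": arguments},
    }


# Placeholder values, not taken from any real project.
request = build_list_jobs_call("my-project", "us-central1", job_states=["RUNNING", "PENDING"])
print(json.dumps(request, indent=2))
```

Only `project_id` and `region` are required, so the smallest valid call carries just those two keys in `arguments`.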

Implementation Reference

  • The primary MCP tool handler for 'list_jobs', registered via the @mcp.tool() decorator. It calls DataprocClient.list_jobs and returns the result as a string; on failure it logs the error and returns an "Error: ..." message instead of raising.
    @mcp.tool()
    async def list_jobs(
        project_id: str,
        region: str,
        cluster_name: str | None = None,
        job_states: list[str] | None = None,
    ) -> str:
        """List jobs in a Dataproc cluster.
    
        Args:
            project_id: Google Cloud project ID
            region: Dataproc region
            cluster_name: Cluster name (optional)
            job_states: Filter by job states
        """
        client = DataprocClient()
        try:
            result = await client.list_jobs(
                project_id=project_id,
                region=region,
                cluster_name=cluster_name,
                job_states=job_states or [],
            )
            return str(result)
        except Exception as e:
            logger.error("Failed to list jobs", error=str(e))
            return f"Error: {str(e)}"
  • Supporting method on DataprocClient that performs the actual Google Cloud API call. It requests jobs in all states (StateMatcherType.ALL), filters them by state name on the client side, and formats the response.
    async def list_jobs(
        self,
        project_id: str,
        region: str,
        cluster_name: str | None = None,
        job_states: list[str] | None = None,
    ) -> dict[str, Any]:
        """List jobs in a region."""
        try:
            # Inside a coroutine, get_running_loop() is the preferred way to reach the loop.
            loop = asyncio.get_running_loop()
            client = self._get_job_client(region)
    
            request = types.ListJobsRequest(
                project_id=project_id,
                region=region,
                cluster_name=cluster_name,
                job_state_matcher=types.ListJobsRequest.StateMatcherType.ALL,
            )
    
            response = await loop.run_in_executor(None, client.list_jobs, request)
    
            jobs = []
            for job in response:
                # Filter by states if provided
                if job_states and job.status.state.name not in job_states:
                    continue
    
                jobs.append(
                    {
                        "job_id": job.reference.job_id,
                        "cluster_name": job.placement.cluster_name,
                        "status": job.status.state.name,
                        "job_type": self._get_job_type(job),
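                        # Note: state_start_time is when the job entered its current state,
                        # which is not necessarily the original submission time.
                        "submission_time": job.status.state_start_time.isoformat()
                        if job.status.state_start_time
                        else None,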
                        "driver_output_uri": job.driver_output_resource_uri,
                    }
                )
    
            return {
                "jobs": jobs,
                "total_count": len(jobs),
                "project_id": project_id,
                "region": region,
                "cluster_name": cluster_name,
            }
    
        except Exception as e:
            logger.error("Failed to list jobs", error=str(e))
            raise
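
The pattern used by the client method above (run a blocking call in the default executor, then filter by state name on the client side) can be sketched without any Google Cloud dependency. Here `fake_list_jobs` and the `SimpleNamespace` job objects are hypothetical stand-ins for the real Dataproc job client and its job messages; only the control flow mirrors the code above.

```python
import asyncio
from types import SimpleNamespace


def fake_list_jobs(_request):
    """Hypothetical blocking call standing in for the Dataproc job client."""

    def job(job_id, state):
        return SimpleNamespace(
            reference=SimpleNamespace(job_id=job_id),
            status=SimpleNamespace(state=SimpleNamespace(name=state)),
        )

    return [job("job-1", "RUNNING"), job("job-2", "DONE"), job("job-3", "ERROR")]


async def list_jobs(job_states=None):
    loop = asyncio.get_running_loop()
    # Run the blocking call in the default thread pool so the event loop stays free.
    response = await loop.run_in_executor(None, fake_list_jobs, None)

    wanted = set(job_states or [])
    jobs = [
        {"job_id": j.reference.job_id, "status": j.status.state.name}
        for j in response
        # An empty filter keeps every job, matching the check in the code above.
        if not wanted or j.status.state.name in wanted
    ]
    return {"jobs": jobs, "total_count": len(jobs)}


result = asyncio.run(list_jobs(["RUNNING", "ERROR"]))
print(result)
```

The comparison is an exact, case-sensitive string match, so a filter value such as "running" would match nothing.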

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/warrenzhu25/dataproc-mcp'
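
The same request can be issued from Python with only the standard library. This is a minimal sketch: the URL is the one from the curl command above, while the `build_request` and `fetch_server_info` helper names are hypothetical and the response body is assumed to be JSON.

```python
import json
import urllib.request

SERVER_URL = "https://glama.ai/api/mcp/v1/servers/warrenzhu25/dataproc-mcp"


def build_request(url=SERVER_URL):
    """Build the GET request without sending it."""
    return urllib.request.Request(url, method="GET", headers={"Accept": "application/json"})


def fetch_server_info(url=SERVER_URL, timeout=10.0):
    """Hypothetical helper: send the request and decode the body, assumed to be JSON."""
    with urllib.request.urlopen(build_request(url), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


request = build_request()
print(request.get_method(), request.full_url)
```

Calling `fetch_server_info()` performs the actual network request; it is kept separate so the request can be inspected first.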

If you have feedback or need assistance with the MCP directory API, please join our Discord server.