
Dataproc MCP Server

by warrenzhu25

get_batch_job

Retrieve detailed information about a specific Dataproc batch job using project ID, region, and batch ID parameters to monitor job status and configuration.

Instructions

Get details of a specific batch job.

Args:
    project_id: Google Cloud project ID
    region: Dataproc region
    batch_id: Batch job ID

Input Schema

Name        Required  Description  Default
project_id  Yes       -            -
region      Yes       -            -
batch_id    Yes       -            -
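
For illustration, a client-side call supplying these inputs might look like the following minimal sketch, assuming an already-initialized ClientSession from the official mcp Python SDK; every argument value is a placeholder, not a real resource.

    # Minimal sketch, assuming `session` is an initialized mcp.ClientSession
    # running inside an async context. All values below are placeholders.
    result = await session.call_tool(
        "get_batch_job",
        arguments={
            "project_id": "my-gcp-project",       # Google Cloud project ID
            "region": "us-central1",              # Dataproc region
            "batch_id": "batch-20240101-123456",  # Batch job ID
        },
    )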

Output Schema

Name    Required  Description  Default
result  Yes       -            -
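
The tool handler below returns a plain string, so the single required result field presumably carries either the stringified details dictionary or an 'Error: ...' message. The wrapping shown in this sketch is an assumption based on the schema above, and the value is invented.

    # Hypothetical structured output: str(result) on success, "Error: ..." on failure.
    {"result": "{'name': 'projects/my-gcp-project/locations/us-central1/batches/b-1', 'state': 'SUCCEEDED'}"}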

Implementation Reference

  • MCP tool handler for 'get_batch_job'. Creates DataprocBatchClient and calls its get_batch_job method to retrieve and format batch job details.
    @mcp.tool()
    async def get_batch_job(project_id: str, region: str, batch_id: str) -> str:
        """Get details of a specific batch job.
    
        Args:
            project_id: Google Cloud project ID
            region: Dataproc region
            batch_id: Batch job ID
        """
        batch_client = DataprocBatchClient()
        try:
            result = await batch_client.get_batch_job(project_id, region, batch_id)
            return str(result)
        except Exception as e:
            logger.error("Failed to get batch job", error=str(e))
            return f"Error: {str(e)}"
  • Core helper method in DataprocBatchClient that calls the Dataproc BatchController API to get batch details and constructs a comprehensive dictionary with job config, runtime info, environment config, state history, etc.
    async def get_batch_job(
        self, project_id: str, region: str, batch_id: str
    ) -> dict[str, Any]:
        """Get details of a specific batch job."""
        try:
            loop = asyncio.get_event_loop()
            client = self._get_batch_client(region)
    
            request = types.GetBatchRequest(
                name=f"projects/{project_id}/locations/{region}/batches/{batch_id}"
            )
    
            batch = await loop.run_in_executor(None, client.get_batch, request)
    
            # Extract runtime info if available
            runtime_info = {}
            if batch.runtime_info:
                runtime_info = {
                    "endpoints": dict(batch.runtime_info.endpoints)
                    if batch.runtime_info.endpoints
                    else {},
                    "output_uri": batch.runtime_info.output_uri
                    if batch.runtime_info.output_uri
                    else None,
                    "diagnostic_output_uri": batch.runtime_info.diagnostic_output_uri
                    if batch.runtime_info.diagnostic_output_uri
                    else None,
                }
    
                # Add usage information if available
                if batch.runtime_info.approximate_usage:
                    runtime_info["approximate_usage"] = {
                        "milli_dcu_seconds": str(
                            batch.runtime_info.approximate_usage.milli_dcu_seconds
                        ),
                        "shuffle_storage_gb_seconds": str(
                            batch.runtime_info.approximate_usage.shuffle_storage_gb_seconds
                        ),
                    }
    
                if batch.runtime_info.current_usage:
                    runtime_info["current_usage"] = {
                        "milli_dcu": str(batch.runtime_info.current_usage.milli_dcu),
                        "shuffle_storage_gb": str(
                            batch.runtime_info.current_usage.shuffle_storage_gb
                        ),
                    }
    
            # Extract job configuration details
            job_config: dict[str, Any] = {}
            job_type = self._get_batch_job_type(batch)
    
            if batch.spark_batch:
                job_config = {
                    "main_class": batch.spark_batch.main_class
                    if batch.spark_batch.main_class
                    else None,
                    "main_jar_file_uri": batch.spark_batch.main_jar_file_uri
                    if batch.spark_batch.main_jar_file_uri
                    else None,
                    "jar_file_uris": list(batch.spark_batch.jar_file_uris)
                    if batch.spark_batch.jar_file_uris
                    else [],
                    "file_uris": list(batch.spark_batch.file_uris)
                    if batch.spark_batch.file_uris
                    else [],
                    "archive_uris": list(batch.spark_batch.archive_uris)
                    if batch.spark_batch.archive_uris
                    else [],
                    "args": list(batch.spark_batch.args)
                    if batch.spark_batch.args
                    else [],
                }
            elif batch.pyspark_batch:
                job_config = {
                    "main_python_file_uri": batch.pyspark_batch.main_python_file_uri,
                    "python_file_uris": list(batch.pyspark_batch.python_file_uris)
                    if batch.pyspark_batch.python_file_uris
                    else [],
                    "jar_file_uris": list(batch.pyspark_batch.jar_file_uris)
                    if batch.pyspark_batch.jar_file_uris
                    else [],
                    "file_uris": list(batch.pyspark_batch.file_uris)
                    if batch.pyspark_batch.file_uris
                    else [],
                    "archive_uris": list(batch.pyspark_batch.archive_uris)
                    if batch.pyspark_batch.archive_uris
                    else [],
                    "args": list(batch.pyspark_batch.args)
                    if batch.pyspark_batch.args
                    else [],
                }
            elif batch.spark_sql_batch:
                job_config = {
                    "query_file_uri": batch.spark_sql_batch.query_file_uri,
                    "query_variables": dict(batch.spark_sql_batch.query_variables)
                    if batch.spark_sql_batch.query_variables
                    else {},
                    "jar_file_uris": list(batch.spark_sql_batch.jar_file_uris)
                    if batch.spark_sql_batch.jar_file_uris
                    else [],
                }
            elif batch.spark_r_batch:
                job_config = {
                    "main_r_file_uri": batch.spark_r_batch.main_r_file_uri,
                    "file_uris": list(batch.spark_r_batch.file_uris)
                    if batch.spark_r_batch.file_uris
                    else [],
                    "archive_uris": list(batch.spark_r_batch.archive_uris)
                    if batch.spark_r_batch.archive_uris
                    else [],
                    "args": list(batch.spark_r_batch.args)
                    if batch.spark_r_batch.args
                    else [],
                }
    
            # Extract runtime config details
            runtime_config = {}
            if batch.runtime_config:
                runtime_config = {
                    "version": batch.runtime_config.version
                    if batch.runtime_config.version
                    else None,
                    "container_image": batch.runtime_config.container_image
                    if batch.runtime_config.container_image
                    else None,
                    "properties": dict(batch.runtime_config.properties)
                    if batch.runtime_config.properties
                    else {},
                }
    
            # Extract environment config details
            environment_config: dict[str, Any] = {}
            if batch.environment_config:
                environment_config = {
                    "execution_config": {},
                    "peripherals_config": {},
                }
    
                if batch.environment_config.execution_config:
                    exec_config = batch.environment_config.execution_config
                    environment_config["execution_config"] = {
                        "service_account": exec_config.service_account
                        if exec_config.service_account
                        else None,
                        "network_uri": exec_config.network_uri
                        if exec_config.network_uri
                        else None,
                        "subnetwork_uri": exec_config.subnetwork_uri
                        if exec_config.subnetwork_uri
                        else None,
                        "network_tags": list(exec_config.network_tags)
                        if exec_config.network_tags
                        else [],
                        "kms_key": exec_config.kms_key if exec_config.kms_key else None,
                    }
    
                if batch.environment_config.peripherals_config:
                    periph_config = batch.environment_config.peripherals_config
                    environment_config["peripherals_config"] = {
                        "metastore_service": periph_config.metastore_service
                        if periph_config.metastore_service
                        else None,
                        "spark_history_server_config": {},
                    }
    
                    if periph_config.spark_history_server_config:
                        environment_config["peripherals_config"][
                            "spark_history_server_config"
                        ] = {
                            "dataproc_cluster": periph_config.spark_history_server_config.dataproc_cluster
                            if periph_config.spark_history_server_config.dataproc_cluster
                            else None,
                        }
    
            return {
                "name": batch.name,
                "batch_id": batch.name.split("/")[-1],
                "uuid": batch.uuid if batch.uuid else None,
                "state": batch.state.name,
                "state_message": batch.state_message,
                "state_time": batch.state_time.isoformat()
                if batch.state_time
                else None,
                "create_time": batch.create_time.isoformat()
                if batch.create_time
                else None,
                "creator": batch.creator if batch.creator else None,
                "labels": dict(batch.labels) if batch.labels else {},
                "job_type": job_type,
                "job_config": job_config,
                "runtime_config": runtime_config,
                "environment_config": environment_config,
                "runtime_info": runtime_info,
                "operation": batch.operation if batch.operation else None,
                "state_history": [
                    {
                        "state": state.state.name,
                        "state_message": state.state_message,
                        "state_start_time": state.state_start_time.isoformat()
                        if state.state_start_time
                        else None,
                    }
                    for state in batch.state_history
                ],
            }
    
        except Exception as e:
            logger.error("Failed to get batch job", error=str(e))
            raise
  • Helper function used by get_batch_job to determine the job type (spark, pyspark, etc.) from the batch object.
    def _get_batch_job_type(self, batch: types.Batch) -> str:
        """Extract job type from batch object."""
        if batch.spark_batch:
            return "spark"
        elif batch.pyspark_batch:
            return "pyspark"
        elif batch.spark_sql_batch:
            return "spark_sql"
        elif batch.spark_r_batch:
            return "spark_r"
        else:
            return "unknown"
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden of disclosure. It states that the tool 'gets details', which implies a read-only operation, but it does not disclose behavioral traits such as authentication requirements, rate limits, error conditions, or what the 'details' include (e.g., status, configuration). For a tool with no annotations, this is a significant gap in transparency.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is appropriately sized and front-loaded: the first sentence states the purpose clearly, followed by a concise parameter list. Every sentence earns its place with no redundant information, making it efficient and well-structured.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 4/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool's moderate complexity (three required parameters) and the presence of an output schema (which documents the return value), the description is fairly complete: it covers the purpose and parameters adequately. With no annotations and some behavioral gaps (e.g., authentication requirements), it could be more comprehensive, but the output schema reduces that burden.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 4/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The description adds parameter semantics beyond the schema, which has 0% description coverage. It explains that 'project_id' is a 'Google Cloud project ID', 'region' is a 'Dataproc region', and 'batch_id' is a 'Batch job ID', providing meaningful context the schema lacks. However, it doesn't specify formats or constraints (e.g., valid region values), so it falls short of a full 5.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool's purpose with 'Get details of a specific batch job', pairing a specific verb ('Get') with a specific resource ('batch job'). It implicitly distinguishes the tool from siblings like 'list_batch_jobs' (one job vs. a listing) and 'get_job' (batch vs. generic job). However, it doesn't fully differentiate the tool from 'compare_batch_jobs', which might also retrieve details, so it's not a perfect 5.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

No guidance is provided on when to use this tool versus alternatives. It doesn't mention prerequisites, when to choose it over 'get_job' or 'list_batch_jobs', or any context-specific usage. The description is purely functional with no usage instructions.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

