Dataproc MCP Server

by warrenzhu25

compare_batch_jobs

Analyze and identify differences between two Dataproc batch jobs to troubleshoot configurations, monitor changes, or optimize performance.

Instructions

Compare two Dataproc batch jobs and return detailed differences.

Args:
    batch_id_1: First batch job ID to compare
    batch_id_2: Second batch job ID to compare
    project_id: Google Cloud project ID (optional, uses gcloud config default)
    region: Dataproc region (optional, uses gcloud config default)
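
For example, a client might call the tool with arguments like these (the batch IDs are hypothetical placeholders, not from the source):

    # Hypothetical MCP tool-call arguments; batch IDs are placeholders.
    arguments = {
        "batch_id_1": "daily-etl-20240101",
        "batch_id_2": "daily-etl-20240102",
        # project_id and region omitted: the server falls back to the
        # gcloud config defaults.
    }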

Input Schema

Name         Required   Description   Default
batch_id_1   Yes        -             -
batch_id_2   Yes        -             -
project_id   No         -             -
region       No         -             -
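
The JSON Schema generated from the handler's signature presumably looks like the following sketch (reconstructed from the table above, not copied from the server; pydantic may encode the optional fields with anyOf rather than a type list):

    # Sketch of the input schema implied by the table above.
    input_schema = {
        "type": "object",
        "properties": {
            "batch_id_1": {"type": "string"},
            "batch_id_2": {"type": "string"},
            "project_id": {"type": ["string", "null"]},
            "region": {"type": ["string", "null"]},
        },
        "required": ["batch_id_1", "batch_id_2"],
    }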

Output Schema

Name     Required   Description   Default
result   Yes        -             -
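
The result field carries the stringified comparison report, whose structure mirrors the dictionary returned by compare_batches in the Implementation Reference below; a trimmed sketch:

    # Trimmed shape of the report; keys match the return value of
    # DataprocBatchClient.compare_batches shown below.
    report = {
        "comparison_summary": {
            "batch_1_id": "...",
            "batch_2_id": "...",
            "identical": False,
            "differences": ["Different runtime configurations"],
        },
        "basic_info": {...},
        "job_configuration": {...},
        "runtime_configuration": {...},
        "environment_configuration": {...},
        "labels": {...},
        "performance": {...},
        "state_history": {...},
    }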

Implementation Reference

  • The MCP tool handler for 'compare_batch_jobs'. It resolves project_id and region from parameters or gcloud defaults, instantiates DataprocBatchClient, calls its compare_batches method, and returns the result as a string, or an error message on failure.
    @mcp.tool()
    async def compare_batch_jobs(
        batch_id_1: str,
        batch_id_2: str,
        project_id: str | None = None,
        region: str | None = None,
    ) -> str:
        """Compare two Dataproc batch jobs and return detailed differences.
    
        Args:
            batch_id_1: First batch job ID to compare
            batch_id_2: Second batch job ID to compare
            project_id: Google Cloud project ID (optional, uses gcloud config default)
            region: Dataproc region (optional, uses gcloud config default)
        """
        resolved = resolve_project_and_region(project_id, region)
        if isinstance(resolved, str):  # Error message
            return resolved
        project_id, region = resolved
    
        batch_client = DataprocBatchClient()
        try:
            result = await batch_client.compare_batches(
                project_id, region, batch_id_1, batch_id_2
            )
            return str(result)
        except Exception as e:
            logger.error("Failed to compare batch jobs", error=str(e))
            return f"Error: {str(e)}"
  • Core helper method in DataprocBatchClient that implements the batch job comparison logic. It fetches details for both batch IDs using get_batch_job; compares basic info, job/runtime/environment configs, labels, performance metrics (DCU, shuffle storage, execution time), and state history; and returns a structured diff report with a summary.
    async def compare_batches(
        self, project_id: str, region: str, batch_id_1: str, batch_id_2: str
    ) -> dict[str, Any]:
        """Compare two batch jobs and return detailed differences."""
        try:
            # Get details for both batches
            batch_1 = await self.get_batch_job(project_id, region, batch_id_1)
            batch_2 = await self.get_batch_job(project_id, region, batch_id_2)
    
            # Compare basic information
            basic_comparison = {
                "batch_id": {
                    "batch_1": batch_1["batch_id"],
                    "batch_2": batch_2["batch_id"],
                },
                "job_type": {
                    "batch_1": batch_1["job_type"],
                    "batch_2": batch_2["job_type"],
                    "same": batch_1["job_type"] == batch_2["job_type"],
                },
                "state": {
                    "batch_1": batch_1["state"],
                    "batch_2": batch_2["state"],
                    "same": batch_1["state"] == batch_2["state"],
                },
                "creator": {
                    "batch_1": batch_1.get("creator"),
                    "batch_2": batch_2.get("creator"),
                    "same": batch_1.get("creator") == batch_2.get("creator"),
                },
                "create_time": {
                    "batch_1": batch_1.get("create_time"),
                    "batch_2": batch_2.get("create_time"),
                },
            }
    
            # Compare job configurations
            config_comparison = {
                "same_config": batch_1["job_config"] == batch_2["job_config"],
                "batch_1_config": batch_1["job_config"],
                "batch_2_config": batch_2["job_config"],
            }
    
            # Compare runtime configurations
            runtime_comparison = {
                "same_runtime": batch_1["runtime_config"] == batch_2["runtime_config"],
                "batch_1_runtime": batch_1["runtime_config"],
                "batch_2_runtime": batch_2["runtime_config"],
            }
    
            # Compare environment configurations
            env_comparison = {
                "same_environment": batch_1["environment_config"]
                == batch_2["environment_config"],
                "batch_1_environment": batch_1["environment_config"],
                "batch_2_environment": batch_2["environment_config"],
            }
    
            # Compare labels
            labels_comparison = {
                "same_labels": batch_1["labels"] == batch_2["labels"],
                "batch_1_labels": batch_1["labels"],
                "batch_2_labels": batch_2["labels"],
            }
    
            # Compare performance/runtime info
            performance_comparison = {}
            runtime_1 = batch_1.get("runtime_info", {})
            runtime_2 = batch_2.get("runtime_info", {})
    
            if runtime_1.get("approximate_usage") and runtime_2.get(
                "approximate_usage"
            ):
                usage_1 = runtime_1["approximate_usage"]
                usage_2 = runtime_2["approximate_usage"]
                performance_comparison = {
                    "resource_usage": {
                        "batch_1_milli_dcu_seconds": usage_1.get("milli_dcu_seconds"),
                        "batch_2_milli_dcu_seconds": usage_2.get("milli_dcu_seconds"),
                        "batch_1_shuffle_storage_gb_seconds": usage_1.get(
                            "shuffle_storage_gb_seconds"
                        ),
                        "batch_2_shuffle_storage_gb_seconds": usage_2.get(
                            "shuffle_storage_gb_seconds"
                        ),
                    }
                }
    
            # Compare state history (execution timeline)
            history_comparison = {
                "batch_1_states": [
                    state["state"] for state in batch_1.get("state_history", [])
                ],
                "batch_2_states": [
                    state["state"] for state in batch_2.get("state_history", [])
                ],
                "same_state_progression": [
                    state["state"] for state in batch_1.get("state_history", [])
                ]
                == [state["state"] for state in batch_2.get("state_history", [])],
            }
    
            # Calculate execution duration if possible
            def calculate_duration(batch_data: dict[str, Any]) -> float | None:
                state_history = batch_data.get("state_history", [])
                if len(state_history) >= 2:
                    from datetime import datetime
    
                    try:
                        start_time = datetime.fromisoformat(
                            state_history[0]["state_start_time"].replace("Z", "+00:00")
                        )
                        end_time = datetime.fromisoformat(
                            state_history[-1]["state_start_time"].replace("Z", "+00:00")
                        )
                        return (end_time - start_time).total_seconds()
                    except (ValueError, TypeError):
                        return None
                return None
    
            duration_1 = calculate_duration(batch_1)
            duration_2 = calculate_duration(batch_2)
    
            if duration_1 is not None and duration_2 is not None:
                performance_comparison["execution_time"] = {
                    "batch_1_seconds": duration_1,
                    "batch_2_seconds": duration_2,
                    "difference_seconds": abs(duration_1 - duration_2),
                }
    
            # Summary of differences
            differences = []
            if not basic_comparison["job_type"]["same"]:
                differences.append("Different job types")
            if not basic_comparison["state"]["same"]:
                differences.append("Different current states")
            if not basic_comparison["creator"]["same"]:
                differences.append("Different creators")
            if not config_comparison["same_config"]:
                differences.append("Different job configurations")
            if not runtime_comparison["same_runtime"]:
                differences.append("Different runtime configurations")
            if not env_comparison["same_environment"]:
                differences.append("Different environment configurations")
            if not labels_comparison["same_labels"]:
                differences.append("Different labels")
            if not history_comparison["same_state_progression"]:
                differences.append("Different state progression")
    
            return {
                "comparison_summary": {
                    "batch_1_id": batch_id_1,
                    "batch_2_id": batch_id_2,
                    "identical": len(differences) == 0,
                    "differences": differences,
                },
                "basic_info": basic_comparison,
                "job_configuration": config_comparison,
                "runtime_configuration": runtime_comparison,
                "environment_configuration": env_comparison,
                "labels": labels_comparison,
                "performance": performance_comparison,
                "state_history": history_comparison,
            }
    
        except Exception as e:
            logger.error("Failed to compare batch jobs", error=str(e))
            raise
  • The @mcp.tool() decorator registers the compare_batch_jobs function as an MCP tool; the full handler is listed in the first item above.
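
For quick local testing, the helper can presumably be exercised directly; a minimal sketch, assuming DataprocBatchClient is importable from the server module and gcloud application-default credentials are configured (project, region, and batch IDs below are placeholders):

    import asyncio

    # Hypothetical direct call to the comparison helper; all argument
    # values are placeholders, not from the source.
    async def main() -> None:
        client = DataprocBatchClient()
        report = await client.compare_batches(
            "my-project", "us-central1", "batch-a", "batch-b"
        )
        summary = report["comparison_summary"]
        print("identical:", summary["identical"])
        for diff in summary["differences"]:
            print("-", diff)

    asyncio.run(main())
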
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden of behavioral disclosure. It says the tool returns 'detailed differences' but does not specify what aspects are compared (e.g., configuration, status, metrics), whether the operation is read-only, or what rate limits and authentication requirements apply. The description omits critical behavioral traits beyond the basic function.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.
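
One way to close this gap, assuming the server uses FastMCP from the official MCP Python SDK (as the @mcp.tool() usage suggests) and a recent SDK version whose tool decorator accepts annotations, is to declare the tool read-only and idempotent; a hedged sketch:

    from mcp.types import ToolAnnotations

    # Hypothetical revision: annotations disclose that the tool only reads
    # Dataproc state and is safe to retry.
    @mcp.tool(
        annotations=ToolAnnotations(
            readOnlyHint=True,
            idempotentHint=True,
            openWorldHint=True,  # calls an external Google Cloud API
        )
    )
    async def compare_batch_jobs(
        batch_id_1: str,
        batch_id_2: str,
        project_id: str | None = None,
        region: str | None = None,
    ) -> str:
        ...  # body unchanged from the handler above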

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is appropriately sized and front-loaded: the first sentence states the core purpose, followed by a clear parameter list with brief explanations. Every sentence earns its place by adding value, with no redundant or verbose language, making it easy for an agent to parse quickly.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 4/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool's moderate complexity (4 parameters, 2 required) and the presence of an output schema (which handles return values), the description is largely complete. It covers the purpose and parameters well but lacks behavioral context, such as the comparison scope or error-handling behavior, which would still be useful even with the output schema present.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 4/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The description adds significant meaning beyond the input schema, which has 0% description coverage. It explains that batch_id_1 and batch_id_2 are 'First' and 'Second' batch job IDs to compare, and clarifies that project_id and region are optional with default behaviors ('uses gcloud config default'). This compensates well for the schema's lack of descriptions, though it doesn't detail format constraints like ID patterns.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.
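
If schema-level parameter descriptions are wanted as well, FastMCP can typically derive them from pydantic Field metadata; a sketch under that assumption (the description strings here are illustrative):

    from typing import Annotated

    from pydantic import Field

    # Hypothetical signature: FastMCP builds the input schema from these
    # annotations, giving each parameter a description.
    async def compare_batch_jobs(
        batch_id_1: Annotated[str, Field(description="First batch job ID to compare")],
        batch_id_2: Annotated[str, Field(description="Second batch job ID to compare")],
        project_id: Annotated[
            str | None, Field(description="GCP project ID; defaults to gcloud config")
        ] = None,
        region: Annotated[
            str | None, Field(description="Dataproc region; defaults to gcloud config")
        ] = None,
    ) -> str:
        ...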

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the specific action ('Compare two Dataproc batch jobs') and the outcome ('return detailed differences'), distinguishing it from sibling tools like get_batch_job or list_batch_jobs which retrieve single or multiple jobs without comparison. The verb 'compare' is precise and the resource 'Dataproc batch jobs' is explicitly identified.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 3/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description implies the tool is for comparing two specific batch jobs, but it does not explicitly state when to use it versus alternatives like get_batch_job for individual job details or list_batch_jobs for overviews. It also gives no guidance on prerequisites, such as whether both jobs must be in the same project and region, or on limitations such as comparing jobs in different states.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
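
A hedged example of the missing guidance, written as an extra docstring paragraph (the wording is illustrative; the same-project/region constraint follows from the handler taking a single project_id and region):

    """Compare two Dataproc batch jobs and return detailed differences.

    Use this tool to diff two specific batches; use get_batch_job for a
    single job's details and list_batch_jobs for an overview. Both batches
    must be in the same project and region.
    """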
