Skip to main content
Glama
warrenzhu25

Dataproc MCP Server

by warrenzhu25

compare_batch_jobs

Analyze and identify differences between two Dataproc batch jobs to troubleshoot configurations, monitor changes, or optimize performance.

Instructions

Compare two Dataproc batch jobs and return detailed differences.

Args:
    batch_id_1: First batch job ID to compare
    batch_id_2: Second batch job ID to compare
    project_id: Google Cloud project ID (optional, uses gcloud config default)
    region: Dataproc region (optional, uses gcloud config default)

Input Schema

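Table | JSON Schema

| Name | Required | Description | Default |
|------|----------|-------------|---------|
| batch_id_1 | Yes | | |
| batch_id_2 | Yes | | |
| project_id | No | | |
| region | No | | |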

Implementation Reference

  • The MCP tool handler for 'compare_batch_jobs'. It resolves project_id and region from the parameters or from the gcloud config defaults, instantiates DataprocBatchClient, calls its compare_batches method, and returns the result as a string (or an error message on failure).
    @mcp.tool()
    async def compare_batch_jobs(
        batch_id_1: str,
        batch_id_2: str,
        project_id: str | None = None,
        region: str | None = None,
    ) -> str:
        """Compare two Dataproc batch jobs and return detailed differences.

        Args:
            batch_id_1: First batch job ID to compare
            batch_id_2: Second batch job ID to compare
            project_id: Google Cloud project ID (optional, uses gcloud config default)
            region: Dataproc region (optional, uses gcloud config default)

        Returns:
            One of three strings:
            - the comparison report produced by
              ``DataprocBatchClient.compare_batches``, rendered with ``str()``;
            - the message returned by ``resolve_project_and_region`` when
              project/region resolution fails;
            - ``"Error: <message>"`` if the comparison raises.
        """
        resolution = resolve_project_and_region(project_id, region)
        # A plain string from the resolver is an error message, not a result.
        if isinstance(resolution, str):
            return resolution
        resolved_project, resolved_region = resolution

        client = DataprocBatchClient()
        try:
            return str(
                await client.compare_batches(
                    resolved_project, resolved_region, batch_id_1, batch_id_2
                )
            )
        except Exception as exc:
            # Tool boundary: report the failure to the caller instead of raising.
            logger.error("Failed to compare batch jobs", error=str(exc))
            return f"Error: {exc!s}"
  • Core helper method in DataprocBatchClient that implements the batch job comparison logic. Fetches details for both batch_ids using get_batch_job, compares basic info, job/runtime/environment configs, labels, performance metrics (DCU, storage, execution time), state history, and returns a structured diff report with summary.
    async def compare_batches(
        self, project_id: str, region: str, batch_id_1: str, batch_id_2: str
    ) -> dict[str, Any]:
        """Compare two batch jobs and return detailed differences."""
        try:
            # Get details for both batches
            batch_1 = await self.get_batch_job(project_id, region, batch_id_1)
            batch_2 = await self.get_batch_job(project_id, region, batch_id_2)
    
            # Compare basic information
            basic_comparison = {
                "batch_id": {
                    "batch_1": batch_1["batch_id"],
                    "batch_2": batch_2["batch_id"],
                },
                "job_type": {
                    "batch_1": batch_1["job_type"],
                    "batch_2": batch_2["job_type"],
                    "same": batch_1["job_type"] == batch_2["job_type"],
                },
                "state": {
                    "batch_1": batch_1["state"],
                    "batch_2": batch_2["state"],
                    "same": batch_1["state"] == batch_2["state"],
                },
                "creator": {
                    "batch_1": batch_1.get("creator"),
                    "batch_2": batch_2.get("creator"),
                    "same": batch_1.get("creator") == batch_2.get("creator"),
                },
                "create_time": {
                    "batch_1": batch_1.get("create_time"),
                    "batch_2": batch_2.get("create_time"),
                },
            }
    
            # Compare job configurations
            config_comparison = {
                "same_config": batch_1["job_config"] == batch_2["job_config"],
                "batch_1_config": batch_1["job_config"],
                "batch_2_config": batch_2["job_config"],
            }
    
            # Compare runtime configurations
            runtime_comparison = {
                "same_runtime": batch_1["runtime_config"] == batch_2["runtime_config"],
                "batch_1_runtime": batch_1["runtime_config"],
                "batch_2_runtime": batch_2["runtime_config"],
            }
    
            # Compare environment configurations
            env_comparison = {
                "same_environment": batch_1["environment_config"]
                == batch_2["environment_config"],
                "batch_1_environment": batch_1["environment_config"],
                "batch_2_environment": batch_2["environment_config"],
            }
    
            # Compare labels
            labels_comparison = {
                "same_labels": batch_1["labels"] == batch_2["labels"],
                "batch_1_labels": batch_1["labels"],
                "batch_2_labels": batch_2["labels"],
            }
    
            # Compare performance/runtime info
            performance_comparison = {}
            runtime_1 = batch_1.get("runtime_info", {})
            runtime_2 = batch_2.get("runtime_info", {})
    
            if runtime_1.get("approximate_usage") and runtime_2.get(
                "approximate_usage"
            ):
                usage_1 = runtime_1["approximate_usage"]
                usage_2 = runtime_2["approximate_usage"]
                performance_comparison = {
                    "resource_usage": {
                        "batch_1_milli_dcu_seconds": usage_1.get("milli_dcu_seconds"),
                        "batch_2_milli_dcu_seconds": usage_2.get("milli_dcu_seconds"),
                        "batch_1_shuffle_storage_gb_seconds": usage_1.get(
                            "shuffle_storage_gb_seconds"
                        ),
                        "batch_2_shuffle_storage_gb_seconds": usage_2.get(
                            "shuffle_storage_gb_seconds"
                        ),
                    }
                }
    
            # Compare state history (execution timeline)
            history_comparison = {
                "batch_1_states": [
                    state["state"] for state in batch_1.get("state_history", [])
                ],
                "batch_2_states": [
                    state["state"] for state in batch_2.get("state_history", [])
                ],
                "same_state_progression": [
                    state["state"] for state in batch_1.get("state_history", [])
                ]
                == [state["state"] for state in batch_2.get("state_history", [])],
            }
    
            # Calculate execution duration if possible
            def calculate_duration(batch_data: dict[str, Any]) -> float | None:
                state_history = batch_data.get("state_history", [])
                if len(state_history) >= 2:
                    from datetime import datetime
    
                    try:
                        start_time = datetime.fromisoformat(
                            state_history[0]["state_start_time"].replace("Z", "+00:00")
                        )
                        end_time = datetime.fromisoformat(
                            state_history[-1]["state_start_time"].replace("Z", "+00:00")
                        )
                        return (end_time - start_time).total_seconds()
                    except (ValueError, TypeError):
                        return None
                return None
    
            duration_1 = calculate_duration(batch_1)
            duration_2 = calculate_duration(batch_2)
    
            if duration_1 is not None and duration_2 is not None:
                performance_comparison["execution_time"] = {
                    "batch_1_seconds": duration_1,
                    "batch_2_seconds": duration_2,
                    "difference_seconds": abs(duration_1 - duration_2),
                }
    
            # Summary of differences
            differences = []
            if not basic_comparison["job_type"]["same"]:
                differences.append("Different job types")
            if not basic_comparison["state"]["same"]:
                differences.append("Different current states")
            if not basic_comparison["creator"]["same"]:
                differences.append("Different creators")
            if not config_comparison["same_config"]:
                differences.append("Different job configurations")
            if not runtime_comparison["same_runtime"]:
                differences.append("Different runtime configurations")
            if not env_comparison["same_environment"]:
                differences.append("Different environment configurations")
            if not labels_comparison["same_labels"]:
                differences.append("Different labels")
            if not history_comparison["same_state_progression"]:
                differences.append("Different state progression")
    
            return {
                "comparison_summary": {
                    "batch_1_id": batch_id_1,
                    "batch_2_id": batch_id_2,
                    "identical": len(differences) == 0,
                    "differences": differences,
                },
                "basic_info": basic_comparison,
                "job_configuration": config_comparison,
                "runtime_configuration": runtime_comparison,
                "environment_configuration": env_comparison,
                "labels": labels_comparison,
                "performance": performance_comparison,
                "state_history": history_comparison,
            }
    
        except Exception as e:
            logger.error("Failed to compare batch jobs", error=str(e))
            raise
  • The @mcp.tool() decorator registers the compare_batch_jobs function as an MCP tool.
    @mcp.tool()
    async def compare_batch_jobs(
        batch_id_1: str,
        batch_id_2: str,
        project_id: str | None = None,
        region: str | None = None,
    ) -> str:
        """Compare two Dataproc batch jobs and return detailed differences.

        Args:
            batch_id_1: First batch job ID to compare
            batch_id_2: Second batch job ID to compare
            project_id: Google Cloud project ID (optional, uses gcloud config default)
            region: Dataproc region (optional, uses gcloud config default)

        Returns:
            One of three strings:
            - the comparison report produced by
              ``DataprocBatchClient.compare_batches``, rendered with ``str()``;
            - the message returned by ``resolve_project_and_region`` when
              project/region resolution fails;
            - ``"Error: <message>"`` if the comparison raises.
        """
        resolution = resolve_project_and_region(project_id, region)
        # A plain string from the resolver is an error message, not a result.
        if isinstance(resolution, str):
            return resolution
        resolved_project, resolved_region = resolution

        client = DataprocBatchClient()
        try:
            return str(
                await client.compare_batches(
                    resolved_project, resolved_region, batch_id_1, batch_id_2
                )
            )
        except Exception as exc:
            # Tool boundary: report the failure to the caller instead of raising.
            logger.error("Failed to compare batch jobs", error=str(exc))
            return f"Error: {exc!s}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/warrenzhu25/dataproc-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server