Skip to main content
Glama

compare_batch_jobs

Analyze and identify differences between two Dataproc batch jobs to troubleshoot configurations, monitor changes, or optimize performance.

Instructions

Compare two Dataproc batch jobs and return detailed differences.

Args: batch_id_1: First batch job ID to compare batch_id_2: Second batch job ID to compare project_id: Google Cloud project ID (optional, uses gcloud config default) region: Dataproc region (optional, uses gcloud config default)

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
batch_id_1Yes
batch_id_2Yes
project_idNo
regionNo

Implementation Reference

  • The MCP tool handler for 'compare_batch_jobs'. It resolves project_id and region from parameters or gcloud defaults, instantiates DataprocBatchClient, calls its compare_batches method, and returns the result as string or error message.
    @mcp.tool() async def compare_batch_jobs( batch_id_1: str, batch_id_2: str, project_id: str | None = None, region: str | None = None, ) -> str: """Compare two Dataproc batch jobs and return detailed differences. Args: batch_id_1: First batch job ID to compare batch_id_2: Second batch job ID to compare project_id: Google Cloud project ID (optional, uses gcloud config default) region: Dataproc region (optional, uses gcloud config default) """ resolved = resolve_project_and_region(project_id, region) if isinstance(resolved, str): # Error message return resolved project_id, region = resolved batch_client = DataprocBatchClient() try: result = await batch_client.compare_batches( project_id, region, batch_id_1, batch_id_2 ) return str(result) except Exception as e: logger.error("Failed to compare batch jobs", error=str(e)) return f"Error: {str(e)}"
  • Core helper method in DataprocBatchClient that implements the batch job comparison logic. Fetches details for both batch_ids using get_batch_job, compares basic info, job/runtime/environment configs, labels, performance metrics (DCU, storage, execution time), state history, and returns a structured diff report with summary.
    async def compare_batches( self, project_id: str, region: str, batch_id_1: str, batch_id_2: str ) -> dict[str, Any]: """Compare two batch jobs and return detailed differences.""" try: # Get details for both batches batch_1 = await self.get_batch_job(project_id, region, batch_id_1) batch_2 = await self.get_batch_job(project_id, region, batch_id_2) # Compare basic information basic_comparison = { "batch_id": { "batch_1": batch_1["batch_id"], "batch_2": batch_2["batch_id"], }, "job_type": { "batch_1": batch_1["job_type"], "batch_2": batch_2["job_type"], "same": batch_1["job_type"] == batch_2["job_type"], }, "state": { "batch_1": batch_1["state"], "batch_2": batch_2["state"], "same": batch_1["state"] == batch_2["state"], }, "creator": { "batch_1": batch_1.get("creator"), "batch_2": batch_2.get("creator"), "same": batch_1.get("creator") == batch_2.get("creator"), }, "create_time": { "batch_1": batch_1.get("create_time"), "batch_2": batch_2.get("create_time"), }, } # Compare job configurations config_comparison = { "same_config": batch_1["job_config"] == batch_2["job_config"], "batch_1_config": batch_1["job_config"], "batch_2_config": batch_2["job_config"], } # Compare runtime configurations runtime_comparison = { "same_runtime": batch_1["runtime_config"] == batch_2["runtime_config"], "batch_1_runtime": batch_1["runtime_config"], "batch_2_runtime": batch_2["runtime_config"], } # Compare environment configurations env_comparison = { "same_environment": batch_1["environment_config"] == batch_2["environment_config"], "batch_1_environment": batch_1["environment_config"], "batch_2_environment": batch_2["environment_config"], } # Compare labels labels_comparison = { "same_labels": batch_1["labels"] == batch_2["labels"], "batch_1_labels": batch_1["labels"], "batch_2_labels": batch_2["labels"], } # Compare performance/runtime info performance_comparison = {} runtime_1 = batch_1.get("runtime_info", {}) runtime_2 = batch_2.get("runtime_info", {}) if runtime_1.get("approximate_usage") and runtime_2.get( "approximate_usage" ): usage_1 = runtime_1["approximate_usage"] usage_2 = runtime_2["approximate_usage"] performance_comparison = { "resource_usage": { "batch_1_milli_dcu_seconds": usage_1.get("milli_dcu_seconds"), "batch_2_milli_dcu_seconds": usage_2.get("milli_dcu_seconds"), "batch_1_shuffle_storage_gb_seconds": usage_1.get( "shuffle_storage_gb_seconds" ), "batch_2_shuffle_storage_gb_seconds": usage_2.get( "shuffle_storage_gb_seconds" ), } } # Compare state history (execution timeline) history_comparison = { "batch_1_states": [ state["state"] for state in batch_1.get("state_history", []) ], "batch_2_states": [ state["state"] for state in batch_2.get("state_history", []) ], "same_state_progression": [ state["state"] for state in batch_1.get("state_history", []) ] == [state["state"] for state in batch_2.get("state_history", [])], } # Calculate execution duration if possible def calculate_duration(batch_data: dict[str, Any]) -> float | None: state_history = batch_data.get("state_history", []) if len(state_history) >= 2: from datetime import datetime try: start_time = datetime.fromisoformat( state_history[0]["state_start_time"].replace("Z", "+00:00") ) end_time = datetime.fromisoformat( state_history[-1]["state_start_time"].replace("Z", "+00:00") ) return (end_time - start_time).total_seconds() except (ValueError, TypeError): return None return None duration_1 = calculate_duration(batch_1) duration_2 = calculate_duration(batch_2) if duration_1 is not None and duration_2 is not None: performance_comparison["execution_time"] = { "batch_1_seconds": duration_1, "batch_2_seconds": duration_2, "difference_seconds": abs(duration_1 - duration_2), } # Summary of differences differences = [] if not basic_comparison["job_type"]["same"]: differences.append("Different job types") if not basic_comparison["state"]["same"]: differences.append("Different current states") if not basic_comparison["creator"]["same"]: differences.append("Different creators") if not config_comparison["same_config"]: differences.append("Different job configurations") if not runtime_comparison["same_runtime"]: differences.append("Different runtime configurations") if not env_comparison["same_environment"]: differences.append("Different environment configurations") if not labels_comparison["same_labels"]: differences.append("Different labels") if not history_comparison["same_state_progression"]: differences.append("Different state progression") return { "comparison_summary": { "batch_1_id": batch_id_1, "batch_2_id": batch_id_2, "identical": len(differences) == 0, "differences": differences, }, "basic_info": basic_comparison, "job_configuration": config_comparison, "runtime_configuration": runtime_comparison, "environment_configuration": env_comparison, "labels": labels_comparison, "performance": performance_comparison, "state_history": history_comparison, } except Exception as e: logger.error("Failed to compare batch jobs", error=str(e)) raise
  • The @mcp.tool() decorator registers the compare_batch_jobs function as an MCP tool.
    @mcp.tool() async def compare_batch_jobs( batch_id_1: str, batch_id_2: str, project_id: str | None = None, region: str | None = None, ) -> str: """Compare two Dataproc batch jobs and return detailed differences. Args: batch_id_1: First batch job ID to compare batch_id_2: Second batch job ID to compare project_id: Google Cloud project ID (optional, uses gcloud config default) region: Dataproc region (optional, uses gcloud config default) """ resolved = resolve_project_and_region(project_id, region) if isinstance(resolved, str): # Error message return resolved project_id, region = resolved batch_client = DataprocBatchClient() try: result = await batch_client.compare_batches( project_id, region, batch_id_1, batch_id_2 ) return str(result) except Exception as e: logger.error("Failed to compare batch jobs", error=str(e)) return f"Error: {str(e)}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/warrenzhu25/dataproc-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server