get_cluster_metrics
Retrieve CPU, memory, network, and disk metrics for Databricks clusters to monitor performance and resource utilization over time.
Instructions
Get cluster CPU/Memory/Network/Disk metrics
Data source: system.compute.node_timeline (one record per minute)
Args: cluster_id: Cluster ID start_time: Start time (ISO format), defaults to last 1 hour end_time: End time (ISO format), defaults to now limit: Max number of records to return, default 60 (1 hour)
Returns: Metrics time series and summary statistics
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| cluster_id | Yes | Cluster ID | |
| start_time | No | Start time (ISO format) | last 1 hour |
| end_time | No | End time (ISO format) | now |
| limit | No | Max number of records to return | 60 |
Implementation Reference
- tools/metrics.py:18-120 (handler)The function handles the retrieval and processing of cluster metrics by executing a SQL query against system tables and calculating summary statistics.
def get_cluster_metrics(
    ctx: Context,
    cluster_id: str,
    start_time: str = None,
    end_time: str = None,
    limit: int = 60
) -> Dict[str, Any]:
    """
    Get cluster CPU/Memory/Network/Disk metrics.

    Data source: system.compute.node_timeline (one record per minute).

    Args:
        ctx: Request context used for logging and SQL execution.
        cluster_id: Cluster ID (validated against CLUSTER_ID_PATTERN).
        start_time: Start time (ISO format YYYY-MM-DDTHH:MM:SS); defaults to
            no lower bound (the query returns the most recent `limit` rows).
        end_time: End time (ISO format); defaults to no upper bound.
        limit: Max number of records to return, default 60 (1 hour of
            per-minute samples). Must be a positive integer.

    Returns:
        Dict with the metrics time series and summary statistics, or an
        ``error`` entry when no data is found.

    Raises:
        ToolError: If cluster_id, start_time, end_time, or limit fail validation.
    """
    # Validate every value interpolated into the SQL text. cluster_id and the
    # two timestamps are regex-checked; limit must also be validated, since it
    # is interpolated into the LIMIT clause and an arbitrary string here would
    # be a SQL injection vector.
    if not CLUSTER_ID_PATTERN.match(cluster_id):
        raise ToolError("Invalid cluster_id format")
    try:
        limit = int(limit)
    except (TypeError, ValueError):
        raise ToolError("Invalid limit: must be an integer")
    if limit < 1:
        raise ToolError("Invalid limit: must be >= 1")

    time_condition = f"cluster_id = '{cluster_id}'"
    if start_time:
        if not DATETIME_PATTERN.match(start_time):
            raise ToolError("Invalid start_time format. Use ISO format: YYYY-MM-DDTHH:MM:SS")
        time_condition += f" AND start_time >= '{start_time}'"
    if end_time:
        if not DATETIME_PATTERN.match(end_time):
            raise ToolError("Invalid end_time format. Use ISO format: YYYY-MM-DDTHH:MM:SS")
        time_condition += f" AND end_time <= '{end_time}'"

    metrics_sql = f"""
        SELECT
            start_time,
            end_time,
            instance_id,
            driver,
            node_type,
            ROUND(cpu_user_percent, 2) as cpu_user_pct,
            ROUND(cpu_system_percent, 2) as cpu_system_pct,
            ROUND(cpu_wait_percent, 2) as cpu_wait_pct,
            ROUND(cpu_user_percent + cpu_system_percent, 2) as cpu_total_pct,
            ROUND(mem_used_percent, 2) as mem_used_pct,
            ROUND(mem_swap_percent, 2) as mem_swap_pct,
            network_sent_bytes,
            network_received_bytes,
            disk_free_bytes_per_mount_point
        FROM system.compute.node_timeline
        WHERE {time_condition}
        ORDER BY start_time DESC
        LIMIT {limit}
    """

    ctx.info(f"Querying cluster {cluster_id} metrics...")
    metrics = execute_sql(ctx, metrics_sql)

    if not metrics:
        return {
            "cluster_id": cluster_id,
            "error": "No metrics data found, cluster may not be running or out of time range",
            "metrics": [],
            "summary": {}
        }

    # Rows are ordered DESC, so metrics[-1] is the oldest and metrics[0] the
    # newest. Capture the local-time range BEFORE the loop below replaces
    # start_time/end_time with str(...) — the original code converted after
    # stringifying, feeding utc_to_taipei string values instead of the raw
    # values returned by the query.
    range_start_local = utc_to_taipei(metrics[-1].get("start_time"))
    range_end_local = utc_to_taipei(metrics[0].get("end_time"))

    # Missing/None values count as 0 in the aggregates.
    cpu_totals = [float(m.get("cpu_total_pct", 0) or 0) for m in metrics]
    mem_used = [float(m.get("mem_used_pct", 0) or 0) for m in metrics]

    # Add a local-time field and make timestamps JSON-friendly strings.
    for m in metrics:
        m["time_local"] = utc_to_taipei(m.get("start_time"))
        m["start_time"] = str(m.get("start_time"))
        m["end_time"] = str(m.get("end_time"))

    # metrics is guaranteed non-empty here (early return above), so no
    # empty-list guards are needed for min/max/avg.
    summary = {
        "data_points": len(metrics),
        "time_range_local": {
            "start": range_start_local,
            "end": range_end_local
        },
        "cpu": {
            "avg_pct": round(sum(cpu_totals) / len(cpu_totals), 2),
            "max_pct": round(max(cpu_totals), 2),
            "min_pct": round(min(cpu_totals), 2)
        },
        "memory": {
            "avg_pct": round(sum(mem_used) / len(mem_used), 2),
            "max_pct": round(max(mem_used), 2),
            "min_pct": round(min(mem_used), 2)
        },
        "network": {
            "total_sent_gb": round(
                sum(int(m.get("network_sent_bytes", 0) or 0) for m in metrics) / 1024**3, 3
            ),
            "total_received_gb": round(
                sum(int(m.get("network_received_bytes", 0) or 0) for m in metrics) / 1024**3, 3
            )
        }
    }

    return {
        "cluster_id": cluster_id,
        "metrics": metrics,
        "summary": summary
    }