get_alert_details_multi_cluster
Retrieve detailed information about specific Kubernetes alerts across multiple clusters to monitor and analyze issues in your infrastructure.
Instructions
Get detailed information about a specific alert across multiple clusters
Args: alert_name: Name of the alert to search for (e.g., 'KubePodCrashLooping') cluster_filter: Optional cluster name filter. If empty, searches all clusters.
Input Schema
Table / JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| alert_name | Yes | Name of the alert to search for (e.g., 'KubePodCrashLooping') | |
| cluster_filter | No | Optional cluster name filter. If empty, searches all clusters. | "" |
Implementation Reference
- src/karma_mcp/server.py:736-956 (handler) — The main implementation of the get_alert_details_multi_cluster tool, which fetches alert data from the Karma API, filters by alert name and optional cluster, and formats the result.
def _parse_label_list(items: list) -> dict:
    """Convert Karma's [{'name': ..., 'value': ...}] label/annotation list to a plain dict."""
    return {item.get("name", ""): item.get("value", "") for item in items}


def _collect_matching_alerts(data: dict, alert_name: str, cluster_filter: str):
    """Walk the Karma grids payload and gather every instance of *alert_name*.

    Args:
        data: Parsed JSON body from the Karma /alerts.json endpoint.
        alert_name: Alert name to match (case-insensitive, exact match on the
            group-level 'alertname' label).
        cluster_filter: Case-insensitive substring filter on the cluster name;
            empty string matches all clusters.

    Returns:
        (matching_alerts, cluster_stats): a list of flat per-instance dicts,
        and a mapping cluster name -> {'total', 'active', 'suppressed'} counters.
    """
    matching_alerts = []
    cluster_stats = {}
    for grid in data.get("grids", []):
        for group in grid.get("alertGroups", []):
            # Group-level labels carry the alertname; match case-insensitively.
            group_labels = _parse_label_list(group.get("labels", []))
            if group_labels.get("alertname", "").lower() != alert_name.lower():
                continue
            for alert in group.get("alerts", []):
                alert_labels = _parse_label_list(alert.get("labels", []))
                for am in alert.get("alertmanager", []):
                    cluster = am.get("cluster", "unknown")
                    # Apply the optional cluster substring filter.
                    if (
                        cluster_filter
                        and cluster_filter.lower() not in cluster.lower()
                    ):
                        continue
                    stats = cluster_stats.setdefault(
                        cluster, {"total": 0, "active": 0, "suppressed": 0}
                    )
                    stats["total"] += 1
                    state = alert.get("state", "unknown").lower()
                    # Only bump the known state counters; guards against an
                    # unexpected state named 'total' corrupting the count.
                    if state in ("active", "suppressed"):
                        stats[state] += 1
                    annotations = _parse_label_list(alert.get("annotations", []))
                    matching_alerts.append(
                        {
                            "alert_name": group_labels.get("alertname", "unknown"),
                            "cluster": cluster,
                            "state": alert.get("state", "unknown"),
                            "severity": resolve_severity(group_labels, alert_labels),
                            "namespace": alert_labels.get("namespace", "N/A"),
                            "instance": alert_labels.get("instance", "N/A"),
                            "pod": alert_labels.get("pod", "N/A"),
                            "container": alert_labels.get("container", "N/A"),
                            "starts_at": alert.get("startsAt", "N/A"),
                            "alertmanager_name": am.get("name", "N/A"),
                            "annotations": annotations,
                            "labels": alert_labels,
                        }
                    )
                    # Record each alert once, under the first alertmanager that
                    # passes the filter (presumably to avoid double-counting HA
                    # replicas — TODO confirm against Karma payload semantics).
                    break
    return matching_alerts, cluster_stats


def _format_alert_instance(index: int, alert: dict) -> str:
    """Render one collected alert instance as an indented, numbered text entry."""
    state_emoji = "🔥" if alert["state"].lower() == "active" else "🔕"
    result = f" {index}. {state_emoji} {alert['alert_name']}\n"
    result += f" State: {alert['state']}\n"
    result += f" Started: {alert['starts_at']}\n"
    # Optional location fields are only shown when present.
    for field, label in (
        ("namespace", "Namespace"),
        ("instance", "Instance"),
        ("pod", "Pod"),
        ("container", "Container"),
    ):
        if alert[field] != "N/A":
            result += f" {label}: {alert[field]}\n"
    # Show the most useful annotations, truncating long descriptions.
    if "description" in alert["annotations"]:
        desc = alert["annotations"]["description"]
        if len(desc) > 150:
            desc = desc[:150] + "..."
        result += f" Description: {desc}\n"
    if "summary" in alert["annotations"]:
        result += f" Summary: {alert['annotations']['summary']}\n"
    # Show key labels only (limit to the most important ones).
    important_labels = ["job", "service", "deployment", "statefulset"]
    shown_labels = [
        f"{name}={alert['labels'][name]}"
        for name in important_labels
        if name in alert["labels"]
    ]
    if shown_labels:
        result += f" Labels: {', '.join(shown_labels)}\n"
    result += "\n"
    return result


def _format_multi_cluster_report(
    alert_name: str,
    cluster_filter: str,
    matching_alerts: list,
    cluster_stats: dict,
) -> str:
    """Render the full report: summary, per-cluster breakdown, instances, totals.

    Precondition: matching_alerts is non-empty (the caller returns early otherwise).
    """
    filter_text = (
        f" in cluster '{cluster_filter}'"
        if cluster_filter
        else " (multi-cluster search)"
    )
    result = f"Alert Details: '{alert_name}'{filter_text}\n"
    result += "=" * 60 + "\n\n"

    # Overall summary. matching_alerts is guaranteed non-empty here, so the
    # first instance's severity is always available (dead "unknown" fallback
    # from the original removed).
    result += "📊 Summary:\n"
    result += f" Alert Name: {alert_name}\n"
    result += f" Severity: {matching_alerts[0]['severity']}\n"
    result += f" Total Instances: {len(matching_alerts)}\n"
    result += f" Clusters Affected: {len(cluster_stats)}\n\n"

    # Per-cluster counters.
    result += "📈 Cluster Breakdown:\n"
    for cluster, stats in sorted(cluster_stats.items()):
        result += f" {cluster}: {stats['total']} instances "
        result += f"({stats.get('active', 0)} active, {stats.get('suppressed', 0)} suppressed)\n"
    result += "\n"

    # Group instances by cluster for the detailed listing.
    clusters_alerts = {}
    for alert in matching_alerts:
        clusters_alerts.setdefault(alert["cluster"], []).append(alert)

    # Detailed information per cluster, capped at 10 instances each.
    for cluster, alerts in sorted(clusters_alerts.items()):
        result += f"🏗️ Cluster: {cluster}\n"
        result += "-" * 40 + "\n"
        for i, alert in enumerate(alerts[:10], 1):
            result += _format_alert_instance(i, alert)
        if len(alerts) > 10:
            result += f" ... and {len(alerts) - 10} more instances\n\n"

    # Final summary line.
    active_count = sum(
        1 for a in matching_alerts if a["state"].lower() == "active"
    )
    suppressed_count = sum(
        1 for a in matching_alerts if a["state"].lower() == "suppressed"
    )
    result += f"📋 Total: {len(matching_alerts)} instance{'s' if len(matching_alerts) != 1 else ''} "
    result += f"({active_count} active, {suppressed_count} suppressed) "
    result += f"across {len(cluster_stats)} cluster{'s' if len(cluster_stats) != 1 else ''}"
    return result


async def get_alert_details_multi_cluster(
    alert_name: str, cluster_filter: str = ""
) -> str:
    """Get detailed information about a specific alert across multiple clusters

    Args:
        alert_name: Name of the alert to search for (e.g., 'KubePodCrashLooping')
        cluster_filter: Optional cluster name filter. If empty, searches all clusters.

    Returns:
        A human-readable multi-cluster report, a "no instances" message, or an
        error message. This tool never raises: all failures (HTTP errors,
        connection problems, bad JSON) are reported as text.
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{KARMA_URL}/alerts.json",
                headers={"Content-Type": "application/json"},
                json={},
            )
        if response.status_code != 200:
            return f"Error fetching alerts: code {response.status_code}"

        matching_alerts, cluster_stats = _collect_matching_alerts(
            response.json(), alert_name, cluster_filter
        )

        if not matching_alerts:
            filter_text = (
                f" in cluster '{cluster_filter}'"
                if cluster_filter
                else " across all clusters"
            )
            return f"No instances of alert '{alert_name}' found{filter_text}"

        return _format_multi_cluster_report(
            alert_name, cluster_filter, matching_alerts, cluster_stats
        )
    except Exception as e:
        return f"Error connecting to Karma: {str(e)}"