Skip to main content
Glama
driosalido
by driosalido

get_alert_details_multi_cluster

Retrieve detailed information about specific Kubernetes alerts across multiple clusters to monitor and analyze issues in your infrastructure.

Instructions

Get detailed information about a specific alert across multiple clusters

Args:

  • alert_name: Name of the alert to search for (e.g., 'KubePodCrashLooping').
  • cluster_filter: Optional cluster name filter. If empty, searches all clusters.

Input Schema

Table | JSON Schema

| Name           | Required | Description | Default |
|----------------|----------|-------------|---------|
| alert_name     | Yes      |             |         |
| cluster_filter | No       |             |         |

Implementation Reference

  • The main implementation of the get_alert_details_multi_cluster tool, which fetches alert data from the Karma API, filters by alert name and optional cluster, and formats the result.
    async def get_alert_details_multi_cluster(
        alert_name: str, cluster_filter: str = ""
    ) -> str:
        """Get detailed information about a specific alert across multiple clusters

        Args:
            alert_name: Name of the alert to search for (e.g., 'KubePodCrashLooping')
            cluster_filter: Optional cluster name filter. If empty, searches all clusters.

        Returns:
            A human-readable report (summary, per-cluster breakdown and per-alert
            details), or a one-line message when nothing matches or an error occurs.
        """
        # Keep this try body limited to the HTTP round-trip so that only genuine
        # request failures are reported as connection errors.
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{KARMA_URL}/alerts.json",
                    headers={"Content-Type": "application/json"},
                    json={},
                )
        except Exception as e:
            return f"Error connecting to Karma: {str(e)}"

        if response.status_code != 200:
            return f"Error fetching alerts: code {response.status_code}"

        # Tool boundary: always hand back a string, but do not blame the
        # connection for a malformed or unexpected payload.
        try:
            matching_alerts, cluster_stats = _collect_matching_alerts(
                response.json(), alert_name, cluster_filter
            )

            if not matching_alerts:
                filter_text = (
                    f" in cluster '{cluster_filter}'"
                    if cluster_filter
                    else " across all clusters"
                )
                return f"No instances of alert '{alert_name}' found{filter_text}"

            return _format_alert_details(
                alert_name, cluster_filter, matching_alerts, cluster_stats
            )
        except Exception as e:
            return f"Error processing Karma response: {str(e)}"


    def _labels_to_dict(pairs) -> dict:
        """Turn a list of {"name": ..., "value": ...} entries into a plain dict."""
        return {pair.get("name", ""): pair.get("value", "") for pair in pairs}


    def _collect_matching_alerts(data: dict, alert_name: str, cluster_filter: str) -> tuple:
        """Pick the alerts named *alert_name* out of the response and tally them per cluster.

        Returns:
            ``(matching_alerts, cluster_stats)`` where ``matching_alerts`` is a list
            of flat per-alert dicts and ``cluster_stats`` maps each cluster name to
            its ``total`` / ``active`` / ``suppressed`` counters.
        """
        matching_alerts = []
        cluster_stats = {}
        wanted_name = alert_name.lower()
        # An empty filter means "every cluster"; matching is a case-insensitive
        # substring test on the cluster name.
        wanted_cluster = cluster_filter.lower() if cluster_filter else ""

        for grid in data.get("grids", []):
            for group in grid.get("alertGroups", []):
                # The alert name is read from the group's labels.
                group_labels = _labels_to_dict(group.get("labels", []))
                if group_labels.get("alertname", "").lower() != wanted_name:
                    continue

                for alert in group.get("alerts", []):
                    alert_labels = _labels_to_dict(alert.get("labels", []))

                    for am in alert.get("alertmanager", []):
                        cluster = am.get("cluster", "unknown")
                        if wanted_cluster and wanted_cluster not in cluster.lower():
                            continue

                        stats = cluster_stats.setdefault(
                            cluster, {"total": 0, "active": 0, "suppressed": 0}
                        )
                        stats["total"] += 1
                        state = alert.get("state", "unknown").lower()
                        # Only the two reported states are tallied; testing against
                        # the dict keys would also match the "total" counter.
                        if state in ("active", "suppressed"):
                            stats[state] += 1

                        annotations = _labels_to_dict(alert.get("annotations", []))

                        matching_alerts.append(
                            {
                                "alert_name": group_labels.get("alertname", "unknown"),
                                "cluster": cluster,
                                "state": alert.get("state", "unknown"),
                                "severity": resolve_severity(group_labels, alert_labels),
                                "namespace": alert_labels.get("namespace", "N/A"),
                                "instance": alert_labels.get("instance", "N/A"),
                                "pod": alert_labels.get("pod", "N/A"),
                                "container": alert_labels.get("container", "N/A"),
                                "starts_at": alert.get("startsAt", "N/A"),
                                "alertmanager_name": am.get("name", "N/A"),
                                "annotations": annotations,
                                "labels": alert_labels,
                            }
                        )
                        # Record each alert once, under the first alertmanager
                        # entry that passes the cluster filter.
                        break

        return matching_alerts, cluster_stats


    def _format_alert_entry(index: int, alert: dict, max_description_len: int = 150) -> list:
        """Render one alert as a list of text fragments (each already newline-terminated)."""
        state_emoji = "🔥" if alert["state"].lower() == "active" else "🔕"
        lines = [
            f"  {index}. {state_emoji} {alert['alert_name']}\n",
            f"      State: {alert['state']}\n",
            f"      Started: {alert['starts_at']}\n",
        ]

        # Location fields are shown only when the alert actually carries them.
        for title, key in (
            ("Namespace", "namespace"),
            ("Instance", "instance"),
            ("Pod", "pod"),
            ("Container", "container"),
        ):
            if alert[key] != "N/A":
                lines.append(f"      {title}: {alert[key]}\n")

        annotations = alert["annotations"]
        if "description" in annotations:
            desc = annotations["description"]
            if len(desc) > max_description_len:
                desc = desc[:max_description_len] + "..."
            lines.append(f"      Description: {desc}\n")
        if "summary" in annotations:
            lines.append(f"      Summary: {annotations['summary']}\n")

        # Only a short, fixed set of labels is displayed.
        shown_labels = [
            f"{name}={alert['labels'][name]}"
            for name in ("job", "service", "deployment", "statefulset")
            if name in alert["labels"]
        ]
        if shown_labels:
            lines.append(f"      Labels: {', '.join(shown_labels)}\n")

        lines.append("\n")
        return lines


    def _format_alert_details(
        alert_name: str,
        cluster_filter: str,
        matching_alerts: list,
        cluster_stats: dict,
        max_per_cluster: int = 10,
        max_description_len: int = 150,
    ) -> str:
        """Build the text report for a non-empty list of matching alerts."""
        filter_text = (
            f" in cluster '{cluster_filter}'"
            if cluster_filter
            else " (multi-cluster search)"
        )
        # Fragments are collected and joined once instead of growing a string.
        parts = [
            f"Alert Details: '{alert_name}'{filter_text}\n",
            "=" * 60 + "\n\n",
            "📊 Summary:\n",
            f"   Alert Name: {alert_name}\n",
            # The summary severity is taken from the first matching alert.
            f"   Severity: {matching_alerts[0]['severity']}\n",
            f"   Total Instances: {len(matching_alerts)}\n",
            f"   Clusters Affected: {len(cluster_stats)}\n\n",
            "📈 Cluster Breakdown:\n",
        ]
        for cluster, stats in sorted(cluster_stats.items()):
            parts.append(
                f"   {cluster}: {stats['total']} instances "
                f"({stats.get('active', 0)} active, {stats.get('suppressed', 0)} suppressed)\n"
            )
        parts.append("\n")

        alerts_by_cluster = {}
        for alert in matching_alerts:
            alerts_by_cluster.setdefault(alert["cluster"], []).append(alert)

        for cluster, alerts in sorted(alerts_by_cluster.items()):
            parts.append(f"🏗️  Cluster: {cluster}\n")
            parts.append("-" * 40 + "\n")

            # Cap the detail listing per cluster; the remainder is summarised.
            for i, alert in enumerate(alerts[:max_per_cluster], 1):
                parts.extend(_format_alert_entry(i, alert, max_description_len))

            if len(alerts) > max_per_cluster:
                parts.append(
                    f"      ... and {len(alerts) - max_per_cluster} more instances\n\n"
                )

        total = len(matching_alerts)
        cluster_count = len(cluster_stats)
        active_count = sum(1 for a in matching_alerts if a["state"].lower() == "active")
        suppressed_count = sum(
            1 for a in matching_alerts if a["state"].lower() == "suppressed"
        )
        parts.append(f"📋 Total: {total} instance{'s' if total != 1 else ''} ")
        parts.append(f"({active_count} active, {suppressed_count} suppressed) ")
        parts.append(f"across {cluster_count} cluster{'s' if cluster_count != 1 else ''}")

        return "".join(parts)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/driosalido/mcp-karma'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.