search_alerts_by_container
Find alerts related to specific containers across Kubernetes clusters. Filter results by cluster name to monitor and analyze container-specific issues in your infrastructure.
Instructions
Search for alerts by container name across multiple clusters
Args:
- container_name: Name of the container to search for
- cluster_filter: Optional cluster name filter (e.g., 'teddy-prod', 'edge-prod'). If empty, searches all clusters.
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| container_name | Yes | Name of the container to search for | |
| cluster_filter | No | Optional cluster name filter (e.g., 'teddy-prod', 'edge-prod'); if empty, searches all clusters | "" |
Implementation Reference
- src/karma_mcp/server.py:959-1147 (handler) — Implementation of the `search_alerts_by_container` MCP tool, which queries the Karma API and filters alerts by container name and optional cluster.
def _collect_container_alerts(
    data: dict, container_name: str, cluster_filter: str
) -> tuple[list[dict], dict]:
    """Scan the Karma payload for alerts whose ``container`` label contains
    *container_name* (case-insensitive substring match), optionally restricted
    to clusters whose name contains *cluster_filter*.

    Returns:
        (matching_alerts, cluster_stats) where matching_alerts is a list of
        flat per-cluster alert records and cluster_stats maps cluster name to
        ``{"total": n, "active": n, "suppressed": n}``.
    """
    matching_alerts: list[dict] = []
    cluster_stats: dict = {}

    for grid in data.get("grids", []):
        for group in grid.get("alertGroups", []):
            # Group labels carry the alertname shared by all alerts in the group.
            group_labels = {
                label.get("name", ""): label.get("value", "")
                for label in group.get("labels", [])
            }
            alertname = group_labels.get("alertname", "unknown")

            for alert in group.get("alerts", []):
                alert_labels = {
                    label.get("name", ""): label.get("value", "")
                    for label in alert.get("labels", [])
                }
                alert_container = alert_labels.get("container", "")
                if container_name.lower() not in alert_container.lower():
                    continue

                # BUGFIX: the original code `break`-ed out of the alertmanager
                # loop after the first match, so an alert firing in several
                # clusters was only recorded in one of them. Dedupe per cluster
                # instead, so each cluster is represented exactly once.
                seen_clusters: set = set()
                for am in alert.get("alertmanager", []):
                    cluster = am.get("cluster", "unknown")
                    if cluster in seen_clusters:
                        continue
                    # Apply the optional cluster filter (substring, case-insensitive).
                    if (
                        cluster_filter
                        and cluster_filter.lower() not in cluster.lower()
                    ):
                        continue
                    seen_clusters.add(cluster)

                    stats = cluster_stats.setdefault(
                        cluster, {"total": 0, "active": 0, "suppressed": 0}
                    )
                    stats["total"] += 1
                    state = alert.get("state", "unknown").lower()
                    if state in stats:  # only "active"/"suppressed" are tracked
                        stats[state] += 1

                    matching_alerts.append(
                        {
                            "alert_name": alertname,
                            "container": alert_container,
                            "cluster": cluster,
                            "state": alert.get("state", "unknown"),
                            "severity": resolve_severity(
                                group_labels, alert_labels
                            ),
                            "namespace": alert_labels.get("namespace", "N/A"),
                            "instance": alert_labels.get("instance", "N/A"),
                            "pod": alert_labels.get("pod", "N/A"),
                            "starts_at": alert.get("startsAt", "N/A"),
                            "alertmanager_name": am.get("name", "N/A"),
                        }
                    )

    return matching_alerts, cluster_stats


def _format_container_alert_report(
    container_name: str,
    cluster_filter: str,
    matching_alerts: list,
    cluster_stats: dict,
) -> str:
    """Render collected alerts as a human-readable, cluster-grouped report."""
    filter_text = (
        f" in cluster '{cluster_filter}'"
        if cluster_filter
        else " (multi-cluster search)"
    )
    result = f"Container Alert Search: '{container_name}'{filter_text}\n"
    result += "=" * 60 + "\n\n"

    # Per-cluster totals up front.
    result += "📊 Cluster Summary:\n"
    for cluster, stats in sorted(cluster_stats.items()):
        result += f"  {cluster}: {stats['total']} alerts "
        result += f"({stats.get('active', 0)} active, {stats.get('suppressed', 0)} suppressed)\n"
    result += "\n"

    # Group alerts by cluster, then by alert name.
    clusters_alerts: dict = {}
    for alert in matching_alerts:
        clusters_alerts.setdefault(alert["cluster"], {}).setdefault(
            alert["alert_name"], []
        ).append(alert)

    for cluster, alert_groups in sorted(clusters_alerts.items()):
        result += f"🏗️ Cluster: {cluster}\n"
        result += "-" * 40 + "\n"
        for alert_name, alerts in sorted(alert_groups.items()):
            state_counts = {"active": 0, "suppressed": 0}
            for alert in alerts:
                state = alert["state"].lower()
                if state in state_counts:
                    state_counts[state] += 1

            state_emoji = "🔥" if state_counts["active"] > 0 else "🔕"
            result += f"  {state_emoji} {alert_name} ({len(alerts)} instance{'s' if len(alerts) > 1 else ''})\n"
            result += f"     Severity: {alerts[0]['severity']}\n"
            result += f"     States: {state_counts['active']} active, {state_counts['suppressed']} suppressed\n"

            # Show at most 8 instances, deduped by (container, namespace),
            # to keep the output readable.
            containers_shown: set = set()
            for alert in alerts[:8]:
                container_info = f"{alert['container']} ({alert['namespace']})"
                if container_info in containers_shown:
                    continue
                state_icon = "🔥" if alert["state"].lower() == "active" else "🔕"
                result += f"     {state_icon} Container: {alert['container']}\n"
                result += f"        Namespace: {alert['namespace']}\n"
                if alert["pod"] != "N/A":
                    result += f"        Pod: {alert['pod']}\n"
                if alert["instance"] != "N/A":
                    result += f"        Instance: {alert['instance']}\n"
                result += "\n"
                containers_shown.add(container_info)

            if len(alerts) > 8:
                result += f"     ... and {len(alerts) - len(containers_shown)} more instances\n"
            result += "\n"

    total_alerts = len(matching_alerts)
    total_clusters = len(cluster_stats)
    result += f"📋 Total: {total_alerts} alert instance{'s' if total_alerts != 1 else ''} "
    result += f"across {total_clusters} cluster{'s' if total_clusters != 1 else ''}"
    return result


async def search_alerts_by_container(
    container_name: str, cluster_filter: str = ""
) -> str:
    """Search for alerts by container name across multiple clusters.

    Args:
        container_name: Name of the container to search for (case-insensitive
            substring match against the alert's ``container`` label).
        cluster_filter: Optional cluster name filter (e.g., 'teddy-prod',
            'edge-prod'). If empty, searches all clusters.

    Returns:
        A human-readable report of matching alerts grouped by cluster and
        alert name, or an error message string on failure (this tool never
        raises to the caller).
    """
    try:
        # Timeout added so the tool cannot hang indefinitely when Karma is
        # unreachable; timeouts surface through the except branch below.
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                f"{KARMA_URL}/alerts.json",
                headers={"Content-Type": "application/json"},
                json={},
            )

            if response.status_code != 200:
                return f"Error fetching alerts: code {response.status_code}"

            data = response.json()
            matching_alerts, cluster_stats = _collect_container_alerts(
                data, container_name, cluster_filter
            )

            if not matching_alerts:
                filter_text = (
                    f" in cluster '{cluster_filter}'"
                    if cluster_filter
                    else " across all clusters"
                )
                return (
                    f"No alerts found for container '{container_name}'{filter_text}"
                )

            return _format_container_alert_report(
                container_name, cluster_filter, matching_alerts, cluster_stats
            )
    except Exception as e:
        # Boundary handler: report connection/parse errors as text to the
        # MCP client rather than raising.
        return f"Error connecting to Karma: {str(e)}"