search_alerts_by_container
Search for alerts by container name across multiple Kubernetes clusters. Optionally filter by cluster to focus on specific environments.
Instructions
Search for alerts by container name across multiple clusters
Args: container_name: Name of the container to search for cluster_filter: Optional cluster name filter (e.g., 'teddy-prod', 'edge-prod'). If empty, searches all clusters.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| container_name | Yes | Name of the container to search for | |
| cluster_filter | No | Optional cluster name filter (e.g., 'teddy-prod', 'edge-prod'). If empty, searches all clusters. | "" |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | Formatted text report of matching alerts, grouped by cluster and alert name | |
Implementation Reference
- src/karma_mcp/server.py:958-1147 (handler)Main handler function for the search_alerts_by_container tool. Defined with @mcp.tool() decorator, accepts container_name and optional cluster_filter parameters. Fetches alerts from Karma API, searches for matching container labels, and returns formatted results grouped by cluster and alert name.
@mcp.tool()
async def search_alerts_by_container(
    container_name: str, cluster_filter: str = ""
) -> str:
    """Search for alerts by container name across multiple clusters

    Performs a case-insensitive substring match of *container_name* against
    each alert's ``container`` label, then returns a human-readable report
    grouped by cluster and alert name. On any HTTP or connection error a
    plain error string is returned instead of raising.

    Args:
        container_name: Name of the container to search for
        cluster_filter: Optional cluster name filter (e.g., 'teddy-prod', 'edge-prod'). If empty, searches all clusters.
    """
    try:
        async with httpx.AsyncClient() as client:
            # Karma's alert listing endpoint; an empty JSON body means "no filters".
            response = await client.post(
                f"{KARMA_URL}/alerts.json",
                headers={"Content-Type": "application/json"},
                json={},
            )
            if response.status_code == 200:
                data = response.json()
                matching_alerts = []   # one dict per (alert, first matching alertmanager)
                cluster_stats = {}     # cluster -> {"total", "active", "suppressed"} counters
                grids = data.get("grids", [])
                for grid in grids:
                    for group in grid.get("alertGroups", []):
                        # Get group labels (contains alertname)
                        group_labels_dict = {}
                        for label in group.get("labels", []):
                            group_labels_dict[label.get("name", "")] = label.get(
                                "value", ""
                            )
                        alertname = group_labels_dict.get("alertname", "unknown")
                        for alert in group.get("alerts", []):
                            # Convert alert labels to dict
                            alert_labels_dict = {}
                            for label in alert.get("labels", []):
                                alert_labels_dict[label.get("name", "")] = label.get(
                                    "value", ""
                                )
                            # Check if this alert has the container label we're looking for
                            # (case-insensitive substring match).
                            alert_container = alert_labels_dict.get("container", "")
                            if container_name.lower() in alert_container.lower():
                                # Get cluster information
                                alertmanagers = alert.get("alertmanager", [])
                                for am in alertmanagers:
                                    cluster = am.get("cluster", "unknown")
                                    # Apply cluster filter if specified
                                    # (also a case-insensitive substring match).
                                    if (
                                        cluster_filter
                                        and cluster_filter.lower()
                                        not in cluster.lower()
                                    ):
                                        continue
                                    # Track cluster stats
                                    if cluster not in cluster_stats:
                                        cluster_stats[cluster] = {
                                            "total": 0,
                                            "active": 0,
                                            "suppressed": 0,
                                        }
                                    cluster_stats[cluster]["total"] += 1
                                    # Only "active"/"suppressed" are counted per-state;
                                    # any other state only bumps "total".
                                    state = alert.get("state", "unknown").lower()
                                    if state in cluster_stats[cluster]:
                                        cluster_stats[cluster][state] += 1
                                    matching_alerts.append(
                                        {
                                            "alert_name": alertname,
                                            "container": alert_container,
                                            "cluster": cluster,
                                            "state": alert.get("state", "unknown"),
                                            # resolve_severity consults both group- and
                                            # alert-level labels (defined elsewhere in this module).
                                            "severity": resolve_severity(
                                                group_labels_dict, alert_labels_dict
                                            ),
                                            "namespace": alert_labels_dict.get(
                                                "namespace", "N/A"
                                            ),
                                            "instance": alert_labels_dict.get(
                                                "instance", "N/A"
                                            ),
                                            "pod": alert_labels_dict.get("pod", "N/A"),
                                            "starts_at": alert.get("startsAt", "N/A"),
                                            "alertmanager_name": am.get("name", "N/A"),
                                        }
                                    )
                                    break  # Found in this cluster, no need to check other alertmanagers
                if not matching_alerts:
                    filter_text = (
                        f" in cluster '{cluster_filter}'"
                        if cluster_filter
                        else " across all clusters"
                    )
                    return (
                        f"No alerts found for container '{container_name}'{filter_text}"
                    )
                # Format output
                filter_text = (
                    f" in cluster '{cluster_filter}'"
                    if cluster_filter
                    else " (multi-cluster search)"
                )
                result = f"Container Alert Search: '{container_name}'{filter_text}\n"
                result += "=" * 60 + "\n\n"
                # Cluster summary
                result += "📊 Cluster Summary:\n"
                for cluster, stats in sorted(cluster_stats.items()):
                    result += f" {cluster}: {stats['total']} alerts "
                    result += f"({stats.get('active', 0)} active, {stats.get('suppressed', 0)} suppressed)\n"
                result += "\n"
                # Group alerts by cluster and then by alert name
                clusters_alerts = {}
                for alert in matching_alerts:
                    cluster = alert["cluster"]
                    alert_name = alert["alert_name"]
                    if cluster not in clusters_alerts:
                        clusters_alerts[cluster] = {}
                    if alert_name not in clusters_alerts[cluster]:
                        clusters_alerts[cluster][alert_name] = []
                    clusters_alerts[cluster][alert_name].append(alert)
                # Display alerts grouped by cluster
                for cluster, alert_groups in sorted(clusters_alerts.items()):
                    result += f"🏗️ Cluster: {cluster}\n"
                    result += "-" * 40 + "\n"
                    for alert_name, alerts in sorted(alert_groups.items()):
                        # Count states for this alert type
                        state_counts = {"active": 0, "suppressed": 0}
                        for alert in alerts:
                            state = alert["state"].lower()
                            if state in state_counts:
                                state_counts[state] += 1
                        state_emoji = "🔥" if state_counts["active"] > 0 else "🔕"
                        result += f" {state_emoji} {alert_name} ({len(alerts)} instance{'s' if len(alerts) > 1 else ''})\n"
                        result += f" Severity: {alerts[0]['severity']}\n"
                        result += f" States: {state_counts['active']} active, {state_counts['suppressed']} suppressed\n"
                        # Show container instances (limit to avoid clutter);
                        # containers_shown dedupes by (container, namespace).
                        containers_shown = set()
                        for alert in alerts[:8]:  # Limit to 8 instances
                            container_info = (
                                f"{alert['container']} ({alert['namespace']})"
                            )
                            if container_info not in containers_shown:
                                state_icon = (
                                    "🔥"
                                    if alert["state"].lower() == "active"
                                    else "🔕"
                                )
                                result += f" {state_icon} Container: {alert['container']}\n"
                                result += f" Namespace: {alert['namespace']}\n"
                                if alert["pod"] != "N/A":
                                    result += f" Pod: {alert['pod']}\n"
                                if alert["instance"] != "N/A":
                                    result += (
                                        f" Instance: {alert['instance']}\n"
                                    )
                                result += "\n"
                                containers_shown.add(container_info)
                        if len(alerts) > 8:
                            # NOTE(review): the remainder is computed against the number of
                            # *unique* entries shown, not the 8-alert display cap — confirm
                            # this count is intentional.
                            result += f" ... and {len(alerts) - len(containers_shown)} more instances\n"
                    result += "\n"
                # Final summary
                total_alerts = len(matching_alerts)
                total_clusters = len(cluster_stats)
                result += f"📋 Total: {total_alerts} alert instance{'s' if total_alerts != 1 else ''} "
                result += f"across {total_clusters} cluster{'s' if total_clusters != 1 else ''}"
                return result
            else:
                return f"Error fetching alerts: code {response.status_code}"
    except Exception as e:
        return f"Error connecting to Karma: {str(e)}"
- src/karma_mcp/server.py:958-960 (registration)Registered as an MCP tool via the @mcp.tool() decorator on line 958, which registers it with the FastMCP server as 'search_alerts_by_container'.
@mcp.tool() async def search_alerts_by_container( container_name: str, cluster_filter: str = "" - src/karma_mcp/http_server.py:407-424 (schema)Input schema definition for search_alerts_by_container in the MCP tools/list response. Defines container_name (required string) and cluster_filter (optional string with default '') parameters.
{ "name": "search_alerts_by_container", "description": "Search for alerts by container name across multiple clusters", "inputSchema": { "type": "object", "properties": { "container_name": { "type": "string", "description": "Name of the container to search for", }, "cluster_filter": { "type": "string", "description": "Optional cluster name filter. If empty, searches all clusters.", "default": "", }, }, "required": ["container_name"], }, - src/karma_mcp/http_server.py:66-69 (helper)Pydantic model ContainerSearchRequest used as the request schema for the /alerts/search/container REST endpoint that wraps search_alerts_by_container.
class ContainerSearchRequest(BaseModel): container_name: str cluster_filter: str | None = "" - src/karma_mcp/http_server.py:188-198 (registration)REST endpoint registration at POST /alerts/search/container that delegates to search_alerts_by_container. Also referenced in the /mcp/tools/{tool_name}, /mcp/sse (tools/call), and /mcp/execute endpoints as dispatch targets.
@app.post("/alerts/search/container", response_model=MCPResponse) async def search_container_alerts(request: ContainerSearchRequest): """Search for alerts by container name across multiple clusters""" try: result = await search_alerts_by_container( request.container_name, request.cluster_filter ) return MCPResponse(success=True, data=result) except Exception as e: logger.error(f"Error searching container alerts: {e}") return MCPResponse(success=False, data="", error=str(e))