by driosalido

search_alerts_by_container

Search for alerts by container name across multiple Kubernetes clusters. Optionally filter by cluster to focus on specific environments.

Instructions

Search for alerts by container name across multiple clusters

Args:
    container_name: Name of the container to search for
    cluster_filter: Optional cluster name filter (e.g., 'teddy-prod', 'edge-prod'). If empty, searches all clusters.
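The handler shown in the Implementation Reference compares both arguments as case-insensitive substrings rather than exact names. The sketch below isolates just that check; `matches` is a hypothetical helper written for illustration and is not part of the server.

```python
def matches(container_name: str, cluster_filter: str,
            alert_container: str, cluster: str) -> bool:
    """Mirror the handler's two case-insensitive substring checks.

    Hypothetical helper for illustration only; the server performs
    these checks inline inside its nested loops.
    """
    # The container name is matched as a substring of the alert's label.
    if container_name.lower() not in alert_container.lower():
        return False
    # An empty filter is falsy, so every cluster passes.
    if cluster_filter and cluster_filter.lower() not in cluster.lower():
        return False
    return True


print(matches("nginx", "", "nginx-ingress", "teddy-prod"))      # True
print(matches("NGINX", "PROD", "nginx-ingress", "edge-prod"))   # True
print(matches("nginx", "teddy", "nginx-ingress", "edge-prod"))  # False
```

One consequence of substring matching is that an empty container_name matches every alert, including alerts that carry no container label, because the empty string is a substring of any string.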

Input Schema

Name            Required  Description  Default
container_name  Yes
cluster_filter  No
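As a rough illustration of how these two inputs travel over MCP, the sketch below builds a JSON-RPC 2.0 `tools/call` request carrying them as arguments. The `build_tool_call` helper and the request id are assumptions made for this example; how the request is actually delivered depends on the transport the server is run with, which this page does not show.

```python
import json


def build_tool_call(container_name: str, cluster_filter: str = "",
                    request_id: int = 1) -> dict:
    """Build an MCP tools/call request for search_alerts_by_container.

    Hypothetical helper: only the tool name and argument names come
    from the input schema above.
    """
    arguments = {"container_name": container_name}
    # cluster_filter is optional; omit it to search all clusters.
    if cluster_filter:
        arguments["cluster_filter"] = cluster_filter
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "search_alerts_by_container",
            "arguments": arguments,
        },
    }


print(json.dumps(build_tool_call("nginx", "teddy-prod"), indent=2))
```

Leaving cluster_filter out of the arguments has the same effect as passing an empty string, since the handler treats an empty filter as "search all clusters".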

Output Schema

Name    Required  Description  Default
result  Yes

Implementation Reference

  • Main handler for the search_alerts_by_container tool. Decorated with @mcp.tool(), it accepts container_name and an optional cluster_filter, fetches alerts from the Karma API (a POST to {KARMA_URL}/alerts.json), matches each alert's container label by case-insensitive substring, and returns a formatted text report grouped by cluster and alert name.
    @mcp.tool()
    async def search_alerts_by_container(
        container_name: str, cluster_filter: str = ""
    ) -> str:
        """Search for alerts by container name across multiple clusters
    
        Args:
            container_name: Name of the container to search for
            cluster_filter: Optional cluster name filter (e.g., 'teddy-prod', 'edge-prod'). If empty, searches all clusters.
        """
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{KARMA_URL}/alerts.json",
                    headers={"Content-Type": "application/json"},
                    json={},
                )
    
                if response.status_code == 200:
                    data = response.json()
    
                    matching_alerts = []
                    cluster_stats = {}
                    grids = data.get("grids", [])
    
                    for grid in grids:
                        for group in grid.get("alertGroups", []):
                            # Get group labels (contains alertname)
                            group_labels_dict = {}
                            for label in group.get("labels", []):
                                group_labels_dict[label.get("name", "")] = label.get(
                                    "value", ""
                                )
    
                            alertname = group_labels_dict.get("alertname", "unknown")
    
                            for alert in group.get("alerts", []):
                                # Convert alert labels to dict
                                alert_labels_dict = {}
                                for label in alert.get("labels", []):
                                    alert_labels_dict[label.get("name", "")] = label.get(
                                        "value", ""
                                    )
    
                                # Check if this alert has the container label we're looking for
                                alert_container = alert_labels_dict.get("container", "")
    
                                if container_name.lower() in alert_container.lower():
                                    # Get cluster information
                                    alertmanagers = alert.get("alertmanager", [])
                                    for am in alertmanagers:
                                        cluster = am.get("cluster", "unknown")
    
                                        # Apply cluster filter if specified
                                        if (
                                            cluster_filter
                                            and cluster_filter.lower()
                                            not in cluster.lower()
                                        ):
                                            continue
    
                                        # Track cluster stats
                                        if cluster not in cluster_stats:
                                            cluster_stats[cluster] = {
                                                "total": 0,
                                                "active": 0,
                                                "suppressed": 0,
                                            }
    
                                        cluster_stats[cluster]["total"] += 1
                                        state = alert.get("state", "unknown").lower()
                                        if state in cluster_stats[cluster]:
                                            cluster_stats[cluster][state] += 1
    
                                        matching_alerts.append(
                                            {
                                                "alert_name": alertname,
                                                "container": alert_container,
                                                "cluster": cluster,
                                                "state": alert.get("state", "unknown"),
                                                "severity": resolve_severity(
                                                    group_labels_dict, alert_labels_dict
                                                ),
                                                "namespace": alert_labels_dict.get(
                                                    "namespace", "N/A"
                                                ),
                                                "instance": alert_labels_dict.get(
                                                    "instance", "N/A"
                                                ),
                                                "pod": alert_labels_dict.get("pod", "N/A"),
                                                "starts_at": alert.get("startsAt", "N/A"),
                                                "alertmanager_name": am.get("name", "N/A"),
                                            }
                                        )
                                        break  # Found in this cluster, no need to check other alertmanagers
    
                    if not matching_alerts:
                        filter_text = (
                            f" in cluster '{cluster_filter}'"
                            if cluster_filter
                            else " across all clusters"
                        )
                        return (
                            f"No alerts found for container '{container_name}'{filter_text}"
                        )
    
                    # Format output
                    filter_text = (
                        f" in cluster '{cluster_filter}'"
                        if cluster_filter
                        else " (multi-cluster search)"
                    )
                    result = f"Container Alert Search: '{container_name}'{filter_text}\n"
                    result += "=" * 60 + "\n\n"
    
                    # Cluster summary
                    result += "📊 Cluster Summary:\n"
                    for cluster, stats in sorted(cluster_stats.items()):
                        result += f"   {cluster}: {stats['total']} alerts "
                        result += f"({stats.get('active', 0)} active, {stats.get('suppressed', 0)} suppressed)\n"
                    result += "\n"
    
                    # Group alerts by cluster and then by alert name
                    clusters_alerts = {}
                    for alert in matching_alerts:
                        cluster = alert["cluster"]
                        alert_name = alert["alert_name"]
    
                        if cluster not in clusters_alerts:
                            clusters_alerts[cluster] = {}
                        if alert_name not in clusters_alerts[cluster]:
                            clusters_alerts[cluster][alert_name] = []
                        clusters_alerts[cluster][alert_name].append(alert)
    
                    # Display alerts grouped by cluster
                    for cluster, alert_groups in sorted(clusters_alerts.items()):
                        result += f"🏗️  Cluster: {cluster}\n"
                        result += "-" * 40 + "\n"
    
                        for alert_name, alerts in sorted(alert_groups.items()):
                            # Count states for this alert type
                            state_counts = {"active": 0, "suppressed": 0}
                            for alert in alerts:
                                state = alert["state"].lower()
                                if state in state_counts:
                                    state_counts[state] += 1
    
                            state_emoji = "🔥" if state_counts["active"] > 0 else "🔕"
                            result += f"  {state_emoji} {alert_name} ({len(alerts)} instance{'s' if len(alerts) > 1 else ''})\n"
                            result += f"      Severity: {alerts[0]['severity']}\n"
                            result += f"      States: {state_counts['active']} active, {state_counts['suppressed']} suppressed\n"
    
                            # Show container instances (limit to avoid clutter)
                            containers_shown = set()
                            for alert in alerts[:8]:  # Limit to 8 instances
                                container_info = (
                                    f"{alert['container']} ({alert['namespace']})"
                                )
                                if container_info not in containers_shown:
                                    state_icon = (
                                        "🔥" if alert["state"].lower() == "active" else "🔕"
                                    )
                                    result += f"      {state_icon} Container: {alert['container']}\n"
                                    result += f"         Namespace: {alert['namespace']}\n"
                                    if alert["pod"] != "N/A":
                                        result += f"         Pod: {alert['pod']}\n"
                                    if alert["instance"] != "N/A":
                                        result += (
                                            f"         Instance: {alert['instance']}\n"
                                        )
                                    result += "\n"
                                    containers_shown.add(container_info)
    
                            if len(alerts) > 8:
                                result += f"      ... and {len(alerts) - len(containers_shown)} more instances\n"
    
                        result += "\n"
    
                    # Final summary
                    total_alerts = len(matching_alerts)
                    total_clusters = len(cluster_stats)
                    result += f"📋 Total: {total_alerts} alert instance{'s' if total_alerts != 1 else ''} "
                    result += f"across {total_clusters} cluster{'s' if total_clusters != 1 else ''}"
    
                    return result
                else:
                    return f"Error fetching alerts: code {response.status_code}"
    
        except Exception as e:
            return f"Error connecting to Karma: {str(e)}"
  • Registered as an MCP tool via the @mcp.tool() decorator on line 958, which registers it with the FastMCP server as 'search_alerts_by_container'.
    @mcp.tool()
    async def search_alerts_by_container(
        container_name: str, cluster_filter: str = ""
  • Input schema definition for search_alerts_by_container in the MCP tools/list response. Defines container_name (required string) and cluster_filter (optional string with default '') parameters.
    {
        "name": "search_alerts_by_container",
        "description": "Search for alerts by container name across multiple clusters",
        "inputSchema": {
            "type": "object",
            "properties": {
                "container_name": {
                    "type": "string",
                    "description": "Name of the container to search for",
                },
                "cluster_filter": {
                    "type": "string",
                    "description": "Optional cluster name filter. If empty, searches all clusters.",
                    "default": "",
                },
            },
            "required": ["container_name"],
        },
  • Pydantic model ContainerSearchRequest used as the request schema for the /alerts/search/container REST endpoint that wraps search_alerts_by_container.
    class ContainerSearchRequest(BaseModel):
        container_name: str
        cluster_filter: str | None = ""
  • REST endpoint registration at POST /alerts/search/container that delegates to search_alerts_by_container. Also referenced in the /mcp/tools/{tool_name}, /mcp/sse (tools/call), and /mcp/execute endpoints as dispatch targets.
    @app.post("/alerts/search/container", response_model=MCPResponse)
    async def search_container_alerts(request: ContainerSearchRequest):
        """Search for alerts by container name across multiple clusters"""
        try:
            result = await search_alerts_by_container(
                request.container_name, request.cluster_filter
            )
            return MCPResponse(success=True, data=result)
        except Exception as e:
            logger.error(f"Error searching container alerts: {e}")
            return MCPResponse(success=False, data="", error=str(e))
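The traversal inside the main handler can be condensed into a small standalone function, which makes the matching rules easier to test without a running Karma instance. This is a sketch, not the server's code: `labels_to_dict`, `find_container_alerts`, and `SAMPLE_RESPONSE` are hypothetical names, and the sample payload contains only the fields the handler reads (grids, alertGroups, labels, alerts, alertmanager, state), not a full Karma response.

```python
def labels_to_dict(labels):
    """Flatten a [{"name": ..., "value": ...}] label list into a dict."""
    return {label.get("name", ""): label.get("value", "") for label in labels}


def find_container_alerts(data, container_name, cluster_filter=""):
    """Return matching alerts using the handler's substring rules."""
    found = []
    for grid in data.get("grids", []):
        for group in grid.get("alertGroups", []):
            group_labels = labels_to_dict(group.get("labels", []))
            alertname = group_labels.get("alertname", "unknown")
            for alert in group.get("alerts", []):
                labels = labels_to_dict(alert.get("labels", []))
                container = labels.get("container", "")
                if container_name.lower() not in container.lower():
                    continue
                for am in alert.get("alertmanager", []):
                    cluster = am.get("cluster", "unknown")
                    if cluster_filter and cluster_filter.lower() not in cluster.lower():
                        continue
                    found.append({
                        "alert_name": alertname,
                        "container": container,
                        "cluster": cluster,
                        "state": alert.get("state", "unknown"),
                    })
                    break  # record each alert once, for the first matching Alertmanager
    return found


# Hypothetical sample data, shaped after the fields the handler reads.
SAMPLE_RESPONSE = {
    "grids": [{
        "alertGroups": [{
            "labels": [{"name": "alertname", "value": "KubePodCrashLooping"}],
            "alerts": [
                {
                    "labels": [{"name": "container", "value": "nginx-ingress"}],
                    "state": "active",
                    "alertmanager": [{"name": "am-1", "cluster": "teddy-prod"}],
                },
                {
                    "labels": [{"name": "container", "value": "redis"}],
                    "state": "suppressed",
                    "alertmanager": [{"name": "am-2", "cluster": "edge-prod"}],
                },
            ],
        }],
    }],
}

for match in find_container_alerts(SAMPLE_RESPONSE, "NGINX"):
    print(match["cluster"], match["alert_name"], match["state"])
```

Separating the traversal from the HTTP call and the text formatting is a design choice made for this example; the original handler keeps all three in one function.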
Behavior: 3/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description must carry the burden. It states that the tool searches across multiple clusters, which suggests read-only behavior, but it does not confirm this explicitly or mention side effects, rate limits, or authentication. Some context is given, but it is not comprehensive.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is concise and well-structured as a docstring, with a brief summary sentence followed by parameter details. It uses minimal words and front-loads the purpose, with no irrelevant information.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 4/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

For a simple two-parameter tool, the description covers the core behavior and the role of each parameter. An output schema exists, so the description does not need to document return values. However, it says nothing about pagination or result limits, which could matter for a search tool. Overall, it is mostly complete.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 4/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema coverage is 0%, meaning the schema only provides names and types. The description adds clear explanations for both parameters: container_name as 'Name of the container to search for' and cluster_filter as 'Optional cluster name filter...'. This significantly adds meaning beyond the schema.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states 'Search for alerts by container name across multiple clusters.' It specifies the verb 'Search', the resource 'alerts', and the method 'by container name', which distinguishes it from sibling tools like 'list_alerts' or 'get_alerts_by_state'.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description gives no explicit guidance on when to use this tool rather than an alternative. It describes the function and parameters and notes that cluster_filter is optional, but it does not say when the tool should not be used or name any alternative tools.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/driosalido/mcp-karma'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.