Skip to main content
Glama
panther-labs

Panther MCP Server

Official

get_severity_alert_metrics

Analyze alert metrics by severity over time to identify security monitoring hotspots and trends in rule and policy alerts.

Instructions

Gets alert metrics grouped by severity for rule and policy alert types within a given time period. Use this tool to identify hot spots in your alerts, and use the list_alerts tool for specific details. Keep in mind that these metrics combine errors and alerts, so there may be inconsistencies from what list_alerts returns.

Returns: Dict: - alerts_per_severity: List of series with breakdown by severity - total_alerts: Total number of alerts in the period - start_date: Start date of the period - end_date: End date of the period - interval_in_minutes: Grouping interval for the metrics

Permissions:{'all_of': ['Read Panther Metrics']}

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
start_dateNoOptional start date in ISO-8601 format. If provided, defaults to the start of the current day UTC.
end_dateNoOptional end date in ISO-8601 format. If provided, defaults to the end of the current day UTC.
alert_typesNoThe specific Panther alert types to get metrics for.
severitiesNoThe specific Panther alert severities to get metrics for.
interval_in_minutesNoHow data points are aggregated over time, with smaller intervals providing more granular detail of when events occurred, while larger intervals show broader trends but obscure the precise timing of incidents.

Implementation Reference

  • The core handler implementation for the 'get_severity_alert_metrics' tool. This async function is decorated with @mcp_tool for automatic registration, defines the input schema using Pydantic Annotated types with validators and Field descriptions, executes a GraphQL query to fetch alert metrics grouped by severity, filters results based on provided alert_types and severities, handles default date ranges, and returns a dictionary with success status, filtered metrics, totals, and parameters.
    @mcp_tool(
        annotations={
            "permissions": all_perms(Permission.SUMMARY_READ),
            "readOnlyHint": True,
        }
    )
    async def get_severity_alert_metrics(
        start_date: Annotated[
            str | None,
            Field(
                description="Optional start date in ISO-8601 format. If provided, defaults to the start of the current day UTC.",
                examples=["2024-03-20T00:00:00Z"],
            ),
        ] = None,
        end_date: Annotated[
            str | None,
            Field(
                description="Optional end date in ISO-8601 format. If provided, defaults to the end of the current day UTC.",
                examples=["2024-03-20T00:00:00Z"],
            ),
        ] = None,
        alert_types: Annotated[
            list[str],
            BeforeValidator(_validate_alert_types),
            Field(
                description="The specific Panther alert types to get metrics for.",
                examples=[["Rule"], ["Policy"], ["Rule", "Policy"]],
            ),
        ] = ["Rule", "Policy"],
        severities: Annotated[
            list[str],
            BeforeValidator(_validate_severities),
            Field(
                description="The specific Panther alert severities to get metrics for.",
                examples=[
                    ["CRITICAL", "HIGH"],
                    ["MEDIUM", "LOW"],
                    ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"],
                ],
            ),
        ] = ["CRITICAL", "HIGH", "MEDIUM", "LOW"],
        interval_in_minutes: Annotated[
            int,
            BeforeValidator(_validate_interval),
            Field(
                description="How data points are aggregated over time, with smaller intervals providing more granular detail of when events occurred, while larger intervals show broader trends but obscure the precise timing of incidents.",
                examples=[15, 30, 60, 180, 360, 720, 1440],
            ),
        ] = 1440,
    ) -> dict[str, Any]:
        """Gets alert metrics grouped by severity for rule and policy alert types within a given time period. Use this tool to identify hot spots in your alerts, and use the list_alerts tool for specific details. Keep in mind that these metrics combine errors and alerts, so there may be inconsistencies from what list_alerts returns.
    
        Returns:
            Dict:
            - alerts_per_severity: List of series with breakdown by severity
            - total_alerts: Total number of alerts in the period
            - start_date: Start date of the period
            - end_date: End date of the period
            - interval_in_minutes: Grouping interval for the metrics
        """
        try:
            # If start or end date is missing, use week's date range
            if not start_date or not end_date:
                default_start_date, default_end_date = _get_week_date_range()
                if not start_date:
                    start_date = default_start_date
                if not end_date:
                    end_date = default_end_date
    
            logger.info(
                f"Fetching alerts per severity metrics from {start_date} to {end_date}"
            )
    
            # Prepare variables for GraphQL query
            variables = {
                "input": {
                    "fromDate": start_date,
                    "toDate": end_date,
                    "intervalInMinutes": interval_in_minutes,
                }
            }
    
            # Execute GraphQL query
            result = await _execute_query(METRICS_ALERTS_PER_SEVERITY_QUERY, variables)
    
            if not result or "metrics" not in result:
                logger.error(f"Could not find key 'metrics' in result: {result}")
                raise Exception("Failed to fetch metrics data")
    
            metrics_data = result["metrics"]
    
            # Filter metrics data by alert types and severities
            alerts_per_severity = [
                item
                for item in metrics_data["alertsPerSeverity"]
                if any(alert_type in item["label"] for alert_type in alert_types)
                and any(severity in item["label"] for severity in severities)
            ]
    
            return {
                "success": True,
                "alerts_per_severity": alerts_per_severity,
                "total_alerts": metrics_data["totalAlerts"],
                "start_date": start_date,
                "end_date": end_date,
                "interval_in_minutes": interval_in_minutes,
            }
    
        except Exception as e:
            logger.error(f"Failed to fetch alerts per severity metrics: {str(e)}")
            return {
                "success": False,
                "message": f"Failed to fetch alerts per severity metrics: {str(e)}",
            }
  • The _validate_severities validator function used in the tool's input schema to validate severity parameters. Similar validators like _validate_alert_types are in this file, providing schema validation helpers.
    def _validate_severities(v: list[str]) -> list[str]:
        """Validate severities are valid."""
        valid_severities = {"CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"}
        for severity in v:
            if severity not in valid_severities:
                raise ValueError(
                    f"Invalid severity '{severity}'. Must be one of: {', '.join(sorted(valid_severities))}"
                )
        return v
  • The GraphQL query METRICS_ALERTS_PER_SEVERITY_QUERY used by the handler to fetch raw metrics data from Panther.
                        logProcessingRole
                        logStreamType
                        logStreamTypeOptions {
                            jsonArrayEnvelopeField
                        }
                        managedBucketNotifications
                        s3Bucket
                        s3Prefix
                        s3PrefixLogTypes {
                            prefix
                            logTypes
                            excludedPrefixes
                        }
                        stackName
                    }
                }
            }
            pageInfo {
                hasNextPage
                hasPreviousPage
                startCursor
                endCursor
            }
        }
    }
    """)
    
    # Data Lake Queries
    EXECUTE_DATA_LAKE_QUERY = gql("""
    mutation ExecuteDataLakeQuery($input: ExecuteDataLakeQueryInput!) {
        executeDataLakeQuery(input: $input) {
            id
        }
    }
    """)
    
    GET_DATA_LAKE_QUERY = gql("""
    query GetDataLakeQuery($id: ID!, $root: Boolean = false, $resultsInput: DataLakeQueryResultsInput) {
        dataLakeQuery(id: $id, root: $root) {
            id
            status
            message
            sql
            startedAt
            completedAt
            results(input: $resultsInput) {
                edges {
                    node
                }
                pageInfo {
                    hasNextPage
                    endCursor
                }
                columnInfo {
                    order
                    types
                }
                stats {
                    bytesScanned
                    executionTime
                    rowCount
                }
            }
        }
    }
    """)
    
    LIST_DATABASES_QUERY = gql("""
    query ListDatabases {
        dataLakeDatabases {
            name
            description
        }
    }
    """)
    
    LIST_TABLES_QUERY = gql("""
    query ListTables($databaseName: String!, $pageSize: Int, $cursor: String) {
      dataLakeDatabaseTables(
        input: {
          databaseName: $databaseName
          pageSize: $pageSize
          cursor: $cursor
        }
      ) {
        edges {
          node {
            name
            description
            logType
          }
        }
        pageInfo {
          hasNextPage
          endCursor
        }
      }
    }
    """)
    
    GET_COLUMNS_FOR_TABLE_QUERY = gql("""
    query GetColumnDetails($databaseName: String!, $tableName: String!) {
      dataLakeDatabaseTable(input: { databaseName: $databaseName, tableName: $tableName }) {
        name,
        displayName,
        description,
        logType,
        columns {
          name,
          type,
          description
        }
      }
    }
    """)
    
    LIST_SCHEMAS_QUERY = gql("""
    query ListSchemas($input: SchemasInput!) {
        schemas(input: $input) {
            edges {
                node {
                    name
                    description
                    revision
                    isArchived
                    isManaged
                    referenceURL
                    createdAt
                    updatedAt
                }
            }
        }
    }
    """)
    
    CREATE_OR_UPDATE_SCHEMA_MUTATION = gql("""
    mutation CreateOrUpdateSchema($input: CreateOrUpdateSchemaInput!) {
        createOrUpdateSchema(input: $input) {
            schema {
                name
                description
                spec
                version
                revision
                isArchived
                isManaged
                isFieldDiscoveryEnabled
                referenceURL
                discoveredSpec
                createdAt
                updatedAt
            }
        }
    }
    """)
    
    # Metrics Queries
    METRICS_ALERTS_PER_SEVERITY_QUERY = gql("""
  • The call to register_all_tools(mcp) which registers all @mcp_tool decorated functions, including get_severity_alert_metrics, with the MCP server instance.
    register_all_tools(mcp)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/panther-labs/mcp-panther'

If you have feedback or need assistance with the MCP directory API, please join our Discord server