# get_severity_alert_metrics
Analyze alert metrics by severity over time to identify security monitoring hotspots and trends in rule and policy alerts.
## Instructions
Gets alert metrics grouped by severity for rule and policy alert types within a given time period. Use this tool to identify hot spots in your alerts, then use the `list_alerts` tool for specific details. Keep in mind that these metrics combine errors and alerts, so there may be inconsistencies with what `list_alerts` returns.
Returns a dict with:
- `alerts_per_severity`: List of series with a breakdown by severity
- `total_alerts`: Total number of alerts in the period
- `start_date`: Start date of the period
- `end_date`: End date of the period
- `interval_in_minutes`: Grouping interval for the metrics
Permissions: `{'all_of': ['Read Panther Metrics']}`
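A successful response follows the shape listed above. A minimal sketch with hypothetical values (the real series labels and counts depend on the Panther instance being queried):

```python
# Illustrative shape of a successful get_severity_alert_metrics response.
# All values here are hypothetical.
response = {
    "success": True,
    "alerts_per_severity": [
        {"label": "Rule CRITICAL", "values": [3, 0, 5]},
        {"label": "Policy HIGH", "values": [1, 2, 0]},
    ],
    "total_alerts": 11,
    "start_date": "2024-03-20T00:00:00Z",
    "end_date": "2024-03-20T23:59:59Z",
    "interval_in_minutes": 1440,
}
```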
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| start_date | No | Optional start date in ISO-8601 format. If omitted, defaults to the start of the current day UTC. | |
| end_date | No | Optional end date in ISO-8601 format. If omitted, defaults to the end of the current day UTC. | |
| alert_types | No | The specific Panther alert types to get metrics for. | `["Rule", "Policy"]` |
| severities | No | The specific Panther alert severities to get metrics for. | `["CRITICAL", "HIGH", "MEDIUM", "LOW"]` |
| interval_in_minutes | No | How data points are aggregated over time, in minutes. Smaller intervals provide more granular detail of when events occurred; larger intervals show broader trends but obscure the precise timing of incidents. | `1440` |
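As a sketch, a call requesting hourly CRITICAL/HIGH rule-alert metrics for a single day might pass arguments like these (all values are illustrative):

```python
# Hypothetical argument payload for get_severity_alert_metrics.
args = {
    "start_date": "2024-03-20T00:00:00Z",  # ISO-8601, start of the window
    "end_date": "2024-03-21T00:00:00Z",    # ISO-8601, end of the window
    "alert_types": ["Rule"],               # restrict to rule alerts
    "severities": ["CRITICAL", "HIGH"],    # only the most urgent alerts
    "interval_in_minutes": 60,             # one data point per hour
}
```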
## Implementation Reference
- The core handler implementation for the `get_severity_alert_metrics` tool. This async function is decorated with `@mcp_tool` for automatic registration, defines the input schema using Pydantic `Annotated` types with validators and `Field` descriptions, executes a GraphQL query to fetch alert metrics grouped by severity, filters results by the provided `alert_types` and `severities`, handles default date ranges, and returns a dictionary with success status, filtered metrics, totals, and parameters.

```python
@mcp_tool(
    annotations={
        "permissions": all_perms(Permission.SUMMARY_READ),
        "readOnlyHint": True,
    }
)
async def get_severity_alert_metrics(
    start_date: Annotated[
        str | None,
        Field(
            description="Optional start date in ISO-8601 format. If omitted, defaults to the start of the current day UTC.",
            examples=["2024-03-20T00:00:00Z"],
        ),
    ] = None,
    end_date: Annotated[
        str | None,
        Field(
            description="Optional end date in ISO-8601 format. If omitted, defaults to the end of the current day UTC.",
            examples=["2024-03-20T00:00:00Z"],
        ),
    ] = None,
    alert_types: Annotated[
        list[str],
        BeforeValidator(_validate_alert_types),
        Field(
            description="The specific Panther alert types to get metrics for.",
            examples=[["Rule"], ["Policy"], ["Rule", "Policy"]],
        ),
    ] = ["Rule", "Policy"],
    severities: Annotated[
        list[str],
        BeforeValidator(_validate_severities),
        Field(
            description="The specific Panther alert severities to get metrics for.",
            examples=[
                ["CRITICAL", "HIGH"],
                ["MEDIUM", "LOW"],
                ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"],
            ],
        ),
    ] = ["CRITICAL", "HIGH", "MEDIUM", "LOW"],
    interval_in_minutes: Annotated[
        int,
        BeforeValidator(_validate_interval),
        Field(
            description="How data points are aggregated over time, with smaller intervals providing more granular detail of when events occurred, while larger intervals show broader trends but obscure the precise timing of incidents.",
            examples=[15, 30, 60, 180, 360, 720, 1440],
        ),
    ] = 1440,
) -> dict[str, Any]:
    """Gets alert metrics grouped by severity for rule and policy alert
    types within a given time period.

    Use this tool to identify hot spots in your alerts, and use the
    list_alerts tool for specific details. Keep in mind that these metrics
    combine errors and alerts, so there may be inconsistencies with what
    list_alerts returns.

    Returns:
        Dict:
            - alerts_per_severity: List of series with breakdown by severity
            - total_alerts: Total number of alerts in the period
            - start_date: Start date of the period
            - end_date: End date of the period
            - interval_in_minutes: Grouping interval for the metrics
    """
    try:
        # If start or end date is missing, use week's date range
        if not start_date or not end_date:
            default_start_date, default_end_date = _get_week_date_range()
            if not start_date:
                start_date = default_start_date
            if not end_date:
                end_date = default_end_date

        logger.info(
            f"Fetching alerts per severity metrics from {start_date} to {end_date}"
        )

        # Prepare variables for GraphQL query
        variables = {
            "input": {
                "fromDate": start_date,
                "toDate": end_date,
                "intervalInMinutes": interval_in_minutes,
            }
        }

        # Execute GraphQL query
        result = await _execute_query(METRICS_ALERTS_PER_SEVERITY_QUERY, variables)

        if not result or "metrics" not in result:
            logger.error(f"Could not find key 'metrics' in result: {result}")
            raise Exception("Failed to fetch metrics data")

        metrics_data = result["metrics"]

        # Filter metrics data by alert types and severities
        alerts_per_severity = [
            item
            for item in metrics_data["alertsPerSeverity"]
            if any(alert_type in item["label"] for alert_type in alert_types)
            and any(severity in item["label"] for severity in severities)
        ]

        return {
            "success": True,
            "alerts_per_severity": alerts_per_severity,
            "total_alerts": metrics_data["totalAlerts"],
            "start_date": start_date,
            "end_date": end_date,
            "interval_in_minutes": interval_in_minutes,
        }
    except Exception as e:
        logger.error(f"Failed to fetch alerts per severity metrics: {str(e)}")
        return {
            "success": False,
            "message": f"Failed to fetch alerts per severity metrics: {str(e)}",
        }
```
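The label-based filtering step can be isolated as a small sketch: a series survives only if its label mentions at least one requested alert type and at least one requested severity (`filter_series` and the sample data here are hypothetical stand-ins, not part of the source):

```python
def filter_series(series, alert_types, severities):
    """Keep series whose label matches a requested type AND severity."""
    return [
        item
        for item in series
        if any(t in item["label"] for t in alert_types)
        and any(s in item["label"] for s in severities)
    ]

sample = [
    {"label": "Rule CRITICAL", "values": [3]},
    {"label": "Policy INFO", "values": [7]},
]
kept = filter_series(sample, ["Rule", "Policy"], ["CRITICAL", "HIGH"])
# Only the "Rule CRITICAL" series matches both filters.
```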
- The `_validate_severities` validator function used in the tool's input schema to validate severity parameters. Similar validators, such as `_validate_alert_types`, live in the same file, providing schema validation helpers.

```python
def _validate_severities(v: list[str]) -> list[str]:
    """Validate severities are valid."""
    valid_severities = {"CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"}
    for severity in v:
        if severity not in valid_severities:
            raise ValueError(
                f"Invalid severity '{severity}'. Must be one of: {', '.join(sorted(valid_severities))}"
            )
    return v
```
- The GraphQL query `METRICS_ALERTS_PER_SEVERITY_QUERY` used by the handler to fetch raw metrics data from Panther. Only the declaration is recoverable in this reference; the query body is truncated. It follows the `# Metrics Queries` marker in the queries module:

```python
# Metrics Queries
METRICS_ALERTS_PER_SEVERITY_QUERY = gql("""
```
- src/mcp_panther/server.py:72 (registration): the call to `register_all_tools(mcp)`, which registers all `@mcp_tool`-decorated functions, including `get_severity_alert_metrics`, with the MCP server instance.

```python
register_all_tools(mcp)
```