# get_alert_event_stats
Analyzes event data across multiple alerts by grouping it into time windows, surfacing related activities, common entities, and temporal patterns for incident investigation.
## Instructions
Analyze patterns and relationships across multiple alerts by aggregating their event data into time-based groups.
For each time window (configurable to 1, 5, 15, 30, or 60 minutes), the tool collects unique entities (IPs, emails, usernames, trace IDs) and alert metadata (IDs, rules, severities) to help identify related activities.
Results are ordered reverse-chronologically (most recent first), helping analysts identify temporal patterns, common entities, and potential incident scope.
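To make the grouping concrete, here is a small Python sketch (ours, not part of the tool) that mirrors the SQL bucketing arithmetic used by the implementation: truncate the timestamp to the hour, then snap the minutes down to the nearest multiple of the window.

```python
from datetime import datetime, timezone

def bucket_event_time(event_time: datetime, time_window: int) -> datetime:
    """Mirror the SQL bucketing: truncate to the hour, then snap the
    minutes down to the nearest multiple of time_window."""
    snapped_minutes = time_window * (event_time.minute // time_window)
    return event_time.replace(minute=snapped_minutes, second=0, microsecond=0)

# Example: with a 15-minute window, 14:37:22 falls into the 14:30 bucket.
t = datetime(2024, 3, 20, 14, 37, 22, tzinfo=timezone.utc)
assert bucket_event_time(t, 15) == datetime(2024, 3, 20, 14, 30, tzinfo=timezone.utc)
```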
Returns a `Dict` containing:

- `success`: Boolean indicating whether the query succeeded
- `status`: Status of the query (e.g., "succeeded", "failed", "cancelled")
- `message`: Error message if unsuccessful
- `results`: List of query result rows
- `column_info`: Dict of column names and types
- `stats`: Dict of statistics about the query
- `has_next_page`: Boolean indicating whether more results are available
- `next_cursor`: Cursor for fetching the next page of results, or null if there are no more pages
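A minimal sketch of consuming this return shape, assuming the tool result has already been parsed into a Python dict with the keys listed above:

```python
def summarize_result(result: dict) -> None:
    # Bail out early on failed or cancelled queries.
    if not result["success"]:
        raise RuntimeError(f"Query {result['status']}: {result['message']}")

    for row in result["results"]:
        print(row)

    # A non-null cursor means another page can be requested.
    if result["has_next_page"]:
        print(f"More results available; resume with cursor {result['next_cursor']}")
```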
Permissions: requires the `Query Data Lake` permission.
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| alert_ids | Yes | List of alert IDs to analyze | |
| time_window | No | Time window in minutes for grouping distinct events; must be 1, 5, 15, 30, or 60 | 30 |
| start_date | No | Optional start date in ISO-8601 format. Defaults to start of today UTC. | |
| end_date | No | Optional end date in ISO-8601 format. Defaults to end of today UTC. | |
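As a usage sketch, a direct call with an explicit 15-minute window might look like the following. The import path is inferred from the file referenced under Implementation Reference below; in practice the tool is invoked through the MCP server rather than imported directly.

```python
import asyncio

# Import path inferred from src/mcp_panther/panther_mcp_core/tools/data_lake.py.
from mcp_panther.panther_mcp_core.tools.data_lake import get_alert_event_stats

async def main() -> None:
    result = await get_alert_event_stats(
        alert_ids=["alert-123", "alert-456"],
        time_window=15,  # must be 1, 5, 15, 30, or 60
        start_date="2024-03-20T00:00:00Z",
        end_date="2024-03-20T23:59:59Z",
    )
    print(result["stats"])

asyncio.run(main())
```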
## Implementation Reference
- The `@mcp_tool`-decorated function implementing the `get_alert_event_stats` tool. It defines the input schema via `Annotated` parameters; its handler constructs a SQL query over correlation-signals data and executes it through the internal `query_data_lake` helper.

```python
@mcp_tool(
    annotations={
        "permissions": all_perms(Permission.DATA_ANALYTICS_READ),
        "readOnlyHint": True,
    }
)
async def get_alert_event_stats(
    alert_ids: Annotated[
        List[str],
        Field(
            description="List of alert IDs to analyze",
            examples=[["alert-123", "alert-456", "alert-789"]],
        ),
    ],
    time_window: Annotated[
        int,
        Field(
            description="The time window in minutes to group distinct events by",
            ge=1,
            le=60,
            default=30,
        ),
    ] = 30,
    start_date: Annotated[
        str | None,
        Field(
            description="Optional start date in ISO-8601 format. Defaults to start of today UTC.",
            examples=["2024-03-20T00:00:00Z"],
        ),
    ] = None,
    end_date: Annotated[
        str | None,
        Field(
            description="Optional end date in ISO-8601 format. Defaults to end of today UTC.",
            examples=["2024-03-20T00:00:00Z"],
        ),
    ] = None,
) -> Dict[str, Any]:
    """Analyze patterns and relationships across multiple alerts by
    aggregating their event data into time-based groups.

    For each time window (configurable from 1-60 minutes), the tool collects
    unique entities (IPs, emails, usernames, trace IDs) and alert metadata
    (IDs, rules, severities) to help identify related activities.

    Results are ordered chronologically with the most recent first, helping
    analysts identify temporal patterns, common entities, and potential
    incident scope.

    Returns:
        Dict containing:
        - success: Boolean indicating if the query was successful
        - status: Status of the query (e.g., "succeeded", "failed", "cancelled")
        - message: Error message if unsuccessful
        - results: List of query result rows
        - column_info: Dict containing column names and types
        - stats: Dict containing stats about the query
        - has_next_page: Boolean indicating if there are more results available
        - next_cursor: Cursor for fetching the next page of results, or null if no more pages
    """
    if time_window not in [1, 5, 15, 30, 60]:
        raise ValueError("Time window must be 1, 5, 15, 30, or 60")

    # Get default date range if not provided
    if not start_date or not end_date:
        default_start, default_end = _get_today_date_range()
        start_date = start_date or default_start
        end_date = end_date or default_end

    # Convert alert IDs list to SQL array
    alert_ids_str = ", ".join(f"'{aid}'" for aid in alert_ids)

    # Use the date strings directly (already in GraphQL format)
    start_date_str = start_date
    end_date_str = end_date

    query = f"""
    SELECT
        DATE_TRUNC('DAY', cs.p_event_time) AS event_day,
        DATE_TRUNC('MINUTE', DATEADD('MINUTE',
            {time_window} * FLOOR(EXTRACT(MINUTE FROM cs.p_event_time) / {time_window}),
            DATE_TRUNC('HOUR', cs.p_event_time))) AS time_{time_window}_minute,
        cs.p_log_type,
        cs.p_any_ip_addresses AS source_ips,
        cs.p_any_emails AS emails,
        cs.p_any_usernames AS usernames,
        cs.p_any_trace_ids AS trace_ids,
        COUNT(DISTINCT cs.p_alert_id) AS alert_count,
        ARRAY_AGG(DISTINCT cs.p_alert_id) AS alert_ids,
        ARRAY_AGG(DISTINCT cs.p_rule_id) AS rule_ids,
        MIN(cs.p_event_time) AS first_event,
        MAX(cs.p_event_time) AS last_event,
        ARRAY_AGG(DISTINCT cs.p_alert_severity) AS severities
    FROM panther_signals.public.correlation_signals cs
    WHERE cs.p_alert_id IN ({alert_ids_str})
        AND cs.p_event_time BETWEEN '{start_date_str}' AND '{end_date_str}'
    GROUP BY
        event_day,
        time_{time_window}_minute,
        cs.p_log_type,
        cs.p_any_ip_addresses,
        cs.p_any_emails,
        cs.p_any_usernames,
        cs.p_any_trace_ids
    HAVING COUNT(DISTINCT cs.p_alert_id) > 0
    ORDER BY event_day DESC, time_{time_window}_minute DESC, alert_count DESC
    """

    return await query_data_lake(query, "panther_signals.public", max_rows=100)
```
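The handler relies on an internal `_get_today_date_range` helper whose definition is not shown above. As a rough sketch of the behavior its call site implies (ISO-8601 bounds for the current UTC day); the real helper lives in the tool module and may differ:

```python
from datetime import datetime, timezone

def _get_today_date_range() -> tuple[str, str]:
    """Hypothetical sketch: ISO-8601 start/end strings for the current UTC day."""
    now = datetime.now(timezone.utc)
    start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    end = now.replace(hour=23, minute=59, second=59, microsecond=0)
    return (
        start.strftime("%Y-%m-%dT%H:%M:%SZ"),
        end.strftime("%Y-%m-%dT%H:%M:%SZ"),
    )
```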
- The input schema is defined with `Annotated` and Pydantic `Field` for each parameter: `alert_ids` (`List[str]`), `time_window` (`int`, 1-60, default 30), and optional `start_date`/`end_date` (ISO-8601 strings). The function returns `Dict[str, Any]`.

```python
async def get_alert_event_stats(
    alert_ids: Annotated[
        List[str],
        Field(
            description="List of alert IDs to analyze",
            examples=[["alert-123", "alert-456", "alert-789"]],
        ),
    ],
    time_window: Annotated[
        int,
        Field(
            description="The time window in minutes to group distinct events by",
            ge=1,
            le=60,
            default=30,
        ),
    ] = 30,
    start_date: Annotated[
        str | None,
        Field(
            description="Optional start date in ISO-8601 format. Defaults to start of today UTC.",
            examples=["2024-03-20T00:00:00Z"],
        ),
    ] = None,
    end_date: Annotated[
        str | None,
        Field(
            description="Optional end date in ISO-8601 format. Defaults to end of today UTC.",
            examples=["2024-03-20T00:00:00Z"],
        ),
    ] = None,
) -> Dict[str, Any]:
```
- `src/mcp_panther/panther_mcp_core/tools/data_lake.py:142-147` (registration). The tool is registered via the `@mcp_tool` decorator with the `DATA_ANALYTICS_READ` permission and a `readOnlyHint` annotation.

```python
@mcp_tool(
    annotations={
        "permissions": all_perms(Permission.DATA_ANALYTICS_READ),
        "readOnlyHint": True,
    }
)
```