Skip to main content
Glama
panther-labs

Panther MCP Server

Official

get_alert_event_stats

Analyze patterns across multiple alerts by grouping event data into time windows to identify related activities, common entities, and temporal patterns for incident investigation.

Instructions

Analyze patterns and relationships across multiple alerts by aggregating their event data into time-based groups.

For each time window (configurable from 1-60 minutes), the tool collects unique entities (IPs, emails, usernames, trace IDs) and alert metadata (IDs, rules, severities) to help identify related activities.

Results are ordered chronologically with the most recent first, helping analysts identify temporal patterns, common entities, and potential incident scope.

Returns: Dict containing: - success: Boolean indicating if the query was successful - status: Status of the query (e.g., "succeeded", "failed", "cancelled") - message: Error message if unsuccessful - results: List of query result rows - column_info: Dict containing column names and types - stats: Dict containing stats about the query - has_next_page: Boolean indicating if there are more results available - next_cursor: Cursor for fetching the next page of results, or null if no more pages

Permissions:{'all_of': ['Query Data Lake']}

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
alert_idsYesList of alert IDs to analyze
time_windowNoThe time window in minutes to group distinct events by
start_dateNoOptional start date in ISO-8601 format. Defaults to start of today UTC.
end_dateNoOptional end date in ISO-8601 format. Defaults to end of today UTC.

Implementation Reference

  • The @mcp_tool decorated function implementing the get_alert_event_stats tool. Includes input schema via Annotated parameters, handler logic that constructs and executes a SQL query on correlation signals data, and calls the internal query_data_lake helper.
    @mcp_tool( annotations={ "permissions": all_perms(Permission.DATA_ANALYTICS_READ), "readOnlyHint": True, } ) async def get_alert_event_stats( alert_ids: Annotated[ List[str], Field( description="List of alert IDs to analyze", examples=[["alert-123", "alert-456", "alert-789"]], ), ], time_window: Annotated[ int, Field( description="The time window in minutes to group distinct events by", ge=1, le=60, default=30, ), ] = 30, start_date: Annotated[ str | None, Field( description="Optional start date in ISO-8601 format. Defaults to start of today UTC.", examples=["2024-03-20T00:00:00Z"], ), ] = None, end_date: Annotated[ str | None, Field( description="Optional end date in ISO-8601 format. Defaults to end of today UTC.", examples=["2024-03-20T00:00:00Z"], ), ] = None, ) -> Dict[str, Any]: """Analyze patterns and relationships across multiple alerts by aggregating their event data into time-based groups. For each time window (configurable from 1-60 minutes), the tool collects unique entities (IPs, emails, usernames, trace IDs) and alert metadata (IDs, rules, severities) to help identify related activities. Results are ordered chronologically with the most recent first, helping analysts identify temporal patterns, common entities, and potential incident scope. Returns: Dict containing: - success: Boolean indicating if the query was successful - status: Status of the query (e.g., "succeeded", "failed", "cancelled") - message: Error message if unsuccessful - results: List of query result rows - column_info: Dict containing column names and types - stats: Dict containing stats about the query - has_next_page: Boolean indicating if there are more results available - next_cursor: Cursor for fetching the next page of results, or null if no more pages """ if time_window not in [1, 5, 15, 30, 60]: raise ValueError("Time window must be 1, 5, 15, 30, or 60") # Get default date range if not provided if not start_date or not end_date: default_start, default_end = _get_today_date_range() start_date = start_date or default_start end_date = end_date or default_end # Convert alert IDs list to SQL array alert_ids_str = ", ".join(f"'{aid}'" for aid in alert_ids) # Use the date strings directly (already in GraphQL format) start_date_str = start_date end_date_str = end_date query = f""" SELECT DATE_TRUNC('DAY', cs.p_event_time) AS event_day, DATE_TRUNC('MINUTE', DATEADD('MINUTE', {time_window} * FLOOR(EXTRACT(MINUTE FROM cs.p_event_time) / {time_window}), DATE_TRUNC('HOUR', cs.p_event_time))) AS time_{time_window}_minute, cs.p_log_type, cs.p_any_ip_addresses AS source_ips, cs.p_any_emails AS emails, cs.p_any_usernames AS usernames, cs.p_any_trace_ids AS trace_ids, COUNT(DISTINCT cs.p_alert_id) AS alert_count, ARRAY_AGG(DISTINCT cs.p_alert_id) AS alert_ids, ARRAY_AGG(DISTINCT cs.p_rule_id) AS rule_ids, MIN(cs.p_event_time) AS first_event, MAX(cs.p_event_time) AS last_event, ARRAY_AGG(DISTINCT cs.p_alert_severity) AS severities FROM panther_signals.public.correlation_signals cs WHERE cs.p_alert_id IN ({alert_ids_str}) AND cs.p_event_time BETWEEN '{start_date_str}' AND '{end_date_str}' GROUP BY event_day, time_{time_window}_minute, cs.p_log_type, cs.p_any_ip_addresses, cs.p_any_emails, cs.p_any_usernames, cs.p_any_trace_ids HAVING COUNT(DISTINCT cs.p_alert_id) > 0 ORDER BY event_day DESC, time_{time_window}_minute DESC, alert_count DESC """ return await query_data_lake(query, "panther_signals.public", max_rows=100)
  • Input schema defined using Annotated with Pydantic Field for parameters: alert_ids (List[str]), time_window (int, 1-60 default 30), start_date/end_date (optional str ISO-8601). Output: Dict[str, Any].
    async def get_alert_event_stats( alert_ids: Annotated[ List[str], Field( description="List of alert IDs to analyze", examples=[["alert-123", "alert-456", "alert-789"]], ), ], time_window: Annotated[ int, Field( description="The time window in minutes to group distinct events by", ge=1, le=60, default=30, ), ] = 30, start_date: Annotated[ str | None, Field( description="Optional start date in ISO-8601 format. Defaults to start of today UTC.", examples=["2024-03-20T00:00:00Z"], ), ] = None, end_date: Annotated[ str | None, Field( description="Optional end date in ISO-8601 format. Defaults to end of today UTC.", examples=["2024-03-20T00:00:00Z"], ), ] = None, ) -> Dict[str, Any]:
  • The tool is registered using the @mcp_tool decorator with permissions for DATA_ANALYTICS_READ and readOnlyHint.
    @mcp_tool( annotations={ "permissions": all_perms(Permission.DATA_ANALYTICS_READ), "readOnlyHint": True, } )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/panther-labs/mcp-panther'

If you have feedback or need assistance with the MCP directory API, please join our Discord server