query_data_lake
Execute SQL queries against Panther's security data lake to analyze logs, investigate threats, and perform security monitoring. A time filter is required on every query for performance and partitioning.
Instructions
Query Panther's security data lake using SQL for log analysis and threat hunting.
REQUIRED: Include a time filter on p_event_time (needed for performance and partitioning).
Panther Time Filter Macros (Recommended - optimized for performance):
- p_occurs_since(timeOffset [, tableAlias[, column]])
  Examples: p_occurs_since('1 d'), p_occurs_since('6 h'), p_occurs_since('2 weeks'), p_occurs_since(3600)
  Time formats: '30 s', '15 m', '6 h', '2 d', '1 w' OR '2 weeks', '1 day' OR numeric seconds
- p_occurs_between(startTime, endTime [, tableAlias[, column]])
  Examples: p_occurs_between('2024-01-01', '2024-01-02'), p_occurs_between('2024-03-20T00:00:00Z', '2024-03-20T23:59:59Z')
- p_occurs_around(timestamp, timeOffset [, tableAlias[, column]])
  Example: p_occurs_around('2024-01-15T10:30:00Z', '1 h')  # ±1 hour around the timestamp
- p_occurs_after(timestamp [, tableAlias[, column]])
- p_occurs_before(timestamp [, tableAlias[, column]])
Alternative (manual): WHERE p_event_time >= '2024-01-01' AND p_event_time < '2024-01-02'
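To make the two forms concrete, here is a small illustrative sketch (not taken from the implementation) showing a macro-based query next to a manual p_event_time equivalent; the table and column names are copied from the Common Examples further down this page.

```python
# Macro form (preferred): scopes the scan to the trailing day via p_occurs_since.
MACRO_QUERY = (
    "SELECT sourceippaddress, COUNT(*) "
    "FROM panther_logs.public.aws_cloudtrail "
    "WHERE p_occurs_since('1 d') "
    "GROUP BY sourceippaddress"
)

# Manual form: an explicit p_event_time range covering a fixed one-day window.
MANUAL_QUERY = (
    "SELECT sourceippaddress, COUNT(*) "
    "FROM panther_logs.public.aws_cloudtrail "
    "WHERE p_event_time >= '2024-01-01' AND p_event_time < '2024-01-02' "
    "GROUP BY sourceippaddress"
)
```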
Best Practices:
- Always use time filters (macros preferred over manual p_event_time conditions)
- Start with summary queries, then drill down to specific timeframes
- Use p_any_* fields for faster correlation (p_any_ip_addresses, p_any_usernames, p_any_emails)
- Query specific fields instead of SELECT * for better performance
Pagination:
- First call: no cursor parameter - returns the first page with up to max_rows results
- Subsequent calls: pass next_cursor from the previous response to get the next page
- Continue until has_next_page is False (a minimal client-side loop is sketched after this list)
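The loop below is a minimal sketch of cursor-based pagination for callers driving this tool programmatically; call_query_data_lake is a hypothetical helper standing in for whatever MCP client invocation you use to call query_data_lake.

```python
# Minimal pagination sketch. `call_query_data_lake` is a hypothetical awaitable that
# invokes the query_data_lake tool and returns its result dict.
async def fetch_all_rows(call_query_data_lake, sql: str, max_rows: int = 100) -> list[dict]:
    rows: list[dict] = []
    cursor = None
    while True:
        page = await call_query_data_lake(sql=sql, max_rows=max_rows, cursor=cursor)
        if not page.get("success"):
            raise RuntimeError(page.get("message", "query failed"))
        rows.extend(page.get("results", []))
        if not page.get("has_next_page"):
            break
        cursor = page.get("next_cursor")  # feed the cursor back into the next call
    return rows
```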
Common Examples:
- Recent failed logins: "SELECT * FROM panther_logs.public.aws_cloudtrail WHERE p_occurs_since('1 d') AND errorcode IS NOT NULL"
- IP activity summary: "SELECT sourceippaddress, COUNT(*) FROM panther_logs.public.aws_cloudtrail WHERE p_occurs_since('6 h') GROUP BY sourceippaddress"
- User correlation: "SELECT * FROM panther_logs.public.aws_cloudtrail WHERE p_occurs_since('2 h') AND ARRAY_CONTAINS('user@domain.com'::VARIANT, p_any_emails)"
- Nested field access: SELECT p_enrichment:ipinfo_privacy:"context.ip_address" FROM table WHERE p_occurs_since('1 h')
Query Syntax (Snowflake SQL):
- Access nested JSON: column:field.subfield
- Quote special characters: column:"field name" or p_enrichment:"context.ip_address"
- Array searches: ARRAY_CONTAINS('value'::VARIANT, array_column)
Returns: Dict with query results:
- results: List of matching rows (paginated based on cursor parameter)
- results_truncated: True if results were truncated (only for non-paginated requests)
- total_rows_available: Total rows found (for non-paginated requests)
- has_next_page: True if more results are available
- next_cursor: Cursor for next page (use in subsequent call)
- column_info: Column names and data types
- stats: Query performance metrics (execution time, bytes scanned)
- success/status/message: Query execution status
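For orientation, here is a small hypothetical helper that reads the metadata fields named above from a single response; it assumes only the documented keys and their documented meanings.

```python
# Summarize the non-row metadata on one query_data_lake response dict
# (field names follow the Returns list above).
def summarize_response(resp: dict) -> str:
    cols = resp.get("column_info", {}).get("order", [])
    stats = resp.get("stats", {})
    return (
        f"status={resp.get('status')} rows={len(resp.get('results', []))} "
        f"columns={cols} bytes_scanned={stats.get('bytes_scanned', 0)} "
        f"execution_time={stats.get('execution_time', 0)}"
    )
```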
Permissions: {'all_of': ['Query Data Lake']}
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| sql | Yes | The SQL query to execute. Must include a p_event_time filter condition after WHERE or AND. The query must be compatible with Snowflake SQL. | |
| database_name | No | | panther_logs.public |
| timeout | No | Timeout in seconds before the SQL query is cancelled. If the query fails due to timeout, the caller should consider a longer timeout. | 30 |
| max_rows | No | Maximum number of result rows to return (prevents context overflow); must be between 1 and 999. | 100 |
| cursor | No | Optional pagination cursor from previous query to fetch next page of results |
Implementation Reference
- The primary handler function for the query_data_lake tool. It validates the SQL query for time filters, processes reserved words, executes the query via a GraphQL mutation, polls for completion, supports pagination with cursor and max_rows, handles timeouts and cancellations, and returns structured results including metadata and pagination info:

```python
    # Tail of the tool-registration decorator (leading arguments not shown in this excerpt):
    annotations={
        "permissions": all_perms(Permission.DATA_ANALYTICS_READ),
        "readOnlyHint": True,
    }
)
async def query_data_lake(
    sql: Annotated[
        str,
        Field(
            description="The SQL query to execute. Must include a p_event_time filter condition after WHERE or AND. The query must be compatible with Snowflake SQL."
        ),
    ],
    database_name: str = "panther_logs.public",
    timeout: Annotated[
        int,
        Field(
            description="Timeout in seconds before the SQL query is cancelled. If the query fails due to timeout, the caller should consider a longer timeout."
        ),
    ] = 30,
    max_rows: Annotated[
        int,
        Field(
            description="Maximum number of result rows to return (prevents context overflow)",
            ge=1,
            le=999,
        ),
    ] = 100,
    cursor: Annotated[
        str | None,
        Field(
            description="Optional pagination cursor from previous query to fetch next page of results",
        ),
    ] = None,
) -> Dict[str, Any]:
    """Query Panther's security data lake using SQL for log analysis and threat hunting."""
    # Full docstring omitted here; it mirrors the Instructions section at the top of this page.
    logger.info("Executing data lake query")
    start_time = time.time()

    # Validate that the query includes a time filter (p_event_time or Panther macros)
    sql_lower = sql.lower().replace("\n", " ")
    has_p_event_time = re.search(
        r"\b(where|and)\s+.*?(?:[\w.]+\.)?p_event_time\s*(>=|<=|=|>|<|between)",
        sql_lower,
    )
    has_panther_macros = re.search(
        r"p_occurs_(since|between|around|after|before)\s*\(",
        sql_lower,
    )
    if (not (has_p_event_time or has_panther_macros)) and re.search(
        r"\Wpanther_(views|signals|rule_matches|rule_errors|monitor|logs|cloudsecurity)\.",
        sql_lower,
    ):
        error_msg = "Query must include a time filter: either `p_event_time` condition or Panther macro (p_occurs_since, p_occurs_between, etc.)"
        logger.error(error_msg)
        return {
            "success": False,
            "message": error_msg,
            "query_id": None,
        }

    try:
        # Process reserved words in the SQL
        processed_sql = wrap_reserved_words(sql)
        logger.debug(f"Original SQL: {sql}")
        logger.debug(f"Processed SQL: {processed_sql}")

        # Prepare input variables
        variables = {"input": {"sql": processed_sql, "databaseName": database_name}}
        logger.debug(f"Query variables: {variables}")

        # Execute the query using shared client
        result = await _execute_query(EXECUTE_DATA_LAKE_QUERY, variables)

        # Get query ID from result
        query_id = result.get("executeDataLakeQuery", {}).get("id")
        if not query_id:
            raise ValueError("No query ID returned from execution")

        logger.info(f"Successfully executed query with ID: {query_id}")

        sleep_time = INITIAL_QUERY_SLEEP_SECONDS
        while True:
            await asyncio.sleep(sleep_time)
            result = await _get_data_lake_query_results(
                query_id=query_id, max_rows=max_rows, cursor=cursor
            )
            if result.get("status") == "running":
                if (time.time() - start_time) >= timeout:
                    await _cancel_data_lake_query(query_id=query_id)
                    return {
                        "success": False,
                        "status": "cancelled",
                        "message": "Query time exceeded timeout, and has been cancelled. A longer timeout may be required. "
                        "Retrying may be faster due to caching, or you may need to reduce the duration of data being queried.",
                        "query_id": query_id,
                    }
            else:
                return result

            if sleep_time <= MAX_QUERY_SLEEP_SECONDS:
                sleep_time += 1
    except Exception as e:
        logger.error(f"Failed to execute data lake query: {str(e)}")
        # Try to get query_id if it was set before the error
        query_id = locals().get("query_id")
        return {
            "success": False,
            "message": f"Failed to execute data lake query: {str(e)}",
            "query_id": query_id,
        }
```
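To make the time-filter guard concrete, the sketch below exercises the same two regular expressions the handler applies to a lowercased, newline-flattened query; it is an illustrative stand-alone check, not part of the server code.

```python
import re

# Patterns copied from the handler above.
P_EVENT_TIME = r"\b(where|and)\s+.*?(?:[\w.]+\.)?p_event_time\s*(>=|<=|=|>|<|between)"
PANTHER_MACROS = r"p_occurs_(since|between|around|after|before)\s*\("


def has_time_filter(sql: str) -> bool:
    """Return True if the query contains a p_event_time condition or a Panther time macro."""
    sql_lower = sql.lower().replace("\n", " ")
    return bool(re.search(P_EVENT_TIME, sql_lower) or re.search(PANTHER_MACROS, sql_lower))


print(has_time_filter("SELECT * FROM panther_logs.public.aws_cloudtrail WHERE p_occurs_since('1 d')"))  # True
print(has_time_filter("SELECT * FROM panther_logs.public.aws_cloudtrail WHERE errorcode IS NOT NULL"))  # False
```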
- Input schema: declared on the handler signature above using Annotated and Field: sql (required), database_name (default panther_logs.public), timeout (default 30 seconds), max_rows (default 100, range 1-999), and cursor (optional pagination cursor).
- src/mcp_panther/server.py:72 (registration): registers all tools decorated with @mcp_tool, including query_data_lake, with the FastMCP server instance via register_all_tools(mcp).
- Helper function that processes SQL for Snowflake reserved words by converting single-quoted reserved identifiers to double-quoted identifiers:

```python
def wrap_reserved_words(sql: str) -> str:
    """
    Simple function to wrap reserved words in SQL using sqlparse.

    This function:
    1. Parses the SQL using sqlparse
    2. Identifies string literals that match reserved words
    3. Converts single-quoted reserved words to double-quoted ones

    Args:
        sql: The SQL query string to process

    Returns:
        The SQL with reserved words properly quoted
    """
    try:
        # Parse the SQL
        parsed = sqlparse.parse(sql)[0]

        # Convert the parsed SQL back to string, but process tokens
        result = []
        for token in parsed.flatten():
            if token.ttype is sqlparse.tokens.Literal.String.Single:
                # Remove quotes and check if it's a reserved word
                value = token.value.strip("'")
                if value.upper() in SNOWFLAKE_RESERVED_WORDS:
                    # Convert to double-quoted identifier
                    result.append(f'"{value}"')
                else:
                    result.append(token.value)
            else:
                result.append(token.value)

        return "".join(result)
    except Exception as e:
        logger.warning(f"Failed to parse SQL for reserved words: {e}")
        return sql
```
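As a usage illustration, and assuming the SNOWFLAKE_RESERVED_WORDS set contains an entry such as ORDER, the helper rewrites single-quoted reserved words into double-quoted identifiers and leaves other string literals untouched:

```python
# Hypothetical example; assumes "ORDER" is present in SNOWFLAKE_RESERVED_WORDS.
sql_in = "SELECT 'order', 'user@domain.com' FROM panther_logs.public.aws_cloudtrail WHERE p_occurs_since('1 h')"
print(wrap_reserved_words(sql_in))
# Expected under that assumption:
# SELECT "order", 'user@domain.com' FROM panther_logs.public.aws_cloudtrail WHERE p_occurs_since('1 h')
```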
- Internal helper that retrieves query results, supporting pagination and formatting the response structure used by query_data_lake:

```python
async def _get_data_lake_query_results(
    query_id: Annotated[
        str,
        Field(
            description="The ID of the query to get results for",
            examples=["01be5f14-0206-3c48-000d-9eff005dd176"],
        ),
    ],
    max_rows: int = 100,
    cursor: str | None = None,
) -> Dict[str, Any]:
    """Get the results of a previously executed data lake query.

    Returns:
        Dict containing:
        - success: Boolean indicating if the query was successful
        - status: Status of the query (e.g., "succeeded", "running", "failed", "cancelled")
        - message: Error message if unsuccessful
        - results: List of query result rows
        - column_info: Dict containing column names and types
        - stats: Dict containing stats about the query
        - has_next_page: Boolean indicating if there are more results available
        - next_cursor: Cursor for fetching the next page of results, or null if no more pages
    """
    logger.info(f"Fetching data lake query results for query ID: {query_id}")

    try:
        # Prepare input variables for pagination
        variables = {
            "id": query_id,
            "root": False,
            "resultsInput": {
                "pageSize": max_rows,
                "cursor": cursor,
            },
        }
        if cursor:
            logger.info(f"Using pagination cursor: {cursor}")
        logger.debug(f"Query variables: {variables}")

        # Execute the query using shared client
        result = await _execute_query(GET_DATA_LAKE_QUERY, variables)

        # Get query data
        query_data = result.get("dataLakeQuery", {})
        if not query_data:
            logger.warning(f"No query found with ID: {query_id}")
            return {
                "success": False,
                "message": f"No query found with ID: {query_id}",
                "query_id": query_id,
            }

        # Get query status
        status = query_data.get("status")
        if status == "running":
            return {
                "success": True,
                "status": "running",
                "message": "Query is still running",
                "query_id": query_id,
            }
        elif status == "failed":
            return {
                "success": False,
                "status": "failed",
                "message": query_data.get("message", "Query failed"),
                "query_id": query_id,
            }
        elif status == "cancelled":
            return {
                "success": False,
                "status": "cancelled",
                "message": "Query was cancelled",
                "query_id": query_id,
            }

        # Get results data
        results = query_data.get("results", {})
        edges = results.get("edges", [])
        column_info = results.get("columnInfo", {})
        stats = results.get("stats", {})

        # Extract results from edges
        query_results = [edge["node"] for edge in edges]

        # Check pagination info
        page_info = results.get("pageInfo", {})
        has_next_page = page_info.get("hasNextPage", False)
        end_cursor = page_info.get("endCursor")

        # Track pagination state
        original_count = len(query_results)

        # For cursor-based requests, we don't truncate since GraphQL handles pagination
        was_truncated = False
        if cursor:
            logger.info(
                f"Retrieved page of {len(query_results)} results using cursor pagination"
            )
        else:
            # For initial requests without cursor, apply legacy truncation if needed
            was_truncated = original_count > max_rows
            if was_truncated:
                query_results = query_results[:max_rows]
                logger.info(
                    f"Query results truncated from {original_count} to {max_rows} rows for context window protection"
                )
            else:
                logger.info(
                    f"Retrieved {len(query_results)} results (no truncation needed)"
                )

        logger.info(
            f"Successfully retrieved {len(query_results)} results for query ID: {query_id}"
        )

        # Format the response
        return {
            "success": True,
            "status": status,
            "results": query_results,
            "results_truncated": was_truncated,
            "total_rows_available": original_count,
            "rows_returned": len(query_results),
            "column_info": {
                "order": column_info.get("order", []),
                "types": column_info.get("types", {}),
            },
            "stats": {
                "bytes_scanned": stats.get("bytesScanned", 0),
                "execution_time": stats.get("executionTime", 0),
                "row_count": stats.get("rowCount", 0),
            },
            "has_next_page": has_next_page,
            "next_cursor": end_cursor,
            "message": query_data.get("message", "Query executed successfully"),
            "query_id": query_id,
        }
    except Exception as e:
        logger.error(f"Failed to fetch data lake query results: {str(e)}")
        return {
            "success": False,
            "message": f"Failed to fetch data lake query results: {str(e)}",
            "query_id": query_id,
        }
```