
Panther MCP Server

Official

query_data_lake

Execute custom SQL queries against Panther's data lake for advanced analysis. Queries must use Snowflake SQL syntax and filter on p_event_time. Returns results, column info, and query stats.

Instructions

Execute custom SQL queries against Panther's data lake for advanced data analysis and aggregation.

All queries MUST conform to Snowflake's SQL syntax.

If the table has a p_event_time column, the query must include a WHERE clause that filters on it.

Guidance:

For efficiency, when checking for values in an array, use the Snowflake function ARRAY_CONTAINS( <value_expr> , <array> ).

When using ARRAY_CONTAINS, make sure to cast the value_expr to a variant, for example: ARRAY_CONTAINS('example@example.com'::VARIANT, p_any_emails).

When interacting with object type columns use dot notation to traverse a path in a JSON object: <column>:<level1_element>.<level2_element>.<level3_element>. Optionally enclose element names in double quotes: <column>:"<level1_element>"."<level2_element>"."<level3_element>".

If an object/JSON element name does not conform to Snowflake SQL identifier rules, for example if it contains spaces, then you must enclose the element name in double quotes.
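
For illustration, a query that follows this guidance might look like the sketch below (a non-authoritative example shown as the string passed to the sql parameter; the table and nested field names are assumptions, not part of the tool's contract):

    sql = """
    SELECT
        p_event_time,
        useridentity:sessioncontext.sessionissuer."userName" AS issuer_user  -- colon/dot path with a quoted element
    FROM panther_logs.public.aws_cloudtrail
    WHERE p_event_time >= DATEADD(hour, -24, CURRENT_TIMESTAMP())            -- required p_event_time filter
      AND ARRAY_CONTAINS('example@example.com'::VARIANT, p_any_emails)       -- value cast to VARIANT
    LIMIT 100
    """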

Returns: Dict containing:
- success: Boolean indicating if the query was successful
- status: Status of the query (e.g., "succeeded", "failed", "cancelled")
- message: Error message if unsuccessful
- query_id: The unique identifier for the query (null if query execution failed)
- results: List of query result rows
- column_info: Dict containing column names and types
- stats: Dict containing stats about the query
- has_next_page: Boolean indicating if there are more results available
- end_cursor: Cursor for fetching the next page of results, or null if no more pages

Permissions: {'all_of': ['Query Data Lake']}

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| database_name | No | | panther_logs.public |
| sql | Yes | The SQL query to execute. Must include a p_event_time filter condition after WHERE or AND. The query must be compatible with Snowflake SQL. | |
| timeout | No | Timeout in seconds before the SQL query is cancelled. If the query fails due to timeout, the caller should consider a longer timeout. | |
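
As a sketch, a call to this tool might pass arguments like the following (values are illustrative; only sql is required):

    # Illustrative arguments for query_data_lake.
    arguments = {
        "database_name": "panther_logs.public",
        "sql": (
            "SELECT p_event_time, p_any_ip_addresses "
            "FROM panther_logs.public.aws_cloudtrail "
            "WHERE p_event_time >= DATEADD(day, -1, CURRENT_TIMESTAMP()) "
            "LIMIT 50"
        ),
        "timeout": 60,  # seconds before the query is cancelled
    }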

Implementation Reference

  • Primary implementation of the query_data_lake tool. Decorated with @mcp_tool for MCP registration. Validates that the SQL includes a time filter, rewrites single-quoted reserved words, executes the query via a GraphQL mutation, polls for completion with a gradually increasing sleep interval, supports pagination via cursor, cancels the query if the timeout is exceeded, and returns structured results with metadata.
    @mcp_tool(
        annotations={
            "permissions": all_perms(Permission.DATA_ANALYTICS_READ),
            "readOnlyHint": True,
        }
    )
    async def query_data_lake(
        sql: Annotated[
            str,
            Field(
                description="The SQL query to execute. Must include a p_event_time filter condition after WHERE or AND. The query must be compatible with Snowflake SQL."
            ),
        ],
        database_name: str = "panther_logs.public",
        timeout: Annotated[
            int,
            Field(
                description="Timeout in seconds before the SQL query is cancelled. If the query fails due to timeout, the caller should consider a longer timeout."
            ),
        ] = 30,
        max_rows: Annotated[
            int,
            Field(
                description="Maximum number of result rows to return (prevents context overflow)",
                ge=1,
                le=999,
            ),
        ] = 100,
        cursor: Annotated[
            str | None,
            Field(
                description="Optional pagination cursor from previous query to fetch next page of results",
            ),
        ] = None,
    ) -> Dict[str, Any]:
        """Query Panther's security data lake using SQL for log analysis and threat hunting.

        REQUIRED: Include time filter with p_event_time (required for performance and partitioning)

        Panther Time Filter Macros (Recommended - optimized for performance):
        - p_occurs_since(timeOffset [, tableAlias[, column]])
          Examples: p_occurs_since('1 d'), p_occurs_since('6 h'), p_occurs_since('2 weeks'), p_occurs_since(3600)
          Time formats: '30 s', '15 m', '6 h', '2 d', '1 w' OR '2 weeks', '1 day' OR numeric seconds
        - p_occurs_between(startTime, endTime [, tableAlias[, column]])
          Examples: p_occurs_between('2024-01-01', '2024-01-02'), p_occurs_between('2024-03-20T00:00:00Z', '2024-03-20T23:59:59Z')
        - p_occurs_around(timestamp, timeOffset [, tableAlias[, column]])
          Example: p_occurs_around('2024-01-15T10:30:00Z', '1 h')  # ±1 hour around timestamp
        - p_occurs_after(timestamp [, tableAlias[, column]])
        - p_occurs_before(timestamp [, tableAlias[, column]])

        Alternative (manual): WHERE p_event_time >= '2024-01-01' AND p_event_time < '2024-01-02'

        Best Practices:
        - Always use time filters (macros preferred over manual p_event_time conditions)
        - Start with summary queries, then drill down to specific timeframes
        - Use p_any_* fields for faster correlation (p_any_ip_addresses, p_any_usernames, p_any_emails)
        - Query specific fields instead of SELECT * for better performance

        Pagination:
        - First call: No cursor parameter - returns first page with max_rows results
        - Subsequent calls: Use next_cursor from previous response to get next page
        - Continue until has_next_page is False

        Common Examples:
        - Recent failed logins: "SELECT * FROM panther_logs.public.aws_cloudtrail WHERE p_occurs_since('1 d') AND errorcode IS NOT NULL"
        - IP activity summary: "SELECT sourceipaddress, COUNT(*) FROM panther_logs.public.aws_cloudtrail WHERE p_occurs_since('6 h') GROUP BY sourceipaddress"
        - User correlation: "SELECT * FROM panther_logs.public.aws_cloudtrail WHERE p_occurs_since('2 h') AND ARRAY_CONTAINS('user@domain.com'::VARIANT, p_any_emails)"
        - Nested field access: "SELECT p_enrichment:ipinfo_privacy:\"context.ip_address\" FROM table WHERE p_occurs_since('1 h')"

        Query Syntax (Snowflake SQL):
        - Access nested JSON: column:field.subfield
        - Quote special characters: column:"field name" or p_enrichment:"context.ip_address"
        - Array searches: ARRAY_CONTAINS('value'::VARIANT, array_column)

        Returns:
            Dict with query results:
            - results: List of matching rows (paginated based on cursor parameter)
            - results_truncated: True if results were truncated (only for non-paginated requests)
            - total_rows_available: Total rows found (for non-paginated requests)
            - has_next_page: True if more results are available
            - next_cursor: Cursor for next page (use in subsequent call)
            - column_info: Column names and data types
            - stats: Query performance metrics (execution time, bytes scanned)
            - success/status/message: Query execution status
        """
        logger.info("Executing data lake query")

        start_time = time.time()

        # Validate that the query includes a time filter (p_event_time or Panther macros)
        sql_lower = sql.lower().replace("\n", " ")
        has_p_event_time = re.search(
            r"\b(where|and)\s+.*?(?:[\w.]+\.)?p_event_time\s*(>=|<=|=|>|<|between)",
            sql_lower,
        )
        has_panther_macros = re.search(
            r"p_occurs_(since|between|around|after|before)\s*\(",
            sql_lower,
        )
        if (not (has_p_event_time or has_panther_macros)) and re.search(
            r"\Wpanther_(views|signals|rule_matches|rule_errors|monitor|logs|cloudsecurity)\.",
            sql_lower,
        ):
            error_msg = "Query must include a time filter: either `p_event_time` condition or Panther macro (p_occurs_since, p_occurs_between, etc.)"
            logger.error(error_msg)
            return {
                "success": False,
                "message": error_msg,
                "query_id": None,
            }

        try:
            # Process reserved words in the SQL
            processed_sql = wrap_reserved_words(sql)
            logger.debug(f"Original SQL: {sql}")
            logger.debug(f"Processed SQL: {processed_sql}")

            # Prepare input variables
            variables = {"input": {"sql": processed_sql, "databaseName": database_name}}
            logger.debug(f"Query variables: {variables}")

            # Execute the query asynchronously
            async with await _create_panther_client() as session:
                result = await session.execute(
                    EXECUTE_DATA_LAKE_QUERY, variable_values=variables
                )

            # Get query ID from result
            query_id = result.get("executeDataLakeQuery", {}).get("id")
            if not query_id:
                raise ValueError("No query ID returned from execution")

            logger.info(f"Successfully executed query with ID: {query_id}")

            sleep_time = INITIAL_QUERY_SLEEP_SECONDS
            while True:
                await asyncio.sleep(sleep_time)
                result = await _get_data_lake_query_results(
                    query_id=query_id, max_rows=max_rows, cursor=cursor
                )
                if result.get("status") == "running":
                    if (time.time() - start_time) >= timeout:
                        await _cancel_data_lake_query(query_id=query_id)
                        return {
                            "success": False,
                            "status": "cancelled",
                            "message": "Query time exceeded timeout, and has been cancelled. A longer timeout may be required. "
                            "Retrying may be faster due to caching, or you may need to reduce the duration of data being queried.",
                            "query_id": query_id,
                        }
                else:
                    return result
                if sleep_time <= MAX_QUERY_SLEEP_SECONDS:
                    sleep_time += 1
        except Exception as e:
            logger.error(f"Failed to execute data lake query: {str(e)}")
            # Try to get query_id if it was set before the error
            query_id = locals().get("query_id")
            return {
                "success": False,
                "message": f"Failed to execute data lake query: {str(e)}",
                "query_id": query_id,
            }
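
A hedged usage sketch (not from the source) of the cursor pagination flow documented in the docstring above: re-call the tool with next_cursor until has_next_page is False. The import path is an assumption and is left commented out.

    import asyncio

    # from mcp_panther.panther_mcp_core.tools.data_lake import query_data_lake  # hypothetical import path

    async def fetch_all_rows(sql: str) -> list:
        # Collect every result page by re-calling query_data_lake with next_cursor.
        rows = []
        cursor = None
        while True:
            page = await query_data_lake(sql=sql, max_rows=100, cursor=cursor)
            if not page.get("success"):
                raise RuntimeError(page.get("message", "query failed"))
            rows.extend(page.get("results", []))
            if not page.get("has_next_page"):
                return rows
            cursor = page.get("next_cursor")

    # Example (illustrative table and filter):
    # rows = asyncio.run(fetch_all_rows(
    #     "SELECT p_any_usernames, COUNT(*) AS events "
    #     "FROM panther_logs.public.aws_cloudtrail "
    #     "WHERE p_occurs_since('1 d') GROUP BY 1 ORDER BY 2 DESC"
    # ))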
  • wrap_reserved_words: Processes input SQL to automatically convert single-quoted Snowflake reserved words to double-quoted identifiers using sqlparse, called within query_data_lake.
    def wrap_reserved_words(sql: str) -> str:
        """
        Simple function to wrap reserved words in SQL using sqlparse.

        This function:
        1. Parses the SQL using sqlparse
        2. Identifies string literals that match reserved words
        3. Converts single-quoted reserved words to double-quoted ones

        Args:
            sql: The SQL query string to process

        Returns:
            The SQL with reserved words properly quoted
        """
        try:
            # Parse the SQL
            parsed = sqlparse.parse(sql)[0]

            # Convert the parsed SQL back to string, but process tokens
            result = []
            for token in parsed.flatten():
                if token.ttype is sqlparse.tokens.Literal.String.Single:
                    # Remove quotes and check if it's a reserved word
                    value = token.value.strip("'")
                    if value.upper() in SNOWFLAKE_RESERVED_WORDS:
                        # Convert to double-quoted identifier
                        result.append(f'"{value}"')
                    else:
                        result.append(token.value)
                else:
                    result.append(token.value)

            return "".join(result)
        except Exception as e:
            logger.warning(f"Failed to parse SQL for reserved words: {e}")
            return sql
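
A hypothetical before/after illustration of the transformation (assuming "ORDER" appears in SNOWFLAKE_RESERVED_WORDS): a single-quoted reserved word becomes a double-quoted identifier, while other string literals pass through unchanged.

    # 'order' matches a reserved word and is rewritten; 'hello' is left alone.
    wrap_reserved_words("SELECT 'order', 'hello' FROM panther_logs.public.some_table")
    # -> SELECT "order", 'hello' FROM panther_logs.public.some_table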
  • _get_data_lake_query_results: Fetches paginated results for a query_id via GraphQL, handles status checks (running/succeeded/failed), applies legacy truncation for non-cursor calls, extracts column info/stats/pagination cursors.
    async def _get_data_lake_query_results(
        query_id: Annotated[
            str,
            Field(
                description="The ID of the query to get results for",
                examples=["01be5f14-0206-3c48-000d-9eff005dd176"],
            ),
        ],
        max_rows: int = 100,
        cursor: str | None = None,
    ) -> Dict[str, Any]:
        """Get the results of a previously executed data lake query.

        Returns:
            Dict containing:
            - success: Boolean indicating if the query was successful
            - status: Status of the query (e.g., "succeeded", "running", "failed", "cancelled")
            - message: Error message if unsuccessful
            - results: List of query result rows
            - column_info: Dict containing column names and types
            - stats: Dict containing stats about the query
            - has_next_page: Boolean indicating if there are more results available
            - next_cursor: Cursor for fetching the next page of results, or null if no more pages
        """
        logger.info(f"Fetching data lake query results for query ID: {query_id}")

        try:
            # Prepare input variables for pagination
            variables = {
                "id": query_id,
                "root": False,
                "resultsInput": {
                    "pageSize": max_rows,
                    "cursor": cursor,
                },
            }
            if cursor:
                logger.info(f"Using pagination cursor: {cursor}")
            logger.debug(f"Query variables: {variables}")

            # Execute the query asynchronously
            async with await _create_panther_client() as session:
                result = await session.execute(
                    GET_DATA_LAKE_QUERY, variable_values=variables
                )

            # Get query data
            query_data = result.get("dataLakeQuery", {})
            if not query_data:
                logger.warning(f"No query found with ID: {query_id}")
                return {
                    "success": False,
                    "message": f"No query found with ID: {query_id}",
                    "query_id": query_id,
                }

            # Get query status
            status = query_data.get("status")
            if status == "running":
                return {
                    "success": True,
                    "status": "running",
                    "message": "Query is still running",
                    "query_id": query_id,
                }
            elif status == "failed":
                return {
                    "success": False,
                    "status": "failed",
                    "message": query_data.get("message", "Query failed"),
                    "query_id": query_id,
                }
            elif status == "cancelled":
                return {
                    "success": False,
                    "status": "cancelled",
                    "message": "Query was cancelled",
                    "query_id": query_id,
                }

            # Get results data
            results = query_data.get("results", {})
            edges = results.get("edges", [])
            column_info = results.get("columnInfo", {})
            stats = results.get("stats", {})

            # Extract results from edges
            query_results = [edge["node"] for edge in edges]

            # Check pagination info
            page_info = results.get("pageInfo", {})
            has_next_page = page_info.get("hasNextPage", False)
            end_cursor = page_info.get("endCursor")

            # Track pagination state
            original_count = len(query_results)

            # For cursor-based requests, we don't truncate since GraphQL handles pagination
            was_truncated = False
            if cursor:
                logger.info(
                    f"Retrieved page of {len(query_results)} results using cursor pagination"
                )
            else:
                # For initial requests without cursor, apply legacy truncation if needed
                was_truncated = original_count > max_rows
                if was_truncated:
                    query_results = query_results[:max_rows]
                    logger.info(
                        f"Query results truncated from {original_count} to {max_rows} rows for context window protection"
                    )
                else:
                    logger.info(
                        f"Retrieved {len(query_results)} results (no truncation needed)"
                    )

            logger.info(
                f"Successfully retrieved {len(query_results)} results for query ID: {query_id}"
            )

            # Format the response
            return {
                "success": True,
                "status": status,
                "results": query_results,
                "results_truncated": was_truncated,
                "total_rows_available": original_count,
                "rows_returned": len(query_results),
                "column_info": {
                    "order": column_info.get("order", []),
                    "types": column_info.get("types", {}),
                },
                "stats": {
                    "bytes_scanned": stats.get("bytesScanned", 0),
                    "execution_time": stats.get("executionTime", 0),
                    "row_count": stats.get("rowCount", 0),
                },
                "has_next_page": has_next_page,
                "next_cursor": end_cursor,
                "message": query_data.get("message", "Query executed successfully"),
                "query_id": query_id,
            }
        except Exception as e:
            logger.error(f"Failed to fetch data lake query results: {str(e)}")
            return {
                "success": False,
                "message": f"Failed to fetch data lake query results: {str(e)}",
                "query_id": query_id,
            }
  • _cancel_data_lake_query: Cancels a running data lake query via GraphQL mutation, provides specific error messages for common cases like already completed or permission denied.
    async def _cancel_data_lake_query(
        query_id: Annotated[
            str,
            Field(description="The ID of the query to cancel"),
        ],
    ) -> Dict[str, Any]:
        """Cancel a running data lake query to free up resources and prevent system overload.

        This tool is critical for managing data lake performance and preventing resource
        exhaustion. Use it to cancel long-running queries that are consuming excessive
        resources or are no longer needed.

        IMPORTANT: Only running queries can be cancelled. Completed, failed, or already
        cancelled queries cannot be cancelled again.

        Common use cases:
        - Cancel runaway queries consuming too many resources
        - Stop queries that are taking longer than expected
        - Clean up queries that are no longer needed
        - Prevent system overload during peak usage

        Best practices:
        1. First use list_data_lake_queries(status=['running']) to find running queries
        2. Review the SQL and timing information before cancelling
        3. Cancel queries from oldest to newest if multiple queries need cancellation
        4. Monitor system load after cancellation to ensure improvement

        Returns:
            Dict containing:
            - success: Boolean indicating if the cancellation was successful
            - query_id: ID of the cancelled query
            - message: Success/error message
        """
        logger.info(f"Cancelling data lake query: {query_id}")

        try:
            client = await _create_panther_client()
            variables = {"input": {"id": query_id}}

            # Execute the cancellation
            async with client as session:
                result = await session.execute(
                    CANCEL_DATA_LAKE_QUERY, variable_values=variables
                )

            # Parse results
            cancellation_data = result.get("cancelDataLakeQuery", {})
            cancelled_id = cancellation_data.get("id")
            if not cancelled_id:
                raise ValueError("No query ID returned from cancellation")

            logger.info(f"Successfully cancelled data lake query: {cancelled_id}")
            return {
                "success": True,
                "query_id": cancelled_id,
                "message": f"Successfully cancelled query {cancelled_id}",
            }
        except Exception as e:
            logger.error(f"Failed to cancel data lake query: {str(e)}")

            # Provide helpful error messages for common issues
            error_message = str(e)
            if "not found" in error_message.lower():
                error_message = f"Query {query_id} not found. It may have already completed or been cancelled."
            elif "cannot be cancelled" in error_message.lower():
                error_message = f"Query {query_id} cannot be cancelled. Only running queries can be cancelled."
            elif "permission" in error_message.lower():
                error_message = f"Permission denied. You may not have permission to cancel query {query_id}."
            else:
                error_message = f"Failed to cancel query {query_id}: {error_message}"

            return {
                "success": False,
                "message": error_message,
            }
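
A minimal usage sketch: only running queries can be cancelled, so inspect the returned message on failure. The query ID shown is a placeholder, and the call must run inside an async context.

    # Inside an async function; the query ID below is a placeholder.
    result = await _cancel_data_lake_query("01be5f14-0206-3c48-000d-9eff005dd176")
    if not result["success"]:
        print(result["message"])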
  • mcp_tool decorator definition: Registers functions decorated with @mcp_tool into a global registry (_tool_registry), which are later registered to MCP server via register_all_tools(mcp_instance).
    def mcp_tool(
        func: Optional[Callable] = None,
        *,
        name: Optional[str] = None,
        description: Optional[str] = None,
        annotations: Optional[Dict[str, Any]] = None,
    ) -> Callable:
        """
        Decorator to mark a function as an MCP tool.

        Functions decorated with this will be automatically registered when
        register_all_tools() is called.

        Can be used in two ways:
        1. Direct decoration:
            @mcp_tool
            def my_tool(): ...

        2. With parameters:
            @mcp_tool(
                name="custom_name",
                description="Custom description",
                annotations={"category": "data_analysis"}
            )
            def my_tool(): ...

        Args:
            func: The function to decorate
            name: Optional custom name for the tool. If not provided, uses the function name.
            description: Optional description of what the tool does. If not provided, uses the function's docstring.
            annotations: Optional dictionary of additional annotations for the tool.
        """

        def decorator(func: Callable) -> Callable:
            # Store metadata on the function
            func._mcp_tool_metadata = {
                "name": name,
                "description": description,
                "annotations": annotations,
            }
            _tool_registry.add(func)

            @wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)

            return wrapper

        # Handle both @mcp_tool and @mcp_tool(...) cases
        if func is None:
            return decorator
        return decorator(func)
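
A hedged sketch of how register_all_tools(mcp_instance) might drain the registry. It assumes the MCP server object exposes a FastMCP-style add_tool(fn, name=..., description=...) method, which is not shown in this excerpt.

    def register_all_tools(mcp_instance) -> None:
        # Register every function collected by the @mcp_tool decorator.
        for func in _tool_registry:
            meta = getattr(func, "_mcp_tool_metadata", {}) or {}
            mcp_instance.add_tool(
                func,
                name=meta.get("name") or func.__name__,
                description=meta.get("description") or func.__doc__,
            )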
