panther-labs

Panther MCP Server

Official

query_data_lake

Read-only

Execute SQL queries on Panther's security data lake to analyze logs, investigate threats, and perform security monitoring with required time filters for optimal performance.

Instructions

Query Panther's security data lake using SQL for log analysis and threat hunting.

REQUIRED: Include time filter with p_event_time (required for performance and partitioning)

Panther Time Filter Macros (Recommended - optimized for performance):

  • p_occurs_since(timeOffset [, tableAlias[, column]])
    Examples: p_occurs_since('1 d'), p_occurs_since('6 h'), p_occurs_since('2 weeks'), p_occurs_since(3600)
    Time formats: '30 s', '15 m', '6 h', '2 d', '1 w' OR '2 weeks', '1 day' OR numeric seconds

  • p_occurs_between(startTime, endTime [, tableAlias[, column]])
    Examples: p_occurs_between('2024-01-01', '2024-01-02'), p_occurs_between('2024-03-20T00:00:00Z', '2024-03-20T23:59:59Z')

  • p_occurs_around(timestamp, timeOffset [, tableAlias[, column]])
    Example: p_occurs_around('2024-01-15T10:30:00Z', '1 h')  # ±1 hour around timestamp

  • p_occurs_after(timestamp [, tableAlias[, column]])

  • p_occurs_before(timestamp [, tableAlias[, column]])

Alternative (manual): WHERE p_event_time >= '2024-01-01' AND p_event_time < '2024-01-02'
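
For illustration, here is how the two filter styles above would appear in an actual sql argument (a hedged sketch: the table and columns come from the examples later in this description, and the literal dates are placeholders):

    # Macro form (recommended): last 24 hours, partition-aware.
    sql_macro = (
        "SELECT eventname, p_event_time "
        "FROM panther_logs.public.aws_cloudtrail "
        "WHERE p_occurs_since('1 d')"
    )

    # Manual form: explicit p_event_time bounds.
    sql_manual = (
        "SELECT eventname, p_event_time "
        "FROM panther_logs.public.aws_cloudtrail "
        "WHERE p_event_time >= '2024-01-01' AND p_event_time < '2024-01-02'"
    )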

Best Practices:

  • Always use time filters (macros preferred over manual p_event_time conditions)

  • Start with summary queries, then drill down to specific timeframes

  • Use p_any_* fields for faster correlation (p_any_ip_addresses, p_any_usernames, p_any_emails)

  • Query specific fields instead of SELECT * for better performance

Pagination:

  • First call: No cursor parameter - returns first page with max_rows results

  • Subsequent calls: Use next_cursor from previous response to get next page

  • Continue until has_next_page is False
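
A minimal pagination loop might look like the sketch below; call_query_data_lake is a hypothetical async wrapper standing in for however your MCP client invokes this tool, and the query itself is illustrative:

    async def fetch_all_rows(call_query_data_lake):
        """Collect every page of results for a single query (illustrative sketch)."""
        sql = (
            "SELECT eventname FROM panther_logs.public.aws_cloudtrail "
            "WHERE p_occurs_since('1 h')"
        )
        rows, cursor = [], None
        while True:
            # First call passes no cursor; follow-up calls reuse next_cursor from the prior page.
            page = await call_query_data_lake(sql=sql, max_rows=100, cursor=cursor)
            if not page.get("success"):
                raise RuntimeError(page.get("message", "query failed"))
            rows.extend(page.get("results", []))
            if not page.get("has_next_page"):
                return rows
            cursor = page.get("next_cursor")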

Common Examples:

  • Recent failed logins: "SELECT * FROM panther_logs.public.aws_cloudtrail WHERE p_occurs_since('1 d') AND errorcode IS NOT NULL"

  • IP activity summary: "SELECT sourceipaddress, COUNT(*) FROM panther_logs.public.aws_cloudtrail WHERE p_occurs_since('6 h') GROUP BY sourceipaddress"

  • User correlation: "SELECT * FROM panther_logs.public.aws_cloudtrail WHERE p_occurs_since('2 h') AND ARRAY_CONTAINS('user@domain.com'::VARIANT, p_any_emails)"

  • Nested field access: SELECT p_enrichment:ipinfo_privacy:"context.ip_address" FROM table WHERE p_occurs_since('1 h')

Query Syntax (Snowflake SQL):

  • Access nested JSON: column:field.subfield

  • Quote special characters: column:"field name" or p_enrichment:"context.ip_address"

  • Array searches: ARRAY_CONTAINS('value'::VARIANT, array_column)
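
Putting these together, a single query might combine nested enrichment access with an array search (hedged sketch: the ipinfo_asn enrichment field is an assumption and may not exist in your schema):

    sql = (
        'SELECT p_event_time, p_enrichment:ipinfo_asn:"asn" AS asn '
        "FROM panther_logs.public.aws_cloudtrail "
        "WHERE p_occurs_since('1 d') "
        "AND ARRAY_CONTAINS('203.0.113.10'::VARIANT, p_any_ip_addresses)"
    )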

Returns: Dict with query results:

  • results: List of matching rows (paginated based on cursor parameter)
  • results_truncated: True if results were truncated (only for non-paginated requests)
  • total_rows_available: Total rows found (for non-paginated requests)
  • has_next_page: True if more results are available
  • next_cursor: Cursor for next page (use in subsequent call)
  • column_info: Column names and data types
  • stats: Query performance metrics (execution time, bytes scanned)
  • success/status/message: Query execution status

Permissions: {'all_of': ['Query Data Lake']}

Input Schema

  • sql (required): The SQL query to execute. Must include a p_event_time filter condition after WHERE or AND. The query must be compatible with Snowflake SQL.
  • database_name (optional, default: panther_logs.public)
  • timeout (optional, default: 30): Timeout in seconds before the SQL query is cancelled. If the query fails due to timeout, the caller should consider a longer timeout.
  • max_rows (optional, default: 100): Maximum number of result rows to return (prevents context overflow).
  • cursor (optional): Pagination cursor from a previous query to fetch the next page of results.
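
A first-page tool call might pass arguments like the following (illustrative sketch; only sql is required):

    arguments = {
        "sql": (
            "SELECT eventname, COUNT(*) AS n FROM panther_logs.public.aws_cloudtrail "
            "WHERE p_occurs_since('1 d') GROUP BY eventname"
        ),
        "database_name": "panther_logs.public",  # default
        "timeout": 30,       # seconds
        "max_rows": 100,
        # "cursor": "<next_cursor from a previous response>",  # follow-up pages only
    }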

Output Schema

No output fields are defined.

Implementation Reference

  • The primary handler function for the 'query_data_lake' tool. It validates the SQL query for time filters, processes reserved words, executes the query via GraphQL mutation, polls for completion, supports pagination with cursor and max_rows, handles timeouts and cancellations, and returns structured results including metadata and pagination info.
        annotations={
            "permissions": all_perms(Permission.DATA_ANALYTICS_READ),
            "readOnlyHint": True,
        }
    )
    async def query_data_lake(
        sql: Annotated[
            str,
            Field(
                description="The SQL query to execute. Must include a p_event_time filter condition after WHERE or AND. The query must be compatible with Snowflake SQL."
            ),
        ],
        database_name: str = "panther_logs.public",
        timeout: Annotated[
            int,
            Field(
                description="Timeout in seconds before the SQL query is cancelled. If the query fails due to timeout, the caller should consider a longer timeout."
            ),
        ] = 30,
        max_rows: Annotated[
            int,
            Field(
                description="Maximum number of result rows to return (prevents context overflow)",
                ge=1,
                le=999,
            ),
        ] = 100,
        cursor: Annotated[
            str | None,
            Field(
                description="Optional pagination cursor from previous query to fetch next page of results",
            ),
        ] = None,
    ) -> Dict[str, Any]:
        """Query Panther's security data lake using SQL for log analysis and threat hunting.
    
        REQUIRED: Include time filter with p_event_time (required for performance and partitioning)
    
        Panther Time Filter Macros (Recommended - optimized for performance):
        - p_occurs_since(timeOffset [, tableAlias[, column]])
          Examples: p_occurs_since('1 d'), p_occurs_since('6 h'), p_occurs_since('2 weeks'), p_occurs_since(3600)
          Time formats: '30 s', '15 m', '6 h', '2 d', '1 w' OR '2 weeks', '1 day' OR numeric seconds
    
        - p_occurs_between(startTime, endTime [, tableAlias[, column]])
          Examples: p_occurs_between('2024-01-01', '2024-01-02'), p_occurs_between('2024-03-20T00:00:00Z', '2024-03-20T23:59:59Z')
    
        - p_occurs_around(timestamp, timeOffset [, tableAlias[, column]])
          Example: p_occurs_around('2024-01-15T10:30:00Z', '1 h')  # ±1 hour around timestamp
    
        - p_occurs_after(timestamp [, tableAlias[, column]])
        - p_occurs_before(timestamp [, tableAlias[, column]])
    
        Alternative (manual): WHERE p_event_time >= '2024-01-01' AND p_event_time < '2024-01-02'
    
        Best Practices:
        - Always use time filters (macros preferred over manual p_event_time conditions)
        - Start with summary queries, then drill down to specific timeframes
        - Use p_any_* fields for faster correlation (p_any_ip_addresses, p_any_usernames, p_any_emails)
        - Query specific fields instead of SELECT * for better performance
    
        Pagination:
        - First call: No cursor parameter - returns first page with max_rows results
        - Subsequent calls: Use next_cursor from previous response to get next page
        - Continue until has_next_page is False
    
        Common Examples:
        - Recent failed logins: "SELECT * FROM panther_logs.public.aws_cloudtrail WHERE p_occurs_since('1 d') AND errorcode IS NOT NULL"
        - IP activity summary: "SELECT sourceipaddress, COUNT(*) FROM panther_logs.public.aws_cloudtrail WHERE p_occurs_since('6 h') GROUP BY sourceipaddress"
        - User correlation: "SELECT * FROM panther_logs.public.aws_cloudtrail WHERE p_occurs_since('2 h') AND ARRAY_CONTAINS('user@domain.com'::VARIANT, p_any_emails)"
        - Nested field access: "SELECT p_enrichment:ipinfo_privacy:\"context.ip_address\" FROM table WHERE p_occurs_since('1 h')"
    
        Query Syntax (Snowflake SQL):
        - Access nested JSON: column:field.subfield
        - Quote special characters: column:"field name" or p_enrichment:"context.ip_address"
        - Array searches: ARRAY_CONTAINS('value'::VARIANT, array_column)
    
        Returns:
            Dict with query results:
            - results: List of matching rows (paginated based on cursor parameter)
            - results_truncated: True if results were truncated (only for non-paginated requests)
            - total_rows_available: Total rows found (for non-paginated requests)
            - has_next_page: True if more results are available
            - next_cursor: Cursor for next page (use in subsequent call)
            - column_info: Column names and data types
            - stats: Query performance metrics (execution time, bytes scanned)
            - success/status/message: Query execution status
        """
        logger.info("Executing data lake query")
    
        start_time = time.time()
    
        # Validate that the query includes a time filter (p_event_time or Panther macros)
        sql_lower = sql.lower().replace("\n", " ")
        has_p_event_time = re.search(
            r"\b(where|and)\s+.*?(?:[\w.]+\.)?p_event_time\s*(>=|<=|=|>|<|between)",
            sql_lower,
        )
        has_panther_macros = re.search(
            r"p_occurs_(since|between|around|after|before)\s*\(",
            sql_lower,
        )
    
        if (not (has_p_event_time or has_panther_macros)) and re.search(
            r"\Wpanther_(views|signals|rule_matches|rule_errors|monitor|logs|cloudsecurity)\.",
            sql_lower,
        ):
            error_msg = "Query must include a time filter: either `p_event_time` condition or Panther macro (p_occurs_since, p_occurs_between, etc.)"
            logger.error(error_msg)
            return {
                "success": False,
                "message": error_msg,
                "query_id": None,
            }
    
        try:
            # Process reserved words in the SQL
            processed_sql = wrap_reserved_words(sql)
            logger.debug(f"Original SQL: {sql}")
            logger.debug(f"Processed SQL: {processed_sql}")
    
            # Prepare input variables
            variables = {"input": {"sql": processed_sql, "databaseName": database_name}}
    
            logger.debug(f"Query variables: {variables}")
    
            # Execute the query using shared client
            result = await _execute_query(EXECUTE_DATA_LAKE_QUERY, variables)
    
            # Get query ID from result
            query_id = result.get("executeDataLakeQuery", {}).get("id")
    
            if not query_id:
                raise ValueError("No query ID returned from execution")
    
            logger.info(f"Successfully executed query with ID: {query_id}")
    
            sleep_time = INITIAL_QUERY_SLEEP_SECONDS
            while True:
                await asyncio.sleep(sleep_time)
    
                result = await _get_data_lake_query_results(
                    query_id=query_id, max_rows=max_rows, cursor=cursor
                )
    
                if result.get("status") == "running":
                    if (time.time() - start_time) >= timeout:
                        await _cancel_data_lake_query(query_id=query_id)
                        return {
                            "success": False,
                            "status": "cancelled",
                            "message": "Query time exceeded timeout, and has been cancelled. A longer timout may be required. "
                            "Retrying may be faster due to caching, or you may need to reduce the duration of data being queried.",
                            "query_id": query_id,
                        }
                else:
                    return result
    
                if sleep_time <= MAX_QUERY_SLEEP_SECONDS:
                    sleep_time += 1
        except Exception as e:
            logger.error(f"Failed to execute data lake query: {str(e)}")
            # Try to get query_id if it was set before the error
            query_id = locals().get("query_id")
            return {
                "success": False,
                "message": f"Failed to execute data lake query: {str(e)}",
                "query_id": query_id,
            }
  • Pydantic-based input schema definition using Annotated and Field for parameters: sql (required), database_name, timeout, max_rows, cursor.
        sql: Annotated[
            str,
            Field(
                description="The SQL query to execute. Must include a p_event_time filter condition after WHERE or AND. The query must be compatible with Snowflake SQL."
            ),
        ],
        database_name: str = "panther_logs.public",
        timeout: Annotated[
            int,
            Field(
                description="Timeout in seconds before the SQL query is cancelled. If the query fails due to timeout, the caller should consider a longer timeout."
            ),
        ] = 30,
        max_rows: Annotated[
            int,
            Field(
                description="Maximum number of result rows to return (prevents context overflow)",
                ge=1,
                le=999,
            ),
        ] = 100,
        cursor: Annotated[
            str | None,
            Field(
                description="Optional pagination cursor from previous query to fetch next page of results",
            ),
        ] = None,
    ) -> Dict[str, Any]:
  • Registers all tools decorated with @mcp_tool, including query_data_lake, with the FastMCP server instance.
    register_all_tools(mcp)
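
A minimal entry point wiring this together might look like the sketch below; FastMCP comes from the MCP Python SDK, while the import path for register_all_tools is a guess rather than the project's actual module layout:

    from mcp.server.fastmcp import FastMCP

    # Hypothetical import path -- the real module name may differ.
    from mcp_panther.panther_mcp_core.tools import register_all_tools

    mcp = FastMCP("mcp-panther")
    register_all_tools(mcp)  # registers query_data_lake and the other @mcp_tool functions

    if __name__ == "__main__":
        mcp.run()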
  • Helper function to process SQL for Snowflake reserved words by converting single-quoted reserved identifiers to double-quoted.
    def wrap_reserved_words(sql: str) -> str:
        """
        Simple function to wrap reserved words in SQL using sqlparse.
    
        This function:
        1. Parses the SQL using sqlparse
        2. Identifies string literals that match reserved words
        3. Converts single-quoted reserved words to double-quoted ones
    
        Args:
            sql: The SQL query string to process
    
        Returns:
            The SQL with reserved words properly quoted
        """
        try:
            # Parse the SQL
            parsed = sqlparse.parse(sql)[0]
    
            # Convert the parsed SQL back to string, but process tokens
            result = []
            for token in parsed.flatten():
                if token.ttype is sqlparse.tokens.Literal.String.Single:
                    # Remove quotes and check if it's a reserved word
                    value = token.value.strip("'")
                    if value.upper() in SNOWFLAKE_RESERVED_WORDS:
                        # Convert to double-quoted identifier
                        result.append(f'"{value}"')
                    else:
                        result.append(token.value)
                else:
                    result.append(token.value)
    
            return "".join(result)
        except Exception as e:
            logger.warning(f"Failed to parse SQL for reserved words: {e}")
            return sql
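
As a hedged illustration of this function's behavior, assuming ORDER appears in SNOWFLAKE_RESERVED_WORDS:

    # A single-quoted literal matching a reserved word becomes a double-quoted
    # identifier; every other token passes through unchanged.
    wrap_reserved_words("SELECT 'order', 'hello' FROM t WHERE p_occurs_since('1 h')")
    # -> SELECT "order", 'hello' FROM t WHERE p_occurs_since('1 h')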
  • Internal helper to retrieve query results, supporting pagination and formatting the response structure used by query_data_lake.
    async def _get_data_lake_query_results(
        query_id: Annotated[
            str,
            Field(
                description="The ID of the query to get results for",
                examples=["01be5f14-0206-3c48-000d-9eff005dd176"],
            ),
        ],
        max_rows: int = 100,
        cursor: str | None = None,
    ) -> Dict[str, Any]:
        """Get the results of a previously executed data lake query.
    
        Returns:
            Dict containing:
            - success: Boolean indicating if the query was successful
            - status: Status of the query (e.g., "succeeded", "running", "failed", "cancelled")
            - message: Error message if unsuccessful
            - results: List of query result rows
            - column_info: Dict containing column names and types
            - stats: Dict containing stats about the query
            - has_next_page: Boolean indicating if there are more results available
            - next_cursor: Cursor for fetching the next page of results, or null if no more pages
        """
        logger.info(f"Fetching data lake query results for query ID: {query_id}")
    
        try:
            # Prepare input variables for pagination
            variables = {
                "id": query_id,
                "root": False,
                "resultsInput": {
                    "pageSize": max_rows,
                    "cursor": cursor,
                },
            }
    
            if cursor:
                logger.info(f"Using pagination cursor: {cursor}")
    
            logger.debug(f"Query variables: {variables}")
    
            # Execute the query using shared client
            result = await _execute_query(GET_DATA_LAKE_QUERY, variables)
    
            # Get query data
            query_data = result.get("dataLakeQuery", {})
    
            if not query_data:
                logger.warning(f"No query found with ID: {query_id}")
                return {
                    "success": False,
                    "message": f"No query found with ID: {query_id}",
                    "query_id": query_id,
                }
    
            # Get query status
            status = query_data.get("status")
            if status == "running":
                return {
                    "success": True,
                    "status": "running",
                    "message": "Query is still running",
                    "query_id": query_id,
                }
            elif status == "failed":
                return {
                    "success": False,
                    "status": "failed",
                    "message": query_data.get("message", "Query failed"),
                    "query_id": query_id,
                }
            elif status == "cancelled":
                return {
                    "success": False,
                    "status": "cancelled",
                    "message": "Query was cancelled",
                    "query_id": query_id,
                }
    
            # Get results data
            results = query_data.get("results", {})
            edges = results.get("edges", [])
            column_info = results.get("columnInfo", {})
            stats = results.get("stats", {})
    
            # Extract results from edges
            query_results = [edge["node"] for edge in edges]
    
            # Check pagination info
            page_info = results.get("pageInfo", {})
            has_next_page = page_info.get("hasNextPage", False)
            end_cursor = page_info.get("endCursor")
    
            # Track pagination state
            original_count = len(query_results)
    
            # For cursor-based requests, we don't truncate since GraphQL handles pagination
            was_truncated = False
            if cursor:
                logger.info(
                    f"Retrieved page of {len(query_results)} results using cursor pagination"
                )
            else:
                # For initial requests without cursor, apply legacy truncation if needed
                was_truncated = original_count > max_rows
                if was_truncated:
                    query_results = query_results[:max_rows]
                    logger.info(
                        f"Query results truncated from {original_count} to {max_rows} rows for context window protection"
                    )
                else:
                    logger.info(
                        f"Retrieved {len(query_results)} results (no truncation needed)"
                    )
    
            logger.info(
                f"Successfully retrieved {len(query_results)} results for query ID: {query_id}"
            )
    
            # Format the response
            return {
                "success": True,
                "status": status,
                "results": query_results,
                "results_truncated": was_truncated,
                "total_rows_available": original_count,
                "rows_returned": len(query_results),
                "column_info": {
                    "order": column_info.get("order", []),
                    "types": column_info.get("types", {}),
                },
                "stats": {
                    "bytes_scanned": stats.get("bytesScanned", 0),
                    "execution_time": stats.get("executionTime", 0),
                    "row_count": stats.get("rowCount", 0),
                },
                "has_next_page": has_next_page,
                "next_cursor": end_cursor,
                "message": query_data.get("message", "Query executed successfully"),
                "query_id": query_id,
            }
        except Exception as e:
            logger.error(f"Failed to fetch data lake query results: {str(e)}")
            return {
                "success": False,
                "message": f"Failed to fetch data lake query results: {str(e)}",
                "query_id": query_id,
            }
Behavior: 4/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

The description adds significant behavioral context beyond the readOnlyHint annotation. It explains performance requirements (time filter is 'REQUIRED'), provides detailed pagination behavior, describes query syntax (Snowflake SQL), and outlines return structure. While annotations already indicate it's read-only, the description enriches understanding with practical constraints and operational details that aren't captured in structured fields.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 3/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is comprehensive but lengthy and could be more front-loaded. While all content is valuable (macros, best practices, examples, syntax, returns), it's presented as a dense block. The core purpose is clear in the first sentence, but subsequent sections could be better organized or summarized for quicker scanning by an AI agent.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 5/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool's complexity (SQL querying with performance constraints, pagination, specific syntax) and the presence of both annotations and output schema, the description is remarkably complete. It covers purpose, usage guidelines, behavioral details, parameter context, examples, and return structure. The output schema means the description doesn't need to explain return values in detail, and it appropriately focuses on operational guidance.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 4/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

With 80% schema description coverage, the baseline is 3, but the description adds substantial value. It provides extensive context about the 'sql' parameter (time filter requirements, Panther macros, best practices, examples, syntax), explains pagination behavior related to the 'cursor' parameter, and offers performance guidance relevant to 'timeout' and 'max_rows'. This goes well beyond what the schema descriptions provide.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool's purpose: 'Query Panther's security data lake using SQL for log analysis and threat hunting.' This specifies the verb ('query'), resource ('Panther's security data lake'), and context ('log analysis and threat hunting'), distinguishing it from all sibling tools which focus on alerts, detections, users, and other administrative functions rather than direct data lake querying.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 5/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides explicit guidance on when to use this tool: for SQL-based log analysis and threat hunting. It also includes detailed 'Best Practices' and 'Common Examples' sections that guide effective usage, such as starting with summary queries and using time filters. While it doesn't name specific alternatives, the context of sibling tools (which are all about managing alerts, detections, users, etc.) makes it clear this is the primary data querying tool.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
