panther-labs

Panther MCP Server

Official

get_table_schema

Retrieve column names and data types for data lake tables to understand table structure and write optimized queries in Snowflake.

Instructions

Get column details for a specific data lake table.

IMPORTANT: This returns the table structure in Snowflake. For writing optimal queries, ALSO call get_panther_log_type_schema() to understand:

  • Nested object structures (only shown as 'object' type here)

  • Which fields map to p_any_* indicator columns

  • Array element structures

Example workflow:

  1. get_panther_log_type_schema(["AWS.CloudTrail"]) - understand structure

  2. get_table_schema("panther_logs.public", "aws_cloudtrail") - get column names/types

  3. Write query using both: nested paths from log schema, column names from table schema
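
The workflow above can be sketched roughly as follows. This is a minimal illustration, assuming both tool responses have already been fetched: the `build_query` helper, the sample dictionaries, and the column and field names are hypothetical and are not real Panther output. Nested fields use Snowflake's `column:path` notation.

```python
from typing import Any, Dict, List


def build_query(
    database_name: str,
    table_schema: Dict[str, Any],
    nested_paths: Dict[str, List[str]],
    limit: int = 10,
) -> str:
    """Combine column names (table schema) with nested paths (log type schema)."""
    if not table_schema.get("success"):
        raise ValueError(table_schema.get("message", "table schema lookup failed"))

    select_items: List[str] = []
    for column in table_schema["columns"]:
        name = column["name"]
        paths = nested_paths.get(name)
        if column["type"] == "object" and paths:
            # The table schema only says 'object'; the nested paths come
            # from the log type schema (hypothetical values here).
            select_items.extend(f"{name}:{path}" for path in paths)
        else:
            select_items.append(name)

    table = f"{database_name}.{table_schema['name']}"
    return f"SELECT {', '.join(select_items)} FROM {table} LIMIT {limit}"


# Hypothetical, trimmed-down responses used only for illustration.
table_schema = {
    "success": True,
    "name": "aws_cloudtrail",
    "columns": [
        {"name": "eventName", "type": "string"},
        {"name": "userIdentity", "type": "object"},
    ],
}
nested_paths = {"userIdentity": ["arn"]}

query = build_query("panther_logs.public", table_schema, nested_paths)
print(query)
```

The point of the sketch is the division of labour: column names and top-level types come from get_table_schema, while the paths inside 'object' columns can only come from the log type schema.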

Returns: Dict containing:

  • success: Boolean indicating if the query was successful

  • name: Table name

  • display_name: Table display name

  • description: Table description

  • log_type: Log type

  • columns: List of columns, each containing:

      • name: Column name

      • type: Column data type

      • description: Column description

  • message: Error message if unsuccessful

Permissions: all of 'Query Data Lake'

Input Schema

Name | Required | Description | Default
database_name | Yes | The name of the database where the table is located | (none)
table_name | Yes | The name of the table to get columns for | (none)
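
To make the input schema concrete, here is a minimal sketch of the JSON-RPC `tools/call` request an MCP client would send for this tool. Most MCP client libraries build this message for you; the `build_tool_call` helper is hypothetical and only illustrates the argument shape.

```python
import json
from typing import Any, Dict


def build_tool_call(
    database_name: str, table_name: str, request_id: int = 1
) -> Dict[str, Any]:
    """Build an MCP tools/call request for get_table_schema."""
    # Both parameters are marked as required in the input schema.
    if not database_name or not table_name:
        raise ValueError("database_name and table_name are both required")
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "get_table_schema",
            "arguments": {
                "database_name": database_name,
                "table_name": table_name,
            },
        },
    }


request = build_tool_call("panther_logs.public", "aws_cloudtrail")
print(json.dumps(request, indent=2))
```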

Implementation Reference

  • The core handler function that implements the get_table_schema tool. It takes database_name and table_name parameters, constructs a GraphQL query using GET_COLUMNS_FOR_TABLE_QUERY, executes it via _execute_query, and returns the table's column information including names, types, and descriptions.
    async def get_table_schema(
        database_name: Annotated[
            str,
            Field(
                description="The name of the database where the table is located",
                examples=["panther_logs.public"],
            ),
        ],
        table_name: Annotated[
            str,
            Field(
                description="The name of the table to get columns for",
                examples=["Panther.Audit"],
            ),
        ],
    ) -> Dict[str, Any]:
        """Get column details for a specific data lake table.
    
        IMPORTANT: This returns the table structure in Snowflake. For writing
        optimal queries, ALSO call get_panther_log_type_schema() to understand:
        - Nested object structures (only shown as 'object' type here)
        - Which fields map to p_any_* indicator columns
        - Array element structures
    
        Example workflow:
        1. get_panther_log_type_schema(["AWS.CloudTrail"]) - understand structure
        2. get_table_schema("panther_logs.public", "aws_cloudtrail") - get column names/types
        3. Write query using both: nested paths from log schema, column names from table schema
    
        Returns:
            Dict containing:
            - success: Boolean indicating if the query was successful
            - name: Table name
            - display_name: Table display name
            - description: Table description
            - log_type: Log type
            - columns: List of columns, each containing:
                - name: Column name
                - type: Column data type
                - description: Column description
            - message: Error message if unsuccessful
        """
        table_full_path = f"{database_name}.{table_name}"
        logger.info(f"Fetching column information for table: {table_full_path}")
    
        try:
            # Prepare input variables
            variables = {"databaseName": database_name, "tableName": table_name}
    
            logger.debug(f"Query variables: {variables}")
    
            # Execute the query using shared client
            result = await _execute_query(GET_COLUMNS_FOR_TABLE_QUERY, variables)
    
            # Get query data
            query_data = result.get("dataLakeDatabaseTable", {})
            columns = query_data.get("columns", [])
    
            if not columns:
                logger.warning(f"No columns found for table: {table_full_path}")
                return {
                    "success": False,
                    "message": f"No columns found for table: {table_full_path}",
                }
    
            logger.info(f"Successfully retrieved {len(columns)} columns")
    
            # Format the response
            return {
                "success": True,
                "status": "succeeded",
                **query_data,
                "stats": {
                    "table_count": len(columns),
                },
            }
        except Exception as e:
            logger.error(f"Failed to get columns for table: {str(e)}")
            return {
                "success": False,
                "message": f"Failed to get columns for table: {str(e)}",
            }
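
    One detail of the listing above is worth isolating: the handler spreads `query_data` into the response, so the returned keys appear to follow the GraphQL field names (`displayName`, `logType`) rather than the snake_case names in the docstring, and `stats.table_count` actually holds the number of columns. The sketch below reproduces only that shaping step as a pure function. It assumes `_execute_query` returns the GraphQL data unchanged; `shape_response` and the sample payload are hypothetical, and the `or {}` guard for a null table is a small deviation from the original.

```python
from typing import Any, Dict


def shape_response(result: Dict[str, Any], table_full_path: str) -> Dict[str, Any]:
    """Mirror the handler's response formatting without any network call."""
    # 'or {}' also covers a null dataLakeDatabaseTable (deviation from the listing).
    query_data = result.get("dataLakeDatabaseTable") or {}
    columns = query_data.get("columns") or []

    if not columns:
        return {
            "success": False,
            "message": f"No columns found for table: {table_full_path}",
        }

    return {
        "success": True,
        "status": "succeeded",
        **query_data,  # keys keep their GraphQL names, e.g. displayName
        "stats": {"table_count": len(columns)},  # counts columns, despite the name
    }


# Hypothetical payload shaped like the GraphQL selection set.
sample = {
    "dataLakeDatabaseTable": {
        "name": "aws_cloudtrail",
        "displayName": "aws_cloudtrail",
        "description": "",
        "logType": "AWS.CloudTrail",
        "columns": [{"name": "eventName", "type": "string", "description": ""}],
    }
}

shaped = shape_response(sample, "panther_logs.public.aws_cloudtrail")
missing = shape_response({"dataLakeDatabaseTable": None}, "panther_logs.public.nope")
print(sorted(shaped))
```

    Callers that read the response should therefore be prepared for camelCase keys unless the running server version normalizes them.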
  • The @mcp_tool decorator call that registers get_table_schema as an MCP tool, specifying required permissions (DATA_ANALYTICS_READ) and marking it as read-only.
    @mcp_tool(
        annotations={
            "permissions": all_perms(Permission.DATA_ANALYTICS_READ),
            "readOnlyHint": True,
        }
    )
  • Input schema defined via Pydantic Field annotations for database_name and table_name. Output schema described in the function docstring, returning structured table metadata and columns.
    async def get_table_schema(
        database_name: Annotated[
            str,
            Field(
                description="The name of the database where the table is located",
                examples=["panther_logs.public"],
            ),
        ],
        table_name: Annotated[
            str,
            Field(
                description="The name of the table to get columns for",
                examples=["Panther.Audit"],
            ),
        ],
    ) -> Dict[str, Any]:
        ...  # docstring and body are identical to the handler listing above
  • GraphQL query definition used by the handler to fetch table column details from the Panther GraphQL API.
    GET_COLUMNS_FOR_TABLE_QUERY = gql("""
    query GetColumnDetails($databaseName: String!, $tableName: String!) {
      dataLakeDatabaseTable(input: { databaseName: $databaseName, tableName: $tableName }) {
        name,
        displayName,
        description,
        logType,
        columns {
          name,
          type,
          description
        }
      }
    }
    """)
  • The mcp_tool decorator, which stores tool metadata on each decorated function and adds it to a registry. register_all_tools() (not shown in this listing) later registers the collected tools with the MCP server instance.
    def mcp_tool(
        func: Optional[Callable] = None,
        *,
        name: Optional[str] = None,
        description: Optional[str] = None,
        annotations: Optional[Dict[str, Any]] = None,
    ) -> Callable:
        """
        Decorator to mark a function as an MCP tool.
    
        Functions decorated with this will be automatically registered
        when register_all_tools() is called.
    
        Can be used in two ways:
        1. Direct decoration:
            @mcp_tool
            def my_tool():
                ...
    
        2. With parameters:
            @mcp_tool(
                name="custom_name",
                description="Custom description",
                annotations={"category": "data_analysis"}
            )
            def my_tool():
                ...
    
        Args:
            func: The function to decorate
            name: Optional custom name for the tool. If not provided, uses the function name.
            description: Optional description of what the tool does. If not provided, uses the function's docstring.
            annotations: Optional dictionary of additional annotations for the tool.
        """
    
        def decorator(func: Callable) -> Callable:
            # Store metadata on the function
            func._mcp_tool_metadata = {
                "name": name,
                "description": description,
                "annotations": annotations,
            }
            _tool_registry.add(func)
    
            @wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)
    
            return wrapper
    
        # Handle both @mcp_tool and @mcp_tool(...) cases
        if func is None:
            return decorator
        return decorator(func)
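
    The listing above references `_tool_registry` and `register_all_tools()` without showing them. The sketch below shows one way the pieces could fit together. It is an assumption-laden illustration, not the project's actual code: `FakeServer`, its `add_tool` method, and this `register_all_tools` body are hypothetical, and the decorator is simplified to return the function itself instead of a wrapper.

```python
from typing import Any, Callable, Dict, Optional, Set

# Assumed to be a module-level set, since the listing calls .add() on it.
_tool_registry: Set[Callable] = set()


def mcp_tool(
    func: Optional[Callable] = None,
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    annotations: Optional[Dict[str, Any]] = None,
) -> Callable:
    """Simplified decorator: record metadata and remember the function."""

    def decorator(f: Callable) -> Callable:
        f._mcp_tool_metadata = {
            "name": name,
            "description": description,
            "annotations": annotations,
        }
        _tool_registry.add(f)
        return f

    # Support both @mcp_tool and @mcp_tool(...)
    if func is None:
        return decorator
    return decorator(func)


class FakeServer:
    """Hypothetical stand-in for an MCP server instance."""

    def __init__(self) -> None:
        self.tools: Dict[str, Dict[str, Any]] = {}

    def add_tool(self, fn: Callable, name: str, description: str,
                 annotations: Optional[Dict[str, Any]]) -> None:
        self.tools[name] = {
            "fn": fn,
            "description": description,
            "annotations": annotations,
        }


def register_all_tools(server: FakeServer) -> None:
    """Hypothetical registration loop over every decorated function."""
    for fn in _tool_registry:
        meta = fn._mcp_tool_metadata
        server.add_tool(
            fn,
            name=meta["name"] or fn.__name__,  # fall back to the function name
            description=meta["description"] or (fn.__doc__ or ""),
            annotations=meta["annotations"],
        )


@mcp_tool
def ping() -> str:
    """Return a fixed reply."""
    return "pong"


@mcp_tool(name="custom_name", annotations={"readOnlyHint": True})
def my_tool() -> str:
    """Example tool."""
    return "ok"


server = FakeServer()
register_all_tools(server)
print(sorted(server.tools))
```

    Collecting functions at import time and registering them in one pass keeps tool modules free of any reference to the server object, which is presumably why the decorator only records metadata.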
