panther-labs

Panther MCP Server

Official

get_table_schema

Retrieve column names and data types for data lake tables to understand table structure and write optimized queries in Snowflake.

Instructions

Get column details for a specific data lake table.

IMPORTANT: This returns the table structure in Snowflake. For writing optimal queries, ALSO call get_panther_log_type_schema() to understand:

  • Nested object structures (only shown as 'object' type here)

  • Which fields map to p_any_* indicator columns

  • Array element structures

Example workflow:

  1. get_panther_log_type_schema(["AWS.CloudTrail"]) - understand structure

  2. get_table_schema("panther_logs.public", "aws_cloudtrail") - get column names/types

  3. Write query using both: nested paths from log schema, column names from table schema
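As a rough illustration of step 3, the sketch below combines column names from a table schema with nested paths from a log type schema to build a Snowflake query. The helper `build_select`, the dotted path notation it accepts, and the sample schema are hypothetical and hand-written for illustration; they are not part of the Panther MCP server. Only the `column:field` path syntax is standard Snowflake.

```python
from typing import Any, Dict, List


def build_select(
    database_name: str,
    table_name: str,
    table_schema: Dict[str, Any],
    nested_paths: List[str],
    limit: int = 10,
) -> str:
    """Build a SELECT that only references columns present in the table schema.

    Each entry in nested_paths uses "column.field.subfield" notation (an
    assumption of this sketch) and is rendered with Snowflake's colon syntax.
    """
    known = {col["name"].lower() for col in table_schema.get("columns", [])}
    select_items = []
    for path in nested_paths:
        column, _, rest = path.partition(".")
        if column.lower() not in known:
            raise ValueError(f"Unknown column: {column}")
        # Top-level columns are used as-is; nested fields become column:field
        select_items.append(f"{column}:{rest}" if rest else column)
    return (
        f"SELECT {', '.join(select_items)} "
        f"FROM {database_name}.{table_name} LIMIT {limit}"
    )


# Hypothetical stand-in for a get_table_schema response (not real output).
sample_table_schema = {
    "success": True,
    "columns": [
        {"name": "p_event_time", "type": "timestamp", "description": ""},
        {"name": "userIdentity", "type": "object", "description": ""},
    ],
}

query = build_select(
    "panther_logs.public",
    "aws_cloudtrail",
    sample_table_schema,
    ["p_event_time", "userIdentity.arn"],
)
print(query)
```

Checking each path against the table schema first catches misspelled column names before the query reaches the data lake.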

Returns: Dict containing:

  • success: Boolean indicating if the query was successful

  • name: Table name

  • display_name: Table display name

  • description: Table description

  • log_type: Log type

  • columns: List of columns, each containing:

      • name: Column name

      • type: Column data type

      • description: Column description

  • message: Error message if unsuccessful
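A minimal sketch of how a caller might consume this response. The helper names `columns_by_name` and `log_type_of` and the sample response are hypothetical. The handler in the Implementation Reference spreads the GraphQL result into the response, so the display name and log type may arrive under the camelCase keys `displayName` and `logType` rather than the snake_case names listed above; the sketch accepts either spelling.

```python
from typing import Any, Dict, Optional


def columns_by_name(response: Dict[str, Any]) -> Dict[str, str]:
    """Map column name -> column type, raising if the tool call failed."""
    if not response.get("success"):
        # Failed responses carry only "success" and "message"
        raise RuntimeError(response.get("message", "get_table_schema failed"))
    return {col["name"]: col["type"] for col in response.get("columns", [])}


def log_type_of(response: Dict[str, Any]) -> Optional[str]:
    """Return the log type, accepting snake_case or camelCase keys."""
    return response.get("log_type") or response.get("logType")


# Hypothetical, hand-written response used only to exercise the helpers.
sample_response = {
    "success": True,
    "name": "aws_cloudtrail",
    "logType": "AWS.CloudTrail",
    "columns": [
        {"name": "p_event_time", "type": "timestamp", "description": ""},
        {"name": "userIdentity", "type": "object", "description": ""},
    ],
}

print(columns_by_name(sample_response))
print(log_type_of(sample_response))
```

Columns reported as `object` are the ones whose nested structure has to be looked up with get_panther_log_type_schema().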

Permissions: {'all_of': ['Query Data Lake']}

Input Schema

Name           Required  Description                                          Default
database_name  Yes       The name of the database where the table is located
table_name     Yes       The name of the table to get columns for

Implementation Reference

  • The core handler function that implements the get_table_schema tool. It takes database_name and table_name parameters, constructs a GraphQL query using GET_COLUMNS_FOR_TABLE_QUERY, executes it via _execute_query, and returns the table's column information including names, types, and descriptions.
    async def get_table_schema(
        database_name: Annotated[
            str,
            Field(
                description="The name of the database where the table is located",
                examples=["panther_logs.public"],
            ),
        ],
        table_name: Annotated[
            str,
            Field(
                description="The name of the table to get columns for",
                examples=["Panther.Audit"],
            ),
        ],
    ) -> Dict[str, Any]:
        """Get column details for a specific data lake table.

        IMPORTANT: This returns the table structure in Snowflake. For writing
        optimal queries, ALSO call get_panther_log_type_schema() to understand:
        - Nested object structures (only shown as 'object' type here)
        - Which fields map to p_any_* indicator columns
        - Array element structures

        Example workflow:
        1. get_panther_log_type_schema(["AWS.CloudTrail"]) - understand structure
        2. get_table_schema("panther_logs.public", "aws_cloudtrail") - get column names/types
        3. Write query using both: nested paths from log schema, column names from table schema

        Returns:
            Dict containing:
            - success: Boolean indicating if the query was successful
            - name: Table name
            - display_name: Table display name
            - description: Table description
            - log_type: Log type
            - columns: List of columns, each containing:
                - name: Column name
                - type: Column data type
                - description: Column description
            - message: Error message if unsuccessful
        """
        table_full_path = f"{database_name}.{table_name}"
        logger.info(f"Fetching column information for table: {table_full_path}")

        try:
            # Prepare input variables
            variables = {"databaseName": database_name, "tableName": table_name}
            logger.debug(f"Query variables: {variables}")

            # Execute the query using shared client
            result = await _execute_query(GET_COLUMNS_FOR_TABLE_QUERY, variables)

            # Get query data
            query_data = result.get("dataLakeDatabaseTable", {})
            columns = query_data.get("columns", [])

            if not columns:
                logger.warning(f"No columns found for table: {table_full_path}")
                return {
                    "success": False,
                    "message": f"No columns found for table: {table_full_path}",
                }

            logger.info(f"Successfully retrieved {len(columns)} columns")

            # Format the response
            return {
                "success": True,
                "status": "succeeded",
                **query_data,
                "stats": {
                    "table_count": len(columns),
                },
            }
        except Exception as e:
            logger.error(f"Failed to get columns for table: {str(e)}")
            return {
                "success": False,
                "message": f"Failed to get columns for table: {str(e)}",
            }
  • The @mcp_tool decorator call that registers get_table_schema as an MCP tool, specifying required permissions (DATA_ANALYTICS_READ) and marking it as read-only.
    @mcp_tool(
        annotations={
            "permissions": all_perms(Permission.DATA_ANALYTICS_READ),
            "readOnlyHint": True,
        }
    )
  • Input schema defined via Pydantic Field annotations for database_name and table_name. Output schema described in the function docstring, returning structured table metadata and columns.
    async def get_table_schema(
        database_name: Annotated[
            str,
            Field(
                description="The name of the database where the table is located",
                examples=["panther_logs.public"],
            ),
        ],
        table_name: Annotated[
            str,
            Field(
                description="The name of the table to get columns for",
                examples=["Panther.Audit"],
            ),
        ],
    ) -> Dict[str, Any]:
        # Docstring and body are identical to the core handler listing above.
        ...
  • GraphQL query definition used by the handler to fetch table column details from the Panther GraphQL API.
    GET_COLUMNS_FOR_TABLE_QUERY = gql("""
        query GetColumnDetails($databaseName: String!, $tableName: String!) {
            dataLakeDatabaseTable(input: { databaseName: $databaseName, tableName: $tableName }) {
                name,
                displayName,
                description,
                logType,
                columns {
                    name,
                    type,
                    description
                }
            }
        }
    """)
  • The mcp_tool decorator and register_all_tools function that handle collecting and registering all decorated tools with the MCP server instance.
    def mcp_tool(
        func: Optional[Callable] = None,
        *,
        name: Optional[str] = None,
        description: Optional[str] = None,
        annotations: Optional[Dict[str, Any]] = None,
    ) -> Callable:
        """
        Decorator to mark a function as an MCP tool.

        Functions decorated with this will be automatically registered
        when register_all_tools() is called.

        Can be used in two ways:
        1. Direct decoration:
            @mcp_tool
            def my_tool():
                ...

        2. With parameters:
            @mcp_tool(
                name="custom_name",
                description="Custom description",
                annotations={"category": "data_analysis"}
            )
            def my_tool():
                ...

        Args:
            func: The function to decorate
            name: Optional custom name for the tool. If not provided, uses the function name.
            description: Optional description of what the tool does. If not provided, uses the function's docstring.
            annotations: Optional dictionary of additional annotations for the tool.
        """

        def decorator(func: Callable) -> Callable:
            # Store metadata on the function
            func._mcp_tool_metadata = {
                "name": name,
                "description": description,
                "annotations": annotations,
            }
            _tool_registry.add(func)

            @wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)

            return wrapper

        # Handle both @mcp_tool and @mcp_tool(...) cases
        if func is None:
            return decorator
        return decorator(func)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/panther-labs/mcp-panther'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.