# get_table_schema
Retrieve column names and data types for data lake tables to understand table structure and write optimized queries in Snowflake.
## Instructions
Get column details for a specific data lake table.
IMPORTANT: This returns the table structure in Snowflake. For writing optimal queries, ALSO call `get_panther_log_type_schema()` to understand:

- Nested object structures (only shown as `object` type here)
- Which fields map to `p_any_*` indicator columns
- Array element structures
Example workflow (a client-side sketch follows the list):

1. `get_panther_log_type_schema(["AWS.CloudTrail"])` - understand the nested structure
2. `get_table_schema("panther_logs.public", "aws_cloudtrail")` - get column names and types
3. Write the query using both: nested paths from the log schema, column names from the table schema
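A sketch of what step 3 might produce once both schemas are in hand. The field names below are illustrative CloudTrail examples, not guaranteed to match your tables:

```python
# Illustrative only: `eventName` and `userIdentity` would come from the log
# type schema (step 1); the table path and flat column names such as
# p_event_time come from the table schema (step 2).
SQL = """
SELECT
    p_event_time,                        -- flat column (table schema)
    eventName,                           -- top-level field (both schemas)
    userIdentity:arn::string AS actor    -- nested path (log type schema)
FROM panther_logs.public.aws_cloudtrail
WHERE p_event_time >= DATEADD(day, -1, CURRENT_TIMESTAMP())
LIMIT 10
"""
```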
Returns: Dict containing:

- `success`: Boolean indicating whether the query was successful
- `name`: Table name
- `display_name`: Table display name
- `description`: Table description
- `log_type`: Log type
- `columns`: List of columns, each containing:
  - `name`: Column name
  - `type`: Column data type
  - `description`: Column description
- `message`: Error message if unsuccessful
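Note that because the implementation below spreads the raw GraphQL payload (`**query_data`), the metadata keys arrive in the API's camelCase (`displayName`, `logType`) rather than the snake_case names listed above. For illustration, a successful response might look like the following (values are placeholders):

```python
{
    "success": True,
    "status": "succeeded",
    "name": "aws_cloudtrail",
    "displayName": "AWS CloudTrail",
    "description": "...",
    "logType": "AWS.CloudTrail",
    "columns": [
        {"name": "p_event_time", "type": "timestamp", "description": "..."},
        {"name": "eventName", "type": "string", "description": "..."},
    ],
    "stats": {"table_count": 2},  # len(columns); counts columns despite the name
}
```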
Permissions: `{"all_of": ["Query Data Lake"]}`
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| database_name | Yes | The name of the database where the table is located | |
| table_name | Yes | The name of the table to get columns for | |
## Implementation Reference
- The core handler function that implements the `get_table_schema` tool. It takes `database_name` and `table_name` parameters, executes `GET_COLUMNS_FOR_TABLE_QUERY` via `_execute_query`, and returns the table's column information, including names, types, and descriptions.

```python
async def get_table_schema(
    database_name: Annotated[
        str,
        Field(
            description="The name of the database where the table is located",
            examples=["panther_logs.public"],
        ),
    ],
    table_name: Annotated[
        str,
        Field(
            description="The name of the table to get columns for",
            examples=["Panther.Audit"],
        ),
    ],
) -> Dict[str, Any]:
    """Get column details for a specific data lake table.

    IMPORTANT: This returns the table structure in Snowflake. For writing optimal
    queries, ALSO call get_panther_log_type_schema() to understand:
    - Nested object structures (only shown as 'object' type here)
    - Which fields map to p_any_* indicator columns
    - Array element structures

    Example workflow:
    1. get_panther_log_type_schema(["AWS.CloudTrail"]) - understand structure
    2. get_table_schema("panther_logs.public", "aws_cloudtrail") - get column names/types
    3. Write query using both: nested paths from log schema, column names from table schema

    Returns:
        Dict containing:
        - success: Boolean indicating if the query was successful
        - name: Table name
        - display_name: Table display name
        - description: Table description
        - log_type: Log type
        - columns: List of columns, each containing:
            - name: Column name
            - type: Column data type
            - description: Column description
        - message: Error message if unsuccessful
    """
    table_full_path = f"{database_name}.{table_name}"
    logger.info(f"Fetching column information for table: {table_full_path}")

    try:
        # Prepare input variables
        variables = {"databaseName": database_name, "tableName": table_name}
        logger.debug(f"Query variables: {variables}")

        # Execute the query using shared client
        result = await _execute_query(GET_COLUMNS_FOR_TABLE_QUERY, variables)

        # Get query data
        query_data = result.get("dataLakeDatabaseTable", {})
        columns = query_data.get("columns", [])

        if not columns:
            logger.warning(f"No columns found for table: {table_full_path}")
            return {
                "success": False,
                "message": f"No columns found for table: {table_full_path}",
            }

        logger.info(f"Successfully retrieved {len(columns)} columns")

        # Format the response
        return {
            "success": True,
            "status": "succeeded",
            **query_data,
            "stats": {
                "table_count": len(columns),
            },
        }
    except Exception as e:
        logger.error(f"Failed to get columns for table: {str(e)}")
        return {
            "success": False,
            "message": f"Failed to get columns for table: {str(e)}",
        }
```
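A quick way to exercise the handler directly, for example in a test (a sketch, assuming the module's shared GraphQL client and credentials are already configured):

```python
import asyncio

# The decorator's wrapper returns the coroutine, so asyncio.run drives it.
result = asyncio.run(get_table_schema("panther_logs.public", "aws_cloudtrail"))
if result["success"]:
    for col in result["columns"]:
        print(col["name"], col["type"])
else:
    print(result["message"])
```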
- `src/mcp_panther/panther_mcp_core/tools/data_lake.py:705-710` (registration): the `@mcp_tool` decorator call that registers `get_table_schema` as an MCP tool, specifying the required permission (`DATA_ANALYTICS_READ`) and marking it as read-only.

```python
@mcp_tool(
    annotations={
        "permissions": all_perms(Permission.DATA_ANALYTICS_READ),
        "readOnlyHint": True,
    }
)
```
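`all_perms` is not reproduced in this reference. Judging from the Permissions line above, a plausible sketch (hypothetical; the real helper lives elsewhere in the codebase):

```python
def all_perms(*permissions):
    # Hypothetical sketch: yields the {"all_of": [...]} shape shown in the
    # Permissions line above; assumes Permission members carry the
    # human-readable name (e.g. "Query Data Lake") as their value.
    return {"all_of": [p.value for p in permissions]}
```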
- Input schema is defined via Pydantic `Field` annotations on `database_name` and `table_name`; the output schema is described in the function docstring, returning structured table metadata and columns. The relevant slice of the handler signature (full function above):

```python
database_name: Annotated[
    str,
    Field(
        description="The name of the database where the table is located",
        examples=["panther_logs.public"],
    ),
],
table_name: Annotated[
    str,
    Field(
        description="The name of the table to get columns for",
        examples=["Panther.Audit"],
    ),
],
```
- The GraphQL query definition used by the handler to fetch table column details from the Panther GraphQL API.

```python
GET_COLUMNS_FOR_TABLE_QUERY = gql("""
    query GetColumnDetails($databaseName: String!, $tableName: String!) {
        dataLakeDatabaseTable(input: { databaseName: $databaseName, tableName: $tableName }) {
            name,
            displayName,
            description,
            logType,
            columns {
                name,
                type,
                description
            }
        }
    }
""")
```
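`_execute_query` is referenced but not shown here. A minimal sketch using the `gql` library's async transport (the endpoint URL and auth header are placeholders, not the project's actual shared-client configuration):

```python
from typing import Any, Dict

from gql import Client
from gql.transport.aiohttp import AIOHTTPTransport

async def _execute_query(query, variables: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical sketch of the shared client helper.
    transport = AIOHTTPTransport(
        url="https://example.runpanther.net/public/graphql",
        headers={"X-API-Key": "<api-token>"},
    )
    async with Client(transport=transport) as session:
        return await session.execute(query, variable_values=variables)
```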
- The `mcp_tool` decorator that collects decorated functions into a registry so that `register_all_tools()` can later register them with the MCP server instance (a sketch of `register_all_tools` follows the code).

```python
def mcp_tool(
    func: Optional[Callable] = None,
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    annotations: Optional[Dict[str, Any]] = None,
) -> Callable:
    """
    Decorator to mark a function as an MCP tool.

    Functions decorated with this will be automatically registered
    when register_all_tools() is called.

    Can be used in two ways:

    1. Direct decoration:
        @mcp_tool
        def my_tool(): ...

    2. With parameters:
        @mcp_tool(
            name="custom_name",
            description="Custom description",
            annotations={"category": "data_analysis"}
        )
        def my_tool(): ...

    Args:
        func: The function to decorate
        name: Optional custom name for the tool. If not provided, uses the
            function name.
        description: Optional description of what the tool does. If not
            provided, uses the function's docstring.
        annotations: Optional dictionary of additional annotations for the tool.
    """

    def decorator(func: Callable) -> Callable:
        # Store metadata on the function
        func._mcp_tool_metadata = {
            "name": name,
            "description": description,
            "annotations": annotations,
        }
        _tool_registry.add(func)

        @wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapper

    # Handle both @mcp_tool and @mcp_tool(...) cases
    if func is None:
        return decorator
    return decorator(func)
```
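`register_all_tools` itself is mentioned above but not reproduced. A plausible sketch, assuming a FastMCP-style server object (hypothetical; the actual registration call may differ):

```python
def register_all_tools(mcp) -> None:
    # Hypothetical sketch: hand every collected function to the server,
    # assuming a FastMCP-like `mcp.tool(...)` registration API.
    for func in _tool_registry:
        meta = func._mcp_tool_metadata
        mcp.tool(
            name=meta["name"] or func.__name__,
            description=meta["description"] or func.__doc__,
        )(func)
```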