# get_table_schema
Retrieve column names and data types for a specific table in the Panther data lake. Use this tool in conjunction with get_panther_log_type_schema() to optimize queries by understanding nested structures and field mappings.
## Instructions

Get column details for a specific data lake table.

IMPORTANT: This returns the table structure in Snowflake. For writing optimal queries, ALSO call `get_panther_log_type_schema()` to understand:

- Nested object structures (only shown as `object` type here)
- Which fields map to `p_any_*` indicator columns
- Array element structures

Example workflow:

1. `get_panther_log_type_schema(["AWS.CloudTrail"])` to understand the structure
2. `get_table_schema("panther_logs.public", "aws_cloudtrail")` to get column names and types
3. Write a query using both: nested paths from the log schema, column names from the table schema
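Putting the two schemas together, a resulting query might look like the following sketch (the `requestParameters:bucketName` nested path and the column names are illustrative, not taken from a live schema):

```python
# Sketch: compose a SQL query using a nested path (from the log schema)
# and flat column names (from the table schema).
def build_cloudtrail_query(limit: int = 10) -> str:
    return (
        "SELECT p_event_time, eventName, "
        "requestParameters:bucketName AS bucket "   # nested path (log schema)
        "FROM panther_logs.public.aws_cloudtrail "  # table (table schema)
        "ORDER BY p_event_time DESC "
        f"LIMIT {limit}"
    )

print(build_cloudtrail_query())
```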
Returns: Dict containing:

- `success`: Boolean indicating if the query was successful
- `name`: Table name
- `display_name`: Table display name
- `description`: Table description
- `log_type`: Log type
- `columns`: List of columns, each containing:
  - `name`: Column name
  - `type`: Column data type
  - `description`: Column description
- `message`: Error message if unsuccessful
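A minimal sketch of consuming this return shape (the sample values are illustrative, not from a live instance):

```python
# Sketch: handling a get_table_schema response. The dict mirrors the
# documented shape; values are made up for illustration.
sample = {
    "success": True,
    "name": "aws_cloudtrail",
    "display_name": "AWS CloudTrail",
    "description": "AWS CloudTrail events",
    "log_type": "AWS.CloudTrail",
    "columns": [
        {"name": "p_event_time", "type": "timestamp", "description": "Event time"},
        {"name": "eventName", "type": "string", "description": "API call name"},
    ],
}

if sample["success"]:
    # Collect column names to reference in a query
    names = [col["name"] for col in sample["columns"]]
else:
    names = []
print(names)
```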
Permissions: all of [`Query Data Lake`]
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| database_name | Yes | The name of the database where the table is located | |
| table_name | Yes | The name of the table to get columns for | |
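For illustration, a tool call with both required arguments, plus the fully qualified path the handler derives from them (`f"{database_name}.{table_name}"`):

```python
# Sketch: arguments matching the input schema above. The handler joins
# them into a fully qualified table path for logging and error messages.
args = {
    "database_name": "panther_logs.public",  # required
    "table_name": "aws_cloudtrail",          # required
}
table_full_path = f"{args['database_name']}.{args['table_name']}"
print(table_full_path)  # panther_logs.public.aws_cloudtrail
```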
## Implementation Reference
- The primary handler function for the `get_table_schema` MCP tool. Decorated with `@mcp_tool` for automatic registration, it declares its input schema via `Annotated` `Field`s and queries Panther's GraphQL API for Snowflake table column details using `GET_COLUMNS_FOR_TABLE_QUERY`.

  ```python
  @mcp_tool(
      annotations={
          "permissions": all_perms(Permission.DATA_ANALYTICS_READ),
          "readOnlyHint": True,
      }
  )
  async def get_table_schema(
      database_name: Annotated[
          str,
          Field(
              description="The name of the database where the table is located",
              examples=["panther_logs.public"],
          ),
      ],
      table_name: Annotated[
          str,
          Field(
              description="The name of the table to get columns for",
              examples=["Panther.Audit"],
          ),
      ],
  ) -> Dict[str, Any]:
      """Get column details for a specific data lake table.

      IMPORTANT: This returns the table structure in Snowflake. For writing
      optimal queries, ALSO call get_panther_log_type_schema() to understand:
      - Nested object structures (only shown as 'object' type here)
      - Which fields map to p_any_* indicator columns
      - Array element structures

      Example workflow:
      1. get_panther_log_type_schema(["AWS.CloudTrail"]) - understand structure
      2. get_table_schema("panther_logs.public", "aws_cloudtrail") - get column names/types
      3. Write query using both: nested paths from log schema, column names from table schema

      Returns:
          Dict containing:
          - success: Boolean indicating if the query was successful
          - name: Table name
          - display_name: Table display name
          - description: Table description
          - log_type: Log type
          - columns: List of columns, each containing:
              - name: Column name
              - type: Column data type
              - description: Column description
          - message: Error message if unsuccessful
      """
      table_full_path = f"{database_name}.{table_name}"
      logger.info(f"Fetching column information for table: {table_full_path}")

      try:
          # Prepare input variables
          variables = {"databaseName": database_name, "tableName": table_name}
          logger.debug(f"Query variables: {variables}")

          # Execute the query asynchronously
          async with await _create_panther_client() as session:
              result = await session.execute(
                  GET_COLUMNS_FOR_TABLE_QUERY, variable_values=variables
              )

          # Get query data
          query_data = result.get("dataLakeDatabaseTable", {})
          columns = query_data.get("columns", [])

          if not columns:
              logger.warning(f"No columns found for table: {table_full_path}")
              return {
                  "success": False,
                  "message": f"No columns found for table: {table_full_path}",
              }

          logger.info(f"Successfully retrieved {len(columns)} columns")

          # Format the response
          return {
              "success": True,
              "status": "succeeded",
              **query_data,
              "stats": {
                  "table_count": len(columns),
              },
          }
      except Exception as e:
          logger.error(f"Failed to get columns for table: {str(e)}")
          return {
              "success": False,
              "message": f"Failed to get columns for table: {str(e)}",
          }
  ```
- Input schema definition for the `get_table_schema` tool, using Pydantic `Annotated` types with `Field` descriptions and examples.

  ```python
  async def get_table_schema(
      database_name: Annotated[
          str,
          Field(
              description="The name of the database where the table is located",
              examples=["panther_logs.public"],
          ),
      ],
      table_name: Annotated[
          str,
          Field(
              description="The name of the table to get columns for",
              examples=["Panther.Audit"],
          ),
      ],
  ) -> Dict[str, Any]:
  ```
- src/mcp_panther/panther_mcp_core/tools/data_lake.py:715-720 (registration): Tool registration via the `@mcp_tool` decorator, specifying required permissions and a read-only hint.

  ```python
  @mcp_tool(
      annotations={
          "permissions": all_perms(Permission.DATA_ANALYTICS_READ),
          "readOnlyHint": True,
      }
  )
  ```
- GraphQL query definition used by `get_table_schema` to retrieve table metadata, including columns, types, and descriptions, from the Panther data lake.

  ```python
  GET_COLUMNS_FOR_TABLE_QUERY = gql("""
  query GetColumnDetails($databaseName: String!, $tableName: String!) {
      dataLakeDatabaseTable(input: { databaseName: $databaseName, tableName: $tableName }) {
          name,
          displayName,
          description,
          logType,
          columns {
              name,
              type,
              description
          }
      }
  }
  """)
  ```
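For reference, a sketch of the JSON body a GraphQL client would effectively send for this query, built with the standard library only (endpoint, transport, and auth are omitted):

```python
import json

# Sketch: the request payload for GET_COLUMNS_FOR_TABLE_QUERY.
# Variable names match the query definition above.
QUERY = """
query GetColumnDetails($databaseName: String!, $tableName: String!) {
  dataLakeDatabaseTable(input: { databaseName: $databaseName, tableName: $tableName }) {
    name displayName description logType
    columns { name type description }
  }
}
"""

payload = json.dumps({
    "query": QUERY,
    "variables": {
        "databaseName": "panther_logs.public",
        "tableName": "aws_cloudtrail",
    },
})
print(payload[:40])
```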