panther-labs

Panther MCP Server

Official

get_table_schema

Retrieve column names and data types for a specific table in the Panther data lake. Use this tool in conjunction with get_panther_log_type_schema() to optimize queries by understanding nested structures and field mappings.

Instructions

Get column details for a specific data lake table.

IMPORTANT: This returns the table structure in Snowflake. For writing optimal queries, ALSO call get_panther_log_type_schema() to understand:

  • Nested object structures (only shown as 'object' type here)

  • Which fields map to p_any_* indicator columns

  • Array element structures

Example workflow:

  1. get_panther_log_type_schema(["AWS.CloudTrail"]) - understand structure

  2. get_table_schema("panther_logs.public", "aws_cloudtrail") - get column names/types

  3. Write query using both: nested paths from log schema, column names from table schema
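Step 3 can be sketched in code. This is an illustrative snippet only, not part of the server: `build_query` is a made-up helper, and the column names and nested path are hypothetical values of the kind the two schema tools would return for AWS.CloudTrail (Snowflake's colon syntax extracts nested fields).

```python
def build_query(table: str, columns: list[str], nested_path: str, alias: str) -> str:
    """Combine flat column names (from get_table_schema) with a nested field
    path (from get_panther_log_type_schema), using Snowflake's colon syntax
    to extract the nested value."""
    select_list = ", ".join(columns + [f"{nested_path} AS {alias}"])
    return f"SELECT {select_list} FROM {table} LIMIT 100"

sql = build_query(
    "panther_logs.public.aws_cloudtrail",
    ["p_event_time", "eventName"],    # column names from the table schema
    "requestParameters:bucketName",   # nested path from the log type schema
    "bucket_name",
)
```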

Returns: Dict containing:

  • success: Boolean indicating if the query was successful

  • name: Table name

  • display_name: Table display name

  • description: Table description

  • log_type: Log type

  • columns: List of columns, each containing:

    • name: Column name

    • type: Column data type

    • description: Column description

  • message: Error message if unsuccessful
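To make that shape concrete, here is a hypothetical successful response (field values invented for illustration) together with the kind of consuming code a client might write:

```python
# Hypothetical response shape for a successful call; values are illustrative.
response = {
    "success": True,
    "name": "aws_cloudtrail",
    "display_name": "AWS CloudTrail",
    "description": "CloudTrail management events",
    "log_type": "AWS.CloudTrail",
    "columns": [
        {"name": "p_event_time", "type": "timestamp", "description": "Event time"},
        {"name": "eventName", "type": "string", "description": "API call name"},
        {"name": "requestParameters", "type": "object", "description": "Request parameters"},
    ],
}

if response["success"]:
    # Nested fields surface only as 'object' here; consult
    # get_panther_log_type_schema() for their internal structure.
    by_type = {c["name"]: c["type"] for c in response["columns"]}
else:
    print(response["message"])
```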

Permissions: {'all_of': ['Query Data Lake']}

Input Schema

| Name | Required | Description | Default |
|---------------|----------|-----------------------------------------------------|---------|
| database_name | Yes | The name of the database where the table is located | |
| table_name | Yes | The name of the table to get columns for | |
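A tool call therefore supplies two required string arguments. The values below are illustrative: the database name follows the example embedded in the input schema, and the table name is a hypothetical data lake table.

```python
import json

# Example arguments for a get_table_schema tool call; both fields are required.
args = {
    "database_name": "panther_logs.public",  # matches the schema's example value
    "table_name": "aws_cloudtrail",          # illustrative table name
}
payload = json.dumps(args)  # what an MCP client would send as the tool arguments
```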

Implementation Reference

  • The primary handler function for the 'get_table_schema' MCP tool. It is decorated with @mcp_tool for automatic registration, declares its input schema via Annotated Field metadata, and queries Panther's GraphQL API for Snowflake table column details using GET_COLUMNS_FOR_TABLE_QUERY.
```python
@mcp_tool(
    annotations={
        "permissions": all_perms(Permission.DATA_ANALYTICS_READ),
        "readOnlyHint": True,
    }
)
async def get_table_schema(
    database_name: Annotated[
        str,
        Field(
            description="The name of the database where the table is located",
            examples=["panther_logs.public"],
        ),
    ],
    table_name: Annotated[
        str,
        Field(
            description="The name of the table to get columns for",
            examples=["Panther.Audit"],
        ),
    ],
) -> Dict[str, Any]:
    """Get column details for a specific data lake table.

    IMPORTANT: This returns the table structure in Snowflake. For writing
    optimal queries, ALSO call get_panther_log_type_schema() to understand:
    - Nested object structures (only shown as 'object' type here)
    - Which fields map to p_any_* indicator columns
    - Array element structures

    Example workflow:
    1. get_panther_log_type_schema(["AWS.CloudTrail"]) - understand structure
    2. get_table_schema("panther_logs.public", "aws_cloudtrail") - get column names/types
    3. Write query using both: nested paths from log schema, column names from table schema

    Returns:
        Dict containing:
        - success: Boolean indicating if the query was successful
        - name: Table name
        - display_name: Table display name
        - description: Table description
        - log_type: Log type
        - columns: List of columns, each containing:
            - name: Column name
            - type: Column data type
            - description: Column description
        - message: Error message if unsuccessful
    """
    table_full_path = f"{database_name}.{table_name}"
    logger.info(f"Fetching column information for table: {table_full_path}")

    try:
        # Prepare input variables
        variables = {"databaseName": database_name, "tableName": table_name}
        logger.debug(f"Query variables: {variables}")

        # Execute the query asynchronously
        async with await _create_panther_client() as session:
            result = await session.execute(
                GET_COLUMNS_FOR_TABLE_QUERY, variable_values=variables
            )

        # Get query data
        query_data = result.get("dataLakeDatabaseTable", {})
        columns = query_data.get("columns", [])

        if not columns:
            logger.warning(f"No columns found for table: {table_full_path}")
            return {
                "success": False,
                "message": f"No columns found for table: {table_full_path}",
            }

        logger.info(f"Successfully retrieved {len(columns)} columns")

        # Format the response
        return {
            "success": True,
            "status": "succeeded",
            **query_data,
            "stats": {
                "table_count": len(columns),
            },
        }
    except Exception as e:
        logger.error(f"Failed to get columns for table: {str(e)}")
        return {
            "success": False,
            "message": f"Failed to get columns for table: {str(e)}",
        }
```
  • Input schema definition for the get_table_schema tool using Pydantic Annotated types and Field descriptions/examples.
```python
async def get_table_schema(
    database_name: Annotated[
        str,
        Field(
            description="The name of the database where the table is located",
            examples=["panther_logs.public"],
        ),
    ],
    table_name: Annotated[
        str,
        Field(
            description="The name of the table to get columns for",
            examples=["Panther.Audit"],
        ),
    ],
) -> Dict[str, Any]:
```
  • Tool registration via the @mcp_tool decorator, specifying required permissions and read-only hint.
```python
@mcp_tool(
    annotations={
        "permissions": all_perms(Permission.DATA_ANALYTICS_READ),
        "readOnlyHint": True,
    }
)
```
  • GraphQL query definition used by get_table_schema to retrieve table metadata including columns, types, and descriptions from the Panther data lake.
```python
GET_COLUMNS_FOR_TABLE_QUERY = gql("""
query GetColumnDetails($databaseName: String!, $tableName: String!) {
    dataLakeDatabaseTable(input: { databaseName: $databaseName, tableName: $tableName }) {
        name,
        displayName,
        description,
        logType,
        columns {
            name,
            type,
            description
        }
    }
}
""")
```
