# list_log_type_schemas
Retrieve available log type schemas in Panther to understand transformation rules for converting raw audit logs into structured data for analysis and detection.
## Instructions
List all available log type schemas in Panther. Schemas are transformation instructions that convert raw audit logs into structured data for the data lake and real-time Python rules.
Returns a dict containing:

- `success`: Boolean indicating if the query was successful
- `schemas`: List of schemas, each containing:
  - `name`: Schema name (Log Type)
  - `description`: Schema description
  - `revision`: Schema revision number
  - `isArchived`: Whether the schema is archived
  - `isManaged`: Whether the schema is managed by a pack
  - `referenceURL`: Optional documentation URL
  - `createdAt`: Creation timestamp
  - `updatedAt`: Last update timestamp
- `message`: Error message if unsuccessful
Permissions: all of `View Log Sources`
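To make the documented output structure concrete, here is a sketch of a successful response. The keys match the documented return fields; the schema values themselves (names, timestamps, URL) are illustrative, not real Panther data.

```python
# Illustrative successful response; keys follow the documented output
# structure, values are hypothetical.
example_response = {
    "success": True,
    "schemas": [
        {
            "name": "AWS.CloudTrail",  # Schema name (Log Type)
            "description": "AWS CloudTrail audit logs",
            "revision": 3,
            "isArchived": False,
            "isManaged": True,  # managed by a pack
            "referenceURL": "https://example.com/docs/cloudtrail",  # hypothetical
            "createdAt": "2024-01-01T00:00:00Z",
            "updatedAt": "2024-06-01T00:00:00Z",
        }
    ],
}

# On failure, the dict instead carries success=False plus a message field:
example_error = {"success": False, "message": "Failed to fetch schemas: ..."}
```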
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| contains | No | Optional filter by name or schema field name | None |
| is_archived | No | Filter by archive status (default: False shows non-archived) | False |
| is_in_use | No | Filter for used/active schemas (default: False shows all) | False |
| is_managed | No | Filter for pack-managed schemas (default: False shows all) | False |
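The handler forwards only non-default filter values to the GraphQL `input` object, so default arguments produce an empty filter. A minimal sketch of that assembly logic (mirroring the handler under Implementation Reference; the standalone `build_input_vars` helper is introduced here for illustration):

```python
def build_input_vars(
    contains=None, is_archived=False, is_in_use=False, is_managed=False
):
    """Assemble GraphQL query variables, including only non-default filters."""
    input_vars = {}
    if contains is not None:
        input_vars["contains"] = contains
    if is_archived:
        input_vars["isArchived"] = is_archived
    if is_in_use:
        input_vars["isInUse"] = is_in_use
    if is_managed:
        input_vars["isManaged"] = is_managed
    return {"input": input_vars}

build_input_vars()                               # {'input': {}}
build_input_vars(contains="AWS", is_managed=True)
# {'input': {'contains': 'AWS', 'isManaged': True}}
```

Note the snake_case tool parameters map to camelCase GraphQL field names (`is_archived` → `isArchived`).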
## Output Schema

| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
## Implementation Reference

- The handler function implements the core logic: it lists log type schemas via a GraphQL query with optional filters, defines the input schema with Pydantic annotations, and formats the output.
```python
@mcp_tool(
    annotations={
        "permissions": all_perms(Permission.LOG_SOURCE_READ),
        "readOnlyHint": True,
    }
)
async def list_log_type_schemas(
    contains: Annotated[
        str | None,
        Field(description="Optional filter by name or schema field name"),
    ] = None,
    is_archived: Annotated[
        bool,
        Field(
            description="Filter by archive status (default: False shows non-archived)"
        ),
    ] = False,
    is_in_use: Annotated[
        bool,
        Field(description="Filter for used/active schemas (default: False shows all)"),
    ] = False,
    is_managed: Annotated[
        bool,
        Field(description="Filter for pack-managed schemas (default: False shows all)"),
    ] = False,
) -> dict[str, Any]:
    """List all available log type schemas in Panther.

    Schemas are transformation instructions that convert raw audit logs into
    structured data for the data lake and real-time Python rules.

    Returns:
        Dict containing:
        - success: Boolean indicating if the query was successful
        - schemas: List of schemas, each containing:
            - name: Schema name (Log Type)
            - description: Schema description
            - revision: Schema revision number
            - isArchived: Whether the schema is archived
            - isManaged: Whether the schema is managed by a pack
            - referenceURL: Optional documentation URL
            - createdAt: Creation timestamp
            - updatedAt: Last update timestamp
        - message: Error message if unsuccessful
    """
    logger.info("Fetching available schemas")

    try:
        # Prepare input variables, only including non-default values
        input_vars = {}
        if contains is not None:
            input_vars["contains"] = contains
        if is_archived:
            input_vars["isArchived"] = is_archived
        if is_in_use:
            input_vars["isInUse"] = is_in_use
        if is_managed:
            input_vars["isManaged"] = is_managed

        variables = {"input": input_vars}

        # Execute the query using shared client
        result = await _execute_query(LIST_SCHEMAS_QUERY, variables)

        # Get schemas data and ensure we have the required structure
        schemas_data = result.get("schemas")
        if not schemas_data:
            return {"success": False, "message": "No schemas data returned from server"}

        edges = schemas_data.get("edges", [])
        schemas = [edge["node"] for edge in edges] if edges else []

        logger.info(f"Successfully retrieved {len(schemas)} schemas")

        # Format the response
        return {
            "success": True,
            "schemas": schemas,
        }
    except Exception as e:
        logger.error(f"Failed to fetch schemas: {str(e)}")
        return {
            "success": False,
            "message": f"Failed to fetch schemas: {str(e)}",
        }
```

- `src/mcp_panther/panther_mcp_core/tools/schemas.py:19-24` (registration): the `@mcp_tool` decorator registers `list_log_type_schemas` in the tool registry during module import.
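The GraphQL result comes back in the usual connection shape (`edges` wrapping `node` objects), which the handler flattens to a plain list. A minimal sketch of that unwrapping using a mocked result (the real query and transport are not reproduced here):

```python
# Mocked GraphQL result in connection form; field values are illustrative.
mock_result = {
    "schemas": {
        "edges": [
            {"node": {"name": "AWS.CloudTrail", "revision": 3}},
            {"node": {"name": "Okta.SystemLog", "revision": 1}},
        ]
    }
}

# Guard against a missing "schemas" key, then flatten edges to nodes,
# mirroring the handler's logic.
schemas_data = mock_result.get("schemas")
edges = schemas_data.get("edges", []) if schemas_data else []
schemas = [edge["node"] for edge in edges]

print([s["name"] for s in schemas])  # ['AWS.CloudTrail', 'Okta.SystemLog']
```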
```python
@mcp_tool(
    annotations={
        "permissions": all_perms(Permission.LOG_SOURCE_READ),
        "readOnlyHint": True,
    }
)
```

- `src/mcp_panther/panther_mcp_core/tools/__init__.py:35` (registration): importing the `schemas` module in the tools `__init__.py` triggers loading and registration of the `list_log_type_schemas` tool via its decorator.
```python
schemas,
```

- `src/mcp_panther/server.py:72` (registration): calls `register_all_tools(mcp)` to register all tools from the registry, including `list_log_type_schemas`, with the FastMCP server instance.
```python
register_all_tools(mcp)
```

- Pydantic-based input schema parameters and the docstring define the expected output structure for the tool.
```python
async def list_log_type_schemas(
    contains: Annotated[
        str | None,
        Field(description="Optional filter by name or schema field name"),
    ] = None,
    is_archived: Annotated[
        bool,
        Field(
            description="Filter by archive status (default: False shows non-archived)"
        ),
    ] = False,
    is_in_use: Annotated[
        bool,
        Field(description="Filter for used/active schemas (default: False shows all)"),
    ] = False,
    is_managed: Annotated[
        bool,
        Field(description="Filter for pack-managed schemas (default: False shows all)"),
    ] = False,
) -> dict[str, Any]:
```