ocsf_get_classes
Retrieve OCSF event classes and descriptions for a specific schema version to identify suitable mappings for cybersecurity log data.
Instructions
Get all OCSF event classes and their descriptions for a specific schema version.
Use this tool to:
Browse available OCSF event classes before creating a mapping
Identify which class best matches your log data
Understand the purpose and scope of each event class
Once you identify a candidate class, use ocsf_get_class to see its
complete schema with all fields and attributes.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| version | Yes | OCSF schema version (e.g., '1.6.0') |
Implementation Reference
- The @mcp.tool decorated async handler function that loads the OCSF schema, extracts event classes with descriptions, formats as markdown and structured JSON, with comprehensive error handling.@mcp.tool( name="ocsf_get_classes", tags={"ocsf"}, annotations={ "title": "List OCSF event classes", "readOnlyHint": True, "idempotentHint": True, "openWorldHint": False, }, ) async def ocsf_get_classes( version: Annotated[str, Field(description="OCSF schema version (e.g., '1.6.0')")], ) -> ToolResult: """Get all OCSF event classes and their descriptions for a specific schema version. Use this tool to: - Browse available OCSF event classes before creating a mapping - Identify which class best matches your log data - Understand the purpose and scope of each event class Once you identify a candidate class, use `ocsf_get_class` to see its complete schema with all fields and attributes.""" try: schema = load_ocsf_schema(version) # Extract event classes from the schema event_classes = {} if "classes" in schema: for class_id, class_data in schema["classes"].items(): class_name = class_data.get("name", class_id) description = class_data.get("description", "No description available") event_classes[class_name] = description # Format as markdown list markdown_lines = [f"## OCSF Event Classes (v{version})\n"] for name, desc in sorted(event_classes.items()): markdown_lines.append(f"- **{name}**: {desc}") return ToolResult( content="\n".join(markdown_lines), # Markdown list structured_content={"classes": event_classes, "version": version}, # JSON ) except FileNotFoundError: error_msg = f"OCSF schema version {version} not found" logger.error(error_msg) return ToolResult(content=error_msg, structured_content={"error": error_msg}) except json.JSONDecodeError as e: error_msg = f"Failed to parse OCSF schema for version {version}: {e}" logger.error(error_msg) return ToolResult(content=error_msg, structured_content={"error": error_msg}) except Exception as e: error_msg = f"Failed to get OCSF event classes for version {version}: {e}" logger.error(error_msg) return ToolResult(content=error_msg, structured_content={"error": error_msg})
- Helper utility to load and parse the OCSF schema JSON file for a given version from package resources.def load_ocsf_schema(version: str) -> dict[str, Any]: """ Load and parse an OCSF schema for the specified version. Args: version: The OCSF schema version to load Returns: Dictionary containing the parsed OCSF schema Raises: FileNotFoundError: If the schema version is not found json.JSONDecodeError: If the schema JSON is invalid Exception: For other loading errors """ schema_text = files("tenzir_mcp.data.ocsf").joinpath(f"{version}.json").read_text() schema: dict[str, Any] = json.loads(schema_text) return schema
- src/tenzir_mcp/tools/ocsf/__init__.py:4-4 (registration)Imports the ocsf_get_classes tool function for exposure via the package __init__.from .ocsf_get_classes import ocsf_get_classes
- Input schema definition using Pydantic Annotated and Field for the version parameter.version: Annotated[str, Field(description="OCSF schema version (e.g., '1.6.0')")], ) -> ToolResult: