ocsf_get_object
Retrieve complete OCSF object definitions to understand nested structures, view fields and types, and map source data correctly for cybersecurity event analysis.
Instructions
Get the complete definition of an OCSF object type including all fields and metadata.
Use this tool to:
Understand complex nested object structures in OCSF classes
See the fields and types within objects like 'file', 'process', 'user'
Map source data to nested OCSF structures correctly
Reference when constructing TQL operators for OCSF mapping
Objects are reusable components within OCSF event classes, defining standard structures like endpoints, files, processes, etc.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| version | Yes | OCSF schema version (e.g., '1.3.0') | |
| name | Yes | OCSF object name (e.g., 'email', 'file', 'process') |
Implementation Reference
- The core handler function for the 'ocsf_get_object' tool. Decorated with @mcp.tool, it takes version and name parameters, loads the OCSF schema using load_ocsf_schema helper, searches for the object, and returns formatted markdown content and structured JSON data or error messages.
@mcp.tool( name="ocsf_get_object", tags={"ocsf"}, annotations={ "title": "Get OCSF object", "readOnlyHint": True, "idempotentHint": True, "openWorldHint": False, }, ) async def ocsf_get_object( version: Annotated[str, Field(description="OCSF schema version (e.g., '1.3.0')")], name: Annotated[ str, Field(description="OCSF object name (e.g., 'email', 'file', 'process')") ], ) -> ToolResult: """Get the complete definition of an OCSF object type including all fields and metadata. Use this tool to: - Understand complex nested object structures in OCSF classes - See the fields and types within objects like 'file', 'process', 'user' - Map source data to nested OCSF structures correctly - Reference when constructing TQL operators for OCSF mapping Objects are reusable components within OCSF event classes, defining standard structures like endpoints, files, processes, etc.""" try: schema = load_ocsf_schema(version) # Look for the object in the schema if "objects" not in schema: error_msg = f"No objects found in OCSF schema version {version}" return ToolResult( content=error_msg, structured_content={"error": error_msg} ) # Search for object by name (case-insensitive) for object_id, object_data in schema["objects"].items(): object_name = object_data.get("name", object_id) if object_name.lower() == name.lower() or object_id.lower() == name.lower(): # Format as markdown description = object_data.get("description", "No description") schema_json = json.dumps(object_data, indent=2, sort_keys=True) markdown = ( f"# {object_name}\n\n" f"**ID**: {object_id}\n\n" f"**Description**: {description}\n\n" "## Schema\n" f"```json\n{schema_json}\n```" ) result = {"id": object_id, "name": object_name, "data": object_data} return ToolResult( content=markdown, # Markdown summary structured_content=result, # Full JSON data ) error_msg = f"Object '{name}' not found in OCSF schema version {version}" return ToolResult(content=error_msg, structured_content={"error": error_msg}) except FileNotFoundError: error_msg = f"OCSF schema version {version} not found" logger.error(error_msg) return ToolResult(content=error_msg, structured_content={"error": error_msg}) except json.JSONDecodeError as e: error_msg = f"Failed to parse OCSF schema for version {version}: {e}" logger.error(error_msg) return ToolResult(content=error_msg, structured_content={"error": error_msg}) except Exception as e: error_msg = f"Failed to get OCSF object {name} for version {version}: {e}" logger.error(error_msg) return ToolResult(content=error_msg, structured_content={"error": error_msg}) - Helper function used by the ocsf_get_object tool to load and parse the OCSF schema JSON file for the given version from package resources.
def load_ocsf_schema(version: str) -> dict[str, Any]: """ Load and parse an OCSF schema for the specified version. Args: version: The OCSF schema version to load Returns: Dictionary containing the parsed OCSF schema Raises: FileNotFoundError: If the schema version is not found json.JSONDecodeError: If the schema JSON is invalid Exception: For other loading errors """ schema_text = files("tenzir_mcp.data.ocsf").joinpath(f"{version}.json").read_text() schema: dict[str, Any] = json.loads(schema_text) return schema - src/tenzir_mcp/tools/ocsf/__init__.py:6-6 (registration)Import statement in the OCSF tools package __init__.py that exposes the ocsf_get_object tool function.
from .ocsf_get_object import ocsf_get_object