create_hunt
Create a Velociraptor hunt to collect forensic artifacts across multiple endpoints with configurable filters for targeted investigation.
Instructions
Create a new Velociraptor hunt to collect artifacts across multiple clients.
Args:
- artifacts: List of artifact names to collect.
- description: Description of the hunt's purpose.
- parameters: Optional parameters for artifacts. Format: {"ArtifactName": {"param": "value"}}.
- include_labels: Only include clients with these labels.
- exclude_labels: Exclude clients with these labels.
- os_filter: Filter by OS: 'windows', 'linux', or 'darwin'.
- timeout: Query timeout per client in seconds (default 600).
- expires_hours: Hunt expiration in hours (default 24).
- paused: Create the hunt in a paused state (default True for safety).
Returns: Hunt ID and details.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
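| artifacts | Yes | List of artifact names to collect | |
| description | Yes | Description of the hunt's purpose | |
| parameters | No | Optional parameters for artifacts. Format: {"ArtifactName": {"param": "value"}} | None |
| include_labels | No | Only include clients with these labels | None |
| exclude_labels | No | Exclude clients with these labels | None |
| os_filter | No | Filter by OS: 'windows', 'linux', or 'darwin' | None |
| timeout | No | Query timeout per client in seconds | 600 |
| expires_hours | No | Hunt expiration in hours | 24 |
| paused | No | Create the hunt in a paused state | True |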
Implementation Reference
- src/megaraptor_mcp/tools/hunts.py:22-145 (handler) The 'create_hunt' function is defined as an MCP tool in this file; it creates a Velociraptor hunt by building a VQL query and executing it through the MCP client.
@mcp.tool()
async def create_hunt(
    artifacts: list[str],
    description: str,
    parameters: Optional[dict[str, Any]] = None,
    include_labels: Optional[list[str]] = None,
    exclude_labels: Optional[list[str]] = None,
    os_filter: Optional[str] = None,
    timeout: int = 600,
    expires_hours: int = 24,
    paused: bool = True,
) -> list[TextContent]:
    """Create a new Velociraptor hunt to collect artifacts across multiple clients.

    The hunt is created by building a ``SELECT hunt(...) FROM scope()`` VQL
    statement and running it through the client returned by ``get_client()``.
    Every caller-supplied string is escaped before it is embedded in the
    query so that quotes or backslashes cannot alter the statement.

    Args:
        artifacts: List of artifact names to collect.
        description: Description of the hunt's purpose.
        parameters: Optional parameters for artifacts.
            Format: {"ArtifactName": {"param": "value"}}
        include_labels: Only include clients with these labels.
        exclude_labels: Exclude clients with these labels.
        os_filter: Filter by OS: 'windows', 'linux', 'darwin'.
        timeout: Query timeout per client in seconds (default 600).
        expires_hours: Hunt expiration in hours (default 24).
        paused: Create hunt in paused state (default True for safety).

    Returns:
        A single-element list holding a ``TextContent`` whose text is a JSON
        document: the hunt ID and details on success, or an object with an
        ``error`` key (and possibly a ``hint``) on failure. Errors are
        reported in the payload rather than raised.
    """

    def _error(message: str) -> list[TextContent]:
        # All validation failures share the same {"error": ...} payload shape.
        return [TextContent(type="text", text=json.dumps({"error": message}))]

    def _vql_str(value: Any) -> str:
        # Render a value as a single-quoted VQL string literal. Backslashes
        # are escaped first so the escapes added for quotes are not doubled.
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    def _vql_list(values: list[str]) -> str:
        # Render a Python list of strings as a VQL array of string literals.
        return "[" + ", ".join(_vql_str(v) for v in values) + "]"

    # Input validation
    if not artifacts:
        return _error("artifacts parameter is required and cannot be empty")

    if not description:
        return _error("description parameter is required and cannot be empty")

    if os_filter and os_filter not in ('windows', 'linux', 'darwin'):
        return _error(
            f"Invalid os_filter '{os_filter}'. Must be one of: windows, linux, darwin"
        )

    try:
        client = get_client()

        # Mandatory hunt() arguments.
        parts = [
            f"artifacts={_vql_list(artifacts)}",
            f"description={_vql_str(description)}",
            f"timeout={timeout}",
            f"expires=now() + {expires_hours * 3600}",
            f"pause={'true' if paused else 'false'}",
        ]

        if parameters:
            # Pass the spec as an escaped JSON string decoded server-side by
            # parse_json(); raw JSON text is not a quoted VQL value and would
            # let embedded quotes leak into the query.
            spec_json = json.dumps(parameters)
            parts.append(f"spec=parse_json(data={_vql_str(spec_json)})")

        if include_labels:
            parts.append(f"include_labels={_vql_list(include_labels)}")

        if exclude_labels:
            parts.append(f"exclude_labels={_vql_list(exclude_labels)}")

        if os_filter:
            # Already restricted to the allow-list above; quoted for uniformity.
            parts.append(f"os={_vql_str(os_filter)}")

        params_str = ", ".join(parts)
        vql = f"SELECT hunt({params_str}) AS hunt FROM scope()"

        results = client.query(vql)

        if not results:
            return _error("Failed to create hunt")

        # The "hunt" column may be present but null; `or {}` keeps the
        # lookups below from failing on None.
        hunt = results[0].get("hunt") or {}

        return [TextContent(
            type="text",
            text=json.dumps({
                "status": "hunt_created",
                "hunt_id": hunt.get("hunt_id", ""),
                "description": description,
                "artifacts": artifacts,
                "state": "PAUSED" if paused else "RUNNING",
                "expires": hunt.get("expires", ""),
            }, indent=2, default=str)
        )]

    except grpc.RpcError as e:
        error_response = map_grpc_error(e, "hunt creation")
        return [TextContent(
            type="text",
            text=json.dumps(error_response)
        )]

    except Exception:
        # Generic errors - don't expose internals to the caller, but keep the
        # traceback in the server log so the failure can be diagnosed.
        import logging

        logging.getLogger(__name__).exception(
            "Unexpected error during hunt creation"
        )
        return [TextContent(
            type="text",
            text=json.dumps({
                "error": "Unexpected error during hunt creation",
                "hint": "Check Velociraptor server logs or contact administrator"
            })
        )]