Skip to main content
Glama
wagonbomb

Megaraptor MCP

by wagonbomb

create_hunt

Create a Velociraptor hunt to collect forensic artifacts across multiple endpoints with configurable filters for targeted investigation.

Instructions

Create a new Velociraptor hunt to collect artifacts across multiple clients.

Args: artifacts: List of artifact names to collect description: Description of the hunt's purpose parameters: Optional parameters for artifacts. Format: {"ArtifactName": {"param": "value"}} include_labels: Only include clients with these labels exclude_labels: Exclude clients with these labels os_filter: Filter by OS: 'windows', 'linux', 'darwin' timeout: Query timeout per client in seconds (default 600) expires_hours: Hunt expiration in hours (default 24) paused: Create hunt in paused state (default True for safety)

Returns: Hunt ID and details.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
artifactsYes
descriptionYes
parametersNo
include_labelsNo
exclude_labelsNo
os_filterNo
timeoutNo
expires_hoursNo
pausedNo

Implementation Reference

  • The 'create_hunt' function is defined as an MCP tool in this file, which handles the creation of a Velociraptor hunt via VQL query execution through the MCP client.
    @mcp.tool()
    async def create_hunt(
        artifacts: list[str],
        description: str,
        parameters: Optional[dict[str, Any]] = None,
        include_labels: Optional[list[str]] = None,
        exclude_labels: Optional[list[str]] = None,
        os_filter: Optional[str] = None,
        timeout: int = 600,
        expires_hours: int = 24,
        paused: bool = True,
    ) -> list[TextContent]:
        """Create a new Velociraptor hunt to collect artifacts across multiple clients.
    
        Args:
            artifacts: List of artifact names to collect
            description: Description of the hunt's purpose
            parameters: Optional parameters for artifacts. Format: {"ArtifactName": {"param": "value"}}
            include_labels: Only include clients with these labels
            exclude_labels: Exclude clients with these labels
            os_filter: Filter by OS: 'windows', 'linux', 'darwin'
            timeout: Query timeout per client in seconds (default 600)
            expires_hours: Hunt expiration in hours (default 24)
            paused: Create hunt in paused state (default True for safety)
    
        Returns:
            Hunt ID and details.
        """
        # Input validation
        if not artifacts:
            return [TextContent(
                type="text",
                text=json.dumps({
                    "error": "artifacts parameter is required and cannot be empty"
                })
            )]
    
        if not description:
            return [TextContent(
                type="text",
                text=json.dumps({
                    "error": "description parameter is required and cannot be empty"
                })
            )]
    
        if os_filter and os_filter not in ['windows', 'linux', 'darwin']:
            return [TextContent(
                type="text",
                text=json.dumps({
                    "error": f"Invalid os_filter '{os_filter}'. Must be one of: windows, linux, darwin"
                })
            )]
    
        try:
            client = get_client()
    
            # Build the artifacts list
            artifacts_str = ", ".join(f"'{a}'" for a in artifacts)
    
            # Build optional parameters
            parts = [
                f"artifacts=[{artifacts_str}]",
                f"description='{description}'",
                f"timeout={timeout}",
                f"expires=now() + {expires_hours * 3600}",
                f"pause={'true' if paused else 'false'}",
            ]
    
            if parameters:
                spec_json = json.dumps(parameters).replace("'", "\\'")
                parts.append(f"spec={spec_json}")
    
            if include_labels:
                labels_str = ", ".join(f"'{l}'" for l in include_labels)
                parts.append(f"include_labels=[{labels_str}]")
    
            if exclude_labels:
                labels_str = ", ".join(f"'{l}'" for l in exclude_labels)
                parts.append(f"exclude_labels=[{labels_str}]")
    
            if os_filter:
                parts.append(f"os='{os_filter}'")
    
            params_str = ", ".join(parts)
            vql = f"SELECT hunt({params_str}) AS hunt FROM scope()"
    
            results = client.query(vql)
    
            if not results:
                return [TextContent(
                    type="text",
                    text=json.dumps({"error": "Failed to create hunt"})
                )]
    
            hunt = results[0].get("hunt", {})
    
            return [TextContent(
                type="text",
                text=json.dumps({
                    "status": "hunt_created",
                    "hunt_id": hunt.get("hunt_id", ""),
                    "description": description,
                    "artifacts": artifacts,
                    "state": "PAUSED" if paused else "RUNNING",
                    "expires": hunt.get("expires", ""),
                }, indent=2, default=str)
            )]
    
        except grpc.RpcError as e:
            error_response = map_grpc_error(e, "hunt creation")
            return [TextContent(
                type="text",
                text=json.dumps(error_response)
            )]
        except Exception:
            # Generic errors - don't expose internals
            return [TextContent(
                type="text",
                text=json.dumps({
                    "error": "Unexpected error during hunt creation",
                    "hint": "Check Velociraptor server logs or contact administrator"
                })
            )]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wagonbomb/megaraptor-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server