collect_artifact
Schedule forensic artifact collection from Velociraptor endpoints for digital investigation and threat hunting workflows.
Instructions
Schedule artifact collection on a Velociraptor client.
Args: client_id: The client ID (e.g., 'C.1234567890abcdef') artifacts: List of artifact names to collect parameters: Optional dict of parameters for the artifacts. Format: {"ArtifactName": {"param1": "value1"}} timeout: Query timeout in seconds (default 600) urgent: If True, prioritize this collection (default False)
Returns: Flow ID for tracking the collection.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| client_id | Yes | ||
| artifacts | Yes | ||
| parameters | No | ||
| timeout | No | ||
| urgent | No |
Implementation Reference
- The `collect_artifact` handler is defined here using the `@mcp.tool()` decorator, validating inputs and initiating a collection flow on the Velociraptor server.
@mcp.tool() async def collect_artifact( client_id: str, artifacts: list[str], parameters: Optional[dict[str, Any]] = None, timeout: int = 600, urgent: bool = False, ) -> list[TextContent]: """Schedule artifact collection on a Velociraptor client. Args: client_id: The client ID (e.g., 'C.1234567890abcdef') artifacts: List of artifact names to collect parameters: Optional dict of parameters for the artifacts. Format: {"ArtifactName": {"param1": "value1"}} timeout: Query timeout in seconds (default 600) urgent: If True, prioritize this collection (default False) Returns: Flow ID for tracking the collection. """ try: # Validate inputs client_id = validate_client_id(client_id) if not artifacts: return [TextContent( type="text", text=json.dumps({ "error": "artifacts parameter is required and cannot be empty", "hint": "Use list_artifacts tool to find available artifacts" }) )] if timeout < 1: return [TextContent( type="text", text=json.dumps({ "error": f"timeout must be at least 1 second, got {timeout}", "hint": "Specify a positive timeout value in seconds" }) )] client = get_client() # Build the artifacts list artifacts_str = ", ".join(f"'{a}'" for a in artifacts) # Build the spec parameter if parameters are provided spec_part = "" if parameters: spec_json = json.dumps(parameters) spec_part = f", spec={spec_json}" urgent_part = ", urgent=true" if urgent else "" vql = f""" SELECT collect_client( client_id='{client_id}', artifacts=[{artifacts_str}], timeout={timeout} {spec_part} {urgent_part} ) AS collection FROM scope() """ results = client.query(vql) if not results: return [TextContent( type="text", text=json.dumps({ "error": "Failed to start collection", "hint": "Verify client_id exists and is online" }) )] collection = results[0].get("collection", {}) return [TextContent( type="text", text=json.dumps({ "status": "collection_started", "client_id": client_id, "artifacts": artifacts, "flow_id": collection.get("flow_id", ""), "request": collection.get("request", {}), }, indent=2, default=str) )] except ValueError as e: # Validation errors return [TextContent( type="text", text=json.dumps({ "error": str(e), "hint": "Check your client_id and other parameters" }) )] except grpc.RpcError as e: # gRPC errors error_info = map_grpc_error(e, f"collecting artifact on client {client_id}") return [TextContent( type="text", text=json.dumps(error_info, indent=2) )] except Exception: # Generic errors - don't expose internals return [TextContent( type="text", text=json.dumps({ "error": "Failed to schedule artifact collection", "hint": "Check Velociraptor server connection and try again" }) )]