get_flow_results
Retrieve forensic collection results from Velociraptor flows for digital forensics and incident response analysis.
Instructions
Get results from a specific Velociraptor collection flow.
Args: client_id: The client ID (e.g., 'C.1234567890abcdef'); flow_id: The flow ID (e.g., 'F.1234567890'); artifact: Optional specific artifact to get results for; limit: Maximum number of result rows to return (default 1000).
Returns: Collection results data.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
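| client_id | Yes | The client ID (e.g., 'C.1234567890abcdef') | |
| flow_id | Yes | The flow ID (e.g., 'F.1234567890') | |
| artifact | No | Optional specific artifact to get results for | None |
| limit | No | Maximum number of result rows to return | 1000 |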
Implementation Reference
- The handler function 'get_flow_results' executes the logic to fetch flow results via Velociraptor VQL.
def _is_safe_artifact_name(name: str) -> bool:
    """Return True if *name* can be embedded in a single-quoted VQL string.

    Only ASCII letters, digits, '_', '.', '-' and '/' are accepted, so the
    value cannot contain a quote, backslash or newline that would terminate
    the string literal and inject additional VQL.
    """
    # Local import: the file-level import block is not visible from here.
    import re

    return isinstance(name, str) and re.fullmatch(r"[A-Za-z0-9_.\-/]+", name) is not None


@mcp.tool()
async def get_flow_results(
    client_id: str,
    flow_id: str,
    artifact: Optional[str] = None,
    limit: int = 1000,
) -> list[TextContent]:
    """Get results from a specific Velociraptor collection flow.

    Args:
        client_id: The client ID (e.g., 'C.1234567890abcdef')
        flow_id: The flow ID (e.g., 'F.1234567890')
        artifact: Optional specific artifact to get results for. Must consist
            only of letters, digits, '_', '.', '-' and '/'; anything else is
            rejected before a query is built.
        limit: Maximum number of result rows to return (default 1000)

    Returns:
        A single-element list holding a TextContent whose text is a JSON
        document: either the collection results (client_id, flow_id,
        artifact, result_count, results) or an error object with a hint.
        Errors are reported in the payload rather than raised.
    """
    try:
        # Input validation. NOTE(review): the validate_* helpers are defined
        # elsewhere; they presumably raise ValueError on bad input, which the
        # ValueError handler below turns into an error payload.
        client_id = validate_client_id(client_id)
        flow_id = validate_flow_id(flow_id)
        limit = validate_limit(limit)

        # The artifact name is interpolated into the VQL text, so it must be
        # checked just like the IDs; otherwise a crafted value containing a
        # quote could inject arbitrary VQL.
        if artifact and not _is_safe_artifact_name(artifact):
            return [TextContent(
                type="text",
                text=json.dumps({
                    "error": "Invalid artifact name",
                    "hint": "Artifact names may contain only letters, digits, '_', '.', '-' and '/' (e.g., 'Windows.System.Pslist')"
                })
            )]

        client = get_client()

        # Build the VQL query; the artifact argument is only added when an
        # artifact was requested (an empty string counts as "not requested").
        artifact_clause = f", artifact='{artifact}'" if artifact else ""
        vql = f"""
        SELECT * FROM source(
            client_id='{client_id}',
            flow_id='{flow_id}'{artifact_clause}
        )
        LIMIT {limit}
        """

        results = client.query(vql)

        return [TextContent(
            type="text",
            text=json.dumps({
                "client_id": client_id,
                "flow_id": flow_id,
                "artifact": artifact,
                "result_count": len(results),
                "results": results,
            }, indent=2, default=str)
        )]

    except grpc.RpcError as e:
        error_response = map_grpc_error(e, f"flow results for {flow_id}")
        # Check if it's a not-found error
        if "NOT_FOUND" in error_response.get("grpc_status", ""):
            error_response["hint"] = f"Flow {flow_id} may not exist for client {client_id}. Use list_flows(client_id='{client_id}') to see available flows."
        return [TextContent(
            type="text",
            text=json.dumps(error_response)
        )]

    except ValueError as e:
        # Validation errors
        return [TextContent(
            type="text",
            text=json.dumps({
                "error": str(e),
                "hint": "Provide valid client ID (C.*) and flow ID (F.*)"
            })
        )]

    except Exception:
        # Generic errors - don't expose internals
        return [TextContent(
            type="text",
            text=json.dumps({
                "error": "Failed to get flow results",
                "hint": "Check IDs and Velociraptor server connection"
            })
        )]