list_flows
Retrieve forensic collection flows for a Velociraptor client to monitor investigation status and collected artifacts.
Instructions
List collection flows for a Velociraptor client.
Args:
- client_id: The client ID (e.g., 'C.1234567890abcdef')
- limit: Maximum number of flows to return (default 50)
Returns: List of flows with their status and artifacts.
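Judging from the handler shown under Implementation Reference, the returned text is a JSON array of flow objects on success and a single JSON object when a gRPC error is mapped. A minimal sketch of consuming that text follows; the sample payload, its state values, and the helper name `group_flows_by_state` are invented for illustration and are not part of the tool.

```python
import json

# Hypothetical payload shaped like the handler's success output; values are made up.
sample_text = json.dumps([
    {"flow_id": "F.EXAMPLE1", "state": "FINISHED",
     "artifacts": ["Generic.Client.Info/BasicInformation"], "total_collected_rows": 1},
    {"flow_id": "F.EXAMPLE2", "state": "RUNNING",
     "artifacts": [], "total_collected_rows": 0},
])


def group_flows_by_state(text: str) -> dict:
    """Group flow IDs by their reported state (illustrative helper)."""
    payload = json.loads(text)
    # The error path returns one JSON object instead of a list.
    if isinstance(payload, dict):
        raise RuntimeError(payload.get("hint") or "list_flows returned an error object")
    grouped = {}
    for flow in payload:
        grouped.setdefault(flow.get("state", ""), []).append(flow.get("flow_id", ""))
    return grouped


grouped = group_flows_by_state(sample_text)
print(grouped)
```

Checking for a dictionary before iterating keeps a mapped error from being mistaken for an empty flow list.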
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| client_id | Yes | The client ID (e.g., 'C.1234567890abcdef') | |
| limit | No | Maximum number of flows to return | 50 |
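As a rough illustration of how these two parameters travel over the wire, the sketch below builds a Model Context Protocol `tools/call` request for `list_flows`. The helper name `build_list_flows_request` and the request ID are assumptions made for this example; a real client library would normally construct this message for you.

```python
import json
from typing import Optional


def build_list_flows_request(client_id: str, limit: Optional[int] = None,
                             request_id: int = 1) -> dict:
    """Build a JSON-RPC 2.0 `tools/call` message for list_flows (illustrative)."""
    arguments = {"client_id": client_id}
    # `limit` is optional; omitting it lets the server apply its default of 50.
    if limit is not None:
        arguments["limit"] = limit
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "list_flows", "arguments": arguments},
    }


request = build_list_flows_request("C.1234567890abcdef", limit=10)
print(json.dumps(request, indent=2))
```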
Implementation Reference
- src/megaraptor_mcp/tools/flows.py:23-80 (handler): The implementation of the `list_flows` tool. It validates `client_id` and `limit`, queries the Velociraptor server with a VQL query, and formats the response as JSON.
```python
@mcp.tool()
async def list_flows(
    client_id: str,
    limit: int = 50,
) -> list[TextContent]:
    """List collection flows for a Velociraptor client.

    Args:
        client_id: The client ID (e.g., 'C.1234567890abcdef')
        limit: Maximum number of flows to return (default 50)

    Returns:
        List of flows with their status and artifacts.
    """
    try:
        # Input validation
        client_id = validate_client_id(client_id)
        limit = validate_limit(limit)

        client = get_client()

        vql = f"SELECT * FROM flows(client_id='{client_id}') LIMIT {limit}"
        results = client.query(vql)

        # Format the results
        formatted = []
        for row in results:
            flow = {
                "flow_id": row.get("session_id", ""),
                "state": row.get("state", ""),
                "artifacts": row.get("artifacts_with_results", []),
                "request": {
                    "artifacts": row.get("request", {}).get("artifacts", []),
                    "creator": row.get("request", {}).get("creator", ""),
                },
                "create_time": row.get("create_time", ""),
                "start_time": row.get("start_time", ""),
                "active_time": row.get("active_time", ""),
                "total_uploaded_bytes": row.get("total_uploaded_bytes", 0),
                "total_collected_rows": row.get("total_collected_rows", 0),
                "total_logs": row.get("total_logs", 0),
            }
            formatted.append(flow)

        return [TextContent(
            type="text",
            text=json.dumps(formatted, indent=2, default=str)
        )]

    except grpc.RpcError as e:
        error_response = map_grpc_error(e, f"listing flows for {client_id}")
        # Check if it's a not-found error
        if "NOT_FOUND" in error_response.get("grpc_status", ""):
            error_response["hint"] = f"Client {client_id} may not exist. Use list_clients() to see available clients."
        return [TextContent(
            type="text",
            text=json.dumps(error_response)
        )]
```
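Because the handler interpolates `client_id` and `limit` directly into the VQL string, the validation step is what keeps a crafted value from altering the query. The source does not show `validate_client_id` or `validate_limit`, so the versions below are only a guess at what such checks might look like: the ID pattern is inferred from the example `'C.1234567890abcdef'`, and the upper bound `MAX_LIMIT` is an arbitrary assumption.

```python
import re

# Assumed format: "C." followed by 16 hex digits, inferred from the docstring example.
CLIENT_ID_PATTERN = re.compile(r"C\.[0-9a-fA-F]{16}")
MAX_LIMIT = 1000  # Assumed ceiling; the real project may use a different bound.


def validate_client_id(client_id: str) -> str:
    """Return the trimmed client ID, or raise ValueError if it looks malformed."""
    candidate = client_id.strip()
    if not CLIENT_ID_PATTERN.fullmatch(candidate):
        raise ValueError(f"Invalid client ID: {client_id!r}")
    return candidate


def validate_limit(limit: int) -> int:
    """Return the limit if it is an integer between 1 and MAX_LIMIT."""
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(limit, bool) or not isinstance(limit, int):
        raise ValueError("limit must be an integer")
    if not 1 <= limit <= MAX_LIMIT:
        raise ValueError(f"limit must be between 1 and {MAX_LIMIT}")
    return limit


def build_flows_query(client_id: str, limit: int) -> str:
    """Build the same VQL string as the handler, using validated inputs only."""
    safe_id = validate_client_id(client_id)
    safe_limit = validate_limit(limit)
    return f"SELECT * FROM flows(client_id='{safe_id}') LIMIT {safe_limit}"


query = build_flows_query("C.1234567890abcdef", 50)
print(query)
```

With a strict allow-list pattern, a value containing a quote never reaches the f-string, which is why validation runs before the query is built.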