run_vql
Execute Velociraptor Query Language queries for forensic analysis, endpoint management, and threat hunting investigations.
Instructions
Execute an arbitrary VQL (Velociraptor Query Language) query.
VQL is the query language used by Velociraptor for forensic analysis. It follows a SQL-like syntax with plugins instead of tables.
Common VQL patterns:
SELECT * FROM info() -- Get server info
SELECT * FROM clients() -- List all clients
SELECT * FROM pslist() -- List processes (client artifact)
SELECT * FROM Artifact.Windows.System.Pslist() -- Run artifact
Args:
- query: The VQL query to execute
- env: Optional environment variables to pass to the query. Use this to safely pass dynamic values instead of string interpolation.
- max_rows: Maximum number of rows to return (default 10000)
- org_id: Optional organization ID for multi-tenant deployments
Returns: Query results as JSON.
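As a rough sketch of consuming that JSON, the handler listed under Implementation Reference appears to return a text payload with `query`, `row_count`, and `results` keys on success, and an `error` key (often with a `hint`) on validation or generic failures. The helper name `parse_run_vql_text` is hypothetical, and the exact shape of gRPC error payloads is not shown in the source, so anything without a `results` key is treated as a failure here.

```python
import json
from typing import Any


def parse_run_vql_text(text: str) -> list[dict[str, Any]]:
    """Return the result rows from a run_vql text payload (hypothetical helper).

    Assumes the success payload has a "results" key, as in the handler source.
    Any payload without "results" is treated as an error.
    """
    payload = json.loads(text)
    if "results" not in payload:
        message = payload.get("error", "run_vql returned no results")
        hint = payload.get("hint") or payload.get("vql_hint") or ""
        raise RuntimeError(f"{message} {hint}".strip())
    return payload["results"]


# Example payloads built by hand to match the shapes in the handler source.
ok_text = json.dumps({
    "query": "SELECT * FROM info() LIMIT 10",
    "row_count": 1,
    "results": [{"Hostname": "velo-server"}],
})
rows = parse_run_vql_text(ok_text)
```

Raising on error payloads keeps calling code from silently treating a failed query as an empty result set.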
Input Schema
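| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes | The VQL query to execute | |
| env | No | Optional environment variables to pass to the query; use this to pass dynamic values safely instead of string interpolation | None |
| max_rows | No | Maximum number of rows to return | 10000 |
| org_id | No | Optional organization ID for multi-tenant deployments | None |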
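To illustrate the `env` parameter, the sketch below builds a set of `run_vql` arguments in which a caller-supplied hostname travels in `env` and the query refers to it by variable name, so the value never becomes part of the query text. The helper `build_run_vql_args`, the variable name `TargetHost`, and the `os_info.hostname` field are assumptions made for illustration; check the field names against your own deployment.

```python
from typing import Any


def build_run_vql_args(hostname: str, max_rows: int = 100) -> dict[str, Any]:
    """Build run_vql arguments with the dynamic value kept in env (hypothetical helper)."""
    return {
        # The query names the variable; the value itself is never interpolated.
        # os_info.hostname is an assumed field name for clients() rows.
        "query": "SELECT * FROM clients() WHERE os_info.hostname = TargetHost",
        "env": {"TargetHost": hostname},
        "max_rows": max_rows,
    }


# Even a hostile-looking value stays out of the query string.
args = build_run_vql_args("WIN-01' OR 1=1 --")
```

Keeping the value in `env` means the query string is a constant, which also makes it easier to log and review.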
Implementation Reference
- src/megaraptor_mcp/tools/vql.py:23-115 (handler): The handler implementation for the `run_vql` tool, which executes VQL queries against a Velociraptor client, including input validation and error handling.
@mcp.tool()
async def run_vql(
    query: str,
    env: Optional[dict[str, Any]] = None,
    max_rows: int = 10000,
    org_id: Optional[str] = None,
) -> list[TextContent]:
    """Execute an arbitrary VQL (Velociraptor Query Language) query.

    VQL is the query language used by Velociraptor for forensic analysis.
    It follows a SQL-like syntax with plugins instead of tables.

    Common VQL patterns:
    - SELECT * FROM info() -- Get server info
    - SELECT * FROM clients() -- List all clients
    - SELECT * FROM pslist() -- List processes (client artifact)
    - SELECT * FROM Artifact.Windows.System.Pslist() -- Run artifact

    Args:
        query: The VQL query to execute
        env: Optional environment variables to pass to the query.
            Use this to safely pass dynamic values instead of string interpolation.
        max_rows: Maximum number of rows to return (default 10000)
        org_id: Optional organization ID for multi-tenant deployments

    Returns:
        Query results as JSON.
    """
    try:
        # Input validation
        if not query or not query.strip():
            return [TextContent(
                type="text",
                text=json.dumps({
                    "error": "query parameter is required and cannot be empty"
                })
            )]

        max_rows = validate_limit(max_rows)
        query = validate_vql_syntax_basics(query)

        # Add LIMIT if not already present and query doesn't have one
        query_upper = query.upper()
        if "LIMIT" not in query_upper:
            query = f"{query.rstrip(';')} LIMIT {max_rows}"

        client = get_client()
        results = client.query(query, env=env, org_id=org_id)

        return [TextContent(
            type="text",
            text=json.dumps({
                "query": query,
                "row_count": len(results),
                "results": results,
            }, indent=2, default=str)
        )]

    except grpc.RpcError as e:
        error_response = map_grpc_error(e, "VQL query execution")
        # For INVALID_ARGUMENT errors, try to extract VQL-specific hints
        if error_response.get("grpc_status") == "INVALID_ARGUMENT":
            error_message = str(e)
            vql_hint = extract_vql_error_hint(error_message)
            if vql_hint:
                error_response["vql_hint"] = vql_hint
            error_response["query"] = query
        return [TextContent(
            type="text",
            text=json.dumps(error_response)
        )]

    except ValueError as e:
        # Validation errors
        return [TextContent(
            type="text",
            text=json.dumps({
                "error": str(e),
                "hint": "Check VQL syntax and max_rows parameter"
            })
        )]

    except Exception:
        # Generic errors - don't expose internals
        return [TextContent(
            type="text",
            text=json.dumps({
                "error": "Failed to execute VQL query",
                "hint": "Check VQL syntax and Velociraptor server connection"
            })
        )]
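The automatic `LIMIT` step in the handler can be looked at in isolation. The sketch below copies only that check into a standalone function (the name `append_default_limit` is hypothetical) and omits the `validate_limit` and `validate_vql_syntax_basics` calls, whose behavior is not shown in the source. Because the check is a plain substring test on the upper-cased query, any identifier that happens to contain "limit" also suppresses the appended clause; the `rate_limit` column below is made up to show this.

```python
def append_default_limit(query: str, max_rows: int) -> str:
    """Mirror the handler's check: append LIMIT unless 'LIMIT' appears anywhere in the query."""
    if "LIMIT" not in query.upper():
        return f"{query.rstrip(';')} LIMIT {max_rows}"
    return query


# No LIMIT present: the trailing semicolon is stripped and a LIMIT is appended.
capped = append_default_limit("SELECT * FROM clients();", 100)

# Explicit LIMIT: the query is passed through unchanged.
explicit = append_default_limit("SELECT * FROM pslist() LIMIT 5", 100)

# Hypothetical column name containing "limit": also passed through unchanged,
# so no row cap is added to the query text in this case.
uncapped = append_default_limit("SELECT rate_limit FROM info()", 100)
```

Callers who rely on `max_rows` as a hard cap may want to include an explicit `LIMIT` clause in queries whose column or plugin names contain that substring.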