check_agent_deployment
Verify which agents have successfully enrolled with a specific deployment in the Velociraptor platform, allowing you to monitor endpoint enrollment status and filter results by client hostname or labels.
Instructions
Verify agent enrollment status for a deployment.
Checks which agents have successfully enrolled with the server.
Args:
- deployment_id: The deployment to check
- client_search: Optional search filter for client hostname/ID
- labels: Filter by client labels
Returns: List of enrolled clients and their status.
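The returned item is a single text payload containing JSON. As a minimal sketch of how a caller might consume it, the helper below reduces that payload to a one-line summary. The field names come from the implementation shown in the Implementation Reference; `summarize_enrollment` and the sample values (`dep-001`, `C.0001`, and so on) are hypothetical and exist only for illustration.

```python
import json


def summarize_enrollment(payload_text: str) -> str:
    """Reduce the tool's JSON text payload to a one-line summary."""
    payload = json.loads(payload_text)
    # Error payloads carry "error" and "hint" instead of client counts
    if "error" in payload:
        return f"error: {payload['error']} ({payload.get('hint', 'no hint')})"
    return (
        f"{payload['deployment_id']}: {payload['online']} online, "
        f"{payload['offline']} offline of {payload['total_clients']} clients"
    )


# Hypothetical payload, hand-written to mirror the documented fields
sample = json.dumps({
    "deployment_id": "dep-001",
    "total_clients": 2,
    "online": 1,
    "offline": 1,
    "online_clients": [{"client_id": "C.0001", "hostname": "web-01"}],
    "offline_clients": [{"client_id": "C.0002", "hostname": "db-01"}],
})
print(summarize_enrollment(sample))
```

Checking for the `error` key first matters because the failure responses do not include any of the count fields.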
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
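| deployment_id | Yes | The deployment to check | |
| client_search | No | Optional search filter for client hostname/ID | None |
| labels | No | Filter by client labels | None |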
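
The two optional inputs narrow the client list, and the implementation turns them into VQL filter conditions. The sketch below shows one way that translation could look, with two additions that are assumptions rather than part of the original tool: each group of conditions is parenthesized so that `OR` and `AND` cannot be misread, and filter values are checked against a conservative allow-list before being interpolated into the query. `build_where_clause` and the allow-list pattern are hypothetical.

```python
import re
from typing import Optional

# Conservative allow-list for filter values (an assumption of this sketch,
# not a rule taken from Velociraptor)
_SAFE_VALUE = re.compile(r"[A-Za-z0-9_.\-]+")


def _checked(value: str) -> str:
    """Return value unchanged, or raise if it contains unexpected characters."""
    if not _SAFE_VALUE.fullmatch(value):
        raise ValueError(f"unsupported characters in filter value: {value!r}")
    return value


def build_where_clause(
    client_search: Optional[str] = None,
    labels: Optional[list[str]] = None,
) -> str:
    """Build a WHERE clause from the optional filters, or '' if none are set."""
    conditions = []
    if client_search:
        s = _checked(client_search)
        # Match either the hostname or the client ID
        conditions.append(f"(os_info.hostname =~ '{s}' OR client_id =~ '{s}')")
    if labels:
        # A client matches if it carries any one of the requested labels
        joined = " OR ".join(f"'{_checked(label)}' in labels" for label in labels)
        conditions.append(f"({joined})")
    return f" WHERE {' AND '.join(conditions)}" if conditions else ""


print(build_where_clause("web", ["prod", "linux"]))
```

Rejecting unexpected characters outright is a deliberately blunt choice: it avoids relying on any particular string-escaping behaviour in the query language, at the cost of refusing some legitimate search patterns.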
Implementation Reference
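- The `check_agent_deployment` function verifies enrollment by running a VQL query against `clients()` through the Velociraptor client API, returning at most 100 clients ordered by most recent contact. A client counts as online if it was last seen less than 10 minutes ago and as offline otherwise, including when its last-seen value is not numeric. Note that `deployment_id` is not used to filter the query; it is only echoed back in the response.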
import json
from datetime import datetime, timezone
from typing import Optional

# TextContent as provided by the MCP Python SDK (assumed import path)
from mcp.types import TextContent


async def check_agent_deployment(
    deployment_id: str,
    client_search: Optional[str] = None,
    labels: Optional[list[str]] = None,
) -> list[TextContent]:
    """Verify agent enrollment status for a deployment.

    Checks which agents have successfully enrolled with the server.

    Args:
        deployment_id: The deployment to check
        client_search: Optional search filter for client hostname/ID
        labels: Filter by client labels

    Returns:
        List of enrolled clients and their status.
    """
    try:
        from ..client import get_client

        client = get_client()

        # Build VQL query
        conditions = []
        if client_search:
            # Parenthesized so the OR group stays intact when joined with AND
            conditions.append(
                f"(os_info.hostname =~ '{client_search}' OR client_id =~ '{client_search}')"
            )
        if labels:
            label_conditions = " OR ".join(f"'{l}' in labels" for l in labels)
            conditions.append(f"({label_conditions})")

        where_clause = f" WHERE {' AND '.join(conditions)}" if conditions else ""

        vql = f"""
        SELECT client_id,
               os_info.hostname AS hostname,
               os_info.system AS os,
               labels,
               last_seen_at,
               first_seen_at
        FROM clients()
        {where_clause}
        ORDER BY last_seen_at DESC
        LIMIT 100
        """

        results = client.query(vql)

        # Categorize by status
        now = datetime.now(timezone.utc)
        online = []
        offline = []

        for r in results:
            last_seen = r.get("last_seen_at", 0)
            if isinstance(last_seen, (int, float)):
                # last_seen_at is divided by 1,000,000, i.e. treated as microseconds
                last_seen_dt = datetime.fromtimestamp(last_seen / 1000000, tz=timezone.utc)
                minutes_ago = (now - last_seen_dt).total_seconds() / 60
                r["minutes_since_seen"] = round(minutes_ago, 1)
                if minutes_ago < 10:
                    online.append(r)
                else:
                    offline.append(r)
            else:
                # Non-numeric timestamps cannot be aged, so count them as offline
                offline.append(r)

        return [TextContent(
            type="text",
            text=json.dumps({
                "deployment_id": deployment_id,
                "total_clients": len(results),
                "online": len(online),
                "offline": len(offline),
                "online_clients": online,
                "offline_clients": offline,
            }, indent=2, default=str)
        )]

    except ImportError as e:
        return [TextContent(
            type="text",
            text=json.dumps({
                "error": f"Missing dependency: {str(e)}",
                "hint": "Install required packages with: pip install megaraptor-mcp[deployment]"
            }, indent=2)
        )]

    except Exception:
        # Generic errors - don't expose internals
        return [TextContent(
            type="text",
            text=json.dumps({
                "error": "Operation failed",
                "hint": "Check deployment configuration and try again"
            }, indent=2)
        )]
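
The online/offline split in the function above depends on the current time, which makes it awkward to test in place. As a rough sketch, the same rule can be pulled out into a pure function that takes `now` as a parameter. `classify_clients`, `micros`, and the sample records are hypothetical; the microsecond timestamps and the 10-minute threshold follow the code above.

```python
from datetime import datetime, timedelta, timezone


def classify_clients(results, now, threshold_minutes=10.0):
    """Split client records into (online, offline) by age of last_seen_at."""
    online, offline = [], []
    for record in results:
        last_seen = record.get("last_seen_at", 0)
        if isinstance(last_seen, (int, float)):
            # Timestamps are treated as microseconds since the Unix epoch
            seen_at = datetime.fromtimestamp(last_seen / 1_000_000, tz=timezone.utc)
            minutes_ago = (now - seen_at).total_seconds() / 60
            # Copy the record instead of mutating the caller's dict
            record = {**record, "minutes_since_seen": round(minutes_ago, 1)}
            if minutes_ago < threshold_minutes:
                online.append(record)
            else:
                offline.append(record)
        else:
            # Non-numeric timestamps cannot be aged, so count them as offline
            offline.append(record)
    return online, offline


def micros(dt):
    """Convert an aware datetime to integer microseconds since the epoch."""
    return int(dt.timestamp() * 1_000_000)


# Hypothetical records with a fixed "now" so the result is reproducible
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
records = [
    {"client_id": "C.0001", "last_seen_at": micros(now - timedelta(minutes=2))},
    {"client_id": "C.0002", "last_seen_at": micros(now - timedelta(hours=1))},
    {"client_id": "C.0003", "last_seen_at": "unknown"},
]
online, offline = classify_clients(records, now)
print([c["client_id"] for c in online], [c["client_id"] for c in offline])
```

Passing `now` in explicitly keeps the function deterministic, and copying each record avoids the side effect of adding `minutes_since_seen` to the query results themselves.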