list_violations
Retrieve compliance violations from the latest snapshot, optionally filtered by severity to identify security issues in vSphere environments.
Instructions
[READ] Latest snapshot's violations, optionally filtered by severity.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| severity | No |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes |
Implementation Reference
- vmware_harden/mcp/tools.py:48-88 (handler)The actual implementation of list_violations. Queries the Twin database for the latest snapshot's violations, optionally filtered by severity, and returns a list of dicts with id, rule_id, node_id, severity, baseline_id, and evidence.
@vmware_tool(risk_level="low") def list_violations(severity: str | None = None) -> list[dict]: """[READ] Latest snapshot's violations, optionally filtered by severity.""" from vmware_harden.store.twin import Twin twin = Twin(_resolve_db()) try: latest = twin.conn.execute( "SELECT id FROM snapshots ORDER BY scan_started_at DESC LIMIT 1" ).fetchone() if not latest: return [] params: list = [latest[0]] sql = ( "SELECT id, rule_id, node_id, severity, baseline_id, evidence " "FROM violation WHERE snapshot_id = ?" ) if severity: sql += " AND severity = ?" params.append(severity) sql += " ORDER BY severity DESC, rule_id" rows = twin.conn.execute(sql, params).fetchall() out: list[dict] = [] for r in rows: try: ev = json.loads(r[5]) if r[5] else None except Exception: ev = None out.append( { "id": r[0], "rule_id": r[1], "node_id": r[2], "severity": r[3], "baseline_id": r[4], "evidence": ev, } ) return out finally: twin.close() - mcp_server/server.py:24-27 (registration)Registration of the 'list_violations' MCP tool via @server.tool decorator on the FastMCP server. Delegates to t.list_violations(severity).
@server.tool(name="list_violations") def _list_violations_impl(severity: str | None = None) -> list[dict]: """[READ] Latest snapshot's violations, optionally filtered by severity.""" return t.list_violations(severity) - vmware_harden/store/schema.py:59-71 (schema)The 'violation' table DDL that defines the schema for violation records queried by list_violations.
""" CREATE TABLE IF NOT EXISTS violation ( id VARCHAR PRIMARY KEY, snapshot_id VARCHAR NOT NULL, baseline_id VARCHAR NOT NULL, rule_id VARCHAR NOT NULL, node_id VARCHAR NOT NULL, severity VARCHAR NOT NULL, evidence JSON, detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, status VARCHAR DEFAULT 'open' ) """, - vmware_harden/mcp/tools.py:17-19 (helper)Helper function _resolve_db() used by list_violations to get the database path.
def _resolve_db() -> Path: """Return the configured DB path, defaulting to user dir.""" return _DB_PATH or Path(os.path.expanduser("~/.vmware-harden/twin.duckdb")) - vmware_harden/store/twin.py:13-50 (helper)The Twin class (DuckDB-backed store) that list_violations instantiates to query the database.
class Twin: """Single-file DuckDB-backed estate twin.""" def __init__(self, db_path: Path): self.db_path = db_path self.conn = duckdb.connect(str(db_path)) self.init_schema() # idempotent; CREATE IF NOT EXISTS def init_schema(self) -> None: """Create all tables if they don't exist (idempotent).""" for stmt in DDL: self.conn.execute(stmt) def list_tables(self) -> list[str]: """Return names of all user tables in the database.""" rows = self.conn.execute( "SELECT table_name FROM information_schema.tables " "WHERE table_schema = 'main'" ).fetchall() return [r[0] for r in rows] def start_snapshot(self, target: str) -> str: """Begin a new scan snapshot. Returns the snapshot id (UUID).""" snap_id = str(uuid.uuid4()) self.conn.execute( "INSERT INTO snapshots (id, target, scan_started_at) VALUES (?, ?, ?)", [snap_id, target, datetime.now(timezone.utc)], ) return snap_id def finish_snapshot(self, snapshot_id: str, status: str = "completed") -> None: """Mark a snapshot finished with the given status.""" self.conn.execute( "UPDATE snapshots SET scan_finished_at = ?, status = ? WHERE id = ?", [datetime.now(timezone.utc), status, snapshot_id], ) def write_node_state(