"""
Property-Based Tests for Agent Deletion
Property-based tests for agent deletion operations, using Hypothesis to
check correctness across a wide range of generated input combinations.
Test Properties:
- Deletion Idempotency: Deleting twice has same effect as deleting once
- Resource Cleanup: All resources freed after successful deletion
- State Consistency: No partial states after deletion
- Security Preservation: Permissions enforced regardless of inputs
- Audit Completeness: All deletions produce audit trails
Author: ADDER_4 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import asyncio
from typing import Dict, Set, Optional, List, Any
from dataclasses import dataclass
from datetime import datetime
import pytest
from hypothesis import given, strategies as st, assume, note
from hypothesis.stateful import (
Bundle, RuleBasedStateMachine, rule, invariant,
initialize, precondition, multiple
)
# Import test utilities
from tests.fixtures.domain import (
create_test_agent_id, create_test_session_id,
create_test_agent_state, create_test_security_context
)
from tests.strategies.hypothesis_strategies import (
agent_name_strategy, security_context_strategy,
agent_state_strategy
)
@dataclass
class MockAgent:
    """Mock agent for testing.

    Holds the identifying fields of an agent plus two mutable flags
    (``has_critical_ops``, ``is_deleted``) and a resource allocation.
    When ``resources`` is omitted (or passed as ``None``) each instance
    receives its own fresh default allocation dict.
    """

    name: str
    agent_id: str
    session_id: str
    process_id: int
    iterm_tab_id: str
    has_critical_ops: bool = False
    is_deleted: bool = False
    # ``None`` is only a placeholder meaning "use the default allocation";
    # it is replaced in __post_init__, so the field is Optional at
    # construction time only.
    resources: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        """Fill in the default resource allocation when none was given."""
        if self.resources is None:
            # Built per instance so agents never share one mutable dict.
            self.resources = {
                "memory_mb": 256,
                "cpu_percent": 10.0,
                "disk_mb": 100,
            }
class AgentDeletionStateMachine(RuleBasedStateMachine):
    """
    State machine for testing agent deletion properties.

    Models a system with multiple agents that can be created, updated and
    deleted, and checks the model's invariants after every step.

    Bundles:
        agents: every agent that was successfully created. Deleted agents
            stay in this bundle; rules that need a live agent skip them
            with ``assume``.
        deleted_agents: agents whose deletion succeeded.
    """

    # Bundles for Hypothesis
    agents = Bundle('agents')
    deleted_agents = Bundle('deleted_agents')

    def __init__(self):
        super().__init__()
        self.active_agents: Dict[str, MockAgent] = {}
        self.deleted_agent_names: Set[str] = set()
        self.total_resources = {
            "memory_mb": 8192,     # 8GB total
            "cpu_percent": 800.0,  # 8 cores
            "disk_mb": 100000      # 100GB
        }
        self.audit_log: List[Dict[str, Any]] = []
        self.next_agent_number = 1

    @initialize()
    def setup(self):
        """Initialize the state machine."""
        note("Initialized agent deletion state machine")

    @rule(
        target=agents,
        has_critical_ops=st.booleans()
    )
    def create_agent(self, has_critical_ops):
        """Create a new agent in the system.

        Returns the new agent (added to the ``agents`` bundle), or an empty
        ``multiple()`` when the model has no capacity left so that nothing
        is added to the bundle.
        """
        # Take the number once so name, process id and tab id all agree.
        agent_number = self.next_agent_number
        self.next_agent_number += 1
        agent_name = f"Agent_{agent_number}"

        # Check resource availability
        required_resources = {
            "memory_mb": 256,
            "cpu_percent": 10.0,
            "disk_mb": 100
        }
        if not self._has_sufficient_resources(required_resources):
            # Returning None here would put None into the bundle and break
            # every rule that later draws it; multiple() adds nothing.
            note(f"Skipped creating {agent_name}: insufficient resources")
            return multiple()

        agent = MockAgent(
            name=agent_name,
            agent_id=create_test_agent_id(),
            session_id=create_test_session_id(),
            process_id=10000 + agent_number,
            iterm_tab_id=f"tab_{agent_number}",
            has_critical_ops=has_critical_ops,
            resources=required_resources
        )
        self.active_agents[agent_name] = agent
        self._allocate_resources(required_resources)
        self.audit_log.append({
            "operation": "agent_created",
            "agent_name": agent_name,
            "timestamp": datetime.utcnow(),
            # Snapshot, so the log entry does not alias the agent's dict.
            "resources": dict(required_resources)
        })
        note(f"Created agent {agent_name} (critical_ops={has_critical_ops})")
        return agent

    # Skip the rule entirely while no agent is alive; this keeps the
    # per-argument ``assume`` below from discarding too many examples.
    @precondition(lambda self: bool(self.active_agents))
    @rule(
        target=deleted_agents,
        agent=agents,
        force=st.booleans(),
        timeout=st.integers(min_value=1, max_value=60)
    )
    def delete_agent(self, agent: MockAgent, force: bool, timeout: int):
        """Delete an agent from the system.

        Deletion succeeds when ``force`` is set or the agent has no critical
        operations; otherwise it is blocked and only an audit entry is
        written. On success the agent is returned and thereby added to the
        ``deleted_agents`` bundle; a blocked deletion adds nothing.
        """
        assume(not agent.is_deleted)

        # Check deletion conditions
        if agent.has_critical_ops and not force:
            # Deletion blocked
            self.audit_log.append({
                "operation": "deletion_blocked",
                "agent_name": agent.name,
                "reason": "critical_operations",
                "timestamp": datetime.utcnow()
            })
            note(f"Deletion of {agent.name} blocked due to critical operations")
            return multiple()

        # Perform deletion
        agent.is_deleted = True
        del self.active_agents[agent.name]
        self.deleted_agent_names.add(agent.name)

        # Free resources
        self._free_resources(agent.resources)

        # Audit log
        self.audit_log.append({
            "operation": "agent_deleted",
            "agent_name": agent.name,
            "force": force,
            "timeout": timeout,
            "timestamp": datetime.utcnow(),
            "resources_freed": dict(agent.resources)
        })
        note(f"Deleted agent {agent.name} (force={force})")
        return agent

    @precondition(lambda self: bool(self.active_agents))
    @rule(
        agent=agents,
        critical_ops=st.booleans()
    )
    def update_agent_state(self, agent: MockAgent, critical_ops: bool):
        """Update agent's critical operations state."""
        assume(not agent.is_deleted)
        old_state = agent.has_critical_ops
        agent.has_critical_ops = critical_ops
        self.audit_log.append({
            "operation": "state_updated",
            "agent_name": agent.name,
            "old_critical_ops": old_state,
            "new_critical_ops": critical_ops,
            "timestamp": datetime.utcnow()
        })
        note(f"Updated {agent.name} critical_ops: {old_state} -> {critical_ops}")

    @invariant()
    def agents_are_either_active_or_deleted(self):
        """Each agent is either active or deleted, never both."""
        assert self.deleted_agent_names.isdisjoint(self.active_agents)

    @invariant()
    def deleted_agents_stay_deleted(self):
        """Once deleted, agents cannot be reactivated."""
        for name in self.deleted_agent_names:
            assert name not in self.active_agents

    @invariant()
    def resources_are_conserved(self):
        """Resources used by active agents never exceed the system totals."""
        used_resources = self._calculate_used_resources()
        for resource, total in self.total_resources.items():
            used = used_resources[resource]
            assert used <= total, f"{resource}: used {used} > total {total}"

    @invariant()
    def audit_log_is_complete(self):
        """Every creation and deletion appears exactly once in the audit log."""
        # Count operations in audit log
        creations = sum(
            1 for entry in self.audit_log
            if entry["operation"] == "agent_created"
        )
        deletions = sum(
            1 for entry in self.audit_log
            if entry["operation"] == "agent_deleted"
        )
        # Every created agent is either still active or has been deleted,
        # so the counts must match exactly.
        assert creations == len(self.active_agents) + len(self.deleted_agent_names)
        assert deletions == len(self.deleted_agent_names)

    @invariant()
    def agent_names_are_unique(self):
        """Agent names must be unique across active and deleted."""
        all_names = set(self.active_agents) | self.deleted_agent_names
        assert len(all_names) == len(self.active_agents) + len(self.deleted_agent_names)

    def _has_sufficient_resources(self, required: Dict[str, Any]) -> bool:
        """Return True when every required amount fits in what is still free."""
        used = self._calculate_used_resources()
        return all(
            amount <= self.total_resources[resource] - used[resource]
            for resource, amount in required.items()
        )

    def _calculate_used_resources(self) -> Dict[str, Any]:
        """Sum the resources held by all currently active agents."""
        used = {"memory_mb": 0, "cpu_percent": 0.0, "disk_mb": 0}
        for agent in self.active_agents.values():
            for resource, amount in agent.resources.items():
                used[resource] += amount
        return used

    def _allocate_resources(self, resources: Dict[str, Any]) -> None:
        """Allocate resources (for tracking)."""
        # Resources tracked implicitly through agents

    def _free_resources(self, resources: Dict[str, Any]) -> None:
        """Free resources (for tracking)."""
        # Resources freed implicitly when agent removed
@given(
    agent_names=st.lists(
        agent_name_strategy(),
        min_size=0,
        max_size=10,
        unique=True
    ),
    force_flags=st.lists(st.booleans(), min_size=10, max_size=10),
    timeouts=st.lists(
        st.integers(min_value=1, max_value=60),
        min_size=10,
        max_size=10
    )
)
def test_deletion_idempotency(agent_names, force_flags, timeouts):
    """
    Property: Deleting an agent twice has the same effect as deleting once.

    Each generated agent is deleted from a small in-test model, the model
    state is snapshotted, and the agent is deleted again with different
    parameters. The second call must report that nothing was done and
    leave the state identical to the snapshot.
    """
    active_agents = set(agent_names)
    deleted_agents = set()
    deletion_results = {}

    def delete(name, force, timeout):
        """Delete *name* from the model; return True only if work was done."""
        if name in deleted_agents:
            # Repeat deletion: must not touch any state.
            return False
        active_agents.discard(name)
        deleted_agents.add(name)
        deletion_results[name] = {
            "first_deletion": True,
            "force": force,
            "timeout": timeout
        }
        return True

    for i, agent_name in enumerate(agent_names):
        force = force_flags[i % len(force_flags)]
        timeout = timeouts[i % len(timeouts)]

        # First deletion (names are unique, so it always performs work)
        performed = delete(agent_name, force, timeout)
        assert performed
        snapshot = (
            set(active_agents),
            set(deleted_agents),
            {name: dict(result) for name, result in deletion_results.items()},
        )

        # Second deletion (idempotent), deliberately with a flipped force
        # flag so a non-idempotent model would overwrite the stored result.
        performed_again = delete(agent_name, not force, timeout)
        assert not performed_again
        assert (active_agents, deleted_agents, deletion_results) == snapshot
        assert deletion_results[agent_name]["first_deletion"]
        assert deletion_results[agent_name]["force"] == force

    # Every agent ended up deleted exactly once.
    assert not active_agents
    assert deleted_agents == set(agent_names)
@given(
    num_agents=st.integers(min_value=0, max_value=20),
    deletion_pattern=st.lists(
        st.tuples(
            st.integers(min_value=0, max_value=19),
            st.booleans(),  # force flag
        ),
        min_size=0,
        max_size=20,
    ),
)
def test_resource_cleanup_completeness(num_agents, deletion_pattern):
    """
    Property: All resources are properly freed after deletion.

    A running ledger of allocated memory/CPU is decremented on each
    deletion and must match the totals recomputed from the surviving
    agents.
    """
    # Build the fleet; allocations vary with the agent's index.
    live = {
        f"Agent_{n}": {"memory": 256 + (n * 10), "cpu": 10 + (n % 4)}
        for n in range(num_agents)
    }
    ledger = {
        "memory": sum(alloc["memory"] for alloc in live.values()),
        "cpu": sum(alloc["cpu"] for alloc in live.values()),
    }

    # Apply the deletion pattern; the force flag does not affect accounting.
    for target_idx, _force in deletion_pattern:
        if not 0 <= target_idx < num_agents:
            continue
        released = live.pop(f"Agent_{target_idx}", None)
        if released is None:
            # Already deleted by an earlier entry in the pattern.
            continue
        ledger["memory"] -= released["memory"]
        ledger["cpu"] -= released["cpu"]

    # The ledger must agree with what the survivors actually hold.
    surviving_memory = sum(alloc["memory"] for alloc in live.values())
    surviving_cpu = sum(alloc["cpu"] for alloc in live.values())
    assert surviving_memory == ledger["memory"]
    assert surviving_cpu == ledger["cpu"]
@given(
    security_context=security_context_strategy(),
    agent_states=st.lists(
        agent_state_strategy(),
        min_size=1,
        max_size=5
    ),
    force_override=st.booleans()
)
def test_security_preservation(security_context, agent_states, force_override):
    """
    Property: Security policies are enforced regardless of deletion parameters.

    The deletion decision is computed by a small policy model and the
    assertions check that decision. Asserting directly on the generated
    inputs (for example "the context is not ADMIN") would fail on perfectly
    valid generated data.

    NOTE(review): the policy below is reconstructed from the rule comments
    in this test; confirm it matches the production policy.
    """
    # NOTE(review): security_level is compared against plain strings here;
    # confirm the type produced by security_context_strategy().
    elevated_levels = ("ADMIN", "MANAGER")
    is_elevated = security_context.security_level in elevated_levels

    def deletion_allowed(agent_state):
        """Model decision: may this context delete this agent?"""
        # Rule 1: critical operations block deletion unless forced.
        if agent_state.has_critical_operations and not force_override:
            return False
        # Rule 2: cross-session deletion needs elevation or a force override.
        if agent_state.session_id != security_context.session_id:
            return force_override or is_elevated
        return True

    for agent_state in agent_states:
        allowed = deletion_allowed(agent_state)
        has_critical = agent_state.has_critical_operations
        cross_session = agent_state.session_id != security_context.session_id

        # Rule 1: Can't delete agents with critical operations without force
        if has_critical and not force_override:
            assert not allowed

        # Rule 2: Cross-session deletion requires elevated permissions
        if cross_session and not is_elevated and not force_override:
            assert not allowed

        # Any permitted deletion must be justified by the rules above.
        if allowed:
            assert force_override or not has_critical
            assert not cross_session or is_elevated or force_override

        # Rule 3: Audit trail required for all deletions
        # Not checkable here: this test has no audit log to inspect.
@given(
    num_operations=st.integers(min_value=0, max_value=100),
    operation_types=st.lists(
        st.sampled_from(["create", "delete", "update"]),
        min_size=100,
        max_size=100
    )
)
def test_audit_trail_completeness(num_operations, operation_types):
    """
    Property: Every deletion operation produces a complete audit trail.

    Replays the first ``num_operations`` operations against a dict-based
    model. Creations and successful deletions are logged; ``update`` and a
    ``delete`` with no agents present are no-ops that log nothing.
    """
    audit_log = []
    agents = {}
    created_count = 0
    deleted_count = 0

    for i in range(num_operations):
        op_type = operation_types[i % len(operation_types)]
        if op_type == "create":
            # Name from a monotonic counter: len(agents) would reuse a name
            # after a deletion and overwrite a live agent.
            agent_name = f"Agent_{created_count}"
            created_count += 1
            agents[agent_name] = {"created_at": i}
            audit_log.append({
                "operation": "create",
                "agent": agent_name,
                "index": i
            })
        elif op_type == "delete" and agents:
            # Pick a victim deterministically from the operation index.
            agent_name = list(agents)[i % len(agents)]
            del agents[agent_name]
            deleted_count += 1
            audit_log.append({
                "operation": "delete",
                "agent": agent_name,
                "index": i
            })

    # Verify audit log properties
    previous_index = -1
    for entry in audit_log:
        assert "operation" in entry
        assert "agent" in entry
        assert "index" in entry
        # Unlogged operations leave gaps, so indices are strictly
        # increasing rather than equal to the log position.
        assert entry["index"] > previous_index
        previous_index = entry["index"]

    # Every state change is logged exactly once.
    logged_creates = sum(1 for e in audit_log if e["operation"] == "create")
    logged_deletes = sum(1 for e in audit_log if e["operation"] == "delete")
    assert logged_creates == created_count
    assert logged_deletes == deleted_count
    assert created_count - deleted_count == len(agents)
# Expose the state machine as a unittest-style TestCase so pytest collects it.
TestAgentDeletion = AgentDeletionStateMachine.TestCase

if __name__ == "__main__":
    # Imported here because it is only needed for direct script execution.
    from hypothesis.stateful import run_state_machine_as_test

    # Run property tests (functions wrapped by @given take no arguments)
    test_deletion_idempotency()
    test_resource_cleanup_completeness()
    test_security_preservation()
    test_audit_trail_completeness()

    # Run state machine tests. RuleBasedStateMachine instances have no
    # run_tests() method; the runner takes the class itself.
    run_state_machine_as_test(AgentDeletionStateMachine)