"""
Performance Benchmarking Tests
This test module implements comprehensive performance benchmarking for the
Agent Orchestration Platform, measuring key performance metrics and ensuring
the system meets performance targets.
Performance Targets:
- Agent creation: < 10 seconds average
- MCP tool response: < 2 seconds average
- Memory usage per agent: < 512MB maximum
- Concurrent agents: 8+ per session, 32 total
- Session recovery: < 30 seconds
- Health check latency: < 100ms
Author: Adder_2 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import asyncio
import pytest
import time
import psutil
import gc
from typing import List, Dict, Any, Tuple
from datetime import datetime
from statistics import mean, median, stdev
from unittest.mock import AsyncMock, MagicMock, patch
import tracemalloc
# Import components to benchmark
from src.core.integrated_server import IntegratedAgentOrchestrationServer
from src.core.coordinator import OrchestrationCoordinator, CoordinationStrategy, CoordinationContext
from src.core.agent_manager import AgentManager
from src.core.session_manager import SessionManager
# Import types
from src.models.ids import SessionId, AgentId
from src.models.agent import AgentCreationRequest, AgentSpecialization
from src.models.session import SecurityLevel
from pathlib import Path
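# The performance targets from the module docstring, collected in one place for
# reference. This constant is informational only; the tests below inline their
# own thresholds and do not read from it.
PERFORMANCE_TARGETS: Dict[str, float] = {
    "agent_creation_avg_s": 10.0,
    "mcp_tool_response_avg_s": 2.0,
    "memory_per_agent_max_mb": 512.0,
    "concurrent_agents_per_session": 8,
    "concurrent_agents_total": 32,
    "session_recovery_max_s": 30.0,
    "health_check_latency_max_ms": 100.0,
}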
class BenchmarkMetrics:
"""Container for benchmark metrics."""
def __init__(self, name: str):
self.name = name
self.times: List[float] = []
self.memory_usage: List[int] = []
self.cpu_usage: List[float] = []
self.errors: int = 0
def add_timing(self, duration: float):
"""Add timing measurement."""
self.times.append(duration)
def add_memory(self, memory_bytes: int):
"""Add memory measurement."""
self.memory_usage.append(memory_bytes)
def add_cpu(self, cpu_percent: float):
"""Add CPU measurement."""
self.cpu_usage.append(cpu_percent)
def increment_errors(self):
"""Increment error count."""
self.errors += 1
def get_summary(self) -> Dict[str, Any]:
"""Get summary statistics."""
summary = {
"name": self.name,
"samples": len(self.times),
"errors": self.errors
}
if self.times:
summary.update({
"avg_time_ms": mean(self.times) * 1000,
"median_time_ms": median(self.times) * 1000,
"min_time_ms": min(self.times) * 1000,
"max_time_ms": max(self.times) * 1000,
"stdev_time_ms": stdev(self.times) * 1000 if len(self.times) > 1 else 0
})
if self.memory_usage:
summary.update({
"avg_memory_mb": mean(self.memory_usage) / (1024 * 1024),
"max_memory_mb": max(self.memory_usage) / (1024 * 1024)
})
if self.cpu_usage:
summary.update({
"avg_cpu_percent": mean(self.cpu_usage),
"max_cpu_percent": max(self.cpu_usage)
})
return summary
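# Minimal usage sketch for BenchmarkMetrics, independent of the fixtures and
# tests below. The timed operation (time.sleep) is a stand-in chosen purely for
# illustration; this helper is not called by any test in this module.
def _benchmark_metrics_usage_example() -> Dict[str, Any]:
    """Illustrative helper showing the intended BenchmarkMetrics workflow."""
    example = BenchmarkMetrics("example_operation")
    for _ in range(3):
        start = time.perf_counter()
        time.sleep(0.001)  # stand-in for the operation being measured
        example.add_timing(time.perf_counter() - start)
    return example.get_summary()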
@pytest.fixture
async def benchmark_server():
"""Create server optimized for benchmarking with mocked external deps."""
server = IntegratedAgentOrchestrationServer(
name="BenchmarkServer",
coordination_strategy=CoordinationStrategy.PERFORMANCE_OPTIMIZED,
max_agents=32,
max_sessions=16
)
# Mock external dependencies for consistent benchmarking
with patch('src.core.iterm_manager.ITermManager') as mock_iterm_class, \
patch('src.core.claude_manager.ClaudeCodeManager') as mock_claude_class:
# Create lightweight mocks
mock_iterm = AsyncMock()
mock_claude = AsyncMock()
mock_iterm_class.return_value = mock_iterm
mock_claude_class.return_value = mock_claude
# Minimal delays for consistent benchmarking
async def fast_create_tab(*args, **kwargs):
await asyncio.sleep(0.001) # 1ms simulated delay
return f"tab_{kwargs.get('name', 'bench')}"
async def fast_spawn_process(*args, **kwargs):
await asyncio.sleep(0.001) # 1ms simulated delay
return 10000 + hash(str(kwargs)) % 10000
mock_iterm.initialize.return_value = None
mock_iterm.create_tab.side_effect = fast_create_tab
mock_iterm.close_tab.return_value = True
mock_iterm.is_tab_alive.return_value = True
mock_claude.initialize.return_value = None
mock_claude.spawn_claude_process.side_effect = fast_spawn_process
mock_claude.terminate_process.return_value = True
mock_claude.is_process_alive.return_value = True
await server.initialize()
yield server
await server.shutdown()
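# NOTE (assumption): the async fixture above and the async test methods below
# rely on pytest-asyncio running in auto mode (asyncio_mode = "auto"); without
# that configuration, each coroutine test would need an explicit
# @pytest.mark.asyncio marker and the fixture would need @pytest_asyncio.fixture.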
class TestAgentCreationBenchmarks:
"""Benchmarks for agent creation operations."""
@pytest.mark.benchmark
async def test_single_agent_creation_latency(self, benchmark_server):
"""Benchmark single agent creation latency."""
server = benchmark_server
metrics = BenchmarkMetrics("single_agent_creation")
# Create test session
session_result = await server.coordinator.session_manager.create_session(
root_path=Path("/bench/single"),
session_name="bench_session",
security_level=SecurityLevel.MEDIUM
)
assert session_result.success
session_id = session_result.session_id
# Warm up
for _ in range(3):
agent_request = AgentCreationRequest(
session_id=session_id,
agent_name="WarmupAgent",
specialization=None,
system_prompt_suffix="",
claude_config={}
)
context = CoordinationContext(
operation_id="warmup",
strategy=CoordinationStrategy.PERFORMANCE_OPTIMIZED,
security_level=SecurityLevel.MEDIUM,
resource_limits={},
priority=8,
metadata={}
)
try:
await server.coordinator.create_agent_coordinated(
session_id, agent_request, context
)
            except Exception:
                pass  # Ignore warmup failures; errors are only counted in measured runs
# Clear warmup agents
for agent_id in list(server.coordinator.agent_manager.agents.keys()):
agent_state = server.coordinator.agent_manager.agents[agent_id]
await server.coordinator.agent_manager.delete_agent(agent_state.name)
# Benchmark runs
num_runs = 10
for i in range(num_runs):
agent_request = AgentCreationRequest(
session_id=session_id,
agent_name=f"BenchAgent_{i}",
specialization=None,
system_prompt_suffix="Benchmark agent",
claude_config={"model": "claude-3-opus"}
)
context = CoordinationContext(
operation_id=f"bench_{i}",
strategy=CoordinationStrategy.PERFORMANCE_OPTIMIZED,
security_level=SecurityLevel.MEDIUM,
resource_limits={},
priority=8,
metadata={}
)
# Measure timing
start_time = time.perf_counter()
try:
result = await server.coordinator.create_agent_coordinated(
session_id, agent_request, context
)
end_time = time.perf_counter()
if result.success:
metrics.add_timing(end_time - start_time)
else:
metrics.increment_errors()
except Exception as e:
metrics.increment_errors()
print(f"Error in benchmark: {e}")
        # Print results (fail fast if every run errored, since the stats below need samples)
        assert metrics.times, f"All {num_runs} benchmark runs failed ({metrics.errors} errors)"
        summary = metrics.get_summary()
print(f"\n{summary['name']} Results:")
print(f" Average: {summary['avg_time_ms']:.2f}ms")
print(f" Median: {summary['median_time_ms']:.2f}ms")
print(f" Min: {summary['min_time_ms']:.2f}ms")
print(f" Max: {summary['max_time_ms']:.2f}ms")
print(f" StdDev: {summary['stdev_time_ms']:.2f}ms")
# Assert performance targets
assert summary['avg_time_ms'] < 10000 # < 10 seconds average
assert summary['max_time_ms'] < 15000 # < 15 seconds max
# Cleanup
await server.coordinator.session_manager.delete_session(session_id)
@pytest.mark.benchmark
async def test_concurrent_agent_creation_throughput(self, benchmark_server):
"""Benchmark concurrent agent creation throughput."""
server = benchmark_server
metrics = BenchmarkMetrics("concurrent_agent_creation")
# Create test sessions
num_sessions = 3
sessions = []
for i in range(num_sessions):
result = await server.coordinator.session_manager.create_session(
root_path=Path(f"/bench/concurrent_{i}"),
session_name=f"concurrent_session_{i}",
security_level=SecurityLevel.LOW
)
assert result.success
sessions.append(result.session_id)
# Benchmark concurrent creation
agents_per_session = 4
total_agents = num_sessions * agents_per_session
start_time = time.perf_counter()
# Create all agent requests
tasks = []
for session_idx, session_id in enumerate(sessions):
for agent_idx in range(agents_per_session):
agent_num = session_idx * agents_per_session + agent_idx
agent_request = AgentCreationRequest(
session_id=session_id,
agent_name=f"ConcurrentAgent_{agent_num}",
specialization=None,
system_prompt_suffix="Concurrent benchmark",
claude_config={}
)
context = CoordinationContext(
operation_id=f"concurrent_{agent_num}",
strategy=CoordinationStrategy.PERFORMANCE_OPTIMIZED,
security_level=SecurityLevel.LOW,
resource_limits={},
priority=7,
metadata={}
)
task = server.coordinator.create_agent_coordinated(
session_id, agent_request, context
)
tasks.append(task)
# Execute concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.perf_counter()
total_time = end_time - start_time
# Count successes
successes = sum(1 for r in results if not isinstance(r, Exception) and r.success)
errors = total_agents - successes
# Calculate throughput
throughput = successes / total_time if total_time > 0 else 0
print(f"\nConcurrent Agent Creation Results:")
print(f" Total agents: {total_agents}")
print(f" Successful: {successes}")
print(f" Errors: {errors}")
print(f" Total time: {total_time:.2f}s")
print(f" Throughput: {throughput:.2f} agents/second")
print(f" Avg time per agent: {(total_time / successes * 1000):.2f}ms")
# Assert performance targets
assert successes >= total_agents * 0.9 # 90% success rate
assert throughput > 0.5 # At least 0.5 agents per second
# Cleanup
for session_id in sessions:
await server.coordinator.session_manager.delete_session(session_id)
@pytest.mark.benchmark
async def test_agent_creation_memory_usage(self, benchmark_server):
"""Benchmark memory usage during agent creation."""
server = benchmark_server
metrics = BenchmarkMetrics("agent_memory_usage")
# Start memory tracking
tracemalloc.start()
gc.collect() # Clean baseline
# Get baseline memory
baseline_memory = tracemalloc.get_traced_memory()[0]
# Create session
session_result = await server.coordinator.session_manager.create_session(
root_path=Path("/bench/memory"),
session_name="memory_session",
security_level=SecurityLevel.MEDIUM
)
        assert session_result.success
        session_id = session_result.session_id
# Create agents and track memory
num_agents = 8
memory_per_agent = []
for i in range(num_agents):
gc.collect() # Clean before measurement
pre_memory = tracemalloc.get_traced_memory()[0]
agent_request = AgentCreationRequest(
session_id=session_id,
agent_name=f"MemoryAgent_{i}",
specialization=AgentSpecialization.RESEARCH if i % 2 == 0 else None,
system_prompt_suffix="Memory benchmark agent with some additional context",
claude_config={"model": "claude-3-opus", "temperature": 0.7}
)
context = CoordinationContext(
operation_id=f"memory_{i}",
strategy=CoordinationStrategy.PERFORMANCE_OPTIMIZED,
security_level=SecurityLevel.MEDIUM,
resource_limits={},
priority=7,
metadata={"benchmark": "memory"}
)
result = await server.coordinator.create_agent_coordinated(
session_id, agent_request, context
)
post_memory = tracemalloc.get_traced_memory()[0]
if result.success:
agent_memory = post_memory - pre_memory
memory_per_agent.append(agent_memory)
metrics.add_memory(agent_memory)
# Get peak memory
current, peak = tracemalloc.get_traced_memory()
total_memory_used = peak - baseline_memory
tracemalloc.stop()
# Calculate statistics
if memory_per_agent:
avg_memory_per_agent = mean(memory_per_agent)
max_memory_per_agent = max(memory_per_agent)
else:
avg_memory_per_agent = 0
max_memory_per_agent = 0
print(f"\nAgent Memory Usage Results:")
print(f" Agents created: {len(memory_per_agent)}")
print(f" Avg memory per agent: {avg_memory_per_agent / (1024 * 1024):.2f}MB")
print(f" Max memory per agent: {max_memory_per_agent / (1024 * 1024):.2f}MB")
print(f" Total memory used: {total_memory_used / (1024 * 1024):.2f}MB")
# Assert memory targets
assert avg_memory_per_agent < 512 * 1024 * 1024 # < 512MB per agent average
assert max_memory_per_agent < 768 * 1024 * 1024 # < 768MB per agent max
# Cleanup
await server.coordinator.session_manager.delete_session(session_id)
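# tracemalloc (used above) only accounts for Python-object allocations, so the
# per-agent figures understate the true process footprint. A process-level
# (RSS) view could be sampled with psutil instead; this helper is an
# illustrative sketch and is not called by any test in this module.
def _process_rss_mb() -> float:
    """Return the current process resident set size in megabytes."""
    return psutil.Process().memory_info().rss / (1024 * 1024)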
class TestSystemScalabilityBenchmarks:
"""Benchmarks for system scalability."""
@pytest.mark.benchmark
async def test_max_concurrent_agents(self, benchmark_server):
"""Benchmark maximum concurrent agents the system can handle."""
server = benchmark_server
metrics = BenchmarkMetrics("max_concurrent_agents")
# Create multiple sessions
sessions = []
for i in range(4):
result = await server.coordinator.session_manager.create_session(
root_path=Path(f"/bench/scale_{i}"),
session_name=f"scale_session_{i}",
security_level=SecurityLevel.LOW,
max_agents=8
)
            assert result.success
            sessions.append(result.session_id)
# Try to create maximum agents
target_agents = 32 # System max
created_agents = []
        process = psutil.Process()
        process.cpu_percent()  # First cpu_percent() call always reports 0.0; it only primes the window
        await asyncio.sleep(0.1)
        initial_cpu = process.cpu_percent()
        start_time = time.perf_counter()
for i in range(target_agents):
session_id = sessions[i % len(sessions)]
agent_request = AgentCreationRequest(
session_id=session_id,
agent_name=f"ScaleAgent_{i}",
specialization=None,
system_prompt_suffix="Scale test",
claude_config={}
)
context = CoordinationContext(
operation_id=f"scale_{i}",
strategy=CoordinationStrategy.RESOURCE_CONSERVATIVE,
security_level=SecurityLevel.LOW,
resource_limits={},
priority=5,
metadata={}
)
try:
result = await server.coordinator.create_agent_coordinated(
session_id, agent_request, context
)
if result.success:
created_agents.append(f"ScaleAgent_{i}")
metrics.add_cpu(process.cpu_percent())
except Exception:
break # Hit limit
end_time = time.perf_counter()
total_time = end_time - start_time
# Get final metrics
final_cpu = process.cpu_percent()
memory_info = process.memory_info()
print(f"\nMax Concurrent Agents Results:")
print(f" Target agents: {target_agents}")
print(f" Created agents: {len(created_agents)}")
print(f" Total time: {total_time:.2f}s")
print(f" CPU usage: {initial_cpu:.1f}% -> {final_cpu:.1f}%")
print(f" Memory usage: {memory_info.rss / (1024 * 1024):.2f}MB")
# Assert scalability targets
assert len(created_agents) >= 24 # Should handle at least 75% of max
assert final_cpu < 80 # CPU should not be saturated
# Cleanup
for session_id in sessions:
await server.coordinator.session_manager.delete_session(session_id)
@pytest.mark.benchmark
async def test_health_check_performance(self, benchmark_server):
"""Benchmark health check performance with many agents."""
server = benchmark_server
metrics = BenchmarkMetrics("health_check_performance")
# Create session with many agents
session_result = await server.coordinator.session_manager.create_session(
root_path=Path("/bench/health"),
session_name="health_session",
security_level=SecurityLevel.LOW,
max_agents=16
)
        assert session_result.success
        session_id = session_result.session_id
# Create agents
num_agents = 16
for i in range(num_agents):
agent_request = AgentCreationRequest(
session_id=session_id,
agent_name=f"HealthAgent_{i}",
specialization=None,
system_prompt_suffix="",
claude_config={}
)
context = CoordinationContext(
operation_id=f"health_create_{i}",
strategy=CoordinationStrategy.PERFORMANCE_OPTIMIZED,
security_level=SecurityLevel.LOW,
resource_limits={},
priority=6,
metadata={}
)
await server.coordinator.create_agent_coordinated(
session_id, agent_request, context
)
# Benchmark health checks
num_checks = 20
for i in range(num_checks):
start_time = time.perf_counter()
await server.coordinator.agent_manager._check_agent_health()
end_time = time.perf_counter()
metrics.add_timing(end_time - start_time)
# Small delay between checks
await asyncio.sleep(0.1)
# Calculate results
summary = metrics.get_summary()
print(f"\nHealth Check Performance Results:")
print(f" Agents monitored: {num_agents}")
print(f" Checks performed: {num_checks}")
print(f" Avg check time: {summary['avg_time_ms']:.2f}ms")
print(f" Max check time: {summary['max_time_ms']:.2f}ms")
# Assert performance targets
assert summary['avg_time_ms'] < 100 # < 100ms average
assert summary['max_time_ms'] < 200 # < 200ms max
# Cleanup
await server.coordinator.session_manager.delete_session(session_id)
@pytest.mark.benchmark
async def test_session_recovery_performance(self, benchmark_server):
"""Benchmark session recovery performance."""
server = benchmark_server
metrics = BenchmarkMetrics("session_recovery")
# Create sessions with agents
num_sessions = 5
agents_per_session = 3
# Setup phase
session_states = []
for i in range(num_sessions):
session_result = await server.coordinator.session_manager.create_session(
root_path=Path(f"/bench/recovery_{i}"),
session_name=f"recovery_session_{i}",
security_level=SecurityLevel.MEDIUM
)
            assert session_result.success
            session_id = session_result.session_id
agents = []
for j in range(agents_per_session):
agent_request = AgentCreationRequest(
session_id=session_id,
agent_name=f"RecoveryAgent_{i}_{j}",
specialization=None,
system_prompt_suffix="Recovery test",
claude_config={}
)
context = CoordinationContext(
operation_id=f"recovery_setup_{i}_{j}",
strategy=CoordinationStrategy.PERFORMANCE_OPTIMIZED,
security_level=SecurityLevel.MEDIUM,
resource_limits={},
priority=7,
metadata={}
)
result = await server.coordinator.create_agent_coordinated(
session_id, agent_request, context
)
if result.success:
agents.append(result.metadata["agent_id"])
session_states.append({
"session_id": session_id,
"agents": agents
})
        # Simulate saving state (snapshot is taken for realism but not restored here)
        _state_snapshot = await server.coordinator.state_manager.get_all_state()
# Simulate recovery (measure time to restore all sessions)
start_time = time.perf_counter()
# In a real scenario, we would:
# 1. Clear current state
# 2. Restore from saved state
# 3. Verify all agents are recovered
# For benchmark, we simulate by getting status of all sessions
for session_state in session_states:
status = await server.coordinator.session_manager.get_session_status(
session_state["session_id"]
)
assert status["agent_count"] == len(session_state["agents"])
# Verify all agents are accessible
total_agents = sum(len(s["agents"]) for s in session_states)
assert len(server.coordinator.agent_manager.agents) == total_agents
end_time = time.perf_counter()
recovery_time = end_time - start_time
print(f"\nSession Recovery Performance Results:")
print(f" Sessions recovered: {num_sessions}")
print(f" Total agents: {total_agents}")
print(f" Recovery time: {recovery_time * 1000:.2f}ms")
print(f" Time per session: {(recovery_time / num_sessions) * 1000:.2f}ms")
# Assert recovery targets
assert recovery_time < 30 # < 30 seconds total
# Cleanup
for session_state in session_states:
await server.coordinator.session_manager.delete_session(
session_state["session_id"]
)
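# Sketch (illustrative only): a fuller recovery benchmark would tear down live
# state and rebuild it from the snapshot inside the timed region, roughly:
#
#     snapshot = await server.coordinator.state_manager.get_all_state()
#     await server.shutdown()
#     await server.initialize()
#     # ...restore sessions/agents from snapshot, then re-verify counts...
#
# The restore step is deliberately elided because the state manager's restore
# API is not exercised in this module; the test above approximates recovery by
# re-reading session status instead.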
class TestResourceMonitoringBenchmarks:
"""Benchmarks for resource monitoring overhead."""
@pytest.mark.benchmark
async def test_monitoring_overhead(self, benchmark_server):
"""Benchmark overhead of resource monitoring."""
server = benchmark_server
metrics = BenchmarkMetrics("monitoring_overhead")
# Create baseline load
session_result = await server.coordinator.session_manager.create_session(
root_path=Path("/bench/monitoring"),
session_name="monitoring_session",
security_level=SecurityLevel.LOW
)
        assert session_result.success
        session_id = session_result.session_id
# Create some agents
for i in range(8):
agent_request = AgentCreationRequest(
session_id=session_id,
agent_name=f"MonitorAgent_{i}",
specialization=None,
system_prompt_suffix="",
claude_config={}
)
context = CoordinationContext(
operation_id=f"monitor_setup_{i}",
strategy=CoordinationStrategy.PERFORMANCE_OPTIMIZED,
security_level=SecurityLevel.LOW,
resource_limits={},
priority=6,
metadata={}
)
await server.coordinator.create_agent_coordinated(
session_id, agent_request, context
)
        # Measure baseline operation before tightening the monitoring interval
baseline_times = []
for i in range(10):
start_time = time.perf_counter()
# Simple operation
_ = await server.coordinator.get_system_health()
end_time = time.perf_counter()
baseline_times.append(end_time - start_time)
        # Tighten monitoring to an intensive 100ms interval
        server.coordinator.resource_monitor.monitoring_interval = 0.1
# Measure with monitoring
monitored_times = []
for i in range(10):
start_time = time.perf_counter()
# Same operation
_ = await server.coordinator.get_system_health()
end_time = time.perf_counter()
monitored_times.append(end_time - start_time)
await asyncio.sleep(0.1) # Let monitor run
# Calculate overhead
baseline_avg = mean(baseline_times)
monitored_avg = mean(monitored_times)
overhead_percent = ((monitored_avg - baseline_avg) / baseline_avg) * 100
print(f"\nMonitoring Overhead Results:")
print(f" Baseline avg: {baseline_avg * 1000:.2f}ms")
print(f" Monitored avg: {monitored_avg * 1000:.2f}ms")
print(f" Overhead: {overhead_percent:.1f}%")
# Assert overhead is reasonable
assert overhead_percent < 20 # Less than 20% overhead
# Cleanup
await server.coordinator.session_manager.delete_session(session_id)
def print_benchmark_summary(benchmarks: List[Dict[str, Any]]):
"""Print summary of all benchmarks."""
print("\n" + "="*60)
print("BENCHMARK SUMMARY")
print("="*60)
for benchmark in benchmarks:
print(f"\n{benchmark['name']}:")
print(f" Status: {'PASS' if benchmark['passed'] else 'FAIL'}")
if 'avg_time_ms' in benchmark:
print(f" Avg time: {benchmark['avg_time_ms']:.2f}ms")
if 'throughput' in benchmark:
print(f" Throughput: {benchmark['throughput']:.2f} ops/sec")
if 'memory_mb' in benchmark:
print(f" Memory: {benchmark['memory_mb']:.2f}MB")
if 'notes' in benchmark:
print(f" Notes: {benchmark['notes']}")
print("\n" + "="*60)