"""
Performance Benchmarking Framework for Agent Orchestration Platform.
This module provides comprehensive performance testing capabilities including:
- Agent creation and management benchmarks
- Concurrent operation performance testing
- Resource usage monitoring and limits validation
- iTerm2 integration performance testing
- MCP tool response time benchmarks
- System scalability and stress testing
Integrates with pytest-benchmark for detailed performance analysis and regression detection.
Author: Adder_1 | Created: 2025-06-26 | Testing Infrastructure Task
"""
import asyncio
import concurrent.futures
import gc
import statistics
import time
import tracemalloc
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple

import psutil
# ============================================================================
# Performance Test Configuration and Metrics
# ============================================================================
@dataclass
class PerformanceMetrics:
"""Container for performance measurement results."""
execution_time: float
memory_usage_mb: float
cpu_usage_percent: float
operations_per_second: float
peak_memory_mb: float
gc_collections: int
error_count: int = 0
success_count: int = 0
@property
def success_rate(self) -> float:
"""Calculate success rate as percentage."""
total = self.success_count + self.error_count
return (self.success_count / total * 100) if total > 0 else 0.0
@dataclass
class PerformanceConfig:
"""Configuration for performance testing parameters."""
# Benchmark settings
warmup_iterations: int = 5
benchmark_iterations: int = 100
max_execution_time: float = 60.0
# Resource limits
max_memory_mb: int = 512
max_cpu_percent: float = 80.0
max_agents: int = 8
max_concurrent_operations: int = 16
# Scalability testing
scale_test_enabled: bool = True
scale_test_max_agents: int = 32
scale_test_step_size: int = 4
# Stress testing
stress_test_enabled: bool = False
stress_test_duration: int = 300 # 5 minutes
stress_test_load_factor: float = 1.5
@dataclass
class BenchmarkResult:
"""Result of a performance benchmark test."""
test_name: str
metrics: PerformanceMetrics
baseline_comparison: Optional[float] = None
status: str = "PASS"
warnings: List[str] = field(default_factory=list)
details: Dict[str, Any] = field(default_factory=dict)
# ============================================================================
# Performance Monitoring Context Manager
# ============================================================================
@asynccontextmanager
async def performance_monitor():
"""
Context manager for monitoring performance metrics during test execution.
Yields:
PerformanceMetrics object that gets populated during execution
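    Example:
        Minimal usage sketch; ``do_work`` is a hypothetical coroutine standing
        in for the operation under test::

            async with performance_monitor() as metrics:
                await do_work()
                metrics.success_count += 1
            print(f"{metrics.execution_time:.3f}s, peak {metrics.peak_memory_mb:.1f} MB")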
"""
# Start monitoring
tracemalloc.start()
process = psutil.Process()
start_time = time.perf_counter()
start_memory = process.memory_info().rss / 1024 / 1024 # MB
    start_cpu = process.cpu_percent()  # first call primes psutil's per-process CPU sampling
    gc_start = sum(stat['collections'] for stat in gc.get_stats())
metrics = PerformanceMetrics(
execution_time=0.0,
memory_usage_mb=start_memory,
cpu_usage_percent=start_cpu,
operations_per_second=0.0,
peak_memory_mb=start_memory,
gc_collections=0
)
try:
yield metrics
finally:
# Calculate final metrics
end_time = time.perf_counter()
execution_time = end_time - start_time
end_memory = process.memory_info().rss / 1024 / 1024
end_cpu = process.cpu_percent()
current, peak = tracemalloc.get_traced_memory()
peak_memory_mb = peak / 1024 / 1024
        gc_end = sum(stat['collections'] for stat in gc.get_stats())
# Update metrics
metrics.execution_time = execution_time
metrics.memory_usage_mb = end_memory - start_memory
metrics.cpu_usage_percent = max(end_cpu, start_cpu)
metrics.peak_memory_mb = peak_memory_mb
metrics.gc_collections = gc_end - gc_start
tracemalloc.stop()
# ============================================================================
# Core Performance Testing Framework
# ============================================================================
class PerformanceBenchmarkFramework:
"""
Comprehensive performance benchmarking framework for agent orchestration.
Provides benchmarking capabilities for all critical system operations
with detailed metrics collection and regression analysis.
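    Example:
        Minimal usage sketch; ``create_agent`` is a hypothetical async agent
        factory supplied by the orchestration platform::

            framework = PerformanceBenchmarkFramework()
            result = await framework.benchmark_agent_creation(create_agent, num_agents=4)
            report = framework.generate_performance_report()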
"""
    def __init__(self, config: Optional[PerformanceConfig] = None):
self.config = config or PerformanceConfig()
self.baseline_results: Dict[str, PerformanceMetrics] = {}
self.test_results: List[BenchmarkResult] = []
async def benchmark_agent_creation(
self,
agent_factory: Callable[[], Any],
num_agents: int = 8,
benchmark: Optional[Any] = None
) -> BenchmarkResult:
"""
Benchmark agent creation performance.
Args:
agent_factory: Function to create agents
num_agents: Number of agents to create
benchmark: pytest-benchmark fixture (optional)
Returns:
BenchmarkResult with detailed performance metrics
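        Example:
            Sketch; ``create_agent`` is a hypothetical async factory::

                result = await framework.benchmark_agent_creation(
                    create_agent, num_agents=4
                )
                print(result.status, result.metrics.operations_per_second)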
"""
async def create_agents():
agents = []
async with performance_monitor() as metrics:
start_time = time.perf_counter()
for i in range(num_agents):
try:
agent = await agent_factory()
agents.append(agent)
metrics.success_count += 1
                    except Exception:
                        metrics.error_count += 1
end_time = time.perf_counter()
execution_time = end_time - start_time
metrics.operations_per_second = num_agents / execution_time if execution_time > 0 else 0
return metrics, agents
        if benchmark:
            # pytest-benchmark expects a synchronous callable. Run each
            # iteration in a fresh event loop on a worker thread so this also
            # works while the current coroutine is executing inside a running loop.
            def run_iteration():
                with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                    return pool.submit(asyncio.run, create_agents()).result()
            metrics, agents = benchmark(run_iteration)
        else:
            # Direct execution for non-benchmark tests
            metrics, agents = await create_agents()
# Analyze results
warnings = []
status = "PASS"
if metrics.execution_time > self.config.max_execution_time:
warnings.append(f"Execution time {metrics.execution_time:.2f}s exceeds limit {self.config.max_execution_time}s")
status = "SLOW"
if metrics.memory_usage_mb > self.config.max_memory_mb:
warnings.append(f"Memory usage {metrics.memory_usage_mb:.2f}MB exceeds limit {self.config.max_memory_mb}MB")
status = "FAIL"
if metrics.success_rate < 100.0:
warnings.append(f"Success rate {metrics.success_rate:.1f}% is below 100%")
status = "FAIL"
result = BenchmarkResult(
test_name="agent_creation",
metrics=metrics,
status=status,
warnings=warnings,
details={
"num_agents": num_agents,
"agents_created": len(agents),
"target_ops_per_second": 2.0 # Baseline expectation
}
)
self.test_results.append(result)
return result
async def benchmark_concurrent_operations(
self,
operation_func: Callable[[], Any],
num_concurrent: int = 16,
operations_per_worker: int = 10
) -> BenchmarkResult:
"""
Benchmark concurrent operation performance.
Args:
operation_func: Async function to execute concurrently
num_concurrent: Number of concurrent workers
operations_per_worker: Operations per worker
Returns:
BenchmarkResult with concurrency performance metrics
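        Example:
            Sketch; ``ping_agent`` is a hypothetical no-argument coroutine
            representing one operation::

                result = await framework.benchmark_concurrent_operations(
                    ping_agent, num_concurrent=8, operations_per_worker=5
                )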
"""
total_operations = num_concurrent * operations_per_worker
async def worker(worker_id: int) -> Tuple[int, int]:
"""Worker function that executes operations."""
success_count = 0
error_count = 0
for op_id in range(operations_per_worker):
try:
await operation_func()
success_count += 1
except Exception:
error_count += 1
return success_count, error_count
async with performance_monitor() as metrics:
start_time = time.perf_counter()
# Execute concurrent operations
tasks = [worker(i) for i in range(num_concurrent)]
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.perf_counter()
execution_time = end_time - start_time
# Aggregate results
total_success = 0
total_errors = 0
for result in results:
if isinstance(result, tuple):
success, errors = result
total_success += success
total_errors += errors
else:
total_errors += operations_per_worker
metrics.success_count = total_success
metrics.error_count = total_errors
metrics.operations_per_second = total_operations / execution_time if execution_time > 0 else 0
# Analyze concurrency performance
warnings = []
status = "PASS"
if metrics.operations_per_second < (total_operations / 10.0): # Should complete in under 10s
warnings.append(f"Concurrent operations too slow: {metrics.operations_per_second:.2f} ops/sec")
status = "SLOW"
if metrics.success_rate < 95.0: # Allow 5% failure rate in concurrent scenarios
warnings.append(f"High failure rate in concurrent operations: {metrics.success_rate:.1f}%")
status = "FAIL"
result = BenchmarkResult(
test_name="concurrent_operations",
metrics=metrics,
status=status,
warnings=warnings,
details={
"num_workers": num_concurrent,
"operations_per_worker": operations_per_worker,
"total_operations": total_operations,
"concurrency_factor": num_concurrent
}
)
self.test_results.append(result)
return result
async def benchmark_mcp_tool_response_time(
self,
mcp_tool_func: Callable[[Dict[str, Any]], Any],
test_requests: List[Dict[str, Any]]
) -> BenchmarkResult:
"""
Benchmark MCP tool response times.
Args:
mcp_tool_func: MCP tool function to benchmark
test_requests: List of test request parameters
Returns:
BenchmarkResult with MCP tool performance metrics
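        Example:
            Sketch; ``call_mcp_tool`` stands in for an async MCP tool wrapper::

                requests = [{"command": "list_agents"}, {"command": "get_status"}]
                result = await framework.benchmark_mcp_tool_response_time(
                    call_mcp_tool, requests
                )
                print(result.details["p95_response_time"])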
"""
response_times = []
async with performance_monitor() as metrics:
for request_params in test_requests:
start_time = time.perf_counter()
try:
await mcp_tool_func(request_params)
end_time = time.perf_counter()
response_times.append(end_time - start_time)
metrics.success_count += 1
except Exception:
metrics.error_count += 1
        # Calculate response time statistics
        if response_times:
            avg_response_time = statistics.mean(response_times)
            median_response_time = statistics.median(response_times)
            p95_response_time = sorted(response_times)[int(0.95 * len(response_times))]
            total_time = sum(response_times)
            metrics.operations_per_second = len(response_times) / total_time if total_time > 0 else 0
        else:
            avg_response_time = median_response_time = p95_response_time = 0
# Analyze response time performance
warnings = []
status = "PASS"
if avg_response_time > 2.0: # Average response should be under 2 seconds
warnings.append(f"Average response time {avg_response_time:.2f}s exceeds 2s target")
status = "SLOW"
if p95_response_time > 5.0: # 95th percentile should be under 5 seconds
warnings.append(f"95th percentile response time {p95_response_time:.2f}s exceeds 5s target")
status = "SLOW"
if metrics.success_rate < 99.0: # MCP tools should be very reliable
warnings.append(f"MCP tool reliability {metrics.success_rate:.1f}% below 99% target")
status = "FAIL"
result = BenchmarkResult(
test_name="mcp_tool_response_time",
metrics=metrics,
status=status,
warnings=warnings,
details={
"num_requests": len(test_requests),
"avg_response_time": avg_response_time,
"median_response_time": median_response_time,
"p95_response_time": p95_response_time,
"response_times": response_times[:10] # Sample of response times
}
)
self.test_results.append(result)
return result
async def benchmark_system_scalability(
self,
system_factory: Callable[[int], Any],
        max_load: Optional[int] = None
) -> BenchmarkResult:
"""
Benchmark system scalability under increasing load.
Args:
system_factory: Function that creates system with specified load
            max_load: Maximum load to test (defaults to config scale_test_max_agents)
Returns:
BenchmarkResult with scalability analysis
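        Example:
            Sketch; ``spin_up_system`` is a hypothetical async factory that
            accepts a target agent count::

                result = await framework.benchmark_system_scalability(
                    spin_up_system, max_load=16
                )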
"""
max_load = max_load or self.config.scale_test_max_agents
step_size = self.config.scale_test_step_size
scalability_data = []
async with performance_monitor() as metrics:
for load in range(step_size, max_load + 1, step_size):
load_start_time = time.perf_counter()
try:
system = await system_factory(load)
load_end_time = time.perf_counter()
load_time = load_end_time - load_start_time
ops_per_second = load / load_time if load_time > 0 else 0
scalability_data.append({
"load": load,
"execution_time": load_time,
"ops_per_second": ops_per_second,
"success": True
})
metrics.success_count += 1
except Exception as e:
scalability_data.append({
"load": load,
"execution_time": None,
"ops_per_second": 0,
"success": False,
"error": str(e)
})
metrics.error_count += 1
# Calculate overall scalability metrics
successful_tests = [d for d in scalability_data if d["success"]]
if successful_tests:
max_successful_load = max(d["load"] for d in successful_tests)
avg_ops_per_second = statistics.mean(d["ops_per_second"] for d in successful_tests)
metrics.operations_per_second = avg_ops_per_second
else:
max_successful_load = 0
avg_ops_per_second = 0
# Analyze scalability performance
warnings = []
status = "PASS"
if max_successful_load < self.config.max_agents:
warnings.append(f"System fails before reaching target load {self.config.max_agents}")
status = "FAIL"
        # Check for performance degradation
        degradation = 0.0
        if len(successful_tests) >= 2:
            first_ops = successful_tests[0]["ops_per_second"]
            last_ops = successful_tests[-1]["ops_per_second"]
            degradation = (first_ops - last_ops) / first_ops * 100 if first_ops > 0 else 0
            if degradation > 50:  # More than 50% performance degradation
                warnings.append(f"Performance degradation of {degradation:.1f}% under load")
                status = "DEGRADED"
result = BenchmarkResult(
test_name="system_scalability",
metrics=metrics,
status=status,
warnings=warnings,
details={
"max_load_tested": max_load,
"max_successful_load": max_successful_load,
"scalability_data": scalability_data,
"performance_degradation": degradation if 'degradation' in locals() else 0
}
)
self.test_results.append(result)
return result
async def run_stress_test(
self,
stress_operation: Callable[[], Any],
        duration_seconds: Optional[int] = None
) -> BenchmarkResult:
"""
Run stress test with sustained load.
Args:
stress_operation: Operation to stress test
duration_seconds: Duration of stress test
Returns:
BenchmarkResult with stress test analysis
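        Example:
            Sketch; requires ``stress_test_enabled=True`` in the config and a
            hypothetical ``heartbeat_check`` coroutine::

                framework = PerformanceBenchmarkFramework(
                    PerformanceConfig(stress_test_enabled=True, stress_test_duration=30)
                )
                result = await framework.run_stress_test(heartbeat_check)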
"""
if not self.config.stress_test_enabled:
return BenchmarkResult(
test_name="stress_test",
metrics=PerformanceMetrics(0, 0, 0, 0, 0, 0),
status="SKIPPED",
warnings=["Stress testing disabled in configuration"]
)
duration = duration_seconds or self.config.stress_test_duration
load_factor = self.config.stress_test_load_factor
stress_data = []
async with performance_monitor() as metrics:
start_time = time.perf_counter()
end_time = start_time + duration
operation_count = 0
while time.perf_counter() < end_time:
interval_start = time.perf_counter()
# Run operations for 10-second intervals
interval_success = 0
interval_errors = 0
interval_end = min(interval_start + 10.0, end_time)
while time.perf_counter() < interval_end:
try:
await stress_operation()
interval_success += 1
operation_count += 1
except Exception:
interval_errors += 1
# Apply load factor (stress multiplier)
if load_factor > 1.0:
for _ in range(int(load_factor) - 1):
try:
await stress_operation()
interval_success += 1
operation_count += 1
except Exception:
interval_errors += 1
# Record interval data
interval_duration = time.perf_counter() - interval_start
stress_data.append({
"timestamp": time.perf_counter() - start_time,
"operations": interval_success,
"errors": interval_errors,
"ops_per_second": interval_success / interval_duration if interval_duration > 0 else 0
})
metrics.success_count += interval_success
metrics.error_count += interval_errors
total_duration = time.perf_counter() - start_time
metrics.operations_per_second = operation_count / total_duration if total_duration > 0 else 0
# Analyze stress test results
warnings = []
status = "PASS"
        degradation = 0.0
        if stress_data:
            # Check for performance degradation over time
            early_ops = statistics.mean(d["ops_per_second"] for d in stress_data[:3])
            late_ops = statistics.mean(d["ops_per_second"] for d in stress_data[-3:])
            degradation = (early_ops - late_ops) / early_ops * 100 if early_ops > 0 else 0
            if degradation > 30:  # More than 30% degradation during stress test
                warnings.append(f"Performance degraded {degradation:.1f}% during stress test")
                status = "DEGRADED"
# Check error rate increase
early_errors = sum(d["errors"] for d in stress_data[:len(stress_data)//3])
late_errors = sum(d["errors"] for d in stress_data[2*len(stress_data)//3:])
if late_errors > early_errors * 2: # Error rate doubled
warnings.append("Error rate increased significantly during stress test")
status = "UNSTABLE"
if metrics.success_rate < 90.0: # Allow higher failure rate in stress tests
warnings.append(f"High failure rate during stress test: {metrics.success_rate:.1f}%")
status = "FAIL"
result = BenchmarkResult(
test_name="stress_test",
metrics=metrics,
status=status,
warnings=warnings,
details={
"duration_seconds": duration,
"load_factor": load_factor,
"total_operations": operation_count,
"stress_data": stress_data,
"performance_degradation": degradation if 'degradation' in locals() else 0
}
)
self.test_results.append(result)
return result
def generate_performance_report(self) -> Dict[str, Any]:
"""
Generate comprehensive performance report.
Returns:
Dictionary with performance analysis and recommendations
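        Example:
            Sketch of reading the report after benchmarks have run::

                report = framework.generate_performance_report()
                print(report["summary"]["pass_rate"], report["recommendations"])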
"""
if not self.test_results:
return {"status": "NO_TESTS", "message": "No performance tests have been run"}
# Aggregate results by status
status_counts = {}
for result in self.test_results:
status_counts[result.status] = status_counts.get(result.status, 0) + 1
# Calculate overall performance score
total_tests = len(self.test_results)
pass_rate = status_counts.get("PASS", 0) / total_tests * 100 if total_tests > 0 else 0
# Identify performance bottlenecks
bottlenecks = []
for result in self.test_results:
if result.status in ["SLOW", "FAIL", "DEGRADED"]:
bottlenecks.append({
"test": result.test_name,
"issue": result.status,
"warnings": result.warnings
})
# Generate recommendations
recommendations = []
if pass_rate < 80:
recommendations.append("Overall performance is below acceptable levels - investigate system bottlenecks")
if any(r.status == "DEGRADED" for r in self.test_results):
recommendations.append("Performance degradation detected under load - optimize resource management")
if any("memory" in w.lower() for r in self.test_results for w in r.warnings):
recommendations.append("Memory usage issues detected - implement memory optimization")
if any("response time" in w.lower() for r in self.test_results for w in r.warnings):
recommendations.append("Response time issues detected - optimize critical paths")
return {
"summary": {
"total_tests": total_tests,
"pass_rate": pass_rate,
"status_breakdown": status_counts
},
"bottlenecks": bottlenecks,
"recommendations": recommendations,
"detailed_results": [
{
"test": r.test_name,
"status": r.status,
"execution_time": r.metrics.execution_time,
"operations_per_second": r.metrics.operations_per_second,
"success_rate": r.metrics.success_rate,
"warnings": r.warnings
}
for r in self.test_results
]
}
# ============================================================================
# Performance Test Utilities and Decorators
# ============================================================================
def performance_test(
max_execution_time: float = 60.0,
max_memory_mb: int = 512,
target_ops_per_second: float = 1.0
):
"""
Decorator for performance tests with specific requirements.
Args:
max_execution_time: Maximum allowed execution time
max_memory_mb: Maximum allowed memory usage
target_ops_per_second: Target operations per second
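    Example:
        Sketch of a pytest-style test tagged with performance expectations::

            @performance_test(max_execution_time=5.0, target_ops_per_second=10.0)
            async def test_agent_spawn_speed():
                ...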
"""
def decorator(test_func):
test_func._performance_test = True
test_func._max_execution_time = max_execution_time
test_func._max_memory_mb = max_memory_mb
test_func._target_ops_per_second = target_ops_per_second
return test_func
return decorator
def benchmark_fixture(
iterations: int = 100,
warmup: int = 5
):
"""Create a benchmark fixture with specified parameters."""
def decorator(benchmark_func):
benchmark_func._benchmark_iterations = iterations
benchmark_func._benchmark_warmup = warmup
return benchmark_func
return decorator
# Export main components
__all__ = [
'PerformanceBenchmarkFramework', 'PerformanceConfig', 'PerformanceMetrics',
'BenchmarkResult', 'performance_monitor', 'performance_test', 'benchmark_fixture'
]
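# ============================================================================
# Example usage (illustrative sketch, only run when this module is executed
# directly). `_demo_agent_factory` is a stand-in for a real agent factory
# provided by the orchestration platform.
# ============================================================================
async def _demo() -> None:
    async def _demo_agent_factory() -> object:
        await asyncio.sleep(0.01)  # simulate agent startup latency
        return object()

    framework = PerformanceBenchmarkFramework(PerformanceConfig(max_agents=4))
    await framework.benchmark_agent_creation(_demo_agent_factory, num_agents=4)
    print(framework.generate_performance_report()["summary"])


if __name__ == "__main__":
    asyncio.run(_demo())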