"""
Concurrent Operation Testing Framework for Agent Orchestration Platform.
This module provides comprehensive testing infrastructure for concurrent operations including:
- Concurrent agent creation and management testing
- Race condition detection and prevention validation
- Resource contention and deadlock detection
- State consistency under concurrent modifications
- Performance testing under concurrent load
- Synchronization primitive testing
Designed to detect and prevent concurrency issues in agent orchestration.
Author: Adder_1 | Created: 2025-06-26 | Testing Infrastructure Task
"""
import asyncio
import concurrent.futures
import functools
import logging
import queue
import secrets
import threading
import time
import weakref
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import (
    Any,
    Awaitable,
    Callable,
    Dict,
    Generic,
    List,
    Optional,
    Set,
    Tuple,
    TypeVar,
    Union,
)
from unittest.mock import AsyncMock

import pytest
from hypothesis import HealthCheck, given, settings, strategies as st
# Generic type variables.
# NOTE(review): neither T nor R is referenced by the code visible in this
# module - confirm nothing imports them before removing.
T = TypeVar('T')
R = TypeVar('R')
# ============================================================================
# Concurrent Test Configuration and Metrics
# ============================================================================
@dataclass
class ConcurrencyTestConfig:
    """Settings that control a concurrent-operation test run.

    Every field has a default, so ``ConcurrencyTestConfig()`` yields a usable
    configuration. Field order is part of the positional constructor
    signature and must not be rearranged.
    """

    # --- Workload sizing -------------------------------------------------
    # Default worker count when a run does not specify one.
    max_workers: int = 16
    # Default number of operations assigned to each worker.
    max_operations_per_worker: int = 50
    # Per-operation timeout in seconds (applied via asyncio.wait_for).
    operation_timeout: float = 30.0

    # --- Issue detection toggles ----------------------------------------
    # When True, the race-condition count is copied into the run metrics.
    race_condition_detection: bool = True
    # When True, the leak-detector verdict is copied into the run metrics.
    resource_leak_detection: bool = True
    # Timeout (seconds) handed to DeadlockDetector on construction.
    deadlock_detection_timeout: float = 10.0

    # --- Performance / stress options -----------------------------------
    # NOTE(review): not read anywhere in the visible code - confirm usage.
    performance_monitoring: bool = True
    # NOTE(review): not read anywhere in the visible code - presumably a
    # duration in seconds; confirm against callers.
    stress_test_duration: float = 60.0
@dataclass
class ConcurrencyTestMetrics:
    """Aggregated counters and timings for one concurrent test run.

    All fields start at zero; the two properties derive rates from the raw
    counters and return ``0.0`` instead of dividing by zero.
    """

    # Operation counters
    total_operations: int = 0
    successful_operations: int = 0
    failed_operations: int = 0
    # Highest number of operations observed in flight at the same time
    concurrent_peak: int = 0
    # Timings, in seconds
    total_duration: float = 0.0
    average_operation_time: float = 0.0
    # Issue counters filled in from the detectors
    race_conditions_detected: int = 0
    resource_leaks_detected: int = 0
    deadlocks_detected: int = 0
    # NOTE(review): never written by the visible code - confirm who sets it.
    performance_degradation: float = 0.0

    @property
    def success_rate(self) -> float:
        """Percentage (0-100) of operations that succeeded; 0.0 with no operations."""
        if not self.total_operations:
            return 0.0
        fraction = self.successful_operations / self.total_operations
        return fraction * 100

    @property
    def operations_per_second(self) -> float:
        """Throughput as operations divided by total duration; 0.0 for zero duration."""
        elapsed = self.total_duration
        return self.total_operations / elapsed if elapsed != 0 else 0.0
@dataclass
class OperationResult:
    """Outcome record for a single operation executed by a worker."""

    # Identifier of the operation (the tester uses "worker_<w>_op_<i>").
    operation_id: str
    # Index of the worker that ran the operation.
    worker_id: int
    # True when the operation returned without raising.
    success: bool
    # Wall-clock time spent on the operation, in seconds.
    duration: float
    # Value returned by the operation, if any.
    result: Any = None
    # Exception raised by the operation, if any.
    error: Optional[Exception] = None
    # Free-form extra data; every instance gets its own dict.
    metadata: Dict[str, Any] = field(default_factory=dict)
# ============================================================================
# Race Condition Detection Utilities
# ============================================================================
class RaceConditionDetector:
    """
    Heuristic detector for racing writes on shared state.

    Each access is stored as ``(timestamp, operation, worker_id)`` under its
    state key. After every recorded access the most recent entries for that
    key are inspected, and a race condition is reported when two or more
    mutating operations fall inside a 100 ms window.
    """

    def __init__(self):
        self.state_accesses: Dict[str, List[Tuple[float, str, str]]] = {}
        self.lock = threading.Lock()
        self.race_conditions: List[Dict[str, Any]] = []

    def record_access(self, state_key: str, operation: str, worker_id: str):
        """Store one access to ``state_key`` and re-run the race check for it."""
        entry = (time.time(), operation, worker_id)
        with self.lock:
            self.state_accesses.setdefault(state_key, []).append(entry)
            # Evaluate while still holding the lock so the history is stable.
            self._check_race_condition(state_key)

    def _check_race_condition(self, state_key: str):
        """Append a report if recent mutating accesses to ``state_key`` overlap.

        Called by ``record_access`` with ``self.lock`` already held.
        """
        history = self.state_accesses[state_key]
        if len(history) < 2:
            return

        # Only the last 10 accesses are considered, and of those only the
        # mutating ones that happened within the last 100 ms.
        conflicting = []
        for entry in history[-10:]:
            if not (time.time() - entry[0] < 0.1):
                continue
            if entry[1] in ['write', 'update', 'delete', 'create']:
                conflicting.append(entry)

        if len(conflicting) < 2:
            return

        severity = 'MEDIUM'
        if len(conflicting) > 2:
            severity = 'HIGH'
        self.race_conditions.append({
            'state_key': state_key,
            'timestamp': time.time(),
            'conflicting_operations': conflicting,
            'severity': severity,
        })

    def get_race_conditions(self) -> List[Dict[str, Any]]:
        """Return a shallow copy of the reports collected so far."""
        with self.lock:
            return list(self.race_conditions)

    def clear_history(self):
        """Forget all recorded accesses and all reports."""
        with self.lock:
            self.state_accesses.clear()
            self.race_conditions.clear()
# ============================================================================
# Resource Leak Detection
# ============================================================================
class ResourceLeakDetector:
    """
    Bookkeeping helper that spots resources which were never released.

    Allocations add an id to a set and deallocations remove it; whatever is
    still in the set when ``check_leaks`` runs counts as unreleased. A leak
    is only flagged once the unreleased count exceeds ``leak_threshold``.
    """

    def __init__(self):
        self.allocated_resources: Set[str] = set()
        # Chronological log of (timestamp, 'allocate' | 'deallocate', resource_id).
        self.resource_history: List[Tuple[float, str, str]] = []
        self.lock = threading.Lock()
        self.leak_threshold = 100  # Number of unreleased resources to consider a leak

    def allocate_resource(self, resource_id: str, resource_type: str):
        """Mark ``resource_id`` as allocated.

        ``resource_type`` is accepted for the caller's benefit but is not
        stored.
        """
        with self.lock:
            self.allocated_resources.add(resource_id)
            event = (time.time(), 'allocate', resource_id)
            self.resource_history.append(event)

    def deallocate_resource(self, resource_id: str):
        """Mark ``resource_id`` as released; unknown ids are ignored in the set."""
        with self.lock:
            self.allocated_resources.discard(resource_id)
            event = (time.time(), 'deallocate', resource_id)
            self.resource_history.append(event)

    def check_leaks(self) -> Dict[str, Any]:
        """Summarise outstanding allocations.

        Returns:
            Dict with the leak verdict, the number of unreleased resources,
            their ids (only when there are at most 20 of them), and the
            total allocate/deallocate event counts.
        """
        with self.lock:
            outstanding = len(self.allocated_resources)
            allocations = sum(1 for event in self.resource_history if event[1] == 'allocate')
            deallocations = sum(1 for event in self.resource_history if event[1] == 'deallocate')
            # Long id lists are suppressed to keep the report readable.
            leaked_ids = list(self.allocated_resources) if outstanding <= 20 else []
            return {
                'leak_detected': outstanding > self.leak_threshold,
                'unreleased_resources': outstanding,
                'leaked_resource_ids': leaked_ids,
                'total_allocations': allocations,
                'total_deallocations': deallocations,
            }

    def cleanup(self):
        """Drop all tracked resources and the event log."""
        with self.lock:
            self.allocated_resources.clear()
            self.resource_history.clear()
# ============================================================================
# Deadlock Detection
# ============================================================================
class DeadlockDetector:
    """
    Utility for detecting deadlocks in concurrent operations.

    Callers report lock activity through ``wait_for_lock``, ``acquire_lock``
    and ``release_lock``. Every time a worker starts waiting, the wait-for
    relationships are searched for a cycle (worker A waits for a lock held
    by B while B waits, directly or transitively, for a lock held by A) and
    a report is stored when one is found.
    """

    def __init__(self, timeout: float = 10.0):
        # Stored for callers; the detection logic in this class does not read it.
        self.timeout = timeout
        # worker_id -> names of the locks that worker currently holds
        self.lock_graph: Dict[str, Set[str]] = {}
        # worker_id -> name of the lock that worker is blocked on
        self.waiting_for: Dict[str, str] = {}
        self.lock = threading.Lock()
        self.deadlocks: List[Dict[str, Any]] = []

    def wait_for_lock(self, worker_id: str, lock_name: str):
        """Record that a worker is waiting for a lock and check for a cycle."""
        with self.lock:
            self.waiting_for[worker_id] = lock_name
            self._check_deadlock()

    def acquire_lock(self, worker_id: str, lock_name: str):
        """Record lock acquisition; the worker is no longer considered waiting."""
        with self.lock:
            self.lock_graph.setdefault(worker_id, set()).add(lock_name)
            self.waiting_for.pop(worker_id, None)

    def release_lock(self, worker_id: str, lock_name: str):
        """Record lock release; unknown workers or locks are ignored."""
        with self.lock:
            held = self.lock_graph.get(worker_id)
            if held is not None:
                held.discard(lock_name)

    def _check_deadlock(self):
        """Search the wait-for relationships for a cycle and record the first one.

        Must be called with ``self.lock`` held. At most one report is
        appended per call.
        """
        visited: Set[str] = set()
        # Workers on the current DFS chain, in the order they were reached,
        # so the report lists them in wait-chain order.
        path: List[str] = []

        def has_cycle(worker: str) -> bool:
            if worker in path:
                return True
            if worker in visited:
                return False
            visited.add(worker)
            path.append(worker)
            # Follow the edge "worker waits for a lock held by another worker".
            wanted = self.waiting_for.get(worker)
            if wanted:
                for holder, held_locks in self.lock_graph.items():
                    if holder != worker and wanted in held_locks:
                        if has_cycle(holder):
                            # Leave `path` intact so the caller can report it.
                            return True
            path.pop()
            return False

        for worker in self.waiting_for:
            if has_cycle(worker):
                self.deadlocks.append({
                    'timestamp': time.time(),
                    'involved_workers': list(path),
                    # Copy the inner sets too: a shallow dict() copy would
                    # share them with the live graph, so later acquire or
                    # release calls would rewrite this stored report.
                    'lock_graph': {
                        holder: set(locks)
                        for holder, locks in self.lock_graph.items()
                    },
                    'waiting_for': dict(self.waiting_for),
                })
                break

    def get_deadlocks(self) -> List[Dict[str, Any]]:
        """Get detected deadlocks (shallow copy of the report list)."""
        with self.lock:
            return self.deadlocks.copy()

    def cleanup(self):
        """Cleanup detector state."""
        with self.lock:
            self.lock_graph.clear()
            self.waiting_for.clear()
            self.deadlocks.clear()
# ============================================================================
# Concurrent Operation Testing Framework
# ============================================================================
class ConcurrentOperationTester:
    """
    Comprehensive framework for testing concurrent operations.

    Provides utilities for running concurrent tests with race condition
    detection, resource leak monitoring, and performance analysis.

    Operation results accumulate across runs on the same instance; metrics
    are always computed over everything collected since construction or the
    last ``cleanup()``.
    """

    def __init__(self, config: Optional[ConcurrencyTestConfig] = None):
        self.config = config or ConcurrencyTestConfig()
        self.logger = logging.getLogger(__name__)
        # Detection utilities
        self.race_detector = RaceConditionDetector()
        self.leak_detector = ResourceLeakDetector()
        self.deadlock_detector = DeadlockDetector(self.config.deadlock_detection_timeout)
        # Test state
        self.operation_results: List[OperationResult] = []
        self.metrics = ConcurrencyTestMetrics()
        self.active_operations = 0
        self.peak_concurrent = 0

    async def run_concurrent_operations(
        self,
        operation_func: Callable[..., Awaitable[Any]],
        operation_args: Optional[List[Tuple]] = None,
        num_workers: Optional[int] = None,
        operations_per_worker: Optional[int] = None
    ) -> ConcurrencyTestMetrics:
        """
        Run concurrent operations and collect metrics.

        Each worker processes its slice of ``operation_args`` sequentially;
        the workers themselves run concurrently.

        Args:
            operation_func: Async function to run concurrently
            operation_args: Argument tuples, one per operation. When omitted,
                ``(i,)`` is generated for every operation index.
            num_workers: Number of concurrent workers (defaults to
                ``config.max_workers``)
            operations_per_worker: Operations per worker (defaults to
                ``config.max_operations_per_worker``). If more argument
                tuples are supplied than ``num_workers *
                operations_per_worker`` can hold, the per-worker quota is
                raised so that every supplied operation is executed.

        Returns:
            ConcurrencyTestMetrics with results
        """
        num_workers = num_workers or self.config.max_workers
        operations_per_worker = operations_per_worker or self.config.max_operations_per_worker

        if operation_args is None:
            # Generate operation arguments if not provided
            total_operations = num_workers * operations_per_worker
            operation_args = [(i,) for i in range(total_operations)]
        else:
            # Never silently drop caller-supplied operations: widen the
            # per-worker slice when the requested quota cannot cover them.
            required = (len(operation_args) + num_workers - 1) // num_workers
            if required > operations_per_worker:
                self.logger.debug(
                    f"Raising operations per worker from {operations_per_worker} to {required} "
                    f"to cover {len(operation_args)} supplied operations"
                )
                operations_per_worker = required

        self.logger.info(f"Starting concurrent test: {num_workers} workers, {operations_per_worker} ops each")
        start_time = time.time()

        # Create semaphore to limit concurrent operations
        semaphore = asyncio.Semaphore(num_workers)

        # Create worker tasks, each owning a contiguous slice of the arguments
        tasks = []
        for worker_id in range(num_workers):
            first = worker_id * operations_per_worker
            worker_args = operation_args[first:first + operations_per_worker]
            tasks.append(
                self._create_worker_task(worker_id, operation_func, worker_args, semaphore)
            )

        # Run all workers concurrently. With return_exceptions=True a worker
        # failure is returned rather than raised, so surface it in the log.
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for outcome in outcomes:
            if isinstance(outcome, BaseException):
                self.logger.error(f"Error in concurrent operations: {outcome}")
        end_time = time.time()

        # Calculate final metrics
        results = self.operation_results
        self.metrics.total_duration = end_time - start_time
        self.metrics.total_operations = len(results)
        self.metrics.successful_operations = sum(1 for r in results if r.success)
        self.metrics.failed_operations = self.metrics.total_operations - self.metrics.successful_operations
        self.metrics.concurrent_peak = self.peak_concurrent
        if results:
            self.metrics.average_operation_time = sum(r.duration for r in results) / len(results)

        # Check for issues
        if self.config.race_condition_detection:
            self.metrics.race_conditions_detected = len(self.race_detector.get_race_conditions())
        if self.config.resource_leak_detection:
            leak_info = self.leak_detector.check_leaks()
            self.metrics.resource_leaks_detected = 1 if leak_info['leak_detected'] else 0
        deadlocks = self.deadlock_detector.get_deadlocks()
        self.metrics.deadlocks_detected = len(deadlocks)

        self.logger.info(f"Concurrent test completed: {self.metrics.success_rate:.1f}% success rate")
        return self.metrics

    async def _create_worker_task(
        self,
        worker_id: int,
        operation_func: Callable[..., Awaitable[Any]],
        worker_args: List[Tuple],
        semaphore: asyncio.Semaphore
    ):
        """Run one worker: execute each argument tuple in order and record the outcome.

        Every operation is bounded by ``config.operation_timeout``. Failures
        (including timeouts) are recorded as unsuccessful results and do not
        stop the worker.
        """
        for i, args in enumerate(worker_args):
            async with semaphore:
                operation_id = f"worker_{worker_id}_op_{i}"
                # Track concurrent operations
                self.active_operations += 1
                self.peak_concurrent = max(self.peak_concurrent, self.active_operations)
                start_time = time.time()
                try:
                    # Run the operation with timeout
                    result = await asyncio.wait_for(
                        operation_func(*args),
                        timeout=self.config.operation_timeout
                    )
                except Exception as e:
                    # Record failed operation
                    op_result = OperationResult(
                        operation_id=operation_id,
                        worker_id=worker_id,
                        success=False,
                        duration=time.time() - start_time,
                        error=e
                    )
                    self.logger.debug(f"Operation {operation_id} failed: {e}")
                else:
                    # Record successful operation
                    op_result = OperationResult(
                        operation_id=operation_id,
                        worker_id=worker_id,
                        success=True,
                        duration=time.time() - start_time,
                        result=result
                    )
                finally:
                    # Only the counter is touched here: if a BaseException
                    # such as asyncio.CancelledError is propagating there is
                    # no result object to record.
                    self.active_operations -= 1
                self.operation_results.append(op_result)

    async def test_agent_concurrent_operations(
        self,
        agent_manager: Any,
        session_id: str,
        num_agents: int = 8
    ) -> ConcurrencyTestMetrics:
        """
        Test concurrent agent creation.

        Args:
            agent_manager: Agent manager instance to test; its
                ``create_agent`` coroutine is awaited once per agent
            session_id: Session ID for agent operations
            num_agents: Number of agents to create concurrently

        Returns:
            ConcurrencyTestMetrics with results
        """
        async def create_agent_operation(agent_index: int):
            """Create a single agent."""
            agent_name = f"Agent_{agent_index}"
            # Record state access for race condition detection
            self.race_detector.record_access(
                f"session_{session_id}_agents", "create", f"worker_{agent_index}"
            )
            # Record resource allocation
            agent_id = f"agent_{session_id}_{agent_index}"
            self.leak_detector.allocate_resource(agent_id, "agent")
            try:
                return await agent_manager.create_agent(
                    session_id=session_id,
                    agent_name=agent_name,
                    specialization=f"Test Agent {agent_index}"
                )
            except Exception:
                # A failed creation holds no resource, so undo the allocation
                self.leak_detector.deallocate_resource(agent_id)
                raise

        # Run concurrent agent creation
        operation_args = [(i,) for i in range(num_agents)]
        return await self.run_concurrent_operations(
            create_agent_operation,
            operation_args,
            num_workers=min(num_agents, self.config.max_workers),
            operations_per_worker=1
        )

    async def test_message_sending_concurrency(
        self,
        message_sender: Any,
        agent_ids: List[str],
        messages_per_agent: int = 10
    ) -> ConcurrencyTestMetrics:
        """
        Test concurrent message sending to multiple agents.

        Args:
            message_sender: Message sender instance; its
                ``send_message_to_agent`` coroutine is awaited per message
            agent_ids: List of agent IDs to send messages to
            messages_per_agent: Number of messages per agent

        Returns:
            ConcurrencyTestMetrics with results
        """
        async def send_message_operation(agent_id: str, message_index: int):
            """Send a message to an agent."""
            message = f"Test message {message_index} to {agent_id}"
            # Record state access
            self.race_detector.record_access(
                f"agent_{agent_id}_messages", "write", f"msg_{message_index}"
            )
            try:
                return await message_sender.send_message_to_agent(
                    agent_name=agent_id,
                    message=message,
                    wait_for_response=False
                )
            except Exception as e:
                self.logger.debug(f"Message sending failed: {e}")
                raise

        # One operation per (agent, message index) pair
        operation_args = [
            (agent_id, msg_index)
            for agent_id in agent_ids
            for msg_index in range(messages_per_agent)
        ]
        return await self.run_concurrent_operations(
            send_message_operation,
            operation_args,
            num_workers=self.config.max_workers,
            operations_per_worker=1
        )

    async def test_session_management_concurrency(
        self,
        session_manager: Any,
        num_sessions: int = 4
    ) -> ConcurrencyTestMetrics:
        """
        Test concurrent session management operations.

        Args:
            session_manager: Session manager instance; ``create_session``,
                ``get_session_status`` and ``delete_session`` are awaited
            num_sessions: Number of sessions to manage concurrently

        Returns:
            ConcurrencyTestMetrics with results
        """
        async def session_lifecycle_operation(session_index: int):
            """Complete session lifecycle (create, use, delete)."""
            session_id = f"concurrent_session_{session_index}"
            try:
                # Create session
                self.race_detector.record_access("global_sessions", "create", session_id)
                self.leak_detector.allocate_resource(session_id, "session")
                create_result = await session_manager.create_session(
                    session_id=session_id,
                    name=f"Concurrent Test Session {session_index}",
                    root_path=f"/tmp/test_session_{session_index}"
                )
                # Simulate session usage
                await asyncio.sleep(0.1)
                # Get session status
                status = await session_manager.get_session_status(session_id)
                # Delete session
                self.race_detector.record_access("global_sessions", "delete", session_id)
                delete_result = await session_manager.delete_session(session_id)
                # Record resource deallocation
                self.leak_detector.deallocate_resource(session_id)
                return {
                    "session_id": session_id,
                    "create_result": create_result,
                    "status": status,
                    "delete_result": delete_result
                }
            except Exception:
                # Cleanup on failure
                self.leak_detector.deallocate_resource(session_id)
                raise

        operation_args = [(i,) for i in range(num_sessions)]
        return await self.run_concurrent_operations(
            session_lifecycle_operation,
            operation_args,
            num_workers=min(num_sessions, self.config.max_workers),
            operations_per_worker=1
        )

    def get_detailed_report(self) -> Dict[str, Any]:
        """
        Generate a detailed report of concurrent testing results.

        Returns:
            Dictionary with a metrics summary, detector findings, timing
            analysis, recommendations and the first 100 operation results
        """
        race_conditions = self.race_detector.get_race_conditions()
        leak_info = self.leak_detector.check_leaks()
        deadlocks = self.deadlock_detector.get_deadlocks()
        # Analyze operation timing patterns
        timing_analysis = self._analyze_operation_timing()
        # Generate recommendations
        recommendations = self._generate_recommendations()
        return {
            "summary": {
                "total_operations": self.metrics.total_operations,
                "success_rate": self.metrics.success_rate,
                "operations_per_second": self.metrics.operations_per_second,
                "average_operation_time": self.metrics.average_operation_time,
                "peak_concurrent_operations": self.metrics.concurrent_peak,
                "total_duration": self.metrics.total_duration
            },
            "concurrency_issues": {
                "race_conditions": {
                    "count": len(race_conditions),
                    "details": race_conditions
                },
                "resource_leaks": {
                    "detected": leak_info['leak_detected'],
                    "details": leak_info
                },
                "deadlocks": {
                    "count": len(deadlocks),
                    "details": deadlocks
                }
            },
            "performance_analysis": timing_analysis,
            "recommendations": recommendations,
            "operation_results": [
                {
                    "operation_id": r.operation_id,
                    "worker_id": r.worker_id,
                    "success": r.success,
                    "duration": r.duration,
                    "error": str(r.error) if r.error else None
                }
                for r in self.operation_results[:100]  # Limit to first 100 for readability
            ]
        }

    def _analyze_operation_timing(self) -> Dict[str, Any]:
        """Summarise durations of successful operations.

        Returns an empty dict when there are no successful operations.
        Percentiles are picked by index from the sorted durations (no
        interpolation).
        """
        durations = sorted(r.duration for r in self.operation_results if r.success)
        if not durations:
            return {}
        count = len(durations)
        return {
            "min_duration": durations[0],
            "max_duration": durations[-1],
            "median_duration": durations[count // 2],
            "p95_duration": durations[int(0.95 * count)],
            "p99_duration": durations[int(0.99 * count)],
            "duration_std_dev": self._calculate_std_dev(durations)
        }

    def _calculate_std_dev(self, values: List[float]) -> float:
        """Calculate the population standard deviation; 0.0 for empty input."""
        if not values:
            return 0.0
        mean = sum(values) / len(values)
        variance = sum((x - mean) ** 2 for x in values) / len(values)
        return variance ** 0.5

    def _generate_recommendations(self) -> List[str]:
        """Generate recommendations based on the current metrics."""
        recommendations = []
        if self.metrics.success_rate < 95.0:
            recommendations.append(
                f"Success rate ({self.metrics.success_rate:.1f}%) is below 95% - investigate failures"
            )
        if self.metrics.race_conditions_detected > 0:
            recommendations.append(
                f"Race conditions detected ({self.metrics.race_conditions_detected}) - add synchronization"
            )
        if self.metrics.resource_leaks_detected > 0:
            recommendations.append("Resource leaks detected - implement proper cleanup")
        if self.metrics.deadlocks_detected > 0:
            recommendations.append("Deadlocks detected - review lock ordering and timeout handling")
        if self.metrics.operations_per_second < 10:
            recommendations.append("Low throughput - optimize critical paths")
        if self.metrics.average_operation_time > 5.0:
            recommendations.append("High average operation time - profile and optimize")
        return recommendations

    def cleanup(self):
        """Cleanup test state and detectors."""
        self.race_detector.clear_history()
        self.leak_detector.cleanup()
        self.deadlock_detector.cleanup()
        self.operation_results.clear()
        self.metrics = ConcurrencyTestMetrics()
        # Reset the peak so it does not leak into the next run's metrics.
        self.peak_concurrent = 0
# ============================================================================
# Concurrent Test Utilities and Decorators
# ============================================================================
def concurrent_test(
    max_workers: int = 8,
    operations_per_worker: int = 10,
    timeout: float = 30.0
):
    """
    Decorator for concurrent operation tests.

    The decorated coroutine is called with a freshly configured
    ``ConcurrentOperationTester`` inserted as its first positional argument,
    ahead of any arguments passed to the wrapper. The tester is cleaned up
    when the test finishes, whether it passes or raises.

    Args:
        max_workers: Maximum concurrent workers
        operations_per_worker: Operations per worker
        timeout: Per-operation timeout in seconds (stored as the config's
            ``operation_timeout``)

    Returns:
        A decorator that wraps an async test function
    """
    def decorator(test_func):
        # Requires the module-level `import functools`.
        @functools.wraps(test_func)
        async def wrapper(*args, **kwargs):
            config = ConcurrencyTestConfig(
                max_workers=max_workers,
                max_operations_per_worker=operations_per_worker,
                operation_timeout=timeout
            )
            tester = ConcurrentOperationTester(config)
            try:
                return await test_func(tester, *args, **kwargs)
            finally:
                tester.cleanup()
        return wrapper
    return decorator
@asynccontextmanager
async def concurrent_test_context(config: ConcurrencyTestConfig = None):
    """Async context manager yielding a tester that is cleaned up on exit.

    Args:
        config: Optional configuration passed straight to
            ``ConcurrentOperationTester``.

    Yields:
        The ``ConcurrentOperationTester`` instance; its ``cleanup()`` runs
        when the ``async with`` body exits, normally or via an exception.
    """
    harness = ConcurrentOperationTester(config)
    try:
        yield harness
    finally:
        harness.cleanup()
# Public API of this module (the names exported by `from <module> import *`).
__all__ = [
    'ConcurrentOperationTester',
    'ConcurrencyTestConfig',
    'ConcurrencyTestMetrics',
    'RaceConditionDetector',
    'ResourceLeakDetector',
    'DeadlockDetector',
    'OperationResult',
    'concurrent_test',
    'concurrent_test_context',
]