"""
Mock iTerm2 Manager for isolated testing of Agent Orchestration Platform.
This module provides a comprehensive mock implementation of iTerm2 integration that:
- Simulates tab creation, management, and destruction
- Mocks text injection and output capture capabilities
- Provides health monitoring and crash recovery simulation
- Implements debugging commands (/debug, /logs, /mcp-debug, /config)
- Supports async operations matching the real iTerm2 manager interface
- Enables testing without requiring actual iTerm2 installation
Designed to match the real iTerm2 manager interface for seamless testing.
Author: Adder_1 | Created: 2025-06-26 | Testing Infrastructure Task
"""
import asyncio
import inspect
import json
import logging
import secrets
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Any, Optional, Callable, AsyncGenerator
from unittest.mock import AsyncMock, MagicMock
# ============================================================================
# Mock iTerm2 Data Structures
# ============================================================================
@dataclass
class MockTab:
    """In-memory stand-in for a single iTerm2 tab.

    Only ``tab_id`` and ``session_id`` are required; every other field has
    a default. Mutable fields use ``default_factory`` so each instance gets
    its own list/dict.
    """

    # --- identity ---------------------------------------------------------
    tab_id: str
    session_id: str
    agent_id: Optional[str] = None
    title: str = "Claude Code"

    # --- lifecycle --------------------------------------------------------
    is_active: bool = True
    # Both timestamps are taken (independently) when the instance is built.
    created_at: datetime = field(default_factory=datetime.now)
    last_activity: datetime = field(default_factory=datetime.now)

    # --- simulated process/terminal state -----------------------------------
    current_directory: str = "/tmp"
    process_id: Optional[int] = None
    # Lines written to / produced by the tab, oldest first.
    text_buffer: List[str] = field(default_factory=list)
    # Raw commands sent to the tab, oldest first.
    command_history: List[str] = field(default_factory=list)

    # --- diagnostics --------------------------------------------------------
    health_status: str = "HEALTHY"
    debug_info: Dict[str, Any] = field(default_factory=dict)
@dataclass
class MockSession:
    """In-memory stand-in for an iTerm2 session, i.e. a group of tabs.

    ``session_id`` and ``name`` are required; the remaining fields default
    to an empty, active session rooted at ``/tmp``.
    """

    session_id: str
    name: str

    # Tabs owned by this session, keyed by tab ID. A fresh dict per instance.
    tabs: Dict[str, MockTab] = field(default_factory=dict)

    # Timestamp taken when the instance is built.
    created_at: datetime = field(default_factory=datetime.now)

    root_directory: str = "/tmp"
    is_active: bool = True
@dataclass
class MockiTermState:
    """Aggregate state of the mock iTerm2 instance.

    Holds every session plus the knobs that shape simulated behavior.
    All fields have defaults, so ``MockiTermState()`` is an empty state
    with no delay, no failures and crash simulation switched off.
    """

    # Sessions keyed by session ID. A fresh dict per instance.
    sessions: Dict[str, MockSession] = field(default_factory=dict)
    # ID of the currently active session, or None when there is none.
    active_session: Optional[str] = None
    # Running count of tabs created.
    total_tabs_created: int = 0

    # --- behavior knobs -----------------------------------------------------
    crash_simulation_enabled: bool = False
    # Simulate network/processing delays (milliseconds).
    response_delay_ms: int = 0
    # Simulate occasional failures (fraction of operations).
    failure_rate: float = 0.0
# ============================================================================
# Mock iTerm2 Manager Implementation
# ============================================================================
class MockiTermManager:
    """
    Comprehensive mock iTerm2 manager for testing agent orchestration.

    All sessions and tabs live in an in-memory ``MockiTermState``; no real
    iTerm2 process is contacted. Simulated latency and random failures are
    driven by the ``config`` dict passed to the constructor (keys
    ``simulate_crashes``, ``response_delay_ms``, ``failure_rate`` and
    ``default_directory``) or later through ``configure_behavior``.
    """

    # Granularity of the random draw behind failure simulation. A coarse
    # 1-in-100 draw would round every rate below 1% up to 1%.
    _FAILURE_RESOLUTION = 1_000_000

    # Size of the random numeric suffix appended to generated tab IDs.
    _TAB_ID_SUFFIX_RANGE = 1_000_000_000

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Build an uninitialized manager.

        Args:
            config: Optional behavior settings; ``None`` means all defaults.
        """
        self.config = config or {}
        self.state = MockiTermState()
        self.logger = logging.getLogger(__name__)
        self.event_handlers: Dict[str, List[Callable]] = {}
        self.is_initialized = False
        self.operation_count = 0

        # Configure behavior from config
        self.state.crash_simulation_enabled = self.config.get('simulate_crashes', False)
        self.state.response_delay_ms = self.config.get('response_delay_ms', 0)
        self.state.failure_rate = self.config.get('failure_rate', 0.0)

    async def initialize(self) -> bool:
        """
        Initialize the mock iTerm2 manager.

        Creates (or replaces) the ``default_session`` and makes it active.

        Returns:
            True if initialization successful

        Raises:
            ConnectionError: If a simulated failure is drawn
        """
        await self._simulate_delay()
        if self._should_fail():
            raise ConnectionError("Mock iTerm2 initialization failed")

        self.is_initialized = True
        self.logger.info("Mock iTerm2 manager initialized")

        # Create default session
        default_session = MockSession(
            session_id="default_session",
            name="Default Session",
            root_directory=self.config.get('default_directory', '/tmp'),
        )
        self.state.sessions["default_session"] = default_session
        self.state.active_session = "default_session"
        return True

    async def cleanup(self) -> None:
        """Clean up mock iTerm2 manager resources."""
        await self._simulate_delay()

        # Mark every tab inactive before dropping the sessions, so callers
        # still holding a MockTab reference see it as closed.
        for session in self.state.sessions.values():
            for tab in session.tabs.values():
                tab.is_active = False

        self.state.sessions.clear()
        # The active session no longer exists; do not leave a stale ID behind.
        self.state.active_session = None
        self.is_initialized = False
        self.logger.info("Mock iTerm2 manager cleaned up")

    async def create_tab(
        self,
        session_id: str,
        agent_id: str,
        working_directory: Optional[str] = None,
        title: Optional[str] = None
    ) -> str:
        """
        Create a new iTerm2 tab for an agent.

        Args:
            session_id: Session to create tab in
            agent_id: Agent that will use this tab
            working_directory: Directory to set as current (defaults to the
                session's root directory)
            title: Tab title (defaults to ``"Agent {agent_id}"``)

        Returns:
            Tab ID string

        Raises:
            RuntimeError: If the manager is not initialized
            ConnectionError: If a simulated failure is drawn
            ValueError: If session doesn't exist
        """
        await self._simulate_delay()
        await self._check_initialization()
        await self._simulate_random_failure("create_tab")

        # Validate session exists
        if session_id not in self.state.sessions:
            raise ValueError(f"Session {session_id} does not exist")
        session = self.state.sessions[session_id]

        tab_id = self._generate_tab_id(agent_id)

        # Create mock tab
        tab = MockTab(
            tab_id=tab_id,
            session_id=session_id,
            agent_id=agent_id,
            title=title or f"Agent {agent_id}",
            current_directory=working_directory or session.root_directory,
            process_id=secrets.randbelow(32767) + 1000,  # Realistic PID
            debug_info={
                "created_by": "MockiTermManager",
                "creation_method": "create_tab",
                "initial_directory": working_directory
            }
        )

        # Add to session
        session.tabs[tab_id] = tab
        self.state.total_tabs_created += 1

        # Simulate initial setup
        tab.text_buffer.append(f"iTerm2 tab created for {agent_id}")
        tab.text_buffer.append(f"Working directory: {tab.current_directory}")
        tab.command_history.append("# Tab initialization")

        self.logger.info(
            "Created tab %s for agent %s in session %s", tab_id, agent_id, session_id
        )
        await self._emit_event("tab_created", {
            "tab_id": tab_id,
            "agent_id": agent_id,
            "session_id": session_id
        })
        return tab_id

    async def close_tab(self, tab_id: str) -> bool:
        """
        Close an iTerm2 tab.

        Args:
            tab_id: Tab to close

        Returns:
            True if tab was closed successfully, False if it was not found

        Raises:
            RuntimeError: If the manager is not initialized
            ConnectionError: If a simulated failure is drawn
        """
        await self._simulate_delay()
        await self._check_initialization()
        await self._simulate_random_failure("close_tab")

        # Find and close tab
        for session in self.state.sessions.values():
            if tab_id not in session.tabs:
                continue
            tab = session.tabs[tab_id]
            tab.is_active = False
            tab.health_status = "CLOSED"
            # Add final log entry (visible to anyone still holding the tab)
            tab.text_buffer.append(f"Tab {tab_id} closed at {datetime.now().isoformat()}")
            # Remove from session
            del session.tabs[tab_id]
            self.logger.info("Closed tab %s", tab_id)
            await self._emit_event("tab_closed", {"tab_id": tab_id})
            return True

        self.logger.warning("Tab %s not found for closing", tab_id)
        return False

    async def send_text_to_tab(
        self,
        tab_id: str,
        text: str,
        append_newline: bool = True
    ) -> bool:
        """
        Send text to an iTerm2 tab (simulate typing).

        The text is recorded in the tab's buffer as ``"INPUT: <text>"`` and in
        its command history; the debugging commands ``/debug``, ``/logs``,
        ``/mcp-debug`` and ``/config`` additionally produce an ``OUTPUT:`` line.

        Args:
            tab_id: Target tab ID
            text: Text to send
            append_newline: Whether to append newline

        Returns:
            True if text was sent successfully, False if the tab was not found

        Raises:
            RuntimeError: If the manager is not initialized
            ConnectionError: If a simulated failure is drawn
        """
        await self._simulate_delay()
        await self._check_initialization()
        await self._simulate_random_failure("send_text")

        tab = await self._find_tab(tab_id)
        if not tab:
            return False

        # Simulate text injection. A real newline character is appended
        # (not the two-character sequence backslash + "n").
        final_text = text + ("\n" if append_newline else "")
        tab.text_buffer.append(f"INPUT: {final_text}")
        tab.command_history.append(text)
        tab.last_activity = datetime.now()

        # Simulate command processing for special debugging commands
        await self._process_special_commands(tab, text)

        self.logger.debug("Sent text to tab %s: %s...", tab_id, text[:50])
        await self._emit_event("text_sent", {
            "tab_id": tab_id,
            "text_length": len(text)
        })
        return True

    async def get_tab_output(
        self,
        tab_id: str,
        lines: int = 50
    ) -> List[str]:
        """
        Get recent output from an iTerm2 tab.

        Args:
            tab_id: Tab to read from
            lines: Number of lines to retrieve; zero or negative yields ``[]``

        Returns:
            List of the most recent buffer lines (empty if the tab is unknown)

        Raises:
            RuntimeError: If the manager is not initialized
        """
        await self._simulate_delay()
        await self._check_initialization()

        tab = await self._find_tab(tab_id)
        # Guard lines <= 0 explicitly: buffer[-0:] would return the WHOLE
        # buffer and a negative count would slice from the front.
        if not tab or lines <= 0:
            return []

        # Return recent buffer content (a new list, not the live buffer)
        return tab.text_buffer[-lines:]

    async def get_tab_status(self, tab_id: str) -> Dict[str, Any]:
        """
        Get detailed status of an iTerm2 tab.

        Args:
            tab_id: Tab to check

        Returns:
            Dictionary with tab status information; for an unknown tab,
            ``{"exists": False, "error": "Tab not found"}``

        Raises:
            RuntimeError: If the manager is not initialized
        """
        await self._simulate_delay()
        await self._check_initialization()

        tab = await self._find_tab(tab_id)
        if not tab:
            return {
                "exists": False,
                "error": "Tab not found"
            }

        # Calculate tab metrics
        now = datetime.now()
        uptime_seconds = (now - tab.created_at).total_seconds()
        idle_seconds = (now - tab.last_activity).total_seconds()

        return {
            "exists": True,
            "tab_id": tab_id,
            "agent_id": tab.agent_id,
            "title": tab.title,
            "is_active": tab.is_active,
            "health_status": tab.health_status,
            "process_id": tab.process_id,
            "current_directory": tab.current_directory,
            "uptime_seconds": uptime_seconds,
            "last_activity_seconds_ago": idle_seconds,
            "buffer_lines": len(tab.text_buffer),
            "commands_executed": len(tab.command_history),
            "responsive": idle_seconds < 300,  # 5 minutes
            "debug_info": tab.debug_info
        }

    async def list_tabs(self, session_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        List all tabs, optionally filtered by session.

        Args:
            session_id: Session to filter by (optional)

        Returns:
            List of tab information dictionaries; empty when ``session_id``
            names a session that does not exist

        Raises:
            RuntimeError: If the manager is not initialized
        """
        await self._simulate_delay()
        await self._check_initialization()

        if session_id:
            session = self.state.sessions.get(session_id)
            if session is None:
                # Unknown session: report "no tabs" like the other read-only
                # queries do, instead of leaking a bare KeyError.
                return []
            sessions_to_check = [session]
        else:
            sessions_to_check = list(self.state.sessions.values())

        return [
            {
                "tab_id": tab.tab_id,
                "session_id": tab.session_id,
                "agent_id": tab.agent_id,
                "title": tab.title,
                "is_active": tab.is_active,
                "health_status": tab.health_status,
                "created_at": tab.created_at.isoformat(),
                "last_activity": tab.last_activity.isoformat()
            }
            for session in sessions_to_check
            for tab in session.tabs.values()
        ]

    async def check_health(self) -> Dict[str, Any]:
        """
        Check overall health of mock iTerm2 integration.

        The score starts at 100, loses 20 when crash simulation is enabled
        and loses ``failure_rate`` (as a percentage) when that rate exceeds
        10%; it never drops below 0.

        Returns:
            Health status dictionary
        """
        await self._simulate_delay()

        if not self.is_initialized:
            return {
                "status": "NOT_INITIALIZED",
                "message": "Mock iTerm2 manager not initialized"
            }

        # Count active tabs and sessions
        sessions = self.state.sessions.values()
        total_tabs = sum(len(session.tabs) for session in sessions)
        active_tabs = sum(
            1
            for session in sessions
            for tab in session.tabs.values()
            if tab.is_active
        )

        # Calculate health score
        health_score = 100
        if self.state.crash_simulation_enabled:
            health_score -= 20
        if self.state.failure_rate > 0.1:
            # round(), not int(): 0.29 * 100 is 28.999... in floating point
            # and truncation would under-report the penalty.
            health_score -= round(self.state.failure_rate * 100)
        health_score = max(health_score, 0)

        if health_score > 70:
            status = "HEALTHY"
        elif health_score > 30:
            status = "DEGRADED"
        else:
            status = "UNHEALTHY"

        return {
            "status": status,
            "health_score": health_score,
            "sessions_count": len(self.state.sessions),
            "total_tabs": total_tabs,
            "active_tabs": active_tabs,
            "total_tabs_created": self.state.total_tabs_created,
            "operations_count": self.operation_count,
            "crash_simulation": self.state.crash_simulation_enabled,
            "failure_rate": self.state.failure_rate,
            "response_delay_ms": self.state.response_delay_ms
        }

    async def simulate_crash_recovery(self, tab_id: str) -> bool:
        """
        Simulate tab crash and recovery for testing.

        The tab moves through CRASHED -> RECOVERING -> HEALTHY with short
        sleeps in between, and receives a new process ID.

        Args:
            tab_id: Tab to crash and recover

        Returns:
            True if recovery successful, False if the tab was not found
        """
        await self._simulate_delay()

        tab = await self._find_tab(tab_id)
        if not tab:
            return False

        # Simulate crash
        tab.health_status = "CRASHED"
        tab.text_buffer.append("SIMULATION: Tab crashed")
        await asyncio.sleep(0.1)  # Brief crash period

        # Simulate recovery
        tab.health_status = "RECOVERING"
        tab.text_buffer.append("SIMULATION: Recovery initiated")
        tab.process_id = secrets.randbelow(32767) + 1000  # New PID
        await asyncio.sleep(0.2)  # Recovery time

        # Complete recovery
        tab.health_status = "HEALTHY"
        tab.text_buffer.append("SIMULATION: Recovery complete")
        tab.last_activity = datetime.now()

        self.logger.info("Simulated crash and recovery for tab %s", tab_id)
        await self._emit_event("tab_recovered", {"tab_id": tab_id})
        return True

    async def inject_debugging_commands(self, tab_id: str) -> Dict[str, str]:
        """
        Inject debugging commands into a tab for testing.

        Each command is written to the tab's buffer as an ``INPUT:`` line
        followed by an ``OUTPUT:`` line, and added to the command history.

        Args:
            tab_id: Tab to inject commands into

        Returns:
            Dictionary mapping command names to their responses (empty if the
            tab was not found)
        """
        await self._simulate_delay()

        tab = await self._find_tab(tab_id)
        if not tab:
            return {}

        uptime = datetime.now() - tab.created_at
        debug_commands = {
            # Report the tab's actual status rather than a fixed "HEALTHY",
            # matching what the interactive /debug command prints.
            "/debug": (
                f"Debug mode enabled. Tab health: {tab.health_status}, "
                f"Process ID: {tab.process_id}, Uptime: {uptime}"
            ),
            "/logs": "Recent activity:\n" + "\n".join(tab.text_buffer[-5:]),
            "/mcp-debug": "MCP connection: ACTIVE, Tools registered: 8, Last request: 2s ago",
            "/config": json.dumps({
                "tab_id": tab.tab_id,
                "agent_id": tab.agent_id,
                "directory": tab.current_directory,
                "title": tab.title
            }, indent=2)
        }

        # Inject commands and responses
        for command, response in debug_commands.items():
            tab.text_buffer.append(f"INPUT: {command}")
            tab.text_buffer.append(f"OUTPUT: {response}")
            tab.command_history.append(command)

        return debug_commands

    # ========================================================================
    # Session Management Methods
    # ========================================================================

    async def create_session(
        self,
        session_id: str,
        name: str,
        root_directory: str = "/tmp"
    ) -> bool:
        """
        Create a new iTerm2 session.

        Returns:
            True if the session was created, False if the ID already exists

        Raises:
            RuntimeError: If the manager is not initialized
        """
        await self._simulate_delay()
        await self._check_initialization()

        if session_id in self.state.sessions:
            return False

        self.state.sessions[session_id] = MockSession(
            session_id=session_id,
            name=name,
            root_directory=root_directory
        )
        # The first session to exist becomes the active one.
        if not self.state.active_session:
            self.state.active_session = session_id

        self.logger.info("Created session %s", session_id)
        await self._emit_event("session_created", {
            "session_id": session_id,
            "name": name
        })
        return True

    async def close_session(self, session_id: str) -> bool:
        """
        Close an iTerm2 session and all its tabs.

        Tabs are closed through ``close_tab``, so each one emits its own
        ``tab_closed`` event and is subject to simulated failures.

        Returns:
            True if the session was closed, False if it was not found

        Raises:
            RuntimeError: If the manager is not initialized
            ConnectionError: If a simulated failure is drawn while closing a tab
        """
        await self._simulate_delay()
        await self._check_initialization()

        if session_id not in self.state.sessions:
            return False
        session = self.state.sessions[session_id]

        # Close all tabs in session. Snapshot the keys: close_tab deletes
        # entries from session.tabs while we iterate.
        for tab_id in list(session.tabs.keys()):
            await self.close_tab(tab_id)

        # Remove session
        del self.state.sessions[session_id]

        # Update active session if needed: fall back to any remaining session.
        if self.state.active_session == session_id:
            self.state.active_session = next(iter(self.state.sessions.keys()), None)

        self.logger.info("Closed session %s", session_id)
        await self._emit_event("session_closed", {"session_id": session_id})
        return True

    # ========================================================================
    # Helper Methods
    # ========================================================================

    async def _simulate_delay(self) -> None:
        """Sleep for the configured delay (if any) and count the operation."""
        if self.state.response_delay_ms > 0:
            await asyncio.sleep(self.state.response_delay_ms / 1000.0)
        self.operation_count += 1

    async def _check_initialization(self) -> None:
        """Raise RuntimeError unless ``initialize`` has completed."""
        if not self.is_initialized:
            raise RuntimeError("Mock iTerm2 manager not initialized")

    def _should_fail(self) -> bool:
        """Draw once against ``failure_rate``; True means "inject a failure"."""
        rate = self.state.failure_rate
        if rate <= 0:
            return False
        return secrets.randbelow(self._FAILURE_RESOLUTION) < rate * self._FAILURE_RESOLUTION

    async def _simulate_random_failure(self, operation: str) -> None:
        """Simulate random failures for testing error handling."""
        if self._should_fail():
            raise ConnectionError(f"Mock failure during {operation}")

    def _generate_tab_id(self, agent_id: str) -> str:
        """Return a tab ID of the form ``tab_<agent>_<epoch>_<n>`` not yet in use."""
        in_use = {
            existing_id
            for session in self.state.sessions.values()
            for existing_id in session.tabs
        }
        # Re-draw on the (unlikely) collision so a new tab can never silently
        # overwrite an existing one created in the same second.
        while True:
            candidate = (
                f"tab_{agent_id}_{int(time.time())}_"
                f"{secrets.randbelow(self._TAB_ID_SUFFIX_RANGE)}"
            )
            if candidate not in in_use:
                return candidate

    async def _find_tab(self, tab_id: str) -> Optional["MockTab"]:
        """Find a tab by ID across all sessions; None if no session has it."""
        for session in self.state.sessions.values():
            if tab_id in session.tabs:
                return session.tabs[tab_id]
        return None

    async def _process_special_commands(self, tab: "MockTab", command: str) -> None:
        """Append an ``OUTPUT:`` line to the tab for recognized debug commands.

        Recognized commands (after stripping whitespace) are ``/debug``,
        ``/logs``, ``/mcp-debug`` and ``/config``; anything else is ignored.
        """
        command = command.strip()

        if command == "/debug":
            response = (
                f"Debug info for tab {tab.tab_id}:\n"
                f"Health: {tab.health_status}\n"
                f"PID: {tab.process_id}\n"
                f"Uptime: {datetime.now() - tab.created_at}"
            )
            tab.text_buffer.append(f"OUTPUT: {response}")
        elif command == "/logs":
            logs = "\n".join(tab.text_buffer[-10:])
            tab.text_buffer.append(f"OUTPUT: Recent logs:\n{logs}")
        elif command == "/mcp-debug":
            mcp_status = "MCP Status: CONNECTED\nRegistered tools: 8\nLast communication: 2s ago"
            tab.text_buffer.append(f"OUTPUT: {mcp_status}")
        elif command == "/config":
            config = json.dumps({
                "tab_config": {
                    "tab_id": tab.tab_id,
                    "agent_id": tab.agent_id,
                    "title": tab.title,
                    "directory": tab.current_directory
                }
            }, indent=2)
            tab.text_buffer.append(f"OUTPUT: {config}")

    async def _emit_event(self, event_type: str, data: Dict[str, Any]) -> None:
        """Emit an event to every handler registered for ``event_type``.

        Handlers may be plain callables or anything that returns an
        awaitable. A handler that raises is logged and does not stop the
        remaining handlers or the operation that emitted the event.
        """
        # Snapshot so a handler may register further handlers while we iterate.
        handlers = tuple(self.event_handlers.get(event_type, ()))
        for handler in handlers:
            try:
                result = handler(data)
                # Await whatever came back if it is awaitable. Inspecting the
                # result (rather than the handler) also covers lambdas and
                # callable objects that return a coroutine.
                if inspect.isawaitable(result):
                    await result
            except Exception as exc:  # deliberate: handlers are best-effort
                self.logger.exception("Error in event handler: %s", exc)

    def register_event_handler(self, event_type: str, handler: Callable) -> None:
        """Register ``handler`` to be called with the event data dict."""
        self.event_handlers.setdefault(event_type, []).append(handler)

    def get_mock_state(self) -> "MockiTermState":
        """Get current mock state (the live object, not a copy) for inspection."""
        return self.state

    def configure_behavior(self, **kwargs) -> None:
        """Configure mock behavior for testing scenarios.

        Recognized keywords are ``crash_simulation``, ``response_delay_ms``
        and ``failure_rate``; any other keyword is ignored.
        """
        option_to_state_attr = {
            'crash_simulation': 'crash_simulation_enabled',
            'response_delay_ms': 'response_delay_ms',
            'failure_rate': 'failure_rate',
        }
        for option, attr in option_to_state_attr.items():
            if option in kwargs:
                setattr(self.state, attr, kwargs[option])
# ============================================================================
# Mock iTerm2 Manager Factory and Utilities
# ============================================================================
def create_mock_iterm_manager(
    crash_simulation: bool = False,
    response_delay_ms: int = 0,
    failure_rate: float = 0.0
) -> MockiTermManager:
    """
    Build a MockiTermManager from explicit behavior settings.

    The arguments are translated into the config keys the manager's
    constructor reads (``simulate_crashes``, ``response_delay_ms``,
    ``failure_rate``).

    Args:
        crash_simulation: Enable crash simulation
        response_delay_ms: Simulated delay, in milliseconds
        failure_rate: Rate of random failures (0.0 to 1.0)

    Returns:
        Configured MockiTermManager instance (not yet initialized)
    """
    manager_config = dict(
        simulate_crashes=crash_simulation,
        response_delay_ms=response_delay_ms,
        failure_rate=failure_rate,
    )
    manager = MockiTermManager(manager_config)
    return manager
def create_performance_mock() -> MockiTermManager:
    """Create a high-performance mock for benchmarking.

    Crash simulation is off and neither delay nor failures are injected.
    """
    settings = {
        "crash_simulation": False,
        "response_delay_ms": 0,
        "failure_rate": 0.0,
    }
    return create_mock_iterm_manager(**settings)
def create_stress_test_mock() -> MockiTermManager:
    """Create a mock configured for stress testing.

    Crash simulation is on, each operation is delayed by 10 ms and the
    failure rate is 5%.
    """
    settings = {
        "crash_simulation": True,
        "response_delay_ms": 10,
        "failure_rate": 0.05,  # 5% failure rate
    }
    return create_mock_iterm_manager(**settings)
def create_development_mock() -> MockiTermManager:
    """Create a mock configured for development testing.

    Crash simulation is off, each operation is delayed by 50 ms and the
    failure rate is 1%.
    """
    settings = {
        "crash_simulation": False,
        "response_delay_ms": 50,  # Realistic delay
        "failure_rate": 0.01,  # Occasional failures
    }
    return create_mock_iterm_manager(**settings)
# Alias for backwards compatibility (older spelling with a capital "I").
MockITermManager = MockiTermManager

# Export main components. The compatibility alias is listed too, so that
# `from <module> import *` keeps working for code using the old name.
__all__ = [
    'MockiTermManager', 'MockITermManager', 'MockTab', 'MockSession',
    'MockiTermState', 'create_mock_iterm_manager', 'create_performance_mock',
    'create_stress_test_mock', 'create_development_mock'
]