"""
iTerm2 Integration Types and Async Protocols - Agent Orchestration Platform
Architecture Integration:
- Design Patterns: Async Command pattern with iTerm2 API integration
- Security Model: Secure terminal access with session isolation
- Performance Profile: Async operations with connection pooling
Technical Decisions:
- Async Protocols: Non-blocking iTerm2 operations with timeout handling
- Tab Management: Comprehensive tab lifecycle with health monitoring
- Text Injection: Safe text injection with encoding and validation
- Connection Pooling: Efficient resource usage with connection reuse
Dependencies & Integration:
- External: iTerm2 Python API for terminal integration
- Internal: Foundation for all terminal-based agent operations
Quality Assurance:
- Test Coverage: Property-based testing for all iTerm2 operations
- Error Handling: Comprehensive error recovery with fallback strategies
Author: Adder_3 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, List, Optional, Union
from .communication import Message, MessageResult
from .ids import AgentId, ProcessId, SessionId, TabId
# ============================================================================
# ITERM2 ENUMERATION - iTerm2 States and Operations
# ============================================================================
class TabState(Enum):
"""iTerm2 tab states."""
CREATED = "created"
ACTIVE = "active"
INACTIVE = "inactive"
BUSY = "busy"
ERROR = "error"
CLOSED = "closed"
# Alias for backwards compatibility
TabStatus = TabState
class ITermOperation(Enum):
"""Types of iTerm2 operations."""
CREATE_TAB = "create_tab"
CLOSE_TAB = "close_tab"
ACTIVATE_TAB = "activate_tab"
INJECT_TEXT = "inject_text"
CAPTURE_OUTPUT = "capture_output"
HEALTH_CHECK = "health_check"
GET_STATUS = "get_status"
class ConnectionState(Enum):
"""iTerm2 connection states."""
DISCONNECTED = "disconnected"
CONNECTING = "connecting"
CONNECTED = "connected"
AUTHENTICATED = "authenticated"
ERROR = "error"
# ============================================================================
# ITERM2 EXCEPTIONS - Typed Error Handling
# ============================================================================
class ITermError(Exception):
"""Base exception for iTerm2-related errors."""
def __init__(
self,
message: str,
operation: ITermOperation = None,
error_code: str = "ITERM_ERROR",
):
self.operation = operation
self.error_code = error_code
super().__init__(
f"[{error_code}] {operation.value if operation else 'iterm'}: {message}"
)
class ITermConnectionError(ITermError):
"""Exception for iTerm2 connection failures."""
def __init__(self, message: str):
super().__init__(message, None, "ITERM_CONNECTION_ERROR")
class ITermTabError(ITermError):
"""Exception for iTerm2 tab operation failures."""
def __init__(self, message: str, operation: ITermOperation = None):
super().__init__(message, operation, "ITERM_TAB_ERROR")
class ITermTextInjectionError(ITermError):
"""Exception for text injection failures."""
def __init__(self, message: str):
super().__init__(
message, ITermOperation.INJECT_TEXT, "ITERM_TEXT_INJECTION_ERROR"
)
# ============================================================================
# TAB INFORMATION - iTerm2 Tab Management
# ============================================================================
@dataclass(frozen=True)
class TabInfo:
"""
Immutable iTerm2 tab information with comprehensive metadata.
Architecture:
- Pattern: Value Object with complete tab state
- Security: Secure tab isolation with session boundaries
- Performance: Efficient tab management with state tracking
- Integration: Foundation for agent-tab association
Contracts:
Preconditions:
- Tab identifier is unique within iTerm2 session
- Process information is accurate and current
- Working directory is within session boundaries
Postconditions:
- Tab state accurately reflects iTerm2 tab status
- Process association is maintained and validated
- Performance metrics are current and accurate
Invariants:
- Tab-process association consistency
- Working directory security boundaries
- Session isolation and access control
Security Implementation:
- Tab Isolation: Each tab operates in isolated context
- Process Tracking: Secure process association and monitoring
- Directory Control: Working directory within session boundaries
- Access Validation: Tab access controlled by session context
"""
# Tab identification
tab_id: TabId
tab_title: str
tab_index: int
window_id: str
# Tab state and status
state: TabState
is_active: bool = False
is_busy: bool = False
last_activity_time: datetime = field(default_factory=datetime.now)
# Process and session information
process_id: Optional[ProcessId] = None
session_id: Optional[SessionId] = None
agent_id: Optional[AgentId] = None
working_directory: Optional[Path] = None
# Performance and monitoring
creation_time: datetime = field(default_factory=datetime.now)
last_health_check: datetime = field(default_factory=datetime.now)
response_time_ms: Optional[float] = None
# Configuration
profile_name: str = "Default"
custom_environment: Dict[str, str] = field(default_factory=dict)
def __post_init__(self):
"""Validate tab information structure."""
# Tab identification validation
if self.tab_index < 0:
raise ValueError("Tab index cannot be negative")
if not self.window_id or not self.window_id.strip():
raise ValueError("Window ID cannot be empty")
if not self.tab_title or not self.tab_title.strip():
raise ValueError("Tab title cannot be empty")
# Working directory validation
if self.working_directory and not self.working_directory.is_absolute():
raise ValueError(
f"Working directory must be absolute: {self.working_directory}"
)
# Performance validation
if self.response_time_ms is not None and self.response_time_ms < 0:
raise ValueError("Response time cannot be negative")
# Profile name validation
if len(self.profile_name) > 50:
raise ValueError("Profile name too long")
# Environment validation
if len(self.custom_environment) > 100:
raise ValueError("Too many custom environment variables")
for key, value in self.custom_environment.items():
if not isinstance(key, str) or len(key) > 100:
raise ValueError(f"Invalid environment key: {key}")
if not isinstance(value, str) or len(value) > 1000:
raise ValueError(f"Invalid environment value for key {key}")
def is_healthy(self) -> bool:
"""Check if tab is in healthy state."""
return self.state in [
TabState.ACTIVE,
TabState.INACTIVE,
] and self.last_activity_time > datetime.now() - timedelta(minutes=10)
def needs_health_check(self, check_interval_seconds: int = 30) -> bool:
"""Check if tab needs health check."""
check_age = datetime.now() - self.last_health_check
return check_age.total_seconds() > check_interval_seconds
def get_uptime(self) -> timedelta:
"""Get tab uptime since creation."""
return datetime.now() - self.creation_time
def get_idle_time(self) -> timedelta:
"""Get time since last activity."""
return datetime.now() - self.last_activity_time
def with_state_update(self, new_state: TabState, is_busy: bool = None) -> "TabInfo":
"""
Create new tab info with updated state.
Args:
new_state: New tab state
is_busy: Optional busy flag update
Returns:
TabInfo: New tab info with updated state
"""
return TabInfo(
tab_id=self.tab_id,
tab_title=self.tab_title,
tab_index=self.tab_index,
window_id=self.window_id,
state=new_state,
is_active=self.is_active,
is_busy=is_busy if is_busy is not None else self.is_busy,
last_activity_time=(
datetime.now()
if new_state == TabState.ACTIVE
else self.last_activity_time
),
process_id=self.process_id,
session_id=self.session_id,
agent_id=self.agent_id,
working_directory=self.working_directory,
creation_time=self.creation_time,
last_health_check=datetime.now(),
response_time_ms=self.response_time_ms,
profile_name=self.profile_name,
custom_environment=self.custom_environment,
)
def with_process_association(
self, process_id: ProcessId, agent_id: AgentId
) -> "TabInfo":
"""
Create new tab info with process association.
Args:
process_id: Process ID to associate
agent_id: Agent ID to associate
Returns:
TabInfo: New tab info with process association
"""
return TabInfo(
tab_id=self.tab_id,
tab_title=self.tab_title,
tab_index=self.tab_index,
window_id=self.window_id,
state=self.state,
is_active=self.is_active,
is_busy=self.is_busy,
last_activity_time=self.last_activity_time,
process_id=process_id,
session_id=self.session_id,
agent_id=agent_id,
working_directory=self.working_directory,
creation_time=self.creation_time,
last_health_check=self.last_health_check,
response_time_ms=self.response_time_ms,
profile_name=self.profile_name,
custom_environment=self.custom_environment,
)
# ============================================================================
# TEXT INJECTION - Safe Text Injection with Validation
# ============================================================================
@dataclass(frozen=True)
class TextInjectionRequest:
"""
Immutable text injection request with comprehensive validation.
Contracts:
Invariants:
- Text content is sanitized and safe for injection
- Encoding is valid and supported
- Timing parameters are within safe ranges
"""
# Target and content
tab_id: TabId
text_content: str
# Injection options
append_newline: bool = True
escape_special_chars: bool = True
encoding: str = "utf-8"
# Timing and behavior
injection_delay_ms: int = 100
chunk_size: int = 1000
wait_for_prompt: bool = False
# Validation and safety
validate_before_injection: bool = True
max_injection_size: int = 100000 # 100KB limit
def __post_init__(self):
"""Validate text injection request."""
# Content validation
if not self.text_content:
raise ValueError("Text content cannot be empty")
if len(self.text_content.encode(self.encoding)) > self.max_injection_size:
raise ValueError(
f"Text content exceeds maximum size: {self.max_injection_size} bytes"
)
# Encoding validation
valid_encodings = {"utf-8", "ascii", "latin-1"}
if self.encoding not in valid_encodings:
raise ValueError(f"Invalid encoding: {self.encoding}")
# Timing validation
if self.injection_delay_ms < 0 or self.injection_delay_ms > 5000:
raise ValueError(
f"Injection delay {self.injection_delay_ms} must be in range [0, 5000]"
)
if self.chunk_size <= 0 or self.chunk_size > 10000:
raise ValueError(
f"Chunk size {self.chunk_size} must be in range (0, 10000]"
)
# Security validation
if self.validate_before_injection:
dangerous_patterns = [
"rm -rf",
"sudo rm",
"dd if=",
"mkfs",
"format c:",
"del /f /s /q",
"rmdir /s",
"> /dev/",
"| sh",
"| bash",
]
content_lower = self.text_content.lower()
for pattern in dangerous_patterns:
if pattern in content_lower:
raise ValueError(f"Text contains dangerous pattern: {pattern}")
def get_sanitized_text(self) -> str:
"""
Get sanitized text for safe injection.
Returns:
str: Sanitized text content
"""
text = self.text_content
if self.escape_special_chars:
# Escape shell special characters
special_chars = {
"'": "\\'",
'"': '\\"',
"`": "\\`",
"$": "\\$",
"\\": "\\\\",
}
for char, escaped in special_chars.items():
text = text.replace(char, escaped)
if self.append_newline and not text.endswith("\n"):
text += "\n"
return text
def get_text_chunks(self) -> List[str]:
"""
Split text into chunks for gradual injection.
Returns:
List[str]: Text chunks for injection
"""
text = self.get_sanitized_text()
chunks = []
for i in range(0, len(text), self.chunk_size):
chunk = text[i : i + self.chunk_size]
chunks.append(chunk)
return chunks
@dataclass(frozen=True)
class TextInjectionResult:
"""
Immutable result of text injection operation.
Contracts:
Invariants:
- Injection status accurately reflects operation outcome
- Performance metrics are current and accurate
- Error information is complete when injection fails
"""
# Request and result
request: TextInjectionRequest
success: bool
injection_time: datetime = field(default_factory=datetime.now)
# Performance metrics
total_duration_ms: Optional[int] = None
chunks_injected: int = 0
bytes_injected: int = 0
# Error information
error_message: Optional[str] = None
failed_at_chunk: Optional[int] = None
def __post_init__(self):
"""Validate text injection result."""
if self.total_duration_ms is not None and self.total_duration_ms < 0:
raise ValueError("Duration cannot be negative")
if self.chunks_injected < 0:
raise ValueError("Chunks injected cannot be negative")
if self.bytes_injected < 0:
raise ValueError("Bytes injected cannot be negative")
if not self.success and not self.error_message:
raise ValueError("Failed injection must have error message")
if self.failed_at_chunk is not None and self.failed_at_chunk < 0:
raise ValueError("Failed chunk index cannot be negative")
def get_injection_rate_bps(self) -> float:
"""
Calculate injection rate in bytes per second.
Returns:
float: Injection rate in bytes per second
"""
if not self.total_duration_ms or self.total_duration_ms <= 0:
return 0.0
duration_seconds = self.total_duration_ms / 1000.0
return self.bytes_injected / duration_seconds
def get_completion_percentage(self) -> float:
"""
Calculate injection completion percentage.
Returns:
float: Completion percentage (0-100)
"""
total_chunks = len(self.request.get_text_chunks())
if total_chunks == 0:
return 100.0
return (self.chunks_injected / total_chunks) * 100.0
# ============================================================================
# ITERM2 CONNECTION - Connection Management and Pooling
# ============================================================================
@dataclass(frozen=True)
class ITermConnection:
"""
Immutable iTerm2 connection information with health tracking.
Contracts:
Invariants:
- Connection state accurately reflects iTerm2 API status
- Performance metrics are current and validated
- Health information provides actionable insights
"""
# Connection identification
connection_id: str
host: str = "localhost"
port: Optional[int] = None
# Connection state
state: ConnectionState = ConnectionState.DISCONNECTED
connected_at: Optional[datetime] = None
last_activity: datetime = field(default_factory=datetime.now)
# Performance metrics
latency_ms: Optional[float] = None
success_rate: Optional[float] = None
total_operations: int = 0
failed_operations: int = 0
# Health and monitoring
health_check_interval: int = 30
last_health_check: datetime = field(default_factory=datetime.now)
consecutive_failures: int = 0
max_consecutive_failures: int = 3
def __post_init__(self):
"""Validate iTerm2 connection information."""
if not self.connection_id or not self.connection_id.strip():
raise ValueError("Connection ID cannot be empty")
if self.port is not None and not (1 <= self.port <= 65535):
raise ValueError(f"Port {self.port} must be in range [1, 65535]")
if self.latency_ms is not None and self.latency_ms < 0:
raise ValueError("Latency cannot be negative")
if self.success_rate is not None and not (0 <= self.success_rate <= 100):
raise ValueError("Success rate must be in range [0, 100]")
if any(
count < 0
for count in [
self.total_operations,
self.failed_operations,
self.consecutive_failures,
]
):
raise ValueError("Operation counts cannot be negative")
if self.failed_operations > self.total_operations:
raise ValueError("Failed operations cannot exceed total operations")
if self.health_check_interval <= 0 or self.health_check_interval > 300:
raise ValueError("Health check interval must be in range (0, 300]")
if self.max_consecutive_failures <= 0 or self.max_consecutive_failures > 100:
raise ValueError("Max consecutive failures must be in range (0, 100]")
def is_connected(self) -> bool:
"""Check if connection is active."""
return self.state in [ConnectionState.CONNECTED, ConnectionState.AUTHENTICATED]
def is_healthy(self) -> bool:
"""Check if connection is healthy."""
return (
self.is_connected()
and self.consecutive_failures < self.max_consecutive_failures
and (self.success_rate is None or self.success_rate > 80.0)
)
def needs_health_check(self) -> bool:
"""Check if connection needs health check."""
check_age = datetime.now() - self.last_health_check
return check_age.total_seconds() > self.health_check_interval
def get_uptime(self) -> Optional[timedelta]:
"""Get connection uptime if connected."""
if not self.connected_at:
return None
return datetime.now() - self.connected_at
def get_idle_time(self) -> timedelta:
"""Get time since last activity."""
return datetime.now() - self.last_activity
def with_operation_result(
self, success: bool, latency_ms: float = None
) -> "ITermConnection":
"""
Create new connection with updated operation statistics.
Args:
success: Whether operation succeeded
latency_ms: Optional operation latency
Returns:
ITermConnection: New connection with updated statistics
"""
new_total = self.total_operations + 1
new_failed = self.failed_operations + (0 if success else 1)
new_consecutive_failures = 0 if success else self.consecutive_failures + 1
# Calculate new success rate
new_success_rate = (
((new_total - new_failed) / new_total) * 100 if new_total > 0 else 100.0
)
return ITermConnection(
connection_id=self.connection_id,
host=self.host,
port=self.port,
state=self.state,
connected_at=self.connected_at,
last_activity=datetime.now(),
latency_ms=latency_ms or self.latency_ms,
success_rate=new_success_rate,
total_operations=new_total,
failed_operations=new_failed,
health_check_interval=self.health_check_interval,
last_health_check=self.last_health_check,
consecutive_failures=new_consecutive_failures,
max_consecutive_failures=self.max_consecutive_failures,
)
# ============================================================================
# ASYNC PROTOCOLS - iTerm2 Async Operation Interfaces
# ============================================================================
class ITermAsyncProtocol:
"""
Async protocol interface for iTerm2 operations.
Architecture:
- Pattern: Async Protocol with comprehensive error handling
- Security: Secure iTerm2 API access with validation
- Performance: Efficient async operations with connection pooling
- Integration: Foundation for all iTerm2 agent operations
Contracts:
Invariants:
- All operations are async and non-blocking
- Connection pooling optimizes resource usage
- Error handling provides comprehensive recovery
- Timeout handling prevents hanging operations
"""
async def create_tab(
self,
session_id: SessionId,
tab_title: str,
working_directory: Path,
profile_name: str = "Default",
) -> TabInfo:
"""
Create new iTerm2 tab asynchronously.
Args:
session_id: Session context for tab creation
tab_title: Title for the new tab
working_directory: Working directory for tab
profile_name: iTerm2 profile to use
Returns:
TabInfo: Information about created tab
Raises:
ITermTabError: If tab creation fails
"""
raise NotImplementedError("Subclasses must implement create_tab")
async def close_tab(self, tab_id: TabId) -> bool:
"""
Close iTerm2 tab asynchronously.
Args:
tab_id: Tab identifier to close
Returns:
bool: True if tab closed successfully
Raises:
ITermTabError: If tab closure fails
"""
raise NotImplementedError("Subclasses must implement close_tab")
async def inject_text(self, request: TextInjectionRequest) -> TextInjectionResult:
"""
Inject text into iTerm2 tab asynchronously.
Args:
request: Text injection request with parameters
Returns:
TextInjectionResult: Result of injection operation
Raises:
ITermTextInjectionError: If text injection fails
"""
raise NotImplementedError("Subclasses must implement inject_text")
async def capture_output(self, tab_id: TabId, timeout_seconds: int = 10) -> str:
"""
Capture output from iTerm2 tab asynchronously.
Args:
tab_id: Tab identifier to capture from
timeout_seconds: Maximum time to wait for output
Returns:
str: Captured output text
Raises:
ITermTabError: If output capture fails
"""
raise NotImplementedError("Subclasses must implement capture_output")
async def get_tab_info(self, tab_id: TabId) -> TabInfo:
"""
Get current tab information asynchronously.
Args:
tab_id: Tab identifier to query
Returns:
TabInfo: Current tab information
Raises:
ITermTabError: If tab query fails
"""
raise NotImplementedError("Subclasses must implement get_tab_info")
async def health_check_tab(self, tab_id: TabId) -> bool:
"""
Perform health check on iTerm2 tab asynchronously.
Args:
tab_id: Tab identifier to check
Returns:
bool: True if tab is healthy
Raises:
ITermTabError: If health check fails
"""
raise NotImplementedError("Subclasses must implement health_check_tab")
async def list_tabs(self, session_id: Optional[SessionId] = None) -> List[TabInfo]:
"""
List all tabs, optionally filtered by session.
Args:
session_id: Optional session filter
Returns:
List[TabInfo]: List of tab information
Raises:
ITermError: If tab listing fails
"""
raise NotImplementedError("Subclasses must implement list_tabs")
# ============================================================================
# MOCK ITERM PROTOCOL - Testing and Development Support
# ============================================================================
class MockITermProtocol(ITermAsyncProtocol):
"""
Mock iTerm2 protocol for testing and development.
This implementation provides a complete mock of iTerm2 functionality
for testing and development environments where iTerm2 is not available.
"""
def __init__(self):
"""Initialize mock protocol with tracking."""
self.tabs: Dict[TabId, TabInfo] = {}
self.next_tab_index = 0
self.connection = ITermConnection(
connection_id="mock_connection",
state=ConnectionState.CONNECTED,
connected_at=datetime.now(),
)
async def create_tab(
self,
session_id: SessionId,
tab_title: str,
working_directory: Path,
profile_name: str = "Default",
) -> TabInfo:
"""Create mock tab with simulated creation time."""
await asyncio.sleep(0.1) # Simulate creation delay
from .ids import TabId
tab_id = TabId(f"mock_tab_{self.next_tab_index}")
self.next_tab_index += 1
tab_info = TabInfo(
tab_id=tab_id,
tab_title=tab_title,
tab_index=len(self.tabs),
window_id="mock_window",
state=TabState.ACTIVE,
is_active=True,
session_id=session_id,
working_directory=working_directory,
profile_name=profile_name,
)
self.tabs[tab_id] = tab_info
return tab_info
async def close_tab(self, tab_id: TabId) -> bool:
"""Close mock tab with cleanup."""
await asyncio.sleep(0.05) # Simulate close delay
if tab_id in self.tabs:
# Update tab state to closed
tab_info = self.tabs[tab_id].with_state_update(TabState.CLOSED)
self.tabs[tab_id] = tab_info
return True
return False
async def inject_text(self, request: TextInjectionRequest) -> TextInjectionResult:
"""Simulate text injection with timing."""
start_time = datetime.now()
if request.tab_id not in self.tabs:
return TextInjectionResult(
request=request, success=False, error_message="Tab not found"
)
# Simulate injection with chunks
chunks = request.get_text_chunks()
for i, chunk in enumerate(chunks):
await asyncio.sleep(request.injection_delay_ms / 1000.0)
duration_ms = int((datetime.now() - start_time).total_seconds() * 1000)
return TextInjectionResult(
request=request,
success=True,
total_duration_ms=duration_ms,
chunks_injected=len(chunks),
bytes_injected=len(request.get_sanitized_text().encode(request.encoding)),
)
async def capture_output(self, tab_id: TabId, timeout_seconds: int = 10) -> str:
"""Simulate output capture."""
await asyncio.sleep(0.1) # Simulate capture delay
if tab_id not in self.tabs:
raise ITermTabError(
f"Tab {tab_id} not found", ITermOperation.CAPTURE_OUTPUT
)
# Return mock output
return f"Mock output from tab {tab_id} at {datetime.now().isoformat()}"
async def get_tab_info(self, tab_id: TabId) -> TabInfo:
"""Get mock tab information."""
await asyncio.sleep(0.01) # Simulate query delay
if tab_id not in self.tabs:
raise ITermTabError(f"Tab {tab_id} not found", ITermOperation.GET_STATUS)
return self.tabs[tab_id]
async def health_check_tab(self, tab_id: TabId) -> bool:
"""Perform mock health check."""
await asyncio.sleep(0.05) # Simulate health check delay
if tab_id not in self.tabs:
return False
tab_info = self.tabs[tab_id]
return tab_info.state in [TabState.ACTIVE, TabState.INACTIVE]
async def list_tabs(self, session_id: Optional[SessionId] = None) -> List[TabInfo]:
"""List mock tabs with optional session filter."""
await asyncio.sleep(0.02) # Simulate list delay
tabs = list(self.tabs.values())
if session_id:
tabs = [tab for tab in tabs if tab.session_id == session_id]
return tabs
# ============================================================================
# ITERM UTILITY FUNCTIONS - Helper Functions and Validators
# ============================================================================
def validate_tab_title(title: str) -> None:
"""
Validate tab title for iTerm2 compatibility.
Args:
title: Tab title to validate
Raises:
ValueError: If title is invalid
"""
if not title or not title.strip():
raise ValueError("Tab title cannot be empty")
if len(title) > 100:
raise ValueError("Tab title too long (max 100 characters)")
# Check for problematic characters
problematic_chars = ["\n", "\r", "\t", "\0"]
for char in problematic_chars:
if char in title:
raise ValueError(f"Tab title contains invalid character: {repr(char)}")
def validate_working_directory(directory: Path, session_root: Path) -> None:
"""
Validate working directory for security and accessibility.
Args:
directory: Working directory to validate
session_root: Session root for boundary checking
Raises:
ValueError: If directory is invalid or outside boundaries
"""
if not directory.is_absolute():
raise ValueError(f"Working directory must be absolute: {directory}")
try:
# Check if directory is within session boundaries
directory.resolve().relative_to(session_root.resolve())
except ValueError:
raise ValueError(f"Working directory outside session boundaries: {directory}")
# Check accessibility (this is a basic check)
if not directory.exists():
raise ValueError(f"Working directory does not exist: {directory}")
def create_tab_title(agent_name: str, session_name: str) -> str:
"""
Create formatted tab title for agent.
Args:
agent_name: Agent name (e.g., "Agent_1")
session_name: Session name
Returns:
str: Formatted tab title
"""
title = f"{agent_name} - {session_name}"
# Truncate if too long
if len(title) > 80:
title = f"{agent_name} - {session_name[:70-len(agent_name)]}..."
validate_tab_title(title)
return title
async def wait_for_tab_ready(
protocol: ITermAsyncProtocol, tab_id: TabId, timeout_seconds: int = 30
) -> bool:
"""
Wait for tab to become ready for operations.
Args:
protocol: iTerm2 async protocol
tab_id: Tab to wait for
timeout_seconds: Maximum wait time
Returns:
bool: True if tab became ready, False if timeout
"""
start_time = datetime.now()
while (datetime.now() - start_time).total_seconds() < timeout_seconds:
try:
tab_info = await protocol.get_tab_info(tab_id)
if tab_info.state == TabState.ACTIVE and not tab_info.is_busy:
return True
except ITermTabError:
pass # Tab might not be ready yet
await asyncio.sleep(1.0) # Check every second
return False
def estimate_injection_time(request: TextInjectionRequest) -> float:
"""
Estimate time required for text injection.
Args:
request: Text injection request
Returns:
float: Estimated injection time in seconds
"""
chunks = request.get_text_chunks()
base_time = len(chunks) * (request.injection_delay_ms / 1000.0)
# Add overhead for processing
overhead_per_chunk = 0.01 # 10ms overhead per chunk
total_overhead = len(chunks) * overhead_per_chunk
return base_time + total_overhead