"""
Enhanced iTerm2 Interactor - Event-Driven Architecture (No iTerm2 Alerts)
Architecture Integration:
- Event-Driven: Replaces iterm2.Alert with structured event emission
- Pull-Based Status: Events consumed by MCP client via get_system_status tool
- Agent Management: Maintains agent pause capabilities without disruptive alerts
Technical Decisions:
- No iterm2.Alert usage: Professional event-driven notification system
- Structured event logging: All agent state changes recorded as events
- Session association: Events linked to specific iTerm2 sessions
- Agent variable tracking: Status stored in iTerm2 session variables
Dependencies & Integration:
- event_logger: Centralized event emission for all agent state changes
- command_analyzer: Enhanced command analysis with event integration
- decision_queue: Quarantine workflow with event-driven status updates
"""
import asyncio
import iterm2
import subprocess
import traceback
import re
import uuid
from typing import Optional, Dict, Any
# Import centralized event logging system
from event_logger import event_logger, EventType, log_agent_event, log_rpc_event
# Prefer the full command analyzer; fall back to a minimal pattern-based one
# when the command_analyzer module cannot be imported.
try:
    from command_analyzer import CommandAnalyzer
except ImportError:
    print("WARNING: command_analyzer not available, using basic analyzer")

    class CommandAnalyzer:
        """Minimal stand-in analyzer that flags a few destructive commands.

        Only two verdicts are produced: ``("DANGEROUS", <pattern>)`` for the
        first blocklist pattern found in the command, otherwise
        ``("SAFE", None)``.
        """

        # Checked in order; the first hit wins and its pattern text is returned.
        _BLOCKLIST = (r'\brm\s+-rf\b', r'\bmkfs\b', r'dd\s+if=/dev/zero')

        def analyze_command(self, command: str):
            hit = next(
                (rule for rule in self._BLOCKLIST if re.search(rule, command)),
                None,
            )
            if hit is None:
                return "SAFE", None
            return "DANGEROUS", hit
# Prefer the real decision queue; otherwise fall back to a stub that only
# hands out decision IDs and stores nothing.
try:
    from decision_queue import decision_queue, ResolutionAction
except ImportError:
    print("WARNING: decision_queue not available, using simple decision handling")

    class ResolutionAction:
        """String constants naming the ways a queued decision can be resolved."""

        APPROVE, DENY, REINSTRUCT = "APPROVE", "DENY", "REINSTRUCT"

    class decision_queue:
        """Stub queue exposing only ``queue_decision``."""

        @staticmethod
        def queue_decision(command, triggered_rule, session_id, safety_level):
            # Nothing is recorded; each call just receives a fresh random UUID string.
            new_id = uuid.uuid4()
            return str(new_id)
# Recognizes a prompt line of the form "│ <Word>: `<text>`" (leading whitespace
# allowed) and captures everything between the first and the last backtick.
PROMPT_REGEX = re.compile(
    r"^\s*│\s*\w+:\s*`(.*)`"
)
class AgentStatusManager:
    """
    Read and write per-session agent metadata stored in iTerm2 user variables.

    Two variables are used: ``user.agentStatus`` and ``user.agentName``.
    Writing either one emits an agent event through ``log_agent_event``
    (optional for status writes). Failures are reported with ``print`` and are
    never raised to the caller.
    """

    @staticmethod
    async def set_agent_status(session: iterm2.Session, status: str, emit_event: bool = True) -> None:
        """Store *status* in ``user.agentStatus``.

        When *emit_event* is true, also emit an AGENT_STATUS_CHANGE event that
        carries the agent's name ("Unnamed Agent" if none is set).
        """
        try:
            await session.async_set_variable("user.agentStatus", status)
            if not emit_event:
                return
            name = await session.async_get_variable("user.agentName")
            payload = {
                "agent_name": name or "Unnamed Agent",
                "session_id": session.session_id,
                "new_status": status,
                # Placeholder; the event logger is expected to fill in the time.
                "timestamp": None,
            }
            log_agent_event(EventType.AGENT_STATUS_CHANGE, payload)
        except Exception as exc:
            print(f"STATUS_MANAGER_ERROR: Failed to set agent status: {exc}")

    @staticmethod
    async def get_agent_status(session: iterm2.Session) -> str:
        """Return ``user.agentStatus``; "Unknown" if unset or empty, "Error" if the read fails."""
        try:
            current = await session.async_get_variable("user.agentStatus")
        except Exception as exc:
            print(f"STATUS_MANAGER_ERROR: Failed to get agent status: {exc}")
            return "Error"
        return current or "Unknown"

    @staticmethod
    async def set_agent_name(session: iterm2.Session, name: str) -> None:
        """Store *name* in ``user.agentName`` and emit an AGENT_NAMED event."""
        try:
            await session.async_set_variable("user.agentName", name)
            log_agent_event(
                EventType.AGENT_NAMED,
                dict(
                    agent_name=name,
                    session_id=session.session_id,
                    # Placeholder; the event logger is expected to fill in the time.
                    assigned_timestamp=None,
                ),
            )
        except Exception as exc:
            print(f"STATUS_MANAGER_ERROR: Failed to set agent name: {exc}")
class KeystrokeExecutor:
    """Send agent-control keystrokes to iTerm2 by running AppleScript via ``osascript``.

    Keys are typed through System Events after activating iTerm2, so they reach
    whichever iTerm2 session has keyboard focus at that moment. Callers that
    target a specific session must focus it first.
    """

    # Fixed AppleScript bodies per action. Key codes are macOS virtual key
    # codes: 125 = Down Arrow, 36 = Return, 53 = Escape.
    _SCRIPTS = {
        # Move the selection down twice, then press Return.
        "deny": (
            "\n"
            'tell application "System Events"\n'
            "key code 125\n"
            "delay 0.1\n"
            "key code 125\n"
            "delay 0.1\n"
            "key code 36\n"
            "end tell\n"
        ),
        # Press Return on the current selection.
        "approve": (
            "\n"
            'tell application "System Events"\n'
            "key code 36\n"
            "end tell\n"
        ),
        # Press Escape.
        "escape": (
            "\n"
            'tell application "System Events"\n'
            "key code 53\n"
            "end tell\n"
        ),
    }

    @staticmethod
    def _escape_applescript(text: str) -> str:
        """Escape *text* for use inside an AppleScript double-quoted string literal.

        Order matters: backslashes are doubled first, then each double quote
        gets a backslash. Escaping quotes first (or only rewriting pairs of
        backslashes) lets a backslash in *text* escape the closing quote, so
        the literal ends early and the rest of *text* is parsed as AppleScript.
        """
        return text.replace("\\", "\\\\").replace('"', '\\"')

    @staticmethod
    def execute_action(action: str, text: str = "") -> bool:
        """Run the AppleScript for *action* and report whether it succeeded.

        Args:
            action: ``"approve"``, ``"deny"``, ``"escape"`` or ``"instruct"``.
            text: Text to type, followed by Return, for ``"instruct"``. It is
                required for that action and ignored by the others.

        Returns:
            True when ``osascript`` exits with status 0. False for an unknown
            action, ``"instruct"`` without text, a non-zero exit status, or any
            exception (including the 5-second timeout). Executed attempts and
            exceptions are reported through ``log_rpc_event``.
        """
        try:
            if action == "instruct":
                if not text:
                    return False
                escaped_text = KeystrokeExecutor._escape_applescript(text)
                script = (
                    "\n"
                    'tell application "System Events"\n'
                    f'keystroke "{escaped_text}"\n'
                    "delay 0.1\n"
                    "key code 36\n"
                    "end tell\n"
                )
            else:
                script = KeystrokeExecutor._SCRIPTS.get(action)
                if script is None:
                    return False

            full_script = f'tell application "iTerm2" to activate\n{script}'
            # Argument list (no shell), so only the AppleScript escaping above matters.
            result = subprocess.run(
                ["osascript", "-e", full_script],
                capture_output=True,
                text=True,
                timeout=5,
            )
            success = result.returncode == 0
            # Log keystroke execution event
            log_rpc_event(
                success,
                "keystroke_execution",
                {
                    "action": action,
                    "has_text": bool(text),
                    "error": result.stderr if not success else None,
                },
            )
            return success
        except Exception as e:
            log_rpc_event(
                False,
                "keystroke_execution",
                {
                    "action": action,
                    "error": str(e),
                },
            )
            return False
class EventDrivenInteractor:
    """
    Event-driven iTerm2 interactor without alert dependencies.

    Watches iTerm2 sessions for agent command prompts (``PROMPT_REGEX``),
    classifies each new command with ``CommandAnalyzer`` and answers it with
    keystrokes:

    - ``SAFE``: approve, status "Executing"
    - ``DANGEROUS``: deny, status "Blocked"
    - ``QUARANTINE``: queue for human review, status "NEEDS_REVIEW", press Escape

    Every step is reported through the event logger. An RPC interface lets an
    MCP client resolve queued decisions, toggle monitoring, read status and
    name agents.

    Keystrokes are typed into whichever iTerm2 session has keyboard focus, so
    each one is preceded by activating the session it is meant for. The
    activate-then-type pair runs under a lock, so monitors of different
    sessions cannot steal focus from one another between the two steps.
    """

    def __init__(self):
        self.analyzer = CommandAnalyzer()
        self.keystroke_executor = KeystrokeExecutor()
        self.status_manager = AgentStatusManager()
        self.last_command_handled = {}  # session_id -> last command acted on
        self.monitoring_enabled = False
        # Serializes "activate session + send keystroke" (see _send_keys).
        self._keystroke_lock = asyncio.Lock()
        # Log system startup
        log_agent_event(
            EventType.SYSTEM_STARTUP,
            {
                "component": "event_driven_interactor",
                "monitoring_ready": True,
            },
        )

    async def _send_keys(self, session, action: str, text: str = "") -> bool:
        """Focus *session* (if not None), then run a ``KeystrokeExecutor`` action.

        If activation fails, an AGENT_ERROR event is logged and the keystroke
        is still sent to whatever session currently has focus. Returns the
        executor's success flag.
        """
        async with self._keystroke_lock:
            if session is not None:
                try:
                    await session.async_activate()
                except Exception as e:
                    log_agent_event(
                        EventType.AGENT_ERROR,
                        {
                            "session_id": session.session_id,
                            "error_type": "session_activation",
                            "error_message": str(e),
                            "severity": "WARNING",
                        },
                        "ERROR",
                    )
            return self.keystroke_executor.execute_action(action, text)

    async def setup_rpc_interface(self, connection):
        """Register the RPC endpoints used by the MCP client.

        Endpoints: ``resolve_claude_command``, ``toggle_claude_monitor``,
        ``get_monitor_status`` and ``name_agent``.
        """

        async def find_session(session_id: str):
            # Searches every window, tab and split pane; returns None if absent.
            app = await iterm2.async_get_app(connection)
            return app.get_session_by_id(session_id)

        # Maps RPC resolution actions to KeystrokeExecutor actions.
        keystroke_for_action = {
            "APPROVE": "approve",
            "DENY": "deny",
            "REINSTRUCT": "instruct",
        }

        @iterm2.RPC
        async def resolve_claude_command(session_id: str, action: str, instruction_text: str = ""):
            """RPC endpoint for resolving queued decisions with event tracking.

            The keystrokes are sent to *session_id*, which is focused first.
            If the action is unknown or the session no longer exists, an error
            is returned and no keys are sent.
            """
            try:
                log_rpc_event(
                    True,
                    "resolve_decision_request",
                    {
                        "session_id": session_id,
                        "action": action,
                        "has_instruction": bool(instruction_text),
                    },
                )
                key_action = keystroke_for_action.get(action)
                if key_action is None:
                    log_rpc_event(
                        False,
                        "resolve_decision_invalid_action",
                        {"action": action, "session_id": session_id},
                    )
                    return {"success": False, "error": f"Invalid action: {action}"}

                target_session = await find_session(session_id)
                if target_session is None:
                    # Never type into an arbitrary focused session: it may be
                    # showing a different agent's prompt.
                    log_rpc_event(
                        False,
                        "resolve_decision_session_not_found",
                        {"action": action, "session_id": session_id},
                    )
                    return {"success": False, "error": "Session not found"}

                text = instruction_text if action == "REINSTRUCT" else ""
                success = await self._send_keys(target_session, key_action, text)

                # Log resolution event
                log_agent_event(
                    EventType.AGENT_RESOLVED,
                    {
                        "session_id": session_id,
                        "resolution_action": action,
                        "instruction_provided": bool(instruction_text),
                        "keystroke_success": success,
                    },
                )
                return {"success": success, "action": action}
            except Exception as e:
                log_rpc_event(
                    False,
                    "resolve_decision_error",
                    {
                        "session_id": session_id,
                        "error": str(e),
                    },
                )
                return {"success": False, "error": str(e)}

        @iterm2.RPC
        async def toggle_claude_monitor(enable: bool):
            """RPC endpoint for monitoring control with event tracking.

            NOTE(review): disabling makes every ``monitor_session`` loop exit,
            and nothing in this class restarts them when monitoring is
            re-enabled.
            """
            self.monitoring_enabled = enable
            log_agent_event(
                EventType.AGENT_STATUS_CHANGE,
                {
                    "operation": "monitoring_toggle",
                    "monitoring_enabled": enable,
                    "component": "event_driven_interactor",
                },
            )
            return {"success": True, "monitoring_enabled": self.monitoring_enabled}

        @iterm2.RPC
        async def get_monitor_status():
            """RPC endpoint for current monitoring status."""
            return {
                "monitoring_enabled": self.monitoring_enabled,
                "active_sessions": len(self.last_command_handled),
                "last_commands": dict(self.last_command_handled),
            }

        @iterm2.RPC
        async def name_agent(session_id: str, agent_name: str):
            """RPC endpoint for agent naming with event tracking (any pane, not only focused ones)."""
            try:
                target_session = await find_session(session_id)
                if target_session is None:
                    return {"success": False, "error": "Session not found"}
                await self.status_manager.set_agent_name(target_session, agent_name)
                await self.status_manager.set_agent_status(target_session, "Named")
                return {"success": True, "agent_name": agent_name, "session_id": session_id}
            except Exception as e:
                log_rpc_event(
                    False,
                    "name_agent_error",
                    {
                        "session_id": session_id,
                        "error": str(e),
                    },
                )
                return {"success": False, "error": str(e)}

        # Register all RPC endpoints
        await resolve_claude_command.async_register(connection)
        await toggle_claude_monitor.async_register(connection)
        await get_monitor_status.async_register(connection)
        await name_agent.async_register(connection)
        log_agent_event(
            EventType.SYSTEM_STARTUP,
            {
                "component": "rpc_interface",
                "endpoints_registered": 4,
                "status": "ready",
            },
        )

    @staticmethod
    def _find_prompt_command(contents):
        """Return the stripped command text of the first prompt line on screen, or None.

        *contents* is an ``iterm2.ScreenContents`` as returned by
        ``ScreenStreamer.async_get``; None (no contents available) yields None.
        """
        if contents is None:
            return None
        for index in range(contents.number_of_lines):
            match = PROMPT_REGEX.search(contents.line(index).string.strip())
            if match:
                return match.group(1).strip()
        return None

    async def monitor_session(self, session: iterm2.Session):
        """Watch one session's screen and handle each new prompt command.

        The loop runs while ``monitoring_enabled`` is true. Per-update errors
        are logged and followed by a 1 s back-off. A failure of the screen
        streamer itself is logged as CRITICAL and ends monitoring for this
        session.
        """
        session_id = session.session_id
        log_agent_event(
            EventType.SYSTEM_STARTUP,
            {
                "operation": "session_monitoring_start",
                "session_id": session_id,
                "monitoring_enabled": self.monitoring_enabled,
            },
        )
        try:
            async with session.get_screen_streamer() as streamer:
                while self.monitoring_enabled:
                    try:
                        contents = await streamer.async_get()
                        found_command = self._find_prompt_command(contents)
                        # Act only on a command different from the last one handled
                        # here, since the prompt stays on screen across updates.
                        if found_command and found_command != self.last_command_handled.get(session_id):
                            await self._handle_command_with_events(session, found_command)
                            self.last_command_handled[session_id] = found_command
                        await asyncio.sleep(0.5)
                    except Exception as e:
                        log_agent_event(
                            EventType.AGENT_ERROR,
                            {
                                "session_id": session_id,
                                "error_type": "screen_monitoring",
                                "error_message": str(e),
                                "severity": "WARNING",
                            },
                            "ERROR",
                        )
                        await asyncio.sleep(1)
        except Exception as e:
            log_agent_event(
                EventType.AGENT_ERROR,
                {
                    "session_id": session_id,
                    "error_type": "session_monitoring_critical",
                    "error_message": str(e),
                    "traceback": traceback.format_exc(),
                    "severity": "CRITICAL",
                },
                "CRITICAL",
            )

    async def _handle_command_with_events(self, session: iterm2.Session, command: str):
        """Classify *command* and respond with keystrokes, a status update and events.

        Any failure is logged and answered with a deny keystroke (fail safe).
        Verdicts other than SAFE, DANGEROUS or QUARANTINE are only audited.
        """
        try:
            agent_name = await session.async_get_variable("user.agentName") or "Unnamed Agent"
            # Analyze command
            decision, triggered_rule = self.analyzer.analyze_command(command)
            # Log command analysis event
            log_agent_event(
                EventType.AUDIT_EVENT,
                {
                    "agent_name": agent_name,
                    "session_id": session.session_id,
                    "command": command,
                    "analysis_decision": decision,
                    "triggered_rule": triggered_rule,
                    "analysis_timestamp": None,  # placeholder; expected to be filled by the event logger
                },
            )
            if decision == "SAFE":
                await self._send_keys(session, "approve")
                await self.status_manager.set_agent_status(session, "Executing")
            elif decision == "DANGEROUS":
                await self._send_keys(session, "deny")
                await self.status_manager.set_agent_status(session, "Blocked")
            elif decision == "QUARANTINE":
                await self._handle_quarantine_with_events(session, command, triggered_rule, agent_name)
        except Exception as e:
            log_agent_event(
                EventType.AGENT_ERROR,
                {
                    "session_id": session.session_id,
                    "error_type": "command_handling",
                    "command": command,
                    "error_message": str(e),
                    "traceback": traceback.format_exc(),
                },
                "ERROR",
            )
            # Fail safe
            await self._send_keys(session, "deny")

    async def _handle_quarantine_with_events(
        self,
        session: iterm2.Session,
        command: str,
        triggered_rule: str,
        agent_name: str,
    ):
        """Queue *command* for human review, mark the agent NEEDS_REVIEW and press Escape.

        The safety level falls back to "Standard" when the analyzer cannot
        report an active ruleset (for example, the basic fallback analyzer has
        no ``get_active_ruleset``). Any other failure is logged and answered
        with a deny keystroke (fail safe).
        """
        try:
            # Get active safety level (fallback to Standard if not available).
            # Exception, not a bare except, so task cancellation still propagates.
            try:
                active_level, _ = self.analyzer.get_active_ruleset()
            except Exception:
                active_level = "Standard"
            # Queue decision for human review
            decision_id = decision_queue.queue_decision(
                command=command,
                triggered_rule=triggered_rule,
                session_id=session.session_id,
                safety_level=active_level,
            )
            # Set agent status to needs review
            await self.status_manager.set_agent_status(session, "NEEDS_REVIEW")
            # Press Escape in this session to pause the agent
            success = await self._send_keys(session, "escape")
            # Emit comprehensive quarantine event
            log_agent_event(
                EventType.AGENT_QUEUED,
                {
                    "decision_id": decision_id,
                    "agent_name": agent_name,
                    "session_id": session.session_id,
                    "command": command,
                    "triggered_rule": triggered_rule,
                    "safety_level": active_level,
                    "agent_paused": success,
                    "queue_timestamp": None,  # placeholder; expected to be filled by the event logger
                },
            )
        except Exception as e:
            log_agent_event(
                EventType.AGENT_ERROR,
                {
                    "session_id": session.session_id,
                    "error_type": "quarantine_handling",
                    "command": command,
                    "error_message": str(e),
                },
                "ERROR",
            )
            # Fail safe
            await self._send_keys(session, "deny")
async def main(connection):
    """Run the event-driven interactor: register RPCs, monitor sessions, wait.

    Every pane of every tab open at startup is monitored. The coroutine
    returns once all monitoring tasks finish (for example after monitoring is
    disabled via the ``toggle_claude_monitor`` RPC) or the wait is interrupted.
    The tasks are always cancelled and awaited on the way out.

    NOTE(review): sessions opened after startup are not monitored, and once
    this returns ``iterm2.run_until_complete`` ends the script, including its
    RPC endpoints.
    """
    print("=== Claude Safety Supervisor - Event-Driven Architecture ===")
    print("🔗 No iTerm2 alerts - Event-driven status for Claude Desktop")
    # Initialize event-driven interactor
    interactor = EventDrivenInteractor()
    # Setup RPC interface
    await interactor.setup_rpc_interface(connection)
    # Get iTerm2 application
    app = await iterm2.async_get_app(connection)

    # Enable monitoring before any monitor loop starts; each loop stops as soon
    # as it finds the flag cleared.
    interactor.monitoring_enabled = True

    # Monitor every split pane, not just each tab's focused session.
    monitoring_tasks = [
        asyncio.create_task(interactor.monitor_session(session))
        for window in app.windows
        for tab in window.tabs
        for session in tab.sessions
        if session is not None
    ]

    log_agent_event(
        EventType.SYSTEM_STARTUP,
        {
            "component": "main_interactor",
            "monitoring_sessions": len(monitoring_tasks),
            "status": "fully_operational",
        },
    )
    try:
        await asyncio.gather(*monitoring_tasks)
    except KeyboardInterrupt:
        log_agent_event(
            EventType.SYSTEM_SHUTDOWN,
            {
                "component": "main_interactor",
                "reason": "keyboard_interrupt",
            },
        )
    finally:
        # Runs on every exit path (normal completion, interrupt, cancellation)
        # so no monitor task is left running unattended.
        interactor.monitoring_enabled = False
        for task in monitoring_tasks:
            task.cancel()
        await asyncio.gather(*monitoring_tasks, return_exceptions=True)


if __name__ == "__main__":
    iterm2.run_until_complete(main)