"""Turn-taking state machine to prevent VAD/TTS race conditions."""
import logging
import threading
from typing import Optional, Callable

from statemachine import StateMachine, State
from statemachine.exceptions import TransitionNotAllowed
class TurnStateMachine(StateMachine):
    """
    Govern turn-taking to prevent VAD/TTS race conditions.

    Layer 2 of the two-layer state architecture:
    - Layer 1 (ModeStateMachine): Controls WHAT is enabled (STT, TTS, or both)
    - This layer: Controls WHEN audio flows (turn-taking)

    State transitions:
        IDLE -> LISTENING (start_listening): VAD activated
        LISTENING -> PROCESSING (voice_detected): Speech ended, transcribing
        PROCESSING -> SPEAKING (start_speaking): LLM response ready
        IDLE -> SPEAKING (start_speaking): Direct TTS (no voice input)
        SPEAKING -> IDLE (finish_speaking): TTS complete
        SPEAKING -> LISTENING (barge_in): User interrupts
        LISTENING | PROCESSING | SPEAKING -> IDLE (cancel): Abort current turn

    ``cancel`` has no transition out of IDLE, so ``safe_send("cancel")``
    returns False when the machine is already idle.
    """

    # States
    idle = State(initial=True)
    listening = State()
    processing = State()
    speaking = State()

    # Normal flow transitions
    start_listening = idle.to(listening)
    voice_detected = listening.to(processing)
    start_speaking = processing.to(speaking) | idle.to(speaking)
    finish_speaking = speaking.to(idle)

    # Interruption handling
    barge_in = speaking.to(listening)

    # Cancel from any active (non-idle) state
    cancel = listening.to(idle) | processing.to(idle) | speaking.to(idle)

    def __init__(self):
        """Create the machine in ``idle`` with no external callbacks registered."""
        super().__init__()
        # Guards only the _is_transitioning flag; it is never held while a
        # transition (and therefore a callback) is running.
        self._lock = threading.Lock()
        self._is_transitioning = False

        # Callbacks for state entry/exit (set by VoiceModeController)
        self._on_enter_listening: Optional[Callable] = None
        self._on_exit_listening: Optional[Callable] = None
        self._on_enter_speaking: Optional[Callable] = None
        self._on_exit_speaking: Optional[Callable] = None

    def safe_send(self, event_name: str) -> bool:
        """
        Dispatch an event with a guard against overlapping transitions.

        If another ``safe_send`` call is still in flight (from any thread, or
        re-entrantly from one of the registered callbacks), this call is
        rejected instead of being queued.

        Args:
            event_name: Name of the event to send, e.g. ``"start_listening"``.

        Returns:
            True if ``send`` completed without raising. False if the call was
            rejected because a transition was already in progress, if the
            event is not allowed in the current state, or if the transition
            raised any other exception (which is logged with its traceback).
        """
        with self._lock:
            if self._is_transitioning:
                return False  # Reject concurrent transitions
            self._is_transitioning = True

        try:
            self.send(event_name)
        except TransitionNotAllowed:
            # Expected outcome: the event has no transition from this state.
            return False
        except Exception:
            # Unexpected failure (for example a registered callback raised).
            # Keep the boolean contract, but do not hide the error.
            logging.getLogger(__name__).exception(
                "Turn state machine event %r failed", event_name
            )
            return False
        else:
            return True
        finally:
            # Always release the guard so later events are not locked out.
            with self._lock:
                self._is_transitioning = False

    def can_listen(self) -> bool:
        """Return True if the current state is ``idle``."""
        return self.current_state.id == "idle"

    def can_speak(self) -> bool:
        """Return True if the current state is ``idle`` or ``processing``."""
        return self.current_state.id in ("idle", "processing")

    def is_idle(self) -> bool:
        """Return True if the current state is ``idle``."""
        return self.current_state.id == "idle"

    def is_listening(self) -> bool:
        """Return True if the current state is ``listening``."""
        return self.current_state.id == "listening"

    def is_processing(self) -> bool:
        """Return True if the current state is ``processing``."""
        return self.current_state.id == "processing"

    def is_speaking(self) -> bool:
        """Return True if the current state is ``speaking``."""
        return self.current_state.id == "speaking"

    @property
    def state_name(self) -> str:
        """Return the id of the current state as a string."""
        return self.current_state.id

    # Callback setters
    def set_on_enter_listening(self, callback: Callable) -> None:
        """Register the callable invoked with no arguments on entering ``listening``."""
        self._on_enter_listening = callback

    def set_on_exit_listening(self, callback: Callable) -> None:
        """Register the callable invoked with no arguments on exiting ``listening``."""
        self._on_exit_listening = callback

    def set_on_enter_speaking(self, callback: Callable) -> None:
        """Register the callable invoked with no arguments on entering ``speaking``."""
        self._on_enter_speaking = callback

    def set_on_exit_speaking(self, callback: Callable) -> None:
        """Register the callable invoked with no arguments on exiting ``speaking``."""
        self._on_exit_speaking = callback

    # State machine callbacks: each forwards to the registered callable, if any.
    def on_enter_listening(self):
        """Called when entering listening state - activate VAD."""
        if self._on_enter_listening:
            self._on_enter_listening()

    def on_exit_listening(self):
        """Called when exiting listening state - deactivate VAD."""
        if self._on_exit_listening:
            self._on_exit_listening()

    def on_enter_speaking(self):
        """Called when entering speaking state - TTS starting."""
        if self._on_enter_speaking:
            self._on_enter_speaking()

    def on_exit_speaking(self):
        """Called when exiting speaking state - TTS finished."""
        if self._on_exit_speaking:
            self._on_exit_speaking()