Speech MCP

""" Main PyQt UI implementation for the Speech UI. This module provides the main PyQt window for the speech interface. """ import os import sys import time import threading import random import math from PyQt5.QtWidgets import ( QApplication, QMainWindow, QVBoxLayout, QHBoxLayout, QWidget, QLabel, QComboBox ) from PyQt5.QtCore import Qt, QTimer, pyqtSignal # Import centralized constants from speech_mcp.constants import ( STATE_FILE, TRANSCRIPTION_FILE, RESPONSE_FILE, COMMAND_FILE, CMD_LISTEN, CMD_SPEAK, CMD_IDLE, CMD_UI_READY, CMD_UI_CLOSED, ENV_TTS_VOICE ) # Import UI components from speech_mcp.ui.components import ( AudioVisualizer, AnimatedButton, TTSAdapter, AudioProcessorUI ) # Import configuration module for voice preferences from speech_mcp.config import get_env_setting, get_setting, set_setting, set_env_setting class PyQtSpeechUI(QMainWindow): """ Main speech UI window implemented with PyQt. """ # Signal for when components are fully loaded components_ready = pyqtSignal() def __init__(self): super().__init__() self.setWindowTitle("Goose Speech Interface") self.resize(500, 300) # Set initial loading state self.tts_ready = False self.stt_ready = False self.audio_ready = False # Add a watchdog timer to ensure UI responsiveness self.watchdog_timer = QTimer(self) self.watchdog_timer.timeout.connect(self.check_ui_responsiveness) self.watchdog_timer.start(5000) # Check every 5 seconds # Create UI first (will be in loading state) self.setup_ui() # Create a command file to indicate UI is visible (but not fully ready) try: with open(COMMAND_FILE, 'w') as f: f.write("UI_READY") except Exception: pass # Start checking for server commands self.command_check_timer = QTimer(self) self.command_check_timer.timeout.connect(self.check_for_commands) self.command_check_timer.start(100) # Check every 100ms # Start checking for response files self.response_check_timer = QTimer(self) self.response_check_timer.timeout.connect(self.check_for_responses) self.response_check_timer.start(100) # Check every 100ms # Connect the components_ready signal to update UI self.components_ready.connect(self.on_components_ready) # Initialize components in background threads QTimer.singleShot(100, self.initialize_components) def setup_ui(self): """Set up the UI components.""" # Main widget and layout main_widget = QWidget() main_layout = QVBoxLayout(main_widget) main_layout.setContentsMargins(20, 20, 20, 20) main_layout.setSpacing(15) # Create a layout for the visualizer labels label_layout = QHBoxLayout() # User label user_label = QLabel("User") user_label.setAlignment(Qt.AlignCenter) user_label.setStyleSheet(""" font-size: 14px; color: #00c8ff; font-weight: bold; """) label_layout.addWidget(user_label, 1) # Agent label agent_label = QLabel("Agent") agent_label.setAlignment(Qt.AlignCenter) agent_label.setStyleSheet(""" font-size: 14px; color: #00ff64; font-weight: bold; """) label_layout.addWidget(agent_label, 1) # Add the label layout to the main layout main_layout.addLayout(label_layout) # Create a layout for the visualizers visualizer_layout = QHBoxLayout() # User audio visualizer (blue) self.user_visualizer = AudioVisualizer(mode="user", width_factor=1.0) visualizer_layout.addWidget(self.user_visualizer, 1) # Equal ratio # Agent audio visualizer (green) self.agent_visualizer = AudioVisualizer(mode="agent", width_factor=1.0) visualizer_layout.addWidget(self.agent_visualizer, 1) # Equal ratio # Add the visualizer layout to the main layout main_layout.addLayout(visualizer_layout) # Transcription display self.transcription_label = QLabel("Ready for voice interaction") self.transcription_label.setAlignment(Qt.AlignCenter) self.transcription_label.setWordWrap(True) self.transcription_label.setStyleSheet(""" font-size: 14px; color: #ffffff; background-color: #2a2a2a; border-radius: 5px; padding: 10px; """) main_layout.addWidget(self.transcription_label) # Voice selection voice_layout = QHBoxLayout() voice_label = QLabel("Voice:") voice_label.setStyleSheet("color: #ffffff;") self.voice_combo = QComboBox() self.voice_combo.setStyleSheet(""" background-color: #2a2a2a; color: #ffffff; border: 1px solid #3a3a3a; border-radius: 3px; padding: 5px; """) # Add loading placeholder self.voice_combo.addItem("Loading voices...") self.voice_combo.setEnabled(False) self.voice_combo.currentIndexChanged.connect(self.on_voice_changed) voice_layout.addWidget(voice_label) voice_layout.addWidget(self.voice_combo, 1) # 1 = stretch factor main_layout.addLayout(voice_layout) # Control buttons button_layout = QHBoxLayout() # Add Select Voice button self.select_voice_button = AnimatedButton("Save Voice") self.select_voice_button.clicked.connect(self.save_selected_voice) self.select_voice_button.setEnabled(True) self.select_voice_button.setMinimumWidth(120) self.select_voice_button.set_style(""" background-color: #9b59b6; color: white; border: none; border-radius: 5px; padding: 8px 16px; font-weight: bold; """) # Use AnimatedButton for Test Voice button self.speak_button = AnimatedButton("Test Voice") self.speak_button.clicked.connect(self.test_voice) self.speak_button.setEnabled(True) self.speak_button.setMinimumWidth(120) self.speak_button.set_style(""" background-color: #27ae60; color: white; border: none; border-radius: 5px; padding: 8px 16px; font-weight: bold; """) # Use AnimatedButton for Close button self.close_button = AnimatedButton("Close") self.close_button.clicked.connect(self.close) self.close_button.setMinimumWidth(120) self.close_button.set_style(""" background-color: #e74c3c; color: white; border: none; border-radius: 5px; padding: 8px 16px; font-weight: bold; """) # Add buttons to layout with equal spacing button_layout.addStretch(1) button_layout.addWidget(self.select_voice_button) button_layout.addSpacing(10) button_layout.addWidget(self.speak_button) button_layout.addSpacing(10) button_layout.addWidget(self.close_button) button_layout.addStretch(1) main_layout.addLayout(button_layout) # Set the main widget self.setCentralWidget(main_widget) # Apply dark theme self.setStyleSheet(""" QMainWindow, QWidget { background-color: #121212; color: #ffffff; } QLabel { color: #ffffff; } """) # Initialize visualizers to inactive state self.set_user_visualizer_active(False) self.set_agent_visualizer_active(False) def set_user_visualizer_active(self, active): """Set the user visualizer as active or inactive.""" self.user_visualizer.set_active(active) def set_agent_visualizer_active(self, active): """Set the agent visualizer as active or inactive.""" self.agent_visualizer.set_active(active) def update_voice_list(self): """Update the voice selection combo box""" # Skip if TTS adapter is not ready yet if not hasattr(self, 'tts_adapter') or not self.tts_adapter: return self.voice_combo.clear() voices = self.tts_adapter.get_available_voices() current_voice = self.tts_adapter.get_current_voice() if not voices: self.voice_combo.addItem("No voices available") self.voice_combo.setEnabled(False) return # Add all available voices selected_index = 0 for i, voice in enumerate(voices): # Format the voice name for display if voice.startswith("pyttsx3:"): # For pyttsx3 voices, try to get a more readable name voice_id = voice.split(":", 1)[1] if hasattr(self.tts_adapter.tts_engine, 'getProperty'): for v in self.tts_adapter.tts_engine.getProperty('voices'): if v.id == voice_id: display_name = f"{v.name} (pyttsx3)" self.voice_combo.addItem(display_name, voice) break else: self.voice_combo.addItem(voice, voice) else: self.voice_combo.addItem(voice, voice) else: # For Kokoro voices, use the voice name directly self.voice_combo.addItem(voice, voice) # Select the current voice if voice == current_voice: selected_index = i # Enable the combo box now that it has real data self.voice_combo.setEnabled(True) # Set the current selection self.voice_combo.setCurrentIndex(selected_index) def initialize_components(self): """Initialize components in background threads""" # Start background threads for initialization threading.Thread(target=self.initialize_audio_processor, daemon=True).start() threading.Thread(target=self.initialize_tts_adapter, daemon=True).start() def initialize_audio_processor(self): """Initialize audio processor in background thread""" try: self.audio_processor = AudioProcessorUI() self.audio_processor.audio_level_updated.connect(self.update_audio_level) self.audio_processor.transcription_ready.connect(self.handle_transcription) self.audio_ready = True self.check_all_components_ready() except Exception: pass def initialize_tts_adapter(self): """Initialize TTS adapter in background thread""" try: self.tts_adapter = TTSAdapter() self.tts_adapter.speaking_started.connect(self.on_speaking_started) self.tts_adapter.speaking_finished.connect(self.on_speaking_finished) # Connect audio level signal to agent visualizer self.tts_adapter.audio_level.connect(self.update_agent_audio_level) # Create audio level timer if it doesn't exist yet if not hasattr(self.tts_adapter, 'audio_level_timer'): self.tts_adapter.audio_level_timer = QTimer() self.tts_adapter.audio_level_timer.timeout.connect(self.tts_adapter.emit_audio_level) self.tts_ready = True # Update voice list when TTS is ready - use QTimer to call from main thread QTimer.singleShot(0, self.update_voice_list) self.check_all_components_ready() except Exception: pass def check_all_components_ready(self): """Check if all components are ready and emit signal if they are""" if self.audio_ready and self.tts_ready: # Use QTimer to safely emit signal from background thread QTimer.singleShot(0, lambda: self.components_ready.emit()) def on_components_ready(self): """Called when all components are ready""" # Clear initialization message from transcription label self.transcription_label.setText("Ready for voice interaction") # Check for any pending commands if os.path.exists(COMMAND_FILE): try: with open(COMMAND_FILE, 'r') as f: command = f.read().strip() if command == "LISTEN" and self.has_saved_voice_preference(): # Start listening since we have a saved voice preference self.start_listening() except Exception: pass # If no voice preference is saved, show guidance message if not self.has_saved_voice_preference(): self.transcription_label.setText("Please select a voice from the dropdown and click 'Save Voice' to continue") # Wait a moment before speaking to ensure UI is fully ready QTimer.singleShot(500, self.play_guidance_message) def has_saved_voice_preference(self): """Check if a voice preference has been saved""" try: # First check environment variable env_voice = get_env_setting(ENV_TTS_VOICE) if env_voice: return True # Then check config file config_voice = get_setting("tts", "voice", None) if config_voice: return True return False except ImportError: return False except Exception: return False def save_voice_preference(self, voice): """Save the selected voice preference to config""" try: # Save to config file result = set_setting("tts", "voice", voice) # Also set environment variable for current session set_env_setting(ENV_TTS_VOICE, voice) return result except ImportError: return False except Exception: return False def save_selected_voice(self): """Save the selected voice and switch to listen mode""" # Get the currently selected voice index = self.voice_combo.currentIndex() if index < 0: self.transcription_label.setText("Please select a voice from the dropdown") return voice = self.voice_combo.itemData(index) if not voice: self.transcription_label.setText("Please select a valid voice from the dropdown") return # Save the voice preference if self.save_voice_preference(voice): self.transcription_label.setText(f"Voice '{voice}' saved as your preference") # Create a UI_READY command file to signal back to the server try: with open(COMMAND_FILE, 'w') as f: f.write(CMD_UI_READY) except Exception: pass # Test the voice to confirm QTimer.singleShot(1000, lambda: self.tts_adapter.speak("Voice preference saved. You can now start listening.")) else: self.transcription_label.setText("Failed to save voice preference. Please try again.") def play_guidance_message(self): """Play a guidance message for first-time users""" if hasattr(self, 'tts_adapter') and self.tts_adapter: # Add a highlight effect to the Select Voice button original_style = self.select_voice_button.styleSheet() highlight_style = """ background-color: #e74c3c; color: white; border: 2px solid #f39c12; border-radius: 5px; padding: 8px 16px; font-weight: bold; """ self.select_voice_button.setStyleSheet(highlight_style) # Speak the guidance message self.tts_adapter.speak("Please select a voice from the dropdown menu and click Save Voice to continue.") # Restore the original style after a delay QTimer.singleShot(3000, lambda: self.select_voice_button.setStyleSheet(original_style)) def on_voice_changed(self, index): """Handle voice selection change""" # Skip if TTS adapter is not ready yet if not hasattr(self, 'tts_adapter') or not self.tts_adapter: return if index < 0: return voice = self.voice_combo.itemData(index) if not voice: return self.tts_adapter.set_voice(voice) def test_voice(self): """Test the selected voice""" # Skip if TTS adapter is not ready yet if not hasattr(self, 'tts_adapter') or not self.tts_adapter: self.transcription_label.setText("TTS not ready yet. Please wait...") return if self.tts_adapter.is_speaking: return # Update the transcription label to show we're testing the voice self.transcription_label.setText("Testing voice...") # Create a dedicated animation timer if it doesn't exist if not hasattr(self, 'agent_animation_timer'): self.agent_animation_timer = QTimer(self) self.agent_animation_timer.timeout.connect(self.animate_agent_visualizer) # Start the timer if it's not already running if not self.agent_animation_timer.isActive(): self.agent_animation_timer.start(50) # Update every 50ms # Activate agent visualizer self.set_agent_visualizer_active(True) self.set_user_visualizer_active(False) # Speak a test message try: result = self.tts_adapter.speak("This is a test of the selected voice. Hello, I am Goose!") if not result: self.transcription_label.setText("Error: Failed to test voice") QTimer.singleShot(2000, lambda: self.transcription_label.setText("Select a voice and click 'Test Voice' to hear it")) # Don't deactivate the visualizer here - it will be handled by on_speaking_finished except Exception as e: self.transcription_label.setText(f"Error: {str(e)}") QTimer.singleShot(3000, lambda: self.transcription_label.setText("Select a voice and click 'Test Voice' to hear it")) # Deactivate agent visualizer on error self.set_agent_visualizer_active(False) def update_audio_level(self, level): """Update the user audio level visualization.""" self.user_visualizer.update_level(level) def update_agent_audio_level(self, level): """Update the agent audio level visualization.""" self.agent_visualizer.update_level(level) def handle_transcription(self, text): """Handle new transcription text.""" self.transcription_label.setText(f"You: {text}") def start_listening(self): """Start listening mode.""" # Skip if audio processor is not ready yet if not hasattr(self, 'audio_processor') or not self.audio_processor: self.transcription_label.setText("Speech recognition not ready yet") return self.audio_processor.start_listening() # Activate user visualizer, deactivate agent visualizer self.set_user_visualizer_active(True) self.set_agent_visualizer_active(False) def stop_listening(self): """Stop listening mode.""" # Skip if audio processor is not ready yet if not hasattr(self, 'audio_processor') or not self.audio_processor: return self.audio_processor.stop_listening() # Deactivate user visualizer self.set_user_visualizer_active(False) def on_speaking_started(self): """Called when speaking starts.""" self.speak_button.setEnabled(False) # Record when speaking started for the watchdog timer self._speaking_start_time = time.time() # Activate agent visualizer, deactivate user visualizer self.set_agent_visualizer_active(True) self.set_user_visualizer_active(False) # Create a dedicated animation timer that won't be stopped by other operations if not hasattr(self, 'agent_animation_timer'): self.agent_animation_timer = QTimer(self) self.agent_animation_timer.timeout.connect(self.animate_agent_visualizer) # Start the timer if it's not already running if not self.agent_animation_timer.isActive(): self.agent_animation_timer.start(50) # Update every 50ms def on_speaking_finished(self): """Called when speaking finishes.""" self.speak_button.setEnabled(True) # Clear the speaking start time if hasattr(self, '_speaking_start_time'): del self._speaking_start_time # Deactivate agent visualizer but keep the timer running # This prevents issues with the timer being stopped and not restarted self.set_agent_visualizer_active(False) # Note: We intentionally don't stop the animation timer here # The visualizer's inactive state will show a flat line instead def animate_agent_visualizer(self): """Animate the agent visualizer with pre-recorded patterns""" # Just trigger an update - the visualizer will use its internal patterns self.agent_visualizer.update_level(0.0) # The level is ignored for agent mode def check_for_commands(self): """Check for commands from the server.""" if os.path.exists(COMMAND_FILE): try: with open(COMMAND_FILE, 'r') as f: command = f.read().strip() # Process the command if command == CMD_LISTEN: # If components are not ready, store the command to process later if not hasattr(self, 'audio_processor') or not self.audio_processor: # Command will be processed in on_components_ready return # Only start listening if we have a saved voice preference if self.has_saved_voice_preference(): self.start_listening() else: # Show guidance message instead self.transcription_label.setText("Please select a voice from the dropdown and click 'Select Voice' to continue") # Wait a moment before speaking to ensure UI is fully ready QTimer.singleShot(500, self.play_guidance_message) elif command == CMD_IDLE and hasattr(self, 'audio_processor') and self.audio_processor and self.audio_processor.is_listening: self.stop_listening() elif command == CMD_SPEAK: # We'll handle speaking in check_for_responses if hasattr(self, 'tts_adapter') and self.tts_adapter: # Activate agent visualizer self.set_agent_visualizer_active(True) self.set_user_visualizer_active(False) except Exception: pass def check_for_responses(self): """Check for response files to speak.""" if os.path.exists(RESPONSE_FILE): try: # Read the response with open(RESPONSE_FILE, 'r') as f: response = f.read().strip() # Delete the file immediately to prevent duplicate processing try: os.remove(RESPONSE_FILE) except Exception: pass # If TTS is not ready yet, show a message and return if not hasattr(self, 'tts_adapter') or not self.tts_adapter: self.transcription_label.setText("Response received but TTS not ready yet") return # Display the response text in the transcription label self.transcription_label.setText(f"Agent: {response}") # Create a dedicated animation timer if it doesn't exist if not hasattr(self, 'agent_animation_timer'): self.agent_animation_timer = QTimer(self) self.agent_animation_timer.timeout.connect(self.animate_agent_visualizer) # Start the timer if it's not already running if not self.agent_animation_timer.isActive(): self.agent_animation_timer.start(50) # Update every 50ms # Speak the response using the TTS adapter if response: self.tts_adapter.speak(response) except Exception as e: self.transcription_label.setText(f"Error processing response: {str(e)}") QTimer.singleShot(3000, lambda: self.transcription_label.setText("Ready for voice interaction")) def closeEvent(self, event): """Handle window close event.""" # Stop audio processor if it exists if hasattr(self, 'audio_processor') and self.audio_processor: self.audio_processor.stop_listening() # Write a UI_CLOSED command to the command file try: with open(COMMAND_FILE, 'w') as f: f.write(CMD_UI_CLOSED) except Exception: pass super().closeEvent(event) def check_ui_responsiveness(self): """Check if UI is responsive and reset state if needed.""" # Check if TTS adapter is in a stuck state if hasattr(self, 'tts_adapter') and self.tts_adapter: # Use the lock to safely check the speaking state with self.tts_adapter._speaking_lock: is_speaking = self.tts_adapter.is_speaking # If speaking state has been active for too long, reset it if is_speaking and hasattr(self, '_speaking_start_time'): duration = time.time() - self._speaking_start_time if duration > 30: # 30 seconds max for speaking with self.tts_adapter._speaking_lock: self.tts_adapter.is_speaking = False self.on_speaking_finished() elif is_speaking: # Record when speaking started self._speaking_start_time = time.time() else: # Clear the timestamp when not speaking if hasattr(self, '_speaking_start_time'): del self._speaking_start_time def run_ui(): """Run the PyQt speech UI.""" app = QApplication(sys.argv) window = PyQtSpeechUI() window.show() return app.exec_() if __name__ == "__main__": # Run the UI sys.exit(run_ui())