Speech MCP
by Kvadratni
- backup
- ui
import os
import sys
import json
import time
import threading
import logging
import tempfile
import io
from queue import Queue
import sysconfig
os.environ["PYTHONPATH"] = os.path.join(sysconfig.get_paths()['stdlib'], 'site-packages')
os.environ["TCL_LIBRARY"] = os.path.join(sysconfig.get_paths()['stdlib'], '..', 'tcl8.6')
os.environ["TK_LIBRARY"] = os.path.join(sysconfig.get_paths()['stdlib'], '..', 'tk8.6')
import tkinter as tk
# Set up logging
log_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "speech-mcp-ui.log")
logging.basicConfig(
level=logging.DEBUG,
format='%(levelname)s: %(message)s', # Very simple format for easier parsing
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler(sys.stdout) # Explicitly use stdout
]
)
logger = logging.getLogger(__name__)
# Path to audio notification files
AUDIO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "resources", "audio")
START_LISTENING_SOUND = os.path.join(AUDIO_DIR, "start_listening.wav")
STOP_LISTENING_SOUND = os.path.join(AUDIO_DIR, "stop_listening.wav")
# Import other dependencies
import numpy as np
import wave
import pyaudio
# For playing notification sounds
def play_audio_file(file_path):
"""Play an audio file using PyAudio"""
try:
if not os.path.exists(file_path):
logger.error(f"Audio file not found: {file_path}")
return
logger.debug(f"Playing audio notification: {file_path}")
# Open the wave file
with wave.open(file_path, 'rb') as wf:
# Create PyAudio instance
p = pyaudio.PyAudio()
# Open stream
stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
channels=wf.getnchannels(),
rate=wf.getframerate(),
output=True)
# Read data in chunks and play
chunk_size = 1024
data = wf.readframes(chunk_size)
while data:
stream.write(data)
data = wf.readframes(chunk_size)
# Close stream and PyAudio
stream.stop_stream()
stream.close()
p.terminate()
logger.debug("Audio notification played successfully")
except Exception as e:
logger.error(f"Error playing audio notification: {e}")
# For text-to-speech
# Always prioritize Kokoro as the primary TTS engine if available
try:
print("Initializing Kokoro as primary TTS engine...")
logger.info("Initializing Kokoro as primary TTS engine")
# Import and initialize Kokoro adapter
try:
from speech_mcp.tts_adapters.kokoro_adapter import KokoroTTS
# Initialize with Kokoro voice settings
tts_engine = KokoroTTS(voice="af_heart", lang_code="a", speed=1.0)
tts_available = True
logger.info("Kokoro TTS adapter initialized successfully as primary TTS engine")
print("Kokoro TTS adapter initialized successfully as primary TTS engine!")
# Log available voices
voices = tts_engine.get_available_voices()
logger.debug(f"Available Kokoro TTS voices: {len(voices)}")
for i, voice in enumerate(voices):
logger.debug(f"Voice {i}: {voice}")
print(f"Available Kokoro voices: {', '.join(voices[:5])}{' and more...' if len(voices) > 5 else ''}")
except ImportError as e:
# If the adapter is available but Kokoro itself is not installed
logger.warning(f"Kokoro package not available: {e}. Falling back to pyttsx3.")
print("WARNING: Kokoro package not available. Falling back to pyttsx3.")
raise ImportError("Kokoro package not installed")
except ImportError as e:
logger.warning(f"Kokoro adapter not available: {e}. Falling back to pyttsx3.")
print("WARNING: Kokoro adapter not available. Falling back to pyttsx3.")
# Fall back to pyttsx3
try:
import pyttsx3
tts_engine = pyttsx3.init()
tts_available = True
logger.info("pyttsx3 text-to-speech engine initialized as fallback")
print("pyttsx3 text-to-speech engine initialized as fallback!")
# Log available voices
voices = tts_engine.getProperty('voices')
logger.debug(f"Available pyttsx3 voices: {len(voices)}")
for i, voice in enumerate(voices):
logger.debug(f"Voice {i}: {voice.id} - {voice.name}")
except ImportError as e:
logger.warning(f"pyttsx3 not available: {e}. Text-to-speech will be simulated.")
print("WARNING: pyttsx3 not available. Text-to-speech will be simulated.")
tts_available = False
except Exception as e:
logger.error(f"Error initializing text-to-speech engine: {e}")
print(f"WARNING: Error initializing text-to-speech: {e}. Text-to-speech will be simulated.")
tts_available = False
except Exception as e:
logger.error(f"Error initializing Kokoro TTS adapter: {e}")
print(f"WARNING: Error initializing Kokoro TTS adapter: {e}. Falling back to pyttsx3.")
# Fall back to pyttsx3
try:
import pyttsx3
tts_engine = pyttsx3.init()
tts_available = True
logger.info("pyttsx3 text-to-speech engine initialized as fallback")
print("pyttsx3 text-to-speech engine initialized as fallback!")
# Log available voices
voices = tts_engine.getProperty('voices')
logger.debug(f"Available pyttsx3 voices: {len(voices)}")
for i, voice in enumerate(voices):
logger.debug(f"Voice {i}: {voice.id} - {voice.name}")
except ImportError as e:
logger.warning(f"pyttsx3 not available: {e}. Text-to-speech will be simulated.")
print("WARNING: pyttsx3 not available. Text-to-speech will be simulated.")
tts_available = False
except Exception as e:
logger.error(f"Error initializing text-to-speech engine: {e}")
print(f"WARNING: Error initializing text-to-speech: {e}. Text-to-speech will be simulated.")
tts_available = False
# These will be imported later when needed
whisper_loaded = False
speech_recognition_loaded = False
# Path to save speech state - same as in server.py
STATE_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "speech_state.json")
TRANSCRIPTION_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "transcription.txt")
RESPONSE_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "response.txt")
COMMAND_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "ui_command.txt")
# Audio parameters
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
RECORD_SECONDS = 5
# Import optional dependencies when needed
def load_whisper():
global whisper_loaded
try:
global whisper
print("Loading faster-whisper speech recognition model... This may take a moment.")
import faster_whisper
whisper_loaded = True
logger.info("faster-whisper successfully loaded")
print("faster-whisper speech recognition model loaded successfully!")
return True
except ImportError as e:
logger.error(f"Failed to load faster-whisper: {e}")
print(f"ERROR: Failed to load faster-whisper module: {e}")
print("Trying to fall back to SpeechRecognition library...")
return load_speech_recognition()
def load_speech_recognition():
global speech_recognition_loaded
try:
global sr
import speech_recognition as sr
speech_recognition_loaded = True
logger.info("SpeechRecognition successfully loaded")
print("SpeechRecognition library loaded successfully!")
return True
except ImportError as e:
logger.error(f"Failed to load SpeechRecognition: {e}")
print(f"ERROR: Failed to load SpeechRecognition module: {e}")
print("Please install it with: pip install SpeechRecognition")
return False
class SimpleSpeechProcessorUI:
"""A speech processor UI that shows status and audio waveform visualization"""
def __init__(self, root):
self.root = root
self.root.title("Speech MCP - Voice Interface")
self.root.geometry("500x500") # Square shape for better circular visualization
# Initialize basic components
print("Initializing speech processor...")
logger.info("Initializing speech processor UI")
self.ui_active = True
self.listening = False
self.speaking = False
self.last_transcript = ""
self.last_response = ""
self.should_update = True
self.stream = None
# Create initial command file to indicate UI is ready
try:
with open(COMMAND_FILE, 'w') as f:
f.write("UI_READY")
logger.info("Created initial UI_READY command file")
except Exception as e:
logger.error(f"Error creating initial command file: {e}")
# Audio visualization parameters
self.waveform_data = []
self.waveform_max_points = 100 # Number of points to display in waveform
self.waveform_update_interval = 50 # Update interval in milliseconds
# Initialize PyAudio with explicit device selection
print("Initializing audio system...")
logger.info("Initializing PyAudio system")
try:
self.p = pyaudio.PyAudio()
# Log audio device information
logger.info(f"PyAudio version: {pyaudio.get_portaudio_version()}")
# Get all available audio devices
info = self.p.get_host_api_info_by_index(0)
numdevices = info.get('deviceCount')
logger.info(f"Found {numdevices} audio devices:")
# Find the best input device
selected_device = None
selected_device_index = None
for i in range(numdevices):
try:
device_info = self.p.get_device_info_by_host_api_device_index(0, i)
device_name = device_info.get('name')
max_input_channels = device_info.get('maxInputChannels')
logger.info(f"Device {i}: {device_name}")
logger.info(f" Max Input Channels: {max_input_channels}")
logger.info(f" Default Sample Rate: {device_info.get('defaultSampleRate')}")
# Only consider input devices
if max_input_channels > 0:
print(f"Found input device: {device_name}")
# Prefer non-default devices as they're often external mics
if not selected_device or 'default' not in device_name.lower():
selected_device = device_info
selected_device_index = i
except Exception as e:
logger.warning(f"Error checking device {i}: {e}")
if not selected_device:
raise Exception("No suitable input device found")
logger.info(f"Selected input device: {selected_device['name']} (index {selected_device_index})")
print(f"Using input device: {selected_device['name']}")
# Store the selected device info for later use
self.selected_device_index = selected_device_index
self.selected_device_info = selected_device
except Exception as e:
logger.error(f"Error initializing PyAudio: {e}", exc_info=True)
print(f"ERROR: Failed to initialize audio system: {e}")
# Show error in UI
self.root.after(0, lambda: self.status_label.config(
text=f"Audio Error: {str(e)[:30]}..."
))
# Create the UI components
# Main frame
self.main_frame = tk.Frame(self.root)
self.main_frame.pack(expand=True, fill="both", padx=10, pady=10)
# Status label
self.status_label = tk.Label(
self.main_frame,
text="Initializing...",
font=('Arial', 16)
)
self.status_label.pack(fill="x", pady=(0, 10))
# Waveform canvas
self.waveform_frame = tk.Frame(self.main_frame, bg="#f0f0f0")
self.waveform_frame.pack(expand=True, fill="both")
self.waveform_canvas = tk.Canvas(
self.waveform_frame,
bg="#f0f0f0",
height=150,
highlightthickness=1,
highlightbackground="#cccccc"
)
self.waveform_canvas.pack(expand=True, fill="both", padx=5, pady=5)
# Load speech state
self.load_speech_state()
# Load whisper in a background thread
print("Checking for speech recognition module...")
threading.Thread(target=self.initialize_speech_recognition, daemon=True).start()
# Start threads for monitoring state changes
self.update_thread = threading.Thread(target=self.check_for_updates)
self.update_thread.daemon = True
self.update_thread.start()
# Start thread for checking response file
self.response_thread = threading.Thread(target=self.check_for_responses)
self.response_thread.daemon = True
self.response_thread.start()
# Handle window close event
root.protocol("WM_DELETE_WINDOW", self.on_close)
# Initialize UI to a proper state
self.root.after(100, self.update_ui_from_state)
print("Speech processor initialization complete!")
logger.info("Speech processor initialized successfully")
self.status_label.config(text="Ready")
# Update the waveform to show the initial state
self.root.after(200, self.update_waveform)
def initialize_speech_recognition(self):
"""Initialize speech recognition in a background thread"""
if not load_whisper():
self.root.after(0, lambda: self.status_label.config(
text="WARNING: Speech recognition not available"
))
return
# Load the faster-whisper model
try:
self.root.after(0, lambda: self.status_label.config(
text="Loading faster-whisper model..."
))
# Import here to avoid circular imports
import faster_whisper
# Load the small model for a good balance of speed and accuracy
# Using CPU as default for compatibility
self.whisper_model = faster_whisper.WhisperModel("base", device="cpu", compute_type="int8")
self.root.after(0, lambda: self.status_label.config(
text="Ready"
))
logger.info("faster-whisper model loaded successfully")
except Exception as e:
logger.error(f"Error loading faster-whisper model: {e}")
self.root.after(0, lambda: self.status_label.config(
text=f"Error loading model: {e}"
))
def load_speech_state(self):
"""Load the speech state from the file shared with the server"""
try:
if os.path.exists(STATE_FILE):
with open(STATE_FILE, 'r') as f:
state = json.load(f)
self.ui_active = state.get("ui_active", False)
self.listening = state.get("listening", False)
self.speaking = state.get("speaking", False)
self.last_transcript = state.get("last_transcript", "")
self.last_response = state.get("last_response", "")
else:
# Default state if file doesn't exist
self.ui_active = True
self.listening = False
self.speaking = False
self.last_transcript = ""
self.last_response = ""
self.save_speech_state()
except Exception as e:
logger.error(f"Error loading speech state: {e}")
# Default state on error
self.ui_active = True
self.listening = False
self.speaking = False
self.last_transcript = ""
self.last_response = ""
def save_speech_state(self):
"""Save the speech state to the file shared with the server"""
try:
state = {
"ui_active": self.ui_active,
"listening": self.listening,
"speaking": self.speaking,
"last_transcript": self.last_transcript,
"last_response": self.last_response
}
with open(STATE_FILE, 'w') as f:
json.dump(state, f)
except Exception as e:
logger.error(f"Error saving speech state: {e}")
def update_ui_from_state(self):
"""Update the UI to reflect the current speech state"""
if self.listening:
self.status_label.config(text="Listening...")
# Start visualization if not already running
self.root.after(0, self.update_waveform)
# Start listening if not already started
if not hasattr(self, 'stream') or self.stream is None:
self.root.after(0, self.start_listening)
elif self.speaking:
self.status_label.config(text="Speaking...")
# Start visualization for speaking
self.root.after(0, self.update_waveform)
else:
self.status_label.config(text="Ready")
# Update visualization to show idle state
self.root.after(0, self.update_waveform)
# Stop listening if still active
if hasattr(self, 'stream') and self.stream is not None:
self.root.after(0, self.stop_listening)
# Always schedule another waveform update to ensure the UI stays responsive
self.root.after(self.waveform_update_interval * 2, self.update_waveform)
def update_waveform(self):
"""Update the circular audio visualization on the canvas"""
try:
# Get canvas dimensions
canvas_width = self.waveform_canvas.winfo_width()
canvas_height = self.waveform_canvas.winfo_height()
if canvas_width <= 1 or canvas_height <= 1:
# Canvas not yet properly sized
self.root.after(100, self.update_waveform)
return
# Clear the canvas
self.waveform_canvas.delete("all")
# Calculate center point
center_x = canvas_width / 2
center_y = canvas_height / 2
# Draw background circle
background_radius = min(canvas_width, canvas_height) * 0.4
self.waveform_canvas.create_oval(
center_x - background_radius, center_y - background_radius,
center_x + background_radius, center_y + background_radius,
outline="#e0e0e0", width=2, fill="#f8f8f8"
)
if self.listening or self.speaking:
# Get current amplitude
current_amplitude = 0
if hasattr(self, 'waveform_data') and len(self.waveform_data) > 0:
# Use the most recent amplitude value
current_amplitude = self.waveform_data[-1]
# Calculate radius based on amplitude
# Scale the amplitude (typically 0-0.5) to a reasonable range
# Base radius is 30% of the background circle, max is 90%
min_radius = background_radius * 0.3
max_radius = background_radius * 0.9
# Scale amplitude (typically 0-0.5) to radius range
radius = min_radius + (current_amplitude * (max_radius - min_radius) * 4)
# Ensure radius stays within bounds
radius = max(min_radius, min(radius, max_radius))
# Draw the amplitude circle
fill_color = "#4287f5" if self.listening else "#42f587" # Blue for listening, green for speaking
self.waveform_canvas.create_oval(
center_x - radius, center_y - radius,
center_x + radius, center_y + radius,
outline="", fill=fill_color
)
# Draw inner circle (white)
inner_radius = min_radius * 0.8
self.waveform_canvas.create_oval(
center_x - inner_radius, center_y - inner_radius,
center_x + inner_radius, center_y + inner_radius,
outline="", fill="white"
)
# Add icon based on state
if self.listening:
# Draw microphone icon (simple representation)
mic_width = inner_radius * 0.6
mic_height = inner_radius * 1.2
# Microphone body
self.waveform_canvas.create_rectangle(
center_x - mic_width/2, center_y - mic_height/2,
center_x + mic_width/2, center_y + mic_height/4,
fill="#555555", outline=""
)
# Microphone top (rounded)
self.waveform_canvas.create_oval(
center_x - mic_width/2, center_y - mic_height/2 - mic_width/2,
center_x + mic_width/2, center_y - mic_height/2 + mic_width/2,
fill="#555555", outline=""
)
# Stand
self.waveform_canvas.create_rectangle(
center_x - mic_width/6, center_y + mic_height/4,
center_x + mic_width/6, center_y + mic_height/2,
fill="#555555", outline=""
)
# Base
self.waveform_canvas.create_rectangle(
center_x - mic_width/2, center_y + mic_height/2 - mic_width/6,
center_x + mic_width/2, center_y + mic_height/2 + mic_width/6,
fill="#555555", outline=""
)
else:
# Draw speaker icon for speaking
speaker_size = inner_radius * 0.7
# Speaker body
self.waveform_canvas.create_rectangle(
center_x - speaker_size/2, center_y - speaker_size/2,
center_x - speaker_size/6, center_y + speaker_size/2,
fill="#555555", outline=""
)
# Speaker cone
points = [
center_x - speaker_size/6, center_y - speaker_size/2, # Top left
center_x + speaker_size/2, center_y - speaker_size, # Top right
center_x + speaker_size/2, center_y + speaker_size, # Bottom right
center_x - speaker_size/6, center_y + speaker_size/2 # Bottom left
]
self.waveform_canvas.create_polygon(points, fill="#555555", outline="")
# Sound waves (3 arcs)
for i in range(1, 4):
arc_size = speaker_size * (0.5 + i * 0.25)
self.waveform_canvas.create_arc(
center_x, center_y - arc_size/2,
center_x + arc_size, center_y + arc_size/2,
start=300, extent=120,
style="arc", outline="#555555", width=2
)
# Draw pulsing rings
if hasattr(self, 'pulse_count'):
self.pulse_count += 1
if self.pulse_count > 100:
self.pulse_count = 0
else:
self.pulse_count = 0
# Create 3 pulsing rings
for i in range(3):
pulse_phase = (self.pulse_count + i * 33) % 100
if pulse_phase < 70: # Only show rings during part of the cycle
# Calculate ring size based on pulse phase
ring_size = background_radius * (0.5 + pulse_phase / 70)
# Calculate opacity based on pulse phase (fade out as it expands)
opacity = int(255 * (1 - pulse_phase / 70))
ring_color = f"#{opacity:02x}{opacity:02x}{opacity:02x}"
self.waveform_canvas.create_oval(
center_x - ring_size, center_y - ring_size,
center_x + ring_size, center_y + ring_size,
outline=ring_color, width=1, fill=""
)
else:
# Draw a standby/ready icon in the center
ready_radius = background_radius * 0.3
self.waveform_canvas.create_oval(
center_x - ready_radius, center_y - ready_radius,
center_x + ready_radius, center_y + ready_radius,
outline="#cccccc", width=2, fill="#f0f0f0"
)
# Draw a simple "ready" symbol (play button)
triangle_size = ready_radius * 0.8
points = [
center_x - triangle_size/2, center_y - triangle_size,
center_x - triangle_size/2, center_y + triangle_size,
center_x + triangle_size, center_y
]
self.waveform_canvas.create_polygon(points, fill="#cccccc", outline="")
# Schedule the next update if listening or speaking
if self.listening or self.speaking:
self.root.after(self.waveform_update_interval, self.update_waveform)
except Exception as e:
logger.error(f"Error updating visualization: {e}", exc_info=True)
# Try again after a delay
self.root.after(self.waveform_update_interval * 2, self.update_waveform)
def process_audio_for_visualization(self, audio_data):
"""Process audio data for visualization"""
try:
# Convert to numpy array
data = np.frombuffer(audio_data, dtype=np.int16)
# Normalize the data to range [-1, 1]
normalized = data.astype(float) / 32768.0
# Take absolute value to get amplitude
amplitude = np.abs(normalized).mean()
# Add to waveform data
self.waveform_data.append(amplitude)
# Keep only the most recent points
if len(self.waveform_data) > self.waveform_max_points:
self.waveform_data = self.waveform_data[-self.waveform_max_points:]
except Exception as e:
logger.error(f"Error processing audio for visualization: {e}", exc_info=True)
def start_listening(self):
"""Start listening for audio input"""
try:
logger.info("Starting audio recording")
# Play start listening notification sound
threading.Thread(target=play_audio_file, args=(START_LISTENING_SOUND,), daemon=True).start()
# Reset waveform data
self.waveform_data = []
def audio_callback(in_data, frame_count, time_info, status):
try:
# Log detailed timing information periodically
if hasattr(self, 'callback_count'):
self.callback_count += 1
if self.callback_count % 50 == 0: # Log every ~50 callbacks
logger.debug(f"Audio callback timing - input timestamp: {time_info.get('input_buffer_adc_time', 'N/A')}, "
f"current time: {time_info.get('current_time', 'N/A')}")
else:
self.callback_count = 1
# Check for audio status flags
if status:
status_flags = []
if status & pyaudio.paInputUnderflow:
status_flags.append("Input Underflow")
if status & pyaudio.paInputOverflow:
status_flags.append("Input Overflow")
if status & pyaudio.paOutputUnderflow:
status_flags.append("Output Underflow")
if status & pyaudio.paOutputOverflow:
status_flags.append("Output Overflow")
if status & pyaudio.paPrimingOutput:
status_flags.append("Priming Output")
if status_flags:
logger.warning(f"Audio callback status flags: {', '.join(status_flags)}")
# Store audio data for processing
if hasattr(self, 'audio_frames'):
self.audio_frames.append(in_data)
# Process audio for visualization
self.process_audio_for_visualization(in_data)
# Periodically log audio levels for debugging
if len(self.audio_frames) % 20 == 0: # Log every ~1 second (20 chunks at 1024 samples)
try:
audio_data = np.frombuffer(in_data, dtype=np.int16)
normalized = audio_data.astype(float) / 32768.0
amplitude = np.abs(normalized).mean()
logger.debug(f"Current audio amplitude: {amplitude:.6f}")
except Exception as e:
logger.error(f"Error calculating audio level: {e}")
return (in_data, pyaudio.paContinue)
except Exception as e:
logger.error(f"Error in audio callback: {e}", exc_info=True)
return (in_data, pyaudio.paContinue) # Try to continue despite errors
# Initialize audio frames list
self.audio_frames = []
# Start the audio stream with the selected device
logger.debug(f"Opening audio stream with FORMAT={FORMAT}, CHANNELS={CHANNELS}, RATE={RATE}, CHUNK={CHUNK}, DEVICE={self.selected_device_index}")
self.stream = self.p.open(
format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
input_device_index=self.selected_device_index,
frames_per_buffer=CHUNK,
stream_callback=audio_callback
)
# Verify stream is active and receiving audio
if not self.stream.is_active():
logger.error("Stream created but not active")
raise Exception("Audio stream is not active")
# Test audio input
logger.info("Testing audio input...")
print("Testing audio input...")
# Wait a moment and check if we're receiving audio
time.sleep(0.5)
if not hasattr(self, 'audio_frames') or len(self.audio_frames) == 0:
logger.error("No audio data received in initial test")
raise Exception("No audio data being received")
# Check audio levels
test_frame = self.audio_frames[-1]
audio_data = np.frombuffer(test_frame, dtype=np.int16)
normalized = audio_data.astype(float) / 32768.0
level = np.abs(normalized).mean()
logger.info(f"Initial audio level: {level:.6f}")
print(f"Audio input level: {level:.6f}")
if level < 0.0001: # Very low level threshold
logger.warning("Very low audio input level detected")
print("Warning: Very low audio input level detected")
logger.info("Audio stream initialized and receiving data")
print("Microphone activated. Listening for speech...")
# Start a thread to detect silence and stop recording
threading.Thread(target=self.detect_silence, daemon=True).start()
except Exception as e:
logger.error(f"Error starting audio stream: {e}", exc_info=True)
print(f"Error starting audio: {e}")
self.listening = False
self.save_speech_state()
self.update_ui_from_state()
def detect_silence(self):
"""Detect when the user stops speaking and end recording"""
try:
# Wait for initial audio to accumulate
logger.info("Starting silence detection")
time.sleep(0.5)
# Adjusted silence detection parameters for longer pauses
silence_threshold = 0.008 # Reduced threshold to be more sensitive to quiet speech (was 0.01)
silence_duration = 0
max_silence = 5.0 # Increased from 1.5s to 5.0s to allow for longer thinking pauses
check_interval = 0.1 # Check every 100ms
logger.debug(f"Silence detection parameters: threshold={silence_threshold}, max_silence={max_silence}s, check_interval={check_interval}s")
# Track audio levels for debugging
amplitude_history = []
while self.listening and self.stream and silence_duration < max_silence:
if not hasattr(self, 'audio_frames') or len(self.audio_frames) < 2:
time.sleep(check_interval)
continue
# Get the latest audio frame
latest_frame = self.audio_frames[-1]
audio_data = np.frombuffer(latest_frame, dtype=np.int16)
normalized = audio_data.astype(float) / 32768.0
current_amplitude = np.abs(normalized).mean()
# Use a moving average of recent amplitudes for more stable detection
if hasattr(self, 'recent_amplitudes') and len(self.recent_amplitudes) > 0:
avg_amplitude = sum(self.recent_amplitudes) / len(self.recent_amplitudes)
else:
avg_amplitude = current_amplitude
if avg_amplitude < silence_threshold:
silence_duration += check_interval
# Log only when silence is detected
if silence_duration >= 1.0 and silence_duration % 1.0 < check_interval:
logger.debug(f"Silence detected for {silence_duration:.1f}s, avg amplitude: {avg_amplitude:.6f}")
else:
if silence_duration > 0:
logger.debug(f"Speech resumed after {silence_duration:.1f}s of silence, amplitude: {avg_amplitude:.6f}")
silence_duration = 0
time.sleep(check_interval)
# If we exited because of silence detection
if self.listening and self.stream:
logger.info(f"Silence threshold reached after {silence_duration:.1f}s, stopping recording")
logger.debug(f"Final amplitude history: {[f'{a:.6f}' for a in amplitude_history]}")
self.root.after(0, lambda: self.status_label.config(text="Processing speech..."))
print("Silence detected. Processing speech...")
self.process_recording()
self.stop_listening()
else:
if not self.listening:
logger.info("Silence detection stopped because listening state changed")
if not self.stream:
logger.info("Silence detection stopped because audio stream was closed")
except Exception as e:
logger.error(f"Error in silence detection: {e}", exc_info=True)
def process_recording(self):
"""Process the recorded audio and generate a transcription using faster-whisper"""
try:
if not hasattr(self, 'audio_frames') or not self.audio_frames:
logger.warning("No audio frames to process")
return
logger.info(f"Processing {len(self.audio_frames)} audio frames")
# Check if we have enough audio data
total_audio_time = len(self.audio_frames) * (CHUNK / RATE)
logger.info(f"Total recorded audio: {total_audio_time:.2f} seconds")
if total_audio_time < 0.5: # Less than half a second of audio
logger.warning(f"Audio recording too short ({total_audio_time:.2f}s), may not contain speech")
if not hasattr(self, 'whisper_model') or self.whisper_model is None:
logger.warning("faster-whisper model not loaded yet")
self.last_transcript = "Sorry, speech recognition model is still loading. Please try again in a moment."
with open(TRANSCRIPTION_FILE, 'w') as f:
f.write(self.last_transcript)
return
# Save the recorded audio to a temporary WAV file
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio:
temp_audio_path = temp_audio.name
# Create a WAV file from the recorded frames
logger.debug(f"Creating WAV file at {temp_audio_path}")
wf = wave.open(temp_audio_path, 'wb')
wf.setnchannels(CHANNELS)
wf.setsampwidth(self.p.get_sample_size(FORMAT))
wf.setframerate(RATE)
wf.writeframes(b''.join(self.audio_frames))
wf.close()
# Get file size for logging
file_size = os.path.getsize(temp_audio_path)
logger.debug(f"WAV file created, size: {file_size} bytes")
logger.info(f"Audio saved to temporary file: {temp_audio_path}")
# Use faster-whisper to transcribe the audio
logger.info("Transcribing audio with faster-whisper...")
print("Transcribing audio with faster-whisper...")
self.root.after(0, lambda: self.status_label.config(text="Transcribing audio..."))
transcription_start = time.time()
segments, info = self.whisper_model.transcribe(temp_audio_path, beam_size=5)
# Collect all segments to form the complete transcription
transcription = ""
for segment in segments:
transcription += segment.text + " "
transcription = transcription.strip()
transcription_time = time.time() - transcription_start
logger.info(f"Transcription completed in {transcription_time:.2f}s: {transcription}")
logger.debug(f"Transcription info: {info}")
print(f"Transcription complete: \"{transcription}\"")
# Log segments for debugging
logger.debug("Transcription segments:")
for i, segment in enumerate(segments):
logger.debug(f"Segment {i}: {segment.start}-{segment.end}s: {segment.text}")
# Clean up the temporary file
try:
logger.debug(f"Removing temporary WAV file: {temp_audio_path}")
os.unlink(temp_audio_path)
except Exception as e:
logger.error(f"Error removing temporary file: {e}")
# Update the state with the transcription
self.last_transcript = transcription
# Write the transcription to a file for the server to read
try:
logger.debug(f"Writing transcription to file: {TRANSCRIPTION_FILE}")
with open(TRANSCRIPTION_FILE, 'w') as f:
f.write(transcription)
logger.debug("Transcription file written successfully")
except Exception as e:
logger.error(f"Error writing transcription to file: {e}", exc_info=True)
raise e
# Update state
self.save_speech_state()
except Exception as e:
logger.error(f"Error processing recording: {e}", exc_info=True)
self.last_transcript = f"Error processing speech: {str(e)}"
with open(TRANSCRIPTION_FILE, 'w') as f:
f.write(self.last_transcript)
def stop_listening(self):
"""Stop listening for audio input"""
try:
logger.info("Stopping audio recording")
if self.stream:
logger.debug(f"Stopping audio stream, stream active: {self.stream.is_active()}")
self.stream.stop_stream()
self.stream.close()
self.stream = None
print("Microphone deactivated.")
logger.info("Audio stream closed successfully")
# Play stop listening notification sound
threading.Thread(target=play_audio_file, args=(STOP_LISTENING_SOUND,), daemon=True).start()
else:
logger.debug("No active audio stream to close")
# Clear waveform data
self.waveform_data = []
# Update state
self.listening = False
self.save_speech_state()
self.update_ui_from_state()
except Exception as e:
logger.error(f"Error stopping audio stream: {e}", exc_info=True)
print(f"Error stopping audio: {e}")
# Make sure we update state even if there's an error
self.listening = False
self.save_speech_state()
self.update_ui_from_state()
def check_for_updates(self):
"""Periodically check for updates to the speech state file and command file"""
last_modified_state = 0
last_modified_command = 0
if os.path.exists(STATE_FILE):
last_modified_state = os.path.getmtime(STATE_FILE)
while self.should_update:
try:
# Check for command file first (higher priority)
if os.path.exists(COMMAND_FILE):
current_modified = os.path.getmtime(COMMAND_FILE)
if current_modified > last_modified_command:
last_modified_command = current_modified
# Read the command
try:
with open(COMMAND_FILE, 'r') as f:
command = f.read().strip()
logger.debug(f"Received UI command: {command}")
# Process the command
if command == "LISTEN":
if not self.listening:
self.listening = True
self.speaking = False
self.root.after(0, self.start_listening)
self.root.after(0, self.update_ui_from_state)
elif command == "SPEAK":
if not self.speaking:
self.listening = False
self.speaking = True
self.root.after(0, self.update_ui_from_state)
elif command == "IDLE":
self.listening = False
self.speaking = False
self.root.after(0, self.update_ui_from_state)
except Exception as e:
logger.error(f"Error processing command: {e}")
# Also check state file for other updates
if os.path.exists(STATE_FILE):
current_modified = os.path.getmtime(STATE_FILE)
if current_modified > last_modified_state:
last_modified_state = current_modified
self.load_speech_state()
self.root.after(0, self.update_ui_from_state)
except Exception as e:
logger.error(f"Error checking for updates: {e}")
time.sleep(0.1) # Check every 100ms for faster response
def check_for_responses(self):
"""Periodically check for new responses to speak"""
# Add a lock to prevent multiple TTS instances from running simultaneously
self.tts_lock = threading.Lock()
while self.should_update:
try:
if os.path.exists(RESPONSE_FILE):
# Only proceed if we're not already speaking
if not self.speaking and self.tts_lock.acquire(blocking=False):
try:
# Read the response
logger.debug(f"Found response file: {RESPONSE_FILE}")
try:
with open(RESPONSE_FILE, 'r') as f:
response = f.read().strip()
logger.debug(f"Read response text ({len(response)} chars): {response[:100]}{'...' if len(response) > 100 else ''}")
except Exception as e:
logger.error(f"Error reading response file: {e}", exc_info=True)
self.tts_lock.release()
time.sleep(0.5)
continue
# Delete the file immediately to prevent duplicate processing
try:
logger.debug("Removing response file")
os.remove(RESPONSE_FILE)
except Exception as e:
logger.warning(f"Error removing response file: {e}")
# Process the response
if response:
self.last_response = response
self.speaking = True
self.save_speech_state()
self.root.after(0, self.update_ui_from_state)
# Create a simple speaking animation
def animate_speaking():
if not self.speaking:
return
# Generate a random amplitude for speaking animation
# Use a sine wave with noise for more natural movement
import time
time_val = time.time() * 3 # Speed factor
base_amplitude = 0.1 + 0.1 * np.sin(time_val)
noise = 0.05 * np.random.random()
amplitude = base_amplitude + noise
# Add to waveform data
self.waveform_data.append(amplitude)
# Keep only the most recent points
if len(self.waveform_data) > self.waveform_max_points:
self.waveform_data = self.waveform_data[-self.waveform_max_points:]
# Update the visualization
self.update_waveform()
# Schedule the next animation frame if still speaking
if self.speaking:
self.root.after(50, animate_speaking)
# Start the speaking animation
self.root.after(0, animate_speaking)
logger.info(f"Speaking text ({len(response)} chars): {response[:100]}{'...' if len(response) > 100 else ''}")
print(f"Speaking: \"{response}\"")
# Use actual text-to-speech if available
if tts_available:
try:
logger.debug("Using TTS engine for text-to-speech")
# If we're using our Kokoro adapter
if hasattr(tts_engine, 'speak'):
# Use the speak method directly
tts_start = time.time()
tts_engine.speak(response)
tts_duration = time.time() - tts_start
logger.info(f"Kokoro TTS completed in {tts_duration:.2f} seconds")
print("Speech completed.")
else:
# Use pyttsx3 directly
# Log TTS settings
rate = tts_engine.getProperty('rate')
volume = tts_engine.getProperty('volume')
voice = tts_engine.getProperty('voice')
logger.debug(f"TTS settings - Rate: {rate}, Volume: {volume}, Voice: {voice}")
# Speak the text
tts_start = time.time()
tts_engine.say(response)
tts_engine.runAndWait()
tts_duration = time.time() - tts_start
logger.info(f"Speech completed in {tts_duration:.2f} seconds")
print("Speech completed.")
except Exception as e:
logger.error(f"Error using text-to-speech: {e}", exc_info=True)
print(f"Error using text-to-speech: {e}")
# Fall back to simulated speech
logger.info("Falling back to simulated speech")
speaking_duration = len(response) * 0.05 # 50ms per character
time.sleep(speaking_duration)
else:
# Simulate speaking time if TTS not available
logger.debug("TTS not available, simulating speech timing")
speaking_duration = len(response) * 0.05 # 50ms per character
logger.debug(f"Simulating speech for {speaking_duration:.2f} seconds")
time.sleep(speaking_duration)
# Update state when done speaking
self.speaking = False
self.waveform_data = [] # Clear waveform data
self.save_speech_state()
self.root.after(0, self.update_ui_from_state)
print("Done speaking.")
logger.info("Done speaking")
# Release the lock when done
self.tts_lock.release()
except Exception as e:
logger.error(f"Error processing response: {e}", exc_info=True)
# Make sure we release the lock on error
self.speaking = False
self.save_speech_state()
try:
self.tts_lock.release()
except RuntimeError:
pass # Ignore if lock wasn't acquired
except Exception as e:
logger.error(f"Error checking for responses: {e}", exc_info=True)
# Make sure we're not stuck in speaking state
if self.speaking:
self.speaking = False
self.save_speech_state()
# Try to release the lock if we might have it
try:
self.tts_lock.release()
except RuntimeError:
pass # Ignore if lock wasn't acquired
time.sleep(0.1) # Check every 100ms for faster response
def on_close(self):
"""Handle window close event"""
try:
logger.info("Shutting down speech processor")
print("\nShutting down speech processor...")
self.should_update = False
if self.stream:
logger.debug("Stopping audio stream")
try:
self.stream.stop_stream()
self.stream.close()
logger.debug("Audio stream closed successfully")
except Exception as e:
logger.error(f"Error closing audio stream: {e}")
logger.debug("Terminating PyAudio")
try:
self.p.terminate()
logger.debug("PyAudio terminated successfully")
except Exception as e:
logger.error(f"Error terminating PyAudio: {e}")
# Update state to indicate UI is closed
self.ui_active = False
self.listening = False
self.speaking = False
self.save_speech_state()
# Write a UI_CLOSED command to the command file
try:
with open(COMMAND_FILE, 'w') as f:
f.write("UI_CLOSED")
logger.info("Created UI_CLOSED command file")
except Exception as e:
logger.error(f"Error creating command file: {e}")
# Remove the lock file
try:
lock_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "speech_ui.lock")
if os.path.exists(lock_file):
os.remove(lock_file)
logger.info("Removed lock file")
except Exception as e:
logger.error(f"Error removing lock file: {e}")
print("Speech processor shut down successfully.")
logger.info("Speech processor shut down successfully")
self.root.destroy()
except Exception as e:
logger.error(f"Error shutting down speech processor: {e}", exc_info=True)
print(f"Error during shutdown: {e}")
self.root.destroy()
def main():
"""Main entry point for the speech processor"""
try:
logger.info("Starting Speech MCP Processor")
print("\n===== Speech MCP Processor =====")
print("Starting speech recognition system...")
# Log platform information
import platform
logger.info(f"Platform: {platform.platform()}")
logger.info(f"Python version: {platform.python_version()}")
# Check if another instance is already running
import psutil
import os
# Create a lock file to prevent multiple instances
lock_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "speech_ui.lock")
# Check if the lock file exists and if the process is still running
if os.path.exists(lock_file):
try:
with open(lock_file, 'r') as f:
pid = int(f.read().strip())
if psutil.pid_exists(pid):
# Check if it's actually our UI process
try:
process = psutil.Process(pid)
cmdline = process.cmdline()
if len(cmdline) >= 3 and 'speech_mcp.ui' in ' '.join(cmdline):
logger.warning(f"Another UI instance is already running with PID {pid}")
print(f"WARNING: Another Speech UI instance is already running with PID {pid}")
print("Only one instance of Speech UI can run at a time.")
return
except (psutil.NoSuchProcess, psutil.AccessDenied):
# Process doesn't exist or can't be accessed, ignore the lock file
pass
except Exception as e:
logger.error(f"Error checking lock file: {e}")
# Create a new lock file with our PID
try:
with open(lock_file, 'w') as f:
f.write(str(os.getpid()))
logger.info(f"Created lock file with PID {os.getpid()}")
except Exception as e:
logger.error(f"Error creating lock file: {e}")
# Log audio-related environment variables
audio_env_vars = {k: v for k, v in os.environ.items() if 'AUDIO' in k.upper() or 'PULSE' in k.upper() or 'ALSA' in k.upper()}
if audio_env_vars:
logger.debug(f"Audio-related environment variables: {json.dumps(audio_env_vars)}")
# Start the UI
root = tk.Tk()
app = SimpleSpeechProcessorUI(root)
logger.info("Starting Tkinter main loop")
root.mainloop()
logger.info("Tkinter main loop exited")
# Clean up the lock file when we exit
try:
if os.path.exists(lock_file):
os.remove(lock_file)
logger.info("Removed lock file")
except Exception as e:
logger.error(f"Error removing lock file: {e}")
except Exception as e:
logger.error(f"Error in speech processor main: {e}", exc_info=True)
print(f"\nERROR: Failed to start speech processor: {e}")
if __name__ == "__main__":
main()