Skip to main content
Glama

Voice Mode

by mbailey
cli.py87.2 kB
""" CLI entry points for voice-mode package. """ import asyncio import sys import os import warnings import subprocess import shutil import click # Import version info try: from voice_mode.version import __version__ except ImportError: __version__ = "unknown" # Import configuration constants from voice_mode.config import ( DEFAULT_WHISPER_MODEL, DEFAULT_LISTEN_DURATION, MIN_RECORDING_DURATION, ) # Suppress known deprecation warnings for better user experience # These apply to both CLI commands and MCP server operation # They can be shown with VOICEMODE_DEBUG=true or --debug flag if not os.environ.get('VOICEMODE_DEBUG', '').lower() in ('true', '1', 'yes'): # Suppress audioop deprecation warning from pydub warnings.filterwarnings('ignore', message='.*audioop.*deprecated.*', category=DeprecationWarning) # Suppress pkg_resources deprecation warning from webrtcvad warnings.filterwarnings('ignore', message='.*pkg_resources.*deprecated.*', category=UserWarning) # Suppress psutil connections() deprecation warning warnings.filterwarnings('ignore', message='.*connections.*deprecated.*', category=DeprecationWarning) # Also suppress INFO logging for CLI commands (but not for MCP server) import logging logging.getLogger("voicemode").setLevel(logging.WARNING) logging.getLogger("httpx").setLevel(logging.WARNING) # Service management CLI - runs MCP server by default, subcommands override @click.group(invoke_without_command=True) @click.version_option(version=__version__, prog_name="VoiceMode") @click.help_option('-h', '--help', help='Show this message and exit') @click.option('--debug', is_flag=True, help='Enable debug mode and show all warnings') @click.option('--tools-enabled', help='Comma-separated list of tools to enable (whitelist)') @click.option('--tools-disabled', help='Comma-separated list of tools to disable (blacklist)') @click.pass_context def voice_mode_main_cli(ctx, debug, tools_enabled, tools_disabled): """Voice Mode - MCP server and service management. 
Without arguments, starts the MCP server. With subcommands, executes service management operations. """ if debug: # Re-enable warnings if debug flag is set warnings.resetwarnings() os.environ['VOICEMODE_DEBUG'] = 'true' # Re-enable INFO logging import logging logging.getLogger("voicemode").setLevel(logging.INFO) # Set environment variables from CLI args if tools_enabled: os.environ['VOICEMODE_TOOLS_ENABLED'] = tools_enabled if tools_disabled: os.environ['VOICEMODE_TOOLS_DISABLED'] = tools_disabled if ctx.invoked_subcommand is None: # No subcommand - run MCP server # Note: warnings are already suppressed at module level unless debug is enabled from .server import main as voice_mode_main voice_mode_main() def voice_mode() -> None: """Entry point for voicemode command - starts the MCP server or runs subcommands.""" voice_mode_main_cli() # Audio group for audio-related commands @voice_mode_main_cli.group() @click.help_option('-h', '--help', help='Show this message and exit') def audio(): """Audio transcription and playback commands.""" pass # Service group commands @voice_mode_main_cli.group() @click.help_option('-h', '--help', help='Show this message and exit') def kokoro(): """Manage Kokoro TTS service.""" pass @voice_mode_main_cli.group() @click.help_option('-h', '--help', help='Show this message and exit') def whisper(): """Manage Whisper STT service.""" pass @voice_mode_main_cli.group() @click.help_option('-h', '--help', help='Show this message and exit') def livekit(): """Manage LiveKit RTC service.""" pass # Service functions are imported lazily in their respective command handlers to improve startup time # Kokoro service commands @kokoro.command() def status(): """Show Kokoro service status.""" from voice_mode.tools.service import status_service result = asyncio.run(status_service("kokoro")) click.echo(result) @kokoro.command() def start(): """Start Kokoro service.""" from voice_mode.tools.service import start_service result = 
asyncio.run(start_service("kokoro")) click.echo(result) @kokoro.command() def stop(): """Stop Kokoro service.""" from voice_mode.tools.service import stop_service result = asyncio.run(stop_service("kokoro")) click.echo(result) @kokoro.command() def restart(): """Restart Kokoro service.""" from voice_mode.tools.service import restart_service result = asyncio.run(restart_service("kokoro")) click.echo(result) @kokoro.command() def enable(): """Enable Kokoro service to start at boot/login.""" from voice_mode.tools.service import enable_service result = asyncio.run(enable_service("kokoro")) click.echo(result) @kokoro.command() def disable(): """Disable Kokoro service from starting at boot/login.""" from voice_mode.tools.service import disable_service result = asyncio.run(disable_service("kokoro")) click.echo(result) @kokoro.command() @click.help_option('-h', '--help') @click.option('--lines', '-n', default=50, help='Number of log lines to show') def logs(lines): """View Kokoro service logs.""" from voice_mode.tools.service import view_logs result = asyncio.run(view_logs("kokoro", lines)) click.echo(result) @kokoro.command("update-service-files") def kokoro_update_service_files(): """Update Kokoro service files to latest version.""" from voice_mode.tools.service import update_service_files result = asyncio.run(update_service_files("kokoro")) click.echo(result) @kokoro.command() def health(): """Check Kokoro health endpoint.""" import subprocess try: result = subprocess.run( ["curl", "-s", "http://127.0.0.1:8880/health"], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: import json try: health_data = json.loads(result.stdout) click.echo("✅ Kokoro is responding") click.echo(f" Status: {health_data.get('status', 'unknown')}") if 'uptime' in health_data: click.echo(f" Uptime: {health_data['uptime']}") except json.JSONDecodeError: click.echo("✅ Kokoro is responding (non-JSON response)") else: click.echo("❌ Kokoro not responding on port 8880") except 
subprocess.TimeoutExpired: click.echo("❌ Kokoro health check timed out") except Exception as e: click.echo(f"❌ Health check failed: {e}") @kokoro.command() @click.help_option('-h', '--help') @click.option('--install-dir', help='Directory to install kokoro-fastapi') @click.option('--port', default=8880, help='Port to configure for the service') @click.option('--force', '-f', is_flag=True, help='Force reinstall even if already installed') @click.option('--version', default='latest', help='Version to install (default: latest)') @click.option('--auto-enable/--no-auto-enable', default=None, help='Enable service at boot/login') @click.option('--skip-deps', is_flag=True, help='Skip dependency checks (for advanced users)') def install(install_dir, port, force, version, auto_enable, skip_deps): """Install kokoro-fastapi TTS service.""" from voice_mode.tools.kokoro.install import kokoro_install result = asyncio.run(kokoro_install.fn( install_dir=install_dir, port=port, force_reinstall=force, version=version, auto_enable=auto_enable, skip_deps=skip_deps )) if result.get('success'): if result.get('already_installed'): click.echo(f"✅ Kokoro already installed at {result['install_path']}") click.echo(f" Version: {result.get('version', 'unknown')}") else: click.echo("✅ Kokoro installed successfully!") click.echo(f" Install path: {result['install_path']}") click.echo(f" Version: {result.get('version', 'unknown')}") if result.get('enabled'): click.echo(" Auto-start: Enabled") if result.get('migration_message'): click.echo(f"\n{result['migration_message']}") else: click.echo(f"❌ Installation failed: {result.get('error', 'Unknown error')}") if result.get('details'): click.echo(f" Details: {result['details']}") @kokoro.command() @click.help_option('-h', '--help') @click.option('--remove-models', is_flag=True, help='Also remove downloaded Kokoro models') @click.option('--remove-all-data', is_flag=True, help='Remove all Kokoro data including logs and cache') 
@click.confirmation_option(prompt='Are you sure you want to uninstall Kokoro?') def uninstall(remove_models, remove_all_data): """Uninstall kokoro-fastapi service and optionally remove data.""" from voice_mode.tools.kokoro.uninstall import kokoro_uninstall result = asyncio.run(kokoro_uninstall.fn( remove_models=remove_models, remove_all_data=remove_all_data )) if result.get('success'): click.echo("✅ Kokoro uninstalled successfully!") if result.get('service_stopped'): click.echo(" Service stopped") if result.get('service_disabled'): click.echo(" Service disabled") if result.get('install_removed'): click.echo(f" Installation removed: {result['install_path']}") if result.get('models_removed'): click.echo(" Models removed") if result.get('data_removed'): click.echo(" All data removed") if result.get('warnings'): click.echo("\n⚠️ Warnings:") for warning in result['warnings']: click.echo(f" - {warning}") else: click.echo(f"❌ Uninstall failed: {result.get('error', 'Unknown error')}") if result.get('details'): click.echo(f" Details: {result['details']}") # Create service group for whisper @whisper.group("service") @click.help_option('-h', '--help', help='Show this message and exit') def whisper_service(): """Manage Whisper service.""" pass # Service commands under the group @whisper_service.command("status") def whisper_service_status(): """Show Whisper service status.""" from voice_mode.tools.service import status_service result = asyncio.run(status_service("whisper")) click.echo(result) @whisper_service.command("start") def whisper_service_start(): """Start Whisper service.""" from voice_mode.tools.service import start_service result = asyncio.run(start_service("whisper")) click.echo(result) @whisper_service.command("stop") def whisper_service_stop(): """Stop Whisper service.""" from voice_mode.tools.service import stop_service result = asyncio.run(stop_service("whisper")) click.echo(result) @whisper_service.command("restart") def whisper_service_restart(): """Restart 
Whisper service.""" from voice_mode.tools.service import restart_service result = asyncio.run(restart_service("whisper")) click.echo(result) @whisper_service.command("enable") def whisper_service_enable(): """Enable Whisper service to start at boot/login.""" from voice_mode.tools.service import enable_service result = asyncio.run(enable_service("whisper")) click.echo(result) @whisper_service.command("disable") def whisper_service_disable(): """Disable Whisper service from starting at boot/login.""" from voice_mode.tools.service import disable_service result = asyncio.run(disable_service("whisper")) click.echo(result) @whisper_service.command("logs") @click.help_option('-h', '--help') @click.option('--lines', '-n', default=50, help='Number of log lines to show') def whisper_service_logs(lines): """View Whisper service logs.""" from voice_mode.tools.service import view_logs result = asyncio.run(view_logs("whisper", lines)) click.echo(result) @whisper_service.command("update-files") def whisper_update_service_files(): """Update Whisper service files to latest version.""" from voice_mode.tools.service import update_service_files result = asyncio.run(update_service_files("whisper")) click.echo(result) @whisper_service.command("health") def whisper_service_health(): """Check Whisper health endpoint.""" import subprocess try: result = subprocess.run( ["curl", "-s", "http://127.0.0.1:2022/health"], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: import json try: health_data = json.loads(result.stdout) click.echo("✅ Whisper is responding") click.echo(f" Status: {health_data.get('status', 'unknown')}") if 'uptime' in health_data: click.echo(f" Uptime: {health_data['uptime']}") except json.JSONDecodeError: click.echo("✅ Whisper is responding (non-JSON response)") else: click.echo("❌ Whisper not responding on port 2022") except subprocess.TimeoutExpired: click.echo("❌ Whisper health check timed out") except Exception as e: click.echo(f"❌ Health check 
failed: {e}") @whisper_service.command("install") @click.help_option('-h', '--help') @click.option('--install-dir', help='Directory to install whisper.cpp') @click.option('--model', default=DEFAULT_WHISPER_MODEL, help=f'Whisper model to download (default: {DEFAULT_WHISPER_MODEL})') @click.option('--use-gpu/--no-gpu', default=None, help='Enable GPU support if available') @click.option('--force', '-f', is_flag=True, help='Force reinstall even if already installed') @click.option('--version', default='latest', help='Version to install (default: latest)') @click.option('--auto-enable/--no-auto-enable', default=None, help='Enable service at boot/login') @click.option('--skip-deps', is_flag=True, help='Skip dependency checks (for advanced users)') def whisper_service_install(install_dir, model, use_gpu, force, version, auto_enable, skip_deps): """Install whisper.cpp STT service with automatic system detection.""" from voice_mode.tools.whisper.install import whisper_install result = asyncio.run(whisper_install.fn( install_dir=install_dir, model=model, use_gpu=use_gpu, force_reinstall=force, version=version, auto_enable=auto_enable, skip_deps=skip_deps )) if result.get('success'): if result.get('already_installed'): click.echo(f"✅ Whisper already installed at {result['install_path']}") click.echo(f" Version: {result.get('version', 'unknown')}") else: click.echo("✅ Whisper installed successfully!") click.echo(f" Install path: {result['install_path']}") click.echo(f" Version: {result.get('version', 'unknown')}") if result.get('gpu_enabled'): click.echo(" GPU support: Enabled") if result.get('model_downloaded'): click.echo(f" Model: {result.get('model', 'unknown')}") if result.get('enabled'): click.echo(" Auto-start: Enabled") if result.get('migration_message'): click.echo(f"\n{result['migration_message']}") if result.get('next_steps'): click.echo("\nNext steps:") for step in result['next_steps']: click.echo(f" - {step}") else: click.echo(f"❌ Installation failed: 
{result.get('error', 'Unknown error')}") if result.get('details'): click.echo(f" Details: {result['details']}") @whisper_service.command("uninstall") @click.help_option('-h', '--help') @click.option('--remove-models', is_flag=True, help='Also remove downloaded Whisper models') @click.option('--remove-all-data', is_flag=True, help='Remove all Whisper data including logs and transcriptions') @click.confirmation_option(prompt='Are you sure you want to uninstall Whisper?') def whisper_service_uninstall(remove_models, remove_all_data): """Uninstall whisper.cpp and optionally remove models and data.""" from voice_mode.tools.whisper.uninstall import whisper_uninstall result = asyncio.run(whisper_uninstall.fn( remove_models=remove_models, remove_all_data=remove_all_data )) if result.get('success'): click.echo("✅ Whisper uninstalled successfully!") if result.get('service_stopped'): click.echo(" Service stopped") if result.get('service_disabled'): click.echo(" Service disabled") if result.get('install_removed'): click.echo(f" Installation removed: {result['install_path']}") if result.get('models_removed'): click.echo(" Models removed") if result.get('data_removed'): click.echo(" All data removed") if result.get('warnings'): click.echo("\n⚠️ Warnings:") for warning in result['warnings']: click.echo(f" - {warning}") else: click.echo(f"❌ Uninstall failed: {result.get('error', 'Unknown error')}") if result.get('details'): click.echo(f" Details: {result['details']}") # Import the unified model command from voice_mode.whisper_model_unified import whisper_model_unified # Add it directly to the whisper group whisper.add_command(whisper_model_unified, name="model") # Backward compatibility: Add hidden aliases for old direct commands # These allow "whisper start" to work as "whisper service start" @whisper.command("status", hidden=True) @click.pass_context def whisper_status_alias(ctx): """(Deprecated) Show Whisper service status. 
Use 'whisper service status' instead.""" ctx.forward(whisper_service_status) @whisper.command("start", hidden=True) @click.pass_context def whisper_start_alias(ctx): """(Deprecated) Start Whisper service. Use 'whisper service start' instead.""" ctx.forward(whisper_service_start) @whisper.command("stop", hidden=True) @click.pass_context def whisper_stop_alias(ctx): """(Deprecated) Stop Whisper service. Use 'whisper service stop' instead.""" ctx.forward(whisper_service_stop) @whisper.command("restart", hidden=True) @click.pass_context def whisper_restart_alias(ctx): """(Deprecated) Restart Whisper service. Use 'whisper service restart' instead.""" ctx.forward(whisper_service_restart) @whisper.command("enable", hidden=True) @click.pass_context def whisper_enable_alias(ctx): """(Deprecated) Enable Whisper service. Use 'whisper service enable' instead.""" ctx.forward(whisper_service_enable) @whisper.command("disable", hidden=True) @click.pass_context def whisper_disable_alias(ctx): """(Deprecated) Disable Whisper service. Use 'whisper service disable' instead.""" ctx.forward(whisper_service_disable) @whisper.command("logs", hidden=True) @click.help_option('-h', '--help') @click.option('--lines', '-n', default=50, help='Number of log lines to show') @click.pass_context def whisper_logs_alias(ctx, lines): """(Deprecated) View Whisper logs. Use 'whisper service logs' instead.""" ctx.forward(whisper_service_logs, lines=lines) @whisper.command("health", hidden=True) @click.pass_context def whisper_health_alias(ctx): """(Deprecated) Check Whisper health. 
Use 'whisper service health' instead.""" ctx.forward(whisper_service_health) @whisper.command("install", hidden=True) @click.help_option('-h', '--help') @click.option('--install-dir', help='Directory to install whisper.cpp') @click.option('--model', default=DEFAULT_WHISPER_MODEL, help=f'Whisper model to download (default: {DEFAULT_WHISPER_MODEL})') @click.option('--use-gpu/--no-gpu', default=None, help='Enable GPU support if available') @click.option('--force', '-f', is_flag=True, help='Force reinstall even if already installed') @click.option('--version', default='latest', help='Version to install (default: latest)') @click.option('--auto-enable/--no-auto-enable', default=None, help='Enable service at boot/login') @click.option('--skip-deps', is_flag=True, help='Skip dependency checks (for advanced users)') @click.pass_context def whisper_install_alias(ctx, install_dir, model, use_gpu, force, version, auto_enable, skip_deps): """(Deprecated) Install Whisper. Use 'whisper service install' instead.""" ctx.forward(whisper_service_install, install_dir=install_dir, model=model, use_gpu=use_gpu, force=force, version=version, auto_enable=auto_enable, skip_deps=skip_deps) @whisper.command("uninstall", hidden=True) @click.help_option('-h', '--help') @click.option('--remove-models', is_flag=True, help='Also remove downloaded Whisper models') @click.option('--remove-all-data', is_flag=True, help='Remove all Whisper data including logs and transcriptions') @click.confirmation_option(prompt='Are you sure you want to uninstall Whisper?') @click.pass_context def whisper_uninstall_alias(ctx, remove_models, remove_all_data): """(Deprecated) Uninstall Whisper. 
Use 'whisper service uninstall' instead.""" ctx.forward(whisper_service_uninstall, remove_models=remove_models, remove_all_data=remove_all_data) # Old subcommand structure removed - replaced by unified model command # The old @whisper_model group and all its subcommands have been replaced # by the unified whisper_model_unified command above # Note: The old model group commands (list, active, install, remove, benchmark) # have been removed in favor of the unified model command that works as: # voicemode whisper model # show current # voicemode whisper model --all # list all # voicemode whisper model <name> # set/install model # Skip the old definitions to prevent errors ''' def whisper_model_list(): """List available Whisper models and their installation status. Shows all available models with: - Installation status (installed/available) - Core ML acceleration status on Apple Silicon - File sizes - Language support - Performance characteristics """ from voice_mode.tools.whisper.models import ( WHISPER_MODEL_REGISTRY, get_model_directory, get_active_model, is_whisper_model_installed, get_installed_whisper_models, format_size, has_whisper_coreml_model ) model_dir = get_model_directory() current_model = get_active_model() installed_models = get_installed_whisper_models() # Calculate totals total_installed_size = sum( (model_dir / f"ggml-{name}.bin").stat().st_size for name in installed_models if (model_dir / f"ggml-{name}.bin").exists() ) total_available_size = sum( info["size_mb"] * 1024 * 1024 for info in WHISPER_MODEL_REGISTRY.values() ) click.echo("\nWhisper Models:\n") # Display each model for model_name, model_info in WHISPER_MODEL_REGISTRY.items(): # Check installation status is_installed = is_whisper_model_installed(model_name) has_coreml = has_whisper_coreml_model(model_name) # Status indicator if is_installed and has_coreml: status = "[✓ Installed+ML]" elif is_installed: status = "[✓ Installed]" else: status = "[ Download ]" # Active model indicator prefix = 
"→ " if model_name == current_model else " " # Format size size_mb = model_info["size_mb"] if size_mb >= 1000: size_str = f"{size_mb / 1000:.1f} GB" else: size_str = f"{size_mb} MB" # Format description desc = model_info["description"] if model_name == current_model: desc += " (active)" # Print model line click.echo( f"{prefix}{model_name:15} {status:16} {size_str:7} " f"{model_info['languages']:20} {desc}" ) # Show summary click.echo(f"\nModels directory: {model_dir}") if total_installed_size > 0: click.echo( f"Total size: {format_size(total_installed_size)} installed / " f"{format_size(total_available_size)} available" ) click.echo("\nTo download a model: voicemode whisper model install <model-name>") click.echo("To set default model: voicemode whisper model active <model-name>") @whisper_model.command("active") @click.help_option('-h', '--help') @click.argument('model_name', required=False) def whisper_model_active(model_name): """Show or set the active Whisper model. Without arguments: Shows the current active model With MODEL_NAME: Sets the active model (updates VOICEMODE_WHISPER_MODEL) """ from voice_mode.tools.whisper.models import ( get_active_model, WHISPER_MODEL_REGISTRY, is_whisper_model_installed, set_active_model ) import os import subprocess if model_name: # Set model mode if model_name not in WHISPER_MODEL_REGISTRY: click.echo(f"Error: '{model_name}' is not a valid model.", err=True) click.echo("\nAvailable models:", err=True) for name in WHISPER_MODEL_REGISTRY.keys(): click.echo(f" - {name}", err=True) return # Check if model is installed if not is_whisper_model_installed(model_name): click.echo(f"Error: Model '{model_name}' is not installed.", err=True) click.echo(f"Install it with: voicemode whisper model install {model_name}", err=True) raise click.Abort() # Get previous model previous_model = get_active_model() # Update the configuration file set_active_model(model_name) click.echo(f"✓ Active model set to: {model_name}") if previous_model != 
model_name: click.echo(f" (was: {previous_model})") # Check if whisper service is running try: result = subprocess.run(['pgrep', '-f', 'whisper-server'], capture_output=True) if result.returncode == 0: # Service is running click.echo(f"\n⚠️ Please restart the whisper service for changes to take effect:") click.echo(f" {click.style('voicemode whisper restart', fg='yellow', bold=True)}") else: click.echo(f"\nWhisper service is not running. Start it with:") click.echo(f" voicemode whisper start") click.echo(f"(or restart the whisper service if it's managed by systemd/launchd)") except: click.echo(f"\nPlease restart the whisper service for changes to take effect:") click.echo(f" voicemode whisper restart") else: # Show current model current = get_active_model() # Check if current model is installed installed = is_whisper_model_installed(current) status = click.style("[✓ Installed]", fg="green") if installed else click.style("[Not installed]", fg="red") # Get model info model_info = WHISPER_MODEL_REGISTRY.get(current, {}) click.echo(f"\nActive Whisper model: {click.style(current, fg='yellow', bold=True)} {status}") if model_info: click.echo(f" Size: {model_info.get('size_mb', 'Unknown')} MB") click.echo(f" Languages: {model_info.get('languages', 'Unknown')}") click.echo(f" Description: {model_info.get('description', 'Unknown')}") # Check what model the service is actually using try: result = subprocess.run(['pgrep', '-f', 'whisper-server'], capture_output=True) if result.returncode == 0: # Service is running, could check its actual model here click.echo(f"\nWhisper service status: {click.style('Running', fg='green')}") except: pass click.echo(f"\nTo change: voicemode whisper model active <model-name>") click.echo(f"To list all models: voicemode whisper models") @whisper.command("models", hidden=True) # Hidden - use 'whisper model list' instead def whisper_models(): """List available Whisper models and their installation status. 
DEPRECATED: Use 'voicemode whisper model list' instead. """ from voice_mode.tools.whisper.models import ( WHISPER_MODEL_REGISTRY, get_model_directory, get_active_model, is_whisper_model_installed, get_installed_whisper_models, format_size, has_whisper_coreml_model ) model_dir = get_model_directory() current_model = get_active_model() installed_models = get_installed_whisper_models() # Calculate totals total_installed_size = sum( WHISPER_MODEL_REGISTRY[m]["size_mb"] for m in installed_models ) total_available_size = sum( m["size_mb"] for m in WHISPER_MODEL_REGISTRY.values() ) # Print header click.echo("\nWhisper Models:") click.echo("") # Print models table for model_name, info in WHISPER_MODEL_REGISTRY.items(): # Check status is_installed = is_whisper_model_installed(model_name) is_current = model_name == current_model # Format status if is_current: status = click.style("→", fg="yellow", bold=True) model_display = click.style(f"{model_name:15}", fg="yellow", bold=True) else: status = " " model_display = f"{model_name:15}" # Format installation status if is_installed: # Check for Core ML model if has_whisper_coreml_model(model_name): install_status = click.style("[✓ Installed+ML]", fg="green") else: install_status = click.style("[✓ Installed]", fg="green") else: install_status = click.style("[ Download ]", fg="bright_black") # Format size size_str = format_size(info["size_mb"]).rjust(8) # Format languages lang_str = f"{info['languages']:20}" # Format description desc = info['description'] if is_current: desc += " (Currently selected)" desc = click.style(desc, fg="yellow") # Print row click.echo(f"{status} {model_display} {install_status:18} {size_str} {lang_str} {desc}") # Print footer click.echo("") click.echo(f"Models directory: {model_dir}") click.echo(f"Total size: {format_size(total_installed_size)} installed / {format_size(total_available_size)} available") click.echo("") click.echo("To download a model: voicemode whisper model install <model-name>") 
click.echo("To set default model: voicemode whisper model <model-name>") @whisper_model.command("install") @click.help_option('-h', '--help') @click.argument('model', default=DEFAULT_WHISPER_MODEL) @click.option('--force', '-f', is_flag=True, help='Re-download even if model exists') @click.option('--skip-core-ml', is_flag=True, help='Skip Core ML conversion on Apple Silicon') def whisper_model_install(model, force, skip_core_ml): """Install Whisper model(s) with automatic Core ML support on Apple Silicon. MODEL can be a model name (e.g., 'base'), 'all' to download all models, or omitted to use the default (base). Available models: tiny, tiny.en, base, base.en, small, small.en, medium, medium.en, large-v1, large-v2, large-v3, large-v3-turbo """ import json import voice_mode.tools.whisper.model_install as install_module # Get the actual function from the MCP tool wrapper tool = install_module.whisper_model_install install_func = tool.fn if hasattr(tool, 'fn') else tool # Call the install function result = asyncio.run(install_func( model=model, force_download=force, skip_core_ml=skip_core_ml )) try: # Parse JSON response data = json.loads(result) # Core ML is now automatic with pre-built models - no prompts needed! 
if data.get('success'): click.echo("✅ Model download completed!") if 'results' in data: for model_result in data['results']: click.echo(f"\n📦 {model_result['model']}:") if model_result.get('already_exists') and not force: click.echo(" Already downloaded") else: click.echo(" Downloaded successfully") if model_result.get('core_ml_converted'): click.echo(" Core ML: Converted") elif model_result.get('core_ml_exists'): click.echo(" Core ML: Already exists") if 'models_dir' in data: click.echo(f"\nModels location: {data['models_dir']}") else: click.echo(f"❌ Download failed: {data.get('error', 'Unknown error')}") if 'available_models' in data: click.echo("\nAvailable models:") for m in data['available_models']: click.echo(f" - {m}") except json.JSONDecodeError: click.echo(result) @whisper_model.command("remove") @click.help_option('-h', '--help') @click.argument('model') @click.option('--force', '-f', is_flag=True, help='Remove without confirmation') def whisper_model_remove(model, force): """Remove an installed Whisper model. MODEL is the name of the model to remove (e.g., 'large-v2'). 
""" from voice_mode.tools.whisper.models import ( WHISPER_MODEL_REGISTRY, is_whisper_model_installed, get_model_directory, get_active_model ) import os # Validate model name if model not in WHISPER_MODEL_REGISTRY: click.echo(f"Error: '{model}' is not a valid model.", err=True) click.echo("\nAvailable models:", err=True) for name in WHISPER_MODEL_REGISTRY.keys(): click.echo(f" - {name}", err=True) ctx.exit(1) # Check if model is installed if not is_whisper_model_installed(model): click.echo(f"Model '{model}' is not installed.") return # Check if it's the current model current = get_active_model() if model == current: click.echo(f"Warning: '{model}' is the currently selected model.", err=True) if not force: if not click.confirm("Do you still want to remove it?"): return # Get model path model_dir = get_model_directory() model_info = WHISPER_MODEL_REGISTRY[model] model_path = model_dir / model_info["filename"] # Also check for Core ML models coreml_path = model_dir / f"ggml-{model}-encoder.mlmodelc" # Confirm removal if not forced if not force: size_mb = model_info["size_mb"] if not click.confirm(f"Remove {model} ({size_mb} MB)?"): return # Remove the model file try: if model_path.exists(): os.remove(model_path) click.echo(f"✓ Removed model: {model}") # Remove Core ML model if exists if coreml_path.exists(): import shutil shutil.rmtree(coreml_path) click.echo(f"✓ Removed Core ML model: {model}") click.echo(f"\nModel '{model}' has been removed.") except Exception as e: click.echo(f"Error removing model: {e}", err=True) @whisper_model.command("benchmark") @click.help_option('-h', '--help') @click.option('--models', default='installed', help='Models to benchmark: installed, all, or comma-separated list') @click.option('--sample', help='Audio file to use for benchmarking') @click.option('--runs', default=1, help='Number of benchmark runs per model') def whisper_model_benchmark_cmd(models, sample, runs): """Benchmark Whisper model performance. 
Runs performance tests on specified models to help choose the optimal model for your use case based on speed vs accuracy trade-offs. """ from voice_mode.tools.whisper.model_benchmark import whisper_model_benchmark # Parse models parameter if ',' in models: model_list = [m.strip() for m in models.split(',')] else: model_list = models # Run benchmark result = asyncio.run(whisper_model_benchmark( models=model_list, sample_file=sample, runs=runs )) if not result.get('success'): click.echo(f"❌ Benchmark failed: {result.get('error', 'Unknown error')}", err=True) return # Display results click.echo("\n" + "="*60) click.echo("Whisper Model Benchmark Results") click.echo("="*60) if result.get('sample_file'): click.echo(f"Sample: {result['sample_file']}") if result.get('runs_per_model') > 1: click.echo(f"Runs per model: {result['runs_per_model']} (showing best)") click.echo("") # Display benchmark table click.echo(f"{'Model':<20} {'Load (ms)':<12} {'Encode (ms)':<12} {'Total (ms)':<12} {'Speed':<10}") click.echo("-"*70) for bench in result.get('benchmarks', []): if bench.get('success'): model = bench['model'] load_time = f"{bench.get('load_time_ms', 0):.1f}" encode_time = f"{bench.get('encode_time_ms', 0):.1f}" total_time = f"{bench.get('total_time_ms', 0):.1f}" rtf = f"{bench.get('real_time_factor', 0):.1f}x" # Highlight fastest model if bench['model'] == result.get('fastest_model'): model = click.style(model, fg='green', bold=True) rtf = click.style(rtf, fg='green', bold=True) click.echo(f"{model:<20} {load_time:<12} {encode_time:<12} {total_time:<12} {rtf:<10}") else: click.echo(f"{bench['model']:<20} {'Failed':<12} {bench.get('error', 'Unknown error')}") # Display recommendations if result.get('recommendations'): click.echo("\nRecommendations:") for rec in result['recommendations']: click.echo(f" • {rec}") # Summary if result.get('fastest_model'): click.echo(f"\nFastest model: {click.style(result['fastest_model'], fg='yellow', bold=True)}") click.echo(f"Processing time: 
@livekit.command()
@click.help_option('-h', '--help')
@click.option('--lines', '-n', default=50, help='Number of log lines to show')
def logs(lines):
    """View LiveKit service logs."""
    # The docstring above is what click prints for `--help`, so it is kept
    # as-is; implementation notes live in comments instead.
    #
    # `lines` is the value of --lines/-n (default 50) and is passed straight
    # through to the service helper as the number of log lines to fetch.
    from voice_mode.tools.service import view_logs

    log_coroutine = view_logs("livekit", lines)
    log_output = asyncio.run(log_coroutine)
    click.echo(log_output)
@livekit.command()
@click.help_option('-h', '--help')
@click.option('--install-dir', help='Directory to install LiveKit')
@click.option('--port', default=7880, help='Port for LiveKit server (default: 7880)')
@click.option('--force', '-f', is_flag=True, help='Force reinstall even if already installed')
@click.option('--version', default='latest', help='Version to install (default: latest)')
@click.option('--auto-enable/--no-auto-enable', default=None, help='Enable service at boot/login')
def install(install_dir, port, force, version, auto_enable):
    """Install LiveKit server with development configuration."""
    # The docstring is click's help text and is therefore left untouched.
    #
    # All CLI options are forwarded to the installer tool; `--force` maps to
    # its `force_reinstall` keyword. The tool's result is read as a mapping.
    from voice_mode.tools.livekit.install import livekit_install

    outcome = asyncio.run(
        livekit_install.fn(
            install_dir=install_dir,
            port=port,
            force_reinstall=force,
            version=version,
            auto_enable=auto_enable,
        )
    )

    # Failure path first, so the success reporting below stays flat.
    if not outcome.get('success'):
        click.echo(f"❌ Installation failed: {outcome.get('error', 'Unknown error')}")
        details = outcome.get('details')
        if details:
            click.echo(f" Details: {details}")
        return

    if outcome.get('already_installed'):
        click.echo(f"✅ LiveKit already installed at {outcome['install_path']}")
        click.echo(f" Version: {outcome.get('version', 'unknown')}")
        return

    click.echo("✅ LiveKit installed successfully!")
    click.echo(f" Version: {outcome.get('version', 'unknown')}")
    # Required keys, printed in a fixed order.
    summary_rows = (
        ("Install path", 'install_path'),
        ("Config", 'config_path'),
        ("Port", 'port'),
        ("URL", 'url'),
    )
    for label, key in summary_rows:
        click.echo(f" {label}: {outcome[key]}")
    click.echo(f" Dev credentials: {outcome['dev_key']} / {outcome['dev_secret']}")

    if outcome.get('service_installed'):
        click.echo(" Service installed")
        if outcome.get('service_enabled'):
            click.echo(" Service enabled (will start at boot/login)")
@frontend.command("install")
@click.help_option('-h', '--help')
@click.option('--auto-enable/--no-auto-enable', default=None, help='Enable service after installation (default: from config)')
def frontend_install(auto_enable):
    """Install and setup LiveKit Voice Assistant Frontend."""
    # The docstring doubles as click's help text, so it is kept verbatim.
    #
    # `auto_enable` is tri-state (True / False / None when neither flag is
    # given) and is handed to the install tool unchanged.
    from voice_mode.tools.livekit.frontend import livekit_frontend_install

    outcome = asyncio.run(livekit_frontend_install.fn(auto_enable=auto_enable))

    if not outcome.get('success'):
        click.echo(f"❌ Frontend installation failed: {outcome.get('error', 'Unknown error')}")
        return

    click.echo("✅ LiveKit Frontend setup completed!")
    for label, key in (
        ("Frontend directory", 'frontend_dir'),
        ("Log directory", 'log_dir'),
        ("Node.js available", 'node_available'),
    ):
        click.echo(f" {label}: {outcome[key]}")

    # The Node.js path is optional and only shown when the tool reported one.
    if outcome.get('node_path'):
        click.echo(f" Node.js path: {outcome['node_path']}")

    for label, key in (
        ("Service installed", 'service_installed'),
        ("Service enabled", 'service_enabled'),
        ("URL", 'url'),
        ("Password", 'password'),
    ):
        click.echo(f" {label}: {outcome[key]}")

    if outcome.get('service_enabled'):
        hint = "\n💡 Frontend service is enabled and will start automatically at boot/login"
    else:
        hint = "\n💡 Run 'voicemode livekit frontend enable' to start automatically at boot/login"
    click.echo(hint)
@frontend.command("logs")
@click.help_option('-h', '--help')
@click.option("--lines", "-n", default=50, help="Number of lines to show (default: 50)")
@click.option("--follow", "-f", is_flag=True, help="Follow log output (tail -f)")
def frontend_logs(lines, follow):
    """View LiveKit Voice Assistant Frontend logs.

    Shows the last N lines of frontend logs. Use --follow to tail the logs.
    """
    # Both modes use the same tool; import it once up front.
    import subprocess
    from voice_mode.tools.livekit.frontend import livekit_frontend_logs

    if not follow:
        # Snapshot mode: the tool returns the log text itself.
        snapshot = asyncio.run(livekit_frontend_logs.fn(lines=lines, follow=False))
        if not snapshot.get('success'):
            click.echo(f"❌ Error: {snapshot.get('error', 'Unknown error')}")
            return
        click.echo(f"📂 Log file: {snapshot['log_file']}")
        click.echo(f"📄 Showing last {snapshot['lines_shown']} lines:")
        click.echo("─" * 60)
        click.echo(snapshot['logs'])
        return

    # Follow mode: the tool result is only used for the log file path, and
    # the actual streaming is delegated to `tail -f`.
    located = asyncio.run(livekit_frontend_logs.fn(follow=True))
    if not located.get('success'):
        click.echo(f"❌ Error: {located.get('error', 'Unknown error')}")
        return

    click.echo(f"📂 Log file: {located['log_file']}")
    click.echo("🔄 Following logs (press Ctrl+C to stop)...")
    try:
        subprocess.run(["tail", "-f", located['log_file']])
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to leave follow mode.
        click.echo("\n✅ Stopped following logs")
@config.command("get")
@click.help_option('-h', '--help')
@click.argument('key')
def config_get(key):
    """Get a configuration value."""
    # The docstring is click's help text and is kept as-is.
    #
    # Lookup order:
    #   1. the first `KEY=value` line for `key` in ~/.voicemode/voicemode.env
    #      (blank lines and lines starting with '#' are ignored);
    #   2. the process environment, reported with a "(from environment)" tag.
    # A missing config file no longer short-circuits step 2: previously a key
    # that was set only in the environment could not be read when the file
    # did not exist.
    import os
    from pathlib import Path

    env_file = Path.home() / ".voicemode" / "voicemode.env"
    file_exists = env_file.exists()

    if file_exists:
        with open(env_file, 'r') as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith('#') or '=' not in line:
                    continue
                name, value = line.split('=', 1)
                if name.strip() == key:
                    click.echo(f"{key}={value.strip()}")
                    return

    # Not in the file (or no file): fall back to the environment.
    env_value = os.getenv(key)
    if env_value is not None:
        click.echo(f"{key}={env_value} (from environment)")
    elif not file_exists:
        click.echo(f"❌ Configuration file not found: {env_file}")
    else:
        click.echo(f"❌ Configuration key not found: {key}")
        click.echo("Run 'voicemode config list' to see available keys")
@voice_mode_main_cli.command()
@click.help_option('-h', '--help')
@click.option('--component', type=click.Choice(['core', 'whisper', 'kokoro']), help='Check specific component only')
@click.option('--yes', '-y', is_flag=True, help='Install without prompting')
@click.option('--dry-run', is_flag=True, help='Show what would be installed')
@click.option('--verbose', '-v', is_flag=True, help='Show full installation output')
def deps(component, yes, dry_run, verbose):
    """Check and install system dependencies.

    Shows dependency status and offers to install missing ones.
    Checks all components (core, whisper, kokoro) by default, or only the
    one selected with --component.

    Examples:
      voicemode deps                       # Check all dependencies
      voicemode deps --component whisper   # Check whisper dependencies only
      voicemode deps --yes                 # Install without prompting
      voicemode deps --verbose             # Show full installation output
    """
    from voice_mode.utils.dependencies.checker import (
        check_component_dependencies,
        load_dependencies,
        install_missing_dependencies
    )

    deps_yaml = load_dependencies()
    components = [component] if component else ['core', 'whisper', 'kokoro']

    # Missing packages across all checked components, in first-seen order.
    # A package reported missing by more than one component is recorded once
    # so it is not listed (or handed to the installer) repeatedly.
    all_missing = []
    for comp in components:
        click.echo(f"\n{comp.capitalize()} Dependencies:")
        results = check_component_dependencies(comp, deps_yaml)
        if not results:
            click.echo(" (No required dependencies for this platform)")
            continue
        for pkg, installed in results.items():
            status = "✓" if installed else "✗"
            click.echo(f" {status} {pkg}")
            if not installed and pkg not in all_missing:
                all_missing.append(pkg)

    if not all_missing:
        click.echo("\n✅ All dependencies satisfied")
        return

    if dry_run:
        # Report only; nothing is installed in dry-run mode.
        click.echo(f"\nWould install: {', '.join(all_missing)}")
        return

    # --yes turns off the installer's interactive prompt.
    success, message = install_missing_dependencies(
        all_missing,
        interactive=not yes,
        verbose=verbose
    )
    if success:
        click.echo("\n✅ Dependencies installed successfully")
    else:
        click.echo(f"\n❌ Installation failed: {message}")
@diag.command()
def dependencies():
    """Check system audio dependencies and provide installation guidance."""
    # The docstring is click's help text and is kept as-is.
    #
    # The tool result is rendered section by section when it is a dict; any
    # other result type is printed via str(). Every section is optional and
    # is skipped when its key is absent (or, for list sections, empty).
    from voice_mode.tools.dependencies import check_audio_dependencies

    result = asyncio.run(check_audio_dependencies.fn())

    if not isinstance(result, dict):
        # Fallback for string output
        click.echo(str(result))
        return

    click.echo("System Audio Dependencies Check")
    click.echo("=" * 50)
    click.echo(f"\nPlatform: {result.get('platform', 'Unknown')}")

    if 'packages' in result:
        click.echo("\nSystem Packages:")
        for pkg, status in result['packages'].items():
            symbol = "✅" if status else "❌"
            click.echo(f" {symbol} {pkg}")

    missing_packages = result.get('missing_packages')
    if missing_packages:
        click.echo("\n❌ Missing Packages:")
        for pkg in missing_packages:
            click.echo(f" - {pkg}")
        # The install hint is only relevant when something is missing.
        if 'install_command' in result:
            click.echo(f"\nInstall with: {result['install_command']}")

    if 'pulseaudio' in result:
        pa = result['pulseaudio']
        click.echo(f"\nPulseAudio Status: {'✅ Running' if pa.get('running') else '❌ Not running'}")
        if pa.get('version'):
            click.echo(f" Version: {pa['version']}")

    diagnostics = result.get('diagnostics')
    if diagnostics:
        click.echo("\nDiagnostics:")
        # Named `note` rather than `diag` so the loop variable does not shadow
        # the module-level `diag` command group inside this function.
        for note in diagnostics:
            click.echo(f" - {note}")

    recommendations = result.get('recommendations')
    if recommendations:
        click.echo("\nRecommendations:")
        for rec in recommendations:
            click.echo(f" - {rec}")
@voice_mode_main_cli.command()
@click.help_option('-h', '--help')
@click.option('--message', '-m', default="Hello! How can I help you today?", help='Initial message to speak')
@click.option('--wait/--no-wait', default=True, help='Wait for response after speaking')
@click.option('--duration', '-d', type=float, default=DEFAULT_LISTEN_DURATION, help='Listen duration in seconds')
@click.option('--min-duration', type=float, default=MIN_RECORDING_DURATION, help='Minimum listen duration before silence detection')
@click.option('--transport', type=click.Choice(['auto', 'local', 'livekit']), default='auto', help='Transport method')
@click.option('--room-name', default='', help='LiveKit room name (for livekit transport)')
@click.option('--voice', help='TTS voice to use (e.g., nova, shimmer, af_sky)')
@click.option('--tts-provider', type=click.Choice(['openai', 'kokoro']), help='TTS provider')
@click.option('--tts-model', help='TTS model (e.g., tts-1, tts-1-hd)')
@click.option('--tts-instructions', help='Tone/style instructions for gpt-4o-mini-tts')
@click.option('--audio-feedback/--no-audio-feedback', default=None, help='Enable/disable audio feedback')
@click.option('--audio-format', help='Audio format (pcm, mp3, wav, flac, aac, opus)')
@click.option('--disable-silence-detection', is_flag=True, help='Disable silence detection')
@click.option('--speed', type=float, help='Speech rate (0.25 to 4.0)')
@click.option('--vad-aggressiveness', type=int, help='VAD aggressiveness (0-3)')
@click.option('--skip-tts/--no-skip-tts', default=None, help='Skip TTS and only show text')
@click.option('--continuous', '-c', is_flag=True, help='Continuous conversation mode')
def converse(message, wait, duration, min_duration, transport, room_name, voice, tts_provider,
             tts_model, tts_instructions, audio_feedback, audio_format, disable_silence_detection,
             speed, vad_aggressiveness, skip_tts, continuous):
    """Have a voice conversation directly from the command line.

    Examples:

        # Simple conversation
        voicemode converse

        # Speak a message without waiting
        voicemode converse -m "Hello there!" --no-wait

        # Continuous conversation mode
        voicemode converse --continuous

        # Use specific voice
        voicemode converse --voice nova
    """
    # Check core dependencies before running
    from voice_mode.utils.dependencies.checker import check_component_dependencies

    results = check_component_dependencies('core')
    missing = [pkg for pkg, installed in results.items() if not installed]
    if missing:
        click.echo(f"⚠️ Missing core dependencies: {', '.join(missing)}")
        click.echo(" Run 'voicemode deps' to install them")
        return

    from voice_mode.tools.converse import converse as converse_fn

    # Keyword arguments shared by every call to the converse tool. They are
    # built once so the three call sites cannot drift apart.
    speech_kwargs = {
        'voice': voice,
        'tts_provider': tts_provider,
        'tts_model': tts_model,
        'audio_format': audio_format,
        'speed': speed,
        'skip_tts': skip_tts,
    }
    # Calls that may listen for a reply additionally carry the listening,
    # transport and feedback settings. --audio-feedback maps to the tool's
    # `chime_enabled` keyword.
    listen_kwargs = {
        **speech_kwargs,
        'listen_duration_max': duration,
        'listen_duration_min': min_duration,
        'transport': transport,
        'room_name': room_name,
        'tts_instructions': tts_instructions,
        'chime_enabled': audio_feedback,
        'disable_silence_detection': disable_silence_detection,
        'vad_aggressiveness': vad_aggressiveness,
    }

    def heard_text(result):
        """Return the text between 'Voice response:' and the first '|'."""
        return result.split('Voice response:')[1].split('|')[0].strip()

    async def run_conversation():
        """Run the conversation asynchronously."""
        # Suppress the spurious aiohttp warning that appears on startup
        # This warning is a false positive from asyncio detecting an unclosed
        # session that was likely created during module import
        import logging
        logging.getLogger('asyncio').setLevel(logging.CRITICAL)
        # Enable INFO logging for converse command to show progress
        logging.getLogger('voicemode').setLevel(logging.INFO)

        try:
            if continuous:
                click.echo("🎤 Starting continuous conversation mode...")
                click.echo(" Press Ctrl+C to exit\n")

                # First turn: speak the initial message and always listen.
                result = await converse_fn.fn(
                    message=message, wait_for_response=True, **listen_kwargs
                )
                if result and "Voice response:" in result:
                    click.echo(f"You: {heard_text(result)}")

                while True:
                    # Empty message: listen only, nothing is spoken.
                    result = await converse_fn.fn(
                        message="", wait_for_response=True, **listen_kwargs
                    )
                    if result and "Voice response:" in result:
                        user_text = heard_text(result)
                        click.echo(f"You: {user_text}")
                        # Check for exit commands
                        if user_text.lower() in ['exit', 'quit', 'goodbye', 'bye']:
                            # The farewell is speak-only, so it takes just
                            # the speech settings.
                            await converse_fn.fn(
                                message="Goodbye!", wait_for_response=False, **speech_kwargs
                            )
                            break
            else:
                # Single exchange; --wait/--no-wait decides whether to listen.
                result = await converse_fn.fn(
                    message=message, wait_for_response=wait, **listen_kwargs
                )
                if result:
                    if "Voice response:" in result:
                        # Anything after the first '|' is shown as timing info.
                        parts = result.split('|')
                        response_text = heard_text(result)
                        timing_info = parts[1].strip() if len(parts) > 1 else ""
                        click.echo(f"\n📢 Spoke: {message}")
                        if wait:
                            click.echo(f"🎤 Heard: {response_text}")
                        if timing_info:
                            click.echo(f"⏱️ {timing_info}")
                    else:
                        click.echo(result)
        except KeyboardInterrupt:
            click.echo("\n\n👋 Conversation ended")
        except Exception as e:
            click.echo(f"❌ Error: {e}", err=True)
            # Parse VOICEMODE_DEBUG the same way the module-level warning
            # filter does, so values such as "false" or "0" do not switch
            # tracebacks on.
            if os.environ.get('VOICEMODE_DEBUG', '').lower() in ('true', '1', 'yes'):
                import traceback
                traceback.print_exc()

    # Run the async function
    asyncio.run(run_conversation())
@voice_mode_main_cli.command()
@click.help_option('-h', '--help')
@click.option('--force', is_flag=True, help='Force reinstall even if already up to date')
def update(force):
    """Update Voice Mode to the latest version.

    Automatically detects installation method (UV tool, UV pip, or regular pip)
    and uses the appropriate update command.
    """
    import re
    import subprocess
    import requests
    from pathlib import Path
    from importlib.metadata import version as get_version, PackageNotFoundError

    def detect_uv_tool_installation():
        """Detect if running from a UV tool installation.

        Returns (True, tool_name) when sys.prefix is, or lives under, a
        directory directly inside ~/.local/share/uv/tools that contains a
        uv-receipt.toml; otherwise (False, None).
        """
        prefix_path = Path(sys.prefix).resolve()
        uv_tools_base = Path.home() / ".local" / "share" / "uv" / "tools"

        # The tool directory is whichever of prefix_path or its ancestors
        # sits directly inside the tools base (nearest first).
        tool_dir = next(
            (p for p in (prefix_path, *prefix_path.parents) if p.parent == uv_tools_base),
            None,
        )
        if tool_dir is None:
            return False, None

        receipt_file = tool_dir / "uv-receipt.toml"
        if not receipt_file.exists():
            return False, None

        # Prefer the name recorded in the receipt; fall back to the
        # directory name if it cannot be read or has no name entry.
        try:
            with open(receipt_file) as f:
                content = f.read()
        except (OSError, UnicodeError):
            return True, tool_dir.name
        match = re.search(r'name = "([^"]+)"', content)
        return True, (match.group(1) if match else tool_dir.name)

    def detect_uv_venv():
        """Detect if running in a UV-managed virtual environment."""
        # Not in a venv at all.
        if sys.prefix == sys.base_prefix:
            return False

        pyvenv_cfg = Path(sys.prefix) / "pyvenv.cfg"
        try:
            with open(pyvenv_cfg) as f:
                cfg_lines = f.read().splitlines()
        except (OSError, UnicodeError):
            return False

        # Match a `uv = ...` key (or an explicit "managed by uv" marker)
        # rather than the bare substring "uv", which also matched unrelated
        # text such as a home path containing those letters.
        # NOTE(review): assumes uv records itself under a `uv` key in
        # pyvenv.cfg — confirm for the uv versions you support.
        for cfg_line in cfg_lines:
            key, sep, _ = cfg_line.partition("=")
            if sep and key.strip().lower() == "uv":
                return True
            if "managed by uv" in cfg_line.lower():
                return True
        return False

    def check_uv_available():
        """Check if UV is available."""
        try:
            result = subprocess.run(
                ["uv", "--version"], capture_output=True, text=True, timeout=2
            )
            return result.returncode == 0
        except (FileNotFoundError, subprocess.TimeoutExpired):
            return False

    def run_upgrade(label, cmd, manual_hint):
        """Run one upgrade command and report the outcome to the user."""
        click.echo(f"Updating Voice Mode ({label})...")
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
        except FileNotFoundError:
            # The chosen tool (typically `uv`) is not on PATH; report it
            # instead of crashing with a traceback.
            click.echo(f"❌ Update failed: command not found: {cmd[0]}")
            click.echo(manual_hint)
            return

        if result.returncode != 0:
            click.echo(f"❌ Update failed: {result.stderr}")
            click.echo(manual_hint)
            return

        try:
            new_version = get_version("voice-mode")
            click.echo(f"✅ Successfully updated to version {new_version}")
        except PackageNotFoundError:
            click.echo("✅ Successfully updated Voice Mode")

    # Get current version
    try:
        current_version = get_version("voice-mode")
    except PackageNotFoundError:
        current_version = "development"

    # Check if update needed (unless forced)
    if not force and current_version != "development":
        try:
            response = requests.get("https://pypi.org/pypi/voice-mode/json", timeout=2)
            if response.status_code == 200:
                latest_version = response.json()["info"]["version"]
                if latest_version == current_version:
                    click.echo(f"Already running the latest version ({current_version})")
                    return
        except (requests.RequestException, KeyError, ValueError):
            pass  # Continue with update if we can't check

    # Detect installation method and run the matching upgrade command.
    uv_pip_cmd = ["uv", "pip", "install", "--upgrade", "voice-mode"]
    uv_pip_hint = "Try running: uv pip install --upgrade voice-mode"

    is_uv_tool, tool_name = detect_uv_tool_installation()
    if is_uv_tool:
        run_upgrade(
            f"UV tool: {tool_name}",
            ["uv", "tool", "upgrade", tool_name],
            f"Try running manually: uv tool upgrade {tool_name}",
        )
    elif detect_uv_venv():
        run_upgrade("UV virtual environment", uv_pip_cmd, uv_pip_hint)
    elif check_uv_available():
        # Standard installation, but uv is present: prefer it over pip.
        run_upgrade("using UV", uv_pip_cmd, uv_pip_hint)
    else:
        run_upgrade(
            "using pip",
            [sys.executable, "-m", "pip", "install", "--upgrade", "voice-mode"],
            "Try running: pip install --upgrade voice-mode",
        )
Examples: echo '{"tool_name":"Task","tool_input":{"subagent_type":"mama-bear"}}' | voicemode play-sound voicemode play-sound --tool Task --action start --subagent mama-bear """ import sys from .tools.sound_fonts.player import AudioPlayer from .tools.sound_fonts.hook_handler import ( read_hook_data_from_stdin, parse_claude_code_hook ) # Try to read hook data from stdin first hook_data = None if not sys.stdin.isatty(): hook_data = read_hook_data_from_stdin() if hook_data: # Parse Claude Code hook format parsed_data = parse_claude_code_hook(hook_data) if not parsed_data: sys.exit(1) tool_name = parsed_data["tool_name"] action_type = parsed_data["action"] subagent_type = parsed_data["subagent_type"] metadata = parsed_data["metadata"] else: # Use command-line arguments if not tool: click.echo("Error: --tool is required when not reading from stdin", err=True) sys.exit(1) tool_name = tool action_type = action subagent_type = subagent metadata = {} # Play the sound player = AudioPlayer() success = player.play_sound_for_event( tool_name=tool_name, action=action_type, subagent_type=subagent_type, metadata=metadata ) # Silent exit for hooks - don't clutter Claude Code output sys.exit(0 if success else 1) # Completions command @voice_mode_main_cli.command() @click.help_option('-h', '--help') @click.argument('shell', type=click.Choice(['bash', 'zsh', 'fish'])) @click.option('--install', is_flag=True, help='Install completion script to the appropriate location') def completions(shell, install): """Generate or install shell completion scripts. 
Examples: voicemode completions bash # Output bash completion to stdout voicemode completions bash --install # Install to ~/.bash_completion.d/ voicemode completions zsh --install # Install to ~/.zfunc/ voicemode completions fish --install # Install to ~/.config/fish/completions/ """ from pathlib import Path # Generate completion scripts based on shell type if shell == 'bash': completion_script = '''# bash completion for voicemode _voicemode_completion() { local IFS=$'\\n' local response response=$(env _VOICEMODE_COMPLETE=bash_complete COMP_WORDS="${COMP_WORDS[*]}" COMP_CWORD=$COMP_CWORD voicemode 2>/dev/null) for completion in $response; do IFS=',' read type value <<< "$completion" if [[ $type == 'plain' ]]; then COMPREPLY+=("$value") elif [[ $type == 'file' ]]; then COMPREPLY+=("$value") elif [[ $type == 'dir' ]]; then COMPREPLY+=("$value") fi done return 0 } complete -o default -F _voicemode_completion voicemode ''' elif shell == 'zsh': completion_script = '''#compdef voicemode # zsh completion for voicemode _voicemode() { local -a response response=(${(f)"$(env _VOICEMODE_COMPLETE=zsh_complete COMP_WORDS="${words[*]}" COMP_CWORD=$((CURRENT-1)) voicemode 2>/dev/null)"}) for completion in $response; do IFS=',' read type value <<< "$completion" compadd -U -- "$value" done } compdef _voicemode voicemode ''' elif shell == 'fish': completion_script = '''# fish completion for voicemode function __fish_voicemode_complete set -l response (env _VOICEMODE_COMPLETE=fish_complete COMP_WORDS=(commandline -cp) COMP_CWORD=(commandline -t) voicemode 2>/dev/null) for completion in $response echo $completion end end complete -c voicemode -f -a '(__fish_voicemode_complete)' ''' if install: # Define installation locations for each shell locations = { 'bash': '~/.bash_completion.d/voicemode', 'zsh': '~/.zfunc/_voicemode', 'fish': '~/.config/fish/completions/voicemode.fish' } install_path = Path(locations[shell]).expanduser() install_path.parent.mkdir(parents=True, exist_ok=True) # 
Write completion script to file install_path.write_text(completion_script) click.echo(f"✅ Installed {shell} completions to {install_path}") # Provide shell-specific instructions if shell == 'bash': click.echo("\nTo activate now, run:") click.echo(f" source {install_path}") click.echo("\nTo activate permanently, add to ~/.bashrc:") click.echo(f" source {install_path}") elif shell == 'zsh': click.echo("\nTo activate now, run:") click.echo(" autoload -U compinit && compinit") click.echo("\nMake sure ~/.zfunc is in your fpath (add to ~/.zshrc):") click.echo(" fpath=(~/.zfunc $fpath)") elif shell == 'fish': click.echo("\nCompletions will be active in new fish sessions.") click.echo("To activate now, run:") click.echo(f" source {install_path}") else: # Output completion script to stdout click.echo(completion_script)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mbailey/voicemode'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.