Skip to main content
Glama
audio_player.py19 kB
""" Audio playback functionality for the MCP Play Sound Server. This module provides audio playback capabilities with multiple backend support, fallback mechanisms, and comprehensive error handling. """ import logging import subprocess import sys from dataclasses import dataclass from enum import Enum from pathlib import Path from typing import Optional, Union import asyncio import concurrent.futures logger = logging.getLogger(__name__) class PlaybackStatus(Enum): """Status of audio playback operation.""" SUCCESS = "success" FAILED = "failed" FALLBACK_USED = "fallback_used" TIMEOUT = "timeout" FILE_NOT_FOUND = "file_not_found" UNSUPPORTED_FORMAT = "unsupported_format" @dataclass class PlaybackResult: """Result of an audio playback operation.""" status: PlaybackStatus message: str duration_ms: Optional[int] = None backend_used: Optional[str] = None fallback_used: bool = False class AudioBackend: """Base class for audio backends.""" def __init__(self, name: str): self.name = name async def play(self, file_path: Path, volume: float = 1.0, timeout: int = 30) -> PlaybackResult: """Play audio file asynchronously.""" raise NotImplementedError def is_available(self) -> bool: """Check if this backend is available on the current system.""" raise NotImplementedError class AFPlayBackend(AudioBackend): """macOS afplay backend for audio playback.""" def __init__(self, audio_device: Optional[str] = None): super().__init__("afplay") self.audio_device = audio_device def is_available(self) -> bool: """Check if afplay is available (macOS only).""" if sys.platform != "darwin": return False try: result = subprocess.run( ["which", "afplay"], capture_output=True, text=True, timeout=5 ) return result.returncode == 0 except (subprocess.TimeoutExpired, FileNotFoundError): return False async def play(self, file_path: Path, volume: float = 1.0, timeout: int = 30) -> PlaybackResult: """Play audio using afplay with optional device switching.""" if not file_path.exists(): return PlaybackResult( status=PlaybackStatus.FILE_NOT_FOUND, message=f"Audio file not found: {file_path}", backend_used=self.name ) try: # Handle device switching if specified original_device = None device_switched = False if self.audio_device and self.audio_device.strip(): if self.audio_device.lower() in ["default", "system default", "default system output device"]: # Use system default - no switching needed logger.info("Using system default audio device (explicitly configured)") else: # Try to switch to specific device original_device = await self._get_current_audio_device() logger.info(f"Attempting to switch from '{original_device}' to '{self.audio_device}'") switch_result = await self._switch_audio_device(self.audio_device) if switch_result: device_switched = True logger.info(f"Successfully switched to '{self.audio_device}'") else: logger.warning(f"Could not switch to '{self.audio_device}', using current device '{original_device}'") else: # No audio device configured - use current default logger.info("No AUDIO_DEVICE configured, using current system default") # Build afplay command with volume control cmd = ["afplay"] if volume != 1.0: cmd.extend(["-v", str(volume)]) cmd.append(str(file_path)) logger.debug(f"Executing afplay command: {' '.join(cmd)}") # Run afplay asynchronously with timeout process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) try: stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=timeout ) except asyncio.TimeoutError: process.kill() await process.wait() return PlaybackResult( status=PlaybackStatus.TIMEOUT, message=f"Audio playback timed out after {timeout} seconds", backend_used=self.name ) # Restore original audio device if we switched if device_switched and original_device: await self._switch_audio_device(original_device) if process.returncode == 0: if self.audio_device and self.audio_device.strip(): if self.audio_device.lower() in ["default", "system default", "default system output device"]: device_msg = " (via system default device)" else: device_msg = f" (via {self.audio_device})" else: device_msg = " (via current system default)" return PlaybackResult( status=PlaybackStatus.SUCCESS, message=f"Audio played successfully{device_msg}", backend_used=self.name ) else: error_msg = stderr.decode() if stderr else "Unknown afplay error" return PlaybackResult( status=PlaybackStatus.FAILED, message=f"afplay failed: {error_msg}", backend_used=self.name ) except Exception as e: logger.error(f"Error playing audio with afplay: {e}") # Restore original audio device if we switched and there was an error if device_switched and original_device: try: await self._switch_audio_device(original_device) except: pass # Don't fail if we can't restore return PlaybackResult( status=PlaybackStatus.FAILED, message=f"afplay error: {str(e)}", backend_used=self.name ) async def _get_current_audio_device(self) -> Optional[str]: """Get the current audio output device using SwitchAudioSource.""" try: # Use SwitchAudioSource to get current device (same as working setup script) process = await asyncio.create_subprocess_exec( "SwitchAudioSource", "-c", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode == 0: current_device = stdout.decode().strip() logger.debug(f"Current audio device: {current_device}") return current_device except Exception as e: logger.debug(f"Could not get current audio device: {e}") return None async def _switch_audio_device(self, device_name: str) -> bool: """Switch to the specified audio output device using SwitchAudioSource.""" try: # Use SwitchAudioSource (same as working setup script) process = await asyncio.create_subprocess_exec( "SwitchAudioSource", "-s", device_name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode == 0: logger.info(f"Successfully switched audio device to: {device_name} (via SwitchAudioSource)") return True else: error_msg = stderr.decode().strip() if stderr else "Unknown error" logger.warning(f"Failed to switch audio device to {device_name}: {error_msg}") return False except FileNotFoundError: logger.error("SwitchAudioSource not found. Please install with: brew install switchaudio-osx") return False except Exception as e: logger.error(f"Error switching audio device to {device_name}: {e}") return False @staticmethod async def get_available_audio_devices() -> list: """Get list of available audio output devices on macOS.""" devices = [] try: # Use system_profiler to get audio devices process = await asyncio.create_subprocess_exec( "system_profiler", "SPAudioDataType", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode == 0: output = stdout.decode() lines = output.split('\n') current_device = None in_devices_section = False for line in lines: stripped = line.strip() # Check if we're in the Devices section if stripped == "Devices:": in_devices_section = True continue if not in_devices_section: continue # Device names end with ':' and are not indented much if (stripped.endswith(':') and not stripped.startswith(('Input', 'Output', 'Current', 'Manufacturer', 'Transport', 'Default')) and len(line) - len(line.lstrip()) <= 8): # Not too deeply indented # Save previous device if it was an output device if current_device and current_device.get("is_output"): devices.append(current_device) # Start new device device_name = stripped.rstrip(':').strip() if device_name and device_name != "Devices": current_device = {"name": device_name, "is_output": False, "is_default": False} elif current_device: # Check for output device indicators if "Output Channels:" in stripped: current_device["is_output"] = True elif "Default Output Device: Yes" in stripped: current_device["is_default"] = True current_device["is_output"] = True elif "Default System Output Device: Yes" in stripped: current_device["is_default"] = True current_device["is_output"] = True # Add the last device if it's an output device if current_device and current_device.get("is_output"): devices.append(current_device) except Exception as e: logger.error(f"Error getting audio devices: {e}") return devices class SimpleAudioBackend(AudioBackend): """SimpleAudio backend for cross-platform audio playback.""" def __init__(self): super().__init__("simpleaudio") self._simpleaudio = None def is_available(self) -> bool: """Check if simpleaudio is available.""" try: import simpleaudio self._simpleaudio = simpleaudio return True except ImportError: return False async def play(self, file_path: Path, volume: float = 1.0, timeout: int = 30) -> PlaybackResult: """Play audio using simpleaudio.""" if not self._simpleaudio: if not self.is_available(): return PlaybackResult( status=PlaybackStatus.FAILED, message="simpleaudio not available", backend_used=self.name ) if not file_path.exists(): return PlaybackResult( status=PlaybackStatus.FILE_NOT_FOUND, message=f"Audio file not found: {file_path}", backend_used=self.name ) try: # Run in thread pool to avoid blocking loop = asyncio.get_event_loop() with concurrent.futures.ThreadPoolExecutor() as executor: result = await asyncio.wait_for( loop.run_in_executor(executor, self._play_sync, file_path, volume), timeout=timeout ) return result except asyncio.TimeoutError: return PlaybackResult( status=PlaybackStatus.TIMEOUT, message=f"Audio playback timed out after {timeout} seconds", backend_used=self.name ) except Exception as e: logger.error(f"Error playing audio with simpleaudio: {e}") return PlaybackResult( status=PlaybackStatus.FAILED, message=f"simpleaudio error: {str(e)}", backend_used=self.name ) def _play_sync(self, file_path: Path, volume: float) -> PlaybackResult: """Synchronous audio playback for thread execution.""" try: # Load and play WAV file wave_obj = self._simpleaudio.WaveObject.from_wave_file(str(file_path)) play_obj = wave_obj.play() play_obj.wait_done() return PlaybackResult( status=PlaybackStatus.SUCCESS, message="Audio played successfully", backend_used=self.name ) except Exception as e: return PlaybackResult( status=PlaybackStatus.FAILED, message=f"simpleaudio playback error: {str(e)}", backend_used=self.name ) class AudioPlayer: """Main audio player with multiple backend support and fallback.""" def __init__(self, config): """Initialize audio player with configuration.""" self.config = config self.backends = [] self._setup_backends() self._default_sound_path = self._get_default_sound_path() logger.info(f"AudioPlayer initialized with {len(self.backends)} available backends") def _setup_backends(self) -> None: """Set up available audio backends based on configuration and platform.""" # Always try AFPlay first on macOS (most reliable) afplay = AFPlayBackend(audio_device=getattr(self.config, 'audio_device', None)) if afplay.is_available(): self.backends.append(afplay) device_info = f" (device: {self.config.audio_device})" if getattr(self.config, 'audio_device', None) else "" logger.debug(f"AFPlay backend available{device_info}") # Add SimpleAudio as fallback simpleaudio = SimpleAudioBackend() if simpleaudio.is_available(): self.backends.append(simpleaudio) logger.debug("SimpleAudio backend available") if not self.backends: logger.warning("No audio backends available!") def _get_default_sound_path(self) -> Path: """Get path to default notification sound.""" # Default sound should be bundled with the package package_dir = Path(__file__).parent default_sound = package_dir / "assets" / "notification.wav" if not default_sound.exists(): logger.warning(f"Default sound file not found: {default_sound}") return default_sound async def play_notification(self, custom_path: Optional[str] = None) -> PlaybackResult: """ Play notification sound with fallback support. Args: custom_path: Optional path to custom audio file Returns: PlaybackResult with status and details """ # Determine which file to play if custom_path and self.config.custom_sound_path: audio_path = Path(self.config.custom_sound_path) use_custom = True elif custom_path: audio_path = Path(custom_path) use_custom = True else: audio_path = self._default_sound_path use_custom = False logger.info(f"Playing audio: {audio_path} (custom={use_custom})") # Try to play the requested file result = await self._play_with_backends(audio_path) # If custom file failed and fallback is enabled, try default if (result.status != PlaybackStatus.SUCCESS and use_custom and self.config.enable_fallback and audio_path != self._default_sound_path): logger.info("Custom audio failed, trying default fallback") fallback_result = await self._play_with_backends(self._default_sound_path) if fallback_result.status == PlaybackStatus.SUCCESS: fallback_result.fallback_used = True fallback_result.status = PlaybackStatus.FALLBACK_USED fallback_result.message = f"Custom audio failed, used fallback: {result.message}" return fallback_result return result async def _play_with_backends(self, file_path: Path) -> PlaybackResult: """Try to play audio file with available backends.""" if not self.backends: return PlaybackResult( status=PlaybackStatus.FAILED, message="No audio backends available" ) last_result = None for backend in self.backends: logger.debug(f"Trying backend: {backend.name}") result = await backend.play( file_path, volume=self.config.volume_level, timeout=self.config.playback_timeout_seconds ) if result.status == PlaybackStatus.SUCCESS: return result last_result = result logger.debug(f"Backend {backend.name} failed: {result.message}") # All backends failed return last_result or PlaybackResult( status=PlaybackStatus.FAILED, message="All audio backends failed" )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/davidteren/play-sound-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server