"""
Core audio processing functionality for stem generation, splitting, and loop creation.
"""
import asyncio
import logging
import os
import shutil
import tempfile
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
import subprocess
import json
import librosa
import numpy as np
import soundfile as sf
from pydub import AudioSegment
import torch
import torchaudio
logger = logging.getLogger(__name__)
class AudioProcessor:
"""Core audio processing class with stem generation and manipulation capabilities."""
def __init__(self):
"""Initialize the audio processor."""
self.sample_rate = 44100
self.temp_dir = Path(tempfile.gettempdir()) / "stem_mcp"
self.temp_dir.mkdir(exist_ok=True)
# Check if CUDA is available
self.device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Audio processor initialized with device: {self.device}")
async def generate_stems(self, audio_path: str, output_dir: str = ".",
model_type: str = "htdemucs", num_stems: int = 4) -> str:
"""
Generate stems from an audio file using Demucs AI source separation.
Args:
audio_path: Path to input audio file
output_dir: Directory to save stems
model_type: Demucs model to use
num_stems: Number of stems to generate
Returns:
Result message with paths to generated stems
"""
try:
audio_path = Path(audio_path)
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
if not audio_path.exists():
raise FileNotFoundError(f"Audio file not found: {audio_path}")
            # Run Demucs separation. When only 2 stems are requested, use
            # Demucs' two-stem mode (vocals vs. everything else); otherwise the
            # model's native stem count (4 for htdemucs) is produced.
            cmd = [
                "python", "-m", "demucs.separate",
                "--name", model_type,
                "--device", self.device,
                "-o", str(output_dir),
            ]
            if num_stems == 2:
                cmd += ["--two-stems", "vocals"]
            cmd.append(str(audio_path))
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
raise RuntimeError(f"Demucs failed: {stderr.decode()}")
# Find generated stems
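            # Demucs writes its output to <output_dir>/<model_name>/<track_name>/<stem>.wav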
stem_dir = output_dir / model_type / audio_path.stem
stem_files = list(stem_dir.glob("*.wav"))
result = f"Generated {len(stem_files)} stems:\n"
for stem_file in sorted(stem_files):
result += f" 📁 {stem_file}\n"
return result
except Exception as e:
logger.error(f"Error generating stems: {e}")
raise
async def split_stems(self, stem_path: str, output_dir: str = ".",
segment_length: float = 30.0, overlap: float = 0.0) -> str:
"""
Split a stem into smaller segments.
Args:
stem_path: Path to stem file
output_dir: Directory to save segments
segment_length: Length of each segment in seconds
overlap: Overlap between segments in seconds
Returns:
Result message with paths to split segments
"""
try:
stem_path = Path(stem_path)
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
if not stem_path.exists():
raise FileNotFoundError(f"Stem file not found: {stem_path}")
# Load audio
audio, sr = librosa.load(str(stem_path), sr=self.sample_rate)
duration = len(audio) / sr
            # Calculate segment parameters
            segment_samples = int(segment_length * sr)
            overlap_samples = int(overlap * sr)
            hop_samples = segment_samples - overlap_samples
            if hop_samples <= 0:
                raise ValueError("overlap must be smaller than segment_length")
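            # Each new segment starts hop_samples after the previous one, so
            # consecutive segments share `overlap` seconds of audio.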
segments = []
start = 0
segment_idx = 0
while start < len(audio):
end = min(start + segment_samples, len(audio))
segment = audio[start:end]
# Save segment
segment_name = f"{stem_path.stem}_segment_{segment_idx:03d}.wav"
segment_path = output_dir / segment_name
sf.write(str(segment_path), segment, sr)
                segments.append((segment_path, len(segment) / sr))
start += hop_samples
segment_idx += 1
if end >= len(audio):
break
result = f"Split '{stem_path.name}' into {len(segments)} segments:\n"
for segment_path in segments:
segment_duration = (end - start) / sr if segments.index(segment_path) == len(segments) - 1 else segment_length
result += f" 🎵 {segment_path.name} ({segment_duration:.1f}s)\n"
return result
except Exception as e:
logger.error(f"Error splitting stems: {e}")
raise
async def create_loop(self, audio_path: str, output_path: Optional[str] = None,
loop_duration: float = 4.0, bpm: Optional[float] = None,
crossfade_duration: float = 0.1) -> str:
"""
Create a seamless loop from audio.
Args:
audio_path: Path to input audio
output_path: Path for output loop file
loop_duration: Duration of loop in seconds
bpm: Target BPM (auto-detected if None)
crossfade_duration: Crossfade duration in seconds
Returns:
Result message with loop information
"""
try:
audio_path = Path(audio_path)
if not audio_path.exists():
raise FileNotFoundError(f"Audio file not found: {audio_path}")
if output_path is None:
output_path = audio_path.parent / f"{audio_path.stem}_loop.wav"
else:
output_path = Path(output_path)
# Load audio
audio, sr = librosa.load(str(audio_path), sr=self.sample_rate)
            # Detect BPM if not provided
            if bpm is None:
                tempo, _ = librosa.beat.beat_track(y=audio, sr=sr)
                # beat_track may return the tempo as a 1-element array; convert
                # it to a plain float so it can be formatted below
                bpm = float(np.atleast_1d(tempo)[0])
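            # The detected BPM is only reported; loop_duration is used as given
            # rather than being snapped to a whole number of beats.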
# Calculate loop parameters
            loop_samples = int(loop_duration * sr)
            # Clamp the crossfade so it never exceeds half of the loop length
            crossfade_samples = min(int(crossfade_duration * sr), loop_samples // 2)
# Extract loop segment
if len(audio) >= loop_samples:
loop_audio = audio[:loop_samples]
else:
# Repeat audio to fill loop duration
repeats = int(np.ceil(loop_samples / len(audio)))
extended_audio = np.tile(audio, repeats)
loop_audio = extended_audio[:loop_samples]
# Apply crossfade for seamless looping
if crossfade_samples > 0:
fade_in = np.linspace(0, 1, crossfade_samples)
fade_out = np.linspace(1, 0, crossfade_samples)
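                # Equal-length linear crossfade: the tail of the loop fades out
                # while a copy of the head fades in on top of it, so the audio
                # at the loop point already blends into the restart.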
# Fade out end
loop_audio[-crossfade_samples:] *= fade_out
# Add faded beginning to end
loop_audio[-crossfade_samples:] += loop_audio[:crossfade_samples] * fade_in
# Save loop
sf.write(str(output_path), loop_audio, sr)
result = f"Loop created: {output_path.name}\n"
result += f" ⏱️ Duration: {loop_duration}s\n"
result += f" 🎵 BPM: {bpm:.1f}\n"
result += f" 🔄 Crossfade: {crossfade_duration}s\n"
result += f" 📁 Output: {output_path}"
return result
except Exception as e:
logger.error(f"Error creating loop: {e}")
raise
async def analyze_audio(self, audio_path: str) -> str:
"""
Analyze audio file for musical features.
Args:
audio_path: Path to audio file
Returns:
Analysis results as formatted string
"""
try:
audio_path = Path(audio_path)
if not audio_path.exists():
raise FileNotFoundError(f"Audio file not found: {audio_path}")
            # Read the channel count from the file header, then load a mono
            # downmix (librosa's default) for analysis
            info = sf.info(str(audio_path))
            audio, sr = librosa.load(str(audio_path), sr=self.sample_rate)
            duration = len(audio) / sr
# Extract features
            tempo, beats = librosa.beat.beat_track(y=audio, sr=sr)
            # beat_track may return the tempo as a 1-element array; make it a float
            tempo = float(np.atleast_1d(tempo)[0])
# Spectral features
spectral_centroids = librosa.feature.spectral_centroid(y=audio, sr=sr)[0]
spectral_rolloff = librosa.feature.spectral_rolloff(y=audio, sr=sr)[0]
zero_crossing_rate = librosa.feature.zero_crossing_rate(audio)[0]
# RMS energy
rms_energy = librosa.feature.rms(y=audio)[0]
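            # Rough interpretations: centroid ~ brightness, rolloff ~ where most
            # of the spectral energy sits, zero-crossing rate ~ noisiness,
            # RMS ~ overall loudness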
# Key detection (simplified)
chroma = librosa.feature.chroma_stft(y=audio, sr=sr)
key_profile = np.mean(chroma, axis=1)
key_names = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
estimated_key = key_names[np.argmax(key_profile)]
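            # This simply picks the strongest pitch class in the averaged
            # chromagram; it does not distinguish major from minor keys.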
# Format results
result = f"Audio Analysis for '{audio_path.name}':\n\n"
result += f"📊 Basic Info:\n"
result += f" Duration: {duration:.2f} seconds\n"
result += f" Sample Rate: {sr} Hz\n"
result += f" Channels: {'Stereo' if len(audio.shape) > 1 else 'Mono'}\n\n"
result += f"🎵 Musical Features:\n"
result += f" Tempo: {tempo:.1f} BPM\n"
result += f" Estimated Key: {estimated_key}\n"
result += f" Beat Count: {len(beats)}\n\n"
result += f"🔊 Spectral Analysis:\n"
result += f" Avg Spectral Centroid: {np.mean(spectral_centroids):.1f} Hz\n"
result += f" Avg Spectral Rolloff: {np.mean(spectral_rolloff):.1f} Hz\n"
result += f" Avg Zero Crossing Rate: {np.mean(zero_crossing_rate):.4f}\n"
result += f" Avg RMS Energy: {np.mean(rms_energy):.4f}\n"
return result
except Exception as e:
logger.error(f"Error analyzing audio: {e}")
raise
async def extract_vocal(self, audio_path: str, output_path: Optional[str] = None,
method: str = "demucs") -> str:
"""
Extract vocal track from audio.
Args:
audio_path: Path to input audio
output_path: Path for output vocal file
method: Extraction method
Returns:
Result message with vocal extraction info
"""
try:
if method == "demucs":
# Use generate_stems and extract vocals
temp_output = self.temp_dir / "vocal_extraction"
result = await self.generate_stems(audio_path, str(temp_output))
# Find vocals stem
                audio_name = Path(audio_path).stem
                vocal_path = temp_output / "htdemucs" / audio_name / "vocals.wav"
                if not vocal_path.exists():
                    raise FileNotFoundError(f"Demucs did not produce a vocals stem at {vocal_path}")
if output_path is None:
output_path = Path(audio_path).parent / f"{audio_name}_vocals.wav"
else:
output_path = Path(output_path)
                # Copy vocals to output
                shutil.copy2(vocal_path, output_path)
return f"Vocal extracted using Demucs:\n 🎤 {output_path}"
else: # librosa method
audio_path = Path(audio_path)
if output_path is None:
output_path = audio_path.parent / f"{audio_path.stem}_vocals.wav"
else:
output_path = Path(output_path)
# Load stereo audio
audio, sr = librosa.load(str(audio_path), sr=self.sample_rate, mono=False)
if len(audio.shape) == 1:
raise ValueError("Vocal extraction requires stereo audio")
                # Crude "center channel" isolation: vocals are usually panned to
                # the center, so the mid signal (L + R) / 2 emphasizes them.
                # (L - R would do the opposite and cancel center-panned vocals.)
                vocals = (audio[0] + audio[1]) / 2.0  # mid channel
# Save vocals
sf.write(str(output_path), vocals, sr)
return f"Vocal extracted using center channel method:\n 🎤 {output_path}"
except Exception as e:
logger.error(f"Error extracting vocal: {e}")
raise
async def isolate_instrument(self, audio_path: str, instrument: str = "drums",
output_path: Optional[str] = None, method: str = "demucs") -> str:
"""
Isolate specific instrument from audio.
Args:
audio_path: Path to input audio
instrument: Instrument to isolate
output_path: Path for output file
method: Isolation method
Returns:
Result message with isolation info
"""
try:
if method == "demucs":
# Use generate_stems and extract instrument
temp_output = self.temp_dir / "instrument_isolation"
result = await self.generate_stems(audio_path, str(temp_output))
# Find instrument stem
audio_name = Path(audio_path).stem
instrument_path = temp_output / "htdemucs" / audio_name / f"{instrument}.wav"
if not instrument_path.exists():
# Try common alternatives
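                    # htdemucs produces exactly four stems (drums, bass, other,
                    # vocals), so other instruments map onto the "other" stem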
alternatives = {
"guitar": "other",
"piano": "other"
}
                    alt_instrument = alternatives.get(instrument, instrument)
                    instrument_path = temp_output / "htdemucs" / audio_name / f"{alt_instrument}.wav"
                if not instrument_path.exists():
                    raise FileNotFoundError(
                        f"Demucs did not produce a stem for '{instrument}' (looked for {instrument_path})"
                    )
if output_path is None:
output_path = Path(audio_path).parent / f"{audio_name}_{instrument}.wav"
else:
output_path = Path(output_path)
                # Copy instrument to output
                shutil.copy2(instrument_path, output_path)
return f"{instrument.title()} isolated using Demucs:\n 🎸 {output_path}"
else:
raise NotImplementedError(f"Method '{method}' not implemented for instrument isolation")
except Exception as e:
logger.error(f"Error isolating {instrument}: {e}")
raise
    async def separate_vocal_ranges(self, audio_path: str, output_dir: str = ".",
                                    ranges: Optional[List[str]] = None,
                                    method: str = "harmonic_analysis",
                                    enhance_separation: bool = True) -> str:
"""
Separate vocal track into different vocal ranges (Soprano, Alto, Tenor, Bass).
Args:
audio_path: Path to vocal audio file
output_dir: Directory to save separated ranges
ranges: List of ranges to extract (soprano, alto, tenor, bass)
method: Separation method
enhance_separation: Apply additional processing
Returns:
Result message with separated vocal ranges info
"""
try:
audio_path = Path(audio_path)
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
if not audio_path.exists():
raise FileNotFoundError(f"Audio file not found: {audio_path}")
if ranges is None:
ranges = ["soprano", "alto", "tenor", "bass"]
# Load audio
audio, sr = librosa.load(str(audio_path), sr=self.sample_rate)
# Define vocal range frequency bounds (in Hz)
vocal_ranges = {
"soprano": {"low": 261.63, "high": 1046.50}, # C4 to C6
"alto": {"low": 196.00, "high": 783.99}, # G3 to G5
"tenor": {"low": 130.81, "high": 523.25}, # C3 to C5
"bass": {"low": 82.41, "high": 329.63} # E2 to E4
}
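            # Adjacent bands overlap, just as the traditional SATB vocal ranges
            # themselves do.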
result_files = []
for vocal_range in ranges:
                if vocal_range not in vocal_ranges:
                    logger.warning(f"Unknown vocal range '{vocal_range}' - skipping")
                    continue
logger.info(f"Processing {vocal_range} range...")
# Get frequency bounds
low_freq = vocal_ranges[vocal_range]["low"]
high_freq = vocal_ranges[vocal_range]["high"]
if method == "frequency_bands":
separated_audio = self._separate_by_frequency_bands(
audio, sr, low_freq, high_freq
)
elif method == "harmonic_analysis":
separated_audio = self._separate_by_harmonic_analysis(
audio, sr, low_freq, high_freq
)
elif method == "spectral_filtering":
separated_audio = self._separate_by_spectral_filtering(
audio, sr, low_freq, high_freq
)
else:
raise ValueError(f"Unknown separation method: {method}")
# Apply enhancement if requested
if enhance_separation:
separated_audio = self._enhance_vocal_separation(
separated_audio, sr, vocal_range
)
# Save the separated range
output_filename = f"{audio_path.stem}_{vocal_range}.wav"
output_path = output_dir / output_filename
sf.write(str(output_path), separated_audio, sr)
result_files.append({
"range": vocal_range,
"file": str(output_path),
"freq_range": f"{low_freq:.1f}Hz - {high_freq:.1f}Hz"
})
logger.info(f"Saved {vocal_range}: {output_path}")
# Format result
result = f"Vocal ranges separated successfully:\n\n"
for file_info in result_files:
result += f"🎤 {file_info['range'].title()}:\n"
result += f" 📁 {file_info['file']}\n"
result += f" 🎵 Frequency range: {file_info['freq_range']}\n\n"
result += f"Method used: {method}\n"
result += f"Enhancement: {'✅ Applied' if enhance_separation else '❌ Disabled'}\n"
result += f"Total files created: {len(result_files)}"
return result
except Exception as e:
logger.error(f"Error separating vocal ranges: {e}")
raise
def _separate_by_frequency_bands(self, audio: np.ndarray, sr: int,
low_freq: float, high_freq: float) -> np.ndarray:
"""Separate audio using frequency band filtering."""
# Convert to frequency domain
stft = librosa.stft(audio)
magnitude = np.abs(stft)
phase = np.angle(stft)
# Create frequency mask
freqs = librosa.fft_frequencies(sr=sr)
mask = (freqs >= low_freq) & (freqs <= high_freq)
# Apply mask
filtered_magnitude = magnitude.copy()
filtered_magnitude[~mask] *= 0.1 # Attenuate outside frequencies
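        # Out-of-band bins are attenuated to 10% rather than zeroed - a softer
        # cut that tends to leave fewer audible artifacts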
# Reconstruct audio
filtered_stft = filtered_magnitude * np.exp(1j * phase)
filtered_audio = librosa.istft(filtered_stft)
return filtered_audio
def _separate_by_harmonic_analysis(self, audio: np.ndarray, sr: int,
low_freq: float, high_freq: float) -> np.ndarray:
"""Separate audio using harmonic analysis and pitch tracking."""
# Extract harmonic and percussive components
harmonic, percussive = librosa.effects.hpss(audio)
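        # HPSS splits the signal into a tonal (harmonic) part and a transient
        # (percussive) part; sung vocals live almost entirely in the former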
        # Work on the harmonic component only from here on
# Convert to frequency domain for filtering
stft = librosa.stft(harmonic)
magnitude = np.abs(stft)
phase = np.angle(stft)
# Create frequency-aware mask based on pitch content
freqs = librosa.fft_frequencies(sr=sr)
# Create a soft mask that emphasizes the target frequency range
mask = np.ones_like(magnitude)
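        # Quadratic rolloff: e.g. a bin one octave below low_freq keeps
        # (0.5)^2 = 25% of its magnitude, floored at 10%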
for i, freq in enumerate(freqs):
if freq < low_freq:
# Gradual rolloff below range
rolloff = max(0.1, (freq / low_freq) ** 2)
mask[i] *= rolloff
elif freq > high_freq:
# Gradual rolloff above range
rolloff = max(0.1, (high_freq / freq) ** 2)
mask[i] *= rolloff
# Apply harmonic enhancement in target range
target_mask = (freqs >= low_freq) & (freqs <= high_freq)
mask[target_mask] *= 1.5 # Boost target frequencies
# Apply mask
filtered_magnitude = magnitude * mask
# Reconstruct audio
filtered_stft = filtered_magnitude * np.exp(1j * phase)
filtered_audio = librosa.istft(filtered_stft)
return filtered_audio
def _separate_by_spectral_filtering(self, audio: np.ndarray, sr: int,
low_freq: float, high_freq: float) -> np.ndarray:
"""Separate audio using advanced spectral filtering."""
        # Work on the STFT magnitude and apply a frequency-dependent weighting
        stft = librosa.stft(audio, hop_length=512)
        magnitude = np.abs(stft)
        phase = np.angle(stft)
        # Build a soft mask: bins below/above the target range are attenuated in
        # proportion to their distance from it (floored at 0.05), while bins
        # inside the range get a gentle 1.3x boost. The weighting is identical
        # for every frame, so it is computed once and broadcast across time.
        freqs = librosa.fft_frequencies(sr=sr)
        weights = np.full_like(freqs, 1.3)
        below = freqs < low_freq
        above = freqs > high_freq
        weights[below] = np.maximum(0.05, 1 - (low_freq - freqs[below]) / low_freq * 2)
        weights[above] = np.maximum(0.05, 1 - (freqs[above] - high_freq) / high_freq * 2)
        mask = weights[:, np.newaxis]
# Apply mask
filtered_magnitude = magnitude * mask
# Reconstruct audio
filtered_stft = filtered_magnitude * np.exp(1j * phase)
filtered_audio = librosa.istft(filtered_stft)
return filtered_audio
def _enhance_vocal_separation(self, audio: np.ndarray, sr: int,
vocal_range: str) -> np.ndarray:
"""Apply additional enhancement based on vocal range characteristics."""
# Apply range-specific processing
if vocal_range == "soprano":
# Enhance high frequencies and clarity
audio = self._apply_high_frequency_enhancement(audio, sr)
elif vocal_range == "alto":
# Enhance mid-high frequencies
audio = self._apply_mid_frequency_enhancement(audio, sr)
elif vocal_range == "tenor":
# Enhance mid frequencies with warmth
audio = self._apply_tenor_enhancement(audio, sr)
elif vocal_range == "bass":
# Enhance low-mid frequencies and presence
audio = self._apply_bass_enhancement(audio, sr)
# Apply general vocal enhancement
audio = self._apply_vocal_enhancement(audio, sr)
return audio
def _apply_high_frequency_enhancement(self, audio: np.ndarray, sr: int) -> np.ndarray:
"""Enhance high frequencies for soprano vocals."""
# Apply subtle high-frequency emphasis
stft = librosa.stft(audio)
magnitude = np.abs(stft)
phase = np.angle(stft)
freqs = librosa.fft_frequencies(sr=sr)
# Gentle high-frequency boost above 1kHz
boost_mask = np.where(freqs > 1000, 1.2, 1.0)
enhanced_magnitude = magnitude * boost_mask.reshape(-1, 1)
enhanced_stft = enhanced_magnitude * np.exp(1j * phase)
return librosa.istft(enhanced_stft)
def _apply_mid_frequency_enhancement(self, audio: np.ndarray, sr: int) -> np.ndarray:
"""Enhance mid frequencies for alto vocals."""
stft = librosa.stft(audio)
magnitude = np.abs(stft)
phase = np.angle(stft)
freqs = librosa.fft_frequencies(sr=sr)
# Boost 500Hz-2kHz range
boost_mask = np.where((freqs >= 500) & (freqs <= 2000), 1.15, 1.0)
enhanced_magnitude = magnitude * boost_mask.reshape(-1, 1)
enhanced_stft = enhanced_magnitude * np.exp(1j * phase)
return librosa.istft(enhanced_stft)
def _apply_tenor_enhancement(self, audio: np.ndarray, sr: int) -> np.ndarray:
"""Enhance frequencies for tenor vocals."""
stft = librosa.stft(audio)
magnitude = np.abs(stft)
phase = np.angle(stft)
freqs = librosa.fft_frequencies(sr=sr)
# Boost 200Hz-1kHz range for warmth and presence
boost_mask = np.where((freqs >= 200) & (freqs <= 1000), 1.1, 1.0)
enhanced_magnitude = magnitude * boost_mask.reshape(-1, 1)
enhanced_stft = enhanced_magnitude * np.exp(1j * phase)
return librosa.istft(enhanced_stft)
def _apply_bass_enhancement(self, audio: np.ndarray, sr: int) -> np.ndarray:
"""Enhance frequencies for bass vocals."""
stft = librosa.stft(audio)
magnitude = np.abs(stft)
phase = np.angle(stft)
freqs = librosa.fft_frequencies(sr=sr)
# Boost 80Hz-400Hz range for bass presence
boost_mask = np.where((freqs >= 80) & (freqs <= 400), 1.2, 1.0)
enhanced_magnitude = magnitude * boost_mask.reshape(-1, 1)
enhanced_stft = enhanced_magnitude * np.exp(1j * phase)
return librosa.istft(enhanced_stft)
def _apply_vocal_enhancement(self, audio: np.ndarray, sr: int) -> np.ndarray:
"""Apply general vocal enhancement processing."""
# Apply subtle compression to even out dynamics
# This is a simple form of dynamic range compression
threshold = 0.7
ratio = 3.0
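        # threshold is in linear amplitude (not dB); a 3:1 ratio means the part
        # of the envelope that exceeds the threshold is reduced to a third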
# Calculate envelope using a simpler approach
# Use RMS energy over sliding windows
window_size = 1024
hop_size = 512
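        # 1024-sample windows with a 512-sample hop ≈ 23 ms frames with 50%
        # overlap at the 44.1 kHz working sample rate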
# Ensure audio is long enough
if len(audio) < window_size:
return audio # Return as-is for very short audio
# Calculate RMS envelope
envelope = []
for i in range(0, len(audio) - window_size + 1, hop_size):
window = audio[i:i + window_size]
rms = np.sqrt(np.mean(window ** 2))
envelope.extend([rms] * hop_size)
# Handle remaining samples
remaining = len(audio) - len(envelope)
if remaining > 0:
envelope.extend([envelope[-1]] * remaining)
# Ensure envelope matches audio length exactly
envelope = np.array(envelope[:len(audio)])
# Apply compression where envelope exceeds threshold
compressed_audio = audio.copy()
over_threshold = envelope > threshold
        if np.any(over_threshold):
            # Downward compression: keep the threshold, reduce the excess above
            # it by the ratio, and turn the target level into a per-sample gain
            env = envelope[over_threshold]
            target_level = threshold + (env - threshold) / ratio
            compressed_audio[over_threshold] *= target_level / env
return compressed_audio
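

if __name__ == "__main__":
    # Minimal usage sketch (not part of the library API). It assumes a file
    # named "song.mp3" exists in the current directory and that Demucs is
    # installed; adjust the path and output directory to taste.
    async def _demo() -> None:
        processor = AudioProcessor()
        print(await processor.analyze_audio("song.mp3"))
        print(await processor.generate_stems("song.mp3", output_dir="stems"))

    asyncio.run(_demo())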