"""
Utility functions for file management and audio processing helpers.
"""
import os
import shutil
import tempfile
from pathlib import Path
from typing import List, Optional, Union
import logging
logger = logging.getLogger(__name__)
class FileManager:
"""Utility class for managing temporary files and audio file operations."""
SUPPORTED_AUDIO_FORMATS = {'.wav', '.mp3', '.flac', '.aac', '.m4a', '.ogg', '.wma'}
def __init__(self, temp_dir: Optional[str] = None):
"""Initialize file manager with optional custom temp directory."""
if temp_dir:
self.temp_dir = Path(temp_dir)
else:
self.temp_dir = Path(tempfile.gettempdir()) / "stem_mcp"
self.temp_dir.mkdir(parents=True, exist_ok=True)
self.temp_files: List[Path] = []
def is_audio_file(self, file_path: Union[str, Path]) -> bool:
"""Check if file is a supported audio format."""
return Path(file_path).suffix.lower() in self.SUPPORTED_AUDIO_FORMATS
def create_temp_file(self, suffix: str = ".wav", prefix: str = "stem_mcp_") -> Path:
"""Create a temporary file and track it for cleanup."""
temp_file = self.temp_dir / f"{prefix}{len(self.temp_files)}{suffix}"
self.temp_files.append(temp_file)
return temp_file
def create_temp_dir(self, prefix: str = "stem_mcp_") -> Path:
"""Create a temporary directory."""
temp_dir = self.temp_dir / f"{prefix}{len(self.temp_files)}"
temp_dir.mkdir(parents=True, exist_ok=True)
self.temp_files.append(temp_dir)
return temp_dir
def cleanup_temp_files(self):
"""Remove all tracked temporary files and directories."""
for temp_path in self.temp_files:
try:
if temp_path.exists():
if temp_path.is_dir():
shutil.rmtree(temp_path)
else:
temp_path.unlink()
logger.debug(f"Cleaned up: {temp_path}")
except Exception as e:
logger.warning(f"Failed to cleanup {temp_path}: {e}")
self.temp_files.clear()
def validate_audio_path(self, audio_path: str) -> Path:
"""Validate audio file path and return Path object."""
path = Path(audio_path)
if not path.exists():
raise FileNotFoundError(f"Audio file not found: {audio_path}")
if not self.is_audio_file(path):
raise ValueError(f"Unsupported audio format: {path.suffix}")
return path
def get_file_info(self, file_path: Union[str, Path]) -> dict:
"""Get basic information about a file."""
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
stat = path.stat()
return {
"name": path.name,
"stem": path.stem,
"suffix": path.suffix,
"size_bytes": stat.st_size,
"size_mb": stat.st_size / (1024 * 1024),
"is_audio": self.is_audio_file(path),
"absolute_path": str(path.absolute())
}
def ensure_output_dir(self, output_path: Union[str, Path]) -> Path:
"""Ensure output directory exists and return the path."""
path = Path(output_path)
path.parent.mkdir(parents=True, exist_ok=True)
return path
def generate_output_filename(self, input_path: Union[str, Path],
suffix: str, extension: str = ".wav") -> Path:
"""Generate an output filename based on input path and suffix."""
input_path = Path(input_path)
output_name = f"{input_path.stem}_{suffix}{extension}"
return input_path.parent / output_name
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit with cleanup."""
self.cleanup_temp_files()
def format_duration(seconds: float) -> str:
"""Format duration in seconds to human readable string."""
if seconds < 60:
return f"{seconds:.1f}s"
elif seconds < 3600:
minutes = int(seconds // 60)
secs = seconds % 60
return f"{minutes}m {secs:.1f}s"
else:
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = seconds % 60
return f"{hours}h {minutes}m {secs:.1f}s"
def format_file_size(size_bytes: int) -> str:
"""Format file size in bytes to human readable string."""
for unit in ['B', 'KB', 'MB', 'GB']:
if size_bytes < 1024.0:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.1f} TB"
def validate_bpm(bpm: Optional[float]) -> bool:
"""Validate BPM value."""
if bpm is None:
return True
return 60.0 <= bpm <= 200.0
def validate_duration(duration: float) -> bool:
"""Validate duration value."""
return 0.1 <= duration <= 3600.0 # 0.1 second to 1 hour
def sanitize_filename(filename: str) -> str:
"""Sanitize filename by removing/replacing invalid characters."""
import re
# Remove or replace invalid characters
filename = re.sub(r'[<>:"/\\|?*]', '_', filename)
# Remove multiple underscores
filename = re.sub(r'_+', '_', filename)
# Remove leading/trailing underscores and spaces
filename = filename.strip('_ ')
# Ensure it's not empty
if not filename:
filename = "output"
return filename