#!/usr/bin/env python3
"""
Stem MCP Server
An MCP server that provides audio stem processing capabilities:
- Generate stems from audio files
- Split existing stems
- Create loops from audio
- Audio analysis and processing
"""
import asyncio
import logging
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence
import click
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
from .audio_processor import AudioProcessor
from .tools_schema import TOOLS_SCHEMA
# Configure logging at INFO by default; the --debug CLI flag switches the
# root logger to DEBUG when the server is started.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# MCP server instance that the decorated handlers in this module register on.
app = Server("stem-mcp")

# Single AudioProcessor instance shared by all tool handlers in this module.
audio_processor = AudioProcessor()
@app.list_tools()
async def handle_list_tools() -> List[Tool]:
    """Return the tools this server exposes.

    Each entry of ``TOOLS_SCHEMA`` is expanded as keyword arguments into a
    ``Tool``; the resulting list keeps the order of ``TOOLS_SCHEMA``.
    """
    tools: List[Tool] = []
    for schema in TOOLS_SCHEMA:
        tools.append(Tool(**schema))
    return tools
@app.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> Sequence[TextContent]:
    """Dispatch a tool call to the handler that implements ``name``.

    Args:
        name: Identifier of the requested tool.
        arguments: Tool arguments, passed unchanged to the selected handler.

    Returns:
        The selected handler's response. If ``name`` matches no handler, a
        single "Unknown tool: ..." text item. If anything raises, a single
        "Error: ..." text item carrying the exception message.
    """
    try:
        # Built inside the function because the handlers are defined further
        # down in this module and only exist once the module has fully loaded.
        handlers = {
            "generate_stems": handle_generate_stems,
            "split_stems": handle_split_stems,
            "create_loop": handle_create_loop,
            "analyze_audio": handle_analyze_audio,
            "extract_vocal": handle_extract_vocal,
            "isolate_instrument": handle_isolate_instrument,
            "separate_vocal_ranges": handle_separate_vocal_ranges,
        }
        handler = handlers.get(name)
        if handler is None:
            return [TextContent(type="text", text=f"Unknown tool: {name}")]
        return await handler(arguments)
    except Exception as e:
        # logger.exception records the traceback as well as the message, so
        # the failure can be diagnosed from the server log.
        logger.exception("Error executing tool %s: %s", name, e)
        return [TextContent(type="text", text=f"Error: {str(e)}")]
async def handle_generate_stems(arguments: Dict[str, Any]) -> Sequence[TextContent]:
    """Generate stems from an audio file using AI source separation.

    Args:
        arguments: Tool arguments. Keys read here, all forwarded to
            ``audio_processor.generate_stems``:
            - ``audio_path``: required; a missing or empty value is rejected.
            - ``output_dir``: defaults to ``"."``.
            - ``model_type``: defaults to ``"htdemucs"``.
            - ``num_stems``: defaults to ``4``.

    Returns:
        A one-item list: the success message with the processor's result
        interpolated, or an error message if validation or processing fails.
    """
    audio_path = arguments.get("audio_path")
    if not audio_path:
        return [TextContent(type="text", text="Error: audio_path is required")]

    try:
        result = await audio_processor.generate_stems(
            audio_path=audio_path,
            output_dir=arguments.get("output_dir", "."),
            model_type=arguments.get("model_type", "htdemucs"),
            num_stems=arguments.get("num_stems", 4),
        )
        return [TextContent(
            type="text",
            text=f"✅ Stems generated successfully!\n\n{result}"
        )]
    except Exception as e:
        # The exception is converted into a reply here and never propagates,
        # so this is the only place its traceback can be recorded.
        logger.exception("Error generating stems for %s", audio_path)
        return [TextContent(type="text", text=f"Error generating stems: {str(e)}")]
async def handle_split_stems(arguments: Dict[str, Any]) -> Sequence[TextContent]:
    """Split an existing stem into smaller segments.

    Args:
        arguments: Tool arguments. Keys read here, all forwarded to
            ``audio_processor.split_stems``:
            - ``stem_path``: required; a missing or empty value is rejected.
            - ``output_dir``: defaults to ``"."``.
            - ``segment_length``: defaults to ``30.0``.
            - ``overlap``: defaults to ``0.0``.

    Returns:
        A one-item list: the success message with the processor's result
        interpolated, or an error message if validation or processing fails.
    """
    stem_path = arguments.get("stem_path")
    if not stem_path:
        return [TextContent(type="text", text="Error: stem_path is required")]

    try:
        result = await audio_processor.split_stems(
            stem_path=stem_path,
            output_dir=arguments.get("output_dir", "."),
            segment_length=arguments.get("segment_length", 30.0),
            overlap=arguments.get("overlap", 0.0),
        )
        return [TextContent(
            type="text",
            text=f"✅ Stem split successfully!\n\n{result}"
        )]
    except Exception as e:
        # The exception is converted into a reply here and never propagates,
        # so this is the only place its traceback can be recorded.
        logger.exception("Error splitting stem %s", stem_path)
        return [TextContent(type="text", text=f"Error splitting stem: {str(e)}")]
async def handle_create_loop(arguments: Dict[str, Any]) -> Sequence[TextContent]:
    """Create a seamless loop from an audio segment.

    Args:
        arguments: Tool arguments. Keys read here, all forwarded to
            ``audio_processor.create_loop``:
            - ``audio_path``: required; a missing or empty value is rejected.
            - ``output_path``: optional; ``None`` when absent.
            - ``loop_duration``: defaults to ``4.0``.
            - ``bpm``: optional; ``None`` when absent.
            - ``crossfade_duration``: defaults to ``0.1``.

    Returns:
        A one-item list: the success message with the processor's result
        interpolated, or an error message if validation or processing fails.
    """
    audio_path = arguments.get("audio_path")
    if not audio_path:
        return [TextContent(type="text", text="Error: audio_path is required")]

    try:
        result = await audio_processor.create_loop(
            audio_path=audio_path,
            output_path=arguments.get("output_path"),
            loop_duration=arguments.get("loop_duration", 4.0),
            bpm=arguments.get("bpm"),
            crossfade_duration=arguments.get("crossfade_duration", 0.1),
        )
        return [TextContent(
            type="text",
            text=f"✅ Loop created successfully!\n\n{result}"
        )]
    except Exception as e:
        # The exception is converted into a reply here and never propagates,
        # so this is the only place its traceback can be recorded.
        logger.exception("Error creating loop from %s", audio_path)
        return [TextContent(type="text", text=f"Error creating loop: {str(e)}")]
async def handle_analyze_audio(arguments: Dict[str, Any]) -> Sequence[TextContent]:
    """Analyze an audio file for tempo, key, and other musical features.

    Args:
        arguments: Tool arguments. Only ``audio_path`` is read; it is
            required, and a missing or empty value is rejected. It is
            forwarded to ``audio_processor.analyze_audio``.

    Returns:
        A one-item list: the analysis message with the processor's result
        interpolated, or an error message if validation or processing fails.
    """
    audio_path = arguments.get("audio_path")
    if not audio_path:
        return [TextContent(type="text", text="Error: audio_path is required")]

    try:
        result = await audio_processor.analyze_audio(audio_path=audio_path)
        return [TextContent(
            type="text",
            text=f"🎵 Audio Analysis Results:\n\n{result}"
        )]
    except Exception as e:
        # The exception is converted into a reply here and never propagates,
        # so this is the only place its traceback can be recorded.
        logger.exception("Error analyzing audio %s", audio_path)
        return [TextContent(type="text", text=f"Error analyzing audio: {str(e)}")]
async def handle_extract_vocal(arguments: Dict[str, Any]) -> Sequence[TextContent]:
    """Extract the vocal track from an audio file.

    Args:
        arguments: Tool arguments. Keys read here, all forwarded to
            ``audio_processor.extract_vocal``:
            - ``audio_path``: required; a missing or empty value is rejected.
            - ``output_path``: optional; ``None`` when absent.
            - ``method``: defaults to ``"demucs"``.

    Returns:
        A one-item list: the success message with the processor's result
        interpolated, or an error message if validation or processing fails.
    """
    audio_path = arguments.get("audio_path")
    if not audio_path:
        return [TextContent(type="text", text="Error: audio_path is required")]

    try:
        result = await audio_processor.extract_vocal(
            audio_path=audio_path,
            output_path=arguments.get("output_path"),
            method=arguments.get("method", "demucs"),
        )
        return [TextContent(
            type="text",
            text=f"🎤 Vocal extracted successfully!\n\n{result}"
        )]
    except Exception as e:
        # The exception is converted into a reply here and never propagates,
        # so this is the only place its traceback can be recorded.
        logger.exception("Error extracting vocal from %s", audio_path)
        return [TextContent(type="text", text=f"Error extracting vocal: {str(e)}")]
async def handle_isolate_instrument(arguments: Dict[str, Any]) -> Sequence[TextContent]:
    """Isolate a specific instrument from an audio file.

    Args:
        arguments: Tool arguments. Keys read here, all forwarded to
            ``audio_processor.isolate_instrument``:
            - ``audio_path``: required; a missing or empty value is rejected.
            - ``instrument``: defaults to ``"drums"``.
            - ``output_path``: optional; ``None`` when absent.
            - ``method``: defaults to ``"demucs"``.

    Returns:
        A one-item list: the success message (instrument name title-cased,
        processor's result interpolated), or an error message if validation
        or processing fails.
    """
    audio_path = arguments.get("audio_path")
    instrument = arguments.get("instrument", "drums")
    if not audio_path:
        return [TextContent(type="text", text="Error: audio_path is required")]

    try:
        result = await audio_processor.isolate_instrument(
            audio_path=audio_path,
            instrument=instrument,
            output_path=arguments.get("output_path"),
            method=arguments.get("method", "demucs"),
        )
        # str() keeps a non-string instrument value from raising while the
        # success message is built, which would report a completed isolation
        # as a failure.
        return [TextContent(
            type="text",
            text=f"🥁 {str(instrument).title()} isolated successfully!\n\n{result}"
        )]
    except Exception as e:
        # The exception is converted into a reply here and never propagates,
        # so this is the only place its traceback can be recorded.
        logger.exception("Error isolating %s from %s", instrument, audio_path)
        return [TextContent(type="text", text=f"Error isolating {instrument}: {str(e)}")]
async def handle_separate_vocal_ranges(arguments: Dict[str, Any]) -> Sequence[TextContent]:
    """Separate a vocal track into vocal ranges (Soprano, Alto, Tenor, Bass).

    Args:
        arguments: Tool arguments. Keys read here, all forwarded to
            ``audio_processor.separate_vocal_ranges``:
            - ``audio_path``: required; a missing or empty value is rejected.
            - ``output_dir``: defaults to ``"."``.
            - ``ranges``: defaults to
              ``["soprano", "alto", "tenor", "bass"]``.
            - ``method``: defaults to ``"harmonic_analysis"``.
            - ``enhance_separation``: defaults to ``True``.

    Returns:
        A one-item list: the success message with the processor's result
        interpolated, or an error message if validation or processing fails.
    """
    audio_path = arguments.get("audio_path")
    if not audio_path:
        return [TextContent(type="text", text="Error: audio_path is required")]

    try:
        result = await audio_processor.separate_vocal_ranges(
            audio_path=audio_path,
            output_dir=arguments.get("output_dir", "."),
            ranges=arguments.get("ranges", ["soprano", "alto", "tenor", "bass"]),
            method=arguments.get("method", "harmonic_analysis"),
            enhance_separation=arguments.get("enhance_separation", True),
        )
        return [TextContent(
            type="text",
            text=f"🎤 Vocal ranges separated successfully!\n\n{result}"
        )]
    except Exception as e:
        # The exception is converted into a reply here and never propagates,
        # so this is the only place its traceback can be recorded.
        logger.exception("Error separating vocal ranges for %s", audio_path)
        return [TextContent(type="text", text=f"Error separating vocal ranges: {str(e)}")]
async def main():
    """Run the MCP server over the stdio transport until it finishes.

    Opens the stdio stream pair and hands it to ``app.run`` together with
    the server's initialization options.
    """
    async with stdio_server() as streams:
        incoming, outgoing = streams
        await app.run(incoming, outgoing, app.create_initialization_options())
@click.command()
@click.option("--debug", is_flag=True, help="Enable debug logging")
def cli(debug: bool):
    """Start the Stem MCP Server."""
    # The docstring above is kept verbatim: click uses it as the --help text.
    if debug:
        # Lower the threshold on the root logger.
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.DEBUG)

    logger.info("Starting Stem MCP Server...")

    # Drive the async entry point to completion on a fresh event loop.
    server_coroutine = main()
    asyncio.run(server_coroutine)
# Invoke the click command only when this module is run as the main program,
# not when it is imported.
if __name__ == "__main__":
    cli()