AbletonMCP
by ahujasid
Verified
- ableton-mcp
- MCP_Server
# ableton_mcp_server.py
from mcp.server.fastmcp import FastMCP, Context
import socket
import json
import logging
from dataclasses import dataclass
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, Any, List, Union
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("AbletonMCPServer")
@dataclass
class AbletonConnection:
host: str
port: int
sock: socket.socket = None
def connect(self) -> bool:
"""Connect to the Ableton Remote Script socket server"""
if self.sock:
return True
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
logger.info(f"Connected to Ableton at {self.host}:{self.port}")
return True
except Exception as e:
logger.error(f"Failed to connect to Ableton: {str(e)}")
self.sock = None
return False
def disconnect(self):
"""Disconnect from the Ableton Remote Script"""
if self.sock:
try:
self.sock.close()
except Exception as e:
logger.error(f"Error disconnecting from Ableton: {str(e)}")
finally:
self.sock = None
def receive_full_response(self, sock, buffer_size=8192):
"""Receive the complete response, potentially in multiple chunks"""
chunks = []
sock.settimeout(15.0) # Increased timeout for operations that might take longer
try:
while True:
try:
chunk = sock.recv(buffer_size)
if not chunk:
if not chunks:
raise Exception("Connection closed before receiving any data")
break
chunks.append(chunk)
# Check if we've received a complete JSON object
try:
data = b''.join(chunks)
json.loads(data.decode('utf-8'))
logger.info(f"Received complete response ({len(data)} bytes)")
return data
except json.JSONDecodeError:
# Incomplete JSON, continue receiving
continue
except socket.timeout:
logger.warning("Socket timeout during chunked receive")
break
except (ConnectionError, BrokenPipeError, ConnectionResetError) as e:
logger.error(f"Socket connection error during receive: {str(e)}")
raise
except Exception as e:
logger.error(f"Error during receive: {str(e)}")
raise
# If we get here, we either timed out or broke out of the loop
if chunks:
data = b''.join(chunks)
logger.info(f"Returning data after receive completion ({len(data)} bytes)")
try:
json.loads(data.decode('utf-8'))
return data
except json.JSONDecodeError:
raise Exception("Incomplete JSON response received")
else:
raise Exception("No data received")
def send_command(self, command_type: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
"""Send a command to Ableton and return the response"""
if not self.sock and not self.connect():
raise ConnectionError("Not connected to Ableton")
command = {
"type": command_type,
"params": params or {}
}
# Check if this is a state-modifying command
is_modifying_command = command_type in [
"create_midi_track", "create_audio_track", "set_track_name",
"create_clip", "add_notes_to_clip", "set_clip_name",
"set_tempo", "fire_clip", "stop_clip", "set_device_parameter",
"start_playback", "stop_playback", "load_instrument_or_effect"
]
try:
logger.info(f"Sending command: {command_type} with params: {params}")
# Send the command
self.sock.sendall(json.dumps(command).encode('utf-8'))
logger.info(f"Command sent, waiting for response...")
# For state-modifying commands, add a small delay to give Ableton time to process
if is_modifying_command:
import time
time.sleep(0.1) # 100ms delay
# Set timeout based on command type
timeout = 15.0 if is_modifying_command else 10.0
self.sock.settimeout(timeout)
# Receive the response
response_data = self.receive_full_response(self.sock)
logger.info(f"Received {len(response_data)} bytes of data")
# Parse the response
response = json.loads(response_data.decode('utf-8'))
logger.info(f"Response parsed, status: {response.get('status', 'unknown')}")
if response.get("status") == "error":
logger.error(f"Ableton error: {response.get('message')}")
raise Exception(response.get("message", "Unknown error from Ableton"))
# For state-modifying commands, add another small delay after receiving response
if is_modifying_command:
import time
time.sleep(0.1) # 100ms delay
return response.get("result", {})
except socket.timeout:
logger.error("Socket timeout while waiting for response from Ableton")
self.sock = None
raise Exception("Timeout waiting for Ableton response")
except (ConnectionError, BrokenPipeError, ConnectionResetError) as e:
logger.error(f"Socket connection error: {str(e)}")
self.sock = None
raise Exception(f"Connection to Ableton lost: {str(e)}")
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON response from Ableton: {str(e)}")
if 'response_data' in locals() and response_data:
logger.error(f"Raw response (first 200 bytes): {response_data[:200]}")
self.sock = None
raise Exception(f"Invalid response from Ableton: {str(e)}")
except Exception as e:
logger.error(f"Error communicating with Ableton: {str(e)}")
self.sock = None
raise Exception(f"Communication error with Ableton: {str(e)}")
@asynccontextmanager
async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
"""Manage server startup and shutdown lifecycle"""
try:
logger.info("AbletonMCP server starting up")
try:
ableton = get_ableton_connection()
logger.info("Successfully connected to Ableton on startup")
except Exception as e:
logger.warning(f"Could not connect to Ableton on startup: {str(e)}")
logger.warning("Make sure the Ableton Remote Script is running")
yield {}
finally:
global _ableton_connection
if _ableton_connection:
logger.info("Disconnecting from Ableton on shutdown")
_ableton_connection.disconnect()
_ableton_connection = None
logger.info("AbletonMCP server shut down")
# Create the MCP server with lifespan support
mcp = FastMCP(
"AbletonMCP",
description="Ableton Live integration through the Model Context Protocol",
lifespan=server_lifespan
)
# Global connection for resources
_ableton_connection = None
def get_ableton_connection():
"""Get or create a persistent Ableton connection"""
global _ableton_connection
if _ableton_connection is not None:
try:
# Test the connection with a simple ping
# We'll try to send an empty message, which should fail if the connection is dead
# but won't affect Ableton if it's alive
_ableton_connection.sock.settimeout(1.0)
_ableton_connection.sock.sendall(b'')
return _ableton_connection
except Exception as e:
logger.warning(f"Existing connection is no longer valid: {str(e)}")
try:
_ableton_connection.disconnect()
except:
pass
_ableton_connection = None
# Connection doesn't exist or is invalid, create a new one
if _ableton_connection is None:
# Try to connect up to 3 times with a short delay between attempts
max_attempts = 3
for attempt in range(1, max_attempts + 1):
try:
logger.info(f"Connecting to Ableton (attempt {attempt}/{max_attempts})...")
_ableton_connection = AbletonConnection(host="localhost", port=9877)
if _ableton_connection.connect():
logger.info("Created new persistent connection to Ableton")
# Validate connection with a simple command
try:
# Get session info as a test
_ableton_connection.send_command("get_session_info")
logger.info("Connection validated successfully")
return _ableton_connection
except Exception as e:
logger.error(f"Connection validation failed: {str(e)}")
_ableton_connection.disconnect()
_ableton_connection = None
# Continue to next attempt
else:
_ableton_connection = None
except Exception as e:
logger.error(f"Connection attempt {attempt} failed: {str(e)}")
if _ableton_connection:
_ableton_connection.disconnect()
_ableton_connection = None
# Wait before trying again, but only if we have more attempts left
if attempt < max_attempts:
import time
time.sleep(1.0)
# If we get here, all connection attempts failed
if _ableton_connection is None:
logger.error("Failed to connect to Ableton after multiple attempts")
raise Exception("Could not connect to Ableton. Make sure the Remote Script is running.")
return _ableton_connection
# Core Tool endpoints
@mcp.tool()
def get_session_info(ctx: Context) -> str:
"""Get detailed information about the current Ableton session"""
try:
ableton = get_ableton_connection()
result = ableton.send_command("get_session_info")
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error getting session info from Ableton: {str(e)}")
return f"Error getting session info: {str(e)}"
@mcp.tool()
def get_track_info(ctx: Context, track_index: int) -> str:
"""
Get detailed information about a specific track in Ableton.
Parameters:
- track_index: The index of the track to get information about
"""
try:
ableton = get_ableton_connection()
result = ableton.send_command("get_track_info", {"track_index": track_index})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error getting track info from Ableton: {str(e)}")
return f"Error getting track info: {str(e)}"
@mcp.tool()
def create_midi_track(ctx: Context, index: int = -1) -> str:
"""
Create a new MIDI track in the Ableton session.
Parameters:
- index: The index to insert the track at (-1 = end of list)
"""
try:
ableton = get_ableton_connection()
result = ableton.send_command("create_midi_track", {"index": index})
return f"Created new MIDI track: {result.get('name', 'unknown')}"
except Exception as e:
logger.error(f"Error creating MIDI track: {str(e)}")
return f"Error creating MIDI track: {str(e)}"
@mcp.tool()
def set_track_name(ctx: Context, track_index: int, name: str) -> str:
"""
Set the name of a track.
Parameters:
- track_index: The index of the track to rename
- name: The new name for the track
"""
try:
ableton = get_ableton_connection()
result = ableton.send_command("set_track_name", {"track_index": track_index, "name": name})
return f"Renamed track to: {result.get('name', name)}"
except Exception as e:
logger.error(f"Error setting track name: {str(e)}")
return f"Error setting track name: {str(e)}"
@mcp.tool()
def create_clip(ctx: Context, track_index: int, clip_index: int, length: float = 4.0) -> str:
"""
Create a new MIDI clip in the specified track and clip slot.
Parameters:
- track_index: The index of the track to create the clip in
- clip_index: The index of the clip slot to create the clip in
- length: The length of the clip in beats (default: 4.0)
"""
try:
ableton = get_ableton_connection()
result = ableton.send_command("create_clip", {
"track_index": track_index,
"clip_index": clip_index,
"length": length
})
return f"Created new clip at track {track_index}, slot {clip_index} with length {length} beats"
except Exception as e:
logger.error(f"Error creating clip: {str(e)}")
return f"Error creating clip: {str(e)}"
@mcp.tool()
def add_notes_to_clip(
ctx: Context,
track_index: int,
clip_index: int,
notes: List[Dict[str, Union[int, float, bool]]]
) -> str:
"""
Add MIDI notes to a clip.
Parameters:
- track_index: The index of the track containing the clip
- clip_index: The index of the clip slot containing the clip
- notes: List of note dictionaries, each with pitch, start_time, duration, velocity, and mute
"""
try:
ableton = get_ableton_connection()
result = ableton.send_command("add_notes_to_clip", {
"track_index": track_index,
"clip_index": clip_index,
"notes": notes
})
return f"Added {len(notes)} notes to clip at track {track_index}, slot {clip_index}"
except Exception as e:
logger.error(f"Error adding notes to clip: {str(e)}")
return f"Error adding notes to clip: {str(e)}"
@mcp.tool()
def set_clip_name(ctx: Context, track_index: int, clip_index: int, name: str) -> str:
"""
Set the name of a clip.
Parameters:
- track_index: The index of the track containing the clip
- clip_index: The index of the clip slot containing the clip
- name: The new name for the clip
"""
try:
ableton = get_ableton_connection()
result = ableton.send_command("set_clip_name", {
"track_index": track_index,
"clip_index": clip_index,
"name": name
})
return f"Renamed clip at track {track_index}, slot {clip_index} to '{name}'"
except Exception as e:
logger.error(f"Error setting clip name: {str(e)}")
return f"Error setting clip name: {str(e)}"
@mcp.tool()
def set_tempo(ctx: Context, tempo: float) -> str:
"""
Set the tempo of the Ableton session.
Parameters:
- tempo: The new tempo in BPM
"""
try:
ableton = get_ableton_connection()
result = ableton.send_command("set_tempo", {"tempo": tempo})
return f"Set tempo to {tempo} BPM"
except Exception as e:
logger.error(f"Error setting tempo: {str(e)}")
return f"Error setting tempo: {str(e)}"
@mcp.tool()
def load_instrument_or_effect(ctx: Context, track_index: int, uri: str) -> str:
"""
Load an instrument or effect onto a track using its URI.
Parameters:
- track_index: The index of the track to load the instrument on
- uri: The URI of the instrument or effect to load (e.g., 'query:Synths#Instrument%20Rack:Bass:FileId_5116')
"""
try:
ableton = get_ableton_connection()
result = ableton.send_command("load_browser_item", {
"track_index": track_index,
"item_uri": uri
})
# Check if the instrument was loaded successfully
if result.get("loaded", False):
new_devices = result.get("new_devices", [])
if new_devices:
return f"Loaded instrument with URI '{uri}' on track {track_index}. New devices: {', '.join(new_devices)}"
else:
devices = result.get("devices_after", [])
return f"Loaded instrument with URI '{uri}' on track {track_index}. Devices on track: {', '.join(devices)}"
else:
return f"Failed to load instrument with URI '{uri}'"
except Exception as e:
logger.error(f"Error loading instrument by URI: {str(e)}")
return f"Error loading instrument by URI: {str(e)}"
@mcp.tool()
def fire_clip(ctx: Context, track_index: int, clip_index: int) -> str:
"""
Start playing a clip.
Parameters:
- track_index: The index of the track containing the clip
- clip_index: The index of the clip slot containing the clip
"""
try:
ableton = get_ableton_connection()
result = ableton.send_command("fire_clip", {
"track_index": track_index,
"clip_index": clip_index
})
return f"Started playing clip at track {track_index}, slot {clip_index}"
except Exception as e:
logger.error(f"Error firing clip: {str(e)}")
return f"Error firing clip: {str(e)}"
@mcp.tool()
def stop_clip(ctx: Context, track_index: int, clip_index: int) -> str:
"""
Stop playing a clip.
Parameters:
- track_index: The index of the track containing the clip
- clip_index: The index of the clip slot containing the clip
"""
try:
ableton = get_ableton_connection()
result = ableton.send_command("stop_clip", {
"track_index": track_index,
"clip_index": clip_index
})
return f"Stopped clip at track {track_index}, slot {clip_index}"
except Exception as e:
logger.error(f"Error stopping clip: {str(e)}")
return f"Error stopping clip: {str(e)}"
@mcp.tool()
def start_playback(ctx: Context) -> str:
"""Start playing the Ableton session."""
try:
ableton = get_ableton_connection()
result = ableton.send_command("start_playback")
return "Started playback"
except Exception as e:
logger.error(f"Error starting playback: {str(e)}")
return f"Error starting playback: {str(e)}"
@mcp.tool()
def stop_playback(ctx: Context) -> str:
"""Stop playing the Ableton session."""
try:
ableton = get_ableton_connection()
result = ableton.send_command("stop_playback")
return "Stopped playback"
except Exception as e:
logger.error(f"Error stopping playback: {str(e)}")
return f"Error stopping playback: {str(e)}"
@mcp.tool()
def get_browser_tree(ctx: Context, category_type: str = "all") -> str:
"""
Get a hierarchical tree of browser categories from Ableton.
Parameters:
- category_type: Type of categories to get ('all', 'instruments', 'sounds', 'drums', 'audio_effects', 'midi_effects')
"""
try:
ableton = get_ableton_connection()
result = ableton.send_command("get_browser_tree", {
"category_type": category_type
})
# Check if we got any categories
if "available_categories" in result and len(result.get("categories", [])) == 0:
available_cats = result.get("available_categories", [])
return (f"No categories found for '{category_type}'. "
f"Available browser categories: {', '.join(available_cats)}")
# Format the tree in a more readable way
total_folders = result.get("total_folders", 0)
formatted_output = f"Browser tree for '{category_type}' (showing {total_folders} folders):\n\n"
def format_tree(item, indent=0):
output = ""
if item:
prefix = " " * indent
name = item.get("name", "Unknown")
path = item.get("path", "")
has_more = item.get("has_more", False)
# Add this item
output += f"{prefix}• {name}"
if path:
output += f" (path: {path})"
if has_more:
output += " [...]"
output += "\n"
# Add children
for child in item.get("children", []):
output += format_tree(child, indent + 1)
return output
# Format each category
for category in result.get("categories", []):
formatted_output += format_tree(category)
formatted_output += "\n"
return formatted_output
except Exception as e:
error_msg = str(e)
if "Browser is not available" in error_msg:
logger.error(f"Browser is not available in Ableton: {error_msg}")
return f"Error: The Ableton browser is not available. Make sure Ableton Live is fully loaded and try again."
elif "Could not access Live application" in error_msg:
logger.error(f"Could not access Live application: {error_msg}")
return f"Error: Could not access the Ableton Live application. Make sure Ableton Live is running and the Remote Script is loaded."
else:
logger.error(f"Error getting browser tree: {error_msg}")
return f"Error getting browser tree: {error_msg}"
@mcp.tool()
def get_browser_items_at_path(ctx: Context, path: str) -> str:
"""
Get browser items at a specific path in Ableton's browser.
Parameters:
- path: Path in the format "category/folder/subfolder"
where category is one of the available browser categories in Ableton
"""
try:
ableton = get_ableton_connection()
result = ableton.send_command("get_browser_items_at_path", {
"path": path
})
# Check if there was an error with available categories
if "error" in result and "available_categories" in result:
error = result.get("error", "")
available_cats = result.get("available_categories", [])
return (f"Error: {error}\n"
f"Available browser categories: {', '.join(available_cats)}")
return json.dumps(result, indent=2)
except Exception as e:
error_msg = str(e)
if "Browser is not available" in error_msg:
logger.error(f"Browser is not available in Ableton: {error_msg}")
return f"Error: The Ableton browser is not available. Make sure Ableton Live is fully loaded and try again."
elif "Could not access Live application" in error_msg:
logger.error(f"Could not access Live application: {error_msg}")
return f"Error: Could not access the Ableton Live application. Make sure Ableton Live is running and the Remote Script is loaded."
elif "Unknown or unavailable category" in error_msg:
logger.error(f"Invalid browser category: {error_msg}")
return f"Error: {error_msg}. Please check the available categories using get_browser_tree."
elif "Path part" in error_msg and "not found" in error_msg:
logger.error(f"Path not found: {error_msg}")
return f"Error: {error_msg}. Please check the path and try again."
else:
logger.error(f"Error getting browser items at path: {error_msg}")
return f"Error getting browser items at path: {error_msg}"
@mcp.tool()
def load_drum_kit(ctx: Context, track_index: int, rack_uri: str, kit_path: str) -> str:
"""
Load a drum rack and then load a specific drum kit into it.
Parameters:
- track_index: The index of the track to load on
- rack_uri: The URI of the drum rack to load (e.g., 'Drums/Drum Rack')
- kit_path: Path to the drum kit inside the browser (e.g., 'drums/acoustic/kit1')
"""
try:
ableton = get_ableton_connection()
# Step 1: Load the drum rack
result = ableton.send_command("load_browser_item", {
"track_index": track_index,
"item_uri": rack_uri
})
if not result.get("loaded", False):
return f"Failed to load drum rack with URI '{rack_uri}'"
# Step 2: Get the drum kit items at the specified path
kit_result = ableton.send_command("get_browser_items_at_path", {
"path": kit_path
})
if "error" in kit_result:
return f"Loaded drum rack but failed to find drum kit: {kit_result.get('error')}"
# Step 3: Find a loadable drum kit
kit_items = kit_result.get("items", [])
loadable_kits = [item for item in kit_items if item.get("is_loadable", False)]
if not loadable_kits:
return f"Loaded drum rack but no loadable drum kits found at '{kit_path}'"
# Step 4: Load the first loadable kit
kit_uri = loadable_kits[0].get("uri")
load_result = ableton.send_command("load_browser_item", {
"track_index": track_index,
"item_uri": kit_uri
})
return f"Loaded drum rack and kit '{loadable_kits[0].get('name')}' on track {track_index}"
except Exception as e:
logger.error(f"Error loading drum kit: {str(e)}")
return f"Error loading drum kit: {str(e)}"
# Main execution
def main():
"""Run the MCP server"""
mcp.run()
if __name__ == "__main__":
main()