# connection.py
"""Ableton Live connection management."""
import json
import socket
import time
from dataclasses import dataclass
from typing import Any, Dict, Optional

from ..utils.logging import get_logger
# Module-level logger used by every function and method in this file.
logger = get_logger("AbletonMCPServer")
@dataclass
class AbletonConnection:
    """Manages connection to Ableton Live Remote Script via socket.

    A command is sent as one UTF-8 encoded JSON object; the reply is read
    until the accumulated bytes parse as a complete JSON document.

    Attributes:
        host: Host name or address of the Remote Script socket server.
        port: TCP port of the Remote Script socket server.
        sock: The connected socket, or ``None`` while disconnected.
    """

    host: str
    port: int
    sock: Optional[socket.socket] = None

    # Commands that change Live's state. They get short settle delays around
    # the request. Deliberately left unannotated so that ``@dataclass`` does
    # not turn it into an instance field.
    _MODIFYING_COMMANDS = frozenset(
        {
            "create_midi_track",
            "create_audio_track",
            "set_track_name",
            "create_clip",
            "add_notes_to_clip",
            "set_clip_name",
            "set_tempo",
            "fire_clip",
            "stop_clip",
            "set_device_parameter",
            "start_playback",
            "stop_playback",
            "load_instrument_or_effect",
        }
    )

    def connect(self) -> bool:
        """Connect to the Ableton Remote Script socket server.

        Returns:
            ``True`` if a socket is already held or a new connection was
            established, ``False`` if the connection attempt failed (the
            failure is logged, not raised).
        """
        if self.sock:
            return True

        new_sock = None
        try:
            new_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            new_sock.connect((self.host, self.port))
        except Exception as e:
            logger.error(f"Failed to connect to Ableton: {str(e)}")
            # Close the half-initialised socket instead of merely dropping
            # the reference, so the file descriptor is released right away.
            if new_sock is not None:
                new_sock.close()
            self.sock = None
            return False

        self.sock = new_sock
        logger.info(f"Connected to Ableton at {self.host}:{self.port}")
        return True

    def disconnect(self) -> None:
        """Disconnect from the Ableton Remote Script.

        Closing errors are logged and swallowed; ``sock`` is always reset to
        ``None`` afterwards. Calling this while disconnected is a no-op.
        """
        if self.sock:
            try:
                self.sock.close()
            except Exception as e:
                logger.error(f"Error disconnecting from Ableton: {str(e)}")
            finally:
                self.sock = None

    @staticmethod
    def _is_complete_json(data: bytes) -> bool:
        """Return True if *data* decodes as UTF-8 and parses as one JSON document."""
        try:
            json.loads(data.decode("utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError):
            # UnicodeDecodeError happens when a recv() boundary falls inside a
            # multi-byte character; that means "incomplete", not "corrupt".
            return False
        return True

    def receive_full_response(self, sock: socket.socket, buffer_size: int = 8192) -> bytes:
        """Receive the complete response, potentially in multiple chunks.

        Args:
            sock: Socket to read from.
            buffer_size: Maximum number of bytes requested per ``recv`` call.

        Returns:
            The raw bytes of one complete JSON document.

        Raises:
            Exception: If the peer closes the connection before sending data,
                if nothing was received before the timeout, or if the data
                received so far is not a complete JSON document.
            ConnectionError: Propagated unchanged from the socket.
        """
        chunks: list[bytes] = []
        # Increased timeout for operations that might take longer.
        # NOTE: this replaces whatever timeout the caller set on the socket.
        sock.settimeout(15.0)

        try:
            while True:
                try:
                    chunk = sock.recv(buffer_size)
                except TimeoutError:
                    logger.warning("Socket timeout during chunked receive")
                    break

                if not chunk:
                    # An empty read means the peer closed the connection.
                    if not chunks:
                        raise Exception("Connection closed before receiving any data")
                    break

                chunks.append(chunk)
                # The protocol has no length prefix or delimiter, so the only
                # end-of-message signal is that the buffer parses as JSON.
                data = b"".join(chunks)
                if self._is_complete_json(data):
                    logger.info(f"Received complete response ({len(data)} bytes)")
                    return data
        except ConnectionError as e:
            logger.error(f"Socket connection error during receive: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Error during receive: {str(e)}")
            raise

        # Reaching this point means we timed out or the peer closed mid-message.
        if not chunks:
            raise Exception("No data received")

        data = b"".join(chunks)
        logger.info(f"Returning data after receive completion ({len(data)} bytes)")
        if not self._is_complete_json(data):
            raise Exception("Incomplete JSON response received")
        return data

    def send_command(
        self, command_type: str, params: Optional[dict[str, Any]] = None
    ) -> dict[str, Any]:
        """Send a command to Ableton and return the response.

        Args:
            command_type: Name of the command, sent as the ``"type"`` field.
            params: Command parameters, sent as the ``"params"`` field; an
                empty dict is sent when omitted.

        Returns:
            The ``"result"`` field of the response, or ``{}`` if absent.

        Raises:
            ConnectionError: If no connection exists and one cannot be opened.
            Exception: On timeout, a lost connection, an invalid response, or
                a response whose ``"status"`` is ``"error"``. In all of these
                cases the socket is closed, so the next call reconnects.
        """
        if not self.sock and not self.connect():
            raise ConnectionError("Not connected to Ableton")

        command = {"type": command_type, "params": params or {}}
        is_modifying_command = command_type in self._MODIFYING_COMMANDS
        response_data = b""

        try:
            logger.info(f"Sending command: {command_type} with params: {params}")
            self.sock.sendall(json.dumps(command).encode("utf-8"))
            logger.info("Command sent, waiting for response...")

            # For state-modifying commands, give Ableton time to process.
            if is_modifying_command:
                time.sleep(0.1)  # 100ms delay

            # NOTE(review): receive_full_response() resets the socket timeout
            # to 15s, so the 10s value chosen here is not the effective one.
            timeout = 15.0 if is_modifying_command else 10.0
            self.sock.settimeout(timeout)

            response_data = self.receive_full_response(self.sock)
            logger.info(f"Received {len(response_data)} bytes of data")

            response = json.loads(response_data.decode("utf-8"))
            logger.info(f"Response parsed, status: {response.get('status', 'unknown')}")

            if response.get("status") == "error":
                logger.error(f"Ableton error: {response.get('message')}")
                raise Exception(response.get("message", "Unknown error from Ableton"))

            # For state-modifying commands, add another small delay after the reply.
            if is_modifying_command:
                time.sleep(0.1)  # 100ms delay

            return response.get("result", {})
        except TimeoutError as e:
            logger.error("Socket timeout while waiting for response from Ableton")
            # Close (not just forget) the socket: the stream may hold a late
            # reply that would be misread as the answer to the next command.
            self.disconnect()
            raise Exception("Timeout waiting for Ableton response") from e
        except ConnectionError as e:
            logger.error(f"Socket connection error: {str(e)}")
            self.disconnect()
            raise Exception(f"Connection to Ableton lost: {str(e)}") from e
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON response from Ableton: {str(e)}")
            if response_data:
                raw_data = response_data[:200]
                logger.error(f"Raw response (first 200 bytes): {raw_data!r}")
            self.disconnect()
            raise Exception(f"Invalid response from Ableton: {str(e)}") from e
        except Exception as e:
            logger.error(f"Error communicating with Ableton: {str(e)}")
            self.disconnect()
            raise Exception(f"Communication error with Ableton: {str(e)}") from e
# Module-wide cache for the single shared AbletonConnection. It is None until
# get_ableton_connection() creates a connection, and is reset to None by
# disconnect_global_connection() or when the cached connection is found dead.
_ableton_connection = None
def get_ableton_connection() -> AbletonConnection:
    """Get or create a persistent Ableton connection.

    An existing connection is reused after a cheap probe. Otherwise up to
    three connection attempts are made, one second apart, and each new
    connection is validated with a ``get_session_info`` command before it is
    stored in the module-level cache.

    Returns:
        The shared, validated ``AbletonConnection``.

    Raises:
        Exception: If no validated connection could be established.
    """
    global _ableton_connection

    if _ableton_connection is not None:
        try:
            if _ableton_connection.sock is None:
                # send_command() drops the socket after a failure; treat that
                # explicitly as a dead connection.
                raise ConnectionError("socket has been closed")
            # Best-effort probe: an empty send surfaces an error already
            # pending on the socket without sending anything to Ableton.
            # NOTE(review): it presumably cannot detect every dead peer;
            # the first real command is the definitive check.
            _ableton_connection.sock.settimeout(1.0)
            _ableton_connection.sock.sendall(b"")
            return _ableton_connection
        except Exception as e:
            logger.warning(f"Existing connection is no longer valid: {str(e)}")
            try:
                _ableton_connection.disconnect()
            except Exception as cleanup_error:
                # Cleanup is best-effort; the connection is discarded anyway.
                logger.warning(f"Error while discarding stale connection: {str(cleanup_error)}")
            _ableton_connection = None

    # Connection doesn't exist or is invalid: try to create a new one.
    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        candidate = None
        try:
            logger.info(f"Connecting to Ableton (attempt {attempt}/{max_attempts})...")
            candidate = AbletonConnection(host="localhost", port=9877)
            if candidate.connect():
                logger.info("Created new persistent connection to Ableton")
                try:
                    # Validate the connection with a simple command.
                    candidate.send_command("get_session_info")
                except Exception as e:
                    logger.error(f"Connection validation failed: {str(e)}")
                    candidate.disconnect()
                else:
                    logger.info("Connection validated successfully")
                    # Publish only a validated connection to the cache.
                    _ableton_connection = candidate
                    return candidate
        except Exception as e:
            logger.error(f"Connection attempt {attempt} failed: {str(e)}")
            if candidate is not None:
                candidate.disconnect()

        # Wait before trying again, but only if more attempts are left.
        if attempt < max_attempts:
            time.sleep(1.0)

    # Every attempt failed; the cache is still None at this point.
    logger.error("Failed to connect to Ableton after multiple attempts")
    raise Exception("Could not connect to Ableton. Make sure the Remote Script is running.")
def disconnect_global_connection() -> None:
    """Close the shared Ableton connection, if any, and clear the cache.

    Does nothing when no connection is currently cached.
    """
    global _ableton_connection

    if not _ableton_connection:
        return

    logger.info("Disconnecting from Ableton")
    _ableton_connection.disconnect()
    _ableton_connection = None