Skip to main content
Glama

Carla MCP Server

by agrathwohl
plugin_tools.py•27.1 kB
#!/usr/bin/env python3
"""
Plugin Control Tools for Carla MCP Server
"""

import asyncio
import os
import time
import logging
import subprocess
from typing import Dict, Any, List, Optional
from pathlib import Path
import threading
import wave
import numpy as np

logger = logging.getLogger(__name__)


class PluginTools:
    """Plugin management and control tools for Carla.

    Every public tool is a coroutine that returns a plain dict. Failures are
    reported as ``{'success': False, 'error': <message>}`` rather than raised,
    with the single exception of :meth:`execute`, which raises ``ValueError``
    for an unknown tool name.
    """

    # Tool names accepted by execute(); each one is also the name of the
    # coroutine method that implements it.
    _TOOL_NAMES = frozenset({
        "load_plugin",
        "scan_plugins",
        "control_plugin",
        "batch_process",
        "list_plugins",
        "get_plugin_info",
        "clone_plugin",
        "replace_plugin",
    })

    # Interval between two volume updates while fading, in milliseconds.
    _FADE_STEP_MS = 10

    def __init__(self, carla_controller):
        """Initialize plugin tools

        Args:
            carla_controller: CarlaController instance
        """
        self.carla = carla_controller
        self.plugin_cache = {}
        self.processing_queue = []

        logger.info("PluginTools initialized")

    async def execute(self, tool_name: str, arguments: dict) -> dict:
        """Execute a plugin tool

        Args:
            tool_name: Name of the tool to execute
            arguments: Tool arguments, passed to the tool as keyword arguments

        Returns:
            Tool execution result

        Raises:
            ValueError: If tool_name is not one of the known plugin tools
        """
        # Whitelist first so arbitrary attribute names can never be invoked.
        if tool_name not in self._TOOL_NAMES:
            raise ValueError(f"Unknown plugin tool: {tool_name}")
        handler = getattr(self, tool_name)
        return await handler(**arguments)

    @staticmethod
    def _resolve_plugin_type(type_name: str):
        """Translate a plugin type string (case-insensitive) to a PluginType.

        Shared by load_plugin and replace_plugin so both accept the same set
        of type names.

        Raises:
            ValueError: If the type name is not recognised
        """
        # Imported inside the function, exactly as the original code did
        # (presumably to keep this module importable without Carla - TODO confirm).
        from carla_controller import PluginType

        plugin_type_map = {
            'VST2': PluginType.VST2,
            'VST3': PluginType.VST3,
            'LV2': PluginType.LV2,
            'LADSPA': PluginType.LADSPA,
            'DSSI': PluginType.DSSI,
            'AU': PluginType.AU,
            'SF2': PluginType.SF2,
            'SFZ': PluginType.SFZ,
            'JACK': PluginType.JACK
        }

        plugin_type = plugin_type_map.get(type_name.upper())
        if plugin_type is None:
            raise ValueError(f"Unknown plugin type: {type_name}")
        return plugin_type

    async def load_plugin(self, path: str, type: str, position: int = -1,
                          preset: Optional[str] = None,
                          session_context: dict = None, **kwargs) -> dict:
        """Load a plugin

        Args:
            path: Plugin path or URI
            type: Plugin type (VST2, VST3, LV2, etc.), case-insensitive
            position: Rack position (-1 for end). Only echoed back in the
                result; it is not passed to the controller.
            preset: Optional preset to load
            session_context: Accepted for interface compatibility; unused here

        Returns:
            Plugin information, or a failure dict with an 'error' message
        """
        try:
            # Ensure engine is running
            if not self.carla.engine_running:
                if not self.carla.start_engine():
                    raise Exception("Failed to start Carla engine - cannot load plugins")

            # Convert type string to enum
            plugin_type = self._resolve_plugin_type(type)

            # Load the plugin
            plugin_id = self.carla.load_plugin(path, plugin_type, preset=preset)
            if plugin_id is None:
                raise Exception(f"Failed to load plugin: {path}")

            # Get plugin information and parameter list
            info = self.carla.host.get_plugin_info(plugin_id)
            parameters = self.carla.list_parameters(plugin_id)

            # Get I/O configuration; a falsy answer from the host counts as zero ports
            audio_info = self.carla.host.get_audio_port_count_info(plugin_id) or {}
            midi_info = self.carla.host.get_midi_port_count_info(plugin_id) or {}

            io_config = {
                'audio_ins': audio_info.get('ins', 0),
                'audio_outs': audio_info.get('outs', 0),
                'midi_ins': midi_info.get('ins', 0),
                'midi_outs': midi_info.get('outs', 0),
                'cv_ins': 0,  # CV ports not directly accessible via API
                'cv_outs': 0  # CV ports not directly accessible via API
            }

            # Store in cache (the raw host answer, even when it is empty/None)
            self.plugin_cache[plugin_id] = {
                'path': path,
                'type': type,
                'info': info,
                'parameters': parameters,
                'io_config': io_config
            }

            # Fall back to defaults when the host returned no info at all
            meta = info or {}

            logger.info(f"Loaded plugin {plugin_id}: {meta.get('name', path)}")

            return {
                'success': True,
                'plugin_id': plugin_id,
                'name': meta.get('name', Path(path).stem),
                'maker': meta.get('maker', 'Unknown'),
                'category': meta.get('category', 'Unknown'),
                'parameters': len(parameters),
                'io_config': io_config,
                'position': position
            }

        except Exception as e:
            logger.error(f"Failed to load plugin: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }

    async def scan_plugins(self, directory: str, formats: Optional[List[str]] = None,
                           recursive: bool = True,
                           session_context: dict = None, **kwargs) -> dict:
        """Scan directory for plugins

        Matching is purely by file/directory name suffix; nothing is loaded.
        A path is reported once per requested format whose suffix list matches
        (e.g. a '.so' file is reported for each of VST2, LADSPA and DSSI that
        was requested).

        Args:
            directory: Directory to scan
            formats: Plugin formats to scan (None for VST2, VST3 and LV2)
            recursive: Scan recursively
            session_context: Accepted for interface compatibility; unused here

        Returns:
            List of found plugins sorted by name, plus any per-path errors
        """
        try:
            if formats is None:
                formats = ['VST2', 'VST3', 'LV2']

            found_plugins = []
            errors = []

            # Define file extensions for each format
            format_extensions = {
                'VST2': ['.dll', '.so', '.dylib'],
                'VST3': ['.vst3'],
                'LV2': ['.lv2'],
                'LADSPA': ['.so'],
                'DSSI': ['.so']
            }

            # Scan directory
            path = Path(directory)
            if not path.exists():
                raise Exception(f"Directory not found: {directory}")

            entries = path.rglob('*') if recursive else path.glob('*')

            for file_path in entries:
                # Regular files are single-file plugins (VST2, LADSPA, DSSI);
                # directories are bundles (LV2, VST3). Anything else is skipped.
                if file_path.is_file():
                    kind = 'plugin'
                elif file_path.is_dir():
                    kind = 'bundle'
                else:
                    continue

                path_str = str(file_path)
                for format_type in formats:
                    extensions = tuple(format_extensions.get(format_type, []))
                    # endswith(()) is False, so unknown formats match nothing
                    if not path_str.endswith(extensions):
                        continue

                    try:
                        if kind == 'plugin':
                            size = file_path.stat().st_size
                        else:
                            # Bundle size is the sum of all contained files
                            size = sum(f.stat().st_size
                                       for f in file_path.rglob('*') if f.is_file())
                    except OSError as size_error:
                        # Best effort: an unreadable entry must not abort the scan
                        errors.append(f"{file_path}: {size_error}")
                        logger.warning(f"Could not determine size of {file_path}: {size_error}")
                        if kind == 'plugin':
                            continue
                        size = 0

                    found_plugins.append({
                        'path': path_str,
                        'name': file_path.stem,
                        'format': format_type,
                        'size': size
                    })
                    logger.debug(f"Found {format_type} {kind}: {file_path}")

            # Sort by name
            found_plugins.sort(key=lambda x: x['name'].lower())

            logger.info(f"Scanned {directory}: found {len(found_plugins)} plugins")

            return {
                'success': True,
                'plugins': found_plugins,
                'total': len(found_plugins),
                'formats_scanned': formats,
                'errors': errors,
                'directory': directory
            }

        except Exception as e:
            logger.error(f"Failed to scan plugins: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }

    async def control_plugin(self, plugin_id: str, action: str, fade_ms: int = 0,
                             session_context: dict = None, **kwargs) -> dict:
        """Control plugin state

        Args:
            plugin_id: Plugin ID
            action: Action to perform (activate, bypass, solo, remove)
            fade_ms: Fade time in milliseconds (ignored for solo)
            session_context: Accepted for interface compatibility; unused here

        Returns:
            New plugin state, or for 'remove' a confirmation with 'removed': True
        """
        try:
            plugin_id = int(plugin_id)

            if action == "activate":
                if fade_ms > 0:
                    # Start silent so the plugin does not come in at full level
                    self._apply_volume(plugin_id, 0.0)
                # Activate BEFORE ramping up: fading an inactive plugin is inaudible
                self.carla.set_plugin_active(plugin_id, True)
                if fade_ms > 0:
                    await self._fade_plugin(plugin_id, 0.0, 1.0, fade_ms)

            elif action == "bypass":
                if fade_ms > 0:
                    # Fade out
                    await self._fade_plugin(plugin_id, 1.0, 0.0, fade_ms)
                self.carla.set_plugin_active(plugin_id, False)

            elif action == "solo":
                # Bypass all other plugins
                for pid in self.carla.plugins:
                    if pid != plugin_id:
                        self.carla.set_plugin_active(pid, False)
                self.carla.set_plugin_active(plugin_id, True)

            elif action == "remove":
                if fade_ms > 0:
                    # Fade out before removing
                    await self._fade_plugin(plugin_id, 1.0, 0.0, fade_ms)

                success = self.carla.remove_plugin(plugin_id)
                if not success:
                    raise Exception(f"Failed to remove plugin {plugin_id}")

                # Remove from cache
                self.plugin_cache.pop(plugin_id, None)

                return {
                    'success': True,
                    'plugin_id': plugin_id,
                    'action': action,
                    'removed': True
                }

            else:
                raise ValueError(f"Unknown action: {action}")

            # Get new state
            new_state = {
                'active': self.carla.plugins[plugin_id]['active'] if plugin_id in self.carla.plugins else False,
                'volume': self.carla.plugins.get(plugin_id, {}).get('volume', 1.0),
                'cpu_usage': self.carla.get_cpu_load(plugin_id)
            }

            logger.info(f"Plugin {plugin_id} action: {action}")

            return {
                'success': True,
                'plugin_id': plugin_id,
                'action': action,
                'new_state': new_state,
                'fade_applied': fade_ms > 0
            }

        except Exception as e:
            logger.error(f"Failed to control plugin: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }

    def _apply_volume(self, plugin_id: int, volume: float) -> None:
        """Record a volume in the controller's plugin table and send it to the host."""
        if plugin_id in self.carla.plugins:
            self.carla.plugins[plugin_id]['volume'] = volume
        self.carla.host.set_volume(plugin_id, volume)

    async def _fade_plugin(self, plugin_id: int, start_vol: float, end_vol: float, duration_ms: int):
        """Fade plugin volume linearly from start_vol to end_vol

        Each step is applied to the host, so the ramp is audible, and the wait
        between steps yields to the event loop instead of blocking it.

        Args:
            plugin_id: Plugin ID
            start_vol: Starting volume (0.0 to 1.0)
            end_vol: Ending volume (0.0 to 1.0)
            duration_ms: Fade duration in milliseconds; durations shorter than
                one step jump straight to end_vol
        """
        steps = int(duration_ms / self._FADE_STEP_MS)

        for i in range(steps):
            progress = i / steps
            self._apply_volume(plugin_id, start_vol + (end_vol - start_vol) * progress)
            # asyncio.sleep, not time.sleep: other coroutines keep running
            await asyncio.sleep(self._FADE_STEP_MS / 1000)

        # Set final volume (the loop stops one step short of end_vol)
        self._apply_volume(plugin_id, end_vol)

    async def batch_process(self, input_file: str, plugin_chain: List[str],
                            output_format: Optional[dict] = None,
                            normalize: bool = True,
                            session_context: dict = None, **kwargs) -> dict:
        """Apply plugin chain to audio file

        NOTE: Carla's Python API does not provide offline/batch rendering.
        This sets up the plugin chain which can be used for real-time processing.
        No audio file is read or written here.

        Args:
            input_file: Input audio file path (only echoed back in the result)
            plugin_chain: List of plugin IDs to apply, in processing order
            output_format: Output format settings (only echoed back)
            normalize: Normalize output (currently unused)
            session_context: Accepted for interface compatibility; unused here

        Returns:
            Processing setup result
        """
        try:
            if output_format is None:
                output_format = {
                    'sample_rate': 48000,
                    'bit_depth': 24,
                    'format': 'wav'
                }

            # Parse each ID once and verify all plugins in chain exist
            chain_ids = []
            for plugin_id_str in plugin_chain:
                plugin_id = int(plugin_id_str)
                if plugin_id not in self.carla.plugins:
                    raise ValueError(f"Plugin {plugin_id} not found")
                chain_ids.append(plugin_id)

            # Connect plugins in series: port 0 of each plugin to port 0 of the next
            connections_made = []
            for src_id, dst_id in zip(chain_ids, chain_ids[1:]):
                if self.carla.connect_audio(src_id, 0, dst_id, 0):
                    connections_made.append(f"{src_id} -> {dst_id}")

            # Activate all plugins in chain
            for plugin_id in chain_ids:
                self.carla.set_plugin_active(plugin_id, True)

            # Get real peak levels from the chain
            peak_data = {
                plugin_id: self.carla.get_audio_peaks(plugin_id)
                for plugin_id in chain_ids
            }

            logger.info(f"Set up plugin chain with {len(plugin_chain)} plugins")

            return {
                'success': True,
                'note': 'Carla Python API does not support offline rendering. Plugin chain configured for real-time processing.',
                'input_file': input_file,
                'plugin_chain': plugin_chain,
                'connections_made': connections_made,
                'plugins_activated': len(plugin_chain),
                'peak_data': peak_data,
                'output_format': output_format,
                'real_time_processing': True
            }

        except Exception as e:
            logger.error(f"Failed to set up plugin chain: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }

    async def list_plugins(self, session_context: dict = None, **kwargs) -> dict:
        """List all loaded plugins

        Args:
            session_context: Accepted for interface compatibility; unused here

        Returns:
            List of loaded plugins with per-plugin and total CPU usage
        """
        try:
            plugins = []

            for plugin_id, plugin_data in self.carla.plugins.items():
                audio_info = self.carla.host.get_audio_port_count_info(plugin_id) or {}

                plugins.append({
                    'id': plugin_id,
                    'name': plugin_data['name'],
                    'type': plugin_data.get('type', 'Unknown'),
                    'active': plugin_data['active'],
                    'cpu_usage': self.carla.get_cpu_load(plugin_id),
                    'latency': 0,  # Latency not available in Carla API
                    'parameters': self.carla.host.get_parameter_count(plugin_id),
                    'audio_ins': audio_info.get('ins', 0),
                    'audio_outs': audio_info.get('outs', 0)
                })

            return {
                'success': True,
                'plugins': plugins,
                'total': len(plugins),
                'total_cpu': sum(p['cpu_usage'] for p in plugins)
            }

        except Exception as e:
            logger.error(f"Failed to list plugins: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }

    async def get_plugin_info(self, plugin_id: str,
                              session_context: dict = None, **kwargs) -> dict:
        """Get detailed plugin information

        Args:
            plugin_id: Plugin ID
            session_context: Accepted for interface compatibility; unused here

        Returns:
            Detailed plugin information (metadata, state, parameters,
            programs, peaks and CPU usage)
        """
        try:
            plugin_id = int(plugin_id)

            if plugin_id not in self.carla.plugins:
                raise Exception(f"Plugin not found: {plugin_id}")

            host = self.carla.host

            # Get basic info; missing fields fall back to defaults, as in load_plugin
            meta = host.get_plugin_info(plugin_id) or {}
            plugin_data = self.carla.plugins[plugin_id]

            # Get parameters
            parameters = self.carla.list_parameters(plugin_id)

            def internal_value(index, default):
                # The host is only queried for non-negative plugin IDs
                if plugin_id >= 0:
                    return host.get_internal_parameter_value(plugin_id, index)
                return default

            # Get current state using internal parameters (there are no get_ methods for these)
            # Internal parameters: 0=active, 1=drywet, 2=volume, 3=balance-left, 4=balance-right, 5=panning
            # NOTE(review): Carla's backend may define these as negative constants
            # (PARAMETER_DRYWET etc.) - confirm the indices against carla_backend.
            state = {
                'active': plugin_data['active'],
                'volume': self.carla.plugins.get(plugin_id, {}).get('volume', 1.0),
                'drywet': internal_value(1, 1.0),
                'balance_left': internal_value(3, 0.0),
                'balance_right': internal_value(4, 0.0),
                'panning': internal_value(5, 0.0)
            }

            # Get programs/presets
            program_count = host.get_program_count(plugin_id)
            current_program = host.get_current_program_index(plugin_id)

            programs = [
                {
                    'index': i,
                    'name': host.get_program_name(plugin_id, i),
                    'is_current': i == current_program
                }
                for i in range(program_count)
            ]

            # Get audio peaks
            peaks = self.carla.get_audio_peaks(plugin_id)

            return {
                'success': True,
                'plugin_id': plugin_id,
                'name': meta['name'] if 'name' in meta else plugin_data['name'],
                'label': meta.get('label', ''),
                'maker': meta.get('maker', 'Unknown'),
                'copyright': meta.get('copyright', ''),
                'unique_id': meta.get('uniqueId', 0),
                'category': meta.get('category', 'Unknown'),
                'state': state,
                'parameters': parameters,
                'programs': programs,
                'current_program': current_program,
                'peaks': peaks,
                'cpu_usage': self.carla.get_cpu_load(plugin_id),
                'latency': 0  # Latency not available in Carla API
            }

        except Exception as e:
            logger.error(f"Failed to get plugin info: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }

    async def clone_plugin(self, plugin_id: str,
                           session_context: dict = None, **kwargs) -> dict:
        """Clone a plugin with its current settings

        Args:
            plugin_id: Plugin ID to clone
            session_context: Accepted for interface compatibility; unused here

        Returns:
            New plugin information
        """
        try:
            plugin_id = int(plugin_id)

            if plugin_id not in self.carla.plugins:
                raise Exception(f"Plugin not found: {plugin_id}")

            # Clone the plugin
            success = self.carla.host.clone_plugin(plugin_id)
            if not success:
                raise Exception(f"Failed to clone plugin {plugin_id}")

            # The new ID is taken to be the last slot, i.e. plugin count - 1
            new_plugin_id = self.carla.host.get_current_plugin_count() - 1

            # Copy plugin data (shallow copy) and stamp it with the new ID
            original_data = self.carla.plugins[plugin_id]
            cloned_data = original_data.copy()
            cloned_data['id'] = new_plugin_id
            self.carla.plugins[new_plugin_id] = cloned_data

            logger.info(f"Cloned plugin {plugin_id} to {new_plugin_id}")

            return {
                'success': True,
                'original_id': plugin_id,
                'new_id': new_plugin_id,
                'name': original_data['name']
            }

        except Exception as e:
            logger.error(f"Failed to clone plugin: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }

    async def replace_plugin(self, plugin_id: str, new_path: str, new_type: str,
                             preserve_connections: bool = True,
                             session_context: dict = None, **kwargs) -> dict:
        """Replace a plugin with another while preserving connections

        The new plugin type is validated before anything is removed, so a
        bad new_type leaves the existing plugin untouched.

        Args:
            plugin_id: Plugin ID to replace
            new_path: Path to new plugin
            new_type: New plugin type (same names as load_plugin accepts)
            preserve_connections: Preserve audio connections
            session_context: Accepted for interface compatibility; unused here

        Returns:
            Replacement result
        """
        try:
            plugin_id = int(plugin_id)

            if plugin_id not in self.carla.plugins:
                raise Exception(f"Plugin not found: {plugin_id}")

            # Validate the type up front: failing after the removal would
            # destroy the old plugin without providing a replacement.
            plugin_type = self._resolve_plugin_type(new_type)

            # Store connections involving this plugin if preserving
            connections = []
            if preserve_connections:
                connections = [
                    conn.copy()
                    for conn in self.carla.connections
                    if conn['source']['plugin'] == plugin_id
                    or conn['dest']['plugin'] == plugin_id
                ]

            # Remove old plugin
            success = self.carla.remove_plugin(plugin_id)
            if not success:
                raise Exception(f"Failed to remove plugin {plugin_id}")

            # Drop the cache entry of the removed plugin, as control_plugin does
            self.plugin_cache.pop(plugin_id, None)

            # Load new plugin
            new_plugin_id = self.carla.load_plugin(new_path, plugin_type)
            if new_plugin_id is None:
                raise Exception(f"Failed to load replacement plugin: {new_path}")

            # Restore connections, substituting the new ID on whichever end(s)
            # referred to the old plugin (both ends for a self-connection)
            for conn in connections:
                src_plugin = conn['source']['plugin']
                dst_plugin = conn['dest']['plugin']
                self.carla.connect_audio(
                    new_plugin_id if src_plugin == plugin_id else src_plugin,
                    conn['source']['port'],
                    new_plugin_id if dst_plugin == plugin_id else dst_plugin,
                    conn['dest']['port']
                )

            logger.info(f"Replaced plugin {plugin_id} with {new_plugin_id}")

            return {
                'success': True,
                'old_id': plugin_id,
                'new_id': new_plugin_id,
                'new_path': new_path,
                'connections_preserved': preserve_connections
            }

        except Exception as e:
            logger.error(f"Failed to replace plugin: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/agrathwohl/carla-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server