Skip to main content
Glama

Carla MCP Server

by agrathwohl
analysis_tools.py•22.9 kB
#!/usr/bin/env python3 """ Audio Analysis Tools for Carla MCP Server """ import logging import numpy as np import time from typing import Dict, Any, List, Optional, Union from scipy import signal logger = logging.getLogger(__name__) class AnalysisTools: """Audio analysis tools for Carla""" def __init__(self, carla_controller): """Initialize analysis tools Args: carla_controller: CarlaController instance """ self.carla = carla_controller self.analysis_cache = {} logger.info("AnalysisTools initialized") async def execute(self, tool_name: str, arguments: dict) -> dict: """Execute an analysis tool Args: tool_name: Name of the tool to execute arguments: Tool arguments Returns: Tool execution result """ if tool_name == "analyze_spectrum": return await self.analyze_spectrum(**arguments) elif tool_name == "measure_levels": return await self.measure_levels(**arguments) elif tool_name == "capture_plugin_parameters": return await self.capture_plugin_parameters(**arguments) elif tool_name == "detect_feedback": return await self.detect_feedback(**arguments) elif tool_name == "analyze_latency": return await self.analyze_latency(**arguments) else: raise ValueError(f"Unknown analysis tool: {tool_name}") async def analyze_spectrum(self, source: str, fft_size: int = 2048, window: str = "hann", session_context: dict = None, **kwargs) -> dict: """Perform spectrum analysis NOTE: Carla's Python API does not provide direct access to audio buffers for FFT analysis. This returns real peak data instead of spectrum data. Args: source: Plugin ID or bus ID fft_size: FFT size (512-8192) - kept for API compatibility window: Window function - kept for API compatibility Returns: Available audio analysis data """ try: plugin_id = int(source) if source.isdigit() else None if plugin_id is None or plugin_id not in self.carla.plugins: raise ValueError(f"Invalid plugin ID: {source}") # Get REAL audio peaks from Carla peaks = self.carla.get_audio_peaks(plugin_id) # Get REAL sample rate sample_rate = self.carla.host.get_sample_rate() if self.carla.engine_running else 48000 # Calculate approximate frequency response based on peaks # This is the best we can do without direct buffer access peak_avg = (peaks['out_left'] + peaks['out_right']) / 2.0 return { 'success': True, 'source': source, 'note': 'Carla Python API does not expose audio buffers for FFT. Using peak analysis instead.', 'sample_rate': sample_rate, 'peaks': peaks, 'peak_average': peak_avg, 'peak_db': 20 * np.log10(peak_avg + 1e-10), 'fft_size': fft_size, # Kept for compatibility 'window': window # Kept for compatibility } except Exception as e: logger.error(f"Failed to analyze audio: {str(e)}") return { 'success': False, 'error': str(e) } async def measure_levels(self, source: str, window_ms: int = 100, include_history: bool = False, capture_duration_ms: Optional[int] = None, silence_threshold_db: Optional[float] = None, session_context: dict = None, **kwargs) -> dict: """Measure REAL audio levels using Carla API Args: source: Plugin ID or bus ID window_ms: Measurement window in milliseconds (for history collection) include_history: Include historical data capture_duration_ms: Capture for this many milliseconds (e.g., 60000 for 1 minute) silence_threshold_db: Stop capturing when level drops below this (e.g., -50) Returns: REAL audio level measurements from Carla """ try: plugin_id = int(source) if source.isdigit() else None if plugin_id is None or plugin_id not in self.carla.plugins: raise ValueError(f"Invalid plugin ID: {source}") # Get REAL peaks from Carla API peaks = self.carla.get_audio_peaks(plugin_id) # Calculate REAL peak levels in dB peak_left_db = 20 * np.log10(peaks['out_left'] + 1e-10) peak_right_db = 20 * np.log10(peaks['out_right'] + 1e-10) peak_max_db = max(peak_left_db, peak_right_db) # Calculate stereo balance from real peaks if peaks['out_left'] + peaks['out_right'] > 0: balance = (peaks['out_right'] - peaks['out_left']) / (peaks['out_right'] + peaks['out_left']) else: balance = 0.0 result = { 'success': True, 'source': source, 'plugin_name': self.carla.plugins[plugin_id]['name'], 'peak_left_db': float(peak_left_db), 'peak_right_db': float(peak_right_db), 'peak_max_db': float(peak_max_db), 'balance': float(balance), 'peaks_raw': peaks, 'input_peaks': { 'left': peaks['in_left'], 'right': peaks['in_right'], 'left_db': 20 * np.log10(peaks['in_left'] + 1e-10), 'right_db': 20 * np.log10(peaks['in_right'] + 1e-10) }, 'output_peaks': { 'left': peaks['out_left'], 'right': peaks['out_right'], 'left_db': peak_left_db, 'right_db': peak_right_db } } if include_history or capture_duration_ms or silence_threshold_db: # Collect REAL peak history over time import time history = [] # Determine capture mode and duration if capture_duration_ms: # Duration-based capture # For long captures, use a longer sampling interval if int(capture_duration_ms) > 30000: # More than 30 seconds samples = int(capture_duration_ms) // 5000 # Sample every 5 seconds else: samples = int(capture_duration_ms) // 10 # Sample every 10ms capture_mode = "duration" elif silence_threshold_db is not None: # Threshold-based capture (max 60 seconds) samples = 6000 # Max 60 seconds worth capture_mode = "threshold" consecutive_below = 0 else: # Simple history mode samples = min(10, window_ms // 10) capture_mode = "simple" # Determine sampling interval if int(capture_duration_ms) > 30000: interval_ms = 5000 # 5 seconds for long captures else: interval_ms = 10 # 10ms for short captures # Capture loop for i in range(samples): snapshot = self.carla.get_audio_peaks(plugin_id) # Calculate current peak in dB current_peak_db = max( 20 * np.log10(snapshot['out_left'] + 1e-10), 20 * np.log10(snapshot['out_right'] + 1e-10) ) history.append({ 'time_ms': i * interval_ms, 'out_left': snapshot['out_left'], 'out_right': snapshot['out_right'], 'in_left': snapshot['in_left'], 'in_right': snapshot['in_right'], 'peak_db': current_peak_db }) # Check threshold stop condition if capture_mode == "threshold" and silence_threshold_db is not None: if current_peak_db < silence_threshold_db: consecutive_below += 1 if consecutive_below >= 5: # 50ms of silence result['capture_stopped_reason'] = f"Signal below {silence_threshold_db} dB" result['capture_duration_ms'] = i * 10 break else: consecutive_below = 0 time.sleep(interval_ms / 1000.0) # Sleep for the interval result['history'] = history result['capture_mode'] = capture_mode if capture_duration_ms: result['capture_duration_ms'] = len(history) * 10 return result except Exception as e: logger.error(f"Failed to measure levels: {str(e)}") return { 'success': False, 'error': str(e) } async def capture_plugin_parameters(self, plugin_ids: Union[int, List[int]], capture_duration_ms: int = 10000, sampling_interval_ms: int = 100, session_context: dict = None, **kwargs) -> dict: """ Capture all parameter values from one or more plugins over time Args: plugin_ids: Single plugin ID or list of plugin IDs to capture capture_duration_ms: Total duration to capture (default 10 seconds) sampling_interval_ms: Time between samples (default 100ms) Returns: Dictionary with parameter history for each plugin """ try: # Handle string conversion from MCP if isinstance(plugin_ids, str): # Check if it's a string representation of a list if plugin_ids.startswith('['): import json plugin_ids = json.loads(plugin_ids) else: plugin_ids = int(plugin_ids) # Ensure plugin_ids is a list if isinstance(plugin_ids, int): plugin_ids = [plugin_ids] # Validate all plugin IDs exist for pid in plugin_ids: if pid not in self.carla.plugins: raise ValueError(f"Invalid plugin ID: {pid}") import time # Calculate number of samples samples = int(capture_duration_ms / sampling_interval_ms) # Initialize result structure result = { 'success': True, 'capture_duration_ms': capture_duration_ms, 'sampling_interval_ms': sampling_interval_ms, 'samples': samples, 'plugins': {} } # Initialize plugin data structures for pid in plugin_ids: plugin_info = self.carla.plugins[pid] param_count = self.carla.host.get_parameter_count(pid) # Get parameter info for all parameters param_info = [] for i in range(param_count): info = self.carla.host.get_parameter_info(pid, i) data = self.carla.host.get_parameter_data(pid, i) ranges = self.carla.host.get_parameter_ranges(pid, i) param_info.append({ 'index': i, 'name': info['name'] if info else f'param_{i}', 'symbol': info['symbol'] if info else '', 'unit': info['unit'] if info else '', 'min': ranges['min'] if ranges else 0.0, 'max': ranges['max'] if ranges else 1.0 }) result['plugins'][pid] = { 'name': plugin_info['name'], 'parameters': param_info, 'history': [] } # Capture loop for sample_idx in range(samples): sample_time = sample_idx * sampling_interval_ms # Capture all parameters for all plugins for pid in plugin_ids: param_count = self.carla.host.get_parameter_count(pid) sample_data = { 'time_ms': sample_time, 'values': {} } # Read all parameter values for param_idx in range(param_count): value = self.carla.host.get_current_parameter_value(pid, param_idx) param_name = result['plugins'][pid]['parameters'][param_idx]['name'] sample_data['values'][param_name] = value # Also store by index for easier access sample_data['values'][f'param_{param_idx}'] = value result['plugins'][pid]['history'].append(sample_data) # Sleep until next sample time if sample_idx < samples - 1: # Don't sleep after last sample time.sleep(sampling_interval_ms / 1000.0) # Add summary statistics for each plugin for pid in plugin_ids: history = result['plugins'][pid]['history'] if history: # Calculate min/max/avg for each parameter param_stats = {} for param in result['plugins'][pid]['parameters']: param_name = param['name'] values = [h['values'][param_name] for h in history] param_stats[param_name] = { 'min': min(values), 'max': max(values), 'avg': sum(values) / len(values), 'final': values[-1] } result['plugins'][pid]['statistics'] = param_stats return result except Exception as e: logger.error(f"Failed to capture plugin parameters: {str(e)}") return { 'success': False, 'error': str(e) } async def detect_feedback(self, sensitivity: float = 0.8, session_context: dict = None, **kwargs) -> dict: """Detect feedback loops by analyzing REAL routing connections Args: sensitivity: Detection sensitivity (0-1) Returns: REAL feedback detection based on actual routing """ try: feedback_points = [] risky_connections = [] # Analyze REAL connections for feedback loops connections = self.carla.connections # Build connection graph to detect cycles graph = {} for conn in connections: src = f"{conn['source']['plugin']}:{conn['source']['port']}" dst = f"{conn['dest']['plugin']}:{conn['dest']['port']}" if conn['source']['plugin'] not in graph: graph[conn['source']['plugin']] = [] graph[conn['source']['plugin']].append(conn['dest']['plugin']) # Detect cycles in the graph (real feedback loops) def has_cycle(node, visited, rec_stack): visited[node] = True rec_stack[node] = True if node in graph: for neighbor in graph[node]: if neighbor not in visited: visited[neighbor] = False rec_stack[neighbor] = False if not visited[neighbor]: if has_cycle(neighbor, visited, rec_stack): return True elif rec_stack[neighbor]: return True rec_stack[node] = False return False # Check each plugin for cycles for plugin_id in self.carla.plugins: visited = {} rec_stack = {} for p in self.carla.plugins: visited[p] = False rec_stack[p] = False if has_cycle(plugin_id, visited, rec_stack): # Get REAL peak values to check for actual feedback peaks = self.carla.get_audio_peaks(plugin_id) peak_avg = (peaks['out_left'] + peaks['out_right']) / 2.0 # High peaks might indicate feedback if peak_avg > sensitivity: feedback_points.append({ 'plugin_id': plugin_id, 'plugin_name': self.carla.plugins[plugin_id]['name'], 'peak_level': float(peak_avg), 'risk_level': 'high' if peak_avg > 0.9 else 'medium' }) # Check for risky parallel connections for conn in connections: # Check if output level is high if conn['source']['plugin'] in self.carla.plugins: peaks = self.carla.get_audio_peaks(conn['source']['plugin']) if peaks['out_left'] > sensitivity or peaks['out_right'] > sensitivity: risky_connections.append({ 'source': conn['source']['plugin'], 'dest': conn['dest']['plugin'], 'peak_level': max(peaks['out_left'], peaks['out_right']) }) return { 'success': True, 'feedback_detected': len(feedback_points) > 0, 'feedback_points': feedback_points, 'risky_connections': risky_connections, 'total_connections': len(connections), 'sensitivity': sensitivity, 'plugins_analyzed': len(self.carla.plugins) } except Exception as e: logger.error(f"Failed to detect feedback: {str(e)}") return { 'success': False, 'error': str(e) } async def analyze_latency(self, measure_plugins: bool = True, measure_hardware: bool = True, session_context: dict = None, **kwargs) -> dict: """Measure REAL system and hardware latencies from Carla NOTE: Carla Python API does not expose per-plugin latency. Args: measure_plugins: Measure plugin latencies (limited by API) measure_hardware: Measure hardware latency Returns: REAL latency measurements from Carla """ try: total_latency = 0 hardware_latency = 0 buffer_size = 0 sample_rate = 48000 # Default if self.carla.engine_running: # Get REAL buffer size and sample rate from Carla buffer_size = self.carla.host.get_buffer_size() sample_rate = self.carla.host.get_sample_rate() if measure_hardware: # Calculate REAL hardware latency hardware_latency = (buffer_size / sample_rate) * 1000 # ms total_latency = hardware_latency # Account for JACK periods if using JACK # Default JACK uses 2 periods periods = 2 total_hardware_latency = hardware_latency * periods # Plugin latency info plugin_info = { 'note': 'Carla Python API does not expose individual plugin latency values.', 'plugin_count': len(self.carla.plugins), 'plugins': [] } if measure_plugins: # We can at least list the plugins and their processing state for plugin_id, plugin_data in self.carla.plugins.items(): info = self.carla.host.get_plugin_info(plugin_id) plugin_info['plugins'].append({ 'id': plugin_id, 'name': plugin_data['name'], 'active': plugin_data['active'], 'type': info.get('label', 'Unknown') if info else 'Unknown' }) return { 'success': True, 'hardware_latency_ms': hardware_latency, 'total_hardware_latency_ms': total_hardware_latency if measure_hardware else hardware_latency, 'buffer_size_samples': buffer_size, 'sample_rate_hz': sample_rate, 'latency_samples': buffer_size, 'engine_running': self.carla.engine_running, 'plugin_info': plugin_info, 'note': 'Individual plugin latencies not available via Carla Python API' } except Exception as e: logger.error(f"Failed to analyze latency: {str(e)}") return { 'success': False, 'error': str(e) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/agrathwohl/carla-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server