Skip to main content
Glama

PicoScope MCP Server

device_manager.py17.8 kB
"""Device manager for PicoScope oscilloscopes.""" from typing import Optional, Any import ctypes import numpy as np # Try to import PicoSDK, but handle gracefully if not installed try: from picosdk.discover import find_all_units from picosdk.ps5000a import ps5000a as ps from picosdk.functions import adc2mV, assert_pico_ok, mV2adc PICOSDK_AVAILABLE = True except Exception as e: PICOSDK_AVAILABLE = False print(f"Warning: PicoSDK not available: {e}") print("Server will run but device operations will fail until PicoSDK is installed.") from .models import DeviceInfo, ChannelConfig, TriggerConfig, CaptureData class PicoScopeManager: """Manager for PicoScope device operations.""" def __init__(self): """Initialize the device manager.""" self.current_device: Optional[Any] = None self.device_info: Optional[DeviceInfo] = None self.channel_configs: dict[str, ChannelConfig] = {} self.chandle: Optional[ctypes.c_int16] = None self.status: dict[str, Any] = {} def discover_devices(self) -> list[dict[str, Any]]: """Discover all connected PicoScope devices. Returns: List of discovered devices with their information. """ if not PICOSDK_AVAILABLE: return [] try: devices = find_all_units() device_list = [] for scope in devices: device_list.append( { "info": str(scope.info), "variant": getattr(scope, "variant", "unknown"), } ) scope.close() return device_list except Exception as e: return [] def connect(self, device_index: int = 0, serial: Optional[str] = None) -> bool: """Connect to a PicoScope 5000A device. Args: device_index: Index of device to connect to (0 for first device). serial: Optional serial number string for specific device. Returns: True if connection successful, False otherwise. """ if not PICOSDK_AVAILABLE: return False try: # Create handle for PS5000A self.chandle = ctypes.c_int16() # Set resolution (12-bit default) resolution = ps.PS5000A_DEVICE_RESOLUTION["PS5000A_DR_12BIT"] # Convert serial to bytes if provided serial_bytes = serial.encode() if serial else None # Open the device self.status["openunit"] = ps.ps5000aOpenUnit( ctypes.byref(self.chandle), serial_bytes, resolution ) # Handle power status issues try: assert_pico_ok(self.status["openunit"]) except: power_status = self.status["openunit"] # Power status codes: 286 = USB3.0 powered, 282 = needs external power if power_status in [286, 282]: self.status["changePowerSource"] = ps.ps5000aChangePowerSource( self.chandle, power_status ) assert_pico_ok(self.status["changePowerSource"]) else: raise # Get device info info_string = ctypes.create_string_buffer(256) required_size = ctypes.c_int16(256) # Get variant info self.status["getUnitInfo"] = ps.ps5000aGetUnitInfo( self.chandle, ctypes.byref(info_string), ctypes.c_int16(256), ctypes.byref(required_size), ps.PICO_INFO["PICO_VARIANT_INFO"] ) variant = info_string.value.decode('utf-8') # Get batch and serial self.status["getUnitInfo"] = ps.ps5000aGetUnitInfo( self.chandle, ctypes.byref(info_string), ctypes.c_int16(256), ctypes.byref(required_size), ps.PICO_INFO["PICO_BATCH_AND_SERIAL"] ) batch_serial = info_string.value.decode('utf-8') # Store device information self.device_info = DeviceInfo( handle=self.chandle.value, model="PS5000A", serial=batch_serial, variant=variant, batch_and_serial=batch_serial, max_adc_value=32767, # 15-bit for PS5000A min_adc_value=-32767, num_channels=4, # PS5000A has 4 channels ) return True except Exception as e: self.chandle = None self.device_info = None return False def disconnect(self) -> bool: """Disconnect from current device. Returns: True if disconnection successful, False otherwise. """ try: if self.chandle: # Stop any running capture ps.ps5000aStop(self.chandle) # Close the unit ps.ps5000aCloseUnit(self.chandle) self.chandle = None self.current_device = None self.device_info = None self.channel_configs.clear() self.status.clear() return True except Exception as e: return False def is_connected(self) -> bool: """Check if a device is currently connected. Returns: True if connected, False otherwise. """ return self.current_device is not None def get_info(self) -> Optional[DeviceInfo]: """Get information about the current device. Returns: DeviceInfo object or None if not connected. """ return self.device_info def configure_channel(self, config: ChannelConfig) -> bool: """Configure a channel. Args: config: Channel configuration. Returns: True if successful, False otherwise. """ if not self.is_connected(): return False try: # Map channel letter to PS5000A channel constant channel_map = { "A": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_A"], "B": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_B"], "C": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_C"], "D": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_D"], } if config.channel not in channel_map: return False # Map coupling type coupling = ( ps.PS5000A_COUPLING["PS5000A_AC"] if config.coupling.value == "AC" else ps.PS5000A_COUPLING["PS5000A_DC"] ) # Map voltage range to closest available range range_map = { 0.02: "PS5000A_20MV", 0.05: "PS5000A_50MV", 0.1: "PS5000A_100MV", 0.2: "PS5000A_200MV", 0.5: "PS5000A_500MV", 1.0: "PS5000A_1V", 2.0: "PS5000A_2V", 5.0: "PS5000A_5V", 10.0: "PS5000A_10V", 20.0: "PS5000A_20V", } # Find closest range closest_range = min(range_map.keys(), key=lambda x: abs(x - config.voltage_range)) voltage_range = ps.PS5000A_RANGE[range_map[closest_range]] # Convert analog offset to ADC counts if self.device_info: analog_offset_adc = mV2adc( config.analog_offset * 1000, # V to mV voltage_range, self.device_info.max_adc_value ) else: analog_offset_adc = 0 # Set the channel self.status[f"setCh{config.channel}"] = ps.ps5000aSetChannel( self.chandle, channel_map[config.channel], 1 if config.enabled else 0, coupling, voltage_range, analog_offset_adc ) assert_pico_ok(self.status[f"setCh{config.channel}"]) # Store configuration self.channel_configs[config.channel] = config return True except Exception as e: return False def set_trigger(self, config: TriggerConfig) -> bool: """Set up trigger. Args: config: Trigger configuration. Returns: True if successful, False otherwise. """ if not self.is_connected(): return False try: # Map source to channel source_map = { "A": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_A"], "B": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_B"], "C": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_C"], "D": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_D"], "External": ps.PS5000A_CHANNEL["PS5000A_EXTERNAL"], } if config.source not in source_map: return False # Map trigger direction direction_map = { "Rising": ps.PS5000A_THRESHOLD_DIRECTION["PS5000A_RISING"], "Falling": ps.PS5000A_THRESHOLD_DIRECTION["PS5000A_FALLING"], "Rising_Or_Falling": ps.PS5000A_THRESHOLD_DIRECTION[ "PS5000A_RISING_OR_FALLING" ], } direction = direction_map[config.direction.value] # Convert threshold from mV to ADC counts # Use the configured range for the source channel if available if config.source in self.channel_configs: ch_config = self.channel_configs[config.source] range_map = { 0.02: "PS5000A_20MV", 0.05: "PS5000A_50MV", 0.1: "PS5000A_100MV", 0.2: "PS5000A_200MV", 0.5: "PS5000A_500MV", 1.0: "PS5000A_1V", 2.0: "PS5000A_2V", 5.0: "PS5000A_5V", 10.0: "PS5000A_10V", 20.0: "PS5000A_20V", } closest_range = min( range_map.keys(), key=lambda x: abs(x - ch_config.voltage_range) ) voltage_range = ps.PS5000A_RANGE[range_map[closest_range]] else: voltage_range = ps.PS5000A_RANGE["PS5000A_2V"] # Default threshold_adc = mV2adc( config.threshold_mv, voltage_range, self.device_info.max_adc_value if self.device_info else 32767, ) # Set simple trigger self.status["trigger"] = ps.ps5000aSetSimpleTrigger( self.chandle, 1, # Enable trigger source_map[config.source], threshold_adc, direction, 0, # Delay (samples) config.auto_trigger_ms, # Auto-trigger timeout ) assert_pico_ok(self.status["trigger"]) return True except Exception as e: return False def capture_block( self, pre_trigger: int, post_trigger: int ) -> Optional[dict[str, CaptureData]]: """Capture a block of data. Args: pre_trigger: Number of samples before trigger. post_trigger: Number of samples after trigger. Returns: Dictionary of channel data or None if failed. """ if not self.is_connected(): return None try: total_samples = pre_trigger + post_trigger # Set up buffers for enabled channels buffers = {} channel_map = { "A": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_A"], "B": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_B"], "C": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_C"], "D": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_D"], } for ch_name, ch_config in self.channel_configs.items(): if ch_config.enabled: # Create buffer buffer = (ctypes.c_int16 * total_samples)() buffers[ch_name] = buffer # Set data buffer self.status[f"setDataBuffer{ch_name}"] = ps.ps5000aSetDataBuffer( self.chandle, channel_map[ch_name], ctypes.byref(buffer), total_samples, 0, # Segment index ps.PS5000A_RATIO_MODE["PS5000A_RATIO_MODE_NONE"], ) assert_pico_ok(self.status[f"setDataBuffer{ch_name}"]) # Get timebase - using timebase 0 for fastest sampling timebase = 0 time_interval_ns = ctypes.c_float() max_samples = ctypes.c_int32() self.status["getTimebase"] = ps.ps5000aGetTimebase2( self.chandle, timebase, total_samples, ctypes.byref(time_interval_ns), ctypes.byref(max_samples), 0, # Segment index ) # If timebase 0 doesn't work, try higher values while self.status["getTimebase"] != 0 and timebase < 100: timebase += 1 self.status["getTimebase"] = ps.ps5000aGetTimebase2( self.chandle, timebase, total_samples, ctypes.byref(time_interval_ns), ctypes.byref(max_samples), 0, ) assert_pico_ok(self.status["getTimebase"]) # Run block capture self.status["runBlock"] = ps.ps5000aRunBlock( self.chandle, pre_trigger, post_trigger, timebase, None, # Time indisposed (not used) 0, # Segment index None, # Callback (not used, we'll poll) None, # Callback parameter ) assert_pico_ok(self.status["runBlock"]) # Wait for capture to complete ready = ctypes.c_int16(0) check = ctypes.c_int16(0) while ready.value == check.value: self.status["isReady"] = ps.ps5000aIsReady(self.chandle, ctypes.byref(ready)) # Get values overflow = ctypes.c_int16() c_total_samples = ctypes.c_int32(total_samples) self.status["getValues"] = ps.ps5000aGetValues( self.chandle, 0, # Start index ctypes.byref(c_total_samples), 1, # Downsampling ratio ps.PS5000A_RATIO_MODE["PS5000A_RATIO_MODE_NONE"], 0, # Segment index ctypes.byref(overflow), ) assert_pico_ok(self.status["getValues"]) # Convert ADC values to mV and create CaptureData objects result = {} time_values = np.arange(total_samples) * (time_interval_ns.value / 1e9) # Convert to seconds for ch_name, buffer in buffers.items(): ch_config = self.channel_configs[ch_name] # Get voltage range range_map = { 0.02: "PS5000A_20MV", 0.05: "PS5000A_50MV", 0.1: "PS5000A_100MV", 0.2: "PS5000A_200MV", 0.5: "PS5000A_500MV", 1.0: "PS5000A_1V", 2.0: "PS5000A_2V", 5.0: "PS5000A_5V", 10.0: "PS5000A_10V", 20.0: "PS5000A_20V", } closest_range = min(range_map.keys(), key=lambda x: abs(x - ch_config.voltage_range)) voltage_range = ps.PS5000A_RANGE[range_map[closest_range]] # Convert to numpy array adc_array = np.array(buffer) # Convert ADC to mV voltage_mv = adc2mV( adc_array, voltage_range, self.device_info.max_adc_value if self.device_info else 32767, ) result[ch_name] = CaptureData( channel=ch_name, time_values=time_values.tolist(), voltage_values=voltage_mv.tolist(), sample_interval_ns=int(time_interval_ns.value), num_samples=total_samples, ) return result except Exception as e: return None def start_streaming( self, sample_interval_ns: int, buffer_size: int ) -> bool: """Start streaming mode. Args: sample_interval_ns: Sample interval in nanoseconds. buffer_size: Buffer size for streaming. Returns: True if successful, False otherwise. """ if not self.is_connected(): return False # TODO: Implement streaming mode # This will need to set up buffers and call ps*RunStreaming return True def stop_streaming(self) -> bool: """Stop streaming mode. Returns: True if successful, False otherwise. """ if not self.is_connected(): return False # TODO: Implement streaming stop # Call ps*Stop return True # Global instance of the device manager device_manager = PicoScopeManager()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/markuskreitzer/picoscope_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server