We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/markuskreitzer/picoscope_mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""Device manager for PicoScope oscilloscopes."""
from typing import Optional, Any
import ctypes
import logging
import time

import numpy as np

# Try to import PicoSDK, but handle gracefully if not installed
try:
    from picosdk.discover import find_all_units
    from picosdk.ps5000a import ps5000a as ps
    from picosdk.functions import adc2mV, assert_pico_ok, mV2adc
    PICOSDK_AVAILABLE = True
except Exception as e:
    PICOSDK_AVAILABLE = False
    print(f"Warning: PicoSDK not available: {e}")
    print("Server will run but device operations will fail until PicoSDK is installed.")

from .models import DeviceInfo, ChannelConfig, TriggerConfig, CaptureData
class PicoScopeManager:
    """Manager for PicoScope 5000A device operations.

    Wraps the PicoSDK ``ps5000a`` ctypes bindings behind a small API that
    reports success or failure through return values instead of exceptions.
    Failures are logged through the module logger so that they are not
    silently lost.

    Attributes:
        current_device: Optional device object; never assigned by this class
            itself, kept for callers that set it directly.
        device_info: Information about the connected unit, or None.
        channel_configs: Last successfully applied configuration per channel.
        chandle: Driver handle of the open unit, or None when disconnected.
        status: Most recent driver status code for each driver call made.
    """

    # (nominal range in volts, same range in millivolts, PS5000A_RANGE key)
    _RANGE_TABLE = (
        (0.02, 20, "PS5000A_20MV"),
        (0.05, 50, "PS5000A_50MV"),
        (0.1, 100, "PS5000A_100MV"),
        (0.2, 200, "PS5000A_200MV"),
        (0.5, 500, "PS5000A_500MV"),
        (1.0, 1000, "PS5000A_1V"),
        (2.0, 2000, "PS5000A_2V"),
        (5.0, 5000, "PS5000A_5V"),
        (10.0, 10000, "PS5000A_10V"),
        (20.0, 20000, "PS5000A_20V"),
    )
    # Range assumed for the trigger source when that channel was never configured.
    _DEFAULT_TRIGGER_RANGE = (2000, "PS5000A_2V")
    _CHANNEL_NAMES = ("A", "B", "C", "D")
    _TRIGGER_SOURCES = {
        "A": "PS5000A_CHANNEL_A",
        "B": "PS5000A_CHANNEL_B",
        "C": "PS5000A_CHANNEL_C",
        "D": "PS5000A_CHANNEL_D",
        "External": "PS5000A_EXTERNAL",
    }
    _TRIGGER_DIRECTIONS = {
        "Rising": "PS5000A_RISING",
        "Falling": "PS5000A_FALLING",
        "Rising_Or_Falling": "PS5000A_RISING_OR_FALLING",
    }
    # Open-unit status codes that are answered with ps5000aChangePowerSource:
    # 282 = power supply not connected, 286 = USB 3.0 device on a non-USB 3.0 port.
    _POWER_STATUS_CODES = (282, 286)
    _DEFAULT_MAX_ADC = 32767
    _MAX_TIMEBASE = 100
    _READY_POLL_INTERVAL_S = 0.001
    _log = logging.getLogger(__name__)

    def __init__(self):
        """Initialize the device manager with no device attached."""
        self.current_device: Optional[Any] = None
        self.device_info: Optional[DeviceInfo] = None
        self.channel_configs: dict[str, ChannelConfig] = {}
        self.chandle: Optional[ctypes.c_int16] = None
        self.status: dict[str, Any] = {}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @classmethod
    def _nearest_range(cls, volts: float) -> tuple:
        """Return the ``(volts, millivolts, key)`` table entry closest to *volts*.

        Ties resolve to the smaller range because ``min`` keeps the first match.
        """
        return min(cls._RANGE_TABLE, key=lambda entry: abs(entry[0] - volts))

    def _resolve_range(self, volts: float) -> tuple:
        """Return ``(PS5000A_RANGE enum value, range in mV)`` for *volts*."""
        _, range_mv, key = self._nearest_range(volts)
        return ps.PS5000A_RANGE[key], range_mv

    @staticmethod
    def _mv_to_adc(millivolts: float, range_mv: float, max_adc: int) -> int:
        """Convert millivolts to ADC counts, clamped to ``[-max_adc, max_adc]``."""
        counts = round(millivolts * max_adc / range_mv)
        return max(-max_adc, min(max_adc, counts))

    def _max_adc(self) -> int:
        """Return the full-scale ADC count of the connected device."""
        if self.device_info:
            return self.device_info.max_adc_value
        return self._DEFAULT_MAX_ADC

    @staticmethod
    def _channel_enum(name: str) -> int:
        """Return the PS5000A_CHANNEL enum value for channel letter *name*."""
        return ps.PS5000A_CHANNEL[f"PS5000A_CHANNEL_{name}"]

    def _read_unit_info(self, info_key: str) -> str:
        """Query one ``PICO_INFO`` string from the open unit.

        Raises whatever ``assert_pico_ok`` raises when the driver reports a
        non-OK status.
        """
        text = ctypes.create_string_buffer(256)
        required_size = ctypes.c_int16(256)
        self.status["getUnitInfo"] = ps.ps5000aGetUnitInfo(
            self.chandle,
            ctypes.byref(text),
            ctypes.c_int16(256),
            ctypes.byref(required_size),
            ps.PICO_INFO[info_key],
        )
        assert_pico_ok(self.status["getUnitInfo"])
        return text.value.decode("utf-8", errors="replace")

    def _find_timebase(self, total_samples: int) -> tuple:
        """Return ``(timebase, interval_ns)`` for the fastest usable timebase.

        Timebases 0 through ``_MAX_TIMEBASE`` are tried in order; the first
        one the driver accepts for *total_samples* wins.
        """
        interval_ns = ctypes.c_float()
        max_samples = ctypes.c_int32()
        timebase = 0
        for timebase in range(self._MAX_TIMEBASE + 1):
            self.status["getTimebase"] = ps.ps5000aGetTimebase2(
                self.chandle,
                timebase,
                total_samples,
                ctypes.byref(interval_ns),
                ctypes.byref(max_samples),
                0,  # Segment index
            )
            if self.status["getTimebase"] == 0:
                break
        assert_pico_ok(self.status["getTimebase"])
        return timebase, interval_ns.value

    def _wait_until_ready(self, timeout_s: Optional[float]) -> bool:
        """Poll the driver until the block capture has finished.

        Returns:
            True once the driver reports ready, False if *timeout_s* elapsed
            first. With ``timeout_s=None`` this waits indefinitely.
        """
        ready = ctypes.c_int16(0)
        deadline = None if timeout_s is None else time.monotonic() + timeout_s
        while True:
            self.status["isReady"] = ps.ps5000aIsReady(
                self.chandle, ctypes.byref(ready)
            )
            if ready.value != 0:
                return True
            if deadline is not None and time.monotonic() >= deadline:
                return False
            # Yield the CPU between polls instead of spinning.
            time.sleep(self._READY_POLL_INTERVAL_S)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def discover_devices(self) -> list[dict[str, Any]]:
        """Discover all connected PicoScope devices.

        Every unit returned by the SDK is closed again, even when reading
        its information fails.

        Returns:
            List of discovered devices with their information. Empty when
            PicoSDK is unavailable or discovery itself fails.
        """
        if not PICOSDK_AVAILABLE:
            return []
        try:
            scopes = find_all_units()
        except Exception:
            self._log.exception("PicoScope discovery failed")
            return []

        found = []
        for scope in scopes:
            try:
                found.append(
                    {
                        "info": str(scope.info),
                        "variant": getattr(scope, "variant", "unknown"),
                    }
                )
            except Exception:
                self._log.exception("Could not read info from a discovered unit")
            finally:
                try:
                    scope.close()
                except Exception:
                    self._log.exception("Could not close a discovered unit")
        return found

    def connect(self, device_index: int = 0, serial: Optional[str] = None) -> bool:
        """Connect to a PicoScope 5000A device at 12-bit resolution.

        An already open device is disconnected first so its handle is not
        leaked. If any step after opening fails, the unit is closed again.

        Args:
            device_index: Index of device to connect to. Currently not used
                by the implementation; selection is by *serial* only.
            serial: Optional serial number string for specific device. When
                omitted, the driver picks the unit.

        Returns:
            True if connection successful, False otherwise.
        """
        if not PICOSDK_AVAILABLE:
            return False
        if self.chandle is not None:
            self.disconnect()

        handle = ctypes.c_int16()
        try:
            resolution = ps.PS5000A_DEVICE_RESOLUTION["PS5000A_DR_12BIT"]
            serial_bytes = serial.encode() if serial else None

            open_status = ps.ps5000aOpenUnit(
                ctypes.byref(handle), serial_bytes, resolution
            )
            self.status["openunit"] = open_status

            if open_status in self._POWER_STATUS_CODES:
                # The unit opened but wants its power source acknowledged.
                self.status["changePowerSource"] = ps.ps5000aChangePowerSource(
                    handle, open_status
                )
                assert_pico_ok(self.status["changePowerSource"])
            else:
                assert_pico_ok(open_status)

            self.chandle = handle
            variant = self._read_unit_info("PICO_VARIANT_INFO")
            batch_serial = self._read_unit_info("PICO_BATCH_AND_SERIAL")

            self.device_info = DeviceInfo(
                handle=handle.value,
                model="PS5000A",
                serial=batch_serial,
                variant=variant,
                batch_and_serial=batch_serial,
                max_adc_value=self._DEFAULT_MAX_ADC,
                min_adc_value=-self._DEFAULT_MAX_ADC,
                num_channels=4,
            )
            return True
        except Exception:
            self._log.exception("Failed to connect to PicoScope")
            # NOTE(review): assumes the driver only hands out positive values
            # as valid handles - confirm against the programmer's guide.
            if handle.value > 0:
                try:
                    ps.ps5000aCloseUnit(handle)
                except Exception:
                    self._log.exception("Failed to close unit after connect error")
            self.chandle = None
            self.device_info = None
            return False

    def disconnect(self) -> bool:
        """Disconnect from current device and reset all cached state.

        Returns:
            True if disconnection successful (or nothing was connected),
            False if stopping or closing the unit raised.
        """
        try:
            if self.chandle is not None:
                # Stop any running capture before closing the unit.
                ps.ps5000aStop(self.chandle)
                ps.ps5000aCloseUnit(self.chandle)
            self.chandle = None
            self.current_device = None
            self.device_info = None
            self.channel_configs.clear()
            self.status.clear()
            return True
        except Exception:
            self._log.exception("Failed to disconnect from PicoScope")
            return False

    def is_connected(self) -> bool:
        """Check if a device is currently connected.

        The driver handle is the authoritative marker because ``connect``
        stores it on success; ``current_device`` is still honoured for
        callers that assign it themselves.

        Returns:
            True if connected, False otherwise.
        """
        return self.chandle is not None or self.current_device is not None

    def get_info(self) -> Optional[DeviceInfo]:
        """Get information about the current device.

        Returns:
            DeviceInfo object or None if not connected.
        """
        return self.device_info

    def configure_channel(self, config: ChannelConfig) -> bool:
        """Configure a channel.

        The requested voltage range is snapped to the nearest entry of the
        range table. The analog offset is handed to the driver in volts.

        Args:
            config: Channel configuration.

        Returns:
            True if successful, False otherwise.
        """
        if not self.is_connected():
            return False
        try:
            if config.channel not in self._CHANNEL_NAMES:
                return False

            coupling_key = (
                "PS5000A_AC" if config.coupling.value == "AC" else "PS5000A_DC"
            )
            range_enum, _ = self._resolve_range(config.voltage_range)

            status_key = f"setCh{config.channel}"
            self.status[status_key] = ps.ps5000aSetChannel(
                self.chandle,
                self._channel_enum(config.channel),
                1 if config.enabled else 0,
                ps.PS5000A_COUPLING[coupling_key],
                range_enum,
                float(config.analog_offset),  # volts, not ADC counts
            )
            assert_pico_ok(self.status[status_key])

            self.channel_configs[config.channel] = config
            return True
        except Exception:
            self._log.exception("Failed to configure channel")
            return False

    def set_trigger(self, config: TriggerConfig) -> bool:
        """Set up a simple edge trigger.

        The threshold is converted from millivolts to ADC counts using the
        range configured for the source channel, or a 2 V default when that
        channel has not been configured, and clamped to the ADC limits.

        Args:
            config: Trigger configuration.

        Returns:
            True if successful, False otherwise.
        """
        if not self.is_connected():
            return False
        try:
            if config.source not in self._TRIGGER_SOURCES:
                return False
            source = ps.PS5000A_CHANNEL[self._TRIGGER_SOURCES[config.source]]
            direction = ps.PS5000A_THRESHOLD_DIRECTION[
                self._TRIGGER_DIRECTIONS[config.direction.value]
            ]

            source_config = self.channel_configs.get(config.source)
            if source_config is not None:
                _, range_mv, _ = self._nearest_range(source_config.voltage_range)
            else:
                range_mv, _ = self._DEFAULT_TRIGGER_RANGE

            threshold_adc = self._mv_to_adc(
                config.threshold_mv, range_mv, self._max_adc()
            )

            self.status["trigger"] = ps.ps5000aSetSimpleTrigger(
                self.chandle,
                1,  # Enable trigger
                source,
                threshold_adc,
                direction,
                0,  # Delay (samples)
                config.auto_trigger_ms,  # Auto-trigger timeout
            )
            assert_pico_ok(self.status["trigger"])
            return True
        except Exception:
            self._log.exception("Failed to set trigger")
            return False

    def capture_block(
        self,
        pre_trigger: int,
        post_trigger: int,
        timeout_s: Optional[float] = None,
    ) -> Optional[dict[str, CaptureData]]:
        """Capture a block of data from every enabled channel.

        Args:
            pre_trigger: Number of samples before trigger.
            post_trigger: Number of samples after trigger.
            timeout_s: Maximum time to wait for the capture to complete.
                None (the default) waits indefinitely.

        Returns:
            Dictionary of channel data keyed by channel name, or None if the
            request is invalid, the capture failed, or it timed out.
        """
        if not self.is_connected():
            return None
        try:
            total_samples = pre_trigger + post_trigger
            if pre_trigger < 0 or post_trigger < 0 or total_samples == 0:
                self._log.warning(
                    "Invalid sample counts: pre_trigger=%s post_trigger=%s",
                    pre_trigger,
                    post_trigger,
                )
                return None

            ratio_none = ps.PS5000A_RATIO_MODE["PS5000A_RATIO_MODE_NONE"]

            # Register one receive buffer per enabled channel.
            buffers = {}
            for ch_name, ch_config in self.channel_configs.items():
                if not ch_config.enabled:
                    continue
                raw = (ctypes.c_int16 * total_samples)()
                buffers[ch_name] = raw
                status_key = f"setDataBuffer{ch_name}"
                self.status[status_key] = ps.ps5000aSetDataBuffer(
                    self.chandle,
                    self._channel_enum(ch_name),
                    ctypes.byref(raw),
                    total_samples,
                    0,  # Segment index
                    ratio_none,
                )
                assert_pico_ok(self.status[status_key])

            timebase, interval_ns = self._find_timebase(total_samples)

            self.status["runBlock"] = ps.ps5000aRunBlock(
                self.chandle,
                pre_trigger,
                post_trigger,
                timebase,
                None,  # Time indisposed (not used)
                0,  # Segment index
                None,  # Callback (not used, we poll instead)
                None,  # Callback parameter
            )
            assert_pico_ok(self.status["runBlock"])

            if not self._wait_until_ready(timeout_s):
                self._log.warning("Block capture timed out after %s s", timeout_s)
                ps.ps5000aStop(self.chandle)
                return None

            overflow = ctypes.c_int16()
            sample_count = ctypes.c_int32(total_samples)
            self.status["getValues"] = ps.ps5000aGetValues(
                self.chandle,
                0,  # Start index
                ctypes.byref(sample_count),
                1,  # Downsampling ratio
                ratio_none,
                0,  # Segment index
                ctypes.byref(overflow),
            )
            assert_pico_ok(self.status["getValues"])
            if overflow.value:
                self._log.warning(
                    "Driver reported overflow flags: %s", overflow.value
                )

            # The driver writes back how many samples it actually delivered.
            num_samples = max(0, min(sample_count.value, total_samples))
            time_values = np.arange(num_samples) * (interval_ns / 1e9)  # seconds
            max_adc = self._max_adc()

            result = {}
            for ch_name, raw in buffers.items():
                _, range_mv = self._resolve_range(
                    self.channel_configs[ch_name].voltage_range
                )
                counts = np.array(raw)[:num_samples].astype(np.float64)
                voltage_mv = counts * range_mv / max_adc
                result[ch_name] = CaptureData(
                    channel=ch_name,
                    time_values=time_values.tolist(),
                    voltage_values=voltage_mv.tolist(),
                    sample_interval_ns=int(interval_ns),
                    num_samples=num_samples,
                )
            return result
        except Exception:
            self._log.exception("Block capture failed")
            return None

    def start_streaming(
        self, sample_interval_ns: int, buffer_size: int
    ) -> bool:
        """Start streaming mode (not implemented yet).

        Args:
            sample_interval_ns: Sample interval in nanoseconds.
            buffer_size: Buffer size for streaming.

        Returns:
            False when no device is connected, True otherwise. No driver
            call is made yet, so True does not mean data is being streamed.
        """
        if not self.is_connected():
            return False
        # TODO: Implement streaming mode
        # This will need to set up buffers and call ps*RunStreaming
        return True

    def stop_streaming(self) -> bool:
        """Stop streaming mode (not implemented yet).

        Returns:
            False when no device is connected, True otherwise. No driver
            call is made yet.
        """
        if not self.is_connected():
            return False
        # TODO: Implement streaming stop
        # Call ps*Stop
        return True
# Global instance of the device manager, created when this module is imported
# and exposed under the module-level name `device_manager`.
device_manager = PicoScopeManager()