Skip to main content
Glama
markuskreitzer

PicoScope MCP Server

capture_block

Capture waveform data from PicoScope oscilloscopes by specifying pre- and post-trigger sample counts for signal acquisition and analysis.

Instructions

Capture a single block of data.

Args: pre_trigger_samples: Number of samples before trigger. post_trigger_samples: Number of samples after trigger.

Returns: Dictionary containing captured waveform data for all enabled channels.

Input Schema

Table | JSON Schema
Name | Required | Description | Default
pre_trigger_samples | No | |
post_trigger_samples | No | |

Implementation Reference

  • MCP tool handler for 'capture_block'. Decorated with @mcp.tool(), calls device_manager.capture_block, formats and returns the captured data.
    @mcp.tool()
    def capture_block(
        pre_trigger_samples: int = 1000, post_trigger_samples: int = 1000
    ) -> dict[str, Any]:
        """Capture a single block of data.

        Args:
            pre_trigger_samples: Number of samples before trigger (must be >= 0).
            post_trigger_samples: Number of samples after trigger (must be >= 0).

        Returns:
            Dictionary containing captured waveform data for all enabled channels.
            On success "status" is "success" and "channels" maps each channel
            name to its time values, voltage values, sample interval and
            sample count. On failure "status" is "error" and "error" holds a
            human-readable message.
        """
        try:
            if not device_manager.is_connected():
                return {
                    "status": "error",
                    "error": "No device connected",
                }

            # Reject impossible requests here so the caller gets a specific
            # message instead of the generic "Failed to capture data" that an
            # invalid count would otherwise produce further down.
            if pre_trigger_samples < 0 or post_trigger_samples < 0:
                return {
                    "status": "error",
                    "error": "Sample counts must be non-negative",
                }

            total_samples = pre_trigger_samples + post_trigger_samples
            if total_samples == 0:
                return {
                    "status": "error",
                    "error": "At least one sample must be requested",
                }

            result = device_manager.capture_block(pre_trigger_samples, post_trigger_samples)

            # The device manager signals any failure by returning None.
            if result is None:
                return {
                    "status": "error",
                    "error": "Failed to capture data",
                }

            # Flatten each per-channel capture object into plain values.
            channels_data = {
                channel: {
                    "time_values": data.time_values,
                    "voltage_values": data.voltage_values,
                    "sample_interval_ns": data.sample_interval_ns,
                    "num_samples": data.num_samples,
                }
                for channel, data in result.items()
            }

            return {
                "status": "success",
                "total_samples": total_samples,
                "pre_trigger_samples": pre_trigger_samples,
                "post_trigger_samples": post_trigger_samples,
                "channels": channels_data,
            }

        except Exception as e:
            # Tool boundary: report the failure to the client rather than
            # letting the exception escape the handler.
            return {
                "status": "error",
                "error": str(e),
            }
  • Registration of acquisition tools, which includes the capture_block tool, by calling register_acquisition_tools on the MCP server instance.
    # Attach the acquisition tools (capture_block among them) to the MCP server instance.
    register_acquisition_tools(mcp)
  • Low-level helper method in PicoScopeManager that performs the actual block capture using PicoSDK API calls.
    def capture_block(
        self, pre_trigger: int, post_trigger: int
    ) -> Optional[dict[str, CaptureData]]:
        """Capture a block of data.

        Registers one ADC buffer per enabled channel, picks the first timebase
        (starting at 0) that the driver accepts for the requested sample count,
        runs a block capture, polls until the driver reports it is ready, then
        converts every buffer to millivolts.

        Args:
            pre_trigger: Number of samples before trigger.
            post_trigger: Number of samples after trigger.

        Returns:
            Dictionary of channel data or None if failed. Failures (not
            connected, invalid sample counts, any driver error) are logged
            and reported as None; no exception is propagated to the caller.
        """
        # Function-scope imports: only this method needs them, and this keeps
        # the change independent of the module's import block.
        import logging
        import time

        logger = logging.getLogger(__name__)

        if not self.is_connected():
            return None

        try:
            total_samples = pre_trigger + post_trigger
            if pre_trigger < 0 or post_trigger < 0 or total_samples <= 0:
                logger.error(
                    "Invalid sample counts: pre_trigger=%s, post_trigger=%s",
                    pre_trigger,
                    post_trigger,
                )
                return None

            # Set up buffers for enabled channels
            buffers = {}
            channel_map = {
                "A": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_A"],
                "B": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_B"],
                "C": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_C"],
                "D": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_D"],
            }

            for ch_name, ch_config in self.channel_configs.items():
                if not ch_config.enabled:
                    continue

                # The driver writes raw 16-bit ADC counts into this buffer.
                buffer = (ctypes.c_int16 * total_samples)()
                buffers[ch_name] = buffer

                status_key = f"setDataBuffer{ch_name}"
                self.status[status_key] = ps.ps5000aSetDataBuffer(
                    self.chandle,
                    channel_map[ch_name],
                    ctypes.byref(buffer),
                    total_samples,
                    0,  # Segment index
                    ps.PS5000A_RATIO_MODE["PS5000A_RATIO_MODE_NONE"],
                )
                assert_pico_ok(self.status[status_key])

            # Get timebase - using timebase 0 for fastest sampling
            timebase = 0
            time_interval_ns = ctypes.c_float()
            max_samples = ctypes.c_int32()

            self.status["getTimebase"] = ps.ps5000aGetTimebase2(
                self.chandle,
                timebase,
                total_samples,
                ctypes.byref(time_interval_ns),
                ctypes.byref(max_samples),
                0,  # Segment index
            )

            # If timebase 0 doesn't work, try higher values (up to 100)
            while self.status["getTimebase"] != 0 and timebase < 100:
                timebase += 1
                self.status["getTimebase"] = ps.ps5000aGetTimebase2(
                    self.chandle,
                    timebase,
                    total_samples,
                    ctypes.byref(time_interval_ns),
                    ctypes.byref(max_samples),
                    0,  # Segment index
                )

            assert_pico_ok(self.status["getTimebase"])

            # Run block capture
            self.status["runBlock"] = ps.ps5000aRunBlock(
                self.chandle,
                pre_trigger,
                post_trigger,
                timebase,
                None,  # Time indisposed (not used)
                0,  # Segment index
                None,  # Callback (not used, we'll poll)
                None,  # Callback parameter
            )
            assert_pico_ok(self.status["runBlock"])

            # Wait for capture to complete. The status of every poll is
            # checked so a driver error aborts the wait instead of looping
            # forever, and a short sleep keeps the poll from pinning a core.
            ready = ctypes.c_int16(0)
            while True:
                self.status["isReady"] = ps.ps5000aIsReady(
                    self.chandle, ctypes.byref(ready)
                )
                assert_pico_ok(self.status["isReady"])
                if ready.value != 0:
                    break
                time.sleep(0.001)

            # Get values
            overflow = ctypes.c_int16()
            c_total_samples = ctypes.c_int32(total_samples)

            self.status["getValues"] = ps.ps5000aGetValues(
                self.chandle,
                0,  # Start index
                ctypes.byref(c_total_samples),
                1,  # Downsampling ratio
                ps.PS5000A_RATIO_MODE["PS5000A_RATIO_MODE_NONE"],
                0,  # Segment index
                ctypes.byref(overflow),
            )
            assert_pico_ok(self.status["getValues"])

            # c_total_samples is passed by reference, so the driver can write
            # back how many samples it actually delivered; report that count
            # rather than the requested one, never exceeding the buffer size.
            num_captured = max(0, min(c_total_samples.value, total_samples))

            # Time axis in seconds, starting at 0 for the first sample
            time_values = np.arange(num_captured) * (time_interval_ns.value / 1e9)
            time_list = time_values.tolist()

            # Nominal range value -> PS5000A_RANGE key (loop-invariant)
            range_map = {
                0.02: "PS5000A_20MV", 0.05: "PS5000A_50MV", 0.1: "PS5000A_100MV",
                0.2: "PS5000A_200MV", 0.5: "PS5000A_500MV", 1.0: "PS5000A_1V",
                2.0: "PS5000A_2V", 5.0: "PS5000A_5V", 10.0: "PS5000A_10V",
                20.0: "PS5000A_20V",
            }
            # NOTE(review): upstream picosdk's adc2mV reads `maxADC.value`;
            # confirm the type of max_adc_value / the 32767 fallback matches
            # the adc2mV implementation imported by this module.
            max_adc = self.device_info.max_adc_value if self.device_info else 32767

            # Convert ADC values to mV and create CaptureData objects
            result = {}
            for ch_name, buffer in buffers.items():
                ch_config = self.channel_configs[ch_name]

                # Snap the configured range to the nearest supported one
                closest_range = min(
                    range_map, key=lambda x: abs(x - ch_config.voltage_range)
                )
                voltage_range = ps.PS5000A_RANGE[range_map[closest_range]]

                # Widen from int16 so integer scaling inside adc2mV cannot
                # wrap around, and keep only the samples actually delivered.
                adc_array = np.array(buffer, dtype=np.int64)[:num_captured]

                voltage_mv = adc2mV(adc_array, voltage_range, max_adc)

                result[ch_name] = CaptureData(
                    channel=ch_name,
                    time_values=time_list,
                    # np.asarray accepts either a list or an array from adc2mV
                    voltage_values=np.asarray(voltage_mv).tolist(),
                    sample_interval_ns=int(time_interval_ns.value),
                    num_samples=num_captured,
                )

            return result

        except Exception:
            # Preserve the "None on failure" contract, but leave a traceback
            # in the log so the cause is not lost.
            logger.exception("Block capture failed")
            return None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/markuskreitzer/picoscope_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.