Skip to main content
Glama

capture_block

Capture waveform data from PicoScope oscilloscopes by specifying pre-trigger and post-trigger sample counts. Returns signal data for all enabled channels in a single acquisition block.

Instructions

Capture a single block of data.

Args: pre_trigger_samples: Number of samples before trigger. post_trigger_samples: Number of samples after trigger.

Returns: Dictionary containing captured waveform data for all enabled channels.

Input Schema

NameRequiredDescriptionDefault
post_trigger_samplesNo
pre_trigger_samplesNo

Input Schema (JSON Schema)

{ "properties": { "post_trigger_samples": { "default": 1000, "type": "integer" }, "pre_trigger_samples": { "default": 1000, "type": "integer" } }, "type": "object" }

Implementation Reference

  • MCP tool handler for 'capture_block'. Decorated with @mcp.tool(), handles parameters, calls low-level capture, formats response with channel data, and manages errors.
    @mcp.tool() def capture_block( pre_trigger_samples: int = 1000, post_trigger_samples: int = 1000 ) -> dict[str, Any]: """Capture a single block of data. Args: pre_trigger_samples: Number of samples before trigger. post_trigger_samples: Number of samples after trigger. Returns: Dictionary containing captured waveform data for all enabled channels. """ try: if not device_manager.is_connected(): return { "status": "error", "error": "No device connected", } # Capture block result = device_manager.capture_block(pre_trigger_samples, post_trigger_samples) if result is None: return { "status": "error", "error": "Failed to capture data", } # Format the response channels_data = {} for channel, data in result.items(): channels_data[channel] = { "time_values": data.time_values, "voltage_values": data.voltage_values, "sample_interval_ns": data.sample_interval_ns, "num_samples": data.num_samples, } return { "status": "success", "total_samples": pre_trigger_samples + post_trigger_samples, "pre_trigger_samples": pre_trigger_samples, "post_trigger_samples": post_trigger_samples, "channels": channels_data, } except Exception as e: return { "status": "error", "error": str(e), }
  • Server imports register_acquisition_tools and calls it to register the acquisition tools including 'capture_block'.
    from .tools.acquisition import register_acquisition_tools from .tools.analysis import register_analysis_tools from .tools.advanced import register_advanced_tools # Create FastMCP server instance mcp = FastMCP("PicoScope MCP Server") # Register all tool categories register_discovery_tools(mcp) register_configuration_tools(mcp) register_acquisition_tools(mcp) register_analysis_tools(mcp) register_advanced_tools(mcp)
  • Low-level capture_block implementation in device_manager. Sets up buffers, calls PicoScope ps5000aRunBlock, retrieves data, converts to voltage values, returns dict of CaptureData.
    def capture_block( self, pre_trigger: int, post_trigger: int ) -> Optional[dict[str, CaptureData]]: """Capture a block of data. Args: pre_trigger: Number of samples before trigger. post_trigger: Number of samples after trigger. Returns: Dictionary of channel data or None if failed. """ if not self.is_connected(): return None try: total_samples = pre_trigger + post_trigger # Set up buffers for enabled channels buffers = {} channel_map = { "A": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_A"], "B": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_B"], "C": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_C"], "D": ps.PS5000A_CHANNEL["PS5000A_CHANNEL_D"], } for ch_name, ch_config in self.channel_configs.items(): if ch_config.enabled: # Create buffer buffer = (ctypes.c_int16 * total_samples)() buffers[ch_name] = buffer # Set data buffer self.status[f"setDataBuffer{ch_name}"] = ps.ps5000aSetDataBuffer( self.chandle, channel_map[ch_name], ctypes.byref(buffer), total_samples, 0, # Segment index ps.PS5000A_RATIO_MODE["PS5000A_RATIO_MODE_NONE"], ) assert_pico_ok(self.status[f"setDataBuffer{ch_name}"]) # Get timebase - using timebase 0 for fastest sampling timebase = 0 time_interval_ns = ctypes.c_float() max_samples = ctypes.c_int32() self.status["getTimebase"] = ps.ps5000aGetTimebase2( self.chandle, timebase, total_samples, ctypes.byref(time_interval_ns), ctypes.byref(max_samples), 0, # Segment index ) # If timebase 0 doesn't work, try higher values while self.status["getTimebase"] != 0 and timebase < 100: timebase += 1 self.status["getTimebase"] = ps.ps5000aGetTimebase2( self.chandle, timebase, total_samples, ctypes.byref(time_interval_ns), ctypes.byref(max_samples), 0, ) assert_pico_ok(self.status["getTimebase"]) # Run block capture self.status["runBlock"] = ps.ps5000aRunBlock( self.chandle, pre_trigger, post_trigger, timebase, None, # Time indisposed (not used) 0, # Segment index None, # Callback (not used, we'll poll) None, # Callback parameter ) assert_pico_ok(self.status["runBlock"]) # Wait for capture to complete ready = ctypes.c_int16(0) check = ctypes.c_int16(0) while ready.value == check.value: self.status["isReady"] = ps.ps5000aIsReady(self.chandle, ctypes.byref(ready)) # Get values overflow = ctypes.c_int16() c_total_samples = ctypes.c_int32(total_samples) self.status["getValues"] = ps.ps5000aGetValues( self.chandle, 0, # Start index ctypes.byref(c_total_samples), 1, # Downsampling ratio ps.PS5000A_RATIO_MODE["PS5000A_RATIO_MODE_NONE"], 0, # Segment index ctypes.byref(overflow), ) assert_pico_ok(self.status["getValues"]) # Convert ADC values to mV and create CaptureData objects result = {} time_values = np.arange(total_samples) * (time_interval_ns.value / 1e9) # Convert to seconds for ch_name, buffer in buffers.items(): ch_config = self.channel_configs[ch_name] # Get voltage range range_map = { 0.02: "PS5000A_20MV", 0.05: "PS5000A_50MV", 0.1: "PS5000A_100MV", 0.2: "PS5000A_200MV", 0.5: "PS5000A_500MV", 1.0: "PS5000A_1V", 2.0: "PS5000A_2V", 5.0: "PS5000A_5V", 10.0: "PS5000A_10V", 20.0: "PS5000A_20V", } closest_range = min(range_map.keys(), key=lambda x: abs(x - ch_config.voltage_range)) voltage_range = ps.PS5000A_RANGE[range_map[closest_range]] # Convert to numpy array adc_array = np.array(buffer) # Convert ADC to mV voltage_mv = adc2mV( adc_array, voltage_range, self.device_info.max_adc_value if self.device_info else 32767, ) result[ch_name] = CaptureData( channel=ch_name, time_values=time_values.tolist(), voltage_values=voltage_mv.tolist(), sample_interval_ns=int(time_interval_ns.value), num_samples=total_samples, ) return result except Exception as e: return None
  • Dataclass schema for CaptureData used in the low-level capture result, which the handler formats into JSON-like dict.
    class CaptureData: """Captured waveform data.""" channel: str time_values: list[float] voltage_values: list[float] sample_interval_ns: int num_samples: int

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/markuskreitzer/picoscope_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server