Skip to main content
Glama

qudi MCP Integration

by dirkenglund
measurement_tools.py•14.8 kB
""" Measurement tools for qudi MCP integration Handles measurement module loading, execution, and data acquisition through qudi's measurement framework. """ import logging import asyncio import time import uuid from typing import Dict, List, Any, Optional from ..safety import RunLevel class MeasurementTools: """Measurement execution and management tools""" def __init__(self, server): self.server = server self.logger = logging.getLogger("qudi-mcp.measurements") async def handle_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """Route measurement tool calls""" tool_map = { "measurement.list_modules": self._list_modules, "measurement.start": self._start_measurement, "measurement.status": self._get_status, "measurement.stop": self._stop_measurement, "measurement.get_data": self._get_data, "measurement.save_data": self._save_data } if name not in tool_map: return {"error": f"Unknown measurement tool: {name}"} return await tool_map[name](arguments) async def _list_modules(self, args: Dict[str, Any]) -> Dict[str, Any]: """List available measurement modules""" if self.server.runlevel == RunLevel.DRY_RUN: return { "modules": [ { "name": "photoluminescence_scan", "type": "spectroscopy", "description": "PL spectroscopy measurement", "parameters": [ "wavelength_start", "wavelength_end", "wavelength_step", "integration_time", "laser_power" ] }, { "name": "gate_sweep", "type": "transport", "description": "Gate voltage sweep measurement", "parameters": [ "gate_start", "gate_end", "gate_step", "bias_voltage", "measurement_time" ] }, { "name": "resonance_scan", "type": "spectroscopy", "description": "Resonance frequency scan", "parameters": [ "frequency_start", "frequency_end", "frequency_step", "power", "integration_time" ] }, { "name": "time_trace", "type": "time_series", "description": "Time-resolved measurement", "parameters": [ "total_time", "time_resolution", "trigger_source" ] }, { "name": "2d_gate_map", "type": "transport_2d", "description": "2D gate 
voltage map", "parameters": [ "gate1_start", "gate1_end", "gate1_steps", "gate2_start", "gate2_end", "gate2_steps", "bias_voltage", "integration_time" ] } ], "runlevel": self.server.runlevel.value, "message": "Simulated measurement modules" } # TODO: Get real modules from qudi return { "modules": [], "message": "qudi measurement modules not yet integrated" } async def _start_measurement(self, args: Dict[str, Any]) -> Dict[str, Any]: """Start a measurement with specified parameters""" module_name = args.get("module_name") parameters = args.get("parameters", {}) if not module_name: return {"error": "module_name is required"} # Validate parameters against safety limits validation_results = [] for param, value in parameters.items(): is_safe, message = self.server.safety_checker.validate_parameter(param, value) validation_results.append({"parameter": param, "safe": is_safe, "message": message}) if not is_safe: return { "error": f"Safety validation failed for {param}: {message}", "validation_results": validation_results } # Generate measurement ID measurement_id = str(uuid.uuid4())[:8] # Create measurement state measurement_state = { "id": measurement_id, "module": module_name, "parameters": parameters, "status": "running", "start_time": time.time(), "progress": 0.0, "data_points": 0, "estimated_duration": self._estimate_duration(module_name, parameters) } self.server.measurement_state[measurement_id] = measurement_state if self.server.runlevel == RunLevel.DRY_RUN: # Simulate measurement execution asyncio.create_task(self._simulate_measurement(measurement_id)) return { "status": "started", "measurement_id": measurement_id, "module": module_name, "parameters": parameters, "estimated_duration": measurement_state["estimated_duration"], "message": "Dry-run: Simulated measurement started", "validation_results": validation_results } if self.server.runlevel == RunLevel.LIVE: return { "error": "LIVE mode measurements require approval (not implemented)", "measurement_id": 
measurement_id, "module": module_name } # SIM mode - realistic simulation asyncio.create_task(self._simulate_measurement(measurement_id)) return { "status": "started", "measurement_id": measurement_id, "module": module_name, "parameters": parameters, "estimated_duration": measurement_state["estimated_duration"], "message": "Simulation mode: Realistic measurement simulation started", "validation_results": validation_results } async def _get_status(self, args: Dict[str, Any]) -> Dict[str, Any]: """Get measurement status""" measurement_id = args.get("measurement_id") if measurement_id: # Get specific measurement status if measurement_id not in self.server.measurement_state: return {"error": f"Measurement {measurement_id} not found"} state = self.server.measurement_state[measurement_id] return { "measurement_id": measurement_id, "status": state["status"], "module": state["module"], "progress": state["progress"], "data_points": state["data_points"], "elapsed_time": time.time() - state["start_time"], "estimated_duration": state["estimated_duration"] } else: # Get status of all measurements return { "active_measurements": len(self.server.measurement_state), "measurements": { mid: { "status": state["status"], "module": state["module"], "progress": state["progress"], "elapsed_time": time.time() - state["start_time"] } for mid, state in self.server.measurement_state.items() } } async def _stop_measurement(self, args: Dict[str, Any]) -> Dict[str, Any]: """Stop a running measurement""" measurement_id = args.get("measurement_id") if not measurement_id: return {"error": "measurement_id is required"} if measurement_id not in self.server.measurement_state: return {"error": f"Measurement {measurement_id} not found"} state = self.server.measurement_state[measurement_id] if state["status"] != "running": return { "message": f"Measurement {measurement_id} is not running (status: {state['status']})" } # Stop the measurement state["status"] = "stopped" state["stop_time"] = time.time() 
return { "status": "stopped", "measurement_id": measurement_id, "elapsed_time": time.time() - state["start_time"], "data_points": state["data_points"], "message": f"Measurement {measurement_id} stopped" } async def _get_data(self, args: Dict[str, Any]) -> Dict[str, Any]: """Get measurement data""" measurement_id = args.get("measurement_id") if not measurement_id: return {"error": "measurement_id is required"} if measurement_id not in self.server.measurement_state: return {"error": f"Measurement {measurement_id} not found"} state = self.server.measurement_state[measurement_id] # Generate simulated data based on measurement type if self.server.runlevel in [RunLevel.DRY_RUN, RunLevel.SIM]: data = self._generate_simulated_data(state["module"], state["parameters"], state["data_points"]) return { "measurement_id": measurement_id, "status": state["status"], "data_points": len(data.get("x", [])), "data": data, "message": "Simulated measurement data" } # TODO: Get real data from qudi measurement return { "error": "Real measurement data access not implemented" } async def _save_data(self, args: Dict[str, Any]) -> Dict[str, Any]: """Save measurement data""" measurement_id = args.get("measurement_id") filename = args.get("filename") if not measurement_id: return {"error": "measurement_id is required"} if measurement_id not in self.server.measurement_state: return {"error": f"Measurement {measurement_id} not found"} # Generate filename if not provided if not filename: state = self.server.measurement_state[measurement_id] timestamp = int(time.time()) filename = f"{state['module']}_{measurement_id}_{timestamp}.dat" if self.server.runlevel == RunLevel.DRY_RUN: return { "status": "success", "measurement_id": measurement_id, "filename": filename, "message": f"Dry-run: Would save data to {filename}" } # TODO: Implement real data saving return { "status": "success", "measurement_id": measurement_id, "filename": filename, "message": f"Simulated save to {filename}" } async def 
_simulate_measurement(self, measurement_id: str): """Simulate measurement execution with realistic timing""" state = self.server.measurement_state[measurement_id] duration = state["estimated_duration"] # Simulate measurement progress steps = 20 step_time = duration / steps for i in range(steps): if state["status"] != "running": break await asyncio.sleep(step_time) state["progress"] = (i + 1) / steps state["data_points"] = int(state["progress"] * 100) # Simulate data accumulation # Complete measurement if still running if state["status"] == "running": state["status"] = "completed" state["progress"] = 1.0 state["completion_time"] = time.time() def _estimate_duration(self, module_name: str, parameters: Dict[str, Any]) -> float: """Estimate measurement duration based on module and parameters""" base_times = { "photoluminescence_scan": 30.0, # 30 seconds "gate_sweep": 60.0, # 1 minute "resonance_scan": 45.0, # 45 seconds "time_trace": 120.0, # 2 minutes "2d_gate_map": 600.0 # 10 minutes } base_time = base_times.get(module_name, 60.0) # Adjust based on parameters if "integration_time" in parameters: base_time *= parameters["integration_time"] if "steps" in parameters: base_time *= (parameters["steps"] / 100.0) return min(base_time, 3600.0) # Cap at 1 hour def _generate_simulated_data(self, module_name: str, parameters: Dict[str, Any], num_points: int) -> Dict[str, Any]: """Generate realistic simulated measurement data""" import numpy as np if module_name == "photoluminescence_scan": x = np.linspace(630, 650, num_points) # wavelength y = np.exp(-((x - 637)**2) / (2 * 2**2)) + 0.1 * np.random.random(len(x)) # Gaussian peak return {"wavelength": x.tolist(), "intensity": y.tolist()} elif module_name == "gate_sweep": x = np.linspace(-1.0, 1.0, num_points) # gate voltage y = np.abs(x)**2 + 0.05 * np.random.random(len(x)) # Parabolic response return {"gate_voltage": x.tolist(), "current": y.tolist()} elif module_name == "time_trace": x = np.linspace(0, 10, num_points) # time y 
= np.sin(2 * np.pi * x) + 0.1 * np.random.random(len(x)) # Oscillating signal return {"time": x.tolist(), "signal": y.tolist()} else: # Generic data x = np.linspace(0, 100, num_points) y = np.random.random(len(x)) return {"x": x.tolist(), "y": y.tolist()}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dirkenglund/qudi-mcp-integration'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.