measurement_tools.pyā¢14.8 kB
"""
Measurement tools for qudi MCP integration
Handles measurement module loading, execution, and data acquisition
through qudi's measurement framework.
"""
import logging
import asyncio
import time
import uuid
from typing import Dict, List, Any, Optional
from ..safety import RunLevel
class MeasurementTools:
"""Measurement execution and management tools"""
def __init__(self, server):
self.server = server
self.logger = logging.getLogger("qudi-mcp.measurements")
async def handle_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Route measurement tool calls"""
tool_map = {
"measurement.list_modules": self._list_modules,
"measurement.start": self._start_measurement,
"measurement.status": self._get_status,
"measurement.stop": self._stop_measurement,
"measurement.get_data": self._get_data,
"measurement.save_data": self._save_data
}
if name not in tool_map:
return {"error": f"Unknown measurement tool: {name}"}
return await tool_map[name](arguments)
async def _list_modules(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""List available measurement modules"""
if self.server.runlevel == RunLevel.DRY_RUN:
return {
"modules": [
{
"name": "photoluminescence_scan",
"type": "spectroscopy",
"description": "PL spectroscopy measurement",
"parameters": [
"wavelength_start", "wavelength_end", "wavelength_step",
"integration_time", "laser_power"
]
},
{
"name": "gate_sweep",
"type": "transport",
"description": "Gate voltage sweep measurement",
"parameters": [
"gate_start", "gate_end", "gate_step",
"bias_voltage", "measurement_time"
]
},
{
"name": "resonance_scan",
"type": "spectroscopy",
"description": "Resonance frequency scan",
"parameters": [
"frequency_start", "frequency_end", "frequency_step",
"power", "integration_time"
]
},
{
"name": "time_trace",
"type": "time_series",
"description": "Time-resolved measurement",
"parameters": [
"total_time", "time_resolution", "trigger_source"
]
},
{
"name": "2d_gate_map",
"type": "transport_2d",
"description": "2D gate voltage map",
"parameters": [
"gate1_start", "gate1_end", "gate1_steps",
"gate2_start", "gate2_end", "gate2_steps",
"bias_voltage", "integration_time"
]
}
],
"runlevel": self.server.runlevel.value,
"message": "Simulated measurement modules"
}
# TODO: Get real modules from qudi
return {
"modules": [],
"message": "qudi measurement modules not yet integrated"
}
async def _start_measurement(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Start a measurement with specified parameters"""
module_name = args.get("module_name")
parameters = args.get("parameters", {})
if not module_name:
return {"error": "module_name is required"}
# Validate parameters against safety limits
validation_results = []
for param, value in parameters.items():
is_safe, message = self.server.safety_checker.validate_parameter(param, value)
validation_results.append({"parameter": param, "safe": is_safe, "message": message})
if not is_safe:
return {
"error": f"Safety validation failed for {param}: {message}",
"validation_results": validation_results
}
# Generate measurement ID
measurement_id = str(uuid.uuid4())[:8]
# Create measurement state
measurement_state = {
"id": measurement_id,
"module": module_name,
"parameters": parameters,
"status": "running",
"start_time": time.time(),
"progress": 0.0,
"data_points": 0,
"estimated_duration": self._estimate_duration(module_name, parameters)
}
self.server.measurement_state[measurement_id] = measurement_state
if self.server.runlevel == RunLevel.DRY_RUN:
# Simulate measurement execution
asyncio.create_task(self._simulate_measurement(measurement_id))
return {
"status": "started",
"measurement_id": measurement_id,
"module": module_name,
"parameters": parameters,
"estimated_duration": measurement_state["estimated_duration"],
"message": "Dry-run: Simulated measurement started",
"validation_results": validation_results
}
if self.server.runlevel == RunLevel.LIVE:
return {
"error": "LIVE mode measurements require approval (not implemented)",
"measurement_id": measurement_id,
"module": module_name
}
# SIM mode - realistic simulation
asyncio.create_task(self._simulate_measurement(measurement_id))
return {
"status": "started",
"measurement_id": measurement_id,
"module": module_name,
"parameters": parameters,
"estimated_duration": measurement_state["estimated_duration"],
"message": "Simulation mode: Realistic measurement simulation started",
"validation_results": validation_results
}
async def _get_status(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Get measurement status"""
measurement_id = args.get("measurement_id")
if measurement_id:
# Get specific measurement status
if measurement_id not in self.server.measurement_state:
return {"error": f"Measurement {measurement_id} not found"}
state = self.server.measurement_state[measurement_id]
return {
"measurement_id": measurement_id,
"status": state["status"],
"module": state["module"],
"progress": state["progress"],
"data_points": state["data_points"],
"elapsed_time": time.time() - state["start_time"],
"estimated_duration": state["estimated_duration"]
}
else:
# Get status of all measurements
return {
"active_measurements": len(self.server.measurement_state),
"measurements": {
mid: {
"status": state["status"],
"module": state["module"],
"progress": state["progress"],
"elapsed_time": time.time() - state["start_time"]
}
for mid, state in self.server.measurement_state.items()
}
}
async def _stop_measurement(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Stop a running measurement"""
measurement_id = args.get("measurement_id")
if not measurement_id:
return {"error": "measurement_id is required"}
if measurement_id not in self.server.measurement_state:
return {"error": f"Measurement {measurement_id} not found"}
state = self.server.measurement_state[measurement_id]
if state["status"] != "running":
return {
"message": f"Measurement {measurement_id} is not running (status: {state['status']})"
}
# Stop the measurement
state["status"] = "stopped"
state["stop_time"] = time.time()
return {
"status": "stopped",
"measurement_id": measurement_id,
"elapsed_time": time.time() - state["start_time"],
"data_points": state["data_points"],
"message": f"Measurement {measurement_id} stopped"
}
async def _get_data(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Get measurement data"""
measurement_id = args.get("measurement_id")
if not measurement_id:
return {"error": "measurement_id is required"}
if measurement_id not in self.server.measurement_state:
return {"error": f"Measurement {measurement_id} not found"}
state = self.server.measurement_state[measurement_id]
# Generate simulated data based on measurement type
if self.server.runlevel in [RunLevel.DRY_RUN, RunLevel.SIM]:
data = self._generate_simulated_data(state["module"], state["parameters"], state["data_points"])
return {
"measurement_id": measurement_id,
"status": state["status"],
"data_points": len(data.get("x", [])),
"data": data,
"message": "Simulated measurement data"
}
# TODO: Get real data from qudi measurement
return {
"error": "Real measurement data access not implemented"
}
async def _save_data(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Save measurement data"""
measurement_id = args.get("measurement_id")
filename = args.get("filename")
if not measurement_id:
return {"error": "measurement_id is required"}
if measurement_id not in self.server.measurement_state:
return {"error": f"Measurement {measurement_id} not found"}
# Generate filename if not provided
if not filename:
state = self.server.measurement_state[measurement_id]
timestamp = int(time.time())
filename = f"{state['module']}_{measurement_id}_{timestamp}.dat"
if self.server.runlevel == RunLevel.DRY_RUN:
return {
"status": "success",
"measurement_id": measurement_id,
"filename": filename,
"message": f"Dry-run: Would save data to {filename}"
}
# TODO: Implement real data saving
return {
"status": "success",
"measurement_id": measurement_id,
"filename": filename,
"message": f"Simulated save to {filename}"
}
async def _simulate_measurement(self, measurement_id: str):
"""Simulate measurement execution with realistic timing"""
state = self.server.measurement_state[measurement_id]
duration = state["estimated_duration"]
# Simulate measurement progress
steps = 20
step_time = duration / steps
for i in range(steps):
if state["status"] != "running":
break
await asyncio.sleep(step_time)
state["progress"] = (i + 1) / steps
state["data_points"] = int(state["progress"] * 100) # Simulate data accumulation
# Complete measurement if still running
if state["status"] == "running":
state["status"] = "completed"
state["progress"] = 1.0
state["completion_time"] = time.time()
def _estimate_duration(self, module_name: str, parameters: Dict[str, Any]) -> float:
"""Estimate measurement duration based on module and parameters"""
base_times = {
"photoluminescence_scan": 30.0, # 30 seconds
"gate_sweep": 60.0, # 1 minute
"resonance_scan": 45.0, # 45 seconds
"time_trace": 120.0, # 2 minutes
"2d_gate_map": 600.0 # 10 minutes
}
base_time = base_times.get(module_name, 60.0)
# Adjust based on parameters
if "integration_time" in parameters:
base_time *= parameters["integration_time"]
if "steps" in parameters:
base_time *= (parameters["steps"] / 100.0)
return min(base_time, 3600.0) # Cap at 1 hour
def _generate_simulated_data(self, module_name: str, parameters: Dict[str, Any], num_points: int) -> Dict[str, Any]:
"""Generate realistic simulated measurement data"""
import numpy as np
if module_name == "photoluminescence_scan":
x = np.linspace(630, 650, num_points) # wavelength
y = np.exp(-((x - 637)**2) / (2 * 2**2)) + 0.1 * np.random.random(len(x)) # Gaussian peak
return {"wavelength": x.tolist(), "intensity": y.tolist()}
elif module_name == "gate_sweep":
x = np.linspace(-1.0, 1.0, num_points) # gate voltage
y = np.abs(x)**2 + 0.05 * np.random.random(len(x)) # Parabolic response
return {"gate_voltage": x.tolist(), "current": y.tolist()}
elif module_name == "time_trace":
x = np.linspace(0, 10, num_points) # time
y = np.sin(2 * np.pi * x) + 0.1 * np.random.random(len(x)) # Oscillating signal
return {"time": x.tolist(), "signal": y.tolist()}
else:
# Generic data
x = np.linspace(0, 100, num_points)
y = np.random.random(len(x))
return {"x": x.tolist(), "y": y.tolist()}