Skip to main content
Glama

qudi MCP Integration

by dirkenglund
simple_mcp_server.py•18.3 kB
#!/usr/bin/env python3 """ Simple MCP Server for qudi integration - Claude Desktop compatible This version works without the full MCP package by implementing a minimal stdio transport that Claude Desktop can understand. """ import json import sys import asyncio import logging from pathlib import Path # Add project root to path project_root = Path(__file__).parent sys.path.insert(0, str(project_root)) from safety import RunLevel, SafetyChecker from tools.instrument_tools import InstrumentTools from tools.measurement_tools import MeasurementTools from tools.safety_tools import SafetyTools # Optional plot extraction tools try: from tools.plot_extraction_tools import PlotExtractionTools PLOT_EXTRACTION_AVAILABLE = True except ImportError: PLOT_EXTRACTION_AVAILABLE = False class SimpleMCPServer: """Minimal MCP server that works with Claude Desktop""" def __init__(self): self.logger = self._setup_logging() self.runlevel = RunLevel.DRY_RUN self.safety_checker = SafetyChecker() self.instruments = {} self.measurement_state = {} # Tool modules self.instrument_tools = InstrumentTools(self) self.measurement_tools = MeasurementTools(self) self.safety_tools = SafetyTools(self) # Optional plot extraction tools if PLOT_EXTRACTION_AVAILABLE: self.plot_tools = PlotExtractionTools(self) else: self.plot_tools = None def _setup_logging(self): """Setup logging for debugging""" logger = logging.getLogger("qudi-mcp-simple") logger.setLevel(logging.INFO) if not logger.handlers: handler = logging.StreamHandler(sys.stderr) formatter = logging.Formatter('%(asctime)s - qudi-mcp - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) return logger def get_tools(self): """Return list of available tools""" tools = [ { "name": "station_info", "description": "Get qudi station configuration and status", "inputSchema": { "type": "object", "properties": {}, "required": [] } }, { "name": "instrument_list", "description": "List available instruments in qudi station", "inputSchema": { "type": "object", "properties": {}, "required": [] } }, { "name": "instrument_load", "description": "Load and initialize an instrument", "inputSchema": { "type": "object", "properties": { "instrument_name": {"type": "string", "description": "Name of instrument to load"} }, "required": ["instrument_name"] } }, { "name": "measurement_list_modules", "description": "List available measurement modules", "inputSchema": { "type": "object", "properties": {}, "required": [] } }, { "name": "measurement_start", "description": "Start a measurement with specified parameters", "inputSchema": { "type": "object", "properties": { "module_name": {"type": "string", "description": "Name of measurement module"}, "parameters": {"type": "object", "description": "Measurement parameters"} }, "required": ["module_name", "parameters"] } }, { "name": "safety_check_interlocks", "description": "Check all safety interlocks and system status", "inputSchema": { "type": "object", "properties": {}, "required": [] } }, { "name": "safety_validate_parameter", "description": "Validate a parameter value against safety limits", "inputSchema": { "type": "object", "properties": { "parameter": {"type": "string", "description": "Parameter name"}, "value": {"type": "number", "description": "Parameter value"} }, "required": ["parameter", "value"] } }, { "name": "safety_set_runlevel", "description": "Set system runlevel (dry-run, sim, live)", "inputSchema": { "type": "object", "properties": { "runlevel": {"type": "string", "enum": ["dry-run", "sim", "live"]}, "reason": {"type": "string", "description": "Reason for change"} }, "required": ["runlevel"] } }, { "name": "system_emergency_stop", "description": "Emergency stop all operations and measurements", "inputSchema": { "type": "object", "properties": { "reason": {"type": "string", "description": "Reason for emergency stop"} } } }, { "name": "feedback_submit", "description": "Submit feedback about qudi MCP integration - usage suggestions, issues, improvements", "inputSchema": { "type": "object", "properties": { "feedback_type": {"type": "string", "enum": ["bug_report", "feature_request", "usage_improvement", "general"], "description": "Type of feedback"}, "message": {"type": "string", "description": "Your feedback message"}, "user_context": {"type": "string", "description": "Optional context about what you were trying to do"} }, "required": ["feedback_type", "message"] } } ] # Add plot extraction tools if available if PLOT_EXTRACTION_AVAILABLE: plot_tools = [ { "name": "plot_extract_data", "description": "Extract x,y data points from scientific plot images using computer vision", "inputSchema": { "type": "object", "properties": { "image_path": {"type": "string", "description": "Path to the plot image file"} }, "required": ["image_path"] } }, { "name": "plot_extract_spectrum", "description": "Extract and smooth spectrum data from plot images using RKHS projection", "inputSchema": { "type": "object", "properties": { "image_path": {"type": "string", "description": "Path to spectrum image file"}, "wavelength_range": {"type": "array", "items": {"type": "number"}, "description": "Min/max wavelength range [nm]", "default": [400, 800]}, "epsilon": {"type": "number", "description": "RKHS kernel width parameter", "default": 0.05}, "lambda_reg": {"type": "number", "description": "Regularization parameter", "default": 0.001} }, "required": ["image_path"] } }, { "name": "plot_analyze_with_rkhs", "description": "Apply RKHS spline projection to smooth and analyze data arrays", "inputSchema": { "type": "object", "properties": { "x_data": {"type": "array", "items": {"type": "number"}, "description": "X coordinate data"}, "y_data": {"type": "array", "items": {"type": "number"}, "description": "Y coordinate data"}, "epsilon": {"type": "number", "description": "Kernel width parameter", "default": 0.05}, "lambda_reg": {"type": "number", "description": "Regularization parameter", "default": 0.001}, "kernel_type": {"type": "string", "enum": ["gaussian", "rbf", "polynomial"], "default": "gaussian"}, "prediction_points": {"type": "integer", "description": "Number of points for smooth curve", "default": 100} }, "required": ["x_data", "y_data"] } }, { "name": "plot_list_capabilities", "description": "List plot extraction capabilities and supported formats", "inputSchema": { "type": "object", "properties": {}, "required": [] } } ] tools.extend(plot_tools) return tools async def call_tool(self, name: str, arguments: dict): """Call a tool and return results""" self.logger.info(f"Tool called: {name} with args: {arguments}") try: # Route to appropriate tool module if name.startswith("station_"): result = await self._route_station_tool(name, arguments) elif name.startswith("instrument_"): # Convert instrument_list -> instrument.list format converted_name = name.replace("instrument_", "instrument.") result = await self.instrument_tools.handle_tool(converted_name, arguments) elif name.startswith("measurement_"): # Convert measurement_list_modules -> measurement.list_modules format converted_name = name.replace("measurement_", "measurement.") result = await self.measurement_tools.handle_tool(converted_name, arguments) elif name.startswith("safety_") or name.startswith("system_"): # Convert safety_check_interlocks -> safety.check_interlocks format if name.startswith("safety_"): converted_name = name.replace("safety_", "safety.") elif name.startswith("system_"): converted_name = name.replace("system_", "system.") else: converted_name = name result = await self.safety_tools.handle_tool(converted_name, arguments) elif name.startswith("feedback_"): result = await self._route_feedback_tool(name, arguments) elif name.startswith("plot_") and self.plot_tools: result = await self.plot_tools.handle_tool(name, arguments) else: result = {"error": f"Unknown tool: {name}"} self.logger.info(f"Tool result: {result}") return result except Exception as e: self.logger.error(f"Tool {name} failed: {e}", exc_info=True) return {"error": f"Tool execution failed: {str(e)}"} async def _route_station_tool(self, name: str, arguments: dict): """Route station management tools""" if name == "station_info": return { "status": "operational", "runlevel": self.runlevel.value, "station_loaded": True, "instruments_loaded": len(self.instruments), "active_measurements": len(self.measurement_state), "safety_status": "active", "message": f"qudi MCP integration running in {self.runlevel.value} mode" } else: return {"error": f"Unknown station tool: {name}"} async def _route_feedback_tool(self, name: str, arguments: dict): """Route feedback tools""" if name == "feedback_submit": feedback_type = arguments.get("feedback_type", "general") message = arguments.get("message", "") user_context = arguments.get("user_context", "") # Log feedback for analysis self.logger.info(f"FEEDBACK RECEIVED - Type: {feedback_type}") self.logger.info(f"FEEDBACK MESSAGE: {message}") if user_context: self.logger.info(f"FEEDBACK CONTEXT: {user_context}") # TODO: In production, this would create a GitHub issue # For now, just acknowledge receipt return { "status": "received", "message": "Thank you for your feedback! It has been logged for review.", "feedback_id": f"qudi-mcp-{hash(message) % 10000:04d}", "next_steps": "Your feedback will be reviewed and may result in GitHub issues or improvements to the qudi MCP integration." } else: return {"error": f"Unknown feedback tool: {name}"} async def handle_message(self, message): """Handle incoming MCP messages""" try: # Get ID, default to 0 if None msg_id = message.get("id", 0) method = message.get("method", "unknown") # Log all incoming messages for user interaction tracking self.logger.info(f"USER INTERACTION - Method: {method}, ID: {msg_id}") if method == "tools/call": params = message.get("params", {}) tool_name = params.get("name", "unknown") self.logger.info(f"USER TOOL CALL - Tool: {tool_name}, Args: {params.get('arguments', {})}") # Parse incoming message if message.get("method") == "tools/list": return { "jsonrpc": "2.0", "id": msg_id, "result": { "tools": self.get_tools() } } elif message.get("method") == "tools/call": params = message.get("params", {}) tool_name = params.get("name") tool_args = params.get("arguments", {}) result = await self.call_tool(tool_name, tool_args) return { "jsonrpc": "2.0", "id": msg_id, "result": { "content": [ { "type": "text", "text": json.dumps(result, indent=2) } ] } } elif message.get("method") == "initialize": return { "jsonrpc": "2.0", "id": msg_id, "result": { "protocolVersion": "2024-11-05", "capabilities": { "tools": {} }, "serverInfo": { "name": "qudi-mcp", "version": "0.1.0" } } } else: return { "jsonrpc": "2.0", "id": msg_id, "error": { "code": -32601, "message": f"Method not found: {message.get('method')}" } } except Exception as e: self.logger.error(f"Message handling failed: {e}", exc_info=True) msg_id = message.get("id", 0) if message else 0 return { "jsonrpc": "2.0", "id": msg_id, "error": { "code": -32603, "message": f"Internal error: {str(e)}" } } async def run(self): """Run the MCP server on stdio""" self.logger.info("Starting qudi MCP server on stdio") print("qudi MCP server running on stdio", file=sys.stderr) # Read from stdin and write to stdout while True: try: line = sys.stdin.readline() if not line: break line = line.strip() if not line: continue # Parse JSON message try: message = json.loads(line) except json.JSONDecodeError as e: self.logger.error(f"Invalid JSON: {e}") continue # Handle message response = await self.handle_message(message) # Send response if response: print(json.dumps(response), flush=True) except KeyboardInterrupt: break except Exception as e: self.logger.error(f"Server error: {e}", exc_info=True) self.logger.info("qudi MCP server shutting down") async def main(): """Main entry point""" server = SimpleMCPServer() await server.run() if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dirkenglund/qudi-mcp-integration'

If you have feedback or need assistance with the MCP directory API, please join our Discord server