Skip to main content
Glama

Moku MCP Server

by sealablab
BSD 2-Clause "Simplified" License
server.py22.5 kB
""" Moku MCP Server Implementation Skeleton structure for MCP server. See IMPLEMENTATION_GUIDE.md for details. """ import asyncio from typing import Optional from datetime import datetime, timezone from loguru import logger from moku_models import MokuConfig, MokuDeviceInfo from zeroconf import ServiceBrowser, ServiceStateChange, Zeroconf class MokuMCPServer: """ MCP server for Moku device control (Singleton Pattern). Session Model: - attach(device_id) → Connects and maintains ownership - detach() → Releases ownership (allows handoff to other clients) Tools: - discover_mokus() → List available devices on network - attach_moku(device_id) → Connect and assume ownership - release_moku() → Disconnect and release ownership - push_config(config: MokuConfig) → Deploy configuration - get_config() → Retrieve current configuration - set_routing(connections: list) → Configure MCC routing - get_device_info() → Query device metadata - list_slots() → Show configured slots Implementation: See IMPLEMENTATION_GUIDE.md """ _instance = None @classmethod def get_instance(cls): """ Get singleton instance of MokuMCPServer. Returns: The single MokuMCPServer instance """ if cls._instance is None: cls._instance = cls() return cls._instance def __init__(self): """ Initialize MCP server (stateless at start). Raises: RuntimeError: If attempting to create multiple instances """ if MokuMCPServer._instance is not None: raise RuntimeError("Use get_instance() instead of direct instantiation") self.connected_device: Optional[str] = None self.moku_instance = None self.last_config: Optional[MokuConfig] = None # Cache for get_config() logger.info("MokuMCPServer singleton initialized") async def discover_mokus(self, timeout: int = 2): """ Discover Moku devices on network via zeroconf. Args: timeout: Discovery timeout in seconds (default: 2) Returns: { "devices": [ { "ip": "192.168.1.100", "name": "Lilo", "serial": "MG106B", "port": 80, "last_seen": "2025-10-25T20:00:00Z" } ], "count": 1 } Implementation: See IMPLEMENTATION_GUIDE.md Section 3.1 """ from moku import Moku from .utils import update_cache_with_device discovered = [] zc = Zeroconf() def on_service_change(zeroconf, service_type, name, state_change): """Handle zeroconf service discovery events.""" if state_change == ServiceStateChange.Added: info = zeroconf.get_service_info(service_type, name) if info: # Extract IPv4 address addresses = info.parsed_addresses() ipv4 = [addr for addr in addresses if ":" not in addr] ip = ipv4[0] if ipv4 else addresses[0] if addresses else None if ip: device = MokuDeviceInfo( ip=ip, port=info.port if info.port else 80, zeroconf_name=name, last_seen=datetime.now(timezone.utc).isoformat(), ) discovered.append(device) logger.info(f"Discovered device at {ip}:{info.port}") # Start discovery browser = ServiceBrowser(zc, "_moku._tcp.local.", handlers=[on_service_change]) # Wait for discovery await asyncio.sleep(timeout) # Close zeroconf zc.close() # Enrich with metadata (name, serial) via Moku API for device in discovered: try: moku = Moku(ip=device.ip, force_connect=False, connect_timeout=5) device.canonical_name = moku.name() device.serial_number = moku.serial_number() moku.relinquish_ownership() # Update cache with enriched info update_cache_with_device( ip=device.ip, name=device.canonical_name, serial=device.serial_number, port=device.port, ) logger.info( f"Enriched device: {device.canonical_name} ({device.serial_number}) at {device.ip}" ) except Exception as e: logger.warning(f"Could not get metadata for {device.ip}: {e}") result = {"devices": [d.model_dump() for d in discovered], "count": len(discovered)} logger.info(f"Discovery complete: found {len(discovered)} devices") return result async def attach_moku(self, device_id: str, force: bool = False): """ Connect to Moku device and assume ownership. Args: device_id: IP address, name, or serial number force: Force connection even if owned by another client Returns: { "status": "connected", "device": { "ip": "192.168.1.100", "name": "Lilo", "serial": "MG106B", "platform": "Moku:Go" } } Implementation: See IMPLEMENTATION_GUIDE.md Section 3.2 """ from moku.instruments import MultiInstrument from .utils import resolve_device_identifier, load_device_cache # Check if already connected if self.moku_instance: if self.connected_device == device_id: return { "status": "already_connected", "message": f"Already connected to {device_id}", "device": {"ip": self.connected_device, "platform": "Moku:Go"}, } else: return { "status": "error", "message": f"Already connected to {self.connected_device}. Release first.", "suggestion": "Call release_moku() before connecting to a different device", } # Resolve device_id to IP ip = resolve_device_identifier(device_id) if not ip: # If not in cache, check if it's a valid IP if "." in device_id and device_id.replace(".", "").isdigit(): ip = device_id else: return { "status": "error", "message": f"Device '{device_id}' not found in cache", "suggestion": "Run discover_mokus() first to find devices", } # Try to connect (platform_id=2 for Moku:Go) try: logger.info(f"Attempting to connect to {ip} (force={force})") self.moku_instance = MultiInstrument(ip, platform_id=2, force_connect=force) self.connected_device = ip # Get device info from cache cache = load_device_cache() device_info = cache.find_by_ip(ip) logger.info(f"Successfully connected to Moku at {ip}") return { "status": "connected", "device": { "ip": ip, "name": device_info.canonical_name if device_info else "Unknown", "serial": device_info.serial_number if device_info else "Unknown", "platform": "Moku:Go", }, } except ConnectionError as e: logger.error(f"Connection failed: {e}") return { "status": "error", "message": f"Could not connect to {ip}. Device may be offline or owned by another client.", "suggestion": "Try with force=True to take ownership, or wait for current owner to disconnect.", "details": str(e), } except Exception as e: logger.error(f"Unexpected error connecting to {ip}: {e}") self.moku_instance = None self.connected_device = None return { "status": "error", "message": f"Failed to connect to {ip}", "details": str(e), } async def release_moku(self): """ Disconnect from Moku and release ownership. Returns: { "status": "disconnected", "device": "192.168.1.100" } Implementation: See IMPLEMENTATION_GUIDE.md Section 3.3 """ if not self.moku_instance: return { "status": "not_connected", "message": "No active connection to release", } try: # Store device info before releasing device = self.connected_device # Release ownership self.moku_instance.relinquish_ownership() # Clear internal state self.moku_instance = None self.connected_device = None self.last_config = None logger.info(f"Released Moku at {device}") return {"status": "disconnected", "device": device} except Exception as e: logger.error(f"Failed to release Moku: {e}") # Clear state anyway to avoid stuck connections self.moku_instance = None self.connected_device = None self.last_config = None return { "status": "error", "message": "Error releasing device, but cleared internal state", "details": str(e), } async def push_config(self, config_dict: dict): """ Deploy MokuConfig to connected device. Args: config_dict: MokuConfig serialized as dict Returns: { "status": "deployed", "slots_configured": [1, 2], "routing_configured": True } Implementation: See IMPLEMENTATION_GUIDE.md Section 3.4 """ from moku.instruments import CloudCompile, Oscilloscope from pydantic import ValidationError if not self.moku_instance: return { "status": "error", "message": "Not connected to any device", "suggestion": "Call attach_moku first", } # Validate and parse config try: config = MokuConfig.model_validate(config_dict) except ValidationError as e: logger.error(f"Invalid config: {e}") return {"status": "error", "message": "Invalid MokuConfig", "errors": e.errors()} # Validate routing errors = config.validate_routing() if errors: return { "status": "error", "message": "Invalid routing configuration", "errors": errors, } deployed_slots = [] # Deploy instruments to slots for slot_num, slot_config in config.slots.items(): try: if slot_config.instrument == "CloudCompile": if not slot_config.bitstream: logger.warning(f"Slot {slot_num}: No bitstream specified") continue logger.info(f"Deploying CloudCompile to slot {slot_num}") self.moku_instance.set_instrument( slot_num, CloudCompile, bitstream=slot_config.bitstream ) # Apply control registers if specified if slot_config.control_registers: cc = self.moku_instance.get_instrument(slot_num) for reg, value in slot_config.control_registers.items(): cc.write_register(reg, value) logger.debug(f"Slot {slot_num}: Set register {reg} = {value:#x}") deployed_slots.append(slot_num) logger.info(f"Successfully deployed CloudCompile to slot {slot_num}") elif slot_config.instrument == "Oscilloscope": logger.info(f"Deploying Oscilloscope to slot {slot_num}") osc = self.moku_instance.set_instrument(slot_num, Oscilloscope) # Apply settings if specified if slot_config.settings and "timebase" in slot_config.settings: osc.set_timebase(*slot_config.settings["timebase"]) logger.debug(f"Slot {slot_num}: Set timebase {slot_config.settings['timebase']}") deployed_slots.append(slot_num) logger.info(f"Successfully deployed Oscilloscope to slot {slot_num}") else: logger.warning( f"Slot {slot_num}: Instrument '{slot_config.instrument}' not supported yet" ) except Exception as e: logger.error(f"Failed to deploy slot {slot_num}: {e}") return { "status": "error", "message": f"Failed to deploy instrument to slot {slot_num}", "details": str(e), "slots_deployed": deployed_slots, } # Configure routing routing_configured = False if config.routing: try: # Convert routing to dict format expected by Moku API connections = [] for conn in config.routing: connections.append( {"source": conn.source, "destination": conn.destination} ) self.moku_instance.set_connections(connections) routing_configured = True logger.info(f"Configured {len(connections)} routing connections") except Exception as e: logger.error(f"Failed to configure routing: {e}") return { "status": "partial_success", "message": "Instruments deployed but routing failed", "slots_configured": deployed_slots, "routing_error": str(e), } # Cache the config for get_config() self.last_config = config return { "status": "deployed", "slots_configured": deployed_slots, "routing_configured": routing_configured, } async def get_config(self): """ Retrieve current device configuration. Returns: { "platform": {...}, "slots": {...}, "routing": [...] } Implementation: See IMPLEMENTATION_GUIDE.md Section 3.5 """ from moku_models import MokuConfig, SlotConfig, MOKU_GO_PLATFORM if not self.moku_instance: return { "status": "error", "message": "Not connected to any device", "suggestion": "Call attach_moku first", } # If we have a cached config from push_config, use it if self.last_config: logger.info("Returning cached configuration") return self.last_config.model_dump() # Otherwise, try to reconstruct config from device state # NOTE: The Moku API may not provide full config retrieval logger.info("Reconstructing config from device state (best effort)") slots = {} # Query each slot (1-4 for Moku:Go) for slot_num in range(1, 5): try: instrument = self.moku_instance.get_instrument(slot_num) if instrument: slots[slot_num] = SlotConfig( instrument=instrument.__class__.__name__, settings={}, # TODO: Extract settings from instrument if API supports ) logger.debug(f"Slot {slot_num}: Found {instrument.__class__.__name__}") except Exception: # Slot not configured or error accessing it pass # Routing is harder to query - the Moku API doesn't provide a way to retrieve it # We can only return what we cached during push_config routing = [] if self.last_config and self.last_config.routing: routing = self.last_config.routing # Build config object platform_config = MOKU_GO_PLATFORM.model_copy( update={"ip_address": self.connected_device} ) config = MokuConfig(platform=platform_config, slots=slots, routing=routing) return config.model_dump() async def set_routing(self, connections: list): """ Configure MCC signal routing. Args: connections: List of {"source": "...", "destination": "..."} dicts Returns: { "status": "configured", "connections_count": 2 } Implementation: See IMPLEMENTATION_GUIDE.md Section 3.6 """ from moku_models import MokuConnection from pydantic import ValidationError if not self.moku_instance: return { "status": "error", "message": "Not connected to any device", "suggestion": "Call attach_moku first", } # Validate connections try: parsed_connections = [MokuConnection(**conn) for conn in connections] except ValidationError as e: logger.error(f"Invalid connection format: {e}") return { "status": "error", "message": "Invalid connection format", "errors": e.errors(), } # Apply to hardware try: self.moku_instance.set_connections(connections) logger.info(f"Configured {len(connections)} routing connections") # Update cached config if we have one if self.last_config: self.last_config.routing = parsed_connections return {"status": "configured", "connections_count": len(connections)} except Exception as e: logger.error(f"Failed to configure routing: {e}") return { "status": "error", "message": "Failed to configure routing", "details": str(e), } async def get_device_info(self): """ Query device metadata (name, serial, IP, etc.). Returns: { "ip": "192.168.1.100", "name": "Lilo", "serial": "MG106B", "platform": "Moku:Go", "connected": true } Implementation: See IMPLEMENTATION_GUIDE.md Section 3.7 """ from moku import Moku if not self.moku_instance: return { "status": "error", "message": "Not connected to any device", "suggestion": "Call attach_moku first", } try: # Query via Moku API # Create a temporary Moku instance to query metadata # (We need this because MultiInstrument doesn't expose these methods directly) temp_moku = Moku(ip=self.connected_device, force_connect=False, connect_timeout=5) try: name = temp_moku.name() serial = temp_moku.serial_number() finally: # Always release ownership on the temp connection temp_moku.relinquish_ownership() info = { "ip": self.connected_device, "name": name, "serial": serial, "platform": "Moku:Go", # Inferred from platform_id=2 "connected": True, } logger.info(f"Device info: {name} ({serial}) at {self.connected_device}") return info except Exception as e: logger.error(f"Failed to query device info: {e}") return { "status": "error", "message": "Failed to query device information", "details": str(e), } async def list_slots(self): """ List configured instrument slots. Returns: { "slots": { "1": {"instrument": "CloudCompile", "configured": true}, "2": {"instrument": "Oscilloscope", "configured": true}, "3": {"configured": false}, "4": {"configured": false} } } Implementation: See IMPLEMENTATION_GUIDE.md Section 3.8 """ if not self.moku_instance: return { "status": "error", "message": "Not connected to any device", "suggestion": "Call attach_moku first", } slots = {} # Query each slot (1-4 for Moku:Go) for slot_num in range(1, 5): try: instrument = self.moku_instance.get_instrument(slot_num) if instrument: slots[str(slot_num)] = { "instrument": instrument.__class__.__name__, "configured": True, } logger.debug(f"Slot {slot_num}: {instrument.__class__.__name__}") else: slots[str(slot_num)] = {"configured": False} logger.debug(f"Slot {slot_num}: Empty") except Exception as e: # Slot not configured or error accessing it slots[str(slot_num)] = {"configured": False} logger.debug(f"Slot {slot_num}: Not configured or error: {e}") logger.info(f"Slot status: {sum(1 for s in slots.values() if s.get('configured'))} configured") return {"slots": slots}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sealablab/moku-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server