Skip to main content
Glama
mcp_bridge.py9.06 kB
""" MCP Bridge for Webots Controllers. Generic, minimal bridge between ANY Webots controller and Claude Code via MCP. Requires only 3 lines of code in your controller. Usage: from mcp_bridge import MCPBridge bridge = MCPBridge(robot) # Auto-detects Supervisor bridge.publish({"pose": [x, y, theta], "mode": "navigate"}) Advanced: bridge.on_reload(reset_callback) # Auto-detect world reloads bridge.register_command("custom_action", handler_fn) """ import json from pathlib import Path from datetime import datetime from typing import Dict, Any, Optional, Callable, List class MCPBridge: """Minimal bridge between Webots controller and MCP server.""" def __init__( self, robot, data_dir: Optional[Path] = None, throttle_interval: int = 5, ): """ Initialize MCP bridge. Args: robot: Webots Robot or Supervisor instance data_dir: Path to data directory (default: auto-detect) throttle_interval: Publish every N calls (reduces I/O) """ self.robot = robot self._is_supervisor = hasattr(robot, 'simulationSetMode') # Auto-detect data directory if data_dir: self.data_dir = Path(data_dir) else: # Try common locations candidates = [ Path(__file__).parent / "data", Path.cwd() / "mcp_data", Path.home() / ".webots-mcp" / "data", ] self.data_dir = next((p for p in candidates if p.exists()), candidates[0]) # Lazy initialization flag self._initialized = False # Throttling self._update_counter = 0 self._throttle_interval = throttle_interval # Command handlers self._command_handlers: Dict[str, Callable] = {} self._last_cmd_ts = None # Reload detection self._sim_time = 0.0 self._reload_callback: Optional[Callable] = None # Files (set on first use) self.status_file: Optional[Path] = None self.commands_file: Optional[Path] = None self.log_file: Optional[Path] = None def _ensure_dirs(self): """Create directories on first use (lazy init).""" if self._initialized: return self.data_dir.mkdir(parents=True, exist_ok=True) (self.data_dir / "camera").mkdir(exist_ok=True) (self.data_dir / "screenshots").mkdir(exist_ok=True) (self.data_dir / "logs").mkdir(exist_ok=True) self.status_file = self.data_dir / "status.json" self.commands_file = self.data_dir / "commands.json" self.log_file = self.data_dir / "logs" / "controller.log" self._initialized = True def publish(self, state: Dict[str, Any], force: bool = False): """ Publish robot state to MCP server. Call every timestep with your current state dict. Throttled automatically to reduce I/O. Args: state: Dict with any fields (pose, mode, sensors, etc.) force: Bypass throttling and publish immediately """ self._update_counter += 1 if not force and self._update_counter % self._throttle_interval != 0: return self._ensure_dirs() # Add timestamp state["timestamp"] = datetime.now().isoformat() try: with open(self.status_file, 'w') as f: json.dump(state, f, indent=2) except Exception: pass def get_command(self) -> Optional[Dict[str, Any]]: """ Check for commands from MCP server. Returns: Command dict if new command, None otherwise. Built-in commands (simulation, screenshot) handled automatically. """ self._ensure_dirs() if not self.commands_file.exists(): return None try: with open(self.commands_file, 'r') as f: cmd = json.load(f) except Exception: return None # Check if new command ts = cmd.get("timestamp") if ts == self._last_cmd_ts: return None self._last_cmd_ts = ts # Handle built-in commands action = cmd.get("action") if action == "simulation": self._handle_simulation_cmd(cmd) elif action == "screenshot": self._handle_screenshot_cmd(cmd) elif action in self._command_handlers: self._command_handlers[action](cmd) return cmd def register_command(self, action: str, handler: Callable[[Dict], None]): """ Register custom command handler. Args: action: Command action name (e.g., "custom_reset") handler: Function(cmd_dict) to handle command """ self._command_handlers[action] = handler def on_reload(self, callback: Callable[[], None]): """ Register callback for world reload detection. Args: callback: Function() called when world reload is detected """ self._reload_callback = callback def detect_reload(self) -> bool: """ Detect if world was reloaded (simulation time jumped backwards). Returns: True if reload detected """ try: current = self.robot.getTime() except Exception: return False if current < self._sim_time - 0.1: # Small tolerance if self._reload_callback: self.log("World reload detected - calling reset callback") self._reload_callback() self._sim_time = 0.0 return True self._sim_time = current return False def _handle_simulation_cmd(self, cmd: Dict[str, Any]): """Handle simulation control commands.""" if not self._is_supervisor: self.log("Warning: simulation control requires Supervisor") return command = cmd.get("command") try: if command == "pause": self.robot.simulationSetMode(self.robot.SIMULATION_MODE_PAUSE) elif command == "resume": self.robot.simulationSetMode(self.robot.SIMULATION_MODE_REAL_TIME) elif command == "fast": self.robot.simulationSetMode(self.robot.SIMULATION_MODE_FAST) elif command == "reset": self.robot.simulationReset() elif command == "reload": self.robot.worldReload() elif command == "step": self.robot.step(int(self.robot.getBasicTimeStep())) self.log(f"Simulation: {command}") except Exception as e: self.log(f"Simulation control error: {e}") def _handle_screenshot_cmd(self, cmd: Dict[str, Any]): """Handle screenshot command.""" if not self._is_supervisor: return filename = cmd.get("filename", f"screenshot_{datetime.now().strftime('%H%M%S')}") path = self.data_dir / "screenshots" / f"{filename}.png" try: self.robot.exportImage(str(path), 100) self.log(f"Screenshot: {path.name}") except Exception as e: self.log(f"Screenshot error: {e}") def log(self, message: str): """Append message to log file.""" self._ensure_dirs() ts = datetime.now().strftime("%H:%M:%S") try: with open(self.log_file, 'a') as f: f.write(f"[{ts}] {message}\n") except Exception: pass def save_camera_frame(self, camera, max_frames: int = 50): """ Save camera frame to file. Args: camera: Webots Camera device max_frames: Max frames to keep (rolling) """ self._ensure_dirs() try: image = camera.getImage() if not image: return from PIL import Image w, h = camera.getWidth(), camera.getHeight() img = Image.frombytes('RGBA', (w, h), image).convert('RGB') frame_num = self._update_counter % max_frames path = self.data_dir / "camera" / f"frame_{frame_num:04d}.png" img.save(path) except ImportError: pass # PIL not available except Exception: pass # === Convenience Methods === def auto_publish( self, extra: Optional[Dict] = None, include_time: bool = True, ) -> Dict[str, Any]: """ Auto-extract and publish common robot data. Args: extra: Additional fields to merge include_time: Include simulation time Returns: Published state dict """ state = extra or {} if include_time: try: state["sim_time"] = self.robot.getTime() except Exception: pass self.publish(state) return state

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/luisfelipesena/webots-youbot-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server