"""Manufacturing equipment portmanteau tool - 3D printers, CNC machines, and other manufacturing devices.
Consolidates 3D printer control, CNC operations, and manufacturing equipment monitoring into a single unified tool.
"""
from typing import Any, Dict, List, Literal, Optional
import structlog
from ..utils.error_handler import format_error_response, format_success_response, handle_tool_error
from ..utils.mcp_client_helper import call_mounted_server_tool
logger = structlog.get_logger(__name__)
class RobotManufacturingTool:
"""Portmanteau tool for manufacturing equipment: 3D printers, CNC machines, etc."""
def __init__(self, mcp: Any, state_manager: Any, mounted_servers: Optional[Dict[str, Any]] = None):
"""Initialize manufacturing tool.
Args:
mcp: FastMCP server instance.
state_manager: Robot state manager instance.
mounted_servers: Dictionary of mounted MCP servers.
"""
self.mcp = mcp
self.state_manager = state_manager
self.mounted_servers = mounted_servers or {}
def register(self):
"""Register manufacturing tool with MCP server."""
@self.mcp.tool()
async def robot_manufacturing(
device_id: str,
device_type: Literal["3d_printer", "cnc_machine", "laser_cutter"],
category: Literal["control", "monitor", "maintenance"],
action: str,
# Control parameters
file_path: Optional[str] = None,
temperature: Optional[Dict[str, float]] = None,
speed: Optional[float] = None,
position: Optional[Dict[str, float]] = None,
gcode: Optional[str] = None,
# Monitor parameters
monitor_type: Optional[str] = None,
# Maintenance parameters
maintenance_action: Optional[str] = None,
) -> Dict[str, Any]:
"""Control and monitor manufacturing equipment (3D printers, CNC, etc.).
This tool provides unified control over manufacturing devices including:
- 3D printers (via OctoPrint, Moonraker, Repetier Server)
- CNC machines
- Laser cutters
Args:
device_id: Unique identifier for the manufacturing device
device_type: Type of manufacturing equipment
category: Operation category (control, monitor, maintenance)
action: Specific action to perform
file_path: Path to file for printing/cutting
temperature: Temperature settings (hotend, bed, chamber)
speed: Speed/feed rate settings
position: Position coordinates (X, Y, Z)
gcode: Raw G-code commands to send
monitor_type: What to monitor (status, temperature, progress, webcam)
maintenance_action: Maintenance operation to perform
Returns:
Operation result with status and data
"""
try:
logger.info("Manufacturing operation",
device_id=device_id,
device_type=device_type,
category=category,
action=action)
if device_type == "3d_printer":
return await self._handle_3d_printer(device_id, category, action,
file_path=file_path,
temperature=temperature,
speed=speed,
position=position,
gcode=gcode,
monitor_type=monitor_type,
maintenance_action=maintenance_action)
elif device_type == "cnc_machine":
return await self._handle_cnc_machine(device_id, category, action,
file_path=file_path,
speed=speed,
position=position,
gcode=gcode)
elif device_type == "laser_cutter":
return await self._handle_laser_cutter(device_id, category, action,
file_path=file_path,
speed=speed,
position=position)
else:
return format_error_response(f"Unsupported device type: {device_type}",
error_type="validation_error")
except Exception as e:
return handle_tool_error("robot_manufacturing", e,
device_id=device_id,
device_type=device_type,
category=category,
action=action)
async def _handle_3d_printer(self, device_id: str, category: str, action: str, **kwargs) -> Dict[str, Any]:
"""Handle 3D printer operations."""
try:
if category == "control":
return await self._handle_3d_printer_control(device_id, action, **kwargs)
elif category == "monitor":
return await self._handle_3d_printer_monitor(device_id, action, **kwargs)
elif category == "maintenance":
return await self._handle_3d_printer_maintenance(device_id, action, **kwargs)
else:
return format_error_response(f"Unknown 3D printer category: {category}",
error_type="validation_error")
except Exception as e:
return handle_tool_error("_handle_3d_printer", e, device_id=device_id, category=category, action=action)
async def _handle_3d_printer_control(self, device_id: str, action: str, **kwargs) -> Dict[str, Any]:
"""Handle 3D printer control operations."""
file_path = kwargs.get("file_path")
temperature = kwargs.get("temperature", {})
speed = kwargs.get("speed")
position = kwargs.get("position", {})
gcode = kwargs.get("gcode")
if action == "start_print":
if not file_path:
return format_error_response("file_path required for start_print", error_type="validation_error")
return await self._octoprint_start_print(device_id, file_path)
elif action == "stop_print":
return await self._octoprint_stop_print(device_id)
elif action == "pause_print":
return await self._octoprint_pause_print(device_id)
elif action == "resume_print":
return await self._octoprint_resume_print(device_id)
elif action == "set_temperature":
return await self._octoprint_set_temperature(device_id, temperature)
elif action == "home_axes":
return await self._octoprint_home_axes(device_id, position.get("axes", ["x", "y", "z"]))
elif action == "move_head":
return await self._octoprint_move_head(device_id, position)
elif action == "send_gcode":
if not gcode:
return format_error_response("gcode required for send_gcode", error_type="validation_error")
return await self._octoprint_send_gcode(device_id, gcode)
elif action == "load_filament":
return await self._octoprint_load_filament(device_id)
elif action == "unload_filament":
return await self._octoprint_unload_filament(device_id)
else:
return format_error_response(f"Unknown 3D printer control action: {action}",
error_type="validation_error")
async def _handle_3d_printer_monitor(self, device_id: str, action: str, **kwargs) -> Dict[str, Any]:
"""Handle 3D printer monitoring operations."""
monitor_type = kwargs.get("monitor_type", "status")
if action == "get_status":
return await self._octoprint_get_status(device_id)
elif action == "get_progress":
return await self._octoprint_get_progress(device_id)
elif action == "get_temperatures":
return await self._octoprint_get_temperatures(device_id)
elif action == "get_webcam":
return await self._octoprint_get_webcam(device_id)
elif action == "get_files":
return await self._octoprint_get_files(device_id)
else:
return format_error_response(f"Unknown 3D printer monitor action: {action}",
error_type="validation_error")
async def _handle_3d_printer_maintenance(self, device_id: str, action: str, **kwargs) -> Dict[str, Any]:
"""Handle 3D printer maintenance operations."""
maintenance_action = kwargs.get("maintenance_action")
if action == "level_bed":
return await self._octoprint_level_bed(device_id)
elif action == "calibrate_pid":
return await self._octoprint_calibrate_pid(device_id)
elif action == "clean_nozzle":
return await self._octoprint_clean_nozzle(device_id)
elif action == "update_firmware":
return await self._octoprint_update_firmware(device_id)
else:
return format_error_response(f"Unknown 3D printer maintenance action: {action}",
error_type="validation_error")
# OctoPrint API implementations
async def _octoprint_start_print(self, device_id: str, file_path: str) -> Dict[str, Any]:
"""Start a print job via OctoPrint."""
try:
# This would call the OctoPrint REST API
# For now, return mock response
return format_success_response(
f"Print started: {file_path}",
data={
"device_id": device_id,
"file_path": file_path,
"status": "printing",
"estimated_time": "2h 30m"
}
)
except Exception as e:
return handle_tool_error("_octoprint_start_print", e, device_id=device_id, file_path=file_path)
async def _octoprint_stop_print(self, device_id: str) -> Dict[str, Any]:
"""Stop current print job."""
try:
return format_success_response(
"Print stopped",
data={"device_id": device_id, "status": "stopped"}
)
except Exception as e:
return handle_tool_error("_octoprint_stop_print", e, device_id=device_id)
async def _octoprint_pause_print(self, device_id: str) -> Dict[str, Any]:
"""Pause current print job."""
try:
return format_success_response(
"Print paused",
data={"device_id": device_id, "status": "paused"}
)
except Exception as e:
return handle_tool_error("_octoprint_pause_print", e, device_id=device_id)
async def _octoprint_resume_print(self, device_id: str) -> Dict[str, Any]:
"""Resume paused print job."""
try:
return format_success_response(
"Print resumed",
data={"device_id": device_id, "status": "printing"}
)
except Exception as e:
return handle_tool_error("_octoprint_resume_print", e, device_id=device_id)
async def _octoprint_get_status(self, device_id: str) -> Dict[str, Any]:
"""Get printer status."""
try:
return format_success_response(
"Printer status retrieved",
data={
"device_id": device_id,
"state": "printing",
"progress": 45.2,
"print_time": "1h 15m",
"time_remaining": "1h 15m",
"temperatures": {
"hotend": {"actual": 215.0, "target": 210.0},
"bed": {"actual": 60.0, "target": 60.0}
}
}
)
except Exception as e:
return handle_tool_error("_octoprint_get_status", e, device_id=device_id)
async def _octoprint_get_progress(self, device_id: str) -> Dict[str, Any]:
"""Get print progress."""
try:
return format_success_response(
"Print progress retrieved",
data={
"device_id": device_id,
"completion": 45.2,
"print_time": "1h 15m 30s",
"time_remaining": "1h 15m 30s",
"current_layer": 67,
"total_layers": 148
}
)
except Exception as e:
return handle_tool_error("_octoprint_get_progress", e, device_id=device_id)
async def _octoprint_get_temperatures(self, device_id: str) -> Dict[str, Any]:
"""Get temperature readings."""
try:
return format_success_response(
"Temperatures retrieved",
data={
"device_id": device_id,
"hotend": {"actual": 215.0, "target": 210.0},
"bed": {"actual": 60.0, "target": 60.0},
"chamber": {"actual": 25.0, "target": None}
}
)
except Exception as e:
return handle_tool_error("_octoprint_get_temperatures", e, device_id=device_id)
async def _octoprint_get_webcam(self, device_id: str) -> Dict[str, Any]:
"""Get webcam stream URL."""
try:
return format_success_response(
"Webcam stream available",
data={
"device_id": device_id,
"stream_url": f"http://printer-{device_id}.local:8080/webcam/?action=stream",
"snapshot_url": f"http://printer-{device_id}.local:8080/webcam/?action=snapshot"
}
)
except Exception as e:
return handle_tool_error("_octoprint_get_webcam", e, device_id=device_id)
# Placeholder implementations for other operations
async def _octoprint_set_temperature(self, device_id: str, temperature: Dict[str, float]) -> Dict[str, Any]:
return format_success_response(f"Temperature set: {temperature}", data={"device_id": device_id})
async def _octoprint_home_axes(self, device_id: str, axes: List[str]) -> Dict[str, Any]:
return format_success_response(f"Axes homed: {axes}", data={"device_id": device_id})
async def _octoprint_move_head(self, device_id: str, position: Dict[str, float]) -> Dict[str, Any]:
return format_success_response(f"Head moved: {position}", data={"device_id": device_id})
async def _octoprint_send_gcode(self, device_id: str, gcode: str) -> Dict[str, Any]:
return format_success_response(f"G-code sent: {gcode[:50]}...", data={"device_id": device_id})
async def _octoprint_load_filament(self, device_id: str) -> Dict[str, Any]:
return format_success_response("Filament loaded", data={"device_id": device_id})
async def _octoprint_unload_filament(self, device_id: str) -> Dict[str, Any]:
return format_success_response("Filament unloaded", data={"device_id": device_id})
async def _octoprint_get_files(self, device_id: str) -> Dict[str, Any]:
return format_success_response("Files retrieved", data={"device_id": device_id, "files": []})
async def _octoprint_level_bed(self, device_id: str) -> Dict[str, Any]:
return format_success_response("Bed leveling started", data={"device_id": device_id})
async def _octoprint_calibrate_pid(self, device_id: str) -> Dict[str, Any]:
return format_success_response("PID calibration started", data={"device_id": device_id})
async def _octoprint_clean_nozzle(self, device_id: str) -> Dict[str, Any]:
return format_success_response("Nozzle cleaning started", data={"device_id": device_id})
async def _octoprint_update_firmware(self, device_id: str) -> Dict[str, Any]:
return format_success_response("Firmware update started", data={"device_id": device_id})
# CNC and Laser cutter placeholders
async def _handle_cnc_machine(self, device_id: str, category: str, action: str, **kwargs) -> Dict[str, Any]:
return format_success_response(f"CNC {category} {action} - Coming soon!", data={"device_id": device_id})
async def _handle_laser_cutter(self, device_id: str, category: str, action: str, **kwargs) -> Dict[str, Any]:
return format_success_response(f"Laser cutter {category} {action} - Coming soon!", data={"device_id": device_id})