Skip to main content
Glama

OpenSCAD MCP Server

by jhacksman
printer_discovery.py16.5 kB
import os import logging import socket import json import time import threading from typing import Dict, List, Any, Optional, Callable logger = logging.getLogger(__name__) class PrinterDiscovery: """ Discovers 3D printers on the network and provides interfaces for direct printing. """ def __init__(self): """Initialize the printer discovery service.""" self.printers = {} # Dictionary of discovered printers self.discovery_thread = None self.discovery_stop_event = threading.Event() self.discovery_callback = None def start_discovery(self, callback: Optional[Callable[[Dict[str, Any]], None]] = None) -> None: """ Start discovering 3D printers on the network. Args: callback: Optional callback function to call when a printer is discovered """ if self.discovery_thread and self.discovery_thread.is_alive(): logger.warning("Printer discovery already running") return self.discovery_callback = callback self.discovery_stop_event.clear() self.discovery_thread = threading.Thread(target=self._discover_printers) self.discovery_thread.daemon = True self.discovery_thread.start() logger.info("Started printer discovery") def stop_discovery(self) -> None: """Stop discovering 3D printers.""" if self.discovery_thread and self.discovery_thread.is_alive(): self.discovery_stop_event.set() self.discovery_thread.join(timeout=2.0) logger.info("Stopped printer discovery") else: logger.warning("Printer discovery not running") def get_printers(self) -> Dict[str, Any]: """ Get the list of discovered printers. Returns: Dictionary of printer information """ return self.printers def _discover_printers(self) -> None: """Discover 3D printers on the network using various protocols.""" # This is a simplified implementation that simulates printer discovery # In a real implementation, you would use protocols like mDNS, SNMP, or OctoPrint API # Simulate discovering printers while not self.discovery_stop_event.is_set(): try: # Simulate network discovery self._discover_octoprint_printers() self._discover_prusa_printers() self._discover_ultimaker_printers() # Wait before next discovery cycle time.sleep(10) except Exception as e: logger.error(f"Error in printer discovery: {str(e)}") time.sleep(5) def _discover_octoprint_printers(self) -> None: """Discover OctoPrint servers on the network.""" # Simulate discovering OctoPrint servers # In a real implementation, you would use mDNS to discover OctoPrint instances # Simulate finding a printer printer_id = "octoprint_1" if printer_id not in self.printers: printer_info = { "id": printer_id, "name": "OctoPrint Printer", "type": "octoprint", "address": "192.168.1.100", "port": 80, "api_key": None, # Would need to be provided by user "status": "online", "capabilities": ["print", "status", "cancel"] } self.printers[printer_id] = printer_info if self.discovery_callback: self.discovery_callback(printer_info) logger.info(f"Discovered OctoPrint printer: {printer_info['name']}") def _discover_prusa_printers(self) -> None: """Discover Prusa printers on the network.""" # Simulate discovering Prusa printers # Simulate finding a printer printer_id = "prusa_1" if printer_id not in self.printers: printer_info = { "id": printer_id, "name": "Prusa MK3S", "type": "prusa", "address": "192.168.1.101", "port": 80, "status": "online", "capabilities": ["print", "status"] } self.printers[printer_id] = printer_info if self.discovery_callback: self.discovery_callback(printer_info) logger.info(f"Discovered Prusa printer: {printer_info['name']}") def _discover_ultimaker_printers(self) -> None: """Discover Ultimaker printers on the network.""" # Simulate discovering Ultimaker printers # Simulate finding a printer printer_id = "ultimaker_1" if printer_id not in self.printers: printer_info = { "id": printer_id, "name": "Ultimaker S5", "type": "ultimaker", "address": "192.168.1.102", "port": 80, "status": "online", "capabilities": ["print", "status", "cancel"] } self.printers[printer_id] = printer_info if self.discovery_callback: self.discovery_callback(printer_info) logger.info(f"Discovered Ultimaker printer: {printer_info['name']}") class PrinterInterface: """ Interface for communicating with 3D printers. """ def __init__(self, printer_discovery: PrinterDiscovery): """ Initialize the printer interface. Args: printer_discovery: Instance of PrinterDiscovery for finding printers """ self.printer_discovery = printer_discovery self.connected_printers = {} # Dictionary of connected printers def connect_to_printer(self, printer_id: str, credentials: Optional[Dict[str, Any]] = None) -> bool: """ Connect to a specific printer. Args: printer_id: ID of the printer to connect to credentials: Optional credentials for authentication Returns: True if connection successful, False otherwise """ printers = self.printer_discovery.get_printers() if printer_id not in printers: logger.error(f"Printer not found: {printer_id}") return False printer_info = printers[printer_id] # Create appropriate printer client based on type if printer_info["type"] == "octoprint": client = OctoPrintClient(printer_info, credentials) elif printer_info["type"] == "prusa": client = PrusaClient(printer_info, credentials) elif printer_info["type"] == "ultimaker": client = UltimakerClient(printer_info, credentials) else: logger.error(f"Unsupported printer type: {printer_info['type']}") return False # Connect to the printer if client.connect(): self.connected_printers[printer_id] = client return True else: return False def disconnect_from_printer(self, printer_id: str) -> bool: """ Disconnect from a specific printer. Args: printer_id: ID of the printer to disconnect from Returns: True if disconnection successful, False otherwise """ if printer_id not in self.connected_printers: logger.error(f"Not connected to printer: {printer_id}") return False client = self.connected_printers[printer_id] if client.disconnect(): del self.connected_printers[printer_id] return True else: return False def print_file(self, printer_id: str, file_path: str, print_settings: Optional[Dict[str, Any]] = None) -> bool: """ Send a file to a printer for printing. Args: printer_id: ID of the printer to print on file_path: Path to the STL file to print print_settings: Optional print settings Returns: True if print job started successfully, False otherwise """ if printer_id not in self.connected_printers: logger.error(f"Not connected to printer: {printer_id}") return False client = self.connected_printers[printer_id] return client.print_file(file_path, print_settings) def get_printer_status(self, printer_id: str) -> Dict[str, Any]: """ Get the status of a specific printer. Args: printer_id: ID of the printer to get status for Returns: Dictionary with printer status information """ if printer_id not in self.connected_printers: logger.error(f"Not connected to printer: {printer_id}") return {"error": "Not connected to printer"} client = self.connected_printers[printer_id] return client.get_status() def cancel_print(self, printer_id: str) -> bool: """ Cancel a print job on a specific printer. Args: printer_id: ID of the printer to cancel print on Returns: True if cancellation successful, False otherwise """ if printer_id not in self.connected_printers: logger.error(f"Not connected to printer: {printer_id}") return False client = self.connected_printers[printer_id] return client.cancel_print() class PrinterClient: """Base class for printer clients.""" def __init__(self, printer_info: Dict[str, Any], credentials: Optional[Dict[str, Any]] = None): """ Initialize the printer client. Args: printer_info: Information about the printer credentials: Optional credentials for authentication """ self.printer_info = printer_info self.credentials = credentials or {} self.connected = False def connect(self) -> bool: """ Connect to the printer. Returns: True if connection successful, False otherwise """ # Base implementation - should be overridden by subclasses self.connected = True return True def disconnect(self) -> bool: """ Disconnect from the printer. Returns: True if disconnection successful, False otherwise """ # Base implementation - should be overridden by subclasses self.connected = False return True def print_file(self, file_path: str, print_settings: Optional[Dict[str, Any]] = None) -> bool: """ Send a file to the printer for printing. Args: file_path: Path to the STL file to print print_settings: Optional print settings Returns: True if print job started successfully, False otherwise """ # Base implementation - should be overridden by subclasses if not self.connected: logger.error("Not connected to printer") return False logger.info(f"Printing file: {file_path}") return True def get_status(self) -> Dict[str, Any]: """ Get the status of the printer. Returns: Dictionary with printer status information """ # Base implementation - should be overridden by subclasses if not self.connected: return {"status": "disconnected"} return {"status": "connected"} def cancel_print(self) -> bool: """ Cancel the current print job. Returns: True if cancellation successful, False otherwise """ # Base implementation - should be overridden by subclasses if not self.connected: logger.error("Not connected to printer") return False logger.info("Cancelling print job") return True class OctoPrintClient(PrinterClient): """Client for OctoPrint printers.""" def connect(self) -> bool: """Connect to an OctoPrint server.""" try: # In a real implementation, you would use the OctoPrint API # to connect to the printer # Check if API key is provided if "api_key" not in self.credentials: logger.error("API key required for OctoPrint") return False # Simulate connection logger.info(f"Connected to OctoPrint server: {self.printer_info['address']}") self.connected = True return True except Exception as e: logger.error(f"Error connecting to OctoPrint server: {str(e)}") return False def print_file(self, file_path: str, print_settings: Optional[Dict[str, Any]] = None) -> bool: """Send a file to an OctoPrint server for printing.""" if not self.connected: logger.error("Not connected to OctoPrint server") return False try: # In a real implementation, you would use the OctoPrint API # to upload the file and start printing # Check if file exists if not os.path.exists(file_path): logger.error(f"File not found: {file_path}") return False # Simulate printing logger.info(f"Printing file on OctoPrint server: {file_path}") return True except Exception as e: logger.error(f"Error printing file on OctoPrint server: {str(e)}") return False def get_status(self) -> Dict[str, Any]: """Get the status of an OctoPrint server.""" if not self.connected: return {"status": "disconnected"} try: # In a real implementation, you would use the OctoPrint API # to get the printer status # Simulate status return { "status": "connected", "printer": { "state": "operational", "temperature": { "bed": {"actual": 60.0, "target": 60.0}, "tool0": {"actual": 210.0, "target": 210.0} } }, "job": { "file": {"name": "example.gcode"}, "progress": {"completion": 0.0, "printTime": 0, "printTimeLeft": 0} } } except Exception as e: logger.error(f"Error getting status from OctoPrint server: {str(e)}") return {"status": "error", "message": str(e)} class PrusaClient(PrinterClient): """Client for Prusa printers.""" def connect(self) -> bool: """Connect to a Prusa printer.""" try: # In a real implementation, you would use the Prusa API # to connect to the printer # Simulate connection logger.info(f"Connected to Prusa printer: {self.printer_info['address']}") self.connected = True return True except Exception as e: logger.error(f"Error connecting to Prusa printer: {str(e)}") return False class UltimakerClient(PrinterClient): """Client for Ultimaker printers.""" def connect(self) -> bool: """Connect to an Ultimaker printer.""" try: # In a real implementation, you would use the Ultimaker API # to connect to the printer # Simulate connection logger.info(f"Connected to Ultimaker printer: {self.printer_info['address']}") self.connected = True return True except Exception as e: logger.error(f"Error connecting to Ultimaker printer: {str(e)}") return False

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jhacksman/OpenSCAD-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server