printer_discovery.py•16.5 kB
import os
import logging
import socket
import json
import time
import threading
from typing import Dict, List, Any, Optional, Callable
logger = logging.getLogger(__name__)
class PrinterDiscovery:
"""
Discovers 3D printers on the network and provides interfaces for direct printing.
"""
def __init__(self):
"""Initialize the printer discovery service."""
self.printers = {} # Dictionary of discovered printers
self.discovery_thread = None
self.discovery_stop_event = threading.Event()
self.discovery_callback = None
def start_discovery(self, callback: Optional[Callable[[Dict[str, Any]], None]] = None) -> None:
"""
Start discovering 3D printers on the network.
Args:
callback: Optional callback function to call when a printer is discovered
"""
if self.discovery_thread and self.discovery_thread.is_alive():
logger.warning("Printer discovery already running")
return
self.discovery_callback = callback
self.discovery_stop_event.clear()
self.discovery_thread = threading.Thread(target=self._discover_printers)
self.discovery_thread.daemon = True
self.discovery_thread.start()
logger.info("Started printer discovery")
def stop_discovery(self) -> None:
"""Stop discovering 3D printers."""
if self.discovery_thread and self.discovery_thread.is_alive():
self.discovery_stop_event.set()
self.discovery_thread.join(timeout=2.0)
logger.info("Stopped printer discovery")
else:
logger.warning("Printer discovery not running")
def get_printers(self) -> Dict[str, Any]:
"""
Get the list of discovered printers.
Returns:
Dictionary of printer information
"""
return self.printers
def _discover_printers(self) -> None:
"""Discover 3D printers on the network using various protocols."""
# This is a simplified implementation that simulates printer discovery
# In a real implementation, you would use protocols like mDNS, SNMP, or OctoPrint API
# Simulate discovering printers
while not self.discovery_stop_event.is_set():
try:
# Simulate network discovery
self._discover_octoprint_printers()
self._discover_prusa_printers()
self._discover_ultimaker_printers()
# Wait before next discovery cycle
time.sleep(10)
except Exception as e:
logger.error(f"Error in printer discovery: {str(e)}")
time.sleep(5)
def _discover_octoprint_printers(self) -> None:
"""Discover OctoPrint servers on the network."""
# Simulate discovering OctoPrint servers
# In a real implementation, you would use mDNS to discover OctoPrint instances
# Simulate finding a printer
printer_id = "octoprint_1"
if printer_id not in self.printers:
printer_info = {
"id": printer_id,
"name": "OctoPrint Printer",
"type": "octoprint",
"address": "192.168.1.100",
"port": 80,
"api_key": None, # Would need to be provided by user
"status": "online",
"capabilities": ["print", "status", "cancel"]
}
self.printers[printer_id] = printer_info
if self.discovery_callback:
self.discovery_callback(printer_info)
logger.info(f"Discovered OctoPrint printer: {printer_info['name']}")
def _discover_prusa_printers(self) -> None:
"""Discover Prusa printers on the network."""
# Simulate discovering Prusa printers
# Simulate finding a printer
printer_id = "prusa_1"
if printer_id not in self.printers:
printer_info = {
"id": printer_id,
"name": "Prusa MK3S",
"type": "prusa",
"address": "192.168.1.101",
"port": 80,
"status": "online",
"capabilities": ["print", "status"]
}
self.printers[printer_id] = printer_info
if self.discovery_callback:
self.discovery_callback(printer_info)
logger.info(f"Discovered Prusa printer: {printer_info['name']}")
def _discover_ultimaker_printers(self) -> None:
"""Discover Ultimaker printers on the network."""
# Simulate discovering Ultimaker printers
# Simulate finding a printer
printer_id = "ultimaker_1"
if printer_id not in self.printers:
printer_info = {
"id": printer_id,
"name": "Ultimaker S5",
"type": "ultimaker",
"address": "192.168.1.102",
"port": 80,
"status": "online",
"capabilities": ["print", "status", "cancel"]
}
self.printers[printer_id] = printer_info
if self.discovery_callback:
self.discovery_callback(printer_info)
logger.info(f"Discovered Ultimaker printer: {printer_info['name']}")
class PrinterInterface:
"""
Interface for communicating with 3D printers.
"""
def __init__(self, printer_discovery: PrinterDiscovery):
"""
Initialize the printer interface.
Args:
printer_discovery: Instance of PrinterDiscovery for finding printers
"""
self.printer_discovery = printer_discovery
self.connected_printers = {} # Dictionary of connected printers
def connect_to_printer(self, printer_id: str, credentials: Optional[Dict[str, Any]] = None) -> bool:
"""
Connect to a specific printer.
Args:
printer_id: ID of the printer to connect to
credentials: Optional credentials for authentication
Returns:
True if connection successful, False otherwise
"""
printers = self.printer_discovery.get_printers()
if printer_id not in printers:
logger.error(f"Printer not found: {printer_id}")
return False
printer_info = printers[printer_id]
# Create appropriate printer client based on type
if printer_info["type"] == "octoprint":
client = OctoPrintClient(printer_info, credentials)
elif printer_info["type"] == "prusa":
client = PrusaClient(printer_info, credentials)
elif printer_info["type"] == "ultimaker":
client = UltimakerClient(printer_info, credentials)
else:
logger.error(f"Unsupported printer type: {printer_info['type']}")
return False
# Connect to the printer
if client.connect():
self.connected_printers[printer_id] = client
return True
else:
return False
def disconnect_from_printer(self, printer_id: str) -> bool:
"""
Disconnect from a specific printer.
Args:
printer_id: ID of the printer to disconnect from
Returns:
True if disconnection successful, False otherwise
"""
if printer_id not in self.connected_printers:
logger.error(f"Not connected to printer: {printer_id}")
return False
client = self.connected_printers[printer_id]
if client.disconnect():
del self.connected_printers[printer_id]
return True
else:
return False
def print_file(self, printer_id: str, file_path: str, print_settings: Optional[Dict[str, Any]] = None) -> bool:
"""
Send a file to a printer for printing.
Args:
printer_id: ID of the printer to print on
file_path: Path to the STL file to print
print_settings: Optional print settings
Returns:
True if print job started successfully, False otherwise
"""
if printer_id not in self.connected_printers:
logger.error(f"Not connected to printer: {printer_id}")
return False
client = self.connected_printers[printer_id]
return client.print_file(file_path, print_settings)
def get_printer_status(self, printer_id: str) -> Dict[str, Any]:
"""
Get the status of a specific printer.
Args:
printer_id: ID of the printer to get status for
Returns:
Dictionary with printer status information
"""
if printer_id not in self.connected_printers:
logger.error(f"Not connected to printer: {printer_id}")
return {"error": "Not connected to printer"}
client = self.connected_printers[printer_id]
return client.get_status()
def cancel_print(self, printer_id: str) -> bool:
"""
Cancel a print job on a specific printer.
Args:
printer_id: ID of the printer to cancel print on
Returns:
True if cancellation successful, False otherwise
"""
if printer_id not in self.connected_printers:
logger.error(f"Not connected to printer: {printer_id}")
return False
client = self.connected_printers[printer_id]
return client.cancel_print()
class PrinterClient:
"""Base class for printer clients."""
def __init__(self, printer_info: Dict[str, Any], credentials: Optional[Dict[str, Any]] = None):
"""
Initialize the printer client.
Args:
printer_info: Information about the printer
credentials: Optional credentials for authentication
"""
self.printer_info = printer_info
self.credentials = credentials or {}
self.connected = False
def connect(self) -> bool:
"""
Connect to the printer.
Returns:
True if connection successful, False otherwise
"""
# Base implementation - should be overridden by subclasses
self.connected = True
return True
def disconnect(self) -> bool:
"""
Disconnect from the printer.
Returns:
True if disconnection successful, False otherwise
"""
# Base implementation - should be overridden by subclasses
self.connected = False
return True
def print_file(self, file_path: str, print_settings: Optional[Dict[str, Any]] = None) -> bool:
"""
Send a file to the printer for printing.
Args:
file_path: Path to the STL file to print
print_settings: Optional print settings
Returns:
True if print job started successfully, False otherwise
"""
# Base implementation - should be overridden by subclasses
if not self.connected:
logger.error("Not connected to printer")
return False
logger.info(f"Printing file: {file_path}")
return True
def get_status(self) -> Dict[str, Any]:
"""
Get the status of the printer.
Returns:
Dictionary with printer status information
"""
# Base implementation - should be overridden by subclasses
if not self.connected:
return {"status": "disconnected"}
return {"status": "connected"}
def cancel_print(self) -> bool:
"""
Cancel the current print job.
Returns:
True if cancellation successful, False otherwise
"""
# Base implementation - should be overridden by subclasses
if not self.connected:
logger.error("Not connected to printer")
return False
logger.info("Cancelling print job")
return True
class OctoPrintClient(PrinterClient):
"""Client for OctoPrint printers."""
def connect(self) -> bool:
"""Connect to an OctoPrint server."""
try:
# In a real implementation, you would use the OctoPrint API
# to connect to the printer
# Check if API key is provided
if "api_key" not in self.credentials:
logger.error("API key required for OctoPrint")
return False
# Simulate connection
logger.info(f"Connected to OctoPrint server: {self.printer_info['address']}")
self.connected = True
return True
except Exception as e:
logger.error(f"Error connecting to OctoPrint server: {str(e)}")
return False
def print_file(self, file_path: str, print_settings: Optional[Dict[str, Any]] = None) -> bool:
"""Send a file to an OctoPrint server for printing."""
if not self.connected:
logger.error("Not connected to OctoPrint server")
return False
try:
# In a real implementation, you would use the OctoPrint API
# to upload the file and start printing
# Check if file exists
if not os.path.exists(file_path):
logger.error(f"File not found: {file_path}")
return False
# Simulate printing
logger.info(f"Printing file on OctoPrint server: {file_path}")
return True
except Exception as e:
logger.error(f"Error printing file on OctoPrint server: {str(e)}")
return False
def get_status(self) -> Dict[str, Any]:
"""Get the status of an OctoPrint server."""
if not self.connected:
return {"status": "disconnected"}
try:
# In a real implementation, you would use the OctoPrint API
# to get the printer status
# Simulate status
return {
"status": "connected",
"printer": {
"state": "operational",
"temperature": {
"bed": {"actual": 60.0, "target": 60.0},
"tool0": {"actual": 210.0, "target": 210.0}
}
},
"job": {
"file": {"name": "example.gcode"},
"progress": {"completion": 0.0, "printTime": 0, "printTimeLeft": 0}
}
}
except Exception as e:
logger.error(f"Error getting status from OctoPrint server: {str(e)}")
return {"status": "error", "message": str(e)}
class PrusaClient(PrinterClient):
"""Client for Prusa printers."""
def connect(self) -> bool:
"""Connect to a Prusa printer."""
try:
# In a real implementation, you would use the Prusa API
# to connect to the printer
# Simulate connection
logger.info(f"Connected to Prusa printer: {self.printer_info['address']}")
self.connected = True
return True
except Exception as e:
logger.error(f"Error connecting to Prusa printer: {str(e)}")
return False
class UltimakerClient(PrinterClient):
"""Client for Ultimaker printers."""
def connect(self) -> bool:
"""Connect to an Ultimaker printer."""
try:
# In a real implementation, you would use the Ultimaker API
# to connect to the printer
# Simulate connection
logger.info(f"Connected to Ultimaker printer: {self.printer_info['address']}")
self.connected = True
return True
except Exception as e:
logger.error(f"Error connecting to Ultimaker printer: {str(e)}")
return False