Skip to main content
Glama
scanner_manager.py8.24 kB
""" Scanner Manager: Unified interface for multiple scanner backends Orchestrates WIA, TWAIN, and other scanner control systems with a unified API. """ import logging from typing import Dict, Any, Optional, List from pathlib import Path from .wia_scanner import WIABackend, ScannerInfo, ScanSettings, ScannerProperties logger = logging.getLogger(__name__) class ScannerManager: """ Unified scanner management across multiple backends. Provides a consistent interface for scanner discovery, configuration, and control regardless of the underlying scanner API used. """ def __init__(self): self.backends = { "wia": WIABackend(), # Future: "twain": TWAINBackend(), # Future: "sane": SANEBackend(), } self._discovered_scanners = {} self._last_discovery = None def is_available(self) -> bool: """Check if any scanner backend is available.""" return any(backend.is_available() for backend in self.backends.values()) def discover_scanners(self, force_refresh: bool = False) -> List[ScannerInfo]: """ Discover all connected scanners across all backends. Args: force_refresh: Force rediscovery even if cached Returns: List of all discovered scanners """ if not force_refresh and self._last_discovery and self._discovered_scanners: # Return cached results if recent enough return list(self._discovered_scanners.values()) all_scanners = [] self._discovered_scanners = {} for backend_name, backend in self.backends.items(): if backend.is_available(): try: scanners = backend.discover_scanners() for scanner in scanners: # Prefix device_id with backend name to avoid conflicts unique_id = f"{backend_name}:{scanner.device_id}" scanner.device_id = unique_id self._discovered_scanners[unique_id] = scanner all_scanners.append(scanner) logger.info(f"{backend_name.upper()} backend found {len(scanners)} scanners") except Exception as e: logger.error(f"Error discovering scanners with {backend_name}: {e}") self._last_discovery = all_scanners logger.info(f"Total scanners discovered: {len(all_scanners)}") return all_scanners def get_scanner_info(self, device_id: str) -> Optional[ScannerInfo]: """ Get information about a specific scanner. Args: device_id: Scanner device ID (with backend prefix) Returns: ScannerInfo object or None if not found """ # Ensure we have discovered scanners if not self._discovered_scanners: self.discover_scanners() return self._discovered_scanners.get(device_id) def get_scanner_properties(self, device_id: str) -> Optional[ScannerProperties]: """ Get detailed properties for a scanner. Args: device_id: Scanner device ID (with backend prefix) Returns: ScannerProperties object or None if not found """ backend_name, actual_device_id = self._parse_device_id(device_id) backend = self.backends.get(backend_name) if not backend or not backend.is_available(): return None try: # Remove backend prefix for backend-specific call return backend.get_scanner_properties(actual_device_id) except Exception as e: logger.error(f"Failed to get properties for scanner {device_id}: {e}") return None def configure_scan(self, device_id: str, settings: Dict[str, Any]) -> bool: """ Configure scan settings for a scanner. Args: device_id: Scanner device ID (with backend prefix) settings: Dictionary of scan settings Returns: True if configuration successful """ backend_name, actual_device_id = self._parse_device_id(device_id) backend = self.backends.get(backend_name) if not backend or not backend.is_available(): return False try: # Convert dict to ScanSettings object scan_settings = ScanSettings(**settings) return backend.configure_scan(actual_device_id, scan_settings) except Exception as e: logger.error(f"Failed to configure scanner {device_id}: {e}") return False def scan_document(self, device_id: str, settings: Dict[str, Any]) -> Optional[Any]: """ Perform a document scan. Args: device_id: Scanner device ID (with backend prefix) settings: Dictionary of scan settings Returns: PIL Image object if successful, None otherwise """ backend_name, actual_device_id = self._parse_device_id(device_id) backend = self.backends.get(backend_name) if not backend or not backend.is_available(): return None try: # Convert dict to ScanSettings object scan_settings = ScanSettings(**settings) return backend.scan_document(actual_device_id, scan_settings) except Exception as e: logger.error(f"Failed to scan document on {device_id}: {e}") return None async def scan_batch( self, device_id: str, settings: Dict[str, Any], count: int = 10, auto_process: bool = True ) -> List[Any]: """ Perform batch scanning of multiple documents. Args: device_id: Scanner device ID (with backend prefix) settings: Dictionary of scan settings count: Maximum number of documents to scan auto_process: Whether to automatically process each scan Returns: List of PIL Image objects """ backend_name, actual_device_id = self._parse_device_id(device_id) backend = self.backends.get(backend_name) if not backend or not backend.is_available(): return [] images = [] try: for i in range(count): logger.info(f"Scanning document {i + 1}/{count}") # Configure scanner for batch mode batch_settings = settings.copy() batch_settings["use_adf"] = True # Enable ADF for batch scanning image = self.scan_document(device_id, batch_settings) if image: images.append(image) logger.info(f"Document {i + 1} scanned successfully") else: logger.warning(f"Failed to scan document {i + 1}") break # Stop on first failure except Exception as e: logger.error(f"Batch scanning failed on {device_id}: {e}") logger.info(f"Batch scanning completed: {len(images)} documents scanned") return images def _parse_device_id(self, device_id: str) -> tuple[str, str]: """ Parse device ID to extract backend name and actual device ID. Args: device_id: Full device ID with backend prefix (e.g., "wia:device123") Returns: Tuple of (backend_name, actual_device_id) """ if ":" in device_id: backend_name, actual_device_id = device_id.split(":", 1) return backend_name, actual_device_id else: # Assume WIA if no prefix (backward compatibility) return "wia", device_id def get_available_backends(self) -> List[str]: """Get list of available scanner backends.""" return [name for name, backend in self.backends.items() if backend.is_available()] def get_backend_status(self) -> Dict[str, bool]: """Get status of all scanner backends.""" return {name: backend.is_available() for name, backend in self.backends.items()} # Global scanner manager instance scanner_manager = ScannerManager()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sandraschi/ocr-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server