scan_network
Scan your local network to discover WeMo smart home devices by probing IP addresses and verifying device responses for management and control.
Instructions
Scan network for WeMo devices using pywemo discovery.
This tool scans the specified subnet for WeMo devices by:
1. Probing all IPs in the subnet on common WeMo ports (49152-49155)
2. Verifying responsive IPs by attempting to read device descriptions
3. Using pywemo library to properly identify and parse WeMo devices
Args:
subnet: Network subnet in CIDR notation (default: from config or "192.168.1.0/24")
timeout: Connection timeout in seconds for port probing (default: from config or 0.6)
max_workers: Maximum concurrent workers for network scanning (default: from config or 60)
ctx: MCP context injected by FastMCP; used to elicit the subnet when none is configured
Returns:
Dictionary containing:
- scan_parameters: The parameters used for scanning
- results: Summary with device counts
- devices: List of discovered WeMo devices with full details
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| subnet | No | | |
| timeout | No | | |
| max_workers | No | | |
Implementation Reference
- src/wemo_mcp_server/server.py:372-514 (handler) The `scan_network` MCP tool handler function is defined here; it uses the `WeMoScanner.scan_subnet` method to perform network discovery.
async def scan_network(
    subnet: str | None = None,
    timeout: float | None = None,
    max_workers: int | None = None,
    ctx: Context | None = None,
) -> dict[str, Any]:
    """Scan network for WeMo devices using pywemo discovery.

    This tool scans the specified subnet for WeMo devices by:
    1. Probing all IPs in the subnet on common WeMo ports (49152-49155)
    2. Verifying responsive IPs by attempting to read device descriptions
    3. Using pywemo library to properly identify and parse WeMo devices

    Args:
    ----
        subnet: Network subnet in CIDR notation (default: from config or "192.168.1.0/24")
        timeout: Connection timeout in seconds for port probing (default: from config or 0.6)
        max_workers: Maximum concurrent workers for network scanning (default: from config or 60)
        ctx: MCP context injected by FastMCP; used to elicit the subnet when none is configured

    Returns:
    -------
        Dictionary containing:
        - scan_parameters: The parameters used for scanning
        - results: Summary with device counts
        - devices: List of discovered WeMo devices with full details
        - scan_completed / cache_saved / timestamp: status fields

        On failure the dictionary instead carries an "error" entry and
        "scan_completed": False (user cancelled, invalid parameters, or an
        exception raised while scanning).

    """
    config = get_config()

    # Fill in anything the caller left out from the configuration.
    if subnet is None:
        config_subnet = config.get("network", "default_subnet", DEFAULT_SUBNET)
        if config_subnet == DEFAULT_SUBNET and ctx is not None:
            # No custom subnet configured — ask the user rather than scanning the wrong network
            elicit_result = await ctx.elicit(
                "No subnet is configured. Which subnet should I scan? "
                "(e.g. 192.168.1.0/24 — check your router for your local network range)",
                schema=_SubnetChoiceSchema,
            )
            if elicit_result.action == "accept" and elicit_result.data:
                subnet = elicit_result.data.subnet
            elif elicit_result.action != "accept":
                return {"error": "Scan cancelled by user", "scan_completed": False}
            else:
                # Accepted without data: fall back to the configured value.
                subnet = config_subnet
        else:
            subnet = config_subnet

    if timeout is None:
        timeout = config.get("network", "scan_timeout", 0.6)

    if max_workers is None:
        max_workers = config.get("network", "max_workers", 60)

    # Validate inputs
    try:
        params = ScanNetworkParams(subnet=subnet, timeout=timeout, max_workers=max_workers)
    except ValidationError as e:
        return {
            "error": ERR_INVALID_PARAMS,
            "validation_errors": [
                {
                    # Model-level errors can carry an empty ``loc``; report
                    # them with a null field instead of raising IndexError.
                    "field": err["loc"][0] if err["loc"] else None,
                    "message": err["msg"],
                    "input": err["input"],
                }
                for err in e.errors()
            ],
            "scan_completed": False,
        }

    # Monotonic clock for the duration: wall-clock time may jump during a scan.
    start_time = time.monotonic()

    try:
        logger.info(f"Starting WeMo network scan for subnet: {params.subnet}")

        scanner = WeMoScanner()
        scanner.timeout = params.timeout

        # The scan is synchronous; run it in the default executor so the
        # event loop stays responsive.
        loop = asyncio.get_running_loop()
        devices = await loop.run_in_executor(
            None,
            scanner.scan_subnet,
            params.subnet,
            params.max_workers,
        )

        # Extract device information and cache the live objects for later
        # control operations (keyed by name and, when present, by host).
        device_list = []
        for device in devices:
            device_list.append(extract_device_info(device))
            _device_cache[device.name] = device
            if hasattr(device, "host"):
                _device_cache[device.host] = device

        elapsed_time = time.monotonic() - start_time

        # Save device info to persistent cache; serialize each device once
        # and store the result under both keys.
        device_cache_data = {}
        for device in devices:
            serialized = serialize_device(device)
            device_cache_data[device.name] = serialized
            if hasattr(device, "host"):
                device_cache_data[device.host] = serialized

        cache_saved = _cache_manager.save(device_cache_data)

        scan_result = {
            "scan_parameters": {
                "subnet": params.subnet,
                "timeout": params.timeout,
                "max_workers": params.max_workers,
            },
            "results": {
                "total_devices_found": len(device_list),
                "wemo_devices": len(device_list),
                "scan_duration_seconds": round(elapsed_time, 2),
            },
            "devices": device_list,
            "scan_completed": True,
            "cache_saved": cache_saved,
            "timestamp": time.time(),
        }

        logger.info(
            f"Scan completed in {elapsed_time:.2f}s. Found {len(device_list)} WeMo devices. "
            f"Cache {'saved' if cache_saved else 'save failed'}.",
        )
        return scan_result

    except Exception as e:
        # Top-level tool boundary: log with traceback and hand the caller a
        # structured error instead of propagating.
        logger.error(f"Network scan error: {e!s}", exc_info=True)
        error_response = build_error_response(
            e,
            "Network scan",
            context={
                "subnet": params.subnet,
                "timeout": params.timeout,
                "max_workers": params.max_workers,
            },
        )
        error_response["scan_completed"] = False
        return error_response


# - src/wemo_mcp_server/server.py:295-330 (handler) The actual scanning logic is implemented in the `WeMoScanner.scan_subnet` method.
def scan_subnet(self, target_cidr: str, max_workers: int = 60) -> list[Any]:
    """Discover WeMo devices on a subnet.

    Discovery via ``_run_upnp_discovery`` runs first and is treated as the
    primary source; probing the subnet's hosts is the backup that picks up
    anything discovery missed.

    Args:
    ----
        target_cidr: CIDR notation subnet (e.g., "192.168.1.0/24")
        max_workers: Maximum concurrent workers for scanning

    Returns:
    -------
        List of discovered pywemo device objects. When the CIDR cannot be
        parsed (``_parse_cidr_network`` returns None) only the devices from
        the discovery phase are returned.

    """
    # Phase 1 (primary): UPnP/SSDP discovery.
    devices = self._run_upnp_discovery()

    hosts = self._parse_cidr_network(target_cidr)
    if hosts is not None:
        # Phase 2: find the addresses that respond at all.
        responsive = self._probe_active_ips(hosts, max_workers)

        # Phase 3: verify only the addresses phase 1 did not already cover.
        known_hosts = set()
        for dev in devices:
            if hasattr(dev, "host"):
                known_hosts.add(dev.host)
        pending = [addr for addr in responsive if addr not in known_hosts]

        devices.extend(self._verify_wemo_devices(pending, max_workers))
        logger.info(f"Scan complete. Found {len(devices)} WeMo devices total")

    return devices