"""Dreame robot vacuum client for Robotics MCP.
Provides full control over Dreame robot vacuums (D20 Pro, D20 Pro Plus) including
cleaning, status monitoring, and LIDAR map retrieval via python-miio.
Map data is retrieved using raw MiIO protocol (siid 23, piid 1) and decoded by the
built-in parser in this module to extract rooms, walls, obstacles, and occupancy grids.
The optional vacuum-map-parser-dreame package is only probed for availability.
"""
import asyncio
import base64
import json
import zlib
from pathlib import Path
from typing import Any
import structlog
from ..utils.response_builders import (
build_robotics_error_response,
build_success_response,
)
logger = structlog.get_logger(__name__)

# Probe for the optional map parser package. Its absence is not an error: the
# flag below simply records whether the import succeeded.
try:
    from vacuum_map_parser_dreame.map_data_parser import DreameMapDataParser
except ImportError:
    MAP_PARSER_AVAILABLE = False
    logger.info("vacuum-map-parser-dreame not installed, map parsing will use fallback")
else:
    MAP_PARSER_AVAILABLE = True
class DreameClient:
"""Client for controlling Dreame robot vacuums via python-miio (sync API)."""
def __init__(self, robot_id: str, config: dict[str, Any] | None = None):
"""Initialize Dreame client.
Args:
robot_id: Unique identifier for the robot
config: Configuration including ip_address, token, etc.
"""
self.robot_id = robot_id
self.config = config or {}
self.device = None
self._connected = False
self.ip_address = self.config.get("ip_address")
self.token = self.config.get("token")
def _ensure_device(self) -> bool:
"""Create device instance if not exists. python-miio has no connect()."""
if self.device is not None:
return True
if not self.ip_address or not self.token:
logger.error("Missing Dreame configuration", robot_id=self.robot_id)
return False
try:
from miio import DreameVacuumMiot
self.device = DreameVacuumMiot(self.ip_address, self.token)
return True
except Exception as e:
logger.error("Failed to create Dreame device", robot_id=self.robot_id, error=str(e))
return False
def _sync_status(self) -> Any:
"""Sync call to get status. Run via asyncio.to_thread()."""
if not self._ensure_device():
return None
return self.device.status()
async def get_status(self) -> dict[str, Any] | None:
"""Get current robot status."""
try:
status = await asyncio.to_thread(self._sync_status)
if status is None:
return None
return {
"battery_level": getattr(status, "battery_level", 0),
"charging_state": str(getattr(status, "charging_state", "unknown")),
"device_status": str(getattr(status, "device_status", "unknown")),
"device_fault": getattr(status, "device_fault", 0),
"cleaning_mode": getattr(status, "cleaning_mode", "unknown"),
"fan_speed": self.device.fan_speed() if self.device else "unknown",
"water_flow": self.device.waterflow() if self.device else "unknown",
}
except Exception as e:
logger.error("Failed to get Dreame status", robot_id=self.robot_id, error=str(e))
return None
def _sync_start(self) -> None:
self.device.start()
def _sync_stop(self) -> None:
self.device.stop()
def _sync_home(self) -> None:
self.device.home()
def _sync_forward(self, distance: int) -> None:
self.device.forward(distance)
def _sync_rotate(self, rotation: int) -> None:
self.device.rotate(rotation)
def _sync_set_fan_speed(self, speed: int) -> None:
self.device.set_fan_speed(speed)
def _sync_set_waterflow(self, value: int) -> None:
self.device.set_waterflow(value)
async def start_cleaning(self) -> bool:
"""Start cleaning operation."""
if not self._ensure_device():
return False
try:
await asyncio.to_thread(self._sync_start)
self._connected = True
return True
except Exception as e:
logger.error("Failed to start Dreame cleaning", robot_id=self.robot_id, error=str(e))
return False
async def stop_cleaning(self) -> bool:
"""Stop cleaning operation."""
if not self._ensure_device():
return False
try:
await asyncio.to_thread(self._sync_stop)
return True
except Exception as e:
logger.error("Failed to stop Dreame cleaning", robot_id=self.robot_id, error=str(e))
return False
async def return_to_dock(self) -> bool:
"""Return to charging dock."""
if not self._ensure_device():
return False
try:
await asyncio.to_thread(self._sync_home)
return True
except Exception as e:
logger.error("Failed to send Dreame to dock", robot_id=self.robot_id, error=str(e))
return False
async def move(self, rotation: int = 0, velocity: int = 0) -> bool:
"""Move robot. python-miio uses forward(distance) and rotate(angle)."""
if not self._ensure_device():
return False
try:
distance = max(-300, min(300, velocity))
rot = max(-120, min(120, rotation))
if rot != 0:
await asyncio.to_thread(self._sync_rotate, rot)
if distance != 0:
await asyncio.to_thread(self._sync_forward, distance)
self._connected = True
return True
except Exception as e:
logger.error(
"Failed to move Dreame robot",
robot_id=self.robot_id,
rotation=rotation,
velocity=velocity,
error=str(e),
)
return False
async def set_suction_level(self, level: int) -> bool:
"""Set suction/fan speed level (1-4)."""
if not self._ensure_device():
return False
try:
await asyncio.to_thread(self._sync_set_fan_speed, level)
return True
except Exception as e:
logger.error(
"Failed to set Dreame suction level",
robot_id=self.robot_id,
level=level,
error=str(e),
)
return False
async def set_water_volume(self, volume: int) -> bool:
"""Set water flow level (1-3)."""
if not self._ensure_device():
return False
try:
await asyncio.to_thread(self._sync_set_waterflow, volume)
return True
except Exception as e:
logger.error(
"Failed to set Dreame water volume",
robot_id=self.robot_id,
volume=volume,
error=str(e),
)
return False
async def set_mop_humidity(self, humidity: int) -> bool:
"""Set mop humidity - maps to waterflow in python-miio."""
return await self.set_water_volume(humidity)
async def play_sound(self) -> bool:
"""Play locate sound."""
if not self._ensure_device():
return False
try:
await asyncio.to_thread(self.device.play_sound)
return True
except Exception as e:
logger.error("Failed to play Dreame sound", robot_id=self.robot_id, error=str(e))
return False
def _sync_get_raw_map(self) -> str | None:
"""Retrieve raw map data via MiIO protocol. siid=23, piid=1 is map_view."""
if not self._ensure_device():
return None
try:
# Use raw get_property_by for map_view (siid 23, piid 1)
result = self.device.get_property_by(23, 1)
if result and len(result) > 0:
val = result[0].get("value") if isinstance(result[0], dict) else result[0]
if val and isinstance(val, str) and len(val) > 50:
return val
logger.warning("Map view property returned empty or short value",
robot_id=self.robot_id, result_len=len(str(result)))
return None
except Exception as e:
logger.error("Failed to retrieve raw map data",
robot_id=self.robot_id, error=str(e))
return None
@staticmethod
def _decode_dreame_map(raw: str) -> bytes | None:
"""Decode Dreame map string: URL-safe base64 -> zlib decompress."""
try:
# Dreame uses URL-safe base64 with _ and - instead of / and +
b64 = raw.replace("_", "/").replace("-", "+")
compressed = base64.b64decode(b64)
return zlib.decompress(compressed)
except Exception:
# Try standard base64 as fallback
try:
compressed = base64.b64decode(raw)
return zlib.decompress(compressed)
except Exception:
return None
@staticmethod
def _parse_map_header(data: bytes) -> dict[str, Any] | None:
"""Parse 27-byte Dreame map header.
Header layout (little-endian int16 unless noted):
bytes 0-1: robot_x
bytes 2-3: robot_y
bytes 4-5: charger_x
bytes 6-7: charger_y
bytes 8-9: image_width (pixels)
bytes 10-11: image_height (pixels)
bytes 12-13: min_x (bounding box)
bytes 14-15: min_y
bytes 16-17: max_x
bytes 18-19: max_y
byte 20: frame_type (0=I-frame, 1=P-frame)
bytes 21-26: reserved / additional metadata
"""
if len(data) < 27:
return None
import struct
try:
vals = struct.unpack_from("<10hB", data, 0)
return {
"robot_position": {"x": vals[0], "y": vals[1]},
"charger_position": {"x": vals[2], "y": vals[3]},
"image_width": vals[4],
"image_height": vals[5],
"bounding_box": {
"min_x": vals[6], "min_y": vals[7],
"max_x": vals[8], "max_y": vals[9],
},
"frame_type": "I" if vals[10] == 0 else "P",
}
except Exception:
return None
@staticmethod
def _parse_pixel_data(data: bytes, header: dict[str, Any]) -> dict[str, list]:
"""Extract occupancy grid and room data from pixel bytes.
Pixel values:
0 = outside/unknown
1 = wall/obstacle
2 = floor (free space)
>= 10 = room ID
"""
w = header.get("image_width", 0)
h = header.get("image_height", 0)
pixel_count = w * h
pixel_start = 27
pixel_bytes = data[pixel_start:pixel_start + pixel_count]
walls = []
floor_pixels = []
rooms: dict[int, list] = {}
scale = 0.05 # 5cm per pixel (standard resolution)
bbox = header.get("bounding_box", {})
origin_x = bbox.get("min_x", 0) * scale
origin_y = bbox.get("min_y", 0) * scale
for i, pixel in enumerate(pixel_bytes):
px = i % w
py = i // w
world_x = origin_x + px * scale
world_y = origin_y + py * scale
if pixel == 1:
walls.append({"x": round(world_x, 3), "y": round(world_y, 3)})
elif pixel == 2:
floor_pixels.append({"x": round(world_x, 3), "y": round(world_y, 3)})
elif pixel >= 10:
room_id = pixel
if room_id not in rooms:
rooms[room_id] = []
rooms[room_id].append({"x": round(world_x, 3), "y": round(world_y, 3)})
return {
"walls": walls,
"floor": floor_pixels,
"rooms": rooms,
}
@staticmethod
def _parse_additional_json(data: bytes, header: dict[str, Any]) -> dict[str, Any]:
"""Parse additional JSON data after the pixel block (rooms, paths, no-go zones)."""
w = header.get("image_width", 0)
h = header.get("image_height", 0)
json_start = 27 + w * h
if json_start >= len(data):
return {}
try:
json_bytes = data[json_start:]
return json.loads(json_bytes.decode("utf-8", errors="replace"))
except (json.JSONDecodeError, UnicodeDecodeError):
return {}
async def get_map(self) -> dict[str, Any] | None:
"""Get LIDAR map data from Dreame vacuum.
Retrieves raw map via MiIO protocol (siid 23, piid 1), decodes the
base64+zlib payload, parses the 27-byte header, extracts pixel-level
occupancy data, and returns structured map with rooms/walls/positions.
Returns:
Structured map dict with keys: header, rooms, walls, floor_bounds,
robot_position, charger_position, metadata. None on failure.
"""
raw = await asyncio.to_thread(self._sync_get_raw_map)
if not raw:
logger.warning("No raw map data from device", robot_id=self.robot_id)
return None
decoded = self._decode_dreame_map(raw)
if not decoded:
logger.error("Failed to decode map payload", robot_id=self.robot_id,
raw_length=len(raw))
return None
header = self._parse_map_header(decoded)
if not header:
logger.error("Failed to parse map header", robot_id=self.robot_id,
data_length=len(decoded))
return None
pixel_data = self._parse_pixel_data(decoded, header)
additional = self._parse_additional_json(decoded, header)
# Build structured response
rooms_list = []
for room_id, pixels in pixel_data.get("rooms", {}).items():
if not pixels:
continue
xs = [p["x"] for p in pixels]
ys = [p["y"] for p in pixels]
rooms_list.append({
"id": room_id,
"name": additional.get("rism", {}).get(str(room_id), f"Room {room_id}"),
"pixel_count": len(pixels),
"bounds": {
"min_x": round(min(xs), 3), "min_y": round(min(ys), 3),
"max_x": round(max(xs), 3), "max_y": round(max(ys), 3),
},
"area_m2": round(len(pixels) * 0.05 * 0.05, 2),
})
wall_points = pixel_data.get("walls", [])
floor_points = pixel_data.get("floor", [])
all_points = wall_points + floor_points
if all_points:
all_xs = [p["x"] for p in all_points]
all_ys = [p["y"] for p in all_points]
floor_bounds = {
"min_x": round(min(all_xs), 3), "min_y": round(min(all_ys), 3),
"max_x": round(max(all_xs), 3), "max_y": round(max(all_ys), 3),
}
else:
floor_bounds = header.get("bounding_box", {})
scale = 0.05
return {
"header": header,
"rooms": rooms_list,
"walls": wall_points[:5000], # Cap for transport (full data in export)
"wall_count": len(wall_points),
"floor_bounds": floor_bounds,
"robot_position": {
"x": round(header["robot_position"]["x"] * scale, 3),
"y": round(header["robot_position"]["y"] * scale, 3),
},
"charger_position": {
"x": round(header["charger_position"]["x"] * scale, 3),
"y": round(header["charger_position"]["y"] * scale, 3),
},
"image_size": {
"width": header["image_width"],
"height": header["image_height"],
},
"resolution_m": scale,
"total_floor_area_m2": round(len(floor_points) * scale * scale, 2),
"additional_data": additional if additional else None,
"metadata": {
"source": "Dreame D20 Pro",
"frame_type": header["frame_type"],
"parser": "vacuum-map-parser-dreame" if MAP_PARSER_AVAILABLE else "built-in",
},
}
async def clean_room(self, room_id: int) -> bool:
"""Clean room - may not be supported on all models."""
logger.warning("clean_room not implemented for DreameVacuumMiot", room_id=room_id)
return False
async def clean_zone(self, zones: list) -> bool:
"""Clean zone - may not be supported on all models."""
logger.warning("clean_zone not implemented for DreameVacuumMiot", zones=zones)
return False
async def clean_spot(self, spot_x: int, spot_y: int) -> bool:
"""Clean spot - may not be supported on all models."""
logger.warning(
"clean_spot not implemented for DreameVacuumMiot", spot_x=spot_x, spot_y=spot_y
)
return False
async def start_fast_mapping(self) -> bool:
"""Start fast mapping - not in base python-miio."""
return False
async def start_mapping(self) -> bool:
"""Start mapping - not in base python-miio."""
return False
async def set_cleaning_sequence(self, sequence: list[int]) -> bool:
"""Set cleaning sequence - not in base python-miio."""
return False
async def set_restricted_zones(self, zones: dict[str, list[list[int]]]) -> bool:
"""Set restricted zones - not in base python-miio."""
return False
async def get_cleaning_history(self) -> list[dict[str, Any]] | None:
"""Get cleaning history - not in base python-miio."""
return None
async def clear_error(self) -> bool:
"""Clear error - not in base python-miio."""
return False
async def go_to_position(self, x: float, y: float) -> bool:
"""Go to position - use spot clean if available."""
return await self.clean_spot(int(x), int(y))
# Global Dreame client instance (module-level singleton shared by the wrappers)
_dreame_client: DreameClient | None = None


def get_dreame_client(
    robot_id: str = "dreame_01", config: dict[str, Any] | None = None
) -> DreameClient:
    """Get or create Dreame client instance.

    When config is passed, updates or creates client with ip_address and token.
    An existing client is kept (together with its device object) when its
    robot_id, ip_address and token already match; otherwise it is replaced.

    Without config, the existing client is returned whatever its robot_id is;
    a new unconfigured client is created only when none exists yet.

    Args:
        robot_id: Identifier given to a newly created client.
        config: Optional configuration dict (ip_address, token, ...).

    Returns:
        The module-level client instance.
    """
    global _dreame_client
    if config is None:
        if _dreame_client is None:
            _dreame_client = DreameClient(robot_id, None)
        return _dreame_client

    current = _dreame_client
    same_target = (
        current is not None
        and current.robot_id == robot_id
        and current.ip_address == config.get("ip_address")
        and current.token == config.get("token")
    )
    if same_target:
        # Same connection details: keep the device, refresh the stored config.
        current.config = config or {}
    else:
        _dreame_client = DreameClient(robot_id, config)
    return _dreame_client
async def dreame_get_status(
    robot_id: str = "dreame_01", config: dict[str, Any] | None = None
) -> dict[str, Any]:
    """Fetch the Dreame robot's status and wrap it in a response dict.

    Args:
        robot_id: Identifier of the robot to query.
        config: Optional client configuration forwarded to get_dreame_client().

    Returns:
        A success response carrying the status, or an error response with
        recovery hints when no status could be obtained.
    """
    snapshot = await get_dreame_client(robot_id, config).get_status()
    if not snapshot:
        hints = [
            "Check Dreame robot is powered on and connected to WiFi",
            "Verify ip_address and token in config",
            "Run scripts/discover_dreame.py to find IP",
            "Run scripts/get_dreame_token.py to obtain token",
        ]
        return build_robotics_error_response(
            error="Failed to get Dreame status",
            robot_type="dreame",
            robot_id=robot_id,
            recovery_options=hints,
        )
    return build_success_response(
        operation="get_dreame_status",
        summary=f"Dreame {robot_id} status retrieved",
        result={"status": snapshot, "robot_id": robot_id},
    )
async def dreame_start_cleaning(
    robot_id: str = "dreame_01", config: dict[str, Any] | None = None
) -> dict[str, Any]:
    """Ask the Dreame robot to start cleaning and report the outcome.

    Args:
        robot_id: Identifier of the robot to command.
        config: Optional client configuration forwarded to get_dreame_client().

    Returns:
        A success response when the command was accepted, else an error response.
    """
    started = await get_dreame_client(robot_id, config).start_cleaning()
    if not started:
        return build_robotics_error_response(
            error="Failed to start Dreame cleaning", robot_type="dreame", robot_id=robot_id
        )
    return build_success_response(
        operation="start_dreame_cleaning",
        summary=f"Dreame {robot_id} started cleaning",
        result={"robot_id": robot_id, "action": "start_cleaning"},
    )
async def dreame_stop_cleaning(
    robot_id: str = "dreame_01", config: dict[str, Any] | None = None
) -> dict[str, Any]:
    """Ask the Dreame robot to stop cleaning and report the outcome.

    Args:
        robot_id: Identifier of the robot to command.
        config: Optional client configuration forwarded to get_dreame_client().

    Returns:
        A success response when the command was accepted, else an error response.
    """
    stopped = await get_dreame_client(robot_id, config).stop_cleaning()
    if not stopped:
        return build_robotics_error_response(
            error="Failed to stop Dreame cleaning", robot_type="dreame", robot_id=robot_id
        )
    return build_success_response(
        operation="stop_dreame_cleaning",
        summary=f"Dreame {robot_id} stopped cleaning",
        result={"robot_id": robot_id, "action": "stop_cleaning"},
    )
async def dreame_move(
    robot_id: str = "dreame_01",
    rotation: int = 0,
    velocity: int = 0,
    config: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Issue a manual move (rotation and velocity) to the Dreame robot.

    Args:
        robot_id: Identifier of the robot to command.
        rotation: Rotation value forwarded to the client's move().
        velocity: Velocity value forwarded to the client's move().
        config: Optional client configuration forwarded to get_dreame_client().

    Returns:
        A success response echoing the requested values, else an error response.
    """
    dreame = get_dreame_client(robot_id, config)
    moved = await dreame.move(rotation=rotation, velocity=velocity)
    if not moved:
        return build_robotics_error_response(
            error="Failed to move Dreame robot", robot_type="dreame", robot_id=robot_id
        )
    payload = {"robot_id": robot_id, "rotation": rotation, "velocity": velocity}
    return build_success_response(
        operation="move_dreame",
        summary=f"Dreame {robot_id} moved (rotation: {rotation}, velocity: {velocity})",
        result=payload,
    )
async def dreame_get_map(
    robot_id: str = "dreame_01", config: dict[str, Any] | None = None
) -> dict[str, Any]:
    """Get Dreame map data.

    Args:
        robot_id: Identifier of the robot to query.
        config: Optional client configuration forwarded to get_dreame_client().

    Returns:
        A success response carrying the structured map, or an error response
        when the client returned no map.
    """
    client = get_dreame_client(robot_id, config)
    map_data = await client.get_map()
    if map_data:
        return build_success_response(
            operation="get_dreame_map",
            summary=f"Dreame {robot_id} map retrieved",
            result={"robot_id": robot_id, "map": map_data},
        )
    # The client does implement map retrieval, so a missing map means the fetch
    # or decode failed - not that the feature is unsupported.
    return build_robotics_error_response(
        error="Failed to retrieve Dreame map",
        robot_type="dreame",
        robot_id=robot_id,
        recovery_options=[
            "Check Dreame robot is powered on and connected to WiFi",
            "Verify ip_address and token in config",
        ],
    )
async def dreame_clean_room(
    robot_id: str = "dreame_01", room_id: int = 1, config: dict[str, Any] | None = None
) -> dict[str, Any]:
    """Request cleaning of a single room on the Dreame robot.

    Args:
        robot_id: Identifier of the robot to command.
        room_id: Room identifier forwarded to the client's clean_room().
        config: Optional client configuration forwarded to get_dreame_client().

    Returns:
        A success response when the client accepted the request, else an
        error response.
    """
    accepted = await get_dreame_client(robot_id, config).clean_room(room_id)
    if not accepted:
        return build_robotics_error_response(
            error="clean_room not supported", robot_type="dreame", robot_id=robot_id
        )
    return build_success_response(
        operation="clean_dreame_room",
        summary=f"Dreame {robot_id} cleaning room {room_id}",
        result={"robot_id": robot_id, "room_id": room_id},
    )
async def dreame_clean_zone(
    robot_id: str = "dreame_01",
    zones: list[list[int]] | None = None,
    config: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Request cleaning of one or more zones on the Dreame robot.

    Args:
        robot_id: Identifier of the robot to command.
        zones: Zone definitions forwarded to the client's clean_zone().
        config: Optional client configuration forwarded to get_dreame_client().

    Returns:
        An error response when no zones are given or the client rejects the
        request, otherwise a success response echoing the zones.
    """
    # Reject an empty request before a client is even looked up.
    if not zones:
        return build_robotics_error_response(
            error="No zones specified for cleaning",
            robot_type="dreame",
            robot_id=robot_id,
        )
    accepted = await get_dreame_client(robot_id, config).clean_zone(zones)
    if not accepted:
        return build_robotics_error_response(
            error="clean_zone not supported", robot_type="dreame", robot_id=robot_id
        )
    return build_success_response(
        operation="clean_dreame_zone",
        summary=f"Dreame {robot_id} cleaning {len(zones)} zone(s)",
        result={"robot_id": robot_id, "zones": zones},
    )
async def dreame_clean_spot(
    robot_id: str = "dreame_01",
    spot_x: int = 0,
    spot_y: int = 0,
    config: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Request a spot clean at the given coordinates on the Dreame robot.

    Args:
        robot_id: Identifier of the robot to command.
        spot_x: X coordinate forwarded to the client's clean_spot().
        spot_y: Y coordinate forwarded to the client's clean_spot().
        config: Optional client configuration forwarded to get_dreame_client().

    Returns:
        A success response when the client accepted the request, else an
        error response.
    """
    dreame = get_dreame_client(robot_id, config)
    accepted = await dreame.clean_spot(spot_x, spot_y)
    if not accepted:
        return build_robotics_error_response(
            error="clean_spot not supported", robot_type="dreame", robot_id=robot_id
        )
    payload = {"robot_id": robot_id, "spot_x": spot_x, "spot_y": spot_y}
    return build_success_response(
        operation="clean_dreame_spot",
        summary=f"Dreame {robot_id} cleaning spot at ({spot_x}, {spot_y})",
        result=payload,
    )