"""Dreame LIDAR map export to 3D formats (OBJ, PLY, Unity JSON).
Converts parsed Dreame D20 Pro map data into importable 3D geometry:
- OBJ: Wavefront OBJ with rooms as floor planes and walls as extruded boxes
- PLY: Point cloud of wall/floor pixels (for Blender point cloud import)
- Unity JSON: NavMesh-ready vertices and triangles
- Blender Python: Direct bpy script for importing into Blender
"""
import json
from pathlib import Path
from typing import Any
import structlog
# Module-level structured logger, named after this module.
logger = structlog.get_logger(__name__)
# Height that wall pixels are extruded to (OBJ wall boxes, PLY wall columns).
WALL_HEIGHT = 2.4 # meters (standard room height)
# Edge length of the square footprint of each OBJ wall box.
WALL_THICKNESS = 0.05 # meters (one pixel = 5cm)
def _obj_upward_quad(
    first_index: int,
    x0: float,
    z0: float,
    x1: float,
    z1: float,
    height: float,
) -> list[str]:
    """Return OBJ lines (4 vertices + 1 face) for a horizontal rectangle.

    The face lists its vertices counter-clockwise as seen from above
    (assuming ``x0 <= x1`` and ``z0 <= z1``), so its normal points along +Y
    and the quad is visible from above in viewers that cull back faces.
    """
    return [
        f"v {x0} {height} {z0}",
        f"v {x1} {height} {z0}",
        f"v {x1} {height} {z1}",
        f"v {x0} {height} {z1}",
        f"f {first_index} {first_index + 3} {first_index + 2} {first_index + 1}",
    ]


def export_map_to_obj(
    map_data: dict[str, Any],
    output_path: str,
    *,
    max_wall_boxes: int = 10000,
) -> dict[str, Any]:
    """Export Dreame map to Wavefront OBJ format.

    Creates:
        - Floor plane from floor_bounds
        - Wall cubes from wall pixel positions (extruded to WALL_HEIGHT)
        - Room regions as named objects, 1 mm above the floor
        - Charger and robot position markers

    Map ``x`` is written as OBJ X and map ``y`` as OBJ Z; OBJ Y is height.
    All faces are wound so that their normals point up or outward.

    Args:
        map_data: Parsed map data from DreameClient.get_map(). Keys that are
            missing or set to None are treated as empty.
        output_path: Path for the .obj file. Parent directories are created.
        max_wall_boxes: Upper bound on the number of wall pixels turned into
            boxes, to keep the OBJ file at a manageable size. Extra wall
            pixels are ignored.

    Returns:
        Export result with file path and statistics.

    Raises:
        KeyError: If a wall pixel lacks an "x" or "y" entry.
    """
    lines = [
        "# Dreame D20 Pro LIDAR Map Export",
        "# Generated by robotics-mcp",
        f"# Resolution: {map_data.get('resolution_m', 0.05)}m per pixel",
        f"# Total floor area: {map_data.get('total_floor_area_m2', 0)} m2",
        "",
    ]
    next_vertex = 1  # OBJ vertex indices are 1-based and global to the file

    # --- Floor plane ---
    bounds = map_data.get("floor_bounds") or {}
    lines.append("o Floor")
    lines.extend(
        _obj_upward_quad(
            next_vertex,
            bounds.get("min_x", 0),
            bounds.get("min_y", 0),
            bounds.get("max_x", 10),
            bounds.get("max_y", 10),
            0.0,
        )
    )
    next_vertex += 4

    # --- Walls as extruded boxes ---
    walls = map_data.get("walls") or []
    exported_walls = walls[: max(max_wall_boxes, 0)]
    if exported_walls:
        lines.append("")
        lines.append("o Walls")
        t = WALL_THICKNESS
        h = WALL_HEIGHT
        for wp in exported_walls:
            wx = wp["x"]
            wy = wp["y"]
            v = next_vertex
            lines.extend(
                [
                    # 8 vertices per wall box: bottom ring, then top ring
                    f"v {wx} 0.0 {wy}",
                    f"v {wx+t} 0.0 {wy}",
                    f"v {wx+t} 0.0 {wy+t}",
                    f"v {wx} 0.0 {wy+t}",
                    f"v {wx} {h} {wy}",
                    f"v {wx+t} {h} {wy}",
                    f"v {wx+t} {h} {wy+t}",
                    f"v {wx} {h} {wy+t}",
                    # 6 faces per box, each wound with an outward normal
                    f"f {v} {v+1} {v+2} {v+3}",  # bottom
                    f"f {v+4} {v+7} {v+6} {v+5}",  # top
                    f"f {v} {v+4} {v+5} {v+1}",  # front
                    f"f {v+2} {v+6} {v+7} {v+3}",  # back
                    f"f {v} {v+3} {v+7} {v+4}",  # left
                    f"f {v+1} {v+5} {v+6} {v+2}",  # right
                ]
            )
            next_vertex += 8

    # --- Room groups ---
    rooms = map_data.get("rooms") or []
    for room in rooms:
        b = room.get("bounds", {})
        if not b:
            continue
        name = str(room.get("name", f"Room_{room.get('id', 0)}")).replace(" ", "_")
        lines.append("")
        lines.append(f"o Room_{name}")
        # Slightly elevated plane (1mm above floor) so it renders distinctly
        lines.extend(
            _obj_upward_quad(
                next_vertex,
                b.get("min_x", 0),
                b.get("min_y", 0),
                b.get("max_x", 1),
                b.get("max_y", 1),
                0.001,
            )
        )
        next_vertex += 4

    # --- Markers ---
    # A marker is only written when both coordinates are known.
    robot = map_data.get("robot_position") or {}
    charger = map_data.get("charger_position") or {}

    if robot.get("x") is not None and robot.get("y") is not None:
        rx, ry = robot["x"], robot["y"]
        v = next_vertex
        lines.extend(
            [
                "",
                "o Robot_Position",
                # Small open pyramid: apex first, then the four base corners
                f"v {rx} 0.1 {ry}",
                f"v {rx+0.1} 0.0 {ry}",
                f"v {rx} 0.0 {ry+0.1}",
                f"v {rx-0.1} 0.0 {ry}",
                f"v {rx} 0.0 {ry-0.1}",
                # Side faces wound so the normals point away from the axis
                f"f {v} {v+2} {v+1}",
                f"f {v} {v+3} {v+2}",
                f"f {v} {v+4} {v+3}",
                f"f {v} {v+1} {v+4}",
            ]
        )
        next_vertex += 5

    if charger.get("x") is not None and charger.get("y") is not None:
        cx, cy = charger["x"], charger["y"]
        lines.append("")
        lines.append("o Charger_Position")
        lines.extend(
            _obj_upward_quad(
                next_vertex, cx - 0.15, cy - 0.15, cx + 0.15, cy + 0.15, 0.0
            )
        )
        next_vertex += 4

    # Write file
    output = Path(output_path)
    output.parent.mkdir(parents=True, exist_ok=True)
    output.write_text("\n".join(lines), encoding="utf-8")

    return {
        "file_path": str(output.resolve()),
        "format": "obj",
        "vertices": next_vertex - 1,
        "wall_boxes": len(exported_walls),
        "rooms": len(rooms),
        "file_size_bytes": output.stat().st_size,
    }
def export_map_to_ply(map_data: dict[str, Any], output_path: str) -> dict[str, Any]:
    """Export Dreame map as an ASCII PLY point cloud.

    Points written, in this order:
        - Walls: three red-toned points per wall pixel, at floor level,
          WALL_HEIGHT and half of WALL_HEIGHT, so walls read as columns.
        - Robot position: a short stack of bright green points.
        - Charger position: a short stack of blue points.
        - Room centers: one yellow point per room that has bounds.

    Floor pixels are not exported. Map ``x`` is written as PLY x, map ``y``
    as PLY z; PLY y is height.

    Args:
        map_data: Parsed map data from DreameClient.get_map(). Keys that are
            missing or set to None are treated as empty.
        output_path: Path for the .ply file. Parent directories are created.

    Returns:
        Export result with file path and point count.

    Raises:
        KeyError: If a wall pixel lacks "x"/"y", or a room's non-empty bounds
            lack one of min_x/max_x/min_y/max_y.
    """
    walls = map_data.get("walls") or []
    points: list[tuple[Any, ...]] = []

    # Walls: base, top, then mid-height point for each wall pixel
    for wp in walls:
        points.append((wp["x"], 0.0, wp["y"], 200, 50, 50))
        points.append((wp["x"], WALL_HEIGHT, wp["y"], 255, 0, 0))
        points.append((wp["x"], WALL_HEIGHT / 2, wp["y"], 220, 30, 30))

    # Robot position: bright green (only when both coordinates are known)
    robot = map_data.get("robot_position") or {}
    if robot.get("x") is not None and robot.get("y") is not None:
        points.extend(
            (robot["x"], h, robot["y"], 0, 255, 0)
            for h in (0.0, 0.05, 0.1, 0.15, 0.2)
        )

    # Charger: bright blue (only when both coordinates are known)
    charger = map_data.get("charger_position") or {}
    if charger.get("x") is not None and charger.get("y") is not None:
        points.extend(
            (charger["x"], h, charger["y"], 0, 100, 255) for h in (0.0, 0.05, 0.1)
        )

    # Room centers: yellow, 1cm above the floor
    for room in map_data.get("rooms") or []:
        room_bounds = room.get("bounds", {})
        if room_bounds:
            cx = (room_bounds["min_x"] + room_bounds["max_x"]) / 2
            cy = (room_bounds["min_y"] + room_bounds["max_y"]) / 2
            points.append((cx, 0.01, cy, 255, 255, 0))

    # Write PLY
    output = Path(output_path)
    output.parent.mkdir(parents=True, exist_ok=True)
    header = [
        "ply",
        "format ascii 1.0",
        f"element vertex {len(points)}",
        "property float x",
        "property float y",
        "property float z",
        "property uchar red",
        "property uchar green",
        "property uchar blue",
        "end_header",
    ]
    with open(output, "w", encoding="utf-8") as f:
        f.write("\n".join(header) + "\n")
        f.writelines(
            f"{x:.4f} {y:.4f} {z:.4f} {r} {g} {b}\n" for x, y, z, r, g, b in points
        )

    return {
        "file_path": str(output.resolve()),
        "format": "ply",
        "point_count": len(points),
        "wall_points": len(walls) * 3,
        "file_size_bytes": output.stat().st_size,
    }
def _append_unity_quad(
    vertices: list[dict[str, Any]],
    triangles: list[int],
    rect: dict[str, Any],
    height: float,
) -> int:
    """Append a horizontal rectangle as two triangles; return its first index.

    Triangles are wound clockwise as seen from above (for ``min <= max``
    bounds), which is the front-facing order in Unity, so the surface
    normal points up (+Y).
    """
    start = len(vertices)
    vertices.extend(
        [
            {"x": rect["min_x"], "y": height, "z": rect["min_y"]},
            {"x": rect["max_x"], "y": height, "z": rect["min_y"]},
            {"x": rect["max_x"], "y": height, "z": rect["max_y"]},
            {"x": rect["min_x"], "y": height, "z": rect["max_y"]},
        ]
    )
    triangles.extend([start, start + 2, start + 1, start, start + 3, start + 2])
    return start


def export_map_to_unity(map_data: dict[str, Any], output_path: str) -> dict[str, Any]:
    """Export Dreame map as Unity NavMesh JSON.

    Converts the floor and room bounds to mesh vertices and triangles in Unity
    coordinate space (Y-up, Z-forward): map ``x`` becomes Unity X and map ``y``
    becomes Unity Z. Triangles use Unity's clockwise front-face winding so the
    planes face upward. Includes metadata for NavMesh agent configuration.

    Args:
        map_data: Parsed map data from DreameClient.get_map(). A missing or
            None "floor_bounds" or "rooms" entry is treated as empty.
        output_path: Path for the .json file. Parent directories are created.

    Returns:
        Export result with file path and mesh stats. "rooms" counts only the
        rooms that had bounds and were written to the mesh.

    Raises:
        KeyError: If non-empty floor or room bounds lack one of
            min_x/max_x/min_y/max_y.
    """
    vertices: list[dict[str, Any]] = []
    triangles: list[int] = []

    # Floor plane
    bounds = map_data.get("floor_bounds") or {}
    if bounds:
        _append_unity_quad(vertices, triangles, bounds, 0.0)

    # Room planes, 1mm above the floor so they do not z-fight with it
    room_meshes = []
    for room in map_data.get("rooms") or []:
        b = room.get("bounds", {})
        if not b:
            continue
        room_start = _append_unity_quad(vertices, triangles, b, 0.001)
        room_meshes.append(
            {
                "name": room.get("name", f"Room_{room.get('id', 0)}"),
                "vertex_start": room_start,
                "vertex_count": 4,
                "area_m2": room.get("area_m2", 0),
            }
        )

    unity_data = {
        "format": "unity_navmesh",
        "version": "1.0",
        "coordinate_system": "left_handed_y_up",
        "vertices": vertices,
        "triangles": triangles,
        "rooms": room_meshes,
        "robot_position": map_data.get("robot_position", {}),
        "charger_position": map_data.get("charger_position", {}),
        "navmesh_agent": {
            "radius": 0.17,  # Dreame D20 Pro radius ~17cm
            "height": 0.10,  # Dreame D20 Pro height ~10cm
            "step_height": 0.02,
            "max_slope": 15,
        },
        "metadata": {
            "source": "Dreame D20 Pro LIDAR",
            "total_floor_area_m2": map_data.get("total_floor_area_m2", 0),
            "wall_count": map_data.get("wall_count", 0),
        },
    }

    output = Path(output_path)
    output.parent.mkdir(parents=True, exist_ok=True)
    output.write_text(json.dumps(unity_data, indent=2), encoding="utf-8")

    return {
        "file_path": str(output.resolve()),
        "format": "unity_json",
        "vertices": len(vertices),
        "triangles": len(triangles) // 3,
        "rooms": len(room_meshes),
        "file_size_bytes": output.stat().st_size,
    }
def export_map_to_blender_script(map_data: dict[str, Any], output_path: str) -> dict[str, Any]:
    """Export Dreame map as a Blender Python script.

    Generates a .py file that can be run inside Blender to create the floor
    plan as native Blender mesh objects with materials: a floor plane, one
    tinted plane per room that has bounds, and robot/charger markers when
    both of their coordinates are known.

    Room names come from the map data and are embedded in the generated
    script as escaped Python string literals, so quotes, backslashes or
    newlines in a name cannot break or alter the generated code.

    Args:
        map_data: Parsed map data from DreameClient.get_map(). Keys that are
            missing or set to None are treated as empty.
        output_path: Path for the .py file. Parent directories are created.

    Returns:
        Export result with file path.

    Raises:
        KeyError: If a room's non-empty bounds lack one of
            min_x/max_x/min_y/max_y.
    """
    bounds = map_data.get("floor_bounds") or {}
    rooms = map_data.get("rooms") or []
    robot = map_data.get("robot_position") or {}
    charger = map_data.get("charger_position") or {}

    floor_width = bounds.get("max_x", 5) - bounds.get("min_x", 0)
    floor_depth = bounds.get("max_y", 5) - bounds.get("min_y", 0)
    floor_cx = (bounds.get("min_x", 0) + bounds.get("max_x", 5)) / 2
    floor_cy = (bounds.get("min_y", 0) + bounds.get("max_y", 5)) / 2

    parts = [
        f'''"""Dreame D20 Pro LIDAR Map - Blender Import Script.
Auto-generated by robotics-mcp. Run in Blender's scripting tab.
"""
import bpy
import bmesh
# Clean scene
bpy.ops.object.select_all(action='SELECT')
bpy.ops.object.delete(use_global=False)
# --- Floor plane ---
bpy.ops.mesh.primitive_plane_add(size=1, location=(0, 0, 0))
floor = bpy.context.active_object
floor.name = "Dreame_Floor"
floor.scale = ({floor_width},
{floor_depth}, 1)
floor.location = ({floor_cx},
{floor_cy}, 0)
# Floor material
mat_floor = bpy.data.materials.new(name="Floor_Material")
mat_floor.diffuse_color = (0.85, 0.85, 0.82, 1.0)
floor.data.materials.append(mat_floor)
# --- Rooms ---
'''
    ]

    for i, room in enumerate(rooms):
        b = room.get("bounds", {})
        if not b:
            continue
        name = str(room.get("name", f"Room_{room.get('id', 0)}")).replace(" ", "_")
        # repr() yields a correctly escaped Python literal for any name.
        object_name = repr(f"Room_{name}")
        material_name = repr(f"Room_{name}_Material")
        cx = (b["min_x"] + b["max_x"]) / 2
        cy = (b["min_y"] + b["max_y"]) / 2
        sx = b["max_x"] - b["min_x"]
        sy = b["max_y"] - b["min_y"]
        # Assign distinct colors per room
        hue = (i * 0.15) % 1.0
        parts.append(
            f'''
bpy.ops.mesh.primitive_plane_add(size=1, location=({cx}, {cy}, 0.001))
room_{i} = bpy.context.active_object
room_{i}.name = {object_name}
room_{i}.scale = ({sx}, {sy}, 1)
mat_room_{i} = bpy.data.materials.new(name={material_name})
mat_room_{i}.diffuse_color = ({0.3 + hue * 0.5}, {0.5 + (1 - hue) * 0.3}, {0.7 - hue * 0.3}, 0.8)
room_{i}.data.materials.append(mat_room_{i})
'''
        )

    # Robot marker
    if robot.get("x") is not None and robot.get("y") is not None:
        parts.append(
            f'''
# --- Robot position ---
bpy.ops.mesh.primitive_cone_add(radius1=0.15, depth=0.2, location=({robot["x"]}, {robot["y"]}, 0.1))
robot_marker = bpy.context.active_object
robot_marker.name = "Robot_Dreame_D20"
mat_robot = bpy.data.materials.new(name="Robot_Material")
mat_robot.diffuse_color = (0.0, 0.9, 0.2, 1.0)
robot_marker.data.materials.append(mat_robot)
'''
        )

    # Charger marker
    if charger.get("x") is not None and charger.get("y") is not None:
        parts.append(
            f'''
# --- Charger position ---
bpy.ops.mesh.primitive_cylinder_add(radius=0.15, depth=0.05, location=({charger["x"]}, {charger["y"]}, 0.025))
charger_marker = bpy.context.active_object
charger_marker.name = "Charger_Station"
mat_charger = bpy.data.materials.new(name="Charger_Material")
mat_charger.diffuse_color = (0.0, 0.3, 1.0, 1.0)
charger_marker.data.materials.append(mat_charger)
'''
        )

    # view3d.view_all only works with a 3D viewport context; the generated
    # script tolerates its failure so the import still completes.
    parts.append(
        '''
# --- Final setup ---
bpy.ops.object.select_all(action='SELECT')
try:
    bpy.ops.view3d.view_all(center=True)
except RuntimeError:
    # No 3D viewport in the current context (e.g. run from the text editor
    # or in background mode); framing the view is optional.
    pass
print("Dreame D20 Pro map imported successfully")
'''
    )

    output = Path(output_path)
    output.parent.mkdir(parents=True, exist_ok=True)
    output.write_text("".join(parts), encoding="utf-8")

    return {
        "file_path": str(output.resolve()),
        "format": "blender_python",
        "rooms": len(rooms),
        "file_size_bytes": output.stat().st_size,
    }
async def export_dreame_map(
map_data: dict[str, Any],
output_dir: str,
formats: list[str] | None = None,
) -> dict[str, Any]:
"""Export Dreame map to multiple 3D formats.
Args:
map_data: Parsed map data from DreameClient.get_map()
output_dir: Directory for exported files
formats: List of formats to export. Default: all.
Options: "obj", "ply", "unity", "blender", "json"
Returns:
Dict with export results per format.
"""
if formats is None:
formats = ["obj", "ply", "unity", "blender", "json"]
output = Path(output_dir)
output.mkdir(parents=True, exist_ok=True)
results = {}
for fmt in formats:
try:
if fmt == "obj":
results["obj"] = export_map_to_obj(
map_data, str(output / "dreame_map.obj"))
elif fmt == "ply":
results["ply"] = export_map_to_ply(
map_data, str(output / "dreame_map.ply"))
elif fmt == "unity":
results["unity"] = export_map_to_unity(
map_data, str(output / "dreame_map_unity.json"))
elif fmt == "blender":
results["blender"] = export_map_to_blender_script(
map_data, str(output / "dreame_map_blender.py"))
elif fmt == "json":
json_path = output / "dreame_map_raw.json"
json_path.write_text(
json.dumps(map_data, indent=2, default=str), encoding="utf-8")
results["json"] = {
"file_path": str(json_path.resolve()),
"format": "json",
"file_size_bytes": json_path.stat().st_size,
}
except Exception as e:
logger.error("Failed to export format", format=fmt, error=str(e))
results[fmt] = {"error": str(e)}
return {
"output_directory": str(output.resolve()),
"formats_exported": [f for f in results if "error" not in results[f]],
"formats_failed": [f for f in results if "error" in results[f]],
"exports": results,
}