"""
Blender MCP Wrapper - Python side of the MCP bridge.
Usage in Blender:
import sys
sys.path.insert(0, r"C:\path\to\blender-mcp-rs\python")
sys.path.insert(0, r"C:\path\to\blender-mcp-rs\target\release") # or debug
from blender_mcp_wrapper import BlenderMcpBridge
bridge = BlenderMcpBridge()
bridge.start() # Starts MCP server and polling
# To stop:
bridge.stop()
"""
from __future__ import annotations
import json
import math
from typing import Any, Callable, Optional
import bpy
# Import the Rust extension (must be in sys.path)
import blender_mcp
class BlenderMcpBridge:
    """Bridge between the Rust MCP server and Blender's ``bpy`` API.

    The bridge owns a ``blender_mcp.BlenderMcp`` server object, polls it for
    queued commands from a ``bpy.app.timers`` callback, dispatches each
    command to a ``_handle_*`` method and sends the JSON-encoded result (or
    an error string) back through ``respond``.
    """

    # Lower-cased primitive type -> name of the ``bpy.ops.mesh`` operator
    # that creates it.
    _PRIMITIVE_OPERATORS = {
        "cube": "primitive_cube_add",
        "sphere": "primitive_uv_sphere_add",
        "cylinder": "primitive_cylinder_add",
        "plane": "primitive_plane_add",
        "cone": "primitive_cone_add",
        "torus": "primitive_torus_add",
    }

    def __init__(self, tag: str = "default", port: int = 8765):
        """
        Initialize the bridge.

        Args:
            tag: Unique identifier for this instance
            port: HTTP port for MCP server (0 = auto-assign)
        """
        self.tag = tag
        self.port = port
        self._mcp: Optional[blender_mcp.BlenderMcp] = None
        self._timer_running = False
        # Callable currently registered with bpy.app.timers (None when no
        # timer has been registered); kept so it can be unregistered later.
        self._timer_fn: Optional[Callable[[], Optional[float]]] = None
        # Command handlers: MCP method name -> bound handler taking the
        # decoded params dict and returning a JSON-serialisable value.
        self._handlers: dict[str, Callable] = {
            "scene_info": self._handle_scene_info,
            "list_objects": self._handle_list_objects,
            "get_object": self._handle_get_object,
            "create_primitive": self._handle_create_primitive,
            "update_object": self._handle_update_object,
            "delete_object": self._handle_delete_object,
            "list_materials": self._handle_list_materials,
            "create_material": self._handle_create_material,
            "assign_material": self._handle_assign_material,
            "render_image": self._handle_render_image,
            "execute_python": self._handle_execute_python,
        }

    def start(self, poll_interval: float = 0.05) -> int:
        """
        Start MCP server and begin polling for commands.

        Calling this while the server is already running is a no-op that
        returns the current port.

        Args:
            poll_interval: How often to check for commands (seconds)

        Returns:
            Actual port number the server is running on
        """
        if self._mcp is not None and self._mcp.is_running():
            # self.port was set to the actual port when the server started.
            print(f"[BlenderMCP] Already running on port {self.port}")
            return self.port

        # Create and start Rust MCP server
        self._mcp = blender_mcp.BlenderMcp(self.tag, self.port)
        actual_port = self._mcp.start()
        self.port = actual_port

        # Drop any timer left over from an earlier start() so that two
        # timers never poll the same server concurrently.
        self._unregister_timer()

        # Register timer for polling
        self._timer_running = True
        self._timer_fn = lambda: self._poll_timer(poll_interval)
        bpy.app.timers.register(self._timer_fn, first_interval=poll_interval)

        print(f"[BlenderMCP] Server '{self.tag}' started on http://127.0.0.1:{actual_port}/mcp")
        return actual_port

    def stop(self):
        """Stop MCP server and polling."""
        self._timer_running = False
        self._unregister_timer()
        if self._mcp is not None:
            self._mcp.stop()
            print(f"[BlenderMCP] Server '{self.tag}' stopped")
            self._mcp = None

    def is_running(self) -> bool:
        """Check if server is running."""
        return self._mcp is not None and self._mcp.is_running()

    @property
    def url(self) -> str:
        """Get MCP server URL (empty string when no server exists)."""
        if self._mcp is None:
            return ""
        return self._mcp.url()

    def _unregister_timer(self) -> None:
        """Unregister the polling timer if one is still registered."""
        timer_fn = self._timer_fn
        self._timer_fn = None
        if timer_fn is not None and bpy.app.timers.is_registered(timer_fn):
            bpy.app.timers.unregister(timer_fn)

    def _poll_timer(self, interval: float) -> Optional[float]:
        """Timer callback - polls for commands and processes them.

        Args:
            interval: Delay in seconds to request before the next call

        Returns:
            ``interval`` to keep the timer alive, or ``None`` to stop it
            once the bridge has been stopped.
        """
        # Re-check the state before every poll: a handler (for example
        # execute_python) may call stop(), which clears self._mcp.
        while self._timer_running and self._mcp is not None:
            cmd = self._mcp.poll()
            if cmd is None:
                return interval  # Queue drained - continue timer
            self._process_command(cmd)
        return None  # Stop timer

    def _process_command(self, cmd: dict):
        """Process a single command from MCP server.

        Reads ``cmd["id"]``, ``cmd["method"]`` and ``cmd["params"]`` (a JSON
        string, possibly empty). Exactly one ``respond`` call is made per
        command: with the JSON-encoded result on success, or with an error
        message when the method is unknown, the params are malformed, or the
        handler raises.
        """
        cmd_id = cmd["id"]
        method = cmd["method"]

        handler = self._handlers.get(method)
        if handler is None:
            self._mcp.respond(cmd_id, None, f"Unknown method: {method}")
            return

        try:
            # Decoding happens inside the try so malformed JSON is reported
            # to the client instead of escaping from the timer callback.
            raw_params = cmd["params"]
            params = json.loads(raw_params) if raw_params else {}
            if params is None:
                params = {}
            if not isinstance(params, dict):
                raise TypeError("params must be a JSON object")
            result_json = json.dumps(handler(params))
        except KeyError as exc:
            # str(KeyError) is only the quoted key; add context so the
            # client can tell that a lookup (usually a required param) failed.
            self._mcp.respond(cmd_id, None, f"Missing key: {exc}")
            return
        except Exception as exc:
            self._mcp.respond(cmd_id, None, str(exc))
            return

        self._mcp.respond(cmd_id, result_json, None)

    # =========================================================================
    # Command Handlers
    # =========================================================================

    @staticmethod
    def _require_object(name: str):
        """Return the object called ``name`` or raise ``ValueError``."""
        obj = bpy.data.objects.get(name)
        if obj is None:
            raise ValueError(f"Object not found: {name}")
        return obj

    def _handle_scene_info(self, params: dict) -> dict:
        """Get scene information for ``bpy.context.scene``.

        ``params`` is not used.
        """
        scene = bpy.context.scene
        return {
            "name": scene.name,
            "fps": scene.render.fps,
            "frame_start": scene.frame_start,
            "frame_end": scene.frame_end,
            "frame_current": scene.frame_current,
            "resolution_x": scene.render.resolution_x,
            "resolution_y": scene.render.resolution_y,
            "render_engine": scene.render.engine,
            "object_count": len(bpy.data.objects),
            "material_count": len(bpy.data.materials),
        }

    def _handle_list_objects(self, params: dict) -> list[dict]:
        """List all objects with optional filtering.

        Params:
            type: Optional object type to keep (e.g. "MESH"); matched
                case-insensitively.
            collection: Optional collection name; only its objects are listed.

        Raises:
            ValueError: If ``collection`` is given but does not exist.
        """
        type_filter = params.get("type")
        if isinstance(type_filter, str):
            # Blender object type identifiers are upper-case.
            type_filter = type_filter.upper()
        collection_filter = params.get("collection")

        objects = bpy.data.objects
        if collection_filter:
            coll = bpy.data.collections.get(collection_filter)
            if coll is None:
                # Falling back to "all objects" would silently return data
                # the caller did not ask for.
                raise ValueError(f"Collection not found: {collection_filter}")
            objects = coll.objects

        return [
            {
                "name": obj.name,
                "type": obj.type,
                "location": list(obj.location),
                "visible": obj.visible_get(),
            }
            for obj in objects
            if not type_filter or obj.type == type_filter
        ]

    def _handle_get_object(self, params: dict) -> dict:
        """Get detailed object information.

        Params:
            name: Name of the object (required).

        Raises:
            ValueError: If no object with that name exists.
        """
        obj = self._require_object(params["name"])
        result = {
            "name": obj.name,
            "type": obj.type,
            "location": list(obj.location),
            "rotation_euler": list(obj.rotation_euler),
            "scale": list(obj.scale),
            "visible": obj.visible_get(),
            "parent": obj.parent.name if obj.parent else None,
            "children": [c.name for c in obj.children],
        }
        # Add mesh data if applicable
        if obj.type == "MESH" and obj.data:
            mesh = obj.data
            result["mesh"] = {
                "vertices": len(mesh.vertices),
                "edges": len(mesh.edges),
                "polygons": len(mesh.polygons),
                # Empty material slots are reported as None.
                "materials": [m.name if m else None for m in mesh.materials],
            }
        return result

    def _handle_create_primitive(self, params: dict) -> dict:
        """Create a primitive mesh object.

        Params:
            type: One of cube, sphere, cylinder, plane, cone, torus
                (case-insensitive, required).
            name: Optional name for the new object.
            location: Optional location, default [0, 0, 0].
            scale: Optional scale, default [1, 1, 1].

        Raises:
            ValueError: If ``type`` is not a known primitive.
        """
        ptype = params["type"].lower()
        name = params.get("name")
        location = params.get("location", [0, 0, 0])
        scale = params.get("scale", [1, 1, 1])

        operator_name = self._PRIMITIVE_OPERATORS.get(ptype)
        if operator_name is None:
            raise ValueError(f"Unknown primitive type: {ptype}")
        getattr(bpy.ops.mesh, operator_name)(location=location)

        # The add operator leaves the new object active.
        obj = bpy.context.active_object
        if name:
            obj.name = name
        obj.scale = scale
        return {
            # Read the name back from the object rather than echoing the
            # requested one.
            "name": obj.name,
            "type": obj.type,
            "location": list(obj.location),
        }

    def _handle_update_object(self, params: dict) -> dict:
        """Update object transform.

        Params:
            name: Name of the object (required).
            location / rotation / scale: Optional new values; ``rotation``
                is written to ``rotation_euler``.

        Raises:
            ValueError: If no object with that name exists.
        """
        obj = self._require_object(params["name"])
        if "location" in params:
            obj.location = params["location"]
        if "rotation" in params:
            obj.rotation_euler = params["rotation"]
        if "scale" in params:
            obj.scale = params["scale"]
        return {
            "name": obj.name,
            "location": list(obj.location),
            "rotation_euler": list(obj.rotation_euler),
            "scale": list(obj.scale),
        }

    def _handle_delete_object(self, params: dict) -> dict:
        """Delete an object.

        Params:
            name: Name of the object (required).

        Raises:
            ValueError: If no object with that name exists.
        """
        name = params["name"]
        obj = self._require_object(name)
        bpy.data.objects.remove(obj, do_unlink=True)
        return {"deleted": name}

    def _handle_list_materials(self, params: dict) -> list[dict]:
        """List all materials.

        For node-based materials the values of the first Principled BSDF
        node found are included. ``params`` is not used.
        """
        result = []
        for mat in bpy.data.materials:
            info = {
                "name": mat.name,
                "use_nodes": mat.use_nodes,
            }
            # Try to get Principled BSDF values
            if mat.use_nodes and mat.node_tree:
                for node in mat.node_tree.nodes:
                    if node.type == "BSDF_PRINCIPLED":
                        info["base_color"] = list(node.inputs["Base Color"].default_value)
                        info["metallic"] = node.inputs["Metallic"].default_value
                        info["roughness"] = node.inputs["Roughness"].default_value
                        break
            result.append(info)
        return result

    def _handle_create_material(self, params: dict) -> dict:
        """Create a new node-based material.

        Params:
            name: Material name (required).
            color: RGBA base colour, default [0.8, 0.8, 0.8, 1.0]. A
                three-component RGB colour gets alpha 1.0 appended.
            metallic: Default 0.0.
            roughness: Default 0.5.
        """
        name = params["name"]
        color = list(params.get("color", [0.8, 0.8, 0.8, 1.0]))
        if len(color) == 3:
            color.append(1.0)
        metallic = params.get("metallic", 0.0)
        roughness = params.get("roughness", 0.5)

        mat = bpy.data.materials.new(name=name)
        mat.use_nodes = True

        # Locate the Principled BSDF by node type, the same way
        # _handle_list_materials does, rather than by its display name.
        bsdf = next(
            (node for node in mat.node_tree.nodes if node.type == "BSDF_PRINCIPLED"),
            None,
        )
        if bsdf is not None:
            bsdf.inputs["Base Color"].default_value = color
            bsdf.inputs["Metallic"].default_value = metallic
            bsdf.inputs["Roughness"].default_value = roughness

        return {
            "name": mat.name,
            "base_color": color,
            "metallic": metallic,
            "roughness": roughness,
        }

    def _handle_assign_material(self, params: dict) -> dict:
        """Assign material to object.

        The material replaces slot 0, or is appended when the object has no
        material slots yet.

        Params:
            object: Name of the object (required).
            material: Name of the material (required).

        Raises:
            ValueError: If the object or material does not exist, or the
                object's data cannot hold materials.
        """
        obj_name = params["object"]
        mat_name = params["material"]

        obj = self._require_object(obj_name)
        mat = bpy.data.materials.get(mat_name)
        if mat is None:
            raise ValueError(f"Material not found: {mat_name}")
        if obj.data is None:
            raise ValueError(f"Object {obj_name} has no data to assign material to")

        slots = getattr(obj.data, "materials", None)
        if slots is None:
            # Not every data-block type exposes a ``materials`` collection.
            raise ValueError(
                f"Object {obj_name} of type {obj.type} does not support materials"
            )

        # Add material slot if needed
        if len(slots) == 0:
            slots.append(mat)
        else:
            slots[0] = mat
        return {
            "object": obj_name,
            "material": mat_name,
        }

    def _handle_render_image(self, params: dict) -> dict:
        """Render the scene to a still image file.

        The overridden render settings are restored afterwards, even when
        rendering fails.

        Params:
            output: Output file path (required).
            engine: Optional render engine identifier.
            width / height: Optional resolution overrides.
            samples: Optional sample count; only applied when the effective
                engine is "CYCLES".
        """
        output = params["output"]
        engine = params.get("engine")
        width = params.get("width")
        height = params.get("height")
        samples = params.get("samples")

        scene = bpy.context.scene
        render = scene.render

        # Save current settings
        old_engine = render.engine
        old_path = render.filepath
        old_x = render.resolution_x
        old_y = render.resolution_y
        old_samples = None  # Only captured when the sample count is overridden

        try:
            # Apply settings
            if engine:
                render.engine = engine
            if width:
                render.resolution_x = width
            if height:
                render.resolution_y = height
            if samples and render.engine == "CYCLES":
                old_samples = scene.cycles.samples
                scene.cycles.samples = samples
            render.filepath = output

            # Render
            bpy.ops.render.render(write_still=True)

            # Built before the finally clause runs, so it reports the
            # settings that were actually used for this render.
            return {
                "output": output,
                "engine": render.engine,
                "resolution": [render.resolution_x, render.resolution_y],
            }
        finally:
            # Restore settings
            if old_samples is not None:
                scene.cycles.samples = old_samples
            render.engine = old_engine
            render.filepath = old_path
            render.resolution_x = old_x
            render.resolution_y = old_y

    def _handle_execute_python(self, params: dict) -> dict:
        """Execute arbitrary Python code.

        The code runs with ``bpy`` and ``math`` available and may assign
        ``__result__`` to return a value. A value that cannot be encoded as
        JSON is returned as its ``repr()`` string.

        SECURITY: this calls ``exec`` on code received from the MCP client
        without any sandboxing, so anyone who can reach the server can run
        arbitrary code inside Blender. Only expose it to trusted clients.

        Params:
            code: Python source to execute (required).
        """
        code = params["code"]

        # Create execution namespace with bpy
        namespace = {"bpy": bpy, "math": math, "__result__": None}

        # Execute code
        exec(code, namespace)

        # Return result if set
        result = namespace.get("__result__")
        try:
            json.dumps(result)
        except (TypeError, ValueError):
            # The code already ran; don't turn that into an error response
            # just because the value is not JSON-serialisable.
            result = repr(result)
        return {
            "executed": True,
            "result": result,
        }
# Convenience function for quick start
def start_mcp(tag: str = "default", port: int = 8765) -> BlenderMcpBridge:
    """Create a bridge, start it and hand it back to the caller.

    Args:
        tag: Identifier passed to the new ``BlenderMcpBridge``
        port: Port passed to the new ``BlenderMcpBridge``

    Returns:
        The bridge, after ``start()`` has been called on it
    """
    new_bridge = BlenderMcpBridge(port=port, tag=tag)
    new_bridge.start()
    return new_bridge
# Shared bridge instance for simple usage; created on first request.
_default_bridge: Optional[BlenderMcpBridge] = None


def get_default_bridge() -> BlenderMcpBridge:
    """Return the shared bridge, constructing it with defaults if absent."""
    global _default_bridge
    if _default_bridge is not None:
        return _default_bridge
    _default_bridge = BlenderMcpBridge()
    return _default_bridge
def start() -> int:
    """Start the default MCP bridge and return what its ``start()`` returns."""
    shared_bridge = get_default_bridge()
    return shared_bridge.start()
def stop():
    """Stop the default MCP bridge, if one exists, and forget it."""
    global _default_bridge
    if _default_bridge is None:
        return
    _default_bridge.stop()
    _default_bridge = None