Skip to main content
Glama
blender_mcp_wrapper.py (14.4 kB)
r"""
Blender MCP Wrapper - Python side of the MCP bridge.

Usage in Blender:
    import sys
    sys.path.insert(0, r"C:\path\to\blender-mcp-rs\python")
    sys.path.insert(0, r"C:\path\to\blender-mcp-rs\target\release")  # or debug

    from blender_mcp_wrapper import BlenderMcpBridge
    bridge = BlenderMcpBridge()
    bridge.start()  # Starts MCP server and polling

    # To stop:
    bridge.stop()
"""
# NOTE: the module docstring is a raw string so the backslashes in the
# Windows example paths are not interpreted as escape sequences.

from __future__ import annotations

import json
import math
from typing import Any, Callable, Optional

import bpy

# Import the Rust extension (must be in sys.path)
import blender_mcp


# Maps the primitive names accepted by "create_primitive" to the name of the
# matching operator in ``bpy.ops.mesh``.
_PRIMITIVE_OPERATORS: dict[str, str] = {
    "cube": "primitive_cube_add",
    "sphere": "primitive_uv_sphere_add",
    "cylinder": "primitive_cylinder_add",
    "plane": "primitive_plane_add",
    "cone": "primitive_cone_add",
    "torus": "primitive_torus_add",
}


def _find_principled_bsdf(mat: Any) -> Any:
    """Return the first Principled BSDF node of *mat*, or None.

    The node is matched on ``node.type`` rather than on its display name so
    the lookup does not depend on how the node happens to be labelled.
    """
    if not mat.use_nodes or not mat.node_tree:
        return None
    for node in mat.node_tree.nodes:
        if node.type == "BSDF_PRINCIPLED":
            return node
    return None


class BlenderMcpBridge:
    """Bridge between Rust MCP server and Blender bpy API."""

    def __init__(self, tag: str = "default", port: int = 8765):
        """
        Initialize the bridge.

        Args:
            tag: Unique identifier for this instance
            port: HTTP port for MCP server (0 = auto-assign)
        """
        self.tag = tag
        self.port = port
        self._mcp: Optional[blender_mcp.BlenderMcp] = None
        self._timer_running = False
        # Incremented on every start(); a timer created by an earlier start()
        # sees a mismatching value and retires itself.
        self._timer_generation = 0

        # Command handlers
        self._handlers: dict[str, Callable] = {
            "scene_info": self._handle_scene_info,
            "list_objects": self._handle_list_objects,
            "get_object": self._handle_get_object,
            "create_primitive": self._handle_create_primitive,
            "update_object": self._handle_update_object,
            "delete_object": self._handle_delete_object,
            "list_materials": self._handle_list_materials,
            "create_material": self._handle_create_material,
            "assign_material": self._handle_assign_material,
            "render_image": self._handle_render_image,
            "execute_python": self._handle_execute_python,
        }

    def start(self, poll_interval: float = 0.05) -> int:
        """
        Start MCP server and begin polling for commands.

        Args:
            poll_interval: How often to check for commands (seconds)

        Returns:
            Actual port number the server is running on
        """
        if self._mcp is not None and self._mcp.is_running():
            print(f"[BlenderMCP] Already running on port {self._mcp.port}")
            return self._mcp.port

        # Create and start Rust MCP server
        self._mcp = blender_mcp.BlenderMcp(self.tag, self.port)
        actual_port = self._mcp.start()
        self.port = actual_port

        # Register timer for polling. The generation token is captured by the
        # callback so that a timer left over from a previous start()/stop()
        # cycle stops itself instead of polling alongside the new one.
        self._timer_running = True
        self._timer_generation += 1
        generation = self._timer_generation
        bpy.app.timers.register(
            lambda: self._poll_timer(poll_interval, generation),
            first_interval=poll_interval,
        )

        print(f"[BlenderMCP] Server '{self.tag}' started on http://127.0.0.1:{actual_port}/mcp")
        return actual_port

    def stop(self):
        """Stop MCP server and polling."""
        # The timer callback observes this flag and unregisters itself by
        # returning None on its next invocation.
        self._timer_running = False
        if self._mcp is not None:
            self._mcp.stop()
            print(f"[BlenderMCP] Server '{self.tag}' stopped")
            self._mcp = None

    def is_running(self) -> bool:
        """Check if server is running."""
        return self._mcp is not None and self._mcp.is_running()

    @property
    def url(self) -> str:
        """Get MCP server URL (empty string when no server exists)."""
        if self._mcp is None:
            return ""
        return self._mcp.url()

    def _timer_is_current(self, generation: Optional[int]) -> bool:
        """Return True while the timer identified by *generation* should run."""
        if not self._timer_running or self._mcp is None:
            return False
        return generation is None or generation == self._timer_generation

    def _poll_timer(self, interval: float, generation: Optional[int] = None) -> Optional[float]:
        """
        Timer callback - polls for commands and processes them.

        Args:
            interval: Delay in seconds until the next invocation
            generation: Token handed out by start(); None disables the
                stale-timer check

        Returns:
            ``interval`` to keep the timer alive, or None to stop it
        """
        # Process all pending commands. The state is re-checked on every
        # iteration because a handler may stop or restart the bridge.
        while self._timer_is_current(generation):
            cmd = self._mcp.poll()
            if cmd is None:
                break
            self._process_command(cmd)

        if not self._timer_is_current(generation):
            return None  # Stop timer
        return interval  # Continue timer

    def _process_command(self, cmd: dict):
        """
        Process a single command from MCP server.

        Reads ``cmd["id"]``, ``cmd["method"]`` and ``cmd["params"]`` (a JSON
        string, may be empty), runs the matching handler and replies through
        ``respond`` with either the JSON-encoded result or an error message.
        """
        # Keep a local reference: a handler may call stop(), which resets
        # self._mcp before the reply has been sent.
        mcp = self._mcp
        cmd_id = cmd["id"]
        method = cmd["method"]

        handler = self._handlers.get(method)
        if handler is None:
            mcp.respond(cmd_id, None, f"Unknown method: {method}")
            return

        try:
            # Parameter decoding happens inside the try block so that
            # malformed JSON is reported to the caller instead of raising out
            # of the timer callback.
            raw_params = cmd["params"]
            params = json.loads(raw_params) if raw_params else {}
            result_json = json.dumps(handler(params))
        except Exception as e:
            mcp.respond(cmd_id, None, str(e))
        else:
            mcp.respond(cmd_id, result_json, None)

    # =========================================================================
    # Command Handlers
    # =========================================================================

    def _handle_scene_info(self, params: dict) -> dict:
        """Get scene information."""
        scene = bpy.context.scene
        render = scene.render
        return {
            "name": scene.name,
            "fps": render.fps,
            "frame_start": scene.frame_start,
            "frame_end": scene.frame_end,
            "frame_current": scene.frame_current,
            "resolution_x": render.resolution_x,
            "resolution_y": render.resolution_y,
            "render_engine": render.engine,
            "object_count": len(bpy.data.objects),
            "material_count": len(bpy.data.materials),
        }

    def _handle_list_objects(self, params: dict) -> list[dict]:
        """
        List all objects with optional filtering.

        Params:
            type: Optional object type to keep (compared with ``obj.type``)
            collection: Optional collection name to restrict the listing to

        Raises:
            ValueError: If the named collection does not exist
        """
        type_filter = params.get("type")
        collection_filter = params.get("collection")

        objects = bpy.data.objects
        if collection_filter:
            coll = bpy.data.collections.get(collection_filter)
            if coll is None:
                # Returning every object for an unknown collection would
                # silently ignore the filter the caller asked for.
                raise ValueError(f"Collection not found: {collection_filter}")
            objects = coll.objects

        return [
            {
                "name": obj.name,
                "type": obj.type,
                "location": list(obj.location),
                "visible": obj.visible_get(),
            }
            for obj in objects
            if not type_filter or obj.type == type_filter
        ]

    def _handle_get_object(self, params: dict) -> dict:
        """
        Get detailed object information.

        Params:
            name: Name of the object to describe

        Raises:
            ValueError: If no object with that name exists
        """
        name = params["name"]
        obj = bpy.data.objects.get(name)
        if obj is None:
            raise ValueError(f"Object not found: {name}")

        result = {
            "name": obj.name,
            "type": obj.type,
            "location": list(obj.location),
            "rotation_euler": list(obj.rotation_euler),
            "scale": list(obj.scale),
            "visible": obj.visible_get(),
            "parent": obj.parent.name if obj.parent else None,
            "children": [c.name for c in obj.children],
        }

        # Add mesh data if applicable
        if obj.type == "MESH" and obj.data:
            mesh = obj.data
            result["mesh"] = {
                "vertices": len(mesh.vertices),
                "edges": len(mesh.edges),
                "polygons": len(mesh.polygons),
                # Empty material slots are reported as None.
                "materials": [m.name if m else None for m in mesh.materials],
            }

        return result

    def _handle_create_primitive(self, params: dict) -> dict:
        """
        Create a primitive mesh object.

        Params:
            type: One of cube, sphere, cylinder, plane, cone, torus
                (case-insensitive)
            name: Optional name for the new object
            location: Optional location, defaults to [0, 0, 0]
            scale: Optional scale, defaults to [1, 1, 1]

        Raises:
            ValueError: If the primitive type is unknown
            RuntimeError: If no active object exists after the operator ran
        """
        ptype = params["type"].lower()
        name = params.get("name")
        location = params.get("location", [0, 0, 0])
        scale = params.get("scale", [1, 1, 1])

        # Create primitive
        op_name = _PRIMITIVE_OPERATORS.get(ptype)
        if op_name is None:
            raise ValueError(f"Unknown primitive type: {ptype}")
        getattr(bpy.ops.mesh, op_name)(location=location)

        obj = bpy.context.active_object
        if obj is None:
            raise RuntimeError(f"Failed to create primitive: {ptype}")
        if name:
            obj.name = name
        obj.scale = scale

        return {
            "name": obj.name,
            "type": obj.type,
            "location": list(obj.location),
        }

    def _handle_update_object(self, params: dict) -> dict:
        """
        Update object transform.

        Params:
            name: Name of the object to modify
            location / rotation / scale: Optional new values; "rotation" is
                assigned to ``rotation_euler``

        Raises:
            ValueError: If no object with that name exists
        """
        name = params["name"]
        obj = bpy.data.objects.get(name)
        if obj is None:
            raise ValueError(f"Object not found: {name}")

        if "location" in params:
            obj.location = params["location"]
        if "rotation" in params:
            obj.rotation_euler = params["rotation"]
        if "scale" in params:
            obj.scale = params["scale"]

        return {
            "name": obj.name,
            "location": list(obj.location),
            "rotation_euler": list(obj.rotation_euler),
            "scale": list(obj.scale),
        }

    def _handle_delete_object(self, params: dict) -> dict:
        """
        Delete an object.

        Params:
            name: Name of the object to remove

        Raises:
            ValueError: If no object with that name exists
        """
        name = params["name"]
        obj = bpy.data.objects.get(name)
        if obj is None:
            raise ValueError(f"Object not found: {name}")

        bpy.data.objects.remove(obj, do_unlink=True)
        return {"deleted": name}

    def _handle_list_materials(self, params: dict) -> list[dict]:
        """List all materials, with Principled BSDF values where available."""
        result = []
        for mat in bpy.data.materials:
            info = {
                "name": mat.name,
                "use_nodes": mat.use_nodes,
            }

            # Try to get Principled BSDF values
            bsdf = _find_principled_bsdf(mat)
            if bsdf is not None:
                info["base_color"] = list(bsdf.inputs["Base Color"].default_value)
                info["metallic"] = bsdf.inputs["Metallic"].default_value
                info["roughness"] = bsdf.inputs["Roughness"].default_value

            result.append(info)
        return result

    def _handle_create_material(self, params: dict) -> dict:
        """
        Create a new material.

        Params:
            name: Requested material name
            color: Optional base color, defaults to [0.8, 0.8, 0.8, 1.0]
            metallic: Optional metallic value, defaults to 0.0
            roughness: Optional roughness value, defaults to 0.5

        Returns:
            The name of the created material (``mat.name``, as read back
            after creation) and the values that were requested.
        """
        name = params["name"]
        color = params.get("color", [0.8, 0.8, 0.8, 1.0])
        metallic = params.get("metallic", 0.0)
        roughness = params.get("roughness", 0.5)

        mat = bpy.data.materials.new(name=name)
        mat.use_nodes = True

        # Find Principled BSDF node (by node type, consistent with the
        # lookup used when listing materials)
        bsdf = _find_principled_bsdf(mat)
        if bsdf is not None:
            bsdf.inputs["Base Color"].default_value = color
            bsdf.inputs["Metallic"].default_value = metallic
            bsdf.inputs["Roughness"].default_value = roughness

        return {
            "name": mat.name,
            "base_color": color,
            "metallic": metallic,
            "roughness": roughness,
        }

    def _handle_assign_material(self, params: dict) -> dict:
        """
        Assign material to object.

        The material is appended when the object has no material slots,
        otherwise it replaces the material in the first slot.

        Params:
            object: Name of the target object
            material: Name of the material to assign

        Raises:
            ValueError: If the object or material is missing, or the object's
                data cannot hold materials
        """
        obj_name = params["object"]
        mat_name = params["material"]

        obj = bpy.data.objects.get(obj_name)
        if obj is None:
            raise ValueError(f"Object not found: {obj_name}")

        mat = bpy.data.materials.get(mat_name)
        if mat is None:
            raise ValueError(f"Material not found: {mat_name}")

        if obj.data is None:
            raise ValueError(f"Object {obj_name} has no data to assign material to")

        # Not every kind of object data exposes material slots; report that
        # as a clear error instead of an AttributeError.
        slots = getattr(obj.data, "materials", None)
        if slots is None:
            raise ValueError(f"Object {obj_name} does not support materials")

        # Add material slot if needed
        if len(slots) == 0:
            slots.append(mat)
        else:
            slots[0] = mat

        return {
            "object": obj_name,
            "material": mat_name,
        }

    def _handle_render_image(self, params: dict) -> dict:
        """
        Render the current scene to an image file.

        Engine, resolution and sample overrides are applied only for this
        render; the previous scene settings are restored afterwards, also
        when rendering fails.

        Params:
            output: Output file path
            engine: Optional render engine identifier
            width / height: Optional resolution overrides
            samples: Optional sample count, applied only when the active
                engine is "CYCLES"
        """
        output = params["output"]
        engine = params.get("engine")
        width = params.get("width")
        height = params.get("height")
        samples = params.get("samples")

        scene = bpy.context.scene
        render = scene.render

        # Save current settings
        old_engine = render.engine
        old_path = render.filepath
        old_x = render.resolution_x
        old_y = render.resolution_y
        old_samples = None  # Only captured if the sample count is overridden

        try:
            # Apply settings
            if engine:
                render.engine = engine
            if width:
                render.resolution_x = width
            if height:
                render.resolution_y = height
            if samples and render.engine == "CYCLES":
                old_samples = scene.cycles.samples
                scene.cycles.samples = samples

            render.filepath = output

            # Render
            bpy.ops.render.render(write_still=True)

            return {
                "output": output,
                "engine": render.engine,
                "resolution": [render.resolution_x, render.resolution_y],
            }
        finally:
            # Restore settings
            if old_samples is not None:
                scene.cycles.samples = old_samples
            render.engine = old_engine
            render.filepath = old_path
            render.resolution_x = old_x
            render.resolution_y = old_y

    def _handle_execute_python(self, params: dict) -> dict:
        """
        Execute arbitrary Python code.

        The code runs with ``bpy`` and ``math`` in its namespace and may
        assign ``__result__`` to return a value.

        SECURITY: this runs whatever code the MCP client sends, with the full
        privileges of the Blender process. Only expose the server to trusted
        clients.

        Params:
            code: Python source to execute
        """
        code = params["code"]

        # Create execution namespace with bpy
        namespace = {"bpy": bpy, "math": math, "__result__": None}

        # Execute code
        exec(code, namespace)

        # Return result if set. The code has already run at this point, so a
        # value that cannot be JSON-encoded is returned as its repr instead
        # of turning a successful execution into an error reply.
        result = namespace.get("__result__")
        try:
            json.dumps(result)
        except (TypeError, ValueError):
            result = repr(result)

        return {
            "executed": True,
            "result": result,
        }


# Convenience function for quick start
def start_mcp(tag: str = "default", port: int = 8765) -> BlenderMcpBridge:
    """Quick start MCP server and return the started bridge."""
    bridge = BlenderMcpBridge(tag=tag, port=port)
    bridge.start()
    return bridge


# Global instance for simple usage
_default_bridge: Optional[BlenderMcpBridge] = None


def get_default_bridge() -> BlenderMcpBridge:
    """Get or create default bridge instance."""
    global _default_bridge
    if _default_bridge is None:
        _default_bridge = BlenderMcpBridge()
    return _default_bridge


def start() -> int:
    """Start default MCP bridge and return the port it is running on."""
    return get_default_bridge().start()


def stop():
    """Stop default MCP bridge and discard the default instance."""
    global _default_bridge
    if _default_bridge is not None:
        _default_bridge.stop()
        _default_bridge = None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ssoj13/blender-mcp-rs'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.