# Bonsai-mcp
#
# by JotaDeRodriguez
# Verified
import base64
import json
import os
import shutil
import socket
import tempfile
import threading
import time
import traceback
from collections import Counter, defaultdict

import bpy
import ifcopenshell
import ifcopenshell.util.element  # get_psets lives in this submodule
import mathutils
import requests
from bonsai.bim.ifc import IfcStore
from bpy.props import BoolProperty, EnumProperty, IntProperty, StringProperty

bl_info = {
    "name": "Bonsai MCP",
    "author": "JotaDeRodriguez",
    "version": (0, 2),
    "blender": (3, 0, 0),
    "location": "View3D > Sidebar > Bonsai MCP",
    "description": "Connect Claude to Blender via MCP. Aimed at IFC projects",
    "category": "Interface",
}

# Scene properties created by register().
_SCENE_PROPERTIES = (
    "blendermcp_port",
    "blendermcp_server_running",
)

# Scene properties this file never registers but that unregister() has
# historically cleaned up. They are removed only when present.
# NOTE(review): presumably leftovers from a related add-on -- confirm.
_FOREIGN_SCENE_PROPERTIES = (
    "blendermcp_use_polyhaven",
    "blendermcp_use_hyper3d",
    "blendermcp_hyper3d_mode",
    "blendermcp_hyper3d_api_key",
)


def _summarize(entity):
    """Return the id/type/name triple used to describe a related IFC entity."""
    return {
        "id": getattr(entity, "GlobalId", None),
        "type": entity.is_a(),
        "name": getattr(entity, "Name", None),
    }


def _find_by_guid(file, global_id):
    """Return the entity with *global_id*, or None when the file has none.

    ``by_guid`` signals a missing GlobalId with RuntimeError; that is mapped
    to None so callers can report a readable error instead of a traceback.
    """
    try:
        return file.by_guid(global_id)
    except RuntimeError:
        return None


def _entity_details(entity):
    """Return basic attributes plus all property sets of an IFC entity."""
    return {
        "id": getattr(entity, "GlobalId", None),
        "type": entity.is_a(),
        "name": getattr(entity, "Name", None),
        "description": getattr(entity, "Description", None),
        "property_sets": dict(ifcopenshell.util.element.get_psets(entity)),
    }


class BlenderMCPServer:
    """TCP server that receives JSON commands and runs them inside Blender.

    Each request is a JSON object ``{"type": <command>, "params": {...}}``.
    The reply is a JSON object with ``"status"`` set to ``"success"`` (plus
    ``"result"``) or ``"error"`` (plus ``"message"``).
    """

    # Command names a client may invoke. Each one maps to the method of the
    # same name; a name without a matching method is reported to the client
    # as an unknown command instead of breaking every dispatch.
    _HANDLER_NAMES = frozenset({
        "execute_code",
        "get_ifc_project_info",
        "list_ifc_entities",
        "get_ifc_properties",
        "get_ifc_spatial_structure",
        "get_ifc_relationships",
        "get_selected_ifc_entities",
        "get_current_view",
        "create_orthographic_render",
    })

    def __init__(self, host='localhost', port=9876):
        self.host = host
        self.port = port
        self.running = False
        self.socket = None
        self.server_thread = None

    def start(self):
        """Bind the listening socket and start the accept loop in a daemon thread.

        Failures are printed, not raised; check ``self.running`` afterwards to
        find out whether the server actually came up.
        """
        if self.running:
            print("Server is already running")
            return

        self.running = True

        try:
            # Create socket
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.socket.bind((self.host, self.port))
            self.socket.listen(1)

            # Start server thread
            self.server_thread = threading.Thread(target=self._server_loop)
            self.server_thread.daemon = True
            self.server_thread.start()

            print(f"BlenderMCP server started on {self.host}:{self.port}")
        except Exception as e:
            print(f"Failed to start server: {str(e)}")
            self.stop()

    def stop(self):
        """Stop accepting connections and release the socket and thread."""
        self.running = False

        # Close socket
        if self.socket:
            try:
                self.socket.close()
            except OSError:
                pass
            self.socket = None

        # Wait for thread to finish
        if self.server_thread:
            try:
                if self.server_thread.is_alive():
                    self.server_thread.join(timeout=1.0)
            except RuntimeError:
                # join() refuses to wait on the calling thread itself.
                pass
            self.server_thread = None

        print("BlenderMCP server stopped")

    def _server_loop(self):
        """Main server loop in a separate thread"""
        print("Server thread started")
        self.socket.settimeout(1.0)  # Timeout to allow for stopping

        while self.running:
            try:
                # Accept new connection
                try:
                    client, address = self.socket.accept()
                    print(f"Connected to client: {address}")

                    # Handle client in a separate thread
                    client_thread = threading.Thread(
                        target=self._handle_client,
                        args=(client,)
                    )
                    client_thread.daemon = True
                    client_thread.start()
                except socket.timeout:
                    # Just check running condition
                    continue
                except Exception as e:
                    print(f"Error accepting connection: {str(e)}")
                    time.sleep(0.5)
            except Exception as e:
                # Reached e.g. when stop() clears self.socket mid-iteration.
                print(f"Error in server loop: {str(e)}")
                if not self.running:
                    break
                time.sleep(0.5)

        print("Server thread stopped")

    def _make_executor(self, client, command):
        """Build the timer callback that runs *command* and replies to *client*.

        The command is bound per callback so that a later request arriving
        before the timer fires cannot replace it.
        """
        def execute_wrapper():
            try:
                response = self.execute_command(command)
                response_json = json.dumps(response)
                try:
                    client.sendall(response_json.encode('utf-8'))
                except OSError:
                    print("Failed to send response - client disconnected")
            except Exception as e:
                print(f"Error executing command: {str(e)}")
                traceback.print_exc()
                try:
                    error_response = {
                        "status": "error",
                        "message": str(e)
                    }
                    client.sendall(json.dumps(error_response).encode('utf-8'))
                except OSError:
                    pass
            # Returning None tells bpy.app.timers not to reschedule.
            return None

        return execute_wrapper

    def _handle_client(self, client):
        """Handle connected client"""
        print("Client handler started")
        client.settimeout(None)  # No timeout
        buffer = b''

        try:
            while self.running:
                # Receive data
                try:
                    data = client.recv(8192)
                    if not data:
                        print("Client disconnected")
                        break

                    buffer += data
                    try:
                        # Try to parse command
                        command = json.loads(buffer.decode('utf-8'))
                    except (json.JSONDecodeError, UnicodeDecodeError):
                        # Incomplete data (possibly a multi-byte character
                        # split across chunks), wait for more
                        continue
                    buffer = b''

                    # Schedule execution in main thread
                    bpy.app.timers.register(
                        self._make_executor(client, command),
                        first_interval=0.0,
                    )
                except Exception as e:
                    print(f"Error receiving data: {str(e)}")
                    break
        except Exception as e:
            print(f"Error in client handler: {str(e)}")
        finally:
            try:
                client.close()
            except OSError:
                pass
            print("Client handler stopped")

    def execute_command(self, command):
        """Execute a command in the main Blender thread

        Returns the response dictionary for the client; any exception is
        turned into ``{"status": "error", "message": ...}``.
        """
        try:
            cmd_type = command.get("type")

            # Ensure we're in the right context
            if cmd_type in ["create_object", "modify_object", "delete_object"]:
                screen = bpy.context.screen
                view3d = next(
                    (area for area in (screen.areas if screen else ())
                     if area.type == 'VIEW_3D'),
                    None,
                )
                if view3d is None:
                    raise RuntimeError("No 3D View available")
                override = bpy.context.copy()
                override['area'] = view3d
                with bpy.context.temp_override(**override):
                    return self._execute_command_internal(command)
            return self._execute_command_internal(command)
        except Exception as e:
            print(f"Error executing command: {str(e)}")
            traceback.print_exc()
            return {"status": "error", "message": str(e)}

    def _execute_command_internal(self, command):
        """Internal command execution with proper context"""
        cmd_type = command.get("type")
        params = command.get("params", {})

        # Only whitelisted names are looked up, so a client cannot reach
        # arbitrary attributes of the server object.
        handler = None
        if cmd_type in self._HANDLER_NAMES:
            handler = getattr(self, cmd_type, None)

        if handler is None:
            return {"status": "error", "message": f"Unknown command type: {cmd_type}"}

        try:
            print(f"Executing handler for {cmd_type}")
            result = handler(**params)
            print(f"Handler execution complete")
            return {"status": "success", "result": result}
        except Exception as e:
            print(f"Error in handler: {str(e)}")
            traceback.print_exc()
            return {"status": "error", "message": str(e)}

    def execute_code(self, code):
        """Execute arbitrary Blender Python code

        SECURITY: *code* comes straight from the socket client and is passed
        to ``exec`` unrestricted. Anyone who can reach the port can run
        arbitrary Python in this Blender process.
        """
        # This is powerful but potentially dangerous - use with caution
        try:
            # Create a local namespace for execution
            namespace = {"bpy": bpy}
            exec(code, namespace)
            return {"executed": True}
        except Exception as e:
            raise Exception(f"Code execution error: {str(e)}") from e

    @staticmethod
    def get_selected_ifc_entities():
        """
        Get the IFC entities corresponding to the currently selected Blender objects.

        Returns:
            Dictionary with "selected_count" and "selected_entities", or a
            dictionary with an "error" key.
        """
        try:
            file = IfcStore.get_file()
            if file is None:
                return {"error": "No IFC file is currently loaded"}

            # Get currently selected objects
            selected_objects = bpy.context.selected_objects
            if not selected_objects:
                return {"selected_count": 0, "message": "No objects selected in Blender"}

            # Collect IFC entities from selected objects
            selected_entities = []
            for obj in selected_objects:
                props = getattr(obj, "BIMObjectProperties", None)
                if props is None or not props.ifc_definition_id:
                    continue
                entity = file.by_id(props.ifc_definition_id)
                if not entity:
                    continue
                selected_entities.append({
                    # Entities without a GlobalId get a synthetic identifier.
                    "id": entity.GlobalId if hasattr(entity, "GlobalId") else f"Entity_{entity.id()}",
                    "ifc_id": entity.id(),
                    "type": entity.is_a(),
                    "name": getattr(entity, "Name", None),
                    "blender_name": obj.name
                })

            return {
                "selected_count": len(selected_entities),
                "selected_entities": selected_entities
            }
        except Exception as e:
            return {"error": str(e), "traceback": traceback.format_exc()}

    ### SPECIFIC IFC METHODS ###

    @staticmethod
    def get_ifc_project_info():
        """
        Get basic information about the IFC project.

        Returns:
            Dictionary with project name, description, and basic metrics
        """
        try:
            file = IfcStore.get_file()
            if file is None:
                return {"error": "No IFC file is currently loaded"}

            # Get project information
            projects = file.by_type("IfcProject")
            if not projects:
                return {"error": "No IfcProject found in the model"}

            project = projects[0]

            # Count entities by type
            entity_types = ["IfcWall", "IfcDoor", "IfcWindow", "IfcSlab", "IfcBeam", "IfcColumn", "IfcSpace", "IfcBuildingStorey"]

            return {
                "id": project.GlobalId,
                # Name may be present but unset (None); fall back then too.
                "name": getattr(project, "Name", None) or "Unnamed Project",
                "description": getattr(project, "Description", None),
                "entity_counts": {
                    entity_type: len(file.by_type(entity_type))
                    for entity_type in entity_types
                },
            }
        except Exception as e:
            return {"error": str(e), "traceback": traceback.format_exc()}

    @staticmethod
    def list_ifc_entities(entity_type=None, limit=50, selected_only=False):
        """
        List IFC entities of a specific type.

        Parameters:
            entity_type: Type of IFC entity to list (e.g., "IfcWall"). When
                omitted, a per-type summary is returned instead.
            limit: Maximum number of entities to return (per type). Negative
                values are treated as 0.
            selected_only: Restrict the listing to entities whose Blender
                objects are currently selected.

        Returns:
            List of entities with basic properties
        """
        try:
            file = IfcStore.get_file()
            if file is None:
                return {"error": "No IFC file is currently loaded"}

            limit = max(limit, 0)

            # If we're only looking at selected objects
            if selected_only:
                selected_result = BlenderMCPServer.get_selected_ifc_entities()

                # Errors and empty selections are passed through unchanged
                if "error" in selected_result or selected_result["selected_count"] == 0:
                    return selected_result

                selected = selected_result["selected_entities"]

                # If entity_type is specified, filter the selected entities
                if entity_type:
                    matching = [item for item in selected if item["type"] == entity_type]
                    return {
                        "type": entity_type,
                        "selected_count": len(matching),
                        "entities": matching[:limit]
                    }

                # Group selected entities by type
                grouped = defaultdict(list)
                for item in selected:
                    grouped[item["type"]].append(item)

                return {
                    "selected_count": selected_result["selected_count"],
                    "entity_types": [
                        {"type": t, "count": len(members), "entities": members[:limit]}
                        for t, members in grouped.items()
                    ]
                }

            # Original functionality for non-selected mode
            if not entity_type:
                # If no type specified, list available entity types
                counts = Counter(entity.is_a() for entity in file)
                return {
                    "available_types": [{"type": k, "count": v} for k, v in counts.items()]
                }

            # Get entities of the specified type
            entities = file.by_type(entity_type)

            return {
                "type": entity_type,
                "total_count": len(entities),
                "entities": [
                    {
                        "id": entity.GlobalId if hasattr(entity, "GlobalId") else f"Entity_{entity.id()}",
                        "name": getattr(entity, "Name", None)
                    }
                    for entity in entities[:limit]
                ]
            }
        except Exception as e:
            return {"error": str(e), "traceback": traceback.format_exc()}

    @staticmethod
    def get_ifc_properties(global_id=None, selected_only=False):
        """
        Get all properties of a specific IFC entity.

        Parameters:
            global_id: GlobalId of the IFC entity
            selected_only: Report on the currently selected entities instead;
                takes precedence over global_id.

        Returns:
            Dictionary with entity information and properties
        """
        try:
            file = IfcStore.get_file()
            if file is None:
                return {"error": "No IFC file is currently loaded"}

            # If we're only looking at selected objects
            if selected_only:
                selected_result = BlenderMCPServer.get_selected_ifc_entities()

                # Errors and empty selections are passed through unchanged
                if "error" in selected_result or selected_result["selected_count"] == 0:
                    return selected_result

                # Process each selected entity
                result = {
                    "selected_count": selected_result["selected_count"],
                    "entities": []
                }

                for entity_info in selected_result["selected_entities"]:
                    # Resolve by numeric id: the "id" field may be a synthetic
                    # "Entity_<n>" string that is not a real GlobalId.
                    entity = file.by_id(entity_info["ifc_id"])
                    if not entity:
                        continue

                    entity_data = _entity_details(entity)
                    entity_data["blender_name"] = entity_info["blender_name"]
                    result["entities"].append(entity_data)

                return result

            # If we're looking at a specific entity
            if global_id:
                entity = _find_by_guid(file, global_id)
                if not entity:
                    return {"error": f"No entity found with GlobalId: {global_id}"}
                return _entity_details(entity)

            return {"error": "Either global_id or selected_only must be specified"}
        except Exception as e:
            return {"error": str(e), "traceback": traceback.format_exc()}

    @staticmethod
    def get_ifc_spatial_structure():
        """
        Get the spatial structure of the IFC model (site, building, storey, space hierarchy).

        Returns:
            Hierarchical structure of the IFC model's spatial elements
        """
        try:
            file = IfcStore.get_file()
            if file is None:
                return {"error": "No IFC file is currently loaded"}

            # Start with projects
            projects = file.by_type("IfcProject")
            if not projects:
                return {"error": "No IfcProject found in the model"}

            def create_structure(element):
                """Recursively create the structure for an element"""
                # Immediate children are the objects aggregated under it.
                children = [
                    child
                    for rel in getattr(element, "IsDecomposedBy", ())
                    for child in rel.RelatedObjects
                ]
                return {
                    "id": element.GlobalId,
                    "type": element.is_a(),
                    "name": getattr(element, "Name", None),
                    "children": [create_structure(child) for child in children]
                }

            # Create the structure starting from the project
            return create_structure(projects[0])
        except Exception as e:
            return {"error": str(e), "traceback": traceback.format_exc()}

    @staticmethod
    def get_ifc_relationships(global_id):
        """
        Get all relationships for a specific IFC entity.

        Parameters:
            global_id: GlobalId of the IFC entity

        Returns:
            Dictionary with all relationships the entity participates in.
            The "defines" and "defined_by" lists are always empty; nothing
            populates them yet.
        """
        try:
            file = IfcStore.get_file()
            if file is None:
                return {"error": "No IFC file is currently loaded"}

            # Find entity by GlobalId
            entity = _find_by_guid(file, global_id)
            if not entity:
                return {"error": f"No entity found with GlobalId: {global_id}"}

            relationships = {
                "contains": [],
                "contained_in": [],
                "connects": [],
                "connected_by": [],
                "defines": [],
                "defined_by": []
            }

            # Check if entity contains other elements
            for rel in getattr(entity, "IsDecomposedBy", ()):
                relationships["contains"].extend(
                    _summarize(obj) for obj in rel.RelatedObjects
                )

            # Check if entity is contained in other elements
            for rel in getattr(entity, "Decomposes", ()):
                relationships["contained_in"].append(_summarize(rel.RelatingObject))

            # For physical connections (depends on entity type).
            # RelatedElement / RelatingElement each hold a single entity.
            for rel in getattr(entity, "ConnectedTo", ()):
                item = _summarize(rel.RelatedElement)
                item["connection_type"] = getattr(rel, "ConnectionType", None)
                relationships["connects"].append(item)

            for rel in getattr(entity, "ConnectedFrom", ()):
                item = _summarize(rel.RelatingElement)
                item["connection_type"] = getattr(rel, "ConnectionType", None)
                relationships["connected_by"].append(item)

            # Basic entity info
            return {
                "id": entity.GlobalId,
                "type": entity.is_a(),
                "name": getattr(entity, "Name", None),
                "relationships": relationships
            }
        except Exception as e:
            return {"error": str(e), "traceback": traceback.format_exc()}

    ### Ability to see

    @staticmethod
    def get_current_view():
        """Capture and return the current viewport as an image

        Returns:
            Dictionary with the area's width and height, the format ("png")
            and the base64-encoded image data, or a dictionary with an
            "error" key.
        """
        try:
            # Find a 3D View
            area = next(
                (a for a in bpy.context.screen.areas if a.type == 'VIEW_3D'),
                None,
            )
            if area is None:
                return {"error": "No 3D View available"}

            # Find appropriate region (before creating the temporary file,
            # so nothing is left on disk when this fails)
            region = next((r for r in area.regions if r.type == 'WINDOW'), None)
            if region is None:
                return {"error": "No appropriate region found in 3D View"}

            # Create temporary file to save the viewport screenshot
            temp_file = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
            temp_path = temp_file.name
            temp_file.close()

            try:
                # Use temp_override instead of the old override dictionary
                with bpy.context.temp_override(area=area, region=region):
                    # Save screenshot
                    bpy.ops.screen.screenshot(filepath=temp_path)

                # Read the image data and encode as base64
                with open(temp_path, 'rb') as f:
                    image_data = f.read()
            finally:
                # Clean up even when the screenshot or the read failed
                try:
                    os.unlink(temp_path)
                except OSError:
                    pass

            # Return base64 encoded image
            return {
                "width": area.width,
                "height": area.height,
                "format": "png",
                "data": base64.b64encode(image_data).decode('utf-8')
            }
        except Exception as e:
            return {"error": str(e), "traceback": traceback.format_exc()}

    #endregion


def _shutdown_server():
    """Stop the server stored on bpy.types (if any) and drop the reference."""
    server = getattr(bpy.types, "blendermcp_server", None)
    if server:
        server.stop()
    if hasattr(bpy.types, "blendermcp_server"):
        del bpy.types.blendermcp_server


# Blender UI Panel
class BLENDERMCP_PT_Panel(bpy.types.Panel):
    """Sidebar panel with the port setting and the start/stop button."""

    bl_label = "Bonsai MCP"
    bl_idname = "BLENDERMCP_PT_Panel"
    bl_space_type = 'VIEW_3D'
    bl_region_type = 'UI'
    bl_category = 'Bonsai MCP'

    def draw(self, context):
        layout = self.layout
        scene = context.scene

        layout.prop(scene, "blendermcp_port")

        if not scene.blendermcp_server_running:
            layout.operator("blendermcp.start_server", text="Start MCP Server")
        else:
            layout.operator("blendermcp.stop_server", text="Stop MCP Server")
            layout.label(text=f"Running on port {scene.blendermcp_port}")


# Operator to start the server
class BLENDERMCP_OT_StartServer(bpy.types.Operator):
    bl_idname = "blendermcp.start_server"
    bl_label = "Connect to Claude"
    bl_description = "Start the BlenderMCP server to connect with Claude"

    def execute(self, context):
        scene = context.scene

        # Create a new server instance
        server = getattr(bpy.types, "blendermcp_server", None)
        if not server:
            server = BlenderMCPServer(port=scene.blendermcp_port)
            bpy.types.blendermcp_server = server

        # Start the server; start() reports failure via server.running
        server.start()
        scene.blendermcp_server_running = server.running

        if not server.running:
            # Drop the failed instance so the next attempt picks up the
            # current port setting.
            del bpy.types.blendermcp_server
            self.report(
                {'ERROR'},
                f"Could not start MCP server on port {scene.blendermcp_port}",
            )
            return {'CANCELLED'}

        return {'FINISHED'}


# Operator to stop the server
class BLENDERMCP_OT_StopServer(bpy.types.Operator):
    bl_idname = "blendermcp.stop_server"
    bl_label = "Stop the connection to Claude"
    bl_description = "Stop the connection to Claude"

    def execute(self, context):
        scene = context.scene

        # Stop the server if it exists
        _shutdown_server()

        scene.blendermcp_server_running = False

        return {'FINISHED'}


# Registration functions
def register():
    bpy.types.Scene.blendermcp_port = IntProperty(
        name="Port",
        description="Port for the BlenderMCP server",
        default=9876,
        min=1024,
        max=65535
    )

    bpy.types.Scene.blendermcp_server_running = bpy.props.BoolProperty(
        name="Server Running",
        default=False
    )

    bpy.utils.register_class(BLENDERMCP_PT_Panel)
    bpy.utils.register_class(BLENDERMCP_OT_StartServer)
    bpy.utils.register_class(BLENDERMCP_OT_StopServer)

    print("BlenderMCP addon registered")


def unregister():
    # Stop the server if it's running
    _shutdown_server()

    bpy.utils.unregister_class(BLENDERMCP_PT_Panel)
    bpy.utils.unregister_class(BLENDERMCP_OT_StartServer)
    bpy.utils.unregister_class(BLENDERMCP_OT_StopServer)

    # Deleting a property that was never registered raises AttributeError,
    # so every removal is guarded.
    for prop_name in _SCENE_PROPERTIES + _FOREIGN_SCENE_PROPERTIES:
        if hasattr(bpy.types.Scene, prop_name):
            delattr(bpy.types.Scene, prop_name)

    print("BlenderMCP addon unregistered")


if __name__ == "__main__":
    register()