# Bonsai-mcp — by JotaDeRodriguez (Verified)
# blender_mcp_server.py
from mcp.server.fastmcp import FastMCP, Context, Image
import socket
import json
import asyncio
import logging
from dataclasses import dataclass
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, Any, List, TypedDict
import os
from pathlib import Path
import base64
from urllib.parse import urlparse
from typing import Optional
import sys
# Configure logging: timestamped records at INFO level for the whole process.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-level logger shared by the connection class and all tools below.
logger = logging.getLogger("BlenderMCPServer")
@dataclass
class BlenderConnection:
    """Persistent TCP connection to the Blender addon's socket server.

    Commands are JSON objects of the form ``{"type": ..., "params": ...}``;
    responses are JSON objects carrying ``status``, ``result`` and ``message``.
    A single instance is reused across tool calls (see get_blender_connection).
    """
    host: str   # hostname of the Blender addon socket server
    port: int   # TCP port the addon listens on
    sock: Optional[socket.socket] = None  # Changed from 'socket' to 'sock' to avoid naming conflict

    def connect(self) -> bool:
        """Connect to the Blender addon socket server.

        Returns:
            True if a connection is (or already was) established, False on failure.
        """
        if self.sock:
            return True
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect((self.host, self.port))
            logger.info(f"Connected to Blender at {self.host}:{self.port}")
            return True
        except Exception as e:
            logger.error(f"Failed to connect to Blender: {str(e)}")
            self.sock = None
            return False

    def disconnect(self) -> None:
        """Disconnect from the Blender addon, ignoring errors on close."""
        if self.sock:
            try:
                self.sock.close()
            except Exception as e:
                logger.error(f"Error disconnecting from Blender: {str(e)}")
            finally:
                # Always drop the reference so the next call reconnects cleanly.
                self.sock = None

    def receive_full_response(self, sock, buffer_size=8192) -> bytes:
        """Receive the complete response, potentially in multiple chunks.

        Accumulates chunks until the buffered bytes parse as a complete JSON
        document, then returns the raw bytes.  Raises if the connection closes
        before any data arrives, or if only an incomplete document was received.
        """
        chunks = []
        # Use a consistent timeout value that matches the addon's timeout
        sock.settimeout(15.0)  # Match the addon's timeout
        try:
            while True:
                try:
                    chunk = sock.recv(buffer_size)
                    if not chunk:
                        # If we get an empty chunk, the connection might be closed
                        if not chunks:  # If we haven't received anything yet, this is an error
                            raise Exception("Connection closed before receiving any data")
                        break
                    chunks.append(chunk)
                    # Check if we've received a complete JSON object
                    try:
                        data = b''.join(chunks)
                        json.loads(data.decode('utf-8'))
                        # If we get here, it parsed successfully
                        logger.info(f"Received complete response ({len(data)} bytes)")
                        return data
                    except json.JSONDecodeError:
                        # Incomplete JSON, continue receiving
                        continue
                except socket.timeout:
                    # If we hit a timeout during receiving, break the loop and try to use what we have
                    logger.warning("Socket timeout during chunked receive")
                    break
                except (ConnectionError, BrokenPipeError, ConnectionResetError) as e:
                    logger.error(f"Socket connection error during receive: {str(e)}")
                    raise  # Re-raise to be handled by the caller
        except socket.timeout:
            logger.warning("Socket timeout during chunked receive")
        except Exception as e:
            logger.error(f"Error during receive: {str(e)}")
            raise
        # If we get here, we either timed out or broke out of the loop
        # Try to use what we have
        if chunks:
            data = b''.join(chunks)
            logger.info(f"Returning data after receive completion ({len(data)} bytes)")
            try:
                # Try to parse what we have
                json.loads(data.decode('utf-8'))
                return data
            except json.JSONDecodeError:
                # If we can't parse it, it's incomplete
                raise Exception("Incomplete JSON response received")
        else:
            raise Exception("No data received")

    def send_command(self, command_type: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Send a command to Blender and return the response.

        Args:
            command_type: Name of the addon command to invoke.
            params: Optional parameter dict for the command (defaults to {}).

        Returns:
            The ``result`` field of the addon's JSON response (a dict).

        Raises:
            ConnectionError: if no connection could be established.
            Exception: on timeout, connection loss, invalid JSON, or an
                error status reported by Blender.  The socket is invalidated
                on failure so the next call reconnects.
        """
        if not self.sock and not self.connect():
            raise ConnectionError("Not connected to Blender")
        command = {
            "type": command_type,
            "params": params or {}
        }
        try:
            # Log the command being sent
            logger.info(f"Sending command: {command_type} with params: {params}")
            # Send the command
            self.sock.sendall(json.dumps(command).encode('utf-8'))
            logger.info(f"Command sent, waiting for response...")
            # Set a timeout for receiving - use the same timeout as in receive_full_response
            self.sock.settimeout(15.0)  # Match the addon's timeout
            # Receive the response using the improved receive_full_response method
            response_data = self.receive_full_response(self.sock)
            logger.info(f"Received {len(response_data)} bytes of data")
            response = json.loads(response_data.decode('utf-8'))
            logger.info(f"Response parsed, status: {response.get('status', 'unknown')}")
            if response.get("status") == "error":
                logger.error(f"Blender error: {response.get('message')}")
                raise Exception(response.get("message", "Unknown error from Blender"))
            return response.get("result", {})
        except socket.timeout:
            logger.error("Socket timeout while waiting for response from Blender")
            # Don't try to reconnect here - let the get_blender_connection handle reconnection
            # Just invalidate the current socket so it will be recreated next time
            self.sock = None
            raise Exception("Timeout waiting for Blender response - try simplifying your request")
        except (ConnectionError, BrokenPipeError, ConnectionResetError) as e:
            logger.error(f"Socket connection error: {str(e)}")
            self.sock = None
            raise Exception(f"Connection to Blender lost: {str(e)}")
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON response from Blender: {str(e)}")
            # Try to log what was received
            if 'response_data' in locals() and response_data:
                logger.error(f"Raw response (first 200 bytes): {response_data[:200]}")
            raise Exception(f"Invalid response from Blender: {str(e)}")
        except Exception as e:
            logger.error(f"Error communicating with Blender: {str(e)}")
            # Don't try to reconnect here - let the get_blender_connection handle reconnection
            self.sock = None
            raise Exception(f"Communication error with Blender: {str(e)}")
@asynccontextmanager
async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
    """Manage server startup and shutdown lifecycle.

    Probes the Blender connection on startup (a failure is only logged as a
    warning so the server can still start), and tears down the global
    connection on shutdown.  Yields an empty context dict because tools use
    the module-level connection rather than per-request state.
    """
    global _blender_connection
    try:
        logger.info("BlenderMCP server starting up")
        # Try to connect to Blender on startup to verify it's available.
        try:
            # This will initialize the global connection if needed.
            # (Fixed: the return value was previously bound to an unused local.)
            get_blender_connection()
            logger.info("Successfully connected to Blender on startup")
        except Exception as e:
            logger.warning(f"Could not connect to Blender on startup: {str(e)}")
            logger.warning("Make sure the Blender addon is running before using Blender resources or tools")
        # Return an empty context - we're using the global connection
        yield {}
    finally:
        # Clean up the global connection on shutdown
        if _blender_connection:
            logger.info("Disconnecting from Blender on shutdown")
            _blender_connection.disconnect()
            _blender_connection = None
        logger.info("BlenderMCP server shut down")
# Create the MCP server with lifespan support
mcp = FastMCP(
    "BlenderMCP",
    description="Blender integration through the Model Context Protocol",
    lifespan=server_lifespan
)

# Resource endpoints
# Global connection for resources (since resources can't access context).
# Managed exclusively by get_blender_connection() / server_lifespan().
_blender_connection: Optional[BlenderConnection] = None
# Cached PolyHaven status, refreshed on every successful connection ping.
_polyhaven_enabled: bool = False  # Add this global variable
def get_blender_connection():
    """Get or create a persistent Blender connection.

    Reuses the module-level connection when it still responds to a ping
    (the "get_polyhaven_status" command), otherwise tears it down and
    establishes a fresh one.  Also refreshes the cached PolyHaven status
    as a side effect of the ping.

    Returns:
        The live BlenderConnection instance.

    Raises:
        Exception: if no connection to Blender can be established.
    """
    global _blender_connection, _polyhaven_enabled  # Add _polyhaven_enabled to globals
    # If we have an existing connection, check if it's still valid
    if _blender_connection is not None:
        try:
            # First check if PolyHaven is enabled by sending a ping command
            result = _blender_connection.send_command("get_polyhaven_status")
            # Store the PolyHaven status globally
            _polyhaven_enabled = result.get("enabled", False)
            return _blender_connection
        except Exception as e:
            # Connection is dead, close it and create a new one
            logger.warning(f"Existing connection is no longer valid: {str(e)}")
            try:
                _blender_connection.disconnect()
            except Exception:
                # Best-effort cleanup; the socket may already be gone.
                # (Fixed: was a bare `except:`, which also swallowed
                # KeyboardInterrupt/SystemExit.)
                pass
            _blender_connection = None
    # Create a new connection if needed
    if _blender_connection is None:
        _blender_connection = BlenderConnection(host="localhost", port=9876)
        if not _blender_connection.connect():
            logger.error("Failed to connect to Blender")
            _blender_connection = None
            raise Exception("Could not connect to Blender. Make sure the Blender addon is running.")
        logger.info("Created new persistent connection to Blender")
    return _blender_connection
### Sequential Thinking Server:
# Sequential Thinking Tool
class ThoughtData(TypedDict, total=False):
    """Payload describing a single step in a sequential-thinking session.

    The first four keys are required by SequentialThinkingServer's validator;
    the remaining keys are optional metadata for revisions and branching.
    """
    thought: str                    # text of this thinking step
    thoughtNumber: int              # position of this thought in the sequence
    totalThoughts: int              # current estimate of total steps needed
    nextThoughtNeeded: bool         # whether another step should follow
    isRevision: Optional[bool]      # True when this revises an earlier thought
    revisesThought: Optional[int]   # number of the thought being revised
    branchFromThought: Optional[int]  # thought number the branch starts from
    branchId: Optional[str]         # identifier of the branch, if any
    needsMoreThoughts: Optional[bool]  # signal that more thoughts are needed
class SequentialThinkingServer:
    """Track a sequence of "thoughts" (with optional revisions and branches)
    for the sequentialthinking MCP tool."""

    def __init__(self):
        # Chronological list of every validated thought processed so far.
        self.thought_history = []
        # Maps branchId -> list of thoughts recorded on that branch.
        self.branches = {}

    def validate_thought_data(self, data: Dict[str, Any]) -> "ThoughtData":
        """Validate required fields and normalize the input into a ThoughtData.

        Raises:
            ValueError: if a required field is missing or has the wrong type.
        """
        if not isinstance(data.get('thought'), str):
            raise ValueError('Invalid thought: must be a string')
        if not isinstance(data.get('thoughtNumber'), int):
            raise ValueError('Invalid thoughtNumber: must be a number')
        if not isinstance(data.get('totalThoughts'), int):
            raise ValueError('Invalid totalThoughts: must be a number')
        if not isinstance(data.get('nextThoughtNeeded'), bool):
            raise ValueError('Invalid nextThoughtNeeded: must be a boolean')
        return {
            'thought': data['thought'],
            'thoughtNumber': data['thoughtNumber'],
            'totalThoughts': data['totalThoughts'],
            'nextThoughtNeeded': data['nextThoughtNeeded'],
            'isRevision': data.get('isRevision'),
            'revisesThought': data.get('revisesThought'),
            'branchFromThought': data.get('branchFromThought'),
            'branchId': data.get('branchId'),
            'needsMoreThoughts': data.get('needsMoreThoughts')
        }

    def format_thought(self, thought_data: "ThoughtData") -> str:
        """Format a thought as an emoji-prefixed box-drawing banner."""
        thought_num = thought_data['thoughtNumber']
        total = thought_data['totalThoughts']
        thought = thought_data['thought']
        is_revision = thought_data.get('isRevision', False)
        revises = thought_data.get('revisesThought')
        branch_from = thought_data.get('branchFromThought')
        branch_id = thought_data.get('branchId')
        # Create appropriate prefix and context
        if is_revision:
            prefix = "🔄 Revision"
            context = f" (revising thought {revises})"
        elif branch_from:
            prefix = "🌿 Branch"
            context = f" (from thought {branch_from}, ID: {branch_id})"
        else:
            prefix = "💭 Thought"
            context = ""
        header = f"{prefix} {thought_num}/{total}{context}"
        border_len = max(len(header), len(thought)) + 4
        border = "─" * border_len
        # Content rows are "│ <text> │" (border_len + 4 columns including the
        # two spaces), so pad the text to border_len - 2 to line up with the
        # border rows.  (Fixed: padding to border_len made content rows two
        # characters wider than the borders.)
        inner_width = border_len - 2
        # Build the formatted output
        output = f"\n┌{border}┐\n"
        output += f"│ {header.ljust(inner_width)} │\n"
        output += f"├{border}┤\n"
        output += f"│ {thought.ljust(inner_width)} │\n"
        output += f"└{border}┘"
        return output

    def process_thought(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process a thought and return a summary response.

        On success returns the thought counters plus branch bookkeeping; on
        validation failure returns {'error': ..., 'status': 'failed'} rather
        than raising.
        """
        try:
            validated_input = self.validate_thought_data(input_data)
            # Allow the total-thoughts estimate to grow if exceeded.
            if validated_input['thoughtNumber'] > validated_input['totalThoughts']:
                validated_input['totalThoughts'] = validated_input['thoughtNumber']
            self.thought_history.append(validated_input)
            # Track branches if applicable
            if validated_input.get('branchFromThought') and validated_input.get('branchId'):
                branch_id = validated_input['branchId']
                if branch_id not in self.branches:
                    self.branches[branch_id] = []
                self.branches[branch_id].append(validated_input)
            # Format and log the thought (stderr, so stdout stays clean for MCP)
            formatted_thought = self.format_thought(validated_input)
            print(formatted_thought, file=sys.stderr)
            # Return response
            return {
                'thoughtNumber': validated_input['thoughtNumber'],
                'totalThoughts': validated_input['totalThoughts'],
                'nextThoughtNeeded': validated_input['nextThoughtNeeded'],
                'branches': list(self.branches.keys()),
                'thoughtHistoryLength': len(self.thought_history)
            }
        except Exception as e:
            return {
                'error': str(e),
                'status': 'failed'
            }
# Create a single instance of the sequential thinking server.
# Shared by every call to the sequentialthinking tool, so thought history
# and branches persist across calls for the lifetime of the process.
thinking_server = SequentialThinkingServer()
@mcp.tool()
def execute_blender_code(ctx: Context, code: str) -> str:
    """
    Execute arbitrary Python code in Blender.
    Parameters:
    - code: The Python code to execute
    """
    try:
        # Reuse the shared persistent connection and forward the snippet
        # to the addon's code-execution command.
        connection = get_blender_connection()
        outcome = connection.send_command("execute_code", {"code": code})
        return f"Code executed successfully: {outcome.get('result', '')}"
    except Exception as exc:
        logger.error(f"Error executing code: {str(exc)}")
        return f"Error executing code: {str(exc)}"
### IFC Tools
@mcp.tool()
def get_ifc_project_info() -> str:
    """
    Get basic information about the IFC project, including name, description,
    and counts of different entity types.
    Returns:
    A JSON-formatted string with project information
    """
    try:
        # Ask the Blender addon for the project summary and serialize it.
        connection = get_blender_connection()
        project_info = connection.send_command("get_ifc_project_info")
        return json.dumps(project_info, indent=2)
    except Exception as exc:
        logger.error(f"Error getting IFC project info: {str(exc)}")
        return f"Error getting IFC project info: {str(exc)}"
@mcp.tool()
def get_selected_ifc_entities() -> str:
    """
    Get IFC entities corresponding to the currently selected objects in Blender.
    This allows working specifically with objects the user has manually selected in the Blender UI.
    Returns:
    A JSON-formatted string with information about the selected IFC entities
    """
    try:
        # Query the addon for whatever the user currently has selected.
        connection = get_blender_connection()
        selection = connection.send_command("get_selected_ifc_entities")
        return json.dumps(selection, indent=2)
    except Exception as exc:
        logger.error(f"Error getting selected IFC entities: {str(exc)}")
        return f"Error getting selected IFC entities: {str(exc)}"
# Modify the existing list_ifc_entities function to accept a selected_only parameter
@mcp.tool()
def list_ifc_entities(entity_type: Optional[str] = None, limit: int = 50, selected_only: bool = False) -> str:
    """
    List IFC entities of a specific type. Can be filtered to only include objects
    currently selected in the Blender UI.
    Args:
    entity_type: Type of IFC entity to list (e.g., "IfcWall"), optional
    limit: Maximum number of entities to return
    selected_only: If True, only return information about selected objects
    Returns:
    A JSON-formatted string listing the specified entities
    """
    # NOTE: entity_type now uses an explicit Optional[str] annotation
    # (implicit Optional via `str = None` is deprecated by PEP 484).
    try:
        blender = get_blender_connection()
        result = blender.send_command("list_ifc_entities", {
            "entity_type": entity_type,
            "limit": limit,
            "selected_only": selected_only
        })
        # Return the formatted JSON of the results
        return json.dumps(result, indent=2)
    except Exception as e:
        logger.error(f"Error listing IFC entities: {str(e)}")
        return f"Error listing IFC entities: {str(e)}"
# Modify the existing get_ifc_properties function to accept a selected_only parameter
@mcp.tool()
def get_ifc_properties(global_id: Optional[str] = None, selected_only: bool = False) -> str:
    """
    Get properties of IFC entities. Can be used to get properties of a specific entity by GlobalId,
    or to get properties of all currently selected objects in Blender.
    Args:
    global_id: GlobalId of a specific IFC entity (optional if selected_only is True)
    selected_only: If True, return properties for all selected objects instead of a specific entity
    Returns:
    A JSON-formatted string with entity information and properties
    """
    # Validate parameters first: without a target there is nothing to query,
    # so don't open (or ping) the Blender connection needlessly.
    # (Also fixed: global_id now has an explicit Optional[str] annotation.)
    if not global_id and not selected_only:
        return json.dumps({"error": "Either global_id or selected_only must be specified"}, indent=2)
    try:
        blender = get_blender_connection()
        result = blender.send_command("get_ifc_properties", {
            "global_id": global_id,
            "selected_only": selected_only
        })
        # Return the formatted JSON of the results
        return json.dumps(result, indent=2)
    except Exception as e:
        logger.error(f"Error getting IFC properties: {str(e)}")
        return f"Error getting IFC properties: {str(e)}"
@mcp.tool()
def get_ifc_spatial_structure() -> str:
    """
    Get the spatial structure of the IFC model (site, building, storey, space hierarchy).
    Returns:
    A JSON-formatted string representing the hierarchical structure of the IFC model
    """
    try:
        # Fetch the spatial hierarchy from the addon and serialize it.
        connection = get_blender_connection()
        structure = connection.send_command("get_ifc_spatial_structure")
        return json.dumps(structure, indent=2)
    except Exception as exc:
        logger.error(f"Error getting IFC spatial structure: {str(exc)}")
        return f"Error getting IFC spatial structure: {str(exc)}"
@mcp.tool()
def get_ifc_relationships(global_id: str) -> str:
    """
    Get all relationships for a specific IFC entity.
    Args:
    global_id: GlobalId of the IFC entity
    Returns:
    A JSON-formatted string with all relationships the entity participates in
    """
    try:
        # Look up the entity's relationships by its GlobalId.
        connection = get_blender_connection()
        relationships = connection.send_command("get_ifc_relationships", {
            "global_id": global_id
        })
        return json.dumps(relationships, indent=2)
    except Exception as exc:
        logger.error(f"Error getting IFC relationships: {str(exc)}")
        return f"Error getting IFC relationships: {str(exc)}"
@mcp.tool()
def get_user_view() -> Image:
    """
    Capture and return the current Blender viewport as an image.
    Shows what the user is currently seeing in Blender.
    Focus mostly on the 3D viewport. Use the UI to assist in your understanding of the scene but only refer to it if specifically prompted.
    Returns:
    An image of the current Blender viewport
    """
    try:
        # Ask the addon for a capture of the current view.
        connection = get_blender_connection()
        view_result = connection.send_command("get_current_view")
        # Surface addon-side failures and missing payloads explicitly.
        if "error" in view_result:
            raise Exception(f"Error getting current view: {view_result.get('error', 'Unknown error')}")
        if "data" not in view_result:
            raise Exception("No image data returned from Blender")
        # The addon sends the screenshot as base64; decode and wrap it.
        decoded = base64.b64decode(view_result["data"])
        return Image(data=decoded, format="png")
    except Exception as e:
        logger.error(f"Error getting current view: {str(e)}")
        raise Exception(f"Error getting current view: {str(e)}")
# WIP, not ready to be implemented:
# @mcp.tool()
# def create_plan_view(height_offset: float = 0.5, view_type: str = "top",
# resolution_x: int = 400, resolution_y: int = 400,
# output_path: str = None) -> Image:
# """
# Create a plan view (top-down view) at the specified height above the first building story.
# Args:
# height_offset: Height in meters above the building story (default 0.5m)
# view_type: Type of view - "top", "front", "right", "left" (note: only "top" is fully implemented)
# resolution_x: Horizontal resolution of the render in pixels - Keep it small, max 800 x 800, recomended 400 x 400
# resolution_y: Vertical resolution of the render in pixels
# output_path: Optional path to save the rendered image
# Returns:
# A rendered image showing the plan view of the model
# """
# try:
# # Get the global connection
# blender = get_blender_connection()
# # Request an orthographic render
# result = blender.send_command("create_orthographic_render", {
# "view_type": view_type,
# "height_offset": height_offset,
# "resolution_x": resolution_x,
# "resolution_y": resolution_y,
# "output_path": output_path # Can be None to use a temporary file
# })
# if "error" in result:
# raise Exception(f"Error creating plan view: {result.get('error', 'Unknown error')}")
# if "data" not in result:
# raise Exception("No image data returned from Blender")
# # Decode the base64 image data
# image_data = base64.b64decode(result["data"])
# # Return as an Image object
# return Image(data=image_data, format="png")
# except Exception as e:
# logger.error(f"Error creating plan view: {str(e)}")
# raise Exception(f"Error creating plan view: {str(e)}")
@mcp.tool()
def sequentialthinking(
    thought: str,
    thoughtNumber: int,
    totalThoughts: int,
    nextThoughtNeeded: bool,
    isRevision: Optional[bool] = None,
    revisesThought: Optional[int] = None,
    branchFromThought: Optional[int] = None,
    branchId: Optional[str] = None,
    needsMoreThoughts: Optional[bool] = None
) -> str:
    """A detailed tool for dynamic and reflective problem-solving through thoughts.
    This tool helps analyze problems through a flexible thinking process that can adapt and evolve.
    Each thought can build on, question, or revise previous insights as understanding deepens.
    When to use this tool:
    - Breaking down complex problems into steps
    - Planning and design with room for revision
    - Analysis that might need course correction
    - Problems where the full scope might not be clear initially
    - Problems that require a multi-step solution
    - Tasks that need to maintain context over multiple steps
    - Situations where irrelevant information needs to be filtered out
    Args:
    thought: Your current thinking step
    thoughtNumber: Current number in sequence (can go beyond initial total if needed)
    totalThoughts: Current estimate of thoughts needed (can be adjusted up/down)
    nextThoughtNeeded: Whether another thought step is needed
    isRevision: Whether this revises previous thinking
    revisesThought: Which thought is being reconsidered
    branchFromThought: Branching point thought number
    branchId: Branch identifier
    needsMoreThoughts: If more thoughts are needed
    """
    # Required fields are always present in the payload.
    input_data = {
        'thought': thought,
        'thoughtNumber': thoughtNumber,
        'totalThoughts': totalThoughts,
        'nextThoughtNeeded': nextThoughtNeeded
    }
    # Fold in only the optional parameters the caller actually supplied.
    optional_fields = {
        'isRevision': isRevision,
        'revisesThought': revisesThought,
        'branchFromThought': branchFromThought,
        'branchId': branchId,
        'needsMoreThoughts': needsMoreThoughts,
    }
    input_data.update({key: value for key, value in optional_fields.items() if value is not None})
    # Delegate to the shared thinking server and serialize its response.
    response = thinking_server.process_thought(input_data)
    return json.dumps(response, indent=2)
# Main execution
def main():
    """Run the MCP server (blocks until the server shuts down)."""
    mcp.run()

if __name__ == "__main__":
    main()