BlenderMCP

by ahujasid
Verified
# blender_mcp_server.py from mcp.server.fastmcp import FastMCP, Context, Image import socket import json import asyncio import logging from dataclasses import dataclass from contextlib import asynccontextmanager from typing import AsyncIterator, Dict, Any, List # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("BlenderMCPServer") @dataclass class BlenderConnection: host: str port: int sock: socket.socket = None # Changed from 'socket' to 'sock' to avoid naming conflict def connect(self) -> bool: """Connect to the Blender addon socket server""" if self.sock: return True try: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.host, self.port)) logger.info(f"Connected to Blender at {self.host}:{self.port}") return True except Exception as e: logger.error(f"Failed to connect to Blender: {str(e)}") self.sock = None return False def disconnect(self): """Disconnect from the Blender addon""" if self.sock: try: self.sock.close() except Exception as e: logger.error(f"Error disconnecting from Blender: {str(e)}") finally: self.sock = None def receive_full_response(self, sock, buffer_size=8192): """Receive the complete response, potentially in multiple chunks""" chunks = [] # Use a consistent timeout value that matches the addon's timeout sock.settimeout(15.0) # Match the addon's timeout try: while True: try: chunk = sock.recv(buffer_size) if not chunk: # If we get an empty chunk, the connection might be closed if not chunks: # If we haven't received anything yet, this is an error raise Exception("Connection closed before receiving any data") break chunks.append(chunk) # Check if we've received a complete JSON object try: data = b''.join(chunks) json.loads(data.decode('utf-8')) # If we get here, it parsed successfully logger.info(f"Received complete response ({len(data)} bytes)") return data except json.JSONDecodeError: # Incomplete JSON, continue receiving continue except socket.timeout: # If we hit a timeout during receiving, break the loop and try to use what we have logger.warning("Socket timeout during chunked receive") break except (ConnectionError, BrokenPipeError, ConnectionResetError) as e: logger.error(f"Socket connection error during receive: {str(e)}") raise # Re-raise to be handled by the caller except socket.timeout: logger.warning("Socket timeout during chunked receive") except Exception as e: logger.error(f"Error during receive: {str(e)}") raise # If we get here, we either timed out or broke out of the loop # Try to use what we have if chunks: data = b''.join(chunks) logger.info(f"Returning data after receive completion ({len(data)} bytes)") try: # Try to parse what we have json.loads(data.decode('utf-8')) return data except json.JSONDecodeError: # If we can't parse it, it's incomplete raise Exception("Incomplete JSON response received") else: raise Exception("No data received") def send_command(self, command_type: str, params: Dict[str, Any] = None) -> Dict[str, Any]: """Send a command to Blender and return the response""" if not self.sock and not self.connect(): raise ConnectionError("Not connected to Blender") command = { "type": command_type, "params": params or {} } try: # Log the command being sent logger.info(f"Sending command: {command_type} with params: {params}") # Send the command self.sock.sendall(json.dumps(command).encode('utf-8')) logger.info(f"Command sent, waiting for response...") # Set a timeout for receiving - use the same timeout as in receive_full_response self.sock.settimeout(15.0) # Match the addon's timeout # Receive the response using the improved receive_full_response method response_data = self.receive_full_response(self.sock) logger.info(f"Received {len(response_data)} bytes of data") response = json.loads(response_data.decode('utf-8')) logger.info(f"Response parsed, status: {response.get('status', 'unknown')}") if response.get("status") == "error": logger.error(f"Blender error: {response.get('message')}") raise Exception(response.get("message", "Unknown error from Blender")) return response.get("result", {}) except socket.timeout: logger.error("Socket timeout while waiting for response from Blender") # Don't try to reconnect here - let the get_blender_connection handle reconnection # Just invalidate the current socket so it will be recreated next time self.sock = None raise Exception("Timeout waiting for Blender response - try simplifying your request") except (ConnectionError, BrokenPipeError, ConnectionResetError) as e: logger.error(f"Socket connection error: {str(e)}") self.sock = None raise Exception(f"Connection to Blender lost: {str(e)}") except json.JSONDecodeError as e: logger.error(f"Invalid JSON response from Blender: {str(e)}") # Try to log what was received if 'response_data' in locals() and response_data: logger.error(f"Raw response (first 200 bytes): {response_data[:200]}") raise Exception(f"Invalid response from Blender: {str(e)}") except Exception as e: logger.error(f"Error communicating with Blender: {str(e)}") # Don't try to reconnect here - let the get_blender_connection handle reconnection self.sock = None raise Exception(f"Communication error with Blender: {str(e)}") @asynccontextmanager async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]: """Manage server startup and shutdown lifecycle""" # We don't need to create a connection here since we're using the global connection # for resources and tools try: # Just log that we're starting up logger.info("BlenderMCP server starting up") # Try to connect to Blender on startup to verify it's available try: # This will initialize the global connection if needed blender = get_blender_connection() logger.info("Successfully connected to Blender on startup") except Exception as e: logger.warning(f"Could not connect to Blender on startup: {str(e)}") logger.warning("Make sure the Blender addon is running before using Blender resources or tools") # Return an empty context - we're using the global connection yield {} finally: # Clean up the global connection on shutdown global _blender_connection if _blender_connection: logger.info("Disconnecting from Blender on shutdown") _blender_connection.disconnect() _blender_connection = None logger.info("BlenderMCP server shut down") # Create the MCP server with lifespan support mcp = FastMCP( "BlenderMCP", description="Blender integration through the Model Context Protocol", lifespan=server_lifespan ) # Resource endpoints # Global connection for resources (since resources can't access context) _blender_connection = None _polyhaven_enabled = False # Add this global variable def get_blender_connection(): """Get or create a persistent Blender connection""" global _blender_connection, _polyhaven_enabled # Add _polyhaven_enabled to globals # If we have an existing connection, check if it's still valid if _blender_connection is not None: try: # First check if PolyHaven is enabled by sending a ping command result = _blender_connection.send_command("get_polyhaven_status") # Store the PolyHaven status globally _polyhaven_enabled = result.get("enabled", False) return _blender_connection except Exception as e: # Connection is dead, close it and create a new one logger.warning(f"Existing connection is no longer valid: {str(e)}") try: _blender_connection.disconnect() except: pass _blender_connection = None # Create a new connection if needed if _blender_connection is None: _blender_connection = BlenderConnection(host="localhost", port=9876) if not _blender_connection.connect(): logger.error("Failed to connect to Blender") _blender_connection = None raise Exception("Could not connect to Blender. Make sure the Blender addon is running.") logger.info("Created new persistent connection to Blender") return _blender_connection @mcp.tool() def get_scene_info(ctx: Context) -> str: """Get detailed information about the current Blender scene""" try: blender = get_blender_connection() result = blender.send_command("get_scene_info") # Just return the JSON representation of what Blender sent us return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error getting scene info from Blender: {str(e)}") return f"Error getting scene info: {str(e)}" @mcp.tool() def get_object_info(ctx: Context, object_name: str) -> str: """ Get detailed information about a specific object in the Blender scene. Parameters: - object_name: The name of the object to get information about """ try: blender = get_blender_connection() result = blender.send_command("get_object_info", {"name": object_name}) # Just return the JSON representation of what Blender sent us return json.dumps(result, indent=2) except Exception as e: logger.error(f"Error getting object info from Blender: {str(e)}") return f"Error getting object info: {str(e)}" @mcp.tool() def create_object( ctx: Context, type: str = "CUBE", name: str = None, location: List[float] = None, rotation: List[float] = None, scale: List[float] = None, # Torus-specific parameters align: str = "WORLD", major_segments: int = 48, minor_segments: int = 12, mode: str = "MAJOR_MINOR", major_radius: float = 1.0, minor_radius: float = 0.25, abso_major_rad: float = 1.25, abso_minor_rad: float = 0.75, generate_uvs: bool = True ) -> str: """ Create a new object in the Blender scene. Parameters: - type: Object type (CUBE, SPHERE, CYLINDER, PLANE, CONE, TORUS, EMPTY, CAMERA, LIGHT) - name: Optional name for the object - location: Optional [x, y, z] location coordinates - rotation: Optional [x, y, z] rotation in radians - scale: Optional [x, y, z] scale factors (not used for TORUS) Torus-specific parameters (only used when type == "TORUS"): - align: How to align the torus ('WORLD', 'VIEW', or 'CURSOR') - major_segments: Number of segments for the main ring - minor_segments: Number of segments for the cross-section - mode: Dimension mode ('MAJOR_MINOR' or 'EXT_INT') - major_radius: Radius from the origin to the center of the cross sections - minor_radius: Radius of the torus' cross section - abso_major_rad: Total exterior radius of the torus - abso_minor_rad: Total interior radius of the torus - generate_uvs: Whether to generate a default UV map Returns: A message indicating the created object name. """ try: # Get the global connection blender = get_blender_connection() # Set default values for missing parameters loc = location or [0, 0, 0] rot = rotation or [0, 0, 0] sc = scale or [1, 1, 1] params = { "type": type, "location": loc, "rotation": rot, } if name: params["name"] = name if type == "TORUS": # For torus, the scale is not used. params.update({ "align": align, "major_segments": major_segments, "minor_segments": minor_segments, "mode": mode, "major_radius": major_radius, "minor_radius": minor_radius, "abso_major_rad": abso_major_rad, "abso_minor_rad": abso_minor_rad, "generate_uvs": generate_uvs }) result = blender.send_command("create_object", params) return f"Created {type} object: {result['name']}" else: # For non-torus objects, include scale params["scale"] = sc result = blender.send_command("create_object", params) return f"Created {type} object: {result['name']}" except Exception as e: logger.error(f"Error creating object: {str(e)}") return f"Error creating object: {str(e)}" @mcp.tool() def modify_object( ctx: Context, name: str, location: List[float] = None, rotation: List[float] = None, scale: List[float] = None, visible: bool = None ) -> str: """ Modify an existing object in the Blender scene. Parameters: - name: Name of the object to modify - location: Optional [x, y, z] location coordinates - rotation: Optional [x, y, z] rotation in radians - scale: Optional [x, y, z] scale factors - visible: Optional boolean to set visibility """ try: # Get the global connection blender = get_blender_connection() params = {"name": name} if location is not None: params["location"] = location if rotation is not None: params["rotation"] = rotation if scale is not None: params["scale"] = scale if visible is not None: params["visible"] = visible result = blender.send_command("modify_object", params) return f"Modified object: {result['name']}" except Exception as e: logger.error(f"Error modifying object: {str(e)}") return f"Error modifying object: {str(e)}" @mcp.tool() def delete_object(ctx: Context, name: str) -> str: """ Delete an object from the Blender scene. Parameters: - name: Name of the object to delete """ try: # Get the global connection blender = get_blender_connection() result = blender.send_command("delete_object", {"name": name}) return f"Deleted object: {name}" except Exception as e: logger.error(f"Error deleting object: {str(e)}") return f"Error deleting object: {str(e)}" @mcp.tool() def set_material( ctx: Context, object_name: str, material_name: str = None, color: List[float] = None ) -> str: """ Set or create a material for an object. Parameters: - object_name: Name of the object to apply the material to - material_name: Optional name of the material to use or create - color: Optional [R, G, B] color values (0.0-1.0) """ try: # Get the global connection blender = get_blender_connection() params = {"object_name": object_name} if material_name: params["material_name"] = material_name if color: params["color"] = color result = blender.send_command("set_material", params) return f"Applied material to {object_name}: {result.get('material_name', 'unknown')}" except Exception as e: logger.error(f"Error setting material: {str(e)}") return f"Error setting material: {str(e)}" @mcp.tool() def execute_blender_code(ctx: Context, code: str) -> str: """ Execute arbitrary Python code in Blender. Parameters: - code: The Python code to execute """ try: # Get the global connection blender = get_blender_connection() result = blender.send_command("execute_code", {"code": code}) return f"Code executed successfully: {result.get('result', '')}" except Exception as e: logger.error(f"Error executing code: {str(e)}") return f"Error executing code: {str(e)}" @mcp.tool() def get_polyhaven_categories(ctx: Context, asset_type: str = "hdris") -> str: """ Get a list of categories for a specific asset type on Polyhaven. Parameters: - asset_type: The type of asset to get categories for (hdris, textures, models, all) """ try: blender = get_blender_connection() if not _polyhaven_enabled: return "PolyHaven integration is disabled. Select it in the sidebar in BlenderMCP, then run it again." result = blender.send_command("get_polyhaven_categories", {"asset_type": asset_type}) if "error" in result: return f"Error: {result['error']}" # Format the categories in a more readable way categories = result["categories"] formatted_output = f"Categories for {asset_type}:\n\n" # Sort categories by count (descending) sorted_categories = sorted(categories.items(), key=lambda x: x[1], reverse=True) for category, count in sorted_categories: formatted_output += f"- {category}: {count} assets\n" return formatted_output except Exception as e: logger.error(f"Error getting Polyhaven categories: {str(e)}") return f"Error getting Polyhaven categories: {str(e)}" @mcp.tool() def search_polyhaven_assets( ctx: Context, asset_type: str = "all", categories: str = None ) -> str: """ Search for assets on Polyhaven with optional filtering. Parameters: - asset_type: Type of assets to search for (hdris, textures, models, all) - categories: Optional comma-separated list of categories to filter by Returns a list of matching assets with basic information. """ try: blender = get_blender_connection() result = blender.send_command("search_polyhaven_assets", { "asset_type": asset_type, "categories": categories }) if "error" in result: return f"Error: {result['error']}" # Format the assets in a more readable way assets = result["assets"] total_count = result["total_count"] returned_count = result["returned_count"] formatted_output = f"Found {total_count} assets" if categories: formatted_output += f" in categories: {categories}" formatted_output += f"\nShowing {returned_count} assets:\n\n" # Sort assets by download count (popularity) sorted_assets = sorted(assets.items(), key=lambda x: x[1].get("download_count", 0), reverse=True) for asset_id, asset_data in sorted_assets: formatted_output += f"- {asset_data.get('name', asset_id)} (ID: {asset_id})\n" formatted_output += f" Type: {['HDRI', 'Texture', 'Model'][asset_data.get('type', 0)]}\n" formatted_output += f" Categories: {', '.join(asset_data.get('categories', []))}\n" formatted_output += f" Downloads: {asset_data.get('download_count', 'Unknown')}\n\n" return formatted_output except Exception as e: logger.error(f"Error searching Polyhaven assets: {str(e)}") return f"Error searching Polyhaven assets: {str(e)}" @mcp.tool() def download_polyhaven_asset( ctx: Context, asset_id: str, asset_type: str, resolution: str = "1k", file_format: str = None ) -> str: """ Download and import a Polyhaven asset into Blender. Parameters: - asset_id: The ID of the asset to download - asset_type: The type of asset (hdris, textures, models) - resolution: The resolution to download (e.g., 1k, 2k, 4k) - file_format: Optional file format (e.g., hdr, exr for HDRIs; jpg, png for textures; gltf, fbx for models) Returns a message indicating success or failure. """ try: blender = get_blender_connection() result = blender.send_command("download_polyhaven_asset", { "asset_id": asset_id, "asset_type": asset_type, "resolution": resolution, "file_format": file_format }) if "error" in result: return f"Error: {result['error']}" if result.get("success"): message = result.get("message", "Asset downloaded and imported successfully") # Add additional information based on asset type if asset_type == "hdris": return f"{message}. The HDRI has been set as the world environment." elif asset_type == "textures": material_name = result.get("material", "") maps = ", ".join(result.get("maps", [])) return f"{message}. Created material '{material_name}' with maps: {maps}." elif asset_type == "models": return f"{message}. The model has been imported into the current scene." else: return message else: return f"Failed to download asset: {result.get('message', 'Unknown error')}" except Exception as e: logger.error(f"Error downloading Polyhaven asset: {str(e)}") return f"Error downloading Polyhaven asset: {str(e)}" @mcp.tool() def set_texture( ctx: Context, object_name: str, texture_id: str ) -> str: """ Apply a previously downloaded Polyhaven texture to an object. Parameters: - object_name: Name of the object to apply the texture to - texture_id: ID of the Polyhaven texture to apply (must be downloaded first) Returns a message indicating success or failure. """ try: # Get the global connection blender = get_blender_connection() result = blender.send_command("set_texture", { "object_name": object_name, "texture_id": texture_id }) if "error" in result: return f"Error: {result['error']}" if result.get("success"): material_name = result.get("material", "") maps = ", ".join(result.get("maps", [])) # Add detailed material info material_info = result.get("material_info", {}) node_count = material_info.get("node_count", 0) has_nodes = material_info.get("has_nodes", False) texture_nodes = material_info.get("texture_nodes", []) output = f"Successfully applied texture '{texture_id}' to {object_name}.\n" output += f"Using material '{material_name}' with maps: {maps}.\n\n" output += f"Material has nodes: {has_nodes}\n" output += f"Total node count: {node_count}\n\n" if texture_nodes: output += "Texture nodes:\n" for node in texture_nodes: output += f"- {node['name']} using image: {node['image']}\n" if node['connections']: output += " Connections:\n" for conn in node['connections']: output += f" {conn}\n" else: output += "No texture nodes found in the material.\n" return output else: return f"Failed to apply texture: {result.get('message', 'Unknown error')}" except Exception as e: logger.error(f"Error applying texture: {str(e)}") return f"Error applying texture: {str(e)}" @mcp.tool() def get_polyhaven_status(ctx: Context) -> str: """ Check if PolyHaven integration is enabled in Blender. Returns a message indicating whether PolyHaven features are available. """ try: blender = get_blender_connection() result = blender.send_command("get_polyhaven_status") enabled = result.get("enabled", False) message = result.get("message", "") return message except Exception as e: logger.error(f"Error checking PolyHaven status: {str(e)}") return f"Error checking PolyHaven status: {str(e)}" @mcp.prompt() def asset_creation_strategy() -> str: """Defines the preferred strategy for creating assets in Blender""" return """When creating 3D content in Blender, always start by checking if PolyHaven is available: 0. Before anything, always check the scene from get_scene_info() 1. First use get_polyhaven_status() to verify if PolyHaven integration is enabled. 2. If PolyHaven is enabled: - For objects/models: Use download_polyhaven_asset() with asset_type="models" - For materials/textures: Use download_polyhaven_asset() with asset_type="textures" - For environment lighting: Use download_polyhaven_asset() with asset_type="hdris" 3. If PolyHaven is disabled or when falling back to basic tools: - create_object() for basic primitives (CUBE, SPHERE, CYLINDER, etc.) - set_material() for basic colors and materials Only fall back to basic creation tools when: - PolyHaven is disabled - A simple primitive is explicitly requested - No suitable PolyHaven asset exists - The task specifically requires a basic material/color """ # Main execution def main(): """Run the MCP server""" mcp.run() if __name__ == "__main__": main()