# blender_mcp_server.py
from fastmcp import FastMCP, Context
from mcp.types import ImageContent
import socket
import json
import asyncio
import logging
import tempfile
import contextlib
from dataclasses import dataclass, field
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, Any, List
import os
from pathlib import Path
import base64
import threading
import time
from urllib.parse import urlparse
# Import telemetry
from .telemetry import record_startup, get_telemetry
from .telemetry_decorator import telemetry_tool
from .dsl import DSL_VERSION, DslValidationError, validate_ops_request
from .tool_profiles import IntegrationsEnabled, ToolProfile, parse_tool_profile, select_tool_names
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("BlenderMCPServer")
# Default configuration
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 9876
DEFAULT_MAX_MESSAGE_BYTES = 10 * 1024 * 1024 # 10MB
AUTH_TOKEN_ENV_VAR = "BLENDER_AUTH_TOKEN"
@dataclass
class BlenderConnection:
host: str
port: int
sock: socket.socket | None = None # Changed from 'socket' to 'sock' to avoid naming conflict
auth_token: str | None = None
_recv_buffer: bytearray | None = None
_send_lock: threading.Lock = field(default_factory=threading.Lock, init=False, repr=False)
def connect(self) -> bool:
"""Connect to the Blender addon socket server"""
if self.sock:
return True
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
self._recv_buffer = bytearray()
logger.info(f"Connected to Blender at {self.host}:{self.port}")
return True
except Exception as e:
logger.error(f"Failed to connect to Blender: {str(e)}")
self.sock = None
self._recv_buffer = None
return False
def disconnect(self):
"""Disconnect from the Blender addon"""
if self.sock:
try:
self.sock.close()
except Exception as e:
logger.error(f"Error disconnecting from Blender: {str(e)}")
finally:
self.sock = None
self._recv_buffer = None
def _send_json_line(self, payload: Dict[str, Any]):
if not self.sock:
raise ConnectionError("Not connected to Blender")
data = (json.dumps(payload, separators=(",", ":")) + "\n").encode("utf-8")
self.sock.sendall(data)
def _recv_json_line(self, sock: socket.socket, buffer_size: int = 8192, max_bytes: int = DEFAULT_MAX_MESSAGE_BYTES) -> bytes:
if self._recv_buffer is None:
self._recv_buffer = bytearray()
while True:
nl = self._recv_buffer.find(b"\n")
if nl != -1:
line = bytes(self._recv_buffer[:nl])
del self._recv_buffer[: nl + 1]
return line
if len(self._recv_buffer) > max_bytes:
raise Exception(f"Response exceeded max size ({max_bytes} bytes)")
chunk = sock.recv(buffer_size)
if not chunk:
raise Exception("Connection closed while waiting for response")
self._recv_buffer.extend(chunk)
def receive_full_response(self, sock, buffer_size=8192):
"""Receive a single newline-delimited JSON response"""
sock.settimeout(180.0)
line = self._recv_json_line(sock, buffer_size=buffer_size)
logger.info(f"Received complete response line ({len(line)} bytes)")
return line
def send_command(self, command_type: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
"""Send a command to Blender and return the response"""
with self._send_lock:
if not self.sock and not self.connect():
raise ConnectionError("Not connected to Blender")
command = {
"type": command_type,
"params": params or {},
"auth_token": self.auth_token,
}
try:
# Log the command being sent
logger.info(f"Sending command: {command_type} with params: {params}")
# Send the command
self._send_json_line(command)
logger.info("Command sent, waiting for response...")
# Set a timeout for receiving - use the same timeout as in receive_full_response
self.sock.settimeout(180.0) # Match the addon's timeout
# Receive the response using the improved receive_full_response method
response_data = self.receive_full_response(self.sock)
logger.info(f"Received {len(response_data)} bytes of data")
response = json.loads(response_data.decode("utf-8"))
logger.info(f"Response parsed, status: {response.get('status', 'unknown')}")
if response.get("status") == "error":
logger.error(f"Blender error: {response.get('message')}")
raise Exception(response.get("message", "Unknown error from Blender"))
return response.get("result", {})
except socket.timeout:
logger.error("Socket timeout while waiting for response from Blender")
# Don't try to reconnect here - let the get_blender_connection handle reconnection
# Just invalidate the current socket so it will be recreated next time
self.sock = None
self._recv_buffer = None
raise Exception("Timeout waiting for Blender response - try simplifying your request")
except (ConnectionError, BrokenPipeError, ConnectionResetError) as e:
logger.error(f"Socket connection error: {str(e)}")
self.sock = None
self._recv_buffer = None
raise Exception(f"Connection to Blender lost: {str(e)}")
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON response from Blender: {str(e)}")
# Try to log what was received
if "response_data" in locals() and response_data:
logger.error(f"Raw response (first 200 bytes): {response_data[:200]}")
raise Exception(f"Invalid response from Blender: {str(e)}")
except Exception as e:
logger.error(f"Error communicating with Blender: {str(e)}")
# Don't try to reconnect here - let the get_blender_connection handle reconnection
self.sock = None
self._recv_buffer = None
raise Exception(f"Communication error with Blender: {str(e)}")
@asynccontextmanager
async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
"""Manage server startup and shutdown lifecycle"""
# We don't need to create a connection here since we're using the global connection
# for resources and tools
try:
# Just log that we're starting up
logger.info("BlenderMCP server starting up")
# Record startup event for telemetry
try:
record_startup()
except Exception as e:
logger.debug(f"Failed to record startup telemetry: {e}")
# Try to connect to Blender on startup to verify it's available
try:
# This will initialize the global connection if needed
blender = get_blender_connection()
logger.info("Successfully connected to Blender on startup")
except Exception as e:
logger.warning(f"Could not connect to Blender on startup: {str(e)}")
logger.warning("Make sure the Blender addon is running before using Blender resources or tools")
# Return an empty context - we're using the global connection
yield {}
finally:
# Clean up the global connection on shutdown
global _blender_connection
with _blender_connection_lock:
if _blender_connection:
logger.info("Disconnecting from Blender on shutdown")
_blender_connection.disconnect()
_blender_connection = None
logger.info("BlenderMCP server shut down")
# Create the MCP server with lifespan support
mcp = FastMCP(
name="BlenderMCP",
lifespan=server_lifespan
)
# Register documentation-style resources (read-only help + examples).
from .usage_resources import register_usage_resources
register_usage_resources(mcp)
# We will register tools programmatically after we know which profile is active.
# Disable decorator-time registration by turning `@mcp.tool` into a no-op decorator,
# while retaining access to the original FastMCP registration method.
_FAST_MCP_REGISTER_TOOL = mcp.tool
def _noop_tool_decorator(func=None, *args, **kwargs):
if func is None:
def _decorator(f):
return f
return _decorator
return func
# `FastMCP.tool` is used as a decorator across this file. Overriding it ensures we
# don't accidentally register the full tool surface at import time.
mcp.tool = _noop_tool_decorator # type: ignore[assignment]
def _try_get_enabled_integrations() -> IntegrationsEnabled:
"""
Best-effort: attempt to connect to Blender and query add-on capabilities.
If Blender is unavailable at startup, return all-false.
"""
try:
host = os.getenv("BLENDER_HOST", DEFAULT_HOST)
port = int(os.getenv("BLENDER_PORT", DEFAULT_PORT))
auth_token = os.getenv(AUTH_TOKEN_ENV_VAR) or None
# Use a short timeout to avoid hanging module import (tests often import this module).
sock = socket.create_connection((host, port), timeout=0.25)
try:
sock.settimeout(0.5)
payload = {"type": "get_capabilities", "params": {}, "auth_token": auth_token}
sock.sendall((json.dumps(payload, separators=(",", ":")) + "\n").encode("utf-8"))
buf = bytearray()
while True:
chunk = sock.recv(8192)
if not chunk:
raise Exception("Connection closed while waiting for capabilities response")
buf.extend(chunk)
nl = buf.find(b"\n")
if nl != -1:
line = bytes(buf[:nl])
resp = json.loads(line.decode("utf-8"))
result = resp.get("result", {})
if isinstance(result, dict):
return IntegrationsEnabled.from_capabilities_integrations(result.get("integrations"))
return IntegrationsEnabled()
if len(buf) > DEFAULT_MAX_MESSAGE_BYTES:
raise Exception("Capabilities response exceeded max size")
finally:
with contextlib.suppress(Exception):
sock.close()
except Exception as e:
logger.info(f"Blender not reachable for integration detection at startup: {e}")
return IntegrationsEnabled()
# Resource endpoints
# Global connection for resources (since resources can't access context)
_blender_connection = None
_blender_connection_lock = threading.Lock()
POLYHAVEN_STATUS_TTL_S = 5.0
_polyhaven_status_lock = threading.Lock()
_polyhaven_enabled: bool | None = None
_polyhaven_enabled_checked_at: float = 0.0
def _is_polyhaven_enabled(blender: BlenderConnection, *, force_refresh: bool = False) -> bool:
global _polyhaven_enabled, _polyhaven_enabled_checked_at
now = time.monotonic()
with _polyhaven_status_lock:
if (
not force_refresh
and _polyhaven_enabled is not None
and (now - _polyhaven_enabled_checked_at) <= POLYHAVEN_STATUS_TTL_S
):
return _polyhaven_enabled
try:
result = blender.send_command("get_polyhaven_status")
enabled = bool(result.get("enabled", False))
except Exception:
enabled = False
with _polyhaven_status_lock:
_polyhaven_enabled = enabled
_polyhaven_enabled_checked_at = now
return enabled
def get_blender_connection():
"""Get or create a persistent Blender connection"""
global _blender_connection
with _blender_connection_lock:
# If we have an existing connection, check if it's still valid
if _blender_connection is not None:
try:
_blender_connection.send_command("ping")
return _blender_connection
except Exception as e:
if "Unauthorized" in str(e) and not os.getenv(AUTH_TOKEN_ENV_VAR):
logger.warning(
"Blender rejected the connection as Unauthorized. "
f"If you set an auth token in the Blender addon preferences, also set {AUTH_TOKEN_ENV_VAR} for this MCP server."
)
logger.warning(f"Existing connection is no longer valid: {str(e)}")
try:
_blender_connection.disconnect()
except Exception:
pass
_blender_connection = None
# Create a new connection if needed
if _blender_connection is None:
host = os.getenv("BLENDER_HOST", DEFAULT_HOST)
port = int(os.getenv("BLENDER_PORT", DEFAULT_PORT))
auth_token = os.getenv(AUTH_TOKEN_ENV_VAR) or None
_blender_connection = BlenderConnection(host=host, port=port, auth_token=auth_token)
if not _blender_connection.connect():
logger.error("Failed to connect to Blender")
_blender_connection = None
raise Exception("Could not connect to Blender. Make sure the Blender addon is running.")
logger.info("Created new persistent connection to Blender")
try:
_blender_connection.send_command("ping")
except Exception as e:
logger.debug(f"Ping failed immediately after connect: {e}")
_is_polyhaven_enabled(_blender_connection, force_refresh=True)
return _blender_connection
@telemetry_tool("get_scene_info")
@mcp.tool
def get_scene_info(ctx: Context) -> str:
"""Get detailed information about the current Blender scene"""
try:
blender = get_blender_connection()
result = blender.send_command("get_scene_info")
# Just return the JSON representation of what Blender sent us
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error getting scene info from Blender: {str(e)}")
return f"Error getting scene info: {str(e)}"
@telemetry_tool("get_object_info")
@mcp.tool
def get_object_info(ctx: Context, object_name: str) -> str:
"""
Get detailed information about a specific object in the Blender scene.
Parameters:
- object_name: The name of the object to get information about
"""
try:
blender = get_blender_connection()
result = blender.send_command("get_object_info", {"name": object_name})
# Just return the JSON representation of what Blender sent us
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error getting object info from Blender: {str(e)}")
return f"Error getting object info: {str(e)}"
@telemetry_tool("get_viewport_screenshot")
@mcp.tool
def get_viewport_screenshot(ctx: Context, max_size: int = 800) -> ImageContent:
"""
Capture a screenshot of the current Blender 3D viewport.
Parameters:
- max_size: Maximum size in pixels for the largest dimension (default: 800)
Returns the screenshot as an Image.
"""
try:
blender = get_blender_connection()
# Create temp file path
temp_dir = tempfile.gettempdir()
temp_path = os.path.join(temp_dir, f"blender_screenshot_{os.getpid()}.png")
result = blender.send_command("get_viewport_screenshot", {
"max_size": max_size,
"filepath": temp_path,
"format": "png"
})
if "error" in result:
raise Exception(result["error"])
if not os.path.exists(temp_path):
raise Exception("Screenshot file was not created")
# Read the file
with open(temp_path, 'rb') as f:
image_bytes = f.read()
# Delete the temp file
os.remove(temp_path)
return ImageContent(
type="image",
data=base64.b64encode(image_bytes).decode("ascii"),
mimeType="image/png",
)
except Exception as e:
logger.error(f"Error capturing screenshot: {str(e)}")
raise Exception(f"Screenshot failed: {str(e)}")
@telemetry_tool("execute_blender_code")
@mcp.tool
def execute_blender_code(ctx: Context, code: str) -> str:
"""
Execute arbitrary Python code in Blender. Make sure to do it step-by-step by breaking it into smaller chunks.
Parameters:
- code: The Python code to execute
"""
try:
# Get the global connection
blender = get_blender_connection()
result = blender.send_command("execute_code", {"code": code})
return f"Code executed successfully: {result.get('result', '')}"
except Exception as e:
logger.error(f"Error executing code: {str(e)}")
return f"Error executing code: {str(e)}"
@telemetry_tool("get_capabilities")
@mcp.tool
def get_capabilities(ctx: Context) -> str:
"""
Return server/addon capabilities: DSL version, enabled integrations, supported ops/formats.
"""
try:
blender = get_blender_connection()
result = blender.send_command("get_capabilities", {})
# Ensure minimal expected fields exist
if isinstance(result, dict) and "dsl_version" not in result:
result["dsl_version"] = DSL_VERSION
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error getting capabilities: {str(e)}")
return f"Error getting capabilities: {str(e)}"
@telemetry_tool("execute_ops")
@mcp.tool
def execute_ops(ctx: Context, request: Dict[str, Any]) -> str:
"""
Execute a validated allowlisted ops request (DSL v1) in Blender.
Parameters:
- request: JSON object with {dsl_version, transaction, dry_run, ops}
"""
try:
normalized = validate_ops_request(request)
blender = get_blender_connection()
result = blender.send_command("execute_ops", {"request": normalized})
return json.dumps(result, indent=2)
except DslValidationError as e:
return f"DSL validation error: {str(e)}"
except Exception as e:
logger.error(f"Error executing ops: {str(e)}")
return f"Error executing ops: {str(e)}"
def _exec_single_op(op: Dict[str, Any], *, transaction: str = "atomic") -> Dict[str, Any]:
blender = get_blender_connection()
request = {"dsl_version": DSL_VERSION, "transaction": transaction, "dry_run": False, "ops": [op]}
normalized = validate_ops_request(request)
return blender.send_command("execute_ops", {"request": normalized})
@telemetry_tool("list_objects")
@mcp.tool
def list_objects(ctx: Context, filter: Dict[str, Any] = None, limit: int = 500) -> str:
"""
List objects in the current scene with optional filtering.
- filter: {types?: string[], collection?: string, name_contains?: string}
- limit: max number of objects to return (capped)
"""
try:
blender = get_blender_connection()
result = blender.send_command("list_objects", {"filter": filter or {}, "limit": limit})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error listing objects: {str(e)}")
return f"Error listing objects: {str(e)}"
@telemetry_tool("get_selection")
@mcp.tool
def get_selection(ctx: Context) -> str:
"""Get selected and active objects."""
try:
blender = get_blender_connection()
result = blender.send_command("get_selection", {})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error getting selection: {str(e)}")
return f"Error getting selection: {str(e)}"
@telemetry_tool("select_objects")
@mcp.tool
def select_objects(ctx: Context, names: List[str], mode: str = "replace", active: str | None = None) -> str:
"""
Select objects by name.
- mode: replace|add|remove
- active: optional active object
"""
try:
result = _exec_single_op({"type": "select", "names": names, "mode": mode, "active": active})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error selecting objects: {str(e)}")
return f"Error selecting objects: {str(e)}"
@telemetry_tool("get_world_info")
@mcp.tool
def get_world_info(ctx: Context) -> str:
"""Get world settings summary (nodes if enabled)."""
try:
blender = get_blender_connection()
result = blender.send_command("get_world_info", {})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error getting world info: {str(e)}")
return f"Error getting world info: {str(e)}"
@telemetry_tool("get_collections")
@mcp.tool
def get_collections(ctx: Context) -> str:
"""List collections and their object counts."""
try:
blender = get_blender_connection()
result = blender.send_command("get_collections", {})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error getting collections: {str(e)}")
return f"Error getting collections: {str(e)}"
@telemetry_tool("create_primitive")
@mcp.tool
def create_primitive(
ctx: Context,
primitive: str,
name: str | None = None,
size: float = 1.0,
location: List[float] | None = None,
rotation: List[float] | None = None,
scale: List[float] | None = None,
) -> str:
"""Create a primitive mesh object."""
try:
op: Dict[str, Any] = {
"type": "create_primitive",
"primitive": primitive,
"name": name,
"size": size,
"location": location,
"rotation": rotation,
"scale": scale,
}
result = _exec_single_op(op)
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error creating primitive: {str(e)}")
return f"Error creating primitive: {str(e)}"
@telemetry_tool("delete_objects")
@mcp.tool
def delete_objects(ctx: Context, names: List[str]) -> str:
"""Delete objects by name."""
try:
result = _exec_single_op({"type": "delete", "names": names})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error deleting objects: {str(e)}")
return f"Error deleting objects: {str(e)}"
@telemetry_tool("duplicate_objects")
@mcp.tool
def duplicate_objects(ctx: Context, names: List[str], linked: bool = False, new_names: List[str] | None = None) -> str:
"""Duplicate objects by name."""
try:
result = _exec_single_op({"type": "duplicate", "names": names, "linked": linked, "new_names": new_names})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error duplicating objects: {str(e)}")
return f"Error duplicating objects: {str(e)}"
@telemetry_tool("rename_object")
@mcp.tool
def rename_object(ctx: Context, from_name: str, to_name: str) -> str:
"""Rename an object."""
try:
result = _exec_single_op({"type": "rename", "from": from_name, "to": to_name})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error renaming object: {str(e)}")
return f"Error renaming object: {str(e)}"
@telemetry_tool("set_transform")
@mcp.tool
def set_transform(
ctx: Context,
name: str,
location: List[float] | None = None,
rotation: List[float] | None = None,
scale: List[float] | None = None,
space: str = "world",
) -> str:
"""Set object transform values."""
try:
result = _exec_single_op(
{"type": "set_transform", "name": name, "location": location, "rotation": rotation, "scale": scale, "space": space}
)
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error setting transform: {str(e)}")
return f"Error setting transform: {str(e)}"
@telemetry_tool("apply_transform")
@mcp.tool
def apply_transform(ctx: Context, name: str, location: bool = False, rotation: bool = False, scale: bool = False) -> str:
"""Apply object transforms (bake into mesh)."""
try:
result = _exec_single_op({"type": "apply_transform", "name": name, "location": location, "rotation": rotation, "scale": scale})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error applying transform: {str(e)}")
return f"Error applying transform: {str(e)}"
@telemetry_tool("set_shading")
@mcp.tool
def set_shading(
ctx: Context,
names: List[str],
shade: str,
auto_smooth: bool | None = None,
auto_smooth_angle: float | None = None,
) -> str:
"""Set per-face shading and optional auto smooth on mesh objects."""
try:
result = _exec_single_op(
{
"type": "set_shading",
"names": names,
"shade": shade,
"auto_smooth": auto_smooth,
"auto_smooth_angle": auto_smooth_angle,
}
)
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error setting shading: {str(e)}")
return f"Error setting shading: {str(e)}"
@telemetry_tool("recalculate_normals")
@mcp.tool
def recalculate_normals(ctx: Context, names: List[str], inside: bool = False) -> str:
"""Recalculate mesh face normals."""
try:
result = _exec_single_op({"type": "recalculate_normals", "names": names, "inside": inside})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error recalculating normals: {str(e)}")
return f"Error recalculating normals: {str(e)}"
@telemetry_tool("merge_by_distance")
@mcp.tool
def merge_by_distance(ctx: Context, names: List[str], distance: float) -> str:
"""Merge mesh vertices by distance (remove doubles)."""
try:
result = _exec_single_op({"type": "merge_by_distance", "names": names, "distance": distance})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error merging by distance: {str(e)}")
return f"Error merging by distance: {str(e)}"
@telemetry_tool("triangulate")
@mcp.tool
def triangulate(ctx: Context, names: List[str], quad_method: str = "BEAUTY", ngon_method: str = "BEAUTY") -> str:
"""Triangulate mesh faces (useful before export)."""
try:
result = _exec_single_op({"type": "triangulate", "names": names, "quad_method": quad_method, "ngon_method": ngon_method})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error triangulating meshes: {str(e)}")
return f"Error triangulating meshes: {str(e)}"
@telemetry_tool("join_objects")
@mcp.tool
def join_objects(ctx: Context, names: List[str], active: str | None = None, new_name: str | None = None) -> str:
"""Join multiple objects into one mesh object."""
try:
result = _exec_single_op({"type": "join_objects", "names": names, "active": active, "new_name": new_name})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error joining objects: {str(e)}")
return f"Error joining objects: {str(e)}"
@telemetry_tool("separate_mesh")
@mcp.tool
def separate_mesh(ctx: Context, name: str, mode: str) -> str:
"""Separate a mesh into multiple objects (loose/material/selected)."""
try:
result = _exec_single_op({"type": "separate_mesh", "name": name, "mode": mode})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error separating mesh: {str(e)}")
return f"Error separating mesh: {str(e)}"
@telemetry_tool("convert_to_mesh")
@mcp.tool
def convert_to_mesh(ctx: Context, names: List[str], keep_original: bool = False) -> str:
"""Convert objects to mesh (e.g., curves/text -> mesh)."""
try:
result = _exec_single_op({"type": "convert_to_mesh", "names": names, "keep_original": keep_original})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error converting to mesh: {str(e)}")
return f"Error converting to mesh: {str(e)}"
@telemetry_tool("set_visibility")
@mcp.tool
def set_visibility(
ctx: Context,
names: List[str],
viewport: bool | None = None,
render: bool | None = None,
selectable: bool | None = None,
) -> str:
"""Set object visibility/selectability flags."""
try:
result = _exec_single_op({"type": "set_visibility", "names": names, "viewport": viewport, "render": render, "selectable": selectable})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error setting visibility: {str(e)}")
return f"Error setting visibility: {str(e)}"
@telemetry_tool("set_collection_visibility")
@mcp.tool
def set_collection_visibility(ctx: Context, collection: str, viewport: bool | None = None, render: bool | None = None) -> str:
"""Hide/show a collection in the active view layer."""
try:
result = _exec_single_op({"type": "set_collection_visibility", "collection": collection, "viewport": viewport, "render": render})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error setting collection visibility: {str(e)}")
return f"Error setting collection visibility: {str(e)}"
@telemetry_tool("isolate_objects")
@mcp.tool
def isolate_objects(
ctx: Context,
names: List[str] | None = None,
mode: str = "isolate",
include_children: bool = True,
render: bool | None = None,
) -> str:
"""Isolate specific objects (hide all others) or clear isolation."""
try:
result = _exec_single_op(
{"type": "isolate_objects", "names": names or [], "mode": mode, "include_children": include_children, "render": render}
)
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error isolating objects: {str(e)}")
return f"Error isolating objects: {str(e)}"
@telemetry_tool("uv_smart_project")
@mcp.tool
def uv_smart_project(ctx: Context, names: List[str], params: Dict[str, Any] | None = None) -> str:
"""Generate UVs using Smart UV Project."""
try:
result = _exec_single_op({"type": "uv_smart_project", "names": names, "params": params or {}})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error running uv_smart_project: {str(e)}")
return f"Error running uv_smart_project: {str(e)}"
@telemetry_tool("uv_unwrap")
@mcp.tool
def uv_unwrap(ctx: Context, names: List[str], params: Dict[str, Any] | None = None) -> str:
"""Unwrap UVs for the given mesh objects."""
try:
result = _exec_single_op({"type": "uv_unwrap", "names": names, "params": params or {}})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error running uv_unwrap: {str(e)}")
return f"Error running uv_unwrap: {str(e)}"
@telemetry_tool("uv_pack_islands")
@mcp.tool
def uv_pack_islands(ctx: Context, names: List[str], params: Dict[str, Any] | None = None) -> str:
"""Pack UV islands for the given mesh objects."""
try:
result = _exec_single_op({"type": "uv_pack_islands", "names": names, "params": params or {}})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error running uv_pack_islands: {str(e)}")
return f"Error running uv_pack_islands: {str(e)}"
@telemetry_tool("bake_maps")
@mcp.tool
def bake_maps(
ctx: Context,
name: str,
bake_type: str,
output_path: str,
resolution: int = 1024,
margin: int = 16,
samples: int = 64,
use_selected_to_active: bool = False,
cage_extrusion: float | None = None,
) -> str:
"""Bake a map (AO/NORMAL/DIFFUSE/ROUGHNESS/EMIT) to an image file."""
try:
result = _exec_single_op(
{
"type": "bake_maps",
"name": name,
"bake_type": bake_type,
"output_path": output_path,
"resolution": resolution,
"margin": margin,
"samples": samples,
"use_selected_to_active": use_selected_to_active,
"cage_extrusion": cage_extrusion,
}
)
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error baking maps: {str(e)}")
return f"Error baking maps: {str(e)}"
@telemetry_tool("camera_look_at")
@mcp.tool
def camera_look_at(ctx: Context, camera: str, target: List[float], roll: float = 0.0) -> str:
"""Point a camera at a world-space target position."""
try:
result = _exec_single_op({"type": "camera_look_at", "camera": camera, "target": target, "roll": roll})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error aiming camera: {str(e)}")
return f"Error aiming camera: {str(e)}"
@telemetry_tool("create_turntable_animation")
@mcp.tool
def create_turntable_animation(
ctx: Context,
name: str,
frame_start: int,
frame_end: int,
axis: str = "Z",
revolutions: float = 1.0,
rig_name: str | None = None,
) -> str:
"""Create a simple turntable animation rig for an object."""
try:
result = _exec_single_op(
{
"type": "create_turntable_animation",
"name": name,
"frame_start": frame_start,
"frame_end": frame_end,
"axis": axis,
"revolutions": revolutions,
"rig_name": rig_name,
}
)
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error creating turntable animation: {str(e)}")
return f"Error creating turntable animation: {str(e)}"
@telemetry_tool("boolean_operation")
@mcp.tool
def boolean_operation(
ctx: Context,
target: str,
cutter: str,
operation: str = "DIFFERENCE",
solver: str = "FAST",
apply: bool = True,
remove_cutter: bool = False,
) -> str:
"""Perform a boolean operation between two mesh objects."""
try:
result = _exec_single_op(
{
"type": "boolean_operation",
"target": target,
"cutter": cutter,
"operation": operation,
"solver": solver,
"apply": apply,
"remove_cutter": remove_cutter,
}
)
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error running boolean operation: {str(e)}")
return f"Error running boolean operation: {str(e)}"
@telemetry_tool("purge_orphans")
@mcp.tool
def purge_orphans(ctx: Context) -> str:
"""Purge unused datablocks (orphans)."""
try:
result = _exec_single_op({"type": "purge_orphans"})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error purging orphans: {str(e)}")
return f"Error purging orphans: {str(e)}"
@telemetry_tool("pack_external_data")
@mcp.tool
def pack_external_data(ctx: Context) -> str:
"""Pack external data into the .blend file."""
try:
result = _exec_single_op({"type": "pack_external_data"})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error packing external data: {str(e)}")
return f"Error packing external data: {str(e)}"
@telemetry_tool("save_blend")
@mcp.tool
def save_blend(ctx: Context, path: str, compress: bool = False, copy: bool = False) -> str:
"""Save the current .blend to a path (subject to addon path policy)."""
try:
result = _exec_single_op({"type": "save_blend", "path": path, "compress": compress, "copy": copy})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error saving blend: {str(e)}")
return f"Error saving blend: {str(e)}"
@telemetry_tool("snap_to_ground")
@mcp.tool
def snap_to_ground(ctx: Context, names: List[str]) -> str:
"""Move mesh objects so their lowest point sits on Z=0."""
try:
result = _exec_single_op({"type": "snap_to_ground", "names": names})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error snapping to ground: {str(e)}")
return f"Error snapping to ground: {str(e)}"
@telemetry_tool("set_origin")
@mcp.tool
def set_origin(ctx: Context, name: str, mode: str) -> str:
"""Set object origin according to mode."""
try:
result = _exec_single_op({"type": "set_origin", "name": name, "mode": mode})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error setting origin: {str(e)}")
return f"Error setting origin: {str(e)}"
@telemetry_tool("ensure_collection")
@mcp.tool
def ensure_collection(ctx: Context, name: str) -> str:
"""Create a collection if it doesn't exist."""
try:
result = _exec_single_op({"type": "ensure_collection", "name": name})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error ensuring collection: {str(e)}")
return f"Error ensuring collection: {str(e)}"
@telemetry_tool("move_to_collection")
@mcp.tool
def move_to_collection(ctx: Context, object_name: str, collection: str) -> str:
"""Link an object to a collection (creates collection if needed)."""
try:
result = _exec_single_op({"type": "move_to_collection", "object": object_name, "collection": collection})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error moving to collection: {str(e)}")
return f"Error moving to collection: {str(e)}"
@telemetry_tool("set_parent")
@mcp.tool
def set_parent(ctx: Context, child: str, parent: str) -> str:
"""Set parent relationship."""
try:
result = _exec_single_op({"type": "set_parent", "child": child, "parent": parent})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error setting parent: {str(e)}")
return f"Error setting parent: {str(e)}"
@telemetry_tool("clear_parent")
@mcp.tool
def clear_parent(ctx: Context, names: List[str]) -> str:
"""Clear parent relationship for objects."""
try:
result = _exec_single_op({"type": "clear_parent", "names": names})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error clearing parent: {str(e)}")
return f"Error clearing parent: {str(e)}"
@telemetry_tool("list_materials")
@mcp.tool
def list_materials(ctx: Context, limit: int = 500) -> str:
"""List materials in the file."""
try:
blender = get_blender_connection()
result = blender.send_command("list_materials", {"limit": limit})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error listing materials: {str(e)}")
return f"Error listing materials: {str(e)}"
@telemetry_tool("create_material")
@mcp.tool
def create_material(ctx: Context, name: str, model: str = "pbr") -> str:
"""Create/ensure a material exists with a basic node setup."""
try:
result = _exec_single_op({"type": "ensure_material", "name": name, "model": model})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error creating material: {str(e)}")
return f"Error creating material: {str(e)}"
@telemetry_tool("set_material_params")
@mcp.tool
def set_material_params(ctx: Context, material: str, params: Dict[str, Any]) -> str:
"""Set PBR parameters on a material."""
try:
result = _exec_single_op({"type": "set_material_params", "material": material, "params": params})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error setting material params: {str(e)}")
return f"Error setting material params: {str(e)}"
@telemetry_tool("assign_material")
@mcp.tool
def assign_material(ctx: Context, object_name: str, material: str, slot: int = 0) -> str:
"""Assign a material to a mesh object slot."""
try:
result = _exec_single_op({"type": "assign_material", "object": object_name, "material": material, "slot": slot})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error assigning material: {str(e)}")
return f"Error assigning material: {str(e)}"
@telemetry_tool("set_texture_maps")
@mcp.tool
def set_texture_maps(ctx: Context, material: str, maps: Dict[str, str]) -> str:
"""
Wire texture maps (local paths) into a material.
maps keys: basecolor, normal, roughness, metallic, ao
"""
try:
result = _exec_single_op({"type": "set_texture_maps", "material": material, "maps": maps})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error setting texture maps: {str(e)}")
return f"Error setting texture maps: {str(e)}"
@telemetry_tool("list_modifiers")
@mcp.tool
def list_modifiers(ctx: Context, object_name: str) -> str:
"""List modifiers on an object."""
try:
blender = get_blender_connection()
result = blender.send_command("list_modifiers", {"object_name": object_name})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error listing modifiers: {str(e)}")
return f"Error listing modifiers: {str(e)}"
@telemetry_tool("add_modifier")
@mcp.tool
def add_modifier(ctx: Context, object_name: str, modifier_type: str, params: Dict[str, Any] | None = None) -> str:
"""Add a modifier to a mesh object."""
try:
result = _exec_single_op({"type": "add_modifier", "name": object_name, "modifier_type": modifier_type, "params": params or {}})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error adding modifier: {str(e)}")
return f"Error adding modifier: {str(e)}"
@telemetry_tool("apply_modifier")
@mcp.tool
def apply_modifier(ctx: Context, object_name: str, modifier_name: str) -> str:
"""Apply a modifier on a mesh object."""
try:
result = _exec_single_op({"type": "apply_modifier", "name": object_name, "modifier_name": modifier_name})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error applying modifier: {str(e)}")
return f"Error applying modifier: {str(e)}"
@telemetry_tool("remove_modifier")
@mcp.tool
def remove_modifier(ctx: Context, object_name: str, modifier_name: str) -> str:
"""Remove a modifier from a mesh object."""
try:
result = _exec_single_op({"type": "remove_modifier", "name": object_name, "modifier_name": modifier_name})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error removing modifier: {str(e)}")
return f"Error removing modifier: {str(e)}"
@telemetry_tool("create_light")
@mcp.tool
def create_light(ctx: Context, light_type: str, name: str | None = None, location: List[float] | None = None, params: Dict[str, Any] | None = None) -> str:
"""Create a light object."""
try:
result = _exec_single_op({"type": "create_light", "light": light_type, "name": name, "location": location, "params": params or {}})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error creating light: {str(e)}")
return f"Error creating light: {str(e)}"
@telemetry_tool("set_light_params")
@mcp.tool
def set_light_params(ctx: Context, name: str, params: Dict[str, Any]) -> str:
"""Set light parameters."""
try:
result = _exec_single_op({"type": "set_light_params", "name": name, "params": params})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error setting light params: {str(e)}")
return f"Error setting light params: {str(e)}"
@telemetry_tool("create_camera")
@mcp.tool
def create_camera(ctx: Context, name: str | None = None, location: List[float] | None = None, rotation: List[float] | None = None, params: Dict[str, Any] | None = None) -> str:
"""Create a camera object."""
try:
result = _exec_single_op({"type": "create_camera", "name": name, "location": location, "rotation": rotation, "params": params or {}})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error creating camera: {str(e)}")
return f"Error creating camera: {str(e)}"
@telemetry_tool("set_camera_params")
@mcp.tool
def set_camera_params(ctx: Context, name: str, params: Dict[str, Any]) -> str:
"""Set camera parameters (lens/clips/DOF subset supported)."""
try:
result = _exec_single_op({"type": "set_camera_params", "name": name, "params": params})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error setting camera params: {str(e)}")
return f"Error setting camera params: {str(e)}"
@telemetry_tool("set_active_camera")
@mcp.tool
def set_active_camera(ctx: Context, name: str) -> str:
"""Set the active scene camera."""
try:
result = _exec_single_op({"type": "set_active_camera", "name": name})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error setting active camera: {str(e)}")
return f"Error setting active camera: {str(e)}"
@telemetry_tool("frame_camera_on_objects")
@mcp.tool
def frame_camera_on_objects(ctx: Context, camera: str, object_names: List[str], margin: float = 0.1) -> str:
"""Move/aim camera to frame the provided objects."""
try:
result = _exec_single_op({"type": "frame_camera", "camera": camera, "objects": object_names, "margin": margin})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error framing camera: {str(e)}")
return f"Error framing camera: {str(e)}"
@telemetry_tool("set_world_background")
@mcp.tool
def set_world_background(ctx: Context, color: List[float] | None = None, strength: float | None = None) -> str:
"""Set world background color/strength."""
try:
result = _exec_single_op({"type": "set_world_background", "color": color, "strength": strength})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error setting world background: {str(e)}")
return f"Error setting world background: {str(e)}"
@telemetry_tool("set_world_hdri")
@mcp.tool
def set_world_hdri(ctx: Context, source: Dict[str, Any], strength: float = 1.0, rotation: List[float] | None = None) -> str:
"""Set an HDRI from polyhaven_id/path/url."""
try:
result = _exec_single_op({"type": "set_world_hdri", "source": source, "strength": strength, "rotation": rotation})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error setting world HDRI: {str(e)}")
return f"Error setting world HDRI: {str(e)}"
@telemetry_tool("import_model")
@mcp.tool
def import_model(ctx: Context, path: str, format: str, options: Dict[str, Any] | None = None) -> str:
"""Import a model from a local path."""
try:
result = _exec_single_op({"type": "import_model", "path": path, "format": format, "options": options or {}})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error importing model: {str(e)}")
return f"Error importing model: {str(e)}"
@telemetry_tool("export_scene")
@mcp.tool
def export_scene(ctx: Context, path: str, format: str, options: Dict[str, Any] | None = None) -> str:
"""Export the scene to a local path."""
try:
result = _exec_single_op({"type": "export_scene", "path": path, "format": format, "options": options or {}})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error exporting scene: {str(e)}")
return f"Error exporting scene: {str(e)}"
@telemetry_tool("set_render_settings")
@mcp.tool
def set_render_settings(
ctx: Context,
engine: str,
resolution: List[int] | None = None,
samples: int | None = None,
denoise: bool | None = None,
color_management: Dict[str, Any] | None = None,
) -> str:
"""Set render settings."""
try:
result = _exec_single_op(
{
"type": "set_render_settings",
"engine": engine,
"resolution": resolution,
"samples": samples,
"denoise": denoise,
"color_management": color_management or {},
}
)
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error setting render settings: {str(e)}")
return f"Error setting render settings: {str(e)}"
@telemetry_tool("render_still")
@mcp.tool
def render_still(ctx: Context, output_path: str | None = None, format: str = "PNG") -> str:
"""Render a still image to output_path (or temp)."""
try:
result = _exec_single_op({"type": "render_still", "output_path": output_path, "format": format})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error rendering still: {str(e)}")
return f"Error rendering still: {str(e)}"
@telemetry_tool("render_animation")
@mcp.tool
def render_animation(ctx: Context, output_dir: str, frame_start: int, frame_end: int) -> str:
"""Render an animation to a directory."""
try:
result = _exec_single_op({"type": "render_animation", "output_dir": output_dir, "frame_start": frame_start, "frame_end": frame_end})
return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error rendering animation: {str(e)}")
return f"Error rendering animation: {str(e)}"
@telemetry_tool("get_polyhaven_categories")
@mcp.tool
def get_polyhaven_categories(ctx: Context, asset_type: str = "hdris") -> str:
"""
Get a list of categories for a specific asset type on Polyhaven.
Parameters:
- asset_type: The type of asset to get categories for (hdris, textures, models, all)
"""
try:
blender = get_blender_connection()
if not _is_polyhaven_enabled(blender):
return "PolyHaven integration is disabled. Select it in the sidebar in BlenderMCP, then run it again."
result = blender.send_command("get_polyhaven_categories", {"asset_type": asset_type})
if "error" in result:
return f"Error: {result['error']}"
# Format the categories in a more readable way
categories = result["categories"]
formatted_output = f"Categories for {asset_type}:\n\n"
# Sort categories by count (descending)
sorted_categories = sorted(categories.items(), key=lambda x: x[1], reverse=True)
for category, count in sorted_categories:
formatted_output += f"- {category}: {count} assets\n"
return formatted_output
except Exception as e:
logger.error(f"Error getting Polyhaven categories: {str(e)}")
return f"Error getting Polyhaven categories: {str(e)}"
@telemetry_tool("search_polyhaven_assets")
@mcp.tool
def search_polyhaven_assets(
ctx: Context,
asset_type: str = "all",
categories: str = None
) -> str:
"""
Search for assets on Polyhaven with optional filtering.
Parameters:
- asset_type: Type of assets to search for (hdris, textures, models, all)
- categories: Optional comma-separated list of categories to filter by
Returns a list of matching assets with basic information.
"""
try:
blender = get_blender_connection()
if not _is_polyhaven_enabled(blender):
return "PolyHaven integration is disabled. Select it in the sidebar in BlenderMCP, then run it again."
result = blender.send_command("search_polyhaven_assets", {
"asset_type": asset_type,
"categories": categories
})
if "error" in result:
return f"Error: {result['error']}"
# Format the assets in a more readable way
assets = result["assets"]
total_count = result["total_count"]
returned_count = result["returned_count"]
formatted_output = f"Found {total_count} assets"
if categories:
formatted_output += f" in categories: {categories}"
formatted_output += f"\nShowing {returned_count} assets:\n\n"
# Sort assets by download count (popularity)
sorted_assets = sorted(assets.items(), key=lambda x: x[1].get("download_count", 0), reverse=True)
for asset_id, asset_data in sorted_assets:
formatted_output += f"- {asset_data.get('name', asset_id)} (ID: {asset_id})\n"
formatted_output += f" Type: {['HDRI', 'Texture', 'Model'][asset_data.get('type', 0)]}\n"
formatted_output += f" Categories: {', '.join(asset_data.get('categories', []))}\n"
formatted_output += f" Downloads: {asset_data.get('download_count', 'Unknown')}\n\n"
return formatted_output
except Exception as e:
logger.error(f"Error searching Polyhaven assets: {str(e)}")
return f"Error searching Polyhaven assets: {str(e)}"
@telemetry_tool("download_polyhaven_asset")
@mcp.tool
def download_polyhaven_asset(
ctx: Context,
asset_id: str,
asset_type: str,
resolution: str = "1k",
file_format: str = None
) -> str:
"""
Download and import a Polyhaven asset into Blender.
Parameters:
- asset_id: The ID of the asset to download
- asset_type: The type of asset (hdris, textures, models)
- resolution: The resolution to download (e.g., 1k, 2k, 4k)
- file_format: Optional file format (e.g., hdr, exr for HDRIs; jpg, png for textures; gltf, fbx for models)
Returns a message indicating success or failure.
"""
try:
blender = get_blender_connection()
if not _is_polyhaven_enabled(blender):
return "PolyHaven integration is disabled. Select it in the sidebar in BlenderMCP, then run it again."
result = blender.send_command("download_polyhaven_asset", {
"asset_id": asset_id,
"asset_type": asset_type,
"resolution": resolution,
"file_format": file_format
})
if "error" in result:
return f"Error: {result['error']}"
if result.get("success"):
message = result.get("message", "Asset downloaded and imported successfully")
# Add additional information based on asset type
if asset_type == "hdris":
return f"{message}. The HDRI has been set as the world environment."
elif asset_type == "textures":
material_name = result.get("material", "")
maps = ", ".join(result.get("maps", []))
return f"{message}. Created material '{material_name}' with maps: {maps}."
elif asset_type == "models":
return f"{message}. The model has been imported into the current scene."
else:
return message
else:
return f"Failed to download asset: {result.get('message', 'Unknown error')}"
except Exception as e:
logger.error(f"Error downloading Polyhaven asset: {str(e)}")
return f"Error downloading Polyhaven asset: {str(e)}"
@telemetry_tool("set_texture")
@mcp.tool
def set_texture(
ctx: Context,
object_name: str,
texture_id: str
) -> str:
"""
Apply a previously downloaded Polyhaven texture to an object.
Parameters:
- object_name: Name of the object to apply the texture to
- texture_id: ID of the Polyhaven texture to apply (must be downloaded first)
Returns a message indicating success or failure.
"""
try:
# Get the global connection
blender = get_blender_connection()
if not _is_polyhaven_enabled(blender):
return "PolyHaven integration is disabled. Select it in the sidebar in BlenderMCP, then run it again."
result = blender.send_command("set_texture", {
"object_name": object_name,
"texture_id": texture_id
})
if "error" in result:
return f"Error: {result['error']}"
if result.get("success"):
material_name = result.get("material", "")
maps = ", ".join(result.get("maps", []))
# Add detailed material info
material_info = result.get("material_info", {})
node_count = material_info.get("node_count", 0)
has_nodes = material_info.get("has_nodes", False)
texture_nodes = material_info.get("texture_nodes", [])
output = f"Successfully applied texture '{texture_id}' to {object_name}.\n"
output += f"Using material '{material_name}' with maps: {maps}.\n\n"
output += f"Material has nodes: {has_nodes}\n"
output += f"Total node count: {node_count}\n\n"
if texture_nodes:
output += "Texture nodes:\n"
for node in texture_nodes:
output += f"- {node['name']} using image: {node['image']}\n"
if node['connections']:
output += " Connections:\n"
for conn in node['connections']:
output += f" {conn}\n"
else:
output += "No texture nodes found in the material.\n"
return output
else:
return f"Failed to apply texture: {result.get('message', 'Unknown error')}"
except Exception as e:
logger.error(f"Error applying texture: {str(e)}")
return f"Error applying texture: {str(e)}"
@telemetry_tool("get_polyhaven_status")
@mcp.tool
def get_polyhaven_status(ctx: Context) -> str:
"""
Check if PolyHaven integration is enabled in Blender.
Returns a message indicating whether PolyHaven features are available.
"""
try:
blender = get_blender_connection()
result = blender.send_command("get_polyhaven_status")
enabled = result.get("enabled", False)
message = result.get("message", "")
if enabled:
message += "PolyHaven is good at Textures, and has a wider variety of textures than Sketchfab."
return message
except Exception as e:
logger.error(f"Error checking PolyHaven status: {str(e)}")
return f"Error checking PolyHaven status: {str(e)}"
@telemetry_tool("get_hyper3d_status")
@mcp.tool
def get_hyper3d_status(ctx: Context) -> str:
"""
Check if Hyper3D Rodin integration is enabled in Blender.
Returns a message indicating whether Hyper3D Rodin features are available.
Don't emphasize the key type in the returned message, but sliently remember it.
"""
try:
blender = get_blender_connection()
result = blender.send_command("get_hyper3d_status")
enabled = result.get("enabled", False)
message = result.get("message", "")
if enabled:
message += ""
return message
except Exception as e:
logger.error(f"Error checking Hyper3D status: {str(e)}")
return f"Error checking Hyper3D status: {str(e)}"
@telemetry_tool("get_sketchfab_status")
@mcp.tool
def get_sketchfab_status(ctx: Context) -> str:
"""
Check if Sketchfab integration is enabled in Blender.
Returns a message indicating whether Sketchfab features are available.
"""
try:
blender = get_blender_connection()
result = blender.send_command("get_sketchfab_status")
enabled = result.get("enabled", False)
message = result.get("message", "")
if enabled:
message += "Sketchfab is good at Realistic models, and has a wider variety of models than PolyHaven."
return message
except Exception as e:
logger.error(f"Error checking Sketchfab status: {str(e)}")
return f"Error checking Sketchfab status: {str(e)}"
@telemetry_tool("search_sketchfab_models")
@mcp.tool
def search_sketchfab_models(
ctx: Context,
query: str,
categories: str = None,
count: int = 20,
downloadable: bool = True
) -> str:
"""
Search for models on Sketchfab with optional filtering.
Parameters:
- query: Text to search for
- categories: Optional comma-separated list of categories
- count: Maximum number of results to return (default 20)
- downloadable: Whether to include only downloadable models (default True)
Returns a formatted list of matching models.
"""
try:
blender = get_blender_connection()
logger.info(f"Searching Sketchfab models with query: {query}, categories: {categories}, count: {count}, downloadable: {downloadable}")
result = blender.send_command("search_sketchfab_models", {
"query": query,
"categories": categories,
"count": count,
"downloadable": downloadable
})
if "error" in result:
logger.error(f"Error from Sketchfab search: {result['error']}")
return f"Error: {result['error']}"
# Safely get results with fallbacks for None
if result is None:
logger.error("Received None result from Sketchfab search")
return "Error: Received no response from Sketchfab search"
# Format the results
models = result.get("results", []) or []
if not models:
return f"No models found matching '{query}'"
formatted_output = f"Found {len(models)} models matching '{query}':\n\n"
for model in models:
if model is None:
continue
model_name = model.get("name", "Unnamed model")
model_uid = model.get("uid", "Unknown ID")
formatted_output += f"- {model_name} (UID: {model_uid})\n"
# Get user info with safety checks
user = model.get("user") or {}
username = user.get("username", "Unknown author") if isinstance(user, dict) else "Unknown author"
formatted_output += f" Author: {username}\n"
# Get license info with safety checks
license_data = model.get("license") or {}
license_label = license_data.get("label", "Unknown") if isinstance(license_data, dict) else "Unknown"
formatted_output += f" License: {license_label}\n"
# Add face count and downloadable status
face_count = model.get("faceCount", "Unknown")
is_downloadable = "Yes" if model.get("isDownloadable") else "No"
formatted_output += f" Face count: {face_count}\n"
formatted_output += f" Downloadable: {is_downloadable}\n\n"
return formatted_output
except Exception as e:
logger.error(f"Error searching Sketchfab models: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return f"Error searching Sketchfab models: {str(e)}"
@telemetry_tool("download_sketchfab_model")
@mcp.tool
def download_sketchfab_model(
ctx: Context,
uid: str
) -> str:
"""
Download and import a Sketchfab model by its UID.
Parameters:
- uid: The unique identifier of the Sketchfab model
Returns a message indicating success or failure.
The model must be downloadable and you must have proper access rights.
"""
try:
blender = get_blender_connection()
logger.info(f"Attempting to download Sketchfab model with UID: {uid}")
result = blender.send_command("download_sketchfab_model", {
"uid": uid
})
if result is None:
logger.error("Received None result from Sketchfab download")
return "Error: Received no response from Sketchfab download request"
if "error" in result:
logger.error(f"Error from Sketchfab download: {result['error']}")
return f"Error: {result['error']}"
if result.get("success"):
imported_objects = result.get("imported_objects", [])
object_names = ", ".join(imported_objects) if imported_objects else "none"
return f"Successfully imported model. Created objects: {object_names}"
else:
return f"Failed to download model: {result.get('message', 'Unknown error')}"
except Exception as e:
logger.error(f"Error downloading Sketchfab model: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return f"Error downloading Sketchfab model: {str(e)}"
def _process_bbox(original_bbox: list[float] | list[int] | None) -> list[int] | None:
if original_bbox is None:
return None
if all(isinstance(i, int) for i in original_bbox):
return original_bbox
if any(i<=0 for i in original_bbox):
raise ValueError("Incorrect number range: bbox must be bigger than zero!")
return [int(float(i) / max(original_bbox) * 100) for i in original_bbox] if original_bbox else None
@telemetry_tool("generate_hyper3d_model_via_text")
@mcp.tool
def generate_hyper3d_model_via_text(
ctx: Context,
text_prompt: str,
bbox_condition: list[float]=None
) -> str:
"""
Generate 3D asset using Hyper3D by giving description of the desired asset, and import the asset into Blender.
The 3D asset has built-in materials.
The generated model has a normalized size, so re-scaling after generation can be useful.
Parameters:
- text_prompt: A short description of the desired model in **English**.
- bbox_condition: Optional. If given, it has to be a list of floats of length 3. Controls the ratio between [Length, Width, Height] of the model.
Returns a message indicating success or failure.
"""
try:
blender = get_blender_connection()
result = blender.send_command("create_rodin_job", {
"text_prompt": text_prompt,
"images": None,
"bbox_condition": _process_bbox(bbox_condition),
})
succeed = result.get("submit_time", False)
if succeed:
return json.dumps({
"task_uuid": result["uuid"],
"subscription_key": result["jobs"]["subscription_key"],
})
else:
return json.dumps(result)
except Exception as e:
logger.error(f"Error generating Hyper3D task: {str(e)}")
return f"Error generating Hyper3D task: {str(e)}"
@telemetry_tool("generate_hyper3d_model_via_images")
@mcp.tool
def generate_hyper3d_model_via_images(
ctx: Context,
input_image_paths: list[str]=None,
input_image_urls: list[str]=None,
bbox_condition: list[float]=None
) -> str:
"""
Generate 3D asset using Hyper3D by giving images of the wanted asset, and import the generated asset into Blender.
The 3D asset has built-in materials.
The generated model has a normalized size, so re-scaling after generation can be useful.
Parameters:
- input_image_paths: The **absolute** paths of input images. Even if only one image is provided, wrap it into a list. Required if Hyper3D Rodin in MAIN_SITE mode.
- input_image_urls: The URLs of input images. Even if only one image is provided, wrap it into a list. Required if Hyper3D Rodin in FAL_AI mode.
- bbox_condition: Optional. If given, it has to be a list of ints of length 3. Controls the ratio between [Length, Width, Height] of the model.
Only one of {input_image_paths, input_image_urls} should be given at a time, depending on the Hyper3D Rodin's current mode.
Returns a message indicating success or failure.
"""
def _is_valid_url(url: str) -> bool:
parsed = urlparse(url)
return parsed.scheme in {"http", "https"} and bool(parsed.netloc)
if input_image_paths is not None and input_image_urls is not None:
return f"Error: Conflict parameters given!"
if input_image_paths is None and input_image_urls is None:
return f"Error: No image given!"
if input_image_paths is not None:
if not all(os.path.exists(i) for i in input_image_paths):
return "Error: not all image paths are valid!"
images = []
for path in input_image_paths:
with open(path, "rb") as f:
images.append(
(Path(path).suffix, base64.b64encode(f.read()).decode("ascii"))
)
elif input_image_urls is not None:
if not all(isinstance(i, str) and _is_valid_url(i) for i in input_image_urls):
return "Error: not all image URLs are valid!"
images = input_image_urls.copy()
try:
blender = get_blender_connection()
result = blender.send_command("create_rodin_job", {
"text_prompt": None,
"images": images,
"bbox_condition": _process_bbox(bbox_condition),
})
succeed = result.get("submit_time", False)
if succeed:
return json.dumps({
"task_uuid": result["uuid"],
"subscription_key": result["jobs"]["subscription_key"],
})
else:
return json.dumps(result)
except Exception as e:
logger.error(f"Error generating Hyper3D task: {str(e)}")
return f"Error generating Hyper3D task: {str(e)}"
@telemetry_tool("poll_rodin_job_status")
@mcp.tool
def poll_rodin_job_status(
ctx: Context,
subscription_key: str=None,
request_id: str=None,
):
"""
Check if the Hyper3D Rodin generation task is completed.
For Hyper3D Rodin mode MAIN_SITE:
Parameters:
- subscription_key: The subscription_key given in the generate model step.
Returns a list of status. The task is done if all status are "Done".
If "Failed" showed up, the generating process failed.
This is a polling API, so only proceed if the status are finally determined ("Done" or "Canceled").
For Hyper3D Rodin mode FAL_AI:
Parameters:
- request_id: The request_id given in the generate model step.
Returns the generation task status. The task is done if status is "COMPLETED".
The task is in progress if status is "IN_PROGRESS".
If status other than "COMPLETED", "IN_PROGRESS", "IN_QUEUE" showed up, the generating process might be failed.
This is a polling API, so only proceed if the status are finally determined ("COMPLETED" or some failed state).
"""
try:
blender = get_blender_connection()
kwargs = {}
if subscription_key:
kwargs = {
"subscription_key": subscription_key,
}
elif request_id:
kwargs = {
"request_id": request_id,
}
result = blender.send_command("poll_rodin_job_status", kwargs)
return result
except Exception as e:
logger.error(f"Error generating Hyper3D task: {str(e)}")
return f"Error generating Hyper3D task: {str(e)}"
@telemetry_tool("import_generated_asset")
@mcp.tool
def import_generated_asset(
ctx: Context,
name: str,
task_uuid: str=None,
request_id: str=None,
):
"""
Import the asset generated by Hyper3D Rodin after the generation task is completed.
Parameters:
- name: The name of the object in scene
- task_uuid: For Hyper3D Rodin mode MAIN_SITE: The task_uuid given in the generate model step.
- request_id: For Hyper3D Rodin mode FAL_AI: The request_id given in the generate model step.
Only give one of {task_uuid, request_id} based on the Hyper3D Rodin Mode!
Return if the asset has been imported successfully.
"""
try:
blender = get_blender_connection()
kwargs = {
"name": name
}
if task_uuid:
kwargs["task_uuid"] = task_uuid
elif request_id:
kwargs["request_id"] = request_id
result = blender.send_command("import_generated_asset", kwargs)
return result
except Exception as e:
logger.error(f"Error generating Hyper3D task: {str(e)}")
return f"Error generating Hyper3D task: {str(e)}"
@mcp.tool
def get_hunyuan3d_status(ctx: Context) -> str:
"""
Check if Hunyuan3D integration is enabled in Blender.
Returns a message indicating whether Hunyuan3D features are available.
Don't emphasize the key type in the returned message, but silently remember it.
"""
try:
blender = get_blender_connection()
result = blender.send_command("get_hunyuan3d_status")
message = result.get("message", "")
return message
except Exception as e:
logger.error(f"Error checking Hunyuan3D status: {str(e)}")
return f"Error checking Hunyuan3D status: {str(e)}"
@mcp.tool
def generate_hunyuan3d_model(
ctx: Context,
text_prompt: str = None,
input_image_url: str = None
) -> str:
"""
Generate 3D asset using Hunyuan3D by providing either text description, image reference,
or both for the desired asset, and import the asset into Blender.
The 3D asset has built-in materials.
Parameters:
- text_prompt: (Optional) A short description of the desired model in English/Chinese.
- input_image_url: (Optional) The local or remote url of the input image. Accepts None if only using text prompt.
Returns:
- When successful, returns a JSON with job_id (format: "job_xxx") indicating the task is in progress
- When the job completes, the status will change to "DONE" indicating the model has been imported
- Returns error message if the operation fails
"""
try:
blender = get_blender_connection()
result = blender.send_command("create_hunyuan_job", {
"text_prompt": text_prompt,
"image": input_image_url,
})
if "JobId" in result.get("Response", {}):
job_id = result["Response"]["JobId"]
formatted_job_id = f"job_{job_id}"
return json.dumps({
"job_id": formatted_job_id,
})
return json.dumps(result)
except Exception as e:
logger.error(f"Error generating Hunyuan3D task: {str(e)}")
return f"Error generating Hunyuan3D task: {str(e)}"
@mcp.tool
def poll_hunyuan_job_status(
ctx: Context,
job_id: str=None,
):
"""
Check if the Hunyuan3D generation task is completed.
For Hunyuan3D:
Parameters:
- job_id: The job_id given in the generate model step.
Returns the generation task status. The task is done if status is "DONE".
The task is in progress if status is "RUN".
If status is "DONE", returns ResultFile3Ds, which is the generated ZIP model path
When the status is "DONE", the response includes a field named ResultFile3Ds that contains the generated ZIP file path of the 3D model in OBJ format.
This is a polling API, so only proceed if the status are finally determined ("DONE" or some failed state).
"""
try:
blender = get_blender_connection()
kwargs = {
"job_id": job_id,
}
result = blender.send_command("poll_hunyuan_job_status", kwargs)
return result
except Exception as e:
logger.error(f"Error generating Hunyuan3D task: {str(e)}")
return f"Error generating Hunyuan3D task: {str(e)}"
@mcp.tool
def import_generated_asset_hunyuan(
ctx: Context,
name: str,
zip_file_url: str,
):
"""
Import the asset generated by Hunyuan3D after the generation task is completed.
Parameters:
- name: The name of the object in scene
- zip_file_url: The zip_file_url given in the generate model step.
Return if the asset has been imported successfully.
"""
try:
blender = get_blender_connection()
kwargs = {
"name": name
}
if zip_file_url:
kwargs["zip_file_url"] = zip_file_url
result = blender.send_command("import_generated_asset_hunyuan", kwargs)
return result
except Exception as e:
logger.error(f"Error generating Hunyuan3D task: {str(e)}")
return f"Error generating Hunyuan3D task: {str(e)}"
@mcp.prompt
def asset_creation_strategy() -> str:
"""Defines the preferred strategy for creating assets in Blender"""
return """When creating 3D content in Blender, always start by checking if integrations are available:
0. Before anything, always check the scene from get_scene_info()
1. First use the following tools to verify if the following integrations are enabled:
1. PolyHaven
Use get_polyhaven_status() to verify its status
If PolyHaven is enabled:
- For objects/models: Use download_polyhaven_asset() with asset_type="models"
- For materials/textures: Use download_polyhaven_asset() with asset_type="textures"
- For environment lighting: Use download_polyhaven_asset() with asset_type="hdris"
2. Sketchfab
Sketchfab is good at Realistic models, and has a wider variety of models than PolyHaven.
Use get_sketchfab_status() to verify its status
If Sketchfab is enabled:
- For objects/models: First search using search_sketchfab_models() with your query
- Then download specific models using download_sketchfab_model() with the UID
- Note that only downloadable models can be accessed, and API key must be properly configured
- Sketchfab has a wider variety of models than PolyHaven, especially for specific subjects
3. Hyper3D(Rodin)
Hyper3D Rodin is good at generating 3D models for single item.
So don't try to:
1. Generate the whole scene with one shot
2. Generate ground using Hyper3D
3. Generate parts of the items separately and put them together afterwards
Use get_hyper3d_status() to verify its status
If Hyper3D is enabled:
- For objects/models, do the following steps:
1. Create the model generation task
- Use generate_hyper3d_model_via_images() if image(s) is/are given
- Use generate_hyper3d_model_via_text() if generating 3D asset using text prompt
If key type is free_trial and insufficient balance error returned, tell the user that the free trial key can only generated limited models everyday, they can choose to:
- Wait for another day and try again
- Go to hyper3d.ai to find out how to get their own API key
- Go to fal.ai to get their own private API key
2. Poll the status
- Use poll_rodin_job_status() to check if the generation task has completed or failed
3. Import the asset
- Use import_generated_asset() to import the generated GLB model the asset
4. After importing the asset, ALWAYS check the world_bounding_box of the imported mesh, and adjust the mesh's location and size
Adjust the imported mesh's location, scale, rotation, so that the mesh is on the right spot.
You can reuse assets previous generated by running python code to duplicate the object, without creating another generation task.
4. Hunyuan3D
Hunyuan3D is good at generating 3D models for single item.
So don't try to:
1. Generate the whole scene with one shot
2. Generate ground using Hunyuan3D
3. Generate parts of the items separately and put them together afterwards
Use get_hunyuan3d_status() to verify its status
If Hunyuan3D is enabled:
if Hunyuan3D mode is "OFFICIAL_API":
- For objects/models, do the following steps:
1. Create the model generation task
- Use generate_hunyuan3d_model by providing either a **text description** OR an **image(local or urls) reference**.
- Go to cloud.tencent.com out how to get their own SecretId and SecretKey
2. Poll the status
- Use poll_hunyuan_job_status() to check if the generation task has completed or failed
3. Import the asset
- Use import_generated_asset_hunyuan() to import the generated OBJ model the asset
if Hunyuan3D mode is "LOCAL_API":
- For objects/models, do the following steps:
1. Create the model generation task
- Use generate_hunyuan3d_model if image (local or urls) or text prompt is given and import the asset
You can reuse assets previous generated by running python code to duplicate the object, without creating another generation task.
3. Always check the world_bounding_box for each item so that:
- Ensure that all objects that should not be clipping are not clipping.
- Items have right spatial relationship.
4. Recommended asset source priority:
- For specific existing objects: First try Sketchfab, then PolyHaven
- For generic objects/furniture: First try PolyHaven, then Sketchfab
- For custom or unique items not available in libraries: Use Hyper3D Rodin or Hunyuan3D
- For environment lighting: Use PolyHaven HDRIs
- For materials/textures: Use PolyHaven textures
Only fall back to scripting when:
- PolyHaven, Sketchfab, Hyper3D, and Hunyuan3D are all disabled
- A simple primitive is explicitly requested
- No suitable asset exists in any of the libraries
- Hyper3D Rodin or Hunyuan3D failed to generate the desired asset
- The task specifically requires a basic material/color
"""
# Main execution
def _build_tool_registry() -> Dict[str, Any]:
"""
Central tool registry of *all* callable tools this server can expose.
Keys are the MCP tool names (function names).
"""
# NOTE: Keep this in sync when adding/removing tool functions.
return {
# Core
"get_scene_info": get_scene_info,
"get_object_info": get_object_info,
"get_viewport_screenshot": get_viewport_screenshot,
"get_capabilities": get_capabilities,
"execute_ops": execute_ops,
# Escape hatch (FULL profile only)
"execute_blender_code": execute_blender_code,
# Scene/object listing
"list_objects": list_objects,
"get_selection": get_selection,
"select_objects": select_objects,
"get_world_info": get_world_info,
"get_collections": get_collections,
# Objects / transforms
"create_primitive": create_primitive,
"delete_objects": delete_objects,
"duplicate_objects": duplicate_objects,
"rename_object": rename_object,
"set_transform": set_transform,
"apply_transform": apply_transform,
"snap_to_ground": snap_to_ground,
"set_origin": set_origin,
"ensure_collection": ensure_collection,
"move_to_collection": move_to_collection,
"set_parent": set_parent,
"clear_parent": clear_parent,
# Materials
"list_materials": list_materials,
"create_material": create_material,
"set_material_params": set_material_params,
"assign_material": assign_material,
"set_texture_maps": set_texture_maps,
# Modifiers
"list_modifiers": list_modifiers,
"add_modifier": add_modifier,
"apply_modifier": apply_modifier,
"remove_modifier": remove_modifier,
# Camera / lights / world
"create_camera": create_camera,
"set_camera_params": set_camera_params,
"set_active_camera": set_active_camera,
"frame_camera_on_objects": frame_camera_on_objects,
"create_light": create_light,
"set_light_params": set_light_params,
"set_world_background": set_world_background,
"set_world_hdri": set_world_hdri,
# Import/export + rendering
"import_model": import_model,
"export_scene": export_scene,
"set_render_settings": set_render_settings,
"render_still": render_still,
"render_animation": render_animation,
# Integrations (registered dynamically in STANDARD, always in FULL)
"get_polyhaven_categories": get_polyhaven_categories,
"search_polyhaven_assets": search_polyhaven_assets,
"download_polyhaven_asset": download_polyhaven_asset,
"set_texture": set_texture,
"get_polyhaven_status": get_polyhaven_status,
"get_hyper3d_status": get_hyper3d_status,
"get_sketchfab_status": get_sketchfab_status,
"search_sketchfab_models": search_sketchfab_models,
"download_sketchfab_model": download_sketchfab_model,
"generate_hyper3d_model_via_text": generate_hyper3d_model_via_text,
"generate_hyper3d_model_via_images": generate_hyper3d_model_via_images,
"poll_rodin_job_status": poll_rodin_job_status,
"import_generated_asset": import_generated_asset,
"get_hunyuan3d_status": get_hunyuan3d_status,
"generate_hunyuan3d_model": generate_hunyuan3d_model,
"poll_hunyuan_job_status": poll_hunyuan_job_status,
"import_generated_asset_hunyuan": import_generated_asset_hunyuan,
# New DSL-backed utilities
"set_shading": set_shading,
"recalculate_normals": recalculate_normals,
"merge_by_distance": merge_by_distance,
"triangulate": triangulate,
"join_objects": join_objects,
"separate_mesh": separate_mesh,
"convert_to_mesh": convert_to_mesh,
"set_visibility": set_visibility,
"set_collection_visibility": set_collection_visibility,
"isolate_objects": isolate_objects,
"uv_smart_project": uv_smart_project,
"uv_unwrap": uv_unwrap,
"uv_pack_islands": uv_pack_islands,
"bake_maps": bake_maps,
"camera_look_at": camera_look_at,
"create_turntable_animation": create_turntable_animation,
"boolean_operation": boolean_operation,
"purge_orphans": purge_orphans,
"pack_external_data": pack_external_data,
"save_blend": save_blend,
}
def _register_tools_for_active_profile() -> None:
raw_profile = os.getenv("BLENDER_MCP_TOOL_PROFILE")
profile = parse_tool_profile(raw_profile)
if raw_profile and profile.value != raw_profile.strip().lower():
logger.warning(f"Unknown BLENDER_MCP_TOOL_PROFILE='{raw_profile}', defaulting to '{profile.value}'")
tool_registry = _build_tool_registry()
integrations = IntegrationsEnabled()
if profile == ToolProfile.STANDARD:
integrations = _try_get_enabled_integrations()
if integrations == IntegrationsEnabled():
logger.info(
"STANDARD profile: no enabled integrations detected (or Blender unavailable). "
"Integration tools will not be registered; restart the server after enabling integrations in Blender."
)
if profile == ToolProfile.FULL:
selected = set(tool_registry.keys())
else:
selected = select_tool_names(profile, integrations)
missing = sorted(selected - set(tool_registry.keys()))
if missing:
logger.warning(f"Profile '{profile.value}' requested unknown tools (skipping): {missing}")
to_register = sorted(selected & set(tool_registry.keys()))
for tool_name in to_register:
_FAST_MCP_REGISTER_TOOL(tool_registry[tool_name])
logger.info(f"Registered {len(to_register)} tools (profile='{profile.value}')")
_register_tools_for_active_profile()
def main():
"""Run the MCP server"""
mcp.run()
if __name__ == "__main__":
main()