Skip to main content
Glama
socket_client.py19.6 kB
""" Simple Socket Client for Blender Communication ============================================== Connects to the Blender socket server to send commands. Much simpler than MCP stdio - no async context issues. """ import socket import json import time import logging from typing import Dict, Any, Optional logger = logging.getLogger(__name__) class BlenderSocketClient: """Client for communicating with Blender socket server""" def __init__(self, host: str = "127.0.0.1", port: int = 8765, timeout: int = 60): self.host = host self.port = port self.timeout = timeout def _send_command(self, command: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Send a command to Blender and return the response with improved error handling""" request = { "command": command, "params": params or {} } max_retries = 3 retry_delay = 1.0 for attempt in range(max_retries): try: # Create socket connection with shorter timeout for connection with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(10) # 10 second connection timeout try: sock.connect((self.host, self.port)) except (socket.timeout, ConnectionRefusedError, OSError) as e: if attempt == max_retries - 1: return { "success": False, "error": f"Cannot connect to Blender server at {self.host}:{self.port}. Make sure Blender socket server is running. Error: {str(e)}" } print(f"Connection attempt {attempt + 1} failed, retrying in {retry_delay}s...") time.sleep(retry_delay) continue # Set longer timeout for command execution sock.settimeout(self.timeout) # Send request request_json = json.dumps(request) try: sock.sendall(request_json.encode('utf-8')) except (socket.timeout, BrokenPipeError, ConnectionResetError) as e: return {"success": False, "error": f"Failed to send command: {str(e)}"} # Receive response with chunked reading for large responses try: response_chunks = [] while True: chunk = sock.recv(8192) if not chunk: break response_chunks.append(chunk) if not response_chunks: return {"success": False, "error": "Empty response from server"} response_data = b''.join(response_chunks).decode('utf-8') except socket.timeout: return {"success": False, "error": f"Command timed out after {self.timeout} seconds"} except (ConnectionResetError, BrokenPipeError) as e: return {"success": False, "error": f"Connection lost during command execution: {str(e)}"} # Parse JSON response try: if not response_data.strip(): return {"success": False, "error": "Empty response from Blender server"} response = json.loads(response_data) return response except json.JSONDecodeError as e: return { "success": False, "error": f"Invalid JSON response from server. Raw response: {response_data[:100]}..." } except Exception as e: if attempt == max_retries - 1: return {"success": False, "error": f"Unexpected error: {str(e)}"} print(f"Attempt {attempt + 1} failed with error: {str(e)}, retrying...") time.sleep(retry_delay) return {"success": False, "error": "All connection attempts failed"} def health_check(self) -> Dict[str, Any]: """Check if Blender server is responsive""" return self._send_command("health_check") def get_scene_info(self) -> Dict[str, Any]: """Get current scene information""" return self._send_command("get_scene_info") def list_cameras(self) -> Dict[str, Any]: """List all cameras in the scene""" return self._send_command("list_cameras") def render_camera(self, camera_name: str = "Camera", width: int = 1920, height: int = 1080, format_type: str = "PNG", quality: int = 90) -> Dict[str, Any]: """Render from specified camera using ultra-safe approach for large scenes""" # Ultra-conservative approach - no timeouts, maximum safety render_code = f""" import bpy import os import base64 import gc try: print(f"DEBUG SAFE: Looking for camera named: '{camera_name}'") # Find camera camera = bpy.data.objects.get("{camera_name}") if not camera or camera.type != 'CAMERA': print(f"DEBUG SAFE: Camera '{camera_name}' not found") available_cameras = [obj.name for obj in bpy.data.objects if obj.type == 'CAMERA'] print(f"DEBUG SAFE: Available cameras: {{available_cameras}}") result = {{"success": False, "error": f"Camera '{camera_name}' not found in scene. Available: {{', '.join(available_cameras)}}"}} else: print(f"DEBUG SAFE: Found camera '{camera_name}', preparing ultra-safe render") # Scene complexity detection scene = bpy.context.scene total_objects = len(bpy.data.objects) total_materials = len(bpy.data.materials) total_meshes = len(bpy.data.meshes) is_complex_scene = total_objects > 200 or total_materials > 50 or total_meshes > 100 print(f"DEBUG SAFE: Scene stats - Objects: {{total_objects}}, Materials: {{total_materials}}, Meshes: {{total_meshes}}") print(f"DEBUG SAFE: Complex scene detected: {{is_complex_scene}} (using stricter thresholds)") # Store ALL original settings for complete restoration original_camera = scene.camera original_engine = scene.render.engine original_resolution_x = scene.render.resolution_x original_resolution_y = scene.render.resolution_y original_resolution_percentage = scene.render.resolution_percentage original_file_format = scene.render.image_settings.file_format original_color_mode = scene.render.image_settings.color_mode original_compression = scene.render.image_settings.compression original_use_motion_blur = scene.render.use_motion_blur original_filepath = scene.render.filepath # Store workbench specific settings if available original_render_aa = None original_viewport_aa = None if hasattr(scene.display, 'render_aa'): original_render_aa = scene.display.render_aa if hasattr(scene.display, 'viewport_aa'): original_viewport_aa = scene.display.viewport_aa try: print("DEBUG SAFE: Setting ultra-conservative render settings") # Force garbage collection before making any changes gc.collect() # Set camera scene.camera = camera print(f"DEBUG SAFE: Set active camera to '{{camera.name}}'") # Set resolution (but cap it for complex scenes) if is_complex_scene: # Reduce resolution for complex scenes to prevent crashes max_res = 512 actual_width = min({width}, max_res) actual_height = min({height}, max_res) print(f"DEBUG SAFE: Reducing resolution for complex scene: {{actual_width}}x{{actual_height}}") else: actual_width = {width} actual_height = {height} scene.render.resolution_x = actual_width scene.render.resolution_y = actual_height scene.render.resolution_percentage = 100 # Use ultra-safe render settings print("DEBUG SAFE: Setting Workbench engine with minimal settings") scene.render.engine = 'BLENDER_WORKBENCH' # Set workbench to absolute minimum settings if hasattr(scene.display, 'render_aa'): scene.display.render_aa = 'OFF' if hasattr(scene.display, 'viewport_aa'): scene.display.viewport_aa = 'OFF' # Disable ALL optional features scene.render.use_motion_blur = False if hasattr(scene.render, 'use_freestyle'): scene.render.use_freestyle = False if hasattr(scene.render, 'use_compositing'): scene.render.use_compositing = False if hasattr(scene.render, 'use_sequencer'): scene.render.use_sequencer = False # Set image format to safest possible scene.render.image_settings.file_format = 'PNG' scene.render.image_settings.color_mode = 'RGB' scene.render.image_settings.compression = 0 # No compression for speed # Use temporary file with timestamp to avoid any conflicts import time timestamp = int(time.time() * 1000000) # Microsecond precision temp_file = f"/tmp/blender_safe_render_{{timestamp}}.png" scene.render.filepath = temp_file print(f"DEBUG SAFE: Starting ultra-safe render to {{temp_file}}") print("DEBUG SAFE: Using minimal Workbench settings, no anti-aliasing, no post-processing") # Force another garbage collection right before render gc.collect() # Advanced fallback: for complex scenes, create a temporary scene with ONLY the camera render_successful = False try: if is_complex_scene: print("DEBUG SAFE: Creating temporary minimal scene for render") temp_scene = bpy.data.scenes.new(name="Griptape_Temp") # Duplicate camera (object and data) so we don't touch original cam_copy = camera.copy() cam_copy.data = camera.data.copy() temp_scene.collection.objects.link(cam_copy) temp_scene.camera = cam_copy # Copy minimal render settings temp_scene.render.engine = 'BLENDER_WORKBENCH' temp_scene.render.resolution_x = actual_width temp_scene.render.resolution_y = actual_height temp_scene.render.resolution_percentage = 100 # Switch context to new scene (safe in background) current_window = bpy.context.window original_scene_ctx = current_window.scene current_window.scene = temp_scene # Perform OpenGL render (fast & light) bpy.ops.render.opengl(write_still=True, view_context=False) # Restore original context current_window.scene = original_scene_ctx # Cleanup: remove duplicated camera object and its data, then remove temp scene try: if cam_copy.name in bpy.data.objects: bpy.data.objects.remove(cam_copy, do_unlink=True) if cam_copy.data and cam_copy.data.users == 0: if cam_copy.data.name in bpy.data.cameras: bpy.data.cameras.remove(cam_copy.data, do_unlink=True) except Exception as cleanup_obj_err: print(f"DEBUG SAFE: Warning removing temp camera object: {{cleanup_obj_err}}") try: bpy.data.scenes.remove(temp_scene, do_unlink=True) except Exception as cleanup_scene_err: print(f"DEBUG SAFE: Warning removing temp scene: {{cleanup_scene_err}}") # The OpenGL render writes to temp_scene.render.filepath; ensure it's saved ogl_path = temp_scene.render.filepath if temp_scene.render.filepath else temp_file # Move/symlink to our expected temp_file name if os.path.exists(ogl_path): os.rename(ogl_path, temp_file) else: bpy.ops.render.render(write_still=True) print("DEBUG SAFE: Render completed successfully") render_successful = True except Exception as render_error: print(f"DEBUG SAFE: Render failed with error: {{str(render_error)}}") render_successful = False result = {{"success": False, "error": f"Render operation failed: {{str(render_error)}}"}} if render_successful: # Check if file was created and read it if os.path.exists(temp_file) and os.path.getsize(temp_file) > 0: try: with open(temp_file, 'rb') as f: image_data = base64.b64encode(f.read()).decode('utf-8') # Clean up temp file immediately os.remove(temp_file) result = {{ "success": True, "image": image_data, "camera_used": camera.name, "width": actual_width, "height": actual_height, "render_time": 0.0, "scene_complexity": "complex" if is_complex_scene else "normal", "render_mode": "ultra_safe" }} print(f"DEBUG SAFE: Successfully encoded {{len(image_data)}} bytes of image data") except Exception as file_error: result = {{"success": False, "error": f"Failed to read render result: {{str(file_error)}}"}} # Clean up temp file on error try: if os.path.exists(temp_file): os.remove(temp_file) except: pass else: result = {{"success": False, "error": "Render completed but no output file was created or file is empty"}} # Clean up temp file if it exists try: if os.path.exists(temp_file): os.remove(temp_file) except: pass except Exception as setup_error: print(f"DEBUG SAFE: Setup error: {{setup_error}}") result = {{"success": False, "error": f"Render setup failed: {{str(setup_error)}}"}} finally: # ALWAYS restore ALL original settings - this is critical try: print("DEBUG SAFE: Restoring all original settings") if original_camera: scene.camera = original_camera scene.render.engine = original_engine scene.render.resolution_x = original_resolution_x scene.render.resolution_y = original_resolution_y scene.render.resolution_percentage = original_resolution_percentage scene.render.image_settings.file_format = original_file_format scene.render.image_settings.color_mode = original_color_mode scene.render.image_settings.compression = original_compression scene.render.use_motion_blur = original_use_motion_blur scene.render.filepath = original_filepath # Restore workbench settings if they were changed if original_render_aa is not None and hasattr(scene.display, 'render_aa'): scene.display.render_aa = original_render_aa if original_viewport_aa is not None and hasattr(scene.display, 'viewport_aa'): scene.display.viewport_aa = original_viewport_aa print("DEBUG SAFE: All settings restored successfully") except Exception as restore_error: print(f"DEBUG SAFE: Warning - failed to restore some settings: {{restore_error}}") # Final garbage collection gc.collect() print("DEBUG SAFE: Cleanup completed") except Exception as e: import traceback error_details = traceback.format_exc() print(f"DEBUG SAFE: Major error in render operation: {{e}}") print(f"DEBUG SAFE: Full traceback: {{error_details}}") result = {{"success": False, "error": f"Render operation failed with system error: {{str(e)}}"}} """ return self._send_command("execute_code", {"code": render_code}) def execute_code(self, code: str) -> Dict[str, Any]: """Execute arbitrary Python code in Blender""" return self._send_command("execute_code", {"code": code}) class BlenderSocketClientManager: """Singleton manager for Blender socket client""" _instance: Optional[BlenderSocketClient] = None @classmethod def get_client(cls, host: str = "localhost", port: int = 8765) -> BlenderSocketClient: """Get or create the socket client instance""" if cls._instance is None: cls._instance = BlenderSocketClient(host, port) return cls._instance @classmethod def reset_client(cls): """Reset the client instance (useful for changing connection settings)""" cls._instance = None # Convenience functions for easier integration def health_check() -> Dict[str, Any]: """Quick health check function""" client = BlenderSocketClient() return client.health_check() def get_scene_info() -> Dict[str, Any]: """Quick scene info function""" client = BlenderSocketClient() return client.get_scene_info() def list_cameras() -> Dict[str, Any]: """Quick camera list function""" client = BlenderSocketClient() return client.list_cameras() def render_camera(camera_name: str = "Camera", width: int = 1920, height: int = 1080, format_type: str = "PNG", quality: int = 90) -> Dict[str, Any]: """Quick render function""" client = BlenderSocketClient(timeout=120) # Longer timeout for rendering return client.render_camera(camera_name, width, height, format_type, quality)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/griptape-ai/griptape-nodes-blender'

If you have feedback or need assistance with the MCP directory API, please join our Discord server