Skip to main content
Glama
rhino_server.py (9.37 kB)
"""Rhino Bridge Server - Socket server for Rhino-Python communication.

This module implements a socket server that runs inside Rhino's Python editor
and allows external Python processes to communicate with and control Rhino.

Wire protocol: each request is a JSON object ``{"type": ..., "data": {...}}``
sent over a TCP connection; each reply is a JSON object with a ``status`` of
``"success"`` or ``"error"``.

Version: 1.0 (2025-03-13)
"""
from typing import Dict, Any, Optional, List, Tuple, Union
import errno
import socket
import json
import sys
import traceback
import threading
import time

# Import Rhino-specific modules
try:
    import Rhino
    import rhinoscriptsyntax as rs
    import scriptcontext as sc
except ImportError:
    print("Warning: Rhino modules not found. This script must run inside Rhino's Python editor.")

# Server configuration
HOST = '127.0.0.1'
PORT = 8888
SERVER_VERSION = "RhinoMCP-1.0"
SERVER_START_TIME = time.strftime("%Y-%m-%d %H:%M:%S")

# Receive tuning. A request may arrive split over several TCP segments, so
# bytes are buffered until they parse as a complete JSON document.
_RECV_SIZE = 65536
# Upper bound on a buffered, still-unparseable request before it is rejected.
_MAX_MESSAGE_BYTES = 8 * 1024 * 1024
# How long to wait for the remainder of a request that does not parse yet
# before declaring it invalid JSON.
_PARTIAL_TIMEOUT = 0.5
# Winsock "address already in use" code, kept for runtimes where
# errno.EADDRINUSE carries a different value.
_WSAEADDRINUSE = 10048

_INVALID_JSON_RESPONSE = json.dumps({
    'status': 'error',
    'message': 'Invalid JSON format'
}).encode('utf-8')


class RhinoEncoder(json.JSONEncoder):
    """Custom JSON encoder for handling Rhino and .NET objects.

    Objects that expose both ``Count`` and ``Item`` are emitted as JSON arrays;
    anything else the standard encoder cannot handle is converted with
    ``str()``.
    """

    def default(self, obj: Any) -> Any:
        """Return a JSON-serializable stand-in for ``obj``.

        Args:
            obj: A value the base encoder could not serialize.

        Returns:
            A list of the collection's items, or ``str(obj)``.
        """
        # Collections must be tested before the string fallback: every .NET
        # object has ToString, so testing that first would hide this branch.
        if hasattr(obj, 'Count') and hasattr(obj, 'Item'):
            try:
                # Items are returned raw; the encoder calls default() again
                # for any item that is itself not serializable.
                return [obj.Item[i] for i in range(obj.Count)]
            except Exception:
                # Indexer not usable with integer positions (e.g. a keyed
                # collection); fall back to the string form.
                return str(obj)
        return str(obj)  # Last resort: convert anything to string


def _handle_ping(cmd_data: Dict[str, Any]) -> Dict[str, Any]:
    """Report Rhino and server version information."""
    return {
        'status': 'success',
        'message': 'Rhino is connected',
        'data': {
            'version': str(Rhino.RhinoApp.Version),
            'has_active_doc': Rhino.RhinoDoc.ActiveDoc is not None,
            'server_version': SERVER_VERSION,
            'server_start_time': SERVER_START_TIME,
            # __file__ may be undefined depending on how the script is run.
            'script_path': globals().get('__file__', '')
        }
    }


def _handle_create_curve(cmd_data: Dict[str, Any]) -> Dict[str, Any]:
    """Add an interpolated degree-3 curve through ``cmd_data['points']``."""
    try:
        # Extract points from the command data
        points_data = cmd_data.get('points', [])

        # Check if we have enough points for a curve
        if len(points_data) < 2:
            raise ValueError("At least 2 points are required to create a curve")

        # Convert point data to Rhino points; missing coordinates default to 0
        points = [
            Rhino.Geometry.Point3d(
                float(pt.get('x', 0.0)),
                float(pt.get('y', 0.0)),
                float(pt.get('z', 0.0)),
            )
            for pt in points_data
        ]

        doc = Rhino.RhinoDoc.ActiveDoc
        if not doc:
            raise Exception("No active Rhino document")

        curve = Rhino.Geometry.Curve.CreateInterpolatedCurve(points, 3)
        if not curve:
            raise Exception("Failed to create curve")

        # Add to document
        curve_id = doc.Objects.AddCurve(curve)

        # Force view update
        doc.Views.Redraw()

        return {
            'status': 'success',
            'message': f'Curve created with {len(points)} points',
            'data': {
                'id': str(curve_id),
                'point_count': len(points)
            }
        }
    except Exception as e:
        return {
            'status': 'error',
            'message': f'Curve creation error: {str(e)}',
            'traceback': traceback.format_exc()
        }


def _handle_refresh_view(cmd_data: Dict[str, Any]) -> Dict[str, Any]:
    """Redraw the views of the active document."""
    try:
        doc = Rhino.RhinoDoc.ActiveDoc
        if not doc:
            raise Exception("No active Rhino document")
        doc.Views.Redraw()
        return {
            'status': 'success',
            'message': 'View refreshed'
        }
    except Exception as e:
        return {
            'status': 'error',
            'message': f'View refresh error: {str(e)}'
        }


def _handle_run_script(cmd_data: Dict[str, Any]) -> Dict[str, Any]:
    """Execute ``cmd_data['script']`` and return its ``result`` variable."""
    try:
        script = cmd_data.get('script', '')
        if not script:
            raise ValueError("Empty script")

        # SECURITY: this executes arbitrary code sent by whoever can connect
        # to HOST:PORT. The server binds to loopback only; do not expose it
        # on a routable interface.
        #
        # A single namespace is used for both globals and locals so that
        # functions defined by the script can see the script's own imports
        # and variables. It is seeded with this module's globals so Rhino,
        # rs and sc are available to the script.
        namespace = dict(globals())
        exec(script, namespace)

        return {
            'status': 'success',
            'message': 'Script executed successfully',
            'data': {
                # Return the result if the script assigned one
                'result': namespace.get('result', None)
            }
        }
    except Exception as e:
        return {
            'status': 'error',
            'message': f'Script execution error: {str(e)}',
            'traceback': traceback.format_exc()
        }


# Maps the request's "type" field to the function that services it.
_COMMAND_HANDLERS = {
    'ping': _handle_ping,
    'create_curve': _handle_create_curve,
    'refresh_view': _handle_refresh_view,
    'run_script': _handle_run_script,
}


def _process_command(command_obj: Any) -> Dict[str, Any]:
    """Dispatch one decoded request and always return a reply dict."""
    if not isinstance(command_obj, dict):
        return {
            'status': 'error',
            'message': 'Invalid command: expected a JSON object'
        }

    cmd_type = command_obj.get('type', '')
    # A JSON null for "data" is treated the same as an absent field.
    cmd_data = command_obj.get('data') or {}
    print(f"Received command: {cmd_type}")

    handler = _COMMAND_HANDLERS.get(cmd_type) if isinstance(cmd_type, str) else None
    if handler is None:
        return {'status': 'error', 'message': 'Unknown command'}

    try:
        return handler(cmd_data)
    except Exception as e:
        # Keep the connection alive and tell the client what went wrong
        # instead of letting the failure tear down the session.
        return {
            'status': 'error',
            'message': f'Command error: {str(e)}',
            'traceback': traceback.format_exc()
        }


def _encode_response(result: Dict[str, Any]) -> bytes:
    """Serialize a reply dict to UTF-8 JSON, degrading to an error reply."""
    try:
        return json.dumps(result, cls=RhinoEncoder).encode('utf-8')
    except (TypeError, ValueError) as e:
        # e.g. non-string dict keys or circular references in a script result
        return json.dumps({
            'status': 'error',
            'message': f'Response serialization error: {str(e)}'
        }).encode('utf-8')


def _drain_messages(conn: socket.socket, decoder: json.JSONDecoder, pending: bytes) -> bytes:
    """Answer every complete request in ``pending``; return the unparsed rest."""
    while True:
        # raw_decode does not skip leading whitespace itself.
        pending = pending.lstrip()
        if not pending:
            return b''
        try:
            text = pending.decode('utf-8')
            command_obj, end = decoder.raw_decode(text)
        except (UnicodeDecodeError, json.JSONDecodeError):
            # Either malformed or simply not fully received yet; the caller
            # decides which by waiting briefly for more bytes.
            if len(pending) > _MAX_MESSAGE_BYTES:
                conn.sendall(_INVALID_JSON_RESPONSE)
                return b''
            return pending
        pending = text[end:].encode('utf-8')
        conn.sendall(_encode_response(_process_command(command_obj)))


def handle_client(conn: socket.socket, addr: Tuple[str, int]) -> None:
    """Handle individual client connections.

    Reads JSON requests from ``conn`` until the peer disconnects, sending one
    JSON reply per request. Requests larger than one receive buffer, split
    across TCP segments, or sent back-to-back are all handled.

    Args:
        conn: Socket connection object
        addr: Client address tuple (host, port)

    Returns:
        None
    """
    print(f"Connection established from {addr}")
    decoder = json.JSONDecoder()
    pending = b''
    try:
        while True:
            try:
                chunk = conn.recv(_RECV_SIZE)
            except socket.timeout:
                # The timeout is only armed while an unparseable request is
                # buffered, so expiry means the request never completed.
                pending = b''
                conn.settimeout(None)
                conn.sendall(_INVALID_JSON_RESPONSE)
                continue

            if not chunk:
                if pending:
                    # Peer finished sending but left an unparseable request.
                    conn.sendall(_INVALID_JSON_RESPONSE)
                break

            # Replies must not be subject to the short partial-read timeout.
            conn.settimeout(None)
            pending = _drain_messages(conn, decoder, pending + chunk)
            conn.settimeout(_PARTIAL_TIMEOUT if pending else None)
    except Exception as e:
        print(f"Connection error: {str(e)}")
    finally:
        print(f"Connection closed with {addr}")
        conn.close()


def start_server() -> None:
    """Start the socket server.

    Binds to ``HOST:PORT`` and serves each accepted connection on its own
    daemon thread. Blocks until the accept loop fails. Socket and other
    errors are printed rather than propagated.

    Args:
        None

    Returns:
        None
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            s.bind((HOST, PORT))
            s.listen(5)
            print(f"Rhino Bridge started! Listening on {HOST}:{PORT}")
            print(f"Server version: {SERVER_VERSION}")
            print(f"Start time: {SERVER_START_TIME}")

            while True:
                conn, addr = s.accept()
                # Handle each client in a separate thread
                client_thread = threading.Thread(target=handle_client, args=(conn, addr))
                client_thread.daemon = True
                client_thread.start()
        except OSError as e:
            # errno.EADDRINUSE is the portable name; the literal Winsock code
            # is also accepted.
            if e.errno in (errno.EADDRINUSE, _WSAEADDRINUSE):
                print("Error: Address already in use. Is the Rhino Bridge already running?")
            else:
                print(f"Socket error: {str(e)}")
        except Exception as e:
            print(f"Server error: {str(e)}")
            traceback.print_exc()


if __name__ == "__main__":
    # Only start if running directly in Rhino's Python editor
    if 'Rhino' in sys.modules:
        start_server()
    else:
        print("This script should be run inside Rhino's Python editor.")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/a01110946/RhinoMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.