SketchupMCP

from mcp.server.fastmcp import FastMCP, Context import socket import json import asyncio import logging from dataclasses import dataclass from contextlib import asynccontextmanager from typing import AsyncIterator, Dict, Any, List # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("SketchupMCPServer") # Define version directly to avoid pkg_resources dependency __version__ = "0.1.17" logger.info(f"SketchupMCP Server version {__version__} starting up") @dataclass class SketchupConnection: host: str port: int sock: socket.socket = None def connect(self) -> bool: """Connect to the Sketchup extension socket server""" if self.sock: try: # Test if connection is still alive self.sock.settimeout(0.1) self.sock.send(b'') return True except (socket.error, BrokenPipeError, ConnectionResetError): # Connection is dead, close it and reconnect logger.info("Connection test failed, reconnecting...") self.disconnect() try: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.host, self.port)) logger.info(f"Connected to Sketchup at {self.host}:{self.port}") return True except Exception as e: logger.error(f"Failed to connect to Sketchup: {str(e)}") self.sock = None return False def disconnect(self): """Disconnect from the Sketchup extension""" if self.sock: try: self.sock.close() except Exception as e: logger.error(f"Error disconnecting from Sketchup: {str(e)}") finally: self.sock = None def receive_full_response(self, sock, buffer_size=8192): """Receive the complete response, potentially in multiple chunks""" chunks = [] sock.settimeout(15.0) try: while True: try: chunk = sock.recv(buffer_size) if not chunk: if not chunks: raise Exception("Connection closed before receiving any data") break chunks.append(chunk) try: data = b''.join(chunks) json.loads(data.decode('utf-8')) logger.info(f"Received complete response ({len(data)} bytes)") return data except json.JSONDecodeError: continue except socket.timeout: logger.warning("Socket timeout during chunked receive") break except (ConnectionError, BrokenPipeError, ConnectionResetError) as e: logger.error(f"Socket connection error during receive: {str(e)}") raise except socket.timeout: logger.warning("Socket timeout during chunked receive") except Exception as e: logger.error(f"Error during receive: {str(e)}") raise if chunks: data = b''.join(chunks) logger.info(f"Returning data after receive completion ({len(data)} bytes)") try: json.loads(data.decode('utf-8')) return data except json.JSONDecodeError: raise Exception("Incomplete JSON response received") else: raise Exception("No data received") def send_command(self, method: str, params: Dict[str, Any] = None, request_id: Any = None) -> Dict[str, Any]: """Send a JSON-RPC request to Sketchup and return the response""" # Try to connect if not connected if not self.connect(): raise ConnectionError("Not connected to Sketchup") # Ensure we're sending a proper JSON-RPC request if method == "tools/call" and params and "name" in params and "arguments" in params: # This is already in the correct format request = { "jsonrpc": "2.0", "method": method, "params": params, "id": request_id } else: # This is a direct command - convert to JSON-RPC command_name = method command_params = params or {} # Log the conversion logger.info(f"Converting direct command '{command_name}' to JSON-RPC format") request = { "jsonrpc": "2.0", "method": "tools/call", "params": { "name": command_name, "arguments": command_params }, "id": request_id } # Maximum number of retries max_retries = 2 retry_count = 0 while retry_count <= max_retries: try: logger.info(f"Sending JSON-RPC request: {request}") # Log the exact bytes being sent request_bytes = json.dumps(request).encode('utf-8') + b'\n' logger.info(f"Raw bytes being sent: {request_bytes}") self.sock.sendall(request_bytes) logger.info(f"Request sent, waiting for response...") self.sock.settimeout(15.0) response_data = self.receive_full_response(self.sock) logger.info(f"Received {len(response_data)} bytes of data") response = json.loads(response_data.decode('utf-8')) logger.info(f"Response parsed: {response}") if "error" in response: logger.error(f"Sketchup error: {response['error']}") raise Exception(response["error"].get("message", "Unknown error from Sketchup")) return response.get("result", {}) except (socket.timeout, ConnectionError, BrokenPipeError, ConnectionResetError) as e: logger.warning(f"Connection error (attempt {retry_count+1}/{max_retries+1}): {str(e)}") retry_count += 1 if retry_count <= max_retries: logger.info(f"Retrying connection...") self.disconnect() if not self.connect(): logger.error("Failed to reconnect") break else: logger.error(f"Max retries reached, giving up") self.sock = None raise Exception(f"Connection to Sketchup lost after {max_retries+1} attempts: {str(e)}") except json.JSONDecodeError as e: logger.error(f"Invalid JSON response from Sketchup: {str(e)}") if 'response_data' in locals() and response_data: logger.error(f"Raw response (first 200 bytes): {response_data[:200]}") raise Exception(f"Invalid response from Sketchup: {str(e)}") except Exception as e: logger.error(f"Error communicating with Sketchup: {str(e)}") self.sock = None raise Exception(f"Communication error with Sketchup: {str(e)}") # Global connection management _sketchup_connection = None def get_sketchup_connection(): """Get or create a persistent Sketchup connection""" global _sketchup_connection if _sketchup_connection is not None: try: # Test connection with a ping command ping_request = { "jsonrpc": "2.0", "method": "ping", "params": {}, "id": 0 } _sketchup_connection.sock.sendall(json.dumps(ping_request).encode('utf-8') + b'\n') return _sketchup_connection except Exception as e: logger.warning(f"Existing connection is no longer valid: {str(e)}") try: _sketchup_connection.disconnect() except: pass _sketchup_connection = None if _sketchup_connection is None: _sketchup_connection = SketchupConnection(host="localhost", port=9876) if not _sketchup_connection.connect(): logger.error("Failed to connect to Sketchup") _sketchup_connection = None raise Exception("Could not connect to Sketchup. Make sure the Sketchup extension is running.") logger.info("Created new persistent connection to Sketchup") return _sketchup_connection @asynccontextmanager async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]: """Manage server startup and shutdown lifecycle""" try: logger.info("SketchupMCP server starting up") try: sketchup = get_sketchup_connection() logger.info("Successfully connected to Sketchup on startup") except Exception as e: logger.warning(f"Could not connect to Sketchup on startup: {str(e)}") logger.warning("Make sure the Sketchup extension is running") yield {} finally: global _sketchup_connection if _sketchup_connection: logger.info("Disconnecting from Sketchup") _sketchup_connection.disconnect() _sketchup_connection = None logger.info("SketchupMCP server shut down") # Create MCP server with lifespan support mcp = FastMCP( "SketchupMCP", description="Sketchup integration through the Model Context Protocol", lifespan=server_lifespan ) # Tool endpoints @mcp.tool() def create_component( ctx: Context, type: str = "cube", position: List[float] = None, dimensions: List[float] = None ) -> str: """Create a new component in Sketchup""" try: logger.info(f"create_component called with type={type}, position={position}, dimensions={dimensions}, request_id={ctx.request_id}") sketchup = get_sketchup_connection() params = { "name": "create_component", "arguments": { "type": type, "position": position or [0,0,0], "dimensions": dimensions or [1,1,1] } } logger.info(f"Calling send_command with method='tools/call', params={params}, request_id={ctx.request_id}") result = sketchup.send_command( method="tools/call", params=params, request_id=ctx.request_id ) logger.info(f"create_component result: {result}") return json.dumps(result) except Exception as e: logger.error(f"Error in create_component: {str(e)}") return f"Error creating component: {str(e)}" @mcp.tool() def delete_component( ctx: Context, id: str ) -> str: """Delete a component by ID""" try: sketchup = get_sketchup_connection() result = sketchup.send_command( method="tools/call", params={ "name": "delete_component", "arguments": {"id": id} }, request_id=ctx.request_id ) return json.dumps(result) except Exception as e: return f"Error deleting component: {str(e)}" @mcp.tool() def transform_component( ctx: Context, id: str, position: List[float] = None, rotation: List[float] = None, scale: List[float] = None ) -> str: """Transform a component's position, rotation, or scale""" try: sketchup = get_sketchup_connection() arguments = {"id": id} if position is not None: arguments["position"] = position if rotation is not None: arguments["rotation"] = rotation if scale is not None: arguments["scale"] = scale result = sketchup.send_command( method="tools/call", params={ "name": "transform_component", "arguments": arguments }, request_id=ctx.request_id ) return json.dumps(result) except Exception as e: return f"Error transforming component: {str(e)}" @mcp.tool() def get_selection(ctx: Context) -> str: """Get currently selected components""" try: sketchup = get_sketchup_connection() result = sketchup.send_command( method="tools/call", params={ "name": "get_selection", "arguments": {} }, request_id=ctx.request_id ) return json.dumps(result) except Exception as e: return f"Error getting selection: {str(e)}" @mcp.tool() def set_material( ctx: Context, id: str, material: str ) -> str: """Set material for a component""" try: sketchup = get_sketchup_connection() result = sketchup.send_command( method="tools/call", params={ "name": "set_material", "arguments": { "id": id, "material": material } }, request_id=ctx.request_id ) return json.dumps(result) except Exception as e: return f"Error setting material: {str(e)}" @mcp.tool() def export_scene( ctx: Context, format: str = "skp" ) -> str: """Export the current scene""" try: sketchup = get_sketchup_connection() result = sketchup.send_command( method="tools/call", params={ "name": "export", "arguments": { "format": format } }, request_id=ctx.request_id ) return json.dumps(result) except Exception as e: return f"Error exporting scene: {str(e)}" @mcp.tool() def create_mortise_tenon( ctx: Context, mortise_id: str, tenon_id: str, width: float = 1.0, height: float = 1.0, depth: float = 1.0, offset_x: float = 0.0, offset_y: float = 0.0, offset_z: float = 0.0 ) -> str: """Create a mortise and tenon joint between two components""" try: logger.info(f"create_mortise_tenon called with mortise_id={mortise_id}, tenon_id={tenon_id}, width={width}, height={height}, depth={depth}, offsets=({offset_x}, {offset_y}, {offset_z})") sketchup = get_sketchup_connection() result = sketchup.send_command( method="tools/call", params={ "name": "create_mortise_tenon", "arguments": { "mortise_id": mortise_id, "tenon_id": tenon_id, "width": width, "height": height, "depth": depth, "offset_x": offset_x, "offset_y": offset_y, "offset_z": offset_z } }, request_id=ctx.request_id ) logger.info(f"create_mortise_tenon result: {result}") return json.dumps(result) except Exception as e: logger.error(f"Error in create_mortise_tenon: {str(e)}") return f"Error creating mortise and tenon joint: {str(e)}" @mcp.tool() def create_dovetail( ctx: Context, tail_id: str, pin_id: str, width: float = 1.0, height: float = 1.0, depth: float = 1.0, angle: float = 15.0, num_tails: int = 3, offset_x: float = 0.0, offset_y: float = 0.0, offset_z: float = 0.0 ) -> str: """Create a dovetail joint between two components""" try: logger.info(f"create_dovetail called with tail_id={tail_id}, pin_id={pin_id}, width={width}, height={height}, depth={depth}, angle={angle}, num_tails={num_tails}") sketchup = get_sketchup_connection() result = sketchup.send_command( method="tools/call", params={ "name": "create_dovetail", "arguments": { "tail_id": tail_id, "pin_id": pin_id, "width": width, "height": height, "depth": depth, "angle": angle, "num_tails": num_tails, "offset_x": offset_x, "offset_y": offset_y, "offset_z": offset_z } }, request_id=ctx.request_id ) logger.info(f"create_dovetail result: {result}") return json.dumps(result) except Exception as e: logger.error(f"Error in create_dovetail: {str(e)}") return f"Error creating dovetail joint: {str(e)}" @mcp.tool() def create_finger_joint( ctx: Context, board1_id: str, board2_id: str, width: float = 1.0, height: float = 1.0, depth: float = 1.0, num_fingers: int = 5, offset_x: float = 0.0, offset_y: float = 0.0, offset_z: float = 0.0 ) -> str: """Create a finger joint (box joint) between two components""" try: logger.info(f"create_finger_joint called with board1_id={board1_id}, board2_id={board2_id}, width={width}, height={height}, depth={depth}, num_fingers={num_fingers}") sketchup = get_sketchup_connection() result = sketchup.send_command( method="tools/call", params={ "name": "create_finger_joint", "arguments": { "board1_id": board1_id, "board2_id": board2_id, "width": width, "height": height, "depth": depth, "num_fingers": num_fingers, "offset_x": offset_x, "offset_y": offset_y, "offset_z": offset_z } }, request_id=ctx.request_id ) logger.info(f"create_finger_joint result: {result}") return json.dumps(result) except Exception as e: logger.error(f"Error in create_finger_joint: {str(e)}") return f"Error creating finger joint: {str(e)}" @mcp.tool() def eval_ruby( ctx: Context, code: str ) -> str: """Evaluate arbitrary Ruby code in Sketchup""" try: logger.info(f"eval_ruby called with code length: {len(code)}") sketchup = get_sketchup_connection() result = sketchup.send_command( method="tools/call", params={ "name": "eval_ruby", "arguments": { "code": code } }, request_id=ctx.request_id ) logger.info(f"eval_ruby result: {result}") # Format the response to include the result response = { "success": True, "result": result.get("content", [{"text": "Success"}])[0].get("text", "Success") if isinstance(result.get("content"), list) and len(result.get("content", [])) > 0 else "Success" } return json.dumps(response) except Exception as e: logger.error(f"Error in eval_ruby: {str(e)}") return json.dumps({ "success": False, "error": str(e) }) def main(): mcp.run() if __name__ == "__main__": main()