# SketchupMCP
# by mhyrr
# - sketchup-mcp
# - src
# - sketchup_mcp
from mcp.server.fastmcp import FastMCP, Context
import socket
import json
import asyncio
import logging
from dataclasses import dataclass
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, Any, List
# Process-wide logging: INFO level, timestamped records with logger name.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("SketchupMCPServer")

# The version is spelled out here so no pkg_resources lookup is required.
__version__ = "0.1.17"
logger.info(f"SketchupMCP Server version {__version__} starting up")
@dataclass
class SketchupConnection:
    """Blocking TCP client that speaks JSON-RPC 2.0 to the Sketchup extension.

    One socket is kept open between calls and re-established on demand.
    Each request is sent as a single newline-terminated JSON document; a
    response is treated as complete as soon as the bytes received so far
    parse as JSON.

    Attributes:
        host: Host name or address of the Sketchup extension socket server.
        port: TCP port of that server.
        sock: The connected socket, or None while disconnected.
    """

    host: str
    port: int
    sock: socket.socket = None  # None while disconnected

    def connect(self) -> bool:
        """Connect to the Sketchup extension socket server.

        An existing socket is probed first and reused when the probe passes;
        otherwise it is closed and a fresh connection is attempted.

        Returns:
            True when a usable socket is available, False when connecting
            failed (in which case ``self.sock`` is None).
        """
        if self.sock:
            try:
                # Test if connection is still alive.
                # NOTE(review): a zero-length send is a weak probe and may not
                # notice a half-closed peer; send_command retries on failure.
                self.sock.settimeout(0.1)
                self.sock.send(b'')
                return True
            except OSError:
                # Connection is dead, close it and reconnect
                logger.info("Connection test failed, reconnecting...")
                self.disconnect()

        new_sock = None
        try:
            new_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            new_sock.connect((self.host, self.port))
        except Exception as e:
            logger.error(f"Failed to connect to Sketchup: {str(e)}")
            # Close the half-opened socket instead of leaking its descriptor.
            if new_sock is not None:
                try:
                    new_sock.close()
                except OSError:
                    pass
            self.sock = None
            return False

        self.sock = new_sock
        logger.info(f"Connected to Sketchup at {self.host}:{self.port}")
        return True

    def disconnect(self):
        """Close the socket, if any, and mark the connection as closed."""
        if self.sock:
            try:
                self.sock.close()
            except Exception as e:
                logger.error(f"Error disconnecting from Sketchup: {str(e)}")
            finally:
                self.sock = None

    @staticmethod
    def _is_complete_json(data: bytes) -> bool:
        """Return True when *data* is valid UTF-8 holding one JSON document.

        Both json.JSONDecodeError and UnicodeDecodeError derive from
        ValueError, so a chunk boundary that falls inside a multi-byte
        character counts as "not complete yet" instead of a hard failure.
        """
        try:
            json.loads(data.decode('utf-8'))
        except ValueError:
            return False
        return True

    def receive_full_response(self, sock, buffer_size=8192):
        """Receive the complete response, potentially in multiple chunks.

        Args:
            sock: Connected socket to read from; its timeout is set to 15 s.
            buffer_size: Maximum number of bytes requested per ``recv`` call.

        Returns:
            The raw response bytes, which are guaranteed to parse as JSON.

        Raises:
            ConnectionError: Propagated from the socket on connection failure.
            Exception: If the peer closes before sending anything, if nothing
                arrives before the timeout, or if the data received by the
                time of a timeout/close is not a complete JSON document.
        """
        chunks = []
        sock.settimeout(15.0)
        try:
            while True:
                try:
                    chunk = sock.recv(buffer_size)
                except socket.timeout:
                    logger.warning("Socket timeout during chunked receive")
                    break
                if not chunk:
                    # Peer closed the connection.
                    if not chunks:
                        raise Exception("Connection closed before receiving any data")
                    break
                chunks.append(chunk)
                data = b''.join(chunks)
                if self._is_complete_json(data):
                    logger.info(f"Received complete response ({len(data)} bytes)")
                    return data
        except ConnectionError as e:
            logger.error(f"Socket connection error during receive: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Error during receive: {str(e)}")
            raise

        # Reached only after a timeout or an orderly close by the peer.
        if not chunks:
            raise Exception("No data received")
        data = b''.join(chunks)
        logger.info(f"Returning data after receive completion ({len(data)} bytes)")
        if not self._is_complete_json(data):
            raise Exception("Incomplete JSON response received")
        return data

    def send_command(self, method: str, params: Dict[str, Any] = None, request_id: Any = None) -> Dict[str, Any]:
        """Send a JSON-RPC request to Sketchup and return the response.

        A call whose *method* is ``"tools/call"`` and whose *params* already
        carry ``name`` and ``arguments`` is sent as-is; anything else is
        wrapped as ``tools/call`` with *method* as the tool name.

        On a connection error the request is re-sent after reconnecting, up
        to three attempts in total. Because the whole request is re-sent, a
        command may run twice if only its reply was lost.

        Args:
            method: JSON-RPC method, or a direct command name to wrap.
            params: Request parameters (``None`` is treated as empty).
            request_id: Value placed in the JSON-RPC ``id`` field.

        Returns:
            The ``result`` member of the response, or ``{}`` if absent.

        Raises:
            ConnectionError: If no connection can be established, or if
                reconnecting between attempts fails.
            Exception: If all attempts fail, the response is not valid JSON,
                Sketchup reports an error, or any other failure occurs.
        """
        # Try to connect if not connected
        if not self.connect():
            raise ConnectionError("Not connected to Sketchup")

        # Ensure we're sending a proper JSON-RPC request
        if method == "tools/call" and params and "name" in params and "arguments" in params:
            # This is already in the correct format
            request = {
                "jsonrpc": "2.0",
                "method": method,
                "params": params,
                "id": request_id
            }
        else:
            # This is a direct command - convert to JSON-RPC
            logger.info(f"Converting direct command '{method}' to JSON-RPC format")
            request = {
                "jsonrpc": "2.0",
                "method": "tools/call",
                "params": {
                    "name": method,
                    "arguments": params or {}
                },
                "id": request_id
            }

        max_retries = 2
        total_attempts = max_retries + 1
        for attempt in range(1, total_attempts + 1):
            response_data = None
            try:
                logger.info(f"Sending JSON-RPC request: {request}")
                request_bytes = json.dumps(request).encode('utf-8') + b'\n'
                logger.info(f"Raw bytes being sent: {request_bytes}")
                # connect() leaves a 0.1 s probe timeout on a reused socket;
                # restore the working timeout before the (possibly large) send.
                self.sock.settimeout(15.0)
                self.sock.sendall(request_bytes)
                logger.info("Request sent, waiting for response...")
                response_data = self.receive_full_response(self.sock)
                logger.info(f"Received {len(response_data)} bytes of data")
                response = json.loads(response_data.decode('utf-8'))
                logger.info(f"Response parsed: {response}")
                if "error" in response:
                    error = response["error"]
                    logger.error(f"Sketchup error: {error}")
                    if isinstance(error, dict):
                        raise Exception(error.get("message", "Unknown error from Sketchup"))
                    raise Exception(str(error))
                return response.get("result", {})
            except (socket.timeout, ConnectionError) as e:
                logger.warning(f"Connection error (attempt {attempt}/{total_attempts}): {str(e)}")
                # Always close the broken socket so its descriptor is released.
                self.disconnect()
                if attempt == total_attempts:
                    logger.error("Max retries reached, giving up")
                    raise Exception(
                        f"Connection to Sketchup lost after {total_attempts} attempts: {str(e)}"
                    ) from e
                logger.info("Retrying connection...")
                if not self.connect():
                    logger.error("Failed to reconnect")
                    # Fail loudly instead of falling out of the loop and
                    # returning None to the caller.
                    raise ConnectionError(
                        f"Failed to reconnect to Sketchup: {str(e)}"
                    ) from e
            except json.JSONDecodeError as e:
                logger.error(f"Invalid JSON response from Sketchup: {str(e)}")
                if response_data:
                    logger.error(f"Raw response (first 200 bytes): {response_data[:200]}")
                raise Exception(f"Invalid response from Sketchup: {str(e)}") from e
            except Exception as e:
                logger.error(f"Error communicating with Sketchup: {str(e)}")
                # Reset the connection, closing the socket rather than just
                # dropping the reference to it.
                self.disconnect()
                raise Exception(f"Communication error with Sketchup: {str(e)}") from e
# Global connection management
_sketchup_connection = None


def get_sketchup_connection():
    """Get or create the persistent, module-wide Sketchup connection.

    A cached connection is validated through its own ``connect()`` method,
    which probes the socket and reconnects when needed. No JSON-RPC ping is
    written to the socket here: its reply would never be read by this
    function and would be mistaken for the response to the next command.

    Returns:
        The shared ``SketchupConnection`` instance, connected to
        localhost:9876.

    Raises:
        Exception: If no connection to Sketchup can be established. The
            cached connection is cleared in that case.
    """
    global _sketchup_connection

    connection = _sketchup_connection
    is_new = connection is None
    if is_new:
        connection = SketchupConnection(host="localhost", port=9876)

    if not connection.connect():
        logger.error("Failed to connect to Sketchup")
        # connect() has already released the socket; just forget the object.
        _sketchup_connection = None
        raise Exception("Could not connect to Sketchup. Make sure the Sketchup extension is running.")

    if is_new:
        logger.info("Created new persistent connection to Sketchup")
    _sketchup_connection = connection
    return connection
@asynccontextmanager
async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
    """Manage server startup and shutdown lifecycle.

    On entry a connection to Sketchup is attempted; failure is only logged
    so the server still starts. An empty context dict is yielded. On exit
    the shared connection, if any, is closed and cleared.
    """
    global _sketchup_connection
    try:
        logger.info("SketchupMCP server starting up")
        try:
            get_sketchup_connection()
        except Exception as startup_error:
            logger.warning(f"Could not connect to Sketchup on startup: {str(startup_error)}")
            logger.warning("Make sure the Sketchup extension is running")
        else:
            logger.info("Successfully connected to Sketchup on startup")
        yield {}
    finally:
        if _sketchup_connection:
            logger.info("Disconnecting from Sketchup")
            _sketchup_connection.disconnect()
            _sketchup_connection = None
        logger.info("SketchupMCP server shut down")
# Create MCP server with lifespan support
# This instance is the one the @mcp.tool() decorators below register with and
# the one main() runs; server_lifespan is supplied as its lifespan hook.
mcp = FastMCP(
"SketchupMCP",
description="Sketchup integration through the Model Context Protocol",
lifespan=server_lifespan
)
# Tool endpoints
@mcp.tool()
def create_component(
    ctx: Context,
    type: str = "cube",
    position: List[float] = None,
    dimensions: List[float] = None
) -> str:
    """Create a new component in Sketchup.

    Missing *position* defaults to [0,0,0] and missing *dimensions* to
    [1,1,1]. Returns the Sketchup result as a JSON string, or an
    "Error creating component: ..." message when anything fails.
    """
    try:
        logger.info(f"create_component called with type={type}, position={position}, dimensions={dimensions}, request_id={ctx.request_id}")
        connection = get_sketchup_connection()

        arguments = {
            "type": type,
            "position": position or [0,0,0],
            "dimensions": dimensions or [1,1,1]
        }
        params = {"name": "create_component", "arguments": arguments}

        logger.info(f"Calling send_command with method='tools/call', params={params}, request_id={ctx.request_id}")
        outcome = connection.send_command(
            method="tools/call",
            params=params,
            request_id=ctx.request_id
        )
        logger.info(f"create_component result: {outcome}")
        return json.dumps(outcome)
    except Exception as failure:
        logger.error(f"Error in create_component: {str(failure)}")
        return f"Error creating component: {str(failure)}"
@mcp.tool()
def delete_component(
    ctx: Context,
    id: str
) -> str:
    """Delete a component by ID.

    Args:
        ctx: MCP request context; its ``request_id`` is sent as the
            JSON-RPC id.
        id: Identifier of the component to delete.

    Returns:
        The Sketchup result serialized as JSON, or an
        "Error deleting component: ..." message when the call fails.
    """
    try:
        sketchup = get_sketchup_connection()
        result = sketchup.send_command(
            method="tools/call",
            params={
                "name": "delete_component",
                "arguments": {"id": id}
            },
            request_id=ctx.request_id
        )
        return json.dumps(result)
    except Exception as e:
        # Record the failure server-side; the caller only sees the summary.
        logger.error(f"Error in delete_component: {str(e)}")
        return f"Error deleting component: {str(e)}"
@mcp.tool()
def transform_component(
    ctx: Context,
    id: str,
    position: List[float] = None,
    rotation: List[float] = None,
    scale: List[float] = None
) -> str:
    """Transform a component's position, rotation, or scale.

    Only the transforms that are actually supplied (not None) are included
    in the request, so omitted ones are left for Sketchup to decide.

    Args:
        ctx: MCP request context; its ``request_id`` is sent as the
            JSON-RPC id.
        id: Identifier of the component to transform.
        position: Optional new position.
        rotation: Optional rotation.
        scale: Optional scale.

    Returns:
        The Sketchup result serialized as JSON, or an
        "Error transforming component: ..." message when the call fails.
    """
    try:
        sketchup = get_sketchup_connection()
        arguments = {"id": id}
        optional = (("position", position), ("rotation", rotation), ("scale", scale))
        arguments.update({key: value for key, value in optional if value is not None})
        result = sketchup.send_command(
            method="tools/call",
            params={
                "name": "transform_component",
                "arguments": arguments
            },
            request_id=ctx.request_id
        )
        return json.dumps(result)
    except Exception as e:
        # Record the failure server-side; the caller only sees the summary.
        logger.error(f"Error in transform_component: {str(e)}")
        return f"Error transforming component: {str(e)}"
@mcp.tool()
def get_selection(ctx: Context) -> str:
    """Get currently selected components.

    Args:
        ctx: MCP request context; its ``request_id`` is sent as the
            JSON-RPC id.

    Returns:
        The Sketchup result serialized as JSON, or an
        "Error getting selection: ..." message when the call fails.
    """
    try:
        sketchup = get_sketchup_connection()
        result = sketchup.send_command(
            method="tools/call",
            params={
                "name": "get_selection",
                "arguments": {}
            },
            request_id=ctx.request_id
        )
        return json.dumps(result)
    except Exception as e:
        # Record the failure server-side; the caller only sees the summary.
        logger.error(f"Error in get_selection: {str(e)}")
        return f"Error getting selection: {str(e)}"
@mcp.tool()
def set_material(
    ctx: Context,
    id: str,
    material: str
) -> str:
    """Set material for a component.

    Args:
        ctx: MCP request context; its ``request_id`` is sent as the
            JSON-RPC id.
        id: Identifier of the component to modify.
        material: Material value passed through to Sketchup unchanged.

    Returns:
        The Sketchup result serialized as JSON, or an
        "Error setting material: ..." message when the call fails.
    """
    try:
        sketchup = get_sketchup_connection()
        result = sketchup.send_command(
            method="tools/call",
            params={
                "name": "set_material",
                "arguments": {
                    "id": id,
                    "material": material
                }
            },
            request_id=ctx.request_id
        )
        return json.dumps(result)
    except Exception as e:
        # Record the failure server-side; the caller only sees the summary.
        logger.error(f"Error in set_material: {str(e)}")
        return f"Error setting material: {str(e)}"
@mcp.tool()
def export_scene(
    ctx: Context,
    format: str = "skp"
) -> str:
    """Export the current scene.

    The request is sent under the tool name ``"export"`` (not
    ``"export_scene"``).

    Args:
        ctx: MCP request context; its ``request_id`` is sent as the
            JSON-RPC id.
        format: Export format passed through to Sketchup; defaults to "skp".

    Returns:
        The Sketchup result serialized as JSON, or an
        "Error exporting scene: ..." message when the call fails.
    """
    try:
        sketchup = get_sketchup_connection()
        result = sketchup.send_command(
            method="tools/call",
            params={
                "name": "export",
                "arguments": {
                    "format": format
                }
            },
            request_id=ctx.request_id
        )
        return json.dumps(result)
    except Exception as e:
        # Record the failure server-side; the caller only sees the summary.
        logger.error(f"Error in export_scene: {str(e)}")
        return f"Error exporting scene: {str(e)}"
@mcp.tool()
def create_mortise_tenon(
    ctx: Context,
    mortise_id: str,
    tenon_id: str,
    width: float = 1.0,
    height: float = 1.0,
    depth: float = 1.0,
    offset_x: float = 0.0,
    offset_y: float = 0.0,
    offset_z: float = 0.0
) -> str:
    """Create a mortise and tenon joint between two components.

    All arguments are forwarded unchanged to the "create_mortise_tenon"
    tool. Returns the Sketchup result as a JSON string, or an
    "Error creating mortise and tenon joint: ..." message on failure.
    """
    try:
        logger.info(f"create_mortise_tenon called with mortise_id={mortise_id}, tenon_id={tenon_id}, width={width}, height={height}, depth={depth}, offsets=({offset_x}, {offset_y}, {offset_z})")
        connection = get_sketchup_connection()

        joint_arguments = dict(
            mortise_id=mortise_id,
            tenon_id=tenon_id,
            width=width,
            height=height,
            depth=depth,
            offset_x=offset_x,
            offset_y=offset_y,
            offset_z=offset_z,
        )
        outcome = connection.send_command(
            method="tools/call",
            params={"name": "create_mortise_tenon", "arguments": joint_arguments},
            request_id=ctx.request_id
        )
        logger.info(f"create_mortise_tenon result: {outcome}")
        return json.dumps(outcome)
    except Exception as failure:
        logger.error(f"Error in create_mortise_tenon: {str(failure)}")
        return f"Error creating mortise and tenon joint: {str(failure)}"
@mcp.tool()
def create_dovetail(
    ctx: Context,
    tail_id: str,
    pin_id: str,
    width: float = 1.0,
    height: float = 1.0,
    depth: float = 1.0,
    angle: float = 15.0,
    num_tails: int = 3,
    offset_x: float = 0.0,
    offset_y: float = 0.0,
    offset_z: float = 0.0
) -> str:
    """Create a dovetail joint between two components.

    All arguments are forwarded unchanged to the "create_dovetail" tool.
    Returns the Sketchup result as a JSON string, or an
    "Error creating dovetail joint: ..." message on failure.
    """
    try:
        logger.info(f"create_dovetail called with tail_id={tail_id}, pin_id={pin_id}, width={width}, height={height}, depth={depth}, angle={angle}, num_tails={num_tails}")
        connection = get_sketchup_connection()

        joint_arguments = dict(
            tail_id=tail_id,
            pin_id=pin_id,
            width=width,
            height=height,
            depth=depth,
            angle=angle,
            num_tails=num_tails,
            offset_x=offset_x,
            offset_y=offset_y,
            offset_z=offset_z,
        )
        outcome = connection.send_command(
            method="tools/call",
            params={"name": "create_dovetail", "arguments": joint_arguments},
            request_id=ctx.request_id
        )
        logger.info(f"create_dovetail result: {outcome}")
        return json.dumps(outcome)
    except Exception as failure:
        logger.error(f"Error in create_dovetail: {str(failure)}")
        return f"Error creating dovetail joint: {str(failure)}"
@mcp.tool()
def create_finger_joint(
    ctx: Context,
    board1_id: str,
    board2_id: str,
    width: float = 1.0,
    height: float = 1.0,
    depth: float = 1.0,
    num_fingers: int = 5,
    offset_x: float = 0.0,
    offset_y: float = 0.0,
    offset_z: float = 0.0
) -> str:
    """Create a finger joint (box joint) between two components.

    All arguments are forwarded unchanged to the "create_finger_joint"
    tool. Returns the Sketchup result as a JSON string, or an
    "Error creating finger joint: ..." message on failure.
    """
    try:
        logger.info(f"create_finger_joint called with board1_id={board1_id}, board2_id={board2_id}, width={width}, height={height}, depth={depth}, num_fingers={num_fingers}")
        connection = get_sketchup_connection()

        joint_arguments = dict(
            board1_id=board1_id,
            board2_id=board2_id,
            width=width,
            height=height,
            depth=depth,
            num_fingers=num_fingers,
            offset_x=offset_x,
            offset_y=offset_y,
            offset_z=offset_z,
        )
        outcome = connection.send_command(
            method="tools/call",
            params={"name": "create_finger_joint", "arguments": joint_arguments},
            request_id=ctx.request_id
        )
        logger.info(f"create_finger_joint result: {outcome}")
        return json.dumps(outcome)
    except Exception as failure:
        logger.error(f"Error in create_finger_joint: {str(failure)}")
        return f"Error creating finger joint: {str(failure)}"
@mcp.tool()
def eval_ruby(
    ctx: Context,
    code: str
) -> str:
    """Evaluate arbitrary Ruby code in Sketchup.

    Returns a JSON string: {"success": true, "result": <text>} where <text>
    is the "text" of the first entry of the result's "content" list (or
    "Success" when there is no such entry), or
    {"success": false, "error": <message>} when anything fails.
    """
    # NOTE(review): the code string is forwarded to Sketchup unmodified.
    try:
        logger.info(f"eval_ruby called with code length: {len(code)}")
        connection = get_sketchup_connection()
        outcome = connection.send_command(
            method="tools/call",
            params={"name": "eval_ruby", "arguments": {"code": code}},
            request_id=ctx.request_id
        )
        logger.info(f"eval_ruby result: {outcome}")

        # Pull the text out of the first content entry, if there is one.
        content = outcome.get("content")
        if isinstance(content, list) and len(content) > 0:
            text = content[0].get("text", "Success")
        else:
            text = "Success"

        return json.dumps({"success": True, "result": text})
    except Exception as failure:
        logger.error(f"Error in eval_ruby: {str(failure)}")
        return json.dumps({"success": False, "error": str(failure)})
def main():
    """Script entry point: run the module-level MCP server."""
    mcp.run()


# Start the server only when executed as a script, not on import.
if __name__ == "__main__":
    main()