from mcp.server.fastmcp import FastMCP, Context
import socket
import json
import asyncio
import logging
from dataclasses import dataclass
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, Any, List
# Configure root logging once at import time: INFO level, with timestamp,
# logger name and severity in front of every message.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-wide logger shared by every function in this file.
logger = logging.getLogger("SketchupMCPServer")

# The version is hard-coded here to avoid a pkg_resources dependency.
__version__ = "0.1.17"

logger.info(f"SketchupMCP Server version {__version__} starting up")
@dataclass
class SketchupConnection:
    """Blocking TCP client for the Sketchup extension's JSON-RPC socket server.

    The connection is opened lazily by :meth:`connect`, reused across calls to
    :meth:`send_command`, and re-established when a connection-level error
    is detected.
    """

    host: str
    port: int
    # None while disconnected; set by connect() and cleared by disconnect().
    sock: socket.socket = None
    # Seconds to wait on the socket while sending a request / reading a reply.
    timeout: float = 15.0
    # Extra attempts made after the first one fails with a connection error.
    max_retries: int = 2

    def connect(self) -> bool:
        """Connect to the Sketchup extension socket server.

        If a socket already exists it is probed with a zero-length send and
        reused when the probe succeeds; otherwise a new TCP connection to
        ``(host, port)`` is opened.

        Returns:
            True when a usable socket is available, False when connecting
            failed (``self.sock`` is then None).
        """
        if self.sock is not None:
            try:
                # Probe with a short timeout, then restore the previous one so
                # the probe's 0.1s limit does not leak into later sends.
                previous_timeout = self.sock.gettimeout()
                self.sock.settimeout(0.1)
                self.sock.send(b"")
                self.sock.settimeout(previous_timeout)
                return True
            except OSError:
                # socket.error, BrokenPipeError and ConnectionResetError are
                # all OSError; the connection is dead, so drop it and retry.
                logger.info("Connection test failed, reconnecting...")
                self.disconnect()

        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect((self.host, self.port))
            logger.info(f"Connected to Sketchup at {self.host}:{self.port}")
            return True
        except Exception as e:
            logger.error(f"Failed to connect to Sketchup: {str(e)}")
            # Close the half-initialised socket instead of just dropping the
            # reference, so the descriptor is released immediately.
            self.disconnect()
            return False

    def disconnect(self):
        """Close the socket, if any, and mark the connection as closed.

        Errors raised while closing are logged and otherwise ignored.
        """
        if self.sock is not None:
            try:
                self.sock.close()
            except Exception as e:
                logger.error(f"Error disconnecting from Sketchup: {str(e)}")
            finally:
                self.sock = None

    @staticmethod
    def _is_complete_json(data: bytes) -> bool:
        """Return True when *data* decodes as UTF-8 and parses as JSON."""
        try:
            json.loads(data.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            # UnicodeDecodeError covers a multi-byte character that is split
            # across two recv() chunks; more data is needed in both cases.
            return False
        return True

    def receive_full_response(self, sock, buffer_size=8192):
        """Receive one complete JSON document, possibly spread over chunks.

        Data is read until the accumulated bytes parse as JSON, the peer
        closes the connection, or the socket times out.

        Args:
            sock: Connected socket to read from.
            buffer_size: Maximum number of bytes requested per ``recv`` call.

        Returns:
            The raw bytes of the complete JSON response.

        Raises:
            ConnectionError: If the connection fails while receiving.
            Exception: If the peer closes before sending anything, if nothing
                arrives before the timeout, or if the data received is not a
                complete JSON document.
        """
        chunks = []
        sock.settimeout(self.timeout)

        while True:
            try:
                chunk = sock.recv(buffer_size)
            except socket.timeout:
                # Fall through to the final validation of whatever arrived.
                logger.warning("Socket timeout during chunked receive")
                break
            except ConnectionError as e:
                logger.error(f"Socket connection error during receive: {str(e)}")
                raise
            except Exception as e:
                logger.error(f"Error during receive: {str(e)}")
                raise

            if not chunk:
                # An empty read means the peer closed the connection.
                if not chunks:
                    logger.error(
                        "Error during receive: "
                        "Connection closed before receiving any data"
                    )
                    raise Exception("Connection closed before receiving any data")
                break

            chunks.append(chunk)
            data = b"".join(chunks)
            if self._is_complete_json(data):
                logger.info(f"Received complete response ({len(data)} bytes)")
                return data

        if not chunks:
            raise Exception("No data received")

        data = b"".join(chunks)
        logger.info(f"Returning data after receive completion ({len(data)} bytes)")
        if not self._is_complete_json(data):
            raise Exception("Incomplete JSON response received")
        return data

    def send_command(
        self, method: str, params: Dict[str, Any] = None, request_id: Any = None
    ) -> Dict[str, Any]:
        """Send a JSON-RPC request to Sketchup and return the response.

        A call with ``method == "tools/call"`` and params containing both
        ``"name"`` and ``"arguments"`` is sent as-is. Any other call is
        treated as a direct command and wrapped into a ``tools/call`` request
        with ``method`` as the tool name and ``params`` as its arguments.

        Args:
            method: JSON-RPC method, or a direct command name.
            params: Request params, or the arguments of a direct command.
            request_id: Value placed in the request's ``"id"`` member.

        Returns:
            The ``"result"`` member of the response, or ``{}`` if absent.

        Raises:
            ConnectionError: If no connection can be established, or if
                reconnecting between retries fails.
            Exception: If retries are exhausted, the response is not valid
                JSON, or Sketchup reports an error.
        """
        # Try to connect if not connected
        if not self.connect():
            raise ConnectionError("Not connected to Sketchup")

        already_wrapped = (
            method == "tools/call"
            and params
            and "name" in params
            and "arguments" in params
        )
        if already_wrapped:
            request = {
                "jsonrpc": "2.0",
                "method": method,
                "params": params,
                "id": request_id,
            }
        else:
            # This is a direct command - convert to JSON-RPC
            logger.info(f"Converting direct command '{method}' to JSON-RPC format")
            request = {
                "jsonrpc": "2.0",
                "method": "tools/call",
                "params": {"name": method, "arguments": params or {}},
                "id": request_id,
            }

        total_attempts = max(1, self.max_retries + 1)
        for attempt in range(1, total_attempts + 1):
            response_data = None
            try:
                logger.info(f"Sending JSON-RPC request: {request}")
                request_bytes = json.dumps(request).encode("utf-8") + b"\n"
                logger.info(f"Raw bytes being sent: {request_bytes}")
                # Apply the full timeout before sending so a short timeout
                # left by an earlier probe cannot cut the send short.
                self.sock.settimeout(self.timeout)
                self.sock.sendall(request_bytes)
                logger.info("Request sent, waiting for response...")
                response_data = self.receive_full_response(self.sock)
                logger.info(f"Received {len(response_data)} bytes of data")
                response = json.loads(response_data.decode("utf-8"))
                logger.info(f"Response parsed: {response}")

                error = response.get("error")
                if error is not None:
                    logger.error(f"Sketchup error: {error}")
                    # Tolerate servers that send the error as a bare value.
                    if isinstance(error, dict):
                        message = error.get("message", "Unknown error from Sketchup")
                    else:
                        message = str(error)
                    raise Exception(message)
                return response.get("result", {})
            except (socket.timeout, ConnectionError) as e:
                # NOTE(review): a retry re-sends the same request; if the
                # extension already executed it, the command may run twice.
                logger.warning(
                    f"Connection error (attempt {attempt}/{total_attempts}): {str(e)}"
                )
                if attempt >= total_attempts:
                    logger.error("Max retries reached, giving up")
                    self.disconnect()
                    raise Exception(
                        f"Connection to Sketchup lost after {total_attempts} attempts: {str(e)}"
                    ) from e
                logger.info("Retrying connection...")
                self.disconnect()
                if not self.connect():
                    # Raise instead of leaving the loop, which would make the
                    # method return None as if the call had succeeded.
                    logger.error("Failed to reconnect")
                    raise ConnectionError(
                        f"Failed to reconnect to Sketchup: {str(e)}"
                    ) from e
            except json.JSONDecodeError as e:
                logger.error(f"Invalid JSON response from Sketchup: {str(e)}")
                if response_data:
                    logger.error(
                        f"Raw response (first 200 bytes): {response_data[:200]}"
                    )
                raise Exception(f"Invalid response from Sketchup: {str(e)}") from e
            except Exception as e:
                logger.error(f"Error communicating with Sketchup: {str(e)}")
                # Close the socket rather than only dropping the reference.
                self.disconnect()
                raise Exception(
                    f"Communication error with Sketchup: {str(e)}"
                ) from e
# Global connection management: one shared connection for the whole process.
_sketchup_connection = None


def get_sketchup_connection():
    """Get or create the persistent Sketchup connection.

    An existing connection is checked by sending a ``ping`` request; if that
    send fails (or the socket is already gone) the connection is discarded
    and a new one to ``localhost:9876`` is opened.

    Returns:
        The shared ``SketchupConnection`` instance.

    Raises:
        Exception: If a new connection to Sketchup cannot be established.
    """
    global _sketchup_connection

    if _sketchup_connection is not None:
        try:
            if _sketchup_connection.sock is None:
                # send_command() clears the socket after a failure; report
                # that directly rather than via an AttributeError on None.
                raise ConnectionError("socket is closed")
            # Test connection with a ping command.
            # NOTE(review): the reply to this ping (if the extension sends
            # one) is never read here, so it would be picked up by the next
            # receive on this socket - confirm the extension's ping behavior.
            ping_request = {"jsonrpc": "2.0", "method": "ping", "params": {}, "id": 0}
            _sketchup_connection.sock.sendall(
                json.dumps(ping_request).encode("utf-8") + b"\n"
            )
            return _sketchup_connection
        except Exception as e:
            logger.warning(f"Existing connection is no longer valid: {str(e)}")
            try:
                _sketchup_connection.disconnect()
            except Exception as close_error:
                # Best effort: the connection is being discarded anyway.
                logger.debug(f"Ignoring error while discarding connection: {close_error}")
            _sketchup_connection = None

    # Only publish the new connection once it is known to work.
    connection = SketchupConnection(host="localhost", port=9876)
    if not connection.connect():
        logger.error("Failed to connect to Sketchup")
        raise Exception(
            "Could not connect to Sketchup. Make sure the Sketchup extension is running."
        )
    _sketchup_connection = connection
    logger.info("Created new persistent connection to Sketchup")
    return _sketchup_connection
@asynccontextmanager
async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
    """Manage the server's startup and shutdown.

    On startup a connection to Sketchup is attempted; failure is only logged
    as a warning and the server starts anyway. An empty dict is yielded as
    the lifespan context. On shutdown the shared connection, if any, is
    disconnected and cleared.
    """
    global _sketchup_connection

    try:
        logger.info("SketchupMCP server starting up")
        try:
            # Called for its side effect of opening the shared connection.
            get_sketchup_connection()
            logger.info("Successfully connected to Sketchup on startup")
        except Exception as e:
            logger.warning(f"Could not connect to Sketchup on startup: {str(e)}")
            logger.warning("Make sure the Sketchup extension is running")

        yield {}
    finally:
        if _sketchup_connection:
            logger.info("Disconnecting from Sketchup")
            _sketchup_connection.disconnect()
            _sketchup_connection = None
        logger.info("SketchupMCP server shut down")
# Create the MCP server instance, named "SketchupMCP"; server_lifespan is
# passed as its lifespan handler. The @mcp.tool() decorators below attach
# the tool functions to this instance.
mcp = FastMCP("SketchupMCP", lifespan=server_lifespan)
# Tool endpoints
@mcp.tool()
def create_component(
    ctx: Context,
    type: str = "cube",
    position: List[float] = None,
    dimensions: List[float] = None,
) -> str:
    """Create a new component in Sketchup

    Args:
        type: Kind of component to create.
        position: Component position; [0, 0, 0] is sent when omitted.
        dimensions: Component dimensions; [1, 1, 1] is sent when omitted.

    Returns:
        The JSON-encoded result, or an error message string on failure.
    """
    try:
        logger.info(
            f"create_component called with type={type}, position={position}, dimensions={dimensions}, request_id={ctx.request_id}"
        )
        connection = get_sketchup_connection()

        # Falsy values (None or an empty list) fall back to the defaults.
        arguments = {
            "type": type,
            "position": position or [0, 0, 0],
            "dimensions": dimensions or [1, 1, 1],
        }
        params = {"name": "create_component", "arguments": arguments}

        logger.info(
            f"Calling send_command with method='tools/call', params={params}, request_id={ctx.request_id}"
        )
        result = connection.send_command(
            method="tools/call", params=params, request_id=ctx.request_id
        )
        logger.info(f"create_component result: {result}")
        return json.dumps(result)
    except Exception as e:
        logger.error(f"Error in create_component: {str(e)}")
        return f"Error creating component: {str(e)}"
@mcp.tool()
def delete_component(ctx: Context, id: str) -> str:
    """Delete a component by ID

    Args:
        id: Identifier of the component to delete.

    Returns:
        The JSON-encoded result, or an error message string on failure.
    """
    try:
        connection = get_sketchup_connection()
        tool_call = {"name": "delete_component", "arguments": {"id": id}}
        outcome = connection.send_command(
            method="tools/call", params=tool_call, request_id=ctx.request_id
        )
        return json.dumps(outcome)
    except Exception as e:
        return f"Error deleting component: {str(e)}"
@mcp.tool()
def transform_component(
    ctx: Context,
    id: str,
    position: List[float] = None,
    rotation: List[float] = None,
    scale: List[float] = None,
) -> str:
    """Transform a component's position, rotation, or scale

    Args:
        id: Identifier of the component to transform.
        position: New position; omitted from the request when None.
        rotation: New rotation; omitted from the request when None.
        scale: New scale; omitted from the request when None.

    Returns:
        The JSON-encoded result, or an error message string on failure.
    """
    try:
        connection = get_sketchup_connection()

        # Only forward the transforms the caller actually supplied.
        optional = {"position": position, "rotation": rotation, "scale": scale}
        arguments = {"id": id}
        arguments.update(
            {key: value for key, value in optional.items() if value is not None}
        )

        tool_call = {"name": "transform_component", "arguments": arguments}
        outcome = connection.send_command(
            method="tools/call", params=tool_call, request_id=ctx.request_id
        )
        return json.dumps(outcome)
    except Exception as e:
        return f"Error transforming component: {str(e)}"
@mcp.tool()
def get_selection(ctx: Context) -> str:
    """Get currently selected components

    Returns:
        The JSON-encoded result, or an error message string on failure.
    """
    try:
        connection = get_sketchup_connection()
        tool_call = {"name": "get_selection", "arguments": {}}
        outcome = connection.send_command(
            method="tools/call", params=tool_call, request_id=ctx.request_id
        )
        return json.dumps(outcome)
    except Exception as e:
        return f"Error getting selection: {str(e)}"
@mcp.tool()
def set_material(ctx: Context, id: str, material: str) -> str:
    """Set material for a component

    Args:
        id: Identifier of the component to modify.
        material: Material to apply to the component.

    Returns:
        The JSON-encoded result, or an error message string on failure.
    """
    try:
        connection = get_sketchup_connection()
        arguments = {"id": id, "material": material}
        tool_call = {"name": "set_material", "arguments": arguments}
        outcome = connection.send_command(
            method="tools/call", params=tool_call, request_id=ctx.request_id
        )
        return json.dumps(outcome)
    except Exception as e:
        return f"Error setting material: {str(e)}"
@mcp.tool()
def export_scene(ctx: Context, format: str = "skp") -> str:
    """Export the current scene

    Args:
        format: Export format name; defaults to "skp".

    Returns:
        The JSON-encoded result, or an error message string on failure.
    """
    try:
        connection = get_sketchup_connection()
        # The tool is registered as "export_scene" but the command sent to
        # Sketchup is named "export".
        tool_call = {"name": "export", "arguments": {"format": format}}
        outcome = connection.send_command(
            method="tools/call", params=tool_call, request_id=ctx.request_id
        )
        return json.dumps(outcome)
    except Exception as e:
        return f"Error exporting scene: {str(e)}"
@mcp.tool()
def create_mortise_tenon(
    ctx: Context,
    mortise_id: str,
    tenon_id: str,
    width: float = 1.0,
    height: float = 1.0,
    depth: float = 1.0,
    offset_x: float = 0.0,
    offset_y: float = 0.0,
    offset_z: float = 0.0,
) -> str:
    """Create a mortise and tenon joint between two components

    Args:
        mortise_id: Identifier of the component that receives the mortise.
        tenon_id: Identifier of the component that receives the tenon.
        width: Joint width.
        height: Joint height.
        depth: Joint depth.
        offset_x: Joint offset along X.
        offset_y: Joint offset along Y.
        offset_z: Joint offset along Z.

    Returns:
        The JSON-encoded result, or an error message string on failure.
    """
    try:
        logger.info(
            f"create_mortise_tenon called with mortise_id={mortise_id}, tenon_id={tenon_id}, width={width}, height={height}, depth={depth}, offsets=({offset_x}, {offset_y}, {offset_z})"
        )
        connection = get_sketchup_connection()

        joint_args = {
            "mortise_id": mortise_id,
            "tenon_id": tenon_id,
            "width": width,
            "height": height,
            "depth": depth,
            "offset_x": offset_x,
            "offset_y": offset_y,
            "offset_z": offset_z,
        }
        tool_call = {"name": "create_mortise_tenon", "arguments": joint_args}

        result = connection.send_command(
            method="tools/call", params=tool_call, request_id=ctx.request_id
        )
        logger.info(f"create_mortise_tenon result: {result}")
        return json.dumps(result)
    except Exception as e:
        logger.error(f"Error in create_mortise_tenon: {str(e)}")
        return f"Error creating mortise and tenon joint: {str(e)}"
@mcp.tool()
def create_dovetail(
    ctx: Context,
    tail_id: str,
    pin_id: str,
    width: float = 1.0,
    height: float = 1.0,
    depth: float = 1.0,
    angle: float = 15.0,
    num_tails: int = 3,
    offset_x: float = 0.0,
    offset_y: float = 0.0,
    offset_z: float = 0.0,
) -> str:
    """Create a dovetail joint between two components

    Args:
        tail_id: Identifier of the component that receives the tails.
        pin_id: Identifier of the component that receives the pins.
        width: Joint width.
        height: Joint height.
        depth: Joint depth.
        angle: Dovetail angle.
        num_tails: Number of tails.
        offset_x: Joint offset along X.
        offset_y: Joint offset along Y.
        offset_z: Joint offset along Z.

    Returns:
        The JSON-encoded result, or an error message string on failure.
    """
    try:
        logger.info(
            f"create_dovetail called with tail_id={tail_id}, pin_id={pin_id}, width={width}, height={height}, depth={depth}, angle={angle}, num_tails={num_tails}"
        )
        connection = get_sketchup_connection()

        joint_args = {
            "tail_id": tail_id,
            "pin_id": pin_id,
            "width": width,
            "height": height,
            "depth": depth,
            "angle": angle,
            "num_tails": num_tails,
            "offset_x": offset_x,
            "offset_y": offset_y,
            "offset_z": offset_z,
        }
        tool_call = {"name": "create_dovetail", "arguments": joint_args}

        result = connection.send_command(
            method="tools/call", params=tool_call, request_id=ctx.request_id
        )
        logger.info(f"create_dovetail result: {result}")
        return json.dumps(result)
    except Exception as e:
        logger.error(f"Error in create_dovetail: {str(e)}")
        return f"Error creating dovetail joint: {str(e)}"
@mcp.tool()
def create_finger_joint(
    ctx: Context,
    board1_id: str,
    board2_id: str,
    width: float = 1.0,
    height: float = 1.0,
    depth: float = 1.0,
    num_fingers: int = 5,
    offset_x: float = 0.0,
    offset_y: float = 0.0,
    offset_z: float = 0.0,
) -> str:
    """Create a finger joint (box joint) between two components

    Args:
        board1_id: Identifier of the first board.
        board2_id: Identifier of the second board.
        width: Joint width.
        height: Joint height.
        depth: Joint depth.
        num_fingers: Number of fingers.
        offset_x: Joint offset along X.
        offset_y: Joint offset along Y.
        offset_z: Joint offset along Z.

    Returns:
        The JSON-encoded result, or an error message string on failure.
    """
    try:
        logger.info(
            f"create_finger_joint called with board1_id={board1_id}, board2_id={board2_id}, width={width}, height={height}, depth={depth}, num_fingers={num_fingers}"
        )
        connection = get_sketchup_connection()

        joint_args = {
            "board1_id": board1_id,
            "board2_id": board2_id,
            "width": width,
            "height": height,
            "depth": depth,
            "num_fingers": num_fingers,
            "offset_x": offset_x,
            "offset_y": offset_y,
            "offset_z": offset_z,
        }
        tool_call = {"name": "create_finger_joint", "arguments": joint_args}

        result = connection.send_command(
            method="tools/call", params=tool_call, request_id=ctx.request_id
        )
        logger.info(f"create_finger_joint result: {result}")
        return json.dumps(result)
    except Exception as e:
        logger.error(f"Error in create_finger_joint: {str(e)}")
        return f"Error creating finger joint: {str(e)}"
@mcp.tool()
def eval_ruby(ctx: Context, code: str) -> str:
    """Evaluate arbitrary Ruby code in Sketchup

    Args:
        code: Ruby source code to evaluate.

    Returns:
        A JSON object string: {"success": true, "result": <text>} on
        success, or {"success": false, "error": <message>} on failure.
    """
    # NOTE(review): the caller-supplied code is forwarded unmodified for
    # evaluation by the extension; anyone who can call this tool can run
    # arbitrary code there. Left as-is because that is the tool's purpose.
    try:
        logger.info(f"eval_ruby called with code length: {len(code)}")
        connection = get_sketchup_connection()
        tool_call = {"name": "eval_ruby", "arguments": {"code": code}}
        result = connection.send_command(
            method="tools/call", params=tool_call, request_id=ctx.request_id
        )
        logger.info(f"eval_ruby result: {result}")

        # Use the text of the first content item when there is one;
        # otherwise report a generic "Success".
        content = result.get("content")
        if isinstance(content, list) and len(content) > 0:
            text = content[0].get("text", "Success")
        else:
            text = "Success"

        return json.dumps({"success": True, "result": text})
    except Exception as e:
        logger.error(f"Error in eval_ruby: {str(e)}")
        return json.dumps({"success": False, "error": str(e)})
def main():
    """Entry point: run the MCP server by calling ``mcp.run()``."""
    mcp.run()


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()