Skip to main content
Glama

3D-MCP

by team-plask
python-templates.ts18.3 kB
import { join } from "path"; import { type PluginConfig } from "../config/pluginsConfig"; import { getTypeForLanguage, getDocstringType, } from "../utils/types"; import { existsSync, mkdirSync, writeFileSync, readFileSync, readdirSync, rmSync, } from "fs"; /** * Generate parameter signature from schema */ function generatePythonParamSignature(schema: any): string { if (!schema || !schema.properties) return ""; const requiredProps = new Set<string>( schema.required || [] ); const parameters = Object.entries(schema.properties).map( ([name, prop]) => { const typeName = getTypeForLanguage(prop, "python"); if (requiredProps.has(name)) { return `${name}: ${typeName}`; } else { // Set default to None for optional parameters return `${name}: Optional[${typeName}] = None`; } } ); return parameters.join(", "); } /** * Generate properly formatted Python docstring with detailed type information */ function generatePythonDocstring(tool: any): string { const { description, parameters } = tool; const paramDocs = []; if (parameters && parameters.properties) { for (const [name, prop] of Object.entries( parameters.properties )) { // Use detailed docstring types with full information const typeName = getDocstringType(prop); const paramDesc = (prop as any).description || `The ${name} parameter`; paramDocs.push( ` ${name} (${typeName}): ${paramDesc}` ); } } // Generate detailed return type documentation const returnProps = tool.returns?.properties || {}; let returnDocs; if (Object.keys(returnProps).length > 0) { returnDocs = Object.entries(returnProps) .map(([name, prop]) => { // Use detailed docstring types for returns const typeName = getDocstringType(prop); const desc = (prop as any).description || `The ${name} return value`; return ` ${name} (${typeName}): ${desc}`; }) .join("\n"); } else { returnDocs = " Dict[str, bool]: Operation response with success status"; } return ` """ ${description} Args: ${ paramDocs.length > 0 ? paramDocs.join("\n") : " No parameters" } Returns: ${returnDocs} """`; } /** * Generate parameter validation code for Python with explicit parameters */ function generatePythonParamValidation( schema: any ): string { if (!schema || !schema.properties) return ""; const validationBlocks = []; // Validate enum values where applicable for (const [name, prop] of Object.entries( schema.properties )) { if ((prop as any).enum) { const validValues = JSON.stringify( (prop as any).enum ).replace(/"/g, "'"); validationBlocks.push(` # Validate enum values for ${name} if ${name} is not None and ${name} not in ${validValues}: raise ValueError(f"Parameter '${name}' must be one of ${validValues}, got {${name}}") `); } } return validationBlocks.join("\n"); } /** * Generate common implementation for Python-based plugins */ export function generatePythonImplementation( category: string, tools: any[], plugin: PluginConfig ): string { const toolsImplementation = tools .map((tool) => { const paramJsonSchema = tool.parameters; const returnJsonSchema = tool.returns; const toolName = tool.name; const docstring = generatePythonDocstring(tool); const paramSignature = generatePythonParamSignature(paramJsonSchema); const paramValidation = generatePythonParamValidation(paramJsonSchema); // Determine return structure const returnProps = returnJsonSchema ? Object.keys(returnJsonSchema.properties || {}) : ["success"]; const returnStructure = returnProps .map((prop) => { if (prop === "success") return '"success": True'; return `"${prop}": None `; }) .join(", # TODO: Implement \n "); // Create dictionary of parameters for logging const buildParamsDict = Object.keys( paramJsonSchema?.properties || {} ) .map((param) => `"${param}": ${param}`) .join(", "); const paramsDict = buildParamsDict ? `{${buildParamsDict}}` : "{}"; return `def ${toolName}(${paramSignature}) -> Dict[str, Any]: ${docstring} tool_name = "${toolName}" # Define tool name for logging params = ${paramsDict} # Create params dict for logging ${plugin.printFormat} try: ${paramValidation || " # No parameters to validate"} # TODO: Implement actual ${plugin.name} API calls # This is a placeholder implementation return { ${returnStructure} } except Exception as e: ${plugin.errorFormat} return {"success": False, "error": str(e)}`; }) .join("\n\n"); return `# Generated ${ plugin.name } implementation for ${category} atomic tools # This file is generated - DO NOT EDIT DIRECTLY ${plugin.importStatements.join("\n")} from typing import Dict, Any, Optional, List, Union, Tuple, Literal ${toolsImplementation} `; } /** * Generate Python server component for plugin */ export function generatePythonServer( plugin: PluginConfig, categories: string[] ): string { const imports = categories .map( (category) => `from .${category} import ${category}_atomic` ) .join("\n"); const toolRegistrations = categories .map((category) => { // Get all the atomic files in the category directory const categoryPath = join( process.cwd(), "packages", plugin.dir, category ); // Make sure the directory exists before trying to read it if (!existsSync(categoryPath)) { return ` # Register ${category} tools\n # No tools found in ${category}`; } return ` # Register ${category} tools\n for name, func in inspect.getmembers(${category}_atomic, inspect.isfunction):\n print(f"Registering tool: {name}")\n register_tool(name, func)`; }) .join("\n\n"); // Wrap application-specific imports in try-except blocks const safeImportStatements = plugin.importStatements .map((imp) => { // Detect if this is an application-specific import if ( imp.includes("bpy") || imp.includes("maya.cmds") || imp.includes("maya.mel") || imp.includes("unreal") ) { return `try: ${imp} HAS_APP_LIBS = True except ImportError: print(f"Warning: Could not import ${ imp.split(" ")[1] }. Running in mock mode.") HAS_APP_LIBS = False`; } return imp; }) .join("\n"); return `# Generated ${plugin.name} MCP server # This file is generated - DO NOT EDIT DIRECTLY bl_info = { "name": "3D MCP", "author": "Plask", "version": (0, 3), "blender": (3, 0, 0), "location": "View3D > Sidebar > 3dMCP", "description": "Connect 3D Tools to LLM via MCP", "category": "Interface", } ${safeImportStatements} import json import inspect import socket import sys import os import argparse from typing import Dict, Any, Callable, List, Union, Optional, Literal, TypedDict, Tuple import threading import traceback import time import uuid import queue ${imports} # Global variables - this will store tools tools = {} # Task queue for main thread execution task_queue = queue.Queue() results_store = {} # Store task results by ID def execute_on_main_thread(tool_name, params): """Schedule a tool execution on the main thread and wait for result""" task_id = str(uuid.uuid4()) task = {"id": task_id, "tool": tool_name, "params": params, "completed": False} results_store[task_id] = { "completed": False, "result": None, } # Initialize with proper structure task_queue.put(task) # Register timer if not already registered if not bpy.app.timers.is_registered(process_task_queue): bpy.app.timers.register(process_task_queue) # Wait for task completion try: while not results_store.get(task_id, {}).get("completed", False): time.sleep(0.1) except Exception as e: print(f"Error waiting for task completion: {str(e)}") return {"success": False, "error": f"Task execution error: {str(e)}"} # Get result and clean up result = results_store.get(task_id, {}).get( "result", {"success": False, "error": "No result found"} ) del results_store[task_id] # Clean up return result def process_task_queue(): """Process queued tasks on the main thread""" if task_queue.empty(): return 0.1 # Check again in 0.1 seconds task = task_queue.get() tool_name = task["tool"] params = task["params"] task_id = task["id"] try: if tool_name in tools: result = tools[tool_name](**params) else: result = {"success": False, "error": f"Unknown tool: {tool_name}"} except Exception as e: print(f"Error executing {tool_name}: {str(e)}") traceback.print_exc() result = {"success": False, "error": str(e)} # Store result results_store[task_id] = {"completed": True, "result": result} return 0.1 # Check again in 0.1 seconds def register_tool(name: str, func: Callable): """Register a tool function with the server""" print(f"Registering tool: {name}") tools[name] = func def register_all_tools(): """Register all available tool functions""" print("Registering all tools...") ${toolRegistrations} def server_loop(): """Main server loop running in a separate thread""" try: server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind((bpy.context.scene.mcp_host, bpy.context.scene.mcp_port)) server_socket.listen(5) # Store socket in bpy.types for access from other functions bpy.types.mcp_server_socket = server_socket print( f"MCP Server started on {bpy.context.scene.mcp_host}:{bpy.context.scene.mcp_port}" ) while bpy.context.scene.mcp_server_running: try: client_socket, address = server_socket.accept() print(f"Connection from {address}") handle_client(client_socket) except socket.timeout: continue except Exception as e: if bpy.context.scene.mcp_server_running: print(f"Error accepting connection: {str(e)}") except Exception as e: print(f"Server error: {str(e)}") finally: if hasattr(bpy.types, "mcp_server_socket"): bpy.types.mcp_server_socket.close() del bpy.types.mcp_server_socket print("Server socket closed") def handle_client(client_socket): """Handle client connection and route requests to appropriate tool""" try: data = client_socket.recv(4096) if not data: return print(f"Received data: {data}") # Check if this is an HTTP request request_str = data.decode("utf-8", errors="ignore") if request_str.startswith("POST") or request_str.startswith("GET"): print("Received HTTP request, extracting JSON body") # Extract the JSON body from HTTP request (after the \\r\\n\\r\\n) body_start = request_str.find("\\r\\n\\r\\n") + 4 if body_start > 4: # Found the body separator json_body = request_str[body_start:] print(f"Extracted JSON body: {json_body}") request = json.loads(json_body) else: raise ValueError("Invalid HTTP request format, no body found") else: # Regular JSON request request = json.loads(request_str) print(f"Parsed request: {request}") tool_name = request.get("tool") params = request.get("params", {}) # Execute on main thread and get result result = execute_on_main_thread(tool_name, params) response_data = json.dumps(result).encode("utf-8") # Check if we need to send HTTP response if request_str.startswith("POST") or request_str.startswith("GET"): response = ( b"HTTP/1.1 200 OK\\r\\n" + b"Content-Type: application/json\\r\\n" + b"Access-Control-Allow-Origin: *\\r\\n" + b"Content-Length: " + str(len(response_data)).encode() + b"\\r\\n" + b"\\r\\n" + response_data ) else: response = response_data client_socket.send(response) except Exception as e: print(f"Error handling client request: {str(e)}") traceback.print_exc() error_msg = {"success": False, "error": str(e)} response_data = json.dumps(error_msg).encode("utf-8") # Try to determine if this was an HTTP request try: request_str = data.decode("utf-8", errors="ignore") if request_str.startswith("POST") or request_str.startswith("GET"): response = ( b"HTTP/1.1 500 Internal Server Error\\r\\n" + b"Content-Type: application/json\\r\\n" + b"Access-Control-Allow-Origin: *\\r\\n" + b"Content-Length: " + str(len(response_data)).encode() + b"\\r\\n" + b"\\r\\n" + response_data ) else: response = response_data except: response = response_data client_socket.send(response) finally: client_socket.close() def start_server(new_host="localhost", new_port=8000): """Start the MCP server""" scene = bpy.context.scene # Set host and port in scene properties scene.mcp_host = new_host scene.mcp_port = new_port if scene.mcp_server_running: print("Server is already running") return register_all_tools() scene.mcp_server_running = True # Create and start server thread server_thread = threading.Thread(target=server_loop) server_thread.daemon = True server_thread.start() # Store thread in bpy.types for access from other functions bpy.types.mcp_server_thread = server_thread print(f"MCP Server started on {scene.mcp_host}:{scene.mcp_port}") def stop_server(): """Stop the MCP server""" scene = bpy.context.scene if not scene.mcp_server_running: return scene.mcp_server_running = False # Close the socket to interrupt accept() if hasattr(bpy.types, "mcp_server_socket"): try: bpy.types.mcp_server_socket.close() del bpy.types.mcp_server_socket except: pass # Wait for the thread to finish if hasattr(bpy.types, "mcp_server_thread"): try: bpy.types.mcp_server_thread.join(timeout=1.0) del bpy.types.mcp_server_thread except: pass print("MCP Server stopped") # ------------------------------------------------------------- # Blender UI Components # ------------------------------------------------------------- class MCP_PT_Panel(bpy.types.Panel): """MCP Server Panel""" bl_label = "3D MCP Server" bl_idname = "MCP_PT_Panel" bl_space_type = "VIEW_3D" bl_region_type = "UI" bl_category = "3dMCP" def draw(self, context): layout = self.layout scene = context.scene layout.prop(scene, "mcp_port") layout.prop(scene, "mcp_host") if not scene.mcp_server_running: layout.operator("mcp.start_server", text="Start MCP Server") else: layout.operator("mcp.stop_server", text="Stop MCP Server") layout.label(text=f"Running on {scene.mcp_host}:{scene.mcp_port}") class MCP_OT_StartServer(bpy.types.Operator): """Start the MCP Server""" bl_idname = "mcp.start_server" bl_label = "Start MCP Server" def execute(self, context): start_server(context.scene.mcp_host, context.scene.mcp_port) return {"FINISHED"} class MCP_OT_StopServer(bpy.types.Operator): """Stop the MCP Server""" bl_idname = "mcp.stop_server" bl_label = "Stop MCP Server" def execute(self, context): stop_server() return {"FINISHED"} def register(): """Register the Blender addon""" bpy.types.Scene.mcp_port = bpy.props.IntProperty( name="Port", description="Port for the MCP server", default=8000, min=1024, max=65535, ) bpy.types.Scene.mcp_host = bpy.props.StringProperty( name="Host", description="Host for the MCP server", default="localhost" ) bpy.types.Scene.mcp_server_running = bpy.props.BoolProperty( name="Server Running", default=False ) bpy.utils.register_class(MCP_PT_Panel) bpy.utils.register_class(MCP_OT_StartServer) bpy.utils.register_class(MCP_OT_StopServer) register_all_tools() print("3D MCP addon registered") def unregister(): """Unregister the Blender addon""" # Stop server if running if bpy.context.scene.mcp_server_running: stop_server() bpy.utils.unregister_class(MCP_PT_Panel) bpy.utils.unregister_class(MCP_OT_StartServer) bpy.utils.unregister_class(MCP_OT_StopServer) del bpy.types.Scene.mcp_port del bpy.types.Scene.mcp_host del bpy.types.Scene.mcp_server_running # Clean up any remaining server resources if hasattr(bpy.types, "mcp_server_socket"): try: bpy.types.mcp_server_socket.close() del bpy.types.mcp_server_socket except: pass if hasattr(bpy.types, "mcp_server_thread"): del bpy.types.mcp_server_thread print("3D MCP addon unregistered") if __name__ == "__main__": register() `; }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/team-plask/3d-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server