Skip to main content
Glama
freecad_mcp_server.py64.7 kB
#!/usr/bin/env python3 """ Advanced FreeCAD MCP Server using FastMCP This script implements a Model Context Protocol (MCP) server for FreeCAD using the FastMCP library and incorporating best practices. Usage: python src/mcp_freecad/server/freecad_mcp_server.py """ import argparse import asyncio import json import logging import logging.handlers import os import pickle import socketserver import struct import sys import threading from typing import Any, Dict, List, Optional # --- FastMCP Import --- try: from fastmcp import FastMCP from fastmcp.exceptions import FastMCPError # Use actual exception name mcp_import_error = None except ImportError as e: mcp_import_error = str(e) # Dummy classes/objects for type hints if import fails class FastMCP: def __init__(self, name: str, version: str = "0.1.0"): pass def tool(self): return lambda f: f def resource(self, pattern: str): return lambda f: f async def run_stdio(self): pass class FastMCPError(Exception): pass # Define dummy for the actual exception # ErrorCode is not defined in fastmcp 0.4.1 exceptions # class ErrorCode: InvalidParams = -32602; InternalError = -32603; MethodNotFound = -32601 class ToolContext: pass # Keep dummy for type hint usage in comments # --- FreeCAD Connection Import --- # Import the correct FreeCADConnection class from freecad_connection_manager try: # Assuming execution from workspace root or correct PYTHONPATH from src.mcp_freecad.client.freecad_connection_manager import FreeCADConnection FREECAD_CONNECTION_AVAILABLE = True except ImportError: try: # Fallback if running from within the server directory structure from ...client.freecad_connection_manager import FreeCADConnection FREECAD_CONNECTION_AVAILABLE = True except ImportError: logging.warning( "FreeCAD connection manager module could not be imported. Server will run without FreeCAD features." ) FREECAD_CONNECTION_AVAILABLE = False FreeCADConnection = None # Define as None if unavailable # --- Configuration & Globals --- VERSION = "1.0.0" # Server version CONFIG_PATH = "config.json" # Path relative to repo root CONFIG: Dict[str, Any] = {} FC_CONNECTION: Optional[FreeCADConnection] = None # Ensure logs directory exists LOG_DIR = "logs" os.makedirs(LOG_DIR, exist_ok=True) LOG_FILE_PATH = os.path.join(LOG_DIR, "freecad_mcp_server.log") LOGGING_PORT = 9020 # Define port for logging receiver # Set up logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler(LOG_FILE_PATH), logging.StreamHandler(), # Keep logging to console as well ], ) logger = logging.getLogger("advanced_freecad_mcp") # --- Logging Socket Receiver --- class LogRequestHandler(socketserver.StreamRequestHandler): """Handler for incoming log records.""" def handle(self): """ Handle multiple requests - each expected to be a 4-byte length, followed by the pickled log record. Logs the record. """ while True: chunk = self.connection.recv(4) if len(chunk) < 4: break slen = struct.unpack(">L", chunk)[0] chunk = self.connection.recv(slen) while len(chunk) < slen: chunk = chunk + self.connection.recv(slen - len(chunk)) try: obj = pickle.loads(chunk) record = logging.makeLogRecord(obj) # Process the record using the server's logger logger.handle(record) except Exception as e: logger.error(f"Error processing log record: {e}") # Log the raw chunk data if unpickling fails logger.debug(f"Raw log data received: {chunk!r}") break # Stop processing if there's an error with this connection class LogRecordSocketReceiver(socketserver.ThreadingTCPServer): """Simple TCP socket server to receive log records.""" allow_reuse_address = True def __init__(self, host="localhost", port=LOGGING_PORT, handler=LogRequestHandler): socketserver.ThreadingTCPServer.__init__(self, (host, port), handler) self.abort = 0 self.timeout = 1 self.logname = None def serve_until_stopped(self): import select abort = 0 while not abort: rd, wr, ex = select.select([self.socket.fileno()], [], [], self.timeout) if rd: self.handle_request() abort = self.abort # --- Configuration Loading --- def load_config(config_path: str) -> Dict[str, Any]: """Load configuration from a JSON file.""" try: with open(config_path, "r") as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError) as e: logger.warning( f"Could not load config from {config_path}: {e}. Using defaults." ) # Return default configuration return { "server": {"name": "advanced-freecad-mcp-server", "version": VERSION}, "freecad": { "connection_method": "auto", # auto, server, or bridge "host": "localhost", "port": 12345, "freecad_path": "freecad", }, "tools": { "enable_primitives": True, # Enable primitive creation tools "enable_sketcher": True, # Enable sketcher tools "enable_constraints": True, # Enable constraint tools "enable_measurements": True, # Enable measurement tools }, } # --- FreeCAD Connection Initialization --- def initialize_freecad_connection(config: Dict[str, Any]): """Initialize connection to FreeCAD.""" global FC_CONNECTION if not FREECAD_CONNECTION_AVAILABLE or not FreeCADConnection: logger.warning("FreeCAD connection unavailable. Skipping initialization.") FC_CONNECTION = None return freecad_config = config.get("freecad", {}) logger.info("Attempting to connect to FreeCAD (preferring bridge mode)...") try: # Force prefer_method to 'bridge' for headless server FC_CONNECTION = FreeCADConnection( host=freecad_config.get("host", "localhost"), port=freecad_config.get("port", 12345), freecad_path=freecad_config.get("freecad_path", "freecad"), auto_connect=True, prefer_method="bridge", # Explicitly set bridge as preferred ) if FC_CONNECTION.is_connected(): connection_type = FC_CONNECTION.get_connection_type() logger.info(f"Connected to FreeCAD using {connection_type} method") try: version_info = FC_CONNECTION.get_version() version_str = ".".join( str(v) for v in version_info.get("version", ["Unknown"]) ) logger.info(f"FreeCAD version: {version_str}") except Exception as e: logger.warning(f"Could not retrieve FreeCAD version: {e}") else: logger.warning("Failed to connect to FreeCAD using bridge method.") FC_CONNECTION = None # Set to None if connection failed except Exception as e: logger.error(f"Error initializing FreeCAD connection: {e}") FC_CONNECTION = None # --- Input Sanitization Helper --- def sanitize_name(name: str) -> str: """Basic sanitization for names used in scripts (prevents breaking string literals).""" return name.replace('"', '\\"').replace("\\\\", "\\\\\\\\") def sanitize_path(path: str) -> str: """Basic path sanitization.""" # Add more robust checks if needed (e.g., check against allowed directories) if ".." in path: raise FastMCPError("Path cannot contain '..'") # Ensure absolute paths aren't used unexpectedly if desired # if os.path.isabs(path): # raise FastMCPError("Absolute paths are not allowed") return path # --- Progress Reporting --- class ToolContext: """Context for tool execution, including progress reporting capabilities.""" _instance = None def __init__(self): self.progress_callback = None @classmethod def get(cls): """Get the singleton instance of ToolContext.""" if cls._instance is None: cls._instance = ToolContext() return cls._instance def set_progress_callback(self, callback): """Set the callback function for progress reporting.""" self.progress_callback = callback async def send_progress(self, progress: float, message: str = None): """ Send a progress update. Args: progress: Progress value between 0.0 and 1.0 message: Optional message describing the current progress """ if self.progress_callback and callable(self.progress_callback): await self.progress_callback(progress, message) else: logger.debug( f"Progress update: {progress:.2%} - {message or 'Processing...'}" ) # --- MCP Server Initialization --- server_name = CONFIG.get("server", {}).get("name", "advanced-freecad-mcp-server") mcp = FastMCP(server_name, version=VERSION) # --- Background Connection Check --- async def connection_check_loop(config: Dict[str, Any]): """Periodically checks and attempts to establish FreeCAD connection if not already connected.""" global FC_CONNECTION check_interval = 5 # seconds freecad_config = config.get("freecad", {}) logger.info( f"Starting background FreeCAD connection check (interval: {check_interval}s)" ) while True: try: if FC_CONNECTION is None or not FC_CONNECTION.is_connected(): logger.info( "Attempting to establish FreeCAD connection (background check)..." ) temp_connection = None try: # Attempt connection using the preferred bridge method temp_connection = FreeCADConnection( host=freecad_config.get("host", "localhost"), port=freecad_config.get("port", 12345), freecad_path=freecad_config.get("freecad_path", "freecad"), auto_connect=False, # Connect manually below prefer_method="bridge", ) if temp_connection.connect( prefer_method="bridge" ): # Explicitly connect connection_type = temp_connection.get_connection_type() logger.info( f"Successfully connected to FreeCAD via {connection_type} (background check)" ) # Check version info (optional, but good feedback) try: version_info = temp_connection.get_version() version_str = ".".join( str(v) for v in version_info.get("version", ["Unknown"]) ) logger.info( f"FreeCAD version: {version_str} (background check)" ) except Exception as e: logger.warning( f"Could not retrieve FreeCAD version (background check): {e}" ) # Assign to global connection object FC_CONNECTION = temp_connection else: # Connection failed, FC_CONNECTION remains None # Logger warning happens inside connect() or FreeCADConnection init implicitly logger.debug("Background connection attempt failed.") # Explicitly close if connection object was created but connect failed if temp_connection and hasattr(temp_connection, "close"): temp_connection.close() except Exception as e: logger.error( f"Error during background FreeCAD connection attempt: {e}", exc_info=False, ) # Log less verbosely # Explicitly close if connection object was created but failed during setup if temp_connection and hasattr(temp_connection, "close"): temp_connection.close() # Wait for the next check interval await asyncio.sleep(check_interval) except asyncio.CancelledError: logger.info("Connection check loop cancelled.") break # Exit the loop if cancelled except Exception as e: # Log unexpected errors in the loop itself logger.error( f"Unexpected error in connection_check_loop: {e}", exc_info=True ) await asyncio.sleep(check_interval) # Still wait before retrying # --- Helper for script execution --- async def execute_script_in_freecad( script: str, env_vars_to_get: List[str] = None ) -> Dict[str, Any]: """Executes a script in FreeCAD and handles errors/results.""" if not FC_CONNECTION or not FC_CONNECTION.is_connected(): raise FastMCPError("Not connected to FreeCAD") env_vars_to_get = env_vars_to_get or [] logger.debug( f"Executing script in FreeCAD: requesting env vars {env_vars_to_get}\n---\n{script}\n---" ) # Send initial progress update ctx = ToolContext.get() await ctx.send_progress(0.0, "Starting script execution...") try: # Send progress update before execution await ctx.send_progress(0.1, "Sending script to FreeCAD...") # Add connection validation and retry logic max_retries = 3 retry_count = 0 result = None while retry_count < max_retries: try: result = FC_CONNECTION.execute_command("execute_script", {"script": script}) # Await if the connection method is async if asyncio.iscoroutine(result): result = await result # If we got a result, break out of retry loop if result: break except Exception as retry_error: retry_count += 1 logger.warning(f"Script execution attempt {retry_count} failed: {retry_error}") if retry_count < max_retries: await ctx.send_progress(0.2, f"Retrying script execution (attempt {retry_count + 1})...") await asyncio.sleep(1) # Brief delay before retry else: raise retry_error # Send progress update after execution await ctx.send_progress(0.8, "Script executed, processing results...") if not result: error_msg = "No result returned from FreeCAD script execution" logger.error(f"FreeCAD script execution failed: {error_msg}") await ctx.send_progress(1.0, f"Error: {error_msg}") raise FastMCPError(f"FreeCAD execution error: {error_msg}") if "error" in result: error_msg = result.get( "error", "Unknown error from FreeCAD script execution" ) logger.error(f"FreeCAD script execution failed: {error_msg}") # Send error progress await ctx.send_progress(1.0, f"Error: {error_msg}") raise FastMCPError(f"FreeCAD execution error: {error_msg}") env = result.get("environment", {}) extracted_data = {var: env.get(var) for var in env_vars_to_get} logger.debug( f"Script execution successful. Env data received: {extracted_data}" ) # Send completion progress await ctx.send_progress(1.0, "Script execution completed successfully") return extracted_data except FastMCPError: # Catch specific error type raise except Exception as e: logger.error(f"Error during script execution call: {e}", exc_info=True) # Send error progress await ctx.send_progress(1.0, f"Error: {str(e)}") raise FastMCPError(f"Server error during script execution: {str(e)}") # --- Tool Definitions --- # == FreeCAD Document/Object Tools == @mcp.tool() async def freecad_create_document(name: str = "Unnamed") -> Dict[str, Any]: """Create a new FreeCAD document.""" if not FC_CONNECTION: raise FastMCPError("FreeCAD connection not available") logger.info(f"Executing freecad.create_document with name: {name}") ctx = ToolContext.get() await ctx.send_progress(0.1, f"Creating document '{name}'...") try: doc_name = FC_CONNECTION.create_document(name) if not doc_name: raise FastMCPError(f"Failed to create document '{name}' in FreeCAD.") await ctx.send_progress(1.0, "Document created successfully") return { "document_name": doc_name, "message": f"Successfully created document '{doc_name}'", "success": True, } except Exception as e: logger.error(f"Error in freecad.create_document: {e}", exc_info=True) await ctx.send_progress(1.0, f"Failed to create document: {str(e)}") raise FastMCPError(f"Failed to create document: {str(e)}") @mcp.tool() async def freecad_list_documents() -> Dict[str, Any]: """List all open documents in FreeCAD.""" logger.info("Executing freecad.list_documents") ctx = ToolContext.get() await ctx.send_progress(0.1, "Listing documents...") script = f""" import FreeCAD import json try: docs = [doc.Name for doc in FreeCAD.listDocuments().values()] result_json = json.dumps(docs) except Exception as e: result_json = json.dumps({{"error": str(e)}}) """ try: await ctx.send_progress(0.3, "Executing document list script...") env_data = await execute_script_in_freecad(script, ["result_json"]) result_json_str = env_data.get("result_json") if not result_json_str: raise FastMCPError("Did not receive result from FreeCAD script") docs_data = json.loads(result_json_str) if isinstance(docs_data, dict) and "error" in docs_data: raise FastMCPError( f"Error listing documents in FreeCAD: {docs_data['error']}" ) document_names = docs_data # Should be the list await ctx.send_progress(1.0, "Documents listed successfully") return { "documents": document_names, "count": len(document_names), "message": f"Found {len(document_names)} open documents.", "success": True, } except json.JSONDecodeError: logger.error("Failed to decode document list from FreeCAD script") await ctx.send_progress(1.0, "Failed to parse response") raise FastMCPError("Failed to parse response from FreeCAD") except Exception as e: logger.error(f"Error listing documents: {e}", exc_info=True) await ctx.send_progress(1.0, f"Error: {str(e)}") if not isinstance(e, FastMCPError): raise FastMCPError(f"Error listing documents: {str(e)}") else: raise e @mcp.tool() async def freecad_list_objects(document: Optional[str] = None) -> Dict[str, Any]: """List objects in a specific document (or active one if none specified).""" logger.info(f"Executing freecad.list_objects (Document: {document})") ctx = ToolContext.get() await ctx.send_progress(0.1, "Listing objects...") safe_doc_name = sanitize_name(document) if document else "" script = f""" import FreeCAD import json try: doc = None if "{safe_doc_name}": doc = FreeCAD.getDocument("{safe_doc_name}") if not doc: raise Exception(f"Document '{document}' not found") else: doc = FreeCAD.ActiveDocument if not doc: raise Exception("No active document found") objects = [obj.Name for obj in doc.Objects] result_json = json.dumps(objects) except Exception as e: result_json = json.dumps({{"error": str(e)}}) """ try: await ctx.send_progress(0.3, "Executing object list script...") env_data = await execute_script_in_freecad(script, ["result_json"]) result_json_str = env_data.get("result_json") if not result_json_str: raise FastMCPError("Did not receive result from FreeCAD script") objects_data = json.loads(result_json_str) if isinstance(objects_data, dict) and "error" in objects_data: raise FastMCPError( f"Error listing objects in FreeCAD: {objects_data['error']}" ) objects_list = objects_data await ctx.send_progress(1.0, "Objects listed successfully") return { "objects": objects_list, "count": len(objects_list), "document": document or ( FC_CONNECTION.get_active_document_name() if FC_CONNECTION else "Active" ), "message": f"Found {len(objects_list)} objects.", "success": True, } except json.JSONDecodeError: logger.error("Failed to decode object list from FreeCAD script") await ctx.send_progress(1.0, "Failed to parse response") raise FastMCPError("Failed to parse object list from FreeCAD") except Exception as e: logger.error(f"Error listing objects: {e}", exc_info=True) await ctx.send_progress(1.0, f"Error: {str(e)}") if not isinstance(e, FastMCPError): raise FastMCPError(f"Error listing objects: {str(e)}") else: raise e # == Part Primitive Creation Tools == @mcp.tool() async def freecad_create_box( length: float, width: float, height: float, name: str = "Box", position_x: float = 0.0, position_y: float = 0.0, position_z: float = 0.0, ) -> Dict[str, Any]: """Create a box primitive.""" logger.info(f"Executing freecad.create_box: {name}") ctx = ToolContext.get() await ctx.send_progress(0.1, f"Creating box '{name}'...") safe_name = sanitize_name(name) script = f""" import FreeCAD import Part import json result_json = '' try: doc = FreeCAD.ActiveDocument if not doc: raise Exception("No active document") box = doc.addObject("Part::Box", "{safe_name}") box.Length = float({length}) box.Width = float({width}) box.Height = float({height}) box.Placement.Base.x = float({position_x}) box.Placement.Base.y = float({position_y}) box.Placement.Base.z = float({position_z}) doc.recompute() result_json = json.dumps({{"object_name": box.Name, "type": "Box"}}) except Exception as e: result_json = json.dumps({{"error": str(e)}}) """ try: await ctx.send_progress(0.3, "Executing box creation script...") env_data = await execute_script_in_freecad(script, ["result_json"]) result_json_str = env_data.get("result_json") if not result_json_str: raise FastMCPError("Did not receive result from FreeCAD script") result_data = json.loads(result_json_str) if isinstance(result_data, dict) and "error" in result_data: raise FastMCPError(f"Error creating box in FreeCAD: {result_data['error']}") created_name = result_data.get("object_name") await ctx.send_progress(1.0, "Box created successfully") return { "object_name": created_name, "type": "Box", "message": f"Successfully created box '{created_name}'", "success": True, } except json.JSONDecodeError: logger.error("Failed to decode box creation result from FreeCAD script") await ctx.send_progress(1.0, "Failed to parse response") raise FastMCPError("Failed to parse response from FreeCAD") except Exception as e: logger.error(f"Error creating box: {e}", exc_info=True) await ctx.send_progress(1.0, f"Error: {str(e)}") if not isinstance(e, FastMCPError): raise FastMCPError(f"Error creating box: {str(e)}") else: raise e @mcp.tool() async def freecad_create_cylinder( radius: float, height: float, name: str = "Cylinder", position_x: float = 0.0, position_y: float = 0.0, position_z: float = 0.0, ) -> Dict[str, Any]: """Create a cylinder primitive.""" logger.info(f"Executing freecad.create_cylinder: {name}") ctx = ToolContext.get() await ctx.send_progress(0.1, f"Creating cylinder '{name}'...") safe_name = sanitize_name(name) script = f""" import FreeCAD import Part import json result_json = '' try: doc = FreeCAD.ActiveDocument if not doc: raise Exception("No active document") cyl = doc.addObject("Part::Cylinder", "{safe_name}") cyl.Radius = float({radius}) cyl.Height = float({height}) cyl.Placement.Base.x = float({position_x}) cyl.Placement.Base.y = float({position_y}) cyl.Placement.Base.z = float({position_z}) doc.recompute() result_json = json.dumps({{"object_name": cyl.Name, "type": "Cylinder"}}) except Exception as e: result_json = json.dumps({{"error": str(e)}}) """ try: await ctx.send_progress(0.3, "Executing cylinder creation script...") env_data = await execute_script_in_freecad(script, ["result_json"]) result_json_str = env_data.get("result_json") if not result_json_str: raise FastMCPError("Did not receive result from FreeCAD script") result_data = json.loads(result_json_str) if isinstance(result_data, dict) and "error" in result_data: raise FastMCPError( f"Error creating cylinder in FreeCAD: {result_data['error']}" ) created_name = result_data.get("object_name") await ctx.send_progress(1.0, "Cylinder created successfully") return { "object_name": created_name, "type": "Cylinder", "message": f"Successfully created cylinder '{created_name}'", "success": True, } except json.JSONDecodeError: logger.error("Failed to decode cylinder creation result") await ctx.send_progress(1.0, "Failed to parse response") raise FastMCPError("Failed to parse response from FreeCAD") except Exception as e: logger.error(f"Error creating cylinder: {e}", exc_info=True) await ctx.send_progress(1.0, f"Error: {str(e)}") if not isinstance(e, FastMCPError): raise FastMCPError(f"Error creating cylinder: {str(e)}") else: raise e @mcp.tool() async def freecad_create_sphere( radius: float, name: str = "Sphere", position_x: float = 0.0, position_y: float = 0.0, position_z: float = 0.0, ) -> Dict[str, Any]: """Create a sphere primitive.""" logger.info(f"Executing freecad.create_sphere: {name}") ctx = ToolContext.get() await ctx.send_progress(0.1, f"Creating sphere '{name}'...") safe_name = sanitize_name(name) script = f""" import FreeCAD import Part import json result_json = '' try: doc = FreeCAD.ActiveDocument if not doc: raise Exception("No active document") sph = doc.addObject("Part::Sphere", "{safe_name}") sph.Radius = float({radius}) sph.Placement.Base.x = float({position_x}) sph.Placement.Base.y = float({position_y}) sph.Placement.Base.z = float({position_z}) doc.recompute() result_json = json.dumps({{"object_name": sph.Name, "type": "Sphere"}}) except Exception as e: result_json = json.dumps({{"error": str(e)}}) """ try: await ctx.send_progress(0.3, "Executing sphere creation script...") env_data = await execute_script_in_freecad(script, ["result_json"]) result_json_str = env_data.get("result_json") if not result_json_str: raise FastMCPError("Did not receive result from FreeCAD script") result_data = json.loads(result_json_str) if isinstance(result_data, dict) and "error" in result_data: raise FastMCPError( f"Error creating sphere in FreeCAD: {result_data['error']}" ) created_name = result_data.get("object_name") await ctx.send_progress(1.0, "Sphere created successfully") return { "object_name": created_name, "type": "Sphere", "message": f"Successfully created sphere '{created_name}'", "success": True, } except json.JSONDecodeError: logger.error("Failed to decode sphere creation result") await ctx.send_progress(1.0, "Failed to parse response") raise FastMCPError("Failed to parse response from FreeCAD") except Exception as e: logger.error(f"Error creating sphere: {e}", exc_info=True) await ctx.send_progress(1.0, f"Error: {str(e)}") if not isinstance(e, FastMCPError): raise FastMCPError(f"Error creating sphere: {str(e)}") else: raise e @mcp.tool() async def freecad_create_cone( radius1: float, height: float, radius2: float = 0.0, # Top radius defaults to 0 for a pointed cone name: str = "Cone", position_x: float = 0.0, position_y: float = 0.0, position_z: float = 0.0, ) -> Dict[str, Any]: """Create a cone primitive.""" logger.info(f"Executing freecad.create_cone: {name}") ctx = ToolContext.get() await ctx.send_progress(0.1, f"Creating cone '{name}'...") safe_name = sanitize_name(name) script = f""" import FreeCAD import Part import json result_json = '' try: doc = FreeCAD.ActiveDocument if not doc: raise Exception("No active document") cone = doc.addObject("Part::Cone", "{safe_name}") cone.Radius1 = float({radius1}) cone.Radius2 = float({radius2}) cone.Height = float({height}) cone.Placement.Base.x = float({position_x}) cone.Placement.Base.y = float({position_y}) cone.Placement.Base.z = float({position_z}) doc.recompute() result_json = json.dumps({{"object_name": cone.Name, "type": "Cone"}}) except Exception as e: result_json = json.dumps({{"error": str(e)}}) """ try: await ctx.send_progress(0.3, "Executing cone creation script...") env_data = await execute_script_in_freecad(script, ["result_json"]) result_json_str = env_data.get("result_json") if not result_json_str: raise FastMCPError("Did not receive result from FreeCAD script") result_data = json.loads(result_json_str) if isinstance(result_data, dict) and "error" in result_data: raise FastMCPError( f"Error creating cone in FreeCAD: {result_data['error']}" ) created_name = result_data.get("object_name") await ctx.send_progress(1.0, "Cone created successfully") return { "object_name": created_name, "type": "Cone", "message": f"Successfully created cone '{created_name}'", "success": True, } except json.JSONDecodeError: logger.error("Failed to decode cone creation result") await ctx.send_progress(1.0, "Failed to parse response") raise FastMCPError("Failed to parse response from FreeCAD") except Exception as e: logger.error(f"Error creating cone: {e}", exc_info=True) await ctx.send_progress(1.0, f"Error: {str(e)}") if not isinstance(e, FastMCPError): raise FastMCPError(f"Error creating cone: {str(e)}") else: raise e # == Part Boolean Operation Tools == @mcp.tool() async def freecad_boolean_union( object1: str, object2: str, name: str = "Union" ) -> Dict[str, Any]: """Perform a boolean union (fuse) between two objects.""" logger.info(f"Executing freecad.boolean_union: {object1} + {object2} -> {name}") ctx = ToolContext.get() await ctx.send_progress(0.1, "Starting boolean union...") safe_obj1_name = sanitize_name(object1) safe_obj2_name = sanitize_name(object2) safe_result_name = sanitize_name(name) script = f""" import FreeCAD import Part import json result_json = '' try: doc = FreeCAD.ActiveDocument if not doc: raise Exception("No active document") obj1 = doc.getObject("{safe_obj1_name}") obj2 = doc.getObject("{safe_obj2_name}") if not obj1: raise Exception(f"Object '{object1}' not found") if not obj2: raise Exception(f"Object '{object2}' not found") union = doc.addObject("Part::Fuse", "{safe_result_name}") union.Base = obj1 union.Tool = obj2 # Hide original objects after operation? Optional. # obj1.ViewObject.Visibility = False # obj2.ViewObject.Visibility = False doc.recompute() if union.Shape.isNull(): raise Exception("Boolean operation resulted in empty shape.") result_json = json.dumps({{"object_name": union.Name, "type": "Union"}}) except Exception as e: result_json = json.dumps({{"error": str(e)}}) """ try: await ctx.send_progress(0.3, "Executing boolean union script...") env_data = await execute_script_in_freecad(script, ["result_json"]) result_json_str = env_data.get("result_json") if not result_json_str: raise FastMCPError("Did not receive result from FreeCAD script") result_data = json.loads(result_json_str) if isinstance(result_data, dict) and "error" in result_data: await ctx.send_progress( 1.0, f"Union operation failed: {result_data['error']}" ) raise FastMCPError( f"Error creating union in FreeCAD: {result_data['error']}" ) created_name = result_data.get("object_name") await ctx.send_progress(1.0, "Union completed successfully") return { "object_name": created_name, "type": "Union", "message": f"Successfully created union '{created_name}' from '{object1}' and '{object2}'", "success": True, } except json.JSONDecodeError: logger.error("Failed to decode union result") await ctx.send_progress(1.0, "Failed to parse response from FreeCAD") raise FastMCPError("Failed to parse response from FreeCAD") except Exception as e: logger.error(f"Error performing boolean union: {e}", exc_info=True) await ctx.send_progress(1.0, f"Union operation error: {str(e)}") if not isinstance(e, FastMCPError): raise FastMCPError(f"Error creating union: {str(e)}") else: raise e @mcp.tool() async def freecad_boolean_cut( object1: str, object2: str, name: str = "Cut" ) -> Dict[str, Any]: """Perform a boolean cut (difference) between two objects (object1 - object2).""" logger.info(f"Executing freecad.boolean_cut: {object1} - {object2} -> {name}") ctx = ToolContext.get() await ctx.send_progress(0.1, "Starting boolean cut...") safe_obj1_name = sanitize_name(object1) safe_obj2_name = sanitize_name(object2) safe_result_name = sanitize_name(name) script = f""" import FreeCAD import Part import json result_json = '' try: doc = FreeCAD.ActiveDocument if not doc: raise Exception("No active document") obj1 = doc.getObject("{safe_obj1_name}") # Base object obj2 = doc.getObject("{safe_obj2_name}") # Tool object to subtract if not obj1: raise Exception(f"Object '{object1}' not found") if not obj2: raise Exception(f"Object '{object2}' not found") cut = doc.addObject("Part::Cut", "{safe_result_name}") cut.Base = obj1 cut.Tool = obj2 # Hide original objects? Optional. # obj1.ViewObject.Visibility = False # obj2.ViewObject.Visibility = False doc.recompute() if cut.Shape.isNull(): raise Exception("Boolean operation resulted in empty shape.") result_json = json.dumps({{"object_name": cut.Name, "type": "Cut"}}) except Exception as e: result_json = json.dumps({{"error": str(e)}}) """ try: await ctx.send_progress(0.3, "Executing boolean cut script...") env_data = await execute_script_in_freecad(script, ["result_json"]) result_json_str = env_data.get("result_json") if not result_json_str: raise FastMCPError("Did not receive result from FreeCAD script") result_data = json.loads(result_json_str) if isinstance(result_data, dict) and "error" in result_data: await ctx.send_progress( 1.0, f"Cut operation failed: {result_data['error']}" ) raise FastMCPError(f"Error creating cut in FreeCAD: {result_data['error']}") created_name = result_data.get("object_name") await ctx.send_progress(1.0, "Cut completed successfully") return { "object_name": created_name, "type": "Cut", "message": f"Successfully created cut '{created_name}' ({object1} - {object2})", "success": True, } except json.JSONDecodeError: logger.error("Failed to decode cut result") await ctx.send_progress(1.0, "Failed to parse response") raise FastMCPError("Failed to parse response from FreeCAD") except Exception as e: logger.error(f"Error performing boolean cut: {e}", exc_info=True) await ctx.send_progress(1.0, f"Cut operation error: {str(e)}") if not isinstance(e, FastMCPError): raise FastMCPError(f"Error creating cut: {str(e)}") else: raise e @mcp.tool() async def freecad_boolean_intersection( object1: str, object2: str, name: str = "Intersection" ) -> Dict[str, Any]: """Perform a boolean intersection (common) between two objects.""" logger.info( f"Executing freecad.boolean_intersection: {object1} & {object2} -> {name}" ) ctx = ToolContext.get() await ctx.send_progress(0.1, "Starting boolean intersection...") safe_obj1_name = sanitize_name(object1) safe_obj2_name = sanitize_name(object2) safe_result_name = sanitize_name(name) script = f""" import FreeCAD import Part import json result_json = '' try: doc = FreeCAD.ActiveDocument if not doc: raise Exception("No active document") obj1 = doc.getObject("{safe_obj1_name}") obj2 = doc.getObject("{safe_obj2_name}") if not obj1: raise Exception(f"Object '{object1}' not found") if not obj2: raise Exception(f"Object '{object2}' not found") common = doc.addObject("Part::Common", "{safe_result_name}") common.Shapes = [obj1, obj2] # Intersection takes a list of shapes # Hide original objects? Optional. # obj1.ViewObject.Visibility = False # obj2.ViewObject.Visibility = False doc.recompute() if common.Shape.isNull(): raise Exception("Boolean operation resulted in empty shape.") result_json = json.dumps({{"object_name": common.Name, "type": "Intersection"}}) except Exception as e: result_json = json.dumps({{"error": str(e)}}) """ try: await ctx.send_progress(0.3, "Executing boolean intersection script...") env_data = await execute_script_in_freecad(script, ["result_json"]) result_json_str = env_data.get("result_json") if not result_json_str: raise FastMCPError("Did not receive result from FreeCAD script") result_data = json.loads(result_json_str) if isinstance(result_data, dict) and "error" in result_data: await ctx.send_progress( 1.0, f"Intersection operation failed: {result_data['error']}" ) raise FastMCPError( f"Error creating intersection in FreeCAD: {result_data['error']}" ) created_name = result_data.get("object_name") await ctx.send_progress(1.0, "Intersection completed successfully") return { "object_name": created_name, "type": "Intersection", "message": f"Successfully created intersection '{created_name}' between '{object1}' and '{object2}'", "success": True, } except json.JSONDecodeError: logger.error("Failed to decode intersection result") await ctx.send_progress(1.0, "Failed to parse response") raise FastMCPError("Failed to parse response from FreeCAD") except Exception as e: logger.error(f"Error performing boolean intersection: {e}", exc_info=True) await ctx.send_progress(1.0, f"Intersection operation error: {str(e)}") if not isinstance(e, FastMCPError): raise FastMCPError(f"Error creating intersection: {str(e)}") else: raise e # == FreeCAD Object Manipulation Tools == @mcp.tool() async def freecad_move_object( object_name: str, x: Optional[float] = None, y: Optional[float] = None, z: Optional[float] = None, ) -> Dict[str, Any]: """Move an object to a new absolute position. Specify at least one coordinate.""" if x is None and y is None and z is None: raise FastMCPError( "At least one coordinate (x, y, or z) must be provided for move_object" ) logger.info( f"Executing freecad.move_object: {object_name} to (x={x}, y={y}, z={z})" ) ctx = ToolContext.get() await ctx.send_progress(0.1, "Starting object move...") safe_obj_name = sanitize_name(object_name) # Construct parts of the placement update based on provided args updates = [] if x is not None: updates.append(f"obj.Placement.Base.x = float({x})") if y is not None: updates.append(f"obj.Placement.Base.y = float({y})") if z is not None: updates.append(f"obj.Placement.Base.z = float({z})") update_str = "\\n ".join(updates) script = f""" import FreeCAD import json result_json = '' try: doc = FreeCAD.ActiveDocument if not doc: raise Exception("No active document") obj = doc.getObject("{safe_obj_name}") if not obj: raise Exception(f"Object '{object_name}' not found") # Apply updates {update_str} doc.recompute() # Report final position final_pos = obj.Placement.Base result_json = json.dumps({{ "final_x": final_pos.x, "final_y": final_pos.y, "final_z": final_pos.z }}) except Exception as e: result_json = json.dumps({{"error": str(e)}}) """ try: await ctx.send_progress(0.3, "Executing move script...") env_data = await execute_script_in_freecad(script, ["result_json"]) result_json_str = env_data.get("result_json") if not result_json_str: raise FastMCPError("Did not receive result from FreeCAD script") result_data = json.loads(result_json_str) if isinstance(result_data, dict) and "error" in result_data: await ctx.send_progress( 1.0, f"Move operation failed: {result_data['error']}" ) raise FastMCPError( f"Error moving object in FreeCAD: {result_data['error']}" ) final_pos = result_data # Should contain x, y, z await ctx.send_progress(1.0, "Move completed successfully") return { "object_name": object_name, "final_position": final_pos, "message": f"Successfully moved object '{object_name}' to ({final_pos.get('x',0):.2f}, {final_pos.get('y',0):.2f}, {final_pos.get('z',0):.2f})", "success": True, } except json.JSONDecodeError: logger.error("Failed to decode move result") await ctx.send_progress(1.0, "Failed to parse response") raise FastMCPError("Failed to parse response from FreeCAD") except Exception as e: logger.error(f"Error moving object: {e}", exc_info=True) await ctx.send_progress(1.0, f"Move error: {str(e)}") if not isinstance(e, FastMCPError): raise FastMCPError(f"Error moving object: {str(e)}") else: raise e @mcp.tool() async def freecad_rotate_object( object_name: str, angle_x: float = 0.0, angle_y: float = 0.0, angle_z: float = 0.0 ) -> Dict[str, Any]: """Rotate an object by specified angles around its placement base point (XYZ order).""" logger.info( f"Executing freecad.rotate_object: {object_name} by (x={angle_x}, y={angle_y}, z={angle_z})" ) ctx = ToolContext.get() await ctx.send_progress(0.1, "Starting object rotation...") safe_obj_name = sanitize_name(object_name) script = f""" import FreeCAD import Part import math import json final_rot_x = 0.0; final_rot_y = 0.0; final_rot_z = 0.0 # Store result result_json = '' try: doc = FreeCAD.ActiveDocument if not doc: raise Exception("No active document") obj = doc.getObject("{safe_obj_name}") if not obj: raise Exception(f"Object '{object_name}' not found") # Convert degrees to radians rot_x_rad = math.radians(float({angle_x})) rot_y_rad = math.radians(float({angle_y})) rot_z_rad = math.radians(float({angle_z})) # Create rotation object for the *increment* # Assuming XYZ rotation order for the incremental rotation incremental_rotation = FreeCAD.Rotation(rot_x_rad, rot_y_rad, rot_z_rad) # Apply the incremental rotation to the object's current placement current_placement = obj.Placement # Rotation is multiplied: new_rot = increment * old_rot new_rotation = incremental_rotation.multiply(current_placement.Rotation) # Rotation applied relative to Base new_placement = FreeCAD.Placement(current_placement.Base, new_rotation) obj.Placement = new_placement doc.recompute() # Get final rotation as Euler angles (may differ slightly from input due to quaternion math) # FreeCAD's getYawPitchRoll() returns ZYX order final_angles = obj.Placement.Rotation.getYawPitchRoll() final_rot_z = math.degrees(final_angles[0]) # Yaw final_rot_y = math.degrees(final_angles[1]) # Pitch final_rot_x = math.degrees(final_angles[2]) # Roll result_json = json.dumps({{'applied_x': {angle_x}, 'applied_y': {angle_y}, 'applied_z': {angle_z}, 'final_x': final_rot_x, 'final_y': final_rot_y, 'final_z': final_rot_z}}) except Exception as e: result_json = json.dumps({{"error": str(e)}}) """ await ctx.send_progress(0.3, "Executing rotation script...") try: env_data = await execute_script_in_freecad(script, ["result_json"]) result_json_str = env_data.get("result_json") if not result_json_str: raise FastMCPError("Did not receive result from FreeCAD script") result_data = json.loads(result_json_str) if isinstance(result_data, dict) and "error" in result_data: raise FastMCPError( f"Error rotating object in FreeCAD: {result_data['error']}" ) applied = { k.split("_")[1]: v for k, v in result_data.items() if k.startswith("applied_") } final = { k.split("_")[1]: v for k, v in result_data.items() if k.startswith("final_") } # Include final angles await ctx.send_progress(1.0, "Rotation completed successfully") return { "object_name": object_name, "rotation_applied": applied, "final_rotation_euler_zyx": final, # Return final calculated angles "message": f"Applied rotation ({applied.get('x', 0.0):.1f}, {applied.get('y', 0.0):.1f}, {applied.get('z', 0.0):.1f}) degrees to object '{object_name}'", "success": True, } except json.JSONDecodeError: logger.error("Failed to decode rotation result") await ctx.send_progress(1.0, "Failed to parse response from FreeCAD") raise FastMCPError("Failed to parse response from FreeCAD") except FastMCPError: # Catch specific error type raise except Exception as e: logger.error(f"Error rotating object: {e}", exc_info=True) await ctx.send_progress(1.0, f"Rotation error: {str(e)}") raise FastMCPError(f"Error rotating object: {str(e)}") # == FreeCAD Export Tools == @mcp.tool() async def freecad_export_stl( file_path: str, objects: Optional[List[str]] = None, document: Optional[str] = None ) -> Dict[str, Any]: """Export specified objects (or all if none specified) from a document to an STL file.""" logger.info( f"Executing freecad.export_stl to: {file_path} (Objects: {objects}, Doc: {document})" ) # Get the ToolContext for progress reporting ctx = ToolContext.get() await ctx.send_progress(0.1, "Starting STL export...") # Basic path validation safe_file_path = sanitize_path(file_path) if not safe_file_path.lower().endswith(".stl"): raise FastMCPError("Invalid file_path. Must end with .stl") # Ensure objects is a list of strings if provided if objects is not None and ( not isinstance(objects, list) or not all(isinstance(o, str) for o in objects) ): raise FastMCPError("Parameter 'objects' must be a list of strings.") await ctx.send_progress(0.2, "Validating export parameters...") # Use direct API call if available and seems robust enough if FC_CONNECTION and hasattr(FC_CONNECTION, "export_stl"): try: await ctx.send_progress(0.3, "Using direct export method...") success = FC_CONNECTION.export_stl( object_names=objects, file_path=safe_file_path, document=document ) await ctx.send_progress(0.9, "Export operation completed, finalizing...") if success: await ctx.send_progress(1.0, "Export completed successfully") return { "file_path": safe_file_path, "message": f"Exported {'selected objects' if objects else 'active model'} to {safe_file_path}", "success": True, } else: await ctx.send_progress(1.0, "Export failed") raise FastMCPError("FreeCADConnection.export_stl returned False.") except Exception as e: logger.error(f"Error during direct export_stl call: {e}", exc_info=True) await ctx.send_progress(1.0, f"Export error: {str(e)}") raise FastMCPError(f"Failed to export STL (direct call): {str(e)}") else: # Fallback to script execution logger.warning("Falling back to script execution for export_stl") await ctx.send_progress(0.3, "Using script-based export method...") safe_doc_name = sanitize_name(document) if document else "" # Convert list of object names into a Python list string for the script objects_list_str = "None" if objects: sanitized_object_names = [f'"{sanitize_name(o)}"' for o in objects] objects_list_str = f'[{", ".join(sanitized_object_names)}]' await ctx.send_progress(0.4, "Preparing export script...") script = f""" import FreeCAD import Mesh # Use Mesh module for STL export import json success = False error_msg = '' result_json = '' try: doc = None if "{safe_doc_name}": doc = FreeCAD.getDocument("{safe_doc_name}") if not doc: raise Exception(f"Document '{document}' not found") else: doc = FreeCAD.ActiveDocument if not doc: raise Exception("No active document found") obj_list_to_export = [] export_obj_names = {objects_list_str} # Use the generated list string or None if export_obj_names is None: # Export all visible objects with a Shape if no specific list given obj_list_to_export = [o for o in doc.Objects if hasattr(o, 'Shape') and getattr(o.ViewObject, 'Visibility', False)] else: for name in export_obj_names: obj = doc.getObject(name) if obj and hasattr(obj, 'Shape'): obj_list_to_export.append(obj) else: raise Exception(f"Object '{{name}}' not found or is not exportable.") if not obj_list_to_export: raise Exception("No valid objects found to export.") # Mesh.export expects list of objects Mesh.export(obj_list_to_export, u"{safe_file_path}") success = True except Exception as e: error_msg = str(e) result_json = json.dumps({{"success": success, "error": error_msg}}) """ try: await ctx.send_progress(0.5, "Executing export script in FreeCAD...") env_data = await execute_script_in_freecad(script, ["result_json"]) await ctx.send_progress(0.9, "Processing export results...") result_json_str = env_data.get("result_json") if not result_json_str: raise FastMCPError("Did not receive result from FreeCAD export script") result_data = json.loads(result_json_str) if result_data.get("error"): await ctx.send_progress(1.0, f"Export failed: {result_data['error']}") raise FastMCPError( f"Error exporting STL in FreeCAD: {result_data['error']}" ) if result_data.get("success"): await ctx.send_progress(1.0, "Export completed successfully") return { "file_path": safe_file_path, "message": f"Exported {'selected objects' if objects else 'model'} to {safe_file_path} (via script)", "success": True, } else: await ctx.send_progress(1.0, "Export failed silently") raise FastMCPError("STL export script failed silently.") except json.JSONDecodeError: logger.error("Failed to decode export result") await ctx.send_progress(1.0, "Failed to parse response from FreeCAD") raise FastMCPError("Failed to parse response from FreeCAD") except Exception as e: logger.error(f"Error exporting STL (script fallback): {e}", exc_info=True) await ctx.send_progress(1.0, f"Export error: {str(e)}") if not isinstance(e, FastMCPError): raise FastMCPError(f"Error exporting STL: {str(e)}") else: raise e # --- Resource Definitions --- @mcp.resource("freecad://info") async def get_freecad_info() -> Dict[str, Any]: """Get information about the connected FreeCAD instance.""" logger.info("Executing get_freecad_info resource") if not FC_CONNECTION or not FC_CONNECTION.is_connected(): return {"status": "error", "message": "Not currently connected to FreeCAD."} try: version_info = FC_CONNECTION.get_version() version_str = ".".join(str(v) for v in version_info.get("version", ["Unknown"])) connection_type = FC_CONNECTION.get_connection_type() return { "status": "success", "freecad_version": version_str, "connection_type": connection_type, } except Exception as e: logger.error(f"Error getting FreeCAD info: {e}") return { "status": "error", "message": f"Error retrieving FreeCAD info: {str(e)}", } @mcp.resource("server://info") async def get_server_info() -> Dict[str, Any]: """ Get comprehensive information about the MCP server. Returns information about the server version, capabilities, FreeCAD connection status, and available tools/resources. This resource is useful for clients to understand what functionality is available and the current state of the server. """ logger.info("Executing get_server_info resource") # Get server version and name server_info = { "name": server_name, "version": VERSION, "freecad_connection": { "connected": FC_CONNECTION is not None and FC_CONNECTION.is_connected(), "connection_type": ( FC_CONNECTION.get_connection_type() if FC_CONNECTION and FC_CONNECTION.is_connected() else "none" ), }, "capabilities": { "tools": { "document_management": True, "primitives": CONFIG.get("tools", {}).get("enable_primitives", True), "sketcher": CONFIG.get("tools", {}).get("enable_sketcher", True), "constraints": CONFIG.get("tools", {}).get("enable_constraints", True), "measurements": CONFIG.get("tools", {}).get( "enable_measurements", True ), "boolean_operations": True, "transformations": True, "export": True, }, "resources": {"freecad_info": True, "server_info": True}, }, "runtime": { "python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}", "platform": sys.platform, }, } # Add FreeCAD version info if connected if FC_CONNECTION and FC_CONNECTION.is_connected(): try: version_info = FC_CONNECTION.get_version() version_str = ".".join( str(v) for v in version_info.get("version", ["Unknown"]) ) server_info["freecad_connection"]["version"] = version_str except Exception as e: logger.warning(f"Could not retrieve FreeCAD version for server info: {e}") server_info["freecad_connection"]["version"] = "unknown" # Get available tools by inspecting global namespace for mcp.tool decorators # This is a simplification - in a real implementation you might want to # dynamically discover all tool functions tools = [] for name, value in globals().items(): if name.startswith("freecad_") and callable(value): tools.append(name) server_info["available_tools"] = tools return server_info # --- Main Execution --- async def main(): """Main entry point for the server.""" global CONFIG, FC_CONNECTION # --- Argument Parsing --- parser = argparse.ArgumentParser(description="Advanced FreeCAD MCP Server") parser.add_argument( "--config", default=CONFIG_PATH, help="Path to configuration file" ) parser.add_argument( "--log-host", default="localhost", help="Host for the logging socket receiver" ) parser.add_argument( "--log-port", type=int, default=LOGGING_PORT, help="Port for the logging socket receiver", ) parser.add_argument("--debug", action="store_true", help="Enable debug logging") parser.add_argument( "--version", action="store_true", help="Show server version and exit" ) args = parser.parse_args() # --- Version Info --- if args.version: print(f"MCP-FreeCAD Server v{VERSION}") return # --- Setup Logging --- if args.debug: logging.getLogger().setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG) logger.debug("Debug logging enabled") # --- Check MCP Dependency --- if mcp_import_error: logger.error( f"ERROR: FastMCP SDK (fastmcp package) not found or import failed: {mcp_import_error}" ) logger.error("Please install it with: pip install fastmcp") sys.exit(1) # --- Load Configuration --- CONFIG = load_config(args.config) if args.host: CONFIG.setdefault("freecad", {})["host"] = args.host if args.port: CONFIG.setdefault("freecad", {})["port"] = args.port # --- Initialize FreeCAD Connection (Initial Attempt) --- initialize_freecad_connection(CONFIG) # --- Logging Receiver Setup --- log_receiver = None try: log_receiver = LogRecordSocketReceiver(host=args.log_host, port=args.log_port) receiver_thread = threading.Thread( target=log_receiver.serve_until_stopped, daemon=True ) receiver_thread.start() logger.info(f"Logging receiver started on {args.log_host}:{args.log_port}") # Optional: Add a SocketHandler to the server's own logger # to route its logs through the receiver as well. This ensures # all logs (local and remote) are handled identically. # root_logger = logging.getLogger() # socket_handler = logging.handlers.SocketHandler(args.log_host, args.log_port) # root_logger.addHandler(socket_handler) # logger.info("Server's own logs also routed to socket receiver.") except Exception as e: logger.error(f"Failed to start logging receiver: {e}") # Decide if this is fatal or not # return 1 # Example: Exit if logging receiver fails # --- Background Connection Check --- # Only start if FreeCAD features are desired/possible connection_check_task = None if FREECAD_CONNECTION_AVAILABLE: connection_check_task = asyncio.create_task(connection_check_loop(CONFIG)) else: logger.info( "Skipping background connection check as FreeCAD connection is unavailable." ) # --- Register Progress Callback --- # Example: Registering a simple progress callback for stdout async def local_progress_callback(progress_value, message=None): if message: logger.info(f"Progress: {progress_value*100:.1f}% - {message}") else: logger.info(f"Progress: {progress_value*100:.1f}%") tool_context = ToolContext.get() # Prefer FastMCP's built-in progress if available and setup # This assumes FastMCP provides a mechanism to register a progress handler, # which might need adjustment based on the actual FastMCP version/API. # If FastMCP uses ToolContext directly, this might be redundant. # For now, setting it on our ToolContext instance. tool_context.set_progress_callback( local_progress_callback ) # Use local console logger for progress # --- Start Server --- logger.info(f"Starting MCP server '{server_name}' v{VERSION}...") # Pass unknown arguments if necessary, depending on FastMCP's run method try: # Note: Check FastMCP documentation for how to pass extra args if needed await mcp.run_stdio() # Or run_tcp, run_ws depending on desired transport except KeyboardInterrupt: logger.info("Server shutting down (KeyboardInterrupt)...") except Exception as e: logger.error(f"Server exited with error: {e}", exc_info=True) # Log stack trace finally: logger.info("Performing cleanup...") # --- Cleanup --- if connection_check_task: connection_check_task.cancel() try: await connection_check_task except asyncio.CancelledError: logger.info("Background connection check task cancelled.") if FC_CONNECTION and FC_CONNECTION.is_connected(): logger.info("Closing FreeCAD connection...") FC_CONNECTION.close() if log_receiver: logger.info("Shutting down logging receiver...") log_receiver.abort = 1 # receiver_thread.join() # Wait for thread to finish logger.info("Server shutdown complete.") if __name__ == "__main__": # Check if FastMCP was imported successfully if mcp_import_error: sys.stderr.write( f"Fatal Error: Failed to import FastMCP - {mcp_import_error}\n" ) sys.stderr.write( "Please ensure FastMCP is installed correctly ('pip install fastmcp').\n" ) sys.exit(1) # Run the main async function try: asyncio.run(main()) except KeyboardInterrupt: # Already handled in main's finally block, but catch here too for clean exit logger.info("Server shutdown initiated by KeyboardInterrupt.") except Exception as e: logger.critical(f"Unhandled exception in main execution: {e}", exc_info=True) sys.exit(1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jango-blockchained/mcp-freecad'

If you have feedback or need assistance with the MCP directory API, please join our Discord server