Skip to main content
Glama
kicad_interface.py58.7 kB
#!/usr/bin/env python3 """ KiCAD Python Interface Script for Model Context Protocol This script handles communication between the MCP TypeScript server and KiCAD's Python API (pcbnew). It receives commands via stdin as JSON and returns responses via stdout also as JSON. """ import sys import json import traceback import logging import os from typing import Dict, Any, Optional # Import tool schemas and resource definitions from schemas.tool_schemas import TOOL_SCHEMAS from resources.resource_definitions import RESOURCE_DEFINITIONS, handle_resource_read # Configure logging log_dir = os.path.join(os.path.expanduser('~'), '.kicad-mcp', 'logs') os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, 'kicad_interface.log') logging.basicConfig( level=logging.DEBUG, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[ logging.FileHandler(log_file), logging.StreamHandler(sys.stderr) ] ) logger = logging.getLogger('kicad_interface') # Log Python environment details logger.info(f"Python version: {sys.version}") logger.info(f"Python executable: {sys.executable}") logger.info(f"Platform: {sys.platform}") logger.info(f"Working directory: {os.getcwd()}") # Windows-specific diagnostics if sys.platform == 'win32': logger.info("=== Windows Environment Diagnostics ===") logger.info(f"PYTHONPATH: {os.environ.get('PYTHONPATH', 'NOT SET')}") logger.info(f"PATH: {os.environ.get('PATH', 'NOT SET')[:200]}...") # Truncate PATH # Check for common KiCAD installations common_kicad_paths = [ r"C:\Program Files\KiCad", r"C:\Program Files (x86)\KiCad" ] found_kicad = False for base_path in common_kicad_paths: if os.path.exists(base_path): logger.info(f"Found KiCAD installation at: {base_path}") # List versions try: versions = [d for d in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, d))] logger.info(f" Versions found: {', '.join(versions)}") for version in versions: python_path = os.path.join(base_path, version, 'lib', 'python3', 'dist-packages') if os.path.exists(python_path): logger.info(f" ✓ Python path exists: {python_path}") found_kicad = True else: logger.warning(f" ✗ Python path missing: {python_path}") except Exception as e: logger.warning(f" Could not list versions: {e}") if not found_kicad: logger.warning("No KiCAD installations found in standard locations!") logger.warning("Please ensure KiCAD 9.0+ is installed from https://www.kicad.org/download/windows/") logger.info("========================================") # Add utils directory to path for imports utils_dir = os.path.join(os.path.dirname(__file__)) if utils_dir not in sys.path: sys.path.insert(0, utils_dir) # Import platform helper and add KiCAD paths from utils.platform_helper import PlatformHelper from utils.kicad_process import check_and_launch_kicad, KiCADProcessManager logger.info(f"Detecting KiCAD Python paths for {PlatformHelper.get_platform_name()}...") paths_added = PlatformHelper.add_kicad_to_python_path() if paths_added: logger.info("Successfully added KiCAD Python paths to sys.path") else: logger.warning("No KiCAD Python paths found - attempting to import pcbnew from system path") logger.info(f"Current Python path: {sys.path}") # Check if auto-launch is enabled AUTO_LAUNCH_KICAD = os.environ.get("KICAD_AUTO_LAUNCH", "false").lower() == "true" if AUTO_LAUNCH_KICAD: logger.info("KiCAD auto-launch enabled") # Check which backend to use # KICAD_BACKEND can be: 'auto', 'ipc', or 'swig' KICAD_BACKEND = os.environ.get("KICAD_BACKEND", "auto").lower() logger.info(f"KiCAD backend preference: {KICAD_BACKEND}") # Try to use IPC backend first if available and preferred USE_IPC_BACKEND = False ipc_backend = None if KICAD_BACKEND in ('auto', 'ipc'): try: logger.info("Checking IPC backend availability...") from kicad_api.ipc_backend import IPCBackend # Try to connect to running KiCAD ipc_backend = IPCBackend() if ipc_backend.connect(): USE_IPC_BACKEND = True logger.info(f"✓ Using IPC backend - real-time UI sync enabled!") logger.info(f" KiCAD version: {ipc_backend.get_version()}") else: logger.info("IPC backend available but KiCAD not running with IPC enabled") ipc_backend = None except ImportError: logger.info("IPC backend not available (kicad-python not installed)") except Exception as e: logger.info(f"IPC backend connection failed: {e}") ipc_backend = None # Fall back to SWIG backend if IPC not available if not USE_IPC_BACKEND and KICAD_BACKEND != 'ipc': # Import KiCAD's Python API (SWIG) try: logger.info("Attempting to import pcbnew module (SWIG backend)...") import pcbnew # type: ignore logger.info(f"Successfully imported pcbnew module from: {pcbnew.__file__}") logger.info(f"pcbnew version: {pcbnew.GetBuildVersion()}") logger.warning("Using SWIG backend - changes require manual reload in KiCAD UI") except ImportError as e: logger.error(f"Failed to import pcbnew module: {e}") logger.error(f"Current sys.path: {sys.path}") # Platform-specific help message help_message = "" if sys.platform == 'win32': help_message = """ Windows Troubleshooting: 1. Verify KiCAD is installed: C:\\Program Files\\KiCad\\9.0 2. Check PYTHONPATH environment variable points to: C:\\Program Files\\KiCad\\9.0\\lib\\python3\\dist-packages 3. Test with: "C:\\Program Files\\KiCad\\9.0\\bin\\python.exe" -c "import pcbnew" 4. Log file location: %USERPROFILE%\\.kicad-mcp\\logs\\kicad_interface.log 5. Run setup-windows.ps1 for automatic configuration """ elif sys.platform == 'darwin': help_message = """ macOS Troubleshooting: 1. Verify KiCAD is installed: /Applications/KiCad/KiCad.app 2. Check PYTHONPATH points to KiCAD's Python packages 3. Run: python3 -c "import pcbnew" to test """ else: # Linux help_message = """ Linux Troubleshooting: 1. Verify KiCAD is installed: apt list --installed | grep kicad 2. Check: /usr/lib/kicad/lib/python3/dist-packages exists 3. Test: python3 -c "import pcbnew" """ logger.error(help_message) error_response = { "success": False, "message": "Failed to import pcbnew module - KiCAD Python API not found", "errorDetails": f"Error: {str(e)}\n\n{help_message}\n\nPython sys.path:\n{chr(10).join(sys.path)}" } print(json.dumps(error_response)) sys.exit(1) except Exception as e: logger.error(f"Unexpected error importing pcbnew: {e}") logger.error(traceback.format_exc()) error_response = { "success": False, "message": "Error importing pcbnew module", "errorDetails": str(e) } print(json.dumps(error_response)) sys.exit(1) # If IPC-only mode requested but not available, exit with error elif KICAD_BACKEND == 'ipc' and not USE_IPC_BACKEND: error_response = { "success": False, "message": "IPC backend requested but not available", "errorDetails": "KiCAD must be running with IPC API enabled. Enable at: Preferences > Plugins > Enable IPC API Server" } print(json.dumps(error_response)) sys.exit(1) # Import command handlers try: logger.info("Importing command handlers...") from commands.project import ProjectCommands from commands.board import BoardCommands from commands.component import ComponentCommands from commands.routing import RoutingCommands from commands.design_rules import DesignRuleCommands from commands.export import ExportCommands from commands.schematic import SchematicManager from commands.component_schematic import ComponentManager from commands.connection_schematic import ConnectionManager from commands.library_schematic import LibraryManager as SchematicLibraryManager from commands.library import LibraryManager as FootprintLibraryManager, LibraryCommands logger.info("Successfully imported all command handlers") except ImportError as e: logger.error(f"Failed to import command handlers: {e}") error_response = { "success": False, "message": "Failed to import command handlers", "errorDetails": str(e) } print(json.dumps(error_response)) sys.exit(1) class KiCADInterface: """Main interface class to handle KiCAD operations""" def __init__(self): """Initialize the interface and command handlers""" self.board = None self.project_filename = None self.use_ipc = USE_IPC_BACKEND self.ipc_backend = ipc_backend self.ipc_board_api = None if self.use_ipc: logger.info("Initializing with IPC backend (real-time UI sync enabled)") try: self.ipc_board_api = self.ipc_backend.get_board() logger.info("✓ Got IPC board API") except Exception as e: logger.warning(f"Could not get IPC board API: {e}") else: logger.info("Initializing with SWIG backend") logger.info("Initializing command handlers...") # Initialize footprint library manager self.footprint_library = FootprintLibraryManager() # Initialize command handlers self.project_commands = ProjectCommands(self.board) self.board_commands = BoardCommands(self.board) self.component_commands = ComponentCommands(self.board, self.footprint_library) self.routing_commands = RoutingCommands(self.board) self.design_rule_commands = DesignRuleCommands(self.board) self.export_commands = ExportCommands(self.board) self.library_commands = LibraryCommands(self.footprint_library) # Schematic-related classes don't need board reference # as they operate directly on schematic files # Command routing dictionary self.command_routes = { # Project commands "create_project": self.project_commands.create_project, "open_project": self.project_commands.open_project, "save_project": self.project_commands.save_project, "get_project_info": self.project_commands.get_project_info, # Board commands "set_board_size": self.board_commands.set_board_size, "add_layer": self.board_commands.add_layer, "set_active_layer": self.board_commands.set_active_layer, "get_board_info": self.board_commands.get_board_info, "get_layer_list": self.board_commands.get_layer_list, "get_board_2d_view": self.board_commands.get_board_2d_view, "add_board_outline": self.board_commands.add_board_outline, "add_mounting_hole": self.board_commands.add_mounting_hole, "add_text": self.board_commands.add_text, "add_board_text": self.board_commands.add_text, # Alias for TypeScript tool # Component commands "place_component": self.component_commands.place_component, "move_component": self.component_commands.move_component, "rotate_component": self.component_commands.rotate_component, "delete_component": self.component_commands.delete_component, "edit_component": self.component_commands.edit_component, "get_component_properties": self.component_commands.get_component_properties, "get_component_list": self.component_commands.get_component_list, "place_component_array": self.component_commands.place_component_array, "align_components": self.component_commands.align_components, "duplicate_component": self.component_commands.duplicate_component, # Routing commands "add_net": self.routing_commands.add_net, "route_trace": self.routing_commands.route_trace, "add_via": self.routing_commands.add_via, "delete_trace": self.routing_commands.delete_trace, "get_nets_list": self.routing_commands.get_nets_list, "create_netclass": self.routing_commands.create_netclass, "add_copper_pour": self.routing_commands.add_copper_pour, "route_differential_pair": self.routing_commands.route_differential_pair, # Design rule commands "set_design_rules": self.design_rule_commands.set_design_rules, "get_design_rules": self.design_rule_commands.get_design_rules, "run_drc": self.design_rule_commands.run_drc, "get_drc_violations": self.design_rule_commands.get_drc_violations, # Export commands "export_gerber": self.export_commands.export_gerber, "export_pdf": self.export_commands.export_pdf, "export_svg": self.export_commands.export_svg, "export_3d": self.export_commands.export_3d, "export_bom": self.export_commands.export_bom, # Library commands (footprint management) "list_libraries": self.library_commands.list_libraries, "search_footprints": self.library_commands.search_footprints, "list_library_footprints": self.library_commands.list_library_footprints, "get_footprint_info": self.library_commands.get_footprint_info, # Schematic commands "create_schematic": self._handle_create_schematic, "load_schematic": self._handle_load_schematic, "add_schematic_component": self._handle_add_schematic_component, "add_schematic_wire": self._handle_add_schematic_wire, "add_schematic_connection": self._handle_add_schematic_connection, "add_schematic_net_label": self._handle_add_schematic_net_label, "connect_to_net": self._handle_connect_to_net, "get_net_connections": self._handle_get_net_connections, "generate_netlist": self._handle_generate_netlist, "list_schematic_libraries": self._handle_list_schematic_libraries, "export_schematic_pdf": self._handle_export_schematic_pdf, # UI/Process management commands "check_kicad_ui": self._handle_check_kicad_ui, "launch_kicad_ui": self._handle_launch_kicad_ui, # IPC-specific commands (real-time operations) "get_backend_info": self._handle_get_backend_info, "ipc_add_track": self._handle_ipc_add_track, "ipc_add_via": self._handle_ipc_add_via, "ipc_add_text": self._handle_ipc_add_text, "ipc_list_components": self._handle_ipc_list_components, "ipc_get_tracks": self._handle_ipc_get_tracks, "ipc_get_vias": self._handle_ipc_get_vias, "ipc_save_board": self._handle_ipc_save_board } logger.info(f"KiCAD interface initialized (backend: {'IPC' if self.use_ipc else 'SWIG'})") # Commands that can be handled via IPC for real-time updates IPC_CAPABLE_COMMANDS = { # Routing commands "route_trace": "_ipc_route_trace", "add_via": "_ipc_add_via", "add_net": "_ipc_add_net", # Board commands "add_text": "_ipc_add_text", "add_board_text": "_ipc_add_text", "set_board_size": "_ipc_set_board_size", "get_board_info": "_ipc_get_board_info", # Component commands "place_component": "_ipc_place_component", "move_component": "_ipc_move_component", "delete_component": "_ipc_delete_component", "get_component_list": "_ipc_get_component_list", # Save command "save_project": "_ipc_save_project", } def handle_command(self, command: str, params: Dict[str, Any]) -> Dict[str, Any]: """Route command to appropriate handler, preferring IPC when available""" logger.info(f"Handling command: {command}") logger.debug(f"Command parameters: {params}") try: # Check if we can use IPC for this command (real-time UI sync) if self.use_ipc and self.ipc_board_api and command in self.IPC_CAPABLE_COMMANDS: ipc_handler_name = self.IPC_CAPABLE_COMMANDS[command] ipc_handler = getattr(self, ipc_handler_name, None) if ipc_handler: logger.info(f"Using IPC backend for {command} (real-time sync)") result = ipc_handler(params) # Add indicator that IPC was used if isinstance(result, dict): result["_backend"] = "ipc" result["_realtime"] = True logger.debug(f"IPC command result: {result}") return result # Fall back to SWIG-based handler if self.use_ipc and command in self.IPC_CAPABLE_COMMANDS: logger.warning(f"IPC handler not available for {command}, falling back to SWIG (deprecated)") # Get the handler for the command handler = self.command_routes.get(command) if handler: # Execute the command result = handler(params) logger.debug(f"Command result: {result}") # Add backend indicator if isinstance(result, dict): result["_backend"] = "swig" result["_realtime"] = False # Update board reference if command was successful if result.get("success", False): if command == "create_project" or command == "open_project": logger.info("Updating board reference...") # Get board from the project commands handler self.board = self.project_commands.board self._update_command_handlers() return result else: logger.error(f"Unknown command: {command}") return { "success": False, "message": f"Unknown command: {command}", "errorDetails": "The specified command is not supported" } except Exception as e: # Get the full traceback traceback_str = traceback.format_exc() logger.error(f"Error handling command {command}: {str(e)}\n{traceback_str}") return { "success": False, "message": f"Error handling command: {command}", "errorDetails": f"{str(e)}\n{traceback_str}" } def _update_command_handlers(self): """Update board reference in all command handlers""" logger.debug("Updating board reference in command handlers") self.project_commands.board = self.board self.board_commands.board = self.board self.component_commands.board = self.board self.routing_commands.board = self.board self.design_rule_commands.board = self.board self.export_commands.board = self.board # Schematic command handlers def _handle_create_schematic(self, params): """Create a new schematic""" logger.info("Creating schematic") try: project_name = params.get("projectName") path = params.get("path", ".") metadata = params.get("metadata", {}) if not project_name: return {"success": False, "message": "Project name is required"} schematic = SchematicManager.create_schematic(project_name, metadata) file_path = f"{path}/{project_name}.kicad_sch" success = SchematicManager.save_schematic(schematic, file_path) return {"success": success, "file_path": file_path} except Exception as e: logger.error(f"Error creating schematic: {str(e)}") return {"success": False, "message": str(e)} def _handle_load_schematic(self, params): """Load an existing schematic""" logger.info("Loading schematic") try: filename = params.get("filename") if not filename: return {"success": False, "message": "Filename is required"} schematic = SchematicManager.load_schematic(filename) success = schematic is not None if success: metadata = SchematicManager.get_schematic_metadata(schematic) return {"success": success, "metadata": metadata} else: return {"success": False, "message": "Failed to load schematic"} except Exception as e: logger.error(f"Error loading schematic: {str(e)}") return {"success": False, "message": str(e)} def _handle_add_schematic_component(self, params): """Add a component to a schematic""" logger.info("Adding component to schematic") try: schematic_path = params.get("schematicPath") component = params.get("component", {}) if not schematic_path: return {"success": False, "message": "Schematic path is required"} if not component: return {"success": False, "message": "Component definition is required"} schematic = SchematicManager.load_schematic(schematic_path) if not schematic: return {"success": False, "message": "Failed to load schematic"} component_obj = ComponentManager.add_component(schematic, component) success = component_obj is not None if success: SchematicManager.save_schematic(schematic, schematic_path) return {"success": True} else: return {"success": False, "message": "Failed to add component"} except Exception as e: logger.error(f"Error adding component to schematic: {str(e)}") return {"success": False, "message": str(e)} def _handle_add_schematic_wire(self, params): """Add a wire to a schematic""" logger.info("Adding wire to schematic") try: schematic_path = params.get("schematicPath") start_point = params.get("startPoint") end_point = params.get("endPoint") if not schematic_path: return {"success": False, "message": "Schematic path is required"} if not start_point or not end_point: return {"success": False, "message": "Start and end points are required"} schematic = SchematicManager.load_schematic(schematic_path) if not schematic: return {"success": False, "message": "Failed to load schematic"} wire = ConnectionManager.add_wire(schematic, start_point, end_point) success = wire is not None if success: SchematicManager.save_schematic(schematic, schematic_path) return {"success": True} else: return {"success": False, "message": "Failed to add wire"} except Exception as e: logger.error(f"Error adding wire to schematic: {str(e)}") return {"success": False, "message": str(e)} def _handle_list_schematic_libraries(self, params): """List available symbol libraries""" logger.info("Listing schematic libraries") try: search_paths = params.get("searchPaths") libraries = LibraryManager.list_available_libraries(search_paths) return {"success": True, "libraries": libraries} except Exception as e: logger.error(f"Error listing schematic libraries: {str(e)}") return {"success": False, "message": str(e)} def _handle_export_schematic_pdf(self, params): """Export schematic to PDF""" logger.info("Exporting schematic to PDF") try: schematic_path = params.get("schematicPath") output_path = params.get("outputPath") if not schematic_path: return {"success": False, "message": "Schematic path is required"} if not output_path: return {"success": False, "message": "Output path is required"} import subprocess result = subprocess.run( ["kicad-cli", "sch", "export", "pdf", "--output", output_path, schematic_path], capture_output=True, text=True ) success = result.returncode == 0 message = result.stderr if not success else "" return {"success": success, "message": message} except Exception as e: logger.error(f"Error exporting schematic to PDF: {str(e)}") return {"success": False, "message": str(e)} def _handle_add_schematic_connection(self, params): """Add a pin-to-pin connection in schematic""" logger.info("Adding pin-to-pin connection in schematic") try: schematic_path = params.get("schematicPath") source_ref = params.get("sourceRef") source_pin = params.get("sourcePin") target_ref = params.get("targetRef") target_pin = params.get("targetPin") if not all([schematic_path, source_ref, source_pin, target_ref, target_pin]): return {"success": False, "message": "Missing required parameters"} schematic = SchematicManager.load_schematic(schematic_path) if not schematic: return {"success": False, "message": "Failed to load schematic"} success = ConnectionManager.add_connection(schematic, source_ref, source_pin, target_ref, target_pin) if success: SchematicManager.save_schematic(schematic, schematic_path) return {"success": True} else: return {"success": False, "message": "Failed to add connection"} except Exception as e: logger.error(f"Error adding schematic connection: {str(e)}") return {"success": False, "message": str(e)} def _handle_add_schematic_net_label(self, params): """Add a net label to schematic""" logger.info("Adding net label to schematic") try: schematic_path = params.get("schematicPath") net_name = params.get("netName") position = params.get("position") if not all([schematic_path, net_name, position]): return {"success": False, "message": "Missing required parameters"} schematic = SchematicManager.load_schematic(schematic_path) if not schematic: return {"success": False, "message": "Failed to load schematic"} label = ConnectionManager.add_net_label(schematic, net_name, position) if label: SchematicManager.save_schematic(schematic, schematic_path) return {"success": True} else: return {"success": False, "message": "Failed to add net label"} except Exception as e: logger.error(f"Error adding net label: {str(e)}") return {"success": False, "message": str(e)} def _handle_connect_to_net(self, params): """Connect a component pin to a named net""" logger.info("Connecting component pin to net") try: schematic_path = params.get("schematicPath") component_ref = params.get("componentRef") pin_name = params.get("pinName") net_name = params.get("netName") if not all([schematic_path, component_ref, pin_name, net_name]): return {"success": False, "message": "Missing required parameters"} schematic = SchematicManager.load_schematic(schematic_path) if not schematic: return {"success": False, "message": "Failed to load schematic"} success = ConnectionManager.connect_to_net(schematic, component_ref, pin_name, net_name) if success: SchematicManager.save_schematic(schematic, schematic_path) return {"success": True} else: return {"success": False, "message": "Failed to connect to net"} except Exception as e: logger.error(f"Error connecting to net: {str(e)}") return {"success": False, "message": str(e)} def _handle_get_net_connections(self, params): """Get all connections for a named net""" logger.info("Getting net connections") try: schematic_path = params.get("schematicPath") net_name = params.get("netName") if not all([schematic_path, net_name]): return {"success": False, "message": "Missing required parameters"} schematic = SchematicManager.load_schematic(schematic_path) if not schematic: return {"success": False, "message": "Failed to load schematic"} connections = ConnectionManager.get_net_connections(schematic, net_name) return {"success": True, "connections": connections} except Exception as e: logger.error(f"Error getting net connections: {str(e)}") return {"success": False, "message": str(e)} def _handle_generate_netlist(self, params): """Generate netlist from schematic""" logger.info("Generating netlist from schematic") try: schematic_path = params.get("schematicPath") if not schematic_path: return {"success": False, "message": "Schematic path is required"} schematic = SchematicManager.load_schematic(schematic_path) if not schematic: return {"success": False, "message": "Failed to load schematic"} netlist = ConnectionManager.generate_netlist(schematic) return {"success": True, "netlist": netlist} except Exception as e: logger.error(f"Error generating netlist: {str(e)}") return {"success": False, "message": str(e)} def _handle_check_kicad_ui(self, params): """Check if KiCAD UI is running""" logger.info("Checking if KiCAD UI is running") try: manager = KiCADProcessManager() is_running = manager.is_running() processes = manager.get_process_info() if is_running else [] return { "success": True, "running": is_running, "processes": processes, "message": "KiCAD is running" if is_running else "KiCAD is not running" } except Exception as e: logger.error(f"Error checking KiCAD UI status: {str(e)}") return {"success": False, "message": str(e)} def _handle_launch_kicad_ui(self, params): """Launch KiCAD UI""" logger.info("Launching KiCAD UI") try: project_path = params.get("projectPath") auto_launch = params.get("autoLaunch", AUTO_LAUNCH_KICAD) # Convert project path to Path object if provided from pathlib import Path path_obj = Path(project_path) if project_path else None result = check_and_launch_kicad(path_obj, auto_launch) return { "success": True, **result } except Exception as e: logger.error(f"Error launching KiCAD UI: {str(e)}") return {"success": False, "message": str(e)} # ========================================================================= # IPC Backend handlers - these provide real-time UI synchronization # These methods are called automatically when IPC is available # ========================================================================= def _ipc_route_trace(self, params): """IPC handler for route_trace - adds track with real-time UI update""" try: # Extract parameters matching the existing route_trace interface start = params.get("start", {}) end = params.get("end", {}) layer = params.get("layer", "F.Cu") width = params.get("width", 0.25) net = params.get("net") # Handle both dict format and direct x/y start_x = start.get("x", 0) if isinstance(start, dict) else params.get("startX", 0) start_y = start.get("y", 0) if isinstance(start, dict) else params.get("startY", 0) end_x = end.get("x", 0) if isinstance(end, dict) else params.get("endX", 0) end_y = end.get("y", 0) if isinstance(end, dict) else params.get("endY", 0) success = self.ipc_board_api.add_track( start_x=start_x, start_y=start_y, end_x=end_x, end_y=end_y, width=width, layer=layer, net_name=net ) return { "success": success, "message": "Added trace (visible in KiCAD UI)" if success else "Failed to add trace", "trace": { "start": {"x": start_x, "y": start_y, "unit": "mm"}, "end": {"x": end_x, "y": end_y, "unit": "mm"}, "layer": layer, "width": width, "net": net } } except Exception as e: logger.error(f"IPC route_trace error: {e}") return {"success": False, "message": str(e)} def _ipc_add_via(self, params): """IPC handler for add_via - adds via with real-time UI update""" try: position = params.get("position", {}) x = position.get("x", 0) if isinstance(position, dict) else params.get("x", 0) y = position.get("y", 0) if isinstance(position, dict) else params.get("y", 0) size = params.get("size", 0.8) drill = params.get("drill", 0.4) net = params.get("net") from_layer = params.get("from_layer", "F.Cu") to_layer = params.get("to_layer", "B.Cu") success = self.ipc_board_api.add_via( x=x, y=y, diameter=size, drill=drill, net_name=net, via_type="through" ) return { "success": success, "message": "Added via (visible in KiCAD UI)" if success else "Failed to add via", "via": { "position": {"x": x, "y": y, "unit": "mm"}, "size": size, "drill": drill, "from_layer": from_layer, "to_layer": to_layer, "net": net } } except Exception as e: logger.error(f"IPC add_via error: {e}") return {"success": False, "message": str(e)} def _ipc_add_net(self, params): """IPC handler for add_net""" # Note: Net creation via IPC is limited - nets are typically created # when components are placed. Return success for compatibility. name = params.get("name") logger.info(f"IPC add_net: {name} (nets auto-created with components)") return { "success": True, "message": f"Net '{name}' will be created when components are connected", "net": {"name": name} } def _ipc_add_text(self, params): """IPC handler for add_text/add_board_text - adds text with real-time UI update""" try: text = params.get("text", "") position = params.get("position", {}) x = position.get("x", 0) if isinstance(position, dict) else params.get("x", 0) y = position.get("y", 0) if isinstance(position, dict) else params.get("y", 0) layer = params.get("layer", "F.SilkS") size = params.get("size", 1.0) rotation = params.get("rotation", 0) success = self.ipc_board_api.add_text( text=text, x=x, y=y, layer=layer, size=size, rotation=rotation ) return { "success": success, "message": f"Added text '{text}' (visible in KiCAD UI)" if success else "Failed to add text" } except Exception as e: logger.error(f"IPC add_text error: {e}") return {"success": False, "message": str(e)} def _ipc_set_board_size(self, params): """IPC handler for set_board_size""" try: width = params.get("width", 100) height = params.get("height", 100) unit = params.get("unit", "mm") success = self.ipc_board_api.set_size(width, height, unit) return { "success": success, "message": f"Board size set to {width}x{height} {unit} (visible in KiCAD UI)" if success else "Failed to set board size", "boardSize": {"width": width, "height": height, "unit": unit} } except Exception as e: logger.error(f"IPC set_board_size error: {e}") return {"success": False, "message": str(e)} def _ipc_get_board_info(self, params): """IPC handler for get_board_info""" try: size = self.ipc_board_api.get_size() components = self.ipc_board_api.list_components() tracks = self.ipc_board_api.get_tracks() vias = self.ipc_board_api.get_vias() nets = self.ipc_board_api.get_nets() return { "success": True, "boardInfo": { "size": size, "componentCount": len(components), "trackCount": len(tracks), "viaCount": len(vias), "netCount": len(nets), "backend": "ipc", "realtime": True } } except Exception as e: logger.error(f"IPC get_board_info error: {e}") return {"success": False, "message": str(e)} def _ipc_place_component(self, params): """IPC handler for place_component - places component with real-time UI update""" try: reference = params.get("reference", params.get("componentId", "")) footprint = params.get("footprint", "") position = params.get("position", {}) x = position.get("x", 0) if isinstance(position, dict) else params.get("x", 0) y = position.get("y", 0) if isinstance(position, dict) else params.get("y", 0) rotation = params.get("rotation", 0) layer = params.get("layer", "F.Cu") value = params.get("value", "") success = self.ipc_board_api.place_component( reference=reference, footprint=footprint, x=x, y=y, rotation=rotation, layer=layer, value=value ) return { "success": success, "message": f"Placed component {reference} (visible in KiCAD UI)" if success else "Failed to place component", "component": { "reference": reference, "footprint": footprint, "position": {"x": x, "y": y, "unit": "mm"}, "rotation": rotation, "layer": layer } } except Exception as e: logger.error(f"IPC place_component error: {e}") return {"success": False, "message": str(e)} def _ipc_move_component(self, params): """IPC handler for move_component - moves component with real-time UI update""" try: reference = params.get("reference", params.get("componentId", "")) position = params.get("position", {}) x = position.get("x", 0) if isinstance(position, dict) else params.get("x", 0) y = position.get("y", 0) if isinstance(position, dict) else params.get("y", 0) rotation = params.get("rotation") success = self.ipc_board_api.move_component( reference=reference, x=x, y=y, rotation=rotation ) return { "success": success, "message": f"Moved component {reference} (visible in KiCAD UI)" if success else "Failed to move component" } except Exception as e: logger.error(f"IPC move_component error: {e}") return {"success": False, "message": str(e)} def _ipc_delete_component(self, params): """IPC handler for delete_component - deletes component with real-time UI update""" try: reference = params.get("reference", params.get("componentId", "")) success = self.ipc_board_api.delete_component(reference=reference) return { "success": success, "message": f"Deleted component {reference} (visible in KiCAD UI)" if success else "Failed to delete component" } except Exception as e: logger.error(f"IPC delete_component error: {e}") return {"success": False, "message": str(e)} def _ipc_get_component_list(self, params): """IPC handler for get_component_list""" try: components = self.ipc_board_api.list_components() return { "success": True, "components": components, "count": len(components) } except Exception as e: logger.error(f"IPC get_component_list error: {e}") return {"success": False, "message": str(e)} def _ipc_save_project(self, params): """IPC handler for save_project""" try: success = self.ipc_board_api.save() return { "success": success, "message": "Project saved" if success else "Failed to save project" } except Exception as e: logger.error(f"IPC save_project error: {e}") return {"success": False, "message": str(e)} # ========================================================================= # Legacy IPC command handlers (explicit ipc_* commands) # ========================================================================= def _handle_get_backend_info(self, params): """Get information about the current backend""" return { "success": True, "backend": "ipc" if self.use_ipc else "swig", "realtime_sync": self.use_ipc, "ipc_connected": self.ipc_backend.is_connected() if self.ipc_backend else False, "version": self.ipc_backend.get_version() if self.ipc_backend else "N/A", "message": "Using IPC backend with real-time UI sync" if self.use_ipc else "Using SWIG backend (requires manual reload)" } def _handle_ipc_add_track(self, params): """Add a track using IPC backend (real-time)""" if not self.use_ipc or not self.ipc_board_api: return {"success": False, "message": "IPC backend not available"} try: success = self.ipc_board_api.add_track( start_x=params.get("startX", 0), start_y=params.get("startY", 0), end_x=params.get("endX", 0), end_y=params.get("endY", 0), width=params.get("width", 0.25), layer=params.get("layer", "F.Cu"), net_name=params.get("net") ) return { "success": success, "message": "Track added (visible in KiCAD UI)" if success else "Failed to add track", "realtime": True } except Exception as e: logger.error(f"Error adding track via IPC: {e}") return {"success": False, "message": str(e)} def _handle_ipc_add_via(self, params): """Add a via using IPC backend (real-time)""" if not self.use_ipc or not self.ipc_board_api: return {"success": False, "message": "IPC backend not available"} try: success = self.ipc_board_api.add_via( x=params.get("x", 0), y=params.get("y", 0), diameter=params.get("diameter", 0.8), drill=params.get("drill", 0.4), net_name=params.get("net"), via_type=params.get("type", "through") ) return { "success": success, "message": "Via added (visible in KiCAD UI)" if success else "Failed to add via", "realtime": True } except Exception as e: logger.error(f"Error adding via via IPC: {e}") return {"success": False, "message": str(e)} def _handle_ipc_add_text(self, params): """Add text using IPC backend (real-time)""" if not self.use_ipc or not self.ipc_board_api: return {"success": False, "message": "IPC backend not available"} try: success = self.ipc_board_api.add_text( text=params.get("text", ""), x=params.get("x", 0), y=params.get("y", 0), layer=params.get("layer", "F.SilkS"), size=params.get("size", 1.0), rotation=params.get("rotation", 0) ) return { "success": success, "message": "Text added (visible in KiCAD UI)" if success else "Failed to add text", "realtime": True } except Exception as e: logger.error(f"Error adding text via IPC: {e}") return {"success": False, "message": str(e)} def _handle_ipc_list_components(self, params): """List components using IPC backend""" if not self.use_ipc or not self.ipc_board_api: return {"success": False, "message": "IPC backend not available"} try: components = self.ipc_board_api.list_components() return { "success": True, "components": components, "count": len(components) } except Exception as e: logger.error(f"Error listing components via IPC: {e}") return {"success": False, "message": str(e)} def _handle_ipc_get_tracks(self, params): """Get tracks using IPC backend""" if not self.use_ipc or not self.ipc_board_api: return {"success": False, "message": "IPC backend not available"} try: tracks = self.ipc_board_api.get_tracks() return { "success": True, "tracks": tracks, "count": len(tracks) } except Exception as e: logger.error(f"Error getting tracks via IPC: {e}") return {"success": False, "message": str(e)} def _handle_ipc_get_vias(self, params): """Get vias using IPC backend""" if not self.use_ipc or not self.ipc_board_api: return {"success": False, "message": "IPC backend not available"} try: vias = self.ipc_board_api.get_vias() return { "success": True, "vias": vias, "count": len(vias) } except Exception as e: logger.error(f"Error getting vias via IPC: {e}") return {"success": False, "message": str(e)} def _handle_ipc_save_board(self, params): """Save board using IPC backend""" if not self.use_ipc or not self.ipc_board_api: return {"success": False, "message": "IPC backend not available"} try: success = self.ipc_board_api.save() return { "success": success, "message": "Board saved" if success else "Failed to save board" } except Exception as e: logger.error(f"Error saving board via IPC: {e}") return {"success": False, "message": str(e)} def main(): """Main entry point""" logger.info("Starting KiCAD interface...") interface = KiCADInterface() try: logger.info("Processing commands from stdin...") # Process commands from stdin for line in sys.stdin: try: # Parse command logger.debug(f"Received input: {line.strip()}") command_data = json.loads(line) # Check if this is JSON-RPC 2.0 format if 'jsonrpc' in command_data and command_data['jsonrpc'] == '2.0': logger.info("Detected JSON-RPC 2.0 format message") method = command_data.get('method') params = command_data.get('params', {}) request_id = command_data.get('id') # Handle MCP protocol methods if method == 'initialize': logger.info("Handling MCP initialize") response = { 'jsonrpc': '2.0', 'id': request_id, 'result': { 'protocolVersion': '2025-06-18', 'capabilities': { 'tools': { 'listChanged': True }, 'resources': { 'subscribe': False, 'listChanged': True } }, 'serverInfo': { 'name': 'kicad-mcp-server', 'title': 'KiCAD PCB Design Assistant', 'version': '2.1.0-alpha' }, 'instructions': 'AI-assisted PCB design with KiCAD. Use tools to create projects, design boards, place components, route traces, and export manufacturing files.' } } elif method == 'tools/list': logger.info("Handling MCP tools/list") # Return list of available tools with proper schemas tools = [] for cmd_name in interface.command_routes.keys(): # Get schema from TOOL_SCHEMAS if available if cmd_name in TOOL_SCHEMAS: tool_def = TOOL_SCHEMAS[cmd_name].copy() tools.append(tool_def) else: # Fallback for tools without schemas logger.warning(f"No schema defined for tool: {cmd_name}") tools.append({ 'name': cmd_name, 'description': f'KiCAD command: {cmd_name}', 'inputSchema': { 'type': 'object', 'properties': {} } }) logger.info(f"Returning {len(tools)} tools") response = { 'jsonrpc': '2.0', 'id': request_id, 'result': { 'tools': tools } } elif method == 'tools/call': logger.info("Handling MCP tools/call") tool_name = params.get('name') tool_params = params.get('arguments', {}) # Execute the command result = interface.handle_command(tool_name, tool_params) response = { 'jsonrpc': '2.0', 'id': request_id, 'result': { 'content': [ { 'type': 'text', 'text': json.dumps(result) } ] } } elif method == 'resources/list': logger.info("Handling MCP resources/list") # Return list of available resources response = { 'jsonrpc': '2.0', 'id': request_id, 'result': { 'resources': RESOURCE_DEFINITIONS } } elif method == 'resources/read': logger.info("Handling MCP resources/read") resource_uri = params.get('uri') if not resource_uri: response = { 'jsonrpc': '2.0', 'id': request_id, 'error': { 'code': -32602, 'message': 'Missing required parameter: uri' } } else: # Read the resource resource_data = handle_resource_read(resource_uri, interface) response = { 'jsonrpc': '2.0', 'id': request_id, 'result': resource_data } else: logger.error(f"Unknown JSON-RPC method: {method}") response = { 'jsonrpc': '2.0', 'id': request_id, 'error': { 'code': -32601, 'message': f'Method not found: {method}' } } else: # Handle legacy custom format logger.info("Detected custom format message") command = command_data.get("command") params = command_data.get("params", {}) if not command: logger.error("Missing command field") response = { "success": False, "message": "Missing command", "errorDetails": "The command field is required" } else: # Handle command response = interface.handle_command(command, params) # Send response logger.debug(f"Sending response: {response}") print(json.dumps(response)) sys.stdout.flush() except json.JSONDecodeError as e: logger.error(f"Invalid JSON input: {str(e)}") response = { "success": False, "message": "Invalid JSON input", "errorDetails": str(e) } print(json.dumps(response)) sys.stdout.flush() except KeyboardInterrupt: logger.info("KiCAD interface stopped") sys.exit(0) except Exception as e: logger.error(f"Unexpected error: {str(e)}\n{traceback.format_exc()}") sys.exit(1) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mixelpixx/KiCAD-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server