Skip to main content
Glama
server.ts (16.9 kB)
/**
 * KiCAD MCP Server implementation
 */

import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import express from 'express';
import { spawn, exec, ChildProcess } from 'child_process';
import { existsSync } from 'fs';
import { join, dirname } from 'path';
import { logger } from './logger.js';

// Import tool registration functions
import { registerProjectTools } from './tools/project.js';
import { registerBoardTools } from './tools/board.js';
import { registerComponentTools } from './tools/component.js';
import { registerRoutingTools } from './tools/routing.js';
import { registerDesignRuleTools } from './tools/design-rules.js';
import { registerExportTools } from './tools/export.js';
import { registerSchematicTools } from './tools/schematic.js';
import { registerLibraryTools } from './tools/library.js';
import { registerUITools } from './tools/ui.js';

// Import resource registration functions
import { registerProjectResources } from './resources/project.js';
import { registerBoardResources } from './resources/board.js';
import { registerComponentResources } from './resources/component.js';
import { registerLibraryResources } from './resources/library.js';

// Import prompt registration functions
import { registerComponentPrompts } from './prompts/component.js';
import { registerRoutingPrompts } from './prompts/routing.js';
import { registerDesignPrompts } from './prompts/design.js';

/** Interpreter path probed on Windows when PYTHONPATH mentions KiCad. */
const WINDOWS_KICAD_PYTHON = 'C:\\Program Files\\KiCad\\9.0\\bin\\python.exe';

/** PYTHONPATH injected on Windows when the environment does not define one. */
const WINDOWS_KICAD_PYTHONPATH = 'C:/Program Files/KiCad/9.0/lib/python3/dist-packages';

/** Timeout applied to ordinary commands (30 seconds). */
const DEFAULT_COMMAND_TIMEOUT_MS = 30000;

/** Timeout applied to the commands listed in LONG_RUNNING_COMMANDS (10 minutes). */
const LONG_COMMAND_TIMEOUT_MS = 600000;

/** DRC and export operations need longer timeouts for large boards. */
const LONG_RUNNING_COMMANDS: readonly string[] = ['run_drc', 'export_gerber', 'export_pdf', 'export_3d'];

/** Payload serialized as one JSON line and written to the Python process's stdin. */
interface KicadRequest {
  command: string;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- shape is defined by each tool
  params: any;
  timeout: number;
}

/** A request waiting in the queue together with the settle callbacks of its promise. */
interface QueuedRequest {
  request: KicadRequest;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- result is whatever JSON the script prints
  resolve: (value: any) => void;
  reject: (reason: Error) => void;
}

/**
 * Find the Python executable to use.
 *
 * Lookup order:
 *  1. `venv/` or `.venv/` under the project root (two directories above `scriptPath`),
 *  2. the `KICAD_PYTHON` environment variable,
 *  3. on Windows, KiCAD's bundled interpreter when `PYTHONPATH` mentions "KiCad"
 *     and the interpreter file exists,
 *  4. the bare command `python.exe` (Windows) or `python3` (elsewhere).
 *
 * @param scriptPath Path to the Python KiCAD interface script
 * @returns An interpreter path, or a bare command name to be resolved through PATH
 */
function findPythonExecutable(scriptPath: string): string {
  const isWindows = process.platform === 'win32';

  // Get the project root (parent of the python/ directory)
  const projectRoot = dirname(dirname(scriptPath));

  // Check for virtual environment
  const binDir = isWindows ? 'Scripts' : 'bin';
  const exeName = isWindows ? 'python.exe' : 'python';
  const venvPaths = [
    join(projectRoot, 'venv', binDir, exeName),
    join(projectRoot, '.venv', binDir, exeName),
  ];

  for (const venvPath of venvPaths) {
    if (existsSync(venvPath)) {
      logger.info(`Found virtual environment Python at: ${venvPath}`);
      return venvPath;
    }
  }

  // Allow override via KICAD_PYTHON environment variable. This used to be
  // honoured on Windows only; an explicit override is meaningful on every platform.
  const override = process.env.KICAD_PYTHON;
  if (override) {
    return override;
  }

  // Windows: Try KiCAD's bundled Python
  if (isWindows && process.env.PYTHONPATH?.includes('KiCad') && existsSync(WINDOWS_KICAD_PYTHON)) {
    return WINDOWS_KICAD_PYTHON;
  }

  // Default to system Python
  logger.info('Using system Python (no venv found)');
  return isWindows ? 'python.exe' : 'python3';
}

/**
 * KiCAD MCP Server class.
 *
 * Owns an MCP server connected over STDIO and a Python child process running
 * the KiCAD interface script. Commands are sent to the child one at a time as
 * JSON lines on stdin; the reply is read from stdout.
 */
export class KiCADMcpServer {
  private readonly server: McpServer;
  private pythonProcess: ChildProcess | null = null;
  private readonly kicadScriptPath: string;
  private readonly stdioTransport: StdioServerTransport;
  private requestQueue: QueuedRequest[] = [];
  private processingRequest = false;
  /** Rejects the in-flight request, if any; set while a request is being processed. */
  private abortActiveRequest: ((error: Error) => void) | null = null;

  /**
   * Constructor for the KiCAD MCP Server
   * @param kicadScriptPath Path to the Python KiCAD interface script
   * @param logLevel Log level for the server
   * @throws Error if `kicadScriptPath` does not exist
   */
  constructor(
    kicadScriptPath: string,
    logLevel: 'error' | 'warn' | 'info' | 'debug' = 'info'
  ) {
    // Set up the logger
    logger.setLogLevel(logLevel);

    // Check if KiCAD script exists
    this.kicadScriptPath = kicadScriptPath;
    if (!existsSync(this.kicadScriptPath)) {
      throw new Error(`KiCAD interface script not found: ${this.kicadScriptPath}`);
    }

    // Initialize the MCP server
    this.server = new McpServer({
      name: 'kicad-mcp-server',
      version: '1.0.0',
      description: 'MCP server for KiCAD PCB design operations'
    });

    // Initialize STDIO transport
    this.stdioTransport = new StdioServerTransport();
    logger.info('Using STDIO transport for local communication');

    // Register tools, resources, and prompts
    this.registerAll();
  }

  /**
   * Register all tools, resources, and prompts
   */
  private registerAll(): void {
    logger.info('Registering KiCAD tools, resources, and prompts...');

    // Every tool/resource module receives the same bound command dispatcher.
    const callKicad = this.callKicadScript.bind(this);

    // Register all tools
    registerProjectTools(this.server, callKicad);
    registerBoardTools(this.server, callKicad);
    registerComponentTools(this.server, callKicad);
    registerRoutingTools(this.server, callKicad);
    registerDesignRuleTools(this.server, callKicad);
    registerExportTools(this.server, callKicad);
    registerSchematicTools(this.server, callKicad);
    registerLibraryTools(this.server, callKicad);
    registerUITools(this.server, callKicad);

    // Register all resources
    registerProjectResources(this.server, callKicad);
    registerBoardResources(this.server, callKicad);
    registerComponentResources(this.server, callKicad);
    registerLibraryResources(this.server, callKicad);

    // Register all prompts
    registerComponentPrompts(this.server);
    registerRoutingPrompts(this.server);
    registerDesignPrompts(this.server);

    logger.info('All KiCAD tools, resources, and prompts registered');
  }

  /**
   * Build the environment used for every Python invocation (validation and the
   * long-lived child), so both see the same PYTHONPATH.
   *
   * The KiCAD default is only injected on Windows: on POSIX systems the ':' in
   * 'C:/Program Files/...' is a path-list separator, so the value would add a
   * relative 'C' directory to Python's module search path.
   */
  private buildPythonEnv(): NodeJS.ProcessEnv {
    const env: NodeJS.ProcessEnv = { ...process.env };
    if (!env.PYTHONPATH && process.platform === 'win32') {
      env.PYTHONPATH = WINDOWS_KICAD_PYTHONPATH;
    }
    return env;
  }

  /**
   * Validate prerequisites before starting the server.
   *
   * Checks that the interpreter, the interface script and `dist/index.js`
   * exist, then runs `import pcbnew` in the interpreter. All problems are
   * logged and echoed to stderr.
   *
   * @param pythonExe Interpreter path or bare command name
   * @returns `true` when no problem was found, `false` otherwise
   */
  private async validatePrerequisites(pythonExe: string): Promise<boolean> {
    const isWindows = process.platform === 'win32';
    const errors: string[] = [];

    // A bare command name ('python3', 'python.exe') is resolved through PATH
    // by the shell, so a filesystem check against the working directory would
    // always fail. The import test below covers that case instead.
    const resolvedViaPath = !pythonExe.includes('/') && !pythonExe.includes('\\');
    const pythonUsable = resolvedViaPath || existsSync(pythonExe);

    // Check if Python executable exists
    if (!pythonUsable) {
      errors.push(`Python executable not found: ${pythonExe}`);
      if (isWindows) {
        errors.push('Windows: Install KiCAD 9.0+ from https://www.kicad.org/download/windows/');
        errors.push('Or run: .\\setup-windows.ps1 for automatic configuration');
      }
    }

    // Check if kicad_interface.py exists
    const scriptExists = existsSync(this.kicadScriptPath);
    if (!scriptExists) {
      errors.push(`KiCAD interface script not found: ${this.kicadScriptPath}`);
    }

    // Check if dist/index.js exists (if running from compiled code)
    const distPath = join(dirname(dirname(this.kicadScriptPath)), 'dist', 'index.js');
    if (!existsSync(distPath)) {
      errors.push('Project not built. Run: npm run build');
    }

    // Try to test pcbnew import (quick validation)
    if (pythonUsable && scriptExists) {
      logger.info('Validating pcbnew module access...');

      const testCommand = `"${pythonExe}" -c "import pcbnew; print('OK')"`;

      try {
        const { stdout, stderr } = await new Promise<{ stdout: string; stderr: string }>((resolve, reject) => {
          exec(
            testCommand,
            { timeout: 5000, env: this.buildPythonEnv() },
            (error: Error | null, out: string, err: string) => {
              if (error) {
                reject(error);
              } else {
                resolve({ stdout: out, stderr: err });
              }
            }
          );
        });

        if (!stdout.includes('OK')) {
          errors.push('pcbnew module import test failed');
          errors.push(`Output: ${stdout}`);
          errors.push(`Errors: ${stderr}`);

          if (isWindows) {
            errors.push('');
            errors.push('Windows troubleshooting:');
            errors.push('1. Set PYTHONPATH=C:\\Program Files\\KiCad\\9.0\\lib\\python3\\dist-packages');
            errors.push('2. Test: "C:\\Program Files\\KiCad\\9.0\\bin\\python.exe" -c "import pcbnew"');
            errors.push('3. Run: .\\setup-windows.ps1 for automatic fix');
            errors.push('4. See: docs/WINDOWS_TROUBLESHOOTING.md');
          }
        } else {
          logger.info('✓ pcbnew module validated successfully');
        }
      } catch (error: unknown) {
        const message = error instanceof Error ? error.message : String(error);
        errors.push(`pcbnew validation failed: ${message}`);

        if (isWindows) {
          errors.push('');
          errors.push('This usually means:');
          errors.push('- KiCAD is not installed');
          errors.push('- PYTHONPATH is incorrect');
          errors.push('- Python cannot find pcbnew module');
          errors.push('');
          errors.push('Quick fix: Run .\\setup-windows.ps1');
        }
      }
    }

    // Log all errors
    if (errors.length > 0) {
      logger.error('='.repeat(70));
      logger.error('STARTUP VALIDATION FAILED');
      logger.error('='.repeat(70));
      errors.forEach(err => logger.error(err));
      logger.error('='.repeat(70));

      // Also write to stderr for Claude Desktop to capture
      process.stderr.write('\n' + '='.repeat(70) + '\n');
      process.stderr.write('KiCAD MCP Server - Startup Validation Failed\n');
      process.stderr.write('='.repeat(70) + '\n');
      errors.forEach(err => process.stderr.write(err + '\n'));
      process.stderr.write('='.repeat(70) + '\n\n');

      return false;
    }

    return true;
  }

  /**
   * Start the MCP server and the Python KiCAD interface.
   *
   * @throws Error when prerequisite validation fails or the STDIO transport
   *         cannot be connected; a child spawned by this call is killed first.
   */
  async start(): Promise<void> {
    // Tracks the child spawned by THIS call so a failure below does not leak it.
    let spawned: ChildProcess | null = null;

    try {
      logger.info('Starting KiCAD MCP server...');

      // Start the Python process for KiCAD scripting
      logger.info(`Starting Python process with script: ${this.kicadScriptPath}`);

      const pythonExe = findPythonExecutable(this.kicadScriptPath);
      logger.info(`Using Python executable: ${pythonExe}`);

      // Validate prerequisites
      const isValid = await this.validatePrerequisites(pythonExe);
      if (!isValid) {
        throw new Error('Prerequisites validation failed. See logs above for details.');
      }

      const child: ChildProcess = spawn(pythonExe, [this.kicadScriptPath], {
        stdio: ['pipe', 'pipe', 'pipe'],
        env: this.buildPythonEnv()
      });
      spawned = child;
      this.pythonProcess = child;

      // Drop the reference and fail waiting callers, but only if `child` is
      // still the current process (a stale event must not clobber a newer one).
      const markGone = (reason: string): void => {
        if (this.pythonProcess !== child) {
          return;
        }
        this.pythonProcess = null;
        this.rejectPendingRequests(new Error(reason));
      };

      // Listen for process exit
      child.on('exit', (code, signal) => {
        logger.warn(`Python process exited with code ${code} and signal ${signal}`);
        markGone(`Python process exited with code ${code} and signal ${signal}`);
      });

      // Listen for process errors
      child.on('error', (err) => {
        logger.error(`Python process error: ${err.message}`);
        // `pid` is undefined when the process failed to spawn; in that case
        // 'exit' is not guaranteed to fire, so clean up here.
        if (child.pid === undefined) {
          markGone(`Python process failed to start: ${err.message}`);
        }
      });

      // Without a listener, a write error on stdin (e.g. EPIPE after the child
      // died) would surface as an unhandled 'error' event.
      child.stdin?.on('error', (err: Error) => {
        logger.error(`Python stdin error: ${err.message}`);
      });

      // Set up error logging for stderr
      child.stderr?.on('data', (data: Buffer) => {
        logger.error(`Python stderr: ${data.toString()}`);
      });

      // Connect server to STDIO transport
      logger.info('Connecting MCP server to STDIO transport...');
      try {
        await this.server.connect(this.stdioTransport);
        logger.info('Successfully connected to STDIO transport');
      } catch (error) {
        logger.error(`Failed to connect to STDIO transport: ${error}`);
        throw error;
      }

      // Write a ready message to stderr (for debugging)
      process.stderr.write('KiCAD MCP SERVER READY\n');

      logger.info('KiCAD MCP server started and ready');
    } catch (error) {
      logger.error(`Failed to start KiCAD MCP server: ${error}`);

      if (spawned) {
        if (this.pythonProcess === spawned) {
          this.pythonProcess = null;
        }
        spawned.kill();
      }

      throw error;
    }
  }

  /**
   * Stop the MCP server and clean up resources.
   *
   * Kills the Python process and rejects every in-flight or queued command so
   * no caller (or timeout timer) is left pending.
   */
  async stop(): Promise<void> {
    logger.info('Stopping KiCAD MCP server...');

    // Kill the Python process if it's running
    const child = this.pythonProcess;
    if (child) {
      this.pythonProcess = null;
      child.kill();
    }

    this.rejectPendingRequests(new Error('KiCAD MCP server stopped'));

    logger.info('KiCAD MCP server stopped');
  }

  /**
   * Reject the in-flight request (if any) and everything still queued.
   *
   * @param error Rejection reason handed to every waiting caller
   */
  private rejectPendingRequests(error: Error): void {
    // Empty the queue first so the next-request tick scheduled by the abort finds nothing.
    const queued = this.requestQueue.splice(0);

    const abort = this.abortActiveRequest;
    if (abort) {
      abort(error);
    }

    for (const entry of queued) {
      entry.reject(error);
    }
  }

  /**
   * Call the KiCAD scripting interface to execute commands
   *
   * @param command The command to execute
   * @param params The parameters for the command
   * @returns The result of the command execution
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- signature is shared with the tool modules
  private async callKicadScript(command: string, params: any): Promise<any> {
    return new Promise((resolve, reject) => {
      // Check if Python process is running
      if (!this.pythonProcess) {
        logger.error('Python process is not running');
        reject(new Error("Python process for KiCAD scripting is not running"));
        return;
      }

      // Determine timeout based on command type
      let commandTimeout = DEFAULT_COMMAND_TIMEOUT_MS;
      if (LONG_RUNNING_COMMANDS.includes(command)) {
        commandTimeout = LONG_COMMAND_TIMEOUT_MS;
        logger.info(`Using extended timeout (${commandTimeout / 1000}s) for command: ${command}`);
      }

      // Add request to queue with timeout info
      this.requestQueue.push({
        request: { command, params, timeout: commandTimeout },
        resolve,
        reject
      });

      // Process the queue if not already processing
      if (!this.processingRequest) {
        this.processNextRequest();
      }
    });
  }

  /**
   * Process the next request in the queue.
   *
   * Writes the request as one JSON line to the child's stdin and accumulates
   * stdout until the collected bytes parse as JSON, then resolves the caller.
   * The request is rejected on timeout, on a synchronous failure, when the
   * Python process is not available, or when `rejectPendingRequests` runs.
   *
   * NOTE(review): replies carry no request id, so a reply that arrives after
   * its request timed out would be attributed to whichever request is in
   * flight at that moment. Fixing that needs a protocol change on the Python
   * side, which is not visible from this file.
   */
  private processNextRequest(): void {
    // If already processing or no more requests, return
    if (this.processingRequest) {
      return;
    }
    const next = this.requestQueue.shift();
    if (!next) {
      return;
    }

    // Set processing flag
    this.processingRequest = true;
    const { request, resolve, reject } = next;

    const stdout = this.pythonProcess?.stdout ?? null;
    const stdin = this.pythonProcess?.stdin ?? null;

    // Raw chunks are kept as bytes and decoded together, so a multi-byte
    // UTF-8 character split across two chunks is not corrupted.
    const chunks: Buffer[] = [];
    let timeoutHandle: ReturnType<typeof setTimeout> | null = null;
    let settled = false;

    // Single exit path: releases the timer and listeners exactly once, frees
    // the queue for the next request, then delivers the outcome.
    const settle = (deliver: () => void): void => {
      if (settled) {
        return;
      }
      settled = true;

      if (timeoutHandle) {
        clearTimeout(timeoutHandle);
      }
      this.abortActiveRequest = null;

      if (stdout) {
        stdout.removeAllListeners('data');
        stdout.removeAllListeners('end');
      }

      // Reset processing flag and process next request if any
      this.processingRequest = false;
      setTimeout(() => this.processNextRequest(), 0);

      deliver();
    };

    // The process may have exited between queueing and processing; fail now
    // instead of letting the caller wait for the full timeout.
    if (!stdout || !stdin) {
      logger.error('Python process is not running');
      settle(() => reject(new Error('Python process for KiCAD scripting is not running')));
      return;
    }

    try {
      logger.debug(`Processing KiCAD command: ${request.command}`);

      // Format the command and parameters as JSON
      const requestStr = JSON.stringify(request);

      // Clear any previous listeners
      stdout.removeAllListeners('data');
      stdout.removeAllListeners('end');

      // Set up new listeners
      stdout.on('data', (data: Buffer | string) => {
        const piece = typeof data === 'string' ? Buffer.from(data) : data;
        logger.debug(`Received data chunk: ${piece.length} bytes`);
        chunks.push(piece);

        // Check if we have a complete response
        // eslint-disable-next-line @typescript-eslint/no-explicit-any -- unvalidated JSON from the script
        let result: any;
        try {
          result = JSON.parse(Buffer.concat(chunks).toString('utf8'));
        } catch {
          // Not a complete JSON yet, keep collecting data
          return;
        }

        logger.debug(
          `Completed KiCAD command: ${request.command} with result: ${result?.success ? 'success' : 'failure'}`
        );

        // Resolve the promise with the result
        settle(() => resolve(result));
      });

      // Set a timeout (use command-specific timeout or default; 0 also falls back)
      const timeoutDuration = request.timeout || DEFAULT_COMMAND_TIMEOUT_MS;
      timeoutHandle = setTimeout(() => {
        logger.error(`Command timeout after ${timeoutDuration / 1000}s: ${request.command}`);
        settle(() => reject(new Error(`Command timeout after ${timeoutDuration / 1000}s: ${request.command}`)));
      }, timeoutDuration);

      // Lets stop() / process exit fail this request immediately.
      this.abortActiveRequest = (error: Error): void => settle(() => reject(error));

      // Write the request to the Python process
      logger.debug(`Sending request: ${requestStr}`);
      stdin.write(requestStr + '\n');
    } catch (error) {
      logger.error(`Error processing request: ${error}`);

      // settle() also clears the pending timeout so it cannot fire later and
      // disturb a different in-flight request.
      settle(() => reject(error instanceof Error ? error : new Error(String(error))));
    }
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mixelpixx/KiCAD-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.