Skip to main content
Glama

CODESYS MCP Toolkit

server.txt83.5 kB
/** * server.ts * MCP Server for interacting with CODESYS via Python scripting. * Implements all MCP resources and tools that interact with the CODESYS environment. * * IMPORTANT: This server receives configuration as parameters from bin.ts, * which helps avoid issues with command-line argument passing in different execution environments. * (Incorporates script templates from v1.6.9) */ // --- Import 'os' FIRST --- import * as os from 'os'; // --- End Import 'os' --- // --- STARTUP LOG --- console.error(`>>> SERVER.TS TOP LEVEL EXECUTION @ ${new Date().toISOString()} <<<`); console.error(`>>> Node: ${process.version}, Platform: ${os.platform()}, Arch: ${os.arch()}`); console.error(`>>> Initial CWD: ${process.cwd()}`); // --- End Startup Log --- // --- Necessary Imports --- import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; import { z } from "zod"; import { executeCodesysScript } from "./codesys_interop"; // Assumes this utility exists import * as path from 'path'; import { stat } from "fs/promises"; // For file existence check (async) import * as fsPromises from 'fs/promises'; // Use promises version of fs // --- End Imports --- // --- Define an interface for configuration --- interface ServerConfig { codesysPath: string; profileName: string; workspaceDir: string; } // --- Wrap server logic in an exported function --- export async function startMcpServer(config: ServerConfig) { console.error(`>>> SERVER.TS startMcpServer() CALLED @ ${new Date().toISOString()} <<<`); console.error(`>>> Config Received: ${JSON.stringify(config)}`); // --- Use config values directly --- const WORKSPACE_DIR = config.workspaceDir; const codesysExePath = config.codesysPath; const codesysProfileName = config.profileName; console.error(`SERVER.TS: Using Workspace Directory: ${WORKSPACE_DIR}`); console.error(`SERVER.TS: Using CODESYS Path: ${codesysExePath}`); console.error(`SERVER.TS: Using CODESYS Profile: ${codesysProfileName}`); // --- Sanity check - confirm the path exists if possible --- // This helps catch configuration issues early and prevents runtime failures console.error(`SERVER.TS: Checking existence of CODESYS executable: ${codesysExePath}`); try { // Using sync check here as it's part of initial setup before async operations start const fsChecker = require('fs'); if (!fsChecker.existsSync(codesysExePath)) { console.error(`SERVER.TS ERROR: Determined CODESYS executable path does not exist: ${codesysExePath}`); // Consider throwing an error instead of exiting if bin.ts handles the catch throw new Error(`CODESYS executable not found at specified path: ${codesysExePath}`); // process.exit(1); // Avoid process.exit inside library functions if possible } else { console.error(`SERVER.TS: Confirmed CODESYS executable exists.`); } } catch (err: any) { console.error(`SERVER.TS ERROR: Error checking CODESYS path existence: ${err.message}`); throw err; // Rethrow the error to be caught by the caller (bin.ts) // process.exit(1); } // --- End Configuration Handling --- // --- Helper Function (fileExists - async version) --- async function fileExists(filePath: string): Promise<boolean> { try { await stat(filePath); return true; } catch (error: any) { if (error.code === 'ENOENT') { return false; // File does not exist } throw error; // Other error } } // --- End Helper Function --- // --- MCP Server Initialization --- console.error("SERVER.TS: Initializing McpServer..."); const server = new McpServer({ name: "CODESYS Control MCP Server", version: "1.7.1", // Update version as needed capabilities: { resources: {}, tools: {} } }); console.error("SERVER.TS: McpServer instance created."); // --- End MCP Server Initialization --- // --- Python Script Templates (Imported from v1.6.9) --- const ENSURE_PROJECT_OPEN_PYTHON_SNIPPET = ` import sys import scriptengine as script_engine import os import time import traceback # --- Function to ensure the correct project is open --- MAX_RETRIES = 3 RETRY_DELAY = 2.0 # seconds (use float for time.sleep) def ensure_project_open(target_project_path): print("DEBUG: Ensuring project is open: %s" % target_project_path) # Normalize target path once normalized_target_path = os.path.normcase(os.path.abspath(target_project_path)) for attempt in range(MAX_RETRIES): print("DEBUG: Ensure project attempt %d/%d for %s" % (attempt + 1, MAX_RETRIES, normalized_target_path)) primary_project = None try: # Getting primary project might fail if CODESYS instance is unstable primary_project = script_engine.projects.primary except Exception as primary_err: print("WARN: Error getting primary project: %s. Assuming none." % primary_err) # traceback.print_exc() # Optional: Print stack trace for this error primary_project = None current_project_path = "" project_ok = False # Flag to check if target is confirmed primary and accessible if primary_project: try: # Getting path should be relatively safe if primary_project object exists current_project_path = os.path.normcase(os.path.abspath(primary_project.path)) print("DEBUG: Current primary project path: %s" % current_project_path) if current_project_path == normalized_target_path: # Found the right project as primary, now check if it's usable print("DEBUG: Target project path matches primary. Checking access...") try: # Try a relatively safe operation to confirm object usability # Getting children count is a reasonable check _ = len(primary_project.get_children(False)) print("DEBUG: Target project '%s' is primary and accessible." % target_project_path) project_ok = True return primary_project # SUCCESS CASE 1: Already open and accessible except Exception as access_err: # Project found, but accessing it failed. Might be unstable. print("WARN: Primary project access check failed for '%s': %s. Will attempt reopen." % (current_project_path, access_err)) # traceback.print_exc() # Optional: Print stack trace primary_project = None # Force reopen by falling through else: # A *different* project is primary print("DEBUG: Primary project is '%s', not the target '%s'." % (current_project_path, normalized_target_path)) # Consider closing the wrong project if causing issues, but for now, just open target # try: # print("DEBUG: Closing incorrect primary project '%s'..." % current_project_path) # primary_project.close() # Be careful with unsaved changes # except Exception as close_err: # print("WARN: Failed to close incorrect primary project: %s" % close_err) primary_project = None # Force open target project except Exception as path_err: # Failed even to get the path of the supposed primary project print("WARN: Could not get path of current primary project: %s. Assuming not the target." % path_err) # traceback.print_exc() # Optional: Print stack trace primary_project = None # Force open target project # If target project not confirmed as primary and accessible, attempt to open/reopen if not project_ok: # Log clearly whether we are opening initially or reopening if primary_project is None and current_project_path == "": print("DEBUG: No primary project detected. Attempting to open target: %s" % target_project_path) elif primary_project is None and current_project_path != "": print("DEBUG: Primary project was '%s' but failed access check or needed close. Attempting to open target: %s" % (current_project_path, target_project_path)) else: # Includes cases where wrong project was open print("DEBUG: Target project not primary or initial check failed. Attempting to open/reopen: %s" % target_project_path) try: # Set flags for silent opening, handle potential attribute errors update_mode = script_engine.VersionUpdateFlags.NoUpdates | script_engine.VersionUpdateFlags.SilentMode # try: # update_mode = script_engine.VersionUpdateFlags.NoUpdates | script_engine.VersionUpdateFlags.SilentMode # except AttributeError: # print("WARN: VersionUpdateFlags not found, using integer flags for open (1 | 2 = 3).") # update_mode = 3 # 1=NoUpdates, 2=SilentMode opened_project = None try: # The actual open call print("DEBUG: Calling script_engine.projects.open('%s', update_flags=%s)..." % (target_project_path, update_mode)) opened_project = script_engine.projects.open(target_project_path, update_flags=update_mode) if not opened_project: # This is a critical failure if open returns None without exception print("ERROR: projects.open returned None for %s on attempt %d" % (target_project_path, attempt + 1)) # Allow retry loop to continue else: # Open call returned *something*, let's verify print("DEBUG: projects.open call returned an object for: %s" % target_project_path) print("DEBUG: Pausing for stabilization after open...") time.sleep(RETRY_DELAY) # Give CODESYS time # Re-verify: Is the project now primary and accessible? recheck_primary = None try: recheck_primary = script_engine.projects.primary except Exception as recheck_primary_err: print("WARN: Error getting primary project after reopen: %s" % recheck_primary_err) if recheck_primary: recheck_path = "" try: # Getting path might fail recheck_path = os.path.normcase(os.path.abspath(recheck_primary.path)) except Exception as recheck_path_err: print("WARN: Failed to get path after reopen: %s" % recheck_path_err) if recheck_path == normalized_target_path: print("DEBUG: Target project confirmed as primary after reopening.") try: # Final sanity check _ = len(recheck_primary.get_children(False)) print("DEBUG: Reopened project basic access confirmed.") return recheck_primary # SUCCESS CASE 2: Successfully opened/reopened except Exception as access_err_reopen: print("WARN: Reopened project (%s) basic access check failed: %s." % (normalized_target_path, access_err_reopen)) # traceback.print_exc() # Optional # Allow retry loop to continue else: print("WARN: Different project is primary after reopening! Expected '%s', got '%s'." % (normalized_target_path, recheck_path)) # Allow retry loop to continue, maybe it fixes itself else: print("WARN: No primary project found after reopening attempt %d!" % (attempt+1)) # Allow retry loop to continue except Exception as open_err: # Catch errors during the open call itself print("ERROR: Exception during projects.open call on attempt %d: %s" % (attempt + 1, open_err)) traceback.print_exc() # Crucial for diagnosing open failures # Allow retry loop to continue except Exception as outer_open_err: # Catch errors in the flag setup etc. print("ERROR: Unexpected error during open setup/logic attempt %d: %s" % (attempt + 1, outer_open_err)) traceback.print_exc() # If we didn't return successfully in this attempt, wait before retrying if attempt < MAX_RETRIES - 1: print("DEBUG: Ensure project attempt %d did not succeed. Waiting %f seconds..." % (attempt + 1, RETRY_DELAY)) time.sleep(RETRY_DELAY) else: # Last attempt failed print("ERROR: Failed all ensure_project_open attempts for %s." % normalized_target_path) # If all retries fail after the loop raise RuntimeError("Failed to ensure project '%s' is open and accessible after %d attempts." % (target_project_path, MAX_RETRIES)) # --- End of function --- # Placeholder for the project file path (must be set in scripts using this snippet) PROJECT_FILE_PATH = r"{PROJECT_FILE_PATH}" `; const CHECK_STATUS_SCRIPT = ` import sys, scriptengine as script_engine, os, traceback project_open = False; project_name = "No project open"; project_path = "N/A"; scripting_ok = False try: scripting_ok = True; primary_project = script_engine.projects.primary if primary_project: project_open = True try: project_path = os.path.normcase(os.path.abspath(primary_project.path)) try: project_name = primary_project.get_name() # Might fail if not project_name: project_name = "Unnamed (path: %s)" % os.path.basename(project_path) except: project_name = "Unnamed (path: %s)" % os.path.basename(project_path) except Exception as e_path: project_path = "N/A (Error: %s)" % e_path; project_name = "Unnamed (Path Error)" print("Project Open: %s" % project_open); print("Project Name: %s" % project_name) print("Project Path: %s" % project_path); print("Scripting OK: %s" % scripting_ok) print("SCRIPT_SUCCESS: Status check complete."); sys.exit(0) except Exception as e: error_message = "Error during status check: %s" % e print(error_message); print("Scripting OK: False") # traceback.print_exc() # Optional traceback print("SCRIPT_ERROR: %s" % error_message); sys.exit(1) `; const OPEN_PROJECT_SCRIPT_TEMPLATE = ` import sys, scriptengine as script_engine, os, traceback ${ENSURE_PROJECT_OPEN_PYTHON_SNIPPET} try: project = ensure_project_open(PROJECT_FILE_PATH) # Get name from object if possible, otherwise use path basename proj_name = "Unknown" try: if project: proj_name = project.get_name() or os.path.basename(PROJECT_FILE_PATH) else: proj_name = os.path.basename(PROJECT_FILE_PATH) + " (ensure_project_open returned None?)" except Exception: proj_name = os.path.basename(PROJECT_FILE_PATH) + " (name retrieval failed)" print("Project Opened: %s" % proj_name) print("SCRIPT_SUCCESS: Project opened successfully.") sys.exit(0) except Exception as e: error_message = "Error opening project %s: %s" % (PROJECT_FILE_PATH, e) print(error_message) traceback.print_exc() print("SCRIPT_ERROR: %s" % error_message); sys.exit(1) `; const CREATE_PROJECT_SCRIPT_TEMPLATE = ` import sys, scriptengine as script_engine, os, shutil, time, traceback # Placeholders TEMPLATE_PROJECT_PATH = r'{TEMPLATE_PROJECT_PATH}' # Path to Standard.project PROJECT_FILE_PATH = r'{PROJECT_FILE_PATH}' # Path for the new project (Target Path) try: print("DEBUG: Python script create_project (copy from template):") print("DEBUG: Template Source = %s" % TEMPLATE_PROJECT_PATH) print("DEBUG: Target Path = %s" % PROJECT_FILE_PATH) if not PROJECT_FILE_PATH: raise ValueError("Target project file path empty.") if not TEMPLATE_PROJECT_PATH: raise ValueError("Template project file path empty.") if not os.path.exists(TEMPLATE_PROJECT_PATH): raise IOError("Template project file not found: %s" % TEMPLATE_PROJECT_PATH) # 1. Copy the template project file to the new location target_dir = os.path.dirname(PROJECT_FILE_PATH) if not os.path.exists(target_dir): print("DEBUG: Creating target directory: %s" % target_dir); os.makedirs(target_dir) # Check if target file already exists if os.path.exists(PROJECT_FILE_PATH): print("WARN: Target project file already exists, overwriting: %s" % PROJECT_FILE_PATH) print("DEBUG: Copying '%s' to '%s'..." % (TEMPLATE_PROJECT_PATH, PROJECT_FILE_PATH)) shutil.copy2(TEMPLATE_PROJECT_PATH, PROJECT_FILE_PATH) # copy2 preserves metadata print("DEBUG: File copy complete.") # 2. Open the newly copied project file print("DEBUG: Opening the copied project: %s" % PROJECT_FILE_PATH) # Set flags for silent opening update_mode = script_engine.VersionUpdateFlags.NoUpdates | script_engine.VersionUpdateFlags.SilentMode # try: # update_mode = script_engine.VersionUpdateFlags.NoUpdates | script_engine.VersionUpdateFlags.SilentMode # except AttributeError: # print("WARN: VersionUpdateFlags not found, using integer flags for open (1 | 2 = 3).") # update_mode = 3 project = script_engine.projects.open(PROJECT_FILE_PATH, update_flags=update_mode) print("DEBUG: script_engine.projects.open returned: %s" % project) if project: print("DEBUG: Pausing briefly after open...") time.sleep(1.0) # Allow CODESYS to potentially initialize things try: print("DEBUG: Explicitly saving project after opening copy...") project.save(); print("DEBUG: Project save after opening copy succeeded.") except Exception as save_err: print("WARN: Explicit save after opening copy failed: %s" % save_err) # Decide if this is critical - maybe not, but good to know. print("Project Created from Template Copy at: %s" % PROJECT_FILE_PATH) print("SCRIPT_SUCCESS: Project copied from template and opened successfully.") sys.exit(0) else: error_message = "Failed to open project copy %s after copying template %s. projects.open returned None." % (PROJECT_FILE_PATH, TEMPLATE_PROJECT_PATH) print(error_message); print("SCRIPT_ERROR: %s" % error_message); sys.exit(1) except Exception as e: detailed_error = traceback.format_exc() error_message = "Error creating project '%s' from template '%s': %s\\n%s" % (PROJECT_FILE_PATH, TEMPLATE_PROJECT_PATH, e, detailed_error) print(error_message); print("SCRIPT_ERROR: Error copying/opening template: %s" % e); sys.exit(1) `; const SAVE_PROJECT_SCRIPT_TEMPLATE = ` import sys, scriptengine as script_engine, os, traceback ${ENSURE_PROJECT_OPEN_PYTHON_SNIPPET} try: primary_project = ensure_project_open(PROJECT_FILE_PATH) # Get name from object if possible, otherwise use path basename project_name = "Unknown" try: if primary_project: project_name = primary_project.get_name() or os.path.basename(PROJECT_FILE_PATH) else: project_name = os.path.basename(PROJECT_FILE_PATH) + " (ensure_project_open returned None?)" except Exception: project_name = os.path.basename(PROJECT_FILE_PATH) + " (name retrieval failed)" print("DEBUG: Saving project: %s (%s)" % (project_name, PROJECT_FILE_PATH)) primary_project.save() print("DEBUG: project.save() executed.") print("Project Saved: %s" % project_name) print("SCRIPT_SUCCESS: Project saved successfully.") sys.exit(0) except Exception as e: error_message = "Error saving project %s: %s" % (PROJECT_FILE_PATH, e) print(error_message) traceback.print_exc() print("SCRIPT_ERROR: %s" % error_message); sys.exit(1) `; const FIND_OBJECT_BY_PATH_PYTHON_SNIPPET = ` import traceback # --- Find object by path function --- def find_object_by_path_robust(start_node, full_path, target_type_name="object"): print("DEBUG: Finding %s by path: '%s'" % (target_type_name, full_path)) normalized_path = full_path.replace('\\\\', '/').strip('/') path_parts = normalized_path.split('/') if not path_parts: print("ERROR: Path is empty.") return None # Determine the actual starting node (project or application) project = start_node # Assume start_node is project initially if not hasattr(start_node, 'active_application') and hasattr(start_node, 'project'): # If start_node is not project but has project ref (e.g., an application), get the project try: project = start_node.project except Exception as proj_ref_err: print("WARN: Could not get project reference from start_node: %s" % proj_ref_err) # Proceed assuming start_node might be the project anyway or search fails # Try to get the application object robustly if we think we have the project app = None if hasattr(project, 'active_application'): try: app = project.active_application except Exception: pass # Ignore errors getting active app if not app: try: apps = project.find("Application", True) # Search recursively if apps: app = apps[0] except Exception: pass # Check if the first path part matches the application name app_name_lower = "" if app: try: app_name_lower = (app.get_name() or "application").lower() except Exception: app_name_lower = "application" # Fallback # Decide where to start the traversal current_obj = start_node # Default to the node passed in if hasattr(project, 'active_application'): # Only adjust if start_node was likely the project if app and path_parts[0].lower() == app_name_lower: print("DEBUG: Path starts with Application name '%s'. Beginning search there." % path_parts[0]) current_obj = app path_parts = path_parts[1:] # Consume the app name part # If path was *only* the application name if not path_parts: print("DEBUG: Target path is the Application object itself.") return current_obj else: print("DEBUG: Path does not start with Application name. Starting search from project root.") current_obj = project # Start search from the project root else: print("DEBUG: Starting search from originally provided node.") # Traverse the remaining path parts parent_path_str = getattr(current_obj, 'get_name', lambda: str(current_obj))() # Safer name getting for i, part_name in enumerate(path_parts): is_last_part = (i == len(path_parts) - 1) print("DEBUG: Searching for part [%d/%d]: '%s' under '%s'" % (i+1, len(path_parts), part_name, parent_path_str)) found_in_parent = None try: # Prioritize non-recursive find for direct children children_of_current = current_obj.get_children(False) print("DEBUG: Found %d direct children under '%s'." % (len(children_of_current), parent_path_str)) for child in children_of_current: child_name = getattr(child, 'get_name', lambda: None)() # Safer name getting # print("DEBUG: Checking child: '%s'" % child_name) # Verbose if child_name == part_name: found_in_parent = child print("DEBUG: Found direct child matching '%s'." % part_name) break # Found direct child, stop searching children # If not found directly, AND it's the last part, try recursive find from current parent if not found_in_parent and is_last_part: print("DEBUG: Direct find failed for last part '%s'. Trying recursive find under '%s'." % (part_name, parent_path_str)) found_recursive_list = current_obj.find(part_name, True) # Recursive find if found_recursive_list: # Maybe add a check here if multiple are found? found_in_parent = found_recursive_list[0] # Take the first match print("DEBUG: Found last part '%s' recursively." % part_name) else: print("DEBUG: Recursive find also failed for last part '%s'." % part_name) # Update current object if found if found_in_parent: current_obj = found_in_parent parent_path_str = getattr(current_obj, 'get_name', lambda: part_name)() # Safer name getting print("DEBUG: Stepped into '%s'." % parent_path_str) else: # If not found at any point, the path is invalid from this parent print("ERROR: Path part '%s' not found under '%s'." % (part_name, parent_path_str)) return None # Path broken except Exception as find_err: print("ERROR: Exception while searching for '%s' under '%s': %s" % (part_name, parent_path_str, find_err)) traceback.print_exc() return None # Error during search # Final verification (optional but recommended): Check if the found object's name matches the last part final_expected_name = full_path.split('/')[-1] found_final_name = getattr(current_obj, 'get_name', lambda: None)() # Safer name getting if found_final_name == final_expected_name: print("DEBUG: Final %s found and name verified for path '%s': %s" % (target_type_name, full_path, found_final_name)) return current_obj else: print("ERROR: Traversal ended on object '%s' but expected final name was '%s'." % (found_final_name, final_expected_name)) return None # Name mismatch implies target not found as expected # --- End of find object function --- `; const CREATE_POU_SCRIPT_TEMPLATE = ` import sys, scriptengine as script_engine, os, traceback ${ENSURE_PROJECT_OPEN_PYTHON_SNIPPET} ${FIND_OBJECT_BY_PATH_PYTHON_SNIPPET} POU_NAME = "{POU_NAME}"; POU_TYPE_STR = "{POU_TYPE_STR}"; IMPL_LANGUAGE_STR = "{IMPL_LANGUAGE_STR}"; PARENT_PATH_REL = "{PARENT_PATH}" pou_type_map = { "Program": script_engine.PouType.Program, "FunctionBlock": script_engine.PouType.FunctionBlock, "Function": script_engine.PouType.Function } # Map common language names to ImplementationLanguages attributes if needed (optional, None usually works) # lang_map = { "ST": script_engine.ImplementationLanguage.st, ... } try: print("DEBUG: create_pou script: Name='%s', Type='%s', Lang='%s', ParentPath='%s', Project='%s'" % (POU_NAME, POU_TYPE_STR, IMPL_LANGUAGE_STR, PARENT_PATH_REL, PROJECT_FILE_PATH)) primary_project = ensure_project_open(PROJECT_FILE_PATH) if not POU_NAME: raise ValueError("POU name empty.") if not PARENT_PATH_REL: raise ValueError("Parent path empty.") # Resolve POU Type Enum pou_type_enum = pou_type_map.get(POU_TYPE_STR) if not pou_type_enum: raise ValueError("Invalid POU type string: %s. Use Program, FunctionBlock, or Function." % POU_TYPE_STR) # Find parent object using the robust function parent_object = find_object_by_path_robust(primary_project, PARENT_PATH_REL, "parent container") if not parent_object: raise ValueError("Parent object not found for path: %s" % PARENT_PATH_REL) parent_name = getattr(parent_object, 'get_name', lambda: str(parent_object))() print("DEBUG: Using parent object: %s (Type: %s)" % (parent_name, type(parent_object).__name__)) # Check if parent object supports creating POUs (should implement ScriptIecLanguageObjectContainer) if not hasattr(parent_object, 'create_pou'): raise TypeError("Parent object '%s' of type %s does not support create_pou." % (parent_name, type(parent_object).__name__)) # Set language GUID to None (let CODESYS default based on parent/settings) lang_guid = None print("DEBUG: Setting language to None (will use default).") # Example if mapping language string: lang_guid = lang_map.get(IMPL_LANGUAGE_STR, None) print("DEBUG: Calling parent_object.create_pou: Name='%s', Type=%s, Lang=%s" % (POU_NAME, pou_type_enum, lang_guid)) # Call create_pou using keyword arguments new_pou = parent_object.create_pou( name=POU_NAME, type=pou_type_enum, language=lang_guid # Pass None ) print("DEBUG: parent_object.create_pou returned: %s" % new_pou) if new_pou: new_pou_name = getattr(new_pou, 'get_name', lambda: POU_NAME)() print("DEBUG: POU object created: %s" % new_pou_name) # --- SAVE THE PROJECT TO PERSIST THE NEW POU --- try: print("DEBUG: Saving Project...") primary_project.save() # Save the overall project file print("DEBUG: Project saved successfully after POU creation.") except Exception as save_err: print("ERROR: Failed to save Project after POU creation: %s" % save_err) detailed_error = traceback.format_exc() error_message = "Error saving Project after creating POU '%s': %s\\n%s" % (new_pou_name, save_err, detailed_error) print(error_message); print("SCRIPT_ERROR: %s" % error_message); sys.exit(1) # --- END SAVING --- print("POU Created: %s" % new_pou_name); print("Type: %s" % POU_TYPE_STR); print("Language: %s (Defaulted)" % IMPL_LANGUAGE_STR); print("Parent Path: %s" % PARENT_PATH_REL) print("SCRIPT_SUCCESS: POU created successfully."); sys.exit(0) else: error_message = "Failed to create POU '%s'. create_pou returned None." % POU_NAME; print(error_message); print("SCRIPT_ERROR: %s" % error_message); sys.exit(1) except Exception as e: detailed_error = traceback.format_exc() error_message = "Error creating POU '%s' in project '%s': %s\\n%s" % (POU_NAME, PROJECT_FILE_PATH, e, detailed_error) print(error_message); print("SCRIPT_ERROR: Error creating POU '%s': %s" % (POU_NAME, e)); sys.exit(1) `; const SET_POU_CODE_SCRIPT_TEMPLATE = ` import sys, scriptengine as script_engine, os, traceback ${ENSURE_PROJECT_OPEN_PYTHON_SNIPPET} ${FIND_OBJECT_BY_PATH_PYTHON_SNIPPET} POU_FULL_PATH = "{POU_FULL_PATH}" # Expecting format like "Application/MyPOU" or "Folder/SubFolder/MyPOU" DECLARATION_CONTENT = """{DECLARATION_CONTENT}""" IMPLEMENTATION_CONTENT = """{IMPLEMENTATION_CONTENT}""" try: print("DEBUG: set_pou_code script: POU_FULL_PATH='%s', Project='%s'" % (POU_FULL_PATH, PROJECT_FILE_PATH)) primary_project = ensure_project_open(PROJECT_FILE_PATH) if not POU_FULL_PATH: raise ValueError("POU full path empty.") # Find the target POU/Method/Property object target_object = find_object_by_path_robust(primary_project, POU_FULL_PATH, "target object") if not target_object: raise ValueError("Target object not found using path: %s" % POU_FULL_PATH) target_name = getattr(target_object, 'get_name', lambda: POU_FULL_PATH)() print("DEBUG: Found target object: %s" % target_name) # --- Set Declaration Part --- declaration_updated = False # Check if the content is actually provided (might be None/empty if only impl is set) has_declaration_content = 'DECLARATION_CONTENT' in locals() or 'DECLARATION_CONTENT' in globals() if has_declaration_content and DECLARATION_CONTENT is not None: # Check not None if hasattr(target_object, 'textual_declaration'): decl_obj = target_object.textual_declaration if decl_obj and hasattr(decl_obj, 'replace'): try: print("DEBUG: Accessing textual_declaration...") decl_obj.replace(DECLARATION_CONTENT) print("DEBUG: Set declaration text using replace().") declaration_updated = True except Exception as decl_err: print("ERROR: Failed to set declaration text: %s" % decl_err) traceback.print_exc() # Print stack trace for detailed error else: print("WARN: Target '%s' textual_declaration attribute is None or does not have replace(). Skipping declaration update." % target_name) else: print("WARN: Target '%s' does not have textual_declaration attribute. Skipping declaration update." % target_name) else: print("DEBUG: Declaration content not provided or is None. Skipping declaration update.") # --- Set Implementation Part --- implementation_updated = False has_implementation_content = 'IMPLEMENTATION_CONTENT' in locals() or 'IMPLEMENTATION_CONTENT' in globals() if has_implementation_content and IMPLEMENTATION_CONTENT is not None: # Check not None if hasattr(target_object, 'textual_implementation'): impl_obj = target_object.textual_implementation if impl_obj and hasattr(impl_obj, 'replace'): try: print("DEBUG: Accessing textual_implementation...") impl_obj.replace(IMPLEMENTATION_CONTENT) print("DEBUG: Set implementation text using replace().") implementation_updated = True except Exception as impl_err: print("ERROR: Failed to set implementation text: %s" % impl_err) traceback.print_exc() # Print stack trace for detailed error else: print("WARN: Target '%s' textual_implementation attribute is None or does not have replace(). Skipping implementation update." % target_name) else: print("WARN: Target '%s' does not have textual_implementation attribute. Skipping implementation update." % target_name) else: print("DEBUG: Implementation content not provided or is None. Skipping implementation update.") # --- SAVE THE PROJECT TO PERSIST THE CODE CHANGE --- # Only save if something was actually updated to avoid unnecessary saves if declaration_updated or implementation_updated: try: print("DEBUG: Saving Project (after code change)...") primary_project.save() # Save the overall project file print("DEBUG: Project saved successfully after code change.") except Exception as save_err: print("ERROR: Failed to save Project after setting code: %s" % save_err) detailed_error = traceback.format_exc() error_message = "Error saving Project after code change for '%s': %s\\n%s" % (target_name, save_err, detailed_error) print(error_message); print("SCRIPT_ERROR: %s" % error_message); sys.exit(1) else: print("DEBUG: No code parts were updated, skipping project save.") # --- END SAVING --- print("Code Set For: %s" % target_name) print("Path: %s" % POU_FULL_PATH) print("SCRIPT_SUCCESS: Declaration and/or implementation set successfully."); sys.exit(0) except Exception as e: detailed_error = traceback.format_exc() error_message = "Error setting code for object '%s' in project '%s': %s\\n%s" % (POU_FULL_PATH, PROJECT_FILE_PATH, e, detailed_error) print(error_message); print("SCRIPT_ERROR: %s" % error_message); sys.exit(1) `; const CREATE_PROPERTY_SCRIPT_TEMPLATE = ` import sys, scriptengine as script_engine, os, traceback ${ENSURE_PROJECT_OPEN_PYTHON_SNIPPET} ${FIND_OBJECT_BY_PATH_PYTHON_SNIPPET} PARENT_POU_FULL_PATH = "{PARENT_POU_FULL_PATH}" # e.g., "Application/MyFB" PROPERTY_NAME = "{PROPERTY_NAME}" PROPERTY_TYPE = "{PROPERTY_TYPE}" # Optional: Language for Getter/Setter (usually defaults to ST) # LANG_GUID_STR = "{LANG_GUID_STR}" # Example if needed try: print("DEBUG: create_property script: ParentPOU='%s', Name='%s', Type='%s', Project='%s'" % (PARENT_POU_FULL_PATH, PROPERTY_NAME, PROPERTY_TYPE, PROJECT_FILE_PATH)) primary_project = ensure_project_open(PROJECT_FILE_PATH) if not PARENT_POU_FULL_PATH: raise ValueError("Parent POU full path empty.") if not PROPERTY_NAME: raise ValueError("Property name empty.") if not PROPERTY_TYPE: raise ValueError("Property type empty.") # Find the parent POU object parent_pou_object = find_object_by_path_robust(primary_project, PARENT_POU_FULL_PATH, "parent POU") if not parent_pou_object: raise ValueError("Parent POU object not found: %s" % PARENT_POU_FULL_PATH) parent_pou_name = getattr(parent_pou_object, 'get_name', lambda: PARENT_POU_FULL_PATH)() print("DEBUG: Found Parent POU object: %s" % parent_pou_name) # Check if parent object supports creating properties (should implement ScriptIecLanguageMemberContainer) if not hasattr(parent_pou_object, 'create_property'): raise TypeError("Parent object '%s' of type %s does not support create_property." % (parent_pou_name, type(parent_pou_object).__name__)) # Default language to None (usually ST) lang_guid = None print("DEBUG: Calling create_property: Name='%s', Type='%s', Lang=%s" % (PROPERTY_NAME, PROPERTY_TYPE, lang_guid)) # Call the create_property method ON THE PARENT POU new_property_object = parent_pou_object.create_property( name=PROPERTY_NAME, return_type=PROPERTY_TYPE, language=lang_guid # Pass None to use default ) if new_property_object: new_prop_name = getattr(new_property_object, 'get_name', lambda: PROPERTY_NAME)() print("DEBUG: Property object created: %s" % new_prop_name) # --- SAVE THE PROJECT TO PERSIST THE NEW PROPERTY OBJECT --- try: print("DEBUG: Saving Project (after property creation)...") primary_project.save() print("DEBUG: Project saved successfully after property creation.") except Exception as save_err: print("ERROR: Failed to save Project after creating property: %s" % save_err) detailed_error = traceback.format_exc() error_message = "Error saving Project after creating property '%s': %s\\n%s" % (PROPERTY_NAME, save_err, detailed_error) print(error_message); print("SCRIPT_ERROR: %s" % error_message); sys.exit(1) # --- END SAVING --- print("Property Created: %s" % new_prop_name) print("Parent POU: %s" % PARENT_POU_FULL_PATH) print("Type: %s" % PROPERTY_TYPE) print("SCRIPT_SUCCESS: Property created successfully."); sys.exit(0) else: error_message = "Failed to create property '%s' under '%s'. create_property returned None." % (PROPERTY_NAME, parent_pou_name) print(error_message); print("SCRIPT_ERROR: %s" % error_message); sys.exit(1) except Exception as e: detailed_error = traceback.format_exc() error_message = "Error creating property '%s' under POU '%s' in project '%s': %s\\n%s" % (PROPERTY_NAME, PARENT_POU_FULL_PATH, PROJECT_FILE_PATH, e, detailed_error) print(error_message); print("SCRIPT_ERROR: %s" % error_message); sys.exit(1) `; const CREATE_METHOD_SCRIPT_TEMPLATE = ` import sys, scriptengine as script_engine, os, traceback ${ENSURE_PROJECT_OPEN_PYTHON_SNIPPET} ${FIND_OBJECT_BY_PATH_PYTHON_SNIPPET} PARENT_POU_FULL_PATH = "{PARENT_POU_FULL_PATH}" # e.g., "Application/MyFB" METHOD_NAME = "{METHOD_NAME}" RETURN_TYPE = "{RETURN_TYPE}" # Can be empty string for no return type # Optional: Language # LANG_GUID_STR = "{LANG_GUID_STR}" # Example if needed try: print("DEBUG: create_method script: ParentPOU='%s', Name='%s', ReturnType='%s', Project='%s'" % (PARENT_POU_FULL_PATH, METHOD_NAME, RETURN_TYPE, PROJECT_FILE_PATH)) primary_project = ensure_project_open(PROJECT_FILE_PATH) if not PARENT_POU_FULL_PATH: raise ValueError("Parent POU full path empty.") if not METHOD_NAME: raise ValueError("Method name empty.") # RETURN_TYPE can be empty # Find the parent POU object parent_pou_object = find_object_by_path_robust(primary_project, PARENT_POU_FULL_PATH, "parent POU") if not parent_pou_object: raise ValueError("Parent POU object not found: %s" % PARENT_POU_FULL_PATH) parent_pou_name = getattr(parent_pou_object, 'get_name', lambda: PARENT_POU_FULL_PATH)() print("DEBUG: Found Parent POU object: %s" % parent_pou_name) # Check if parent object supports creating methods (should implement ScriptIecLanguageMemberContainer) if not hasattr(parent_pou_object, 'create_method'): raise TypeError("Parent object '%s' of type %s does not support create_method." % (parent_pou_name, type(parent_pou_object).__name__)) # Default language to None (usually ST) lang_guid = None # Use None if RETURN_TYPE is empty string, otherwise use the string actual_return_type = RETURN_TYPE if RETURN_TYPE else None print("DEBUG: Calling create_method: Name='%s', ReturnType=%s, Lang=%s" % (METHOD_NAME, actual_return_type, lang_guid)) # Call the create_method method ON THE PARENT POU new_method_object = parent_pou_object.create_method( name=METHOD_NAME, return_type=actual_return_type, language=lang_guid # Pass None to use default ) if new_method_object: new_meth_name = getattr(new_method_object, 'get_name', lambda: METHOD_NAME)() print("DEBUG: Method object created: %s" % new_meth_name) # --- SAVE THE PROJECT TO PERSIST THE NEW METHOD OBJECT --- try: print("DEBUG: Saving Project (after method creation)...") primary_project.save() print("DEBUG: Project saved successfully after method creation.") except Exception as save_err: print("ERROR: Failed to save Project after creating method: %s" % save_err) detailed_error = traceback.format_exc() error_message = "Error saving Project after creating method '%s': %s\\n%s" % (METHOD_NAME, save_err, detailed_error) print(error_message); print("SCRIPT_ERROR: %s" % error_message); sys.exit(1) # --- END SAVING --- print("Method Created: %s" % new_meth_name) print("Parent POU: %s" % PARENT_POU_FULL_PATH) print("Return Type: %s" % (RETURN_TYPE if RETURN_TYPE else "(None)")) print("SCRIPT_SUCCESS: Method created successfully."); sys.exit(0) else: error_message = "Failed to create method '%s' under '%s'. create_method returned None." % (METHOD_NAME, parent_pou_name) print(error_message); print("SCRIPT_ERROR: %s" % error_message); sys.exit(1) except Exception as e: detailed_error = traceback.format_exc() error_message = "Error creating method '%s' under POU '%s' in project '%s': %s\\n%s" % (METHOD_NAME, PARENT_POU_FULL_PATH, PROJECT_FILE_PATH, e, detailed_error) print(error_message); print("SCRIPT_ERROR: %s" % error_message); sys.exit(1) `; const COMPILE_PROJECT_SCRIPT_TEMPLATE = ` import sys, scriptengine as script_engine, os, traceback ${ENSURE_PROJECT_OPEN_PYTHON_SNIPPET} try: print("DEBUG: compile_project script: Project='%s'" % PROJECT_FILE_PATH) primary_project = ensure_project_open(PROJECT_FILE_PATH) project_name = os.path.basename(PROJECT_FILE_PATH) target_app = None app_name = "N/A" # Try getting active application first try: target_app = primary_project.active_application if target_app: app_name = getattr(target_app, 'get_name', lambda: "Unnamed App (Active)")() print("DEBUG: Found active application: %s" % app_name) except Exception as active_err: print("WARN: Could not get active application: %s. Searching..." % active_err) # If no active app, search for the first one if not target_app: print("DEBUG: Searching for first compilable application...") apps = [] try: # Search recursively through all project objects all_children = primary_project.get_children(True) for child in all_children: # Check using the marker property and if build method exists if hasattr(child, 'is_application') and child.is_application and hasattr(child, 'build'): app_name_found = getattr(child, 'get_name', lambda: "Unnamed App")() print("DEBUG: Found potential application object: %s" % app_name_found) apps.append(child) break # Take the first one found except Exception as find_err: print("WARN: Error finding application object: %s" % find_err) if not apps: raise RuntimeError("No compilable application found in project '%s'" % project_name) target_app = apps[0] app_name = getattr(target_app, 'get_name', lambda: "Unnamed App (First Found)")() print("WARN: Compiling first found application: %s" % app_name) print("DEBUG: Calling build() on app '%s'..." % app_name) if not hasattr(target_app, 'build'): raise TypeError("Selected object '%s' is not an application or doesn't support build()." % app_name) # Execute the build target_app.build(); print("DEBUG: Build command executed for application '%s'." % app_name) # Check messages is harder without direct access to message store from script. # Rely on CODESYS UI or log output for now. print("Compile Initiated For Application: %s" % app_name); print("In Project: %s" % project_name) print("SCRIPT_SUCCESS: Application compilation initiated."); sys.exit(0) except Exception as e: detailed_error = traceback.format_exc() error_message = "Error initiating compilation for project %s: %s\\n%s" % (PROJECT_FILE_PATH, e, detailed_error) print(error_message); print("SCRIPT_ERROR: %s" % error_message); sys.exit(1) `; const GET_PROJECT_STRUCTURE_SCRIPT_TEMPLATE = ` import sys, scriptengine as script_engine, os, traceback ${ENSURE_PROJECT_OPEN_PYTHON_SNIPPET} # FIND_OBJECT_BY_PATH_PYTHON_SNIPPET is NOT needed here, we start from project root. def get_object_structure(obj, indent=0, max_depth=10): # Add max_depth lines = []; indent_str = " " * indent if indent > max_depth: lines.append("%s- Max recursion depth reached." % indent_str) return lines try: name = "Unnamed"; obj_type = type(obj).__name__ guid_str = "" folder_str = "" try: name = getattr(obj, 'get_name', lambda: "Unnamed")() or "Unnamed" # Safer get_name if hasattr(obj, 'guid'): guid_str = " {%s}" % obj.guid if hasattr(obj, 'is_folder') and obj.is_folder: folder_str = " [Folder]" except Exception as name_err: print("WARN: Error getting name/guid/folder status for an object: %s" % name_err) name = "!!! Error Getting Name !!!" lines.append("%s- %s (%s)%s%s" % (indent_str, name, obj_type, folder_str, guid_str)) # Get children only if the object potentially has them children = [] can_have_children = hasattr(obj, 'get_children') and ( not hasattr(obj, 'is_folder') or # If it's not clear if it's a folder (e.g., project root) (hasattr(obj, 'is_folder') and obj.is_folder) or # If it is a folder # Add known container types explicitly, check marker interfaces too hasattr(obj, 'is_project') or hasattr(obj, 'is_application') or hasattr(obj, 'is_device') or hasattr(obj,'is_pou') ) if can_have_children: try: children = obj.get_children(False) # print("DEBUG: %s has %d children" % (name, len(children))) # Verbose except Exception as get_child_err: lines.append("%s ERROR getting children: %s" % (indent_str, get_child_err)) # traceback.print_exc() # Optional for child in children: lines.extend(get_object_structure(child, indent + 1, max_depth)) # Recurse except Exception as e: lines.append("%s- Error processing node: %s" % (indent_str, e)) traceback.print_exc() # Print detailed error for this node return lines try: print("DEBUG: Getting structure for: %s" % PROJECT_FILE_PATH) primary_project = ensure_project_open(PROJECT_FILE_PATH) project_name = os.path.basename(PROJECT_FILE_PATH) print("DEBUG: Getting structure for project: %s" % project_name) # Use the project object obtained from ensure_project_open structure_list = get_object_structure(primary_project, max_depth=15) # Set a reasonable depth structure_output = "\\n".join(structure_list) # Ensure markers are printed distinctly print("\\n--- PROJECT STRUCTURE START ---") print(structure_output) print("--- PROJECT STRUCTURE END ---\\n") print("SCRIPT_SUCCESS: Project structure retrieved."); sys.exit(0) except Exception as e: detailed_error = traceback.format_exc() error_message = "Error getting structure for %s: %s\\n%s" % (PROJECT_FILE_PATH, e, detailed_error) print(error_message); print("SCRIPT_ERROR: %s" % error_message); sys.exit(1) `; const GET_POU_CODE_SCRIPT_TEMPLATE = ` import sys, scriptengine as script_engine, os, traceback ${ENSURE_PROJECT_OPEN_PYTHON_SNIPPET} ${FIND_OBJECT_BY_PATH_PYTHON_SNIPPET} POU_FULL_PATH = "{POU_FULL_PATH}"; CODE_START_MARKER = "### POU CODE START ###"; CODE_END_MARKER = "### POU CODE END ###" DECL_START_MARKER = "### POU DECLARATION START ###"; DECL_END_MARKER = "### POU DECLARATION END ###" IMPL_START_MARKER = "### POU IMPLEMENTATION START ###"; IMPL_END_MARKER = "### POU IMPLEMENTATION END ###" try: print("DEBUG: Getting code: POU_FULL_PATH='%s', Project='%s'" % (POU_FULL_PATH, PROJECT_FILE_PATH)) primary_project = ensure_project_open(PROJECT_FILE_PATH) if not POU_FULL_PATH: raise ValueError("POU full path empty.") # Find the target POU/Method/Property object target_object = find_object_by_path_robust(primary_project, POU_FULL_PATH, "target object") if not target_object: raise ValueError("Target object not found using path: %s" % POU_FULL_PATH) target_name = getattr(target_object, 'get_name', lambda: POU_FULL_PATH)() print("DEBUG: Found target object: %s" % target_name) declaration_code = ""; implementation_code = "" # --- Get Declaration Part --- if hasattr(target_object, 'textual_declaration'): decl_obj = target_object.textual_declaration if decl_obj and hasattr(decl_obj, 'text'): try: declaration_code = decl_obj.text print("DEBUG: Got declaration text.") except Exception as decl_read_err: print("ERROR: Failed to read declaration text: %s" % decl_read_err) declaration_code = "/* ERROR reading declaration: %s */" % decl_read_err else: print("WARN: textual_declaration exists but is None or has no 'text' attribute.") else: print("WARN: No textual_declaration attribute.") # --- Get Implementation Part --- if hasattr(target_object, 'textual_implementation'): impl_obj = target_object.textual_implementation if impl_obj and hasattr(impl_obj, 'text'): try: implementation_code = impl_obj.text print("DEBUG: Got implementation text.") except Exception as impl_read_err: print("ERROR: Failed to read implementation text: %s" % impl_read_err) implementation_code = "/* ERROR reading implementation: %s */" % impl_read_err else: print("WARN: textual_implementation exists but is None or has no 'text' attribute.") else: print("WARN: No textual_implementation attribute.") print("Code retrieved for: %s" % target_name) # Print declaration between markers, ensuring markers are on separate lines print("\\n" + DECL_START_MARKER) print(declaration_code) print(DECL_END_MARKER + "\\n") # Print implementation between markers print(IMPL_START_MARKER) print(implementation_code) print(IMPL_END_MARKER + "\\n") # --- LEGACY MARKERS for backward compatibility if needed --- # Combine both for old marker format, adding a separator line # legacy_combined_code = declaration_code + "\\n\\n// Implementation\\n" + implementation_code # print(CODE_START_MARKER); print(legacy_combined_code); print(CODE_END_MARKER) # --- END LEGACY --- print("SCRIPT_SUCCESS: Code retrieved."); sys.exit(0) except Exception as e: detailed_error = traceback.format_exc() error_message = "Error getting code for object '%s' in project '%s': %s\\n%s" % (POU_FULL_PATH, PROJECT_FILE_PATH, e, detailed_error) print(error_message); print("SCRIPT_ERROR: %s" % error_message); sys.exit(1) `; // --- End Python Script Templates --- // --- Zod Schemas (moved for clarity before usage) --- const PouTypeEnum = z.enum(["Program", "FunctionBlock", "Function"]); const ImplementationLanguageEnum = z.enum(["ST", "LD", "FBD", "SFC", "IL", "CFC", "StructuredText", "LadderDiagram", "FunctionBlockDiagram", "SequentialFunctionChart", "InstructionList", "ContinuousFunctionChart"]); // --- End Zod Schemas --- // --- MCP Resources / Tools Definitions --- console.error("SERVER.TS: Defining Resources and Tools..."); // --- Resources --- server.resource("project-status", "codesys://project/status", async (uri) => { console.error(`SERVER.TS Resource request: ${uri.href}`); try { const result = await executeCodesysScript(CHECK_STATUS_SCRIPT, codesysExePath, codesysProfileName); const outputLines = result.output.split(/[\r\n]+/).filter(line => line.trim()); const statusData: { [key: string]: string } = {}; outputLines.forEach(line => { const match = line.match(/^([^:]+):\s*(.*)$/); if (match) { statusData[match[1].trim()] = match[2].trim(); }}); const statusText = `CODESYS Status:\n - Scripting OK: ${statusData['Scripting OK'] ?? 'Unknown'}\n - Project Open: ${statusData['Project Open'] ?? 'Unknown'}\n - Project Name: ${statusData['Project Name'] ?? 'Unknown'}\n - Project Path: ${statusData['Project Path'] ?? 'N/A'}`; const isError = !result.success || statusData['Scripting OK']?.toLowerCase() !== 'true'; // **** RETURN MCP STRUCTURE **** return { contents: [{ uri: uri.href, text: statusText, contentType: "text/plain" }], isError: isError }; } catch (error: any) { console.error(`Error resource ${uri.href}:`, error); // **** RETURN MCP STRUCTURE **** return { contents: [{ uri: uri.href, text: `Failed status script: ${error.message}`, contentType: "text/plain" }], isError: true }; } }); // *** DEFINE TEMPLATES *** const projectStructureTemplate = new ResourceTemplate("codesys://project/{+project_path}/structure", { list: undefined }); const pouCodeTemplate = new ResourceTemplate("codesys://project/{+project_path}/pou/{+pou_path}/code", { list: undefined }); // *** END DEFINE TEMPLATES *** server.resource("project-structure", projectStructureTemplate, async (uri, params) => { // *** DEFINE VARIABLES (like projectPath) *** const projectPathParam = params.project_path; if (typeof projectPathParam !== 'string') { return { contents: [{ uri: uri.href, text: `Error: Invalid project path type (${typeof projectPathParam}).`, contentType: "text/plain" }], isError: true }; } const projectPath: string = projectPathParam; // Define projectPath if (!projectPath) { return { contents: [{ uri: uri.href, text: "Error: Project path missing.", contentType: "text/plain" }], isError: true }; } // *** END DEFINE VARIABLES *** console.error(`Resource request: project structure for ${projectPath}`); try { const absoluteProjPath = path.normalize(path.isAbsolute(projectPath) ? projectPath : path.join(WORKSPACE_DIR, projectPath)); const escapedPathForPython = absoluteProjPath.replace(/\\/g, '\\\\'); // *** DEFINE scriptContent *** const scriptContent = GET_PROJECT_STRUCTURE_SCRIPT_TEMPLATE.replace("{PROJECT_FILE_PATH}", escapedPathForPython); // *** END DEFINE scriptContent *** const result = await executeCodesysScript(scriptContent, codesysExePath, codesysProfileName); let structureText = `Error retrieving structure for ${absoluteProjPath}.\n\n${result.output}`; let isError = !result.success; if (result.success && result.output.includes("SCRIPT_SUCCESS")) { const startMarker = "--- PROJECT STRUCTURE START ---"; const endMarker = "--- PROJECT STRUCTURE END ---"; const startIndex = result.output.indexOf(startMarker); const endIndex = result.output.indexOf(endMarker); if (startIndex !== -1 && endIndex !== -1 && startIndex < endIndex) { structureText = result.output.substring(startIndex + startMarker.length, endIndex).replace(/\\n/g, '\n').trim(); } else { console.error("Error: Could not find structure markers in script output."); structureText = `Could not parse structure markers in output for ${absoluteProjPath}.\n\nOutput:\n${result.output}`; isError = true; } } else { isError = true; } // **** RETURN MCP STRUCTURE **** return { contents: [{ uri: uri.href, text: structureText, contentType: "text/plain" }], isError: isError }; } catch (error: any) { console.error(`Error getting structure ${uri.href}:`, error); // **** RETURN MCP STRUCTURE **** return { contents: [{ uri: uri.href, text: `Failed structure script for '${projectPath}': ${error.message}`, contentType: "text/plain" }], isError: true }; } }); server.resource("pou-code", pouCodeTemplate, async (uri, params) => { // *** DEFINE VARIABLES (like projectPath, pouPath) *** const projectPathParam = params.project_path; const pouPathParam = params.pou_path; if (typeof projectPathParam !== 'string' || typeof pouPathParam !== 'string') { return { contents: [{ uri: uri.href, text: "Error: Invalid project or POU path type.", contentType: "text/plain" }], isError: true }; } const projectPath: string = projectPathParam; // Define projectPath const pouPath: string = pouPathParam; // Define pouPath if (!projectPath || !pouPath) { return { contents: [{ uri: uri.href, text: "Error: Project or POU path missing.", contentType: "text/plain" }], isError: true }; } // *** END DEFINE VARIABLES *** console.error(`Resource request: POU code: Project='${projectPath}', POU='${pouPath}'`); try { const absoluteProjPath = path.normalize(path.isAbsolute(projectPath) ? projectPath : path.join(WORKSPACE_DIR, projectPath)); const sanitizedPouPath = String(pouPath).replace(/\\/g, '/').replace(/^\/+|\/+$/g, ''); const escapedProjPath = absoluteProjPath.replace(/\\/g, '\\\\'); // *** DEFINE scriptContent *** let scriptContent = GET_POU_CODE_SCRIPT_TEMPLATE.replace("{PROJECT_FILE_PATH}", escapedProjPath); scriptContent = scriptContent.replace("{POU_FULL_PATH}", sanitizedPouPath); // *** END DEFINE scriptContent *** const result = await executeCodesysScript(scriptContent, codesysExePath, codesysProfileName); let codeText = `Error retrieving code for object '${sanitizedPouPath}' in project '${absoluteProjPath}'.\n\n${result.output}`; let isError = !result.success; if (result.success && result.output.includes("SCRIPT_SUCCESS")) { // ... (marker parsing logic using new markers) ... const declStartMarker = "### POU DECLARATION START ###"; const declEndMarker = "### POU DECLARATION END ###"; const implStartMarker = "### POU IMPLEMENTATION START ###"; const implEndMarker = "### POU IMPLEMENTATION END ###"; const declStartIdx = result.output.indexOf(declStartMarker); const declEndIdx = result.output.indexOf(declEndMarker); const implStartIdx = result.output.indexOf(implStartMarker); const implEndIdx = result.output.indexOf(implEndMarker); let declaration = "/* Declaration not found in output */"; let implementation = "/* Implementation not found in output */"; if (declStartIdx !== -1 && declEndIdx !== -1 && declStartIdx < declEndIdx) { declaration = result.output.substring(declStartIdx + declStartMarker.length, declEndIdx).replace(/\\n/g, '\n').trim(); } else { console.error(`WARN: Declaration markers not found correctly for ${sanitizedPouPath}`); } if (implStartIdx !== -1 && implEndIdx !== -1 && implStartIdx < implEndIdx) { implementation = result.output.substring(implStartIdx + implStartMarker.length, implEndIdx).replace(/\\n/g, '\n').trim(); } else { console.error(`WARN: Implementation markers not found correctly for ${sanitizedPouPath}`); } codeText = `// ----- Declaration -----\n${declaration}\n\n// ----- Implementation -----\n${implementation}`; // *** END MARKER PARSING *** } else { isError = true; } // **** RETURN MCP STRUCTURE **** return { contents: [{ uri: uri.href, text: codeText, contentType: "text/plain" }], isError: isError }; } catch (error: any) { console.error(`Error getting POU code ${uri.href}:`, error); // **** RETURN MCP STRUCTURE **** return { contents: [{ uri: uri.href, text: `Failed POU code script for '${pouPath}' in '${projectPath}': ${error.message}`, contentType: "text/plain" }], isError: true }; } }); // --- End Resources --- // --- Tools --- server.tool("open_project", { filePath: z.string().describe("Path to the project file.") }, async (args) => { const { filePath } = args; let absPath = path.normalize(path.isAbsolute(filePath) ? filePath : path.join(WORKSPACE_DIR, filePath)); console.error(`Tool call: open_project: ${absPath}`); try { const escapedPath = absPath.replace(/\\/g, '\\\\'); // *** DEFINE script *** const script = OPEN_PROJECT_SCRIPT_TEMPLATE.replace("{PROJECT_FILE_PATH}", escapedPath); // *** END DEFINE script *** const result = await executeCodesysScript(script, codesysExePath, codesysProfileName); const success = result.success && result.output.includes("SCRIPT_SUCCESS"); // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: success ? `Project opened: ${absPath}` : `Failed open project ${absPath}. Output:\n${result.output}` }], isError: !success }; } catch (e: any) { console.error(`Error open_project ${absPath}: ${e}`); // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: `Error: ${e.message}` }], isError: true }; } }); server.tool("create_project", { filePath: z.string().describe("Path for new project.") }, async (args) => { const { filePath } = args; let absPath = path.normalize(path.isAbsolute(filePath) ? filePath : path.join(WORKSPACE_DIR, filePath)); console.error(`Tool call: create_project (copy template): ${absPath}`); let templatePath = ""; try { // Template finding logic (same as before) const baseDir = path.dirname(path.dirname(codesysExePath)); templatePath = path.normalize(path.join(baseDir, 'Templates', 'Standard.project')); if (!(await fileExists(templatePath))) { console.error(`WARN: Template not found relative to exe: ${templatePath}. Trying ProgramData...`); const programData = process.env.ALLUSERSPROFILE || process.env.ProgramData || 'C:\\ProgramData'; const possibleTemplateDir = path.join(programData, 'CODESYS', 'CODESYS', codesysProfileName, 'Templates'); let potentialTemplatePath = path.normalize(path.join(possibleTemplateDir, 'Standard.project')); if (await fileExists(potentialTemplatePath)) { templatePath = potentialTemplatePath; console.error(`DEBUG: Found template in ProgramData: ${templatePath}`); } else { const alternativeTemplateDir = path.join(programData, 'CODESYS', 'Templates'); potentialTemplatePath = path.normalize(path.join(alternativeTemplateDir, 'Standard.project')); if (await fileExists(potentialTemplatePath)) { templatePath = potentialTemplatePath; console.error(`DEBUG: Found template in ProgramData (alternative): ${templatePath}`); } else { throw new Error(`Standard template project file not found at relative path or ProgramData locations.`); } } } else { console.error(`DEBUG: Found template relative to exe: ${templatePath}`); } // *** END TEMPLATE FINDING *** } catch (e:any) { console.error(`Template Error: ${e.message}`); // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: `Template Error: ${e.message}` }], isError: true }; } try { const escProjPath = absPath.replace(/\\/g, '\\\\'); const escTmplPath = templatePath.replace(/\\/g, '\\\\'); // *** DEFINE script *** const script = CREATE_PROJECT_SCRIPT_TEMPLATE .replace("{PROJECT_FILE_PATH}", escProjPath) .replace("{TEMPLATE_PROJECT_PATH}", escTmplPath); // *** END DEFINE script *** console.error(">>> create_project (copy-then-open): PREPARED SCRIPT:", script.substring(0, 500) + "..."); const result = await executeCodesysScript(script, codesysExePath, codesysProfileName); console.error(">>> create_project (copy-then-open): EXECUTION RESULT:", JSON.stringify(result)); const success = result.success && result.output.includes("SCRIPT_SUCCESS"); // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: success ? `Project created from template: ${absPath}` : `Failed create project ${absPath} from template. Output:\n${result.output}` }], isError: !success }; } catch (e: any) { console.error(`Error create_project ${absPath}: ${e}`); // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: `Error: ${e.message}` }], isError: true }; } }); server.tool("save_project", { projectFilePath: z.string().describe("Path to project.") }, async (args) => { const { projectFilePath } = args; let absPath = path.normalize(path.isAbsolute(projectFilePath) ? projectFilePath : path.join(WORKSPACE_DIR, projectFilePath)); console.error(`Tool call: save_project: ${absPath}`); try { const escapedPath = absPath.replace(/\\/g, '\\\\'); // *** DEFINE script *** const script = SAVE_PROJECT_SCRIPT_TEMPLATE.replace("{PROJECT_FILE_PATH}", escapedPath); // *** END DEFINE script *** const result = await executeCodesysScript(script, codesysExePath, codesysProfileName); const success = result.success && result.output.includes("SCRIPT_SUCCESS"); // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: success ? `Project saved: ${absPath}` : `Failed save project ${absPath}. Output:\n${result.output}` }], isError: !success }; } catch (e:any) { console.error(`Error save_project ${absPath}: ${e}`); // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: `Error: ${e.message}` }], isError: true }; } }); server.tool("create_pou", { projectFilePath: z.string().describe("Path to project."), name: z.string().describe("Name for the new POU."), type: PouTypeEnum.describe("Type of POU (Program, FunctionBlock, Function)."), language: ImplementationLanguageEnum.describe("Implementation language (ST, LD, FBD, etc.). Will use default if not directly supported/mapped by script."), parentPath: z.string().describe("Relative path under project/application (e.g., 'Application' or 'MyFolder/SubFolder').") }, async (args) => { const { projectFilePath, name, type, language, parentPath } = args; let absPath = path.normalize(path.isAbsolute(projectFilePath) ? projectFilePath : path.join(WORKSPACE_DIR, projectFilePath)); const sanParentPath = parentPath.replace(/\\/g, '/').replace(/^\/+|\/+$/g, ''); const sanName = name.trim(); console.error(`Tool call: create_pou: Name='${sanName}', Type='${type}', Lang='${language}', Parent='${sanParentPath}', Project='${absPath}'`); try { const escProjPath = absPath.replace(/\\/g, '\\\\'); // *** DEFINE script *** let script = CREATE_POU_SCRIPT_TEMPLATE.replace("{PROJECT_FILE_PATH}", escProjPath); script = script.replace("{POU_NAME}", sanName); script = script.replace("{POU_TYPE_STR}", type); script = script.replace("{IMPL_LANGUAGE_STR}", language); script = script.replace("{PARENT_PATH}", sanParentPath); // *** END DEFINE script *** console.error(">>> create_pou: PREPARED SCRIPT:", script.substring(0,500)+"..."); const result = await executeCodesysScript(script, codesysExePath, codesysProfileName); console.error(">>> create_pou: EXECUTION RESULT:", JSON.stringify(result)); const success = result.success && result.output.includes("SCRIPT_SUCCESS"); // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: success ? `POU '${sanName}' created in '${sanParentPath}' of ${absPath}. Project saved.` : `Failed create POU '${sanName}'. Output:\n${result.output}` }], isError: !success }; } catch (e:any) { console.error(`Error create_pou ${sanName} in ${absPath}: ${e}`); // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: `Error: ${e.message}` }], isError: true }; } }); server.tool("set_pou_code", { projectFilePath: z.string().describe("Path to project."), pouPath: z.string().describe("Relative POU/Method/Property path (e.g., 'Application/MyPOU' or 'MyFolder/MyFB/MyMethod')."), declarationCode: z.string().describe("Code for the declaration part (VAR...END_VAR).").optional(), implementationCode: z.string().describe("Code for the implementation logic part.").optional() }, async (args) => { const { projectFilePath, pouPath, declarationCode, implementationCode } = args; if (declarationCode === undefined && implementationCode === undefined) { return { content: [{ type: "text", text: "Error: At least one of declarationCode or implementationCode must be provided." }], isError: true }; } let absPath = path.normalize(path.isAbsolute(projectFilePath) ? projectFilePath : path.join(WORKSPACE_DIR, projectFilePath)); const sanPouPath = pouPath.replace(/\\/g, '/').replace(/^\/+|\/+$/g, ''); console.error(`Tool call: set_pou_code: Target='${sanPouPath}', Project='${absPath}'`); try { const escProjPath = absPath.replace(/\\/g, '\\\\'); // Escape content for Python triple-quoted strings const sanDeclCode = (declarationCode ?? "").replace(/\\/g, '\\\\').replace(/"""/g, '\\"\\"\\"'); const sanImplCode = (implementationCode ?? "").replace(/\\/g, '\\\\').replace(/"""/g, '\\"\\"\\"'); // *** DEFINE script *** let script = SET_POU_CODE_SCRIPT_TEMPLATE.replace("{PROJECT_FILE_PATH}", escProjPath); script = script.replace("{POU_FULL_PATH}", sanPouPath); script = script.replace("{DECLARATION_CONTENT}", sanDeclCode); script = script.replace("{IMPLEMENTATION_CONTENT}", sanImplCode); // *** END DEFINE script *** console.error(">>> set_pou_code: PREPARED SCRIPT:", script.substring(0, 500) + "..."); const result = await executeCodesysScript(script, codesysExePath, codesysProfileName); console.error(">>> set_pou_code: EXECUTION RESULT:", JSON.stringify(result)); const success = result.success && result.output.includes("SCRIPT_SUCCESS"); // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: success ? `Code set for '${sanPouPath}' in ${absPath}. Project saved.` : `Failed set code for '${sanPouPath}'. Output:\n${result.output}` }], isError: !success }; } catch (e:any) { console.error(`Error set_pou_code ${sanPouPath} in ${absPath}: ${e}`); // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: `Error: ${e.message}` }], isError: true }; } }); server.tool("create_property", { projectFilePath: z.string().describe("Path to the project file."), parentPouPath: z.string().describe("Relative path to the parent POU (e.g., 'Application/MyFB')."), propertyName: z.string().describe("Name of the new property."), propertyType: z.string().describe("Data type of the property (e.g., 'BOOL', 'INT', 'MyDUT').") }, async (args) => { const { projectFilePath, parentPouPath, propertyName, propertyType } = args; let absPath = path.normalize(path.isAbsolute(projectFilePath) ? projectFilePath : path.join(WORKSPACE_DIR, projectFilePath)); const sanParentPath = parentPouPath.replace(/\\/g, '/').replace(/^\/+|\/+$/g, ''); const sanPropName = propertyName.trim(); const sanPropType = propertyType.trim(); console.error(`Tool call: create_property: Name='${sanPropName}', Type='${sanPropType}', ParentPOU='${sanParentPath}', Project='${absPath}'`); if (!sanPropName || !sanPropType) { // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: `Error: Property name and type cannot be empty.` }], isError: true }; } try { const escProjPath = absPath.replace(/\\/g, '\\\\'); // *** DEFINE script *** let script = CREATE_PROPERTY_SCRIPT_TEMPLATE.replace("{PROJECT_FILE_PATH}", escProjPath); script = script.replace("{PARENT_POU_FULL_PATH}", sanParentPath); script = script.replace("{PROPERTY_NAME}", sanPropName); script = script.replace("{PROPERTY_TYPE}", sanPropType); // *** END DEFINE script *** console.error(">>> create_property: PREPARED SCRIPT:", script.substring(0, 500) + "..."); const result = await executeCodesysScript(script, codesysExePath, codesysProfileName); console.error(">>> create_property: EXECUTION RESULT:", JSON.stringify(result)); const success = result.success && result.output.includes("SCRIPT_SUCCESS"); // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: success ? `Property '${sanPropName}' created under '${sanParentPath}' in ${absPath}. Project saved.` : `Failed to create property '${sanPropName}'. Output:\n${result.output}` }], isError: !success }; } catch (e: any) { console.error(`Error create_property ${sanPropName} in ${absPath}: ${e}`); // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: `Error: ${e.message}` }], isError: true }; } }); server.tool("create_method", { projectFilePath: z.string().describe("Path to the project file."), parentPouPath: z.string().describe("Relative path to the parent POU (e.g., 'Application/MyFB')."), methodName: z.string().describe("Name of the new method."), returnType: z.string().optional().describe("Return type (e.g., 'BOOL', 'INT'). Leave empty or omit for no return value."), }, async (args) => { const { projectFilePath, parentPouPath, methodName, returnType } = args; let absPath = path.normalize(path.isAbsolute(projectFilePath) ? projectFilePath : path.join(WORKSPACE_DIR, projectFilePath)); const sanParentPath = parentPouPath.replace(/\\/g, '/').replace(/^\/+|\/+$/g, ''); const sanMethName = methodName.trim(); const sanReturnType = (returnType ?? "").trim(); console.error(`Tool call: create_method: Name='${sanMethName}', Return='${sanReturnType}', ParentPOU='${sanParentPath}', Project='${absPath}'`); if (!sanMethName) { // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: `Error: Method name cannot be empty.` }], isError: true }; } try { const escProjPath = absPath.replace(/\\/g, '\\\\'); // *** DEFINE script *** let script = CREATE_METHOD_SCRIPT_TEMPLATE.replace("{PROJECT_FILE_PATH}", escProjPath); script = script.replace("{PARENT_POU_FULL_PATH}", sanParentPath); script = script.replace("{METHOD_NAME}", sanMethName); script = script.replace("{RETURN_TYPE}", sanReturnType); // *** END DEFINE script *** console.error(">>> create_method: PREPARED SCRIPT:", script.substring(0, 500) + "..."); const result = await executeCodesysScript(script, codesysExePath, codesysProfileName); console.error(">>> create_method: EXECUTION RESULT:", JSON.stringify(result)); const success = result.success && result.output.includes("SCRIPT_SUCCESS"); // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: success ? `Method '${sanMethName}' created under '${sanParentPath}' in ${absPath}. Project saved.` : `Failed to create method '${sanMethName}'. Output:\n${result.output}` }], isError: !success }; } catch (e: any) { console.error(`Error create_method ${sanMethName} in ${absPath}: ${e}`); // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: `Error: ${e.message}` }], isError: true }; } }); server.tool("compile_project", { projectFilePath: z.string().describe("Path to project.") }, async (args) => { const { projectFilePath } = args; let absPath = path.normalize(path.isAbsolute(projectFilePath) ? projectFilePath : path.join(WORKSPACE_DIR, projectFilePath)); console.error(`Tool call: compile_project: ${absPath}`); try { const escapedPath = absPath.replace(/\\/g, '\\\\'); // *** DEFINE script *** const script = COMPILE_PROJECT_SCRIPT_TEMPLATE.replace("{PROJECT_FILE_PATH}", escapedPath); // *** END DEFINE script *** const result = await executeCodesysScript(script, codesysExePath, codesysProfileName); const success = result.success && result.output.includes("SCRIPT_SUCCESS"); const hasCompileErrors = result.output.includes("Compile complete --") && !/ 0 error\(s\),/.test(result.output); let message = success ? `Compilation initiated for application in ${absPath}. Check CODESYS messages for results.` : `Failed initiating compilation for ${absPath}. Output:\n${result.output}`; let isError = !success; // Base error status on script success if (success && hasCompileErrors) { message += " WARNING: Build command reported errors in the output log."; console.warn("Compile project reported build errors in the output."); // Optionally set isError = true here if compile errors should fail the tool call isError = true; } // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: message }], isError: isError }; } catch (e:any) { console.error(`Error compile_project ${absPath}: ${e}`); // **** RETURN MCP STRUCTURE **** return { content: [{ type: "text", text: `Error: ${e.message}` }], isError: true }; } }); // --- End Tools --- console.error("SERVER.TS: Resources and Tools defined."); // --- End MCP Resources / Tools Definitions --- // --- Server Connection --- console.error("SERVER.TS: startServer() internal logic executing."); try { const transport = new StdioServerTransport(); console.error("SERVER.TS: Connecting MCP server via stdio..."); // No need to await connect here if startMcpServer is called by bin.ts which awaits it // await server.connect(transport); server.connect(transport); // Connect but don't await here, let bin.ts handle waiting console.error("SERVER.TS: MCP Server connection initiated via stdio."); // console.error("SERVER.TS: server.connect() promise resolved successfully."); // This log might be premature now } catch (error) { console.error("FATAL: Failed to initiate MCP server connection:", error); // Re-throw error so bin.ts can catch it throw error; // process.exit(1); } // --- End Server Connection --- } // --- End of startMcpServer function --- // --- Graceful Shutdown / Unhandled Rejection --- // These should remain at the top level, outside startMcpServer process.on('SIGINT', () => { console.error('\nSERVER.TS: SIGINT received, shutting down...'); process.exit(0); }); process.on('SIGTERM', () => { console.error('\nSERVER.TS: SIGTERM received, shutting down...'); process.exit(0); }); process.on('unhandledRejection', (reason, promise) => { console.error('SERVER.TS: Unhandled Rejection at:', promise, 'reason:', reason); }); // --- End Graceful Shutdown / Unhandled Rejection --- console.error(">>> SERVER.TS Module Parsed <<<"); // Log end of script parsing

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/johannesPettersson80/codesys-mcp-toolkit'

If you have feedback or need assistance with the MCP directory API, please join our Discord server